Source code for aiodocker.containers

from __future__ import annotations

import json
import shlex
import tarfile
from collections.abc import AsyncIterator, Mapping, Sequence
from contextlib import AbstractAsyncContextManager
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    overload,
)

from aiohttp import ClientResponse, ClientTimeout, ClientWebSocketResponse
from multidict import MultiDict
from yarl import URL

from .exceptions import DockerContainerError, DockerError
from .execs import Exec
from .jsonstream import json_stream_list, json_stream_stream
from .logs import DockerLog
from .multiplexed import multiplexed_result_list, multiplexed_result_stream
from .stream import Stream
from .types import SENTINEL, JSONObject, MutableJSONObject, PortInfo, Sentinel
from .utils import clean_filters, identical, parse_result


if TYPE_CHECKING:
    from .docker import Docker


[docs] class DockerContainers: def __init__(self, docker: Docker) -> None: self.docker = docker
[docs] async def list(self, **kwargs) -> list[DockerContainer]: data = await self.docker._query_json( "containers/json", method="GET", params=kwargs ) return [DockerContainer(self.docker, **x) for x in data]
[docs] async def create_or_replace( self, name: str, config: JSONObject, ) -> DockerContainer: container = None try: container = await self.get(name) if not identical(config, container._container): running = container._container.get("State", {}).get("Running", False) if running: await container.stop() await container.delete() container = None except DockerError: pass if container is None: container = await self.create(config, name=name) return container
[docs] async def create( self, config: JSONObject, *, name: str | None = None, ) -> DockerContainer: url = "containers/create" encoded_config = json.dumps(config, sort_keys=True).encode("utf-8") kwargs = {} if name: kwargs["name"] = name data = await self.docker._query_json( url, method="POST", data=encoded_config, params=kwargs ) return DockerContainer(self.docker, id=data["Id"])
[docs] async def run( self, config: JSONObject, *, auth: Mapping | str | bytes | None = None, name: str | None = None, ) -> DockerContainer: """ Create and start a container. If container.start() will raise an error the exception will contain a `container_id` attribute with the id of the container. Use `auth` for specifying credentials for pulling absent image from a private registry. """ try: container = await self.create(config, name=name) except DockerError as err: # image not found, try pulling it if err.status == 404 and "Image" in config: await self.docker.pull(str(config["Image"]), auth=auth) container = await self.create(config, name=name) else: raise err try: await container.start() except DockerError as err: raise DockerContainerError(err.status, err.message, container["id"]) return container
[docs] async def get(self, container_id: str, **kwargs) -> DockerContainer: data = await self.docker._query_json( f"containers/{container_id}/json", method="GET", params=kwargs, ) return DockerContainer(self.docker, **data)
[docs] def container(self, container_id: str, **kwargs) -> DockerContainer: data = {"id": container_id} data.update(kwargs) return DockerContainer(self.docker, **data)
[docs] def exec(self, exec_id: str) -> Exec: """Return Exec instance for already created exec object.""" return Exec(self.docker, exec_id)
[docs] async def prune( self, *, filters: Mapping[str, Any] | None = None, ) -> dict[str, Any]: """ Delete stopped containers Args: filters: Filter expressions to limit which containers are pruned. Available filters: - until: Only remove containers created before given timestamp - label: Only remove containers with (or without, if label!=<key> is used) the specified labels Returns: Dictionary containing information about deleted containers and space reclaimed """ params = {} if filters is not None: params["filters"] = clean_filters(filters) response = await self.docker._query_json( "containers/prune", "POST", params=params ) return response
[docs] class DockerContainer: _container: dict[str, Any] _id: str def __init__(self, docker: Docker, **kwargs) -> None: self.docker = docker self._container = kwargs _id = self._container.get( "id", self._container.get("ID", self._container.get("Id")) ) if _id is None: raise ValueError( "DockerContainer should be initialized with explicit container ID." ) self._id = _id self.logs = DockerLog(docker, self) @property def id(self) -> str: return self._id @overload async def log( self, *, stdout: bool = False, stderr: bool = False, follow: Literal[False] = False, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, **kwargs, ) -> list[str]: ... @overload def log( self, *, stdout: bool = False, stderr: bool = False, follow: Literal[True], timeout: float | ClientTimeout | Sentinel | None = SENTINEL, **kwargs, ) -> AsyncIterator[str]: ...
[docs] def log( self, *, stdout: bool = False, stderr: bool = False, follow: bool = False, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, **kwargs, ) -> Any: if stdout is False and stderr is False: raise TypeError("Need one of stdout or stderr") params = {"stdout": stdout, "stderr": stderr, "follow": follow} params.update(kwargs) # Default to infinite timeout for log operations timeout_config = self.docker._resolve_long_running_timeout(timeout) cm = self.docker._query( f"containers/{self._id}/logs", method="GET", params=params, timeout=timeout_config, ) if follow: return self._logs_stream(cm) else: return self._logs_list(cm)
async def _logs_stream( self, cm: AbstractAsyncContextManager[ClientResponse] ) -> AsyncIterator[str]: try: inspect_info = await self.show() except DockerError: raise is_tty = inspect_info["Config"]["Tty"] async with cm as response: async for item in multiplexed_result_stream(response, is_tty=is_tty): yield item async def _logs_list( self, cm: AbstractAsyncContextManager[ClientResponse] ) -> Sequence[str]: try: inspect_info = await self.show() except DockerError: raise is_tty = inspect_info["Config"]["Tty"] async with cm as response: return await multiplexed_result_list(response, is_tty=is_tty)
[docs] async def get_archive(self, path: str) -> tarfile.TarFile: async with self.docker._query( f"containers/{self._id}/archive", method="GET", params={"path": path}, ) as response: data = await parse_result(response) assert isinstance(data, tarfile.TarFile) return data
[docs] async def put_archive(self, path, data): async with self.docker._query( f"containers/{self._id}/archive", method="PUT", data=data, headers={"content-type": "application/json"}, params={"path": path}, ) as response: data = await parse_result(response) return data
[docs] async def show(self, **kwargs) -> dict[str, Any]: data = await self.docker._query_json( f"containers/{self._id}/json", method="GET", params=kwargs ) self._container = data return data
[docs] async def stop( self, *, t: int | None = None, signal: str | None = None, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> None: """Stop the container. Args: t: Number of seconds to wait for the container to stop before killing it. If None, uses the container's configured stop timeout (default 10 seconds). This is a Docker API parameter that controls the graceful shutdown period. signal: Signal to send to the container (e.g., "SIGTERM", "SIGKILL"). If None, uses the default SIGTERM signal. This parameter may not be supported in older Docker API versions. timeout: HTTP request timeout for the stop operation (infinite by default). This controls how long to wait for the Docker daemon to respond, not the container stop duration. """ params: dict[str, Any] = {} if t is not None: params["t"] = t if signal is not None: params["signal"] = signal # Default to infinite timeout for stop operations timeout_config = self.docker._resolve_long_running_timeout(timeout) async with self.docker._query( f"containers/{self._id}/stop", method="POST", params=params, timeout=timeout_config, ): pass
[docs] async def start(self, **kwargs) -> None: async with self.docker._query( f"containers/{self._id}/start", method="POST", headers={"content-type": "application/json"}, params=kwargs, ): pass
[docs] async def restart( self, *, t: int | None = None, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> None: """Restart the container. Args: t: Number of seconds to wait for the container to stop before killing it. If None, uses the container's configured stop timeout (default 10 seconds). This is a Docker API parameter that controls the graceful shutdown period. timeout: HTTP request timeout for the restart operation (infinite by default). This controls how long to wait for the Docker daemon to respond, not the container restart duration. """ params = {} if t is not None: params["t"] = t # Default to infinite timeout for restart operations timeout_config = self.docker._resolve_long_running_timeout(timeout) async with self.docker._query( f"containers/{self._id}/restart", method="POST", params=params, timeout=timeout_config, ): pass
[docs] async def kill( self, *, signal: str | None = None, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> None: """Kill the container by sending a signal. Args: signal: Signal to send to the container (e.g., "SIGKILL", "SIGTERM", "SIGHUP"). Can be a signal name (with or without SIG prefix) or a signal number. If None, uses the default SIGKILL signal. timeout: HTTP request timeout for the kill operation. """ params: dict[str, Any] = {} if signal is not None: params["signal"] = signal # Use standard timeout resolution timeout_config = self.docker._resolve_long_running_timeout(timeout) async with self.docker._query( f"containers/{self._id}/kill", method="POST", params=params, timeout=timeout_config, ): pass
[docs] async def wait( self, *, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, **kwargs, ) -> dict[str, Any]: # Default to infinite timeout for wait operations timeout_config = self.docker._resolve_long_running_timeout(timeout) data = await self.docker._query_json( f"containers/{self._id}/wait", method="POST", params=kwargs, timeout=timeout_config, ) return data
[docs] async def delete( self, *, force: bool = False, v: bool = False, link: bool = False, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> None: """Remove the container. Args: force: If True, kill the container before removing it (using SIGKILL). If False, the container must be stopped before it can be removed. v: If True, remove anonymous volumes associated with the container. link: If True, remove the specified link (legacy networking feature). timeout: HTTP request timeout for the delete operation. """ params: dict[str, Any] = {} if force: params["force"] = force if v: params["v"] = v if link: params["link"] = link # Use standard timeout resolution timeout_config = self.docker._resolve_long_running_timeout(timeout) async with self.docker._query( f"containers/{self._id}", method="DELETE", params=params, timeout=timeout_config, ): pass
[docs] async def rename(self, newname) -> None: async with self.docker._query( f"containers/{self._id}/rename", method="POST", headers={"content-type": "application/json"}, params={"name": newname}, ): pass
[docs] async def websocket(self, **params) -> ClientWebSocketResponse: if not params: params = {"stdin": True, "stdout": True, "stderr": True, "stream": True} path = f"containers/{self._id}/attach/ws" ws = await self.docker._websocket(path, **params) return ws
[docs] def attach( self, *, stdout: bool = False, stderr: bool = False, stdin: bool = False, detach_keys: str | None = None, logs: bool = False, timeout: ClientTimeout | Sentinel | None = SENTINEL, ) -> Stream: async def setup() -> tuple[URL, bytes | None, bool]: params: MultiDict[str | int] = MultiDict() if detach_keys: params.add("detachKeys", detach_keys) else: params.add("detachKeys", "") params.add("logs", int(logs)) params.add("stdin", int(stdin)) params.add("stdout", int(stdout)) params.add("stderr", int(stderr)) params.add("stream", 1) inspect_info = await self.show() return ( URL(f"containers/{self._id}/attach").with_query(params), None, inspect_info["Config"]["Tty"], ) # Default to infinite timeout for attach operations timeout_config = self.docker._resolve_long_running_timeout(timeout) return Stream(self.docker, setup, timeout_config)
[docs] async def port(self, private_port: int | str) -> list[PortInfo] | None: if "NetworkSettings" not in self._container: await self.show() private_port = str(private_port) h_ports = None # Port settings is None when the container is running with # network_mode=host. port_settings = self._container.get("NetworkSettings", {}).get("Ports") if port_settings is None: return None if "/" in private_port: return port_settings.get(private_port) h_ports = port_settings.get(private_port + "/tcp") if h_ports is None: h_ports = port_settings.get(private_port + "/udp") return h_ports
@overload def stats( self, *, stream: Literal[True] = True, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> AsyncIterator[dict[str, Any]]: ... @overload async def stats( self, *, stream: Literal[False], timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> list[dict[str, Any]]: ...
[docs] def stats( self, *, stream: bool = True, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> Any: # Default to infinite timeout for stats operations timeout_config = self.docker._resolve_long_running_timeout(timeout) cm = self.docker._query( f"containers/{self._id}/stats", params={"stream": "1" if stream else "0"}, timeout=timeout_config, ) if stream: return self._stats_stream(cm) else: return self._stats_list(cm)
async def _stats_stream(self, cm): async with cm as response: async for item in json_stream_stream(response, raise_on_error=False): yield item async def _stats_list(self, cm): async with cm as response: return await json_stream_list(response, raise_on_error=False)
[docs] async def exec( self, cmd: str | Sequence[str], stdout: bool = True, stderr: bool = True, stdin: bool = False, tty: bool = False, privileged: bool = False, user: str = "", # root by default environment: Mapping[str, str] | Sequence[str] | None = None, workdir: str | None = None, detach_keys: str | None = None, ) -> Exec: if isinstance(cmd, str): cmd = shlex.split(cmd) if environment is None: pass elif isinstance(environment, Mapping): environment = [f"{key}={value}" for key, value in environment.items()] else: environment = list(environment) data = { "Container": self._id, "Privileged": privileged, "Tty": tty, "AttachStdin": stdin, "AttachStdout": stdout, "AttachStderr": stderr, "Cmd": cmd, "Env": environment, } if workdir is not None: data["WorkingDir"] = workdir else: data["WorkingDir"] = "" if detach_keys: data["detachKeys"] = detach_keys else: data["detachKeys"] = "" if user: data["User"] = user else: data["User"] = "" data = await self.docker._query_json( f"containers/{self._id}/exec", method="POST", data=data ) return Exec(self.docker, data["Id"], tty=tty)
[docs] async def resize(self, *, h: int, w: int) -> None: url = URL(f"containers/{self._id}/resize").with_query(h=h, w=w) await self.docker._query_json(url, method="POST")
[docs] async def commit( self, *, repository: str | None = None, tag: str | None = None, message: str | None = None, author: str | None = None, changes: str | Sequence[str] | None = None, config: dict[str, Any] | None = None, pause: bool = True, timeout: float | ClientTimeout | Sentinel | None = SENTINEL, ) -> dict[str, Any]: """ Commit a container to an image. Similar to the ``docker commit`` command. """ params: MutableJSONObject = {"container": self._id, "pause": pause} if repository is not None: params["repo"] = repository if tag is not None: params["tag"] = tag if message is not None: params["comment"] = message if author is not None: params["author"] = author if changes is not None: if not isinstance(changes, str): changes = "\n".join(changes) params["changes"] = changes # Default to infinite timeout for commit operations timeout_config = self.docker._resolve_long_running_timeout(timeout) return await self.docker._query_json( "commit", method="POST", params=params, data=config, timeout=timeout_config )
[docs] async def top(self, *, ps_args: str | None = None) -> dict[str, Any]: """List processes running inside the container. Args: ps_args: Arguments passed to ``ps`` (e.g. ``"aux"``). If ``None``, the Docker daemon default is used. Returns: Dict with ``Titles`` (column names) and ``Processes`` (list of rows) keys, as returned by the Docker API. """ params: dict[str, Any] = {} if ps_args is not None: params["ps_args"] = ps_args return await self.docker._query_json( f"containers/{self._id}/top", method="GET", params=params )
[docs] async def pause(self) -> None: async with self.docker._query(f"containers/{self._id}/pause", method="POST"): pass
[docs] async def unpause(self) -> None: async with self.docker._query(f"containers/{self._id}/unpause", method="POST"): pass
def __getitem__(self, key: str) -> Any: return self._container[key] def __contains__(self, key: str) -> bool: return key in self._container