Source code for aiodocker.containers

import json
import shlex
import tarfile
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple, Union

from multidict import MultiDict
from yarl import URL

from .exceptions import DockerContainerError, DockerError
from .execs import Exec
from .jsonstream import json_stream_list, json_stream_stream
from .logs import DockerLog
from .multiplexed import multiplexed_result_list, multiplexed_result_stream
from .stream import Stream
from .utils import identical, parse_result


[docs]class DockerContainers: def __init__(self, docker): self.docker = docker
[docs] async def list(self, **kwargs): data = await self.docker._query_json( "containers/json", method="GET", params=kwargs ) return [DockerContainer(self.docker, **x) for x in data]
[docs] async def create_or_replace(self, name, config): container = None try: container = await self.get(name) if not identical(config, container._container): running = container._container.get("State", {}).get("Running", False) if running: await container.stop() await container.delete() container = None except DockerError: pass if container is None: container = await self.create(config, name=name) return container
[docs] async def create(self, config, *, name=None): url = "containers/create" config = json.dumps(config, sort_keys=True).encode("utf-8") kwargs = {} if name: kwargs["name"] = name data = await self.docker._query_json( url, method="POST", data=config, params=kwargs ) return DockerContainer(self.docker, id=data["Id"])
[docs] async def run( self, config, *, auth: Optional[Union[Mapping, str, bytes]] = None, name: Optional[str] = None, ): """ Create and start a container. If container.start() will raise an error the exception will contain a `container_id` attribute with the id of the container. Use `auth` for specifying credentials for pulling absent image from a private registry. """ try: container = await self.create(config, name=name) except DockerError as err: # image not fount, try pulling it if err.status == 404 and "Image" in config: await self.docker.pull(config["Image"], auth=auth) container = await self.create(config, name=name) else: raise err try: await container.start() except DockerError as err: raise DockerContainerError( err.status, {"message": err.message}, container["id"] ) return container
[docs] async def get(self, container, **kwargs): data = await self.docker._query_json( f"containers/{container}/json", method="GET", params=kwargs, ) return DockerContainer(self.docker, **data)
[docs] def container(self, container_id, **kwargs): data = {"id": container_id} data.update(kwargs) return DockerContainer(self.docker, **data)
[docs] def exec(self, exec_id: str) -> Exec: """Return Exec instance for already created exec object.""" return Exec(self.docker, exec_id, None)
[docs]class DockerContainer: def __init__(self, docker, **kwargs): self.docker = docker self._container = kwargs self._id = self._container.get( "id", self._container.get("ID", self._container.get("Id")) ) self.logs = DockerLog(docker, self) @property def id(self) -> str: return self._id
[docs] def log(self, *, stdout=False, stderr=False, follow=False, **kwargs): if stdout is False and stderr is False: raise TypeError("Need one of stdout or stderr") params = {"stdout": stdout, "stderr": stderr, "follow": follow} params.update(kwargs) cm = self.docker._query( f"containers/{self._id}/logs", method="GET", params=params ) if follow: return self._logs_stream(cm) else: return self._logs_list(cm)
async def _logs_stream(self, cm): inspect_info = await self.show() is_tty = inspect_info["Config"]["Tty"] async with cm as response: async for item in multiplexed_result_stream(response, is_tty=is_tty): yield item async def _logs_list(self, cm): inspect_info = await self.show() is_tty = inspect_info["Config"]["Tty"] async with cm as response: return await multiplexed_result_list(response, is_tty=is_tty)
[docs] async def get_archive(self, path: str) -> tarfile.TarFile: async with self.docker._query( f"containers/{self._id}/archive", method="GET", params={"path": path}, ) as response: data = await parse_result(response) return data
[docs] async def put_archive(self, path, data): async with self.docker._query( f"containers/{self._id}/archive", method="PUT", data=data, headers={"content-type": "application/json"}, params={"path": path}, ) as response: data = await parse_result(response) return data
[docs] async def show(self, **kwargs): data = await self.docker._query_json( f"containers/{self._id}/json", method="GET", params=kwargs ) self._container = data return data
[docs] async def stop(self, **kwargs): async with self.docker._query( f"containers/{self._id}/stop", method="POST", params=kwargs ): pass
[docs] async def start(self, **kwargs): async with self.docker._query( f"containers/{self._id}/start", method="POST", headers={"content-type": "application/json"}, data=kwargs, ): pass
[docs] async def restart(self, timeout=None): params = {} if timeout is not None: params["t"] = timeout async with self.docker._query( f"containers/{self._id}/restart", method="POST", params=params, ): pass
[docs] async def kill(self, **kwargs): async with self.docker._query( f"containers/{self._id}/kill", method="POST", params=kwargs ): pass
[docs] async def wait(self, *, timeout=None, **kwargs): data = await self.docker._query_json( f"containers/{self._id}/wait", method="POST", params=kwargs, timeout=timeout, ) return data
[docs] async def delete(self, **kwargs): async with self.docker._query( f"containers/{self._id}", method="DELETE", params=kwargs ): pass
[docs] async def rename(self, newname): async with self.docker._query( f"containers/{self._id}/rename", method="POST", headers={"content-type": "application/json"}, params={"name": newname}, ): pass
[docs] async def websocket(self, **params): if not params: params = {"stdin": True, "stdout": True, "stderr": True, "stream": True} path = f"containers/{self._id}/attach/ws" ws = await self.docker._websocket(path, **params) return ws
[docs] def attach( self, *, stdout: bool = False, stderr: bool = False, stdin: bool = False, detach_keys: Optional[str] = None, logs: bool = False, ) -> Stream: async def setup() -> Tuple[URL, Optional[bytes], bool]: params: MultiDict[Union[str, int]] = MultiDict() if detach_keys: params.add("detachKeys", detach_keys) else: params.add("detachKeys", "") params.add("logs", int(logs)) params.add("stdin", int(stdin)) params.add("stdout", int(stdout)) params.add("stderr", int(stderr)) params.add("stream", 1) inspect_info = await self.show() return ( URL(f"containers/{self._id}/attach").with_query(params), None, inspect_info["Config"]["Tty"], ) return Stream(self.docker, setup, None)
[docs] async def port(self, private_port): if "NetworkSettings" not in self._container: await self.show() private_port = str(private_port) h_ports = None # Port settings is None when the container is running with # network_mode=host. port_settings = self._container.get("NetworkSettings", {}).get("Ports") if port_settings is None: return None if "/" in private_port: return port_settings.get(private_port) h_ports = port_settings.get(private_port + "/tcp") if h_ports is None: h_ports = port_settings.get(private_port + "/udp") return h_ports
[docs] def stats(self, *, stream=True): cm = self.docker._query( f"containers/{self._id}/stats", params={"stream": "1" if stream else "0"}, ) if stream: return self._stats_stream(cm) else: return self._stats_list(cm)
async def _stats_stream(self, cm): async with cm as response: async for item in json_stream_stream(response): yield item async def _stats_list(self, cm): async with cm as response: return await json_stream_list(response)
[docs] async def exec( self, cmd: Union[str, Sequence[str]], stdout: bool = True, stderr: bool = True, stdin: bool = False, tty: bool = False, privileged: bool = False, user: str = "", # root by default environment: Optional[Union[Mapping[str, str], Sequence[str]]] = None, workdir: Optional[str] = None, detach_keys: Optional[str] = None, ): if isinstance(cmd, str): cmd = shlex.split(cmd) if environment is None: pass elif isinstance(environment, Mapping): environment = [f"{key}={value}" for key, value in environment.items()] else: environment = list(environment) data = { "Container": self._id, "Privileged": privileged, "Tty": tty, "AttachStdin": stdin, "AttachStdout": stdout, "AttachStderr": stderr, "Cmd": cmd, "Env": environment, } if workdir is not None: data["WorkingDir"] = workdir else: data["WorkingDir"] = "" if detach_keys: data["detachKeys"] = detach_keys else: data["detachKeys"] = "" if user: data["User"] = user else: data["User"] = "" data = await self.docker._query_json( f"containers/{self._id}/exec", method="POST", data=data ) return Exec(self.docker, data["Id"], tty=tty)
[docs] async def resize(self, *, h: int, w: int) -> None: url = URL(f"containers/{self._id}/resize").with_query(h=h, w=w) await self.docker._query_json(url, method="POST")
[docs] async def commit( self, *, repository: Optional[str] = None, tag: Optional[str] = None, message: Optional[str] = None, author: Optional[str] = None, changes: Optional[Union[str, Sequence[str]]] = None, config: Optional[Dict[str, Any]] = None, pause: bool = True, ) -> Dict[str, Any]: """ Commit a container to an image. Similar to the ``docker commit`` command. """ params = {"container": self._id, "pause": pause} if repository is not None: params["repo"] = repository if tag is not None: params["tag"] = tag if message is not None: params["comment"] = message if author is not None: params["author"] = author if changes is not None: if not isinstance(changes, str): changes = "\n".join(changes) params["changes"] = changes return await self.docker._query_json( "commit", method="POST", params=params, data=config )
[docs] async def pause(self) -> None: async with self.docker._query(f"containers/{self._id}/pause", method="POST"): pass
[docs] async def unpause(self) -> None: async with self.docker._query(f"containers/{self._id}/unpause", method="POST"): pass
def __getitem__(self, key): return self._container[key] def __hasitem__(self, key): return key in self._container