Source code for aiodocker.services

import json
from typing import Any, AsyncIterator, List, Mapping, MutableMapping, Optional, Union

from .multiplexed import multiplexed_result_list, multiplexed_result_stream
from .utils import (
    clean_filters,
    clean_map,
    clean_networks,
    compose_auth_header,
    format_env,
)


[docs]class DockerServices: def __init__(self, docker): self.docker = docker
[docs] async def list(self, *, filters: Mapping = None) -> List[Mapping]: """ Return a list of services Args: filters: a dict with a list of filters Available filters: id=<service id> label=<service label> mode=["replicated"|"global"] name=<service name> """ params = {"filters": clean_filters(filters)} response = await self.docker._query_json( "services", method="GET", params=params ) return response
[docs] async def create( self, task_template: Mapping[str, Any], *, name: str = None, labels: Optional[Mapping[str, str]] = None, mode: Mapping = None, update_config: Mapping = None, rollback_config: Mapping = None, networks: List = None, endpoint_spec: Mapping = None, auth: Optional[Union[MutableMapping, str, bytes]] = None, registry: str = None, ) -> Mapping[str, Any]: """ Create a service Args: task_template: user modifiable task configuration name: name of the service labels: user-defined key/value metadata mode: scheduling mode for the service update_config: update strategy of the service rollback_config: rollback strategy of the service networks: array of network names/IDs to attach the service to endpoint_spec: ports to expose auth: authentication information, can be a string, dict or bytes registry: used when auth is specified, it provides domain/IP of the registry without a protocol Returns: a dict with info of the created service """ if "Image" not in task_template["ContainerSpec"]: raise KeyError("Missing mandatory Image key in ContainerSpec") if auth and registry is None: raise KeyError( "When auth is specified you need to specifiy also the registry" ) # from {"key":"value"} to ["key=value"] if "Env" in task_template["ContainerSpec"]: task_template["ContainerSpec"]["Env"] = [ format_env(k, v) for k, v in task_template["ContainerSpec"]["Env"].items() ] headers = None if auth: headers = {"X-Registry-Auth": compose_auth_header(auth, registry)} config = { "TaskTemplate": task_template, "Name": name, "Labels": labels, "Mode": mode, "UpdateConfig": update_config, "RollbackConfig": rollback_config, "Networks": clean_networks(networks), "EndpointSpec": endpoint_spec, } data = json.dumps(clean_map(config)) response = await self.docker._query_json( "services/create", method="POST", data=data, headers=headers ) return response
[docs] async def update( self, service_id: str, version: str, *, image: str = None, rollback: bool = False, ) -> bool: """ Update a service. If rollback is True image will be ignored. Args: service_id: ID or name of the service. version: Version of the service that you want to update. rollback: Rollback the service to the previous service spec. Returns: True if successful. """ if image is None and rollback is False: raise ValueError("You need to specify an image.") inspect_service = await self.inspect(service_id) spec = inspect_service["Spec"] if image is not None: spec["TaskTemplate"]["ContainerSpec"]["Image"] = image params = {"version": version} if rollback is True: params["rollback"] = "previous" data = json.dumps(clean_map(spec)) await self.docker._query_json( f"services/{service_id}/update", method="POST", data=data, params=params, ) return True
[docs] async def delete(self, service_id: str) -> bool: """ Remove a service Args: service_id: ID or name of the service Returns: True if successful """ async with self.docker._query(f"services/{service_id}", method="DELETE"): return True
[docs] async def inspect(self, service_id: str) -> Mapping[str, Any]: """ Inspect a service Args: service_id: ID or name of the service Returns: a dict with info about a service """ response = await self.docker._query_json(f"services/{service_id}", method="GET") return response
[docs] def logs( self, service_id: str, *, details: bool = False, follow: bool = False, stdout: bool = False, stderr: bool = False, since: int = 0, timestamps: bool = False, is_tty: bool = False, tail: str = "all", ) -> Union[str, AsyncIterator[str]]: """ Retrieve logs of the given service Args: details: show service context and extra details provided to logs follow: return the logs as a stream. stdout: return logs from stdout stderr: return logs from stderr since: return logs since this time, as a UNIX timestamp timestamps: add timestamps to every log line is_tty: the service has a pseudo-TTY allocated tail: only return this number of log lines from the end of the logs, specify as an integer or `all` to output all log lines. """ if stdout is False and stderr is False: raise TypeError("Need one of stdout or stderr") params = { "details": details, "follow": follow, "stdout": stdout, "stderr": stderr, "since": since, "timestamps": timestamps, "tail": tail, } cm = self.docker._query( f"services/{service_id}/logs", method="GET", params=params, ) if follow: return self._logs_stream(cm, is_tty) else: return self._logs_list(cm, is_tty)
async def _logs_stream(self, cm, is_tty): async with cm as response: async for item in multiplexed_result_stream(response, is_tty=is_tty): yield item async def _logs_list(self, cm, is_tty): async with cm as response: return await multiplexed_result_list(response, is_tty=is_tty)