Source code for aiodocker.tasks

from typing import Any, List, Mapping, Optional

from .utils import clean_filters


class DockerTasks:
    """Query the ``tasks`` endpoints through a Docker client object."""

    def __init__(self, docker):
        """
        Keep a reference to the client used to send requests.

        Args:
            docker: object whose async ``_query_json`` method performs
                the actual requests
        """
        self.docker = docker

    async def list(self, *, filters: Optional[Mapping] = None) -> List[Mapping]:
        """
        Return a list of tasks

        Args:
            filters: a collection of filters

        Available filters:
            desired-state=(running | shutdown | accepted)
            id=<task id>
            label=key or label="key=value"
            name=<task name>
            node=<node id or name>
            service=<service name>
        """
        # ``filters`` is forwarded to clean_filters as-is, including the
        # default None; the "filters" query parameter is always sent.
        params = {"filters": clean_filters(filters)}
        response = await self.docker._query_json("tasks", method="GET", params=params)
        return response

    async def inspect(self, task_id: str) -> Mapping[str, Any]:
        """
        Return info about a task

        Args:
            task_id: is ID of the task
        """
        # The ID is interpolated into the request path unchanged.
        response = await self.docker._query_json(f"tasks/{task_id}", method="GET")
        return response