Source code for aiodocker.logs

import warnings
from collections import ChainMap
from typing import TYPE_CHECKING, Any

import aiohttp

from .channel import Channel, ChannelSubscriber


if TYPE_CHECKING:
    from .containers import DockerContainer
    from .docker import Docker


[docs]class DockerLog: def __init__(self, docker: "Docker", container: "DockerContainer") -> None: self.docker = docker self.channel = Channel() self.container = container self.response = None
[docs] def listen(self) -> ChannelSubscriber: warnings.warn( "use subscribe() method instead", DeprecationWarning, stacklevel=2 ) return self.channel.subscribe()
[docs] def subscribe(self) -> ChannelSubscriber: return self.channel.subscribe()
[docs] async def run(self, **params: Any) -> None: if self.response: warnings.warn("already running", RuntimeWarning, stackelevel=2) return forced_params = {"follow": True} default_params = {"stdout": True, "stderr": True} params2 = ChainMap(forced_params, params, default_params) try: self.response = await self.docker._query( f"containers/{self.container._id}/logs", params=params2 ) assert self.response is not None while True: msg = await self.response.content.readline() if not msg: break await self.channel.publish(msg) except (aiohttp.ClientConnectionError, aiohttp.ServerDisconnectedError): pass finally: # signal termination to subscribers await self.channel.publish(None) if self.response is not None: try: await self.response.release() except Exception: pass self.response = None
[docs] async def stop(self) -> None: if self.response: await self.response.release()