Source code for aiodocker.events

import asyncio
import datetime as dt
import warnings
from collections import ChainMap

from .channel import Channel
from .jsonstream import json_stream_stream


[docs]class DockerEvents: def __init__(self, docker): self.docker = docker self.channel = Channel() self.json_stream = None self.task = None
[docs] def listen(self): warnings.warn( "use subscribe() method instead", DeprecationWarning, stacklevel=2 ) return self.channel.subscribe()
[docs] def subscribe(self, *, create_task=True, **params): """Subscribes to the Docker events channel. Use the keyword argument create_task=False to prevent automatically spawning the background tasks that listen to the events. This function returns a ChannelSubscriber object. """ if create_task and not self.task: self.task = asyncio.ensure_future(self.run(**params)) return self.channel.subscribe()
def _transform_event(self, data): if "time" in data: data["time"] = dt.datetime.fromtimestamp(data["time"]) return data
[docs] async def run(self, **params): """ Query the events endpoint of the Docker daemon. Publish messages inside the asyncio queue. """ if self.json_stream: warnings.warn("already running", RuntimeWarning, stackelevel=2) return forced_params = {"stream": True} params = ChainMap(forced_params, params) try: # timeout has to be set to 0, None is not passed # Otherwise after 5 minutes the client # will close the connection # http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientSession.request async with self.docker._query( "events", method="GET", params=params, timeout=0 ) as response: self.json_stream = json_stream_stream(response, self._transform_event) try: async for data in self.json_stream: await self.channel.publish(data) finally: if self.json_stream is not None: await self.json_stream._close() self.json_stream = None finally: # signal termination to subscribers await self.channel.publish(None)
[docs] async def stop(self): if self.json_stream is not None: await self.json_stream._close() if self.task: self.task.cancel() try: await self.task except asyncio.CancelledError: pass finally: self.task = None