Source code for aiodocker.events
from __future__ import annotations
import asyncio
import datetime as dt
import warnings
from collections import ChainMap
from .channel import Channel
from .jsonstream import json_stream_stream
from .types import SENTINEL, Sentinel
[docs]
class DockerEvents:
    """Relay events from the Docker daemon to channel subscribers.

    A single streaming request against the daemon's ``events`` endpoint is
    read by :meth:`run`; every decoded event is published on
    ``self.channel``.  Consumers obtain a subscriber via :meth:`subscribe`.
    """

    def __init__(self, docker):
        # Client object; run() relies on its _query() and
        # _resolve_long_running_timeout() helpers.
        self.docker = docker
        # run() publishes to this channel; subscribe()/listen() hand out
        # subscribers from it.
        self.channel = Channel()
        # The JSON stream currently being consumed by run(); None whenever
        # run() is not streaming.
        self.json_stream = None
        # Background task created by subscribe(); reset to None by stop().
        self.task = None

    def listen(self):
        """Deprecated alias kept for backward compatibility.

        Emits a ``DeprecationWarning`` and returns a new channel subscriber.
        Unlike :meth:`subscribe`, this never starts the background task.
        """
        warnings.warn(
            "use subscribe() method instead", DeprecationWarning, stacklevel=2
        )
        return self.channel.subscribe()

    def subscribe(self, *, create_task=True, **params):
        """Subscribes to the Docker events channel. Use the keyword argument
        create_task=False to prevent automatically spawning the background
        tasks that listen to the events.

        Extra keyword arguments are forwarded to :meth:`run` when a new
        background task is spawned; they are ignored otherwise.

        This function returns a ChannelSubscriber object.
        """
        # A Task object is always truthy, so a plain ``not self.task`` check
        # would keep a finished task around forever and never restart
        # streaming after the previous stream ended.  Treat a completed task
        # the same as no task at all.
        if create_task and (self.task is None or self.task.done()):
            self.task = asyncio.ensure_future(self.run(**params))
        return self.channel.subscribe()

    def _transform_event(self, data):
        """Convert the ``"time"`` entry of an event to a ``datetime``.

        The mapping is modified in place and returned.  The conversion uses
        ``datetime.fromtimestamp`` without a ``tz`` argument, so the result
        is a naive datetime expressed in local time.
        """
        if "time" in data:
            data["time"] = dt.datetime.fromtimestamp(data["time"])
        return data

    async def run(self, *, timeout: float | Sentinel | None = SENTINEL, **params):
        """
        Query the events endpoint of the Docker daemon.

        Publish messages inside the asyncio queue.  When the stream ends
        (normally, by error, or by cancellation) ``None`` is published to
        signal termination to subscribers.

        If a stream is already being consumed, a ``RuntimeWarning`` is
        issued and the call returns without doing anything.

        Args:
            timeout: The timeout for the events stream (infinite by default).
            **params: Extra query parameters for the events endpoint;
                ``stream=True`` is always enforced.
        """
        # ``is not None`` matches the checks used in stop() and in the
        # cleanup below.
        if self.json_stream is not None:
            warnings.warn("already running", category=RuntimeWarning, stacklevel=2)
            return
        # ChainMap looks up the first mapping first, so the forced values
        # win over anything the caller supplied.
        forced_params = {"stream": True}
        merged_params = ChainMap(forced_params, params)
        # Default to infinite timeout for event streaming
        timeout_config = self.docker._resolve_long_running_timeout(timeout)
        try:
            async with self.docker._query(
                "events", method="GET", params=merged_params, timeout=timeout_config
            ) as response:
                self.json_stream = json_stream_stream(
                    response, self._transform_event, raise_on_error=False
                )
                try:
                    async for data in self.json_stream:
                        await self.channel.publish(data)
                finally:
                    # Always release the stream and clear the marker so a
                    # later run() is not rejected as "already running".
                    if self.json_stream is not None:
                        await self.json_stream._close()
                    self.json_stream = None
        finally:
            # signal termination to subscribers
            await self.channel.publish(None)

    async def stop(self):
        """Stop streaming events and wait for the background task to end.

        Closes the active JSON stream, if any, then cancels the task
        created by :meth:`subscribe` and awaits it.  The resulting
        ``CancelledError`` is suppressed; any other exception the task
        terminated with propagates to the caller.  ``self.task`` is reset
        to ``None`` in every case.
        """
        if self.json_stream is not None:
            await self.json_stream._close()
        if self.task:
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass
            finally:
                self.task = None