from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import re
import ssl
import sys
import warnings
from collections.abc import AsyncIterator, Mapping
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
from types import TracebackType
from typing import Any
import aiohttp
import attrs
from multidict import CIMultiDict
from yarl import URL
# Sub-API classes
from .configs import DockerConfigs
from .containers import DockerContainer, DockerContainers
from .events import DockerEvents
from .exceptions import (
DockerContextInvalidError,
DockerContextTLSError,
DockerError,
)
from .images import DockerImages
from .logs import DockerLog
from .networks import DockerNetwork, DockerNetworks
from .nodes import DockerSwarmNodes
from .secrets import DockerSecrets
from .services import DockerServices
from .swarm import DockerSwarm
from .system import DockerSystem
from .tasks import DockerTasks
from .types import SENTINEL, JSONObject, Sentinel
from .utils import _suppress_timeout_deprecation, httpize, parse_result
from .volumes import DockerVolume, DockerVolumes
__all__ = (
"Docker",
"DockerConfigs",
"DockerContainer",
"DockerContainers",
"DockerContextEndpoint",
"DockerEvents",
"DockerImages",
"DockerLog",
"DockerNetwork",
"DockerNetworks",
"DockerSecrets",
"DockerServices",
"DockerSwarm",
"DockerSwarmNodes",
"DockerSystem",
"DockerTasks",
"DockerVolume",
"DockerVolumes",
)
log = logging.getLogger(__name__)
_sock_search_paths = [
Path("/run/docker.sock"),
Path("/var/run/docker.sock"),
Path.home() / ".docker/run/docker.sock",
]
_rx_version = re.compile(r"^v\d+\.\d+$")
_rx_tcp_schemes = re.compile(r"^(tcp|http|https)://")
@attrs.define
class DockerContextEndpoint:
"""Docker context endpoint configuration.
Holds the endpoint settings from a Docker context, including
connection host and TLS configuration.
"""
host: str
context_name: str | None = None
skip_tls_verify: bool = False
tls_ca: bytes | None = None
tls_cert: bytes | None = None
tls_key: bytes | None = None
@property
def has_tls(self) -> bool:
"""Check if TLS credentials are available."""
return self.tls_ca is not None or self.tls_cert is not None
[docs]
class Docker:
"""
The Docker client as the main entrypoint to the sub-APIs such as
container, images, networks, swarm and services, etc.
You may access such sub-API collections via the attributes of the client instance,
like:
.. code-block:: python
docker = aiodocker.Docker()
await docker.containers.list()
await docker.images.pull(...)
Docker Host Resolution Precedence
----------------------------------
The Docker host is determined using the following precedence order (highest to lowest):
1. **url parameter** - Explicitly provided Docker daemon address
2. **context parameter** - Explicitly specified Docker context name
3. **DOCKER_HOST environment variable** - Falls back when no explicit context specified
4. **DOCKER_CONTEXT environment variable** - Specifies which Docker context to use
5. **currentContext from ~/.docker/config.json** - Default context set by docker CLI
6. **Socket auto-detection** - Searches common socket paths (e.g., /var/run/docker.sock)
Note: Explicit constructor parameters (``url``, ``context``) take precedence over
environment variables (``DOCKER_HOST``, ``DOCKER_CONTEXT``). This follows the principle
that explicit code configuration overrides implicit environment configuration, matching
the pattern used in the official Docker Go client.
Args:
url: The Docker daemon address as the full URL string (e.g.,
``"unix:///var/run/docker.sock"``, ``"tcp://127.0.0.1:2375"``,
``"npipe:////./pipe/docker_engine"``, ``"ssh://user@host:port"``).
Takes highest precedence when specified.
Refer to :doc:`ssh` for more details about SSH transports in the URL.
connector: Custom :class:`aiohttp.BaseConnector` implementation to establish new connections to the docker host.
If provided, it will be used instead of creating a connector based on the **url** value.
A caller-supplied connector is owned by the caller and is *not* closed by :meth:`close`.
session: Custom :class:`aiohttp.ClientSession`. If None, a new session will be
created with the connector and timeout settings.
The timeout configuration in this object is *ignored* by the **timeout** argument.
A caller-supplied session is owned by the caller and is *not* closed by :meth:`close`.
timeout: :class:`aiohttp.ClientTimeout` configuration for API requests.
If None, there is no timeout at all.
ssl_context: SSL context for HTTPS connections. If None and ``DOCKER_TLS_VERIFY``
is set, will create a context using ``DOCKER_CERT_PATH`` certificates.
api_version: Pin the Docker API version (e.g., "v1.43"). Use "auto" to
automatically detect the API version from the daemon.
context: Docker context name to use (e.g., "production", "staging").
When specified, loads the endpoint configuration from the named context
in ``~/.docker/contexts/``. Overrides all environment variables
(``DOCKER_HOST``, ``DOCKER_CONTEXT``) and config file settings.
Raises:
ValueError: Raised if the docker host cannot be determined,
if both url and connector are incompatible,
or if api_version format is invalid.
OSError: On Windows, if named pipe access fails unexpectedly.
"""
def __init__(
self,
url: str | None = None,
connector: aiohttp.BaseConnector | None = None,
session: aiohttp.ClientSession | None = None,
timeout: aiohttp.ClientTimeout | None = None,
ssl_context: ssl.SSLContext | None = None,
api_version: str = "auto",
context: str | None = None,
) -> None:
# Track ownership so close() only tears down what we created here.
# Capture the original caller intent before any defaults are filled in.
self._owns_connector = connector is None
self._owns_session = session is None
docker_host = url # rename
context_endpoint: DockerContextEndpoint | None = None
if docker_host is None:
context_endpoint = self._get_docker_context_endpoint(context)
if context_endpoint is not None:
docker_host = context_endpoint.host
if docker_host is None:
docker_host = os.environ.get("DOCKER_HOST", None)
if docker_host is None:
for sockpath in _sock_search_paths:
if sockpath.is_socket():
docker_host = "unix://" + str(sockpath)
break
if docker_host is None and sys.platform == "win32":
try:
if Path(r"\\.\pipe\docker_engine").exists():
docker_host = "npipe:////./pipe/docker_engine"
else:
# The default address used by Docker Client on Windows
docker_host = "https://127.0.0.1:2376"
except OSError as ex:
if ex.winerror == 231: # type: ignore
# All pipe instances are busy
# but the pipe definitely exists
docker_host = "npipe:////./pipe/docker_engine"
else:
raise
assert docker_host is not None
self.docker_host = docker_host
if api_version != "auto" and _rx_version.search(api_version) is None:
raise ValueError("Invalid API version format")
self.api_version = api_version
self._timeout = timeout or aiohttp.ClientTimeout()
if docker_host is None:
raise ValueError(
"Missing valid docker_host."
"Either DOCKER_HOST or local sockets are not available."
)
self._connection_info = docker_host
if connector is None:
UNIX_PRE = "unix://"
UNIX_PRE_LEN = len(UNIX_PRE)
WIN_PRE = "npipe://"
WIN_PRE_LEN = len(WIN_PRE)
SSH_PRE = "ssh://"
if _rx_tcp_schemes.search(docker_host):
# Determine SSL context: user-provided > context TLS > DOCKER_TLS_VERIFY
if ssl_context is None and context_endpoint is not None:
ssl_context = self._create_context_ssl_context(
context_endpoint, context_name=context_endpoint.context_name
)
if ssl_context is not None:
docker_host = _rx_tcp_schemes.sub("https://", docker_host)
if (
ssl_context is None
and os.environ.get("DOCKER_TLS_VERIFY", "0") == "1"
):
ssl_context = self._docker_machine_ssl_context()
docker_host = _rx_tcp_schemes.sub("https://", docker_host)
connector = aiohttp.TCPConnector(ssl=ssl_context) # type: ignore[arg-type]
self.docker_host = docker_host
elif docker_host.startswith(UNIX_PRE):
connector = aiohttp.UnixConnector(docker_host[UNIX_PRE_LEN:])
# dummy hostname for URL composition
self.docker_host = UNIX_PRE + "localhost"
elif docker_host.startswith(WIN_PRE):
connector = aiohttp.NamedPipeConnector(
docker_host[WIN_PRE_LEN:].replace("/", "\\")
)
# dummy hostname for URL composition
self.docker_host = WIN_PRE + "localhost"
elif docker_host.startswith(SSH_PRE):
from .ssh import SSHConnector
connector = SSHConnector(docker_host)
# dummy hostname for URL composition
self.docker_host = "unix://localhost"
else:
raise ValueError("Missing protocol scheme in docker_host.")
self.connector = connector
if session is None:
session = aiohttp.ClientSession(
connector=self.connector,
timeout=self._timeout,
# If the caller owns the connector but not the session, make sure
# closing our session does not also close their connector.
connector_owner=self._owns_connector,
)
self.session = session
self.events = DockerEvents(self)
self.containers = DockerContainers(self)
self.swarm = DockerSwarm(self)
self.services = DockerServices(self)
self.configs = DockerConfigs(self)
self.secrets = DockerSecrets(self)
self.tasks = DockerTasks(self)
self.images = DockerImages(self)
self.volumes = DockerVolumes(self)
self.networks = DockerNetworks(self)
self.nodes = DockerSwarmNodes(self)
self.system = DockerSystem(self)
# legacy aliases
self.pull = self.images.pull
self.push = self.images.push
async def __aenter__(self) -> Docker:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.close()
[docs]
async def close(self) -> None:
"""Close the Docker client and release resources.
Stops the event monitoring and closes the underlying aiohttp session,
releasing all associated resources including connections.
Only the session and/or connector that this :class:`Docker` instance
created itself are closed; caller-supplied ``session`` or ``connector``
objects are left untouched. Calling this method multiple times is safe.
"""
await self.events.stop()
if self._owns_session:
await self.session.close()
self._owns_session = False
# The wrapping session owns the connector we made, so it has
# already been closed by session.close().
self._owns_connector = False
elif self._owns_connector:
# Unusual: caller provided a session but we created the connector.
await self.connector.close()
self._owns_connector = False
[docs]
async def auth(self, **credentials: Any) -> dict[str, Any]:
"""Authenticate with Docker registry.
Validates registry credentials and returns authentication information.
Args:
credentials: Registry authentication credentials.
Typically includes:
- ``username`` (str): Registry username
- ``password`` (str): Registry password
- ``serveraddress`` (str, optional): Registry server address without a protocol
If you have an identity token issued in prior, you may pass ``identitytoken`` only.
Returns:
Authentication response from the Docker daemon,
including:
- ``Status`` (str): A string message like "Login Succeeded"
- ``IdentityToken`` (str): The identity token
Raises:
DockerError: If authentication fails or credentials are invalid.
"""
response = await self._query_json("auth", "POST", data=credentials)
return response
[docs]
async def version(self) -> dict[str, Any]:
"""Get Docker daemon version information.
Retrieves version details about the Docker daemon including API version,
OS, architecture, and component versions.
Returns:
A dict containing version information with keys
like:
- ``Version`` (str): Docker version
- ``ApiVersion`` (str): API version
- ``Os`` (str): Operating system
- ``Arch`` (str): Architecture
- ``KernelVersion`` (str): Kernel version
- ``GitCommit`` (str): Git commit hash
and additional component-specific information.
Raises:
DockerError: If the request fails or daemon is unreachable.
"""
data = await self._query_json("version")
return data
def _canonicalize_url(self, path: str | URL, *, versioned_api: bool = True) -> URL:
if isinstance(path, URL):
assert not path.is_absolute()
if versioned_api:
return URL(
"{self.docker_host}/{self.api_version}/{path}".format(
self=self, path=path
)
)
else:
return URL(f"{self.docker_host}/{path}")
def _resolve_long_running_timeout(
self,
timeout: float | aiohttp.ClientTimeout | Sentinel | None,
) -> aiohttp.ClientTimeout:
"""Resolve timeout for long-running operations (logs, stats, build, etc.).
For long-running operations, defaults to infinite timeout by setting
both total and sock_read to None while preserving other timeout settings
from the client configuration.
Args:
timeout: The timeout value to resolve. If SENTINEL, returns an infinite
timeout based on the client's base timeout configuration, overriding
both total and sock_read but preserving other timeouts such as
connection establishment timeouts.
If None, returns infinite timeout. If float, returns a ClientTimeout
based on the client's base timeout configuration with its total and
sock_read timeout overridden.
Returns:
An explicit ClientTimeout configuration.
"""
if timeout is SENTINEL:
# Inherit the parent client's timeout but override total and sock_read
# because they don't make sense for long-running operations.
return attrs.evolve(self._timeout, total=None, sock_read=None)
elif timeout is None:
# Infinite timeout
return aiohttp.ClientTimeout()
elif isinstance(timeout, float):
# Inherit the parent client's timeout but override total and sock_read
# as the given value.
return attrs.evolve(self._timeout, total=timeout, sock_read=timeout)
else:
# Already a manually configured ClientTimeout, return as-is
return timeout
async def _check_version(self) -> None:
if self.api_version == "auto":
ver = await self._query_json("version", versioned_api=False)
self.api_version = "v" + str(ver["ApiVersion"])
@asynccontextmanager
async def _query(
self,
path: str | URL,
method: str = "GET",
*,
params: JSONObject | None = None,
data: Any | None = None,
headers: Mapping[str, str | int | bool] | None = None,
timeout: float | aiohttp.ClientTimeout | None | Sentinel = SENTINEL,
chunked: bool | None = None,
read_until_eof: bool = True,
versioned_api: bool = True,
) -> AsyncIterator[aiohttp.ClientResponse]:
"""
Get the response object by performing the HTTP request.
The caller is responsible to finalize the response object
via the async context manager protocol.
"""
yield await self._do_query(
path=path,
method=method,
params=params,
data=data,
headers=headers,
timeout=timeout,
chunked=chunked,
read_until_eof=read_until_eof,
versioned_api=versioned_api,
)
async def _do_query(
self,
path: str | URL,
method: str,
*,
params: JSONObject | None = None,
data: Any = None,
headers: Mapping[str, str | int | bool] | None = None,
timeout: float | aiohttp.ClientTimeout | None | Sentinel = SENTINEL,
chunked: bool | None = None,
read_until_eof: bool = True,
versioned_api: bool = True,
) -> aiohttp.ClientResponse:
if versioned_api:
await self._check_version()
url = self._canonicalize_url(path, versioned_api=versioned_api)
_headers: CIMultiDict[str | int | bool] = CIMultiDict()
if headers:
_headers.update(headers)
if "Content-Type" not in _headers:
_headers["Content-Type"] = "application/json"
# Derive from the timeout configured upon the client instance creation.
_timeout = self._timeout
match timeout:
case float():
if not _suppress_timeout_deprecation.get():
warnings.warn(
"Manually setting timeouts via float is highly discouraged. "
"Use asyncio.timeout() block or pass aiohttp.ClientTimeout instead.",
DeprecationWarning,
stacklevel=2,
)
# Set both total and sock_read consistently for float timeouts
_timeout = attrs.evolve(_timeout, total=timeout, sock_read=timeout)
case aiohttp.ClientTimeout():
# Override with the caller's decision.
_timeout = timeout
case None:
# Infinite timeout.
_timeout = aiohttp.ClientTimeout()
case Sentinel.TOKEN:
# Use the client-level config.
pass
try:
real_params = httpize(params)
real_headers = httpize(_headers)
response = await self.session.request(
method,
url,
params=real_params,
headers=real_headers,
data=data,
timeout=_timeout,
chunked=chunked,
read_until_eof=read_until_eof,
)
except asyncio.TimeoutError:
raise
except aiohttp.ClientConnectionError as exc:
raise DockerError(
900,
f"Cannot connect to Docker Engine via {self._connection_info} [{exc}]",
)
if (response.status // 100) in [4, 5]:
what = await response.read()
content_type = response.headers.get("content-type", "")
response.close()
if content_type == "application/json":
data = json.loads(what.decode("utf8"))
raise DockerError(response.status, data["message"])
else:
raise DockerError(response.status, what.decode("utf8"))
return response
async def _query_json(
self,
path: str | URL,
method: str = "GET",
*,
params: JSONObject | None = None,
data: Any | None = None,
headers: Mapping[str, str | int | bool] | None = None,
timeout: float | aiohttp.ClientTimeout | None | Sentinel = SENTINEL,
read_until_eof: bool = True,
versioned_api: bool = True,
) -> Any:
"""
A shorthand of _query() that treats the input as JSON.
"""
_headers: CIMultiDict[str | int | bool] = CIMultiDict()
if headers:
_headers.update(headers)
if "Content-Type" not in _headers:
_headers["Content-Type"] = "application/json"
if data is not None and not isinstance(data, (str, bytes)):
data = json.dumps(data)
async with self._query(
path,
method,
params=params,
data=data,
headers=_headers,
timeout=timeout,
read_until_eof=read_until_eof,
versioned_api=versioned_api,
) as response:
data = await parse_result(response)
return data
def _query_chunked_post(
self,
path: str | URL,
method: str = "POST",
*,
params: JSONObject | None = None,
data: Any | None = None,
headers: Mapping[str, str | int | bool] | None = None,
timeout: float | aiohttp.ClientTimeout | None | Sentinel = SENTINEL,
read_until_eof: bool = True,
versioned_api: bool = True,
) -> AbstractAsyncContextManager[aiohttp.ClientResponse]:
"""
A shorthand for uploading data by chunks
"""
_headers: CIMultiDict[str | int | bool] = CIMultiDict()
if headers:
_headers.update(headers)
if "Content-Type" not in _headers:
_headers["Content-Type"] = "application/octet-stream"
return self._query(
path,
method,
params=params,
data=data,
headers=headers,
timeout=timeout,
chunked=True,
read_until_eof=read_until_eof,
)
async def _websocket(
self, path: str | URL, **params: Any
) -> aiohttp.ClientWebSocketResponse:
if not params:
params = {"stdin": True, "stdout": True, "stderr": True, "stream": True}
url = self._canonicalize_url(path)
# ws_connect() does not have params arg.
url = url.with_query(httpize(params))
ws = await self.session.ws_connect(
url,
protocols=["chat"],
origin="http://localhost",
autoping=True,
autoclose=True,
)
return ws
@staticmethod
def _docker_machine_ssl_context() -> ssl.SSLContext:
"""
Create a SSLContext object using DOCKER_* env vars.
"""
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
context.set_ciphers(ssl._RESTRICTED_SERVER_CIPHERS) # type: ignore
certs_path = os.environ.get("DOCKER_CERT_PATH", None)
if certs_path is None:
raise ValueError("Cannot create ssl context, DOCKER_CERT_PATH is not set!")
certs_path2 = Path(certs_path)
context.load_verify_locations(cafile=str(certs_path2 / "ca.pem"))
context.load_cert_chain(
certfile=str(certs_path2 / "cert.pem"), keyfile=str(certs_path2 / "key.pem")
)
return context
@staticmethod
def _get_context_dir_name(context_name: str) -> str:
"""Compute the SHA256 hash used for context directory names.
Docker CLI uses SHA256 of the context name as the directory name
under ~/.docker/contexts/meta/ and ~/.docker/contexts/tls/.
"""
return hashlib.sha256(context_name.encode("utf-8")).hexdigest()
@staticmethod
def _get_docker_context_endpoint(
context_name: str | None = None,
) -> DockerContextEndpoint | None:
"""Get the Docker endpoint configuration from the current Docker context.
Resolution order:
1. context_name parameter (if provided)
2. DOCKER_CONTEXT environment variable
3. currentContext from ~/.docker/config.json
4. If context name is "default" or not found, return None
(caller should fall back to DOCKER_HOST or socket search)
Args:
context_name: Explicit context name to use. If provided, takes precedence
over environment variables and config file.
Returns:
DockerContextEndpoint with host, TLS settings, and certificates,
or None if no context is configured or the context is "default".
Raises:
DockerContextInvalidError: If the context is configured but has invalid
data (e.g., invalid JSON, missing required fields, or context
directory not found).
"""
current_context_name = context_name
if current_context_name is None:
current_context_name = os.environ.get("DOCKER_CONTEXT", None)
if current_context_name is None:
try:
docker_config_path = Path.home() / ".docker" / "config.json"
docker_config = json.loads(docker_config_path.read_bytes())
current_context_name = docker_config.get("currentContext")
except OSError:
# Config file doesn't exist - fallback to DOCKER_HOST
return None
except json.JSONDecodeError as e:
raise DockerContextInvalidError(
f"Invalid JSON in Docker config file: {e}"
)
# "default" is a virtual context that doesn't exist as files.
# It means "use DOCKER_HOST or search for local sockets".
if current_context_name is None or current_context_name == "default":
return None
# Compute the SHA256 hash of the context name to find its directory
context_dir_name = Docker._get_context_dir_name(current_context_name)
contexts_dir = Path.home() / ".docker" / "contexts"
meta_path = contexts_dir / "meta" / context_dir_name / "meta.json"
try:
context_data = json.loads(meta_path.read_bytes())
except OSError as e:
# Context directory/file doesn't exist
# If context was set via DOCKER_CONTEXT env var, this is an error
# If context was set via config.json, this is also an error since
# the config explicitly references a non-existent context
raise DockerContextInvalidError(
f"Context metadata file not found: {meta_path}",
context_name=current_context_name,
) from e
except json.JSONDecodeError as e:
raise DockerContextInvalidError(
f"Invalid JSON in context metadata file: {e}",
context_name=current_context_name,
) from e
try:
docker_endpoint = context_data["Endpoints"]["docker"]
except KeyError as e:
raise DockerContextInvalidError(
f"Missing required field in context metadata: {e}",
context_name=current_context_name,
) from e
try:
host = docker_endpoint["Host"]
except KeyError as e:
raise DockerContextInvalidError(
"Missing 'Host' field in docker endpoint configuration",
context_name=current_context_name,
) from e
skip_tls_verify = docker_endpoint.get("SkipTLSVerify", False)
# Load TLS certificates if available
tls_dir = contexts_dir / "tls" / context_dir_name / "docker"
tls_ca, tls_cert, tls_key = Docker._load_context_tls(
tls_dir, context_name=current_context_name
)
return DockerContextEndpoint(
host=host,
context_name=current_context_name,
skip_tls_verify=skip_tls_verify,
tls_ca=tls_ca,
tls_cert=tls_cert,
tls_key=tls_key,
)
@staticmethod
def _load_context_tls(
tls_dir: Path,
*,
context_name: str | None = None,
) -> tuple[bytes | None, bytes | None, bytes | None]:
"""Load TLS certificate files from a context's TLS directory.
Args:
tls_dir: Path to the TLS directory (e.g., ~/.docker/contexts/tls/{hash}/docker)
context_name: Name of the context (for error messages).
Returns:
Tuple of (ca_data, cert_data, key_data), each being bytes or None
if the file doesn't exist.
Raises:
DockerContextTLSError: If a TLS file exists but cannot be read.
"""
ca_data: bytes | None = None
cert_data: bytes | None = None
key_data: bytes | None = None
ca_path = tls_dir / "ca.pem"
if ca_path.exists():
try:
ca_data = ca_path.read_bytes()
except OSError as e:
raise DockerContextTLSError(
f"Failed to read CA certificate: {ca_path}: {e}",
context_name=context_name,
) from e
cert_path = tls_dir / "cert.pem"
if cert_path.exists():
try:
cert_data = cert_path.read_bytes()
except OSError as e:
raise DockerContextTLSError(
f"Failed to read client certificate: {cert_path}: {e}",
context_name=context_name,
) from e
key_path = tls_dir / "key.pem"
if key_path.exists():
try:
key_data = key_path.read_bytes()
except OSError as e:
raise DockerContextTLSError(
f"Failed to read private key: {key_path}: {e}",
context_name=context_name,
) from e
return ca_data, cert_data, key_data
@staticmethod
def _create_context_ssl_context(
endpoint: DockerContextEndpoint,
*,
context_name: str | None = None,
) -> ssl.SSLContext | None:
"""Create an SSL context from Docker context endpoint TLS data.
Args:
endpoint: The Docker context endpoint with TLS data.
context_name: Name of the context (for error messages).
Returns:
An ssl.SSLContext configured with the context's certificates,
or None if no TLS data is available.
Raises:
DockerContextTLSError: If the TLS certificates are invalid or
cannot be loaded into an SSL context.
"""
if not endpoint.has_tls:
return None
import tempfile
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
context.set_ciphers(ssl._RESTRICTED_SERVER_CIPHERS) # type: ignore
if endpoint.skip_tls_verify:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
else:
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
# SSL context methods require file paths, so we need to write temp files
# for the in-memory certificate data
ca_path: str | None = None
cert_path: str | None = None
key_path: str | None = None
try:
if endpoint.tls_ca is not None:
with tempfile.NamedTemporaryFile(
mode="wb", suffix=".pem", delete=False
) as ca_file:
ca_file.write(endpoint.tls_ca)
ca_path = ca_file.name
try:
context.load_verify_locations(cafile=ca_path)
except ssl.SSLError as e:
raise DockerContextTLSError(
f"Invalid CA certificate: {e}",
context_name=context_name,
) from e
if endpoint.tls_cert is not None and endpoint.tls_key is not None:
with tempfile.NamedTemporaryFile(
mode="wb", suffix=".pem", delete=False
) as cert_file:
cert_file.write(endpoint.tls_cert)
cert_path = cert_file.name
with tempfile.NamedTemporaryFile(
mode="wb", suffix=".pem", delete=False
) as key_file:
key_file.write(endpoint.tls_key)
key_path = key_file.name
try:
context.load_cert_chain(certfile=cert_path, keyfile=key_path)
except ssl.SSLError as e:
raise DockerContextTLSError(
f"Invalid client certificate or key: {e}",
context_name=context_name,
) from e
return context
finally:
# Clean up temp files
if ca_path is not None:
Path(ca_path).unlink(missing_ok=True)
if cert_path is not None:
Path(cert_path).unlink(missing_ok=True)
if key_path is not None:
Path(key_path).unlink(missing_ok=True)