Files
fastapi_test/venv/lib/python3.11/site-packages/podman/api/client.py
2022-11-27 19:11:46 +01:00

416 lines
14 KiB
Python

"""APIClient for connecting to Podman service."""
import json
import urllib.parse
from typing import Any, ClassVar, IO, Iterable, List, Mapping, Optional, Tuple, Type, Union
import requests
from requests.adapters import HTTPAdapter
from podman import api
from podman.api.ssh import SSHAdapter
from podman.api.uds import UDSAdapter
from podman.errors import APIError, NotFound
from podman.tlsconfig import TLSConfig
from podman.version import __version__
_Data = Union[
None,
str,
bytes,
Mapping[str, Any],
Iterable[Tuple[str, Optional[str]]],
IO,
]
"""Type alias for request data parameter."""
_Timeout = Union[None, float, Tuple[float, float], Tuple[float, None]]
"""Type alias for request timeout parameter."""
class APIResponse:
"""APIResponse proxy requests.Response objects.
Override raise_for_status() to implement Podman API binding errors.
All other methods and attributes forwarded to original Response.
"""
def __init__(self, response: requests.Response):
"""Initialize APIResponse.
Args:
response: the requests.Response to provide implementation
"""
self._response = response
def __getattr__(self, item: str):
"""Forward any query for an attribute not defined in this proxy class to wrapped class."""
return getattr(self._response, item)
def raise_for_status(self, not_found: Type[APIError] = NotFound) -> None:
"""Raises exception when Podman service reports one."""
if self.status_code < 400:
return
try:
body = self.json()
cause = body["cause"]
message = body["message"]
except (json.decoder.JSONDecodeError, KeyError):
cause = message = self.text
if self.status_code == requests.codes.not_found:
raise not_found(cause, response=self._response, explanation=message)
raise APIError(cause, response=self._response, explanation=message)
class APIClient(requests.Session):
"""Client for Podman service API."""
# Abstract methods (delete,get,head,post) are specialized and pylint cannot walk hierarchy.
# pylint: disable=too-many-instance-attributes,arguments-differ,arguments-renamed
supported_schemes: ClassVar[List[str]] = (
"unix",
"http+unix",
"ssh",
"http+ssh",
"tcp",
"http",
)
def __init__(
self,
base_url: str = None,
version: Optional[str] = None,
timeout: Optional[float] = None,
tls: Union[TLSConfig, bool] = False,
user_agent: Optional[str] = None,
num_pools: Optional[int] = None,
credstore_env: Optional[Mapping[str, str]] = None,
use_ssh_client=True,
max_pools_size=None,
**kwargs,
): # pylint: disable=unused-argument
"""Instantiate APIClient object.
Args:
base_url: Address to use for connecting to Podman service.
version: Override version prefix for Podman resource URLs.
timeout: Time in seconds to allow for Podman service operation.
tls: Configuration for TLS connections.
user_agent: Override User-Agent HTTP header.
num_pools: The number of connection pools to cache.
credstore_env: Environment for storing credentials.
use_ssh_client: Use system ssh agent rather than ssh module. Always, True.
max_pool_size: Override number of connections pools to maintain.
Default: requests.adapters.DEFAULT_POOLSIZE
Keyword Args:
compatible_version (str): Override version prefix for compatible resource URLs.
identity (str): Provide SSH key to authenticate SSH connection.
Raises:
ValueError: when a parameter is incorrect
"""
super().__init__()
self.base_url = self._normalize_url(base_url)
adapter_kwargs = kwargs.copy()
if num_pools is not None:
adapter_kwargs["pool_connections"] = num_pools
if max_pools_size is not None:
adapter_kwargs["pool_maxsize"] = max_pools_size
if timeout is not None:
adapter_kwargs["timeout"] = timeout
if self.base_url.scheme == "http+unix":
self.mount("http://", UDSAdapter(self.base_url.geturl(), **adapter_kwargs))
self.mount("https://", UDSAdapter(self.base_url.geturl(), **adapter_kwargs))
elif self.base_url.scheme == "http+ssh":
self.mount("http://", SSHAdapter(self.base_url.geturl(), **adapter_kwargs))
self.mount("https://", SSHAdapter(self.base_url.geturl(), **adapter_kwargs))
elif self.base_url.scheme == "http":
self.mount("http://", HTTPAdapter(**adapter_kwargs))
self.mount("https://", HTTPAdapter(**adapter_kwargs))
else:
assert False, "APIClient.supported_schemes changed without adding a branch here."
self.version = version or api.VERSION
self.path_prefix = f"/v{self.version}/libpod/"
self.compatible_version = kwargs.get("compatible_version", api.COMPATIBLE_VERSION)
self.compatible_prefix = f"/v{self.compatible_version}/"
self.timeout = timeout
self.pool_maxsize = num_pools or requests.adapters.DEFAULT_POOLSIZE
self.credstore_env = credstore_env or {}
self.user_agent = user_agent or (
f"PodmanPy/{__version__} (API v{self.version}; Compatible v{self.compatible_version})"
)
self.headers.update({"User-Agent": self.user_agent})
@staticmethod
def _normalize_url(base_url: str) -> urllib.parse.ParseResult:
uri = urllib.parse.urlparse(base_url)
if uri.scheme not in APIClient.supported_schemes:
raise ValueError(
f"The scheme '{uri.scheme}' must be one of {APIClient.supported_schemes}"
)
# Normalize URL scheme, needs to match up with adapter mounts
if uri.scheme == "unix":
uri = uri._replace(scheme="http+unix")
elif uri.scheme == "ssh":
uri = uri._replace(scheme="http+ssh")
elif uri.scheme == "tcp":
uri = uri._replace(scheme="http")
# Normalize URL netloc, needs to match up with transport adapters expectations
if uri.netloc == "":
uri = uri._replace(netloc=uri.path)._replace(path="")
if "/" in uri.netloc:
uri = uri._replace(netloc=urllib.parse.quote_plus(uri.netloc))
return uri
def delete(
self,
path: Union[str, bytes],
params: Union[None, bytes, Mapping[str, str]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = False,
**kwargs,
) -> APIResponse:
"""HTTP DELETE operation against configured Podman service.
Args:
path: Relative path to RESTful resource.
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
stream: Return iterator for content vs reading all content into memory
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
return self._request(
"DELETE",
path=path,
params=params,
headers=headers,
timeout=timeout,
stream=stream,
**kwargs,
)
def get(
self,
path: Union[str, bytes],
params: Union[None, bytes, Mapping[str, List[str]]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = False,
**kwargs,
) -> APIResponse:
"""HTTP GET operation against configured Podman service.
Args:
path: Relative path to RESTful resource.
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
stream: Return iterator for content vs reading all content into memory
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
return self._request(
"GET",
path=path,
params=params,
headers=headers,
timeout=timeout,
stream=stream,
**kwargs,
)
def head(
self,
path: Union[str, bytes],
params: Union[None, bytes, Mapping[str, str]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = False,
**kwargs,
) -> APIResponse:
"""HTTP HEAD operation against configured Podman service.
Args:
path: Relative path to RESTful resource.
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
stream: Return iterator for content vs reading all content into memory
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
return self._request(
"HEAD",
path=path,
params=params,
headers=headers,
timeout=timeout,
stream=stream,
**kwargs,
)
def post(
self,
path: Union[str, bytes],
params: Union[None, bytes, Mapping[str, str]] = None,
data: _Data = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = False,
**kwargs,
) -> APIResponse:
"""HTTP POST operation against configured Podman service.
Args:
path: Relative path to RESTful resource.
data: HTTP body for operation
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
stream: Return iterator for content vs reading all content into memory
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
return self._request(
"POST",
path=path,
params=params,
data=data,
headers=headers,
timeout=timeout,
stream=stream,
**kwargs,
)
def put(
self,
path: Union[str, bytes],
params: Union[None, bytes, Mapping[str, str]] = None,
data: _Data = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = False,
**kwargs,
) -> APIResponse:
"""HTTP PUT operation against configured Podman service.
Args:
path: Relative path to RESTful resource.
data: HTTP body for operation
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
stream: Return iterator for content vs reading all content into memory
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
return self._request(
"PUT",
path=path,
params=params,
data=data,
headers=headers,
timeout=timeout,
stream=stream,
**kwargs,
)
def _request(
self,
method: str,
path: Union[str, bytes],
data: _Data = None,
params: Union[None, bytes, Mapping[str, str]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: _Timeout = None,
stream: Optional[bool] = None,
**kwargs,
) -> APIResponse:
"""HTTP operation against configured Podman service.
Args:
method: HTTP method to use for request
path: Relative path to RESTful resource.
params: Optional parameters to include with URL.
headers: Optional headers to include in request.
timeout: Number of seconds to wait on request, or (connect timeout, read timeout) tuple
Keyword Args:
compatible: Will override the default path prefix with compatible prefix
Raises:
APIError: when service returns an error
"""
# Only set timeout if one is given, lower level APIs will not override None
timeout_kw = {}
timeout = timeout or self.timeout
if timeout_kw is not None:
timeout_kw["timeout"] = timeout
compatible = kwargs.get("compatible", False)
path_prefix = self.compatible_prefix if compatible else self.path_prefix
path = path.lstrip("/") # leading / makes urljoin crazy...
# TODO should we have an option for HTTPS support?
# Build URL for operation from base_url
uri = urllib.parse.ParseResult(
"http",
self.base_url.netloc,
urllib.parse.urljoin(path_prefix, path),
self.base_url.params,
self.base_url.query,
self.base_url.fragment,
)
try:
return APIResponse(
self.request(
method.upper(),
uri.geturl(),
params=params,
data=data,
headers=(headers or {}),
stream=stream,
**timeout_kw,
)
)
except OSError as e:
raise APIError(uri.geturl(), explanation=f"{method.upper()} operation failed") from e