added podman, json and yaml

This commit is contained in:
2022-11-27 19:11:46 +01:00
parent 01135dea09
commit 5226e858bb
790 changed files with 114578 additions and 16 deletions

View File

@ -0,0 +1,113 @@
"""Read containers.conf file."""
import urllib
from pathlib import Path
from typing import Dict, Optional
import xdg.BaseDirectory
try:
import toml
except ImportError:
import pytoml as toml
from podman.api import cached_property
class ServiceConnection:
"""ServiceConnection defines a connection to the Podman service."""
def __init__(self, name: str, attrs: Dict[str, str]):
"""Create a Podman ServiceConnection."""
self.name = name
self.attrs = attrs
def __repr__(self) -> str:
return f"""<{self.__class__.__name__}: '{self.id}'>"""
def __hash__(self) -> int:
return hash(tuple(self.name))
def __eq__(self, other) -> bool:
if isinstance(other, ServiceConnection):
return self.id == other.id and self.attrs == other.attrs
return False
@property
def id(self): # pylint: disable=invalid-name
"""str: Returns identifier for service connection."""
return self.name
@cached_property
def url(self):
"""urllib.parse.ParseResult: Returns URL for service connection."""
return urllib.parse.urlparse(self.attrs.get("uri"))
@cached_property
def identity(self):
"""Path: Returns Path to identity file for service connection."""
return Path(self.attrs.get("identity"))
class PodmanConfig:
"""PodmanConfig provides a representation of the containers.conf file."""
def __init__(self, path: Optional[str] = None):
"""Read Podman configuration from users XDG_CONFIG_HOME."""
if path is None:
home = Path(xdg.BaseDirectory.xdg_config_home)
self.path = home / "containers" / "containers.conf"
else:
self.path = Path(path)
self.attrs = {}
if self.path.exists():
with self.path.open(encoding='utf-8') as file:
buffer = file.read()
self.attrs = toml.loads(buffer)
def __hash__(self) -> int:
return hash(tuple(self.path.name))
def __eq__(self, other) -> bool:
if isinstance(other, PodmanConfig):
return self.id == other.id and self.attrs == other.attrs
return False
@property
def id(self): # pylint: disable=invalid-name
"""Path: Returns Path() of container.conf."""
return self.path
@cached_property
def services(self):
"""Dict[str, ServiceConnection]: Returns list of service connections.
Examples:
podman_config = PodmanConfig()
address = podman_config.services["testing"]
print(f"Testing service address {address}")
"""
services: Dict[str, ServiceConnection] = {}
engine = self.attrs.get("engine")
if engine:
destinations = engine.get("service_destinations")
for key in destinations:
connection = ServiceConnection(key, attrs=destinations[key])
services[key] = connection
return services
@cached_property
def active_service(self):
"""Optional[ServiceConnection]: Returns active connection."""
engine = self.attrs.get("engine")
if engine:
active = engine.get("active_service")
destinations = engine.get("service_destinations")
for key in destinations:
if key == active:
return ServiceConnection(key, attrs=destinations[key])
return None

View File

@ -0,0 +1,534 @@
"""Model and Manager for Container resources."""
import io
import json
import logging
import shlex
from contextlib import suppress
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Tuple, Union
import requests
from requests import Response
from podman import api
from podman.api import Literal
from podman.domain.images import Image
from podman.domain.images_manager import ImagesManager
from podman.domain.manager import PodmanResource
from podman.errors import APIError
logger = logging.getLogger("podman.containers")
class Container(PodmanResource):
"""Details and configuration for a container managed by the Podman service."""
@property
def name(self):
"""str: Returns container's name."""
with suppress(KeyError):
if 'Name' in self.attrs:
return self.attrs["Name"].lstrip("/")
return self.attrs["Names"][0].lstrip("/")
return None
@property
def image(self):
"""podman.domain.images.Image: Returns Image object used to create Container."""
if "Image" in self.attrs:
image_id = self.attrs["Image"]
return ImagesManager(client=self.client).get(image_id)
return Image()
@property
def labels(self):
"""dict[str, str]: Returns labels associated with container."""
with suppress(KeyError):
if "Labels" in self.attrs:
return self.attrs["Labels"]
return self.attrs["Config"]["Labels"]
return {}
@property
def status(self):
"""Literal["running", "stopped", "exited", "unknown"]: Returns status of container."""
with suppress(KeyError):
return self.attrs["State"]["Status"]
return "unknown"
@property
def ports(self):
"""dict[str, int]: Return ports exposed by container."""
with suppress(KeyError):
return self.attrs["NetworkSettings"]["Ports"]
return {}
def attach(self, **kwargs) -> Union[str, Iterator[str]]:
"""Attach to container's tty.
Keyword Args:
stdout (bool): Include stdout. Default: True
stderr (bool): Include stderr. Default: True
stream (bool): Return iterator of string(s) vs single string. Default: False
logs (bool): Include previous container output. Default: False
Raises:
NotImplementedError: method not implemented.
"""
raise NotImplementedError()
def attach_socket(self, **kwargs):
"""Not Implemented.
Raises:
NotImplementedError: method not implemented.
"""
raise NotImplementedError()
def commit(self, repository: str = None, tag: str = None, **kwargs) -> Image:
"""Save container to given repository.
Args:
repository: Where to save Image
tag: Tag to push with Image
Keyword Args:
author (str): Name of commit author
changes (List[str]): Instructions to apply during commit
comment (str): Commit message to include with Image, overrides keyword message
conf (dict[str, Any]): Ignored.
format (str): Format of the image manifest and metadata
message (str): Commit message to include with Image
pause (bool): Pause the container before committing it
"""
params = {
"author": kwargs.get("author"),
"changes": kwargs.get("changes"),
"comment": kwargs.get("comment", kwargs.get("message")),
"container": self.id,
"format": kwargs.get("format"),
"pause": kwargs.get("pause"),
"repo": repository,
"tag": tag,
}
response = self.client.post("/commit", params=params)
response.raise_for_status()
body = response.json()
return ImagesManager(client=self.client).get(body["Id"])
def diff(self) -> List[Dict[str, int]]:
"""Report changes of a container's filesystem.
Raises:
APIError: when service reports an error
"""
response = self.client.get(f"/containers/{self.id}/changes")
response.raise_for_status()
return response.json()
# pylint: disable=too-many-arguments,unused-argument
def exec_run(
self,
cmd: Union[str, List[str]],
stdout: bool = True,
stderr: bool = True,
stdin: bool = False,
tty: bool = True,
privileged: bool = False,
user=None,
detach: bool = False,
stream: bool = False,
socket: bool = False,
environment: Union[Mapping[str, str], List[str]] = None,
workdir: str = None,
demux: bool = False,
) -> Tuple[Optional[int], Union[Iterator[bytes], Any, Tuple[bytes, bytes]]]:
"""Run given command inside container and return results.
Args:
cmd: Command to be executed
stdout: Attach to stdout. Default: True
stderr: Attach to stderr. Default: True
stdin: Attach to stdin. Default: False
tty: Allocate a pseudo-TTY. Default: False
privileged: Run as privileged.
user: User to execute command as. Default: root
detach: If true, detach from the exec command.
Default: False
stream: Stream response data. Default: False
socket: Return the connection socket to allow custom
read/write operations. Default: False
environment: A dictionary or a List[str] in
the following format ["PASSWORD=xxx"] or
{"PASSWORD": "xxx"}.
workdir: Path to working directory for this exec session
demux: Return stdout and stderr separately
Returns:
First item is the command response code
Second item is the requests response content
Raises:
NotImplementedError: method not implemented.
APIError: when service reports error
"""
# pylint: disable-msg=too-many-locals
user = user or "root"
if isinstance(environment, dict):
environment = [f"{k}={v}" for k, v in environment.items()]
data = {
"AttachStderr": stderr,
"AttachStdin": stdin,
"AttachStdout": stdout,
"Cmd": cmd if isinstance(cmd, list) else shlex.split(cmd),
# "DetachKeys": detach, # This is something else
"Env": environment,
"Privileged": privileged,
"Tty": tty,
"User": user,
"WorkingDir": workdir,
}
# create the exec instance
response = self.client.post(f"/containers/{self.name}/exec", data=json.dumps(data))
response.raise_for_status()
exec_id = response.json()['Id']
# start the exec instance, this will store command output
start_resp = self.client.post(
f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty})
)
start_resp.raise_for_status()
# get and return exec information
response = self.client.get(f"/exec/{exec_id}/json")
response.raise_for_status()
return response.json().get('ExitCode'), start_resp.content
def export(self, chunk_size: int = api.DEFAULT_CHUNK_SIZE) -> Iterator[bytes]:
"""Download container's filesystem contents as a tar archive.
Args:
chunk_size: <= number of bytes to return for each iteration of the generator.
Yields:
tarball in size/chunk_size chunks
Raises:
NotFound: when container has been removed from service
APIError: when service reports an error
"""
response = self.client.get(f"/containers/{self.id}/export", stream=True)
response.raise_for_status()
for out in response.iter_content(chunk_size=chunk_size):
yield out
def get_archive(
self, path: str, chunk_size: int = api.DEFAULT_CHUNK_SIZE
) -> Tuple[Iterable, Dict[str, Any]]:
"""Download a file or folder from the container's filesystem.
Args:
path: Path to file or folder.
chunk_size: <= number of bytes to return for each iteration of the generator.
Returns:
First item is a raw tar data stream.
Second item is a dict containing os.stat() information on the specified path.
"""
response = self.client.get(f"/containers/{self.id}/archive", params={"path": [path]})
response.raise_for_status()
stat = response.headers.get("x-docker-container-path-stat", None)
stat = api.decode_header(stat)
return response.iter_content(chunk_size=chunk_size), stat
def inspect(self) -> Dict:
"""Inspect a container.
Raises:
APIError: when service reports an error
"""
response = self.client.get(f"/containers/{self.id}/json")
response.raise_for_status()
return response.json()
def kill(self, signal: Union[str, int, None] = None) -> None:
"""Send signal to container.
Raises:
APIError: when service reports an error
"""
response = self.client.post(f"/containers/{self.id}/kill", params={"signal": signal})
response.raise_for_status()
def logs(self, **kwargs) -> Union[bytes, Iterator[bytes]]:
"""Get logs from the container.
Keyword Args:
stdout (bool): Include stdout. Default: True
stderr (bool): Include stderr. Default: True
stream (bool): Return generator of strings as the response. Default: False
timestamps (bool): Show timestamps in output. Default: False
tail (Union[str, int]): Output specified number of lines at the end of
logs. Integer representing the number of lines to display, or the string all.
Default: all
since (Union[datetime, int]): Show logs since a given datetime or
integer epoch (in seconds)
follow (bool): Follow log output. Default: False
until (Union[datetime, int]): Show logs that occurred before the given
datetime or integer epoch (in seconds)
"""
stream = bool(kwargs.get("stream", False))
params = {
"follow": kwargs.get("follow", kwargs.get("stream", None)),
"since": api.prepare_timestamp(kwargs.get("since")),
"stderr": kwargs.get("stderr", None),
"stdout": kwargs.get("stdout", True),
"tail": kwargs.get("tail"),
"timestamps": kwargs.get("timestamps"),
"until": api.prepare_timestamp(kwargs.get("until")),
}
response = self.client.get(f"/containers/{self.id}/logs", stream=stream, params=params)
response.raise_for_status()
if stream:
return api.stream_frames(response)
return api.frames(response)
def pause(self) -> None:
"""Pause processes within the container."""
response = self.client.post(f"/containers/{self.id}/pause")
response.raise_for_status()
def put_archive(self, path: str, data: bytes = None) -> bool:
"""Upload tar archive containing a file or folder to be written into container.
Args:
path: File to write data into
data: Contents to write to file, when None path will be read on client to
build tarfile.
Returns:
True when successful
Raises:
APIError: when server reports error
"""
if path is None:
raise ValueError("'path' is a required argument.")
if data is None:
data = api.create_tar("/", path)
response = self.client.put(
f"/containers/{self.id}/archive", params={"path": path}, data=data
)
return response.ok
def remove(self, **kwargs) -> None:
"""Delete container.
Keyword Args:
v (bool): Delete associated volumes as well.
link (bool): Ignored.
force (bool): Kill a running container before deleting.
"""
self.manager.remove(self.id, **kwargs)
def rename(self, name: str) -> None:
"""Rename container.
Container updated in-situ to avoid reload().
Args:
name: New name for container.
"""
if not name:
raise ValueError("'name' is a required argument.")
response = self.client.post(f"/containers/{self.id}/rename", params={"name": name})
response.raise_for_status()
self.attrs["Name"] = name # shortcut to avoid needing reload()
def resize(self, height: int = None, width: int = None) -> None:
"""Resize the tty session.
Args:
height: New height of tty session.
width: New width of tty session.
"""
params = {
"h": height,
"w": width,
}
response = self.client.post(f"/containers/{self.id}/resize", params=params)
response.raise_for_status()
def restart(self, **kwargs) -> None:
"""Restart processes in container.
Keyword Args:
timeout (int): Seconds to wait for container to stop before killing container.
"""
params = {"timeout": kwargs.get("timeout")}
post_kwargs = {}
if kwargs.get("timeout"):
post_kwargs["timeout"] = float(params["timeout"]) * 1.5
response = self.client.post(f"/containers/{self.id}/restart", params=params, **post_kwargs)
response.raise_for_status()
def start(self, **kwargs) -> None:
"""Start processes in container.
Keyword Args:
detach_keys: Override the key sequence for detaching a container (Podman only)
"""
response = self.client.post(
f"/containers/{self.id}/start", params={"detachKeys": kwargs.get("detach_keys")}
)
response.raise_for_status()
def stats(self, **kwargs) -> Union[Sequence[Dict[str, bytes]], bytes]:
"""Return statistics for container.
Keyword Args:
decode (bool): If True and stream is True, stream will be decoded into dict's.
Default: False.
stream (bool): Stream statistics until cancelled. Default: True.
Raises:
APIError: when service reports an error
"""
# FIXME Errors in stream are not handled, need content and json to read Errors.
stream = kwargs.get("stream", True)
decode = kwargs.get("decode", False)
params = {
"containers": self.id,
"stream": stream,
}
response = self.client.get("/containers/stats", params=params)
response.raise_for_status()
if stream:
return self._stats_helper(decode, response.iter_lines())
with io.StringIO() as buffer:
for entry in response.text:
buffer.write(json.dumps(entry) + "\n")
return buffer.getvalue()
@staticmethod
def _stats_helper(
decode: bool, body: List[Dict[str, Any]]
) -> Iterator[Union[str, Dict[str, Any]]]:
"""Helper needed to allow stats() to return either a generator or a str."""
for entry in body:
if decode:
yield json.loads(entry)
else:
yield entry
def stop(self, **kwargs) -> None:
"""Stop container.
Keyword Args:
all (bool): When True, stop all containers. Default: False (Podman only)
ignore (bool): When True, ignore error if container already stopped (Podman only)
timeout (int): Number of seconds to wait on container to stop before killing it.
"""
params = {"all": kwargs.get("all"), "timeout": kwargs.get("timeout")}
post_kwargs = {}
if kwargs.get("timeout"):
post_kwargs["timeout"] = float(params["timeout"]) * 1.5
response = self.client.post(f"/containers/{self.id}/stop", params=params, **post_kwargs)
response.raise_for_status()
if response.status_code == requests.codes.no_content:
return
if response.status_code == requests.codes.not_modified:
if kwargs.get("ignore", False):
return
body = response.json()
raise APIError(body["cause"], response=response, explanation=body["message"])
def top(self, **kwargs) -> Union[Iterator[Dict[str, Any]], Dict[str, Any]]:
"""Report on running processes in the container.
Keyword Args:
ps_args (str): When given, arguments will be passed to ps
stream (bool): When True, repeatedly return results. Default: False
Raises:
NotFound: when the container no longer exists
APIError: when the service reports an error
"""
params = {
"ps_args": kwargs.get("ps_args"),
"stream": kwargs.get("stream", False),
}
response = self.client.get(f"/containers/{self.id}/top", params=params)
response.raise_for_status()
if params["stream"]:
self._top_helper(response)
return response.json()
@staticmethod
def _top_helper(response: Response) -> Iterator[Dict[str, Any]]:
for line in response.iter_lines():
yield line
def unpause(self) -> None:
"""Unpause processes in container."""
response = self.client.post(f"/containers/{self.id}/unpause")
response.raise_for_status()
def update(self, **kwargs):
"""Update resource configuration of the containers.
Raises:
NotImplementedError: Podman service unsupported operation.
"""
raise NotImplementedError("Container.update() is not supported by Podman service.")
def wait(self, **kwargs) -> Dict[Literal["StatusCode", "Error"], Any]:
"""Block until the container enters given state.
Keyword Args:
condition (Union[str, List[str]]): Container state on which to release.
One or more of: "configured", "created", "running", "stopped",
"paused", "exited", "removing", "stopping".
interval (int): Time interval to wait before polling for completion.
Returns:
"Error" key has a dictionary value with the key "Message".
Raises:
NotFound: when Container not found
ReadTimeoutError: when timeout is exceeded
APIError: when service returns an error
"""
condition = kwargs.get("condition")
if isinstance(condition, str):
condition = [condition]
interval = kwargs.get("interval")
params = {}
if condition != []:
params["condition"] = condition
if interval != "":
params["interval"] = interval
response = self.client.post(f"/containers/{self.id}/wait", params=params)
response.raise_for_status()
return response.json()

View File

@ -0,0 +1,602 @@
"""Mixin to provide Container create() method."""
import copy
import logging
import re
from contextlib import suppress
from typing import Any, Dict, List, MutableMapping, Union
from podman import api
from podman.domain.containers import Container
from podman.domain.images import Image
from podman.domain.pods import Pod
from podman.errors import ImageNotFound
logger = logging.getLogger("podman.containers")
class CreateMixin: # pylint: disable=too-few-public-methods
"""Class providing create method for ContainersManager."""
def create(
self, image: Union[Image, str], command: Union[str, List[str], None] = None, **kwargs
) -> Container:
"""Create a container.
Args:
image: Image to run.
command: Command to run in the container.
Keyword Args:
auto_remove (bool): Enable auto-removal of the container on daemon side when the
container's process exits.
blkio_weight_device (Dict[str, Any]): Block IO weight (relative device weight)
in the form of: [{"Path": "device_path", "Weight": weight}].
blkio_weight (int): Block IO weight (relative weight), accepts a weight value
between 10 and 1000.
cap_add (List[str]): Add kernel capabilities. For example: ["SYS_ADMIN", "MKNOD"]
cap_drop (List[str]): Drop kernel capabilities.
cgroup_parent (str): Override the default parent cgroup.
cpu_count (int): Number of usable CPUs (Windows only).
cpu_percent (int): Usable percentage of the available CPUs (Windows only).
cpu_period (int): The length of a CPU period in microseconds.
cpu_quota (int): Microseconds of CPU time that the container can get in a CPU period.
cpu_rt_period (int): Limit CPU real-time period in microseconds.
cpu_rt_runtime (int): Limit CPU real-time runtime in microseconds.
cpu_shares (int): CPU shares (relative weight).
cpuset_cpus (str): CPUs in which to allow execution (0-3, 0,1).
cpuset_mems (str): Memory nodes (MEMs) in which to allow execution (0-3, 0,1).
Only effective on NUMA systems.
detach (bool): Run container in the background and return a Container object.
device_cgroup_rules (List[str]): A list of cgroup rules to apply to the container.
device_read_bps: Limit read rate (bytes per second) from a device in the form of:
`[{"Path": "device_path", "Rate": rate}]`
device_read_iops: Limit read rate (IO per second) from a device.
device_write_bps: Limit write rate (bytes per second) from a device.
device_write_iops: Limit write rate (IO per second) from a device.
devices (List[str]): Expose host devices to the container, as a List[str] in the form
<path_on_host>:<path_in_container>:<cgroup_permissions>.
For example:
/dev/sda:/dev/xvda:rwm allows the container to have read-write access to the
host's /dev/sda via a node named /dev/xvda inside the container.
dns (List[str]): Set custom DNS servers.
dns_opt (List[str]): Additional options to be added to the container's resolv.conf file.
dns_search (List[str]): DNS search domains.
domainname (Union[str, List[str]]): Set custom DNS search domains.
entrypoint (Union[str, List[str]]): The entrypoint for the container.
environment (Union[Dict[str, str], List[str]): Environment variables to set inside
the container, as a dictionary or a List[str] in the format
["SOMEVARIABLE=xxx", "SOMEOTHERVARIABLE=xyz"].
extra_hosts (Dict[str, str]): Additional hostnames to resolve inside the container,
as a mapping of hostname to IP address.
group_add (List[str]): List of additional group names and/or IDs that the container
process will run as.
healthcheck (Dict[str,Any]): Specify a test to perform to check that the
container is healthy.
health_check_on_failure_action (int): Specify an action if a healthcheck fails.
hostname (str): Optional hostname for the container.
init (bool): Run an init inside the container that forwards signals and reaps processes
init_path (str): Path to the docker-init binary
ipc_mode (str): Set the IPC mode for the container.
isolation (str): Isolation technology to use. Default: `None`.
kernel_memory (int or str): Kernel memory limit
labels (Union[Dict[str, str], List[str]): A dictionary of name-value labels (e.g.
{"label1": "value1", "label2": "value2"}) or a list of names of labels to set
with empty values (e.g. ["label1", "label2"])
links (Optional[Dict[str, str]]): Mapping of links using the {'container': 'alias'}
format. The alias is optional. Containers declared in this dict will be linked to
the new container using the provided alias. Default: None.
log_config (LogConfig): Logging configuration.
lxc_config (Dict[str, str]): LXC config.
mac_address (str): MAC address to assign to the container.
mem_limit (Union[int, str]): Memory limit. Accepts float values (which represent the
memory limit of the created container in bytes) or a string with a units
identification char (100000b, 1000k, 128m, 1g). If a string is specified without
a units character, bytes are assumed as an intended unit.
mem_reservation (Union[int, str]): Memory soft limit.
mem_swappiness (int): Tune a container's memory swappiness behavior. Accepts number
between 0 and 100.
memswap_limit (Union[int, str]): Maximum amount of memory + swap a container is allowed
to consume.
mounts (List[Mount]): Specification for mounts to be added to the container. More
powerful alternative to volumes. Each item in the list is expected to be a
Mount object.
For example :
[
{
"type": "bind",
"source": "/a/b/c1",
"target" "/d1",
"read_only": True,
"relabel": "Z"
},
{
"type": "tmpfs",
"source": "tmpfs", # If this was not passed, the regular directory
# would be created rather than tmpfs mount !!!
# as this will cause to have invalid entry
# in /proc/self/mountinfo
"target" "/d2",
"size": "100k",
"chown": True
}
]
name (str): The name for this container.
nano_cpus (int): CPU quota in units of 1e-9 CPUs.
networks (Dict[str, Dict[str, Union[str, List[str]]):
Networks which will be connected to container during container creation
Values of the network configuration can be :
- string
- list of strings (e.g. Aliases)
network_disabled (bool): Disable networking.
network_mode (str): One of:
- bridge: Create a new network stack for the container
on the bridge network.
- none: No networking for this container.
- container:<name|id>: Reuse another container's network
stack.
- host: Use the host network stack.
Incompatible with network.
oom_kill_disable (bool): Whether to disable OOM killer.
oom_score_adj (int): An integer value containing the score given to the container in
order to tune OOM killer preferences.
pid_mode (str): If set to host, use the host PID namespace
inside the container.
pids_limit (int): Tune a container's pids limit. Set -1 for unlimited.
platform (str): Platform in the format os[/arch[/variant]]. Only used if the method
needs to pull the requested image.
ports (Dict[str, Union[int, Tuple[str, int], List[int]]]): Ports to bind inside
the container.
The keys of the dictionary are the ports to bind inside the container, either as an
integer or a string in the form port/protocol, where the protocol is either
tcp, udp, or sctp.
The values of the dictionary are the corresponding ports to open on the host,
which can be either:
- The port number, as an integer.
For example: {'2222/tcp': 3333} will expose port 2222 inside the container
as port 3333 on the host.
- None, to assign a random host port.
For example: {'2222/tcp': None}.
- A tuple of (address, port) if you want to specify the host interface.
For example: {'1111/tcp': ('127.0.0.1', 1111)}.
- A list of integers or tuples of (address, port), if you want to bind
multiple host ports to a single container port.
For example: {'1111/tcp': [1234, ("127.0.0.1", 4567)]}.
For example: {'9090': 7878, '10932/tcp': '8781',
"8989/tcp": ("127.0.0.1", 9091)}
privileged (bool): Give extended privileges to this container.
publish_all_ports (bool): Publish all ports to the host.
read_only (bool): Mount the container's root filesystem as read only.
remove (bool): Remove the container when it has finished running. Default: False.
restart_policy (Dict[str, Union[str, int]]): Restart the container when it exits.
Configured as a dictionary with keys:
- Name: One of on-failure, or always.
- MaximumRetryCount: Number of times to restart the container on failure.
For example: {"Name": "on-failure", "MaximumRetryCount": 5}
runtime (str): Runtime to use with this container.
security_opt (List[str]): A List[str]ing values to customize labels for MLS systems,
such as SELinux.
shm_size (Union[str, int]): Size of /dev/shm (e.g. 1G).
stdin_open (bool): Keep STDIN open even if not attached.
stdout (bool): Return logs from STDOUT when detach=False. Default: True.
stderr (bool): Return logs from STDERR when detach=False. Default: False.
stop_signal (str): The stop signal to use to stop the container (e.g. SIGINT).
storage_opt (Dict[str, str]): Storage driver options per container as a
key-value mapping.
stream (bool): If true and detach is false, return a log generator instead of a string.
Ignored if detach is true. Default: False.
sysctls (Dict[str, str]): Kernel parameters to set in the container.
tmpfs (Dict[str, str]): Temporary filesystems to mount, as a dictionary mapping a
path inside the container to options for that path.
For example: {'/mnt/vol2': '', '/mnt/vol1': 'size=3G,uid=1000'}
tty (bool): Allocate a pseudo-TTY.
ulimits (List[Ulimit]): Ulimits to set inside the container.
use_config_proxy (bool): If True, and if the docker client configuration
file (~/.config/containers/config.json by default) contains a proxy configuration,
the corresponding environment variables will be set in the container being built.
user (Union[str, int]): Username or UID to run commands as inside the container.
userns_mode (str): Sets the user namespace mode for the container when user namespace
remapping option is enabled. Supported values are: host
uts_mode (str): Sets the UTS namespace mode for the container.
Supported values are: host
version (str): The version of the API to use. Set to auto to automatically detect
the server's version. Default: 3.0.0
volume_driver (str): The name of a volume driver/plugin.
volumes (Dict[str, Dict[str, Union[str, list]]]): A dictionary to configure
volumes mounted inside the container.
The key is either the host path or a volume name, and the value is
a dictionary with the keys:
- bind: The path to mount the volume inside the container
- mode: Either rw to mount the volume read/write, or ro to mount it read-only.
Kept for docker-py compatibility
- extended_mode: List of options passed to volume mount.
For example:
{
'test_bind_1':
{'bind': '/mnt/vol1', 'mode': 'rw'},
'test_bind_2':
{'bind': '/mnt/vol2', 'extended_mode': ['ro', 'noexec']},
'test_bind_3':
{'bind': '/mnt/vol3', 'extended_mode': ['noexec'], 'mode': 'rw'}
}
volumes_from (List[str]): List of container names or IDs to get volumes from.
working_dir (str): Path to the working directory.
Raises:
ImageNotFound: when Image not found by Podman service
APIError: when Podman service reports an error
"""
if isinstance(image, Image):
image = image.id
payload = {"image": image, "command": command}
payload.update(kwargs)
payload = self._render_payload(payload)
payload = api.prepare_body(payload)
response = self.client.post(
"/containers/create", headers={"content-type": "application/json"}, data=payload
)
response.raise_for_status(not_found=ImageNotFound)
body = response.json()
return self.get(body["Id"])
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
@staticmethod
def _render_payload(kwargs: MutableMapping[str, Any]) -> Dict[str, Any]:
"""Map create/run kwargs into body parameters."""
args = copy.copy(kwargs)
if "links" in args:
if len(args["links"]) > 0:
raise ValueError("'links' are not supported by Podman service.")
del args["links"]
# Ignore these keywords
for key in (
"cpu_count",
"cpu_percent",
"nano_cpus",
"platform", # used by caller
"remove", # used by caller
"stderr", # used by caller
"stdout", # used by caller
"stream", # used by caller
"detach", # used by caller
"volume_driver",
):
with suppress(KeyError):
del args[key]
# These keywords are not supported for various reasons.
unsupported_keys = set(args.keys()).intersection(
(
"blkio_weight",
"blkio_weight_device", # FIXME In addition to device Major/Minor include path
"device_cgroup_rules", # FIXME Where to map for Podman API?
"device_read_bps", # FIXME In addition to device Major/Minor include path
"device_read_iops", # FIXME In addition to device Major/Minor include path
"device_requests", # FIXME In addition to device Major/Minor include path
"device_write_bps", # FIXME In addition to device Major/Minor include path
"device_write_iops", # FIXME In addition to device Major/Minor include path
"domainname",
"network_disabled", # FIXME Where to map for Podman API?
"storage_opt", # FIXME Where to map for Podman API?
"tmpfs", # FIXME Where to map for Podman API?
)
)
if len(unsupported_keys) > 0:
raise TypeError(
f"""Keyword(s) '{" ,".join(unsupported_keys)}' are"""
f""" currently not supported by Podman API."""
)
def pop(k):
return args.pop(k, None)
def to_bytes(size: Union[int, str, None]) -> Union[int, None]:
"""
Converts str or int to bytes.
Input can be in the following forms :
0) None - e.g. None -> returns None
1) int - e.g. 100 == 100 bytes
2) str - e.g. '100' == 100 bytes
3) str with suffix - available suffixes:
b | B - bytes
k | K = kilobytes
m | M = megabytes
g | G = gigabytes
e.g. '100m' == 104857600 bytes
"""
size_type = type(size)
if size is None:
return size
if size_type is int:
return size
if size_type is str:
try:
return int(size)
except ValueError as bad_size:
mapping = {'b': 0, 'k': 1, 'm': 2, 'g': 3}
mapping_regex = ''.join(mapping.keys())
search = re.search(rf'^(\d+)([{mapping_regex}])$', size.lower())
if search:
return int(search.group(1)) * (1024 ** mapping[search.group(2)])
raise TypeError(
f"Passed string size {size} should be in format\\d+[bBkKmMgG] (e.g. '100m')"
) from bad_size
else:
raise TypeError(
f"Passed size {size} should be a type of unicode, str "
f"or int (found : {size_type})"
)
# Transform keywords into parameters
params = {
"annotations": pop("annotations"), # TODO document, podman only
"apparmor_profile": pop("apparmor_profile"), # TODO document, podman only
"cap_add": pop("cap_add"),
"cap_drop": pop("cap_drop"),
"cgroup_parent": pop("cgroup_parent"),
"cgroups_mode": pop("cgroups_mode"), # TODO document, podman only
"cni_networks": [pop("network")],
"command": args.pop("command", args.pop("cmd", None)),
"conmon_pid_file": pop("conmon_pid_file"), # TODO document, podman only
"containerCreateCommand": pop("containerCreateCommand"), # TODO document, podman only
"devices": [],
"dns_options": pop("dns_opt"),
"dns_search": pop("dns_search"),
"dns_server": pop("dns"),
"entrypoint": pop("entrypoint"),
"env": pop("environment"),
"env_host": pop("env_host"), # TODO document, podman only
"expose": {},
"groups": pop("group_add"),
"healthconfig": pop("healthcheck"),
"health_check_on_failure_action": pop("health_check_on_failure_action"),
"hostadd": [],
"hostname": pop("hostname"),
"httpproxy": pop("use_config_proxy"),
"idmappings": pop("idmappings"), # TODO document, podman only
"image": pop("image"),
"image_volume_mode": pop("image_volume_mode"), # TODO document, podman only
"image_volumes": pop("image_volumes"), # TODO document, podman only
"init": pop("init"),
"init_path": pop("init_path"),
"isolation": pop("isolation"),
"labels": pop("labels"),
"log_configuration": {},
"lxc_config": pop("lxc_config"),
"mask": pop("masked_paths"),
"mounts": [],
"name": pop("name"),
"namespace": pop("namespace"), # TODO What is this for?
"network_options": pop("network_options"), # TODO document, podman only
"networks": pop("networks"),
"no_new_privileges": pop("no_new_privileges"), # TODO document, podman only
"oci_runtime": pop("runtime"),
"oom_score_adj": pop("oom_score_adj"),
"overlay_volumes": pop("overlay_volumes"), # TODO document, podman only
"portmappings": [],
"privileged": pop("privileged"),
"procfs_opts": pop("procfs_opts"), # TODO document, podman only
"publish_image_ports": pop("publish_all_ports"),
"r_limits": [],
"raw_image_name": pop("raw_image_name"), # TODO document, podman only
"read_only_filesystem": pop("read_only"),
"remove": args.pop("remove", args.pop("auto_remove", None)),
"resource_limits": {},
"rootfs": pop("rootfs"),
"rootfs_propagation": pop("rootfs_propagation"),
"sdnotifyMode": pop("sdnotifyMode"), # TODO document, podman only
"seccomp_policy": pop("seccomp_policy"), # TODO document, podman only
"seccomp_profile_path": pop("seccomp_profile_path"), # TODO document, podman only
"secrets": pop("secrets"), # TODO document, podman only
"selinux_opts": pop("security_opt"),
"shm_size": to_bytes(pop("shm_size")),
"static_mac": pop("mac_address"),
"stdin": pop("stdin_open"),
"stop_signal": pop("stop_signal"),
"stop_timeout": pop("stop_timeout"), # TODO document, podman only
"sysctl": pop("sysctls"),
"systemd": pop("systemd"), # TODO document, podman only
"terminal": pop("tty"),
"timezone": pop("timezone"),
"umask": pop("umask"), # TODO document, podman only
"unified": pop("unified"), # TODO document, podman only
"unmask": pop("unmasked_paths"), # TODO document, podman only
"use_image_hosts": pop("use_image_hosts"), # TODO document, podman only
"use_image_resolve_conf": pop("use_image_resolve_conf"), # TODO document, podman only
"user": pop("user"),
"version": pop("version"),
"volumes": [],
"volumes_from": pop("volumes_from"),
"work_dir": pop("working_dir"),
}
for device in args.pop("devices", []):
params["devices"].append({"path": device})
for item in args.pop("exposed_ports", []):
port, protocol = item.split("/")
params["expose"][int(port)] = protocol
for hostname, ip in args.pop("extra_hosts", {}).items():
params["hostadd"].append(f"{hostname}:{ip}")
if "log_config" in args:
params["log_configuration"]["driver"] = args["log_config"].get("Type")
if "Config" in args["log_config"]:
params["log_configuration"]["path"] = args["log_config"]["Config"].get("path")
params["log_configuration"]["size"] = args["log_config"]["Config"].get("size")
params["log_configuration"]["options"] = args["log_config"]["Config"].get("options")
args.pop("log_config")
for item in args.pop("mounts", []):
mount_point = {
"destination": item.get("target"),
"options": [],
"source": item.get("source"),
"type": item.get("type"),
}
# some names are different for podman-py vs REST API due to compatibility with docker
# some (e.g. chown) despite listed in podman-run documentation fails with error
names_dict = {"read_only": "ro", "chown": "U"}
options = []
simple_options = ["propagation", "relabel"]
bool_options = ["read_only", "U", "chown"]
regular_options = ["consistency", "mode", "size"]
for k, v in item.items():
option_name = names_dict.get(k, k)
if k in bool_options and v is True:
options.append(option_name)
elif k in regular_options:
options.append(f'{option_name}={v}')
elif k in simple_options:
options.append(v)
mount_point["options"] = options
params["mounts"].append(mount_point)
if "pod" in args:
pod = args.pop("pod")
if isinstance(pod, Pod):
pod = pod.id
params["pod"] = pod # TODO document, podman only
for container, host in args.pop("ports", {}).items():
if "/" in container:
container_port, protocol = container.split("/")
else:
container_port, protocol = container, "tcp"
port_map = {"container_port": int(container_port), "protocol": protocol}
if host is None:
pass
elif isinstance(host, int) or isinstance(host, str) and host.isdigit():
port_map["host_port"] = int(host)
elif isinstance(host, tuple):
port_map["host_ip"] = host[0]
port_map["host_port"] = int(host[1])
elif isinstance(host, list):
for host_list in host:
port_map = {"container_port": int(container_port), "protocol": protocol}
if (
isinstance(host_list, int)
or isinstance(host_list, str)
and host_list.isdigit()
):
port_map["host_port"] = int(host_list)
elif isinstance(host_list, tuple):
port_map["host_ip"] = host_list[0]
port_map["host_port"] = int(host_list[1])
else:
raise ValueError(f"'ports' value of '{host_list}' is not supported.")
params["portmappings"].append(port_map)
continue
else:
raise ValueError(f"'ports' value of '{host}' is not supported.")
params["portmappings"].append(port_map)
if "restart_policy" in args:
params["restart_policy"] = args["restart_policy"].get("Name")
params["restart_tries"] = args["restart_policy"].get("MaximumRetryCount")
args.pop("restart_policy")
params["resource_limits"]["pids"] = {"limit": args.pop("pids_limit", None)}
params["resource_limits"]["cpu"] = {
"cpus": args.pop("cpuset_cpus", None),
"mems": args.pop("cpuset_mems", None),
"period": args.pop("cpu_period", None),
"quota": args.pop("cpu_quota", None),
"realtimePeriod": args.pop("cpu_rt_period", None),
"realtimeRuntime": args.pop("cpu_rt_runtime", None),
"shares": args.pop("cpu_shares", None),
}
params["resource_limits"]["memory"] = {
"disableOOMKiller": args.pop("oom_kill_disable", None),
"kernel": to_bytes(args.pop("kernel_memory", None)),
"kernelTCP": args.pop("kernel_memory_tcp", None),
"limit": to_bytes(args.pop("mem_limit", None)),
"reservation": to_bytes(args.pop("mem_reservation", None)),
"swap": to_bytes(args.pop("memswap_limit", None)),
"swappiness": args.pop("mem_swappiness", None),
"useHierarchy": args.pop("mem_use_hierarchy", None),
}
for item in args.pop("ulimits", []):
params["r_limits"].append(
{
"type": item["Name"],
"hard": item["Hard"],
"soft": item["Soft"],
}
)
for item in args.pop("volumes", {}).items():
key, value = item
extended_mode = value.get('extended_mode', [])
if not isinstance(extended_mode, list):
raise ValueError("'extended_mode' value should be a list")
options = extended_mode
mode = value.get('mode')
if mode is not None:
if not isinstance(mode, str):
raise ValueError("'mode' value should be a str")
options.append(mode)
volume = {"Name": key, "Dest": value["bind"], "Options": options}
params["volumes"].append(volume)
if "cgroupns" in args:
params["cgroupns"] = {"nsmode": args.pop("cgroupns")}
if "ipc_mode" in args:
params["ipcns"] = {"nsmode": args.pop("ipc_mode")}
if "network_mode" in args:
params["netns"] = {"nsmode": args.pop("network_mode")}
if "pid_mode" in args:
params["pidns"] = {"nsmode": args.pop("pid_mode")}
if "userns_mode" in args:
params["userns"] = {"nsmode": args.pop("userns_mode")}
if "uts_mode" in args:
params["utsns"] = {"nsmode": args.pop("uts_mode")}
if len(args) > 0:
raise TypeError(
"Unknown keyword argument(s): " + " ,".join(f"'{k}'" for k in args.keys())
)
return params

View File

@ -0,0 +1,140 @@
"""PodmanResource manager subclassed for Containers."""
import logging
import urllib
from typing import Any, Dict, List, Mapping, Union
from podman import api
from podman.domain.containers import Container
from podman.domain.containers_create import CreateMixin
from podman.domain.containers_run import RunMixin
from podman.domain.manager import Manager
from podman.errors import APIError
logger = logging.getLogger("podman.containers")
class ContainersManager(RunMixin, CreateMixin, Manager):
"""Specialized Manager for Container resources."""
@property
def resource(self):
"""Type[Container]: prepare_model() will create Container classes."""
return Container
def exists(self, key: str) -> bool:
response = self.client.get(f"/containers/{key}/exists")
return response.ok
def get(self, key: str) -> Container:
"""Get container by name or id.
Args:
container_id: Container name or id.
Raises:
NotFound: when Container does not exist
APIError: when an error return by service
"""
container_id = urllib.parse.quote_plus(key)
response = self.client.get(f"/containers/{container_id}/json")
response.raise_for_status()
return self.prepare_model(attrs=response.json())
def list(self, **kwargs) -> List[Container]:
"""Report on containers.
Keyword Args:
all: If False, only show running containers. Default: False.
since: Show containers created after container name or id given.
before: Show containers created before container name or id given.
limit: Show last N created containers.
filters: Filter container reported.
Available filters:
- exited (int): Only containers with specified exit code
- status (str): One of restarting, running, paused, exited
- label (Union[str, List[str]]): Format either "key", "key=value" or a list of such.
- id (str): The id of the container.
- name (str): The name of the container.
- ancestor (str): Filter by container ancestor. Format of
<image-name>[:tag], <image-id>, or <image@digest>.
- before (str): Only containers created before a particular container.
Give the container name or id.
- since (str): Only containers created after a particular container.
Give container name or id.
sparse: Ignored
ignore_removed: If True, ignore failures due to missing containers.
Raises:
APIError: when service returns an error
"""
params = {
"all": kwargs.get("all"),
"filters": kwargs.get("filters", {}),
"limit": kwargs.get("limit"),
}
if "before" in kwargs:
params["filters"]["before"] = kwargs.get("before")
if "since" in kwargs:
params["filters"]["since"] = kwargs.get("since")
# filters formatted last because some kwargs may need to be mapped into filters
params["filters"] = api.prepare_filters(params["filters"])
response = self.client.get("/containers/json", params=params)
response.raise_for_status()
return [self.prepare_model(attrs=i) for i in response.json()]
def prune(self, filters: Mapping[str, str] = None) -> Dict[str, Any]:
"""Delete stopped containers.
Args:
filters: Criteria for determining containers to remove. Available keys are:
- until (str): Delete containers before this time
- label (List[str]): Labels associated with containers
Returns:
Keys:
- ContainersDeleted (List[str]): Identifiers of deleted containers.
- SpaceReclaimed (int): Amount of disk space reclaimed in bytes.
Raises:
APIError: when service reports an error
"""
params = {"filters": api.prepare_filters(filters)}
response = self.client.post("/containers/prune", params=params)
response.raise_for_status()
results = {"ContainersDeleted": [], "SpaceReclaimed": 0}
for entry in response.json():
if entry.get("error") is not None:
raise APIError(entry["error"], response=response, explanation=entry["error"])
results["ContainersDeleted"].append(entry["Id"])
results["SpaceReclaimed"] += entry["Size"]
return results
def remove(self, container_id: Union[Container, str], **kwargs):
"""Delete container.
Podman only
Args:
container_id: identifier of Container to delete.
Keyword Args:
v (bool): Delete associated volumes as well.
link (bool): Ignored.
force (bool): Kill a running container before deleting.
"""
if isinstance(container_id, Container):
container_id = container_id.id
params = {
"v": kwargs.get("v"),
"force": kwargs.get("force"),
}
response = self.client.delete(f"/containers/{container_id}", params=params)
response.raise_for_status()

View File

@ -0,0 +1,92 @@
"""Mixin to provide Container run() method."""
import logging
from contextlib import suppress
from typing import Generator, Iterator, List, Union
from podman.domain.containers import Container
from podman.domain.images import Image
from podman.errors import ContainerError, ImageNotFound
logger = logging.getLogger("podman.containers")
class RunMixin: # pylint: disable=too-few-public-methods
"""Class providing run() method for ContainersManager."""
def run(
self,
image: Union[str, Image],
command: Union[str, List[str], None] = None,
stdout=True,
stderr=False,
remove: bool = False,
**kwargs,
) -> Union[Container, Union[Generator[str, None, None], Iterator[str]]]:
"""Run a container.
By default, run() will wait for the container to finish and return its logs.
If detach=True, run() will start the container and return a Container object rather
than logs.
Args:
image: Image to run.
command: Command to run in the container.
stdout: Include stdout. Default: True.
stderr: Include stderr. Default: False.
remove: Delete container when the container's processes exit. Default: False.
Keyword Args:
- See the create() method for keyword arguments.
Returns:
- When detach is True, return a Container
- If stdout is True, include stdout from container in output
- If stderr is True, include stderr from container in output
- When stream is True, output from container is returned as a generator
- Otherwise, an iterator is returned after container has finished
Raises:
ContainerError: when Container exists with a non-zero code
ImageNotFound: when Image not found by Podman service
APIError: when Podman service reports an error
"""
if isinstance(image, Image):
image = image.id
if isinstance(command, str):
command = [command]
try:
container = self.create(image=image, command=command, **kwargs)
except ImageNotFound:
self.client.images.pull(image, platform=kwargs.get("platform"))
container = self.create(image=image, command=command, **kwargs)
container.start()
container.wait(condition=["running", "exited"])
container.reload()
if kwargs.get("detach", False):
return container
with suppress(KeyError):
log_type = container.attrs["HostConfig"]["LogConfig"]["Type"]
log_iter = None
if log_type in ("json-file", "journald"):
log_iter = container.logs(stdout=stdout, stderr=stderr, stream=True, follow=True)
exit_status = container.wait()["StatusCode"]
if exit_status != 0:
log_iter = None
if not kwargs.get("auto_remove", False):
log_iter = container.logs(stdout=False, stderr=True)
if remove:
container.remove()
if exit_status != 0:
raise ContainerError(container, exit_status, command, image, log_iter)
return log_iter if kwargs.get("stream", False) or log_iter is None else b"".join(log_iter)

View File

@ -0,0 +1,57 @@
"""Model and Manager for Event resources."""
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional, Union, Iterator
from podman import api
from podman.api.client import APIClient
logger = logging.getLogger("podman.events")
class EventsManager: # pylint: disable=too-few-public-methods
"""Specialized Manager for Event resources."""
def __init__(self, client: APIClient) -> None:
"""Initialize EventManager object.
Args:
client: Connection to Podman service.
"""
self.client = client
def list(
self,
since: Union[datetime, int, None] = None,
until: Union[datetime, int, None] = None,
filters: Optional[Dict[str, Any]] = None,
decode: bool = False,
) -> Iterator[Union[str, Dict[str, Any]]]:
"""Report on networks.
Args:
decode: When True, decode stream into dict's. Default: False
filters: Criteria for including events.
since: Get events newer than this time.
until: Get events older than this time.
Yields:
When decode is True, Iterator[Dict[str, Any]]
When decode is False, Iterator[str]
"""
params = {
"filters": api.prepare_filters(filters),
"since": api.prepare_timestamp(since),
"stream": True,
"until": api.prepare_timestamp(until),
}
response = self.client.get("/events", params=params, stream=True)
response.raise_for_status()
for item in response.iter_lines():
if decode:
yield json.loads(item)
else:
yield item

View File

@ -0,0 +1,119 @@
"""Model and Manager for Image resources."""
import logging
from typing import Any, Dict, Iterator, List, Optional, Union
from podman import api
from podman.domain.manager import PodmanResource
from podman.errors import ImageNotFound
logger = logging.getLogger("podman.images")
class Image(PodmanResource):
"""Details and configuration for an Image managed by the Podman service."""
def __repr__(self) -> str:
return f"""<{self.__class__.__name__}: '{"', '".join(self.tags)}'>"""
@property
def labels(self):
"""dict[str, str]: Return labels associated with Image."""
image_labels = self.attrs.get("Labels")
if image_labels is None or len(image_labels) == 0:
return {}
return image_labels
@property
def tags(self):
"""list[str]: Return tags from Image."""
repo_tags = self.attrs.get("RepoTags")
if repo_tags is None or len(repo_tags) == 0:
return []
return [tag for tag in repo_tags if tag != "<none>:<none>"]
def history(self) -> List[Dict[str, Any]]:
"""Returns history of the Image.
Raises:
APIError: when service returns an error
"""
response = self.client.get(f"/images/{self.id}/history")
response.raise_for_status(not_found=ImageNotFound)
return response.json()
def remove(
self, **kwargs
) -> List[Dict[api.Literal["Deleted", "Untagged", "Errors", "ExitCode"], Union[str, int]]]:
"""Delete image from Podman service.
Podman only
Keyword Args:
force: Delete Image even if in use
noprune: Ignored.
Returns:
Report on which images were deleted and untagged, including any reported errors.
Raises:
ImageNotFound: when image does not exist
APIError: when service returns an error
"""
return self.manager.remove(self.id, **kwargs)
def save(
self,
chunk_size: Optional[int] = api.DEFAULT_CHUNK_SIZE,
named: Union[str, bool] = False, # pylint: disable=unused-argument
) -> Iterator[bytes]:
"""Returns Image as tarball.
Format is set to docker-archive, this allows load() to import this tarball.
Args:
chunk_size: If None, data will be streamed in received buffer size.
If not None, data will be returned in sized buffers. Default: 2MB
named: Ignored.
Raises:
APIError: when service returns an error
"""
response = self.client.get(
f"/images/{self.id}/get", params={"format": ["docker-archive"]}, stream=True
)
response.raise_for_status(not_found=ImageNotFound)
return response.iter_content(chunk_size=chunk_size)
def tag(
self,
repository: str,
tag: Optional[str],
force: bool = False, # pylint: disable=unused-argument
) -> bool:
"""Tag Image into repository.
Args:
repository: The repository for tagging Image.
tag: optional tag name.
force: Ignore client errors
Returns:
True, when operational succeeds.
Raises:
ImageNotFound: when service cannot find image
APIError: when service returns an error
"""
params = {"repo": repository, "tag": tag}
response = self.client.post(f"/images/{self.id}/tag", params=params)
if response.ok:
return True
if force and response.status_code <= 500:
return False
response.raise_for_status(not_found=ImageNotFound)
return False

View File

@ -0,0 +1,195 @@
"""Mixin for Image build support."""
import json
import logging
import pathlib
import random
import re
import shutil
import tempfile
from typing import Any, Dict, Iterator, List, Tuple
import itertools
from podman import api
from podman.domain.images import Image
from podman.errors import BuildError, PodmanError, ImageNotFound
logger = logging.getLogger("podman.images")
class BuildMixin:
"""Class providing build method for ImagesManager."""
# pylint: disable=too-many-locals,too-many-branches,too-few-public-methods,too-many-statements
def build(self, **kwargs) -> Tuple[Image, Iterator[bytes]]:
"""Returns built image.
Keyword Args:
path (str) Path to the directory containing the Dockerfile
fileobj A file object to use as the Dockerfile. (Or an IO object)
tag (str) A tag to add to the final image
quiet (bool) Whether to return the status
nocache (bool) Dont use the cache when set to True
rm (bool) Remove intermediate containers. Default True
timeout (int) HTTP timeout
custom_context (bool) Optional if using fileobj (ignored)
encoding (str) The encoding for a stream. Set to gzip for compressing (ignored)
pull (bool) Downloads any updates to the FROM image in Dockerfile
forcerm (bool) Always remove intermediate containers, even after unsuccessful builds
dockerfile (str) full path to the Dockerfile / Containerfile
buildargs (Mapping[str,str) A dictionary of build arguments
container_limits (Dict[str, Union[int,str]])
A dictionary of limits applied to each container created by the build process.
Valid keys:
- memory (int): set memory limit for build
- memswap (int): Total memory (memory + swap), -1 to disable swap
- cpushares (int): CPU shares (relative weight)
- cpusetcpus (str): CPUs in which to allow execution, For example, "0-3", "0,1"
- cpuperiod (int): CPU CFS (Completely Fair Scheduler) period (Podman only)
- cpuquota (int): CPU CFS (Completely Fair Scheduler) quota (Podman only)
shmsize (int) Size of /dev/shm in bytes. The size must be greater than 0.
If omitted the system uses 64MB
labels (Mapping[str,str]) A dictionary of labels to set on the image
cache_from (List[str]) A list of image's identifier used for build cache resolution
target (str) Name of the build-stage to build in a multi-stage Dockerfile
network_mode (str) networking mode for the run commands during build
squash (bool) Squash the resulting images layers into a single layer.
extra_hosts (Dict[str,str]) Extra hosts to add to /etc/hosts in building
containers, as a mapping of hostname to IP address.
platform (str) Platform in the format os[/arch[/variant]].
isolation (str) Isolation technology used during build. (ignored)
use_config_proxy (bool) (ignored)
http_proxy (bool) - Inject http proxy environment variables into container (Podman only)
layers (bool) - Cache intermediate layers during build.
output (str) - specifies if any custom build output is selected for following build.
outputformat (str) - The format of the output image's manifest and configuration data.
Returns:
first item is the podman.domain.images.Image built
second item is the build logs
Raises:
BuildError: when there is an error during the build
APIError: when service returns an error
TypeError: when neither path nor fileobj is not specified
"""
params = self._render_params(kwargs)
body = None
path = None
if "fileobj" in kwargs:
path = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with
filename = pathlib.Path(path.name) / params["dockerfile"]
with open(filename, "w", encoding='utf-8') as file:
shutil.copyfileobj(kwargs["fileobj"], file)
body = api.create_tar(anchor=path.name, gzip=kwargs.get("gzip", False))
elif "path" in kwargs:
filename = pathlib.Path(kwargs["path"]) / params["dockerfile"]
# The Dockerfile will be copied into the context_dir if needed
params["dockerfile"] = api.prepare_containerfile(kwargs["path"], str(filename))
excludes = api.prepare_containerignore(kwargs["path"])
body = api.create_tar(
anchor=kwargs["path"], exclude=excludes, gzip=kwargs.get("gzip", False)
)
post_kwargs = {}
if kwargs.get("timeout"):
post_kwargs["timeout"] = float(kwargs.get("timeout"))
response = self.client.post(
"/build",
params=params,
data=body,
headers={
"Content-type": "application/x-tar",
# "X-Registry-Config": "TODO",
},
stream=True,
**post_kwargs,
)
if hasattr(body, "close"):
body.close()
if hasattr(path, "cleanup"):
path.cleanup()
response.raise_for_status(not_found=ImageNotFound)
image_id = unknown = None
marker = re.compile(r"(^[0-9a-f]+)\n$")
report_stream, stream = itertools.tee(response.iter_lines())
for line in stream:
result = json.loads(line)
if "error" in result:
raise BuildError(result["error"], report_stream)
if "stream" in result:
match = marker.match(result["stream"])
if match:
image_id = match.group(1)
unknown = line
if image_id:
return self.get(image_id), report_stream
raise BuildError(unknown or "Unknown", report_stream)
@staticmethod
def _render_params(kwargs) -> Dict[str, List[Any]]:
"""Map kwargs to query parameters.
All unsupported kwargs are silently ignored.
"""
if "path" not in kwargs and "fileobj" not in kwargs:
raise TypeError("Either path or fileobj must be provided.")
if "gzip" in kwargs and "encoding" in kwargs:
raise PodmanError("Custom encoding not supported when gzip enabled.")
params = {
"dockerfile": kwargs.get("dockerfile"),
"forcerm": kwargs.get("forcerm"),
"httpproxy": kwargs.get("http_proxy"),
"networkmode": kwargs.get("network_mode"),
"nocache": kwargs.get("nocache"),
"platform": kwargs.get("platform"),
"pull": kwargs.get("pull"),
"q": kwargs.get("quiet"),
"remote": kwargs.get("remote"),
"rm": kwargs.get("rm"),
"shmsize": kwargs.get("shmsize"),
"squash": kwargs.get("squash"),
"t": kwargs.get("tag"),
"target": kwargs.get("target"),
"layers": kwargs.get("layers"),
"output": kwargs.get("output"),
"outputformat": kwargs.get("outputformat"),
}
if "buildargs" in kwargs:
params["buildargs"] = json.dumps(kwargs.get("buildargs"))
if "cache_from" in kwargs:
params["cachefrom"] = json.dumps(kwargs.get("cache_from"))
if "container_limits" in kwargs:
params["cpuperiod"] = kwargs["container_limits"].get("cpuperiod")
params["cpuquota"] = kwargs["container_limits"].get("cpuquota")
params["cpusetcpus"] = kwargs["container_limits"].get("cpusetcpus")
params["cpushares"] = kwargs["container_limits"].get("cpushares")
params["memory"] = kwargs["container_limits"].get("memory")
params["memswap"] = kwargs["container_limits"].get("memswap")
if "extra_hosts" in kwargs:
params["extrahosts"] = json.dumps(kwargs.get("extra_hosts"))
if "labels" in kwargs:
params["labels"] = json.dumps(kwargs.get("labels"))
if params["dockerfile"] is None:
params["dockerfile"] = f".containerfile.{random.getrandbits(160):x}"
# Remove any unset parameters
return dict(filter(lambda i: i[1] is not None, params.items()))

View File

@ -0,0 +1,421 @@
"""PodmanResource manager subclassed for Images."""
import io
import json
import logging
import urllib.parse
from typing import Any, Dict, Generator, Iterator, List, Mapping, Optional, Union
import requests
from podman import api
from podman.api import Literal
from podman.api.http_utils import encode_auth_header
from podman.domain.images import Image
from podman.domain.images_build import BuildMixin
from podman.domain.manager import Manager
from podman.domain.registry_data import RegistryData
from podman.errors import APIError, ImageNotFound
logger = logging.getLogger("podman.images")
class ImagesManager(BuildMixin, Manager):
"""Specialized Manager for Image resources."""
@property
def resource(self):
"""Type[podman.domain.images.Image]: prepare_model() will create Image classes."""
return Image
def exists(self, key: str) -> bool:
"""Return true when image exists."""
key = urllib.parse.quote_plus(key)
response = self.client.get(f"/images/{key}/exists")
return response.ok
def list(self, **kwargs) -> List[Image]:
"""Report on images.
Keyword Args:
name (str) Only show images belonging to the repository name
all (bool) Show intermediate image layers. By default, these are filtered out.
filters (Mapping[str, Union[str, List[str]]) Filters to be used on the image list.
Available filters:
- dangling (bool)
- label (Union[str, List[str]]): format either "key" or "key=value"
Raises:
APIError: when service returns an error
"""
params = {
"all": kwargs.get("all"),
"name": kwargs.get("name"),
"filters": api.prepare_filters(kwargs.get("filters")),
}
response = self.client.get("/images/json", params=params)
if response.status_code == requests.codes.not_found:
return []
response.raise_for_status()
return [self.prepare_model(attrs=i) for i in response.json()]
# pylint is flagging 'name' here vs. 'key' parameter in super.get()
def get(self, name: str) -> Image: # pylint: disable=arguments-differ,arguments-renamed
"""Returns an image by name or id.
Args:
name: Image id or name for which to search
Raises:
ImageNotFound: when image does not exist
APIError: when service returns an error
"""
name = urllib.parse.quote_plus(name)
response = self.client.get(f"/images/{name}/json")
response.raise_for_status(not_found=ImageNotFound)
return self.prepare_model(response.json())
def get_registry_data(
self,
name: str,
auth_config=Mapping[str, str], # pylint: disable=unused-argument
) -> RegistryData:
"""Returns registry data for an image.
Provided for compatibility
Args:
name: Image name
auth_config: Override configured credentials. Keys username and password are required.
Raises:
APIError: when service returns an error
"""
# FIXME populate attrs using auth_config
image = self.get(name)
return RegistryData(
image_name=name,
attrs=image.attrs,
client=self.client,
collection=self,
)
def load(self, data: bytes) -> Generator[Image, None, None]:
"""Restore an image previously saved.
Args:
data: Image to be loaded in tarball format.
Raises:
APIError: when service returns an error
"""
# TODO fix podman swagger cannot use this header!
# headers = {"Content-type": "application/x-www-form-urlencoded"}
response = self.client.post(
"/images/load", data=data, headers={"Content-type": "application/x-tar"}
)
response.raise_for_status()
body = response.json()
for item in body["Names"]:
yield self.get(item)
def prune(
self, filters: Optional[Mapping[str, Any]] = None
) -> Dict[Literal["ImagesDeleted", "SpaceReclaimed"], Any]:
"""Delete unused images.
The Untagged keys will always be "".
Args:
filters: Qualify Images to prune. Available filters:
- dangling (bool): when true, only delete unused and untagged images.
- until (str): Delete images older than this timestamp.
Raises:
APIError: when service returns an error
"""
response = self.client.post(
"/images/prune", params={"filters": api.prepare_filters(filters)}
)
response.raise_for_status()
deleted: List[Dict[str, str]] = []
error: List[str] = []
reclaimed: int = 0
for element in response.json():
if "Err" in element and element["Err"] is not None:
error.append(element["Err"])
else:
reclaimed += element["Size"]
deleted.append(
{
"Deleted": element["Id"],
"Untagged": "",
}
)
if len(error) > 0:
raise APIError(response.url, response=response, explanation="; ".join(error))
return {
"ImagesDeleted": deleted,
"SpaceReclaimed": reclaimed,
}
def prune_builds(self) -> Dict[Literal["CachesDeleted", "SpaceReclaimed"], Any]:
"""Delete builder cache.
Method included to complete API, the operation always returns empty
CacheDeleted and zero SpaceReclaimed.
"""
return {"CachesDeleted": [], "SpaceReclaimed": 0}
def push(
self, repository: str, tag: Optional[str] = None, **kwargs
) -> Union[str, Iterator[Union[str, Dict[str, Any]]]]:
"""Push Image or repository to the registry.
Args:
repository: Target repository for push
tag: Tag to push, if given
Keyword Args:
auth_config (Mapping[str, str]: Override configured credentials. Must include
username and password keys.
decode (bool): return data from server as Dict[str, Any]. Ignored unless stream=True.
destination (str): alternate destination for image. (Podman only)
stream (bool): return output as blocking generator. Default: False.
tlsVerify (bool): Require TLS verification.
Raises:
APIError: when service returns an error
"""
auth_config: Optional[Dict[str, str]] = kwargs.get("auth_config")
headers = {
# A base64url-encoded auth configuration
"X-Registry-Auth": encode_auth_header(auth_config)
if auth_config
else ""
}
params = {
"destination": kwargs.get("destination"),
"tlsVerify": kwargs.get("tlsVerify"),
}
name = f'{repository}:{tag}' if tag else repository
name = urllib.parse.quote_plus(name)
response = self.client.post(f"/images/{name}/push", params=params, headers=headers)
response.raise_for_status(not_found=ImageNotFound)
tag_count = 0 if tag is None else 1
body = [
{
"status": f"Pushing repository {repository} ({tag_count} tags)",
},
{
"status": "Pushing",
"progressDetail": {},
"id": repository,
},
]
stream = kwargs.get("stream", False)
decode = kwargs.get("decode", False)
if stream:
return self._push_helper(decode, body)
with io.StringIO() as buffer:
for entry in body:
buffer.write(json.dumps(entry) + "\n")
return buffer.getvalue()
@staticmethod
def _push_helper(
decode: bool, body: List[Dict[str, Any]]
) -> Iterator[Union[str, Dict[str, Any]]]:
"""Helper needed to allow push() to return either a generator or a str."""
for entry in body:
if decode:
yield entry
else:
yield json.dumps(entry)
# pylint: disable=too-many-locals,too-many-branches
def pull(
self, repository: str, tag: Optional[str] = None, all_tags: bool = False, **kwargs
) -> Union[Image, List[Image], Iterator[str]]:
"""Request Podman service to pull image(s) from repository.
Args:
repository: Repository to pull from
tag: Image tag to pull. Default: "latest".
all_tags: pull all image tags from repository.
Keyword Args:
auth_config (Mapping[str, str]) Override the credentials that are found in the
config for this request. auth_config should contain the username and password
keys to be valid.
platform (str) Platform in the format os[/arch[/variant]]
tls_verify (bool) - Require TLS verification. Default: True.
stream (bool) - When True, the pull progress will be published as received.
Default: False.
Returns:
When stream is True, return a generator publishing the service pull progress.
If all_tags is True, return list of Image's rather than Image pulled.
Raises:
APIError: when service returns an error
"""
if tag is None or len(tag) == 0:
tokens = repository.split(":")
if len(tokens) == 2:
repository = tokens[0]
tag = tokens[1]
else:
tag = "latest"
auth_config: Optional[Dict[str, str]] = kwargs.get("auth_config")
headers = {
# A base64url-encoded auth configuration
"X-Registry-Auth": encode_auth_header(auth_config)
if auth_config
else ""
}
params = {
"reference": repository,
"tlsVerify": kwargs.get("tls_verify"),
}
if all_tags:
params["allTags"] = True
else:
params["reference"] = f"{repository}:{tag}"
if "platform" in kwargs:
tokens = kwargs.get("platform").split("/")
if 1 < len(tokens) > 3:
raise ValueError(f'\'{kwargs.get("platform")}\' is not a legal platform.')
params["OS"] = tokens[0]
if len(tokens) > 1:
params["Arch"] = tokens[1]
if len(tokens) > 2:
params["Variant"] = tokens[2]
stream = kwargs.get("stream", False)
response = self.client.post("/images/pull", params=params, stream=stream, headers=headers)
response.raise_for_status(not_found=ImageNotFound)
if stream:
return response.iter_lines()
for item in response.iter_lines():
obj = json.loads(item)
if all_tags and "images" in obj:
images: List[Image] = []
for name in obj["images"]:
images.append(self.get(name))
return images
if "id" in obj:
return self.get(obj["id"])
return self.resource()
def remove(
self,
image: Union[Image, str],
force: Optional[bool] = None,
noprune: bool = False, # pylint: disable=unused-argument
) -> List[Dict[Literal["Deleted", "Untagged", "Errors", "ExitCode"], Union[str, int]]]:
"""Delete image from Podman service.
Args:
image: Name or Id of Image to remove
force: Delete Image even if in use
noprune: Ignored.
Raises:
ImageNotFound: when image does not exist
APIError: when service returns an error
"""
if isinstance(image, Image):
image = image.id
response = self.client.delete(f"/images/{image}", params={"force": force})
response.raise_for_status(not_found=ImageNotFound)
body = response.json()
results: List[Dict[str, Union[int, str]]] = []
for key in ("Deleted", "Untagged", "Errors"):
if key in body:
for element in body[key]:
results.append({key: element})
results.append({"ExitCode": body["ExitCode"]})
return results
def search(self, term: str, **kwargs) -> List[Dict[str, Any]]:
"""Search Images on registries.
Args:
term: Used to target Image results.
Keyword Args:
filters (Mapping[str, List[str]): Refine results of search. Available filters:
- is-automated (bool): Image build is automated.
- is-official (bool): Image build is owned by product provider.
- stars (int): Image has at least this number of stars.
noTrunc (bool): Do not truncate any result string. Default: True.
limit (int): Maximum number of results.
Raises:
APIError: when service returns an error
"""
params = {
"filters": api.prepare_filters(kwargs.get("filters")),
"limit": kwargs.get("limit"),
"noTrunc": True,
"term": [term],
}
response = self.client.get("/images/search", params=params)
response.raise_for_status(not_found=ImageNotFound)
return response.json()
def scp(
self,
source: str,
dest: Optional[str] = None,
quiet: Optional[bool] = False,
) -> str:
"""Securely copy images between hosts.
Args:
source: source connection/image
dest: destination connection/image
quiet: do not print save/load output, only the image
Returns:
A string containing the loaded image
Raises:
APIError: when service returns an error
"""
params = {}
if dest is not None and quiet:
params = {"destination": dest, "quiet": quiet}
elif quiet:
params = {"quiet": quiet}
response = self.client.post(f"/images/scp/{source}", params=params)
response.raise_for_status()
return response.json()

View File

@ -0,0 +1,60 @@
"""Classes to support Internet Protocol Address Management.
Provided for compatibility
"""
from typing import Any, List, Mapping, Optional
class IPAMPool(dict):
"""Collect IP Network configuration."""
def __init__(
self,
subnet: Optional[str] = None,
iprange: Optional[str] = None,
gateway: Optional[str] = None,
aux_addresses: Optional[Mapping[str, str]] = None,
):
"""Create IPAMPool.
Args:
subnet: IP subnet in CIDR format for this network.
iprange: IP range in CIDR format for endpoints on this network.
gateway: IP gateway address for this network.
aux_addresses: Ignored.
"""
super().__init__()
self.update(
{
"AuxiliaryAddresses": aux_addresses,
"Gateway": gateway,
"IPRange": iprange,
"Subnet": subnet,
}
)
class IPAMConfig(dict):
"""Collect IP Address configuration."""
def __init__(
self,
driver: Optional[str] = "default",
pool_configs: Optional[List[IPAMPool]] = None,
options: Optional[Mapping[str, Any]] = None,
):
"""Create IPAMConfig.
Args:
driver: Network driver to use with this network.
pool_configs: Network and endpoint information. Podman only supports one pool.
options: Options to provide to the Network driver.
"""
super().__init__()
self.update(
{
"Config": pool_configs or [],
"Driver": driver,
"Options": options or {},
}
)

View File

@ -0,0 +1,121 @@
"""Base classes for PodmanResources and Manager's."""
from abc import ABC, abstractmethod
from collections import abc
from typing import Any, List, Mapping, Optional, TypeVar, Union
from podman.api.client import APIClient
# Methods use this Type when a subclass of PodmanResource is expected.
PodmanResourceType: TypeVar = TypeVar("PodmanResourceType", bound="PodmanResource")
class PodmanResource(ABC):
"""Base class for representing resource of a Podman service.
Attributes:
attrs: Mapping of attributes for resource from Podman service
"""
def __init__(
self,
attrs: Optional[Mapping[str, Any]] = None,
client: Optional[APIClient] = None,
collection: Optional["Manager"] = None,
):
"""Initialize base class for PodmanResource's.
Args:
attrs: Mapping of attributes for resource from Podman service.
client: Configured connection to a Podman service.
collection: Manager of this category of resource, named `collection` for compatibility
"""
super().__init__()
self.client = client
self.manager = collection
self.attrs = {}
if attrs is not None:
self.attrs.update(attrs)
def __repr__(self):
return f"<{self.__class__.__name__}: {self.short_id}>"
def __eq__(self, other):
return isinstance(other, self.__class__) and self.id == other.id
def __hash__(self):
return hash(f"{self.__class__.__name__}:{self.id}")
@property
def id(self): # pylint: disable=invalid-name
"""str: Returns the identifier for the object."""
return self.attrs.get("Id")
@property
def short_id(self):
"""str: Returns truncated identifier. 'sha256' preserved when included in the id.
No attempt is made to ensure the returned value is semantically meaningful
for all resources.
"""
if self.id.startswith("sha256:"):
return self.id[:17]
return self.id[:10]
def reload(self) -> None:
"""Refresh this object's data from the service."""
latest = self.manager.get(self.id)
self.attrs = latest.attrs
class Manager(ABC):
"""Base class for representing a Manager of resources for a Podman service."""
@property
@abstractmethod
def resource(self):
"""Type[PodmanResource]: Class which the factory method prepare_model() will use."""
def __init__(self, client: APIClient = None) -> None:
"""Initialize Manager() object.
Args:
client: APIClient() configured to connect to Podman service.
"""
super().__init__()
self.client = client
@abstractmethod
def exists(self, key: str) -> bool:
"""Returns True if resource exists.
Podman only.
Notes:
This method does _not_ provide any mutex mechanism.
"""
@abstractmethod
def get(self, key: str) -> PodmanResourceType:
"""Returns representation of resource."""
@abstractmethod
def list(self, **kwargs) -> List[PodmanResourceType]:
"""Returns list of resources."""
def prepare_model(self, attrs: Union[PodmanResource, Mapping[str, Any]]) -> PodmanResourceType:
"""Create a model from a set of attributes."""
# Refresh existing PodmanResource.
if isinstance(attrs, PodmanResource):
attrs.client = self.client
attrs.collection = self
return attrs
# Instantiate new PodmanResource from Mapping[str, Any]
if isinstance(attrs, abc.Mapping):
# TODO Determine why pylint is reporting typing.Type not callable
# pylint: disable=not-callable
return self.resource(attrs=attrs, client=self.client, collection=self)
raise Exception(f"Can't create {self.resource.__name__} from {attrs}")

View File

@ -0,0 +1,232 @@
"""Model and Manager for Manifest resources."""
import logging
import urllib.parse
from contextlib import suppress
from typing import Any, Dict, List, Optional, Union
from podman import api
from podman.domain.images import Image
from podman.domain.manager import Manager, PodmanResource
from podman.errors import ImageNotFound
logger = logging.getLogger("podman.manifests")
class Manifest(PodmanResource):
"""Details and configuration for a manifest managed by the Podman service."""
@property
def id(self):
"""str: Returns the identifier of the manifest list."""
with suppress(KeyError, TypeError, IndexError):
digest = self.attrs["manifests"][0]["digest"]
if digest.startswith("sha256:"):
return digest[7:]
return digest
return self.name
@property
def name(self):
"""str: Returns the human-formatted identifier of the manifest list."""
return self.attrs.get("names")
@property
def quoted_name(self):
"""str: name quoted as path parameter."""
return urllib.parse.quote_plus(self.name)
@property
def names(self):
"""List[str]: Returns the identifier of the manifest."""
return self.name
@property
def media_type(self):
"""Optional[str]: Returns the Media/MIME type for this manifest."""
return self.attrs.get("mediaType")
@property
def version(self):
"""int: Returns the schema version type for this manifest."""
return self.attrs.get("schemaVersion")
def add(self, images: List[Union[Image, str]], **kwargs) -> None:
"""Add Image to manifest list.
Args:
images: List of Images to be added to manifest.
Keyword Args:
all (bool):
annotation (Dict[str, str]):
arch (str):
features (List[str]):
os (str):
os_version (str):
variant (str):
Raises:
ImageNotFound: when Image(s) could not be found
APIError: when service reports an error
"""
data = {
"all": kwargs.get("all"),
"annotation": kwargs.get("annotation"),
"arch": kwargs.get("arch"),
"features": kwargs.get("features"),
"images": [],
"os": kwargs.get("os"),
"os_version": kwargs.get("os_version"),
"variant": kwargs.get("variant"),
"operation": "update",
}
for item in images:
if isinstance(item, Image):
item = item.attrs["RepoTags"][0]
data["images"].append(item)
data = api.prepare_body(data)
response = self.client.put(f"/manifests/{self.quoted_name}", data=data)
response.raise_for_status(not_found=ImageNotFound)
return self.reload()
def push(
self,
destination: str,
all: Optional[bool] = None, # pylint: disable=redefined-builtin
) -> None:
"""Push a manifest list or image index to a registry.
Args:
destination: Target for push.
all: Push all images.
Raises:
NotFound: when the Manifest could not be found
APIError: when service reports an error
"""
params = {
"all": all,
"destination": destination,
}
response = self.client.post(f"/manifests/{self.quoted_name}/push", params=params)
response.raise_for_status()
def remove(self, digest: str) -> None:
"""Remove Image digest from manifest list.
Args:
digest: Image digest to be removed. Should a full Image reference be provided,
the digest will be parsed out.
Raises:
ImageNotFound: when the Image could not be found
APIError: when service reports an error
"""
if "@" in digest:
digest = digest.split("@", maxsplit=2)[1]
data = {"operation": "remove", "images": [digest]}
data = api.prepare_body(data)
response = self.client.put(f"/manifests/{self.quoted_name}", data=data)
response.raise_for_status(not_found=ImageNotFound)
return self.reload()
def reload(self) -> None:
"""Refresh this object's data from the service."""
latest = self.manager.get(self.name)
self.attrs = latest.attrs
class ManifestsManager(Manager):
"""Specialized Manager for Manifest resources."""
@property
def resource(self):
"""Type[Manifest]: prepare_model() will create Manifest classes."""
return Manifest
def create(
self,
name: str,
images: Optional[List[Union[Image, str]]] = None,
all: Optional[bool] = None, # pylint: disable=redefined-builtin
) -> Manifest:
"""Create a Manifest.
Args:
name: Name of manifest list.
images: Images or Image identifiers to be included in the manifest.
all: When True, add all contents from images given.
Raises:
ValueError: when no names are provided
NotFoundImage: when a given image does not exist
"""
params: Dict[str, Any] = {}
if images is not None:
params["images"] = []
for item in images:
if isinstance(item, Image):
item = item.attrs["RepoTags"][0]
params["images"].append(item)
if all is not None:
params["all"] = all
name_quoted = urllib.parse.quote_plus(name)
response = self.client.post(f"/manifests/{name_quoted}", params=params)
response.raise_for_status(not_found=ImageNotFound)
body = response.json()
manifest = self.get(body["Id"])
manifest.attrs["names"] = name
if manifest.attrs["manifests"] is None:
manifest.attrs["manifests"] = []
return manifest
def exists(self, key: str) -> bool:
key = urllib.parse.quote_plus(key)
response = self.client.get(f"/manifests/{key}/exists")
return response.ok
def get(self, key: str) -> Manifest:
"""Returns the manifest by name.
To have Manifest conform with other PodmanResource's, we use the key that
retrieved the Manifest be its name.
Args:
key: Manifest name for which to search
Raises:
NotFound: when manifest could not be found
APIError: when service reports an error
"""
quoted_key = urllib.parse.quote_plus(key)
response = self.client.get(f"/manifests/{quoted_key}/json")
response.raise_for_status()
body = response.json()
if "names" not in body:
body["names"] = key
return self.prepare_model(attrs=body)
def list(self, **kwargs) -> List[Manifest]:
"""Not Implemented."""
raise NotImplementedError("Podman service currently does not support listing manifests.")
def remove(self, name: Union[Manifest, str]) -> Dict[str, Any]:
"""Delete the manifest list from the Podman service."""
if isinstance(name, Manifest):
name = name.name
response = self.client.delete(f"/manifests/{name}")
response.raise_for_status(not_found=ImageNotFound)
body = response.json()
body["ExitCode"] = response.status_code
return body

View File

@ -0,0 +1,144 @@
"""Model for Network resources.
Example:
with PodmanClient(base_url="unix:///run/user/1000/podman/podman.sock") as client:
net = client.networks.get("db_network")
print(net.name, "\n")
"""
import hashlib
import json
import logging
from contextlib import suppress
from typing import Optional, Union
from podman.domain.containers import Container
from podman.domain.containers_manager import ContainersManager
from podman.domain.manager import PodmanResource
logger = logging.getLogger("podman.networks")
class Network(PodmanResource):
"""Details and configuration for a networks managed by the Podman service.
Attributes:
attrs (Dict[str, Any]): Attributes of Network reported from Podman service
"""
@property
def id(self): # pylint: disable=invalid-name
"""str: Returns the identifier of the network."""
with suppress(KeyError):
return self.attrs["Id"]
with suppress(KeyError):
sha256 = hashlib.sha256(self.attrs["name"].encode("ascii"))
return sha256.hexdigest()
return None
@property
def containers(self):
"""List[Container]: Returns list of Containers connected to network."""
with suppress(KeyError):
container_manager = ContainersManager(client=self.client)
return [container_manager.get(ident) for ident in self.attrs["Containers"].keys()]
return []
@property
def name(self):
"""str: Returns the name of the network."""
if "Name" in self.attrs:
return self.attrs["Name"]
if "name" in self.attrs:
return self.attrs["name"]
raise KeyError("Neither 'name' or 'Name' attribute found.")
def reload(self):
"""Refresh this object's data from the service."""
latest = self.manager.get(self.name)
self.attrs = latest.attrs
def connect(self, container: Union[str, Container], *_, **kwargs) -> None:
"""Connect given container to this network.
Args:
container: To add to this Network
Keyword Args:
aliases (List[str]): Aliases to add for this endpoint
driver_opt (Dict[str, Any]): Options to provide to network driver
ipv4_address (str): IPv4 address for given Container on this network
ipv6_address (str): IPv6 address for given Container on this network
link_local_ips (List[str]): list of link-local addresses
links (List[Union[str, Containers]]): Ignored
Raises:
APIError: when Podman service reports an error
"""
if isinstance(container, Container):
container = container.id
# TODO Talk with baude on which IPAddress field is needed...
ipam = dict(
IPv4Address=kwargs.get("ipv4_address"),
IPv6Address=kwargs.get("ipv6_address"),
Links=kwargs.get("link_local_ips"),
)
ipam = {k: v for (k, v) in ipam.items() if not (v is None or len(v) == 0)}
endpoint_config = dict(
Aliases=kwargs.get("aliases"),
DriverOpts=kwargs.get("driver_opt"),
IPAddress=kwargs.get("ipv4_address", kwargs.get("ipv6_address")),
IPAMConfig=ipam,
Links=kwargs.get("link_local_ips"),
NetworkID=self.id,
)
endpoint_config = {
k: v for (k, v) in endpoint_config.items() if not (v is None or len(v) == 0)
}
data = dict(Container=container, EndpointConfig=endpoint_config)
data = {k: v for (k, v) in data.items() if not (v is None or len(v) == 0)}
response = self.client.post(
f"/networks/{self.name}/connect",
data=json.dumps(data),
headers={"Content-type": "application/json"},
)
response.raise_for_status()
def disconnect(self, container: Union[str, Container], **kwargs) -> None:
"""Disconnect given container from this network.
Args:
container: To remove from this Network
Keyword Args:
force (bool): Force operation
Raises:
APIError: when Podman service reports an error
"""
if isinstance(container, Container):
container = container.id
data = {"Container": container, "Force": kwargs.get("force")}
response = self.client.post(f"/networks/{self.name}/disconnect", data=json.dumps(data))
response.raise_for_status()
def remove(self, force: Optional[bool] = None, **kwargs) -> None:
"""Remove this network.
Args:
force: Remove network and any associated containers
Raises:
APIError: when Podman service reports an error
"""
self.manager.remove(self.name, force=force, **kwargs)

View File

@ -0,0 +1,203 @@
"""PodmanResource manager subclassed for Network resources.
Classes and methods for manipulating network resources via Podman API service.
Example:
with PodmanClient(base_url="unix:///run/user/1000/podman/podman.sock") as client:
for net in client.networks.list():
print(net.id, "\n")
"""
import ipaddress
import logging
import sys
from contextlib import suppress
from typing import Any, Dict, List, Optional
from podman import api
from podman.api import http_utils
from podman.domain.manager import Manager
from podman.domain.networks import Network
from podman.errors import APIError
logger = logging.getLogger("podman.networks")
class NetworksManager(Manager):
"""Specialized Manager for Network resources."""
@property
def resource(self):
"""Type[Network]: prepare_model() will create Network classes."""
return Network
def create(self, name: str, **kwargs) -> Network:
"""Create a Network resource.
Args:
name: Name of network to be created
Keyword Args:
attachable (bool): Ignored, always False.
check_duplicate (bool): Ignored, always False.
dns_enabled (bool): When True, do not provision DNS for this network.
driver (str): Which network driver to use when creating network.
enable_ipv6 (bool): Enable IPv6 on the network.
ingress (bool): Ignored, always False.
internal (bool): Restrict external access to the network.
ipam (IPAMConfig): Optional custom IP scheme for the network.
labels (Dict[str, str]): Map of labels to set on the network.
options (Dict[str, Any]): Driver options.
scope (str): Ignored, always "local".
Raises:
APIError: when Podman service reports an error
"""
data = {
"name": name,
"driver": kwargs.get("driver"),
"dns_enabled": kwargs.get("dns_enabled"),
"subnets": kwargs.get("subnets"),
"ipv6_enabled": kwargs.get("enable_ipv6"),
"internal": kwargs.get("internal"),
"labels": kwargs.get("labels"),
"options": kwargs.get("options"),
}
with suppress(KeyError):
self._prepare_ipam(data, kwargs["ipam"])
response = self.client.post(
"/networks/create",
data=http_utils.prepare_body(data),
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
sys.stderr.write(str(response.json()))
return self.prepare_model(attrs=response.json())
def _prepare_ipam(self, data: Dict[str, Any], ipam: Dict[str, Any]):
if "Config" not in ipam:
return
data["subnets"] = []
for cfg in ipam["Config"]:
subnet = {
"gateway": cfg.get("Gateway"),
"subnet": cfg.get("Subnet"),
}
with suppress(KeyError):
net = ipaddress.ip_network(cfg["IPRange"])
subnet["lease_range"] = {
"start_ip": str(net[1]),
"end_ip": str(net[-2]),
}
data["subnets"].append(subnet)
def exists(self, key: str) -> bool:
response = self.client.get(f"/networks/{key}/exists")
return response.ok
def get(self, key: str) -> Network:
"""Return information for the network_id.
Args:
key: Network name or id.
Raises:
NotFound: when Network does not exist
APIError: when error returned by service
"""
response = self.client.get(f"/networks/{key}")
response.raise_for_status()
return self.prepare_model(attrs=response.json())
def list(self, **kwargs) -> List[Network]:
"""Report on networks.
Keyword Args:
names (List[str]): List of names to filter by.
ids (List[str]): List of identifiers to filter by.
filters (Mapping[str,str]): Criteria for listing networks. Available filters:
- driver="bridge": Matches a network's driver. Only "bridge" is supported.
- label=(Union[str, List[str]]): format either "key", "key=value"
or a list of such.
- type=(str): Filters networks by type, legal values are:
- "custom"
- "builtin"
- plugin=(List[str]]): Matches CNI plugins included in a network, legal
values are (Podman only):
- bridge
- portmap
- firewall
- tuning
- dnsname
- macvlan
greedy (bool): Fetch more details for each network individually.
You might want this to get the containers attached to them. Ignored.
Raises:
APIError: when error returned by service
"""
filters = kwargs.get("filters", {})
filters["name"] = kwargs.get("names")
filters["id"] = kwargs.get("ids")
filters = api.prepare_filters(filters)
params = {"filters": filters}
response = self.client.get("/networks/json", params=params)
response.raise_for_status()
return [self.prepare_model(i) for i in response.json()]
def prune(
self, filters: Optional[Dict[str, Any]] = None
) -> Dict[api.Literal["NetworksDeleted", "SpaceReclaimed"], Any]:
"""Delete unused Networks.
SpaceReclaimed always reported as 0
Args:
filters: Criteria for selecting volumes to delete. Ignored.
Raises:
APIError: when service reports error
"""
response = self.client.post("/networks/prune", filters=api.prepare_filters(filters))
response.raise_for_status()
deleted: List[str] = []
for item in response.json():
if item["Error"] is not None:
raise APIError(
item["Error"],
response=response,
explanation=f"""Failed to prune network '{item["Name"]}'""",
)
deleted.append(item["Name"])
return {"NetworksDeleted": deleted, "SpaceReclaimed": 0}
def remove(self, name: [Network, str], force: Optional[bool] = None) -> None:
"""Remove Network resource.
Args:
name: Identifier of Network to delete.
force: Remove network and any associated containers
Raises:
APIError: when Podman service reports an error
"""
if isinstance(name, Network):
name = name.name
response = self.client.delete(f"/networks/{name}", params={"force": force})
response.raise_for_status()

View File

@ -0,0 +1,119 @@
"""Model and Manager for Pod resources."""
import logging
from typing import Any, Dict, Optional, Tuple, Union
from podman.domain.manager import PodmanResource
_Timeout = Union[None, float, Tuple[float, float], Tuple[float, None]]
logger = logging.getLogger("podman.pods")
class Pod(PodmanResource):
"""Details and configuration for a pod managed by the Podman service."""
@property
def id(self): # pylint: disable=invalid-name
return self.attrs.get("ID", self.attrs.get("Id"))
@property
def name(self):
"""str: Returns name of pod."""
return self.attrs.get("Name")
def kill(self, signal: Union[str, int, None] = None) -> None:
"""Send signal to pod.
Args:
signal: To be sent to pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
response = self.client.post(f"/pods/{self.id}/kill", params={"signal": signal})
response.raise_for_status()
def pause(self) -> None:
"""Pause pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
response = self.client.post(f"/pods/{self.id}/pause")
response.raise_for_status()
def remove(self, force: Optional[bool] = None) -> None:
"""Delete pod.
Args:
force: When True, stop and delete all containers in pod before deleting pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
self.manager.remove(self.id, force=force)
def restart(self) -> None:
"""Restart pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
response = self.client.post(f"/pods/{self.id}/restart")
response.raise_for_status()
def start(self) -> None:
"""Start pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
response = self.client.post(f"/pods/{self.id}/start")
response.raise_for_status()
def stop(self, timeout: _Timeout = None) -> None:
"""Stop pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
params = {"t": timeout}
response = self.client.post(f"/pods/{self.id}/stop", params=params)
response.raise_for_status()
def top(self, **kwargs) -> Dict[str, Any]:
"""Report on running processes in pod.
Keyword Args:
ps_args (str): Optional arguments passed to ps.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
params = {
"ps_args": kwargs.get("ps_args"),
"stream": False,
}
response = self.client.get(f"/pods/{self.id}/top", params=params)
response.raise_for_status()
if len(response.text) == 0:
return {"Processes": [], "Titles": []}
return response.json()
def unpause(self) -> None:
"""Unpause pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
response = self.client.post(f"/pods/{self.id}/unpause")
response.raise_for_status()

View File

@ -0,0 +1,151 @@
"""PodmanResource manager subclassed for Networks."""
import json
import logging
from typing import Any, Dict, List, Optional, Union
from podman import api
from podman.domain.manager import Manager
from podman.domain.pods import Pod
from podman.errors import APIError
logger = logging.getLogger("podman.pods")
class PodsManager(Manager):
"""Specialized Manager for Pod resources."""
@property
def resource(self):
"""Type[Pod]: prepare_model() will create Pod classes."""
return Pod
def create(self, name: str, **kwargs) -> Pod:
"""Create a Pod.
Keyword Args:
See (API documentation)[
https://docs.podman.io/en/latest/_static/api.html#operation/CreatePod] for
complete list of keywords.
"""
data = {} if kwargs is None else kwargs.copy()
data["name"] = name
response = self.client.post("/pods/create", data=json.dumps(data))
response.raise_for_status()
body = response.json()
return self.get(body["Id"])
def exists(self, key: str) -> bool:
"""Returns True, when pod exists."""
response = self.client.get(f"/pods/{key}/exists")
return response.ok
# pylint is flagging 'pod_id' here vs. 'key' parameter in super.get()
def get(self, pod_id: str) -> Pod: # pylint: disable=arguments-differ,arguments-renamed
"""Return information for Pod by name or id.
Args:
pod_id: Pod name or id.
Raises:
NotFound: when network does not exist
APIError: when error returned by service
"""
response = self.client.get(f"/pods/{pod_id}/json")
response.raise_for_status()
return self.prepare_model(attrs=response.json())
def list(self, **kwargs) -> List[Pod]:
"""Report on pods.
Keyword Args:
filters (Mapping[str, str]): Criteria for listing pods. Available filters:
- ctr-ids (List[str]): List of container ids to filter by.
- ctr-names (List[str]): List of container names to filter by.
- ctr-number (List[int]): list pods with given number of containers.
- ctr-status (List[str]): List pods with containers in given state.
Legal values are: "created", "running", "paused", "stopped",
"exited", or "unknown"
- id (str) - List pod with this id.
- name (str) - List pod with this name.
- status (List[str]): List pods in given state. Legal values are:
"created", "running", "paused", "stopped", "exited", or "unknown"
- label (List[str]): List pods with given labels.
- network (List[str]): List pods associated with given Network Ids (not Names).
Raises:
APIError: when an error returned by service
"""
params = {"filters": api.prepare_filters(kwargs.get("filters"))}
response = self.client.get("/pods/json", params=params)
response.raise_for_status()
return [self.prepare_model(attrs=i) for i in response.json()]
def prune(self, filters: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Delete unused Pods.
Returns:
Dictionary Keys:
- PodsDeleted (List[str]): List of pod ids deleted.
- SpaceReclaimed (int): Always zero.
Raises:
APIError: when service reports error
"""
response = self.client.post("/pods/prune", params={"filters": api.prepare_filters(filters)})
response.raise_for_status()
deleted: List[str] = []
for item in response.json():
if item["Err"] is not None:
raise APIError(
item["Err"],
response=response,
explanation=f"""Failed to prune network '{item["Id"]}'""",
)
deleted.append(item["Id"])
return {"PodsDeleted": deleted, "SpaceReclaimed": 0}
def remove(self, pod_id: Union[Pod, str], force: Optional[bool] = None) -> None:
"""Delete pod.
Args:
pod_id: Identifier of Pod to delete.
force: When True, stop and delete all containers in pod before deleting pod.
Raises:
NotFound: when pod not found
APIError: when service reports an error
Notes:
Podman only.
"""
if isinstance(pod_id, Pod):
pod_id = pod_id.id
response = self.client.delete(f"/pods/{pod_id}", params={"force": force})
response.raise_for_status()
def stats(self, **kwargs) -> Dict[str, Any]:
"""Resource usage statistics for the containers in pods.
Keyword Args:
all (bool): Provide statistics for all running pods.
name (Union[str, List[str]]): Pods to include in report.
Raises:
NotFound: when pod not found
APIError: when service reports an error
"""
if "all" in kwargs and "name" in kwargs:
raise ValueError("Keywords 'all' and 'name' are mutually exclusive.")
params = {
"all": kwargs.get("all"),
"namesOrIDs": kwargs.get("name"),
}
response = self.client.get("/pods/stats", params=params)
response.raise_for_status()
return response.json()

View File

@ -0,0 +1,86 @@
"""Module for tracking registry metadata."""
import logging
from typing import Any, Mapping, Optional, Union
from podman import api
from podman.domain.images import Image
from podman.domain.manager import PodmanResource
from podman.errors import InvalidArgument
logger = logging.getLogger("podman.images")
class RegistryData(PodmanResource):
"""Registry metadata about Image."""
def __init__(self, image_name: str, *args, **kwargs) -> None:
"""Initialize RegistryData object.
Args:
image_name: Name of Image.
Keyword Args:
client (APIClient): Configured connection to a Podman service.
collection (Manager): Manager of this category of resource,
named `collection` for compatibility
"""
super().__init__(*args, **kwargs)
self.image_name = image_name
self.attrs = kwargs.get("attrs")
if self.attrs is None:
self.attrs = self.manager.get(image_name).attrs
def pull(self, platform: Optional[str] = None) -> Image:
"""Returns Image pulled by identifier.
Args:
platform: Platform for which to pull Image. Default: None (all platforms.)
"""
repository = api.parse_repository(self.image_name)
return self.manager.pull(repository, tag=self.id, platform=platform)
def has_platform(self, platform: Union[str, Mapping[str, Any]]) -> bool:
"""Returns True if platform is available for Image.
Podman API does not support "variant" therefore it is ignored.
Args:
platform: Name as os[/arch[/variant]] or Mapping[str,Any]
Returns:
True if platform is available
Raises:
InvalidArgument: when platform value is not valid
APIError: when service reports an error
"""
invalid_platform = InvalidArgument(f"'{platform}' is not a valid platform descriptor.")
if platform is None:
platform = {}
if isinstance(platform, dict):
if not {"os", "architecture"} <= platform.keys():
version = self.client.version()
platform["os"] = platform.get("os", version["Os"])
platform["architecture"] = platform.get("architecture", version["Arch"])
elif isinstance(platform, str):
elements = platform.split("/")
if 1 < len(elements) > 3:
raise invalid_platform
platform = {"os": elements[0]}
if len(elements) > 2:
platform["variant"] = elements[2]
if len(elements) > 1:
platform["architecture"] = elements[1]
else:
raise invalid_platform
return (
# Variant not carried in libpod attrs
platform["os"] == self.attrs["Os"]
and platform["architecture"] == self.attrs["Architecture"]
)

View File

@ -0,0 +1,139 @@
"""Model and Manager for Secrets resources."""
from contextlib import suppress
from typing import Any, List, Mapping, Optional, Union
from podman.api import APIClient
from podman.domain.manager import Manager, PodmanResource
class Secret(PodmanResource):
"""Details and configuration for a secret registered with the Podman service."""
def __repr__(self):
return f"<{self.__class__.__name__}: {self.name}>"
@property
def id(self): # pylint: disable=invalid-name
return self.attrs.get("ID")
@property
def name(self):
"""str: name of the secret."""
with suppress(KeyError):
return self.attrs['Spec']['Name']
return ""
def remove(
self,
all: Optional[bool] = None, # pylint: disable=redefined-builtin
):
"""Delete secret.
Args:
all: When True, delete all secrets.
Raises:
NotFound: when Secret does not exist
APIError: when error returned by service
"""
self.manager.remove(self.id, all=all)
class SecretsManager(Manager):
"""Specialized Manager for Secret resources."""
@property
def resource(self):
"""Type[Secret]: prepare_model() will create Secret classes."""
return Secret
def __init__(self, client: APIClient):
"""Initialize SecretsManager object.
Args:
client: Connection to Podman service.
"""
super().__init__(client)
def exists(self, key: str) -> bool:
response = self.client.get(f"/secrets/{key}/json")
return response.ok
# pylint is flagging 'secret_id' here vs. 'key' parameter in super.get()
def get(self, secret_id: str) -> Secret: # pylint: disable=arguments-differ,arguments-renamed
"""Return information for Secret by name or id.
Args:
secret_id: Secret name or id.
Raises:
NotFound: when Secret does not exist
APIError: when error returned by service
"""
response = self.client.get(f"/secrets/{secret_id}/json")
response.raise_for_status()
return self.prepare_model(attrs=(response.json()))
def list(self, **kwargs) -> List[Secret]:
"""Report on Secrets.
Keyword Args:
filters (Dict[str, Any]): Ignored.
Raises:
APIError: when error returned by service
"""
response = self.client.get("/secrets/json")
response.raise_for_status()
return [self.prepare_model(attrs=item) for item in response.json()]
def create(
self,
name: str,
data: bytes,
labels: Optional[Mapping[str, Any]] = None, # pylint: disable=unused-argument
driver: Optional[str] = None,
) -> Secret:
"""Create a Secret.
Args:
name: User-defined name of the secret.
data: Secret to be registered with Podman service.
labels: Ignored.
driver: Secret driver.
Raises:
APIError: when service returns an error
"""
params = {
"name": name,
"driver": driver,
}
response = self.client.post("/secrets/create", params=params, data=data)
response.raise_for_status()
body = response.json()
return self.get(body["ID"])
def remove(
self,
secret_id: Union[Secret, str],
all: Optional[bool] = None, # pylint: disable=redefined-builtin
):
"""Delete secret.
Podman only
Args:
secret_id: Identifier of Secret to delete.
all: When True, delete all secrets.
Raises:
NotFound: when Secret does not exist
APIError: when an error returned by service
"""
if isinstance(secret_id, Secret):
secret_id = secret_id.id
response = self.client.delete(f"/secrets/{secret_id}", params={"all": all})
response.raise_for_status()

View File

@ -0,0 +1,92 @@
"""SystemManager to provide system level information from Podman service."""
import logging
from typing import Any, Dict, Optional
from podman.api.client import APIClient
from podman import api
logger = logging.getLogger("podman.system")
class SystemManager:
"""SystemManager to provide system level information from Podman service."""
def __init__(self, client: APIClient) -> None:
"""Initialize SystemManager object.
Args:
client: Connection to Podman service.
"""
self.client = client
def df(self) -> Dict[str, Any]: # pylint: disable=invalid-name
"""Disk usage by Podman resources.
Returns:
dict: Keyed by resource categories and their data usage.
"""
response = self.client.get("/system/df")
response.raise_for_status()
return response.json()
def info(self, *_, **__) -> Dict[str, Any]:
"""Returns information on Podman service."""
response = self.client.get("/info")
response.raise_for_status()
return response.json()
def login(
self,
username: str,
password: Optional[str] = None,
email: Optional[str] = None,
registry: Optional[str] = None,
reauth: Optional[bool] = False, # pylint: disable=unused-argument
dockercfg_path: Optional[str] = None, # pylint: disable=unused-argument
) -> Dict[str, Any]:
"""Log into Podman service.
Args:
username: Registry username
password: Registry plaintext password
email: Registry account email address
registry: URL for registry access. For example,
reauth: Ignored: If True, refresh existing authentication. Default: False
dockercfg_path: Ignored: Path to custom configuration file.
https://quay.io/v2
"""
payload = {
"username": username,
"password": password,
"email": email,
"serveraddress": registry,
}
payload = api.prepare_body(payload)
response = self.client.post(
path="/auth",
headers={"Content-type": "application/json"},
data=payload,
compatible=True,
)
response.raise_for_status()
return response.json()
def ping(self) -> bool:
"""Returns True if service responded with OK."""
response = self.client.head("/_ping")
return response.ok
def version(self, **kwargs) -> Dict[str, Any]:
"""Get version information from service.
Keyword Args:
api_version (bool): When True include API version
"""
response = self.client.get("/version")
response.raise_for_status()
body = response.json()
if not kwargs.get("api_version", True):
del body["APIVersion"]
return body

View File

@ -0,0 +1,157 @@
"""Model and Manager for Volume resources."""
import logging
from typing import Any, Dict, List, Optional, Union
import requests
from podman import api
from podman.api import Literal
from podman.domain.manager import Manager, PodmanResource
from podman.errors import APIError
logger = logging.getLogger("podman.volumes")
class Volume(PodmanResource):
"""Details and configuration for an image managed by the Podman service."""
@property
def id(self):
return self.name
@property
def name(self):
"""str: Returns the name of the volume."""
return self.attrs.get("Name")
def remove(self, force: Optional[bool] = None) -> None:
"""Delete this volume.
Args:
force: When true, force deletion of in-use volume
Raises:
APIError: when service reports an error
"""
self.manager.remove(self.name, force=force)
class VolumesManager(Manager):
"""Specialized Manager for Volume resources."""
@property
def resource(self):
"""Type[Volume]: prepare_model() will create Volume classes."""
return Volume
def create(self, name: Optional[str] = None, **kwargs) -> Volume:
"""Create a Volume.
Args:
name: Name given to new volume
Keyword Args:
driver (str): Volume driver to use
driver_opts (Dict[str, str]): Options to use with driver
labels (Dict[str, str]): Labels to apply to volume
Raises:
APIError: when service reports error
"""
data = {
"Driver": kwargs.get("driver"),
"Labels": kwargs.get("labels"),
"Name": name,
"Options": kwargs.get("driver_opts"),
}
response = self.client.post(
"/volumes/create",
data=api.prepare_body(data),
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
return self.prepare_model(attrs=(response.json()))
def exists(self, key: str) -> bool:
response = self.client.get(f"/volumes/{key}/exists")
return response.ok
# pylint is flagging 'volume_id' here vs. 'key' parameter in super.get()
def get(self, volume_id: str) -> Volume: # pylint: disable=arguments-differ,arguments-renamed
"""Returns and volume by name or id.
Args:
volume_id: Volume id or name for which to search
Raises:
NotFound: when volume could not be found
APIError: when service reports an error
"""
response = self.client.get(f"/volumes/{volume_id}/json")
response.raise_for_status()
return self.prepare_model(attrs=response.json())
def list(self, *_, **kwargs) -> List[Volume]:
"""Report on volumes.
Keyword Args:
filters (Dict[str, str]): criteria to filter Volume list
- driver (str): filter volumes by their driver
- label (Dict[str, str]): filter by label and/or value
- name (str): filter by volume's name
"""
filters = api.prepare_filters(kwargs.get("filters"))
response = self.client.get("/volumes/json", params={"filters": filters})
if response.status_code == requests.codes.not_found:
return []
response.raise_for_status()
return [self.prepare_model(i) for i in response.json()]
def prune(
self, filters: Optional[Dict[str, str]] = None # pylint: disable=unused-argument
) -> Dict[Literal["VolumesDeleted", "SpaceReclaimed"], Any]:
"""Delete unused volumes.
Args:
filters: Criteria for selecting volumes to delete. Ignored.
Raises:
APIError: when service reports error
"""
response = self.client.post("/volumes/prune")
data = response.json()
response.raise_for_status()
volumes: List[str] = []
space_reclaimed = 0
for item in data:
if "Err" in item:
raise APIError(
item["Err"],
response=response,
explanation=f"""Failed to prune volume '{item.get("Id")}'""",
)
volumes.append(item.get("Id"))
space_reclaimed += item["Size"]
return {"VolumesDeleted": volumes, "SpaceReclaimed": space_reclaimed}
def remove(self, name: Union[Volume, str], force: Optional[bool] = None) -> None:
"""Delete a volume.
Podman only.
Args:
name: Identifier for Volume to be deleted.
force: When true, force deletion of in-use volume
Raises:
APIError: when service reports an error
"""
if isinstance(name, Volume):
name = name.name
response = self.client.delete(f"/volumes/{name}", params={"force": force})
response.raise_for_status()