Source code for certdeploy.client.errors
from subprocess import Popen
from typing import Any, Union
from ..errors import CertDeployError
[docs]class UpdateError(CertDeployError):
"""Base class for all service update related errors."""
def __init__(self, service: Any, message: Union[Exception, str] = None,
service_name: str = None):
if service.name or service_name:
service_name = f' {service.name or service_name}'
else:
service_name = ''
if isinstance(message, Exception):
message = f'{type(message).__name__}: {message}'
message = message or ''
super().__init__(
f'{type(service).__name__}{service_name} failed '
f'to update: {message}'
)
[docs]class DockerNotFound(UpdateError):
"""Base class for failed docker API searches."""
_type: str = None
def __init__(self, service: Any, service_name: str = None):
message = (f'Could not find any docker {self._type} matching the '
f'following filter: {service.filters}')
super().__init__(service, message, service_name)
[docs]class DockerContainerNotFound(DockerNotFound):
"""Could not find a docker container using the given filters."""
_type: str = 'container'
[docs]class DockerServiceNotFound(DockerNotFound):
"""Could not find a docker service using the given filters."""
_type: str = 'service'
[docs]class DockerError(UpdateError):
"""Base class for docker related errors."""
_type = None
def __init__(self, service: Any, message: Union[Exception, str] = None,
service_name: str = None):
# Reverse order from UpdateError because the docker service/container
# name may need to override the `service.name` if the error is for a
# filter match.
service_name = service_name or service.name
super().__init__(service, f'Error updating docker {self._type} '
f'{service_name} from filters={service.filters}: '
f'{message}', service_name)
[docs]class DockerContainerError(DockerError):
"""Error restarting a docker container."""
_type = 'container'
[docs]class DockerServiceError(DockerError):
"""Error force updating a docker service."""
_type = 'service'
[docs]class SystemdError(UpdateError):
"""Error updating a systemd unit."""
def __init__(self, service: Any, message: Union[Exception, str] = None,
stdout: str = None):
if not message:
message = (f'Failed to {service.action} systemd unit '
f'{service.name}')
elif isinstance(message, Exception):
message = f'{type(message).__name__}: {message}'
if stdout:
message = f'{message}. Got combined stdout/stderr: \n{stdout}'
super().__init__(service, message)
[docs]class ScriptError(UpdateError):
"""Error running an update script."""
def __init__(self, service: Any, message: Union[Exception, str] = None,
proc: Popen = None, stdout: str = None):
if not message:
if proc:
message = (f'Failed to run the update script {service.name} '
f'returned={proc.returncode}')
if not stdout:
stdout_bytes = proc.stdout.read()
if stdout_bytes:
stdout = stdout_bytes.decode()
else:
message = f'Failed to run the update script {service.name}'
elif isinstance(message, Exception):
message = f'{type(message).__name__}: {message}'
if stdout:
message = f'{message}. Got combined stdout/stderr: \n{stdout}'
super().__init__(service, message)
[docs]class InvalidKey(CertDeployError):
"""Certificate validation error."""
def __init__(self, key_path: Any):
super().__init__(f'Invalid key {key_path}')