Source code for certdeploy.client.config.service

"""CertDeploy Client update service config types."""

import os
import re
import shutil
from typing import Union

from ...errors import ConfigError, ConfigInvalid, ConfigInvalidNumber
from .. import log

_DOCKER_NAME_RE = re.compile(r'[a-z0-9_.-]+', flags=re.I)
_RC_SERVICE_ACTIONS = ('restart', 'reload')
_SYSTEMCTL_ACTIONS = ('restart', 'reload')
_SYSTEMD_UNIT_NAME_RE = re.compile(
    r'[a-z0-9:_,.\\-]+(@[a-z0-9:_,.\\-]+)?\.'
    r'(service|socket|device|mount|automount|swap|target|path|timer|slice|'
    r'scope)',
    flags=re.I,
)


[docs] class Service: """Service config base class. Note: Some simple validation is done in this base class and its subclasses. The goal is to catch obvious mistakes like invalid names or values of the wrong type early in the execution of the process. """ action: str = None """The action to preform on the service. Defaults to `None`. This must be overriden if a service type uses it.""" filters: dict = {} """Filters to identify the service. Defaults to an empty `dict`.""" name: str = None # Just so it's there when exceptions look for it """The name identifying the service. Defaults to `None` just so it's available for exceptions.""" # False to distinguish unset vs set to None timeout: Union[float, int] = False """The timeout for the `action` preformed on the service. Defaults to `None`.""" def __init__(self, config: dict): log.debug( 'New service %s from config: config=%s', self.__class__.__name__, config, ) # name comes first to let errors reference it self.name = self._validate_name(config.get('name')) self.action = self._validate_action(config.get('action')) self.filters = self._validate_filters(config.get('filters')) self.timeout = self._validate_timeout(config.get('timeout', False)) log.debug('New service: %s', self) def _validate_action(self, action: str) -> str: return action or self.action def _validate_filters(self, filters): if filters is not None and not isinstance(filters, dict): raise ConfigError( f'Invalid value {filters} for `filters` for ' f'service {self.name}. `filters` must be a ' 'dictionary or `null`.' ) return filters or self.filters def _validate_timeout( self, timeout: Union[bool, float, int], ) -> Union[float, int]: log.debug('timeout = %s, self.timeout = %s', timeout, self.timeout) if timeout is False: # timeout was not given use the default return self.timeout if timeout is not None and not isinstance(timeout, (float, int)): raise ConfigInvalidNumber( 'timeout', timeout, optional=True, config_desc=f'service {self.name}', ) # If timeout is an int or float (it's int float or None given the above) # use it otherwise use the default (self.timeout). This is needed for # DockerContainer. Otherwise this could just return timeout. return timeout def _validate_name(self, name: str) -> str: # Don't return self.name since it always needs to be set or None. return name def __eq__(self, other) -> bool: """Test if `other` is the same as this instance. Return `True` if `other` is the same type and some attributes match. """ if not isinstance(other, self.__class__): return False return ( self.action == other.action and self.filters == other.filters and self.name == other.name and self.timeout == other.timeout ) def __repr__(self) -> str: """Return a pragmatic representation of this instance.""" return ( f'<{self.__class__.__name__}: action={self.action}, ' f'filters={self.filters}, name={self.name}, ' f'timeout={self.timeout}>' )
[docs] @staticmethod def load(config: dict) -> 'Service': """Load an update service model from a config dict. Arguments: config (dict): An update service config `dict`. The only required key for all types of services is `type`. Which is used to specify the type of service. Each service type has its own required config keys beyond `type`. """ try: service_class = { 'docker_container': DockerContainer, 'docker_service': DockerService, 'rc': RCService, 'script': Script, 'systemd': SystemdUnit, }[config.get('type')] except KeyError as err: raise ConfigError( f'{config.get("type")} is not a valid service ' 'type.' ) from err return service_class(config)
[docs] class DockerService(Service): """Docker service update config. Notes: * If no value is given for `filters` in `config` and `name` is given filters will be set to exactly match `name`. * If no value is given for both `filters` and `name` in `config`, `ConfigError` is raised. """ _type = 'service' def __init__(self, config: dict): # noqa: D107 super().__init__(config) if not self.name and not self.filters: raise ConfigError( 'Either `filters` or `name` must be given in ' f'`docker_{self._type}` configs. Got: {config}.' ) def _validate_name(self, name: str) -> str: if name is None: return name if not name or not _DOCKER_NAME_RE.match(name.strip()): raise ConfigInvalid( 'name', name, config_desc=f'docker {self._type} config', ) return name.strip()
[docs] class DockerContainer(DockerService): """Docker container update config.""" _type = 'container' action = 'restart' """The default update method.""" def __init__(self, config: dict): # noqa: D107 super().__init__(config) if self.name and not self.filters: # Match the exact name as given self.filters = {'name': f'^{self.name}$'}
[docs] class RCService(Service): """RC Service update config. OpenRC/Upstart/SysV style service update config. Note: `action` and `name` are validated. `action` has to be either ``reload`` or ``restart``. `name` must be a valid rc service name. It doesn't have to exist on the system to pass validation it just has to look right. """ action: str = 'restart' """The default update method for updating `rc` services. Valid values are ``reload`` or ``restart``. """ def _validate_action(self, action: str) -> str: if not action: return self.action if action.lower().strip() in _RC_SERVICE_ACTIONS: return action.lower().strip() raise ConfigInvalid( 'action', action, config_desc=f'service {self.name}', ) def _validate_name(self, name: str) -> str: if not name: raise ConfigInvalid( 'name', name, config_desc='rc service update config', ) return name.strip()
[docs] class Script(Service): """Script based update config. Note: The value of name is made into an absolute path as part of validation. This means any relative paths are evaluated relative to the current working directory of the client if they aren't found with `shutil.which()`. If the script isn't found `ConfigError` is raised. """ def __init__(self, config: dict): # noqa: D107 super().__init__(config) if os.path.isabs(self.name): self.script_path = self.name elif shutil.which(self.name): self.script_path = shutil.which(self.name) else: self.script_path = os.path.abspath(self.name) if not os.path.exists(self.script_path): raise ConfigError( f'Script file "{self.script_path}" for service ' f'{self.name} not found.' ) def _validate_name(self, name: str) -> str: if not name: raise ConfigInvalid('name', name, config_desc='script config') return name
[docs] class SystemdUnit(Service): """Systemd unit update config. Note: `action` and `name` are validated. `action` has to be either ``reload`` or ``restart``. `name` must be a valid Systemd unit name. It doesn't have to exist on the system to pass validation it just has to look right. """ action: str = 'restart' """The default update method for updating `systemd` services. Valid values are ``reload`` or ``restart``. """ def _validate_action(self, action: str) -> str: if not action: return self.action if action.lower().strip() in _SYSTEMCTL_ACTIONS: return action.lower().strip() raise ConfigInvalid( 'action', action, config_desc=f'service {self.name}', ) def _validate_name(self, name: str) -> str: if not name or not _SYSTEMD_UNIT_NAME_RE.match(name.strip()): raise ConfigInvalid( 'name', name, config_desc='systemd update service config' ) return name.strip()