Viewing file: config.py (4.46 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/opt/alt/python37/bin/python3 import logging import os import json from dataclasses import dataclass, asdict, field from wmt.common.const import CONFIG_PATH from wmt.common.exceptions import WmtConfigException from wmt.common.url_parser import parse from clcommon.cpapi import get_admin_email from socket import gethostname from typing import List
@dataclass
class Cfg:
    """
    Tool settings together with the defaults applied when no config
    has been specified yet.

    Field order is part of the generated constructor signature and
    must not be changed.
    """

    # Ping scheduling values (units are not visible here - presumably seconds; TODO confirm)
    ping_interval: int = 30
    ping_timeout: int = 10

    # User-defined report recipient; None means "not set".
    # NOTE(review): the annotation says `str` but the default is None.
    report_email: str = None

    # User-entered urls to ignore; every instance gets its own fresh list
    ignore_list: List[str] = field(default_factory=list)

    # Notification switches, both on by default
    summary_notification_enabled: bool = True
    alert_notifications_enabled: bool = True
class ConfigManager:
    """
    Loads, exposes and modifies the wmt configuration stored at CONFIG_PATH.

    Besides the raw settings (self.cfg) the manager keeps two values derived
    from them: the set of ignored domains (used as a lookup cache) and the
    email address reports are sent to. Both are rebuilt every time
    self.cfg is replaced, so they never go stale.
    """

    def __init__(self):
        self.allowed_params = Cfg.__dataclass_fields__.keys()
        self.from_email = f'web-monitoring-tool@{gethostname()}'
        self.default_report_email = get_admin_email()

        self.cfg = self._init_cfg()

        # Sets self._ignored_domains (cache for checking whether a domain
        # is ignored or not) and self.target_email
        self._refresh_derived_state()

    def _refresh_derived_state(self):
        """
        Rebuild everything that is computed from self.cfg.
        Must be called after each assignment to self.cfg.
        """
        self._ignored_domains = self.generate_ignored_domains()
        self.target_email = self._get_target_email()

    @staticmethod
    def _normalize_ignore_list(value):
        """
        Convert a comma separated string into a list of urls:
        surrounding whitespace is stripped and empty items are dropped.
        Any value that is not a string is returned unchanged.
        """
        if isinstance(value, str):
            return [item.strip() for item in value.split(',') if item.strip()]
        return value

    def _get_target_email(self):
        """
        This function checks to see which email address to use for
        TO: field of smtp.
        If report_email has been defined by user then report_email will be used.
        By default (in case not defined by user) default_report_email
        will be used
        """
        return self.cfg.report_email or self.default_report_email

    def to_dict(self):
        """
        Returns current configuration as a plain dict
        """
        return asdict(self.cfg)

    def _init_cfg(self) -> Cfg:
        """
        Builds Cfg from the config file, falling back to defaults
        for everything the file does not define.

        Unsupported keys are logged and skipped.

        Raises:
            WmtConfigException: config file is not valid json or
                                its top-level value is not an object
        """
        # if not present - use defaults
        if not self.is_present():
            return Cfg()

        data = self.read()
        if not isinstance(data, dict):
            # valid json, but e.g. a list or a number: there are no keys to read
            raise WmtConfigException(
                f'config must be a JSON object, got {type(data).__name__}'
            )

        cfg = Cfg()
        for key, value in data.items():
            if key not in self.allowed_params:
                logging.warning(
                    f'unsupported parameter "{key}", please ensure config contains'
                    f' only allowed parameters: {list(self.allowed_params)}'
                )
                continue
            if key == 'ignore_list':
                # a string here would otherwise be iterated character by character
                value = self._normalize_ignore_list(value)
            setattr(cfg, key, value)

        return cfg

    @staticmethod
    def is_present():
        """
        Checks that config file exists
        """
        return os.path.isfile(CONFIG_PATH)

    def modify(self, new_json: str):
        """
        Changes configuration of wmt

        Passed params are merged over the current configuration, the result
        is written to the config file and becomes the active configuration.
        'ignore_list' may be passed as a comma separated string.

        Returns:
            self.to_dict()

        Raises:
            WmtConfigException: new_json is not valid json, is not a json
                                object or contains unsupported params

        Example:
            wmt-api-solo --config-change {'key': 'val'}
        """
        try:
            new_config = json.loads(new_json)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e

        if not isinstance(new_config, dict):
            raise WmtConfigException(
                f'config must be a JSON object, got {type(new_config).__name__}'
            )

        if not set(new_config).issubset(self.allowed_params):
            raise WmtConfigException(f'some of passed params are unsupported, '
                                     f'only allowed parameters: {list(self.allowed_params)}')

        config = {
            **self.to_dict(),
            **new_config
        }
        config['ignore_list'] = self._normalize_ignore_list(config.get('ignore_list'))

        # Build the new settings object first, so nothing is written
        # if it cannot be constructed
        new_cfg = Cfg(**config)

        # Write config to CONFIG_PATH (/etc/wmt/config.json) file
        with open(CONFIG_PATH, 'w') as f:
            json.dump(config, f, indent=4)

        self.cfg = new_cfg
        # ignored domains cache and target email depend on the new values
        self._refresh_derived_state()
        return self.to_dict()

    @staticmethod
    def read():
        """
        Reads config file and returns decoded json

        Raises:
            WmtConfigException: file content is not valid json
        """
        try:
            with open(CONFIG_PATH) as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e
        return data

    def reload(self):
        """
        Re-reads configuration from the config file and rebuilds
        values derived from it
        """
        self.cfg = self._init_cfg()
        self._refresh_derived_state()

    def is_domain_ignored(self, domain) -> bool:
        """
        Check if domain is in ignored list.
        """
        return domain in self._ignored_domains

    def generate_ignored_domains(self) -> set:
        """
        Generates ignored domains from self.cfg.ignore_list and returns it
        for using as cache in self._ignored_domains set().

        Purpose of this function is to generate formatted url
        as "http://www.test.com" from user entered urls in ignore_list.
        It makes sense to check domains names with this cache
        before pings or reports
        """
        https_prefix = 'https'
        domains = set()
        # empty or undefined (null) ignore_list simply means nothing is ignored
        for entry in self.cfg.ignore_list or ():
            url = parse(entry)

            # Downgrade only the leading scheme: replacing every 'https'
            # occurrence would also corrupt hosts such as "httpsite.com"
            if url.startswith(https_prefix):
                url = 'http' + url[len(https_prefix):]

            domains.add(url)

        return domains
|