Viewing file: dbsaver.py (4.84 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import

import logging

from sqlalchemy import insert, select
from sqlalchemy.orm import sessionmaker

from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons.func import get_chunks
from lvestats.lib.commons.func import reboot_lock
from lvestats.orm import user as user_class
from lvestats.orm.history import history
from lvestats.orm.servers import servers


class DBSaver(LveStatsPlugin):
    """Plugin that stores the collected LVE usage snapshot in the database.

    On every run it makes sure the ``servers`` table holds a row for this
    server with the current LVE version, then inserts one ``history`` row per
    entry of ``lve_data['lve_usage']``. Both steps share one transaction.

    ``self.engine`` is not assigned anywhere in this class.
    NOTE(review): presumably the plugin loader or the base class injects the
    SQLAlchemy engine before ``execute`` is called -- confirm.
    """

    def __init__(self):
        self.log = logging.getLogger('plugin.DBSaver')
        self.now = 0  # This changes in MainLoop
        self.config = None
        self.period = 60
        self.server_id = 'localhost'

    def set_config(self, config):
        """Read ``db_timeout`` and ``server_id`` from *config*.

        Missing keys leave the current values untouched. ``db_timeout`` is
        converted with ``int()``, so a non-numeric value raises ``ValueError``.
        """
        self.period = int(config.get('db_timeout', self.period))
        self.server_id = config.get('server_id', self.server_id)

    @staticmethod
    def _rounded(value):
        """Return *value* rounded to the nearest integer, as an ``int``."""
        return int(round(value))  # pylint: disable=round-builtin

    def _sync_server_version(self, conn, lve_data):
        """Insert or update this server's row in the ``servers`` table.

        A missing row is inserted; an existing row is updated only when its
        stored ``lve_version`` differs from ``lve_data['LVE_VERSION']``.
        """
        result = conn.execute(
            select([servers]).where(servers.server_id == self.server_id))
        # A result that returns no rows is treated like "no such server".
        known = result.fetchone() if result.returns_rows else None
        lve_version = lve_data['LVE_VERSION']
        if known is None:
            conn.execute(
                insert(servers),
                server_id=self.server_id,
                lve_version=lve_version)
        elif known['lve_version'] != lve_version:
            update_stmt = servers.__table__.update().where(
                servers.server_id == self.server_id)
            conn.execute(update_stmt.values({'lve_version': lve_version}))

    def _history_rows(self, lve_usage):
        """Build the list of ``history`` row dicts from *lve_usage*.

        *lve_usage* maps an LVE id to a usage object; the attributes read
        from that object are exactly the ones listed below.
        """
        rounded = self._rounded
        created = rounded(self.now)  # same timestamp for the whole snapshot
        return [
            dict(
                id=lve_id,
                cpu=rounded(usage.cpu_usage),
                cpu_limit=usage.lcpu,
                cpu_fault=usage.cpu_fault,
                mep=usage.mep,
                mep_limit=usage.lep,
                io=rounded(usage.io_usage),
                io_limit=usage.io,
                mem=rounded(usage.mem_usage),
                mem_limit=usage.lmem,
                mem_fault=usage.mem_fault,
                mep_fault=usage.mep_fault,
                created=created,
                server_id=self.server_id,
                lmemphy=usage.lmemphy,
                memphy=rounded(usage.memphy),
                memphy_fault=usage.memphy_fault,
                lnproc=usage.lnproc,
                nproc=rounded(usage.nproc),
                nproc_fault=usage.nproc_fault,
                io_fault=usage.io_fault,
                iops_fault=usage.iops_fault,
                liops=usage.liops,
                iops=rounded(usage.iops),
            )
            for lve_id, usage in lve_data_items(lve_usage)
        ]

    def execute(self, lve_data):
        """Persist one snapshot of *lve_data* inside a single transaction.

        Reads ``lve_data['LVE_VERSION']`` (required) and
        ``lve_data.get('lve_usage', {})``. Any error rolls the transaction
        back and is re-raised; the connection is closed in every case.
        """
        insert_history = insert(history)

        # NOTE(review): reboot_lock() presumably keeps a reboot from
        # interrupting the write -- confirm against its definition.
        with reboot_lock():
            conn = self.engine.connect()
            try:
                # begin() is inside the try so that the connection is still
                # closed when the transaction cannot be started.
                tx = conn.begin()
                try:
                    self._sync_server_version(conn, lve_data)
                    rows = self._history_rows(lve_data.get('lve_usage', {}))
                    # get_chunks() presumably splits the rows into batches
                    # so each INSERT stays bounded in size -- TODO confirm.
                    for chunk in get_chunks(rows):
                        conn.execute(insert_history, chunk)
                    tx.commit()
                except BaseException:
                    # Roll back on anything, including interrupts, and let
                    # the caller see the original error.
                    tx.rollback()
                    raise
            finally:
                conn.close()


def lve_data_items(lve_usage):
    """Return the ``(lve_id, usage)`` pairs of *lve_usage*."""
    return lve_usage.items()

class DbUsernamesSaver(LveStatsPlugin):
    """Plugin that stores the entries of ``lve_data['users']`` in the database.

    Each entry is unpacked as ``(uid, user_name, server_id)`` and merged into
    the ``user`` ORM table. The plugin does nothing unless the
    ``collect_usernames`` option is set to ``true``.
    """

    def __init__(self):
        self.log = logging.getLogger('plugin.DbUsernamesSaver')
        self.period = 60*60  # once an hour
        self.engine = None
        self.enabled = True

    def set_config(self, config):
        """Enable the plugin only when ``collect_usernames`` is ``true``.

        The comparison is case-insensitive; a missing option disables it.
        """
        self.enabled = config.get('collect_usernames', 'false').lower() == 'true'

    def set_db_engine(self, engine):
        """Remember the SQLAlchemy engine used to open sessions."""
        self.engine = engine

    def execute(self, lve_data):
        """Merge every user record from ``lve_data['users']`` and commit.

        A record that cannot be merged is skipped (best effort); the session
        is closed even when iteration, unpacking or the commit fails.
        """
        if not self.enabled:
            return

        session = sessionmaker(bind=self.engine)()
        try:
            for user_info in lve_data['users']:
                user = user_class()
                user.uid, user.user_name, user.server_id = user_info
                try:
                    session.merge(user)
                    session.flush()
                except Exception:
                    # workaround for sqlalchemy could not deal with complex
                    # key on detecting duplicates when merge
                    #
                    # NOTE(review): Session.rollback() undoes the whole
                    # current transaction, so records flushed earlier in
                    # this run are discarded together with the failing one.
                    # Confirm whether that is intended; a savepoint
                    # (begin_nested) or a per-record commit would limit the
                    # rollback to the failing record.
                    session.rollback()
                    self.log.debug(
                        'Could not merge user record %r; rolled back',
                        user_info, exc_info=True)
            session.commit()
        finally:
            session.close()

|