Source code for gulik.monitors

# -*- coding: utf-8 -*-

import collections
import queue
import threading
import multiprocessing # for Queue
import functools # for the collector_type partial in NetdataMonitor
import time # for the retry timer in NetdataMonitor
import psutil

from . import helpers
from . import collectors
from . import netdata # assumed import path; NetdataMonitor catches netdata.NetdataException


class Monitor(threading.Thread):
    """
    The base class for all :ref:`monitor`\s.
    """

    collector_type = collectors.Collector

    def __init__(self, app, component):

        super(Monitor, self).__init__()

        self.app = app
        self.component = component
        self.daemon = True
        self.seppuku = False

        self.queue_update = multiprocessing.Queue(1)
        self.queue_data = multiprocessing.Queue(1)
        self.collector = self.collector_type(self.app, self.queue_update, self.queue_data)
        self.data = {}
        self.defective = False # for future use, mostly for networked monitors (netdata, mpd, …)

    def register_elements(self, elements):
        pass

    def tick(self):
        if not self.queue_update.full():
            self.queue_update.put('UPDATE', block=True)

    def start(self):
        self.collector.start()
        super(Monitor, self).start()

    def run(self):

        #while self.collector.is_alive():
        while not self.seppuku:

            try:
                self.data = self.queue_data.get(timeout=1)
            except queue.Empty:
                # try again, but give this thread the ability to die without
                # waiting on the collector indefinitely
                continue

        self.commit_seppuku()

    def commit_seppuku(self):

        print(f"{self.__class__.__name__} committing glorious seppuku!")

        #self.queue_update.close()
        self.collector.terminate()
        self.collector.join()

    def normalize(self, element):
        """
        Return the most current datapoint for `element`, normalized to a
        float between 0 and 1.

        Parameters:
            element (str): An :ref:`element` that is valid in the context of this monitor.

        .. note::
            This function has to be overridden in custom monitors.
        """
        raise NotImplementedError("%s.normalize not implemented!" % self.__class__.__name__)

    def caption(self, fmt):
        """
        Return the given format string with its placeholders filled in with
        current values of this monitor.

        Parameters:
            fmt (str): A format string; the `text` item of a :ref:`caption-description`.

        .. note::
            This function has to be overridden in custom monitors.
        """
        raise NotImplementedError("%s.caption not implemented!" % self.__class__.__name__)

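# A minimal sketch of a custom monitor, illustrative only and not part of the
# module: subclasses choose a collector via `collector_type` and translate its
# data in `normalize` and `caption`. `FooCollector` and the data layout used
# here are hypothetical.
#
#   class FooMonitor(Monitor):
#
#       collector_type = collectors.FooCollector # hypothetical collector
#
#       def normalize(self, element):
#           if not self.data:
#               return 0
#           return self.data[element] / 100.0 # assumes the collector reports percentages
#
#       def caption(self, fmt):
#           if not self.data:
#               return fmt
#           return fmt.format(**self.data)
#
# The app is expected to call `start()` once and `tick()` periodically;
# `tick` requests an update from the collector process and `run` keeps
# `self.data` current.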

class CPUMonitor(Monitor):
    """
    Monitor for CPU usage.
    """

    collector_type = collectors.CPUCollector

    def normalize(self, element):
        """
        Elements exposed:

        * ``aggregate``: average cpu use, sum of all core loads divided by number of cores
        * ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1
        """

        if not self.data:
            return 0

        if element == 'aggregate':
            return self.data['aggregate'] / 100.0

        # assume core_<n> otherwise
        idx = int(element.split('_')[1])
        return self.data['percpu'][idx] / 100.0

    def caption(self, fmt):
        """
        Exposed keys:

        * ``aggregate``: average cpu use, sum of all core loads divided by number of cores
        * ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1
        * ``count``: number of cores
        """

        if not self.data:
            return fmt

        data = {}

        data['count'] = self.data['count']
        data['aggregate'] = self.data['aggregate']
        for idx, perc in enumerate(self.data['percpu']):
            data['core_%d' % idx] = perc

        return fmt.format(**data)

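# Usage sketch, illustrative only: how a CPUMonitor might be queried once it
# has data. The instance name `cpu_monitor` is hypothetical.
#
#   cpu_monitor.normalize('core_0') # e.g. 0.42
#   cpu_monitor.caption('{aggregate:.1f}% avg on {count} cores')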

class MemoryMonitor(Monitor):
    """
    Monitor for memory usage.
    """

    collector_type = collectors.MemoryCollector

    def normalize(self, element):
        """
        Elements exposed:

        * ``percent``: memory use of all processes
        * ``top_<n>``: memory use of the ``<n>``\th-biggest process. Valid values of ``<n>`` are 1-3.
        * ``other``: memory use of all processes except the top 3
        """

        if not self.data:
            return 0

        if element == 'percent':
            return self.data.get('percent', 0) / 100.0

        return self.data[element].get('percent', 0) / 100.0

    def caption(self, fmt):
        """
        Exposed keys:

        * ``total``: how much memory this machine has in total
        * ``percent``: total memory usage in percent
        * ``available``: how much memory can be malloc'd without going into swap (roughly)
        * ``top_<n>``: information about the 3 "biggest" processes. Possible subkeys are ``name``, ``size`` and ``percent``.
        * ``other``: aggregate information for all processes *except* the top 3. Same subkeys as ``top_<n>``, plus ``count``.
        """

        if not self.data:
            return fmt

        data = helpers.DotDict() #dict(self.data) # clone

        data['total'] = helpers.pretty_bytes(self.data['total'])
        data['available'] = helpers.pretty_bytes(self.data['available'])
        data['percent'] = self.data['percent']

        for k in ['top_1', 'top_2', 'top_3', 'other']:

            data[k] = helpers.DotDict()
            data[k]['name'] = self.data[k]['name']
            data[k]['size'] = helpers.pretty_bytes(self.data[k]['size'])
            #data[k]['shared'] = helpers.pretty_bytes(self.data[k]['shared'])

            if k == 'other':
                data[k]['count'] = self.data[k]['count']

        return fmt.format(**data)

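# Usage sketch, illustrative only: a possible caption format string for a
# MemoryMonitor. Dotted placeholders rely on helpers.DotDict allowing
# attribute access to subkeys.
#
#   memory_monitor.caption('{percent}% of {total} used, biggest: {top_1.name} ({top_1.size})')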

class NetworkMonitor(Monitor):
    """
    Monitor for network interfaces.
    """

    collector_type = collectors.NetworkCollector

    def __init__(self, app, component):

        super(NetworkMonitor, self).__init__(app, component)

        self.interfaces = collections.OrderedDict()

        if self.app.config['FPS'] < 2:
            # we need a minimum of 2 samples so we can compute a difference
            deque_len = 2
        else:
            # max size equal to fps means this holds data of only the last second
            deque_len = self.app.config['FPS']

        keys = [
            'bytes_sent',
            'bytes_recv',
            'packets_sent',
            'packets_recv',
            'errin',
            'errout',
            'dropin',
            'dropout',
        ]

        for if_name in psutil.net_io_counters(pernic=True).keys():

            self.interfaces[if_name] = {
                'addrs': {},
                'stats': {},
                'counters': {},
            }

            for key in keys:
                self.interfaces[if_name]['counters'][key] = collections.deque([], deque_len)

        self.aggregate = {
            'if_count': len(self.interfaces),
            'if_up': 0,
            'speed': 0, # aggregate link speed
            'counters': {},
        }

        for key in keys:
            self.aggregate['counters'][key] = collections.deque([], deque_len)

    def run(self):

        while not self.seppuku:

            try:
                self.data = self.queue_data.get(timeout=1)
            except queue.Empty:
                # try again, but give this thread the ability to die
                # without waiting on the collector indefinitely.
                continue

            aggregates = {}
            for key in self.aggregate['counters']:
                #self.aggregate['counters'][k] = []
                aggregates[key] = 0

            self.aggregate['speed'] = 0

            for if_name, if_data in self.interfaces.items():

                if_has_data = if_name in self.data['counters'] and \
                              if_name in self.data['stats'] and \
                              if_name in self.data['addrs']

                if if_has_data:

                    for key, deque in if_data['counters'].items():
                        value = self.data['counters'][if_name]._asdict()[key]
                        deque.append(value)
                        aggregates[key] += value

                    self.interfaces[if_name]['stats'] = self.data['stats'][if_name]._asdict()

                    if self.interfaces[if_name]['stats']['speed'] == 0:
                        self.interfaces[if_name]['stats']['speed'] = 1000 # assume gbit speed per default

                    self.aggregate['speed'] += self.interfaces[if_name]['stats']['speed']

                if if_name in self.data['addrs']:
                    self.interfaces[if_name]['addrs'] = self.data['addrs'][if_name]
                else:
                    self.interfaces[if_name]['addrs'] = []

            for key, value in aggregates.items():
                self.aggregate['counters'][key].append(value)

        self.commit_seppuku()

    def count_sec(self, interface, key):
        """
        Get a specified counter value for a given interface, calculated over the last second.

        Example:
            | ``self.count_sec('eth0', 'bytes_sent')``
            | (will return the count of bytes sent in the last second)
        """

        if interface == 'aggregate':
            deque = self.aggregate['counters'][key]
        else:
            deque = self.interfaces[interface]['counters'][key]

        if len(deque) < 2:
            # not enough data
            return 0

        elif self.app.config['FPS'] < 2:
            # fps < 1 means data covers 1/fps seconds
            return (deque[-1] - deque[0]) / self.app.config['FPS']

        else:
            # last (most recent) minus first (oldest) item
            return deque[-1] - deque[0]

    def normalize(self, element):
        """
        Exposed elements:

        * ``<if>.bytes_sent``: upload of network interface ``<if>``.
        * ``<if>.bytes_recv``: download of network interface ``<if>``.

        ``<if>`` can be any local network interface as well as ``'aggregate'``.
        """

        if_name, key = element.split('.')

        if if_name == 'aggregate':
            if len(self.aggregate['counters'][key]) >= 2:
                link_quality = float(self.aggregate['speed'] * 1024 ** 2)
                return (self.count_sec(if_name, key) * 8) / link_quality

        elif len(self.interfaces[if_name]['counters'][key]) >= 2:
            link_quality = float(self.interfaces[if_name]['stats']['speed'] * 1024 ** 2)
            return (self.count_sec(if_name, key) * 8) / link_quality

        # program flow should only arrive here if we have less than 2
        # datapoints, in which case we can't establish used bandwidth.
        return 0

    def caption(self, fmt):
        """
        Exposed keys:

        * ``<if>.bytes_sent``: upload of network interface ``<if>``.
        * ``<if>.bytes_recv``: download of network interface ``<if>``.
        * ``<if>.if_up``: Boolean, whether the interface is up.
        * ``<if>.speed``: interface speed in Mbit/s.
        * ``<if>.counters``: supplies access to interface counters.
          Possible sub-elements are:

          * ``bytes_sent``
          * ``bytes_recv``
          * ``packets_sent``
          * ``packets_recv``
          * ``errin``
          * ``errout``
          * ``dropin``
          * ``dropout``

        ``<if>`` can be any local network interface as well as ``'aggregate'``.
        Additionally, the ``'aggregate'`` interface exposes the total count of
        network interfaces as ``if_count``.
        """

        if not self.data:
            return fmt

        data = {}

        data['aggregate'] = helpers.DotDict()
        data['aggregate']['if_count'] = self.aggregate['if_count']
        data['aggregate']['if_up'] = self.aggregate['if_up']
        data['aggregate']['speed'] = self.aggregate['speed']
        data['aggregate']['counters'] = helpers.DotDict()

        for key in self.aggregate['counters'].keys():
            data['aggregate']['counters'][key] = self.count_sec('aggregate', key)
            if key.startswith('bytes'):
                data['aggregate']['counters'][key] = helpers.pretty_bits(data['aggregate']['counters'][key]) + '/s'

        for if_name in self.interfaces.keys():

            data[if_name] = helpers.DotDict()

            data[if_name]['addrs'] = helpers.DotDict()
            all_addrs = []
            for idx, addr in enumerate(self.interfaces[if_name]['addrs']):
                data[if_name]['addrs'][str(idx)] = addr
                all_addrs.append(addr.address)
            data[if_name]['all_addrs'] = u"\n".join(all_addrs)

            data[if_name]['stats'] = helpers.DotDict(self.interfaces[if_name]['stats'])

            data[if_name]['counters'] = helpers.DotDict()
            for key in self.interfaces[if_name]['counters'].keys():
                data[if_name]['counters'][key] = self.count_sec(if_name, key)
                if key.startswith('bytes'):
                    data[if_name]['counters'][key] = helpers.pretty_bits(data[if_name]['counters'][key]) + '/s'

        return fmt.format(**data)

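# Usage sketch, illustrative only: 'eth0' is a hypothetical interface name,
# while 'aggregate' always exists.
#
#   network_monitor.normalize('eth0.bytes_recv') # share of link speed, 0..1
#   network_monitor.caption('down {aggregate.counters.bytes_recv} up {aggregate.counters.bytes_sent}')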

class BatteryMonitor(Monitor):
    """
    Monitor laptop batteries.
    """

    collector_type = collectors.BatteryCollector

    def normalize(self, element):
        """
        This function exposes no explicit elements, but always just returns
        the current fill of the battery.
        """
        # TODO: multi-battery support? needs support by psutil…

        if not self.data:
            return 0

        return self.data.percent / 100.0

    def caption(self, fmt):
        """
        Exposed keys:

        * ``power_plugged``: Boolean, whether the AC cable is connected.
        * ``percent``: current fill of the battery in percent.
        * ``secsleft``: seconds left until the battery is completely drained.
        * ``state``: current state of the battery, one of ``'full'``, ``'charging'`` or ``'draining'``.
        """

        if not self.data:
            return fmt

        data = self.data._asdict()

        data['percent'] = helpers.pretty_si(data['percent'])

        if data['secsleft'] == psutil.POWER_TIME_UNLIMITED:
            data['secsleft'] = '∞'
        elif data['secsleft'] == psutil.POWER_TIME_UNKNOWN:
            data['secsleft'] = 'unknown'
        else:
            data['secsleft'] = helpers.pretty_time(data['secsleft'])

        if not data['power_plugged']:
            data['state'] = 'draining'
        elif self.data.percent == 100: # compare the raw value, data['percent'] was prettified above
            data['state'] = 'full'
        else:
            data['state'] = 'charging'

        return fmt.format(**data)

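# Usage sketch, illustrative only:
#
#   battery_monitor.caption('{percent}% ({state}, {secsleft} left)')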

class DiskMonitor(Monitor):
    """
    Monitors disk I/O and partitions.
    """

    collector_type = collectors.DiskCollector

    def __init__(self, *args, **kwargs):

        super(DiskMonitor, self).__init__(*args, **kwargs)

        self.normalization_values = {}

        io = psutil.disk_io_counters(perdisk=True)
        for disk, info in io.items():
            #self.normalization_values[disk] = info._asdict()
            for key, value in info._asdict().items():

                element = '.'.join(['io', disk, key])

                if key.endswith('_bytes'):
                    self.normalization_values[element] = 100 * 1024 ** 2 # assume baseline ability of 100 MByte/s
                elif key.endswith('_count'):
                    self.normalization_values[element] = 0 # FIXME: I have 0 clue what a reasonable baseline here is
                elif key.endswith('_time'):
                    self.normalization_values[element] = 1000 # one second of data, collector reports data in milliseconds/s
                else:
                    self.normalization_values[element] = 0

    def normalize(self, element):
        """
        Elements exposed:

        * ``io``

          * Valid subelements are disk device file names as found in ``/dev``.
            Examples: ``ada0``, ``sda``. Valid subsubelements are as follows:

            * ``read_bytes``
            * ``write_bytes``
            * ``read_time``
            * ``write_time``
            * ``busy_time``

        * ``partitions``

          * Valid subelements are partition device file names as found in
            ``/dev``, with dots (``.``) being replaced with dashes (``-``).
            Examples: ``root-eli``, ``sda1``.
        """

        if not self.data:
            # no data from the collector yet
            return 0

        parts = element.split('.')

        if parts[0] == 'io':

            disk, key = parts[1:]
            value = self.data['io'][disk][key]

            if self.normalization_values[element] < value:
                self.normalization_values[element] = value

            if self.normalization_values[element] == 0:
                # nothing to normalize against yet, avoid division by zero
                return 0

            return self.data['io'][disk][key] / self.normalization_values[element]

        elif parts[0] == 'partitions':

            name = parts[1]
            info = self.data['partitions'][name]['usage']
            return info['used'] / info['total']

        return 0

    def caption(self, fmt):
        """
        Exposed keys are the same as for :func:`DiskMonitor.normalize`.
        """

        data = helpers.DotDict()

        if 'partitions' in self.data and 'io' in self.data:

            data['io'] = helpers.DotDict()
            for name, diskinfo in self.data['io'].items():
                data['io'][name] = helpers.DotDict()
                for key, value in diskinfo.items():
                    if key.endswith('_bytes'):
                        value = helpers.pretty_bytes(value)
                    data['io'][name][key] = value

            data['partitions'] = helpers.DotDict()
            for name, partition in self.data['partitions'].items():

                part_data = helpers.DotDict()
                for key in ['name', 'device', 'mountpoint', 'fstype', 'opts']:
                    part_data[key] = partition[key]
                part_data['usage'] = helpers.DotDict(partition['usage'])

                data['partitions'][name] = part_data

            return fmt.format(**data)

        return fmt

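# Usage sketch, illustrative only: 'sda' and 'sda1' are hypothetical device
# and partition names.
#
#   disk_monitor.normalize('io.sda.write_bytes')
#   disk_monitor.normalize('partitions.sda1')
#   disk_monitor.caption('{partitions.sda1.mountpoint}: {partitions.sda1.usage.used} of {partitions.sda1.usage.total}')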

class NetdataMonitor(Monitor):
    """
    Monitor that interfaces with (remote) netdata instances.
    """

    collector_type = collectors.NetdataCollector

    def __init__(self, app, component, host, port):

        self.collector_type = functools.partial(self.collector_type, host=host, port=port)

        super(NetdataMonitor, self).__init__(app, component)

        self.charts = set()
        self.normalization_values = {} # keep a table of known maximums because netdata doesn't supply absolute normalization values

        #self.info_last_try = time.time()
        #try:
        #    self.netdata_info = self.collector.client.charts()
        #except netdata.NetdataException as e:
        #    print(f"Couldn't get chart overview from netdata host {host}!")
        #    self.netdata_info = None
        #    self.defective = True

        self.defective = True
        self.info_last_try = 0
        self.netdata_info = False
        self.almost_fixed = False

    def __repr__(self):
        return f"<{self.__class__.__name__} host={self.collector.client.host} port={self.collector.client.port}>"

    def register_elements(self, elements):

        for element in elements:

            parts = element.split('.')
            chart = '.'.join(parts[:2])

            if not chart in self.charts:

                self.normalization_values[chart] = 0

                if self.netdata_info:

                    if not chart in self.netdata_info['charts']:
                        # use the collector's client for host/port, the monitor itself doesn't store them
                        raise ValueError(f"Invalid chart: {chart} on netdata instance {self.collector.client.host}:{self.collector.client.port}!")

                    chart_info = self.netdata_info['charts'][chart]
                    if chart_info['units'] == 'percentage':
                        self.normalization_values[chart] = 100
                    else:
                        self.normalization_values[chart] = 0

                self.charts.add(chart)

    def run(self):

        #while self.collector.is_alive():
        while not self.seppuku:

            try:

                (chart, data) = self.queue_data.get(timeout=1 / self.app.config['FPS'])
                self.data[chart] = data

                l = len(data['data'][0])
                if l < 2:
                    print(f"Missing data, marking {self} as defective.")
                    self.defective = True

                else:

                    values = data['data'][0][1:]

                    if self.defective:
                        pass

                    elif values[0] is None:
                        # ignore "dead" datapoints
                        print(f"Got dead datapoint, marking {self} as defective.")
                        self.defective = True

                    elif self.netdata_info['charts'][chart]['units'] == 'percentage':
                        self.normalization_values[chart] = 100 # in case self was defective when register_elements was called

                    else:
                        cumulative_value = sum(data['data'][0][1:])
                        if self.normalization_values[chart] < cumulative_value:
                            self.normalization_values[chart] = cumulative_value

            except queue.Empty:
                continue # try again

        self.commit_seppuku()

    def tick(self):

        if self.defective:

            t = time.time()
            if t >= self.info_last_try + self.app.config['NETDATA_RETRY']:

                if self.almost_fixed:
                    print(f"{self.__class__.__name__} instance almost fixed, ignoring first chart overview because of netdata weirdness.")
                else:
                    print(f"{self.__class__.__name__} instance currently defective, trying to get netdata overview from {self.collector.client.host}.")

                self.info_last_try = t

                try:

                    self.netdata_info = self.collector.client.charts()

                    if self.almost_fixed:

                        self.defective = False
                        self.almost_fixed = False
                        self.tick() # do the actual tick (i.e. the else clause)

                        if not self.defective: # in case defective was re-set because of dead datapoints
                            print("Success!")

                    else:
                        self.almost_fixed = True

                except netdata.NetdataException as e:
                    print(f"Failed, will retry in {self.app.config['NETDATA_RETRY']} seconds.")

        else:

            if not self.queue_update.full():
                #if not self.seppuku: # don't request more updates to the collector when we're trying to die
                for chart in self.charts:
                    self.queue_update.put(f"UPDATE {chart}", block=True)

    def normalize(self, element):
        """
        Exposed elements correspond to *chart names* and their datapoint
        *dimension*\s. For a list of valid chart and dimension names, consult
        ``/api/v1/charts`` of the netdata instance in question.

        Examples:

        * ``system.cpu.nice``
        * ``disk.ada0.writes``
        """

        if self.defective:
            return 0

        parts = element.split('.')
        chart = '.'.join(parts[:2])

        #if chart not in self.charts or not self.data[chart]:
        if not chart in self.data:
            #print(f"No data for {chart}")
            return 0

        #timestamp = self.data[chart]['data'][0][0] # first element of a netdata datapoint is always time
        #if timestamp > self.last_updates[chart]:

        subelem = parts[2]
        subidx = self.data[chart]['labels'].index(subelem)
        value = self.data[chart]['data'][0][subidx]

        if value >= self.normalization_values[chart]:
            self.normalization_values[chart] = value

        if self.normalization_values[chart] == 0:
            return 0

        return value / self.normalization_values[chart]

    def caption(self, fmt):
        """
        Exposed keys are the same as for :func:`NetdataMonitor.normalize`.
        """

        if not self.data or self.defective:
            return fmt

        data = helpers.DotDict()

        for chart_name, chart_data in self.data.items():

            chart_keys = chart_name.split('.')
            unit = self.netdata_info['charts'][chart_name]['units'] # called "units" but actually only ever one. it's a string.

            if not chart_keys[0] in data:
                data[chart_keys[0]] = helpers.DotDict()

            d = helpers.DotDict()

            for idx, label in enumerate(chart_data['labels']):

                value = chart_data['data'][0][idx]

                if value is None:
                    value = 0
                elif unit == 'bytes':
                    value = helpers.pretty_bytes(value)
                elif unit.startswith('kilobytes'):
                    postfix = unit[9:]
                    value = helpers.pretty_bytes(value * 1024) + postfix
                elif unit.startswith('kilobits'):
                    postfix = unit[8:]
                    value = helpers.pretty_bits(value * 1024) + postfix
                else:
                    value = f"{value} {unit}"

                d[label] = value

            data[chart_keys[0]][chart_keys[1]] = d

        return fmt.format(**data)

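# Usage sketch, illustrative only: chart and dimension names depend entirely
# on the queried netdata instance; 'system.cpu' with dimension 'user' is a
# common but not guaranteed example.
#
#   netdata_monitor = NetdataMonitor(app, component, 'localhost', 19999)
#   netdata_monitor.register_elements(['system.cpu.user'])
#   netdata_monitor.normalize('system.cpu.user')
#   netdata_monitor.caption('cpu user: {system.cpu.user}')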