# -*- coding: utf-8 -*-
import collections
import functools
import multiprocessing # for Queue
import queue
import threading
import time

import psutil

from . import helpers
from . import collectors
class Monitor(threading.Thread):
    r"""
    The base class for all :ref:`monitor`\s.

    A monitor is a daemon thread that owns a collector (an instance of
    :attr:`collector_type`) and two single-slot queues: ``queue_update``
    carries update requests to the collector, ``queue_data`` carries the
    collected data back. :meth:`run` keeps the most recent result in
    ``self.data``, from where :meth:`normalize` and :meth:`caption` of
    subclasses read it.
    """

    # Collector class instantiated in __init__; subclasses override this.
    collector_type = collectors.Collector

    def __init__(self, app, component):
        """
        Parameters:
            app: The application object; stored and handed to the collector.
            component: The component this monitor belongs to; only stored here.
        """
        super().__init__()
        self.app = app
        self.component = component
        self.daemon = True
        # Set to True from outside to make run() leave its loop and shut down.
        self.seppuku = False
        self.queue_update = multiprocessing.Queue(1)
        self.queue_data = multiprocessing.Queue(1)
        self.collector = self.collector_type(self.app, self.queue_update, self.queue_data)
        self.data = {}
        self.defective = False # for future use, mostly for networked monitors (netdata, mpd, …)

    def register_elements(self, elements):
        """
        Hook for subclasses that need to know which elements are in use.
        The base implementation does nothing.
        """
        pass

    def tick(self):
        """
        Request a fresh datapoint from the collector.

        The request is dropped if one is still pending, so this never blocks.
        """
        try:
            self.queue_update.put_nowait('UPDATE')
        except queue.Full:
            # an update request is already waiting for the collector
            pass

    def start(self):
        """
        Start the collector first, then this thread.
        """
        self.collector.start()
        super().start()

    def run(self):
        """
        Store incoming data in ``self.data`` until ``self.seppuku`` is set,
        then shut the collector down.
        """
        while not self.seppuku:
            try:
                self.data = self.queue_data.get(timeout=1)
            except queue.Empty:
                # try again, but give thread the ability to die without
                # waiting on collector indefinitely
                continue
        self.commit_seppuku()

    def commit_seppuku(self):
        """
        Terminate the collector and wait for it to finish.
        """
        print(f"{self.__class__.__name__} committing glorious seppuku!")
        self.collector.terminate()
        self.collector.join()

    def normalize(self, element):
        """
        Return most current datapoint about `element`,
        normalized to a float between 0 and 1.

        Parameters:
            element (str): An :ref:`element` that is valid in the context of this monitor.

        .. note:: This function has to be overridden in custom monitors.
        """
        raise NotImplementedError("%s.normalize not implemented!" % self.__class__.__name__)

    def caption(self, fmt):
        """
        Return a given string with placeholders filled in with current values of this monitor.

        Parameters:
            fmt (str): A format string; The `text` item of a :ref:`caption-description`.

        .. note:: This function has to be overridden in custom monitors.
        """
        raise NotImplementedError("%s.caption not implemented!" % self.__class__.__name__)
class CPUMonitor(Monitor):
    """
    Monitor for CPU usage.
    """

    collector_type = collectors.CPUCollector

    def normalize(self, element):
        """
        Return the load named by `element` as a fraction of 1.

        Elements exposed:

        * ``aggregate``: average cpu use, sum of all core loads divided by number of cores
        * ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1

        Returns 0 as long as no data has been received.
        """
        if self.data:
            if element != 'aggregate':
                # everything that isn't 'aggregate' is taken to be core_<n>
                core = int(element.split('_')[1])
                return self.data['percpu'][core] / 100.0
            return self.data['aggregate'] / 100.0
        return 0

    def caption(self, fmt):
        """
        Fill `fmt` with the current CPU values; `fmt` is returned untouched
        as long as no data has been received.

        Exposed keys:

        * ``aggregate``: average cpu use, sum of all core loads divided by number of cores
        * ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1
        * ``count``: number of cores
        """
        if not self.data:
            return fmt
        values = {
            'count': self.data['count'],
            'aggregate': self.data['aggregate'],
        }
        values.update(
            ('core_%d' % core, load)
            for core, load in enumerate(self.data['percpu'])
        )
        return fmt.format(**values)
class MemoryMonitor(Monitor):
    """
    Monitor for memory usage
    """

    collector_type = collectors.MemoryCollector

    # Per-process entries of the collected data that caption() exposes.
    _PROCESS_KEYS = ('top_1', 'top_2', 'top_3', 'other')

    def normalize(self, element):
        r"""
        Return the memory share named by `element` as a fraction of 1.

        Elements exposed:

        * ``percent``: memory use of all processes.
        * ``top_<n>``: memory use of the ``<n>``\ th-biggest process. Valid values of ``<n>`` are 1-3.
        * ``other``: memory use of all processes except the top 3

        Returns 0 as long as no data has been received.
        """
        if not self.data:
            return 0
        if element == 'percent':
            return self.data.get('percent', 0) / 100.0
        return self.data[element].get('percent', 0) / 100.0

    def caption(self, fmt):
        """
        Fill `fmt` with the current memory values; `fmt` is returned
        untouched as long as no data has been received.

        Exposed keys:

        * ``total``: how much memory this machine has in total,
        * ``percent``: total memory usage in percent.
        * ``available``: how much memory can be malloc'd without going into swap (roughly).
        * ``top_<n>``: access information about the 3 "biggest" processes. possible subkeys are ``name``, ``size`` and ``percent``.
        * ``other``: aggregate information for all processes *except* the top 3. Same subkeys as those, plus ``count``.
        """
        if not self.data:
            return fmt
        data = helpers.DotDict()
        data['total'] = helpers.pretty_bytes(self.data['total'])
        data['available'] = helpers.pretty_bytes(self.data['available'])
        data['percent'] = self.data['percent']
        for k in self._PROCESS_KEYS:
            entry = self.data[k]
            data[k] = helpers.DotDict()
            data[k]['name'] = entry['name']
            data[k]['size'] = helpers.pretty_bytes(entry['size'])
            # documented subkey; same lookup (and default) normalize() uses
            data[k]['percent'] = entry.get('percent', 0)
            if k == 'other':
                data[k]['count'] = entry['count']
        return fmt.format(**data)
class NetworkMonitor(Monitor):
    """
    Monitor for network interfaces.

    Counter values are kept in bounded deques (one per interface and
    counter, plus one set for the sum over all interfaces) so that
    :meth:`count_sec` can compute a difference over the retained samples.
    """

    collector_type = collectors.NetworkCollector

    # Counter names tracked per interface and for the aggregate.
    _COUNTER_KEYS = (
        'bytes_sent',
        'bytes_recv',
        'packets_sent',
        'packets_recv',
        'errin',
        'errout',
        'dropin',
        'dropout',
    )

    def __init__(self, app, component):
        super().__init__(app, component)
        self.interfaces = collections.OrderedDict()

        # We need a minimum of 2 samples so we can compute a difference;
        # a max size equal to FPS means the deque holds data of only the
        # last second. deque() requires an integer maxlen, hence int().
        deque_len = max(2, int(self.app.config['FPS']))

        for if_name in psutil.net_io_counters(pernic=True).keys():
            self.interfaces[if_name] = {
                'addrs': {},
                'stats': {},
                'counters': {
                    key: collections.deque([], deque_len)
                    for key in self._COUNTER_KEYS
                },
            }

        self.aggregate = {
            'if_count': len(self.interfaces),
            'if_up': 0,
            'speed': 0, # aggregate link speed
            'counters': {
                key: collections.deque([], deque_len)
                for key in self._COUNTER_KEYS
            },
        }

    def run(self):
        """
        Receive datapoints and fold them into the per-interface and
        aggregate state until ``self.seppuku`` is set.
        """
        while not self.seppuku:
            try:
                self.data = self.queue_data.get(timeout=1)
            except queue.Empty:
                # try again, but give thread the ability to die
                # without waiting on collector indefinitely.
                continue
            self._ingest(self.data)
        self.commit_seppuku()

    def _ingest(self, data):
        """
        Append one datapoint's counters to the deques and refresh stats,
        addresses and the aggregate speed / up-count.
        """
        totals = dict.fromkeys(self.aggregate['counters'], 0)
        # Accumulated locally and published once at the end, so readers
        # never observe a half-summed value.
        total_speed = 0
        up_count = 0

        for if_name, if_data in self.interfaces.items():
            if_has_data = (
                if_name in data['counters']
                and if_name in data['stats']
                and if_name in data['addrs']
            )
            if if_has_data:
                counters = data['counters'][if_name]._asdict()
                for key, samples in if_data['counters'].items():
                    value = counters[key]
                    samples.append(value)
                    totals[key] += value

                stats = data['stats'][if_name]._asdict()
                if stats['speed'] == 0:
                    stats['speed'] = 1000 # assume gbit speed per default
                if_data['stats'] = stats
                total_speed += stats['speed']
                # assumes psutil-style stats carrying an 'isup' flag;
                # interfaces without it simply don't count as up.
                if stats.get('isup'):
                    up_count += 1

            if if_name in data['addrs']:
                if_data['addrs'] = data['addrs'][if_name]
            else:
                if_data['addrs'] = []

        for key, value in totals.items():
            self.aggregate['counters'][key].append(value)
        self.aggregate['speed'] = total_speed
        self.aggregate['if_up'] = up_count

    def count_sec(self, interface, key):
        """
        get a specified count for a given interface
        as calculated for the last second.

        Parameters:
            interface (str): an interface name or ``'aggregate'``.
            key (str): name of the counter, e.g. ``'bytes_sent'``.

        Returns 0 while fewer than two samples are available.

        Example:

            | ``self.count_sec('eth0', 'bytes_sent')``
            | (will return count of bytes sent in the last second)
        """
        if interface == 'aggregate':
            samples = self.aggregate['counters'][key]
        else:
            samples = self.interfaces[interface]['counters'][key]

        if len(samples) < 2: # not enough data
            return 0

        # last (most recent) minus first (oldest) item
        delta = samples[-1] - samples[0]
        fps = self.app.config['FPS']
        if fps < 2:
            # only two samples are kept, so delta covers 1/fps seconds;
            # scale it to a full second.
            return delta * fps
        return delta

    def normalize(self, element):
        """
        Return used bandwidth as a fraction of the link speed.

        Exposed elements:

        * ``<if>.bytes_sent``: upload of network interface ``<if>``.
        * ``<if>.bytes_recv``: download of network interface ``<if>``.

        `<if>` can be any local network interface as well as `'aggregate'`.

        Returns 0 if there are fewer than two datapoints or no known link
        speed, in which case used bandwidth can't be established.
        """
        # split at the last dot only: interface names may contain dots
        if_name, key = element.rsplit('.', 1)

        if if_name == 'aggregate':
            samples = self.aggregate['counters'][key]
            if len(samples) < 2:
                return 0
            speed = self.aggregate['speed']
        else:
            samples = self.interfaces[if_name]['counters'][key]
            if len(samples) < 2:
                return 0
            speed = self.interfaces[if_name]['stats'].get('speed', 0)

        if not speed:
            return 0
        link_quality = float(speed * 1024**2)
        return (self.count_sec(if_name, key) * 8) / link_quality

    def _caption_counters(self, interface):
        """
        Build the ``counters`` mapping for `interface`: per-second counts,
        with byte counters formatted via ``helpers.pretty_bits`` plus '/s'.
        """
        if interface == 'aggregate':
            keys = self.aggregate['counters'].keys()
        else:
            keys = self.interfaces[interface]['counters'].keys()

        counters = helpers.DotDict()
        for key in keys:
            count = self.count_sec(interface, key)
            if key.startswith('bytes'):
                count = helpers.pretty_bits(count) + '/s'
            counters[key] = count
        return counters

    def caption(self, fmt):
        """
        Fill `fmt` with the current network values; `fmt` is returned
        untouched as long as no data has been received.

        Exposed keys:

        * ``<if>.counters``: supplies access to per-second interface counters. Possible sub-elements are:

            * ``bytes_sent``
            * ``bytes_recv``
            * ``packets_sent``
            * ``packets_recv``
            * ``errin``
            * ``errout``
            * ``dropin``
            * ``dropout``

        * ``<if>.stats``: the interface stats as received from the collector (includes ``speed``).
        * ``<if>.addrs``: the interface's addresses, keyed by their index.
        * ``<if>.all_addrs``: all addresses of the interface, one per line.

        `<if>` can be any local network interface. Additionally there is
        ``'aggregate'``, which exposes ``counters`` as above plus
        ``if_count`` (total count of network interfaces), ``if_up`` and
        ``speed`` (sum of the interface speeds).
        """
        if not self.data:
            return fmt

        data = {}
        data['aggregate'] = helpers.DotDict()
        data['aggregate']['if_count'] = self.aggregate['if_count']
        data['aggregate']['if_up'] = self.aggregate['if_up']
        data['aggregate']['speed'] = self.aggregate['speed']
        data['aggregate']['counters'] = self._caption_counters('aggregate')

        for if_name, if_data in self.interfaces.items():
            entry = helpers.DotDict()
            entry['addrs'] = helpers.DotDict()
            all_addrs = []
            for idx, addr in enumerate(if_data['addrs']):
                entry['addrs'][str(idx)] = addr
                all_addrs.append(addr.address)
            entry['all_addrs'] = u"\n".join(all_addrs)
            entry['stats'] = helpers.DotDict(if_data['stats'])
            entry['counters'] = self._caption_counters(if_name)
            data[if_name] = entry

        return fmt.format(**data)
class BatteryMonitor(Monitor):
    """
    Monitor laptop batteries.
    """

    collector_type = collectors.BatteryCollector

    def normalize(self, element):
        """
        This function exposes no explicit elements, but always just returns
        the current fill of the battery (0 as long as there is no data).
        """
        # TODO: multi-battery support? needs support by psutil…
        if not self.data:
            return 0
        return self.data.percent / 100.0

    def caption(self, fmt):
        """
        Fill `fmt` with the current battery values; `fmt` is returned
        untouched as long as no data has been received.

        Exposed keys:

        * ``power_plugged``: Boolean, whether the AC cable is connected.
        * ``percent``: current fill of the battery in percent.
        * ``secsleft``: seconds left till battery is completely drained.
        * ``state``: Current state of the battery, one of ``'full'``, ``'charging'`` or ``'draining'``.
        """
        if not self.data:
            return fmt

        data = self.data._asdict()

        # Decide the state on the raw number, before 'percent' is replaced
        # by its formatted representation below.
        if not data['power_plugged']:
            data['state'] = 'draining'
        elif data['percent'] >= 100:
            data['state'] = 'full'
        else:
            data['state'] = 'charging'

        data['percent'] = helpers.pretty_si(data['percent'])

        if data['secsleft'] == psutil.POWER_TIME_UNLIMITED:
            data['secsleft'] = '∞'
        elif data['secsleft'] == psutil.POWER_TIME_UNKNOWN:
            data['secsleft'] = 'unknown'
        else:
            data['secsleft'] = helpers.pretty_time(data['secsleft'])

        return fmt.format(**data)
class DiskMonitor(Monitor):
    """
    Monitors disk I/O and partitions.
    """

    collector_type = collectors.DiskCollector

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Maps 'io.<disk>.<key>' to the value treated as "100%" by
        # normalize(); normalize() raises it whenever a larger value shows up.
        self.normalization_values = {}
        io = psutil.disk_io_counters(perdisk=True) or {}
        for disk, info in io.items():
            for key in info._asdict():
                element = '.'.join(['io', disk, key])
                if key.endswith('_bytes'):
                    baseline = 100 * 1024 ** 2 # assume baseline ability of 100 MByte/s
                elif key.endswith('_time'):
                    baseline = 1000 # one second of data, collector reports data in milliseconds/s
                else:
                    # FIXME: no known reasonable baseline for *_count (or
                    # anything else); start at 0 and learn the maximum.
                    baseline = 0
                self.normalization_values[element] = baseline

    def normalize(self, element):
        """
        Return the value named by `element` as a fraction of 1; 0 as long
        as no data has been received or no maximum is known yet.

        Elements exposed:

        * ``io``
            * Valid subelements are disk device file names as found in
              ``/dev``. Examples: ``ada0``, ``sda``.
              Valid subsubelements are as follows:

                * ``read_bytes``
                * ``write_bytes``
                * ``read_time``
                * ``write_time``
                * ``busy_time``

        * ``partitions``
            * Valid subelements are partition device file names as
              found in ``/dev``, with dots (``.``) being replaced
              with dashes (``-``). Examples: ``root-eli``, ``sda1``.
        """
        if not self.data:
            return 0

        parts = element.split('.')
        if parts[0] == 'io':
            disk, key = parts[1:]
            value = self.data['io'][disk][key]
            # .get(): disks that appeared after __init__ have no baseline yet
            peak = max(self.normalization_values.get(element, 0), value)
            self.normalization_values[element] = peak
            if not peak:
                # baseline and value are both 0; nothing to divide by
                return 0
            return value / peak
        elif parts[0] == 'partitions':
            name = parts[1]
            info = self.data['partitions'][name]['usage']
            if not info['total']:
                return 0
            return info['used'] / info['total']
        return 0

    def caption(self, fmt):
        """
        Fill `fmt` with the current disk values; `fmt` is returned untouched
        until both I/O and partition data are available.

        Exposed keys are the same as for :func:`DiskMonitor.normalize`;
        ``io`` values ending in ``_bytes`` are formatted via
        ``helpers.pretty_bytes``, and every partition additionally exposes
        ``name``, ``device``, ``mountpoint``, ``fstype``, ``opts`` and ``usage``.
        """
        if 'partitions' not in self.data or 'io' not in self.data:
            return fmt

        data = helpers.DotDict()

        data['io'] = helpers.DotDict()
        for name, diskinfo in self.data['io'].items():
            data['io'][name] = helpers.DotDict()
            for key, value in diskinfo.items():
                if key.endswith('_bytes'):
                    value = helpers.pretty_bytes(value)
                data['io'][name][key] = value

        data['partitions'] = helpers.DotDict()
        for name, partition in self.data['partitions'].items():
            part_data = helpers.DotDict()
            for key in ['name', 'device', 'mountpoint', 'fstype', 'opts']:
                part_data[key] = partition[key]
            part_data['usage'] = helpers.DotDict(partition['usage'])
            data['partitions'][name] = part_data

        return fmt.format(**data)
class NetdataMonitor(Monitor):
    """
    Monitor that interfaces with (remote) netdata instances.

    A fresh instance starts out ``defective``; :meth:`tick` then keeps
    trying to fetch the chart overview from the netdata host and only
    requests chart updates once that has succeeded.
    """

    collector_type = collectors.NetdataCollector

    def __init__(self, app, component, host, port):
        """
        Parameters:
            app: The application object.
            component: The component this monitor belongs to.
            host: netdata host, passed on to the collector.
            port: netdata port, passed on to the collector.
        """
        # Bind host/port now; Monitor.__init__ calls collector_type with
        # the remaining (app, queue_update, queue_data) arguments.
        self.collector_type = functools.partial(self.collector_type, host=host, port=port)
        super().__init__(app, component)
        self.host = host
        self.port = port
        self.charts = set()
        self.normalization_values = {} # keep a table of known maximums because netdata doesn't supply absolute normalization values
        self.defective = True
        self.info_last_try = 0
        self.netdata_info = False
        self.almost_fixed = False

    def __repr__(self):
        return f"<{self.__class__.__name__} host={self.collector.client.host} port={self.collector.client.port}>"

    def register_elements(self, elements):
        """
        Remember the charts referenced by `elements` (``<type>.<id>.<dimension>``
        strings) so that :meth:`tick` requests updates for them.

        Raises:
            ValueError: if the chart overview is already known and does
                not contain a referenced chart.
        """
        for element in elements:
            parts = element.split('.')
            chart = '.'.join(parts[:2])
            if chart in self.charts:
                continue
            self.normalization_values[chart] = 0
            if self.netdata_info:
                if chart not in self.netdata_info['charts']:
                    raise ValueError(f"Invalid chart: {chart} on netdata instance {self.host}:{self.port}!")
                chart_info = self.netdata_info['charts'][chart]
                if chart_info['units'] == 'percentage':
                    self.normalization_values[chart] = 100
            self.charts.add(chart)

    def run(self):
        """
        Receive ``(chart, data)`` tuples and process them until
        ``self.seppuku`` is set, then shut the collector down.
        """
        while not self.seppuku:
            try:
                (chart, data) = self.queue_data.get(timeout=1/self.app.config['FPS'])
            except queue.Empty:
                continue # try again
            self._handle_datapoint(chart, data)
        self.commit_seppuku()

    def _handle_datapoint(self, chart, data):
        """
        Store `data` for `chart` and update its known maximum; mark the
        monitor defective on missing or dead datapoints.
        """
        self.data[chart] = data
        rows = data['data']
        # first element of a row is not a value, so a usable row has >= 2 items
        if not rows or len(rows[0]) < 2:
            print(f"Missing data, marking {self} as defective.")
            self.defective = True
            return

        if self.defective:
            return

        values = rows[0][1:]
        if values[0] is None: # ignore "dead" datapoints
            print(f"Got dead datapoint, marking {self} as defective.")
            self.defective = True
        elif self.netdata_info['charts'][chart]['units'] == 'percentage':
            self.normalization_values[chart] = 100 # in case self was defective when register_elements was called
        else:
            cumulative_value = sum(v for v in values if v is not None)
            if self.normalization_values.get(chart, 0) < cumulative_value:
                self.normalization_values[chart] = cumulative_value

    def tick(self):
        """
        While defective: retry fetching the chart overview, at most once
        every ``NETDATA_RETRY`` seconds; two consecutive successful fetches
        are needed before the monitor counts as fixed. Otherwise: request
        an update for every registered chart.
        """
        if not self.defective:
            if not self.queue_update.full():
                for chart in self.charts:
                    self.queue_update.put(f"UPDATE {chart}", block=True)
            return

        t = time.time()
        if t < self.info_last_try + self.app.config['NETDATA_RETRY']:
            return

        if self.almost_fixed:
            print(f"{self.__class__.__name__} instance almost fixed, ignoring first chart overview because of netdata weirdness.")
        else:
            print(f"{self.__class__.__name__} instance currently defective, trying to get netdata overview from {self.collector.client.host}.")
        self.info_last_try = t

        try:
            self.netdata_info = self.collector.client.charts()
            if self.almost_fixed:
                self.defective = False
                self.almost_fixed = False
                self.tick() # do the actual tick (i.e. request updates)
                if not self.defective: # in case defective was re-set in because of dead datapoints
                    print("Success!")
            else:
                self.almost_fixed = True
        # NOTE(review): `netdata` is not imported in the visible part of
        # this module; it has to be in scope for this handler to work.
        except netdata.NetdataException:
            print(f"Failed, will retry in {self.app.config['NETDATA_RETRY']} seconds.")

    def normalize(self, element):
        r"""
        Return the latest value of `element` relative to the largest value
        known for its chart; 0 if the monitor is defective or there is no
        (usable) data for the chart.

        Exposed elements correspond to *chart names* and their datapoint
        *dimension*\s. For a list of valid chart and dimensions names, consult
        ``/api/v1/charts`` of the netdata instance in question.

        Examples:

        * ``system.cpu.nice``
        * ``disk.ada0.writes``
        """
        if self.defective:
            return 0

        parts = element.split('.')
        chart = '.'.join(parts[:2])
        if chart not in self.data:
            return 0

        chart_data = self.data[chart]
        rows = chart_data['data']
        if not rows:
            return 0
        subidx = chart_data['labels'].index(parts[2])
        value = rows[0][subidx]
        if value is None:
            # "dead" datapoint, nothing to compare against
            return 0

        peak = max(self.normalization_values.get(chart, 0), value)
        self.normalization_values[chart] = peak
        if peak == 0:
            return 0
        return value / peak

    def caption(self, fmt):
        """
        Fill `fmt` with the latest chart values, formatted according to the
        chart's unit; `fmt` is returned untouched while there is no data or
        the monitor is defective.

        Exposed keys are the same as for :func:`NetdataMonitor.normalize`.
        """
        if not self.data or self.defective:
            return fmt

        data = helpers.DotDict()
        for chart_name, chart_data in self.data.items():
            rows = chart_data['data']
            if not rows:
                continue
            chart_keys = chart_name.split('.')
            unit = self.netdata_info['charts'][chart_name]['units'] # called "units" but actually only ever one. it's a string.
            if chart_keys[0] not in data:
                data[chart_keys[0]] = helpers.DotDict()

            d = helpers.DotDict()
            for idx, label in enumerate(chart_data['labels']):
                value = rows[0][idx]
                if value is None:
                    value = 0
                elif unit == 'bytes':
                    value = helpers.pretty_bytes(value)
                elif unit.startswith('kilobytes'):
                    postfix = unit[9:]
                    value = helpers.pretty_bytes(value * 1024) + postfix
                elif unit.startswith('kilobits'):
                    postfix = unit[8:]
                    value = helpers.pretty_bits(value * 1024) + postfix
                else:
                    value = f"{value} {unit}"
                d[label] = value
            data[chart_keys[0]][chart_keys[1]] = d

        return fmt.format(**data)