#!/usr/bin/env python3.6
# -*- coding: utf-8 -*-
"""
Architecture
------------
::

    +-----------------------------------------+
    |                                         |
    |              gulik process              |          collector processes
    |                                         |
    | (main thread)      (monitor threads)    |
    |   Gulik----------- CPUMonitor     <-----ø-----> CPUCollector
    |    | | `---------- MemoryMonitor  <-----ø-----> MemoryCollector
    |    | |`----------- NetworkMonitor <-----ø-----> NetworkCollector
    |    | `------------ …              <-----ø-----> …
    |    |               |                    |
    |    |               |                    |
    |    `-- visualizers |    (visualizers    |
    |         | `- Arc   |       access       |
    |         |`-- Plot <´      monitors)     |
    |         `--- …                          |
    |                                         |
    +-----------------------------------------+

In ``gulik``, there is one central :class:`Gulik` object.
It manages :class:`Monitor`\s and :class:`Visualizer`\s.
Visualizers use the :func:`Monitor.normalize` and :func:`Monitor.caption`
functions to utilize the collected data.
Communication between the :class:`Monitor`\s within the gulik process and
:class:`Collector` processes is done via queues. Every monitor/collector
pair shares two queues. One "update queue" that monitors use to send update
requests to collectors and one "data queue" that collectors use to send the
next datapoint to their respective monitor.
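
The request/response handshake between one monitor and its collector can be
sketched as follows. This is only an illustration under stated assumptions:
a thread and ``queue.Queue`` stand in for the collector process and
``multiprocessing.Queue``, and the ``'STOP'`` message is hypothetical (it
exists only so the sketch terminates).

```python
import queue
import threading


def collector(queue_update, queue_data):
    # Stand-in for a Collector: answer every 'UPDATE' with one datapoint.
    counter = 0
    while True:
        msg = queue_update.get()
        if msg == 'STOP':  # hypothetical shutdown message, sketch only
            break
        if msg == 'UPDATE':
            counter += 1
            queue_data.put({'sample': counter})


# Both queues hold at most one item, like the queues a Monitor creates.
queue_update = queue.Queue(1)
queue_data = queue.Queue(1)
worker = threading.Thread(target=collector, args=(queue_update, queue_data), daemon=True)
worker.start()

queue_update.put('UPDATE')             # monitor side: request a datapoint
datapoint = queue_data.get(timeout=1)  # monitor side: receive it
queue_update.put('STOP')
worker.join(timeout=1)
```

Because each queue holds a single item, a slow collector never accumulates a
backlog of stale requests or datapoints.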
Box model
---------
::

    +-----------------------------------------------------------+  ↑
    |                                                           |  |
    |                          margin                           |  |
    |                                                           |  |
    |   +---------------------------------------------------+   |  |
    |   |                                                   |   |  |
    |   |                      padding                      |   |  h
    |   |                                                   |   |  e
    |   |   +-----------------------------------+  ↑        |   |  i
    |   |   |                                   |  | inner  |   |  g
    |   |   |       inner drawing region        |  | height |   |  h
    |   |   |                                   |  |        |   |  t
    |   |   +-----------------------------------+  ↓        |   |  |
    |   |                                                   |   |  |
    |   |   ←----------- inner width -----------→           |   |  |
    |   |                                                   |   |  |
    |   +---------------------------------------------------+   |  |
    |                                                           |  |
    +-----------------------------------------------------------+  ↓
    ←-------------------------- width --------------------------→

``gulik``'s box model closely resembles the CSS box model, with
one important distinction: margins and paddings are included in
``width`` and ``height``, which simplifies layout.
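
As a rough illustration of this box model, the inner drawing region can be
derived from the outer size. The helper name ``inner_region`` is hypothetical,
and the sketch assumes the same margin and padding on all four sides (the
configuration also allows per-side values such as ``PADDING_BOTTOM``).

```python
def inner_region(width, height, margin, padding):
    # width and height already include margin and padding,
    # so both are subtracted once per side.
    inner_width = width - 2 * (margin + padding)
    inner_height = height - 2 * (margin + padding)
    return inner_width, inner_height


size = inner_region(200, 100, margin=5, padding=5)
```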
Concepts
--------
.. _visualizer:
visualizer
^^^^^^^^^^
Visualizers are instances of any subclass of :class:`Visualizer`.
A visualizer is assigned a :class:`Monitor` and a list of :ref:`element`\s.
:class:`Gulik` will periodically call each visualizer's ``update`` method
(see source of :func:`Visualizer.update`).
What exactly happens in the ``update`` function of a visualizer differs
between the different classes, but usually it queries the instance-assigned
monitor for the most recent data about its :ref:`element`\s by calling
:func:`Monitor.normalize` and then does some drawing on the passed
:class:`cairo.Context` to visualize the data in some manner.
Currently, there are 7 built-in visualizers:
* :class:`Text`
* :class:`Rect`
* :class:`Arc`
* :class:`Plot`
* :class:`MirrorRect`
* :class:`MirrorArc`
* :class:`MirrorPlot`
You are, however, welcome to implement your own visualizers by subclassing
:class:`Visualizer` and overriding :func:`Visualizer.draw`.
.. _monitor:
monitor
^^^^^^^
Monitors are instances of any subclass of :class:`Monitor`,
which itself is a subclass of :class:`threading.Thread`.
Every monitor acts as one half of a monitor-:ref:`collector` pair,
each of which collects and transforms data on a specific :ref:`component`.
The monitor's responsibility in this pair is to take data from the collector
and offer it in a form that is usable by :ref:`visualizer`\s. This is mainly
done through the functions :func:`Monitor.normalize` and :func:`Monitor.caption`.
Currently, there are 6 built-in monitors:
* :class:`CPUMonitor`
* :class:`MemoryMonitor`
* :class:`NetworkMonitor`
* :class:`BatteryMonitor`
* :class:`DiskMonitor`
* :class:`NetdataMonitor`
.. _collector:
collector
^^^^^^^^^
Collectors are instances of any subclass of :class:`Collector`,
which itself is a subclass of :class:`multiprocessing.Process`.
Every collector acts as one half of a :ref:`monitor`-collector pair.
The collector's responsibility in this pair is to collect system usage
data and send it to its associated :ref:`monitor`.
.. _component:
component
^^^^^^^^^
A string identifying a data source.
Valid values are:
* ``'cpu'``
* ``'memory'``
* ``'network'``
* ``'battery'``
* ``'disk'``
Besides that, ``'netdata-<hostname>'`` is valid, but only if ``<hostname>``
exists in the ``NETDATA_HOSTS`` configuration option.
.. _element:
element
^^^^^^^
A string identifying a (sub) element of a data source.
Valid values are defined within the respective :class:`Monitor`\s.
.. _alignment:
alignment
^^^^^^^^^
A string in the shape of ``<x-align>_<y-align>``, where:
* ``<x-align>`` can be any of ``left``, ``center``, ``right``
* ``<y-align>`` can be any of ``top``, ``center``, ``bottom``
Alignments are used both to position text relative to its allowed borders
and to position captions within :class:`Visualizer`\s.
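
How an alignment string translates into a drawing offset can be sketched like
this. The function ``offset_for`` is a hypothetical, self-contained
illustration modeled on this module's ``alignment_offset`` helper; it is not
the helper itself.

```python
def offset_for(align, size):
    # Split '<x-align>_<y-align>' and shift by 0, half or all of the size.
    x_align, y_align = align.split('_')
    x_factors = {'left': 0.0, 'center': 0.5, 'right': 1.0}
    y_factors = {'top': 0.0, 'center': 0.5, 'bottom': 1.0}
    if x_align not in x_factors or y_align not in y_factors:
        raise ValueError("unknown alignment: '%s'" % align)
    return (-size[0] * x_factors[x_align], -size[1] * y_factors[y_align])


offset = offset_for('center_bottom', (100, 20))
```

The offsets are negative because they move the drawing origin left and up
from the anchor point.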
.. _caption-description:
caption description
^^^^^^^^^^^^^^^^^^^
A dictionary describing a caption to be rendered by a :class:`Visualizer`.
Required items:
* ``text``: The text to be rendered
Optional items:
* ``position``: Either an ``(x,y)`` coordinate tuple or an :ref:`alignment`
* ``align``: An :ref:`alignment`
* ``font_size``: A number (int or float) denoting vertical font size in pixels
* ``font_weight``: A string. ``Normal``, ``Bold``, ``Light``, etc.
* ``operator``: One of cairo's blending operators (see :class:`cairo.Operator`)
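
A caption description might look like the sketch below. The values are
made up for illustration, and the example assumes the ``text`` item is a
format string that ends up in :func:`Monitor.caption`, where placeholders
such as ``aggregate`` are filled in.

```python
caption = {
    'text': 'CPU {aggregate:.1f}%',  # required
    'position': 'center_center',     # optional: alignment or (x, y) tuple
    'align': 'center_center',        # optional
    'font_size': 14,                 # optional, in pixels
    'font_weight': 'Bold',           # optional
}

# Roughly what happens to the text once monitor data is available:
rendered = caption['text'].format(aggregate=12.345)
```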
.. _pattern:
pattern
^^^^^^^
A function taking one :class:`Color` as parameter returning a cairo surface for
use as fill. See :func:`stripe45` for an example.
.. _palette:
palette
^^^^^^^
A function taking one :class:`Color` and one ``int`` parameter returning a
``list`` of :class:`Color` objects with its length being equal to the passed
``int`` parameter.
.. note:: :func:`palette_hue` and :func:`palette_value` have extra parameters
you won't be able to use without wrapping them in :func:`functools.partial`
first!
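
The wrapping mentioned in the note can be sketched as follows.
``palette_steps`` is a hypothetical stand-in with the same call shape as
:func:`palette_hue`; it works on plain hue angles instead of :class:`Color`
objects so that the example runs on its own.

```python
import functools


def palette_steps(base, count, distance=180):
    # Stand-in for palette_hue: spread `count` hue angles over `distance` degrees.
    if count == 1:
        return [base]
    return [(base + i / (count - 1) * distance) % 360 for i in range(count)]


# Fix the extra parameter up front; the result takes only (base, count),
# which is the two-parameter shape a palette is called with.
palette = functools.partial(palette_steps, distance=-120)
hues = palette(90, 3)
```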
.. _combination:
combination
^^^^^^^^^^^
A string denoting how multiple :ref:`element`\s are displayed within a :class:`Visualizer`.
Valid values are:
* ``separate``: separate elements visually
* ``cumulative``: show values cumulatively, assume data is normalized for that (i.e. all values added max out at 1.0)
* ``cumulative_force``: like ``cumulative``, but assumes every single value can go up to 1.0
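
One possible reading of the three modes, expressed as ``(start, end)`` spans
on a 0.0 to 1.0 scale. The function ``spans`` is hypothetical and only
illustrates the descriptions above; in particular, scaling each value by the
number of elements for ``cumulative_force`` is an assumption.

```python
def spans(values, combination):
    if combination == 'separate':
        # every element gets its own full-range scale
        return [(0.0, value) for value in values]
    if combination == 'cumulative_force':
        # each value may reach 1.0, so shrink them to fit one shared scale
        values = [value / len(values) for value in values]
    elif combination != 'cumulative':
        raise ValueError("unknown combination: '%s'" % combination)
    result = []
    offset = 0.0
    for value in values:
        result.append((offset, offset + value))  # stack on top of the previous one
        offset += value
    return result


stacked = spans([0.25, 0.5], 'cumulative')
forced = spans([0.5, 1.0], 'cumulative_force')
```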
"""
#__all__ = ['Gulik', 'Visualizer']
import os
# get version from _version file
from gulik._version import version as __version__
import sys
import math
import time
import random
import signal
import collections
import queue
import functools
import threading
import multiprocessing
import setproctitle
import psutil
import colorsys
import cairo
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('PangoCairo', '1.0') # not sure if want
from gi.repository import Gtk, Gdk, GLib, Pango, PangoCairo
from . import netdata
Operator = cairo.Operator
PAGESIZE = os.sysconf('SC_PAGESIZE')
## Helpers ##
class Color(object):
"""
Magic color class implementing and supplying on-the-fly manipulation of
RGB and HSV (and alpha) attributes.
"""
def __init__(self, red=None, green=None, blue=None, alpha=None, hue=None, saturation=None, value=None):
rgb_passed = bool(red)|bool(green)|bool(blue)
hsv_passed = bool(hue)|bool(saturation)|bool(value)
if not alpha:
alpha = 0.0
if rgb_passed and hsv_passed:
raise ValueError("Color can't be initialized with RGB and HSV at the same time.")
elif hsv_passed:
if not hue:
hue = 0.0
if not saturation:
saturation = 0.0
if not value:
value = 0.0
super(Color, self).__setattr__('hue', hue)
super(Color, self).__setattr__('saturation', saturation)
super(Color, self).__setattr__('value', value)
self._update_rgb()
else:
if not red:
red = 0
if not green:
green = 0
if not blue:
blue = 0
super(Color, self).__setattr__('red', red)
super(Color, self).__setattr__('green', green)
super(Color, self).__setattr__('blue', blue)
self._update_hsv()
super(Color, self).__setattr__('alpha', alpha)
def __setattr__(self, key, value):
if key in ('red', 'green', 'blue'):
if value > 1.0:
value = value % 1.0
super(Color, self).__setattr__(key, value)
self._update_hsv()
elif key in ('hue', 'saturation', 'value'):
if key == 'hue' and (value >= 360.0 or value < 0):
value = value % 360.0
elif key != 'hue' and value > 1.0:
value = 1.0
super(Color, self).__setattr__(key, value)
self._update_rgb()
else:
if key == 'alpha' and value > 1.0: # TODO: Might this be more fitting in another place?
value = 1.0
super(Color, self).__setattr__(key, value)
def __repr__(self):
return '<%s: red %f, green %f, blue %f, hue %f, saturation %f, value %f, alpha %f>' % (
self.__class__.__name__,
self.red,
self.green,
self.blue,
self.hue,
self.saturation,
self.value,
self.alpha
)
def clone(self):
return Color(red=self.red, green=self.green, blue=self.blue, alpha=self.alpha)
def blend(self, other, mode='normal'):
if self.alpha != 1.0: # no clue how to blend with a translucent bottom layer
self.red = self.red * self.alpha
self.green = self.green * self.alpha
self.blue = self.blue * self.alpha
self.alpha = 1.0
if mode == 'normal':
own_influence = 1.0 - other.alpha
self.red = (self.red * own_influence) + (other.red * other.alpha)
self.green = (self.green * own_influence) + (other.green * other.alpha)
self.blue = (self.blue * own_influence) + (other.blue * other.alpha)
def lighten(self, other):
if isinstance(other, int) or isinstance(other, float):
other = Color(red=other, green=other, blue=other, alpha=1.0)
if self.alpha != 1.0:
self.red = self.red * self.alpha
self.green = self.green * self.alpha
self.blue = self.blue * self.alpha
self.alpha = 1.0
red = self.red + (other.red * other.alpha)
green = self.green + (other.green * other.alpha)
blue = self.blue + (other.blue * other.alpha)
if red > 1.0:
red = 1.0
if green > 1.0:
green = 1.0
if blue > 1.0:
blue = 1.0
self.red = red
self.green = green
self.blue = blue
def darken(self, other):
if isinstance(other, int) or isinstance(other, float):
other = Color(red=other, green=other, blue=other, alpha=1.0)
red = self.red - other.red
green = self.green - other.green
blue = self.blue - other.blue
if red < 0:
red = 0
if green < 0:
green = 0
if blue < 0:
blue = 0
self.red = red
self.green = green
self.blue = blue
    def tuple_rgb(self):
""" return color (without alpha) as tuple, channels being float 0.0-1.0 """
return (self.red, self.green, self.blue)
    def tuple_rgba(self):
""" return color (*with* alpha) as tuple, channels being float 0.0-1.0 """
return (self.red, self.green, self.blue, self.alpha)
def _update_hsv(self):
hue, saturation, value = colorsys.rgb_to_hsv(self.red, self.green, self.blue)
super(Color, self).__setattr__('hue', hue * 360.0)
super(Color, self).__setattr__('saturation', saturation)
super(Color, self).__setattr__('value', value)
def _update_rgb(self):
red, green, blue = colorsys.hsv_to_rgb(self.hue / 360.0, self.saturation, self.value)
super(Color, self).__setattr__('red', red)
super(Color, self).__setattr__('green', green)
super(Color, self).__setattr__('blue', blue)
class DotDict(dict):
"""
A dictionary with its data being readable through faked attributes.
Used to avoid [[[][][][][]] in caption formatting.
"""
def __getattribute__(self, name):
#data = super(DotDict, self).__getattribute__('data')
keys = super(DotDict, self).keys()
if name in keys:
return self.get(name)
return super(DotDict, self).__getattribute__(name)
def palette_hue(base, count, distance=180):
"""
Creates a hue-rotation palette.
Parameters:
base (:class:`Color`): Color on which the palette will be based (i.e. the starting point of the hue-rotation).
count (int): number of colors the palette should hold.
distance (int or float): angular distance on a 360° hue circle thingamabob.
Returns:
list: A list of length **count** of :class:`Color` objects.
"""
if count == 1:
return [base]
palette = []
for i in range(0, count):
color = base.clone()
color.hue += i/(count - 1) * distance
palette.append(color)
return palette
def palette_value(base, count, min=None, max=None):
"""
Creates a value-stepped palette
Parameters:
base (:class:`Color`): Color on which the palette will be based (i.e. source of hue and saturation)
count (int): number of colors the palette should hold
min (float >= 0 and <= 1): minimum value (the v in hsv)
max (float >= 0 and <= 1): maximum value
Returns:
list: A list of length **count** of :class:`Color` objects.
"""
if count == 1:
return [base]
if min is None:
if 0.2 > base.value:
min = base.value
else:
min = 0.2
if max is None:
if 0.6 < base.value:
max = base.value
else:
max = 0.6
span = max - min
step = span / (count - 1)
palette = []
for i in range(0, count):
color = base.clone()
color.value = max - i * step
palette.append(color)
return palette
def pretty_si(number):
    """
    Return an SI-postfixed string representation of a number (int or float).
    """
    postfixes = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    value = number
    for postfix in postfixes[:-1]:
        if value / 1000.0 < 1:
            break
        value /= 1000.0
    else:
        # no break: the value exceeds every smaller postfix, use the largest
        postfix = postfixes[-1]
    return "%.2f%s" % (value, postfix)
def pretty_bytes(bytecount):
    """
    Return a human-readable representation given a size in bytes.
    """
    units = ['Byte', 'kbyte', 'Mbyte', 'Gbyte', 'Tbyte']
    value = bytecount
    for unit in units[:-1]:
        if value / 1024.0 < 1:
            break
        value /= 1024.0
    else:
        # no break: the value exceeds every smaller unit, use the largest
        unit = units[-1]
    return "%.2f %s" % (value, unit)
def pretty_bits(bytecount):
    """
    Return a human-readable representation in bits given a size in bytes.
    """
    units = ['bit', 'kbit', 'Mbit', 'Gbit', 'Tbit']
    value = bytecount * 8  # bytes to bits
    for unit in units[:-1]:
        if value / 1024.0 < 1:
            break
        value /= 1024.0
    else:
        # no break: the value exceeds every smaller unit, use the largest
        unit = units[-1]
    return "%.2f %s" % (value, unit)
def ignore_none(*args):
    """
    Return the first passed value that isn't ``None``.
    """
    for arg in args:
        if arg is not None:
            return arg
def condense_addr_parts(items):
matching = []
max_match = min([len(x) for x in items])
for i in range(0, max_match):
s = set()
for item_idx in range(0, len(items)):
s.add(items[item_idx][i])
if len(s) > 1: # lazy hack, means not all items have the same part at index i
break
matching.append(items[item_idx][i])
return '.'.join(matching)
def alignment_offset(align, size):
x_align, y_align = align.split('_')
if x_align == 'left':
x_offset = 0
elif x_align == 'center':
x_offset = -size[0] / 2
elif x_align == 'right':
x_offset = -size[0]
else:
raise ValueError("unknown horizontal alignment: '%s', must be one of: left, center, right" % x_align)
if y_align == 'top':
y_offset = 0
elif y_align == 'center':
y_offset = -size[1] / 2
elif y_align == 'bottom':
y_offset = -size[1]
else:
        raise ValueError("unknown vertical alignment: '%s', must be one of: top, center, bottom" % y_align)
return (x_offset, y_offset)
## PATTERNS ##
def stripe45(color):
surface = cairo.ImageSurface(cairo.Format.ARGB32, 10, 10)
context = cairo.Context(surface)
context.set_source_rgba(*color.tuple_rgba())
context.move_to(5, 5)
context.line_to(10, 0)
context.line_to(10, 5)
context.line_to(5, 10)
context.line_to(0, 10)
context.line_to(5, 5)
context.close_path()
context.fill()
context.move_to(0, 0)
context.line_to(5, 0)
context.line_to(0, 5)
context.close_path()
context.fill()
return surface
##class PeriodicCall(threading.Thread):
#
# """ Periodically forces a window to redraw """
#
# def __init__(self, target, hz):
#
# super(PeriodicCall, self).__init__()
# self.daemon = True
#
# self.target = target
# self.interval = 1.0/hz
#
#
# def run(self):
#
# while True: # This thread will automatically die with its parent because of the daemon flag
#
# self.target()
# time.sleep(self.interval)
try:
_default_height = Gdk.Screen().get_default().get_height()
except Exception: # to avoid setup failures in sandboxes without graphical environment
_default_height = 0
DEFAULTS = {
'FPS': 1,
'WIDTH': 200,
'HEIGHT': _default_height,
'X': 0,
'Y': 0,
'NETDATA_HOSTS': [],
'NETDATA_RETRY': 5,
'BSD_ACCURATE_MEMORY': False, # use inaccurate but cheaper memory collection on bsd
# styling stuff below this
'MARGIN': 5,
'PADDING': 5,
'PADDING_BOTTOM': 40, # space for 2-rows of autolegend
'FONT': 'Orbitron',
'FONT_WEIGHT': 'Light',
'FONT_SIZE': 10,
'COLOR_WINDOW_BACKGROUND': Color(0.05, 0.05, 0.05, 0.8),
'COLOR_BACKGROUND': Color(1,1,1, 0.1), # background for visualizers
'COLOR_FOREGROUND': Color(0.5, 1, 0, 0.6),
'COLOR_CAPTION': Color(1,1,1, 0.6),
#'COLOR_CAPTION_MINOR': Color(1,1,1, 0.3),
'PALETTE': functools.partial(palette_hue, distance=-120), # mhh, curry…
'PATTERN': stripe45,
'CAPTION_PLACEMENT': 'inner', # allow captions to be properly centered in the inner region of visualizers, as opposed to 'padding'
#'CAPTION_PLACEMENT': 'padding', # allow captions to be placed within paddings, as opposed to 'inner'
'LEGEND': True,
'LEGEND_ORDER': 'normal', # other valid value: 'reverse'
'LEGEND_SIZE': 20, # not font size, but height of one legend cell, including margin and padding.
'LEGEND_PLACEMENT': 'padding',
'LEGEND_MARGIN': 2.5,
#'LEGEND_PADDING': 2.5,
'LEGEND_PADDING': 0,
'OPERATOR': Operator.OVER,
# class-specific default styles
'PADDING_TEXT': 0, # otherwise text will be tiny
'PADDING_RECT': 5,
'FONT_SIZE_RECT': 14,
'FONT_WEIGHT_RECT': 'Bold',
#'COLOR_RECT_CAPTION': Color(1,1,1, 1),
'PATTERN_RECT': None,
'OPERATOR_RECT_CAPTION': Operator.DIFFERENCE, # grants visibility no matter how much of the rect is filled
'CAPTION_PLACEMENT_RECT': 'inner', # to get padding around cut-out
'PATTERN_ARC': None,
'PATTERN_MIRRORARC': None,
'AUTOSCALE': False,
'LINE': True,
'MARKERS': False,
'GRID': True,
}
## Stuff I'd much rather do without a huge dependency like gtk ##
class Window(Gtk.Window):
def __init__(self):
super(Window, self).__init__()
self.set_title('gulik')
self.set_role('gulik')
self.stick() # show this window on every virtual desktop
self.set_app_paintable(True)
self.set_type_hint(Gdk.WindowTypeHint.DOCK)
self.set_keep_below(True)
screen = self.get_screen()
visual = screen.get_rgba_visual()
        if visual is not None and screen.is_composited():
self.set_visual(visual)
self.show_all()
@property
def width(self):
return self.get_size()[0]
@property
def height(self):
return self.get_size()[1]
## Collectors ##
class Collector(multiprocessing.Process):
def __init__(self, app, queue_update, queue_data):
super(Collector, self).__init__()
self.daemon = True
self.app = app
self.queue_update = queue_update
self.queue_data = queue_data
self.elements = []
def terminate(self):
#self.queue_data.close() # closing queues manually actually seems to mess stuff up
        # A cleaner shutdown was attempted, but it kept failing in ways that
        # were hard to reproduce, so the collector process is killed outright.
os.kill(self.pid, signal.SIGKILL)
#super(Collector, self).terminate()
def run(self):
setproctitle.setproctitle(f"gulik - {self.__class__.__name__}")
while True:
try:
msg = self.queue_update.get(block=True)
if msg == 'UPDATE':
self.update()
except KeyboardInterrupt: # so we don't randomly explode on ctrl+c
pass
def update(self):
raise NotImplementedError("%s.update not implemented!" % self.__class__.__name__)
class CPUCollector(Collector):
def update(self):
count = psutil.cpu_count()
aggregate = psutil.cpu_percent(percpu=False)
percpu = psutil.cpu_percent(percpu=True)
self.queue_data.put(
{
'count': count,
'aggregate': aggregate,
'percpu': percpu
},
block=True
)
# according to psutil docs, there should at least be 0.1 seconds
# between calls to cpu_percent without sampling interval
time.sleep(0.1)
class MemoryCollector(Collector):
def update(self):
vmem = psutil.virtual_memory()
processes = []
total_use = 0
for process in psutil.process_iter():
if psutil.LINUX or (psutil.BSD and not self.app.config['BSD_ACCURATE_MEMORY']):
if psutil.BSD:
key = 'rss'
else:
key = 'pss'
try:
pmem = process.memory_full_info()._asdict()
processes.append(DotDict({
'name': process.name(),
'size': pmem[key],
#'shared': pmem.shared,
'percent': pmem[key] / vmem.total * 100
}))
total_use += pmem[key]
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
continue # skip to next process
elif psutil.BSD:
try:
resident = 0
size = 0
#shared = 0
try:
for mmap in process.memory_maps():
# assuming everything with a real path is
# "not really in ram", but no clue.
if mmap.path.startswith('['):
size += mmap.private * PAGESIZE
resident += mmap.rss * PAGESIZE
#shared += (mmap.rss - mmap.private) * PAGESIZE # FIXME: probably broken, can yield negative values
except OSError:
pass # probably "device not available"
processes.append(DotDict({
'name': process.name(),
'size': size,
#'shared': resident - size,
'percent': size / vmem.total * 100,
}))
total_use += size
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
pass# TODO: add counter for processes we can't introspect
info = DotDict({
'total': vmem.total,
'percent': total_use / vmem.total * 100,
'available': vmem.total - total_use
})
processes_sorted = sorted(processes, key=lambda x: x['size'], reverse=True)
for i, process in enumerate(processes_sorted[:3]):
info['top_%d' % (i + 1)] = process
info['other'] = DotDict({
'name': 'other',
'size': 0,
#'shared': 0,
'count': 0
})
for process in processes_sorted[3:]:
info['other']['size'] += process['size']
#info['other']['shared'] += process['shared']
info['other']['count'] += 1
info['other']['percent'] = info['other']['size'] / vmem.total * 100
self.queue_data.put(info, block=True)
#time.sleep(1) # because this became horribly slow
class NetworkCollector(Collector):
def update(self):
stats = psutil.net_if_stats()
addrs = psutil.net_if_addrs()
counters = psutil.net_io_counters(pernic=True)
connections = psutil.net_connections(kind='all')
self.queue_data.put(
{
'stats': stats,
'addrs': addrs,
'counters': counters,
'connections': connections,
},
block=True
)
class BatteryCollector(Collector):
def update(self):
self.queue_data.put(psutil.sensors_battery(), block=True)
class DiskCollector(Collector):
def __init__(self, *args, **kwargs):
super(DiskCollector, self).__init__(*args, **kwargs)
self.previous_io = {}
def update(self):
data = {}
partitions = psutil.disk_partitions()
data['partitions'] = {}
for partition in partitions:
name = partition.device.split('/')[-1].replace('.', '-')
data['partitions'][name] = partition._asdict()
data['partitions'][name]['name'] = name
data['partitions'][name]['usage'] = psutil.disk_usage(partition.mountpoint)._asdict()
io = psutil.disk_io_counters(perdisk=True)
data['io'] = {}
for disk, info in io.items():
previous_info = self.previous_io.get(disk, None)
if previous_info is None:
data['io'][disk] = {
'read_count': 0,
'write_count': 0,
'read_bytes': 0,
'write_bytes': 0,
'read_time': 0,
'write_time': 0,
'busy_time': 0
}
else:
info = info._asdict()
previous_info = previous_info._asdict()
data['io'][disk] = {}
for k in ['read_count', 'write_count', 'read_bytes', 'write_bytes', 'read_time', 'write_time', 'busy_time']:
data['io'][disk][k] = (info[k] - previous_info[k]) * self.app.config['FPS']
#data['io'][disk] = info._asdict()
self.queue_data.put(data, block=True)
self.previous_io = io
class NetdataCollector(Collector):
def __init__(self, app, queue_update, queue_data, host, port):
super(NetdataCollector, self).__init__(app, queue_update, queue_data)
self.client = netdata.Netdata(host, port=port, timeout=1/self.app.config['FPS'])
def run(self):
setproctitle.setproctitle(f"gulik - {self.__class__.__name__}")
while True:
try:
msg = self.queue_update.get(block=True)
if msg.startswith('UPDATE '):
chart = msg[7:]
self.update(chart)
except KeyboardInterrupt: # so we don't randomly explode on ctrl+c
pass
def update(self, chart):
try:
# get the last second of data condensed to one point
data = self.client.data(chart, points=1, after=-1, options=['absolute'])
except netdata.NetdataException:
pass
else:
self.queue_data.put((chart, data), block=True)
## Monitors ##
class Monitor(threading.Thread):
"""
The base class for all :ref:`monitor`\s.
"""
collector_type = Collector
def __init__(self, app, component):
super(Monitor, self).__init__()
self.app = app
self.component = component
self.daemon = True
self.seppuku = False
self.queue_update = multiprocessing.Queue(1)
self.queue_data = multiprocessing.Queue(1)
self.collector = self.collector_type(self.app, self.queue_update, self.queue_data)
self.data = {}
self.defective = False # for future use, mostly for networked monitors (netdata, mpd, …)
def register_elements(self, elements):
pass
def tick(self):
if not self.queue_update.full():
self.queue_update.put('UPDATE', block=True)
def start(self):
self.collector.start()
super(Monitor, self).start()
def run(self):
#while self.collector.is_alive():
while not self.seppuku:
try:
self.data = self.queue_data.get(timeout=1)
except queue.Empty:
# try again, but give thread the ability to die without
# waiting on collector indefinitely
continue
self.commit_seppuku()
def commit_seppuku(self):
print(f"{self.__class__.__name__} committing glorious seppuku!")
#self.queue_update.close()
self.collector.terminate()
self.collector.join()
def normalize(self, element):
raise NotImplementedError("%s.normalize not implemented!" % self.__class__.__name__)
def caption(self, fmt):
raise NotImplementedError("%s.caption not implemented!" % self.__class__.__name__)
class CPUMonitor(Monitor):
    """
    Monitor for CPU usage.
    """
collector_type = CPUCollector
    def normalize(self, element):
"""
Elements exposed:
* ``aggregate``: average cpu use, sum of all core loads divided by number of cores
* ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1
"""
if not self.data:
return 0
if element == 'aggregate':
return self.data['aggregate'] / 100.0
# assume core_<n> otherwise
idx = int(element.split('_')[1])
return self.data['percpu'][idx] / 100.0
    def caption(self, fmt):
"""
Exposed keys:
* ``aggregate``: average cpu use, sum of all core loads divided by number of cores
* ``core_<n>``: load of core ``<n>``, with possible values of ``<n>`` being 0 to number of cores - 1
* ``count``: number of cores
"""
if not self.data:
return fmt
data = {}
data['count'] = self.data['count']
data['aggregate'] = self.data['aggregate']
for idx, perc in enumerate(self.data['percpu']):
data['core_%d' % idx] = perc
return fmt.format(**data)
class MemoryMonitor(Monitor):
"""
Monitor for memory usage
"""
collector_type = MemoryCollector
    def normalize(self, element):
"""
Elements exposed:
* ``percent``: memory use of all processes.
* ``top_<n>``: memory use of the ``<n>``\th-biggest process. Valid values of ``<n>`` are 1-3.
* ``other``: memory use of all processes except the top 3
"""
if not self.data:
return 0
if element == 'percent':
return self.data.get('percent', 0) / 100.0
return self.data[element].get('percent', 0) / 100.0
    def caption(self, fmt):
        """
        Exposed keys:
        * ``total``: how much memory this machine has in total.
        * ``percent``: total memory usage in percent.
        * ``available``: how much memory can be malloc'd without going into swap (roughly).
        * ``top_<n>``: access information about the 3 "biggest" processes. Possible subkeys are ``name``, ``size`` and ``percent``.
        * ``other``: aggregate information for all processes *except* the top 3. Same subkeys as ``top_<n>``, plus ``count``.
        """
if not self.data:
return fmt
data = DotDict()#dict(self.data) # clone
data['total'] = pretty_bytes(self.data['total'])
data['available'] = pretty_bytes(self.data['available'])
data['percent'] = self.data['percent']
        for k in ['top_1', 'top_2', 'top_3', 'other']:
            data[k] = DotDict()
            data[k]['name'] = self.data[k]['name']
            data[k]['size'] = pretty_bytes(self.data[k]['size'])
            # the docstring promises a ``percent`` subkey, so expose it too
            data[k]['percent'] = self.data[k]['percent']
            #data[k]['shared'] = pretty_bytes(self.data[k]['shared'])
            if k == 'other':
                data[k]['count'] = self.data[k]['count']
return fmt.format(**data)
class NetworkMonitor(Monitor):
"""
Monitor for network interfaces.
"""
collector_type = NetworkCollector
def __init__(self, app, component):
super(NetworkMonitor, self).__init__(app, component)
self.interfaces = collections.OrderedDict()
if self.app.config['FPS'] < 2:
# we need a minimum of 2 samples so we can compute a difference
deque_len = 2
else:
# max size equal fps means this holds data of only the last second
deque_len = self.app.config['FPS']
keys = [
'bytes_sent',
'bytes_recv',
'packets_sent',
'packets_recv',
'errin',
'errout',
'dropin',
'dropout'
]
for if_name in psutil.net_io_counters(pernic=True).keys():
self.interfaces[if_name] = {
'addrs': {},
'stats': {},
'counters': {}
}
for key in keys:
self.interfaces[if_name]['counters'][key] = collections.deque([], deque_len)
self.aggregate = {
'if_count': len(self.interfaces),
'if_up': 0,
'speed': 0, # aggregate link speed
'counters': {}
}
for key in keys:
self.aggregate['counters'][key] = collections.deque([], deque_len)
def run(self):
while not self.seppuku:
try:
self.data = self.queue_data.get(timeout=1)
except queue.Empty:
# try again, but give thread the ability to die
# without waiting on collector indefinitely.
continue
aggregates = {}
for key in self.aggregate['counters']:
#self.aggregate['counters'][k] = []
aggregates[key] = 0
self.aggregate['speed'] = 0
for if_name, if_data in self.interfaces.items():
if_has_data = if_name in self.data['counters'] and\
if_name in self.data['stats'] and\
if_name in self.data['addrs']
if if_has_data:
for key, deque in if_data['counters'].items():
value = self.data['counters'][if_name]._asdict()[key]
deque.append(value)
aggregates[key] += value
self.interfaces[if_name]['stats'] = self.data['stats'][if_name]._asdict()
if self.interfaces[if_name]['stats']['speed'] == 0:
self.interfaces[if_name]['stats']['speed'] = 1000 # assume gbit speed per default
self.aggregate['speed'] += self.interfaces[if_name]['stats']['speed']
if if_name in self.data['addrs']:
self.interfaces[if_name]['addrs'] = self.data['addrs'][if_name]
else:
self.interfaces[if_name]['addrs'] = []
for key, value in aggregates.items():
self.aggregate['counters'][key].append(value)
self.commit_seppuku()
    def count_sec(self, interface, key):
"""
get a specified count for a given interface
as calculated for the last second.
EXAMPLE: ``self.count_sec('eth0', 'bytes_sent')``
(will return count of bytes sent in the last second)
"""
if interface == 'aggregate':
deque = self.aggregate['counters'][key]
else:
deque = self.interfaces[interface]['counters'][key]
if len(deque) < 2: # not enough data
return 0
        elif self.app.config['FPS'] < 2:
            # with fps < 2 the deque holds two samples taken 1/fps seconds
            # apart, so scale their difference to a one-second rate
            return (deque[-1] - deque[0]) * self.app.config['FPS']
else:
# last (most recent) minus first (oldest) item
return deque[-1] - deque[0]
    def normalize(self, element):
        """
        Exposed elements:
        * ``<if>.bytes_sent``: upload of network interface ``<if>``.
        * ``<if>.bytes_recv``: download of network interface ``<if>``.
        ``<if>`` can be any local network interface as well as ``'aggregate'``.
        """
if_name, key = element.split('.')
if if_name == 'aggregate':
if len(self.aggregate['counters'][key]) >= 2:
link_quality = float(self.aggregate['speed'] * 1024**2)
return (self.count_sec(if_name, key) * 8) / link_quality
elif len(self.interfaces[if_name]['counters'][key]) >= 2:
link_quality = float(self.interfaces[if_name]['stats']['speed'] * 1024**2)
return (self.count_sec(if_name, key) * 8) / link_quality
# program flow should only arrive here if we have less than 2
# datapoints in which case we can't establish used bandwidth.
return 0
    def caption(self, fmt):
"""
Exposed keys:
* ``<if>.bytes_sent``: upload of network interface ``<if>``.
* ``<if>.bytes_recv``: download of network interface ``<if>``.
* ``<if>.if_up``: Boolean, whether the interface is up.
* ``<if>.speed``: interface speed in Mbit/s
* ``<if>.counters``: supplies access to interface counters. Possible sub-elements are:
* ``bytes_sent``
* ``bytes_recv``
* ``packets_sent``
* ``packets_recv``
* ``errin``
* ``errout``
* ``dropin``
* ``dropout``
        ``<if>`` can be any local network interface as well as ``'aggregate'``.
Additionally, the ``'aggregate'`` interface exposes the total
count of network interfaces as ``if_count``.
"""
if not self.data:
return fmt
data = {}
data['aggregate'] = DotDict()
data['aggregate']['if_count'] = self.aggregate['if_count']
data['aggregate']['if_up'] = self.aggregate['if_up']
data['aggregate']['speed'] = self.aggregate['speed']
data['aggregate']['counters'] = DotDict()
for key in self.aggregate['counters'].keys():
data['aggregate']['counters'][key] = self.count_sec('aggregate', key)
if key.startswith('bytes'):
data['aggregate']['counters'][key] = pretty_bits(data['aggregate']['counters'][key]) + '/s'
for if_name in self.interfaces.keys():
data[if_name] = DotDict()
data[if_name]['addrs'] = DotDict()
all_addrs = []
for idx, addr in enumerate(self.interfaces[if_name]['addrs']):
data[if_name]['addrs'][str(idx)] = addr
all_addrs.append(addr.address)
data[if_name]['all_addrs'] = u"\n".join(all_addrs)
data[if_name]['stats'] = DotDict(self.interfaces[if_name]['stats'])
data[if_name]['counters'] = DotDict()
for key in self.interfaces[if_name]['counters'].keys():
data[if_name]['counters'][key] = self.count_sec(if_name, key)
if key.startswith('bytes'):
data[if_name]['counters'][key] = pretty_bits(data[if_name]['counters'][key]) + '/s'
return fmt.format(**data)
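# A minimal standalone sketch of the bandwidth normalization done by
# NetworkMonitor.normalize above. The function name ``used_fraction`` and
# its parameters are hypothetical, and the counters are assumed to span
# exactly one second; the 1024**2 factor mirrors the one used above.

```python
import collections


def used_fraction(counters, speed_mbit):
    """Fraction of the link capacity in use.

    counters: byte counters whose first and last entries are assumed to
    be one second apart; speed_mbit: link speed in Mbit/s.
    """
    if len(counters) < 2 or not speed_mbit:
        return 0.0  # not enough data, or nothing to normalize against
    bytes_per_second = counters[-1] - counters[0]
    link_bits_per_second = speed_mbit * 1024 ** 2
    return bytes_per_second * 8 / link_bits_per_second


# 131072 bytes/s are 1048576 bit/s, which saturates a link of speed 1
counters = collections.deque([0, 131072], maxlen=2)
fraction = used_fraction(counters, 1)
```

# Visualizers only ever see the resulting fraction, so the same drawing
# code works for interfaces of any speed.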
class BatteryMonitor(Monitor):
"""
Monitor laptop batteries.
"""
collector_type = BatteryCollector
def normalize(self, element):
"""
This function exposes no explicit elements, but always just returns
the current fill of the battery.
"""
# TODO: multi-battery support? needs support by psutil…
if not self.data:
return 0
return self.data.percent / 100.0
def caption(self, fmt):
"""
Exposed keys:
* ``power_plugged``: Boolean, whether the AC cable is connected.
* ``percent``: current fill of the battery in percent.
* ``secsleft``: seconds left till battery is completely drained.
* ``state``: Current state of the battery, one of ``'full'``, ``'charging'`` or ``'draining'``.
"""
if not self.data:
return fmt
data = self.data._asdict()
if not data['power_plugged']:
data['state'] = 'draining'
elif data['percent'] == 100:
data['state'] = 'full'
else:
data['state'] = 'charging'
return fmt.format(**data)
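# A small sketch of how a caption format string gets filled by
# BatteryMonitor.caption above. ``Battery`` is a hypothetical stand-in for
# the battery namedtuple held in ``self.data``, and ``battery_caption`` is
# an illustrative helper, not part of gulik.

```python
import collections

# stand-in for the collected battery datapoint (assumed field names)
Battery = collections.namedtuple('Battery', ['percent', 'secsleft', 'power_plugged'])


def battery_caption(fmt, battery):
    """Fill a caption format string with battery data plus a derived state."""
    data = battery._asdict()
    if not data['power_plugged']:
        data['state'] = 'draining'
    elif data['percent'] == 100:
        data['state'] = 'full'
    else:
        data['state'] = 'charging'
    return fmt.format(**data)


text = battery_caption("{percent}% ({state})", Battery(42, 3600, False))
```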
class DiskMonitor(Monitor):
"""
Monitors disk I/O and partitions.
"""
collector_type = DiskCollector
def __init__(self, *args, **kwargs):
super(DiskMonitor, self).__init__(*args, **kwargs)
self.normalization_values = {}
io = psutil.disk_io_counters(perdisk=True)
for disk, info in io.items():
#self.normalization_values[disk] = info._asdict()
for key, value in info._asdict().items():
element = '.'.join(['io', disk, key])
if key.endswith('_bytes'):
self.normalization_values[element] = 100 * 1024 ** 2 # assume baseline ability of 100 MByte/s
elif key.endswith('_count'):
self.normalization_values[element] = 0 # FIXME: I have 0 clue what a reasonable baseline here is
elif key.endswith('_time'):
self.normalization_values[element] = 1000 # one second of data, collector reports data in milliseconds/s
else:
self.normalization_values[element] = 0
    def normalize(self, element):
        """
        Elements exposed:

        * ``io``
            * Valid subelements are disk device file names as found in
              ``/dev``. Examples: ``ada0``, ``sda``.
              Valid subsubelements are as follows:
                * ``read_bytes``
                * ``write_bytes``
                * ``read_time``
                * ``write_time``
                * ``busy_time``
        * ``partitions``
            * Valid subelements are partition device file names as
              found in ``/dev``, with dots (``.``) being replaced
              with dashes (``-``). Examples: ``root-eli``, ``sda1``.
        """
        parts = element.split('.')
        if parts[0] == 'io':
            disk, key = parts[1:]
            value = self.data['io'][disk][key]
            if self.normalization_values[element] < value:
                # remember the highest value seen so far as the new maximum
                self.normalization_values[element] = value
            if self.normalization_values[element] == 0:
                return 0  # no baseline and no activity yet, avoid dividing by zero
            return value / self.normalization_values[element]
        elif parts[0] == 'partitions':
            name = parts[1]
            info = self.data['partitions'][name]['usage']
            return info['used'] / info['total']
        return 0
def caption(self, fmt):
"""
Exposed keys are the same as for :func:`DiskMonitor.normalize`.
"""
data = DotDict()
if 'partitions' in self.data and 'io' in self.data:
data['io'] = DotDict()
for name, diskinfo in self.data['io'].items():
data['io'][name] = DotDict()
for key, value in diskinfo.items():
if key.endswith('_bytes'):
value = pretty_bytes(value)
data['io'][name][key] = value
data['partitions'] = DotDict()
for name, partition in self.data['partitions'].items():
part_data = DotDict()
for key in ['name', 'device', 'mountpoint', 'fstype', 'opts']:
part_data[key] = partition[key]
part_data['usage'] = DotDict(partition['usage'])
data['partitions'][name] = part_data
return fmt.format(**data)
return fmt
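# The io normalization above divides by a known maximum that grows
# whenever a larger value is observed. A minimal sketch of that
# "running maximum" idea, using a hypothetical helper class ``RunningMax``
# that is not part of gulik:

```python
class RunningMax:
    """Normalize values against the largest value seen so far."""

    def __init__(self, baseline=0):
        self.maximum = baseline  # assumed starting capacity, may be 0

    def normalize(self, value):
        if value > self.maximum:
            self.maximum = value  # raise the ceiling to the new peak
        if self.maximum == 0:
            return 0.0  # nothing observed yet, avoid dividing by zero
        return value / self.maximum


throughput = RunningMax(baseline=100)
history = [throughput.normalize(v) for v in (50, 200, 50)]
```

# Note that the result for the same raw value shrinks once a larger peak
# has been seen (0.5 at first, 0.25 after the peak of 200).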
class NetdataMonitor(Monitor):
"""
Monitor that interfaces with (remote) netdata instances.
"""
collector_type = NetdataCollector
def __init__(self, app, component, host, port):
self.collector_type = functools.partial(self.collector_type, host=host, port=port)
super(NetdataMonitor, self).__init__(app, component)
self.charts = set()
self.normalization_values = {} # keep a table of known maximums because netdata doesn't supply absolute normalization values
self.info_last_try = time.time()
try:
self.netdata_info = self.collector.client.charts()
except netdata.NetdataException as e:
print(f"Couldn't get chart overview from netdata host {host}!")
self.netdata_info = None
self.defective = True
self.almost_fixed = False
def __repr__(self):
return f"<{self.__class__.__name__} host={self.collector.client.host} port={self.collector.client.port}>"
    def register_elements(self, elements):
        for element in elements:
            parts = element.split('.')
            chart = '.'.join(parts[:2])
            if chart not in self.charts:
                self.normalization_values[chart] = 0
                if self.netdata_info:
                    if chart not in self.netdata_info['charts']:
                        # host and port live on the collector's client, not on the monitor
                        client = self.collector.client
                        raise ValueError(f"Invalid chart: {chart} on netdata instance {client.host}:{client.port}!")
                    chart_info = self.netdata_info['charts'][chart]
                    if chart_info['units'] == 'percentage':
                        self.normalization_values[chart] = 100
                self.charts.add(chart)
def run(self):
#while self.collector.is_alive():
while not self.seppuku:
try:
(chart, data) = self.queue_data.get(timeout=1/self.app.config['FPS'])
self.data[chart] = data
l = len(data['data'][0])
if(l < 2):
print(f"Missing data, marking {self} as defective.")
self.defective = True
else:
values = data['data'][0][1:]
if self.defective:
pass
elif values[0] is None: # ignore "dead" datapoints
print(f"Got dead datapoint, marking {self} as defective.")
self.defective = True
elif self.netdata_info['charts'][chart]['units'] == 'percentage':
self.normalization_values[chart] = 100 # in case self was defective when register_elements was called
else:
cumulative_value = sum(data['data'][0][1:])
if self.normalization_values[chart] < cumulative_value:
self.normalization_values[chart] = cumulative_value
except queue.Empty:
continue # try again
self.commit_seppuku()
def tick(self):
if self.defective:
t = time.time()
if t >= self.info_last_try + self.app.config['NETDATA_RETRY']:
if self.almost_fixed:
print(f"{self.__class__.__name__} instance almost fixed, ignoring first chart overview because of netdata weirdness.")
else:
print(f"{self.__class__.__name__} instance currently defective, trying to get netdata overview from {self.collector.client.host}.")
self.info_last_try = t
try:
self.netdata_info = self.collector.client.charts()
if self.almost_fixed:
self.defective = False
self.almost_fixed = False
self.tick() # do the actual tick (i.e. the else clause)
if not self.defective: # in case defective was re-set because of dead datapoints
print("Success!")
else:
self.almost_fixed = True
except netdata.NetdataException as e:
print(f"Failed, will retry in {self.app.config['NETDATA_RETRY']} seconds.")
else:
if not self.queue_update.full():
#if not self.seppuku: # don't request more updates to collector when we're trying to die
for chart in self.charts:
self.queue_update.put(f"UPDATE {chart}", block=True)
def normalize(self, element):
"""
Exposed elements correspond to *chart names* and their datapoint
*dimensions*. For a list of valid chart and dimension names, consult
``/api/v1/charts`` of the netdata instance in question.
Examples:
* ``system.cpu.nice``
* ``disk.ada0.writes``
"""
if self.defective:
return 0
else:
parts = element.split('.')
chart = '.'.join(parts[:2])
#if chart not in self.charts or not self.data[chart]:
if not chart in self.data:
#print(f"No data for {chart}")
return 0 #
#timestamp = self.data[chart]['data'][0][0] # first element of a netdata datapoint is always time
#if timestamp > self.last_updates[chart]:
subelem = parts[2]
subidx = self.data[chart]['labels'].index(subelem)
value = self.data[chart]['data'][0][subidx]
if value >= self.normalization_values[chart]:
self.normalization_values[chart] = value
if self.normalization_values[chart] == 0:
return 0
r = value / self.normalization_values[chart]
return r
def caption(self, fmt):
"""
Exposed keys are the same as for :func:`NetdataMonitor.normalize`.
"""
if not self.data or self.defective:
return fmt
data = DotDict()
for chart_name, chart_data in self.data.items():
chart_keys = chart_name.split('.')
unit = self.netdata_info['charts'][chart_name]['units'] # called "units" but actually only ever one. it's a string.
if not chart_keys[0] in data:
data[chart_keys[0]] = DotDict()
d = DotDict()
for idx, label in enumerate(chart_data['labels']):
value = chart_data['data'][0][idx]
if value is None:
value = 0
elif unit == 'bytes':
value = pretty_bytes(value)
elif unit.startswith('kilobytes'):
postfix = unit[9:]
value = pretty_bytes(value * 1024) + postfix
elif unit.startswith('kilobits'):
postfix = unit[8:]
value = pretty_bits(value * 1024) + postfix
else:
value = f"{value} {unit}"
d[label] = value
data[chart_keys[0]][chart_keys[1]] = d
return fmt.format(**data)
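# A sketch of how a netdata element such as ``system.cpu.nice`` is split
# into chart and dimension, looked up in a datapoint and made available to
# caption format strings. ``AttrDict`` is a hypothetical stand-in for
# gulik's ``DotDict`` (defined elsewhere), and the datapoint layout is an
# assumption modelled on the code above.

```python
class AttrDict(dict):
    """dict whose keys can also be read as attributes"""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


def split_element(element):
    """'system.cpu.nice' -> ('system.cpu', 'nice')"""
    parts = element.split('.')
    return '.'.join(parts[:2]), parts[2]


chart, dimension = split_element('system.cpu.nice')

# assumed shape: first label/value is the timestamp, then one per dimension
datapoint = {'labels': ['time', 'user', 'nice'], 'data': [[1500000000, 12.5, 0.5]]}
idx = datapoint['labels'].index(dimension)
value = datapoint['data'][0][idx]

# str.format resolves dotted field names via attribute access
data = AttrDict(system=AttrDict(cpu=AttrDict(nice=value)))
text = "nice: {system.cpu.nice}".format(**data)
```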
## Visualizers ##
class Visualizer(object):
"""
The base class for all visualizers. Not called widget to avoid naming
confusion with gtk widgets (which aren't even used in gulik).
Usually you won't instantiate this by yourself but use :func:`Box.place`.
Parameters:
app (:class:`Gulik`): The app managing visualizers and monitors
monitor (:class:`Monitor`): The monitor managing data collection for this visualizer
x (int): leftmost coordinate on the x-axis
y (int): topmost coordinate on the y-axis
width (int): overall width (including **margin** and **padding**)
height (int): overall height (including **margin** and **padding**)
margin (int, optional): margin applied around all sides
margin_left (int, optional): margin applied to the left side
margin_right (int, optional): margin applied to the right side
margin_top (int, optional): margin applied to the top side
margin_bottom (int, optional): margin applied to the bottom side
padding (int, optional): padding applied around all sides
padding_left (int, optional): padding applied to the left side
padding_right (int, optional): padding applied to the right side
padding_top (int, optional): padding applied to the top side
padding_bottom (int, optional): padding applied to the bottom side
elements (list of str): A list of :ref:`element`\s to visualize
captions (list of dict, optional): A list of :ref:`caption descriptions<caption-description>`
legend (bool): Whether to try to automatically create a legend of elements
legend_order ('normal' or 'reverse', optional): Whether to reverse the legend order
legend_format (str): A format string; may contain ``{element}``, which is replaced by the element a legend item stands for.
legend_size (int, optional): height of a single legend item
legend_placement ('padding' or 'inner'): Where to place the legend
legend_margin (int, optional): margin applied around all sides of the legend
legend_margin_left (int, optional): margin applied to the left side of the legend
legend_margin_right (int, optional): margin applied to the right side of the legend
legend_margin_top (int, optional): margin applied to the top side of the legend
legend_margin_bottom (int, optional): margin applied to the bottom side of the legend
legend_padding (int, optional): padding applied around all sides of the legend
legend_padding_left (int, optional): padding applied to the left side of the legend
legend_padding_right (int, optional): padding applied to the right side of the legend
legend_padding_top (int, optional): padding applied to the top side of the legend
legend_padding_bottom (int, optional): padding applied to the bottom side of the legend
foreground (:class:`Color`, optional): The foreground color, base-color for generated palettes
background (:class:`Color`, optional): The background color
pattern (function, optional): An executable :ref:`pattern`
palette (function, optional): An executable :ref:`palette`
combination (str, optional): The :ref:`combination` mode used when displaying multiple :ref:`element`\s
"""
def __init__(
self,
app,
monitor,
x=0,
y=0,
width=None,
height=None,
margin=None,
margin_left=None,
margin_right=None,
margin_top=None,
margin_bottom=None,
padding=None,
padding_left=None,
padding_right=None,
padding_top=None,
padding_bottom=None,
elements=None,
captions=None,
caption_placement=None,
legend=None,
legend_order=None,
legend_format=None,
legend_size=None,
legend_placement=None,
legend_margin=None,
legend_margin_left=None,
legend_margin_right=None,
legend_margin_top=None,
legend_margin_bottom=None,
legend_padding=None,
legend_padding_left=None,
legend_padding_right=None,
legend_padding_top=None,
legend_padding_bottom=None,
foreground=None,
background=None,
pattern=None,
palette=None,
combination=None,
operator=None
):
self.app = app
self.monitor = monitor
self.x = x
self.y = y
self.elements = ignore_none(elements, [])
self.captions = ignore_none(captions, list())
self.caption_placement = ignore_none(caption_placement, self.get_style('caption_placement'))
self.legend = ignore_none(legend, self.get_style('legend'))
self.legend_order = ignore_none(legend_order, self.get_style('legend_order'))
self.legend_format = legend_format
self.legend_placement = ignore_none(legend_placement, self.get_style('legend_placement'))
self.legend_size= ignore_none(legend_size, self.get_style('legend_size'))
self.legend_margin_left = ignore_none(legend_margin_left, legend_margin, self.get_style('legend_margin', 'left'))
self.legend_margin_right = ignore_none(legend_margin_right, legend_margin, self.get_style('legend_margin', 'right'))
self.legend_margin_top = ignore_none(legend_margin_top, legend_margin, self.get_style('legend_margin', 'top'))
self.legend_margin_bottom = ignore_none(legend_margin_bottom, legend_margin, self.get_style('legend_margin', 'bottom'))
self.legend_padding_left = ignore_none(legend_padding_left, legend_padding, self.get_style('legend_padding', 'left'))
self.legend_padding_right = ignore_none(legend_padding_right, legend_padding, self.get_style('legend_padding', 'right'))
self.legend_padding_top = ignore_none(legend_padding_top, legend_padding, self.get_style('legend_padding', 'top'))
self.legend_padding_bottom = ignore_none(legend_padding_bottom, legend_padding, self.get_style('legend_padding', 'bottom'))
self.operator = ignore_none(operator, self.get_style('operator'))
self.width = ignore_none(width, self.get_style('width'))
self.height = ignore_none(height, self.get_style('height'))
self.margin_left = ignore_none(margin_left, margin, self.get_style('margin', 'left'))
self.margin_right = ignore_none(margin_right, margin, self.get_style('margin', 'right'))
self.margin_top = ignore_none(margin_top, margin, self.get_style('margin', 'top'))
self.margin_bottom = ignore_none(margin_bottom, margin, self.get_style('margin', 'bottom'))
self.padding_left = ignore_none(padding_left, padding, self.get_style('padding', 'left'))
self.padding_right = ignore_none(padding_right, padding, self.get_style('padding', 'right'))
self.padding_top = ignore_none(padding_top, padding, self.get_style('padding', 'top'))
self.padding_bottom = ignore_none(padding_bottom, padding, self.get_style('padding', 'bottom'))
self.colors = {}
self.colors['foreground'] = ignore_none(foreground, self.get_style('color', 'foreground'))
self.colors['background'] = ignore_none(background, self.get_style('color', 'background'))
self.pattern = ignore_none(pattern, self.get_style('pattern'))
self.palette = ignore_none(palette, self.get_style('palette')) # function to generate color palettes with
# combination mode when handling multiple elements: 'separate',
# 'cumulative' or 'cumulative_force'. 'cumulative' assumes all values add
# up to at most 1.0, while 'separate' assumes every value can reach 1.0
# and divides all values by the number of elements handled.
self.combination = ignore_none(combination, 'separate')
assert self.inner_width > 0, f"margin and padding too big, implying negative inner width for {self.__class__.__name__}/{self.monitor.component}/{self.elements}"
assert self.inner_height > 0, f"margin and padding too big, implying negative inner height for {self.__class__.__name__}/{self.monitor.component}/{self.elements}"
self.monitor.register_elements(self.elements)
if self.legend:
legend_x = self.x + self.margin_left + self.padding_left
legend_y = self.y + self.margin_top + self.padding_top
if self.legend_placement == 'inner':
legend_height = self.inner_height
else: # 'padding'
legend_y += self.inner_height
legend_height = self.padding_bottom
rownum = legend_height // self.legend_size
if rownum < 1:
print(f"Can't add autolegend to {self.__class__.__name__}/{self.monitor.component} because of insufficient space: {legend_height}px")
else:
legend_info = self.legend_info()
colnum = math.ceil(len(legend_info) / rownum) or 1 # true division, so a partially filled last column still counts
cell_width = self.inner_width // colnum
#colors = self.palette(self.colors['foreground'], len(self.elements))
box = self.app.box(legend_x, legend_y, self.inner_width, legend_height)
if self.legend_order == 'reverse':
iterator = reversed(legend_info)
else:
iterator = legend_info
#for element in legend_elements:
for element in iterator:
color, text = legend_info[element]
if self.legend_format:
text = self.legend_format.format(**{'element': element})
box.place(
self.monitor.component,
Text,
text=text,
align='center_center',
foreground=color,
background=Color(0,0,0, 0),
width=cell_width,
height=self.legend_size,
margin_left=self.legend_margin_left,
margin_right=self.legend_margin_right,
margin_top=self.legend_margin_top,
margin_bottom=self.legend_margin_bottom,
padding_left=self.legend_padding_left,
padding_right=self.legend_padding_right,
padding_top=self.legend_padding_top,
padding_bottom=self.legend_padding_bottom,
legend=False
)
    def get_style(self, name, subname=None):
        """
        Load the most specific style setting available for a name and an
        optional subname. Config keys are tried from most to least specific;
        ``None`` is returned if none of them is configured.

        Usage example: ``self.get_style('margin', 'left')`` on an
        :class:`Arc` tries ``MARGIN_ARC_LEFT``, ``MARGIN_ARC``,
        ``MARGIN_LEFT`` and ``MARGIN``, in that order.
        """
        keys = []
        if subname:
            keys.append('_'.join([name, self.__class__.__name__, subname]).upper())
        keys.append('_'.join([name, self.__class__.__name__]).upper())
        if subname:
            keys.append('_'.join([name, subname]).upper())
        keys.append(name.upper())
        for key in keys:
            if key in self.app.config:
                return self.app.config[key]
        return None
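# The lookup order used by get_style, sketched as standalone functions.
# ``style_keys`` and ``lookup_style`` are hypothetical names, and the
# config dict below is made up for illustration.

```python
def style_keys(name, class_name, subname=None):
    """Config keys to try, from most to least specific."""
    keys = []
    if subname:
        keys.append('_'.join([name, class_name, subname]).upper())
    keys.append('_'.join([name, class_name]).upper())
    if subname:
        keys.append('_'.join([name, subname]).upper())
    keys.append(name.upper())
    return keys


def lookup_style(config, name, class_name, subname=None):
    """Return the first configured value, or None if nothing matches."""
    for key in style_keys(name, class_name, subname):
        if key in config:
            return config[key]
    return None


config = {'MARGIN': 5, 'MARGIN_ARC': 2}  # illustrative values only
arc_margin = lookup_style(config, 'margin', 'Arc', 'left')
plot_margin = lookup_style(config, 'margin', 'Plot', 'left')
```

# A class-specific setting (MARGIN_ARC) wins over the generic one (MARGIN),
# so a single visualizer type can be restyled without touching the others.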
def legend_info(self):
""" defines colors for legend elements """
data = collections.OrderedDict()
colors = self.palette(self.colors['foreground'], len(self.elements))
for idx, color in enumerate(colors):
element = self.elements[idx]
data[element] = (color, element)
return data
@property
def padded_width(self):
return self.width - self.margin_left - self.margin_right
@property
def padded_height(self):
return self.height - self.margin_top - self.margin_bottom
@property
def inner_width(self):
return self.padded_width - self.padding_left - self.padding_right
@property
def inner_height(self):
return self.padded_height - self.padding_top - self.padding_bottom
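# The four properties above implement the box model: margins and paddings
# are subtracted from the overall size. A minimal sketch with a
# hypothetical ``inner_size`` helper, where margin and padding are given as
# (left, right, top, bottom) tuples:

```python
def inner_size(width, height, margin, padding):
    """Size of the inner drawing region for an overall width/height."""
    m_left, m_right, m_top, m_bottom = margin
    p_left, p_right, p_top, p_bottom = padding
    padded_width = width - m_left - m_right
    padded_height = height - m_top - m_bottom
    return (
        padded_width - p_left - p_right,
        padded_height - p_top - p_bottom,
    )


size = inner_size(200, 100, margin=(10, 10, 5, 5), padding=(20, 20, 10, 10))
```

# A non-positive result means margin and padding leave no room to draw,
# which is what the assertions in ``__init__`` guard against.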
def set_brush(self, context, color): # possible TODO: better function name
if self.pattern:
context.set_source_surface(self.pattern(color))
context.get_source().set_extend(cairo.Extend.REPEAT)
else:
context.set_source_rgba(*color.tuple_rgba())
def draw_background(self, context):
context.set_source_rgba(*self.colors['background'].tuple_rgba())
context.rectangle(self.x + self.margin_left + self.padding_left, self.y + self.margin_top + self.padding_top, self.inner_width, self.inner_height)
context.fill()
def draw_captions(self, context):
for caption in self.captions:
context.save()
context.set_operator(caption.get('operator', self.get_style('operator', 'caption')))
if 'position' in caption:
if isinstance(caption['position'], str):
# handle alignment-style strings like "center_bottom"
if self.caption_placement == 'inner':
offset = [-x for x in alignment_offset(caption['position'], (self.inner_width, self.inner_height))]
else:
offset = [-x for x in alignment_offset(caption['position'], (self.padded_width, self.padded_height))]
else:
offset = caption['position']
else:
offset = [0, 0]
if self.caption_placement == 'inner':
position = [offset[0] + self.x + self.margin_left + self.padding_left, offset[1] + self.y + self.margin_top + self.padding_top]
else:
position = [offset[0] + self.x + self.margin_left, offset[1] + self.y + self.margin_top]
caption_text = self.monitor.caption(caption['text'])
self.app.draw_text(
context,
caption_text,
position[0],
position[1],
align=caption.get('align', None),
color=caption.get('color', self.get_style('color', 'caption')),
font_size=caption.get('font_size', self.get_style('font_size', 'caption')),
font_weight=caption.get('font_weight', self.get_style('font_weight', 'caption'))
)
context.restore()
def update(self, context):
"""
Parameters:
context (:class:`cairo.Context`): cairo context of the window
"""
context.save()
context.set_operator(self.operator)
self.draw(context)
context.restore()
def draw(self, context):
raise NotImplementedError("%s.draw not implemented!" % self.__class__.__name__)
class Text(Visualizer):
"""
Scrollable text using monitors' ``caption`` function to give textual
representations of values, prettified where necessary.
"""
def __init__(self, app, monitor, text, speed=25, align=None, **kwargs):
super(Text, self).__init__(app, monitor, **kwargs)
self.text = text # the text to be rendered, a format string passed to monitor.caption
self.previous_text = '' # to be able to detect change
self.speed = speed
self.align = ignore_none(align, 'left_top')
surface = cairo.ImageSurface(cairo.Format.ARGB32, 10, 10)
context = cairo.Context(surface)
font = Pango.FontDescription('%s %s 10' % (self.get_style('font'), self.get_style('font_weight')))
layout = PangoCairo.create_layout(context)
layout.set_font_description(font)
layout.set_text('0', -1) # naively assuming 0 is the tallest glyph
size = layout.get_pixel_size()
if size[1] > 10:
self.font_size = self.inner_height * 10/size[1]
else:
self.font_size = self.inner_height # probably not gonna happen
self.direction = 'left'
self.offset = 0.0
self.step = speed / self.app.config['FPS'] # i.e. speed in pixel/s
def draw_background(self, context):
context.set_source_rgba(*self.colors['background'].tuple_rgba())
context.rectangle(self.x + self.margin_left, self.y + self.margin_top, self.padded_width, self.padded_height)
context.fill()
def draw(self, context):
text = self.monitor.caption(self.text)
self.draw_background(context)
context.save()
context.rectangle(self.x + self.margin_left + self.padding_left, self.y + self.margin_top + self.padding_top, self.inner_width, self.inner_height)
context.clip()
context.set_source_rgba(*self.colors['foreground'].tuple_rgba())
font = Pango.FontDescription('%s %s %d' % (self.get_style('font'), self.get_style('font_weight'), self.font_size))
layout = PangoCairo.create_layout(context)
layout.set_font_description(font)
layout.set_text(text, -1)
size = layout.get_pixel_size()
align_offset = alignment_offset(self.align, size) # needed for the specified text alignment
#align_offset = [sum(x) for x in zip(alignment_offset(self.align, size), alignment_offset(self.align, [self.inner_width, self.inner_height]))] # needed for the specified text alignment
max_offset = size[0] - self.inner_width # biggest needed offset for marquee, can be negative if all text fits
if max_offset <= 0 or text != self.previous_text:
self.direction = 'left'
self.offset = 0
x = self.x + self.margin_left + self.padding_left - self.offset
if max_offset < 0: # only account for horizontal offset when space allows
x += align_offset[0]
if self.align.startswith('center'):
x += self.inner_width / 2
elif self.align.startswith('right'):
x += self.inner_width
y = self.y + self.margin_top + self.padding_top + align_offset[1] # NOTE: does stuff even show up with vertical align != center?
if self.align.endswith('center'):
y += self.inner_height / 2
elif self.align.endswith('bottom'):
y += self.inner_height
context.translate(x, y)
PangoCairo.update_layout(context, layout)
PangoCairo.show_layout(context, layout)
context.restore()
if self.direction == 'left':
self.offset += self.step
if self.offset > max_offset:
self.direction = 'right'
self.offset = max_offset
else:
self.offset -= self.step
if self.offset < 0:
self.direction = 'left'
self.offset = 0
self.previous_text = text
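# The marquee logic at the end of Text.draw moves the text back and forth
# between offset 0 and max_offset. A standalone sketch of one animation
# step, with a hypothetical ``marquee_step`` function:

```python
def marquee_step(offset, direction, step, max_offset):
    """Advance the scroll offset by one frame, bouncing at both ends."""
    if direction == 'left':
        offset += step
        if offset > max_offset:
            direction = 'right'  # reached the end of the text, turn around
            offset = max_offset
    else:
        offset -= step
        if offset < 0:
            direction = 'left'  # back at the start, turn around again
            offset = 0
    return offset, direction


# simulate a few frames: 4px per frame, text overflows by 10px
state = (0, 'left')
frames = []
for _ in range(6):
    state = marquee_step(state[0], state[1], 4, 10)
    frames.append(state)
```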
class Rect(Visualizer):
"""
.. image:: _static/rect.png
"""
def draw_rect(self, context, value, color, offset=0.0):
self.set_brush(context, color)
context.rectangle(
self.x + self.margin_left + self.padded_width * offset,
self.y + self.margin_top,
self.padded_width * value,
self.padded_height)
context.fill()
def draw_background(self, context):
self.draw_rect(context, 1, self.colors['background'])
def draw(self, context):
colors = self.palette(self.colors['foreground'], len(self.elements))
offset = 0.0
self.draw_background(context)
for idx, element in enumerate(self.elements):
value = self.monitor.normalize(element)
if self.combination != 'cumulative':
value /= len(self.elements)
color = colors[idx]
self.draw_rect(context, value, color, offset)
if self.combination.startswith('cumulative'):
offset += value
else:
offset += 1.0 / len(self.elements)
self.draw_captions(context)
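# How the combination modes lay out several elements, sketched without any
# drawing code. ``segments`` is a hypothetical helper that mirrors the loop
# in Rect.draw and returns (offset, length) pairs as fractions of the
# available width.

```python
def segments(values, combination='separate'):
    """Layout of normalized values for a given combination mode."""
    result = []
    offset = 0.0
    for value in values:
        if combination != 'cumulative':
            value /= len(values)  # every element gets an equal share
        result.append((offset, value))
        if combination.startswith('cumulative'):
            offset += value  # stack directly after the previous element
        else:
            offset += 1.0 / len(values)  # jump to the next fixed slot
    return result


separate = segments([0.5, 0.25], 'separate')
cumulative = segments([0.5, 0.25], 'cumulative')
forced = segments([0.5, 0.25], 'cumulative_force')
```

# 'separate' keeps each element in its own slot, 'cumulative' stacks the
# raw values, and 'cumulative_force' stacks values that were first scaled
# down by the number of elements.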
class MirrorRect(Visualizer):
"""
Mirrored variant of :class:`Rect`.
"""
def __init__(self, app, monitor, **kwargs):
self.left = kwargs['elements'][0]
self.right = kwargs['elements'][1]
kwargs['elements'] = self.left + self.right
super(MirrorRect, self).__init__(app, monitor, **kwargs)
self.x_center = self.x + self.margin_left + self.padding_left + (self.inner_width / 2) # center of the inner drawing region
self.draw_left = self.draw_rect_negative
self.draw_right = self.draw_rect
def legend_info(self):
""" defines colors for legend elements """
data = collections.OrderedDict()
colors = self.palette(self.colors['foreground'], max(len(self.left), len(self.right)))
parts = [[] for _ in range(0, len(colors))]
for elements in (self.left, self.right):
for idx, element in enumerate(elements):
parts[idx].append(element.split('.'))
condensed = []
for items in parts:
condensed.append(condense_addr_parts(items))
assert len(colors) == len(condensed)
for idx, condensed_name in enumerate(condensed):
data[condensed_name] = (colors[idx], condensed_name)
return data
def draw_rect(self, context, value, color, offset=0.0):
self.set_brush(context, color)
context.rectangle(self.x_center + self.inner_width / 2 * offset, self.y + self.margin_top + self.padding_top, self.inner_width / 2 * value, self.inner_height)
context.fill()
def draw_rect_negative(self, context, value, color, offset=0.0):
self.set_brush(context, color)
context.rectangle(self.x_center - self.inner_width / 2 * offset - self.inner_width / 2 * value, self.y + self.margin_top + self.padding_top, self.inner_width / 2 * value, self.inner_height)
context.fill()
def draw(self, context):
colors = self.palette(self.colors['foreground'], max(len(self.left), len(self.right)))
self.draw_background(context)
for elements, drawer in ((self.left, self.draw_left), (self.right, self.draw_right)):
offset = 0.0
for idx, element in enumerate(elements):
value = self.monitor.normalize(element)
if self.combination != 'cumulative':
value /= len(elements)
color = colors[idx]
drawer(context, value, color, offset)
if self.combination.startswith('cumulative'):
offset += value
else:
offset += 1.0 / len(elements)
self.draw_captions(context)
class Arc(Visualizer):
"""
.. image:: _static/arc.png
Parameters:
stroke_width (int): width of the arc in pixels.
"""
def __init__(self, app, monitor, stroke_width=5, **kwargs):
super(Arc, self).__init__(app, monitor, **kwargs)
self.stroke_width = stroke_width
#self.radius = (min(self.width, self.height) / 2) - (2 * self.padding) - (self.stroke_width / 2)
self.radius = (min(self.inner_width, self.inner_height) / 2) - self.stroke_width / 2
self.x_center = self.x + self.margin_left + self.padding_left + (self.inner_width / 2)
self.y_center = self.y + self.margin_top + self.padding_top + (self.inner_height / 2)
def draw_arc(self, context, value, color, offset=0.0):
context.set_line_width(self.stroke_width)
context.set_line_cap(cairo.LINE_CAP_BUTT)
self.set_brush(context, color)
context.arc(
self.x_center,
self.y_center,
self.radius,
math.pi / 2 + math.pi * 2 * offset,
math.pi / 2 + math.pi * 2 * (offset + value)
)
context.stroke()
def draw_background(self, context):
self.draw_arc(context, 1, self.colors['background'])
def draw(self, context):
self.draw_background(context)
colors = self.palette(self.colors['foreground'], len(self.elements))
offset = 0.0
for idx, element in enumerate(self.elements):
value = self.monitor.normalize(element)
if self.combination != 'cumulative':
value /= len(self.elements)
color = colors[idx]
self.draw_arc(context, value, color, offset=offset)
if self.combination == 'separate':
offset += 1 / len(self.elements)
else:
offset += value
self.draw_captions(context)
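# The angles passed to ``context.arc`` in Arc.draw_arc, sketched as a pure
# function. ``arc_angles`` is a hypothetical name. In cairo's default
# coordinate system an angle of pi/2 points straight down and angles grow
# clockwise, so a value of 1.0 describes a full circle starting at the
# bottom.

```python
import math


def arc_angles(value, offset=0.0):
    """Start and end angle (radians) for a normalized value and offset."""
    start = math.pi / 2 + math.pi * 2 * offset
    end = math.pi / 2 + math.pi * 2 * (offset + value)
    return start, end


quarter = arc_angles(0.25)           # first quarter of the circle
following = arc_angles(0.25, 0.25)   # next quarter, starting where the first ends
```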
class MirrorArc(MirrorRect, Arc):
"""
Mirrored variant of :class:`Arc`.
"""
def __init__(self, app, monitor, **kwargs):
super(MirrorArc, self).__init__(app, monitor, **kwargs)
self.draw_left = self.draw_arc_negative
self.draw_right = self.draw_arc
def draw_arc(self, context, value, color, offset=0.0):
value /= 2
offset /= 2
super(MirrorArc, self).draw_arc(context, value, color, offset=offset)
def draw_arc_negative(self, context, value, color, offset=0.0):
context.set_line_width(self.stroke_width)
context.set_line_cap(cairo.LINE_CAP_BUTT)
self.set_brush(context, color)
context.arc_negative(
self.x_center,
self.y_center,
self.radius,
math.pi / 2 - math.pi * offset,
math.pi / 2 - math.pi * (offset + value)
)
context.stroke()
def draw_background(self, context):
self.draw_arc(context, 2, self.colors['background'])
class Plot(Visualizer):
"""
.. image:: _static/plot.png
Parameters:
num_points (int, optional): The number of datapoints to show.
autoscale (bool, optional): Whether to automatically "zoom" into the data.
markers (bool, optional): Whether to render markers at data point coordinates.
line (bool, optional): Whether to draw a line.
grid (bool, optional): Whether to draw a grid. The grid automatically adapts if ``autoscale`` is ``True``.
**kwargs: passed on to :class:`Visualizer`\.
"""
def __init__(self, app, monitor, num_points=None, autoscale=None, markers=None, line=None, grid=None, **kwargs):
super(Plot, self).__init__(app, monitor, **kwargs)
if num_points:
self.num_points = num_points
self.step = self.inner_width / (num_points - 1)
assert int(self.step) >= 1, "num_points %d exceeds pixel density!" % num_points
else:
# closest fit to 8px step that fills self.inner_width perfectly
self.num_points = int(self.inner_width // 8)
self.step = self.inner_width / (self.num_points - 1)
self.prepare_points() # creates self.points with a deque for every plot
self.autoscale = ignore_none(autoscale, self.get_style('autoscale'))
self.markers = ignore_none(markers, self.get_style('markers'))
self.line = ignore_none(line, self.get_style('line'))
self.grid = ignore_none(grid, self.get_style('grid'))
self.grid_height = self.inner_height
self.colors['plot_line'] = self.colors['foreground'].clone()
self.colors['plot_line'].alpha *= 0.8
self.colors['plot_fill'] = self.colors['foreground'].clone()
if self.line:
self.colors['plot_fill'].alpha *= 0.25
self.colors['grid_major'] = self.colors['plot_line'].clone()
self.colors['grid_major'].alpha *= 0.5
self.colors['grid_minor'] = self.colors['background'].clone()
self.colors['grid_minor'].alpha *= 0.8
self.colors['grid_milli'] = self.colors['background'].clone()
self.colors['grid_milli'].alpha *= 0.4
self.colors['caption_scale'] = self.colors['foreground'].clone()
self.colors['caption_scale'].alpha *= 0.6
self.colors_plot_marker = self.palette(self.colors['foreground'], len(self.elements))
self.colors_plot_line = self.palette(self.colors['plot_line'], len(self.elements))
self.colors_plot_fill = self.palette(self.colors['plot_fill'], len(self.elements))
def prepare_points(self):
self.points = collections.OrderedDict()
for element in self.elements:
self.points[element] = collections.deque([], self.num_points)
def get_scale_factor(self, elements=None):
if elements is None:
elements = self.elements
if self.combination.startswith('cumulative'):
cumulative_points = []
for idx in range(0, self.num_points):
value = 0.0
for element in elements:
try:
value += self.points[element][idx]
except IndexError as e:
continue # means self.points deques aren't filled completely yet
if self.combination == 'cumulative_force':
cumulative_points.append(value / len(elements))
else:
cumulative_points.append(value)
p = max(cumulative_points)
else:
maxes = []
for element in elements:
if len(self.points[element]):
maxes.append(max(self.points[element]))
p = max(maxes, default=0.0) # 0.0 while no datapoints have been collected yet
if p > 0:
return 1.0 / p
return 0.0
def get_points_scaled(self, element, elements=None):
if elements is None:
elements = self.elements
scale_factor = self.get_scale_factor(elements)
if scale_factor == 0.0:
return [0.0 for _ in range(0, len(self.points[element]))]
r = []
for amplitude in self.points[element]:
r.append(amplitude * scale_factor)
return r
def draw_grid(self, context, elements=None):
if elements is None:
elements = self.elements
scale_factor = self.get_scale_factor(elements)
context.set_line_width(1)
context.set_source_rgba(*self.colors['grid_minor'].tuple_rgba())
#context.set_dash([1,1])
x_coords = [x * self.step for x in range(0, self.num_points - 1)]
x_coords = [self.x + self.margin_left + self.padding_left + x for x in x_coords]
for x in x_coords:
context.move_to(x, self.y + self.margin_top + self.padding_top)
context.line_to(x, self.y + self.margin_top + self.padding_top + self.grid_height)
context.stroke()
if not self.autoscale:
for i in range(0, 110, 10): # 0,10,20..100
value = i / 100.0
y = self.y + self.margin_top + self.padding_top + self.grid_height - self.grid_height * value
context.move_to(self.x + self.margin_left + self.padding_left, y)
context.line_to(self.x + self.margin_left + self.padding_left + self.inner_width, y)
context.stroke()
elif scale_factor > 0:
if scale_factor > 1000:
return # current maximum is below 1 per mille, so no guides are placed
elif scale_factor > 100:
# current maximum is below 1 percent, place per mille guides
context.set_source_rgba(*self.colors['grid_milli'].tuple_rgba())
for i in range(0, 10):
# place lines for 0-9 per mille
value = i / 1000.0 * scale_factor
y = self.y + self.margin_top + self.padding_top + self.grid_height - self.grid_height * value
if y < self.y + self.margin_top + self.padding_top:
break # stop the loop if guides would be placed outside the visualizer
context.move_to(self.x + self.margin_left + self.padding_left, y)
context.line_to(self.x + self.margin_left + self.padding_left + self.inner_width, y)
context.stroke()
elif scale_factor > 10:
context.set_source_rgba(*self.colors['grid_minor'].tuple_rgba())
for i in range(0, 10):
# place lines for 0-9 percent
value = i / 100.0 * scale_factor
y = self.y + self.margin_top + self.padding_top + self.grid_height - self.grid_height * value
if y < self.y + self.margin_top + self.padding_top:
break # stop the loop if guides would be placed outside the visualizer
context.move_to(self.x + self.margin_left + self.padding_left, y)
context.line_to(self.x + self.margin_left + self.padding_left + self.inner_width, y)
context.stroke()
else: # major (10% step) guides
context.set_source_rgba(*self.colors['grid_major'].tuple_rgba())
for i in range(0, 110, 10): # 0,10,20..100
value = i / 100.0 * scale_factor
y = self.y + self.margin_top + self.padding_top + self.grid_height - self.grid_height * value
if y < self.y + self.margin_top + self.padding_top:
break # stop the loop if guides would be placed outside the visualizer
context.move_to(self.x + self.margin_left + self.padding_left, y)
context.line_to(self.x + self.margin_left + self.padding_left + self.inner_width, y)
context.stroke()
#context.set_dash([1,0]) # reset dash
def draw_plot(self, context, points, colors, offset=None):
coords = []
for idx, amplitude in enumerate(points):
if offset:
amplitude += offset[idx]
coords.append((
self.x + idx * self.step + self.margin_left + self.padding_left,
self.y + self.margin_top + self.padding_top + self.inner_height - (self.inner_height * amplitude)
))
if self.line:
# draw lines
context.set_source_rgba(*colors['plot_line'].tuple_rgba())
context.set_line_width(2)
#context.set_line_cap(cairo.LINE_CAP_BUTT)
for idx, (x, y) in enumerate(coords):
if idx == 0:
context.move_to(x, y)
else:
context.line_to(x, y)
context.stroke()
if self.pattern:
context.set_source_surface(self.pattern(colors['plot_fill']))
context.get_source().set_extend(cairo.Extend.REPEAT)
context.move_to(self.x + self.margin_left + self.padding_left, self.y + self.margin_top + self.padding_top + self.inner_height)
for idx, (x, y) in enumerate(coords):
context.line_to(x, y)
if offset: # "cut out" the offset at the bottom
previous_amplitude = None
for i, amplitude in enumerate(reversed(offset)):
if len(offset) - i > len(points):
continue # ignore x coordinates not reached yet by the graph
if (amplitude != previous_amplitude or i == len(offset) - 1):
offset_x = self.x + self.margin_left + self.padding_left + self.inner_width - i * self.step
offset_y = self.y + self.margin_top + self.padding_top + self.inner_height - self.inner_height * amplitude
context.line_to(offset_x, offset_y)
else:
context.line_to(x, self.y + self.margin_top + self.padding_top + self.inner_height)
context.close_path()
context.fill()
if self.markers:
# place points
for (x, y) in coords:
context.set_source_rgba(*colors['plot_marker'].tuple_rgba())
context.arc(
x,
y,
2,
0,
2 * math.pi
)
context.fill()
def update(self, context):
for element in set(self.elements): # set() ensures an element listed more than once gets only one new point per update
self.points[element].append(self.monitor.normalize(element))
super(Plot, self).update(context)
def draw(self, context):
self.draw_background(context)
if self.grid:
self.draw_grid(context)
if self.autoscale:
scale_factor = self.get_scale_factor()
if scale_factor == 0.0:
text = u"∞X"
else:
text = "%sX" % pretty_si(scale_factor)
self.app.draw_text(context, text, self.x + self.margin_left + self.padding_left + self.inner_width, self.y, align='right_top', color=self.colors['caption_scale'], font_size=self.get_style('font_size', 'scale'))
offset = [0.0 for _ in range(0, self.num_points)]
for idx, element in enumerate(self.elements):
colors = {
'plot_marker': self.colors_plot_marker[idx],
'plot_line': self.colors_plot_line[idx],
'plot_fill': self.colors_plot_fill[idx]
}
if self.autoscale:
points = self.get_points_scaled(element)
else:
points = list(self.points[element])
if self.combination == 'separate':
self.draw_plot(context, points, colors)
elif self.combination.startswith('cumulative'):
if self.combination == 'cumulative_force':
for i in range(0, len(points)): # 'i' avoids shadowing the outer loop's idx
points[i] /= len(self.elements)
self.draw_plot(context, points, colors, offset)
for i, value in enumerate(points):
offset[i] += value
self.draw_captions(context)
class MirrorPlot(Plot):
"""
Mirrored variant of :class:`Plot`.
"""
def __init__(self, app, monitor, scale_lock=True, **kwargs):
self.up = kwargs['elements'][0]
self.down = kwargs['elements'][1]
kwargs['elements'] = self.up + self.down
super(MirrorPlot, self).__init__(app, monitor, **kwargs)
self.y_center = self.y + self.margin_top + self.padding_top + (self.inner_height / 2)
self.scale_lock = scale_lock # bool, whether to use the same scale for up and down
self.grid_height /= 2
palette_len = max((len(self.up), len(self.down)))
self.colors_plot_marker = self.palette(self.colors['foreground'], palette_len)
self.colors_plot_line = self.palette(self.colors['plot_line'], palette_len)
self.colors_plot_fill = self.palette(self.colors['plot_fill'], palette_len)
def legend_info(self):
data = collections.OrderedDict()
colors = self.palette(self.colors['foreground'], max(len(self.up), len(self.down)))
parts = [[] for _ in range(0, len(colors))]
for elements in (self.up, self.down):
for idx, element in enumerate(elements):
parts[idx].append(element.split('.'))
condensed = []
for items in parts:
condensed.append(condense_addr_parts(items))
assert len(colors) == len(condensed)
for idx, condensed_name in enumerate(condensed):
data[condensed_name] = (colors[idx], condensed_name)
return data
def prepare_points(self):
self.points = collections.OrderedDict()
for element in self.elements:
self.points[element] = collections.deque([], self.num_points)
def get_scale_factor(self, elements=None):
if self.scale_lock:
scale_up = super(MirrorPlot, self).get_scale_factor(self.up)
scale_down = super(MirrorPlot, self).get_scale_factor(self.down)
if (scale_up > 0) and (scale_down > 0): # both values > 0
return min((scale_up, scale_down))
elif (scale_up == 0) != (scale_down == 0): # one value is 0
return max(scale_up, scale_down)
else: # both values are 0
return 0
return super(MirrorPlot, self).get_scale_factor(elements)
def draw_grid(self, context, elements=None):
context.set_line_width(1)
context.set_source_rgba(*self.colors['grid_milli'].tuple_rgba())
context.move_to(self.x + self.margin_left + self.padding_left, self.y_center)
context.line_to(self.x + self.margin_left + self.padding_left + self.inner_width, self.y_center)
context.stroke()
if elements == self.up:
context.translate(0, self.grid_height)
super(MirrorPlot, self).draw_grid(context, elements=elements)
context.translate(0, -self.grid_height)
else:
super(MirrorPlot, self).draw_grid(context, elements=elements)
def draw_plot(self, context, points, colors, offset=None):
points = [x/2 for x in points]
if offset is not None:
offset = [x/2 for x in offset]
context.translate(0, -self.inner_height / 2)
super(MirrorPlot, self).draw_plot(context, points, colors, offset=offset)
context.translate(0, self.inner_height / 2)
def draw_plot_negative(self, context, points, colors, offset=None):
points = [-x for x in points]
if offset is not None:
offset = [-x for x in offset]
self.draw_plot(context, points, colors, offset=offset)
def update(self, context):
for element in set(self.up + self.down):
self.points[element].append(self.monitor.normalize(element))
super(Plot, self).update(context) # TRAP: deliberately calls the grandparent's update; Plot.update would append each datapoint a second time
def draw(self, context):
# TODO: This is mostly a copy-paste of Plot.update+draw, needs moar DRY.
self.draw_background(context)
for elements, drawer in ((self.up, self.draw_plot), (self.down, self.draw_plot_negative)):
if self.grid:
self.draw_grid(context, elements)
if self.autoscale:
scale_factor = self.get_scale_factor(elements)
if scale_factor == 0.0:
text = u"∞X"
else:
text = "%sX" % pretty_si(scale_factor)
if elements == self.up:
self.app.draw_text(context, text, self.x + self.margin_left + self.padding_left + self.inner_width, self.y + self.margin_top + self.padding_top, align='right_bottom', color=self.colors['caption_scale'], font_size=10)
elif not self.scale_lock: # don't show 'down' scalefactor if it's locked
self.app.draw_text(context, text, self.x + self.margin_left + self.padding_left + self.inner_width, self.y + self.margin_top + self.padding_top + self.inner_height, align='right_top', color=self.colors['caption_scale'], font_size=10)
offset = [0.0 for _ in range(0, self.num_points)]
for idx, element in enumerate(elements):
colors = {
'plot_marker': self.colors_plot_marker[idx],
'plot_line': self.colors_plot_line[idx],
'plot_fill': self.colors_plot_fill[idx]
}
if self.autoscale:
points = self.get_points_scaled(element, elements)
else:
points = list(self.points[element])
if self.combination == 'separate':
drawer(context, points, colors)
elif self.combination.startswith('cumulative'):
if self.combination == 'cumulative_force':
for i in range(0, len(points)): # 'i' avoids shadowing the outer loop's idx
points[i] /= len(elements)
drawer(context, points, colors, offset)
for i, value in enumerate(points):
offset[i] += value
self.draw_captions(context)
class Box(object):
"""
Can wrap multiple :class:`Visualizer`\s; used for layout.
Orders added visualizers from left to right and top to bottom.
This is basically a smart helper for :func:`Gulik.add_visualizer`.
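A much simplified, standalone sketch of this left-to-right, top-to-bottom ordering follows. ``flow_layout`` is a hypothetical helper, not gulik's ``place`` method, and it ignores the special handling of stacked columns.

```python
def flow_layout(box_width, sizes):
    # Hypothetical helper, not part of gulik: place (width, height) items
    # left to right and wrap to a new row once the current row is full.
    positions = []
    x = y = row_height = 0
    for width, height in sizes:
        if x + width > box_width:  # item does not fit, start a new row
            x = 0
            y += row_height
            row_height = 0
        positions.append((x, y))
        x += width
        row_height = max(row_height, height)
    return positions


print(flow_layout(200, [(100, 50), (100, 50), (200, 80)]))
# [(0, 0), (100, 0), (0, 50)]
```

The positions are relative to the box origin; an absolute position would add the box's own ``x`` and ``y``.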
"""
def __init__(self, app, x, y, width, height):
self._last_right = 0
self._last_top = 0
self._last_bottom = 0
self._next_row_x = 0
self.app = app
self.x = x
self.y = y
self.width = width
self.height = height
def place(self, component, cls, **kwargs):
"""
Place a new :class:`Visualizer`.
Parameters:
component (str): The :ref:`component` string identifying the data source.
cls (:class:`type`, child of :class:`Visualizer`): The visualizer class to instantiate.
**kwargs: passed on to the constructor of ``cls``. ``width`` and ``height`` given here are honored.
"""
width = kwargs.get('width', None)
height = kwargs.get('height', None)
if width is None:
width = self.width - self._last_right
if height is None:
height = self._last_bottom - self._last_top # same height as previous visualizer
if height == 0: # should only happen on first visualizer
height = self.height
elif height is None:
height = self.height - self._last_bottom
if self._last_right + width > self.width: # move to next "row"
#print("next row", component, cls, kwargs)
x = self._next_row_x
y = self._last_bottom
self._next_row_x = 0 # this will probably break adding a third multi-stacked column, but works for now
else:
x = self._last_right
y = self._last_top
kwargs['x'] = self.x + x
kwargs['y'] = self.y + y
kwargs['width'] = width
kwargs['height'] = height
self.app.add_visualizer(component, cls, **kwargs)
if y + height < self._last_bottom:
self._next_row_x = self._last_right
self._last_right = x + width
self._last_top = y
self._last_bottom = y + height
assert self._last_bottom <= self.height, f"Box already full! Can't place {cls.__name__}" # _last_bottom is relative to the box, so compare against the box height only
class Gulik(object):
"""
The main object thingamabob.
"""
monitor_table = {
'cpu': CPUMonitor,
'memory': MemoryMonitor,
'network': NetworkMonitor,
'battery': BatteryMonitor,
'disk': DiskMonitor
}
def __init__(self, configpath):
self.started = False
self.will_to_live = True # only needed because of gtk
self.screen = Gdk.Screen.get_default()
self.window = Window()
self.window.connect('draw', self.draw)
self.configpath = configpath
self.config = {}
self.setup = self.autosetup
self.monitors = {}
self.visualizers = []
#self.boxes = []
self.apply_config()
def reset(self):
# clear out any existing visualizers and monitors
for monitor in self.monitors.values(): # kill any existing monitors
monitor.seppuku = True
self.monitors = {}
self.visualizers = []
#self.boxes = []
def module_to_config(self, config_locals): # parameter renamed so it no longer shadows the builtin locals()
config = {}
for key in DEFAULTS:
config[key] = config_locals.get(key, DEFAULTS[key])
for key in set(config_locals) - set(DEFAULTS): # everything defined in config.py that DEFAULTS doesn't already cover
if key == key.upper(): # all-caps means it's config
config[key] = config_locals[key]
if 'COLOR_FOREGROUND_TEXT' not in config:
config['COLOR_FOREGROUND_TEXT'] = config['COLOR_CAPTION']
return config
def add_netdata_monitors(self):
# create monitors for all netdata hosts
for host in self.config['NETDATA_HOSTS']:
if isinstance(host, (list, tuple)): # handle (host, port) tuples and bare hostnames as values
host, port = host
else:
port = 19999
component = f"netdata-{host}"
self.monitors[component] = NetdataMonitor(self, component, host, port)
def resize_and_move(self):
self.window.resize(self.config['WIDTH'], self.config['HEIGHT'])
self.window.move(self.config['X'], self.config['Y']) # move apparently must be called after show_all
def apply_config(self):
print(f"Trying to load config from {self.configpath}…")
custom = False # whether gulik is using a custom configuration
old_config = self.config
old_setup = self.setup
self.config = {}
try:
with open(self.configpath, mode='r') as fd: # closes the file again after reading
config_string = fd.read()
except OSError as e:
print("No config at '%s' or insufficient permissions to read it. Falling back to defaults…" % self.configpath)
config_locals = DEFAULTS
else:
try:
config_locals = {}
exec(config_string, config_locals)
custom = True
except Exception as e:
print("Error in '%s': %s" % (self.configpath, e))
print("Falling back to defaults…")
config_locals = DEFAULTS
self.config = self.module_to_config(config_locals)
self.resize_and_move()
if 'setup' in config_locals:
self.setup = functools.partial(config_locals['setup'], app=self)
else:
self.setup = self.autosetup
self.reset()
self.add_netdata_monitors()
# NOTE: psutil-based monitors are autoloaded in add_visualizer and thus don't have to be handled here like netdata monitors
# finally, run the actual setup function to place all visualizers
try:
self.setup()
print("Done.")
except Exception as e:
# Hokay, so I *think* this catches all failure scenarios for config (re)loads…
if not custom:
print("Sorry, friend. The autosetup seems to be broken on your machine. You might want to file a bug report.")
print("Alternatively, you can create a custom-fit configuration at ~/.config/gulik/config.py.")
self.stop()
return
else:
if isinstance(self.setup, functools.partial):
name = self.setup.func.__name__
else:
name = self.setup.__name__
print(f"Your custom 'setup' function failed - {type(e).__name__}: {e}")
print("Falling back to previous configuration")
self.monitors = {}
self.visualizers = []
self.config = old_config or self.module_to_config(DEFAULTS) # DEFAULTS if config was empty before (happens the first time this function is called on a Gulik object)
self.setup = old_setup
self.resize_and_move()
self.add_netdata_monitors()
try:
self.setup()
except Exception as e:
print("Previous setup failed, too. Reverting to autosetup.")
self.monitors = {}
self.visualizers = []
self.config = self.module_to_config(DEFAULTS)
self.setup = self.autosetup
self.resize_and_move()
self.add_netdata_monitors()
try:
self.setup()
except Exception as e:
print("Well, this should never happen.")
print(e)
self.stop()
raise # re-raise last; placed first, it made the lines above unreachable
def signal_reload(self, *_):
self.apply_config() # re-creates monitors and visualizers
for monitor in self.monitors.values():
monitor.start()
def tick(self):
for monitor in self.monitors.values():
monitor.tick()
self.window.queue_draw()
return self.will_to_live # gtk stops executing timeout callbacks if they don't return True
def draw_text(self, context, text, x, y, align=None, color=None, font_size=None, font_weight=None):
if align is None:
align = 'left_top'
if color is None:
color = self.config['COLOR_CAPTION']
context.set_source_rgba(*color.tuple_rgba())
font_size = font_size or self.config['FONT_SIZE']
font_weight = font_weight or self.config['FONT_WEIGHT']
font = Pango.FontDescription('%s %s %d' % (self.config['FONT'], font_weight, font_size))
layout = PangoCairo.create_layout(context)
layout.set_font_description(font)
layout.set_text(text, -1)
size = layout.get_pixel_size()
x_offset, y_offset = alignment_offset(align, size)
context.translate(x + x_offset, y + y_offset)
PangoCairo.update_layout(context, layout)
PangoCairo.show_layout(context, layout)
context.translate(-x - x_offset, -y - y_offset)
return size
def draw_gulik(self, context):
# gulik is the messenger of our lady discordia.
# he is 200 by 133 pixels big
gulik = cairo.ImageSurface.create_from_png(os.path.join(__path__[0], 'gulik.png'))
context.save()
context.set_operator(cairo.OPERATOR_SOFT_LIGHT)
context.translate(self.config['WIDTH'] - 200, self.config['HEIGHT'] - 133)
context.set_source_surface(gulik)
context.rectangle(0, 0, 200, 133)
context.fill()
context.restore()
def draw(self, window, context):
context.set_operator(cairo.OPERATOR_CLEAR)
context.paint()
context.set_operator(cairo.OPERATOR_OVER)
context.set_source_rgba(*self.config['COLOR_WINDOW_BACKGROUND'].tuple_rgba())
context.rectangle(0, 0, self.config['WIDTH'], self.config['HEIGHT'])
context.fill()
self.draw_gulik(context)
for visualizer in self.visualizers:
visualizer.update(context)
def start(self):
assert self.config['FPS'] != 0
assert isinstance(self.config['FPS'], float) or self.config['FPS'] >= 1
for monitor in self.monitors.values():
monitor.start()
signal.signal(signal.SIGINT, self.stop) # so ctrl+c actually kills gulik
signal.signal(signal.SIGTERM, self.stop) # so kill actually kills gulik, and doesn't leave a braindead Gtk.main and Collector processes dangling around
signal.signal(signal.SIGUSR1, self.signal_reload) # reload config on user-defined signal. (no, not HUP)
GLib.timeout_add(int(1000 / self.config['FPS']), self.tick) # interval in whole milliseconds
self.tick() # first tick without delay so we get output asap
self.started = True
Gtk.main() # blocks until Gtk.main_quit is called
print("\nThank you for flying with phryk evil mad sciences, LLC. Please come again.")
def stop(self, num=None, frame=None):
spinner = '▏▎▍▌▋▊▉▉▊▋▌▍▎'
self.will_to_live = False # will stop calls by gtk main loop to self.tick
for monitor in self.monitors.values():
monitor.seppuku = True
i = 0
while monitor.is_alive(): # show spinner animation until the monitor is finished
i += 1
spinner_idx = i % len(spinner)
sys.stdout.write(f"\r{spinner[spinner_idx]} ")
sys.stdout.flush()
time.sleep(1/24)
#sys.stdout.write("\rn")
#sys.stdout.flush()
if self.started: # Gtk.main_quit throws an error if called when not started
Gtk.main_quit()
else:
sys.exit() # to actually quit the program before Gtk.main is running
def box(self, x=0, y=0, width=None, height=None):
width = ignore_none(width, self.config['WIDTH'] - x)
height = ignore_none(height, self.config['HEIGHT'] - y)
box = Box(self, x, y, width, height)
#self.boxes.append(box)
return box
def add_visualizer(self, component, cls, **kwargs):
if component not in self.monitors:
if component in self.monitor_table:
print("Autoloading %s!" % self.monitor_table[component].__name__)
self.monitors[component] = self.monitor_table[component](self, component)
elif component.startswith('netdata-'):
raise LookupError(f"Unknown netdata host '{component[8:]}'")
else:
raise LookupError("No monitor class known for component '%s'. Custom monitor classes have to be added to Gulik.monitor_table to enable autoloading." % component)
monitor = self.monitors[component]
visualizer = cls(self, monitor, **kwargs)
self.visualizers.append(visualizer)
def autosetup(self, x=0, y=0, width=None, height=None):
box = self.box(x, y, width, height)
cpu_num = psutil.cpu_count()
all_cores = ['core_%d' % x for x in range(0, cpu_num)]
box.place(
'cpu',
Arc,
#elements=['aggregate'],
elements=all_cores,
width=box.width,
height=box.width,
stroke_width=10,
combination='cumulative_force',
legend=False,
padding_bottom=5,
captions=[
{
'text': 'CPU',
'position': 'left_top',
'align': 'left_top',
'font_weight': 'Bold',
'operator': Operator.HARD_LIGHT,
},
{
'text': '{aggregate:.1f}%',
'position': 'center_center',
'align': 'center_center',
},
{
'text': '{count} cores',
'position': 'right_bottom',
'align': 'right_bottom',
}
]
)
box.place(
'cpu',
Plot,
elements=all_cores,
width=box.width,
height=130,
autoscale=True,
combination='cumulative_force',
)
box.place(
'memory',
Arc,
elements=['other', 'top_3', 'top_2', 'top_1'],
width=box.width,
height=box.width + 100,
padding_bottom=80, # make space for 4 rows of legend
stroke_width=30,
combination='cumulative',
#legend_format="{{{element}}}",
legend_order='reverse',
legend_format="{{{element}.name}} ({{{element}.size}})",
captions=[
{
'text': 'memory',
'position': 'left_top',
'align': 'left_top',
'font_weight': 'Bold',
'operator': Operator.HARD_LIGHT,
},
{
'text': '{percent:.1f}%',
'position': 'center_center',
'align': 'center_center'
},
{
'text': '{total}',
'position': 'right_bottom',
'align': 'right_bottom',
}
]
)
last_visualizer = self.visualizers[-1]
palette = [color for color in reversed(last_visualizer.palette(last_visualizer.colors['foreground'], 4))]
#box.place('memory', Text, text='{top_1.name} ({top_1.size})', width=box.width, height=25, foreground=palette[0])
#box.place('memory', Text, text='{top_2.name} ({top_2.size})', width=box.width, height=25, foreground=palette[1])
#box.place('memory', Text, text='{top_3.name} ({top_3.size})', width=box.width, height=25, foreground=palette[2])
#box.place('memory', Text, text='other({other.size}/{other.count})', width=box.width, height=25, foreground=palette[3])
all_nics = list(psutil.net_if_addrs())
all_nics_up = ['%s.bytes_sent' % x for x in all_nics]
all_nics_down = ['%s.bytes_recv' % x for x in all_nics]
all_nics_up_packets = ['%s.packets_sent' % x for x in all_nics]
all_nics_down_packets = ['%s.packets_recv' % x for x in all_nics]
all_nics_up_errors = ['%s.errout' % x for x in all_nics]
all_nics_down_errors = ['%s.errin' % x for x in all_nics]
all_nics_up_drop = ['%s.dropout' % x for x in all_nics]
all_nics_down_drop = ['%s.dropin' % x for x in all_nics]
partitions = psutil.disk_partitions()
all_partitions = []
for partition in partitions:
name = partition.device.split('/')[-1].replace('.', '-')
all_partitions.append(name)
all_disks = list(psutil.disk_io_counters(perdisk=True))
all_disks_read = [f"io.{disk}.read_bytes" for disk in all_disks]
all_disks_write = [f"io.{disk}.write_bytes" for disk in all_disks]
disk_busy_size = 130 / len(all_disks)
box.place(
'disk',
MirrorPlot,
width=box.width - disk_busy_size,
height=130,
elements=[all_disks_read, all_disks_write],
caption_placement='padding',
captions=[
{
'text': 'disk',
'position': 'left_top',
'align': 'left_top',
'font_weight': 'Bold',
'operator': Operator.HARD_LIGHT,
}
]
)
last_visualizer = self.visualizers[-1]
colors = last_visualizer.palette(last_visualizer.colors['foreground'], len(all_disks))
for idx, disk in enumerate(all_disks):
box.place(
'disk',
Arc,
width=disk_busy_size,
height=disk_busy_size,
foreground=colors[idx],
legend=False,
padding=0,
stroke_width=10,
elements=[f"io.{disk}.busy_time"]
)
# box.place(
# 'network',
# MirrorArc,
# width=box.width,
# height=box.width,
# padding_bottom=5,
# legend=False,
# elements=[all_nics_up, all_nics_down],
# combination='cumulative_force',
# captions=[
# {
# 'text': 'network',
# 'position': 'left_top',
# 'align': 'left_top',
# 'font_weight': 'Bold',
# 'operator': Operator.HARD_LIGHT,
# },
# {
# 'text': '{aggregate.counters.bytes_sent}\n{aggregate.counters.bytes_recv}',
# #'text': '{em0.all_addrs}',
# 'position': 'center_center',
# 'align': 'center_center',
# }
# ]
# )
box.place(
'network',
MirrorPlot,
width=box.width,
height=140,
margin_top=15,
#padding=15,
padding_bottom=50,
elements=[all_nics_up, all_nics_down],
combination='cumulative_force',
autoscale=True,
caption_placement='padding',
captions=[
{
'text': 'network',
'position': 'left_top',
'align': 'left_top',
'font_weight': 'Bold',
'operator': Operator.HARD_LIGHT,
}
]
)
#alignments = ['left_top', 'center_top','right_top']
#palette = self.visualizers[-1].colors_plot_marker
#for idx, if_name in enumerate(all_nics):
# # build a legend
# color = palette[idx]
# align = alignments[idx % 3] # boom.
# box.place('network', Text, text=if_name, foreground=color, width=box.width/3, height=25, align=align)
if psutil.sensors_battery() is not None:
color = self.config['COLOR_FOREGROUND'].clone()
color.alpha = 1
box.place(
'battery',
Rect,
width=box.width,
height=90,
margin_top=35,
elements=['battery'],
foreground=color,
captions=[
{
'text': '{percent}%',
'position': 'left_center',
'align': 'left_center',
'color': color,
'operator': Operator.DIFFERENCE, # such candy, many wow
},
{
'text': '{state}',
'position': 'right_center',
'align': 'right_center',
'color': color,
'operator': Operator.DIFFERENCE,
},
]
)