Source code for mnc.mcs

from __future__ import division, print_function
    range = xrange
except NameError:
import os
import sys
import json
import time
import uuid
import etcd3
import base64
import signal
import warnings
    from io import BytesIO
except ImportError:
    from StringIO import StringIO as BytesIO
from datetime import datetime, timedelta
from textwrap import fill as tw_fill

import matplotlib
from matplotlib import pyplot as plt

from mnc.common import ETCD_HOST, ETCD_PORT

__all__ = ['MonitorPoint', 'MultiMonitorPoint', 'ImageMonitorPoint',
           'MonitorPointCallbackBase', 'CommandCallbackBase', 'Client']

temp = etcd3.client()
ETCD_TYPE = type(temp)
del temp

class MonitorPoint(object):
    Object for representing a monitoring point within the MCS framework.  At a
    minimum this includes:
     * a UNIX timestamp of when the monitoring point was updated,
     * the value of the monitoring point itself, and
     * the units of the monitoring point value or '' if there are none.
    _required = ('timestamp', 'value', 'unit')
    def __init__(self, value, timestamp=None, unit='', **kwargs):
        if isinstance(value, MonitorPoint):
            value = value.as_dict()
        if isinstance(value, dict):
            for key in self._required:
                if key not in value:
                    raise KeyError("Missing required key: %s" % key)
            if timestamp is None:
                timestamp = time.time()
            value = {'timestamp': timestamp, 'value': value, 'unit': unit}
            for k,v in kwargs.items():
                value[k] = v
        self._entries = []
        for k,v in value.items():
            setattr(self, k, v)
    def __repr__(self):
        output = "<%s " % (type(self).__name__,)
        first = True
        for k in self._entries:
            if not first:
                output += ', '
            v = getattr(self, k, None)
            if k == 'timestamp':
                v = datetime.utcfromtimestamp(v)
                v = str(v)
            if isinstance(v, str):
                v = "'%s'" % v
            output += "%s=%s" % (k, v)
            first = False
        output += '>'
        return tw_fill(output, subsequent_indent='    ')
    def __str__(self):
        output = "%s%s at %s" % (str(self.value), self.unit, datetime.utcfromtimestamp(self.timestamp))
        return output
    def __contains__(self, value):
        return value in self._entries
    def from_json(cls, json_value):
        Return a new :py:class:`MonitorPoint` instance based on a JSON-packed dictionary.
        value = json.loads(json_value)
        return cls(value)
    def as_dict(self):
        Return the information about the monitoring point as a dictionary.
        value = {}
        for k in self._entries:
            value[k] = getattr(self, k)
        return value
    def as_json(self):
        Return the information about the monitoring point as JSON-packed
        value = self.as_dict()
        return json.dumps(value)

class MultiMonitorPoint(MonitorPoint):
    Sub-class of :py:class`MonitorPoint` for representing a multiple monitoring
    points updated at the same time within the MCS framework.  At a minimum this
     * a UNIX timestamp of when the monitoring point was updated,
     * a list of values of the monitoring points themselves,
     * a list of field names for each monitoring point, and
     * the units of the monitoring point values or '' if there are none.
    .. note:: If only a single unit is supplied, it is replicated for all
    _required = ('timestamp', 'value', 'field', 'unit')
    def __init__(self, value, timestamp=None, field=None, unit='', **kwargs):
        MonitorPoint.__init__(self, value, timestamp=timestamp, unit=unit, **kwargs)
        # The "multi" part
            if isinstance(self.value, (str, bytes)):
                raise TypeError
        except TypeError:
            self.value = [self.value,]
        if field is None:
                field = self.field
            except AttributeError:
                field = ['value%i' % i for i in range(len(self.value))]
        if isinstance(self.unit, str):
            self.unit = [self.unit for i in range(len(self.value))]
        if len(self.value) != len(field):
            raise RuntimeError("value and field keys must have the same length")
        if len(self.value) != len(self.unit):
            raise RuntimeError("value and unit keys must have the same length")
        if 'field' not in self._entries:
        self.field = field
    def __str__(self):
        output = ', '.join(["%s=%s%s" % v for v in zip(self.field, self.value, self.unit)])
        output += " at %s" % datetime.utcfromtimestamp(self.timestamp)
        return output
    def as_list(self):
        Return the information about the monitoring point as a list of three-
        element tuples containing (field name, value, unit).
        return [e for e in zip(self.field, self.value, self.unit)]

class ImageMonitorPoint(MonitorPoint):
    Sub-class of :py:class:`MonitorPoint` for representing a monitoring point
    image within the MCS framework.  At a minimum this includes:
     * a UNIX timestamp of when the monitoring point was updated,
     * the base64-encoded monitoring point image itself,
     * the MIME-type of the encoded image, and
     * the units of the monitoring point value or '' if there are none.
    _required = ('timestamp', 'value', 'mime', 'unit')
    def _encode_image_data(image_data):
        image_data = base64.urlsafe_b64encode(image_data)
            image_data = image_data.decode()
        except AttributeError:
        return image_data
    def _decode_image_data(image_data):
            image_data = image_data.encode()
        except AttributeError:
        image_data = base64.urlsafe_b64decode(image_data)
        return image_data
    def from_figure(cls, fig):
        Return a new :py:class:`ImageMonitorPoint` instance based on a matplotlib Figure.
        canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(fig)
        image = BytesIO()
        image_data =
        image_data = cls._encode_image_data(image_data)
        return cls(image_data, mime='image/png')
    def from_image(cls, im):
        Return a new :py:class:`ImageMonitorPoint` instance based on a PIL.Image.
        image = BytesIO(), 'PNG')
        image_data =
        image_data = cls._encode_image_data(image_data)
        return cls(image_data, mime='image/png')
    def from_file(cls, name_or_handle):
        Return a new :py:class:`ImageMonitorPoint` instance based the contents
        of a filename or open file handle.
            ts = os.path.getmtime(
            ext = os.path.getext([1]
            if ext not in ('.png', '.jpg', '.jpeg'):
                raise RuntimeError("Provided file does not seem to be a support image format")
            image_data =
        except AttributeError:
            ts = os.path.getmtime(name_or_handle)
            ext = os.path.getext(name_or_handle)[1]
            if ext not in ('.png', '.jpg', '.jpeg'):
                raise RuntimeError("Provided file does not seem to be a support image format")
            with open(name_or_handle, 'rb') as fh:
                image_data =
        image_data = cls._encode_image_data(image_data)
        mime = 'image/png'
        if ext in ('.jpg', '.jpeg'):
            mime = 'image/jpeg'
        return cls(image_data, timestamp=ts, mime=mime)
    def as_array(self):
        Return the data for the monitoring point image as a numpy.array, similar
        to matplotlib.pyplot.imread.
        image_data = self._decode_image_data(self.value)
        image = BytesIO()
        image_data = plt.imread(image)
        return image_data
    def to_file(self, name_or_handle):
        Write the monitoring point image to the specified filename or open
        file handle.
        image_data = self._decode_image_data(self.value)
        except AttributeError:
            with open(name_or_handle, 'wb') as fh:

class MonitorPointCallbackBase(object):
    Base class to use as a callback for when a monitoring point is changed.
    def action(value):
        Static method that should be overridden by the sub-class as to what to
        do with the new monitoring point value.  The method should accept a 
        single argument of a MonitorPoint.
        raise NotImplementedError
    def __call__(self, event):
        output = None
        for evt in
            value = MonitorPoint.from_json(evt.value)
            output = self.action(value)
        return output

class CommandCallbackBase(object):
    Base class to use as a callback for processing a command when it is
    def __init__(self, client):
        if isinstance(client, Client):
            client = client.client
        elif isinstance(client, ETCD_TYPE):
            raise TypeError("Expected a mcs.Client or etcd3.client.Ectd3Client")
        self.client = client
    def action(*args, **kwargs):
        Static method that should be overridden by the sub-class as to how to
        process the command.  The method should accept arbitrary keywords per
        the command and return a two-element tuple of (command status,
        response code).
        raise NotImplementedError
        return status, response
    def __call__(self, event):
        for evt in
                key = evt.key.decode()
            except AttributeError:
                key = evt.key
            value = json.loads(evt.value)
            sequence_id = value['sequence_id']
            command = value['command']
            payload = value['kwargs']
            if 'sequence_id' not in payload:
                    payload['sequence_id'] = sequence_id.decode()
                except AttributeError:
                    payload['sequence_id'] = sequence_id
            ts = time.time()
            status, response = self.action(**payload)
            if isinstance(status, bool):
                status = 'success' if status else 'error'
            status = {'sequence_id': sequence_id,
                      'timestamp': ts,
                      'status': status,
                      'response': response}
            status = json.dumps(status)
            key = '/resp/'+key[5:]
            self.client.put(key, status)

[docs]class Client(object): """ MCS framework client. This can be used for both monitor and control. """ def __init__(self, id=None, timeout=5.0): """ Initialize the client with the specified subsystem ID name and command response timeout in second. A timeout of 'None' allows blocking until a response is received. If the client is anonymous, i.e., 'id' is 'None' then only reading monitoring points and sending commands are supported. """ if id is not None: id = str(id) = id if timeout is None: timeout = 1e9 self.timeout = timeout self.client = etcd3.client(host=ETCD_HOST, port=ETCD_PORT) self._mon_manifest = ['manifest/monitoring', 'manifest/commands'] self._cmd_manifest = [] self._watchers = {} def __del__(self): for command in self._watchers: try: self.client.cancel_watch(self._watchers[command][0]) except Exception: pass self.client.close() def _update_mon_manifest(self, name, drop=False): """ Update the monitoring point manifest as needed. Returns a Boolean of whether or not an update was made. """ with self.client.lock(, ttl=5) as lock: # Is it alread in the local manifest? updated = False value = None if drop: ## Remove from the local manifest try: del self._mon_manifest[self._mon_manifest.index(name)] except ValueError: pass ## Check the published manifest value = self.read_monitor_point('manifest/monitoring') if value is None: value = MonitorPoint([]) try: del value.value[value.value.index(name)] updated = True except ValueError: pass elif name not in self._mon_manifest: ## Not in the local manifest self._mon_manifest.append(name) ## Check the published manifest value = self.read_monitor_point('manifest/monitoring') if value is None: value = MonitorPoint([]) for entry in self._mon_manifest: if entry not in value.value: value.value.append(entry) updated = True # If there is an update, push it out if updated and value is not None: value.timestamp = time.time() value = value.as_json() self.client.put('/mon/%s/%s' % (, 'manifest/monitoring'), value) return updated def _update_cmd_manifest(self, name, drop=False): """ Update the command manifest as needed. Returns a Boolean of whether or not an update was made. """ with self.client.lock(, ttl=5) as lock: # Is it alread in the local manifest? updated = False value = None if drop: ## Remove from the local manifest try: del self._cmd_manifest[self._cmd_manifest.index(name)] except ValueError: pass ## Check the published manifest value = self.read_monitor_point('manifest/commands') if value is None: value = MonitorPoint([]) try: del value.value[value.value.index(name)] updated = True except ValueError: pass elif name not in self._cmd_manifest: ## Not in the local manifest self._cmd_manifest.append(name) ## Check the published manifest value = self.read_monitor_point('manifest/commands') if value is None: value = MonitorPoint([]) for entry in self._cmd_manifest: if entry not in value.value: value.value.append(entry) updated = True # If there is an update, push it out if updated and value is not None: value.timestamp = time.time() value = value.as_json() self.client.put('/mon/%s/%s' % (, 'manifest/commands'), value) return updated
[docs] def remove_monitor_point(self, name): """ Remove the specified monitoring point. Returns True if the deletion was successful, False otherwise. """ if is None: raise RuntimeError("Writing monitoring points is not supported in anonymous mode") if name.startswith('/'): name = name[1:] try: self.client.delete('/mon/%s/%s' % (, name)) self._update_mon_manifest(name, drop=True) return True except Exception as e: return False
[docs] def write_monitor_point(self, name, value, timestamp=None, unit=''): """ Write a value to the specified monitoring point. Returns True if the write was successful, False otherwise. """ if is None: raise RuntimeError("Writing monitoring points is not supported in anonymous mode") if name.startswith('/'): name = name[1:] if isinstance(value, dict): pass elif isinstance(value, MonitorPoint): value = value.as_dict() else: if timestamp is None: timestamp = time.time() value = {'timestamp': timestamp, 'value': value, 'unit': unit} value = json.dumps(value) try: self.client.put('/mon/%s/%s' % (, name), value) self._update_mon_manifest(name) return True except Exception: return False
[docs] def read_monitor_point(self, name, id=None): """ Read the current value of a monitoring point. An 'id' of 'None' is interpretted as that monitoring point on the current subsystem. Returns the monitoring point as a :py:class:`MonitorPoint` if successful, None otherwise. """ if name.startswith('/'): name = name[1:] if id is None: id = if id is None: raise RuntimeError("Must specify a subsystem ID when in anonymous mode") try: value = self.client.get('/mon/%s/%s' % (id, name)) value = MonitorPoint.from_json(value[0]) return value except Exception as e: warnings.warn("Error reading monitor point '/mon/%s/%s': %s" % (id, name, str(e)), RuntimeWarning) return None
[docs] def list_monitor_points(self, id=None): """ Return the contents of the monitoring point manifest. An 'id' of 'None' is interpretted as that monitoring point on the current subsystem. Returns the list if successful, None otherwise. """ res = self.read_monitor_point('manifest/monitoring', id=id) if res is not None: res = res.value return res
[docs] def set_monitor_point_callback(self, name, callback, id=None): """ Watch the specified monitoring point and execute the callback when its value is updated. The callback should be a sub-class of :py:class:`MonitorPointCallbackBase`. An 'id' of 'None' is interpretted as that monitoring point on the current subsystem. Return True is successful, False otherwise. This watch...callback behavior continues until the appropriate :py:meth:`cancel_monitor_point_callback` is called. """ if name.startswith('/'): name = name[1:] if not isinstance(callback, MonitorPointCallbackBase): raise TypeError("Expected a MonitorPointCallbackBase-derived instance") if id is None: id = if id is None: raise RuntimeError("Must specify a subsystem ID when in anonymous mode") full_name = '/mon/%s/%s' % (id, name) try: watch_id = self.client.add_watch_callback(full_name, callback) try: self.client.cancel_watch(self._watchers[full_name][0]) except KeyError: pass self._watchers[full_name] = (watch_id, callback) return True except Exception as e: return False
[docs] def cancel_monitor_point_callback(self, name, id=None): """ Cancel watching a monitoring point setup with set_monitor_point_callback Return True if successful, False otherwise. """ if name.startswith('/'): name = name[1:] if id is None: id = if id is None: raise RuntimeError("Must specify a subsystem ID when in anonymous mode") full_name = '/mon/%s/%s' % (id, name) try: self.client.cancel_watch(self._watchers[full_name][0]) del self._watchers[full_name] return True except KeyError: return False
[docs] def read_monitor_point_branch(self, name, id=None): """ Read the current value of all keys in a monitoring point branch. An 'id' of 'None' is interpretted as that monitoring point branch on the current subsystem. Returns the monitoring point branch as a list of two-element tuples of (key, :py:class:`MonitorPoint`) if successful, None otherwise. """ if name.startswith('/'): name = name[1:] if id is None: id = if id is None: raise RuntimeError("Must specify a subsystem ID when in anonymous mode") try: output = [] for value in self.client.get_prefix('/mon/%s/%s' % (id, name)): output.append((value[1].key, MonitorPoint.from_json(value[0]))) return output except Exception as e: warnings.warn("Error reading monitor point branch '/mon/%s/%s': %s" % (id, name, str(e)), RuntimeWarning) return None
[docs] def set_monitor_point_branch_callback(self, name, callback, id=None): """ Watch the specified monitoring point branch and execute the callback when any key within that branch is updated. The callback should be a sub-class of :py:class:`MonitorPointCallbackBase`. An 'id' of 'None' is interpretted as that monitoring point branch on the current subsystem. Return True is successful, False otherwise. This watch...callback behavior continues until the appropriate :py:meth:`cancel_monitor_point_branch_callback` is called. """ if name.startswith('/'): name = name[1:] if not isinstance(callback, MonitorPointCallbackBase): raise TypeError("Expected a MonitorPointCallbackBase-derived instance") if id is None: id = if id is None: raise RuntimeError("Must specify a subsystem ID when in anonymous mode") full_name = '/mon/%s/%s' % (id, name) try: watch_id = self.client.add_watch_prefix_callback(full_name, callback) try: self.client.cancel_watch(self._watchers[full_name][0]) except KeyError: pass self._watchers[full_name] = (watch_id, callback) return True except Exception as e: return False
[docs] def cancel_monitor_point_branch_callback(self, name, id=None): """ Cancel watching a monitoring point branch setup with :py:meth:`set_monitor_point_branch_callback`. Return True if successful, False otherwise. """ return self.canel_monitor_point_callback(name, id)
[docs] def list_commands(self, subsystem): """ Return the contents of the command manifest for the specified subsystem. Returns the list if successful, None otherwise. """ res = self.read_monitor_point('manifest/commands', id=subsystem) if res is not None: res = res.value return res
[docs] def set_command_callback(self, command, callback): """ Process a command by executing the callback when it is received. The callback should be a sub-class of :py:class:`CommandCallbackBase`. Return True is successful, False otherwise. This watch...callback behavior continues until the appropriate :py:meth:`cancel_command_callback` is called. """ if is None: raise RuntimeError("Command processing is not supported in anonymous mode") if command.startswith('/'): command = command[1:] if not isinstance(callback, CommandCallbackBase): raise TypeError("Expected a CommandCallbackBase-derived instance") full_name = '/cmd/%s/%s' % (, command) try: watch_id = self.client.add_watch_prefix_callback(full_name, callback) try: self.client.cancel_watch(self._watchers[command][0]) except KeyError: pass self._watchers[command] = (watch_id, callback) self._update_cmd_manifest(command) return True except Exception as e: return False
[docs] def cancel_command_callback(self, command): """ Cancel command processing setup with :py:meth:`set_command_callback`. Return True if successful, False otherwise. """ if is None: raise RuntimeError("Command processing is not supported in anonymous mode") if command.startswith('/'): command = command[1:] full_name = '/cmd/%s/%s' % (, command) try: self.client.cancel_watch(self._watchers[full_name][0]) del self._watchers[full_name] self._update_cmd_manifest(command, drop=True) return True except KeyError: return False
[docs] def send_command(self, subsystem, command, **kwargs): """ Send a command to the given subsystem and wait for a response. The arguments for the command are given as keywords. If a response is received within the timeout window, that response is returned as a two- element tuple of (True, the response as a dictionary). If a response was not received within the timeout window or another error occurred, return a two-element tuple of (False, sequence_id). """ if command.startswith('/'): command = command[1:] full_name = '/cmd/%s/%s' % (subsystem, command) resp_name = '/resp/'+full_name[5:] sequence_id = uuid.uuid1().hex try: s_id = sequence_id.decode() except AttributeError: s_id = sequence_id payload = {'sequence_id': sequence_id, 'timestamp': time.time(), 'command': command, 'kwargs': kwargs} payload = json.dumps(payload) def _timeout(signum, frame): raise RuntimeError("timeout") signal.signal(signal.SIGALRM, _timeout) try: if self.timeout > 0: signal.alarm(int(round(self.timeout))) events_iterator, cancel = self.client.put(full_name, payload) found = None t0 = time.time() while not found and (time.time() - t0) < self.timeout: for event in events_iterator: value = json.loads(event.value) if value['sequence_id'] == sequence_id: found = value break cancel() signal.alarm(0) return True, found except Exception as e: warnings.warn("Error sending command to '/cmd/%s/%s': %s" % (subsystem, command, str(e)), RuntimeWarning) return False, s_id