Source code for mnc.mcs

from __future__ import division, print_function
# Python 2 compatibility shim: when the lazy ``xrange`` builtin exists, bind
# it to ``range`` for this module; where it does not exist (NameError) the
# builtin ``range`` is used unchanged.
try:
    range = xrange
except NameError:
    pass
    
import os
import sys
import json
import time
import uuid
import etcd3
import base64
import signal
import warnings
try:
    from io import BytesIO
except ImportError:
    from StringIO import StringIO as BytesIO
from datetime import datetime, timedelta
from textwrap import fill as tw_fill

import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt

from mnc.common import ETCD_HOST, ETCD_PORT


__all__ = ['MonitorPoint', 'MultiMonitorPoint', 'ImageMonitorPoint',
           'MonitorPointCallbackBase', 'CommandCallbackBase', 'Client']


# Record the concrete class returned by etcd3.client() by instantiating a
# throw-away client with default arguments.  ETCD_TYPE is used by
# CommandCallbackBase.__init__ for its isinstance() check.
temp = etcd3.client()
ETCD_TYPE = type(temp)
del temp


class MonitorPoint(object):
    """
    A single monitoring point within the MCS framework.  Every monitoring
    point carries at least:

     * ``timestamp`` - UNIX time at which the point was updated,
     * ``value`` - the value of the point, and
     * ``unit`` - the units of the value, or '' if there are none.

    Any additional keys supplied at construction are stored as attributes as
    well and are included when the point is exported.
    """

    _required = ('timestamp', 'value', 'unit')

    def __init__(self, value, timestamp=None, unit='', **kwargs):
        # An existing monitoring point is unpacked into its dictionary form
        if isinstance(value, MonitorPoint):
            value = value.as_dict()

        if not isinstance(value, dict):
            # Bare value: build the record from the individual arguments
            if timestamp is None:
                timestamp = time.time()

            record = {'timestamp': timestamp, 'value': value, 'unit': unit}
            record.update(kwargs)
        else:
            # Pre-built record: it only needs to carry the required keys
            for key in self._required:
                if key not in value:
                    raise KeyError("Missing required key: %s" % key)
            record = value

        # Remember which attributes make up the record, in record order
        self._entries = []
        for name, entry in record.items():
            self._entries.append(name)
            setattr(self, name, entry)

    def __repr__(self):
        parts = []
        for name in self._entries:
            entry = getattr(self, name, None)
            if name == 'timestamp':
                # Timestamps are displayed as a UTC date/time string
                entry = str(datetime.utcfromtimestamp(entry))
            if isinstance(entry, str):
                entry = "'%s'" % entry
            parts.append("%s=%s" % (name, entry))

        text = "<%s %s>" % (type(self).__name__, ', '.join(parts))
        return tw_fill(text, subsequent_indent='    ')

    def __str__(self):
        when = datetime.utcfromtimestamp(self.timestamp)
        return "%s%s at %s" % (str(self.value), self.unit, when)

    def __contains__(self, value):
        return value in self._entries

    @classmethod
    def from_json(cls, json_value):
        """
        Build a new instance of this class from a JSON-packed dictionary.
        """

        return cls(json.loads(json_value))

    def as_dict(self):
        """
        Return the contents of the monitoring point as a dictionary.
        """

        return {name: getattr(self, name) for name in self._entries}

    def as_json(self):
        """
        Return the contents of the monitoring point as a JSON-packed
        dictionary.
        """

        return json.dumps(self.as_dict())


class MultiMonitorPoint(MonitorPoint):
    """
    Sub-class of :py:class:`MonitorPoint` that bundles several monitoring
    points that were updated at the same time.  At a minimum it holds:

     * a UNIX timestamp of when the monitoring points were updated,
     * a list of the monitoring point values,
     * a list with the field name of each value, and
     * the units of the values or '' if there are none.

    .. note:: A single unit string is replicated so that there is one unit
              per value.
    """

    _required = ('timestamp', 'value', 'field', 'unit')

    def __init__(self, value, timestamp=None, field=None, unit='', **kwargs):
        MonitorPoint.__init__(self, value, timestamp=timestamp, unit=unit, **kwargs)

        # The "multi" part - anything without a length, and any string, is
        # treated as a single value and wrapped in a list
        try:
            len(self.value)
        except TypeError:
            is_scalar = True
        else:
            is_scalar = isinstance(self.value, (str, bytes))
        if is_scalar:
            self.value = [self.value]

        # Field names: explicit argument, then whatever the record carried,
        # then auto-generated names
        if field is None:
            if hasattr(self, 'field'):
                field = self.field
            else:
                field = ['value%i' % idx for idx in range(len(self.value))]

        # One unit per value
        if isinstance(self.unit, str):
            self.unit = [self.unit] * len(self.value)

        if len(field) != len(self.value):
            raise RuntimeError("value and field keys must have the same length")
        if len(self.unit) != len(self.value):
            raise RuntimeError("value and unit keys must have the same length")

        if 'field' not in self._entries:
            self._entries.append('field')
        self.field = field

    def __str__(self):
        triples = zip(self.field, self.value, self.unit)
        text = ', '.join("%s=%s%s" % (name, val, unit) for name, val, unit in triples)
        return text + " at %s" % datetime.utcfromtimestamp(self.timestamp)

    def as_list(self):
        """
        Return the monitoring points as a list of three-element tuples of
        (field name, value, unit).
        """

        return list(zip(self.field, self.value, self.unit))


class ImageMonitorPoint(MonitorPoint):
    """
    Sub-class of :py:class:`MonitorPoint` for representing a monitoring point
    image within the MCS framework.  At a minimum this includes:

     * a UNIX timestamp of when the monitoring point was updated,
     * the base64-encoded monitoring point image itself,
     * the MIME-type of the encoded image, and
     * the units of the monitoring point value or '' if there are none.
    """

    _required = ('timestamp', 'value', 'mime', 'unit')

    # File extensions accepted by from_file() and the MIME type stored for each
    _mime_types = {'.png': 'image/png',
                   '.jpg': 'image/jpeg',
                   '.jpeg': 'image/jpeg'}

    @staticmethod
    def _encode_image_data(image_data):
        """
        URL-safe base64 encode raw image bytes and return the result as text
        so that it can be JSON-packed.
        """

        image_data = base64.urlsafe_b64encode(image_data)
        try:
            image_data = image_data.decode()
        except AttributeError:
            # Already text
            pass
        return image_data

    @staticmethod
    def _decode_image_data(image_data):
        """
        Inverse of :py:meth:`_encode_image_data`:  convert the URL-safe base64
        text (or bytes) back into the raw image bytes.
        """

        try:
            image_data = image_data.encode()
        except AttributeError:
            # Already bytes
            pass
        return base64.urlsafe_b64decode(image_data)

    @classmethod
    def from_figure(cls, fig):
        """
        Return a new :py:class:`ImageMonitorPoint` instance based on a matplotlib Figure.
        The figure is rendered to PNG with the Agg canvas.
        """

        # Import the backend explicitly; "import matplotlib" alone does not
        # guarantee that the matplotlib.backends.backend_agg attribute exists.
        from matplotlib.backends.backend_agg import FigureCanvasAgg

        canvas = FigureCanvasAgg(fig)
        image = BytesIO()
        try:
            canvas.print_png(image)
            image_data = image.getvalue()
        finally:
            image.close()

        return cls(cls._encode_image_data(image_data), mime='image/png')

    @classmethod
    def from_image(cls, im):
        """
        Return a new :py:class:`ImageMonitorPoint` instance based on a PIL.Image.
        The image is stored as PNG.
        """

        image = BytesIO()
        try:
            im.save(image, 'PNG')
            image_data = image.getvalue()
        finally:
            image.close()

        return cls(cls._encode_image_data(image_data), mime='image/png')

    @classmethod
    def from_file(cls, name_or_handle):
        """
        Return a new :py:class:`ImageMonitorPoint` instance based the contents
        of a filename or open file handle.  The timestamp of the monitoring
        point is the modification time of the file and the MIME type is
        derived from the file extension (.png, .jpg, or .jpeg, compared
        case-insensitively).

        A file handle needs to be opened in binary mode and have a `name`
        attribute.  Raises a RuntimeError if the extension is not one of the
        supported ones.
        """

        # Anything that can be read from is a handle; everything else is
        # treated as a filename.
        is_handle = hasattr(name_or_handle, 'read')
        filename = name_or_handle.name if is_handle else name_or_handle

        ts = os.path.getmtime(filename)
        ext = os.path.splitext(filename)[1].lower()
        if ext not in cls._mime_types:
            raise RuntimeError("Provided file does not seem to be a support image format")

        if is_handle:
            image_data = name_or_handle.read()
        else:
            with open(filename, 'rb') as fh:
                image_data = fh.read()

        image_data = cls._encode_image_data(image_data)

        return cls(image_data, timestamp=ts, mime=cls._mime_types[ext])

    def as_array(self):
        """
        Return the data for the monitoring point image as a numpy.array, similar
        to matplotlib.pyplot.imread.
        """

        image = BytesIO(self._decode_image_data(self.value))
        try:
            return plt.imread(image)
        finally:
            image.close()

    def to_file(self, name_or_handle):
        """
        Write the monitoring point image to the specified filename or open
        file handle.  A file handle needs to be opened in binary mode.
        """

        image_data = self._decode_image_data(self.value)

        if hasattr(name_or_handle, 'write'):
            name_or_handle.write(image_data)
        else:
            with open(name_or_handle, 'wb') as fh:
                fh.write(image_data)


class MonitorPointCallbackBase(object):
    """
    Base class for callbacks that are run when a monitoring point changes.
    Sub-classes supply the behavior by overriding :py:meth:`action`.
    """

    @staticmethod
    def action(value):
        """
        Static method to be overridden by the sub-class that defines what is
        done with the new monitoring point value.  It is called with a single
        argument, the updated MonitorPoint.
        """

        raise NotImplementedError

    def __call__(self, event):
        # Run the action on every event delivered; the return value of the
        # last action (or None if there were no events) is passed back.
        result = None
        for change in event.events:
            result = self.action(MonitorPoint.from_json(change.value))
        return result


class CommandCallbackBase(object):
    """
    Base class to use as a callback for processing a command when it is
    received.  For every command event the sub-class' :py:meth:`action` is
    run and its outcome is published under the matching '/resp/' key.
    """

    def __init__(self, client):
        """
        Bind the callback to the etcd connection used to publish responses.
        `client` may be a :py:class:`Client` or a raw etcd3 client; anything
        else raises a TypeError.
        """

        if isinstance(client, Client):
            client = client.client
        elif not isinstance(client, ETCD_TYPE):
            raise TypeError("Expected a mcs.Client or etcd3.client.Ectd3Client")
        self.client = client

    @staticmethod
    def action(*args, **kwargs):
        """
        Static method that should be overridden by the sub-class as to how to
        process the command.  The method should accept arbitrary keywords per
        the command and return a two-element tuple of (command status,
        response code).  The keywords always include 'sequence_id'.
        """

        raise NotImplementedError

    def __call__(self, event):
        for evt in event.events:
            # Keys arrive as bytes from etcd; fall back to the raw key if it
            # is already text
            try:
                key = evt.key.decode()
            except AttributeError:
                key = evt.key
            value = json.loads(evt.value)

            sequence_id = value['sequence_id']
            payload = value['kwargs']
            # Let the action know which request it is serving unless the
            # sender already supplied a 'sequence_id' keyword
            if 'sequence_id' not in payload:
                try:
                    payload['sequence_id'] = sequence_id.decode()
                except AttributeError:
                    payload['sequence_id'] = sequence_id

            ts = time.time()
            status, response = self.action(**payload)
            # Boolean statuses are reported as 'success' or 'error'
            if isinstance(status, bool):
                status = 'success' if status else 'error'
            status = json.dumps({'sequence_id': sequence_id,
                                 'timestamp': ts,
                                 'status': status,
                                 'response': response})

            # Swap the leading '/cmd/' (five characters) for '/resp/'
            self.client.put('/resp/'+key[5:], status)


class Client(object):
    """
    MCS framework client.  This can be used for both monitor and control.
    """

    def __init__(self, id=None, timeout=5.0):
        """
        Initialize the client with the specified subsystem ID name and command
        response timeout in second.  A timeout of 'None' allows blocking until
        a response is received.  If the client is anonymous, i.e., 'id' is
        'None' then only reading monitoring points and sending commands are
        supported.
        """

        if id is not None:
            id = str(id)
        self.id = id
        if timeout is None:
            # "Block until a response arrives" is implemented as a very long
            # timeout
            timeout = 1e9
        self.timeout = timeout

        # Bookkeeping is created before connecting so that __del__ finds a
        # consistent object even if the connection attempt raises
        self._mon_manifest = ['manifest/monitoring', 'manifest/commands']
        self._cmd_manifest = []
        # etcd key -> (watch ID, callback) for every active watch
        self._watchers = {}
        self.client = etcd3.client(host=ETCD_HOST, port=ETCD_PORT)

    def __del__(self):
        # Best-effort cleanup; the instance may only be partially built
        client = getattr(self, 'client', None)
        if client is None:
            return
        for watcher in list(getattr(self, '_watchers', {}).values()):
            try:
                client.cancel_watch(watcher[0])
            except Exception:
                pass
        try:
            client.close()
        except Exception:
            pass

    def _update_manifest(self, local, key, name, drop=False):
        """
        Shared implementation for the monitoring point and command manifests.
        `local` is the local manifest list, `key` is the name of the
        monitoring point that holds the published manifest, and `name` is the
        entry to add (or remove if `drop` is True).  Returns a Boolean of
        whether or not the published manifest was updated.
        """

        # Adding something that is already in the local manifest changes
        # nothing, so skip taking the lock
        if not drop and name in local:
            return False

        updated = False
        with self.client.lock(self.id, ttl=5):
            if drop:
                ## Remove from the local manifest
                try:
                    local.remove(name)
                except ValueError:
                    pass
            else:
                ## Not in the local manifest
                local.append(name)

            ## Check the published manifest
            value = self.read_monitor_point(key)
            if value is None:
                value = MonitorPoint([])
            if drop:
                try:
                    value.value.remove(name)
                    updated = True
                except ValueError:
                    pass
            else:
                for entry in local:
                    if entry not in value.value:
                        value.value.append(entry)
                        updated = True

            # If there is an update, push it out
            if updated:
                value.timestamp = time.time()
                self.client.put('/mon/%s/%s' % (self.id, key), value.as_json())
        return updated

    def _update_mon_manifest(self, name, drop=False):
        """
        Update the monitoring point manifest as needed.  Returns a Boolean of
        whether or not an update was made.
        """

        return self._update_manifest(self._mon_manifest, 'manifest/monitoring',
                                     name, drop=drop)

    def _update_cmd_manifest(self, name, drop=False):
        """
        Update the command manifest as needed.  Returns a Boolean of whether
        or not an update was made.
        """

        return self._update_manifest(self._cmd_manifest, 'manifest/commands',
                                     name, drop=drop)

    def _mon_key(self, name, id=None):
        """
        Return the etcd key of a monitoring point (or branch).  An 'id' of
        'None' selects the current subsystem; a RuntimeError is raised if
        there is none, i.e., the client is anonymous.
        """

        if name.startswith('/'):
            name = name[1:]
        if id is None:
            id = self.id
        if id is None:
            raise RuntimeError("Must specify a subsystem ID when in anonymous mode")
        return '/mon/%s/%s' % (id, name)

    def _set_watch(self, key, callback, prefix=False):
        """
        Start watching `key` (or every key starting with it if `prefix` is
        True) with `callback`, replacing any watch already registered for
        that key.
        """

        if prefix:
            watch_id = self.client.add_watch_prefix_callback(key, callback)
        else:
            watch_id = self.client.add_watch_callback(key, callback)
        previous = self._watchers.get(key)
        # Register the new watch first so that it is tracked even if
        # cancelling the old one fails
        self._watchers[key] = (watch_id, callback)
        if previous is not None:
            self.client.cancel_watch(previous[0])

    def _cancel_watch(self, key):
        """
        Cancel the watch registered for `key`.  Returns True if there was
        one, False otherwise.
        """

        try:
            watch_id = self._watchers[key][0]
        except KeyError:
            return False
        self.client.cancel_watch(watch_id)
        del self._watchers[key]
        return True

    def remove_monitor_point(self, name):
        """
        Remove the specified monitoring point.  Returns True if the deletion
        was successful, False otherwise.
        """

        if self.id is None:
            raise RuntimeError("Writing monitoring points is not supported in anonymous mode")

        if name.startswith('/'):
            name = name[1:]
        try:
            self.client.delete('/mon/%s/%s' % (self.id, name))
            self._update_mon_manifest(name, drop=True)
            return True
        except Exception:
            return False

    def write_monitor_point(self, name, value, timestamp=None, unit=''):
        """
        Write a value to the specified monitoring point.  `value` may be a
        bare value, a dictionary, or a :py:class:`MonitorPoint`; `timestamp`
        and `unit` are only used for bare values.  Returns True if the write
        was successful, False otherwise.
        """

        if self.id is None:
            raise RuntimeError("Writing monitoring points is not supported in anonymous mode")

        if name.startswith('/'):
            name = name[1:]
        if isinstance(value, MonitorPoint):
            value = value.as_dict()
        elif not isinstance(value, dict):
            if timestamp is None:
                timestamp = time.time()
            value = {'timestamp': timestamp, 'value': value, 'unit': unit}
        value = json.dumps(value)

        try:
            self.client.put('/mon/%s/%s' % (self.id, name), value)
            self._update_mon_manifest(name)
            return True
        except Exception:
            return False

    def read_monitor_point(self, name, id=None):
        """
        Read the current value of a monitoring point.  An 'id' of 'None' is
        interpreted as that monitoring point on the current subsystem.
        Returns the monitoring point as a :py:class:`MonitorPoint` if
        successful, None otherwise (a RuntimeWarning is issued in that case).
        """

        key = self._mon_key(name, id)
        try:
            value = self.client.get(key)
            return MonitorPoint.from_json(value[0])
        except Exception as e:
            warnings.warn("Error reading monitor point '%s': %s" % (key, str(e)), RuntimeWarning)
            return None

    def list_monitor_points(self, id=None):
        """
        Return the contents of the monitoring point manifest.  An 'id' of
        'None' is interpreted as the manifest of the current subsystem.
        Returns the list if successful, None otherwise.
        """

        res = self.read_monitor_point('manifest/monitoring', id=id)
        if res is not None:
            res = res.value
        return res

    def set_monitor_point_callback(self, name, callback, id=None):
        """
        Watch the specified monitoring point and execute the callback when
        its value is updated.  The callback should be a sub-class of
        :py:class:`MonitorPointCallbackBase`.  An 'id' of 'None' is
        interpreted as that monitoring point on the current subsystem.
        Return True if successful, False otherwise.

        This watch...callback behavior continues until the appropriate
        :py:meth:`cancel_monitor_point_callback` is called.
        """

        if not isinstance(callback, MonitorPointCallbackBase):
            raise TypeError("Expected a MonitorPointCallbackBase-derived instance")
        full_name = self._mon_key(name, id)

        try:
            self._set_watch(full_name, callback)
            return True
        except Exception:
            return False

    def cancel_monitor_point_callback(self, name, id=None):
        """
        Cancel watching a monitoring point setup with
        :py:meth:`set_monitor_point_callback`.  Return True if successful,
        False otherwise.
        """

        return self._cancel_watch(self._mon_key(name, id))

    def read_monitor_point_branch(self, name, id=None):
        """
        Read the current value of all keys in a monitoring point branch.  An
        'id' of 'None' is interpreted as that monitoring point branch on the
        current subsystem.  Returns the monitoring point branch as a list of
        two-element tuples of (key, :py:class:`MonitorPoint`) if successful,
        None otherwise (a RuntimeWarning is issued in that case).
        """

        key = self._mon_key(name, id)
        try:
            output = []
            for value in self.client.get_prefix(key):
                output.append((value[1].key, MonitorPoint.from_json(value[0])))
            return output
        except Exception as e:
            warnings.warn("Error reading monitor point branch '%s': %s" % (key, str(e)), RuntimeWarning)
            return None

    def set_monitor_point_branch_callback(self, name, callback, id=None):
        """
        Watch the specified monitoring point branch and execute the callback
        when any key within that branch is updated.  The callback should be a
        sub-class of :py:class:`MonitorPointCallbackBase`.  An 'id' of 'None'
        is interpreted as that monitoring point branch on the current
        subsystem.  Return True if successful, False otherwise.

        This watch...callback behavior continues until the appropriate
        :py:meth:`cancel_monitor_point_branch_callback` is called.
        """

        if not isinstance(callback, MonitorPointCallbackBase):
            raise TypeError("Expected a MonitorPointCallbackBase-derived instance")
        full_name = self._mon_key(name, id)

        try:
            self._set_watch(full_name, callback, prefix=True)
            return True
        except Exception:
            return False

    def cancel_monitor_point_branch_callback(self, name, id=None):
        """
        Cancel watching a monitoring point branch setup with
        :py:meth:`set_monitor_point_branch_callback`.  Return True if
        successful, False otherwise.
        """

        return self.cancel_monitor_point_callback(name, id)

    def list_commands(self, subsystem):
        """
        Return the contents of the command manifest for the specified
        subsystem.  Returns the list if successful, None otherwise.
        """

        res = self.read_monitor_point('manifest/commands', id=subsystem)
        if res is not None:
            res = res.value
        return res

    def set_command_callback(self, command, callback):
        """
        Process a command by executing the callback when it is received.  The
        callback should be a sub-class of :py:class:`CommandCallbackBase`.
        Return True if successful, False otherwise.

        This watch...callback behavior continues until the appropriate
        :py:meth:`cancel_command_callback` is called.
        """

        if self.id is None:
            raise RuntimeError("Command processing is not supported in anonymous mode")

        if command.startswith('/'):
            command = command[1:]
        if not isinstance(callback, CommandCallbackBase):
            raise TypeError("Expected a CommandCallbackBase-derived instance")

        # The watch is tracked under the full etcd key, which is also what
        # cancel_command_callback looks up
        full_name = '/cmd/%s/%s' % (self.id, command)
        try:
            self._set_watch(full_name, callback, prefix=True)
            self._update_cmd_manifest(command)
            return True
        except Exception:
            return False

    def cancel_command_callback(self, command):
        """
        Cancel command processing setup with :py:meth:`set_command_callback`.
        Return True if successful, False otherwise.
        """

        if self.id is None:
            raise RuntimeError("Command processing is not supported in anonymous mode")

        if command.startswith('/'):
            command = command[1:]

        full_name = '/cmd/%s/%s' % (self.id, command)
        if not self._cancel_watch(full_name):
            return False
        self._update_cmd_manifest(command, drop=True)
        return True

    def send_command(self, subsystem, command, **kwargs):
        """
        Send a command to the given subsystem and wait for a response.  The
        arguments for the command are given as keywords.  If a response is
        received within the timeout window, that response is returned as a
        two-element tuple of (True, the response as a dictionary).  If a
        response was not received within the timeout window or another error
        occurred, return a two-element tuple of (False, sequence_id).

        With a timeout that is not positive the command is sent without
        waiting for a response and (True, None) is returned.

        .. note:: The timeout is implemented with SIGALRM, which has a
                  resolution of one second (timeouts below one second are
                  treated as one second) and requires that this method is
                  called from the main thread.
        """

        if command.startswith('/'):
            command = command[1:]

        full_name = '/cmd/%s/%s' % (subsystem, command)
        # Swap the leading '/cmd/' (five characters) for '/resp/'
        resp_name = '/resp/'+full_name[5:]
        sequence_id = uuid.uuid1().hex
        try:
            s_id = sequence_id.decode()
        except AttributeError:
            s_id = sequence_id
        payload = json.dumps({'sequence_id': sequence_id,
                              'timestamp': time.time(),
                              'command': command,
                              'kwargs': kwargs})

        def _timeout(signum, frame):
            raise RuntimeError("timeout")

        wait = self.timeout > 0
        found = None
        cancel = None
        old_handler = signal.signal(signal.SIGALRM, _timeout)
        try:
            if wait:
                # alarm(0) would disable the alarm altogether so never go
                # below one second
                signal.alarm(max(1, int(round(self.timeout))))

            # Watch for the response before sending so that it is not missed
            events_iterator, cancel = self.client.watch(resp_name)
            self.client.put(full_name, payload)

            if wait:
                # Blocks until a matching response shows up, the alarm
                # fires, or the watch ends
                for event in events_iterator:
                    value = json.loads(event.value)
                    if value['sequence_id'] == sequence_id:
                        found = value
                        break
        except Exception as e:
            warnings.warn("Error sending command to '/cmd/%s/%s': %s" % (subsystem, command, str(e)), RuntimeWarning)
            return False, s_id
        finally:
            # Always disarm the alarm, put the previous handler back, and
            # release the watch - also on errors and timeouts
            signal.alarm(0)
            if old_handler is None:
                old_handler = signal.SIG_DFL
            signal.signal(signal.SIGALRM, old_handler)
            if cancel is not None:
                try:
                    cancel()
                except Exception:
                    pass

        if wait and found is None:
            # The watch ended without delivering a matching response
            return False, s_id
        return True, found