Source code for riaps.run.insPort

'''
Created on Jan 9, 2017

@author: riaps
'''
from .port import Port,PortInfo,PortScope
import threading
import zmq
import time
from .exc import OperationError, PortError
from enum import Enum

try:
    import cPickle
    pickle = cPickle
except:
    cPickle = None
    import pickle

class _DeviceThread(threading.Thread):
    '''
    Prototypical 'inside' thread that implements a 1 sec 
    'ticker' and an echo service.
    '''
    def __init__(self, trigger):
        threading.Thread.__init__(self,daemon=False)
        self.trigger = trigger
        self.active = threading.Event()
        self.active.clear()
        self.terminated = threading.Event()
        self.terminated.clear()
        self.period = 1000.0        # 1 sec 
        self.plug = None
        self.plug_identity = None

    def get_identity(self,ins_port):
        '''
        to be called from the 'outer' component thread, retrieves the identity of the 
        'inner' side (i.e. thread), so that that component thread can send 
        messages to the selected inner (thread).   
        '''
        if self.plug_identity is None:
            while True:
                if self.plug != None:
                    self.plug_identity = ins_port.get_plug_identity(self.plug)
                    break
                time.sleep(0.1)
        return self.plug_identity

    def run(self):
        self.plug = self.parent.setupPlug(self)
        self.poller = zmq.Poller() 
        self.poller.register(self.plug, zmq.POLLIN)
        while 1:
            self.active.wait(None)
            if self.terminated.is_set(): break
            if self.active.is_set():
                socks = dict(self.poller.poll(self.period))
                if self.terminated.is_set(): break
                if self.plug in socks and socks[self.plug] == zmq.POLLIN:
                    msg = self.plug.recv_pyobj()    # Message from component
                    self.plug.send_pyobj(msg)       # ... echo it
                    continue
                if len(socks) == 0:                 # Timeout
                    value = time.time()
                    self.plug.send_pyobj(value)

    def activate(self):
        self.active.set()

    def deactivate(self):
        self.active.clear()

    def terminate(self):
        self.active.set()
        self.terminated.set()

# class Component:
#     pass
#
# class _Device(Component):
#     '''
#     message Query;
#     message Answer;
#     device _Device() {
#         inside trigger;
#         ans echo (Query,Answer);
#     }
#     '''
#     def __init__(self):
#         super().__init__()
#         self.thread = None
#
#     def handleActivate(self):                   # activation: sets up inner thread
#         if self.thread == None: 
#             self.thread = _DeviceThread(self.trigger)
#             self.thread.start() 
#             self.trigger.set_identity(self._DeviceThread.get_identity(self.trigger))
#             self.trigger.activate()
#
#     def __destroy__(self):
#         self._DeviceThread.deactivate()
#         self._DeviceThread.terminate()
#         self._DeviceThread.join()
#
#     def on_trigger(self):                       # operation triggered by inner thread
#         msg = self.trigger.recv_pyobj()
#         if type(msg) == float:                  # time value
#             pass
#         else:                                   # echo answer
#             self.echo.send_pyobj(msg)
#
#     def on_echo(self):
#         qry = self.echo.recv_pyobj()            # recv query to echo
#         self.trigger.send_pyobj(qry)            # send it to inner thread


[docs]class InsPort(Port): ''' classdocs ''' def __init__(self, parentPart, portName, portSpec): ''' Constructor ''' super(InsPort, self).__init__(parentPart, portName, portSpec) self.instName = self.parent.name + '.' + self.name self.spec = portSpec["spec"] self.inner_threads = [] self.parent_thread = None self.info = None self.plugMap = { } self.identity = bytes(8) # Default identity, it is an error to use it.
[docs] def setup(self): if self.spec == 'default': thread = _DeviceThread(self) thread.start() self.activate() else: pass
[docs] def setupSocket(self, owner): self.setOwner(owner) self.parent_thread = threading.current_thread() self.socket = self.context.socket(zmq.ROUTER) self.socket.bind('inproc://inside_' + self.instName) self.info = PortInfo(portKind='ins', portScope=PortScope.INTERNAL, portName=self.name, msgType='inside', portHost='', portNum=-1) return self.info
[docs] def reset(self): pass
[docs] def setupPlug(self, thread): assert thread != self.parent_thread # Must be called from an inner thread if thread in self.inner_threads: raise OperationError('inside thread %s already running on %s' % (thread.ident, self.instName)) else: self.inner_threads += [thread] plug = self.context.socket(zmq.DEALER) identity = str(id(thread)) plug.setsockopt_string(zmq.IDENTITY, identity, 'utf-8') plug.connect('inproc://inside_' + self.instName) self.plugMap[plug] = bytes(identity, 'utf-8') return plug
[docs] def activate(self): for thread in self.inner_threads: if thread and hasattr(thread, 'activate'): thread.activate()
[docs] def deactivate(self): for thread in self.inner_threads: if thread and hasattr(thread, 'deactivate'): thread.deactivate()
[docs] def terminate(self): for thread in self.inner_threads: if hasattr(thread, 'terminate') and thread.is_alive(): thread.terminate()
[docs] def getSocket(self): return self.socket
[docs] def inSocket(self): return True
[docs] def getContext(self): return self.context
[docs] def get_plug_identity(self, plug): return self.plugMap.get(plug, None)
[docs] def get_identity(self): return self.identity
[docs] def set_identity(self, identity): self.identity = identity
[docs] def ins_port_recv(self, is_pyobj): try: msgFrames = self.socket.recv_multipart() # Receive multipart (IDENTITY + payload) message except zmq.error.ZMQError as e: raise PortError("recv error (%d)" % e.errno, e.errno) from e self.identity = msgFrames[0] # Separate identity, it is a Frame if is_pyobj: result = pickle.loads(msgFrames[1]) # Separate payload (pyobj) else: result = msgFrames[1] # Separate payload (bytes) return result
[docs] def ins_port_send(self, msg, is_pyobj): try: sendMsg = [self.identity] # Identity is already a frame if is_pyobj: payload = zmq.Frame(pickle.dumps(msg)) # Pickle python payload else: payload = zmq.Frame(msg) # Take bytes sendMsg += [payload] self.socket.send_multipart(sendMsg) except zmq.error.ZMQError as e: raise PortError("send error (%d)" % e.errno, e.errno) from e return True
[docs] def recv_pyobj(self): return self.ins_port_recv(True)
[docs] def send_pyobj(self, msg): return self.ins_port_send(msg, True)
[docs] def recv(self): return self.ins_port_recv(False)
[docs] def send(self, msg): return self.ins_port_send(msg, False)
[docs] def getInfo(self): return self.info