Source code for riaps.run.dcPorts

'''
Distributed coordination - Communication ports for the groups.

Created on Feb 23, 2019
Author: riaps
'''

import ctypes
import threading
import zmq
import time
import logging
import struct
import traceback
from riaps.run.port import Port,PortInfo,PortScope,BindPort,ConnPort
from riaps.consts.defs import *
from riaps.utils import spdlog_setup
import spdlog
import random
import string
from riaps.run.exc import BuildError, OperationError, PortError
import struct
import collections
from zmq.backend.cython.utils import ZMQError

try:
    import cPickle
    pickle = cPickle
except:
    cPickle = None
    import pickle

[docs]class GroupSimplexPort(Port): def __init__(self, parentComponent, portName, groupSpec): ''' GroupSimplexPort constructor ''' super().__init__(parentComponent, portName) self.type = groupSpec["message"] self.isTimed = groupSpec["timed"] self.portScope = PortScope.GLOBAL self.msgType = self.type self.info = None
[docs]class GroupPubPort(BindPort,GroupSimplexPort): ''' Group Publisher port is for publishing application and housekeeping messages for all group members. ''' def __init__(self, parentPart, portName, groupSpec): ''' Constructor ''' super(GroupPubPort, self).__init__(parentPart, portName,groupSpec)
[docs] def setup(self): pass
[docs] def setupSocket(self, owner): return self.setupBindSocket(owner, zmq.PUB, 'gpub',[(zmq.SNDTIMEO,self.sendTimeout)])
# self.setOwner(owner) # self.socket = self.context.socket(zmq.PUB) # self.socket.setsockopt(zmq.SNDTIMEO, self.sendTimeout) # self.host = '' # self.portNum = -1 # self.setupCurve(True) # globalHost = self.getGlobalIface() # self.bindAddr = "tcp://" + globalHost # self.portNum = self.socket.bind_to_random_port(self.bindAddr) # self.host = globalHost # self.info = PortInfo(portKind='gpub', portScope=self.portScope, portName=self.name, # msgType=self.type, # portHost=self.host, portNum=self.portNum) # return self.info
[docs] def closeSocket(self): self.closeBindSocket()
[docs] def reset(self): pass
[docs] def update(self, host, port): raise OperationError("Unsupported update() on GroupPubPort")
[docs] def getSocket(self): return self.socket
[docs] def inSocket(self): return False
[docs] def send_pyobj(self, msg): raise OperationError("Unsupported send_pyobj() on GroupPubPort")
# return self.port_send(msg,True)
[docs] def send(self, msg): raise OperationError("Unsupported send() on GroupPubPort")
# return self.port_send(msg,False)
[docs] def sendGroup(self, msgType, msg): try: msgFrames = [zmq.Frame(msgType)] msgFrames += [zmq.Frame(msg)] if self.isTimed: now = time.time() now = struct.pack("d", now) nowFrame = zmq.Frame(now) msgFrames += [nowFrame] self.socket.send_multipart(msgFrames) except zmq.error.ZMQError as e: raise PortError("send error (%d)" % e.errno, e.errno) from e return True
[docs] def recv_pyobj(self): raise OperationError("attempt to receive through a publish port")
[docs] def recv(self): raise OperationError("attempt to receive through a publish port")
[docs] def getInfo(self): return self.info
[docs]class GroupSubPort(ConnPort,GroupSimplexPort): ''' Group subscriber port is for receiving application and housekeeping messages from all group members. ''' def __init__(self, parentPart, portName, groupSpec): ''' Constructor ''' super(GroupSubPort, self).__init__(parentPart, portName,groupSpec) # self.type = groupSpec["message"] # self.isTimed = groupSpec["timed"] self.deadline = None # self.portScope = PortScope.GLOBAL # self.pubs = [] self.sendTime = 0.0 self.recvTime = 0.0 self.info = None
[docs] def setup(self): pass
[docs] def setupSocket(self, owner): return self.setupConnSocket(owner,zmq.SUB,'gsub',[(zmq.SUBSCRIBE,'')])
# self.setOwner(owner) # self.socket = self.context.socket(zmq.SUB) # self.socket.setsockopt_string(zmq.SUBSCRIBE, '') # self.setupCurve(False) # self.host = '' # globalHost = self.getGlobalIface() # self.portNum = -1 # self.host = globalHost # self.info = PortInfo(portKind='gsub', portScope=self.portScope, portName=self.name, # msgType=self.type, # portHost=self.host, portNum=self.portNum) # return self.info
[docs] def closeSocket(self): self.closeConnSocket()
[docs] def reset(self): self.resetConnSocket(zmq.SUB,[(zmq.SUBSCRIBE,'')])
[docs] def getSocket(self): return self.socket
[docs] def inSocket(self): return True
# def update(self, host, port): # pubPort = "tcp://" + str(host) + ":" + str(port) # self.pubs.append((host, port)) # self.socket.connect(pubPort)
[docs] def recv_pyobj(self): raise OperationError("Unsupported recv_pyobj() on GroupSubPort")
# return self.port_recv(True)
[docs] def send_pyobj(self, msg): raise OperationError("attempt to send through a subscriber port")
[docs] def recv(self): raise OperationError("Unsupported recv() on GroupSubPort")
# return self.port_recv(False)
[docs] def send(self, _msg): raise OperationError("attempt to send through a subscriber port")
[docs] def recvGroup(self): try: msgFrames = self.socket.recv_multipart() except zmq.error.ZMQError as e: raise PortError("recv error (%d)" % e.errno, e.errno) from e if self.isTimed: self.recvTime = time.time() _msgType = msgFrames[0] _payload = msgFrames[1] if len(msgFrames) == 3: rawMsg = msgFrames[2] rawTuple = struct.unpack("d", rawMsg) self.sendTime = rawTuple[0] return msgFrames
[docs] def getInfo(self): return self.info
[docs]class GroupDuplexPort(Port): def __init__(self, parentComponent, portName, groupSpec): ''' GroupDuplexPort constructor ''' super().__init__(parentComponent, portName) self.req_type = 'group-mtl' self.rep_type = 'group-mfl' self.isTimed = groupSpec["timed"] self.portScope = PortScope.GLOBAL self.msgType = str(self.req_type) + '#' + str(self.rep_type) self.info = None
[docs]class GroupAnsPort(BindPort,GroupDuplexPort): ''' Group answer port is for the leader to receive messages from members. Based on a DEALER socket. Group-internal communication port for messaging with the leader, no message type, but can be timed. ''' def __init__(self, parentPart, portName, groupSpec): ''' Constructor ''' super(GroupAnsPort, self).__init__(parentPart, portName, groupSpec) # self.req_type = 'group-mtl' # self.rep_type = 'group-mfl' # self.isTimed = groupSpec["timed"] # self.portScope = PortScope.GLOBAL self.identity = None self.socket = None self.portNum = None self.bindAddr = None self.poller = None # self.info = None
[docs] def setup(self): pass
[docs] def setupSocket(self, owner): self.setOwner(owner) self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.SNDTIMEO, self.sendTimeout) self.setupCurve(True) globalHost = self.getGlobalIface() self.host = globalHost self.portNum = None self.info = PortInfo(portKind='gans', portScope=self.portScope, portName=self.name, msgType=str(self.req_type) + '#' + str(self.rep_type), portHost=self.host, portNum=self.portNum) return self.info
[docs] def closeSocket(self): if self.portNum != None: self.socket.setsockopt(zmq.LINGER, 0) self.socket.unbind(self.bindAddr) self.portNum = None if self.socket != None: self.socket.close() # Close and destroy old socket del self.socket self.socket = None
[docs] def update(self): raise OperationError("Unsupported update() on GroupAnsPort")
[docs] def updatePoller(self, poller): if self.portNum != None: self.socket.setsockopt(zmq.LINGER, 0) poller.unregister(self.self.socket) # Unregister old socket as tainted self.socket.close() # Close and destroy old socket del self.socket self.socket = self.context.socket(zmq.ROUTER) # Create new socket self.setupCurve(True) # Set up encryption poller.register(self.socket, zmq.POLLIN) # Register with poller bindAddr = "tcp://" + self.host self.portNum = self.socket.bind_to_random_port(bindAddr) self.bindAddr = "%s:%d" % (bindAddr, self.portNum) self.info = PortInfo(portKind='gans', portScope=self.portScope, portName=self.name, msgType=str(self.req_type) + '#' + str(self.rep_type), portHost=self.host, portNum=self.portNum)
[docs] def reset(self): pass
[docs] def getSocket(self): return self.socket
[docs] def inSocket(self): return True
[docs] def get_identity(self): return self.identity
[docs] def set_identity(self, identity): self.identity = identity
[docs] def getPortNumber(self): return self.portNum
[docs] def recvFromMember(self): try: msgFrames = self.socket.recv_multipart() # Receive multipart (IDENTITY + payload) message except zmq.error.ZMQError as e: raise PortError("recv error (%d)" % e.errno, e.errno) from e if self.isTimed: self.recvTime = time.time() self.identity = msgFrames[0] # Separate identity _msgType = msgFrames[1] _payload = msgFrames[2] if len(msgFrames) == 4: # If we have a send time stamp rawMsg = msgFrames[3] rawTuple = struct.unpack("d", rawMsg) self.sendTime = rawTuple[0] return msgFrames[1:]
[docs] def sendToMember(self, msgType, msg): try: sendMsg = [zmq.Frame(self.identity)] # Identity sendMsg += [zmq.Frame(msgType)] sendMsg += [zmq.Frame(msg)] # Take bytes if self.isTimed: now = time.time() now = struct.pack("d", now) nowFrame = zmq.Frame(now) sendMsg += [nowFrame] self.socket.send_multipart(sendMsg) except zmq.error.ZMQError as e: raise PortError("send error (%d)" % e.errno, e.errno) from e return True
[docs] def recv_pyobj(self): raise OperationError("Unsupported recv_pyobj() on GroupAnsPort")
# return self.ans_port_recv(True)
[docs] def send_pyobj(self, msg): raise OperationError("Unsupported send_pyobj() on GroupAnsPort")
# return self.ans_port_send(msg,True)
[docs] def recv(self): raise OperationError("Unsupported recv() on GroupAnsPort")
# return self.ans_port_recv(False)
[docs] def send(self, _msg): raise OperationError("Unsupported send() on GroupAnsPort")
# return self.ans_port_send(False)
[docs] def getInfo(self): return self.info
[docs]class GroupQryPort(ConnPort, GroupDuplexPort): ''' Group query port is for accessing the leader from members. Based on a DEALER socket. Group-internal communication port for messaging with the leader, no message type, but can be timed. ''' def __init__(self, parentPart, portName, groupSpec): ''' Initialize the query port object. ''' super(GroupQryPort, self).__init__(parentPart, portName, groupSpec) # self.req_type = 'group-mtl' # self.rep_type = 'group-mfl' # self.isTimed = groupSpec["timed"] # self.portScope = PortScope.GLOBAL self.serverHost = None self.serverPort = None self.info = None
[docs] def setup(self): ''' Set up the port ''' pass
[docs] def setupSocket(self, owner): ''' Set up the socket of the port. Return a tuple suitable for querying the discovery service for the servers (not used currently). ''' self.setOwner(owner) self.socket = self.context.socket(zmq.DEALER) self.socket.setsockopt_string(zmq.IDENTITY, str(id(self)), 'utf-8') # FIXME: identity is not unique across nodes self.socket.setsockopt(zmq.SNDTIMEO, self.sendTimeout) self.setupCurve(False) globalHost = self.getGlobalIface() self.portNum = -1 self.host = globalHost self.info = PortInfo(portKind='gqry', portScope=self.portScope, portName=self.name, msgType=str(self.req_type) + '#' + str(self.rep_type), portHost=self.host, portNum=self.portNum) return self.info
[docs] def closeSocket(self): if self.socket != None: if self.serverHost != None and self.serverPort != None: # Old connection self.socket.setsockopt(zmq.LINGER, 0) oldConn = "tcp://" + str(self.serverHost) + ":" + str(self.serverPort) self.socket.disconnect(oldConn) self.socket.close() del self.socket self.socket = None
[docs] def reset(self): pass
[docs] def getSocket(self): ''' Return the socket of port ''' return self.socket
[docs] def inSocket(self): ''' Return True because the socket is used of input ''' return True
[docs] def update(self, host, port): ''' Update the query port -- connect its socket to a server ''' if self.serverHost == host and self.serverPort == port: return if self.serverHost != None and self.serverPort != None: # Old connection self.socket.setsockopt(zmq.LINGER, 0) oldConn = "tcp://" + str(self.serverHost) + ":" + str(self.serverPort) self.socket.disconnect(oldConn) # Alternate code below - more radical update # self.poller.register(self.qry,0) # Unregister old socket as tainted # self.qry.close() # Close it # del self.qry # self.qry = self.context.socket(zmq.DEALER) # Create new socket # self.qry.setsockopt_string(zmq.IDENTITY, str(id(self)), 'utf-8') # self.poller.register(self.qry,zmq.POLLIN) # Register it else: pass self.serverHost, self.serverPort = host, port if self.serverHost != None and self.serverPort != None: newConn = "tcp://" + str(self.serverHost) + ":" + str(self.serverPort) self.socket.connect(newConn)
[docs] def recv_pyobj(self): raise OperationError("Unsupported recv_pyobj() on GroupQryPort")
# return self.port_recv(True)
[docs] def send_pyobj(self, msg): raise OperationError("Unsupported send_pyobj() on GroupQryPort")
# return self.port_send(msg,True)
[docs] def recv(self): raise OperationError("Unsupported recv() on GroupQryPort")
# return self.port_recv(False)
[docs] def send(self, _msg): raise OperationError("Unsupported send() on GroupQryPort")
# return self.port_send(msg,True)
[docs] def sendToLeader(self, msgType, msg): if self.serverHost == None or self.serverPort == None: return False try: msgFrames = [zmq.Frame(msgType)] msgFrames += [zmq.Frame(msg)] if self.isTimed: now = time.time() now = struct.pack("d", now) nowFrame = zmq.Frame(now) msgFrames += [nowFrame] self.socket.send_multipart(msgFrames) except zmq.error.ZMQError as e: raise PortError("send error (%d)" % e.errno, e.errno) from e return True
[docs] def recvFromLeader(self): try: msgFrames = self.socket.recv_multipart() except zmq.error.ZMQError as e: raise PortError("recv error (%d)" % e.errno, e.errno) from e if self.isTimed: self.recvTime = time.time() _msgType = msgFrames[0] _payload = msgFrames[1] if len(msgFrames) == 3: rawMsg = msgFrames[2] rawTuple = struct.unpack("d", rawMsg) self.sendTime = rawTuple[0] return msgFrames
[docs] def getInfo(self): ''' Retrieve relevant information about this port ''' return self.info