'''
Part class
Created on Oct 9, 2016
@author: run
'''
import importlib
import threading
import zmq
import time
from enum import Enum
from .port import PortInfo
from .pubPort import PubPort
from .subPort import SubPort
from .cltPort import CltPort
from .srvPort import SrvPort
from .reqPort import ReqPort
from .repPort import RepPort
from .qryPort import QryPort
from .ansPort import AnsPort
from .timPort import TimPort
from .insPort import InsPort
from .exc import StateError
from .comp import ComponentThread
from .exc import SetupError
from .exc import ControlError
from .exc import BuildError
import logging
class Part(object):
    '''
    Part class to encapsulate and manage component (and its thread).

    A Part loads the module implementing a component type, constructs the
    component instance, builds the port objects declared for the type, and
    drives the component thread through a control socket supplied in setup().
    '''

    class State(Enum):
        '''Component state codes.'''
        Starting = 0
        Initial = 1
        Ready = 2
        Active = 3
        Checkpointing = 4
        Inactive = 5
        Passive = 6
        Destroyed = 7

    # Cache of loaded component modules keyed by component type name.
    # Defined on the class, so load() reuses a module already imported by
    # another Part (unless an instance replaced it through the mods setter).
    _mods = {}

    # Port kinds the component thread may report back during "build".
    _ENDPOINT_KINDS = frozenset(('pub', 'sub', 'clt', 'srv',
                                 'req', 'rep', 'qry', 'ans'))
    # Port kinds that are re-registered by handleReinstate().
    _PROVIDER_KINDS = frozenset(('pub', 'srv', 'rep', 'ans'))

    @property
    def mods(self):
        '''Dictionary of loaded component modules (type name -> module).'''
        return self._mods

    @mods.setter
    def mods(self, val):
        self._mods = val

    def __init__(self, parentActor, iTypeDef, iName, iTypeName, iArgs):
        '''
        Construct the Part object, load the component implementation and
        construct its object.

        :param parentActor: owning actor; must provide ``context`` and
            ``appContext`` attributes and is used later for endpoint
            registration and identity queries
        :param iTypeDef: component type definition; the keys ``name`` and
            ``ports`` are read here, ``scheduler`` is optional
        :param iName: name of this part (component instance)
        :param iTypeName: not used; the type name is taken from ``iTypeDef``
        :param iArgs: keyword arguments passed to the component constructor
        '''
        self.logger = logging.getLogger(__name__)
        self.state = Part.State.Starting
        self.name = iName
        self.parent = parentActor
        self.type = iTypeDef
        self.typeName = self.type['name']
        self.args = iArgs
        self.context = parentActor.context
        self.appContext = parentActor.appContext
        self.load()
        self.class_ = getattr(self.module_, self.typeName)
        # Trick to set the OWNER of the component: the attribute is placed on
        # the class only for the duration of the constructor call.
        self.class_.OWNER = self
        self.logger.info('Constructing %s of type %s', iName, self.typeName)
        try:
            self.instance = self.class_(**self.args)    # Run the component constructor
        finally:
            # Always clear OWNER, even when the constructor raises, so a
            # failed construction does not leave a stale owner on the class.
            self.class_.OWNER = None
        self.control = None
        self.thread = None
        self.buildAllPorts(self.type["ports"])          # Build all the ports of the component
        self.scheduler = self.type.get("scheduler", "default")
        self.state = Part.State.Initial

    def getName(self):
        '''Return the name of this part.'''
        return self.name

    def getTypeName(self):
        '''Return the component type name of this part.'''
        return self.typeName

    def getActorName(self):
        '''Return the actor name as reported by the parent actor.'''
        return self.parent.getActorName()

    def getAppName(self):
        '''Return the application name as reported by the parent actor.'''
        return self.parent.getAppName()

    def getActorID(self):
        '''Return the actor ID as reported by the parent actor.'''
        return self.parent.getActorID()

    def getUUID(self):
        '''Return the UUID as reported by the parent actor.'''
        return self.parent.getUUID()

    def load(self):
        '''
        Load the component implementation code.

        Imports the module named after the component type, or reuses the
        module stored in ``mods``, and binds it to ``self.module_``.
        Any exception raised by the import is logged and re-raised.
        '''
        if self.typeName in self.mods:                  # Already loaded
            self.module_ = self.mods[self.typeName]
            return
        try:
            self.module_ = importlib.import_module(self.typeName)   # Execute the loader
        except Exception as e:
            self.logger.error("%s: %s", type(e), e)
            raise
        self.mods[self.typeName] = self.module_

    def buildPorts(self, res, key, ports, class_):
        '''
        Build the port objects of a kind of this part.

        :param res: dictionary (port name -> port object) to be extended
        :param key: port kind key in ``ports`` (e.g. ``'pubs'``)
        :param ports: port specifications, grouped by port kind key
        :param class_: port class, called as ``class_(part, name, spec)``
        '''
        # A type definition that omits a port kind has no ports of that kind.
        portDict = ports.get(key) or {}
        for portName, portSpec in portDict.items():
            res[portName] = class_(self, portName, portSpec)

    def buildAllPorts(self, portSpecs):
        '''
        Build all the ports of the part.

        The resulting port objects are stored in ``self.ports`` and are also
        set as attributes of the component instance under their own names.
        '''
        portKinds = (('pubs', PubPort), ('subs', SubPort),
                     ('clts', CltPort), ('srvs', SrvPort),
                     ('reqs', ReqPort), ('reps', RepPort),
                     ('qrys', QryPort), ('anss', AnsPort),
                     ('tims', TimPort), ('inss', InsPort))
        self.ports = {}
        for key, portClass in portKinds:
            self.buildPorts(self.ports, key, portSpecs, portClass)
        for portName, portObj in self.ports.items():
            # The port will be accessible in the component instance under its own name
            setattr(self.instance, portName, portObj)

    def setupPorts(self, ports):
        '''
        Set up all the ports of this part.
        '''
        for portObj in ports.values():
            portObj.setup()

    def sendControl(self, cmd, timeOut):
        '''
        Send a control message to component thread.

        Nothing is sent when the control socket has not been set yet.

        :param cmd: command object, sent with ``send_pyobj``
        :param timeOut: value for the socket's ``zmq.SNDTIMEO`` option
            (every caller in this class passes -1)
        '''
        if self.control is not None:
            self.control.setsockopt(zmq.SNDTIMEO, timeOut)
            self.control.send_pyobj(cmd)

    def setup(self, control_):
        '''
        Set up the part and change its state to Ready.

        Sets up the ports, starts the component thread, commands it to build
        itself and registers every endpoint it reports with the parent actor.

        :param control_: control socket for communicating with the
            component thread
        :raises StateError: if the part is not in the Initial state
        :raises BuildError: if the component thread sends anything other
            than a PortInfo of a known kind or the final "done"
        '''
        if self.state != Part.State.Initial:
            raise StateError("Invalid state %s in setup()" % self.state)
        # Control socket for communicating with the component thread
        self.control = control_
        self.setupPorts(self.ports)
        self.thread = ComponentThread(self)             # Create component thread
        self.thread.start()
        time.sleep(0.01)                                # Hack to yield to the component thread
        self.sendControl("build", -1)                   # Command the component thread to build itself
        prefix = (self.name, self.typeName)
        queue = []
        while True:                                     # Wait for responses from the component thread
            msg = self.control.recv_pyobj()
            if msg == "done":                           # OK, we are done
                break
            if isinstance(msg, PortInfo) and msg.portKind in self._ENDPOINT_KINDS:
                queue.append([prefix, msg])
            else:
                # Wrap msg in a 1-tuple: a tuple-valued message would
                # otherwise be unpacked by '%' and raise TypeError.
                raise BuildError("invalid response from ComponentThread %s" % (msg,))
        # Process all component thread responses
        for elt in queue:
            self.parent.registerEndpoint(elt)
        self.state = Part.State.Ready

    def handlePortUpdate(self, portName, host, port):
        '''
        Handle a port update message coming from the discovery service.

        The update is relayed to the component thread; no reply is awaited.
        '''
        self.logger.info("handlePortUpdate %s %s %s", portName, str(host), str(port))
        msg = ("portUpdate", portName, host, port)
        self.sendControl(msg, -1)                       # Relay message to component thread

    def activate(self):
        '''
        Activate this part.

        :raises StateError: unless the part is Ready, Passive or Inactive
        '''
        if self.state not in (Part.State.Ready, Part.State.Passive, Part.State.Inactive):
            raise StateError("Invalid state %s in activate()" % self.state)
        self.sendControl("activate", -1)                # Send activation command to component thread
        self.state = Part.State.Active

    def deactivate(self):
        '''
        Deactivate this part.

        :raises StateError: unless the part is Active
        '''
        if self.state != Part.State.Active:
            raise StateError("Invalid state %s in deactivate()" % self.state)
        self.sendControl("deactivate", -1)              # Send deactivation command to component thread
        self.state = Part.State.Inactive

    def passivate(self):
        '''
        Passivate this part.

        :raises StateError: unless the part is Active
        '''
        if self.state != Part.State.Active:
            raise StateError("Invalid state %s in passivate()" % self.state)
        self.sendControl("passivate", -1)               # Send passivate command to component thread
        self.state = Part.State.Passive

    def reactivate(self):
        '''
        Mark this part Active again.

        Only the state is changed; no command is sent to the component thread.

        :raises StateError: unless the part is Ready
        '''
        if self.state != Part.State.Ready:
            raise StateError("Invalid state %s in reactivate()" % self.state)
        self.state = Part.State.Active

    def checkpoint(self):
        '''
        Checkpoint this part (only the state check is implemented).

        :raises StateError: unless the part is Active
        '''
        if self.state != Part.State.Active:
            raise StateError("Invalid state %s in checkpoint()" % self.state)
        # Checkpoint

    def destroy(self):
        '''
        Destroy this part (only the state check is implemented).

        :raises StateError: if the part is already Destroyed
        '''
        if self.state == Part.State.Destroyed:
            raise StateError("Invalid state %s in destroy()" % self.state)
        # Destroy thread

    def handleReinstate(self):
        '''
        Reinstate providers with a restarted disco.

        Registers every pub, srv, rep and ans port of this part with the
        parent actor again.
        '''
        self.logger.info("handleReinstate - %s:%s", self.name, self.typeName)
        prefix = (self.name, self.typeName)
        for portObj in self.ports.values():
            info = portObj.getInfo()
            if info.portKind in self._PROVIDER_KINDS:
                self.parent.registerEndpoint([prefix, info])

    def handleCPULimit(self):
        '''Relay a CPU limit notification to the component thread.'''
        self.logger.info("handleCPULimit - %s:%s", self.name, self.typeName)
        self.sendControl(("limitCPU",), -1)

    def handleMemLimit(self):
        '''Relay a memory limit notification to the component thread.'''
        self.logger.info("handleMemLimit - %s:%s", self.name, self.typeName)
        self.sendControl(("limitMem",), -1)

    def handleSpcLimit(self):
        '''Relay a "limitSpc" notification to the component thread.'''
        self.logger.info("handleSpcLimit - %s:%s", self.name, self.typeName)
        self.sendControl(("limitSpc",), -1)

    def handleNetLimit(self):
        '''Relay a network limit notification to the component thread.'''
        self.logger.info("handleNetLimit - %s:%s", self.name, self.typeName)
        self.sendControl(("limitNet",), -1)

    def handleNICStateChange(self, state):
        '''Relay a NIC state change to the component thread.'''
        self.logger.info("handleNICStateChange - %s:%s NIC %s",
                         self.name, self.typeName, state)
        self.sendControl(("nicState", state), -1)

    def handlePeerStateChange(self, state, uuid):
        '''Relay a peer state change to the component thread.'''
        self.logger.info("handlePeerStateChange - %s:%s peer %s at %s",
                         self.name, self.typeName, state, uuid)
        self.sendControl(("peerState", state, uuid), -1)

    def terminate(self):
        '''
        Terminate this part: tell the component thread to kill itself, wait
        for it to finish, then terminate all ports.
        '''
        self.logger.info("terminating %s", self.typeName)
        self.sendControl("kill", -1)                    # Send message to the thread to kill itself
        time.sleep(0.1)
        if self.thread is not None:
            self.thread.join()
        for portObj in self.ports.values():
            portObj.terminate()
        self.logger.info("terminated %s", self.typeName)