'''
Created on Oct 10, 2016
@author: riaps
'''
from .port import Port,PortScope,PortInfo
import threading
import zmq
import time
import logging
import struct
from enum import Enum,auto
from .exc import OperationError
[docs]class TimerThread(threading.Thread):
[docs] class Command(Enum): # Timer command codes
TERMINATE = auto()
ACTIVATE = auto()
DEACTIVATE = auto()
START = auto()
CANCEL = auto()
HALT = auto()
def __init__(self, parent):
threading.Thread.__init__(self,daemon=False)
self.logger = logging.getLogger(__name__)
self.name = parent.instName
self.context = parent.context
if parent.period == 0:
self.period = None
self.periodic = False
else:
self.period = parent.period * 0.001 # millisec
self.periodic = True
self.delay = None
self.socket = None
self._ready = threading.Event() # Timer thread ready to accept commands
self._ready.clear()
self._running = False # Timer ('counter') is running
# self.active = threading.Event()
# self.active.clear()
# self.waiting = threading.Event()
# self.waiting.clear()
# self.terminated = threading.Event()
# self.terminated.clear()
# self.started = threading.Event()
# self.started.clear()
[docs] def ready(self):
return self._ready
[docs] def cmdError(self,where,cmd):
self.logger.error("Timer %s:%s: cmd = %r" % (self.name,where,cmd))
[docs] def waitFor(self,timeout=None):
res = self.poller.poll(timeout)
if len(res) == 0:
return None
else:
(s,_m) = res[0]
return s.recv_pyobj()
[docs] def run(self):
self.socket = self.context.socket(zmq.PAIR) # PUB
self.socket.bind('inproc://timer_' + self.name)
self.poller = zmq.Poller()
self.poller.register(self.socket,zmq.POLLIN)
self._ready.set() # Ready to accept commands
self.timeout = None
self.active = False
self._running = False
self.skip = False
self.last = None
while 1:
msg = self.waitFor(self.timeout)
if msg == TimerThread.Command.TERMINATE: break # Terminated
elif msg == None: # Timeout
if not self.active: # Wait if not active
self.timeout = None
continue
self.last = time.time()
if self._running:
if self.periodic and self.skip:
self.skip = False
else:
self.socket.send_pyobj(self.last)
if self.periodic: # Periodic: again
self.timeout = int(self.period * 1000)
pass
else: # Sporadic: wait for next command
self._running = False
self.timeout = None
continue
elif msg == TimerThread.Command.ACTIVATE:
self.active = True
self.last = None
elif msg == TimerThread.Command.DEACTIVATE:
self.active = False
self.timeout = None
self.last = None
elif msg == TimerThread.Command.START:
if self.active:
if self.periodic:
self.timeout = int(self.period * 1000)
else:
self.timeout = int(self.delay * 1000)
self._running = True
else: # Not active
self.cmdError('not active',msg)
continue
elif msg == TimerThread.Command.CANCEL:
if self.periodic:
if self.last != None: # Skip next firing
delay = self.last + self.period - time.time()
self.timeout = int(delay * 1000)
self.skip = True
else:
self._running = False
self.timeout = None
elif msg == TimerThread.Command.HALT:
self.timeout = None
self._running = False
else:
self.cmdError('loop',msg)
# self.active.wait(None) # Wait for activation
# if self.terminated.is_set(): break # If terminated, we exit
# if self.periodic: # Periodic timer
# while 1:
# self.started.wait(None)
# if self.terminated.is_set(): break
# cancelled = self.waiting.wait(self.period) # Wait for period
# if self.terminated.is_set(): break
# if cancelled: # Period was cancelled
# self.waiting.clear() # Start next period, but do not send tick
# continue
# if self.active.is_set() and self.started.is_set(): # Send tick (if active)
# value = time.time()
# self.socket.send_pyobj(value)
# else: # One shot timer
# while 1:
# self.started.wait(None) # Wait for start
# if self.terminated.is_set() or not self.active.is_set(): break
# assert self.delay != None and self.delay > 0.0
# cancelled = self.waiting.wait(self.delay) # Wait for the delay
# if self.terminated.is_set(): break
# self.started.clear() # We are not started anymore
# if cancelled: # Delay was cancelled
# self.waiting.clear() # Enable next waiting, but do not send tick
# continue
# if self.active.is_set(): # Send tick (if active)
# value = time.time()
# self.socket.send_pyobj(value)
# if self.terminated.is_set(): break # Terminated
pass
# def activate(self):
# '''
# Activate the timer port
# '''
# self.socket.send_pyobj(TimerThread.Command.ACTIVATE)
# if self.periodic:
# self.socket.send_pyobj(TimerThread.Command.START)
# def deactivate(self):
# '''
# Deactivate the timer port
# '''
# self.socket.send_pyobj(TimerThread.Command.DEACTIVATE)
# def terminate(self):
# '''
# Terminate the timer
# '''
# self.socket.send_pyobj(TimerThread.Command.TERMINATE)
[docs] def getPeriod(self):
'''
Read out the period
'''
return self.period
[docs] def setPeriod(self, _period):
'''
Set the period (for periodic timer).
Takes effect after the next firing.
'''
assert type(_period) == float and _period > 0.0
self.period = _period
[docs] def getDelay(self):
'''
Get the current delay (for sporadic timer)
'''
return self.delay
[docs] def setDelay(self, _delay):
'''
Set the current delay (for sporadic timer)
'''
assert type(_delay) == float and _delay > 0.0
self.delay = _delay
# def launch(self):
# '''
# Launch the timer
# '''
# self.socket.send_pyobj(TimerThread.Command.START)
[docs] def running(self):
'''
Returns True if the timer is running
'''
return self._running
# def cancel(self):
# '''
# Cancel the sporadic timer
# '''
# self.socket.send_pyobj(TimerThread.Command.CANCEL)
# # if self.started.is_set():
# # self.waiting.set() # Go to wait mode if started
# # else:
# # pass # Ignore if not started
# def halt(self):
# '''
# Halt the timer
# '''
# self.socket.send_pyobj(TimerThread.Command.HALT)
# # self.started.clear()
[docs]class TimPort(Port):
'''
Timer port
'''
def __init__(self, parentPart, portName, portSpec):
'''
Constructor
'''
super(TimPort, self).__init__(parentPart, portName, portSpec)
self.logger = logging.getLogger(__name__)
self.instName = self.parent.name + '.' + self.name
self.period = portSpec["period"]
self.deadline = portSpec.get("deadline",0) * 0.001 # msec
self.thread = None
self.info = None
[docs] def setup(self):
self.thread = TimerThread(self)
self.thread.start()
self.thread.ready().wait() # Wait until thread is ready to receive commands
[docs] def setupSocket(self, owner):
self.setOwner(owner)
assert self.instName == self.thread.name
self.socket = self.context.socket(zmq.PAIR) # SUB
self.socket.connect('inproc://timer_' + self.instName)
# self.socket.setsockopt_string(zmq.SUBSCRIBE, u'')
self.info = PortInfo(portKind='tim', portScope=PortScope.INTERNAL, portName=self.name,
msgType='tick', portHost='', portNum=-1)
return self.info
[docs] def activate(self):
'''
Activate the timer port
'''
if self.thread != None:
self.socket.send_pyobj(TimerThread.Command.ACTIVATE)
if self.thread.getPeriod(): # Periodic timer
self.socket.send_pyobj(TimerThread.Command.START)
[docs] def deactivate(self):
'''
Deactivate the timer port
'''
if self.thread != None:
self.socket.send_pyobj(TimerThread.Command.DEACTIVATE)
[docs] def terminate(self):
'''
Terminate the timer
'''
if self.thread != None:
self.logger.info("terminating")
self.socket.send_pyobj(TimerThread.Command.TERMINATE)
# self.thread.terminate()
self.thread.join()
self.logger.info("terminated")
[docs] def getPeriod(self):
'''
Read the period of the periodic timer
'''
if self.thread != None:
return self.thread.getPeriod()
else:
return None
[docs] def setPeriod(self, _period):
'''
Set the period - will be changed after the next firing.
Period must be positive
'''
if not (type(_period) == float and _period > 0.0):
raise OperationError("invalid argument %s" % str(_period))
if self.thread != None:
self.thread.setPeriod(_period)
[docs] def getDelay(self):
'''
Get the current delay (for sporadic timer)
'''
if self.thread != None:
return self.thread.getDelay()
else:
return None
[docs] def setDelay(self, _delay):
'''
Set the current delay (for sporadic timer)
'''
if not (type(_delay) == float and _delay > 0.0):
raise OperationError("invalid argument %s" % str(_delay))
if self.thread != None:
self.thread.setDelay(_delay)
[docs] def launch(self):
'''
Launch (start) the sporadic timer
'''
if self.thread != None:
# self.thread.launch()
self.socket.send_pyobj(TimerThread.Command.START)
[docs] def running(self):
'''
Returns True if the timer is running
'''
if self.thread != None:
return self.thread.running()
else:
return None
[docs] def cancel(self):
'''
Cancel the sporadic timer
'''
if self.thread != None:
# self.thread.cancel()
self.socket.send_pyobj(TimerThread.Command.CANCEL)
[docs] def halt(self):
'''
Halt the timer
'''
if self.thread != None:
# self.thread.halt()
self.socket.send_pyobj(TimerThread.Command.HALT)
[docs] def getSocket(self):
return self.socket
[docs] def inSocket(self):
return True
[docs] def recv_pyobj(self):
res = self.socket.recv_pyobj()
return res
[docs] def send_pyobj(self, msg):
raise OperationError("attempt to send through a timer port")
[docs] def recv(self):
'''
Receive time stamp (a float) as a byte array
'''
value = self.socket.recv_pyobj()
res = bytearray(struct.pack("f", value))
return res
[docs] def send(self):
raise OperationError("attempt to send through a timer port")
[docs] def getInfo(self):
return self.info