'''
Resource monitors
Created on Nov 23, 2017
@author: riaps
'''
import os
import stat
import time
#import signal
import psutil
import threading
from threading import RLock
import logging
import traceback
import zmq
from zmq import devices
from zmq.error import ZMQError
from riaps.consts.defs import *
from riaps.run.exc import *
from riaps.proto import deplo_capnp
from riaps.utils.config import Config
from riaps.utils.names import *
[docs]class CPUMonitorThread(threading.Thread):
'''
Thread for monitoring an actor and enforcing resource limits.
'''
def __init__(self,parent,interval,usage):
threading.Thread.__init__(self,name = 'CPUMonitorThread',daemon=False)
self.logger = logging.getLogger(__name__)
self.parent = parent
self.context = self.parent.context
self.interval = interval # sec
self.usage = usage # % INT
self.terminated = threading.Event() # Set when the monitor is to be terminated
self.terminated.clear()
self.running = threading.Event() # Cleared when the monitor is not to run
self.running.set()
self.stopped = threading.Event() # Set if the monitor is stoppped
self.stopped.clear()
self.lock = RLock()
self.notifier = None
self.notifierPort = None
self.devices = { }
self.alive = False
[docs] def setup(self,proc):
self.proc = proc
self.mon = psutil.Process(proc.pid)
[docs] def addClientDevice(self,appName,actorName,device):
while self.notifierPort == None:
time.sleep(0.1)
with self.lock:
device.connect_in('tcp://127.0.0.1:%i' % self.notifierPort)
key = str(appName) + "." + str(actorName)
identity = actorIdentity(appName,actorName,self.proc.pid)
self.logger.info("zmqdev id = %s" % identity)
self.devices[key] = (device,identity)
[docs] def is_running(self):
return self.alive
[docs] def restart(self):
self.stopped.clear()
self.running.set()
[docs] def run(self):
self.name = 'CPUMonitorThread-%r' % self.ident
self.notifier = self.context.socket(zmq.ROUTER)
# self.notifier.setsockopt(zmq.SNDTIMEO,const.deplEndpointSendTimeout)
self.notifierPort = self.notifier.bind_to_random_port('tcp://127.0.0.1')
self.alive = True
current = self.mon.cpu_percent(self.interval)
time.sleep(0.001)
while True:
if not self.running.is_set():
self.stopped.set()
self.running.wait()
if self.terminated.is_set(): break
current = self.mon.cpu_percent(self.interval)
if self.terminated.is_set(): break
if current == 0: continue
self.logger.info("XCPU to [%d]? %d > %d in %f" % (self.proc.pid,current,self.usage,self.interval))
if current > self.usage:
# print ("SIGXCPU sent")
# self.proc.send_signal(signal.SIGXCPU)
with self.lock:
for key,pair in self.devices.items():
(_dev,identity) = pair
msg = deplo_capnp.DeplCmd.new_message()
msgCmd = msg.init('resourceMsg')
msgMessage = msgCmd.init('resCPUX')
msgMessage.msg = "X"
msgBytes = msg.to_bytes()
payload = zmq.Frame(msgBytes)
header = identity.encode(encoding='utf-8')
self.notifier.send_multipart([header,payload])
self.logger.info("XCPU sent to [%d]" % (self.proc.pid))
time.sleep(1.0)
# self.mon = psutil.Process(self.proc.pid)
# current = self.mon.cpu_percent(self.interval)
if self.terminated.is_set(): break
self.logger.info("CPUMonitor terminated")
[docs] def stop(self):
self.running.clear()
self.stopped.wait()
[docs] def terminate(self):
self.terminated.set()
if self.stopped.is_set():
self.stopped.clear()
self.running.set()