'''
Resource monitors
Created on Nov 23, 2017
@author: riaps
'''
from cgroupspy import trees
import os
import stat
import time
import signal
import subprocess
import psutil
import threading
from threading import RLock
import logging
import traceback
from pwd import getpwnam
from butter.eventfd import Eventfd
from pyroute2 import DQuotSocket
import zmq
from zmq import devices
from zmq.error import ZMQError
from riaps.consts.defs import *
from riaps.run.exc import *
from riaps.proto import deplo_capnp
from riaps.utils.sudo import riaps_sudo
from riaps.utils.config import Config
from riaps.utils.names import *
class MemMonitorThread(threading.Thread):
    '''
    Thread that waits on an event file descriptor and, each time it fires,
    sends a 'resMemX' resource message to every registered client device
    through a ZeroMQ ROUTER socket bound to a random local TCP port.

    Life cycle, as driven by the methods below:
      * setup(proc) must be called before start(); run() and addClientDevice()
        read self.proc.pid.
      * stop() pauses the loop and blocks until the thread acknowledges.
      * restart() resumes a paused loop.
      * terminate() asks the loop to exit for good.
    '''

    def __init__(self, parent, efd, usage):
        '''
        :param parent: owner object; only its ``context`` attribute (used to
            create the notifier socket) is read here.
        :param efd: event fd object; this class calls ``fileno()``,
            ``_read_events()`` and ``increment()`` on it.
        :param usage: memory limit value, used only in the log message that
            reports the limit being exceeded.
        '''
        super().__init__(name='MemMonitorThread', daemon=False)
        self.logger = logging.getLogger(__name__)
        self.parent = parent
        self.context = self.parent.context
        self.efd = efd
        self.usage = usage
        # A fresh threading.Event starts cleared, so only 'running' needs a set().
        self.terminated = threading.Event()     # Set when the monitor is to be terminated
        self.running = threading.Event()        # Cleared when the monitor is not to run
        self.running.set()
        self.stopped = threading.Event()        # Set if the monitor is stopped
        self.lock = RLock()                     # Guards self.devices
        self.notifier = None                    # ROUTER socket, created in run()
        self.notifierPort = None                # Port of the notifier, known once run() has bound it
        self.devices = {}                       # "<app>.<actor>" -> (device, identity)
        # Set to True once the notifier is bound; it is not reset when the thread ends.
        self.alive = False
        self.proc = None                        # Monitored process, supplied via setup()

    def setup(self, proc):
        '''Record the monitored process object (its ``pid`` is used for logging and identities).'''
        self.proc = proc

    def addClientDevice(self, appName, actorName, device):
        '''
        Connect ``device`` to the notifier socket and register it under the
        key "<appName>.<actorName>".

        Blocks (polling every 0.1 s) until run() has bound the notifier
        socket, so the thread must have been started for this call to return.
        '''
        while self.notifierPort is None:
            time.sleep(0.1)
        with self.lock:
            device.connect_in('tcp://127.0.0.1:%i' % self.notifierPort)
            key = str(appName) + "." + str(actorName)
            identity = actorIdentity(appName, actorName, self.proc.pid)
            self.logger.info("zmqdev id = %s" % identity)
            self.devices[key] = (device, identity)

    def is_running(self):
        '''Return True once run() has bound the notifier socket.'''
        return self.alive

    def restart(self):
        '''Resume a monitor that was paused with stop().'''
        self.stopped.clear()
        self.running.set()

    def run(self):
        '''
        Thread body: create and bind the notifier socket, then service event
        fd wake-ups until terminate() is called or the loop fails.

        The notifier socket is closed when the thread exits, whatever the reason.
        '''
        self.name = 'MemMonitorThread-%r' % self.ident
        self.notifier = self.context.socket(zmq.ROUTER)
        try:
            # self.notifier.setsockopt(zmq.SNDTIMEO,const.deplEndpointSendTimeout)
            self.notifierPort = self.notifier.bind_to_random_port('tcp://127.0.0.1')
            self.alive = True
            self.logger.info("MemMonitor:%d at %d" % (self.proc.pid, self.efd.fileno()))
            time.sleep(0.001)
            try:
                self._monitor_loop()
            except Exception:
                # Keep the thread from dying silently; report where it failed.
                self.logger.error("MemMonitorThread failure")
                traceback.print_exc()
            self.logger.info("MemMonitor terminated")
        finally:
            # Release the socket so it does not outlive the thread.
            self.notifier.close()

    def _monitor_loop(self):
        '''Wait for event fd wake-ups and notify the clients until terminated.'''
        while True:
            if not self.running.is_set():
                # Acknowledge the pause to stop(), then park until resumed.
                self.stopped.set()
                self.running.wait()
            if self.terminated.is_set():
                break
            # current = os.read(self.efd.fileno(),8)  # .read_event()
            current = self.efd._read_events()
            # stop() and terminate() also increment the event fd to wake this
            # read, so re-check the control flags before treating the wake-up
            # as a real event.
            # NOTE(review): if stop() increments the fd while this thread is
            # not in the read (e.g. during the sleep below), that increment
            # presumably stays pending and may surface as a spurious event
            # after restart() - confirm against the Eventfd semantics.
            if self.terminated.is_set():
                break
            if not self.running.is_set():
                continue
            self.logger.info("MemMonitor[%d] mem limit %d exceeded - %s"
                             % (self.proc.pid, self.usage, str(current)))
            self._notify_devices()
            # self.proc.send_signal(signal.SIGUSR1)
            time.sleep(1.0)     # Wait at least one second between notifications
            if self.terminated.is_set():
                break

    def _notify_devices(self):
        '''Send one 'resMemX' resource message to every registered client identity.'''
        # The message does not depend on the recipient: serialize it once.
        msg = deplo_capnp.DeplCmd.new_message()
        msgCmd = msg.init('resourceMsg')
        msgMessage = msgCmd.init('resMemX')
        msgMessage.msg = "X"
        msgBytes = msg.to_bytes()
        with self.lock:
            for _device, identity in self.devices.values():
                header = identity.encode(encoding='utf-8')
                payload = zmq.Frame(msgBytes)
                self.notifier.send_multipart([header, payload])
                self.logger.info("XMem sent to [%d]" % (self.proc.pid))

    def stop(self):
        '''
        Pause the monitor and block until the thread has acknowledged.

        The event fd is incremented to wake the thread if it is blocked
        reading it. This call does not return unless the running thread
        reaches its pause point.
        '''
        self.running.clear()
        self.efd.increment()
        self.stopped.wait()

    def terminate(self):
        '''
        Ask the monitor thread to exit.

        Sets the terminated flag, wakes a thread blocked on the event fd, and
        releases a thread parked by stop(). 'running' is set unconditionally
        so that a thread which is about to park (stop() in progress on another
        thread) cannot be left waiting forever.
        '''
        self.terminated.set()
        self.efd.increment()
        if self.stopped.is_set():
            self.stopped.clear()
        self.running.set()