'''
Resource monitors
Created on Nov 23, 2017
@author: riaps
'''
import os
import stat
import time
import signal
import subprocess
import psutil
import threading
from threading import RLock
import logging
import traceback
from pwd import getpwnam
from butter.eventfd import Eventfd
from pyroute2 import DQuotSocket
import socket
import zmq
from zmq import devices
from zmq.error import ZMQError
from riaps.consts.defs import *
from riaps.run.exc import *
from riaps.proto import deplo_capnp
from riaps.utils.sudo import riaps_sudo
from riaps.utils.config import Config
from riaps.utils.names import *
class SpcMonitorThread(threading.Thread):
    '''
    Disk-space (quota) monitor thread.

    Listens on a pyroute2 ``DQuotSocket`` for quota netlink messages. When
    the ``QUOTA_NL_A_EXCESS_ID`` attribute of a message matches the uid of
    a registered actor, a ``resSpcX`` resource message is sent to every
    actor registered under that uid through a ZMQ ROUTER socket bound to a
    random port on 127.0.0.1.

    :param parent: owner object; only its ``context`` attribute (used to
        create the ZMQ socket) is read here.
    '''

    def __init__(self, parent):
        threading.Thread.__init__(self, name='SpcMonitorThread', daemon=False)
        self.logger = logging.getLogger(__name__)
        self.parent = parent
        self.context = self.parent.context
        # A freshly created Event is already cleared.
        self.terminated = threading.Event()  # Set when the monitor is to be terminated
        self.running = threading.Event()     # Cleared when the monitor is not to run
        self.running.set()
        self.stopped = threading.Event()     # Set if the monitor is stopped
        self.alive = False
        self.uid2Name = {}                   # uid -> [ app.actor ]
        self.name2Uid = {}                   # app.actor -> uid
        self.devices = {}                    # app.actor -> (device, identity)
        self.rlock = RLock()                 # Guards the three maps above
        self.notifier = None                 # ROUTER socket, created in run()
        self.notifierPort = None             # Port of the notifier, set in run()
        self.ds = None                       # Current DQuotSocket (None when closed)

    def addProc(self, appName, actName, proc):
        '''
        Register an actor process under its real uid.

        :param appName: application name
        :param actName: actor name
        :param proc: process object; only ``proc.pid`` is used
        '''
        self.logger.info("addProc %s.%s [%d]" % (appName, actName, proc.pid))
        fullName = appName + '.' + actName
        uid = psutil.Process(proc.pid).uids().real  # Real UID
        with self.rlock:
            names = self.uid2Name.setdefault(uid, [])
            # Avoid duplicate entries: they would cause duplicate notifications.
            if fullName not in names:
                names.append(fullName)
            self.name2Uid[fullName] = uid

    def delProc(self, appName, actName, proc):
        '''
        Remove an actor from all bookkeeping maps.

        Tolerates actors that are unknown or that never registered a client
        device; the missing entries are simply skipped.

        :param appName: application name
        :param actName: actor name
        :param proc: process object; only ``proc.pid`` is used (for logging)
        '''
        self.logger.info("delProc %s.%s [%d]" % (appName, actName, proc.pid))
        fullName = appName + '.' + actName
        with self.rlock:
            uid = self.name2Uid.pop(fullName, None)
            if uid is None:
                self.logger.warning("delProc: unknown actor %s" % fullName)
            else:
                names = self.uid2Name.get(uid, [])
                if fullName in names:
                    names.remove(fullName)
                if not names:
                    # Drop the empty list so the uid is no longer treated as monitored.
                    self.uid2Name.pop(uid, None)
            self.devices.pop(fullName, None)

    def addClientDevice(self, appName, actorName, device, proc):
        '''
        Connect an actor's device to the notifier socket and record it.

        Blocks (polling every 0.1 s) until run() has bound the notifier
        socket, i.e. until the thread has been started.

        :param appName: application name
        :param actorName: actor name
        :param device: object providing ``connect_in(url)``
        :param proc: process object; only ``proc.pid`` is used
        '''
        self.logger.info("addClientDevice %s.%s [%d]" % (appName, actorName, proc.pid))
        while self.notifierPort is None:
            time.sleep(0.1)
        with self.rlock:
            key = str(appName) + "." + str(actorName)
            device.connect_in('tcp://127.0.0.1:%i' % self.notifierPort)
            identity = actorIdentity(appName, actorName, proc.pid)
            self.logger.info("zmqdev id = %s" % identity)
            self.devices[key] = (device, identity)

    def is_running(self):
        '''
        Return True once run() has started.

        NOTE(review): the flag is never reset when the thread exits - confirm
        whether callers rely on that before changing it.
        '''
        return self.alive

    def restart(self):
        '''Resume monitoring after a stop().'''
        self.stopped.clear()
        self.running.set()

    def _closeQuotaSocket(self):
        '''Close the current quota socket, if any, and forget it.'''
        ds, self.ds = self.ds, None
        if ds is None:
            return
        try:
            ds.close()
        except Exception:
            self.logger.warning("SpcMonitorThread: closing DQuotSocket failed")

    def _buildSpcMessage(self):
        '''Build the serialized 'resSpcX' resource message.'''
        msg = deplo_capnp.DeplCmd.new_message()
        msgCmd = msg.init('resourceMsg')
        msgMessage = msgCmd.init('resSpcX')
        msgMessage.msg = "X"
        return msg.to_bytes()

    def _notifyUid(self, uid):
        '''Send the space-exceeded message to every actor registered under uid.'''
        with self.rlock:
            names = self.uid2Name.get(uid)
            if names is None:
                return
            self.logger.info("SPC limit exceeded by uid = %d" % (uid))
            msgBytes = self._buildSpcMessage()
            for fullName in list(names):
                entry = self.devices.get(fullName)
                if entry is None:
                    # Actor was added but its device is not connected yet:
                    # skip it instead of letting a KeyError end the thread.
                    self.logger.warning("no client device for %s" % fullName)
                    continue
                (_device, identity) = entry
                payload = zmq.Frame(msgBytes)
                header = identity.encode(encoding='utf-8')
                self.notifier.send_multipart([header, payload])
                self.logger.info("XSpc sent to %s [%s]" % (fullName, identity))
                time.sleep(0.1)

    def run(self):
        '''
        Thread body: bind the notifier, then loop reading quota messages.

        Each iteration opens a fresh DQuotSocket and always closes it again,
        whichever way the iteration ends. The loop exits when ``terminated``
        is set or on an unexpected error.
        '''
        self.name = 'SpcMonitorThread-%r' % self.ident
        self.logger.info("run: started")
        self.alive = True
        # NOTE(review): the notifier socket is never closed here - confirm
        # that the owner of the context takes care of it.
        self.notifier = self.context.socket(zmq.ROUTER)
        self.notifierPort = self.notifier.bind_to_random_port('tcp://127.0.0.1')
        while True:
            try:
                self.ds = DQuotSocket()
                # Timeout on quota system netlink socket (value from const.spcMonitorTimeout)
                self.ds.settimeout(const.spcMonitorTimeout)
                if not self.running.is_set():
                    self.stopped.set()
                    self.running.wait()
                if self.terminated.is_set():
                    break
                self.logger.info("waiting on DQuotSocket")
                try:
                    for quotaMsg in self.ds.get():
                        self._notifyUid(quotaMsg.get_attr('QUOTA_NL_A_EXCESS_ID'))
                except socket.timeout:
                    if self.terminated.is_set():
                        break
                    continue
                except Exception:
                    self.logger.error("SpcMonitorThread exception")
                    raise
                if not self.running.is_set():
                    continue
                if self.terminated.is_set():
                    break
            except Exception:
                # Same message as before, now with the traceback attached.
                self.logger.exception("SpcMonitorThread failure")
                break
            finally:
                # Runs on break/continue/normal completion alike, so no
                # netlink socket is leaked between iterations.
                self._closeQuotaSocket()
        # Release any stop() caller still waiting for an acknowledgement.
        self.stopped.set()
        self.logger.info("SpcMonitor terminated")

    def stop(self):
        '''Pause monitoring; blocks until the thread acknowledges the stop.'''
        self.logger.info("stop")
        self.running.clear()
        self.stopped.wait()

    def terminate(self):
        '''
        Ask the thread to exit.

        Does not join the thread. The thread notices the request after its
        current wait on the quota socket returns.
        '''
        self.logger.info("terminating")
        ds = self.ds  # Snapshot: run() may reset self.ds concurrently
        if ds is not None:
            try:
                ds.settimeout(0.1)
            except Exception:
                # Socket was closed by run() in the meantime; nothing to shorten.
                pass
        self.terminated.set()
        self.stopped.clear()
        # Always wake the thread: a stop may have been requested but not yet
        # acknowledged, in which case it would otherwise park forever.
        self.running.set()
        self.logger.info("terminated")