Source code for riaps.deplo.procm

'''
Created on Apr 7, 2018

@author: riaps
'''
import os,signal
import sys
import time
import json
import hashlib
from os.path import join
from collections import namedtuple
import subprocess
import threading
import logging
import traceback
import psutil
import pwd

import zmq

from riaps.consts.defs import *
from riaps.utils.sudo import is_su
from riaps.utils.config import Config
from riaps.run.exc import BuildError

ProcessMonitorRecord = namedtuple('ProcessMonitorRecord', 'name proc thread')

[docs]class ProcessMonitor(threading.Thread): ''' Thread for monitoring a process ''' def __init__(self,parent,qualName): threading.Thread.__init__(self,name='ProcessMonitor:%r' % qualName,daemon=False) self.logger = logging.getLogger(__name__) self.parent = parent self.context = parent.context self.endpoint = parent.endpoint self.proc = None self.command = None self.proc_name = None self.terminated = threading.Event() # Thread must terminate self.terminated.clear() self.released = threading.Event() # Process to be released self.released.clear()
[docs] def setup(self,record): self.proc_name = record.name self.proc = record.proc
[docs] def run(self): self.dealer = self.context.socket(zmq.DEALER) self.dealer.setsockopt(zmq.IDENTITY, str(self.proc_name).encode(encoding='utf_8')) self.dealer.connect(self.endpoint) self.unexpected = False while True: self.proc.wait() if self.terminated.is_set(): break code = self.proc.returncode if self.released.is_set(): self.logger.info("expected termination: %s[%r]" % (self.proc_name,code)) break else: # Process termination was unexpected self.unexpected = True self.logger.error("unexpected termination: %s[%r]" % (self.proc_name,code)) self.dealer.send_pyobj((self.proc_name,)) # Ask parent to restart, wait until completed _resp = self.dealer.recv_pyobj() self.unexpected = False self.dealer.close()
[docs] def release(self): self.released.set() self.logger.info("released proc %s" % self.proc_name)
[docs] def error(self): return self.unexpected
[docs] def terminate(self): self.logger.info("terminating") self.terminated.set()
[docs]class ProcessManager(object): ''' Manages processes: service(s) and actors started by deplo ''' def __init__(self,parent): ''' Constructor ''' self.logger = logging.getLogger(__name__) self.depm = parent self.context = parent.context self.endpoint = None self.monitors = { } self.lock = threading.RLock()
[docs] def monitor(self,qualName,proc): self.logger.info(" monitoring %s" % (qualName)) with self.lock: if self.endpoint == None: self.endpoint = self.depm.procMonEndpoint if qualName in self.monitors: old = self.monitors[qualName] new = ProcessMonitorRecord(name=old.name,proc=proc,thread=old.thread) old.thread.setup(new) self.monitors[qualName] = new else: thread = ProcessMonitor(self,qualName) record = ProcessMonitorRecord(name=qualName,proc=proc,thread=thread) self.monitors[qualName] = record thread.setup(record) thread.start() time.sleep(0.01)
[docs] def release(self,qualName): self.logger.info("releasing %s" % qualName) with self.lock: if qualName in self.monitors: record = self.monitors[qualName] thread = record.thread if not thread.error(): thread.release() del self.monitors[qualName] else: self.logger.error(" unknown process %s" % qualName)
def __destroy__(self): for mon in self.monitors.keys(): thread = self.monitors[mon] thread.terminate() self.monitors.clear()