Source code for riaps.rfab.api.task

import logging.handlers
from threading import Thread
import time
from invoke.exceptions import UnexpectedExit
import socket
from fabric import Result as FabResult
from fabric.connection import Connection
from fabric.transfer import Result as TransferResult
import logging
import sys
from pathlib import Path
from shutil import rmtree
import traceback


[docs]def Result_to_log(res): log = [f"Command: {res.command}", f"Exited: {res.exited}"] for t,stream in zip(["STDOUT","STDERR"],[res.stdout,res.stderr]): if len(stream) > 0: # out = stream.splitlines() # if len(out) > 1: # lines = "\n".join([f"{t}:",]+out) # log.extend(lines) # else: # lines = f"{t}: {out[0]}" # log.append(lines) log.append(f"{t}: {stream}") return "\n".join(log)
[docs]def TransferResult_to_log(res): log = [f"TRANSFER FILE", f"Local (rel): {res.orig_local}", f"Local (abs): {res.local}", f"Remote (rel): {res.orig_remote}", f"Remote (abs): {res.remote}"] return "\n".join(log)
from enum import Enum, auto
[docs]class STATE(Enum): INIT = auto() SUCCEEDED = auto() FAILED = auto() EXCEPTED = auto()
[docs]class SkipResult(FabResult): def __init__(self,connection,command,stdout,stderr='',exited=0): super().__init__(connection=connection,command=command,stdout=stdout,stderr=stderr,exited=exited)
[docs]class Result(FabResult): pass
[docs]class BadExit(UnexpectedExit): def __init__(self,exc: UnexpectedExit,msg=None): super().__init__(exc.result,exc.reason) self.msg = msg or ""
[docs]class Task: def __init_subclass__(cls) -> None: super().__init_subclass__() cls._steps = {} cls._params = {} for name,f in cls.__dict__.items(): if name[0:2] != '__': if callable(f): cls._steps[name] = f else: cls._params[name] = f def __init__(self, connection, **kwargs, ): """Creates instance with instantiated contexts for each connection Parameters: connections (list[fabric.Connection]): Connections to each targeted host kwargs: kwargs passed to each connection-context """ self.connection: Connection = connection self.kwargs = kwargs self.kwargs['hide'] = True self.results = {name:None for name in self._steps.values()} self.state = STATE.INIT self.final_res = None parentlogger = logging.getLogger(self.__class__.__name__) self.logger = parentlogger.getChild(self.connection.host) self.logger.setLevel("INFO") self._step_gen = (iter(self._steps.keys())) self.curr_step: str = next(self._step_gen) def _run_one(self): func = self._steps[self.curr_step] self.logger.info(f"Running {self.curr_step}...") try: res = func(self) if isinstance(res,TransferResult): self.logger.info(TransferResult_to_log(res)) res = Result(connection=res.connection,stdout=f"{res.local} <--> {res.remote}") elif isinstance(res,FabResult): self.logger.info(Result_to_log(res)) self.results[self.curr_step] = res self.curr_step = next(self._step_gen) return True except StopIteration: self.final_res = self.results[self.curr_step] self.state = STATE.SUCCEEDED except UnexpectedExit as e: if not isinstance(e,BadExit): e = BadExit(e) self.final_res = self.results[self.curr_step] = e self.state = STATE.FAILED self.logger.warn(Result_to_log(e.result)) if len(e.msg): self.logger.warn(f"Hint: {e.msg}") self.logger.warn(f"{func.__qualname__} FAILED") except Exception as e: self.final_res = self.results[self.curr_step] = e self.state = STATE.EXCEPTED #traceback.print_exc(file=sys.stdout) self.logger.error(f"{func.__qualname__} EXCEPTION") self.logger.exception(e) return False def _run_all(self): while(more_steps := self._run_one()): pass
[docs] def done(self) -> bool: return self.state != STATE.INIT
[docs] def put(self, file, **kwargs): return self.connection.put(file,**kwargs)
[docs] def get(self, remote, local, **kwargs): return self.connection.get(remote,local,**kwargs)
[docs] def sudo(self,cmd,**kwargs): kw = self.kwargs | kwargs fail_msg = kwargs.pop("fail_msg",None) try: return self.connection.sudo(cmd,**kw) except UnexpectedExit as e: if fail_msg: raise BadExit(e,fail_msg) raise e
[docs] def run(self,cmd,**kwargs): fail_msg = kwargs.pop("fail_msg",None) kw = self.kwargs | kwargs try: return self.connection.run(cmd,**kw) except UnexpectedExit as e: if fail_msg: raise BadExit(e,fail_msg) raise e
def _print_multiline(log_func,header_str,stream,indent=0): lines = stream.splitlines() if len(lines) == 1: log_func(f"{' '.rjust(indent)}{header_str} {lines[0]}") return log_func(f"{' '.rjust(indent)}{header_str}") [log_func(f"{' '.rjust(indent+2)}{l}") for l in lines]
[docs]class TaskRunner: def __init__(self,hosts,task,**kwargs): super().__init__() self.taskClass = task self.hosts = hosts self.rootlogger = logging.getLogger("TaskRunner") self.rootlogger.addHandler(logging.StreamHandler(sys.stdout)) self.rootlogger.setLevel("WARN") if kwargs.pop('verbose',False): self.rootlogger.setLevel("INFO") self.log_folder = kwargs.pop('log_folder',None) self.logger = self.rootlogger.getChild('task-runner') self.kwargs = kwargs self.ctxs = {c:task(c,**kwargs) for c in hosts}
[docs] def set_log_folder(self,path): logdir = Path(path) assert logdir.exists(), f"Log folder does not exist: {logdir.absolute()}" for conn,ctx in self.ctxs.items(): ctx.logger.addHandler(logging.FileHandler(Path(logdir,conn.host))) ctx.logger.propagate = False self.logger.addHandler(logging.FileHandler(Path(logdir,"task-runner")))
[docs] def run(self): self._run_parallel() self.pretty_print()
def _run_parallel(self): #TODO: Possible move to asyncio? threads = [Thread(None,ctx._run_all,name=f"thread_{c.host}") for c,ctx in self.ctxs.items()] [t.start() for t in threads] running = True while(running): time.sleep(1) waitlist = ', '.join([t.name for t in threads if t.is_alive()]) if len(waitlist) == 0: break # print(f"Waiting on: {waitlist}")
[docs] def ok(self) -> bool: return all([ctx.state == STATE.SUCCEEDED for ctx in self.ctxs.values()])
[docs] def pretty_print(self): _succeeded = {c:ctx for c,ctx in self.ctxs.items() if ctx.state == STATE.SUCCEEDED} _failed = {c:ctx for c,ctx in self.ctxs.items() if ctx.state == STATE.FAILED} _excepted = {c:ctx for c,ctx in self.ctxs.items() if ctx.state == STATE.EXCEPTED} for c, ctx in self.ctxs.items(): try: assert(ctx.done()) except AssertionError: self.logger.error(f"NOT DONE FOR {c.host}") if _succeeded: self.logger.info(f"Succeeded ({len(_succeeded)}):") for c,ctx in _succeeded.items(): r = ctx.final_res _print_multiline(self.logger.info,f"{c.host}:",r.stdout,2) if _failed: self.logger.error(f"Failed ({len(_failed)}):") for c,ctx in _failed.items(): e: BadExit = ctx.final_res self.logger.error(f" host: {c.host}:") self.logger.error(f" cmd: {e.result.command}") self.logger.error(f" exit code: {e.result.exited}") if len(e.msg): self.logger.error(f" Hint: {e.msg}") if len(e.result.stdout): _print_multiline(self.logger.error,f"STDOUT:",e.result.stdout,4) if len(e.result.stderr): _print_multiline(self.logger.error,f"STDERR:",e.result.stderr,4) if _excepted: self.logger.error(f"Excepted ({len(_excepted)}):") for c,ctx in _excepted.items(): r = ctx.final_res hint=None exception_hints=[(socket.gaierror,"No known address for host")] for eh in exception_hints: T, h = eh if isinstance(r,T): hint = h _print_multiline(self.logger.error,f"{c.host}.exception:",str(r),2) if hint is not None: _print_multiline(self.logger.error,f"^^^ HINT:",hint,2)