Source code for chemicalchecker.util.pipeline.pipeline

"""Utility to run pipelines."""
import os
import logging

from chemicalchecker.util import logged


@logged
class Pipeline:
    """Pipeline class.

    Run pipelines according to config files.

    A pipeline is an ordered list of tasks. Each task is handed the
    pipeline's ``ready``, ``tmp`` and ``cache`` directories and is executed
    in sequence by :meth:`run`, which stops at the first task that is not
    in ready state after running.
    """

    def __init__(self, pipeline_path, config, keep_jobs=False,
                 only_tasks=None, exclude_tasks=None):
        """Initialize a Pipeline instance.

        Args:
            pipeline_path (str): Path where the pipeline will set its
                structure (``ready``, ``log``, ``tmp`` and ``cache``
                sub-directories are created inside it).
            config: Configuration object; it is assigned as ``config``
                to every task handed to the pipeline.
            keep_jobs (bool): If True temporary job directories will not be
                deleted.
            only_tasks (list): Task name prefixes. When not empty, only
                tasks whose name starts with one of them are kept.
                Defaults to no restriction.
            exclude_tasks (list): Task name prefixes. Tasks whose name
                starts with one of them are dropped; exclusion is checked
                before ``only_tasks``. Defaults to no exclusion.
        """
        self.tasks = []
        self.pipeline_path = pipeline_path
        self.__log.info("PIPELINE PATH: %s", self.pipeline_path)
        self.config = config
        # None instead of a literal [] default: a default list would be one
        # object shared by every Pipeline created without the argument.
        self.only_tasks = [] if only_tasks is None else only_tasks
        self.exclude_tasks = [] if exclude_tasks is None else exclude_tasks
        self.readydir = os.path.join(self.pipeline_path, "ready")
        self.logdir = os.path.join(self.pipeline_path, "log")
        self.tmpdir = os.path.join(self.pipeline_path, "tmp")
        self.cachedir = os.path.join(self.pipeline_path, "cache")
        self.keep_jobs = keep_jobs

        # check and make needed directories; exist_ok avoids failing when
        # another process creates the directory between a check and the
        # creation
        for directory in (self.pipeline_path, self.readydir, self.tmpdir,
                          self.logdir, self.cachedir):
            os.makedirs(directory, exist_ok=True)

        # log to file
        self.logfile = os.path.join(pipeline_path, "log", "pipeline.log")
        self._attach_file_handler()

    def _attach_file_handler(self):
        """Send root logger records to ``self.logfile`` at DEBUG level.

        Nothing is added when the root logger already has a file handler
        writing to the same file, so building several pipelines on the same
        path does not duplicate every log line.
        """
        root_logger = logging.getLogger()
        # FileHandler stores the absolute path of its file in baseFilename
        target = os.path.abspath(self.logfile)
        for handler in root_logger.handlers:
            if (isinstance(handler, logging.FileHandler)
                    and handler.baseFilename == target):
                return
        file_handler = logging.FileHandler(self.logfile)
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s [%(levelname)-8s] %(message)s')
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

    def _check_task(self, task):
        """Decide if task can be included.

        Returns:
            bool: False when the task name starts with any prefix in
            ``exclude_tasks``, or when ``only_tasks`` is not empty and no
            prefix in it matches; True otherwise.
        """
        # exclusion is evaluated first, so it wins over only_tasks
        if any(task.name.startswith(st) for st in self.exclude_tasks):
            self.__log.info("Task EXCLUDED: '%s'", task.name)
            return False
        if self.only_tasks:
            if any(task.name.startswith(st) for st in self.only_tasks):
                self.__log.info("Task SELECTED: '%s'", task.name)
                return True
            self.__log.info("Task EXCLUDED: '%s'", task.name)
            return False
        self.__log.info("Task INCLUDED: '%s'", task.name)
        return True

    def _setup_task(self, task):
        """Configure a task for this pipeline and tell if it is accepted.

        The task is configured (directories, ``keep_jobs``, ``config``)
        even when it ends up being filtered out.
        """
        task.set_dirs(self.readydir, self.tmpdir, self.cachedir)
        task.keep_jobs = self.keep_jobs
        task.config = self.config
        return self._check_task(task)

    def add_task(self, task):
        """Append a task at the end of the pipeline.

        The task is silently skipped when the ``only_tasks`` /
        ``exclude_tasks`` filters reject it.
        """
        if self._setup_task(task):
            self.tasks.append(task)

    def insert_task(self, position, task):
        """Insert a task in the pipeline at the given position.

        Args:
            position (int): Index in the task list, with ``list.insert``
                semantics.
            task: The task to insert. It is silently skipped when the
                ``only_tasks`` / ``exclude_tasks`` filters reject it.
        """
        if self._setup_task(task):
            self.tasks.insert(position, task)

    def run(self):
        """Run the pipeline.

        Tasks are run in order. Tasks already in ready state are skipped.
        The run stops at the first task that is not ready after running;
        the failure is logged, no exception is raised.
        """
        for task in self.tasks:
            # check if already done
            if task.is_ready():
                self.__log.info(
                    "Task: '%s' already done. Skipping...", task.name)
                continue
            # run it
            self.__log.info("Starting task: '%s' ...", task.name)
            task.run()
            # after running we expect the step to be in ready state
            if not task.is_ready():
                # if not we report the error and stop the pipeline
                self.__log.error(
                    "Pipeline failed in task %s. Please, check errors.",
                    task.name)
                break
            self.__log.info("Done task: '%s' ...", task.name)

    def clean(self, step=None):
        """Clean all or some of the pipeline steps.

        Args:
            step (str): Name of the task to clean. When None (default) every
                task in the pipeline is cleaned.
        """
        for task in self.tasks:
            if step is None or task.name == step:
                task.clean()