Source code for chemicalchecker.util.hpc.local

"""Run job in local system."""
import os
import re
import glob
import uuid
import pickle
import tarfile
import subprocess
import numpy as np

from chemicalchecker.util import logged

STARTED = "started"
DONE = "done"
READY = "ready"
ERROR = "error"


@logged
class local():
    """Local job class."""

    jobStatusSuffix = ".status"

    def __init__(self, **kwargs):
        """Initialize the local class."""
        self.error_finder = kwargs.get("error_finder", self.__find_error)
        self.statusFile = None
        self.status_id = None

    def _chunks(self, l, n):
        """Yield n successive chunks from l."""
        if isinstance(l, list) or isinstance(l, np.ndarray):
            for i in np.array_split(l, n):
                yield i
        elif isinstance(l, dict):
            keys = list(l.keys())
            for i in np.array_split(keys, n):
                yield {k: l[k] for k in i}
        else:
            raise Exception("Element datatype not supported: %s" % type(l))
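
    # Illustrative note (not in the original source): ``_chunks`` relies on
    # ``np.array_split``, so it yields ``n`` chunks of near-equal size, e.g.
    # splitting 10 elements over 3 jobs gives chunks of sizes 4, 3 and 3:
    #   list(local()._chunks(list(range(10)), 3))
    #   -> [array([0, 1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]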

    def submitMultiJob(self, command, **kwargs):
        """Submit multiple jobs/tasks."""
        # get arguments or default values
        num_jobs = int(kwargs.get("num_jobs", 1))
        self.jobdir = kwargs.get("jobdir", '')
        self.job_name = kwargs.get("job_name", 'hpc_cc_job')
        elements = kwargs.get("elements", [])
        compress_out = kwargs.get("compress", True)
        check_error = kwargs.get("check_error", True)
        cpu = 2
        cpusafe = kwargs.get("cpusafe", True)
        # Remove the call to singularity since we are already in a
        # singularity image
        cmd_split = command.split()
        if 'singularity' in cmd_split:
            sidx = cmd_split.index('singularity')
            cmd_split_tmp = cmd_split[:sidx] + cmd_split[sidx + 3:]
            command = ' '.join(cmd_split_tmp)
            command = command.replace('SINGULARITYENV_', '')
        self.__log.debug("Job name is: " + self.job_name)
        if not os.path.exists(self.jobdir):
            os.makedirs(self.jobdir)
        if len(elements) == 0 and num_jobs > 1:
            raise Exception(
                "Number of specified jobs does not match "
                "the number of elements")
        if num_jobs == 0:
            raise Exception("Number of specified jobs is zero")
        if len(elements) > 0:
            self.__log.debug("Num elements submitted " + str(len(elements)))
            # split elements in chunks, one chunk per job, and pickle them
            input_dict = dict()
            for cid, chunk in enumerate(self._chunks(elements, num_jobs), 1):
                input_dict[str(cid)] = chunk
            input_path = os.path.join(self.jobdir, str(uuid.uuid4()))
            with open(input_path, 'wb') as fh:
                pickle.dump(input_dict, fh, protocol=2)
            command = command.replace("<FILE>", input_path)
        if cpusafe:
            # set environment variables that limit CPU over-subscription
            # by common numerical libraries for the command
            env_vars = [
                'OMP_NUM_THREADS',
                'OPENBLAS_NUM_THREADS',
                'MKL_NUM_THREADS',
                'VECLIB_MAXIMUM_THREADS',
                'NUMEXPR_NUM_THREADS'
            ]
            command = ' '.join(
                ["%s=%s" % (v, str(cpu)) for v in env_vars] + [command])
        # Run each task locally, redirecting its output to a log file
        self.__log.info("Writing log files in dir " + self.jobdir)
        for i in range(num_jobs):
            cmd_run = command.replace("<TASK_ID>", str(i + 1))
            self.__log.info("Running job %d/%d", i + 1, num_jobs)
            self.__log.debug("CMD: %s", cmd_run)
            log_path = os.path.join(self.jobdir, "log" + str(i + 1) + ".txt")
            with open(log_path, 'w') as f:
                subprocess.call([cmd_run], stdout=f, stderr=f, shell=True)
        self.statusFile = os.path.join(
            self.jobdir, self.job_name + self.jobStatusSuffix)
        errors = None
        with open(self.statusFile, "w") as f:
            f.write(DONE)
        self.status_id = DONE
        if check_error:
            errors = self.check_errors()
        if compress_out and errors is None:
            self.compress()
        if errors is not None:
            return errors
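
    # Illustrative note (not in the original source): ``submitMultiJob``
    # replaces the <FILE> placeholder with the path of a pickled dict of
    # chunks (keys "1".."num_jobs") and <TASK_ID> with the task number, so
    # a worker script would typically recover its share of the elements as:
    #   task_id, filename = sys.argv[1], sys.argv[2]
    #   with open(filename, 'rb') as fh:
    #       elements = pickle.load(fh)[task_id]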

    def __find_error(self, files, print_logs):
        errors = ''
        for file_name in files:
            if print_logs:
                self.__log.debug('*' * 40)
                self.__log.debug('* START log for: %s', file_name)
                self.__log.debug('*' * 40)
            with open(file_name, 'r') as f:
                num = 1
                for line in f:
                    print('* ' + line.strip())
                    if re.search(r'(?i)error', line):
                        errors += file_name + " " + str(num) + " " + line
                    if 'Traceback (most recent call last)' in line:
                        errors += file_name + " " + str(num) + " " + line
                    num += 1
            if print_logs:
                self.__log.debug('*' * 40)
                self.__log.debug('* END log for: %s', file_name)
                self.__log.debug('*' * 40)
        return errors

    def check_errors(self, print_logs=True):
        """Check for errors in the output logs of the jobs."""
        errors = ''
        self.__log.debug("Checking errors in job")
        files = []
        for file_name in glob.glob(os.path.join(self.jobdir, 'log*')):
            files.append(file_name)
        errors = self.error_finder(files, print_logs)
        if len(errors) > 0:
            self.__log.debug("Found errors in job")
            if self.status_id == DONE:
                with open(self.statusFile, "w") as f:
                    f.write(ERROR)
                self.status_id = ERROR
            return errors
        else:
            if self.status_id == DONE:
                with open(self.statusFile, "w") as f:
                    f.write(READY)
                self.status_id = READY
            return None

    def compress(self):
        """Compress the output logs."""
        self.__log.debug("Compressing output job files...")
        tar = tarfile.open(os.path.join(
            self.jobdir, self.job_name + ".tar.gz"), "w:gz")
        for file_name in glob.glob(os.path.join(self.jobdir, 'log*')):
            tar.add(file_name, os.path.basename(file_name))
        tar.close()
        for file_name in glob.glob(os.path.join(self.jobdir, 'log*')):
            os.remove(file_name)

    def status(self):
        """Get the status of the job submission."""
        if self.statusFile is None:
            return None
        return self.status_id
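

# Usage sketch (illustrative, not part of the original module): drive the
# local backend with a hypothetical worker script ``my_task.py``; the job
# directory and job name below are made-up example values. The command
# string must carry the <TASK_ID> and <FILE> placeholders substituted by
# ``submitMultiJob`` before each task is run.
if __name__ == '__main__':
    job = local()
    errors = job.submitMultiJob(
        "python my_task.py <TASK_ID> <FILE>",
        num_jobs=4,
        jobdir="/tmp/cc_local_job",
        job_name="example_job",
        elements=list(range(100)),
        compress=False)
    if errors is not None:
        print("Errors found:\n" + errors)
    print("Final status: %s" % job.status())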