Source code for chemicalchecker.tool.targetmate.utils.hpc

import os
import time
import pickle
import tempfile
import multiprocessing

import numpy as np

from chemicalchecker.util import Config, logged
from chemicalchecker.util.hpc import HPC

@logged
class HPCUtils:

    def __init__(self, **kwargs):
        self.job_paths = []

    @staticmethod
    def cpu_count():
        return multiprocessing.cpu_count()
    def waiter(self, jobs, secs=3):
        """Wait for jobs to finish, polling their status every `secs` seconds."""
        if not jobs:
            return
        self.__log.info("Waiting for jobs to finish...")
        while np.any([job.status() != "done" for job in jobs]):
            for job in jobs:
                self.__log.debug(job.status())
            time.sleep(secs)
        self.__log.info("Jobs done.")
    def func_hpc(self, func_name, *args, **kwargs):
        """Execute any method of this class on the configured HPC.

        Args:
            func_name(str): name of the method to execute.
            args(tuple): positional arguments for the method.
            kwargs(dict): arguments for the HPC submission.
        """
        # read config file
        cc_config = kwargs.get("cc_config", os.environ['CC_CONFIG'])
        cfg = Config(cc_config)
        # create job directory if not available
        job_base_path = kwargs.get("job_base_path", cfg.PATH.CC_TMP)
        tmp_dir = tempfile.mktemp(prefix='tmp_', dir=job_base_path)
        job_path = kwargs.get("job_path", tmp_dir)
        self.job_paths += [job_path]
        if not os.path.isdir(job_path):
            os.makedirs(job_path, exist_ok=True)
        # check cpus and memory
        cpu = kwargs.get("cpu", 1)
        memory = kwargs.get("memory", int(cpu * 2))
        # create the wrapper script that unpickles (self, args) and calls
        # the requested method
        script_lines = [
            "import os, sys",
            "import pickle",
            "with open(sys.argv[1], 'rb') as f:",
            "    obj, args = pickle.load(f)",
            "obj.%s(*args)" % func_name,
            "print('JOB DONE')"
        ]
        script_name = '%s_%s_hpc.py' % (self.__class__.__name__, func_name)
        script_path = os.path.join(job_path, script_name)
        with open(script_path, 'w') as fh:
            for line in script_lines:
                fh.write(line + '\n')
        # pickle self and the method arguments
        pickle_file = '%s_%s_hpc.pkl' % (self.__class__.__name__, func_name)
        pickle_path = os.path.join(job_path, pickle_file)
        with open(pickle_path, "wb") as f:
            try:
                pickle.dump((self, args), f)
            except Exception:
                # Protocol 4 supports pickling larger (>4 GiB) objects; fall
                # back to it only when the default protocol fails, since it
                # can increase file size.
                pickle.dump((self, args), f, protocol=4)
        # hpc parameters
        params = kwargs
        params["num_jobs"] = 1
        params["jobdir"] = job_path
        params["job_name"] = script_name
        params["cpu"] = cpu
        params["memory"] = memory
        params["wait"] = False
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = (
            "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}"
            " singularity exec --bind /aloy/web_checker/:/aloy/web_checker"
            " {} python {} {}")
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'),
            cc_config,
            singularity_image,
            script_name,
            pickle_file)
        # submit the job
        cluster = HPC.from_config(cfg)
        cluster.submitMultiJob(command, **params)
        return cluster
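
# Usage sketch (illustrative, not part of the original module). It assumes the
# CC_CONFIG environment variable points to a valid configuration file and that
# the cluster it describes is reachable; `Trainer`, `fit` and the data path
# are hypothetical names introduced for this example only.

class Trainer(HPCUtils):
    """Toy subclass whose expensive method is delegated to the cluster."""

    def fit(self, data_path):
        # Placeholder for a long-running computation.
        print("fitting on", data_path)


if __name__ == "__main__":
    trainer = Trainer()
    # Pickles `trainer` with its arguments, writes a small wrapper script,
    # and submits it as a single cluster job; returns without blocking.
    job = trainer.func_hpc("fit", "/tmp/data.pkl", cpu=4, memory=8)
    # Poll every 3 seconds until the submitted job reports "done".
    trainer.waiter([job])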