Source code for chemicalchecker.util.pipeline.tasks_web.task_web_molinfo

import os
import csv
import h5py
import shutil
import tempfile
import numpy as np
from scipy.stats import rankdata

from chemicalchecker.util import psql
from chemicalchecker.database import Molecule
from chemicalchecker.util.pipeline import BaseTask
from chemicalchecker.util import logged, HPC


# The table definitions below were obtained with:
#   pg_dump -t 'scores' --schema-only mosaic -h aloy-dbsrv

DROP_TABLE_MOLINFO = "DROP TABLE IF EXISTS public.molecular_info"

DROP_TABLE_STRUCTURE = "DROP TABLE IF EXISTS public.structure"

CREATE_TABLE_MOLINFO = """CREATE TABLE public.molecular_info (
    inchikey text,
    formula text,
    popularity double precision,
    singularity double precision,
    mappability double precision,
    mw double precision,
    ro5 integer,
    qed double precision
);"""

CREATE_TABLE_STRUCTURE = """CREATE TABLE public.structure (
    inchikey character varying(27) NOT NULL,
    inchi text
);"""

CREATE_INDEX_MOLINFO = """
CREATE INDEX scores_inchikey_idx ON public.molecular_info USING btree (inchikey);
"""

CREATE_INDEX_STRUCTURE = """
CREATE INDEX structure_inchikey_idx ON public.structure USING btree (inchikey);
"""

INSERT_MOLINFO = "INSERT INTO molecular_info VALUES %s"

INSERT_STRUCTURE = "INSERT INTO structure VALUES %s"

COUNT_MOLINFO = "SELECT COUNT(*) FROM molecular_info"

COUNT_STRUCTURE = "SELECT COUNT(*) FROM structure"
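
# The INSERT_* statements above use a single "%s" placeholder that the
# run() method below fills with a comma-separated list of parenthesised
# value tuples, so each psql.query() call inserts up to 1000 rows at once.
# A minimal sketch of that pattern (the inchikey and numbers here are
# made up for illustration only):
#
#   rows = ["('AAAAAAAAAAAAAA-UHFFFAOYSA-N', 'C6H6', 0.5, 0.5, 0.5, 78.1, 1, 0.4)"]
#   psql.query(INSERT_MOLINFO % ",".join(rows), "my_database")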


@logged
class MolecularInfo(BaseTask):

    def __init__(self, name=None, **params):
        task_id = params.get('task_id', None)
        if task_id is None:
            params['task_id'] = name
        BaseTask.__init__(self, name, **params)

        self.DB = params.get('DB', None)
        if self.DB is None:
            raise Exception('DB parameter is not set')
        self.CC_ROOT = params.get('CC_ROOT', None)
        if self.CC_ROOT is None:
            raise Exception('CC_ROOT parameter is not set')
    def run(self):
        """Run the molecular info step."""
        script_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "scripts/scores.py")
        try:
            self.__log.info("Creating table molinfo")
            psql.query(DROP_TABLE_MOLINFO, self.DB)
            psql.query(CREATE_TABLE_MOLINFO, self.DB)
            self.__log.info("Creating table structure")
            psql.query(DROP_TABLE_STRUCTURE, self.DB)
            psql.query(CREATE_TABLE_STRUCTURE, self.DB)
        except Exception as e:
            if not self.custom_ready():
                raise Exception(e)
            else:
                self.__log.error(e)
                return

        universe_file = os.path.join(self.cachedir, "universe.h5")
        consensus_file = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "data/consensus.h5")
        with h5py.File(universe_file, "r") as h5:
            keys = h5["keys"][:]
        datasize = keys.shape[0]
        self.__log.info(
            "Generating molecular info for %d molecules" % datasize)
        keys.sort()

        job_path = tempfile.mkdtemp(
            prefix='jobs_molinfo_', dir=self.tmpdir)
        data_files_path = tempfile.mkdtemp(
            prefix='molinfo_data_', dir=self.tmpdir)

        params = {}
        params["num_jobs"] = datasize // 1000
        params["jobdir"] = job_path
        params["job_name"] = "CC_MOLINFO"
        params["elements"] = keys
        params["wait"] = True
        params["mem_by_core"] = 4
        params["memory"] = 4
        params["cpu"] = 1
        # job command
        cc_config_path = self.config.config_path
        cc_package = os.path.join(self.config.PATH.CC_REPO, 'package')
        singularity_image = self.config.PATH.SINGULARITY_IMAGE
        command = ("OMP_NUM_THREADS=1 SINGULARITYENV_PYTHONPATH={} "
                   "SINGULARITYENV_CC_CONFIG={} singularity exec {} "
                   "python {} <TASK_ID> <FILE> {} {} {}")
        command = command.format(
            cc_package, cc_config_path, singularity_image, script_path,
            consensus_file, data_files_path, self.CC_ROOT)
        # submit jobs
        cluster = HPC.from_config(self.config)
        jobs = cluster.submitMultiJob(command, **params)

        # collect the per-job output files (one tab-separated row per molecule)
        V = []
        iks = []
        formula = []
        done_iks = set()
        for l in os.listdir(data_files_path):
            with open(os.path.join(data_files_path, l), "r") as f:
                for r in csv.reader(f, delimiter="\t"):
                    if r[0] in done_iks:
                        continue
                    iks += [r[0]]
                    formula += [r[1]]
                    V += [[r[2], r[3], r[4], r[5], r[6], r[7]]]
                    done_iks.add(r[0])
        V = np.array(V).astype(float)
        if V.shape[0] != datasize:
            raise Exception(
                "Generated molecular info does not include all universe "
                "molecules (%d/%d)" % (V.shape[0], datasize))
        # Singularity
        V[:, 1] = rankdata(-V[:, 1]) / V.shape[0]
        # Mappability
        V[:, 2] = rankdata(V[:, 2]) / V.shape[0]

        # insert scores/molinfo in chunks of 1000 rows
        index = range(0, datasize)
        for i in range(0, datasize, 1000):
            sl = slice(i, i + 1000)
            S = ["('%s', '%s', %.3f, %.3f, %.3f, %.3f, %.3f, %.3f)" %
                 (iks[j], formula[j], V[j, 0], V[j, 1], V[j, 2],
                  V[j, 3], V[j, 4], V[j, 5]) for j in index[sl]]
            try:
                psql.query(INSERT_MOLINFO % ",".join(S), self.DB)
            except Exception as e:
                print(e)

        # insert structures in chunks of 1000 rows
        inchikey_inchi = Molecule.get_inchikey_inchi_mapping(keys)
        inchikey_inchi_str = ["('%s', '%s')" % (a, b)
                              for a, b in list(inchikey_inchi.items())]
        for i in range(0, datasize, 1000):
            sl = slice(i, i + 1000)
            S = inchikey_inchi_str[sl]
            try:
                psql.query(INSERT_STRUCTURE % ",".join(S), self.DB)
            except Exception as e:
                print(e)

        try:
            self.__log.info("Checking tables")
            count = psql.qstring(COUNT_MOLINFO, self.DB)
            if int(count[0][0]) != datasize:
                if not self.custom_ready():
                    raise Exception(
                        "Not all universe keys were added to molecular_info "
                        "(%d/%d)" % (int(count[0][0]), datasize))
                else:
                    self.__log.error(
                        "Not all universe keys were added to molecular_info "
                        "(%d/%d)" % (int(count[0][0]), datasize))
            count = psql.qstring(COUNT_STRUCTURE, self.DB)
            if int(count[0][0]) != datasize:
                if not self.custom_ready():
                    raise Exception(
                        "Not all universe keys were added to structure "
                        "(%d/%d)" % (int(count[0][0]), datasize))
                else:
                    self.__log.error(
                        "Not all universe keys were added to structure "
                        "(%d/%d)" % (int(count[0][0]), datasize))
            else:
                self.__log.info("Indexing table")
                psql.query(CREATE_INDEX_MOLINFO, self.DB)
                psql.query(CREATE_INDEX_STRUCTURE, self.DB)
                shutil.rmtree(job_path, ignore_errors=True)
                shutil.rmtree(data_files_path, ignore_errors=True)
                self.mark_ready()
        except Exception as e:
            if not self.custom_ready():
                raise Exception(e)
            else:
                self.__log.error(e)
    def execute(self, context):
        """Run the molecular info step."""
        self.tmpdir = context['params']['tmpdir']
        self.run()
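
# Illustrative usage only: this task is normally instantiated and executed by
# the pipeline machinery via execute(), but a stand-alone invocation could
# look roughly like the sketch below (the database name and paths are
# hypothetical placeholders, not values from this module):
#
#   task = MolecularInfo("molecular_info",
#                        DB="mosaic",
#                        CC_ROOT="/path/to/cc_root")
#   task.tmpdir = "/tmp"
#   task.run()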