Source code for chemicalchecker.database.calcdata

"""Generic table for calculated data.

The class defined here is a generic table definition for calculated data.
That is, data that is pre-calculated or readily available in a correctly
formatted table (e.g. all the ``A`` chemistry spaces use this class in their
pre-processing scripts).
The table names are the technical description of the content of the table e.g.:

   * ``morgan_fp_r2_2048`` for ``A1``
   * ``e3fp_3conf_1024`` for ``A2``
   * ``murcko_1024_cframe_1024`` for ``A3``
   * ``maccs_keys_166`` for ``A4``
   * ``general_physchem_properties`` for ``A5``

The table is very simple and only includes two fields:

   * the molecule ``InChIKey``
   * the ``raw`` signature 0

The data filling these tables is generated by :class:`~chemicalchecker.util.parser.data_calculator.DataCalculator`
by a method with the same name as the table (e.g. :meth:`~chemicalchecker.util.parser.data_calculator.DataCalculator.morgan_fp_r2_2048`).

Example::

    from chemicalchecker.database import Calcdata
    cd = Calcdata('morgan_fp_r2_2048')
    data = cd.get('RZVAJINKPMORJF-UHFFFAOYSA-N')
    data.raw
    >>> '167,202,389,403,725,745,807,1017,1057,1299,1313,1380,1602,1613,1723,1750,1778,1854,1873'

"""
import os
import h5py
import datetime
import numpy as np
from time import time

from sqlalchemy import Column, Text
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

from .database import get_session, get_engine
from .molecule import Molecule

from chemicalchecker.util import logged, Config
from chemicalchecker.util.hpc import HPC
from chemicalchecker.util.parser import DataCalculator


def Calcdata(table_name):
    """Factory for a generic calculated-data table class.

    Builds and returns a SQLAlchemy declarative class mapped to the table
    called ``table_name``. The class has two text columns, ``inchikey``
    (primary key) and ``raw``, and exposes static helpers to query and fill
    the table.

    Args:
        table_name(str): Name of the table to map. It is also the name passed
            to ``DataCalculator.calc_fn`` to obtain the function that
            generates the data.

    Returns:
        The ``GenericCalcdata`` class bound to ``table_name``.
    """
    # A new declarative base with its own class registry is created on every
    # call, so each returned class is mapped independently of the others.
    DynamicBase = declarative_base(class_registry=dict())
    config = Config()

    @logged
    class GenericCalcdata(DynamicBase):
        """Generic table class."""

        __tablename__ = table_name
        inchikey = Column(Text, primary_key=True)
        raw = Column(Text)
        # Name of the database where the table lives (from configuration).
        dbname = config.DB.calcdata_dbname

        @staticmethod
        def add(kwargs):
            """Method to add a new row to the table.

            Args:
                kwargs(dict): The data in dictionary format.

            Raises:
                Exception: If ``kwargs`` is not a dictionary or if the
                    InChIKey is not present in the molecule table.
            """
            GenericCalcdata.__log.debug(type(kwargs))
            if not isinstance(kwargs, dict):
                raise Exception("Input data is not a dictionary.")
            prop = GenericCalcdata(**kwargs)
            if Molecule.get(prop.inchikey) is None:
                raise Exception("The inchikey " + str(prop.inchikey) +
                                " is not present in table molecule.")
            GenericCalcdata.__log.debug(prop.inchikey)
            session = get_session(GenericCalcdata.dbname)
            try:
                session.add(prop)
                session.commit()
            finally:
                # Release the session even when the commit fails.
                session.close()

        @staticmethod
        def get(key):
            """Method to query table.

            Args:
                key(str): The InChIKey to look up.

            Returns:
                The matching row, or None when no row has that InChIKey.
            """
            session = get_session(GenericCalcdata.dbname)
            try:
                query = session.query(GenericCalcdata).filter_by(
                    inchikey=key)
                return query.one_or_none()
            finally:
                session.close()

        @staticmethod
        def _create_table():
            """Create the table in the configured database."""
            engine = get_engine(GenericCalcdata.dbname)
            DynamicBase.metadata.create_all(engine)

        @staticmethod
        def get_properties_from_list(keys):
            """Fetch ``(inchikey, raw)`` rows for the given InChIKeys.

            Only rows whose ``raw`` value is not NULL are returned. The
            database is queried in batches of 1000 keys.

            Args:
                keys(list): InChIKeys to look up.

            Returns:
                list: De-duplicated ``(inchikey, raw)`` rows, in no
                    particular order.
            """
            size = 1000
            # Materialize once so that any iterable can be sliced in batches.
            keys = list(keys)
            props = set()
            session = get_session(GenericCalcdata.dbname)
            try:
                for pos in range(0, len(keys), size):
                    query = session.query(GenericCalcdata).filter(
                        GenericCalcdata.inchikey.in_(keys[pos:pos + size]),
                        GenericCalcdata.raw.isnot(None))
                    res = query.with_entities(
                        GenericCalcdata.inchikey, GenericCalcdata.raw).all()
                    props.update(res)
            finally:
                session.close()
            return list(props)

        @staticmethod
        def get_missing_from_set(keys):
            """Return the InChIKeys that are not yet in the table.

            The database is queried in batches of 1000 keys.

            Args:
                keys(set): InChIKeys to check.

            Returns:
                set: The subset of ``keys`` without a row in the table.
            """
            size = 1000
            present = set()
            vec = list(keys)
            session = get_session(GenericCalcdata.dbname)
            try:
                for pos in range(0, len(vec), size):
                    query = session.query(GenericCalcdata).filter(
                        GenericCalcdata.inchikey.in_(vec[pos:pos + size]))
                    res = query.with_entities(GenericCalcdata.inchikey).all()
                    present.update(ele[0] for ele in res)
            finally:
                session.close()
            GenericCalcdata.__log.debug(
                "Found already present: " + str(len(present)))
            return set(vec).difference(present)

        @staticmethod
        def from_inchikey(inchikey, **kwargs):
            """Fill the table starting from InChIKeys only.

            The InChIKey to InChI mapping is obtained from
            ``Molecule.get_inchikey_inchi_mapping`` and forwarded to
            :meth:`from_inchikey_inchi`.

            Args:
                inchikey: InChIKeys to process (passed unchanged to
                    ``Molecule.get_inchikey_inchi_mapping``).
                **kwargs: Forwarded to :meth:`from_inchikey_inchi`.
            """
            inchikey_inchi = Molecule.get_inchikey_inchi_mapping(inchikey)
            GenericCalcdata.from_inchikey_inchi(inchikey_inchi, **kwargs)

        @staticmethod
        def from_inchikey_inchi(inchikey_inchi, missing_only=True,
                                chunksize=1000):
            """Fill the property table given InChIKey to InChI map.

            Args:
                inchikey_inchi(dict or list): Mapping from InChIKey to InChI,
                    or a list of ``(InChIKey, InChI)`` tuples.
                missing_only(bool): When True, only calculate data for
                    InChIKeys that are not already in the table.
                chunksize(int): Chunk size passed to the calculation
                    function; each yielded chunk is inserted in one
                    statement.

            Raises:
                Exception: If a list is given whose first element is not a
                    pair.
            """
            if isinstance(inchikey_inchi, list):
                # An empty list is a valid (empty) mapping; only inspect the
                # first element when there is one.
                if inchikey_inchi and len(inchikey_inchi[0]) != 2:
                    raise Exception(
                        "Inchikey_inchi variable is not a list of tuples " +
                        "(InChIKey, InChI)")
                inchikey_inchi_final = dict(inchikey_inchi)
            else:
                inchikey_inchi_final = inchikey_inchi

            if missing_only:
                set_inks = set(inchikey_inchi_final.keys())
                GenericCalcdata.__log.debug(
                    "Size initial data to add: " + str(len(set_inks)))
                todo_iks = GenericCalcdata.get_missing_from_set(set_inks)
                GenericCalcdata.__log.debug(
                    "Size final data to add: " + str(len(todo_iks)))
                dict_inchikey_inchi = {
                    k: inchikey_inchi_final[k] for k in todo_iks}
            else:
                dict_inchikey_inchi = inchikey_inchi_final

            Molecule.add_missing_only(inchikey_inchi_final)

            # parse_fn yield a list of dictionaries with keys as a molprop
            parse_fn = DataCalculator.calc_fn(GenericCalcdata.__tablename__)
            # profile time
            t_start = time()
            engine = get_engine(GenericCalcdata.dbname)
            # All chunks are inserted within a single transaction.
            with engine.begin() as conn:
                for chunk in parse_fn(dict_inchikey_inchi, chunksize):
                    if len(chunk) == 0:
                        continue
                    GenericCalcdata.__log.debug(
                        "Loading chunk of size: " + str(len(chunk)))
                    # Rows whose InChIKey already exists are skipped.
                    conn.execute(
                        postgresql.insert(GenericCalcdata.__table__).values(
                            chunk).on_conflict_do_nothing(
                            index_elements=[GenericCalcdata.inchikey]))
            t_end = time()
            t_delta = str(datetime.timedelta(seconds=t_end - t_start))
            GenericCalcdata.__log.info(
                "Loading Mol properties Name %s took %s",
                GenericCalcdata.__tablename__, t_delta)

        @staticmethod
        def calcdata_hpc(job_path, inchikey, **kwargs):
            """Run HPC jobs to calculate data from inchikey_inchi data.

            Args:
                job_path(str): Path (usually in scratch) where the script
                    files are generated.
                inchikey(list): List of inchikey.
                cpu: Number of cores each job will use (default: 1)
                wait: Wait for the job to finish (default: True)
                memory: Maximum memory the job can take in Gigabytes
                    (default: 5)
                num_jobs: Number of HPC jobs (default: 200)
                chunk_dbload: Number of elements loaded to the database
                    (default: 1000)
                cc_config: configuration file (.json). Defaults to the value
                    of the ``CC_CONFIG`` environment variable.
            """
            # Only read the environment when no explicit configuration is
            # given, so a missing CC_CONFIG variable is not an error then.
            if "cc_config" in kwargs:
                cc_config = kwargs["cc_config"]
            else:
                cc_config = os.environ['CC_CONFIG']
            cfg = Config(cc_config)
            # create job directory if not available
            os.makedirs(job_path, exist_ok=True)

            cpu = kwargs.get("cpu", 1)
            wait = kwargs.get("wait", True)
            memory = kwargs.get("memory", 5)
            num_jobs = kwargs.get("num_jobs", 200)
            chunk_dbload = kwargs.get("chunk_dbload", 1000)

            # create script file
            script_lines = [
                "import sys, os",
                "import pickle",
                "import h5py",
                "from chemicalchecker.database import Calcdata",
                "task_id = sys.argv[1]",  # <TASK_ID>
                "filename = sys.argv[2]",  # <FILE>
                # load pickled data
                "inchikey = pickle.load(open(filename, 'rb'))[task_id]",
                "mol = Calcdata('" + GenericCalcdata.__tablename__ + "')",
                'mol.from_inchikey(inchikey, '
                'missing_only=False, chunksize=%d)' % chunk_dbload,
                "print('JOB DONE')"
            ]
            script_name = os.path.join(job_path, 'molprop_script.py')
            with open(script_name, 'w') as fh:
                fh.writelines(line + '\n' for line in script_lines)

            # HPC job parameters
            params = {
                "num_jobs": num_jobs,
                "jobdir": job_path,
                "job_name": "CC_MLP_" + GenericCalcdata.__tablename__,
                "elements": inchikey,
                "wait": wait,
                "cpu": cpu,
                "memory": memory,
                "compress": False,
            }

            # job command; the environment assignments must be separated by
            # spaces so the shell sees them as distinct variables
            singularity_image = cfg.PATH.SINGULARITY_IMAGE
            command = ("SINGULARITYENV_PYTHONPATH={}"
                       " SINGULARITYENV_CC_CONFIG={}"
                       " singularity exec {} python {} <TASK_ID> <FILE>")
            command = command.format(
                os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
                singularity_image, script_name)

            # submit jobs
            cluster = HPC.from_config(cfg)
            cluster.submitMultiJob(command, **params)

    return GenericCalcdata