"""Generic table for calculated data.
The class defined here is a generic table definition for calculated data.
That is, data that is pre-calculated or readily available in a correctly
formatted table (e.g. all the ``A`` chemistry spaces use this class in their
pre-processing scripts).
The table names are the technical description of the content of the table e.g.:
* ``morgan_fp_r2_2048`` for ``A1``
* ``e3fp_3conf_1024`` for ``A2``
* ``murcko_1024_cframe_1024`` for ``A3``
* ``maccs_keys_166`` for ``A4``
* ``general_physchem_properties`` for ``A5``
The table is very simple and only includes two fields:
* the molecule ``InChIKey``
* the ``raw`` signature 0
The data filling these tables is generated by :class:`~chemicalchecker.util.parser.data_calculator.DataCalculator`
by a method with the same name as the table (e.g. :meth:`~chemicalchecker.util.parser.data_calculator.DataCalculator.morgan_fp_r2_2048`).
Example::
from chemicalchecker.database import Calcdata
cd = Calcdata('morgan_fp_r2_2048')
data = cd.get('RZVAJINKPMORJF-UHFFFAOYSA-N')
data.raw
>>> '167,202,389,403,725,745,807,1017,1057,1299,1313,1380,1602,1613,1723,1750,1778,1854,1873'
"""
import os
import h5py
import datetime
import numpy as np
from time import time
from sqlalchemy import Column, Text
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base
from .database import get_session, get_engine
from .molecule import Molecule
from chemicalchecker.util import logged, Config
from chemicalchecker.util.hpc import HPC
from chemicalchecker.util.parser import DataCalculator
def Calcdata(table_name):
    """Factory for Generic table.

    Build and return a declarative class mapped to the table named
    ``table_name``. Every call creates its own declarative base with an
    empty class registry (presumably so that several tables can be mapped
    in the same process without name clashes -- confirm).

    Args:
        table_name(str): Name of the table to map. The same name is used
            to look up the calculation function through
            ``DataCalculator.calc_fn``.
    Returns:
        class: The ``GenericCalcdata`` class bound to ``table_name``.
    """
    DynamicBase = declarative_base(class_registry=dict())
    config = Config()

    @logged
    class GenericCalcdata(DynamicBase):
        """Generic table class."""

        __tablename__ = table_name
        # InChIKey of the molecule, primary key of the table.
        inchikey = Column(Text, primary_key=True)
        # Pre-calculated data stored as text.
        raw = Column(Text)
        # Name of the database holding the table, read from the config.
        dbname = config.DB.calcdata_dbname

        @staticmethod
        def add(kwargs):
            """Method to add a new row to the table.

            Args:
                kwargs(dict): The data in dictionary format; keys are the
                    column names (``inchikey``, ``raw``).
            Raises:
                TypeError: If ``kwargs`` is not a dictionary.
                ValueError: If the InChIKey is not found by
                    ``Molecule.get``.
            """
            GenericCalcdata.__log.debug(type(kwargs))
            if not isinstance(kwargs, dict):
                raise TypeError("Input data is not a dictionary.")
            prop = GenericCalcdata(**kwargs)
            if Molecule.get(prop.inchikey) is None:
                raise ValueError("The inchikey " + str(prop.inchikey) +
                                 " is not present in table molecule.")
            GenericCalcdata.__log.debug(prop.inchikey)
            session = get_session(GenericCalcdata.dbname)
            try:
                session.add(prop)
                session.commit()
            finally:
                # always release the session, also when the commit fails
                session.close()

        @staticmethod
        def get(key):
            """Method to query table.

            Args:
                key(str): The InChIKey to look for.
            Returns:
                The matching row, or None when there is no such row.
            """
            session = get_session(GenericCalcdata.dbname)
            try:
                query = session.query(GenericCalcdata).filter_by(
                    inchikey=key)
                return query.one_or_none()
            finally:
                session.close()

        @staticmethod
        def _create_table():
            """Create the table in the database if it does not exist."""
            engine = get_engine(GenericCalcdata.dbname)
            DynamicBase.metadata.create_all(engine)

        @staticmethod
        def get_properties_from_list(keys):
            """Fetch ``(inchikey, raw)`` pairs for a list of InChIKeys.

            Rows whose ``raw`` is NULL are skipped. The keys are queried in
            slices of 1000 elements. Results are collected in a set, so the
            order of the returned list is not defined.

            Args:
                keys(list): InChIKeys to look for (must support ``len`` and
                    slicing).
            Returns:
                list: The distinct ``(inchikey, raw)`` rows found.
            """
            size = 1000
            props = set()
            session = get_session(GenericCalcdata.dbname)
            try:
                for pos in range(0, len(keys), size):
                    query = session.query(GenericCalcdata).filter(
                        GenericCalcdata.inchikey.in_(keys[pos:pos + size]),
                        GenericCalcdata.raw.isnot(None))
                    res = query.with_entities(
                        GenericCalcdata.inchikey,
                        GenericCalcdata.raw).all()
                    props.update(res)
            finally:
                session.close()
            return list(props)

        @staticmethod
        def get_missing_from_set(keys):
            """Return the InChIKeys that are not yet in the table.

            The keys are queried in slices of 1000 elements.

            Args:
                keys(iterable): InChIKeys to check.
            Returns:
                set: The subset of ``keys`` without a row in the table.
            """
            size = 1000
            present = set()
            # materialise once: allows slicing and makes iterators safe
            vec = list(keys)
            session = get_session(GenericCalcdata.dbname)
            try:
                for pos in range(0, len(vec), size):
                    query = session.query(GenericCalcdata).filter(
                        GenericCalcdata.inchikey.in_(vec[pos:pos + size]))
                    res = query.with_entities(
                        GenericCalcdata.inchikey).all()
                    present.update(ele[0] for ele in res)
            finally:
                session.close()
            GenericCalcdata.__log.debug(
                "Found already present: %s", len(present))
            return set(vec).difference(present)

        @staticmethod
        def from_inchikey(inchikey, **kwargs):
            """Fill the table starting from InChIKeys only.

            The InChIKey to InChI mapping is obtained with
            ``Molecule.get_inchikey_inchi_mapping`` and then handed to
            :meth:`from_inchikey_inchi`.

            Args:
                inchikey: InChIKeys as accepted by
                    ``Molecule.get_inchikey_inchi_mapping``.
                **kwargs: Forwarded to :meth:`from_inchikey_inchi`
                    (``missing_only``, ``chunksize``).
            """
            inchikey_inchi = Molecule.get_inchikey_inchi_mapping(inchikey)
            GenericCalcdata.from_inchikey_inchi(inchikey_inchi, **kwargs)

        @staticmethod
        def from_inchikey_inchi(inchikey_inchi, missing_only=True,
                                chunksize=1000):
            """Fill the property table given InChIKey to InChI map.

            Args:
                inchikey_inchi(dict or list): Mapping InChIKey -> InChI, or
                    a list of ``(InChIKey, InChI)`` tuples.
                missing_only(bool): When True only InChIKeys without a row
                    in the table are calculated (default: True).
                chunksize(int): Chunk size passed to the calculation
                    function (default: 1000).
            Raises:
                ValueError: If a list is given and its first element is not
                    a pair.
            """
            if isinstance(inchikey_inchi, list):
                # an empty list is valid input: there is nothing to check
                if inchikey_inchi and len(inchikey_inchi[0]) != 2:
                    raise ValueError(
                        "Inchikey_inchi variable is not a list of tuples " +
                        "(InChIKey, InChI)")
                inchikey_inchi_final = dict(inchikey_inchi)
            else:
                inchikey_inchi_final = inchikey_inchi

            if missing_only:
                set_inks = set(inchikey_inchi_final.keys())
                GenericCalcdata.__log.debug(
                    "Size initial data to add: %s", len(set_inks))
                todo_iks = GenericCalcdata.get_missing_from_set(set_inks)
                GenericCalcdata.__log.debug(
                    "Size final data to add: %s", len(todo_iks))
                dict_inchikey_inchi = {
                    k: inchikey_inchi_final[k] for k in todo_iks}
            else:
                dict_inchikey_inchi = inchikey_inchi_final

            # molecules are registered for the whole input, regardless of
            # the missing_only filter applied above
            Molecule.add_missing_only(inchikey_inchi_final)

            # parse_fn is expected to yield chunks of rows suitable for a
            # bulk insert (per the original note: a list of dictionaries)
            parse_fn = DataCalculator.calc_fn(GenericCalcdata.__tablename__)
            # profile time
            t_start = time()
            engine = get_engine(GenericCalcdata.dbname)
            # engine.begin() wraps all the chunk inserts in one transaction
            with engine.begin() as conn:
                for chunk in parse_fn(dict_inchikey_inchi, chunksize):
                    if len(chunk) == 0:
                        continue
                    GenericCalcdata.__log.debug(
                        "Loading chunk of size: %s", len(chunk))
                    # rows whose inchikey already exists are left untouched
                    conn.execute(
                        postgresql.insert(GenericCalcdata.__table__).values(
                            chunk).on_conflict_do_nothing(
                            index_elements=[GenericCalcdata.inchikey]))
            t_end = time()
            t_delta = str(datetime.timedelta(seconds=t_end - t_start))
            GenericCalcdata.__log.info(
                "Loading Mol properties Name %s took %s",
                GenericCalcdata.__tablename__, t_delta)

        @staticmethod
        def calcdata_hpc(job_path, inchikey, **kwargs):
            """Run HPC jobs to calculate data from inchikey_inchi data.

            A small Python script is written to ``job_path`` and submitted
            as a multi-job through ``HPC.submitMultiJob``; each task calls
            ``from_inchikey`` on its share of ``inchikey`` with
            ``missing_only=False``.

            Args:
                job_path(str): Path (usually in scratch) where the script
                    files are generated. Created if not available.
                inchikey(list): List of inchikey.
                cpu: Number of cores each job will use (default: 1)
                wait: Wait for the job to finish (default: True)
                memory: Maximum memory the job can take in Gigabytes
                    (default: 5)
                num_jobs: Number of HPC jobs (default: 200)
                chunk_dbload: Number of elements loaded to the database
                    (default: 1000)
                cc_config: configuration file (.json). Defaults to the
                    ``CC_CONFIG`` environment variable.
            Raises:
                KeyError: If ``cc_config`` is not given and ``CC_CONFIG``
                    is not set in the environment.
            """
            # only look at the environment when no config was passed, so an
            # explicit cc_config works even if CC_CONFIG is not set
            if "cc_config" in kwargs:
                cc_config = kwargs["cc_config"]
            else:
                cc_config = os.environ['CC_CONFIG']
            cfg = Config(cc_config)
            # create job directory if not available
            os.makedirs(job_path, exist_ok=True)
            cpu = kwargs.get("cpu", 1)
            wait = kwargs.get("wait", True)
            memory = kwargs.get("memory", 5)
            num_jobs = kwargs.get("num_jobs", 200)
            chunk_dbload = kwargs.get("chunk_dbload", 1000)
            # create script file
            script_lines = [
                "import sys, os",
                "import pickle",
                "import h5py",
                "from chemicalchecker.database import Calcdata",
                "task_id = sys.argv[1]",  # <TASK_ID>
                "filename = sys.argv[2]",  # <FILE>
                # load pickled data
                "inchikey = pickle.load(open(filename, 'rb'))[task_id]",
                "mol = Calcdata('" + GenericCalcdata.__tablename__ + "')",
                'mol.from_inchikey(inchikey, '
                'missing_only=False, chunksize=%d)' % chunk_dbload,
                "print('JOB DONE')"
            ]
            script_name = os.path.join(job_path, 'molprop_script.py')
            with open(script_name, 'w') as fh:
                fh.write('\n'.join(script_lines) + '\n')
            # HPC job parameters
            params = {
                "num_jobs": num_jobs,
                "jobdir": job_path,
                "job_name": "CC_MLP_" + GenericCalcdata.__tablename__,
                "elements": inchikey,
                "wait": wait,
                "cpu": cpu,
                "memory": memory,
                "compress": False,
            }
            # job command; the environment assignments must be separated by
            # a space, otherwise the shell reads them as a single token
            singularity_image = cfg.PATH.SINGULARITY_IMAGE
            command = ("SINGULARITYENV_PYTHONPATH={} "
                       "SINGULARITYENV_CC_CONFIG={} "
                       "singularity exec {} python {} <TASK_ID> <FILE>")
            command = command.format(
                os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
                singularity_image, script_name)
            # submit jobs
            cluster = HPC.from_config(cfg)
            cluster.submitMultiJob(command, **params)

    return GenericCalcdata