"""Molrepo definition.
The Molrepo is a molecule repository (aka library or collection) exposing
mappings between various textual representations (SMILES, InChI and InChIKey)
for different sets of molecules.
Example::
from chemicalchecker.database import Molrepo
molrep = Molrepo.get('drugbank')[0]
len(molrep.molecules)
>>> 9167
"""
import os
import datetime
import tempfile
from time import time
import sqlalchemy
from sqlalchemy.dialects import postgresql
from sqlalchemy import Column, Text, Boolean, ForeignKey, VARCHAR
from sqlalchemy.orm import class_mapper, ColumnProperty, relationship
from .molecule import Molecule
from .database import Base, get_engine, get_session
from chemicalchecker.util.hpc import HPC
from chemicalchecker.util.parser import Parser
from chemicalchecker.util import logged, Config
from chemicalchecker.util.decorator import cached_property
@logged
class Molrepo(Base):
    """Molrepo table class.

    Each row describes one molecule repository (e.g. chembl, bindingdb,
    etc.). The molecules belonging to a repository, together with their
    external compound ids, are stored in the `MolrepoHasMolecule` table.

    Fields:
        molrepo_name(str): primary key, the molrepo name.
        description(str): free-text description of the molrepo.
        universe(bool): whether the molrepo is considered universe.
        essential(bool): whether the molrepo is considered essential.
        datasources(list): `Datasource` rows linked through the
            `molrepo_has_datasource` association table (eagerly joined).
    """

    __tablename__ = 'molrepo'

    molrepo_name = Column(Text, primary_key=True)
    description = Column(Text)
    universe = Column(Boolean)
    essential = Column(Boolean)
    datasources = relationship("Datasource",
                               secondary="molrepo_has_datasource",
                               back_populates="molrepos",
                               lazy='joined')

    def __repr__(self):
        """String representation."""
        return str(self.molrepo_name)

    @staticmethod
    def _create_table():
        """Create the `molrepo` table on the configured engine."""
        engine = get_engine()
        Base.metadata.create_all(engine, tables=[Molrepo.__table__])

    @staticmethod
    def _drop_table():
        """Drop the `molrepo` table from the configured engine."""
        engine = get_engine()
        Molrepo.__table__.drop(engine)

    @staticmethod
    def _table_exists():
        """Return True if the `molrepo` table exists in the database."""
        engine = get_engine()
        return sqlalchemy.inspect(engine).has_table(Molrepo.__tablename__)

    @staticmethod
    def _table_attributes():
        """Return the mapped column names, skipping any column named 'id'."""
        props = class_mapper(Molrepo).iterate_properties
        return [p.key for p in props
                if isinstance(p, ColumnProperty) and p.key != 'id']

    @staticmethod
    def add(kwargs):
        """Add a new row to the table.

        Args:
            kwargs(dict): The data in dictionary format.

        Raises:
            Exception: if `kwargs` is not a dictionary.
        """
        if not isinstance(kwargs, dict):
            raise Exception("Input data for add method is not a dictionary")
        molrepo = Molrepo(**kwargs)
        Molrepo.__log.debug(molrepo)
        session = get_session()
        try:
            session.add(molrepo)
            session.commit()
        finally:
            # release the session even when the commit fails
            session.close()

    @staticmethod
    def get(name=None):
        """Get molrepos associated to the given name.

        Args:
            name(str): The molrepo name, e.g "chebi". If None, all molrepos
                are returned.

        Returns:
            list: The matching `Molrepo` objects.
        """
        session = get_session()
        try:
            query = session.query(Molrepo)
            if name is not None:
                query = query.filter_by(molrepo_name=name)
            return query.all()
        finally:
            session.close()

    @cached_property
    def molecules(self):
        """Fetch molecules for Molrepo.

        Returns:
            list: The `MolrepoHasMolecule` rows whose `molrepo_name` matches
                this molrepo.
        """
        session = get_session()
        try:
            return session.query(MolrepoHasMolecule).filter_by(
                molrepo_name=self.molrepo_name).all()
        finally:
            session.close()

    @staticmethod
    def to_csv(molrepo_name, filename):
        """Write molecules InChI-Key, source_id, InChI and SMILES to CSV file.

        Rows with any missing value are dropped and the remaining rows are
        sorted by InChIKey before writing.

        Args:
            molrepo_name(str): The molrepo name whose molecules are exported.
            filename(str): Path to a CSV file.
        """
        import pandas as pd
        molecules = Molrepo.get_by_molrepo_name(molrepo_name)
        # column order follows the tuples returned by get_by_molrepo_name
        df = pd.DataFrame(molecules, columns=['molrepo', 'source_id',
                                              'SMILES', 'InChIKey', 'InChI'])
        df.dropna(inplace=True)
        df.sort_values('InChIKey', inplace=True)
        df[['InChIKey', 'source_id', 'SMILES', 'InChI']].to_csv(
            filename, index=False)

    @staticmethod
    def from_csv(filename):
        """Add entries from CSV file.

        The CSV columns must match, in order, the table attributes. Rows
        that fail to be added are logged and skipped.

        Args:
            filename(str): Path to a CSV file.

        Raises:
            Exception: if the CSV columns differ from the table attributes.
        """
        import pandas as pd
        df = pd.read_csv(filename)
        # check columns first, so that a missing column is reported clearly
        needed_cols = Molrepo._table_attributes()
        if needed_cols != list(df.columns):
            raise Exception(
                "Input missing columns: %s" % ' '.join(needed_cols))
        # The boolean columns must be changed to boolean values otherwise
        # SQLAlchemy passes strings. Every value other than the literal 'f'
        # maps to True.
        df.universe = df.universe.apply(lambda x: x != 'f')
        df.essential = df.essential.apply(lambda x: x != 'f')
        # add them
        for row_nr, row in df.iterrows():
            try:
                Molrepo.add(row.dropna().to_dict())
            except Exception as err:
                # best effort: report the failing row and keep going
                Molrepo.__log.error(
                    "Error in line %s: %s", row_nr, str(err))

    @staticmethod
    def get_universe_molrepos():
        """Get Molrepo names that are considered universe.

        Returns:
            list: One result row per distinct molrepo name.
        """
        session = get_session()
        try:
            query = session.query(Molrepo.molrepo_name).filter(
                (Molrepo.universe)).distinct(
                Molrepo.molrepo_name)
            return query.all()
        finally:
            session.close()

    @staticmethod
    def get_by_molrepo_name(molrepo_name, only_raw=False):
        """Get Molrepo entries associated to the given name.

        Args:
            molrepo_name(str): The molrepo_name to search for.
            only_raw(bool): Kept for backward compatibility. The query
                selects the same five columns either way, so the result does
                not depend on it (default:false)

        Returns:
            list: Rows of (molrepo_name, src_id, smiles, inchikey, inchi).
                `inchi` comes from an outer join on `Molecule`.
        """
        session = get_session()
        try:
            query = session.query(
                MolrepoHasMolecule.molrepo_name,
                MolrepoHasMolecule.src_id,
                MolrepoHasMolecule.smiles,
                MolrepoHasMolecule.inchikey,
                Molecule.inchi
            ).outerjoin(
                Molecule,
                Molecule.inchikey == MolrepoHasMolecule.inchikey
            ).filter(
                MolrepoHasMolecule.molrepo_name == molrepo_name)
            return query.all()
        finally:
            session.close()

    @staticmethod
    def get_fields_by_molrepo_name(molrepo_name, fields=None):
        """Get specified column fields.

        Get specified column fields from a molrepo_name in raw format (tuples)
        Only entries with a non-null inchikey are returned when `fields` is
        given.

        Args:
            molrepo_name(str): The molrepo_name to search for.
            fields(list): List of field names. If None, all fields.
                Names that are neither a `MolrepoHasMolecule` attribute nor
                "inchi" are ignored.

        Returns:
            list: The selected rows, or None if no requested field is valid.
        """
        if fields is None:
            return Molrepo.get_by_molrepo_name(molrepo_name, True)
        cols = MolrepoHasMolecule._table_attributes()
        # resolve names to mapped columns directly instead of using eval
        entities = []
        for field in fields:
            if field == "inchi":
                entities.append(Molecule.inchi)
            elif field in cols:
                entities.append(getattr(MolrepoHasMolecule, field))
        if len(entities) == 0:
            return None
        session = get_session()
        try:
            query = session.query(MolrepoHasMolecule).outerjoin(
                Molecule,
                Molecule.inchikey == MolrepoHasMolecule.inchikey).filter(
                MolrepoHasMolecule.molrepo_name == molrepo_name,
                MolrepoHasMolecule.inchikey.isnot(None))
            return query.with_entities(*entities).all()
        finally:
            session.close()

    @staticmethod
    def count(molrepo_name=None):
        """Count Molrepo entries associated to the given source name.

        Args:
            molrepo_name(str): The source name from `Datasource.molrepo_name`.
                If empty or None, all entries are counted.

        Returns:
            int: Number of `MolrepoHasMolecule` rows.
        """
        session = get_session()
        try:
            query = session.query(MolrepoHasMolecule)
            if molrepo_name:
                query = query.filter_by(molrepo_name=molrepo_name)
            return int(query.count())
        finally:
            # the session was previously never closed here
            session.close()

    @staticmethod
    def from_molrepo_name(molrepo_name):
        """Fill Molrepo table from a molrepo name.

        Downloads every datasource linked to the molrepo, parses them with
        the parser named after the molrepo and inserts the result in chunks
        into the `Molecule` and `MolrepoHasMolecule` tables, skipping rows
        that conflict with existing ones.

        Args:
            molrepo_name(str): a molrepo name.

        Raises:
            Exception: if no molrepo with that name exists.
        """
        molrepo = Molrepo.get(molrepo_name)
        if len(molrepo) == 0:
            raise Exception(
                "Molrepo name %s file not available." % molrepo_name)
        map_files = {}
        for ds in molrepo[0].datasources:
            path = ds.data_path
            if ds.filename is not None and ds.is_db is False:
                path = os.path.join(path, ds.filename)
            map_files[ds.datasource_name] = path
            Molrepo.__log.debug("Importing Datasource %s", ds.datasource_name)
            ds.download()
        molrepo_parser = molrepo_name
        # parser_fn yield a list of dictionaries with keys as a molrepo entry
        parse_fn = Parser.parse_fn(molrepo_parser)
        # profile time
        t_start = time()
        engine = get_engine()
        with engine.begin() as conn:
            for chunk in parse_fn(map_files, molrepo_name, 1000):
                if len(chunk) == 0:
                    continue
                chunk_inchi = []
                chunk_molrepo = []
                for data in chunk:
                    if data["inchikey"] is not None:
                        chunk_inchi.append({"inchikey": data["inchikey"],
                                            "inchi": data["inchi"]})
                    # the association table has no inchi column
                    del data["inchi"]
                    chunk_molrepo.append(data)
                if len(chunk_inchi) > 0:
                    conn.execute(postgresql.insert(
                        Molecule.__table__).values(
                        chunk_inchi).on_conflict_do_nothing(
                        index_elements=[Molecule.inchikey]))
                conn.execute(postgresql.insert(
                    MolrepoHasMolecule.__table__).values(
                    chunk_molrepo).on_conflict_do_nothing(
                    index_elements=[MolrepoHasMolecule.id]))
        t_end = time()
        t_delta = str(datetime.timedelta(seconds=t_end - t_start))
        Molrepo.__log.info(
            "Importing Molrepo Name %s took %s", molrepo_name, t_delta)

    @staticmethod
    def molrepo_hpc(tmpdir, only_essential=False, **kwargs):
        """Run HPC jobs importing all molrepos.

        Args:
            tmpdir(str): Folder (usually in scratch) where the job directory
                is generated.
            only_essential(bool): Only the essential molrepos (default:false)
            cc_config(str): Optional keyword, path of the configuration file.
                Defaults to the `CC_CONFIG` environment variable.

        Returns:
            The `HPC` object the jobs were submitted through.
        """
        # only read the environment when no explicit config was passed
        if "cc_config" in kwargs:
            cc_config = kwargs["cc_config"]
        else:
            cc_config = os.environ['CC_CONFIG']
        cfg = Config(cc_config)
        # mkdtemp already creates the job directory
        job_path = tempfile.mkdtemp(prefix='jobs_molrepos_', dir=tmpdir)
        # create script file
        script_lines = [
            "import sys, os",
            "import pickle",
            "from chemicalchecker.database import Datasource",
            "from chemicalchecker.database import Molrepo",
            "task_id = sys.argv[1]",  # <TASK_ID>
            "filename = sys.argv[2]",  # <FILE>
            "inputs = pickle.load(open(filename, 'rb'))",  # load pickled data
            "data = inputs[task_id]",  # elements for current job
            "for d in data:",  # elements are indexes
            " Molrepo.from_molrepo_name(d)",  # start import
            "print('JOB DONE')"
        ]
        script_name = os.path.join(job_path, 'molrepo_script.py')
        with open(script_name, 'w') as fh:
            for line in script_lines:
                fh.write(line + '\n')
        # hpc parameters
        molrepos_names = set()
        for molrepo in Molrepo.get():
            if only_essential and not molrepo.essential:
                continue
            molrepos_names.add(molrepo.molrepo_name)
        params = {
            "num_jobs": len(molrepos_names),
            "jobdir": job_path,
            "job_name": "CC_MOLREPO",
            "elements": list(molrepos_names),
            "wait": True,
            "check_error": False,
            "memory": 16,
        }
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}" \
            " singularity exec {} python {} <TASK_ID> <FILE>"
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'), cc_config,
            singularity_image, script_name)
        # submit jobs
        cluster = HPC.from_config(cfg)
        cluster.submitMultiJob(command, **params)
        return cluster
@logged
class MolrepoHasMolecule(Base):
    """Molrepo-Molecule association object.

    Again a Many-to-Many relationship.
    This table links Molecules and Molrepos also including the external
    compound identifiers (e.g. ChEMBL -> ``CHEMBL10``,
    BindingDB -> ``BDBM50028883``, etc.).

    Fields:
        id(str): primary key, src_id + "_" + molrepo_name.
        molrepo_name(str): the molrepo name.
        src_id(str): the download id as in the source file.
        smiles(str): simplified molecular-input line-entry system (SMILES).
        inchikey(str): hashed version of the full InChI (SHA-256 algorithm).
    """

    __tablename__ = 'molrepo_has_molecule'

    id = Column(Text, primary_key=True)
    molrepo_name = Column(Text, ForeignKey("molrepo.molrepo_name"), index=True)
    src_id = Column(Text)
    smiles = Column(Text)  # It means the source smiles
    inchikey = Column(VARCHAR(27), ForeignKey("molecule.inchikey"), index=True)
    molecule = relationship("Molecule", lazy='joined')

    def __repr__(self):
        """String representation."""
        return str(self.inchikey)

    @staticmethod
    def _create_table():
        """Create the `molrepo_has_molecule` table on the configured engine."""
        engine = get_engine()
        Base.metadata.create_all(engine, tables=[MolrepoHasMolecule.__table__])

    @staticmethod
    def _drop_table():
        """Drop the `molrepo_has_molecule` table from the configured engine."""
        engine = get_engine()
        MolrepoHasMolecule.__table__.drop(engine)

    @staticmethod
    def _table_exists():
        """Return True if the `molrepo_has_molecule` table exists."""
        engine = get_engine()
        return sqlalchemy.inspect(engine).has_table(
            MolrepoHasMolecule.__tablename__)

    @staticmethod
    def _table_attributes():
        """Return the mapped column names, skipping the 'id' column."""
        props = class_mapper(MolrepoHasMolecule).iterate_properties
        return [p.key for p in props
                if isinstance(p, ColumnProperty) and p.key != 'id']

    @staticmethod
    def get(inchikey):
        """Get Molrepo entries associated to the given inchikey.

        Args:
            inchikey(str): The inchikey to search for.

        Returns:
            list: The matching `MolrepoHasMolecule` objects.
        """
        session = get_session()
        try:
            return session.query(MolrepoHasMolecule).filter_by(
                inchikey=inchikey).all()
        finally:
            # release the session even when the query fails
            session.close()
@logged
class MolrepoHasDatasource(Base):
    """Molrepo-Datasource relationship.

    Many-to-Many relationship.

    Fields:
        molrepo_name(str): part of the primary key, references
            `molrepo.molrepo_name`.
        datasource_name(str): part of the primary key, references
            `datasource.datasource_name`.
    """

    __tablename__ = 'molrepo_has_datasource'

    molrepo_name = Column(Text, ForeignKey("molrepo.molrepo_name"),
                          primary_key=True)
    datasource_name = Column(Text, ForeignKey("datasource.datasource_name"),
                             primary_key=True)

    def __repr__(self):
        """String representation."""
        # formatting instead of concatenation: unset (None) keys do not raise
        return "%s maps to %s" % (self.molrepo_name, self.datasource_name)

    @staticmethod
    def _create_table():
        """Create the `molrepo_has_datasource` table on the engine."""
        engine = get_engine()
        Base.metadata.create_all(
            engine, tables=[MolrepoHasDatasource.__table__])

    @staticmethod
    def _drop_table():
        """Drop the `molrepo_has_datasource` table from the engine."""
        engine = get_engine()
        MolrepoHasDatasource.__table__.drop(engine)

    @staticmethod
    def _table_exists():
        """Return True if the `molrepo_has_datasource` table exists."""
        engine = get_engine()
        return sqlalchemy.inspect(engine).has_table(
            MolrepoHasDatasource.__tablename__)

    @staticmethod
    def _table_attributes():
        """Return the mapped column names, skipping any column named 'id'."""
        props = class_mapper(MolrepoHasDatasource).iterate_properties
        return [p.key for p in props
                if isinstance(p, ColumnProperty) and p.key != 'id']

    @staticmethod
    def add(kwargs):
        """Add a new row to the table.

        Args:
            kwargs(dict): The data in dictionary format.

        Raises:
            Exception: if `kwargs` is not a dictionary.
        """
        # fail explicitly, as Molrepo.add does, instead of hitting an
        # unbound local further down
        if not isinstance(kwargs, dict):
            raise Exception("Input data for add method is not a dictionary")
        entry = MolrepoHasDatasource(**kwargs)
        MolrepoHasDatasource.__log.debug(entry)
        session = get_session()
        try:
            session.add(entry)
            session.commit()
        finally:
            # release the session even when the commit fails
            session.close()

    @staticmethod
    def from_csv(filename):
        """Add entries from CSV file.

        The CSV columns must match, in order, the table attributes. Rows
        that fail to be added are logged and skipped.

        Args:
            filename(str): Path to a CSV file.

        Raises:
            Exception: if the CSV columns differ from the table attributes.
        """
        import pandas as pd
        df = pd.read_csv(filename)
        # check columns
        needed_cols = MolrepoHasDatasource._table_attributes()
        if needed_cols != list(df.columns):
            raise Exception(
                "Input missing columns: %s" % ' '.join(needed_cols))
        # add them
        for row_nr, row in df.iterrows():
            try:
                MolrepoHasDatasource.add(row.dropna().to_dict())
            except Exception as err:
                # best effort: report the failing row and keep going
                MolrepoHasDatasource.__log.error(
                    "Error in line %s: %s", row_nr, str(err))