Source code for chemicalchecker.database.datasource

"""Datasource definition.

A Datasource is a source of raw data, typically in the form of a URL to an
external resource. This class offers a means to standardize raw data
collection using the :mod:`~chemicalchecker.util.download` module and to
interface Datasources with the Dataset table.
"""
import os
import sqlalchemy
from sqlalchemy import Column, Text, Boolean
from sqlalchemy.orm import class_mapper, ColumnProperty, relationship

from .database import Base, get_session, get_engine

from chemicalchecker.util import logged, Config
from chemicalchecker.util.download import Downloader
from chemicalchecker.util.hpc import HPC


@logged
class Datasource(Base):
    """Datasource table class.

    Parameters:
        name(str): primary key, simple unique name for the Datasource.
        url(str): the download link.
        user(str): some downloads require credentials.
        password(str): some downloads require credentials.
        description(str): free-text description of the resource.
        filename(str): optional, a `molrepo` name. NB: this name is the
            value of `Molrepo.molrepo_name` and also defines the `Parser`
            that will be used.
        calcdata(bool): the Datasource is derived from calculated data.
    """

    __tablename__ = 'datasource'
    datasource_name = Column(Text, primary_key=True)
    description = Column(Text)
    is_db = Column(Boolean)
    url = Column(Text)
    username = Column(Text)
    password = Column(Text)
    filename = Column(Text)
    calcdata = Column(Boolean)
    datasets = relationship("Dataset", secondary="dataset_has_datasource",
                            back_populates="datasources", lazy='joined')
    molrepos = relationship("Molrepo", secondary="molrepo_has_datasource",
                            back_populates="datasources", lazy='joined')
    def __repr__(self):
        """String representation."""
        return self.datasource_name
    @staticmethod
    def _create_table():
        engine = get_engine()
        Base.metadata.create_all(engine, tables=[Datasource.__table__])

    @staticmethod
    def _drop_table():
        engine = get_engine()
        Datasource.__table__.drop(engine)

    @staticmethod
    def _table_exists():
        engine = get_engine()
        return sqlalchemy.inspect(engine).has_table(Datasource.__tablename__)

    @staticmethod
    def _table_attributes():
        attrs = class_mapper(Datasource).iterate_properties
        return [a.key for a in attrs if isinstance(a, ColumnProperty)]
    @staticmethod
    def add(kwargs):
        """Add a new row to the table.

        Args:
            kwargs(dict): The data in dictionary format.
        """
        if isinstance(kwargs, dict):
            datasource = Datasource(**kwargs)
            Datasource.__log.debug(datasource)
            session = get_session()
            session.add(datasource)
            session.commit()
            session.close()
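    # e.g. (hypothetical values, assuming the table has been created):
    #   Datasource.add({'datasource_name': 'chebi',
    #                   'url': 'https://example.org/chebi.sdf.gz',
    #                   'is_db': False, 'calcdata': False})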
    @staticmethod
    def from_csv(filename):
        """Add entries from a CSV file.

        Args:
            filename(str): Path to a CSV file.
        """
        import pandas as pd
        df = pd.read_csv(filename)
        # NS: the boolean columns have to be converted to boolean values,
        # otherwise SQLAlchemy passes strings
        df.calcdata = df.calcdata.apply(lambda x: False if x == 'f' else True)
        df.is_db = df.is_db.apply(lambda x: False if x == 'f' else True)
        # check columns
        needed_cols = Datasource._table_attributes()
        if needed_cols != list(df.columns):
            raise Exception("Input missing columns: %s"
                            % ' '.join(needed_cols))
        # add rows one by one, logging failures individually
        for row_nr, row in df.iterrows():
            try:
                Datasource.add(row.dropna().to_dict())
            except Exception as err:
                Datasource.__log.error(
                    "Error in line %s: %s", row_nr, str(err))
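    # Expected CSV layout for `from_csv` (values are illustrative): the
    # header must match Datasource._table_attributes() exactly, in order,
    # with booleans encoded as 't'/'f':
    #
    #   datasource_name,description,is_db,url,username,password,filename,calcdata
    #   chebi,ChEBI compounds,f,https://example.org/chebi.sdf.gz,,,chebi,f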
    @staticmethod
    def get(name=None):
        """Get Datasources, optionally filtered by name.

        Args:
            name(str): The Datasource name, e.g. "chebi".
        """
        params = {}
        if name is not None:
            params["datasource_name"] = name
        session = get_session()
        if len(params) == 0:
            query = session.query(Datasource).distinct(
                Datasource.datasource_name)
        else:
            query = session.query(Datasource).filter_by(**params)
        res = query.all()
        session.close()
        return res
    @staticmethod
    def test_all_valid_url():
        """Check if all Datasource URLs are valid."""
        testable_ds = [ds for ds in Datasource.get() if not ds.username]
        return all(ds.valid_url for ds in testable_ds)
    @staticmethod
    def test_all_downloaded(only_essential=False):
        """Check if all Datasources have been downloaded.

        Args:
            only_essential(bool): Check only Datasources that are essential.
        """
        if only_essential:
            datasources = set()
            for ds in Datasource.get():
                for dset in ds.datasets:
                    if dset.essential:
                        datasources.add(ds)
                        break
                for molrepo in ds.molrepos:
                    if molrepo.essential:
                        datasources.add(ds)
                        break
            return all(ds.available for ds in datasources)
        else:
            return all(ds.available for ds in Datasource.get())
    @property
    def data_path(self):
        """Path where the Datasource data is stored."""
        return os.path.join(Config().PATH.CC_DATA, self.datasource_name)

    @property
    def available(self):
        """Check if the Datasource is available."""
        if os.path.isdir(self.data_path):
            self.__log.info("%s AVAILABLE", self)
            return True
        else:
            self.__log.warning("%s NOT AVAILABLE", self)
            return False

    @property
    def valid_url(self):
        """Check if the Datasource URL is valid."""
        try:
            Downloader.validate_url(self.url)
            self.__log.info("%s AVAILABLE", self)
            return True
        except Exception as err:
            self.__log.warning("%s FAILED %s", self, str(err))
            return False
    def download(self, force=False):
        """Download the Datasource.

        Args:
            force(bool): Force download, overwriting any previous download.
        """
        # check if already downloaded
        if not force and self.available:
            self.__log.warning("Datasource available, skipping download.")
            return
        # create the download string, embedding credentials when needed
        if self.username and self.password:
            protocol, address = self.url.split('//')
            url = "{}//{}:{}@{}".format(
                protocol, self.username.replace('@', '%40'),
                self.password, address)
        else:
            url = self.url
        # call the downloader
        if self.is_db:
            dbname = self.datasource_name
        else:
            dbname = None
        down = Downloader(url, self.data_path, dbname=dbname,
                          file=self.filename)
        down.download()
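    # Worked example of the credential rewriting above (hypothetical values):
    # url='ftp://host/path', username='user@mail.org', password='pw' yields
    # 'ftp://user%40mail.org:pw@host/path' ('@' is percent-encoded so it is
    # not mistaken for the credentials separator).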
    # NB: the signature needs **kwargs, not *kwargs (a tuple has no `get`)
    @staticmethod
    def download_hpc(job_path, only_essential=False, **kwargs):
        """Run HPC jobs downloading the resources.

        Args:
            job_path(str): Path (usually in scratch) where the script files
                are generated.
            only_essential(bool): Download only the essential Datasources
                (default: False).
        """
        cc_config = kwargs.get("cc_config", os.environ['CC_CONFIG'])
        cfg = Config(cc_config)
        # create job directory if not available
        if not os.path.isdir(job_path):
            os.mkdir(job_path)
        # create the script each HPC task will run
        script_lines = [
            "import sys, os",
            "import pickle",
            "from chemicalchecker.database import Datasource",
            "task_id = sys.argv[1]",  # <TASK_ID>
            "filename = sys.argv[2]",  # <FILE>
            "inputs = pickle.load(open(filename, 'rb'))",  # load pickled data
            "data = inputs[task_id]",  # elements for current job
            "for d in data:",  # elements are Datasource names
            "    ds = Datasource.get(name=d)",  # query the db
            "    ds[0].download()",  # start download
            "print('JOB DONE')"
        ]
        script_name = os.path.join(job_path, 'download_script.py')
        with open(script_name, 'w') as fh:
            for line in script_lines:
                fh.write(line + '\n')
        # hpc parameters
        if only_essential:
            all_datasources = set()
            for ds in Datasource.get():
                for dset in ds.datasets:
                    if dset.essential:
                        all_datasources.add(ds)
                        break
                for molrepo in ds.molrepos:
                    if molrepo.essential:
                        all_datasources.add(ds)
                        break
        else:
            all_datasources = Datasource.get()
        ds_names = [ds.datasource_name for ds in all_datasources]
        params = {}
        params["num_jobs"] = len(ds_names)
        params["jobdir"] = job_path
        params["job_name"] = "CC_DOWNLOAD"
        params["elements"] = ds_names
        params["wait"] = True
        # job command
        singularity_image = cfg.PATH.SINGULARITY_IMAGE
        command = ("SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}"
                   " singularity exec {} python {} <TASK_ID> <FILE>")
        command = command.format(
            os.path.join(cfg.PATH.CC_REPO, 'package'),
            cc_config, singularity_image, script_name)
        # submit jobs
        cluster = HPC.from_config(cfg)
        cluster.submitMultiJob(command, **params)
        return cluster
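
# A minimal sketch of running the downloads on a cluster (assumes CC_CONFIG
# points to a configuration with valid HPC and SINGULARITY_IMAGE entries; the
# scratch path is hypothetical):
#
#   cluster = Datasource.download_hpc('/scratch/user/download_jobs',
#                                     only_essential=True)
#   # params set wait=True, so submitMultiJob returns once all jobs finish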