import os
import sys
import subprocess
import logging
import uuid
import getpass
from datetime import date
import json
from cellmaps_utils import constants
from cellmaps_utils.exceptions import CellMapsProvenanceError
logger = logging.getLogger(__name__)
[docs]
class ROCrateProvenanceAttributes(object):
"""
Wrapper object to hold subset of
`RO-Crate <https://www.researchobject.org/ro-crate/>`__ provenance attributes
"""
def __init__(self, name='Please enter a name',
organization_name='Please enter an organization',
project_name='Please enter a project',
description='Please enter a description',
keywords=['']):
"""
Constructor
:param name: name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type name: str
:param organization_name: what lab or group
:type organization_name: str
:param project_name: usually funding source
:type project_name: str
:param description: describes `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type description: str
:param keywords: keywords to identify `RO-Crate <https://www.researchobject.org/ro-crate/>`__
usually set with these values
in order:
`project, data_release_name, cell_line,
treatment, name_of_computation`
:type keywords: list
"""
self._name = name
self._organization_name = organization_name
self._project_name = project_name
self._description = description
self._keywords = keywords
[docs]
def get_name(self):
"""
Gets name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:return: name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:rtype: str
"""
return self._name
[docs]
def get_organization_name(self):
"""
Gets organization name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:return: organization name
:rtype: str
"""
return self._organization_name
[docs]
def get_project_name(self):
"""
Gets project name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:return: project name
:rtype: str
"""
return self._project_name
[docs]
def get_description(self):
"""
Gets description for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:return: description
:rtype: str
"""
return self._description
[docs]
def get_keywords(self):
"""
Gets keywords for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:return: keywords
:rtype: list
"""
return self._keywords
[docs]
class ProvenanceUtil(object):
"""
Wrapper around `FAIRSCAPE-cli <https://github.com/fairscape/fairscape-cli>`__ calls
"""
def __init__(self, fairscape_binary='fairscape-cli',
default_date_format_str='%Y-%m-%d', raise_on_error=False):
"""
Constructor
:param fairscape_binary: `FAIRSCAPE <https://github.com/fairscape/fairscape-cli>`__ command line binary
If no path separators are included in this value
(for example no ``/`` on Linux|mac) this code assumes the full
path to the binary is the same directory where the python
binary executing this script resides. To bypass this
set the value to a full path with ex: ``/tmp/foo.py``
:type fairscape_binary: str
:param default_date_format_str: Default date format string
:type default_date_format_str: str
:param raise_on_error: Flag to determine if exceptions should be raised on errors
:type raise_on_error: bool
"""
self._python = sys.executable
if os.sep not in fairscape_binary:
self._binary = os.path.join(os.path.dirname(self._python),
fairscape_binary)
else:
self._binary = fairscape_binary
self._default_date_fmt_str = default_date_format_str
self._raise_on_error = raise_on_error
@staticmethod
def _log_fairscape_error(cmd, exit_code, err,
reason='non zero exit code', cwd=None):
"""
Logs fairscape error to provenance_errors.json file
:param cmd:
:type cmd: list
:param exit_code:
:type exit_code: int
:param err: error message encoded
:type err: str
:param reason:
:type reason: str
:param cwd:
:type cwd: str
"""
logger.error('Error occurred, but not raising exception due to '
'raise_on_error flag set to False')
err = str(err).strip()
log_entry = {
"cmd": cmd,
"exit_code": exit_code,
"reason": reason + " : " + err
}
if cwd is None:
log_file = os.path.join(os.getcwd(), constants.PROVENANCE_ERRORS_FILE)
else:
log_file = os.path.join(cwd, constants.PROVENANCE_ERRORS_FILE)
try:
if not os.path.exists(log_file):
with open(log_file, 'w') as file:
json.dump([log_entry], file, indent=4)
else:
with open(log_file, 'r+') as file:
data = json.load(file)
data.append(log_entry)
file.seek(0)
json.dump(data, file, indent=4)
except Exception as e:
logger.error('Failed to log provenance error: ' + str(e))
logger.error('Provenance call failed: ' + json.dumps(log_entry))
def _run_cmd(self, cmd, cwd=None, timeout=360):
"""
Runs command as a command line process
:param cmd: command to run
:type cmd: list
:param cwd: current working directory
:type cwd: str
:param timeout: timeout in seconds before killing process
:type timeout: int or float
:raises CellMapsProvenanceError: If **raise_on_error** passed
into constructor is ``True`` and
process times out before completing
:return: (return code, standard out, standard error)
:rtype: tuple
"""
logger.debug('Running command under ' + str(cwd) +
' path: ' + str(cmd))
p = subprocess.Popen(cmd, cwd=cwd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
out, err = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.warning('Timeout reached. Killing process')
p.kill()
out, err = p.communicate()
if self._raise_on_error:
raise CellMapsProvenanceError('Process timed out. '
'exit code: ' +
str(p.returncode) +
' stdout: ' + str(out) +
' stderr: ' + str(err))
self._log_fairscape_error(cmd, p.returncode, err, cwd=cwd,
reason='Process timed out.')
else:
if not self._raise_on_error and p.returncode != 0:
self._log_fairscape_error(cmd, p.returncode, err, cwd=cwd)
# Removing ending new line if value is not None
if out is not None:
out = out.rstrip()
return p.returncode, out, err
def _get_keywords(self, keywords=None):
"""
Adds keywords to command
:param keywords:
:type keywords: list or str
:raises CellMapsProvenanceError: if **keywords** is not of type
str or list
:return: keywords commandline flags
:rtype: list
"""
if keywords is None:
return []
if isinstance(keywords, str):
return ['--keywords', keywords]
if isinstance(keywords, list):
retlist = []
for k in keywords:
retlist.extend(['--keywords', k])
return retlist
raise CellMapsProvenanceError('Keywords must be a list or a '
'str, but got: ' + str(type(keywords)))
def _generate_guid(self, data_type=None,
rocrate_path=None):
"""
Gets unique id by appending a UUID to **data_type** and basename of **rocrate_path**
:param data_type:
:type data_type: str
:param rocrate_path: Path to ro-crate meta data file
:type rocrate_path: dict
:return: unique id with data type and path information appended
:rtype: str
"""
if data_type is not None:
data_type_val = data_type
else:
data_type_val = ''
return str(uuid.uuid4()) + ':' + str(data_type_val) + '::' + os.path.basename(str(rocrate_path))
[docs]
@staticmethod
def example_dataset_provenance(requiredonly=True, with_ids=False):
"""
Returns example provenance dataset dict
:param requiredonly: If ``True`` only output required fields,
otherwise output all fields. This is ignored
if **with_ids** parameter is ``True``
:type requiredonly: bool
:param with_ids: If ``True`` ignore **requiredonly** and just output
dict where caller has dataset id
:type with_ids: bool
:return: Example provenance dictionary
:rtype: dict
"""
if with_ids is not None and with_ids is True:
return {'guid': 'ID of dataset'}
field_dict = {'name': 'Name of dataset',
'author': 'Author of dataset',
'version': 'Version of dataset',
'date-published': 'Date dataset was published',
'description': 'Description of dataset',
'data-format': 'Format of data'}
if requiredonly is None or requiredonly is False:
field_dict.update({'url': 'URL of datset',
'used-by': '?',
'derived-from': '?',
'associated-publication': '?',
'additional-documentation': '?'})
return field_dict
[docs]
def get_login(self):
"""
Attempts to get login of user
:return: login of user or empty string if unable to obtain
:rtype: str
"""
try:
return getpass.getuser()
except Exception as e:
logger.error('Unable to get login for user: ' + str(e))
return ''
[docs]
def get_rocrate_as_dict(self, rocrate_path):
"""
Loads `RO-Crate <https://www.researchobject.org/ro-crate/>`__ as a dict
:param rocrate_path: Directory containing `ro-crate-metadata.json` file or
path to file assumed to be ro-crate meta data file
:type rocrate_path: str
:raises CellMapsProvenanceError: If **rocrate_path** is ``None`` or
if **raise_on_error** passed
into constructor is ``True`` and
there is an issue parsing the
ro-crate meta data file
:return: `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:rtype: dict
"""
if rocrate_path is None:
raise CellMapsProvenanceError('rocrate_path is None')
if os.path.isdir(rocrate_path):
rocrate_file = os.path.join(rocrate_path, constants.RO_CRATE_METADATA_FILE)
else:
rocrate_file = rocrate_path
try:
with open(rocrate_file, 'r') as f:
data = json.load(f)
return data
except Exception as e:
if self._raise_on_error:
raise CellMapsProvenanceError('Error parsing ' + str(rocrate_file) +
' ' + str(e))
return {'@id': None, 'name': '', 'description': '', 'keywords': [''],
'isPartOf': [{"@type": "Organization", "name": ""}, {"@type": "Project", "name": ""}]}
[docs]
def get_id_of_rocrate(self, rocrate):
"""
Gets id of `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:param rocrate: `RO-Crate <https://www.researchobject.org/ro-crate/>`__ :py:class:`dict` or directory containing
`ro-crate-metadata.json` file or
path to file assumed to be `RO-Crate <https://www.researchobject.org/ro-crate/>`__ meta data
file
:type rocrate: str or dict
:return:
"""
if isinstance(rocrate, dict):
data = rocrate
else:
data = self.get_rocrate_as_dict(rocrate)
return data['@id']
[docs]
def get_name_project_org_of_rocrate(self, rocrate):
"""
Gets name, project, and organization name of `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:param rocrate: `RO-Crate <https://www.researchobject.org/ro-crate/>`__ :py:class:`dict` or directory containing
`ro-crate-metadata.json` file or
path to file assumed to be `RO-Crate <https://www.researchobject.org/ro-crate/>`__ meta data
file
:type rocrate: str or dict
:return: (name, project, organization-name)
:rtype: tuple
"""
prov_attrs = self.get_rocrate_provenance_attributes(rocrate)
return prov_attrs.get_name(), prov_attrs.get_project_name(), prov_attrs.get_organization_name()
[docs]
def get_rocrate_provenance_attributes(self, rocrate):
"""
Gets provenance attributes for an `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:param rocrate: `RO-Crate <https://www.researchobject.org/ro-crate/>`__ :py:class:`dict` or directory containing
`ro-crate-metadata.json` file or
path to file assumed to be `RO-Crate <https://www.researchobject.org/ro-crate/>`__ metadata
file
:type rocrate: str or dict
:return:
:rtype: :py:class:`~cellmaps_utils.provenance.ROCrateProvenanceAttributes`
"""
if isinstance(rocrate, dict):
data = rocrate
else:
data = self.get_rocrate_as_dict(rocrate)
name = data['name']
description = data['description']
keywords = data['keywords']
org_name = None
proj_name = None
for entry in data['isPartOf']:
if '@type' in entry:
if entry['@type'] == 'Organization':
org_name = entry['name']
elif entry['@type'] == 'Project':
proj_name = entry['name']
return ROCrateProvenanceAttributes(name=name, project_name=proj_name,
organization_name=org_name,
description=description,
keywords=keywords)
[docs]
def get_merged_rocrate_provenance_attrs(self, rocrate=None,
override_name=None,
override_project_name=None,
override_organization_name=None,
extra_keywords=None,
keywords_to_preserve=6,
merged_delimiter='|'):
"""
Creates a merged provenance attributes object when given
one or more `RO-Crates <https://www.researchobject.org/ro-crate/>`__.
It does this by the following rules:
Values for *name*, *project_name*, and *organization_name*
are put into respective sets for uniqueness sorted alphabetically
and joined together using value of **merged_delimiter**
If **override_name, override_project,** or **override_organization**
is not ``None`` then those values will be used in leiu of the
merged data mentioned earlier.
For *keywords*, the first **keywords_to_preserve** elements are put into respective
sets for uniqueness and joined together using value of **merge_delimiter**
and put back into a list. Any extra entries in **extra_keywords** is appended
to this list.
The *description* is a merging of the keywords list with a space delimiter
:param rocrate: :py:class:`dict` or directory containing
`ro-crate-metadata.json` file or
path to file assumed to be `RO-Crate <https://www.researchobject.org/ro-crate/>`__
meta data
file or a list of either of the previously
mentioned items
:type rocrate: str or dict or list
:param override_name: If not ``None``, overrides name returned
:type override_name: str
:param override_project_name: If not ``None``, overrides project name returned
:type override_project_name: str
:param override_organization_name: If not ``None``, overrides organization-name
:type override_organization_name: str
:param extra_keywords: Any extra keywords to append
:type extra_keywords: list or str
:param keywords_to_preserve: Denotes number of keywords to preserve. A value of 5
means keep the 1st 5. ``None`` means preserve all keywords
:type keywords_to_preserve: int
:param merged_delimiter: default is '|'
:type merged_delimiter: str
:raises CellMapsProvenanceError: If **rocrate**, **extra_keywords**
:return: Merged rocrate provenance attributes
:rtype: :py:class:`~cellmaps_utils.provenance.ROCrateProvenanceAttributes`
"""
name_set = set()
proj_set = set()
org_set = set()
keyword_set_dict = {}
new_keywords = []
if rocrate is None:
raise CellMapsProvenanceError('rocrate is None')
if isinstance(rocrate, str) or isinstance(rocrate, dict):
rocrate_list = [rocrate]
elif isinstance(rocrate, list):
if len(rocrate) == 0:
raise CellMapsProvenanceError('No rocrates in list')
rocrate_list = rocrate
else:
raise CellMapsProvenanceError('rocrate must be type str, list or dict, received: ' + str(type(rocrate)))
for entry in rocrate_list:
prov_attrs = self.get_rocrate_provenance_attributes(entry)
if prov_attrs.get_name() is not None:
name_set.add(prov_attrs.get_name())
else:
logger.error(f'The name for RO-Crate {str(rocrate)} is missing from the metadata. Please '
f'provide a name to uphold FAIR principles. Execution will proceed without the name.')
if prov_attrs.get_project_name() is not None:
proj_set.add(prov_attrs.get_project_name())
else:
logger.error(f'The project name for RO-Crate {str(rocrate)} is missing from the metadata. Please '
f'provide a name to uphold FAIR principles. Execution will proceed without the name.')
if prov_attrs.get_organization_name() is not None:
org_set.add(prov_attrs.get_organization_name())
else:
logger.error(f'The organization name for RO-Crate {str(rocrate)} is missing from the metadata. Please '
f'provide a name to uphold FAIR principles. Execution will proceed without the name.')
if prov_attrs.get_keywords() is not None:
for index in range(len(prov_attrs.get_keywords())):
if index not in keyword_set_dict:
keyword_set_dict[index] = set()
if prov_attrs.get_keywords()[index] is not None:
keyword_set_dict[index].add(prov_attrs.get_keywords()[index])
logger.debug('keyword_set_dict: ' + str(keyword_set_dict))
if override_name is None:
new_name = merged_delimiter.join(sorted(list(name_set)))
else:
new_name = override_name
if override_organization_name is None:
new_organization_name = merged_delimiter.join(sorted(list(org_set)))
else:
new_organization_name = override_organization_name
if override_project_name is None:
new_project_name = merged_delimiter.join(sorted(list(proj_set)))
else:
new_project_name = override_project_name
# just grab 1st **keywords_to_preserve** elements assuming they are
# project, data_release_name, cell line, treatment,
# name_of_computation
if keywords_to_preserve is not None and len(keyword_set_dict.keys()) >= keywords_to_preserve:
for index in range(keywords_to_preserve):
new_keywords.append(merged_delimiter.join(sorted(list(keyword_set_dict[index]))))
else:
for index in range(len(keyword_set_dict.keys())):
new_keywords.append(merged_delimiter.join(sorted(list(keyword_set_dict[index]))))
# add names to keywords
if name_set is not None and len(name_set) > 0:
new_keywords.extend(list(name_set))
if extra_keywords is not None:
if isinstance(extra_keywords, str):
new_keywords.append(extra_keywords)
elif isinstance(extra_keywords, list):
new_keywords.extend(extra_keywords)
else:
raise CellMapsProvenanceError('extra_keywords must be of '
'type list or str. '
'Received: ' +
str(type(extra_keywords)))
new_description = ' '.join(new_keywords)
split_keywords = set()
for keyword in new_keywords:
if merged_delimiter in keyword:
split_keywords.update(keyword.split(merged_delimiter))
new_keywords.extend(list(split_keywords))
return ROCrateProvenanceAttributes(name=new_name, project_name=new_project_name,
organization_name=new_organization_name,
description=new_description,
keywords=new_keywords)
[docs]
def register_rocrate(self, rocrate_path, name='',
organization_name='', project_name='',
description='Please enter a description',
keywords=[''],
guid=None,
timeout=30):
"""
Creates/registers `RO-Crate <https://www.researchobject.org/ro-crate/>`__
in directory specified by **rocrate_path**
Upon completion a ``ro-crate-metadata.json``
file will be created in the directory
:param rocrate_path:
:type rocrate_path: str
:param name: Name for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type name: str
:param organization_name: Name of organization
:type organization_name: str
:param project_name: Name of project
:type project_name: str
:param description: Description
:type description: str
:param keywords: keywords
:type keywords: list
:param guid: ID for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type guid: str
:param timeout: Time in seconds to wait for registration of `RO-Crate <https://www.researchobject.org/ro-crate/>`__
to complete
:type timeout: float
"""
cmd = [self._python, self._binary, 'rocrate', 'init',
'--name', name,
'--organization-name', organization_name,
'--project-name', project_name,
'--description', description]
cmd.extend(self._get_keywords(keywords=keywords))
if guid is None:
guid = self._generate_guid(data_type='rocrate',
rocrate_path=rocrate_path)
cmd.append('--guid')
cmd.append(guid)
try:
exit_code, out_str, err_str = self._run_cmd(cmd, cwd=rocrate_path,
timeout=timeout)
logger.debug('creation of crate stdout: ' + str(out_str))
logger.debug('creation of crate stdout: ' + str(err_str))
logger.debug('creation of crate exit code: ' + str(exit_code))
if exit_code != 0 and self._raise_on_error:
raise CellMapsProvenanceError('Error creating crate: ' +
str(out_str) + ' : ' + str(err_str))
except CellMapsProvenanceError as ce:
raise ce
except Exception as e:
raise CellMapsProvenanceError('Caught Exception: ' + str(e))
[docs]
def register_computation(self, rocrate_path, name='',
run_by='', command='',
date_created=None,
description='Must be at least 10 characters', used_software=[],
used_dataset=[], generated=[],
keywords=[''],
guid=None,
timeout=60):
"""
Registers computation adding information to
``ro-crate-metadata.json`` file stored in **rocrate_path**
directory.
:param rocrate_path: Path to existing `RO-Crate <https://www.researchobject.org/ro-crate/>`__
directory
:type rocrate_path: str
:param name:
:type name: str
:param run_by:
:type run_by: str
:param command:
:type command: str
:param date_created:
:type date_created:
:param description:
:type description: str
:param used_software: list of `FAIRSCAPE <https://fairscape.github.io>`__ software ids
:type used_software: list
:param used_dataset: list of `FAIRSCAPE <https://fairscape.github.io>`__ dataset ids used
by this computation
:type used_dataset: list
:param generated: list of `FAIRSCAPE <https://fairscape.github.io>`__ dataset ids for datasets
generated by this computation
:type generated: list
:param keywords:
:type keywords: list
:param guid: ID for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type guid: str
:param timeout: Time in seconds to wait for registration of computation to complete
:type timeout: float
"""
if date_created is None:
date_created = date.today().strftime(self._default_date_fmt_str)
cmd = [self._python, self._binary, 'rocrate', 'register',
'computation',
'--name', name,
'--run-by', run_by,
'--date-created', date_created,
'--command', command,
'--description', description]
cmd.extend(self._get_keywords(keywords=keywords))
if guid is None:
guid = self._generate_guid(data_type='computation',
rocrate_path=rocrate_path)
cmd.append('--guid')
cmd.append(guid)
if used_software is not None:
for entry in used_software:
cmd.append('--used-software')
cmd.append(entry)
if used_dataset is not None:
for entry in used_dataset:
if entry is not None:
cmd.append('--used-dataset')
cmd.append(entry)
if generated is not None:
for entry in generated:
cmd.append('--generated')
cmd.append(entry)
cmd.append(rocrate_path)
exit_code, out_str, err_str = self._run_cmd(cmd, cwd=rocrate_path,
timeout=timeout)
logger.debug('add dataset exit code: ' + str(exit_code))
if exit_code != 0 and self._raise_on_error:
raise CellMapsProvenanceError('Error adding dataset:\n' +
str(cmd) + '\nStandard out:\n' +
str(out_str) + '\n Standard err:\n' +
str(err_str))
logger.debug('add data set out_str: ' + str(out_str))
logger.debug('add data set err_str: ' + str(err_str))
return out_str
[docs]
def register_software(self, rocrate_path, name='unknown',
description='Must be at least 10 characters',
author='', version='', file_format='', url='',
date_modified=None,
keywords=[''],
guid=None,
timeout=30):
"""
Registers software by adding information to
``ro-crate-metadata.json`` file stored in **rocrate_path**
directory.
:param name: Name of software
:type name: str
:param description: Description of software
:type description: str
:param author: Author(s) of software
:type author: str
:param version: Version of software
:type version: str
:param file_format: Format of software file(s)
:type file_format: str
:param url: URL to repository for software
:type url: str
:param date_modified:
:type date_modified: str
:param keywords:
:type keywords: list
:param guid: ID for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type guid: str
:param rocrate_path: Path to directory with registered rocrate
:type rocrate_path: str
:param timeout: Time in seconds to wait for registration of ro-crate to complete
:type timeout: float
:raises CellMapsProvenanceError: If `FAIRSCAPE <https://fairscape.github.io>`__ call fails
:return: guid of software from `FAIRSCAPE <https://fairscape.github.io>`__
:rtype: str
"""
if date_modified is None:
date_modified = date.today().strftime(self._default_date_fmt_str)
cmd = [self._python, self._binary, 'rocrate', 'register',
'software',
'--name', name,
'--description', description,
'--author', author,
'--version', version,
'--file-format', file_format,
'--url', url,
'--date-modified', date_modified]
cmd.extend(self._get_keywords(keywords=keywords))
if guid is None:
guid = self._generate_guid(data_type='software',
rocrate_path=rocrate_path)
cmd.append('--guid')
cmd.append(guid)
cmd.append('--filepath')
cmd.append(url)
cmd.append(rocrate_path)
exit_code, out_str, err_str = self._run_cmd(cmd, cwd=rocrate_path,
timeout=timeout)
logger.debug('add software exit code: ' + str(exit_code))
if exit_code != 0 and self._raise_on_error:
raise CellMapsProvenanceError('Error adding software: ' +
str(out_str) + ' : ' + str(err_str))
logger.debug('add software out_str: ' + str(out_str))
logger.debug('add software err_str: ' + str(err_str))
return out_str
[docs]
def register_dataset(self, rocrate_path, data_dict=None,
source_file=None, skip_copy=True,
guid=None, timeout=30):
"""
Adds a dataset to existing rocrate specified by **rocrate_path**
by adding information to ``ro-crate-metadata.json`` file
Information about dataset should be specified in the **data_dict**
dict passed in.
Expected format of **data_dict**:
.. code-block::
{'name': 'Name of dataset',
'author': 'Author of dataset',
'version': 'Version of dataset',
'url': 'Url of dataset (optional)',
'date-published': 'Date dataset was published MM-DD-YYYY',
'description': 'Description of dataset',
'data-format': 'Format of data',
'schema': Path or URL to schema file in JSON format
'keywords': ['keyword1','keyword2']}
.. versionchanged:: 0.2.0
Added support for ``schema`` in **data_dict** passed in
:param rocrate_path: Path to directory with registered rocrate
:type rocrate_path: str
:param data_dict: Information about dataset to add. See above
for expected data
:type data_dict: dict
:param source_file: Path to source file of dataset
:type source_file: str
:param skip_copy: If ``True`` skip the copy of source file into
**crate_path**. Use this when source file already
resides in **crate_path**
:param guid: ID for `RO-Crate <https://www.researchobject.org/ro-crate/>`__
:type guid: str
:param timeout: Time in seconds to wait for registration of dataset to complete
:type timeout: float
:return: id of dataset from `FAIRSCAPE <https://fairscape.github.io>`__
:rtype: str
"""
operation_name = 'register'
if skip_copy is not None and skip_copy is False:
operation_name = 'add'
cmd = [self._python, self._binary, 'rocrate', operation_name,
'dataset',
'--name', data_dict['name'],
'--version', data_dict['version'],
'--data-format', data_dict['data-format'],
'--description', data_dict['description'],
'--date-published', data_dict['date-published'],
'--author', data_dict['author']]
if 'url' in data_dict:
cmd.extend(['--url', data_dict['url']])
if 'keywords' not in data_dict:
cmd.extend(self._get_keywords(keywords=''))
else:
cmd.extend(self._get_keywords(keywords=data_dict['keywords']))
if 'schema' in data_dict:
cmd.extend(['--schema', data_dict['schema']])
if guid is None:
guid = self._generate_guid(data_type='dataset',
rocrate_path=rocrate_path)
cmd.append('--guid')
cmd.append(guid)
if skip_copy is not None and skip_copy is False:
cmd.append('--source-filepath')
cmd.append(source_file)
cmd.append('--destination-filepath')
cmd.append(os.path.join(rocrate_path,
os.path.basename(source_file)))
else:
cmd.append('--filepath')
cmd.append(source_file)
cmd.append(rocrate_path)
exit_code, out_str, err_str = self._run_cmd(cmd, cwd=rocrate_path,
timeout=timeout)
logger.debug(operation_name + ' dataset exit code: ' + str(exit_code))
if exit_code != 0 and self._raise_on_error:
raise CellMapsProvenanceError('Error adding dataset: ' +
str(out_str) + ' : ' + str(err_str))
logger.debug(operation_name + ' data set out_str: ' + str(out_str))
logger.debug(operation_name + ' data set err_str: ' + str(err_str))
return out_str