#!/usr/bin/env python3
import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from LOGS.Entities import (
CustomFieldDataType,
CustomFieldRequestParameter,
Dataset,
DatasetRequestParameter,
ProjectMinimal,
Sample,
)
from LOGS.LOGS import LOGS
from pathvalidate import sanitize_filename
from ..DownloadDatasets.DatasetHandler import DatasetHandler
from ..Utils.ProgressBar import ProgressBar
class DatasetDownloader:
    """Downloads all datasets, organized as follows:

    1) Claimed/Unclaimed
    2) Projects
    3) Samples
    4) Datasets

    While downloading, the creation date of the dataset currently being
    processed is checkpointed in ``last_state.json`` (relative to the
    current working directory) so that an interrupted run can be resumed
    from that date.
    """

    # Checkpoint file; resolved against the current working directory.
    _STATE_FILE = Path("last_state.json")
    # Folder used when a dataset carries no usable sample custom value.
    _NO_SAMPLE_DIR = "Not_mentioned_sample"
    # Folder (and project placeholder) used for datasets without a project.
    _NO_PROJECT_DIR = "NoProject"

    def __init__(
        self,
        logs: LOGS,
        args,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API
        :param args: Options object. The following attributes are read:

            * ``target_path``: Path where all datasets should be saved.
            * ``project_ids``: List of project ids which should be
              downloaded. If empty (or None), all projects will be
              downloaded.
            * ``include_metadata``: True if metadata of datasets, projects
              and samples should be saved.
            * ``duplicate_handling``: How datasets with the same name
              should be handled. 1: rename, 2: overwrite, 3: take first.
            * ``symlink_path``: Path where the datasets should be sorted by
              the format with symlinks to the original datasets. If None,
              no symlinks will be created.
            * ``sample_names``: The sample names of the custom fields of
              the datasets to be downloaded. Needs to be set.

        The date to start downloading from is not taken from ``args``; it
        is read from the checkpoint file via :meth:`valid_start_from`.
        """
        self._logs = logs
        # Paths are joined with "/" later on, so accept plain strings too.
        self._target_path = Path(args.target_path)
        # None is treated like an empty list: no project filter.
        self._project_ids = args.project_ids or []
        self._include_metadata = args.include_metadata
        self._duplicate_handling = args.duplicate_handling
        self._symlink_path = args.symlink_path
        self._start_from = self.valid_start_from()
        self._sample_names = args.sample_names

        if self._symlink_path is not None and not self.can_create_symlink():
            print(
                "Error: Symbolic links are not supported on this system. They will not be created."
            )
            self._symlink_path = None

        # Ids of all custom fields of type Sample / SampleArray.
        # NOTE(review): not read anywhere else in this class - confirm it is
        # still needed before removing the request.
        self._sample_ids = (
            self._logs.customFields(
                CustomFieldRequestParameter(
                    dataTypes=[
                        CustomFieldDataType.Sample,
                        CustomFieldDataType.SampleArray,
                    ]
                )
            )
            .ids()
            .toList()
        )

    def can_create_symlink(self) -> bool:
        """Check if the system supports creating symbolic links.

        A file and a symlink pointing to it are created inside a temporary
        directory and the file is read back through the link.

        :return: True if the system supports creating symbolic links,
            False otherwise.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp_path = Path(tmpdir)
            target = tmp_path / "dummy.txt"
            link = tmp_path / "link.txt"
            try:
                target.write_text("symlink test")
                link.symlink_to(target)
                return link.is_symlink() and link.read_text() == "symlink test"
            except (OSError, NotImplementedError):
                return False

    def valid_start_from(self) -> Optional[datetime]:
        """Read the resume date from ``last_state.json``.

        :return: The stored ``creationDate`` minus one second (so that the
            checkpointed dataset itself is included again), or None if the
            file does not exist or holds no usable ``creationDate``.
        """
        try:
            with open(self._STATE_FILE, "r", encoding="utf-8") as stat:
                data = json.load(stat)
        except FileNotFoundError:
            return None
        except ValueError:
            # Empty, truncated or otherwise invalid JSON (json.JSONDecodeError
            # is a ValueError). Starting over is safer than failing here.
            print(f"Warning: Ignoring unreadable checkpoint file {self._STATE_FILE}.")
            return None

        if not isinstance(data, dict):
            return None
        creation_date = data.get("creationDate")
        if not isinstance(creation_date, str):
            return None
        try:
            return datetime.fromisoformat(creation_date) - timedelta(seconds=1)
        except (ValueError, OverflowError):
            print(f"Warning: Ignoring unreadable checkpoint file {self._STATE_FILE}.")
            return None

    def _save_checkpoint(self, creation_date: datetime) -> None:
        """Store ``creation_date`` as the resume point in the checkpoint file.

        :param creation_date: Creation date of the dataset being processed.
        """
        # Write to a sibling file and swap it in, so an interruption during
        # the write can not leave a truncated checkpoint behind.
        tmp_file = self._STATE_FILE.with_name(self._STATE_FILE.name + ".tmp")
        with open(tmp_file, "w", encoding="utf-8") as stat:
            json.dump({"creationDate": creation_date.isoformat()}, stat)
        os.replace(tmp_file, self._STATE_FILE)

    @staticmethod
    def _write_json(path: Path, payload) -> None:
        """Dump ``payload`` as indented UTF-8 JSON to ``path``."""
        with open(path, "w", encoding="utf-8") as file:
            json.dump(payload, file, ensure_ascii=False, indent=4)

    def download_dataset(
        self,
        project_path: Path,
        sample_path: Path,
        project: ProjectMinimal,
        dataset: Dataset,
        sample: Sample,
    ):
        """Parse and download dataset. Create symlinks if symlink_path is not
        None. Create metadata files if include_metadata is True.

        :param project_path: Path where the project should be saved.
        :param sample_path: Path where the sample should be saved. It is
            created if it does not exist.
        :param project: Project object. A plain string (placeholder for
            "no project") or None disables the project metadata file.
        :param dataset: Dataset object
        :param sample: Object whose ``value`` is the sample of the dataset.
            None, a plain string or an empty ``value`` disables the sample
            metadata file.
        """
        sample_path.mkdir(parents=True, exist_ok=True)
        dataset_handler = DatasetHandler(
            dataset_target_path=sample_path,
            dataset=dataset,
            include_metadata=self._include_metadata,
            duplicate_handling=self._duplicate_handling,
            symlink_path=self._symlink_path,
            original_target_path=self._target_path,
        )
        dataset_handler.parse_dataset()
        ProgressBar.update_processed_files()

        if not self._include_metadata:
            return

        if project is not None and not isinstance(project, str):
            project_info_path = project_path / "project_information.json"
            # Only ask the API when the file is actually missing; otherwise
            # every dataset of a project would trigger a useless request.
            if not project_info_path.exists():
                self._write_json(
                    project_info_path,
                    self._logs.project(project.id).toJson(),
                )

        if (
            sample is not None
            and not isinstance(sample, str)
            and sample.value is not None
            and sample.value != ""
        ):
            sample_info_path = sample_path / sanitize_filename(
                f"sample_information_{sample.value.name}.json"
            )
            if not sample_info_path.exists():
                self._write_json(sample_info_path, sample.toJson())

    def download_datasets_structured_helper(
        self,
        dataset: Dataset,
        status: str,
        sample: Optional[Sample] = None,
        found_sample: bool = False,
    ) -> bool:
        """Download one dataset into ``<target>/<status>/<project>/<sample>``.

        :param dataset: Dataset object
        :param status: "Claimed" or "Unclaimed"
        :param sample: Custom value holding the sample of the dataset in its
            ``value`` attribute. If None (or its value is empty), the folder
            "Not_mentioned_sample" is used.
        :param found_sample: Boolean indicating if a sample has been found
            for the current dataset.
        :return: True if the dataset was handed to the download step for its
            sample folder; otherwise the unchanged ``found_sample``.
        """
        dataset_sample = None if sample is None else sample.value
        if dataset_sample is None or dataset_sample == "":
            if found_sample:
                # The dataset was already placed under a real sample; do not
                # add a second copy in the fallback folder.
                return found_sample
            dataset_sample_name = self._NO_SAMPLE_DIR
        else:
            dataset_sample_name = sanitize_filename(
                str(dataset_sample.name).replace(" ", "_")
            )

        # Save last date state in case of interruption
        self._save_checkpoint(dataset.creationDate)

        target_path = self._target_path / status

        if not dataset.projects:
            # With a project filter, project-less datasets are only
            # downloaded when id 0 is part of the filter.
            if self._project_ids and 0 not in self._project_ids:
                return found_sample
            project_path = target_path / self._NO_PROJECT_DIR
            self.download_dataset(
                project_path,
                project_path / dataset_sample_name,
                self._NO_PROJECT_DIR,
                dataset,
                sample,
            )
            return True

        for project in dataset.projects:
            # If project ids are given, only download datasets from these
            # projects, else download all projects
            if self._project_ids and project.id not in self._project_ids:
                continue
            project_path = target_path / sanitize_filename(project.name)
            self.download_dataset(
                project_path,
                project_path / dataset_sample_name,
                project,
                dataset,
                sample,
            )
        return True

    def _download_by_claim_status(self, status: str, is_claimed: bool) -> None:
        """Download all datasets with the given claim state.

        :param status: Folder name for this claim state ("Claimed" or
            "Unclaimed").
        :param is_claimed: Value of the ``isClaimed`` request filter.
        """
        # NOTE(review): both claim states share one checkpoint file, so a
        # resume date written by one pass is also applied to the other.
        datasets = self._logs.datasets(
            DatasetRequestParameter(
                sortBy="CREATION_DATE",
                isClaimed=is_claimed,
                creationDateFrom=self._start_from,
            )
        )
        for dataset in datasets:
            if dataset.customValues is None:
                self.download_datasets_structured_helper(dataset, status)
                continue

            found_sample = False
            for sample_name in self._sample_names:
                custom_value = dataset.customValues.customField(nameOrId=sample_name)
                if custom_value is not None:
                    found_sample = self.download_datasets_structured_helper(
                        dataset,
                        status,
                        custom_value,
                        found_sample=found_sample,
                    )

            # None of the configured sample fields applied: fall back to the
            # "Not_mentioned_sample" folder.
            if not found_sample:
                self.download_datasets_structured_helper(
                    dataset, status, found_sample=False
                )

    def download_datasets_structured(self):
        """Downloads all datasets structured in the given path.

        Claimed datasets are processed first, then unclaimed ones. Any
        exception is reported on stdout and re-raised; the progress bar is
        stopped in every case.
        """
        ProgressBar.start_progressbar("Downloading datasets")
        try:
            for status, is_claimed in (("Claimed", True), ("Unclaimed", False)):
                self._download_by_claim_status(status, is_claimed)
        except Exception as e:
            # The checkpoint written for the last processed dataset is left
            # untouched on purpose: replacing it with the current time would
            # make the next run skip every dataset not downloaded yet.
            print(f"An error occurred: {e}")
            raise
        finally:
            ProgressBar.stop_progressbar()