#!/usr/bin/env python3
import json
import os
from typing import Optional
from pathlib import Path
from datetime import datetime, timedelta, timezone
import tempfile
from ..DownloadDatasets.DatasetHandler import DatasetHandler
from ..Utils.ProgressBar import ProgressBar
from LOGS.Entities import (
Dataset,
DatasetRequestParameter,
ProjectMinimal,
CustomFieldDataType,
CustomFieldRequestParameter,
Sample,
)
from LOGS.LOGS import LOGS
class DatasetDownloader:
    """Downloads all datasets, organized as follows:

    1) Claimed/Unclaimed
    2) Projects
    3) Samples
    4) Datasets
    """

    def __init__(
        self,
        logs: LOGS,
        args,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API.
        :param args: Parsed command-line arguments; the following
            attributes are read:

            - ``target_path``: Path where all datasets should be saved.
            - ``project_ids``: List of project ids which should be
              downloaded. If empty, all projects will be downloaded.
            - ``include_metadata``: True if metadata of datasets,
              projects and samples should be saved.
            - ``duplicate_handling``: How datasets with the same name
              should be handled. 1: rename, 2: overwrite, 3: take first.
            - ``symlink_path``: Path where the datasets should be sorted
              by the format with symlinks to the original datasets. If
              None, no symlinks will be created.
            - ``sample_names``: The sample names of the custom fields of
              the datasets to be downloaded. Needs to be set.
        """
        self._logs = logs
        self._target_path = args.target_path
        self._project_ids = args.project_ids
        self._include_metadata = args.include_metadata
        self._duplicate_handling = args.duplicate_handling
        self._symlink_path = args.symlink_path
        # Resume point restored from last_state.json if a previous run
        # was interrupted; None means "download everything".
        self._start_from = self.valid_start_from()
        self._sample_names = args.sample_names
        if self._symlink_path is not None and not self.can_create_symlink():
            print(
                "Error: Symbolic links are not supported on this system. They will not be created."
            )
            self._symlink_path = None
        # Ids of every custom field that references a sample (single or
        # array); used when matching dataset custom values to samples.
        self._sample_ids = (
            self._logs.customFields(
                CustomFieldRequestParameter(
                    dataTypes=[
                        CustomFieldDataType.Sample,
                        CustomFieldDataType.SampleArray,
                    ]
                )
            )
            .ids()
            .toList()
        )
[docs]
def can_create_symlink(self) -> bool:
"""Check if the system supports creating symbolic links.
:return: True if the system supports creating symbolic links,
False otherwise.
"""
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
target = tmp_path / "dummy.txt"
link = tmp_path / "link.txt"
try:
target.write_text("symlink test")
link.symlink_to(target)
return link.is_symlink() and link.read_text() == "symlink test"
except (OSError, NotImplementedError):
return False
[docs]
def valid_start_from(self) -> Optional[datetime]:
"""Check if last_state.json exists and return the dateAdded if it
exists, else return None.
:return: datetime object of the last dateAdded or None
"""
if os.path.exists("last_state.json") and (
os.path.getsize("last_state.json") > 0
):
with open("last_state.json", "r", encoding="utf-8") as stat:
data = json.load(stat)
if isinstance(data, dict):
if "creationDate" in data:
self.start_from = datetime.fromisoformat(
data["creationDate"]
) - timedelta(seconds=1)
return self.start_from
else:
return None
[docs]
def download_dataset(
self,
project_path: str,
sample_path: str,
project: ProjectMinimal,
dataset: Dataset,
sample: Sample,
):
"""Parse and download dataset. Create symlinks if symlink_path is not
None. Create metadata files if include_metadata is True.
:param project_path: Path where the project should be saved.
:param sample_path: Path where the sample should be saved.
:param project: Project object
:param dataset: Dataset object
:param sample: Sample object
"""
sample_path.mkdir(parents=True, exist_ok=True)
dataset_handler = DatasetHandler(
dataset_target_path=sample_path,
dataset=dataset,
include_metadata=self._include_metadata,
duplicate_handling=self._duplicate_handling,
symlink_path=self._symlink_path,
original_target_path=self._target_path,
)
dataset_handler.parse_dataset()
ProgressBar.update_processed_files()
if self._include_metadata:
if project is not None and type(project) != str:
project_information = self._logs.project(project.id).toJson()
project_info_path = project_path / "project_information.json"
if not project_info_path.exists():
with open(project_info_path, "w", encoding="utf-8") as file:
json.dump(
project_information,
file,
ensure_ascii=False,
indent=4,
)
if (
sample is not None
and type(sample) != str
and sample.value is not None
and sample.value != ""
):
sample_information = sample.toJson()
sample_info_path = (
sample_path / f"sample_information_{sample.value.name}.json"
)
if not sample_info_path.exists():
with open(sample_info_path, "w", encoding="utf-8") as file:
json.dump(
sample_information,
file,
ensure_ascii=False,
indent=4,
)
[docs]
def download_datasets_structured_helper(
self,
dataset: Dataset,
status: str,
sample: Optional[str] = None,
count: int = 0,
):
"""Helper function to call download_datasets_structured with
exception handling.
:param dataset: Dataset object
:param status: "Claimed" or "Unclaimed"
:param sample: The sample name of the dataset as a string. If None, it will be set to "Not_mentioned_sample".
:param count: The number of samples that have been processed for the current dataset. Used to determine the sample name if the sample parameter is None.
"""
if sample is None and count == 0:
dataset_sample_name = "Not_mentioned_sample"
elif sample is None:
return count
else:
dataset_sample = sample.value
if (dataset_sample is None or dataset_sample == "") and count == 0:
count += 1
dataset_sample_name = "Not_mentioned_sample"
elif dataset_sample is None or dataset_sample == "":
return count
else:
dataset_sample_name = str(dataset_sample.name).replace("/", "_")
with open(Path("./last_state.json"), "w", encoding="utf-8") as stat:
json.dump(
{"creationDate": dataset.creationDate.isoformat()}, stat
) # Save last date state in case of interruption
target_path = self._target_path / status
if not dataset.projects:
if len(self._project_ids) > 0 and 0 not in self._project_ids:
return count
project_path = target_path / "NoProject"
sample_path = project_path / dataset_sample_name
self.download_dataset(
project_path,
sample_path,
"NoProject",
dataset,
sample,
)
else:
for project in dataset.projects:
# If project ids are given, only download datasets from this projects, else download all projects
if len(self._project_ids) > 0:
if project.id in self._project_ids:
project_path = target_path / project.name
sample_path = project_path / dataset_sample_name
self.download_dataset(
project_path,
sample_path,
project,
dataset,
sample,
)
else:
project_path = target_path / project.name
sample_path = project_path / dataset_sample_name
self.download_dataset(
project_path,
sample_path,
project,
dataset,
sample,
)
return count
[docs]
def download_datasets_structured(self):
"""Downloads all datasets structured in the given path."""
ProgressBar.start_progressbar("Downloading datasets")
try:
for dataset in self._logs.datasets(
DatasetRequestParameter(
sortBy="CREATION_DATE",
isClaimed=True,
creationDateFrom=self._start_from,
)
):
if dataset.customValues is None:
self.download_datasets_structured_helper(dataset, "Claimed")
continue
sample_counter = 0
for sample in self._sample_names:
custom_value = dataset.customValues.customField(nameOrId=sample)
if custom_value is None:
if sample_counter == len(self._sample_names):
sample_counter = self.download_datasets_structured_helper(
dataset, "Claimed", count=sample_counter
)
else:
sample_counter = self.download_datasets_structured_helper(
dataset, "Claimed", custom_value, count=sample_counter
)
for dataset in self._logs.datasets(
DatasetRequestParameter(
sortBy="CREATION_DATE",
isClaimed=False,
creationDateFrom=self._start_from,
)
):
if dataset.customValues is None:
self.download_datasets_structured_helper(dataset, "Unclaimed")
continue
sample_counter = 0
for sample in self._sample_names:
custom_value = dataset.customValues.customField(nameOrId=sample)
if custom_value is None:
if sample_counter == len(self._sample_names):
sample_counter = self.download_datasets_structured_helper(
dataset, "Unclaimed", count=sample_counter
)
else:
sample_counter = self.download_datasets_structured_helper(
dataset, "Unclaimed", custom_value, count=sample_counter
)
except Exception as e:
print(f"An error occurred: {e}")
with open("last_state.json", "w", encoding="utf-8") as stat:
json.dump(
{"creationDate": datetime.now(timezone.utc).isoformat()}, stat
)
raise
finally:
ProgressBar.stop_progressbar()
ProgressBar.stop_progressbar()