# Source code for LOGS_solutions.DownloadDatasets.DatasetDownloader

#!/usr/bin/env python3

import json
import os
from typing import Optional
from pathlib import Path
from datetime import datetime, timedelta, timezone
import tempfile

from ..DownloadDatasets.DatasetHandler import DatasetHandler
from ..Utils.ProgressBar import ProgressBar

from LOGS.Entities import (
    Dataset,
    DatasetRequestParameter,
    ProjectMinimal,
    CustomFieldDataType,
    CustomFieldRequestParameter,
    Sample,
)
from LOGS.LOGS import LOGS


class DatasetDownloader:
    """Download all datasets from a LOGS instance into a directory tree.

    The tree is organized as:
      1) Claimed/Unclaimed
      2) Projects
      3) Samples
      4) Datasets
    """

    def __init__(self, logs: "LOGS", args):
        """Initialize the downloader from parsed command-line arguments.

        :param logs: LOGS object used to access the LOGS web API.
        :param args: Namespace providing:
            - target_path: Path where all datasets should be saved.
            - project_ids: List of project ids to download. If empty, all
              projects will be downloaded.
            - include_metadata: True if metadata of datasets, projects and
              samples should be saved.
            - duplicate_handling: How datasets with the same name are handled.
              1: rename, 2: overwrite, 3: take first.
            - symlink_path: Path where the datasets should be sorted by format
              with symlinks to the original datasets. If None, no symlinks
              will be created.
            - sample_names: The sample names of the custom fields of the
              datasets to be downloaded. Needs to be set.
        """
        self._logs = logs
        self._target_path = args.target_path
        self._project_ids = args.project_ids
        self._include_metadata = args.include_metadata
        self._duplicate_handling = args.duplicate_handling
        self._symlink_path = args.symlink_path
        # NOTE(review): the resume point is read from last_state.json rather
        # than from args — confirm this is intentional.
        self._start_from = self.valid_start_from()
        self._sample_names = args.sample_names

        if self._symlink_path is not None:
            if not self.can_create_symlink():
                print(
                    "Error: Symbolic links are not supported on this system. They will not be created."
                )
                self._symlink_path = None

        # Ids of all custom fields whose data type refers to samples.
        self._sample_ids = (
            self._logs.customFields(
                CustomFieldRequestParameter(
                    dataTypes=[
                        CustomFieldDataType.Sample,
                        CustomFieldDataType.SampleArray,
                    ]
                )
            )
            .ids()
            .toList()
        )

    def can_create_symlink(self) -> bool:
        """Return True if this system/user can create symbolic links.

        Probes by creating a throwaway symlink inside a temporary directory
        (e.g. Windows without developer mode raises OSError).
        """
        with tempfile.TemporaryDirectory() as tmp:
            target = Path(tmp) / "target"
            link = Path(tmp) / "link"
            target.touch()
            try:
                os.symlink(target, link)
            except (OSError, NotImplementedError):
                return False
            return True

    def valid_start_from(self) -> Optional[datetime]:
        """Return the resume date recorded in last_state.json, if any.

        :return: The recorded ``creationDate`` minus one second (so the
            dataset being processed at interruption time is re-fetched),
            or None when no usable state file exists.
        """
        state_file = "last_state.json"
        if os.path.exists(state_file) and os.path.getsize(state_file) > 0:
            with open(state_file, "r", encoding="utf-8") as stat:
                data = json.load(stat)
            if isinstance(data, dict) and "creationDate" in data:
                # Back up one second so the last recorded dataset is not skipped.
                return datetime.fromisoformat(data["creationDate"]) - timedelta(
                    seconds=1
                )
        return None

    def download_dataset(
        self,
        project_path: Path,
        sample_path: Path,
        project: "ProjectMinimal",
        dataset: "Dataset",
        sample: "Sample",
    ):
        """Parse and download one dataset; optionally write metadata files.

        Symlinks are created by the DatasetHandler when symlink_path is set.

        :param project_path: Directory where the project is saved.
        :param sample_path: Directory where the sample is saved.
        :param project: Project object, or the string "NoProject" for
            datasets without a project.
        :param dataset: Dataset object.
        :param sample: Sample custom-field object (with a ``value``), or None.
        """
        sample_path.mkdir(parents=True, exist_ok=True)
        dataset_handler = DatasetHandler(
            dataset_target_path=sample_path,
            dataset=dataset,
            include_metadata=self._include_metadata,
            duplicate_handling=self._duplicate_handling,
            symlink_path=self._symlink_path,
            original_target_path=self._target_path,
        )
        dataset_handler.parse_dataset()
        ProgressBar.update_processed_files()

        if not self._include_metadata:
            return

        # Write project metadata once per project directory.
        if project is not None and not isinstance(project, str):
            project_information = self._logs.project(project.id).toJson()
            project_info_path = project_path / "project_information.json"
            if not project_info_path.exists():
                with open(project_info_path, "w", encoding="utf-8") as file:
                    json.dump(
                        project_information,
                        file,
                        ensure_ascii=False,
                        indent=4,
                    )

        # Write sample metadata once per sample value.
        if (
            sample is not None
            and not isinstance(sample, str)
            and sample.value is not None
            and sample.value != ""
        ):
            sample_information = sample.toJson()
            sample_info_path = (
                sample_path / f"sample_information_{sample.value.name}.json"
            )
            if not sample_info_path.exists():
                with open(sample_info_path, "w", encoding="utf-8") as file:
                    json.dump(
                        sample_information,
                        file,
                        ensure_ascii=False,
                        indent=4,
                    )

    def download_datasets_structured_helper(
        self,
        dataset: "Dataset",
        status: str,
        sample=None,
        count: int = 0,
    ) -> int:
        """Resolve the sample directory name and download one dataset.

        :param dataset: Dataset object.
        :param status: "Claimed" or "Unclaimed" (top-level directory name).
        :param sample: Sample custom-field object (with a ``value``), or None.
            When None (and count is 0) the dataset goes into
            "Not_mentioned_sample".
        :param count: Number of empty-sample placements already done for this
            dataset; ensures the "Not_mentioned_sample" fallback is used at
            most once per dataset.
        :return: The (possibly incremented) count.
        """
        if sample is None and count == 0:
            dataset_sample_name = "Not_mentioned_sample"
        elif sample is None:
            return count
        else:
            dataset_sample = sample.value
            if (dataset_sample is None or dataset_sample == "") and count == 0:
                count += 1
                dataset_sample_name = "Not_mentioned_sample"
            elif dataset_sample is None or dataset_sample == "":
                return count
            else:
                # "/" in a sample name would create nested directories.
                dataset_sample_name = str(dataset_sample.name).replace("/", "_")

        # Persist the last processed creation date so an interrupted run
        # can resume from here.
        with open(Path("./last_state.json"), "w", encoding="utf-8") as stat:
            json.dump({"creationDate": dataset.creationDate.isoformat()}, stat)

        target_path = self._target_path / status
        if not dataset.projects:
            # Project id 0 selects the "no project" bucket when a project
            # filter is active.
            if len(self._project_ids) > 0 and 0 not in self._project_ids:
                return count
            project_path = target_path / "NoProject"
            self.download_dataset(
                project_path,
                project_path / dataset_sample_name,
                "NoProject",
                dataset,
                sample,
            )
        else:
            for project in dataset.projects:
                # If project ids are given, only download datasets from
                # those projects; otherwise download all projects.
                if len(self._project_ids) > 0 and project.id not in self._project_ids:
                    continue
                project_path = target_path / project.name
                self.download_dataset(
                    project_path,
                    project_path / dataset_sample_name,
                    project,
                    dataset,
                    sample,
                )
        return count

    def _download_claim_group(self, is_claimed: bool, status: str):
        """Download all datasets with the given claim state.

        :param is_claimed: Filter value for the dataset request.
        :param status: Top-level directory name ("Claimed"/"Unclaimed").
        """
        for dataset in self._logs.datasets(
            DatasetRequestParameter(
                sortBy="CREATION_DATE",
                isClaimed=is_claimed,
                creationDateFrom=self._start_from,
            )
        ):
            if dataset.customValues is None:
                self.download_datasets_structured_helper(dataset, status)
                continue
            sample_counter = 0
            for sample in self._sample_names:
                custom_value = dataset.customValues.customField(nameOrId=sample)
                if custom_value is None:
                    # NOTE(review): this guard can only be true once
                    # sample_counter has reached len(_sample_names); with the
                    # current increment logic that seems unreachable for more
                    # than one sample name — confirm intended behavior.
                    if sample_counter == len(self._sample_names):
                        sample_counter = self.download_datasets_structured_helper(
                            dataset, status, count=sample_counter
                        )
                else:
                    sample_counter = self.download_datasets_structured_helper(
                        dataset, status, custom_value, count=sample_counter
                    )

    def download_datasets_structured(self):
        """Download all datasets, claimed first, then unclaimed.

        On any error the current UTC time is written to last_state.json as a
        resume point and the exception is re-raised.
        """
        ProgressBar.start_progressbar("Downloading datasets")
        try:
            self._download_claim_group(is_claimed=True, status="Claimed")
            self._download_claim_group(is_claimed=False, status="Unclaimed")
        except Exception as e:
            print(f"An error occurred: {e}")
            with open("last_state.json", "w", encoding="utf-8") as stat:
                json.dump(
                    {"creationDate": datetime.now(timezone.utc).isoformat()}, stat
                )
            raise
        finally:
            ProgressBar.stop_progressbar()