Source code for LOGS_solutions.DownloadDatasets.DatasetDownloader

#!/usr/bin/env python3

import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from LOGS.Entities import (
    CustomFieldDataType,
    CustomFieldRequestParameter,
    Dataset,
    DatasetRequestParameter,
    ProjectMinimal,
    Sample,
)
from LOGS.LOGS import LOGS
from pathvalidate import sanitize_filename

from ..DownloadDatasets.DatasetHandler import DatasetHandler
from ..Utils.ProgressBar import ProgressBar


class DatasetDownloader:
    """Download all datasets into a directory tree.

    Below the target path the tree is organized as follows:
    1) Claimed/Unclaimed
    2) Projects
    3) Samples
    4) Datasets
    """

    # State file (relative to the current working directory) that holds the
    # creation date of the most recently handled dataset, so that a later run
    # can continue from there.
    _STATE_FILE = "last_state.json"

    def __init__(
        self,
        logs: LOGS,
        args,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API
        :param args: Object carrying the download options as attributes:

            * ``target_path``: Path where all datasets should be saved.
            * ``project_ids``: List of project ids which should be downloaded.
              If empty, all projects will be downloaded.
            * ``include_metadata``: True if metadata of datasets, projects and
              samples should be saved.
            * ``duplicate_handling``: How datasets with the same name should be
              handled. 1: rename, 2: overwrite, 3: take first.
            * ``symlink_path``: Path where the datasets should be sorted by the
              format with symlinks to the original datasets. If None, no
              symlinks will be created.
            * ``sample_names``: The sample names of the custom fields of the
              datasets to be downloaded. Needs to be set.

        The date to start downloading from is not taken from ``args``; it is
        read from ``last_state.json`` via :meth:`valid_start_from`.
        """
        self._logs = logs
        self._target_path = args.target_path
        self._project_ids = args.project_ids
        self._include_metadata = args.include_metadata
        self._duplicate_handling = args.duplicate_handling
        self._symlink_path = args.symlink_path
        self._start_from = self.valid_start_from()
        self._sample_names = args.sample_names

        if self._symlink_path is not None:
            # NOTE(review): can_create_symlink() is not defined in the visible
            # part of this class - confirm that it exists elsewhere.
            if self.can_create_symlink() is False:
                print(
                    "Error: Symbolic links are not supported on this system. They will not be created."
                )
                self._symlink_path = None

        # Ids of all custom fields whose data type is Sample or SampleArray.
        self._sample_ids = (
            self._logs.customFields(
                CustomFieldRequestParameter(
                    dataTypes=[
                        CustomFieldDataType.Sample,
                        CustomFieldDataType.SampleArray,
                    ]
                )
            )
            .ids()
            .toList()
        )

    def valid_start_from(self) -> Optional[datetime]:
        """Read the resume date from last_state.json.

        The stored date is moved back by one second before it is returned.
        A missing, empty, unreadable or malformed state file is treated as
        "no saved state"; for an unreadable or malformed file a warning is
        printed.

        :return: datetime object of the saved creation date minus one second,
            or None if no usable state exists
        """
        state_file = Path(self._STATE_FILE)
        try:
            if state_file.stat().st_size == 0:
                return None
            with state_file.open("r", encoding="utf-8") as stat:
                data = json.load(stat)
        except FileNotFoundError:
            return None
        except ValueError as error:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # The file is rewritten for every dataset, so a killed run may
            # leave a truncated file behind.
            print(f"Warning: Ignoring unreadable {self._STATE_FILE}: {error}")
            return None

        if not isinstance(data, dict) or "creationDate" not in data:
            return None

        try:
            return datetime.fromisoformat(data["creationDate"]) - timedelta(seconds=1)
        except (TypeError, ValueError) as error:
            print(
                f"Warning: Ignoring invalid creationDate in {self._STATE_FILE}: {error}"
            )
            return None

    @classmethod
    def _save_state(cls, moment: datetime) -> None:
        """Write the given date as ``creationDate`` to the state file."""
        with open(cls._STATE_FILE, "w", encoding="utf-8") as stat:
            json.dump({"creationDate": moment.isoformat()}, stat)

    @staticmethod
    def _write_json_if_missing(path: Path, make_payload) -> None:
        """Dump ``make_payload()`` as JSON to ``path`` unless the file exists.

        The payload is only produced when the file actually has to be written.
        """
        if path.exists():
            return
        payload = make_payload()
        with open(path, "w", encoding="utf-8") as file:
            json.dump(
                payload,
                file,
                ensure_ascii=False,
                indent=4,
            )

    def download_dataset(
        self,
        project_path: Path,
        sample_path: Path,
        project: ProjectMinimal,
        dataset: Dataset,
        sample: Sample,
    ):
        """Parse and download dataset.

        The symlink path and the metadata flag are handed to the
        DatasetHandler. If include_metadata is True, a project information
        file and a sample information file are additionally written, unless
        they already exist.

        :param project_path: Path where the project should be saved.
        :param sample_path: Path where the sample should be saved. It is
            created if it does not exist.
        :param project: Project object. A plain string (used for datasets
            without a project) or None means that no project information file
            is written.
        :param dataset: Dataset object
        :param sample: Object whose ``value`` attribute holds the sample and
            which offers ``toJson()``. None, a plain string or an empty value
            means that no sample information file is written.
        """
        sample_path.mkdir(parents=True, exist_ok=True)
        dataset_handler = DatasetHandler(
            dataset_target_path=sample_path,
            dataset=dataset,
            include_metadata=self._include_metadata,
            duplicate_handling=self._duplicate_handling,
            symlink_path=self._symlink_path,
            original_target_path=self._target_path,
        )
        dataset_handler.parse_dataset()
        ProgressBar.update_processed_files()

        if not self._include_metadata:
            return

        if project is not None and not isinstance(project, str):
            # The project is only requested from the API when its information
            # file still has to be written.
            self._write_json_if_missing(
                project_path / "project_information.json",
                lambda: self._logs.project(project.id).toJson(),
            )

        if (
            sample is not None
            and not isinstance(sample, str)
            and sample.value is not None
            and sample.value != ""
        ):
            self._write_json_if_missing(
                sample_path
                / sanitize_filename(f"sample_information_{sample.value.name}.json"),
                sample.toJson,
            )

    @staticmethod
    def _sample_folder_name(sample, found_sample: bool) -> Optional[str]:
        """Return the sample folder name, or None if the call should be skipped.

        Without a usable sample value the folder is "Not_mentioned_sample",
        but only as long as no sample has been found for the dataset yet.
        """
        dataset_sample = None if sample is None else sample.value
        if dataset_sample is None or dataset_sample == "":
            return None if found_sample else "Not_mentioned_sample"
        return sanitize_filename(str(dataset_sample.name).replace(" ", "_"))

    def download_datasets_structured_helper(
        self,
        dataset: Dataset,
        status: str,
        sample=None,
        found_sample: bool = False,
    ):
        """Download one dataset into the folder(s) of its project(s) and sample.

        Before downloading, the creation date of the dataset is written to
        last_state.json.

        :param dataset: Dataset object
        :param status: "Claimed" or "Unclaimed"; used as the first folder level.
        :param sample: Object whose ``value`` attribute holds the sample of the
            dataset. If it is None or its value is empty, the folder
            "Not_mentioned_sample" is used, unless a sample has already been
            found.
        :param found_sample: Boolean indicating if a sample has been found for
            the current dataset.
        :return: ``found_sample`` unchanged if the call was skipped (no usable
            sample although one was already found, or a dataset without
            project that is excluded by the project filter), otherwise True.
        """
        dataset_sample_name = self._sample_folder_name(sample, found_sample)
        if dataset_sample_name is None:
            return found_sample

        # Save last date state in case of interruption
        self._save_state(dataset.creationDate)

        target_path = self._target_path / status

        if not dataset.projects:
            # With a project filter, datasets without a project are only
            # downloaded if the id 0 is part of the filter.
            if self._project_ids and 0 not in self._project_ids:
                return found_sample

            project_path = target_path / "NoProject"
            self.download_dataset(
                project_path,
                project_path / dataset_sample_name,
                "NoProject",
                dataset,
                sample,
            )
            return True

        for project in dataset.projects:
            # If project ids are given, only download datasets from these
            # projects, else download all projects.
            if self._project_ids and project.id not in self._project_ids:
                continue

            project_path = target_path / sanitize_filename(project.name)
            self.download_dataset(
                project_path,
                project_path / dataset_sample_name,
                project,
                dataset,
                sample,
            )

        return True

    def _download_claim_group(self, is_claimed: bool, status: str) -> None:
        """Download all claimed or all unclaimed datasets below ``status``."""
        request = DatasetRequestParameter(
            sortBy="CREATION_DATE",
            isClaimed=is_claimed,
            creationDateFrom=self._start_from,
        )

        for dataset in self._logs.datasets(request):
            if dataset.customValues is None:
                self.download_datasets_structured_helper(dataset, status)
                continue

            found_sample = False
            for sample_name in self._sample_names:
                custom_value = dataset.customValues.customField(nameOrId=sample_name)
                if custom_value is not None:
                    found_sample = self.download_datasets_structured_helper(
                        dataset,
                        status,
                        custom_value,
                        found_sample=found_sample,
                    )

            # None of the configured sample fields led to a download.
            if not found_sample:
                self.download_datasets_structured_helper(
                    dataset, status, found_sample=False
                )

    def download_datasets_structured(self):
        """Downloads all datasets structured in the given path.

        Claimed datasets are processed first, then unclaimed ones. If an
        exception occurs, a message is printed, the current time is written to
        last_state.json and the exception is re-raised. The progress bar is
        stopped in every case.
        """
        ProgressBar.start_progressbar("Downloading datasets")
        try:
            self._download_claim_group(True, "Claimed")
            self._download_claim_group(False, "Unclaimed")
        except Exception as e:
            print(f"An error occurred: {e}")
            # NOTE(review): this replaces the per-dataset resume point with the
            # current time, so a later run starts from "now" - confirm that
            # skipping the not yet downloaded datasets is intended.
            self._save_state(datetime.now(timezone.utc))
            raise
        finally:
            ProgressBar.stop_progressbar()