Source code for LOGS_solutions.GenerateStatistics.StatisticEntities.StatisticsInventories

import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List

from LOGS.Entities import (
    DatasetRequestParameter,
    InventoryItem,
    InventoryItemRequestParameter,
    ProjectRequestParameter,
    SampleRequestParameter,
)
from LOGS.LOGS import LOGS

from .StatisticHandlerEntities import StatisticHandlerEntities


# TODO: Clarify the open questions about inventory statistics and implement them.
class StatisticsInventories(StatisticHandlerEntities):
    """This class provides methods to create statistics from LOGS inventory
    items data and save them as HTML or PDF files.

    NOTE(review): the statistic output itself is not implemented yet;
    ``create_statistic`` currently only collects the dataset counts.
    """

    def __init__(
        self,
        logs: LOGS,
        inventory: int,
        target_path: str = "./statistics",
        begin_date: datetime = None,
        end_date: datetime = None,
        # instruments: List = [],
        inventory_items: List = None,
        cutoff: int = 0,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API.
        :param inventory: Inventory to be included in the statistics. Has to
            be specified by its ID.
        :param target_path: The target path, where all statistics should be
            saved. Default: Within the folder containing the script, a new
            folder "statistics" is created in which all statistics are saved.
        :param begin_date: Lowest date limit for statistics to be created.
            Default: creation date of the first dataset returned when sorting
            by creation date.
        :param end_date: Highest date limit for statistics to be created.
            Default: creation date of the last dataset returned when sorting
            by creation date (or now, if there are no datasets).
        :param inventory_items: List of inventory items to be included in the
            statistics. Have to be specified by their IDs.
            Default: None / empty list -> all inventory items are included.
        :param cutoff: Only the statistics that correspond to >= the cut-off
            are displayed.
        :raises ValueError: If ``cutoff`` is not an integer.
        """
        self._logs = logs

        self._logger_inventories = logging.getLogger("StatisticInventories")
        self._logger_inventories.setLevel(logging.INFO)

        # The folder is created next to this module; no file handler is
        # attached here, only the console handler below.
        logfile_folder = Path(__file__).resolve().parent / "logfiles"
        logfile_folder.mkdir(parents=True, exist_ok=True)

        # Attach the console handler only once, so that creating several
        # instances does not duplicate every log line.
        if not self._logger_inventories.hasHandlers():
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            logconsole_handler = logging.StreamHandler(sys.stdout)
            logconsole_handler.setLevel(logging.INFO)
            logconsole_handler.setFormatter(formatter)
            self._logger_inventories.addHandler(logconsole_handler)

        super().__init__(
            logs, begin_date, end_date, target_path, self._logger_inventories
        )

        self._inventory = inventory
        # A fresh list per call replaces the former shared mutable default.
        self._inventory_items = self._validate_list(
            [] if inventory_items is None else inventory_items
        )
        self._inventory_name = self._logs.customType(self._inventory).name
        self.__inventory_item_path = self._target_path / "inventory"

        if not isinstance(cutoff, int):
            raise ValueError("Cutoff must be an integer.")
        self.__cutoff = cutoff

        # NOTE(review): the flattened source did not preserve indentation;
        # the date normalisation below is assumed to apply to user-supplied
        # dates as well as to the derived defaults - confirm.
        if self._begin_date is None:
            first_dataset = self._logs.datasets(
                DatasetRequestParameter(sortBy="CREATION_DATE")
            ).first()
            # Fall back to "now" when there is no dataset at all, mirroring
            # the end-date default below.
            self._begin_date = (
                first_dataset.creationDate
                if first_dataset is not None
                else datetime.now()
            )
        # Start the time frame at midnight of the first day.
        self._begin_date = self._begin_date.replace(
            hour=0, minute=0, second=0, microsecond=0
        )

        if self._end_date is None:
            # NOTE(review): this materialises every dataset only to read the
            # last one; a descending sort plus first() would be cheaper if
            # the API supports it.
            datasets_list = list(
                self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
            )
            self._end_date = (
                datasets_list[-1].creationDate if datasets_list else datetime.now()
            )
        # Midnight of the following day, so the end day is fully covered.
        self._end_date = (self._end_date + timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )

        self._custom_fields = {}

    def get_dataset_inventories(self) -> Dict:
        """Count the datasets linked to each inventory item of the inventory.

        For every inventory item of the configured inventory (restricted to
        ``inventory_items`` if given), the datasets created within the time
        frame are counted per project and per sample that are requested with
        that inventory item.

        :return: Dictionary mapping each inventory item ID to a tuple
            ``(name, projects, samples, unused)`` where ``projects`` maps
            project ID -> ``[project name, dataset count]``, ``samples`` maps
            sample ID -> ``[sample name, dataset count]`` and the last
            dictionary is currently left empty. Only counts > 0 are stored.
            An empty dictionary is returned if the inventory has no items.
        """
        inv_items_dict = {}
        inventory_id = int(self._inventory)

        inv_items_total = self._logs.inventoryItems(
            InventoryItemRequestParameter(customTypeIds=[inventory_id])
        ).count

        if inv_items_total == 0:
            self._logger_inventories.info(
                "No inventory items found across all time frames."
            )
            return inv_items_dict

        self._logger_inventories.info("Processing inventory items.")

        # Counts only the items that were actually processed (items filtered
        # out by ``inventory_items`` are skipped before the increment).
        count = 0
        for inv_item in self._logs.inventoryItems(
            InventoryItemRequestParameter(customTypeIds=[inventory_id])
        ):
            if self._inventory_items and inv_item.id not in self._inventory_items:
                continue

            inv_items_dict[inv_item.id] = (inv_item.name, {}, {}, {})

            # Count datasets per project (project with this inventory item)
            for project in self._logs.projects(
                ProjectRequestParameter(inventoryItemIds=[inv_item.id])
            ):
                data_proj_count = self._logs.datasets(
                    DatasetRequestParameter(
                        projectIds=[project.id],
                        creationDateFrom=self._begin_date,
                        creationDateTo=self._end_date,
                    )
                ).count
                if data_proj_count > 0:
                    inv_items_dict[inv_item.id][1][project.id] = [
                        project.name,
                        data_proj_count,
                    ]
                # NOTE(review): debug output; nesting level assumed.
                print("dataset-project count: ", data_proj_count)

            # Count datasets per sample (sample with this inventory item)
            for sample in self._logs.samples(
                SampleRequestParameter(inventoryItemIds=[inv_item.id])
            ):
                data_sample_count = self._logs.datasets(
                    DatasetRequestParameter(
                        sampleIds=[sample.id],
                        creationDateFrom=self._begin_date,
                        creationDateTo=self._end_date,
                    )
                ).count
                if data_sample_count > 0:
                    inv_items_dict[inv_item.id][2][sample.id] = [
                        sample.name,
                        data_sample_count,
                    ]
                # NOTE(review): debug output; nesting level assumed.
                print("sample_count: ", data_sample_count)

            if count % 100 == 0 and count != 0:
                self._logger_inventories.info(
                    "%d/%d inventory items processed.", count, inv_items_total
                )
            count += 1

        self._logger_inventories.info(
            "Finished processing inventory %s.", self._inventory_name
        )
        print(
            f"Total amount of inventory items in {self._inventory_name}: ",
            inv_items_total,
        )

        return inv_items_dict

    def create_statistic(self):
        """Creates statistics from LOGS inventory items data and saves them as
        HTML or PDF files.

        Currently this only collects the dataset counts via
        ``get_dataset_inventories`` and prints a status message; no file is
        written yet.
        """
        self.get_dataset_inventories()
        print("Creating inventory statistics...")