import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List
from LOGS.Entities import (
DatasetRequestParameter,
InventoryItem,
InventoryItemRequestParameter,
ProjectRequestParameter,
SampleRequestParameter,
)
from LOGS.LOGS import LOGS
from .StatisticHandlerEntities import StatisticHandlerEntities
# TODO: Clarify the open questions about inventory statistics and implement them.
class StatisticsInventories(StatisticHandlerEntities):
    """Collect dataset statistics for the items of one LOGS inventory.

    For every inventory item of the configured inventory, the datasets that are
    reachable through the item's projects and samples are counted within the
    configured date range. Rendering the result as HTML or PDF is not
    implemented in this class yet.
    """

    def __init__(
        self,
        logs: LOGS,
        inventory: int,
        target_path: str = "./statistics",
        begin_date: datetime = None,
        end_date: datetime = None,
        inventory_items: List = None,
        cutoff: int = 0,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API.
        :param inventory: Inventory to be included in the statistics. Has to be
            specified by its ID.
        :param target_path: The target path, where all statistics should be saved.
            Default: "./statistics".
        :param begin_date: Lowest date limit for statistics to be created.
            Default: midnight of the day the first dataset (sorted by creation
            date) was created.
        :param end_date: Highest date limit for statistics to be created.
            Default: midnight of the day after the last dataset (sorted by
            creation date) was created.
        :param inventory_items: List of inventory items to be included in the
            statistics. Have to be specified by their IDs.
            Default: None / empty list -> all inventory items are included.
        :param cutoff: Only the statistics that correspond to >= the cut-off
            are displayed.
        :raises ValueError: If ``cutoff`` is not an integer.
        """
        self._logs = logs

        self._logger_inventories = logging.getLogger("StatisticInventories")
        self._logger_inventories.setLevel(logging.INFO)

        # NOTE(review): the folder is created next to this module, but no file
        # handler is attached to the logger in this class.
        logfile_folder = Path(__file__).resolve().parent / "logfiles"
        logfile_folder.mkdir(parents=True, exist_ok=True)

        # Attach the console handler only once, so that creating several
        # instances does not duplicate every log line.
        if not self._logger_inventories.hasHandlers():
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            logconsole_handler = logging.StreamHandler(sys.stdout)
            logconsole_handler.setLevel(logging.INFO)
            logconsole_handler.setFormatter(formatter)
            self._logger_inventories.addHandler(logconsole_handler)

        super().__init__(
            logs, begin_date, end_date, target_path, self._logger_inventories
        )

        self._inventory = inventory
        # A fresh list per instance avoids sharing one mutable default object
        # between all instances.
        self._inventory_items = self._validate_list(
            [] if inventory_items is None else inventory_items
        )
        self._inventory_name = self._logs.customType(self._inventory).name
        self.__inventory_item_path = self._target_path / "inventory"

        if not isinstance(cutoff, int):
            raise ValueError("Cutoff must be an integer.")
        self.__cutoff = cutoff

        if self._begin_date is None:
            first_dataset = self._logs.datasets(
                DatasetRequestParameter(sortBy="CREATION_DATE")
            ).first()
            # NOTE(review): assumes first() yields None when LOGS holds no
            # datasets - confirm. In that case fall back to "now", mirroring
            # the end_date default below.
            first_created = (
                first_dataset.creationDate
                if first_dataset is not None
                else datetime.now()
            )
            self._begin_date = first_created.replace(
                hour=0, minute=0, second=0, microsecond=0
            )

        if self._end_date is None:
            # NOTE(review): this materialises every dataset just to read the
            # last one; a descending sort would avoid that if the API offers it.
            datasets_list = list(
                self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
            )
            last_created = (
                datasets_list[-1].creationDate if datasets_list else datetime.now()
            )
            # Midnight of the following day, so the whole last day is covered.
            self._end_date = (last_created + timedelta(days=1)).replace(
                hour=0, minute=0, second=0, microsecond=0
            )

        self._custom_fields = {}

    def _count_datasets_in_range(self, **entity_filter) -> int:
        """Return the number of datasets created within the configured date
        range that match the given additional request filter."""
        return self._logs.datasets(
            DatasetRequestParameter(
                creationDateFrom=self._begin_date,
                creationDateTo=self._end_date,
                **entity_filter,
            )
        ).count

    def get_dataset_inventories(self) -> Dict:
        """Count the datasets linked to each inventory item of the inventory.

        Inventory items that are not listed in the ``inventory_items`` filter
        (when one was given) are skipped.

        :return: Dictionary mapping each inventory item ID to a tuple
            ``(name, projects, samples, unused)`` where ``projects`` maps a
            project ID to ``[project name, dataset count]``, ``samples`` maps a
            sample ID to ``[sample name, dataset count]`` and ``unused`` is an
            empty dictionary that this method does not fill. Only projects and
            samples with at least one dataset are listed. The dictionary is
            empty if the inventory contains no items.
        """
        inv_items_dict = {}
        # Normalise the ID once so the count query and the iteration query use
        # exactly the same filter.
        inventory_id = int(self._inventory)

        inv_items_total = self._logs.inventoryItems(
            InventoryItemRequestParameter(customTypeIds=[inventory_id])
        ).count
        if inv_items_total == 0:
            self._logger_inventories.info(
                "No inventory items found across all time frames."
            )
            return inv_items_dict

        self._logger_inventories.info("Processing inventory items.")
        processed = 0
        for inv_item in self._logs.inventoryItems(
            InventoryItemRequestParameter(customTypeIds=[inventory_id])
        ):
            if self._inventory_items and inv_item.id not in self._inventory_items:
                continue

            projects_with_data = {}
            samples_with_data = {}
            inv_items_dict[inv_item.id] = (
                inv_item.name,
                projects_with_data,
                samples_with_data,
                {},
            )

            # Count datasets per project (project with this inventory item)
            for project in self._logs.projects(
                ProjectRequestParameter(inventoryItemIds=[inv_item.id])
            ):
                data_proj_count = self._count_datasets_in_range(
                    projectIds=[project.id]
                )
                if data_proj_count > 0:
                    projects_with_data[project.id] = [project.name, data_proj_count]
                    print("dataset-project count: ", data_proj_count)

            # Count datasets per sample (sample with this inventory item)
            for sample in self._logs.samples(
                SampleRequestParameter(inventoryItemIds=[inv_item.id])
            ):
                data_sample_count = self._count_datasets_in_range(
                    sampleIds=[sample.id]
                )
                if data_sample_count > 0:
                    samples_with_data[sample.id] = [sample.name, data_sample_count]
                    print("sample_count: ", data_sample_count)

            # Increment first so the logged number matches the number of items
            # that have actually been processed.
            processed += 1
            if processed % 100 == 0:
                self._logger_inventories.info(
                    "%d/%d inventory items processed.", processed, inv_items_total
                )

        self._logger_inventories.info(
            "Finished processing inventory %s.", self._inventory_name
        )
        print(
            f"Total amount of inventory items in {self._inventory_name}: ",
            inv_items_total,
        )
        return inv_items_dict

    def create_statistic(self):
        """Collect the inventory item data for the statistics.

        Currently this only gathers the dataset counts and prints a status
        message; writing HTML or PDF output is not implemented yet.
        """
        self.get_dataset_inventories()
        print("Creating inventory statistics...")