# Source code for LOGS_solutions.GenerateStatistics.StatisticNMR.StatisticsDurationTime

import csv
import logging
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict
from pathvalidate import sanitize_filename

import matplotlib.pyplot as plt
from LOGS import LOGS
from LOGS.Entities import Dataset, DatasetRequestParameter

from ...Utils.csv_utils import clean_csv_text
from .StatisticHandlerNMR import StatisticHandlerNMR


class StatisticsDurationTime(StatisticHandlerNMR):
    """Generate statistics for the duration time of each instrument.

    The statistics are divided into the following parts:

    - Year duration time
    - Year month duration time
    - Year calendar week duration time
    - Comparison heatmap duration time
    """

    def __init__(
        self,
        logs: LOGS,
        begin_date: datetime,
        end_date: datetime,
        target_path: str,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API.
        :param begin_date: Lowest date limit for statistics to be created.
        :param end_date: Highest date limit for statistics to be created.
        :param target_path: Path where all datasets should be saved.
        """
        # Dedicated named logger so messages from this statistic are
        # distinguishable from the other statistic handlers.
        self._logger_dur_time = logging.getLogger("StatisticsDurationTime")
        self._logger_dur_time.setLevel(logging.INFO)
        # NOTE(review): the "logfiles" folder is created here but no
        # FileHandler is attached in this method — presumably file logging is
        # configured elsewhere (e.g. in the base class); confirm.
        logfile_folder = Path(__file__).resolve().parent / "logfiles"
        logfile_folder.mkdir(parents=True, exist_ok=True)
        # Only attach a console handler once; repeated instantiation would
        # otherwise duplicate every log line.
        if not self._logger_dur_time.hasHandlers():
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            logconsole_handler = logging.StreamHandler(sys.stdout)
            logconsole_handler.setLevel(logging.INFO)
            logconsole_handler.setFormatter(formatter)
            self._logger_dur_time.addHandler(logconsole_handler)
        super().__init__(logs, begin_date, end_date, target_path, self._logger_dur_time)
        # If no begin date was given, fall back to the creation date of the
        # oldest dataset, truncated to midnight of that day.
        if self._begin_date is None:
            self._begin_date = (
                self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
                .first()
                .creationDate
            )
            self._begin_date = self._begin_date.replace(
                hour=0, minute=0, second=0, microsecond=0
            )
        # If no end date was given, fall back to midnight of the day after the
        # newest dataset's creation date (or "now" when there are no datasets),
        # so the newest dataset is still inside the half-open range.
        if self._end_date is None:
            datasets_list = list(
                self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
            )
            self._end_date = (
                datasets_list[-1].creationDate if datasets_list else datetime.now()
            )
            self._end_date = (self._end_date + timedelta(days=1)).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
[docs] def sum_time_strings(self, time_string: str): """Sum up the given time_string in seconds. :param time_string: Time string in the format "1d 2h 3min 4s". :return: Total time in seconds. """ total_seconds = 0 days = re.search(r"(\d+)\s*d", time_string) houres = re.search(r"(\d+)\s*h", time_string) minutes = re.search(r"(\d+)\s*min", time_string) seconds = re.search(r"(\d+)\s*s", time_string) if days: total_seconds += int(days.group(1)) * 86400 if houres: total_seconds += int(houres.group(1)) * 3600 if minutes: total_seconds += int(minutes.group(1)) * 60 if seconds: total_seconds += int(seconds.group(1)) return total_seconds
[docs] def get_general_info(self, dataset: Dataset): """Checks if 'General information/Duration' or 'General info/Duration' exists in the dataset.""" if dataset.getParameter("General information/Duration") is not None: return dataset.getParameter("General information/Duration") if dataset.getParameter("General info/Duration") is not None: return dataset.getParameter("General info/Duration") return None
[docs] def check_duration(self, dataset: Dataset) -> bool: """Check if the duration parameter of the data set is empty or None. Write the data set to one of the following csv files: - DurationNone.csv: If the data set has no duration parameter. - NoDurationTime.csv: If the duration parameter of the data set is empty. - DurationTime.csv: If the duration parameter of the data set is not empty. """ if dataset.instrument is None or dataset.instrument == "": instrument_name = "No_Instrument" instrument_id = 0 else: instrument_name = clean_csv_text(dataset.instrument.name) instrument_id = dataset.instrument.id if self.get_general_info(dataset) is None: # If the data set has no duration parameter log an error message and write the dataset to a csv file self._logger_dur_time.warning( "Dataset has no duration parameter. %s It will not be included in the statistic.", dataset.name, ) self._target_path.mkdir(parents=True, exist_ok=True) csv_path = self._target_path / "DurationNone.csv" write_header = (not csv_path.exists()) or (csv_path.stat().st_size == 0) with open(csv_path, "a", newline="", encoding="utf-8") as error_file: writer = csv.writer(error_file, delimiter=";") if write_header: writer.writerows( [ [ f"Datasets acquired between {self._begin_date.strftime('%d/%B/%Y')} " f"and {self._end_date.strftime('%d/%B/%Y')} with 'None' as duration parameter." 
], [ "Dataset", "ID", "Creation Date", "Instrument Name", "Instrument ID", ], ] ) writer.writerow( [ clean_csv_text(dataset.name), dataset.id, dataset.creationDate, instrument_name, instrument_id, ] ) return True if self.get_general_info(dataset) == "": # If the duration parameter of the data set is empty log an error message and write the dataset to a csv file self._logger_dur_time.warning( "The duration parameter of the Dataset %s is empty.", dataset.name ) self._target_path.mkdir(parents=True, exist_ok=True) csv_path = self._target_path / "NoDurationTime.csv" file_exists = csv_path.is_file() with open( self._target_path / "NoDurationTime.csv", "a", newline="" ) as error_file: writer = csv.writer(error_file, delimiter=";") if not file_exists: writer.writerows( [ [ f"Datasets acquired between {self._begin_date.strftime('%d/%B/%Y')} " f"and {self._end_date.strftime('%d/%B/%Y')} with an empty duration parameter." ], [ "Dataset", "ID", "Creation Date", "Instrument Name", "Instrument ID", ], ] ) writer.writerow( [ clean_csv_text(dataset.name), dataset.id, dataset.creationDate, instrument_name, instrument_id, ] ) return False else: # If the duration parameter is not empty, write the dataset to a csv file self._target_path.mkdir(parents=True, exist_ok=True) csv_path = self._target_path / "DurationTime.csv" file_exists = csv_path.is_file() with open( self._target_path / "DurationTime.csv", "a", newline="" ) as error_file: writer = csv.writer(error_file, delimiter=";") if not file_exists: writer.writerows( [ [ f"Datasets acquired between {self._begin_date.strftime('%d/%B/%Y')} " f"and {self._end_date.strftime('%d/%B/%Y')} with a duration parameter." ], [ "Dataset", "ID", "Duration", "Creation Date", "Instrument Name", "Instrument ID", ], ] ) writer.writerow( [ clean_csv_text(dataset.name), dataset.id, self.get_general_info(dataset), dataset.creationDate, instrument_name, instrument_id, ] ) return False
[docs] def update_instrument_dict( self, dataset: Dataset, dataset_instrument_dict: Dict ) -> Dict: """Updating the instrument dictionary for the data set. For this purpose, the current accumulated time and the creation date are added to the dicitonary. :param dataset: Data set containing the instrument. :param dataset_instrument_dict: Dictionary of all instruments with a list of the creation date of their data sets. :return: Dictionary of the instruments, each key is the id of the instrument and has the instrument name as value[0] """ try: dataset.fetchParameters() except Exception as e: self._logger_dur_time.error( "Could not fetch the full dataset. %s It will not be included in the statistic. %s", dataset.name, e, ) return dataset_instrument_dict if self.check_duration(dataset): return dataset_instrument_dict # If the data set has no instrument if dataset.instrument is None or dataset.instrument == "": if 0 not in dataset_instrument_dict: dataset_instrument_dict[0] = ["No_Instrument"] datasets_date_list = dataset_instrument_dict[0] datasets_date_list.append( ( self.sum_time_strings(self.get_general_info(dataset)), dataset.creationDate, dataset.operators, ) ) dataset_instrument_dict[0] = datasets_date_list else: if dataset.instrument.id not in dataset_instrument_dict: dataset_instrument_dict[dataset.instrument.id] = [ dataset.instrument.name ] datasets_date_list = dataset_instrument_dict[dataset.instrument.id] datasets_date_list.append( ( self.sum_time_strings(self.get_general_info(dataset)), dataset.creationDate, dataset.operators, ) ) dataset_instrument_dict[dataset.instrument.id] = datasets_date_list return dataset_instrument_dict
[docs] def create_statistic(self): """Generates the statistics for the utilization time (based on "duration") of each instrument. This statistic is divided into the following parts: - Year utilization time - Year month utilization time - Year calendar week utilization time - Comparison heatmap utilization time """ self._logger_dur_time.info( "Starting to generate a statistical analysis of the utilization time." ) # Get the total number of datasets with the format "BrukerNMR" and "NMR (Varian)" and the creation date between the begin and end date datasets_total = self._logs.datasets( DatasetRequestParameter( creationDateFrom=self._begin_date, creationDateTo=self._end_date, formatIds=["BrukerNMR", "VarianNMR"], ) ).count # Check if there are datasets with the fromat 'BrukerNMR'nand 'NMR (Varian)' in the given time frame if datasets_total == 0: self._logger_dur_time.info( "No datasets with the format 'BrukerNMR' and 'NMR (Varian)' found in the given time frame." ) return self._logger_dur_time.info( "Processing datasets with the format 'BrukerNMR' and 'VarianNMR' in the given time frame: begin date: %s - end date: %s.", self._begin_date, self._end_date, ) instrument_dict = {} count = 0 # Counter for the number of processed datasets # Get all datasets with the format "BrukerNMR" and "NMR (Varian)" and the creation date between the begin and end date for dataset in self._logs.datasets( DatasetRequestParameter( creationDateFrom=self._begin_date, creationDateTo=self._end_date, formatIds=["BrukerNMR", "VarianNMR"], ) ): # Skip datasets with invalid creation date tz = dataset.creationDate.tzinfo if ( (dataset.creationDate is None) or (datetime(1677, 9, 21, tzinfo=tz) >= dataset.creationDate) or (dataset.creationDate >= datetime(2262, 4, 11, tzinfo=tz)) ): self._logger_dur_time.warning( "Dataset %s has invalid creation date.: %s Dataset will not be included in the statistics.", dataset.id, dataset.creationDate, ) continue if count % 10000 == 0 and count != 0: 
self._logger_dur_time.info( "%d/%d datasets processed.", count, datasets_total, ) count += 1 instrument_dict = self.update_instrument_dict(dataset, instrument_dict) self._logger_dur_time.info( "Finished processing datasets with the format 'BrukerNMR' and 'NMR (Varian)'." ) # Create the statistics for the utilization time of each instrument self._logger_dur_time.info( "Generating reports with the statistics for the utilization time of each instrument." ) if len(instrument_dict) == 0: self._logger_dur_time.warning( "There are no datasets with utilization time." ) else: path_instrument = self._target_path / "utilization_time" / "instrument" path_instrument.mkdir(parents=True, exist_ok=True) for instrument, value in instrument_dict.items(): path_instrument_folder = path_instrument / sanitize_filename( f"instrument_{value[0]}_ID{instrument}" ) path_instrument_folder.mkdir(parents=True, exist_ok=True) fig_dict = self.create_plot_year_duration( value[1:], f"instrument {value[0]} (ID: {instrument})" ) for key, fig in fig_dict.items(): path_instrument_year = path_instrument_folder / key path_instrument_year.mkdir(parents=True, exist_ok=True) if fig is not None: self.create_report( path_instrument_year, True, False, f"instrument_{value[0]}_ID{instrument}_{key}_utilization_time", fig, ) plt.close(fig) fig_dict = self.create_plot_year_month_duration( value[1:], f"instrument {value[0]} (ID: {instrument})" ) for key, fig in fig_dict.items(): month = key.split("-")[1] year = key.split("-")[0] path_instrument_month = path_instrument_folder / year / "Months" path_instrument_year.mkdir(parents=True, exist_ok=True) if fig is not None: self.create_report( path_instrument_month, True, False, f"instrument_{value[0]}_ID{instrument}_{month}_{year}_utilization_time", fig, ) plt.close(fig) fig_dict = self.create_plot_year_calendarWeek_duration( value[1:], f"instrument {value[0]} (ID: {instrument})" ) for key, fig in fig_dict.items(): calendar_week = key.split("-")[1] year = 
key.split("-")[0] path_instrument_cw = path_instrument_folder / year / "CWs" path_instrument_year.mkdir(parents=True, exist_ok=True) if fig is not None: self.create_report( path_instrument_cw, True, False, f"instrument_{value[0]}_ID{instrument}_CW{calendar_week}_{year}_utilization_time", fig, ) plt.close(fig) fig = self.create_plot_comparison_heatmap_duration( instrument_dict, "instruments" ) if fig is not None: self.create_report( path_instrument, True, False, "Utilization_of_instruments", fig, ) plt.close(fig) self._logger_dur_time.info( "Finished generating reports with the statistics for the utilization time of each instrument." ) self._logger_dur_time.info("Finished generating statistic of utilization time.")