import csv
import logging
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict
from pathvalidate import sanitize_filename
import matplotlib.pyplot as plt
from LOGS import LOGS
from LOGS.Entities import Dataset, DatasetRequestParameter
from ...Utils.csv_utils import clean_csv_text
from .StatisticHandlerNMR import StatisticHandlerNMR
[docs]
class StatisticsDurationTime(StatisticHandlerNMR):
"""Class to generate statistics for the duration time of each instrument.

The statistics are divided into the following parts:
- Year duration time
- Year month duration time
- Year calendar week duration time
- Comparison heatmap duration time
"""
def __init__(
self,
logs: LOGS,
begin_date: datetime,
end_date: datetime,
target_path: str,
):
"""Set up the logger and resolve the date range covered by the statistics.

:param logs: LOGS object to access the LOGS web API.
:param begin_date: Lowest date limit for statistics to be created.
    If no begin date is available after the base class initialiser has
    run, the creation date of the first dataset of a request sorted by
    "CREATION_DATE" is used.
:param end_date: Highest date limit for statistics to be created.
    If no end date is available after the base class initialiser has
    run, the creation date of the last dataset of that request is used,
    or the current time when the request yields no datasets.
:param target_path: Path where all datasets should be saved.
"""
# Dedicated, named logger for this statistic; it is also handed to the
# base class initialiser below.
self._logger_dur_time = logging.getLogger("StatisticsDurationTime")
self._logger_dur_time.setLevel(logging.INFO)
# NOTE(review): the "logfiles" folder next to this module is created here,
# but nothing visible in this method attaches a file handler to it.
logfile_folder = Path(__file__).resolve().parent / "logfiles"
logfile_folder.mkdir(parents=True, exist_ok=True)
# Only attach a console handler when none is reachable yet. hasHandlers()
# also considers handlers of ancestor loggers, which avoids duplicated
# output when this class is instantiated more than once.
if not self._logger_dur_time.hasHandlers():
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
logconsole_handler = logging.StreamHandler(sys.stdout)
logconsole_handler.setLevel(logging.INFO)
logconsole_handler.setFormatter(formatter)
self._logger_dur_time.addHandler(logconsole_handler)
super().__init__(logs, begin_date, end_date, target_path, self._logger_dur_time)
# Default begin date: creation date of the first dataset of the sorted request.
# NOTE(review): presumably first() can yield None when LOGS holds no
# datasets, which would raise AttributeError here - confirm.
if self._begin_date is None:
self._begin_date = (
self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
.first()
.creationDate
)
# Truncate the begin date to midnight.
# NOTE(review): confirm whether this normalisation is meant to apply to
# caller-supplied dates as well as to the defaulted one.
self._begin_date = self._begin_date.replace(
hour=0, minute=0, second=0, microsecond=0
)
# Default end date: creation date of the last dataset of the sorted request.
# NOTE(review): list(...) materialises every dataset just to read the last
# element; this may be expensive on large LOGS instances.
if self._end_date is None:
datasets_list = list(
self._logs.datasets(DatasetRequestParameter(sortBy="CREATION_DATE"))
)
self._end_date = (
datasets_list[-1].creationDate if datasets_list else datetime.now()
)
# Move the end date to midnight of the following day so that the whole end
# day is covered.
# NOTE(review): confirm whether this normalisation is meant to apply to
# caller-supplied dates as well as to the defaulted one.
self._end_date = (self._end_date + timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
)
[docs]
def sum_time_strings(self, time_string: str) -> int:
    """Convert a duration string into its total number of seconds.

    For every unit (days ``d``, hours ``h``, minutes ``min``, seconds ``s``)
    the first integer directly followed by that unit is taken; units that do
    not occur contribute nothing.

    :param time_string: Time string in the format "1d 2h 3min 4s". ``None``
        or an empty string is treated as a duration of zero.
    :return: Total time in seconds.
    """
    # Guard against missing values so callers do not hit a TypeError in re.
    if not time_string:
        return 0

    # (pattern, seconds per unit); the patterns are mutually exclusive for
    # the documented format because "min" never directly follows a digit as
    # "s", "d" or "h".
    unit_patterns = (
        (r"(\d+)\s*d", 86400),
        (r"(\d+)\s*h", 3600),
        (r"(\d+)\s*min", 60),
        (r"(\d+)\s*s", 1),
    )

    total_seconds = 0
    for pattern, seconds_per_unit in unit_patterns:
        match = re.search(pattern, time_string)
        if match:
            total_seconds += int(match.group(1)) * seconds_per_unit
    return total_seconds
[docs]
def get_general_info(self, dataset: Dataset):
"""Checks if 'General information/Duration' or 'General info/Duration'
exists in the dataset."""
if dataset.getParameter("General information/Duration") is not None:
return dataset.getParameter("General information/Duration")
if dataset.getParameter("General info/Duration") is not None:
return dataset.getParameter("General info/Duration")
return None
[docs]
def check_duration(self, dataset: Dataset) -> bool:
    """Classify the dataset by its duration parameter and record it in a csv file.

    The data set is appended to one of the following csv files in the
    target path:
    - DurationNone.csv: If the data set has no duration parameter.
    - NoDurationTime.csv: If the duration parameter of the data set is empty.
    - DurationTime.csv: If the duration parameter of the data set is not empty.

    :param dataset: Data set to check.
    :return: True if the data set has no duration parameter at all (the
        caller then leaves it out of the statistic), False otherwise.
    """
    if dataset.instrument is None or dataset.instrument == "":
        instrument_name = "No_Instrument"
        instrument_id = 0
    else:
        instrument_name = clean_csv_text(dataset.instrument.name)
        instrument_id = dataset.instrument.id

    # Look the parameter up once and reuse it for every decision below.
    duration = self.get_general_info(dataset)

    header = ["Dataset", "ID", "Creation Date", "Instrument Name", "Instrument ID"]

    if duration is None:
        self._logger_dur_time.warning(
            "Dataset has no duration parameter. %s It will not be included in the statistic.",
            dataset.name,
        )
        row = [
            clean_csv_text(dataset.name),
            dataset.id,
            dataset.creationDate,
            instrument_name,
            instrument_id,
        ]
        self._append_duration_csv(
            "DurationNone.csv", "with 'None' as duration parameter.", header, row
        )
        return True

    if duration == "":
        self._logger_dur_time.warning(
            "The duration parameter of the Dataset %s is empty.", dataset.name
        )
        row = [
            clean_csv_text(dataset.name),
            dataset.id,
            dataset.creationDate,
            instrument_name,
            instrument_id,
        ]
        self._append_duration_csv(
            "NoDurationTime.csv", "with an empty duration parameter.", header, row
        )
        return False

    # A usable duration: this file carries the duration as an extra column.
    header = [
        "Dataset",
        "ID",
        "Duration",
        "Creation Date",
        "Instrument Name",
        "Instrument ID",
    ]
    row = [
        clean_csv_text(dataset.name),
        dataset.id,
        duration,
        dataset.creationDate,
        instrument_name,
        instrument_id,
    ]
    self._append_duration_csv(
        "DurationTime.csv", "with a duration parameter.", header, row
    )
    return False

def _append_duration_csv(
    self, file_name: str, title_suffix: str, header: list, row: list
) -> None:
    """Append one row to a csv file in the target path, writing the title and header first if the file is new or empty.

    :param file_name: Name of the csv file inside the target path.
    :param title_suffix: Text completing the title line
        "Datasets acquired between <begin> and <end> ...".
    :param header: Column names written below the title line.
    :param row: Values of the data set to append.
    """
    self._target_path.mkdir(parents=True, exist_ok=True)
    csv_path = self._target_path / file_name
    # A missing or zero-length file still needs its title and header lines.
    write_header = (not csv_path.exists()) or (csv_path.stat().st_size == 0)
    # Explicit UTF-8 so that all three files are written with the same
    # encoding regardless of the platform default.
    with open(csv_path, "a", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file, delimiter=";")
        if write_header:
            title = (
                f"Datasets acquired between {self._begin_date.strftime('%d/%B/%Y')} "
                f"and {self._end_date.strftime('%d/%B/%Y')} {title_suffix}"
            )
            writer.writerows([[title], header])
        writer.writerow(row)
[docs]
def update_instrument_dict(
self, dataset: Dataset, dataset_instrument_dict: Dict
) -> Dict:
"""Updating the instrument dictionary for the data set. For this
purpose, the current accumulated time and the creation date are
added to the dicitonary.
:param dataset: Data set containing the instrument.
:param dataset_instrument_dict: Dictionary of all instruments
with a list of the creation date of their data sets.
:return: Dictionary of the instruments, each key is the id of
the instrument and has the instrument name as value[0]
"""
try:
dataset.fetchParameters()
except Exception as e:
self._logger_dur_time.error(
"Could not fetch the full dataset. %s It will not be included in the statistic. %s",
dataset.name,
e,
)
return dataset_instrument_dict
if self.check_duration(dataset):
return dataset_instrument_dict
# If the data set has no instrument
if dataset.instrument is None or dataset.instrument == "":
if 0 not in dataset_instrument_dict:
dataset_instrument_dict[0] = ["No_Instrument"]
datasets_date_list = dataset_instrument_dict[0]
datasets_date_list.append(
(
self.sum_time_strings(self.get_general_info(dataset)),
dataset.creationDate,
dataset.operators,
)
)
dataset_instrument_dict[0] = datasets_date_list
else:
if dataset.instrument.id not in dataset_instrument_dict:
dataset_instrument_dict[dataset.instrument.id] = [
dataset.instrument.name
]
datasets_date_list = dataset_instrument_dict[dataset.instrument.id]
datasets_date_list.append(
(
self.sum_time_strings(self.get_general_info(dataset)),
dataset.creationDate,
dataset.operators,
)
)
dataset_instrument_dict[dataset.instrument.id] = datasets_date_list
return dataset_instrument_dict
[docs]
def create_statistic(self):
    """Generates the statistics for the utilization time (based on
    "duration") of each instrument.

    This statistic is divided into the following parts:
    - Year utilization time
    - Year month utilization time
    - Year calendar week utilization time
    - Comparison heatmap utilization time

    Only datasets with the format ids "BrukerNMR" and "VarianNMR" whose
    creation date lies between the begin and end date are considered.
    Reports are written below ``<target_path>/utilization_time/instrument``.
    """
    logger = self._logger_dur_time
    logger.info("Starting to generate a statistical analysis of the utilization time.")

    # Filter shared by the count request and the iteration request; a fresh
    # DatasetRequestParameter is built for each call.
    request_filter = {
        "creationDateFrom": self._begin_date,
        "creationDateTo": self._end_date,
        "formatIds": ["BrukerNMR", "VarianNMR"],
    }

    # Total number of matching datasets, used for the early exit and for
    # the progress messages.
    datasets_total = self._logs.datasets(
        DatasetRequestParameter(**request_filter)
    ).count

    if datasets_total == 0:
        logger.info(
            "No datasets with the format 'BrukerNMR' and 'NMR (Varian)' found in the given time frame."
        )
        return

    logger.info(
        "Processing datasets with the format 'BrukerNMR' and 'VarianNMR' in the given time frame: begin date: %s - end date: %s.",
        self._begin_date,
        self._end_date,
    )

    instrument_dict = {}
    count = 0  # Number of datasets processed so far (invalid dates excluded)
    for dataset in self._logs.datasets(DatasetRequestParameter(**request_filter)):
        # Skip datasets with a missing or implausible creation date. The
        # None check has to come first: reading tzinfo from None would raise.
        # NOTE(review): the bounds presumably mirror the nanosecond-precision
        # pandas Timestamp range - confirm.
        created = dataset.creationDate
        if created is None:
            date_is_valid = False
        else:
            tz = created.tzinfo
            date_is_valid = (
                datetime(1677, 9, 21, tzinfo=tz)
                < created
                < datetime(2262, 4, 11, tzinfo=tz)
            )
        if not date_is_valid:
            logger.warning(
                "Dataset %s has invalid creation date.: %s Dataset will not be included in the statistics.",
                dataset.id,
                created,
            )
            continue

        if count % 10000 == 0 and count != 0:
            logger.info("%d/%d datasets processed.", count, datasets_total)
        count += 1
        instrument_dict = self.update_instrument_dict(dataset, instrument_dict)

    logger.info(
        "Finished processing datasets with the format 'BrukerNMR' and 'NMR (Varian)'."
    )

    logger.info(
        "Generating reports with the statistics for the utilization time of each instrument."
    )
    if not instrument_dict:
        logger.warning("There are no datasets with utilization time.")
    else:
        path_instrument = self._target_path / "utilization_time" / "instrument"
        path_instrument.mkdir(parents=True, exist_ok=True)

        for instrument_id, entry in instrument_dict.items():
            # entry[0] is the instrument name, the rest are the per-dataset
            # (duration, creation date, operators) records.
            instrument_name = entry[0]
            records = entry[1:]
            plot_label = f"instrument {instrument_name} (ID: {instrument_id})"
            report_prefix = f"instrument_{instrument_name}_ID{instrument_id}"

            path_instrument_folder = path_instrument / sanitize_filename(
                report_prefix
            )
            path_instrument_folder.mkdir(parents=True, exist_ok=True)

            # One report per year.
            fig_dict = self.create_plot_year_duration(records, plot_label)
            for year, fig in fig_dict.items():
                self._save_report_figure(
                    path_instrument_folder / year,
                    f"{report_prefix}_{year}_utilization_time",
                    fig,
                )

            # One report per month; keys have the form "<year>-<month>".
            fig_dict = self.create_plot_year_month_duration(records, plot_label)
            for key, fig in fig_dict.items():
                key_parts = key.split("-")
                year, month = key_parts[0], key_parts[1]
                self._save_report_figure(
                    path_instrument_folder / year / "Months",
                    f"{report_prefix}_{month}_{year}_utilization_time",
                    fig,
                )

            # One report per calendar week; keys have the form "<year>-<week>".
            fig_dict = self.create_plot_year_calendarWeek_duration(
                records, plot_label
            )
            for key, fig in fig_dict.items():
                key_parts = key.split("-")
                year, calendar_week = key_parts[0], key_parts[1]
                self._save_report_figure(
                    path_instrument_folder / year / "CWs",
                    f"{report_prefix}_CW{calendar_week}_{year}_utilization_time",
                    fig,
                )

        # Heatmap comparing all instruments with each other.
        fig = self.create_plot_comparison_heatmap_duration(
            instrument_dict, "instruments"
        )
        self._save_report_figure(path_instrument, "Utilization_of_instruments", fig)

        logger.info(
            "Finished generating reports with the statistics for the utilization time of each instrument."
        )

    logger.info("Finished generating statistic of utilization time.")

def _save_report_figure(self, directory, report_name, fig):
    """Ensure the report directory exists and, if a figure was produced, write its report and close it.

    :param directory: Directory the report is written to; created if missing.
    :param report_name: Name passed to create_report for the report.
    :param fig: Figure to report, or None if no figure was created.
    """
    directory.mkdir(parents=True, exist_ok=True)
    if fig is not None:
        # The two boolean flags are passed exactly as in every report call
        # of this class; their meaning is defined by create_report.
        self.create_report(directory, True, False, report_name, fig)
        # Release the figure so that many reports do not accumulate in memory.
        plt.close(fig)