Source code for LOGS_solutions.GenerateStatistics.StatisticEntities.StatisticHandlerEntities

import calendar
import logging
import math
from abc import abstractmethod
from pathlib import Path
from typing import Dict, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from LOGS.LOGS import LOGS
from matplotlib.figure import Figure

from ..Common.CommonHandler import CommonHandler
from ..Common.FileHandler import FileHandler


class StatisticHandlerEntities(CommonHandler):
    """Abstract class for creating statistics."""

    def __init__(
        self,
        logs: LOGS,
        begin_date: str,
        end_date: str,
        target_path: str,
        logger: logging.Logger,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API.
        :param begin_date: Lowest date limit for statistics to be created.
        :param end_date: Highest date limit for statistics to be created.
        :param target_path: Path where all datasets should be saved.
        :param logger: Logger used for warnings and status messages.
        """
        super().__init__(logs, begin_date, end_date, target_path)
        self._logger = logger
    @abstractmethod
    def create_statistic(self):
        """A method responsible for generating statistics.

        This method must be implemented in all subclasses inheriting from
        the base class to ensure that each subclass provides its specific
        implementation of the statistics generation functionality.
        """
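    # Usage sketch (hypothetical, not part of the original module): a minimal
    # concrete subclass. ``logs`` is assumed to be a configured LOGS client
    # and ``logger`` a standard library logger; the dates and paths are made
    # up for illustration.
    #
    #     class SampleStatistics(StatisticHandlerEntities):
    #         def create_statistic(self):
    #             dates = ["2023-01-05", "2023-02-17", "2023-02-18"]
    #             self.create_plot_list(
    #                 dates,
    #                 Path("statistics") / "samples",
    #                 "samples",
    #                 "logs_group",
    #                 csv_bool=True,
    #             )
    #
    #     SampleStatistics(
    #         logs, "2023-01-01", "2023-12-31", "statistics", logger
    #     ).create_statistic()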
    def create_csv_file_prep_dis(
        self,
        dictionary_prep: Dict,
        dictionary_dis: Dict,
        dictionary_prep_str: str,
        dictionary_dis_str: str,
        entity: str,
        statistical_unit: str,
        path: Path,
    ):
        """Creates a csv file of the statistics of how many entities (like
        samples) were prepared/created or deleted/modified by the statistical
        unit (person). Used for entities that a person can both prepare and
        discard.

        :param dictionary_prep: Dictionary with all elements of the unit and a
            sorted list of the preparation/creation data for each element.
            dictionary_prep = {key: [entity_name, date1, date2, ...]}
        :param dictionary_dis: Dictionary with all elements of the unit and a
            sorted list of the discarded or modified data for each element.
            dictionary_dis = {key: [entity_name, date1, date2, ...]}
        :param dictionary_prep_str: String indicating whether the dictionary
            contains preparation or creation data, e.g. "Prepared".
        :param dictionary_dis_str: String indicating whether the dictionary
            contains discarded or modified data, e.g. "Discarded".
        :param entity: Name of the entity that is part of the statistics
            (e.g. dataset for dataset_count).
        :param statistical_unit: Unit for which the statistics were prepared
            (e.g. person).
        :param path: Path where the csv file should be saved.
        """
        column_name_prep = (
            f"{dictionary_prep_str}_{entity}_Counts"  # e.g. "Prepared_samples_Counts"
        )
        column_name_dis = (
            f"{dictionary_dis_str}_{entity}_Counts"  # e.g. "Discarded_samples_Counts"
        )
        if not dictionary_prep and not dictionary_dis:
            return
        # Dictionary layout: key: entity_id,
        # value: [entity_name, [dates_prep], [dates_dis]]
        dictionary_prep_dis_sorted_list = {}
        # First fill the dictionary with the preparation data
        for key, value in dictionary_prep.items():
            if key not in dictionary_prep_dis_sorted_list:
                dictionary_prep_dis_sorted_list[key] = [
                    value[0],  # entity_name
                    value[1:],  # dates of preparation
                    [],  # no discard data
                ]
            else:
                dictionary_prep_dis_sorted_list[key][1] = (
                    dictionary_prep_dis_sorted_list[key][1] + value[1:]
                )
        # Second fill the dictionary with the discard data
        for key, value in dictionary_dis.items():
            if key not in dictionary_prep_dis_sorted_list:
                dictionary_prep_dis_sorted_list[key] = [
                    value[0],  # entity_name
                    [],  # no preparation data
                    value[1:],  # dates of discard
                ]
            else:
                dictionary_prep_dis_sorted_list[key][2] = (
                    dictionary_prep_dis_sorted_list[key][2] + value[1:]
                )
        # Create a CSV file with the statistic of entity of current statistical unit
        for key, value in dictionary_prep_dis_sorted_list.items():
            dates_prep = sorted(value[1])  # an empty input stays an empty list
            dates_dis = sorted(value[2])
            folder_name = FileHandler.clean_filename(
                f"{statistical_unit}_{value[0]}_ID_{key}"
            )  # Remove unwanted special characters and replace spaces with _
            entity_unit_path = Path(path) / folder_name
            entity_unit_path.mkdir(parents=True, exist_ok=True)
            entity_name = FileHandler.clean_filename(value[0])
            aggregated_data_year = pd.DataFrame(
                columns=[
                    "Year",
                    column_name_prep,
                ]
            )
            aggregated_data_month = pd.DataFrame(
                columns=[
                    "Year",
                    "Month",
                    column_name_prep,
                ]
            )
            aggregated_data_week = pd.DataFrame(
                columns=[
                    "Year",
                    "CalendarWeek",
                    column_name_prep,
                ]
            )
            aggregated_data_day = pd.DataFrame(
                columns=[
                    "CalendarWeek",
                    "Day",
                    column_name_prep,
                ]
            )
            # Add the preparation data if available
            if dates_prep:
                df_prep = pd.DataFrame({"Date": dates_prep})
                df_prep["Date"] = pd.to_datetime(df_prep["Date"])
                year, week, day_of_week = zip(
                    *[d.isocalendar() for d in df_prep["Date"]]
                )
                df_prep["Year"] = year
                df_prep["Month"] = df_prep["Date"].dt.month
                df_prep["CalendarWeek"] = week
                df_prep["Day"] = day_of_week
                # data aggregated by year
                aggregated_data_year = df_prep.groupby("Year", as_index=False).agg(
                    **{
                        column_name_prep: ("Date", "count"),
                    }
                )
                # data aggregated by month of the year
                aggregated_data_month = df_prep.groupby(
                    ["Year", "Month"], as_index=False
                ).agg(
                    **{
                        column_name_prep: ("Date", "count"),
                    }
                )
                # data aggregated by calendar week of the year
                aggregated_data_week = df_prep.groupby(
                    ["Year", "CalendarWeek"], as_index=False
                ).agg(
                    **{
                        column_name_prep: ("Date", "count"),
                    }
                )
                # data aggregated by day of the calendar week
                aggregated_data_day = df_prep.groupby(
                    ["CalendarWeek", "Day"], as_index=False
                ).agg(
                    **{
                        column_name_prep: ("Date", "count"),
                    }
                )
            else:
                aggregated_data_year[column_name_prep] = 0
                aggregated_data_month[column_name_prep] = 0
                aggregated_data_week[column_name_prep] = 0
                aggregated_data_day[column_name_prep] = 0
            # Add the discard data if available, otherwise fill with zeros
            if dates_dis:
                df_dis = pd.DataFrame({"Date": dates_dis})
                df_dis["Date"] = pd.to_datetime(df_dis["Date"])
                year, week, day_of_week = zip(
                    *[d.isocalendar() for d in df_dis["Date"]]
                )
                df_dis["Year"] = year
                df_dis["Month"] = df_dis["Date"].dt.month
                df_dis["CalendarWeek"] = week
                df_dis["Day"] = day_of_week
                # Merge the discard data with the preparation data for year,
                # month, calendar week and day
                aggregated_data_year = pd.merge(
                    aggregated_data_year,
                    df_dis.groupby(["Year"], as_index=False).agg(
                        **{
                            column_name_dis: ("Date", "count"),
                        }
                    ),
                    on=["Year"],
                    how="outer",
                )
                aggregated_data_month = pd.merge(
                    aggregated_data_month,
                    df_dis.groupby(["Year", "Month"], as_index=False).agg(
                        **{
                            column_name_dis: ("Date", "count"),
                        }
                    ),
                    on=["Year", "Month"],
                    how="outer",
                )
                aggregated_data_week = pd.merge(
                    aggregated_data_week,
                    df_dis.groupby(["Year", "CalendarWeek"], as_index=False).agg(
                        **{
                            column_name_dis: ("Date", "count"),
                        }
                    ),
                    on=["Year", "CalendarWeek"],
                    how="outer",
                )
                aggregated_data_day = pd.merge(
                    aggregated_data_day,
                    df_dis.groupby(["CalendarWeek", "Day"], as_index=False).agg(
                        **{
                            column_name_dis: ("Date", "count"),
                        }
                    ),
                    on=["CalendarWeek", "Day"],
                    how="outer",
                )
            else:
                aggregated_data_year[column_name_dis] = 0
                aggregated_data_month[column_name_dis] = 0
                aggregated_data_week[column_name_dis] = 0
                aggregated_data_day[column_name_dis] = 0
            # Fill NaN values with 0 and convert to integer
            aggregated_data_year = aggregated_data_year.fillna(0).astype(int)
            aggregated_data_month = aggregated_data_month.fillna(0).astype(int)
            aggregated_data_week = aggregated_data_week.fillna(0).astype(int)
            aggregated_data_day = aggregated_data_day.fillna(0).astype(int)
            # Save the data to a csv file
            entity_name_csv = entity_name.replace("_", " ")
            with open(
                entity_unit_path
                / f"yearly_statistics_{entity}_{statistical_unit}_{entity_name}_ID_{key}.csv",
                "w",
                encoding="utf-8",
                newline="",
            ) as f:
                f.write(
                    f"yearly statistics of {entity} for {statistical_unit} {entity_name_csv} (ID: {key}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
                )
                aggregated_data_year.to_csv(f, index=False, sep=";")
            with open(
                entity_unit_path
                / f"monthly_yearly_statistics_{entity}_{statistical_unit}_{entity_name}_ID_{key}.csv",
                "w",
                encoding="utf-8",
                newline="",
            ) as f:
                f.write(
                    f"monthly yearly statistics of {entity} for {statistical_unit} {entity_name_csv} (ID: {key}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
                )
                aggregated_data_month.to_csv(f, index=False, sep=";")
            with open(
                entity_unit_path
                / f"weekly_yearly_statistics_{entity}_{statistical_unit}_{entity_name}_ID_{key}.csv",
                "w",
                encoding="utf-8",
                newline="",
            ) as f:
                f.write(
                    f"weekly yearly statistics of {entity} for {statistical_unit} {entity_name_csv} (ID: {key}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
                )
                aggregated_data_week.to_csv(f, index=False, sep=";")
            with open(
                entity_unit_path
                / f"daily_weekly_statistics_{entity}_{statistical_unit}_{entity_name}_ID_{key}.csv",
                "w",
                encoding="utf-8",
                newline="",
            ) as f:
                f.write(
                    f"daily weekly statistics of {entity} for {statistical_unit} {entity_name_csv} (ID: {key}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
                )
                aggregated_data_day.to_csv(f, index=False, sep=";")
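    # Input sketch (hypothetical values): both dictionaries map an entity id
    # to ``[entity_name, date1, date2, ...]``, e.g. samples per person:
    #
    #     prep = {7: ["Alice", "2023-01-05", "2023-01-12"]}
    #     dis = {7: ["Alice", "2023-02-01"]}
    #     handler.create_csv_file_prep_dis(
    #         prep, dis, "Prepared", "Discarded",
    #         "samples", "person", Path("statistics"),
    #     )
    #
    # This would write the four CSV files into ``statistics/person_Alice_ID_7/``
    # with "Prepared_samples_Counts" and "Discarded_samples_Counts" columns.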
    def create_csv_file(
        self, sorted_list: List, entity: str, statistical_unit: str, path: Path
    ):
        """Creates a csv file of the statistics.

        :param sorted_list: Sorted list with all acquisition dates of each
            dataset.
        :param entity: Name of the entity that is part of the statistics
            (e.g. dataset for dataset_count).
        :param statistical_unit: Unit for which the statistics were prepared.
        :param path: Path where the csv file should be saved.
        """
        if not sorted_list:
            return
        dates = sorted(sorted_list)
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        year, week, day_of_week = zip(*[d.isocalendar() for d in df["Date"]])
        df["Year"] = year
        df["Month"] = df["Date"].dt.month
        df["CalendarWeek"] = week
        df["Day"] = day_of_week
        aggregated_data_year = df.groupby("Year", as_index=False).agg(
            **{
                f"{entity}_Counts": ("Date", "size"),
            }
        )
        aggregated_data_month = df.groupby(["Year", "Month"], as_index=False).agg(
            **{
                f"{entity}_Counts": ("Date", "size"),
            }
        )
        aggregated_data_week = df.groupby(["Year", "CalendarWeek"], as_index=False).agg(
            **{
                f"{entity}_Counts": ("Date", "size"),
            }
        )
        aggregated_data_day = df.groupby(["CalendarWeek", "Day"], as_index=False).agg(
            **{
                f"{entity}_Counts": ("Date", "size"),
            }
        )
        # Save the data to a csv file
        statistical_unit_csv = statistical_unit.replace("_", " ")
        statistical_unit = FileHandler.clean_filename(statistical_unit)
        with open(
            path / f"yearly_statistics_{entity}_{statistical_unit}.csv",
            "w",
            encoding="utf-8",
            newline="",
        ) as f:
            f.write(
                f"yearly statistics of {entity} for {statistical_unit_csv} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
            )
            aggregated_data_year.to_csv(f, index=False, sep=";")
        with open(
            path / f"monthly_yearly_statistics_{entity}_{statistical_unit}.csv",
            "w",
            encoding="utf-8",
            newline="",
        ) as f:
            f.write(
                f"monthly yearly statistics of {entity} for {statistical_unit_csv} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
            )
            aggregated_data_month.to_csv(f, index=False, sep=";")
        with open(
            path / f"weekly_yearly_statistics_{entity}_{statistical_unit}.csv",
            "w",
            encoding="utf-8",
            newline="",
        ) as f:
            f.write(
                f"weekly yearly statistics of {entity} for {statistical_unit_csv} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
            )
            aggregated_data_week.to_csv(f, index=False, sep=";")
        with open(
            path / f"daily_weekly_statistics_{entity}_{statistical_unit}.csv",
            "w",
            encoding="utf-8",
            newline="",
        ) as f:
            f.write(
                f"daily weekly statistics of {entity} for {statistical_unit_csv} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}\n"
            )
            aggregated_data_day.to_csv(f, index=False, sep=";")
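    # Call sketch (hypothetical arguments): each call writes four CSV files
    # (yearly, monthly, weekly, daily), every file prefixed with a one-line
    # header before the ";"-separated table:
    #
    #     handler.create_csv_file(
    #         ["2023-01-05", "2023-01-12", "2023-03-02"],
    #         "datasets",
    #         "instrument_NMR_600",
    #         Path("statistics"),
    #     )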
    def create_plot_list(
        self,
        dates_list: List,
        entity_path: Path,
        entity: str,
        statistical_unit: str,
        csv_bool: bool,
        show_num: bool = True,
    ):
        """Creates the plots for one entity.

        :param dates_list: The list with the acquisition dates of the entity.
        :param entity_path: Path where the statistic should be saved.
        :param entity: Entity on which the statistics are based,
            e.g. "samples".
        :param statistical_unit: Unit for which the statistics are generated,
            e.g. "logs_group".
        :param csv_bool: Boolean whether the csv file should be created or not.
        :param show_num: Boolean whether the number should be shown in the
            heatmap.
        """
        entity_path.mkdir(parents=True, exist_ok=True)
        if not dates_list:
            self._logger.warning(
                f"No data available for the statistic of {entity} of {statistical_unit}."
            )
            return
        # Create html of the statistic as block diagram per year and
        # month-year and as heatmap per month-year, week-year and day-week
        self.create_report(
            entity_path,
            True,
            False,
            f"Statistic_{entity}_logs_{statistical_unit}_year_blockdiagram",
            self.create_plot_year(dates_list, f"{entity} of {statistical_unit}"),
        )
        self.create_report(
            entity_path,
            True,
            False,
            f"Statistic_{entity}_logs_{statistical_unit}_month_blockdiagram",
            self.create_blockdiagram_month(
                dates_list, f"{entity} of {statistical_unit}"
            ),
        )
        self.create_report(
            entity_path,
            True,
            False,
            f"Statistic_{entity}_logs_{statistical_unit}_month_heatmap",
            self.create_plot_month(
                dates_list, f"{entity} of {statistical_unit}", show_num
            ),
        )
        self.create_report(
            entity_path,
            True,
            False,
            f"Statistic_{entity}_logs_{statistical_unit}_week_heatmap",
            self.create_plot_week(
                dates_list, f"{entity} of {statistical_unit}", show_num
            ),
        )
        self.create_report(
            entity_path,
            True,
            False,
            f"Statistic_{entity}_logs_{statistical_unit}_day_heatmap",
            self.create_plot_day(
                dates_list, f"{entity} of {statistical_unit}", show_num
            ),
        )
        if csv_bool:
            self.create_csv_file(dates_list, entity, statistical_unit, entity_path)
    def create_plot_of_dict(
        self,
        dictionary: Dict,
        entity_path: Path,
        entity: str,
        statistical_unit: str,
        csv_bool: bool,
        show_num: bool = True,
    ):
        """Creates the plots for all entries in the dictionary.

        :param dictionary: The dictionary with the entries to be plotted.
        :param entity_path: Path where the statistic should be saved.
        :param entity: Entity on which the statistics are based,
            e.g. "Samples prepared" or "Samples".
        :param statistical_unit: Entity for which the statistics are
            generated, e.g. "person".
        :param csv_bool: Boolean whether the csv file should be created or not.
        :param show_num: Boolean whether the number should be shown in the
            heatmap.
        """
        if not dictionary:
            self._logger.warning(
                "No data available for the statistic of %s of %s.",
                entity,
                statistical_unit,
            )
            return
        # Plot the statistic of the entity and write it into a PDF.
        entity_path.mkdir(parents=True, exist_ok=True)
        for key, value in dictionary.items():
            folder_name = FileHandler.clean_filename(
                f"{statistical_unit}_{value[0]}_ID_{key}"
            )
            entity_inst_path = entity_path / folder_name
            entity_inst_path.mkdir(parents=True, exist_ok=True)
            # Create html of the statistic as block diagram per year and
            # month-year and as heatmap per month-year, week-year and day-week
            title = f"{entity} for {statistical_unit} {value[0]} (ID: {key})"
            title = title.replace("_", " ")
            self.create_report(
                entity_inst_path,
                True,
                False,
                f"Statistic_{entity}_{statistical_unit}_ID_{key}_year_blockdiagram",
                self.create_plot_year(value[1:], title),
            )
            self.create_report(
                entity_inst_path,
                True,
                False,
                f"Statistic_{entity}_{statistical_unit}_ID_{key}_month_blockdiagram",
                self.create_blockdiagram_month(value[1:], title),
            )
            self.create_report(
                entity_inst_path,
                True,
                False,
                f"Statistic_{entity}_{statistical_unit}_ID_{key}_month_heatmap",
                self.create_plot_month(value[1:], title, show_num),
            )
            self.create_report(
                entity_inst_path,
                True,
                False,
                f"Statistic_{entity}_{statistical_unit}_ID_{key}_week_heatmap",
                self.create_plot_week(value[1:], title, show_num),
            )
            self.create_report(
                entity_inst_path,
                True,
                False,
                f"Statistic_{entity}_{statistical_unit}_ID_{key}_day_heatmap",
                self.create_plot_day(value[1:], title, show_num),
            )
            # Create a CSV file with the statistic of the current entry
            if csv_bool:
                entity_name = FileHandler.clean_filename(value[0])
                self.create_csv_file(
                    value[1:],
                    entity,
                    f"{statistical_unit} {entity_name} (ID: {key})",
                    entity_inst_path,
                )
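    # Input sketch (hypothetical): one report folder per dictionary entry,
    # keyed by entity id, with values ``[entity_name, date1, date2, ...]``:
    #
    #     per_person = {
    #         7: ["Alice", "2023-01-05", "2023-01-12"],
    #         9: ["Bob", "2023-03-02"],
    #     }
    #     handler.create_plot_of_dict(
    #         per_person, Path("statistics"), "Samples_prepared", "person", True
    #     )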
    def create_plot_instrument(
        self, entity_path: Path, instrument_data: Dict, cutoff: int = 0
    ):
        """Creates the plots for the extracted data of StatisticInstruments
        and the question "Which and how many experiments, projects and samples
        were created per instrument?".

        :param entity_path: Path where the statistic should be saved.
        :param instrument_data: Dictionary with the instrument data.
        :param cutoff: Cutoff value for the plots.
        """
        for instrument_id, data in instrument_data.items():
            # Create html of the distribution of experiments, samples and
            # projects per instrument
            self.create_report(
                entity_path,
                True,
                False,
                f"Distribution_of_experiments_of_{data[0]}_ID_{instrument_id}",
                self.create_plot_instrument_list(
                    instrument_id,
                    data[0],
                    data[3],
                    statistic_entity="experiments",
                    cutoff=cutoff,
                ),
            )
            self.create_report(
                entity_path,
                True,
                False,
                f"Distribution_of_samples_of_{data[0]}_ID_{instrument_id}",
                self.create_plot_instrument_list(
                    instrument_id,
                    data[0],
                    data[2],
                    statistic_entity="samples",
                    cutoff=cutoff,
                ),
            )
            self.create_report(
                entity_path,
                True,
                False,
                f"Distribution_of_projects_of_{data[0]}_ID_{instrument_id}",
                self.create_plot_instrument_list(
                    instrument_id,
                    data[0],
                    data[1],
                    statistic_entity="projects",
                    cutoff=cutoff,
                ),
            )
    def create_plot_year(self, dates_list: List, entity_title: str) -> Figure:
        """Creates a block diagram per year.

        :param dates_list: List with all dates of the entity type.
        :param entity_title: E.g. 'datasets for instrument x'.
        :return: Figure with the block diagram with the title
            "Number of {entity_title} per year".
        """
        dates = dates_list
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        df["Year"] = df["Date"].dt.year.astype(str)
        counts_per_year = df.groupby("Year").size()
        # dynamic adjustment of the plot size
        num_dates = len(counts_per_year)
        dynamic_width = max(10, round(num_dates * 0.5))
        # dynamic adjustment of the y-axis ticks
        max_y_value = max(counts_per_year.values)
        max_y_value_round = (math.ceil(max_y_value / 10) * 10) if max_y_value > 0 else 1
        step = (max_y_value_round // 10) if max_y_value_round > 0 else 1
        if (max_y_value_round <= 15) or step == 0:
            step = 1
        elif max_y_value_round <= 50:
            step = math.ceil(step / 5) * 5
        else:
            step = math.ceil(step / 10) * 10
        y_ticks = np.arange(0, max_y_value_round + 1, step)
        dynamic_height = 6 + len(y_ticks) * 0.3
        fig, ax = plt.subplots(figsize=(dynamic_width, dynamic_height))
        ax.bar(
            counts_per_year.index.astype(str),
            counts_per_year.values,
            width=0.8,
            align="center",
        )
        ax.grid(
            True,
            which="both",
            axis="y",
            linestyle="-",
            linewidth=0.1,
            color="lightgray",
        )
        ax.set_xlabel("Year")
        ax.set_ylabel(f"Number of {entity_title}")
        ax.set_xticks(range(len(counts_per_year.index)))
        ax.set_xticklabels(
            counts_per_year.index.astype(str),
            rotation=90,
            fontsize=10,
            ha="center",
        )
        # Set the x-axis limits
        fixed_margin_x = 0.2
        ax.set_xlim(0 - 0.4 - fixed_margin_x, num_dates - 0.4)
        # Set the y-axis ticks in steps of `step`
        ax.set_yticks(y_ticks)
        ax.set_yticklabels(y_ticks)
        plt.title(
            f"Number of \n {entity_title} \n per year from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}",
            loc="center",
        )
        plt.close(fig)
        return fig
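    # Worked example of the tick-step heuristic above (hypothetical numbers):
    # max_y_value = 137 -> max_y_value_round = 140, raw step = 140 // 10 = 14;
    # since 140 > 50 the step is rounded up to the next multiple of 10, i.e.
    # step = 20, and y_ticks becomes [0, 20, 40, ..., 140].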
    def create_blockdiagram_month(self, dates_list: List, entity_title: str) -> Figure:
        """Creates a block diagram per month.

        :param dates_list: List with all dates of the entity type.
        :param entity_title: E.g. 'datasets for instrument x'.
        :return: Figure with the block diagram with the title
            "Number of {entity_title} per month and year".
        """
        dates = dates_list
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        df["Date"] = df["Date"].dt.tz_localize(None)
        df["YearMonth"] = df["Date"].dt.to_period("M")
        counts_per_year_month = df.groupby("YearMonth").size()
        # dynamic adjustment of the plot size
        num_dates = len(counts_per_year_month)
        dynamic_width = max(10, round(num_dates * 0.5))
        # dynamic adjustment of the y-axis ticks
        max_y_value = max(counts_per_year_month.values)
        max_y_value_round = (math.ceil(max_y_value / 10) * 10) if max_y_value > 0 else 1
        step = (max_y_value_round // 10) if max_y_value_round > 0 else 1
        if (max_y_value_round <= 15) or step == 0:
            step = 1
        elif max_y_value_round <= 50:
            step = math.ceil(step / 5) * 5
        else:
            step = math.ceil(step / 10) * 10
        y_ticks = np.arange(0, max_y_value_round + 1, step)
        dynamic_height = 6 + len(y_ticks) * 0.3
        fig, ax = plt.subplots(figsize=(dynamic_width, dynamic_height))
        ax.bar(
            counts_per_year_month.index.astype(str),
            counts_per_year_month.values,
            width=0.8,
            align="center",
        )
        ax.grid(
            True,
            which="both",
            axis="y",
            linestyle="-",
            linewidth=0.1,
            color="lightgray",
        )
        ax.set_xlabel("Year-Month")
        ax.set_ylabel(f"Number of {entity_title}")
        ax.set_xticks(range(len(counts_per_year_month.index)))
        ax.set_xticklabels(
            counts_per_year_month.index.astype(str),
            rotation=90,
            fontsize=10,
            ha="center",
        )
        # Set the x-axis limits
        fixed_margin_x = 0.2
        ax.set_xlim(0 - 0.4 - fixed_margin_x, num_dates - 0.4)
        # Set the y-axis ticks in steps of `step`
        ax.set_yticks(y_ticks)
        ax.set_yticklabels(y_ticks)
        plt.title(
            f"Number of \n {entity_title} \n per month and year from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}",
            loc="center",
        )
        plt.close(fig)
        return fig
    def create_plot_month(
        self, dates_list: List, entity_title: str, show_num: bool = True
    ) -> Figure:
        """Creates a heatmap per month in a year.

        :param dates_list: List with all dates of the entity type.
        :param entity_title: E.g. 'datasets for instrument x'.
        :param show_num: Boolean whether the number should be shown in the
            heatmap.
        :return: Figure with the heatmap with the title
            "Number of {entity_title} per month and year".
        """
        dates = dates_list
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        df["Year"] = df["Date"].dt.year
        df["Month"] = df["Date"].dt.month
        heatmap_data = df.groupby(["Year", "Month"]).size().reset_index(name="Counts")
        heatmap_data_pivot = heatmap_data.pivot(
            index="Year", columns="Month", values="Counts"
        ).fillna(0)
        # Set the figure size dynamically
        dynamic_width = max(10, len(heatmap_data_pivot.columns))
        dynamic_height = max(6, len(heatmap_data_pivot.index))
        fig, ax = plt.subplots(figsize=(dynamic_width, dynamic_height))
        cax = ax.matshow(heatmap_data_pivot, cmap="coolwarm")
        if show_num:
            for (i, j), val in np.ndenumerate(heatmap_data_pivot):
                if val != 0:
                    ax.text(j, i, int(val), ha="center", va="center", color="black")
        month_labels = [calendar.month_abbr[i] for i in heatmap_data_pivot.columns]
        plt.xticks(
            np.arange(len(heatmap_data_pivot.columns)), month_labels, rotation=45
        )
        plt.yticks(np.arange(len(heatmap_data_pivot.index)), heatmap_data_pivot.index)
        plt.colorbar(cax)
        plt.title(
            f"Number of \n {entity_title} \n per month and year from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}",
            loc="center",
        )
        plt.xlabel("Month")
        plt.ylabel("Year")
        plt.close(fig)
        return fig
    def create_plot_week(
        self, dates_list: List, entity_title: str, show_num: bool = True
    ) -> Figure:
        """Creates a heatmap with the number of data records within a calendar
        week per year.

        :param dates_list: List with all dates of the entity type.
        :param entity_title: E.g. 'datasets for instrument x'.
        :param show_num: Boolean whether the number should be shown in the
            heatmap.
        :return: Figure with the heatmap with the title
            "Number of {entity_title} per calendar week and year".
        """
        dates = dates_list
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        year, week, day_of_week = zip(*[d.isocalendar() for d in df["Date"]])
        df["Year"] = year
        df["CalendarWeek"] = week
        heatmap_data = (
            df.groupby(["Year", "CalendarWeek"]).size().reset_index(name="Counts")
        )
        heatmap_data_pivot = heatmap_data.pivot(
            index="Year", columns="CalendarWeek", values="Counts"
        ).fillna(0)
        num_years = len(heatmap_data_pivot.index)
        # Dynamic adjustment of the plot size
        dynamic_height = max(6, num_years)
        dynamic_width = max(12, len(heatmap_data_pivot.columns))
        fig, ax = plt.subplots(figsize=(dynamic_width, dynamic_height))
        cax = ax.matshow(heatmap_data_pivot, cmap="coolwarm")
        if show_num:
            for (i, j), val in np.ndenumerate(heatmap_data_pivot):
                if val != 0:
                    ax.text(j, i, int(val), ha="center", va="center", color="black")
        week_labels = ["CW " + str(week) for week in heatmap_data_pivot.columns]
        plt.xticks(np.arange(len(heatmap_data_pivot.columns)), week_labels, rotation=90)
        plt.yticks(np.arange(len(heatmap_data_pivot.index)), heatmap_data_pivot.index)
        plt.colorbar(cax)
        plt.title(
            f"Number of \n {entity_title} \n per calendar week and year from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}",
            loc="center",
        )
        plt.ylabel("Year")
        plt.xlabel("Calendar Week")
        plt.close(fig)
        return fig
    def create_plot_day(
        self, dates_list: List, entity_title: str, show_num: bool = True
    ) -> Figure:
        """Creates a heatmap with the number of the entity per day within a
        calendar week, aggregated over all years.

        :param dates_list: List with all dates of the entity type.
        :param entity_title: E.g. 'datasets for instrument x'.
        :param show_num: Boolean whether the number should be shown in the
            heatmap.
        :return: Figure with the heatmap with the title
            "Number of {entity_title} per day and week over all months and
            years".
        """
        dates = dates_list
        days_of_week_dict = {
            1: "Mon",
            2: "Tue",
            3: "Wed",
            4: "Thu",
            5: "Fri",
            6: "Sat",
            7: "Sun",
        }
        df = pd.DataFrame({"Date": dates})
        df["Date"] = pd.to_datetime(df["Date"])
        year, week, day_of_week = zip(*[d.isocalendar() for d in df["Date"]])
        df["Week"] = week
        df["Day"] = day_of_week
        # day per calendar week, aggregated over all years
        heatmap_data = df.groupby(["Week", "Day"]).size().reset_index(name="Counts")
        heatmap_data_pivot = heatmap_data.pivot(
            index="Day", columns="Week", values="Counts"
        ).fillna(0)
        # Dynamic adjustment of the plot size
        dynamic_height = max(6, len(heatmap_data_pivot.index))
        dynamic_width = max(12, len(heatmap_data_pivot.columns))
        fig, ax = plt.subplots(figsize=(dynamic_width, dynamic_height))
        cax = ax.matshow(heatmap_data_pivot, cmap="coolwarm")
        if show_num:
            for (i, j), val in np.ndenumerate(heatmap_data_pivot):
                if val != 0:
                    ax.text(j, i, int(val), ha="center", va="center", color="black")
        week_labels = ["CW " + str(week) for week in heatmap_data_pivot.columns]
        plt.xticks(np.arange(len(heatmap_data_pivot.columns)), week_labels, rotation=90)
        plt.yticks(
            np.arange(len(heatmap_data_pivot.index)),
            heatmap_data_pivot.index.map(days_of_week_dict),
        )
        plt.colorbar(cax)
        plt.title(
            f"Number of \n {entity_title} \n per day and week over all months and years from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}",
            loc="center",
        )
        plt.ylabel("Day")
        plt.xlabel("Calendar Week")
        plt.close(fig)
        return fig
    def create_plot_instrument_list(
        self,
        instrument_id: int,
        instrument_name: str,
        data,
        statistic_entity: str,
        cutoff: int = 0,
    ) -> Figure:
        """Creates a pie chart for the distribution of the statistic entity
        (e.g. distribution of experiments of instrument x).

        :param instrument_id: ID of the instrument.
        :param instrument_name: Name of the instrument.
        :param data: Data for the distribution.
        :param statistic_entity: Entity for which the distribution is created
            (e.g. "experiments").
        :param cutoff: Cutoff value for the distribution.
        :return: Figure with the pie chart with the title "Distribution of
            {statistic_entity} of {instrument_name} (ID: {instrument_id}) with
            cutoff {cutoff}" or "Distribution of {statistic_entity} of
            {instrument_name} (ID: {instrument_id})".
        """
        total_number = sum(value[1] for value in data.values())
        filtered_data = {k: v for k, v in data.items() if v[1] >= cutoff}
        if not filtered_data:
            if cutoff > 0:
                information_text = f"No data available for the distribution of {statistic_entity} of {instrument_name} (ID: {instrument_id}) with cutoff {cutoff} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}"
            else:
                information_text = f"No data available for the distribution of {statistic_entity} of {instrument_name} (ID: {instrument_id}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}"
            fig, ax = plt.subplots()
            ax.text(
                0.5,
                0.5,
                information_text,
                horizontalalignment="center",
                verticalalignment="center",
                transform=ax.transAxes,
            )
            ax.axis("off")
            plt.close(fig)
            return fig
        labels = [v[0] for v in filtered_data.values()]
        sizes = [v[1] for v in filtered_data.values()]
        sorted_data = sorted(zip(sizes, labels), reverse=True)
        sorted_sizes, sorted_labels = zip(*sorted_data)
        fig, ax = plt.subplots()
        wedges, texts = ax.pie(sorted_sizes, startangle=90)
        ax.axis("equal")
        # The cutoff is already applied via filtered_data; the size check in
        # the legend only repeats it.
        legend_labels = [
            f"{label}: {size} ({round(size / total_number * 100, 2)}%)"
            for label, size in zip(sorted_labels, sorted_sizes)
            if size >= cutoff
        ]
        plt.legend(
            wedges,
            legend_labels,
            title="Categories",
            loc="upper center",
            bbox_to_anchor=(0.5, -0.1),
            ncol=4,
            fontsize="small",
        )
        if cutoff > 0:
            plt.title(
                f"Distribution of {statistic_entity} of {instrument_name} (ID: {instrument_id}) with cutoff {cutoff} from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}"
            )
        else:
            plt.title(
                f"Distribution of {statistic_entity} of {instrument_name} (ID: {instrument_id}) from {self._begin_date.strftime('%d/%B/%Y')} to {self._end_date.strftime('%d/%B/%Y')}"
            )
        plt.close(fig)
        return fig
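# Usage sketch (hypothetical, not part of the original module): as read from
# create_plot_instrument above, ``instrument_data`` is keyed by instrument id
# and holds the instrument name followed by the per-project, per-sample and
# per-experiment distributions, each mapping an id to ``[name, count]``. All
# concrete names and counts below are made up:
#
#     instrument_data = {
#         3: [
#             "NMR_600",
#             {11: ["Project A", 4]},   # data[1]: projects
#             {21: ["Sample S1", 7]},   # data[2]: samples
#             {31: ["COSY", 12]},       # data[3]: experiments
#         ],
#     }
#     handler.create_plot_instrument(Path("statistics"), instrument_data, cutoff=0)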