# Source code for LOGS_solutions.GenerateDatasetReport.DatasetReportGenerator

#!/usr/bin/env python3

import base64
import io
import logging
import re
import sys
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple

import jinja2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import qrcode
from LOGS.Entities import (
    Dataset,
    ProjectRequestParameter,
    TrackImage,
    TrackMatrix,
    TrackTable,
    TrackXY,
    TrackXYComplex,
)
from LOGS.LOGS import LOGS
from LOGS.Parameters.ParameterList import ParameterList
from LOGS.Parameters.ParameterTable import ParameterTable
from matplotlib.figure import Figure
from matplotlib.ticker import ScalarFormatter
from pdf2image import convert_from_path
from PIL import Image

from .Common.PathValidator import PathValidator

# Configure the root logger with level INFO as soon as this module is imported.
# NOTE(review): this is an import-time side effect that affects the whole
# application - confirm that it is intended for library use.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by all methods below.
logger = logging.getLogger(__name__)


class DatasetReportGenerator:
    """Generates an HTML report for a single dataset.

    The report contains general dataset information, a plot for every
    supported track, the (optionally filtered) parameter tree and a QR code
    that links back to the dataset.
    """

    # Maps the track type (as string) to the label used in the track statistics.
    # Every type that is not listed here is counted as "Others".
    _TRACK_TYPE_LABELS = {
        "image": "Image",
        "pdf": "PDF",
        "matrix_real": "2D",
        "XY_real": "1D",
        "XY_complex": "1D",
        "table": "Table",
        "nucleotide_sequence": "Nucleotide Sequence",
        "molecule_compound": "Molecule Compound",
    }

    def __init__(
        self,
        logs: LOGS,
        dataset_id: int,
        target_path: str,
        param_category: Dict[str, List[str]] = None,
        # The default list is never mutated: _validate_excluded_tracks copies it.
        excluded_tracks: List[str] = [],
        separate_complex_plots: bool = True,
    ):
        """Initialization.

        :param logs: LOGS object to access the LOGS web API
        :param dataset_id: The id of the dataset for which the report will be created.
        :param target_path: Path, where the report should be stored, if None the
            directory of the script will be used as target path.
        :param param_category: Dictionary mapping the names of the parameter categories
            which should be included in the report to the list of parameter names of
            that category which should be included (an empty list includes all
            parameters of the category). If None, all categories will be included.
        :param excluded_tracks: List with the names of the tracks which should be
            excluded in the report, if empty all tracks will be included.
        :param separate_complex_plots: If True, the complex plots will be separated
            into real and imaginary parts.
        """

        self.__logs = logs
        self.__dataset_id = dataset_id
        self._dataset = self.__logs.dataset(self.__dataset_id)
        self._dataset.fetchFull()
        self.__param_category = self._validate_param_category(param_category)
        self.__excluded_tracks = self._validate_excluded_tracks(excluded_tracks)
        self.__separate_complex_plots = separate_complex_plots
        self.__target_path = PathValidator.validate_path(target_path)

        # Path to the folder with the templates
        self.__template_path = Path(__file__).parent / "Common" / "templates"

        # Path to the folder where the created HTML files should be saved
        self.__html_path = self.__target_path / "html"

        self.__url = self.get_base_url(
            self.__logs.apiUrl + f"/#data/{self.__dataset_id}"
        )
        # self.__url = "https://logs.sciy.com/" # Use this for presentations

    def validate_dataset(self, dataset: Dataset) -> bool:
        """Validates the given dataset.

        The process is terminated with exit code 1 if the dataset is missing or
        if its parsing failed.

        :param dataset: The dataset to validate.
        :return: True if the dataset is valid.
        """

        if not dataset:
            logger.error("Could not find dataset with ID %s", self.__dataset_id)
            sys.exit(1)

        if dataset.parsingState == "ParsingFailed":
            logger.error(
                "Dataset with ID %s could not be parsed. No report will be created.",
                dataset.id,
            )
            sys.exit(1)

        return True

    def _validate_param_category(
        self, param_category: Dict[str, List[str]]
    ) -> Dict[str, List[str]]:
        """Validates that the given param_category parameter is a dictionary of lists.

        :param param_category: The param_category parameter to validate.
        :return: If param_category is a dictionary, it will be returned.
            If not, None will be returned.
        :raises TypeError: If a value of the dictionary is not a list.
        """

        if not isinstance(param_category, dict):
            logger.warning(
                "The param_category is not a dictionary or is not set. All categories will be included in the report."
            )
            return None

        for key, value in param_category.items():
            if not isinstance(value, list):
                raise TypeError(
                    f"The value of the param_category {key} has to be a list, but got {type(value)}."
                )
        return param_category

    def _validate_excluded_tracks(self, excluded_tracks: List[str]) -> List[str]:
        """Validates that the excluded_tracks parameter is a list.

        :param excluded_tracks: The excluded_tracks parameter to validate.
        :return: A copy of excluded_tracks if it is a list, otherwise an empty list.
        """

        if isinstance(excluded_tracks, list):
            # Copy, so that neither the shared default argument nor the list of
            # the caller is aliased by the instance.
            return list(excluded_tracks)

        logger.warning(
            "The excluded_tracks is not a list or is not set. All tracks will be included in the report."
        )
        return []

    def get_base_url(self, api_url: str) -> str:
        """Returns the URL of the dataset.

        Removes the "/api/<segment>" part from the given URL.

        :param api_url: The API-URL of the dataset.
        :return: The URL of the dataset.
        """

        return re.sub(r"/api/[^/]*", "", api_url)

    @staticmethod
    def _new_tick_formatter() -> ScalarFormatter:
        """Creates a fresh tick formatter for one single axis.

        A formatter is bound to the axis it was installed on last, so every
        axis needs its own instance.

        :return: ScalarFormatter using math text and the power limits (-2, 2).
        """

        formatter = ScalarFormatter(useMathText=True)
        formatter.set_powerlimits((-2, 2))
        return formatter

    @staticmethod
    def _is_inverted(zoom_range) -> bool:
        """Checks if a zoom range is descending.

        :param zoom_range: Sequence with the start and the end of the zoom range.
        :return: True if the end of the range is smaller than its start.
        """

        return zoom_range[1] - zoom_range[0] < 0

    @staticmethod
    def _axis_labels(track) -> Tuple[str, str]:
        """Builds the labels of the x and y axis from the track settings.

        The units are only appended if the track defines axis labels.

        :param track: The track to read the labels from, may be None.
        :return: Tuple with the label of the x axis and the label of the y axis.
        """

        x_label = ""
        y_label = ""
        if track and track.settings.axisLabels:
            x_label = track.settings.axisLabels.x or ""
            y_label = track.settings.axisLabels.y or ""
            units = track.settings.axisUnits
            if units:
                if units.x:
                    x_label += "(" + units.x + ")"
                if units.y:
                    y_label += "(" + units.y + ")"
        return x_label, y_label

    @staticmethod
    def _png_data_uri(png_bytes: bytes) -> str:
        """Encodes PNG data as a Base64 data URI.

        :param png_bytes: The content of a PNG file.
        :return: The data URI of the PNG.
        """

        encoded = base64.b64encode(png_bytes).decode("utf-8")
        return f"data:image/png;base64,{encoded}"

    def _encode_figure(self, fig: Figure, name: str) -> Dict[str, Any]:
        """Renders a figure to PNG and closes it.

        :param fig: The figure to render.
        :param name: The name of the image in the report.
        :return: Dictionary with the data URI ("image"), the size in pixels ("size")
            and the name ("name") of the image.
        """

        buffer = io.BytesIO()
        try:
            fig.savefig(buffer, format="png", bbox_inches="tight", dpi=100)
        finally:
            # Close exactly this figure (not just the "current" one), also on errors.
            plt.close(fig)

        buffer.seek(0)
        width, height = Image.open(buffer).size
        return {
            "image": self._png_data_uri(buffer.getvalue()),
            "size": (width, height),
            "name": name,
        }

    def _plot_xy(
        self,
        formatter_axis: ScalarFormatter,
        track,
        components: Tuple[str, ...],
        title_suffix: str = "",
        label_with_id: bool = True,
    ) -> Figure:
        """Generates a 600x400 pixel 1D plot of the given datatracks of one track.

        :param formatter_axis: Formatter installed on both axes as long as no data is plotted.
        :param track: XY track (real or complex), if None a red placeholder is returned.
        :param components: Names of the datatracks which are plotted against x
            (e.g. ("re", "im")). Nothing is plotted if one of them or x is missing.
        :param title_suffix: Text which is appended to the track name in the title.
        :param label_with_id: If True the curves are labeled with the id of their
            datatrack, otherwise with the name of the track.
        :return: The figure of the plot.
        """

        # Read the zoom before a figure is created, so no figure leaks if it fails.
        x_inv = y_inv = False
        if track is not None:
            zoom = track.settings.zoom
            x_inv = self._is_inverted(zoom.x)
            y_inv = self._is_inverted(zoom.y)

        fig = plt.figure()
        dpi = float(fig.get_dpi())
        fig.set_size_inches(600.0 / dpi, 400.0 / dpi)
        ax = fig.add_subplot(1, 1, 1)

        x_label, y_label = self._axis_labels(track)
        ax.set_xlabel(x_label, fontsize=10)
        ax.set_ylabel(y_label, fontsize=10)
        ax.xaxis.set_major_formatter(formatter_axis)
        ax.yaxis.set_major_formatter(formatter_axis)

        if track is None:
            ax.set_facecolor("red")
            return fig

        if track.datatracks:
            track.fetchFull()
            datatracks = track.datatracks
            curves = [getattr(datatracks, name) for name in components]
            if datatracks.x and all(curves):
                if x_inv:
                    ax.invert_xaxis()
                if y_inv:
                    ax.invert_yaxis()
                # One formatter per axis: a shared instance would scale the
                # x axis with the value range of the y axis.
                ax.xaxis.set_major_formatter(self._new_tick_formatter())
                ax.yaxis.set_major_formatter(self._new_tick_formatter())
                for curve in curves:
                    ax.plot(
                        datatracks.x.data,
                        curve.data,
                        label=curve.id if label_with_id else track.name,
                        linewidth=1,
                    )
                ax.legend()

        ax.set_title(f"{track.name}{title_suffix}")
        return fig

    def plot_xy_complex(
        self,
        formatter_axis: ScalarFormatter,
        track: TrackXYComplex,
        sep_part: int = None,
    ) -> Figure:
        """Generates a 1D plot of one XY_complex track.

        :param formatter_axis: ScalarFormatter of matplotlib for controlling the
            scaling and display of numerical values on the axes of a plot.
        :param track: XY_complex track
        :param sep_part: 0 for real part, 1 for imaginary part, any other value
            (e.g. None) for both parts
        :return: The figure of the plot.
        """

        if sep_part == 0:
            return self._plot_xy(formatter_axis, track, ("re",), " re")
        if sep_part == 1:
            return self._plot_xy(formatter_axis, track, ("im",), " im")
        return self._plot_xy(formatter_axis, track, ("re", "im"))

    def plot_xy_real(self, formatter_axis: ScalarFormatter, track: TrackXY) -> Figure:
        """Generates a 1D plot of one XY_real track.

        An axis is inverted if the zoom range of the track is descending.

        :param formatter_axis: ScalarFormatter of matplotlib for controlling the
            scaling and display of numerical values on the axes of a plot.
        :param track: XY_real track
        :return: The figure of the plot.
        """

        return self._plot_xy(formatter_axis, track, ("y",), label_with_id=False)

    def plot_matrix_real(self, track: TrackMatrix) -> Figure:
        """Generates a Heatmap of one matrix track.

        :param track: TrackMatrix track
        :return: The figure of the heatmap.
        """

        track.fetchFull()
        matrix = track.datatracks.matrix
        data = matrix.data.T

        zoom = track.settings.zoom
        x_inv = self._is_inverted(zoom.x)
        y_inv = self._is_inverted(zoom.y)

        # For a descending axis the data and the extent are both mirrored.
        if x_inv:
            data = np.fliplr(data)
        if y_inv:
            data = np.flipud(data)

        x_min, y_min, z_min = matrix.min
        x_max, y_max, z_max = matrix.max

        fig, ax = plt.subplots(figsize=(10, 8))

        x_label, y_label = self._axis_labels(track)
        ax.set_xlabel(x_label, fontsize=10)
        ax.set_ylabel(y_label, fontsize=10)
        # One formatter per axis, a formatter instance must not be shared.
        ax.xaxis.set_major_formatter(self._new_tick_formatter())
        ax.yaxis.set_major_formatter(self._new_tick_formatter())

        ax_ext = ([x_max, x_min] if x_inv else [x_min, x_max]) + (
            [y_max, y_min] if y_inv else [y_min, y_max]
        )

        heatmap = ax.imshow(
            data,
            extent=ax_ext,
            origin="lower",
            cmap="RdBu",
            vmin=z_min,
            vmax=z_max,
            interpolation="nearest",
            aspect="auto",
        )

        fig.colorbar(heatmap)
        ax.set_title(track.name)

        return fig

    def plot_table(self, track: TrackTable) -> Figure:
        """Generates a table of one table track.

        The name of the track is shown as title in the first row of the table.

        :param track: table track
        :return: The figure of the table.
        """

        track.datatracks.table.fetchFull()
        data = track.datatracks.table.data.toDict()

        # Extracting the table data.
        rows: List[List[Any]] = []
        for cell in data["cells"]:
            row = cell["row"]
            col = cell["column"]
            while len(rows) <= row:
                rows.append([])
            while len(rows[row]) <= col:
                rows[row].append("")
            rows[row][col] = cell["value"]

        # Pad all rows to the same width, so missing cells stay empty. The
        # title needs at least one column, even if the table has no cells.
        column_count = max((len(row) for row in rows), default=0)
        column_count = max(column_count, 1)
        for row in rows:
            row.extend([""] * (column_count - len(row)))

        # Adding the title as the first row.
        rows.insert(0, [track.name] + [""] * (column_count - 1))

        df = pd.DataFrame(rows)

        fig, ax = plt.subplots()
        ax.axis("off")
        table = ax.table(
            cellText=df.values,
            loc="center",
            cellLoc="center",
        )

        table.auto_set_column_width(col=list(range(len(df.columns))))

        # Hides the empty cells of the title row.
        for col in range(1, len(df.columns)):
            table[0, col].set_visible(False)

        return fig

    def plot_image(self, track: TrackImage) -> Figure:
        """Generates the image of one image track.

        :param track: Image track
        :return: The figure of the image.
        """

        track.fetchFull()
        image_np = np.array(track.datatracks.image.data)

        fig, ax = plt.subplots()
        ax.imshow(image_np)
        ax.axis("off")
        ax.set_title(track.name, fontsize=12)
        return fig

    def _count_track_types(self, tracks) -> Dict[str, int]:
        """Counts the tracks and their types.

        :param tracks: The tracks to count.
        :return: Dictionary with the total number of tracks ("Total") and the
            number of tracks per type label.
        """

        counts = {"Total": len(tracks)}
        for track in tracks:
            label = self._TRACK_TYPE_LABELS.get(str(track.type), "Others")
            counts[label] = counts.get(label, 0) + 1
        return counts

    def _render_pdf_track(self, dataset: Dataset, track) -> List[Dict[str, Any]]:
        """Renders the pages of the PDF of one pdf track as PNG images.

        The dataset is downloaded as zip archive into a temporary directory.
        Every member of the archive whose name (up to the first ".") equals the
        name of the track is converted page by page.

        :param dataset: The dataset of the track.
        :param track: The pdf track.
        :return: List with one image dictionary per page. Errors are logged, the
            pages which were converted up to the error are still returned.
        """

        rendered: List[Dict[str, Any]] = []
        track_stem = track.name.split(".")[0]

        try:
            # The images have to be encoded before the directory is removed.
            with tempfile.TemporaryDirectory() as temp_dir:
                dataset.download(directory=temp_dir, overwrite=True)
                zip_path = Path(temp_dir) / f"{dataset.name}.zip"

                with zipfile.ZipFile(zip_path, "r") as zip_ref:
                    members = [
                        member
                        for member in zip_ref.namelist()
                        if member.split(".")[0] == track_stem
                    ]
                    for member in members:
                        zip_ref.extract(member, temp_dir)
                        pdf_pages = convert_from_path(
                            Path(temp_dir) / member, output_folder=temp_dir
                        )
                        for pdf_page in pdf_pages:
                            buffer = io.BytesIO()
                            pdf_page.save(buffer, format="PNG")
                            rendered.append(
                                {
                                    "image": self._png_data_uri(buffer.getvalue()),
                                    "size": pdf_page.size,
                                    "name": f"{track.name}_{track_stem}",
                                }
                            )

                if not members:
                    logger.warning(
                        "Could not find a file for the PDF track %s in the dataset %s.",
                        track.name,
                        dataset.name,
                    )
        except Exception as e:
            logger.error(
                "Could not download the PDF from the dataset %s. Error: %s",
                dataset.name,
                str(e),
            )

        return rendered

    def get_graph(
        self,
        dataset: Dataset,
    ) -> Tuple[List, Dict[str, int]]:
        """Generates a list of images using matplotlib and a dictionary containing
        the counts of the tracks and their types. Images are returned in a form
        suitable for websites or other text-based environments (Base64-encoded).

        Tracks which are excluded, not supported or which could not be plotted
        are skipped, the reason is logged.

        :param dataset: dataset of the tracks
        :return: Tuple with the list of images and a dictionary with the number of
            track types. If the dataset has no tracks, an empty list and
            {"Total": 0} are returned.
        """

        tracks = dataset.tracks
        if not tracks:
            return [], {"Total": 0}

        track_types = self._count_track_types(tracks)

        formatter_axis = ScalarFormatter(useOffset=True, useMathText=True)
        formatter_axis.set_scientific(True)
        formatter_axis.set_powerlimits((-1, 1))

        images = []
        for track in tracks:
            if track.name in self.__excluded_tracks:
                logger.info("Track %s is excluded from the report.", track.name)
                continue

            if track.type == "nucleotide_sequence":
                logger.warning(
                    "Nucleotide sequence is currently not supported. %s", track.name
                )
                continue

            if track.type == "pdf":
                images.extend(self._render_pdf_track(dataset, track))
                continue

            if track.type == "XY_complex" and self.__separate_complex_plots:
                # Both parts are plotted independently, so one can fail alone.
                for part, suffix, part_name in ((0, "re", "real"), (1, "im", "imaginary")):
                    try:
                        fig = self.plot_xy_complex(formatter_axis, track, sep_part=part)
                        images.append(
                            self._encode_figure(fig, f"{track.name}_{suffix}")
                        )
                    except Exception as e:
                        logger.error(
                            "Could not plot the %s part of the complex track %s. Error: %s",
                            part_name,
                            track.name,
                            str(e),
                        )
                continue

            if track.type == "matrix_real":
                kind = "matrix track"

                def plot(current=track):
                    return self.plot_matrix_real(current)

            elif track.type == "XY_complex":
                kind = "complex track"

                def plot(current=track):
                    return self.plot_xy_complex(formatter_axis, current)

            elif track.type == "XY_real":
                kind = "track"

                def plot(current=track):
                    return self.plot_xy_real(formatter_axis, current)

            elif track.type == "table":
                kind = "table track"

                def plot(current=track):
                    return self.plot_table(current)

            elif track.type == "image":
                kind = "image track"

                def plot(current=track):
                    return self.plot_image(current)

            else:
                logger.warning(
                    "Track type %s is not supported for plotting. Skipping track %s.",
                    track.type,
                    track.name,
                )
                continue

            # A track which cannot be plotted must not abort the whole report.
            try:
                images.append(self._encode_figure(plot(), track.name))
            except Exception as e:
                logger.error(
                    "Could not plot the %s %s. Error: %s",
                    kind,
                    track.name,
                    str(e),
                )

        return images, track_types

    def _make_qr_image(self):
        """Creates the QR code image which links to the dataset.

        :return: The image object created by the qrcode package.
        """

        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(self.__url)
        qr.make(fit=True)
        return qr.make_image(fill_color="#323332", back_color="white")

    def generate_qr_code(self) -> Path:
        """Generates a QR code for the dataset and saves it as PNG file.

        The file is written to a new temporary directory which is not removed
        by this class, the caller is responsible for deleting it.

        :return: The path of the created file "qr_code.png".
        """

        img = self._make_qr_image()

        tmp_dir_name = tempfile.mkdtemp()
        img_path = Path(tmp_dir_name) / "qr_code.png"

        with open(img_path, "wb") as bf:
            img.save(bf)

        return img_path

    def read_parameter_table(self, param_table: ParameterTable) -> Dict[str, Any]:
        """Reads the parameter table and returns a dictionary with the parameters.

        :param param_table: The parameter table to read.
        :return: A dictionary with the key "logsTable" which contains the table and
            the properties of its columns.
        """

        return {
            "logsTable": {
                "table": param_table.table,
                "columnDecimals": param_table.columnDecimals,
                "columnNames": param_table.columnNames,
                "columnNumber": param_table.columnNumber,
                "columnTypes": param_table.columnTypes,
            }
        }

    def read_parameter_list(
        self, param_list: ParameterList, round_num: int = 0, param_prev: str = None
    ) -> Dict[str, Any]:
        """Reads the parameter list and returns a dictionary with the parameters.

        If param_category is set, only the listed categories (first level) and
        the listed parameters of a category (second level) are included. Deeper
        levels are not filtered. Duplicate names are made unique by appending
        spaces.

        :param param_list: The parameter list to read.
        :param round_num: The current round number (depth), used for recursive calls.
        :param param_prev: The name of the parent parameter, used for recursive calls.
        :return: A dictionary with the parameters.
        """

        categories = self.__param_category
        parameter_dict = {}

        for param in param_list:
            if categories is not None:
                if round_num == 0 and param.name not in categories:
                    logger.info("Parameter excluded: %s", param.name)
                    continue

                if param_prev is not None and round_num == 1:
                    # An empty or missing list includes all parameters of the category.
                    allowed = categories.get(param_prev) or []
                    if allowed and param.name not in allowed:
                        logger.info(
                            "Parameter excluded: %s\\%s", param_prev, param.name
                        )
                        continue

            # Make the key unique, a name may occur more than once on a level.
            param_name = param.name
            while param_name in parameter_dict:
                param_name += " "

            if isinstance(param, ParameterList):
                # Pass the original name: the categories are looked up by it,
                # not by the key which was made unique.
                parameter_dict[param_name] = self.read_parameter_list(
                    param.content, round_num + 1, param.name
                )
            elif isinstance(param, ParameterTable):
                parameter_dict[param_name] = self.read_parameter_table(param)
            elif param.unit is not None:
                parameter_dict[param_name] = str(param.value) + " " + str(param.unit)
            else:
                parameter_dict[param_name] = param.value

        return parameter_dict

    def create_datasets_report(self):
        """Creates the report of the dataset.

        The report is written to "html/report_ID<dataset id>.html" in the target path.
        """

        # Initialize the page dictionary to store the report data
        page: Dict[str, Any] = {}

        # Get id and title of the dataset
        page["id"] = self.__dataset_id
        page["title"] = self._dataset.name

        # Get the dataset information
        method = self._dataset.method
        page["method"] = method.name if method and method.name else ""

        projects = []
        if self._dataset.projects:
            project_ids = [p.id for p in self._dataset.projects]
            projects = [
                project.name
                for project in self.__logs.projects(
                    ProjectRequestParameter(ids=project_ids)
                )
            ]
        if projects:
            page["projects"] = projects

        if self._dataset.sample and self._dataset.sample.name:
            page["sample"] = self._dataset.sample.name

        # Get the plots and the number of tracks and track types
        if self._dataset.tracks:
            page["images"], track_types = self.get_graph(self._dataset)
        else:
            page["images"] = []
            track_types = {}

        # Get the dataset parameters
        self._dataset.fetchParameterTree()
        page["parameter"] = self.read_parameter_list(
            self._dataset.parameterTree.content
        )

        ### Create the HTML report
        self.__html_path.mkdir(parents=True, exist_ok=True)

        # NOTE(review): autoescape is not enabled, values like the dataset name are
        # inserted into the HTML unescaped unless the template escapes them - confirm.
        template_loader = jinja2.FileSystemLoader(self.__template_path)
        template_env = jinja2.Environment(loader=template_loader)
        template = template_env.get_template("report.jinja2")

        style_path = self.__template_path / "static" / "style.css"
        html_path = self.__html_path / f"report_ID{self.__dataset_id}.html"
        logo_path = self.__template_path / "python-solutions-icon+logo-screen-white.png"
        icon_path = self.__template_path / "icon-python-solutions-screen-filled.png"

        page["logo"] = self._png_data_uri(logo_path.read_bytes())
        page["icon"] = self._png_data_uri(icon_path.read_bytes())

        # The QR code is rendered in memory, so no temporary file is left behind.
        qr_buffer = io.BytesIO()
        self._make_qr_image().save(qr_buffer)
        page["qr_code"] = self._png_data_uri(qr_buffer.getvalue())

        with open(html_path, "w", encoding="utf-8") as bf:
            bf.write(
                template.render(
                    date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    page=page,
                    track_types=track_types,
                    url=self.__url,
                    style_path=style_path,
                )
            )

        logger.info("Report for dataset %s created successfully.", self.__dataset_id)
        logger.info("Report path: %s", html_path)