#!/usr/bin/env python3
import csv
import os
from datetime import datetime
from typing import List, Optional, Set, Tuple
import pandas as pd
import logging
import openpyxl
import json
from ...Utils.Exceptions import CsvReadError, ExcelReadError
from LOGS.Auxiliary.Exceptions import LOGSException
from LOGS.Entities import (
Person,
PersonRequestParameter,
Project,
ProjectRequestParameter,
Sample,
SampleRequestParameter,
CustomType,
CustomTypeRequestParameter,
)
from LOGS.LOGS import LOGS
# Configure root logging at import time with INFO as the level; the methods
# below report their progress through the module-level ``logging`` functions.
logging.basicConfig(level=logging.INFO)
[docs]
class SampleManager:
    """This class enables the creation of samples in a LOGS instance using a
    CSV or Excel (.xlsx) file, or the export of samples from a LOGS instance
    into a CSV or Excel file. Every export additionally writes one JSON file
    per sample.
    """
def __init__(
    self,
    logs: LOGS,
    source_path: Optional[str] = None,
    target_path: Optional[str] = None,
    export_format: Optional[str] = ".csv",
):
    """Initialization.

    :param logs: LOGS object to access the LOGS web API
    :param source_path: Path of the CSV or Excel file the samples are
        imported from (``str`` or ``os.PathLike``), defaults to None
    :param target_path: Path the samples are exported to (``str`` or
        ``os.PathLike``). A path without a file extension is treated as a
        directory and ``samples_export<export_format>`` is appended to it,
        defaults to None
    :param export_format: File extension selecting the export format
        (".csv" or ".xlsx"), defaults to ".csv"
    """
    self.__logs = logs
    self.__source_path = source_path
    self.__target_path = target_path
    self.__export_format = export_format

    # os.path.splitext accepts plain strings as well as path-like objects,
    # so callers are not forced to pass a pathlib.Path (the annotations
    # promise str, which has no ``.suffix`` attribute).
    if target_path is not None and os.path.splitext(target_path)[1] == "":
        self.__target_path = os.path.join(
            target_path, f"samples_export{export_format}"
        )

    # Lower-cased so that e.g. "SAMPLES.CSV" is recognised as a CSV file.
    self.__source_format = (
        os.path.splitext(source_path)[1].lower() if source_path else None
    )
[docs]
def check_customtypes(self, customtype_set: Set):
    """Checks for each custom type in the custom type set whether it exists
    in the LOGS instance.

    :param customtype_set: Set with the IDs of all custom types named in the
        sample file. ``None`` and empty-string entries are ignored.
    :raises ValueError: If an ID is not among the custom types returned by
        the LOGS instance.
    """
    # Only the IDs are needed for the check; a set keeps each lookup cheap.
    known_ids = {
        customtype.id
        for customtype in self.__logs.customTypes(CustomTypeRequestParameter())
    }
    for customtype in customtype_set:
        if customtype in known_ids or customtype is None or customtype == "":
            continue
        message = f"The custom type {customtype} does not exist in this LOGS instance. The Script is terminated."
        logging.error(message)
        raise ValueError(message)
[docs]
def check_projects(self, project_set: Set):
    """Checks for each project in the project set whether it exists in the
    LOGS instance.

    :param project_set: Set with the IDs of all projects named in the sample
        file. ``None`` and empty-string entries are ignored.
    :raises ValueError: If an ID is not among the projects returned by the
        LOGS instance.
    """
    # Only the IDs are needed for the check; a set keeps each lookup cheap.
    known_ids = {
        project.id for project in self.__logs.projects(ProjectRequestParameter())
    }
    for project in project_set:
        if project in known_ids or project is None or project == "":
            continue
        message = f"The project {project} does not exist in this LOGS instance. The Script is terminated."
        logging.error(message)
        raise ValueError(message)
[docs]
def check_persons(self, person_set: Set):
    """Checks for each person in the person set whether they exist in the
    LOGS instance.

    :param person_set: Set with the IDs of all persons named in the sample
        file. ``None`` and empty-string entries are ignored.
    :raises ValueError: If an ID is not among the persons returned by the
        LOGS instance.
    """
    # IDs are normalised to int before comparison; a set keeps each lookup
    # cheap.
    known_ids = {
        int(person.id) for person in self.__logs.persons(PersonRequestParameter())
    }
    for person in person_set:
        if person in known_ids or person is None or person == "":
            continue
        message = f"The person {person} does not exist in this LOGS instance. The Script is terminated."
        logging.error(message)
        raise ValueError(message)
[docs]
def create_attribute_list(
    self, attribute_str: str, attr_obj_list: List, check_person: bool = False
) -> List:
    """Selects the objects that are named in a comma-separated string.

    :param attribute_str: Comma-separated IDs (and, for persons, logins).
        Whitespace around the entries is ignored.
    :param attr_obj_list: Candidate objects exposing an ``id`` attribute
        (and a ``login`` attribute when ``check_person`` is True).
        ``None`` and empty-string entries are skipped.
    :param check_person: Should be True, if the attr_obj_list is a
        list of persons; their ``login`` is then matched as well.
    :return: The objects of attr_obj_list named in attribute_str, in the
        order of attr_obj_list.
    """
    raw = "" if attribute_str is None else str(attribute_str)
    wanted = {token.strip() for token in raw.split(",")}
    # An empty entry (e.g. from "" or "1,,2") must not match anything.
    wanted.discard("")

    attribute_list = []
    for attr_obj in attr_obj_list:
        if attr_obj is None or attr_obj == "":
            continue
        # The tokens are strings, so the ID is compared in its string form;
        # otherwise integer IDs would never match.
        if attr_obj.id is not None and str(attr_obj.id) in wanted:
            attribute_list.append(attr_obj)
        elif check_person and attr_obj.login in wanted:
            attribute_list.append(attr_obj)
    return attribute_list
[docs]
def post_process_data(self, sample_data: pd.DataFrame) -> pd.DataFrame:
    """Post-processes the sample data after reading from CSV or Excel.

    The columns are converted in place:

    * "Prepared By" (optional) and "Projects": comma-separated IDs become a
      list of ints; empty entries are dropped, so an empty cell yields ``[]``.
    * "Prepared At" (optional): parsed with ``pd.to_datetime``; empty cells
      become ``None``.
    * "Custom Type": converted to int; empty cells become ``None``.

    :param sample_data: DataFrame containing the sample data. Must contain
        the columns "Projects" and "Custom Type".
    :return: The same DataFrame with the converted columns.
    :raises KeyError: If "Projects" or "Custom Type" is missing.
    :raises ValueError: If an ID cannot be converted to int.
    """

    def is_blank(cell) -> bool:
        # Missing values may arrive as None, pd.NA, NaN or as an empty or
        # whitespace-only string.
        if cell is None or cell is pd.NA:
            return True
        if isinstance(cell, float):
            return cell != cell
        return str(cell).strip() == ""

    def to_id_list(cell) -> List[int]:
        if is_blank(cell):
            return []
        return [int(token) for token in str(cell).split(",") if token.strip()]

    def to_timestamp(cell):
        return None if is_blank(cell) else pd.to_datetime(cell)

    def to_optional_id(cell):
        return None if is_blank(cell) else int(cell)

    def convert(column: str, parser) -> None:
        # Replace the whole column with an object Series: writing lists into
        # single cells fails when the column has a non-object dtype, and
        # dtype=object keeps None from being coerced to NaN/NaT.
        sample_data[column] = pd.Series(
            [parser(cell) for cell in sample_data[column]],
            index=sample_data.index,
            dtype=object,
        )

    if "Prepared By" in sample_data.columns:
        convert("Prepared By", to_id_list)
    if "Prepared At" in sample_data.columns:
        convert("Prepared At", to_timestamp)
    convert("Projects", to_id_list)
    convert("Custom Type", to_optional_id)
    return sample_data
[docs]
def read_file(self):
    """Loads the sample file into a DataFrame and post-processes it.

    CSV files are read with ";" as delimiter and '"' as quote character;
    .xlsx files are read with the openpyxl engine. In both cases every cell
    is read as a string and empty cells stay empty strings.

    :return: DataFrame containing the post-processed sample data.
    :raises CsvReadError: If reading or post-processing a CSV file fails.
    :raises ExcelReadError: If reading or post-processing an Excel file fails.
    :raises ValueError: If the source format is neither .csv nor .xlsx.
    """
    logging.info("Reading sample data from file: %s", self.__source_path)
    source_format = self.__source_format

    if source_format == ".csv":
        try:
            raw_frame = pd.read_csv(
                self.__source_path,
                delimiter=";",
                dtype=str,
                keep_default_na=False,
                quotechar='"',
            )
            return self.post_process_data(raw_frame)
        except Exception as e:
            message = f"Error reading CSV file with the samples: {e}"
            logging.exception(message)
            raise CsvReadError(message) from e

    if source_format == ".xlsx":
        try:
            raw_frame = pd.read_excel(
                self.__source_path,
                dtype=str,
                keep_default_na=False,
                engine="openpyxl",
            )
            return self.post_process_data(raw_frame)
        except Exception as e:
            message = f"Error reading Excel file with the samples: {e}"
            logging.exception(message)
            raise ExcelReadError(message) from e

    raise ValueError(
        f"Unsupported source format: {self.__source_format}. Supported formats are: .csv, .xlsx"
    )
[docs]
def create_samples(self):
    """Creates one sample in the LOGS instance per row of the source file.

    All persons, projects and custom types referenced in the file are
    validated first (``check_persons``, ``check_projects``,
    ``check_customtypes``), so nothing is created if a reference is unknown.
    A sample that LOGS rejects is logged and skipped; the remaining rows are
    still processed.

    :raises ValueError: If a referenced person, project or custom type does
        not exist in the LOGS instance.
    """
    sample_data = self.read_file()
    logging.info("Creating samples in LOGS instance.")
    bool_prepared_by = "Prepared By" in sample_data.columns
    bool_prepared_at = "Prepared At" in sample_data.columns

    def non_empty(values) -> List:
        # Empty list entries may be represented by None or "" placeholders;
        # they carry no ID and must not be converted or sent to LOGS.
        if values is None:
            return []
        return [value for value in values if value is not None and value != ""]

    # Check if the persons, projects and custom types exist in the LOGS instance
    person_set = set()
    project_set = set()
    customtype_set = set()
    for _, sample in sample_data.iterrows():
        if bool_prepared_by:
            person_set.update(non_empty(sample["Prepared By"]))
        project_set.update(int(project) for project in non_empty(sample["Projects"]))
        if not pd.isna(sample["Custom Type"]):
            customtype_set.add(int(sample["Custom Type"]))
    self.check_persons(person_set)
    self.check_projects(project_set)
    self.check_customtypes(customtype_set)

    # Custom types that carry the legacy "Prepared by"/"Prepared on" fields.
    legacy_type_names = ("Basic", "Sample (LOGS 3.1)")

    # Create each sample. Line 1 of the file is the header, so the first
    # sample is reported as line 2.
    for sample_count, (_, sample) in enumerate(sample_data.iterrows(), start=2):
        sample_customtype = None
        if not pd.isna(sample["Custom Type"]):
            sample_customtype = self.__logs.customTypes(
                CustomTypeRequestParameter(ids=[int(sample["Custom Type"])])
            ).first()
        logging.info(
            "Custom Type Name: %s",
            sample_customtype.name if sample_customtype else "None",
        )

        log_sample = self.__logs.newSample(entityOrCustomTypeOrId=sample_customtype)
        log_sample.name = str(sample["Name"]).strip()
        log_sample.projects = non_empty(sample["Projects"])

        # sample_customtype is checked first so that ``.name`` is never read
        # from None.
        if (
            sample_customtype is not None
            and log_sample.customType is not None
            and sample_customtype.name in legacy_type_names
        ):
            if bool_prepared_by:
                prepared_by = non_empty(sample["Prepared By"])
                if prepared_by:
                    log_sample.customValues.Legacy_fields.Prepared_by = prepared_by
            # pd.isna also covers NaN/NaT in case pandas replaced None.
            if bool_prepared_at and not pd.isna(sample["Prepared At"]):
                log_sample.customValues.Legacy_fields.Prepared_on = (
                    datetime.fromisoformat(str(sample["Prepared At"]))
                )

        try:
            self.__logs.create(log_sample)
            logging.info("The sample in line %s has been created.", sample_count)
        except LOGSException:
            logging.exception(
                "The sample in line %s could not be created.", sample_count
            )
[docs]
def export_samples_json(self):
    """Writes every sample of the LOGS instance to its own JSON file.

    The files are named ``sample_<id>.json`` and are placed in the directory
    part of the target path.
    """
    output_dir = os.path.dirname(self.__target_path)
    all_samples = self.__logs.samples(SampleRequestParameter())
    for entry in all_samples:
        payload = entry.toJson()
        destination = os.path.join(output_dir, f"sample_{entry.id}.json")
        with open(destination, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)
[docs]
def export_samples_csv(self):
    """Writes all samples of the LOGS instance to the target CSV file.

    The file uses ";" as delimiter, quotes every field and has the columns
    "Name", "Custom Type" (ID of the custom type or empty) and "Projects"
    (comma-separated project IDs).
    """
    columns = ["Name", "Custom Type", "Projects"]
    with open(self.__target_path, "w", newline="", encoding="utf-8") as handle:
        out = csv.writer(
            handle, delimiter=";", quotechar='"', quoting=csv.QUOTE_ALL
        )
        out.writerow(columns)
        for entry in self.__logs.samples(SampleRequestParameter()):
            if entry.projects is None:
                project_ids = ""
            else:
                project_ids = ",".join(str(item.id) for item in entry.projects)
            row = [
                entry.name,
                entry.customType.id if entry.customType else "",
                project_ids,
            ]
            out.writerow(row)
[docs]
def export_samples_excel(self):
    """Writes all samples of the LOGS instance to the target Excel file.

    The active worksheet receives a header row followed by one row per
    sample with the columns "Name", "Custom Type" (ID of the custom type or
    empty) and "Projects" (comma-separated project IDs).
    """
    columns = ["Name", "Custom Type", "Projects"]
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.append(columns)
    for entry in self.__logs.samples(SampleRequestParameter()):
        if entry.projects is None:
            project_ids = ""
        else:
            project_ids = ",".join(str(item.id) for item in entry.projects)
        row = [
            entry.name,
            entry.customType.id if entry.customType else "",
            project_ids,
        ]
        sheet.append(row)
    workbook.save(self.__target_path)
[docs]
def export_samples(self):
    """Exports the samples of the LOGS instance in the configured format.

    Writes a CSV file for ".csv" or an Excel file for ".xlsx" and afterwards
    one JSON file per sample.

    :raises ValueError: If the export format is neither .csv nor .xlsx.
    """
    chosen_format = self.__export_format
    if chosen_format not in (".csv", ".xlsx"):
        raise ValueError(
            f"Invalid export format: {self.__export_format}. Supported formats are: .csv, .xlsx"
        )

    if chosen_format == ".csv":
        self.export_samples_csv()
    else:
        self.export_samples_excel()
    self.export_samples_json()
[docs]
def test(self):
    """Creates a sample named "Test Sample 2" in project 1 as a manual smoke
    test and prints a confirmation.
    """
    # TODO: Delete after testing:
    probe_sample = self.__logs.newSample()
    probe_sample.name = "Test Sample 2"
    probe_sample.projects = [1]
    self.__logs.create(probe_sample)
    print("Test finished.")