st-ten-1/src/lib/helpers/recipe_manager.py

537 lines
28 KiB
Python

import os
import csv
import locale
from datetime import datetime
import shutil
import re
from PyQt5.QtCore import pyqtSignal, QObject
from PyQt5.QtWidgets import QFileDialog
from lib.db import Recipes, db # Assuming these are part of your project structure
class RecipeManagerSignals(QObject):
recipes_imported = pyqtSignal()
recipe_manager_signals = RecipeManagerSignals()
def read_steps(row, config, defaults=None, unsupported_steps=None):
if defaults is None:
defaults = config.get("recipes_defaults", lambda k: None)
# Configurable fields from the config object
barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
warning_image_field = config.get("recipe", {}).get("warning_image_field", "warning_img").strip()
print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
decsep = locale.localeconv()["decimal_point"]
# Extract and clean "r nominale" value
rcsv = (
row.get("r nominale", defaults["r nominale"])
.replace(" ", "").replace(",", decsep).replace("Ω", "").replace("?", "")
)
if rcsv == "":
rcsv = "999" # Default fallback for "r nominale" if empty
# Helper functions
def get_default_value(field, key):
value = field.get(key, defaults[key])
return value if value != "" else defaults[key]
def safe_parse(value):
try:
return int(float(value))
except ValueError:
return 0 # Default to 0 if parsing fails
# Normalize printer_selection to resolution string ("203"/"300")
lp1 = (config.get("label_printer", {}) or {})
lp2 = (config.get("label_printer_2", {}) or {})
def normalize_printer_sel(val):
try:
s = str(val).strip()
except Exception:
s = ""
if s == "":
# default to primary printer resolution
try:
return str(int(lp1.get("risoluzione", 300)))
except Exception:
return "300"
if s.isdigit():
return s
# legacy OS printer name: match lp1 or lp2 names
if s == str(lp1.get("printer", "")):
try:
return str(int(lp1.get("risoluzione", 300)))
except Exception:
return "300"
if s == str(lp2.get("printer", "")):
try:
return str(int(lp2.get("risoluzione", 300)))
except Exception:
return "300"
# unknown string; keep as-is
return s
# Define the steps dictionary
steps = {
"count": {
"amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]),
"warning_img": row.get(warning_image_field, defaults["warning_img"]),
"require_discard_piece": row.get("richiedi_inserimento_scarto", defaults["richiedi_inserimento_scarto"]),
},
"connector": {
"connector": row.get("connettore", defaults["connettore"]),
},
"barcodes": {
"serial": row.get(barcode_serial_field, defaults["codice_a_barre"]),
"n_pieces":(row.get("n_componenti")) if row.get("n_componenti") is not None else 1,
"barcode_input_2": row.get("barcode_input_2", "-"),
"barcode_input_3": row.get("barcode_input_3", "-"),
"barcode_input_4": row.get("barcode_input_4", "-"),
"barcode_input_5": row.get("barcode_input_5", "-"),
},
"resistance": {
"scale": locale.atof(row.get("scala_resistenza", defaults["scala_resistenza"])),
"expected": locale.atof(rcsv),
"tolerance_pos": locale.atof(get_default_value(row, "tolleranza_resistenza_pos")),
"tolerance_neg": locale.atof(get_default_value(row, "tolleranza_resistenza_neg")),
} if "resistance" not in unsupported_steps else None,
"screws": {
"quantity": row.get("viti", defaults["viti"]),
},
"instruction": {},
"pipe_cutter": {
"length": row.get("l_taglio_guaina_gc_[mm]", defaults["lunghezza_corrugato"]),
"diameter": row.get("diametro_corrugato", defaults["diametro_corrugato"]),
},
# Empty placeholder for future extensions
"leak_1": {
"pre_filling_time": safe_parse(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])),
"pre_filling_pressure": safe_parse(
row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])),
"filling_time": safe_parse(row.get("tempo_riempimento", defaults["tempo_riempimento"])),
"settling_time": safe_parse(get_default_value(row, "tempo_assestamento")),
"settling_pressure_min_percent": safe_parse(
row.get("percentuale_minima_pressione_assestamento",
defaults["percentuale_minima_pressione_assestamento"])
),
"settling_pressure_max_percent": safe_parse(
row.get("percentuale_massima_pressione_assestamento",
defaults["percentuale_massima_pressione_assestamento"])
),
"test_time": safe_parse(row.get("tempo_di_test", defaults["tempo_di_test"])),
"test_pressure_qneg": safe_parse(
row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])),
"test_pressure": safe_parse(row.get("pressione_di_test", defaults["pressione_di_test"])),
"test_pressure_qpos": safe_parse(
row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])),
"flush_time": safe_parse(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])),
"flush_pressure": safe_parse(row.get("pressione_svuotamento", defaults["pressione_svuotamento"])),
"chan_sel": safe_parse(row.get("canale_di_prova", defaults["canale_di_prova"])),
"ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno", defaults["tempo_svuotamento_esterno"])),
"ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno", defaults["tempo_soffiaggio_esterno"])),
"pid_pressure_correction": safe_parse(
row.get("pid_pressure_correction", defaults["pid_pressure_correction"])),
"pid_mod_config": safe_parse(row.get("pid_mod_config", defaults["pid_mod_config"])),
},
"test_freefall_leak": {
# Dedicated Free Fall parameters
# Tempo riempimento (seconds)
"filling_time": safe_parse(row.get("tempo_riempimento_free_fall", row.get("tempo_riempimento", defaults.get("tempo_riempimento", 0)))),
# Pressione riempimento (mbar)
"filling_pressure": safe_parse(row.get("pressione_riempimento_free_fall", row.get("pressione_pre_riempimento", defaults.get("pressione_pre_riempimento", 0)))),
# Pressione minima/massima (mbar)
"pressure_min": safe_parse(row.get("pressione_min_free_fall", defaults.get("pressione_min_free_fall", 0))),
"pressure_max": safe_parse(row.get("pressione_max_free_fall", defaults.get("pressione_max_free_fall", 0))),
# Riempimento continuo (si/no)
"continuous_filling": str(row.get("riempimento_continuo_free_fall", "")).strip().lower() in {"si","s","x","yes","y","true","1"},
},
"leak_2": {
"pre_filling_time": safe_parse(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])),
"pre_filling_pressure": safe_parse(
row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])),
"filling_time": safe_parse(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])),
"settling_time": safe_parse(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])),
"settling_pressure_min_percent": safe_parse(
row.get("percentuale_minima_pressione_assestamento_2",
defaults["percentuale_minima_pressione_assestamento_2"])
),
"settling_pressure_max_percent": safe_parse(
row.get("percentuale_massima_pressione_assestamento_2",
defaults["percentuale_massima_pressione_assestamento_2"])
),
"test_time": safe_parse(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])),
"test_pressure_qneg": safe_parse(
row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])),
"test_pressure": safe_parse(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])),
"test_pressure_qpos": safe_parse(
row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])),
"flush_time": safe_parse(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])),
"flush_pressure": safe_parse(row.get("pressione_svuotamento_2", defaults["pressione_svuotamento_2"])),
"chan_sel": safe_parse(row.get("canale_di_prova_2", defaults["canale_di_prova_2"])),
"ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno_2", defaults["tempo_svuotamento_esterno"])),
"ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno_2", defaults["tempo_soffiaggio_esterno"])),
"pid_pressure_correction": safe_parse(row.get("pid_pressure_correction", defaults["pid_pressure_correction_2"])),
"pid_mod_config": safe_parse(row.get("pid_mod_config", defaults["pid_mod_config"])),
},
"vision": {
"recipe": row.get("ricetta_visione", defaults["ricetta_visione"]),
},
"print": {
"template": row.get(print_template_field, defaults["modello_etichetta"]),
"labeltxt_1": row.get("labeltxt_1", ""),
"labeltxt_2": row.get("labeltxt_2", ""),
"labeltxt_3": row.get("labeltxt_3", ""),
"labeltxt_4": row.get("labeltxt_4", ""),
"labeltxt_5": row.get("barcode_input_finelinea", ""),
"extra_label": row.get("etichette_supplementari", ""),
"barcode": row.get("barcode_stampato",defaults["barcode_format"]),
# Store resolution instead of printer name; map legacy names to their resolution
"printer_selection": normalize_printer_sel(row.get("printer_selection", lp1.get("risoluzione", 300))),
},
}
# Remove unsupported steps if specified
if unsupported_steps:
for step in unsupported_steps:
steps.pop(step, None)
return steps
def import_recipes(config, csv_path=None, defaults=None, unsupported_steps=None, logger=None):
"""
Import recipes from CSV and update or create new ones in the database.
:param config: Configuration object with recipe settings.
:param csv_path: Path to the CSV file (optional). If None, a file dialog will open.
:param defaults: Default values to use for missing fields in the CSV.
:param unsupported_steps: A list of unsupported step names to exclude.
:param logger: Logger object for logging messages (optional).
"""
if defaults is None:
defaults = config.get("recipes_defaults", lambda k: None)
# Open file dialog if csv_path is not provided
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getOpenFileName(
None,
"Import Recipes",
"recipes.csv",
"CSV files (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
if logger:
logger.info(f"Importing recipes from: {csv_path}.")
# Get field mappings from the config
recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
part_number_field = config.get("recipe", {}).get("part_number_field", "part_number").strip()
description_field = config.get("recipe", {}).get("description_field", "descrizione").strip()
barcode_enable_field = config.get(
"recipe", {}
).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
try:
with open(csv_path, "r", encoding="utf-8-sig") as file:
reader = csv.DictReader(file)
count = 0
# Wrap all database operations in a transaction
with db.atomic():
for ucrow in reader:
# Normalize row keys to lowercase for consistency
row = dict((k.lower(), v) for k, v in ucrow.items())
recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"])
steps_specs = read_steps(row, config, defaults=defaults, unsupported_steps=unsupported_steps)
# Create or update recipe in the database
try:
# Try to fetch existing recipe
recipe = Recipes.get_by_id(recipe_name)
recipe_is_new = False
except Recipes.DoesNotExist:
# Create a new recipe if it doesn't exist
recipe = Recipes(name=recipe_name, part_number="TEMPORARY")
recipe_is_new = True
# Update recipe attributes
recipe.client = row.get("cliente", defaults["cliente"])
recipe.part_number = row.get(part_number_field, defaults["part_number"])
recipe.description = row.get(description_field, defaults["descrizione"])
# Recipe specifications
steps = {}
for step_name, step_spec in steps_specs.items():
if unsupported_steps is None or step_name not in unsupported_steps:
steps[step_name] = step_spec
recipe.spec = {
"count": len(
row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in (
unsupported_steps or []),
"connector": len(row.get("verifica_connettore_abilitata",
defaults["verifica_connettore_abilitata"])) and "connector" not in (
unsupported_steps or []),
"barcodes": len(row.get(barcode_enable_field,
defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in (
unsupported_steps or []),
"resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults[
"verifica_resistenza_connettore_abilitata"])) and "resistance" not in (unsupported_steps or []),
"screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in (
unsupported_steps or []),
"instruction": len(
row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in (
unsupported_steps or []),
"instruction_extra": (str(row.get("istruzione_abilitata_extra", defaults["istruzione_abilitata_extra"])) or "").strip().lower() == "x" and "instruction_extra" not in (unsupported_steps or []),
"pipe_cutter": len(row.get("tagliatubi", defaults["tagliatubi"])) and "pipe_cutter" not in (unsupported_steps or []),
"vision": len(
row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in (
unsupported_steps or []),
"test_freefall_leak": len(
row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "test_freefall_leak" not in (
unsupported_steps or []),
"leak_1": len(
row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in (
unsupported_steps or []),
"leak_2": len(
row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in (
unsupported_steps or []),
"print": len(
row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in (
unsupported_steps or []),
"steps": steps_specs,
}
try:
if recipe_is_new:
recipe.save(force_insert=True) # Insert new recipe
else:
recipe.save() # Update existing recipe
count += 1 # Increment imported recipe count
except Exception as e:
if logger:
logger.error(f"Error saving recipe {recipe_name}: {e}")
raise # Re-raise to trigger transaction rollback
# Transaction is automatically committed if no exceptions occurred
if logger:
logger.info(f"Imported {count} recipes.")
recipe_manager_signals.recipes_imported.emit()
except Exception as e:
if logger:
logger.error(f"Error importing recipes: {e}")
raise
def export_recipes(config, csv_path=None, logger=None):
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getSaveFileName(
None,
"Export Recipes",
"recipes.csv",
"CSV files (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
if not csv_path.lower().endswith(".csv"):
csv_path += ".csv"
try:
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
barcode_enable_field = config.get("recipe", {}).get("barcode_enable_field",
"verifica_codice_a_barre_abilitata").strip()
barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
data = []
fieldnames = set() # Use a set to avoid duplicates
# Wrap database operations in a transaction for consistency
with db.atomic():
# Iterate over all recipes in the database
for recipe in Recipes.select():
try:
steps = recipe.get_steps_map()
exportable = {
# Base fields
recipe_name_field: recipe.name,
"cliente": recipe.client,
"part_number": recipe.part_number,
}
# Add base fields to the fieldnames
fieldnames.update([recipe_name_field, "cliente", "part_number"])
# Check and add steps conditionally
if "connector" in steps:
exportable.update({
"verifica_connettore_abilitata": "x",
"connettore": steps["connector"].spec["connector"]
})
fieldnames.update(["verifica_connettore_abilitata", "connettore"])
if "resistance" in steps:
exportable.update({
"verifica_resistenza_connettore_abilitata": "x",
"scala_resistenza": steps["resistance"].spec["scale"],
"r nominale": steps["resistance"].spec["expected"],
"tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"],
"tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"],
})
fieldnames.update(["verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale",
"tolleranza_resistenza_pos", "tolleranza_resistenza_neg"])
if "barcodes" in steps:
exportable.update({
barcode_enable_field: "x",
barcode_serial_field: steps["barcodes"].spec["serial"]
})
fieldnames.update([barcode_enable_field, barcode_serial_field])
if "screws" in steps:
exportable.update({
"avvitatura_abilitata": "x",
"viti": steps["screws"].spec["quantity"]
})
fieldnames.update(["avvitatura_abilitata", "viti"])
if "leak_1" in steps:
exportable.update({
"prova_tenuta_abilitata": "x",
"tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"],
"pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"],
"tempo_di_test": steps["leak_1"].spec["test_time"],
"pressione_di_test": steps["leak_1"].spec["test_pressure"],
"pid_pressure_correction": steps["leak_1"].spec["pid_pressure_correction"],
})
fieldnames.update(["prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento",
"tempo_di_test", "pressione_di_test", "pid_pressure_correction"])
# Export Free Fall leak test parameters
if "test_freefall_leak" in steps:
# Ensure enable flag present even if leak_1 absent
exportable.update({
"prova_pervieta_abilitata": exportable.get("prova_pervieta_abilitata", ""),
"tempo_riempimento_free_fall": steps["test_freefall_leak"].spec.get("filling_time", 0),
"pressione_riempimento_free_fall": steps["test_freefall_leak"].spec.get("filling_pressure", 0),
"pressione_min_free_fall": steps["test_freefall_leak"].spec.get("pressure_min", 0),
"pressione_max_free_fall": steps["test_freefall_leak"].spec.get("pressure_max", 0),
"riempimento_continuo_free_fall": "x" if steps["test_freefall_leak"].spec.get("continuous_filling") else "",
})
fieldnames.update([
"prova_pervieta_abilitata",
"tempo_riempimento_free_fall",
"pressione_riempimento_free_fall",
"pressione_min_free_fall",
"pressione_max_free_fall",
"riempimento_continuo_free_fall",
])
if "leak_2" in steps:
exportable.update({
"prova_tenuta_abilitata_2": "x",
"tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"],
"pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"],
"tempo_di_test_2": steps["leak_2"].spec["test_time"],
"pressione_di_test_2": steps["leak_2"].spec["test_pressure"],
"pid_pressure_correction": steps["leak_1"].spec["pid_pressure_correction"],
})
fieldnames.update(["prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2",
"tempo_di_test_2", "pressione_di_test_2", "pid_pressure_correction"])
if "vision" in steps:
exportable.update({
"test_visione_abilitato": steps["vision"].spec.get("enabled", ""),
"ricetta_visione": steps["vision"].spec["recipe"]
})
fieldnames.update(["test_visione_abilitato", "ricetta_visione"])
if "print" in steps:
exportable.update({
"stampa_etichetta_abilitata": "x",
print_template_field: steps["print"].spec["template"],
# Export resolution string; fallback to primary printer resolution from config
"printer_selection": steps["print"].spec.get(
"printer_selection",
str((config.get("label_printer", {}) or {}).get("risoluzione", 300)),
),
})
fieldnames.update(["stampa_etichetta_abilitata", print_template_field, "printer_selection"])
# Append the exportable row to the data
data.append(exportable)
except Exception as e:
if logger:
logger.error(f"Error processing recipe {recipe.name}: {e}")
# Continue with next recipe instead of failing the entire export
continue
except Exception as e:
if logger:
logger.error(f"Error exporting recipes: {e}")
raise
# Export data to CSV if there is any data
if len(data):
if logger:
logger.info(f"Exporting recipes to {csv_path}")
with open(csv_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(fieldnames))
writer.writeheader()
writer.writerows(data)
if logger:
logger.info(f"Exported {len(data)} recipes to {csv_path}.")
def backup_current_recipes(config, logger=None):
"""
Back up current recipes to a CSV file named after the current machine description.
Only one backup file is kept (overwritten on each call).
"""
# Define the backup directory
backup_dir = os.path.join('config', 'csv_import', 'backup_csv')
# Read machine description from config
try:
machine_desc = (config.get('machine', {}) or {}).get('description')
if not machine_desc:
# Fallbacks
machine_desc = getattr(config, 'machine_id', None) or 'backup_recipes'
except Exception:
machine_desc = getattr(config, 'machine_id', None) or 'backup_recipes'
# Sanitize description to create a safe filename
safe_desc = re.sub(r"[^A-Za-z0-9._-]+", "_", str(machine_desc).strip())
if not safe_desc:
safe_desc = 'backup_recipes'
# Build backup file path (no timestamp => single rotating file)
backup_file = f"{safe_desc}.csv"
backup_path = os.path.join(backup_dir, backup_file)
# Ensure the backup directory exists
os.makedirs(backup_dir, exist_ok=True)
# Export current recipes to the backup path (overwrites existing file)
# Suppress internal export logs during automatic backup
export_recipes(config=config, csv_path=backup_path, logger=None)
# Do not log here to avoid duplicate messages; caller will handle final log
return backup_path # Return the backup path for reference if needed