st-ten-1/src/ui/test/test.py
ST-TEN-11 7391a6d29b dev
2026-02-24 15:18:11 +01:00

1453 lines
72 KiB
Python
Executable File

import copy
import logging
import os
import sys
import weakref
from datetime import datetime, timedelta
# from distutils.util import change_root
from PyQt5.QtCore import QTimer, pyqtSlot, pyqtSignal
from PyQt5.QtWidgets import QMessageBox
from lib.db import Archive, Recipes, Users
from lib.helpers import get_shift
from lib.helpers.step import Step
from playhouse.shortcuts import model_to_dict
from ui.barcode_recipe_selection import Barcode_Recipe_Selection
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_barcodes import Test_Barcodes
from ui.test_connector import Test_Connector
from ui.test_count import Test_Count
from ui.test_count_end import Test_Count_End
from ui.test_fail import Test_Fail
from ui.test_instructions import Test_Instructions
from src.ui.test_pipe_cutter import Test_Pipe_Cutter
from ui.test_leak import Test_Leak
from ui.test_resistance import Test_Resistance
from ui.test_screws import Test_Screws
from ui.test_vision import Test_Vision
from ui.test_warning_img import Test_Warning_Img
from ui.widget import Widget
from components import ArchiveSynchronizer
class Test(Widget):
# Modulo 43 assignment table for checksum calculation.
# Each symbol is worth its position in the alphabet below:
# digits 0-9, letters A-Z, then "-", ".", " ", "$", "/", "+", "%".
MODULO43_ASSIGNMENT_TABLE = {
    symbol: value
    for value, symbol in enumerate("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ-. $/+%")
}
def __init__(self, config, components=None, main_window=None):
    """Build the test-cycle page, register every step page and start the cycle.

    Args:
        config: Bench configuration. It is indexed like a mapping
            (``config["hardware_config"]``, ``config["machine"]``, ...) and
            also exposes ``machine_id``.
        components: Mapping of hardware component name -> driver object.
            ``None`` is treated as "no components available".
        main_window: Owning window. ``tag_loaded_recipe`` is read from it
            here; ``next()`` later uses it to open the login page and to
            queue the archive save.

    Raises:
        FileNotFoundError: propagated from ``select_step_img`` when one of
            the step images is missing.
    """
    super().__init__()
    self.autotest_timer = None
    self.main_window = main_window
    self.config = config
    # The default is None, but every check below uses ``in self.components``,
    # so fall back to an empty mapping instead of crashing on the first test.
    self.components = components if components is not None else {}
    hardware = self.config["hardware_config"]

    # GET LOGGER
    self.log = logging.getLogger("Test")

    # SHOW MACHINE DESCRIPTION
    self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A"))

    # SHOW USERNAME
    session = Users.get_session()
    self.original_username = session.username
    # A user literally named "admin" (any case) is shown as, and flagged as, admin
    if session.username.upper() == "ADMIN":
        self.user_l.setText("ADMIN")
        session._is_admin = True
    else:
        self.user_l.setText(session.username)
        session._is_admin = False
    if session.is_admin:
        self.user_l.setStyleSheet("QLabel { color: red; }")
    else:
        self.user_l.setStyleSheet("")
    # Store original admin status (restored by disable_temp_admin)
    self.had_admin = session.is_admin

    # The flag label is only shown when the program got command-line arguments
    has_cli_args = len(sys.argv) > 1
    self.flag_label.setVisible(has_cli_args)
    if has_cli_args:
        self.update_label_with_args()  # Initial update

    # Initialize barcode formatting variables
    self.barcode_prefix = ""
    self.barcode_suffix = "*="

    self.active_errors = []  # Current errors, as (message, is_error) tuples
    self.current_error_index = 0  # Index of the error currently displayed
    # Timer for alternating errors: fires every 2 seconds
    self.error_timer = QTimer()
    self.error_timer.setInterval(2000)
    self.error_timer.timeout.connect(self.display_current_error)

    # SHOW AND UPDATE TIME CLOCK
    self.refresh_time(init=True)

    # INIT RECIPE
    self.recipe = None
    if "fixture_id" in self.components:
        self.rfid = self.components["fixture_id"]
        # Connected exactly once: a second connect() of the same slot would
        # make handle_rfid_error run twice for every emitted error.
        self.rfid.rfid_error_signal.connect(self.handle_rfid_error)
    if "tecna_t3" in self.components:
        self.tecna = self.components["tecna_t3"]
    self.error_label.setText("")
    self.error_label.setStyleSheet("QLabel { color: red; }")

    if hardware["barcode_recipe_selection"] == "present":
        self.recipe_selection_mode = "barcode"
    else:
        self.recipe_selection_mode = "table"
    self.step = None

    # Name of the component that runs leak tests (None when neither is configured)
    self.tester_component = None
    if hardware["tecna_t3"] == "present":
        self.tester_component = "tecna_t3"
    elif hardware["furness_controls"] == "present":
        # NOTE(review): the config key is "furness_controls" but the component
        # name used here is "furness_control" - confirm this is intended.
        self.tester_component = "furness_control"

    # Components each step type needs. A tuple means "any one of these".
    self.steps_dependencies = {
        "count": set(),
        "connector": {"multicomp", },
        "instruction": {"digital_io"},
        "screws": {"screwdriver", "tecna_t3", },
        "resistance": {"multicomp", },
        "leak_1": {self.tester_component, },
        "test_freefall_leak": {self.tester_component, },
        "leak_2": {self.tester_component, },
        "pipe_cutter": {"pipe_cutter"},
        "vision": {("uvc_camera", "galaxy_camera", "hikrobot_sc"), "vision", "vision_saver", },
        "print": {"label_printer_2"} if hardware["label_printer"] != "present" else {"label_printer"},
    }
    # A step is unsupported as soon as one of its dependencies is absent
    # (component readiness is checked later, in check_steps_dependencies).
    self.unsupported_steps = set()
    for step_name, dependencies in self.steps_dependencies.items():
        for dependency in dependencies:
            if isinstance(dependency, tuple):
                if all(d not in self.components for d in dependency):
                    self.unsupported_steps.add(step_name)
            elif dependency not in self.components:
                self.unsupported_steps.add(step_name)
    # Enforce second leak test hardware flag
    second_leak_enabled = hardware.get("second_leak_test", "absent") == "present"
    if not second_leak_enabled:
        self.unsupported_steps.add("leak_2")

    # INIT PIECES COUNTER (shared by reference with the step widgets)
    self.pieces = {"ok": 0, "ko": 0}

    # Leak pages are only built when a leak tester is configured
    leak_tester_configured = hardware["tecna_t3"] != "absent" or hardware["furness_controls"] != "absent"

    def leak_page():
        # One independent page + Test_Leak widget per leak step
        return Test_Assembly(img_path=None, text=None,
                             widget=Test_Leak(config=self.config, components=self.components,
                                              recipe=self.recipe, step=self.step, pieces=self.pieces,
                                              parent=self))

    # INIT CYCLE STATES: step type -> page shown for that step
    self.cycle_available_steps = {
        "barcodes": Test_Assembly(img_path=self.select_step_img("scan"),
                                  text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()),
        "connector": Test_Assembly(img_path=self.select_step_img("scan"),
                                   text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE",
                                   widget=Test_Connector(run_once=True)),
        "count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO",
                               widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step,
                                                 pieces=self.pieces, run_once=True)),
        "warning_img": Test_Assembly(img_path=None,
                                     text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA",
                                     widget=Test_Warning_Img(components=self.components, recipe=self.recipe,
                                                             bench_name=self.config["machine"]["image_for_warning"],
                                                             step=self.step, run_once=True)),
        "count_end": Test_Assembly(img_path=None,
                                   text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO",
                                   widget=Test_Count_End(components=self.components, recipe=self.recipe,
                                                         step=self.step, pieces=self.pieces)),
        "done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None),
        "emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"),
                                   text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"",
                                   widget=None),
        "fail": Test_Assembly(img_path=self.select_step_img("fail"),
                              text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO",
                              widget=Test_Fail(parent=self)),
        "blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...",
                              widget=Test_Warning_Img(components=self.components, recipe=self.recipe,
                                                      step=self.step)),
        "test_freefall_leak": leak_page() if leak_tester_configured else None,
        "leak_1": leak_page() if leak_tester_configured else None,
        "leak_2": leak_page() if (leak_tester_configured and second_leak_enabled) else None,
        "flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...",
                               widget=Test_Warning_Img(components=self.components, recipe=self.recipe,
                                                       step=self.step)),
        "instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA",
                                     widget=Test_Instructions(config=self.config, components=self.components,
                                                              recipe=self.recipe, bench_name=self.config.machine_id,
                                                              step=self.step)),
        "pipe_cutter": Test_Assembly(img_path=None, text=u"ATTENZIONE TAGLIO CORRUGATO IN CORSO",
                                     widget=Test_Pipe_Cutter(config=self.config, components=self.components,
                                                             recipe=self.recipe, bench_name=self.config.machine_id,
                                                             step=self.step)),
        "instruction_extra": Test_Assembly(img_path=None,
                                           text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA",
                                           widget=Test_Instructions(config=self.config, components=self.components,
                                                                    recipe=self.recipe,
                                                                    bench_name=self.config.machine_id,
                                                                    step=self.step)),
        "piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE",
                                       widget=Test_Instructions(config=self.config, components=self.components,
                                                                recipe=self.recipe,
                                                                bench_name=self.config.machine_id, step=self.step)),
        "print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO",
                               widget=None),
        "resistance": Test_Assembly(img_path=None,
                                    text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA",
                                    widget=Test_Resistance(components=self.components, recipe=self.recipe,
                                                           step=self.step, pieces=self.pieces)),
        "screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO",
                                widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step,
                                                   pieces=self.pieces)),
        "select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE",
                                       widget=Recipe_Selection(config=self.config,
                                                               unsupported_steps=self.unsupported_steps)),
        "barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"),
                                                  text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE",
                                                  widget=Barcode_Recipe_Selection(parent=self)),
        "vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA",
                                widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step,
                                                   pieces=self.pieces)),
        "wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO",
                              widget=None),
        # Shown when the selected recipe has no steps at all
        None: Test_Assembly(img_path=self.select_step_img("warning"),
                            text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None),
    }
    self.cycle_steps = None
    self.cycle_index = -1
    self.print_step = None
    self.last_label = None
    self.require_discard_piece = False

    # SETUP AUTOTEST
    self.autotest_request = False
    self.autotesting = False
    self.autotesting_reason = None
    self.autotest_cycle_steps = None
    # A red background warns that autotests are switched off
    autotest_disabled = "--no-autotest" in sys.argv
    if autotest_disabled:
        self.setStyleSheet("background-color: red;")
    else:
        self.setStyleSheet("background-color: white;")
    # Period in milliseconds; the periodic timer itself is created by
    # request_autotest("init"/"login"), not here.
    if autotest_disabled:
        self.autotest_period = None
    elif "--test-autotest" in sys.argv:
        self.autotest_period = int(60 * 1000)  # 1 min
    else:
        self.autotest_period = int(4 * 60 * 60 * 1000)  # 4 HOURS

    # INIT TEST DATA
    self.data = {"ok": True, "overridden": False}
    self.archived = None

    # CONNECT CYCLE CONTROLS
    self.cancel_b.clicked.connect(self.fail_cycle)
    self.change_recipe_b.clicked.connect(self.change_recipe)
    self.reset_count_b.clicked.connect(self.reset_count)
    for step_name, w in self.cycle_available_steps.items():
        if hasattr(w, "ok"):
            # custom ok handlers should call next again
            if isinstance(w.widget, Recipe_Selection):
                w.ok.connect(self.set_recipe)
            else:
                # step_name is bound as a default argument so each page reports
                # its own name; the weak reference avoids the lambda keeping
                # this widget alive.
                w.ok.connect(
                    lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel,
                                                                                                         data))
        if hasattr(w, "ko"):
            w.ko.connect(self.fail_cycle)

    # CUSTOM STEP CONNECTIONS
    self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount)
    if "fixture_id" in self.components:
        self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
    # NOTE(review): assumed to run regardless of "fixture_id" being present - confirm.
    self.tag_loaded_recipe = self.main_window.tag_loaded_recipe

    # TESTING
    self.testing = "--test" in sys.argv
    # /TESTING

    # START CYCLE
    self.next_timer = QTimer()
    self.next_timer.setSingleShot(True)
    self.next_timer.timeout.connect(self.next)
    self.next()
def refresh_time(self, init=False):
    """Write the current date and time into ``time_l`` and re-arm the clock timer.

    Args:
        init: when true and no ``time_timer`` exists yet, create the
            repeating timer and connect it back to this method.
    """
    if init and not hasattr(self, 'time_timer'):
        self.time_timer = QTimer()
        clock = self.time_timer
        clock.setSingleShot(False)
        clock.timeout.connect(self.refresh_time)

    now = datetime.now()
    # Day/month/year on the first line, hour:minute (minutes zero-padded) on the second
    stamp = f"{now.day}/{now.month}/{now.year}\n{now.hour}:{now.minute:02d}"
    self.time_l.setText(stamp)

    # Restart on every call so the 30 s interval is always in force;
    # the timer is absent when this runs before any init=True call.
    if hasattr(self, 'time_timer'):
        self.time_timer.start(30000)
def select_step_img(self, step, suffix=None):
    """Return the path of the most specific image available for ``step``.

    Candidates are tried from most to least specific: with suffix and
    machine id, with suffix only, with machine id only, then the bare step
    name. For each candidate ``.png`` is preferred over ``.jpg``. Paths are
    relative to the current working directory (``./src/ui/imgs``).

    Raises:
        FileNotFoundError: when none of the candidates exists.
    """
    machine = self.config.machine_id
    candidates = [f"{step}_{machine}", f"{step}"]
    if suffix is not None:
        candidates = [f"{step}_{suffix}_{machine}", f"{step}_{suffix}"] + candidates

    for base in candidates:
        for extension in ("png", "jpg"):
            candidate = f"./src/ui/imgs/{base}.{extension}"
            if os.path.isfile(candidate):
                return candidate
    raise FileNotFoundError(f"No image was found for step {step}")
def change_recipe(self):
    """Ask the cycle to drop the active recipe (``next`` action "change_recipe")."""
    requested = "change_recipe"
    self.next(action=requested)
def set_recipe_mode_table(self):
    """Switch recipe selection to the table page, then restart recipe selection."""
    setattr(self, "recipe_selection_mode", "table")
    self.change_recipe()
def set_recipe_mode_barcode(self):
    """Switch recipe selection to barcode scanning, then restart recipe selection."""
    setattr(self, "recipe_selection_mode", "barcode")
    self.change_recipe()
def cut_tube(self):
    """Run the pipe cutter: ``to_calibrate()`` followed by ``start_cutting()``.

    Raises:
        KeyError: when no "pipe_cutter" component is registered.
    """
    for command in ("to_calibrate", "start_cutting"):
        # Look the component up for each command, exactly one call per command
        getattr(self.components["pipe_cutter"], command)()
def reprint_label(self):
    """Print the last produced label again.

    The template comes from the recipe's print step ("EtichettaR5" when the
    step does not name one). Both ``last_label`` and ``print_step`` start as
    ``None``, so a reprint requested before any label exists is ignored with
    a warning instead of raising ``AttributeError``.
    """
    if self.last_label is None or self.print_step is None:
        self.log.warning("Reprint requested but no label has been printed yet")
        return
    template = self.print_step.spec.get("template", "EtichettaR5")
    self.print(self.last_label, template)
def fail_cycle(self):
    """Interrupt the running cycle as failed (``next`` action "fail")."""
    requested = "fail"
    self.next(action=requested)
def setCentralWidget(self, widget):
    """Show ``widget`` in this page's "centralWidget" slot.

    Delegates to ``ui.helpers.replace_widget``; ``next()`` reads the result
    back through ``self.centralWidget``.
    """
    slot_name = "centralWidget"
    replace_widget(self, slot_name, widget)
def enable_temp_admin(self):
    """Give the current session temporary admin rights and show it in the UI.

    Returns:
        True when the rights were enabled, False when there is no session.
    """
    session = Users.get_session()
    if session is None:
        self.log.warning("Cannot enable temporary admin privileges: No active session")
        return False

    # Remember the label text so disable_temp_admin() can put it back
    self.saved_user_label_text = self.user_l.text()
    session.enable_temp_admin()

    # Admin users are displayed as a red "ADMIN"
    self.user_l.setText("ADMIN")
    self.user_l.setStyleSheet("QLabel { color: red; }")

    # Let the recipe table redraw itself so the admin buttons appear
    pages = self.cycle_available_steps
    if "select_recipe" in pages:
        selector = pages["select_recipe"].widget
        if selector and hasattr(selector, "refresh"):
            selector.refresh()
            self.log.info("Recipe Selection UI refreshed to show admin buttons")

    self.log.info(f"Temporary admin privileges enabled for user: {session.username}")
    return True
def disable_temp_admin(self):
    """Revoke temporary admin rights and restore the user label.

    Returns:
        True when the rights were revoked, False when there is no session.
    """
    session = Users.get_session()
    if session is None:
        self.log.warning("Cannot disable temporary admin privileges: No active session")
        return False

    session.disable_temp_admin()

    # Prefer the text saved by enable_temp_admin(); otherwise the login name
    if hasattr(self, 'saved_user_label_text'):
        restored_text = self.saved_user_label_text
    else:
        restored_text = self.original_username
    self.user_l.setText(restored_text)
    # Users who were admin from the start keep the red label
    self.user_l.setStyleSheet("QLabel { color: red; }" if self.had_admin else "")

    # Let the recipe table redraw itself so the admin buttons disappear
    pages = self.cycle_available_steps
    if "select_recipe" in pages:
        selector = pages["select_recipe"].widget
        if selector and hasattr(selector, "refresh"):
            selector.refresh()
            self.log.info("Recipe Selection UI refreshed to hide admin buttons")

    self.log.info(f"Temporary admin privileges disabled for user: {session.username}")
    return True
def request_autotest(self, reason):  # you can cancel the request calling request_autotest(False)
    """Record a pending autotest request; ``next()`` starts it at a cycle boundary.

    Args:
        reason: why the autotest is wanted. "init" and "login" also
            (re)create the periodic timer and are stored as "boot";
            "logout" additionally aborts the running cycle; ``False``
            clears a pending request.

    The whole request is ignored (only logged) when the program was started
    with ``--no-autotest``.
    """
    if "--no-autotest" not in sys.argv:
        self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
        if reason in ("init", "login"):
            # Fresh repeating timer that re-requests an autotest every autotest_period ms
            self.autotest_timer = QTimer()
            self.autotest_timer.setSingleShot(False)
            self.autotest_timer.timeout.connect(self.request_periodic_autotest)
            if self.autotest_period is not None:
                self.autotest_timer.start(self.autotest_period)
            reason = "boot"
            # Second clock-refresh timer, created once; it calls refresh_time every 30 s
            if not hasattr(self, "refresh_timer"):
                self.refresh_timer = QTimer()
                self.refresh_timer.setSingleShot(False)
                self.refresh_timer.timeout.connect(self.refresh_time)
                self.refresh_timer.start(30000)
        # The request is only kept when leak autotests are enabled in the config
        if self.config["autotest_leak"]["enabled"] == "true":
            self.autotest_request = reason
        else:
            self.autotest_request = False
    else:
        self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected")
    # NOTE(review): assumed to apply with and without --no-autotest - confirm nesting.
    if reason == "logout":
        self.next(action="abort")
def request_periodic_autotest(self):
    """Timer slot: request an autotest with reason "periodic"."""
    reason = "periodic"
    self.request_autotest(reason)
def next(self, action=None):
    """Advance the test cycle by one step and show the page for the new step.

    Args:
        action: ``None`` to move on normally; ``"change_recipe"`` to drop the
            recipe, reset the piece counters and return to recipe selection;
            ``"fail"`` to show the failure page and call ``done(ok=False)``;
            ``"abort"`` to show the failure page without calling ``done``.

    Raises:
        NotImplementedError: if ``action`` is any other value.
    """
    # Track the previous step type to enable transition hooks (e.g., Free Fall -> Leak_1)
    prev_step_type = self.step.step_type if self.step is not None else None
    self.log.info(f"cycle step: {prev_step_type!r} action: {action!r} current index:{self.cycle_index}")

    current_w = self.centralWidget
    if hasattr(current_w, "stop"):
        # prev_step_type (not self.step.step_type) so this cannot fail while no step is set
        self.log.info(f"stopping widget {prev_step_type}")
        current_w.stop()

    if action == "change_recipe":
        self.log.info(f"cycle next: action: {action!r}")
        self.set_recipe(recipe=None)
        if self.config["hardware_config"]["tecna_t3"] == "present" or self.config["hardware_config"][
                "furness_controls"] == "present":
            # Reset recipe_written flags for leak widgets if they exist
            for leak_name in ("leak_1", "leak_2"):
                leak_page = self.cycle_available_steps.get(leak_name)
                if leak_page is not None and getattr(leak_page, "widget", None) is not None:
                    leak_page.widget.recipe_written = False
        self.step = Step(step_type="select_recipe")
        self.cycle_index = -1
        self.recipe = None
        self.cycle_steps = None
        # COUNT RESET (in place: the step widgets share this dict)
        self.pieces["ok"] = 0
        self.pieces["ko"] = 0
    elif action in ("fail", "abort"):
        self.log.info(f"cycle next: action: {action!r}")
        # FAIL AND RESTART TEST
        self.step = Step(step_type="fail")
        self.cycle_index = -1
        # COUNT FAIL (an abort is not counted)
        if action == "fail":
            self.done(ok=False)
    elif action is not None:
        raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")

    # Set next cycle step normally if no action sets it explicitly
    if self.recipe is None or self.cycle_steps is None:
        # If recipe not set: go to the configured recipe selection page
        if self.recipe_selection_mode == "barcode":
            self.log.info("returning to barcode recipe selection")
            self.step = Step(step_type="barcode_recipe_selection")
        else:
            self.log.info("returning to recipe selection table")
            self.step = Step(step_type="select_recipe")
    elif action is None:
        if (
                self.autotest_request is not False
                and self.autotest_cycle_steps is not None
                and not self.autotesting
                and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps))
        ):
            # An autotest was requested and the normal cycle has not started
            # or has just ended: switch to the autotest sequence.
            self.cycle_index = -1
            self.autotesting = True
            self.autotesting_reason = self.autotest_request
            self.autotest_request = False
            self.log.info(f"Autotest requested (reason: {self.autotesting_reason})")
            # Reset the periodic autotest timer so the next periodic request
            # is a full period away. This must restart autotest_timer, not the
            # clock's time_timer; the timer only exists after
            # request_autotest("init"/"login").
            if self.autotest_period is not None and self.autotest_timer is not None:
                self.autotest_timer.start(self.autotest_period)
            self.require_discard_piece = False

        if self.autotesting:
            if self.cycle_index + 1 < len(self.autotest_cycle_steps):
                # Go to next step in autotest_cycle_steps
                self.cycle_index += 1
                self.step = self.autotest_cycle_steps[self.cycle_index]
            elif not self.data.get("ok"):
                # Autotest failed: run it again from its first step
                self.log.warning("Restarting autotest from the first step due to failure.")
                self.cycle_index = 0
                self.step = self.autotest_cycle_steps[self.cycle_index]
            else:
                # Autotest succeeded; proceed to post-autotest actions
                self.autotesting = False
                if self.autotesting_reason == "logout":
                    Users.logout()
                    self.main_window.open_login()
                else:
                    # Show when the autotest ran and when the next one is due;
                    # minutes are zero-padded like the main clock.
                    stamp = "{d}/{mo}/{y} {h}:{m:02d}"
                    t = datetime.now()
                    self.last_at_l.setText(
                        stamp.format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
                    t += timedelta(seconds=int(self.autotest_period / 1000))
                    self.next_at_l.setText(
                        stamp.format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
                # NOTE(review): assumed to run for both the logout and the normal branch - confirm.
                self.autotesting_reason = None
                self.cycle_index = -1
                self.config["autotest_done"] = True

        if not self.autotesting:
            if len(self.cycle_steps):
                # Go to next step in cycle_steps, wrapping around at the end
                self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
                self.step = self.cycle_steps[self.cycle_index]
            else:
                # Empty recipe: show the "no test phases" warning page
                self.cycle_index = -1
                self.step = Step(step_type=None)

    # Enable/disable cycle controls
    self.change_recipe_b.setEnabled(self.recipe is not None)
    self.cancel_b.setEnabled(self.step.step_type is not None and self.step.step_type not in {
        "emergency",
        "fail",
        "select_recipe",
        "wait",
    })
    self.log.info(f"next cycle step: {self.step.step_type!r}")

    # INIT TEST DATA IF STARTING CYCLE OR RESET IS NEEDED
    if self.cycle_index == 0 and not self.autotesting:  # Initialize test data for normal cycles
        self.data = {"ok": True, "overridden": False}
        self.archived = None
    if self.recipe is not None and "recipe" not in self.data:
        self.data["recipe"] = model_to_dict(self.recipe)

    # If transitioning from Free Fall to Leak 1, preload Leak 1 parameters onto the tester.
    # Best effort: a failure here is logged and the cycle continues.
    try:
        if (
                prev_step_type == "test_freefall_leak"
                and self.step is not None
                and self.step.step_type == "leak_1"
                and hasattr(self, "tester_component")
                and self.tester_component in (self.components or {})
        ):
            self.log.info("Transition detected: Free Fall -> Leak_1. Pre-writing Leak_1 recipe to tester.")
            self.components[self.tester_component].write_recipe(self.recipe, self.step)
            leak1 = self.cycle_available_steps.get("leak_1")
            if leak1 is not None and getattr(leak1, "widget", None) is not None:
                leak1.widget.recipe_written = True
    except Exception as e:
        self.log.exception(f"Failed to pre-write Leak_1 recipe after Free Fall: {e}")

    # Show the page of the new step and start it
    w = self.cycle_available_steps[self.step.step_type]
    show = None
    if self.step.step_type == "leak_2":
        self.setCentralWidget(w)  # Pre-show UI for leak_2
    if hasattr(w, "start"):
        show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
    if show is not False and w is not current_w:
        self.setCentralWidget(w)
    elif show is False:
        # The page declined to be shown: move on to the following step immediately
        self.next_timer.start(0)

    # Steps without user interaction advance on their own after 500 ms
    if self.step.step_type == "done":
        self.archived = self.done()
        self.last_label = copy.deepcopy(self.archived)
        self.next_timer.start(500)
    elif self.step.step_type == "print":
        compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
        self.archived.test_data.update({
            "print": compiled_label,
            "print_template": self.print_template,
            "barcode_stampato": self.printed_barcode,
        })
        self.archived.label = compiled_label
        self.log.info("Label printed. Saving...")
        # The save is handed to the main window's run_request signal instead of being called here
        self.main_window.main_window.run_request.emit(self.archived.save, [], {})
        self.next_timer.start(500)
    elif self.step.step_type == "wait":
        self.next_timer.start(500)

    # Update display
    self.update_count_display()
def reset_count(self):
    """Zero the OK and KO piece counters and refresh the counter label."""
    # Updated in place: the same dict object is shared with the step widgets
    self.pieces.update(ok=0, ko=0)
    self.update_count_display()
def update_count_display(self):
    """Show "<ok> OK / <ko> NOK / <total> TOT" in the piece counter label."""
    passed = self.pieces['ok']
    failed = self.pieces['ko']
    # The total is the sum of every counter in the dict
    total = sum(self.pieces.values())
    self.pieces_count_l.setText(f"{passed} OK / {failed} NOK / {total} TOT")
def set_recipe(self, recipe=None):
    """Make ``recipe`` the active recipe and build its step sequences.

    For a recipe this expands the recipe's own steps into ``cycle_steps``
    (adding helper steps such as "done", "wait", "warning_img", ...),
    validates them with ``check_steps_dependencies``, builds
    ``autotest_cycle_steps``, resets every step page and starts the cycle
    with ``next()``. With ``recipe=None`` both sequences are cleared and the
    recipe label shows "NON SELEZIONATA"; ``next()`` is not called.

    Args:
        recipe: recipe object exposing ``get_steps()``, ``spec`` and
            ``name``, or ``None`` to deselect.
    """
    self.recipe = recipe
    self.config.active_recipe = recipe
    inserted_instruction = False
    self.require_discard_piece = False
    if self.recipe is None:
        self.cycle_steps = None
        self.autotest_cycle_steps = None
    else:
        steps = self.recipe.get_steps()
        # Indexes the loop must not process (they hold a step already handled
        # before an insertion shifted it one position forward).
        skip = set()
        print_found = False
        count_found = False
        # create step sequence list
        # NOTE: ``steps`` is modified while it is being enumerated; inserted
        # steps are visited by later iterations unless their index is in ``skip``.
        barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5']
        for i, step in enumerate(steps):
            if step.step_type == "barcodes":
                n_pieces_value = step.spec.get("n_pieces")
                # A missing or empty "n_pieces" means a single barcode
                n_pieces = 1 if n_pieces_value in (None, '') else n_pieces_value
                try:
                    n_pieces_adapted = int(n_pieces)
                except ValueError:
                    self.log.error(f"Invalid value for n_pieces: {n_pieces}")  # Log the error
                    n_pieces_adapted = 1  # Default to 1 if conversion fails
                if n_pieces_adapted == 1:
                    step.spec["barcode_name"] = 'serial'
                else:
                    # More than one barcode: name this one after its position and
                    # insert a copy asking for one barcode less right after it;
                    # the copy is expanded in turn by the next iteration.
                    step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)]
                    n_pieces_adapted -= 1
                    new_barcode_step = copy.deepcopy(step)
                    new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted)
                    steps.insert(i + 1, new_barcode_step)
            if i in skip:
                continue
            if step.step_type == "vision":
                # NOTE(review): raises KeyError when no "vision" component is
                # registered; this runs before check_steps_dependencies().
                self.components["vision"].config_changed(vision_recipe=self.recipe.name)
            if step.step_type == "count":
                count_found = True
            if "warning_img" in step.spec:
                if step.spec["warning_img"]:
                    # Show the warning image before the step; the step itself
                    # moves to i + 1 and is skipped there.
                    steps.insert(i,
                                 Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]}))
                    skip.add(i + 1)
            if "assembly" in step.spec:
                if step.spec["assembly"]:
                    # NOTE(review): the page registry built in __init__ uses the
                    # key "instruction" (singular). Confirm "instructions" is
                    # registered somewhere, otherwise check_steps_dependencies()
                    # reports it as unsupported.
                    steps.insert(i, Step(step_type="instructions", spec={}))
                    skip.add(i + 1)
            if "require_discard_piece" in step.spec:
                if step.spec["require_discard_piece"]:
                    self.require_discard_piece = True
            if step.step_type == "resistance":  # ADD STEP TO ENSURE REMOVAL OF CONNECTOR
                # The extra measurement expects an infinite resistance
                steps.insert(i + 1, Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": float("+inf"),
                    "tolerance_pos": 0,
                    "tolerance_neg": 0,
                }))
                skip.add(i + 1)
            if step.step_type == "print":
                self.print_template = step.spec.get("template", "EtichettaR5")  # Store the template
                # Only the first print step gets a "done" step in front of it
                if print_found:
                    continue
                steps.insert(i, Step(step_type="done", spec={}))
                print_found = True
                self.print_step = step
                if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes":
                    if recipe.spec.get("instruction", False) is not False:
                        steps.append(Step(step_type="piece_removal", spec={}))
                        # NOTE(review): nesting of the two skip.add(i + 1) calls in
                        # this branch is assumed - confirm.
                        skip.add(i + 1)
                if count_found:
                    steps.append(Step(step_type="count_end", spec={}))
                    skip.add(i + 1)
            if step.step_type in ("leak_1", "leak_2"):
                self.leak_step = step
        # Ensure Free Fall leak test appears before leak_1 when both are enabled
        step_types = [step.step_type for step in steps]
        if "test_freefall_leak" in step_types and "leak_1" in step_types:
            ff_index = step_types.index("test_freefall_leak")
            l1_index = step_types.index("leak_1")
            if ff_index > l1_index:
                # Move Free Fall step to be immediately before leak_1
                steps.insert(l1_index, steps.pop(ff_index))
                # Recompute step_types after reordering
                step_types = [step.step_type for step in steps]
        if "leak_1" in step_types and "leak_2" in step_types:
            leak1_index = step_types.index("leak_1")
            leak2_index = step_types.index("leak_2")
            if leak1_index + 1 == leak2_index:  # Ensure 'leak_1' is immediately followed by 'leak_2'
                # Put the extra instructions between the two leak tests
                if recipe and getattr(recipe, 'spec', None) and recipe.spec.get(
                        "instruction_extra") and "instruction_extra" not in self.unsupported_steps:
                    steps.insert(leak2_index, Step(step_type="instruction_extra", spec={}))
                    inserted_instruction = True
        # Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks
        if not inserted_instruction and recipe and getattr(recipe, 'spec', None) and recipe.spec.get(
                "instruction_extra") and "instruction_extra" not in self.unsupported_steps:
            for i, step in enumerate(steps):
                if step.step_type == "instructions":
                    steps.insert(i + 1, Step(step_type="instruction_extra", spec={}))
                    inserted_instruction = True
                    break
        # Without a print step the cycle still needs its closing "done"
        # (and "count_end" when the recipe counts pieces).
        if not print_found:
            steps.append(Step(step_type="done"))
            if count_found:
                steps.append(Step(step_type="count_end"))
        steps.append(Step(step_type="wait"))
        self.cycle_steps = steps
        # May show error dialogs and call change_recipe() when a step cannot run here
        self.check_steps_dependencies(self.cycle_steps)
        leak_autotest_steps = []
        # CONFIGURE LEAK AUTOTEST PARAMETERS
        if self.config["autotest_leak"]["enabled"] == "true":
            # Two leak runs built from the same config section (every value
            # except "enabled" converted to float): one marked "ko_check",
            # one marked "ok_check" that uses the *_tt_* pressure limits.
            l_at_1 = copy.deepcopy(self.config["autotest_leak"])
            l_at_1.pop("enabled")
            l_at_1 = {a: float(x) for a, x in l_at_1.items()}
            l_at_1["autotest"] = "ko_check"
            l_at_2 = copy.deepcopy(self.config["autotest_leak"])
            l_at_2.pop("enabled")
            l_at_2 = {a: float(x) for a, x in l_at_2.items()}
            l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"]
            l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"]
            l_at_2["autotest"] = "ok_check"
            leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)]
        # Autotest sequence: resistance checks (only when the bench supports
        # resistance steps), then the leak checks, then "done" and "wait".
        self.autotest_cycle_steps = [
            *([
                Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": 1,
                    "tolerance_pos": 5,
                    "tolerance_neg": 5,
                    "autotest": True,
                }),
                Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": float("+inf"),
                    "tolerance_pos": 0,
                    "tolerance_neg": 0,
                    "autotest": True,
                }),
                Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": 10,
                    "tolerance_pos": 1,
                    "tolerance_neg": 1,
                    "autotest": True,
                }),
                Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": float("+inf"),
                    "tolerance_pos": 0,
                    "tolerance_neg": 0,
                    "autotest": True,
                }),
            ] if "resistance" not in self.unsupported_steps else []),
            *(leak_autotest_steps),
            Step(step_type="done"),
            Step(step_type="wait"),
        ]
    # Give every page that supports it a chance to clear its state
    for w in self.cycle_available_steps.values():
        if hasattr(w, "reset"):
            w.reset()
    # UPDATE RECIPE DISPLAY
    if self.recipe is not None:
        self.log.info(
            f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}")
        self.recipe_l.setText(self.recipe.name)
        self.recipe_l.setStyleSheet("")
        # Start the new recipe from its first step
        self.cycle_index = -1
        self.next()
    else:
        self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
        self.recipe_l.setText("NON SELEZIONATA")
        self.recipe_l.setStyleSheet("QLabel { color: red; }")
def check_steps_dependencies(self, steps):
    """Verify that this bench can run every step in ``steps``.

    A step is rejected when its type is unsupported or unknown, or when one
    of its required components is not registered. A registered component
    that is not ready gets one ``reconfigure()`` attempt; if it is still not
    ready it is reported as missing. For a tuple dependency ("any one of")
    the step is only reported when every alternative is unavailable.

    Any problem is shown in a critical message box and followed by
    ``change_recipe()``.
    """
    rejected_steps = set()
    unavailable = set()

    def is_unready(name):
        # Absent counts as unready; a present component gets one reconfigure attempt
        if name not in self.components:
            return True
        if self.components[name].ready:
            return False
        self.components[name].reconfigure()
        return not self.components[name].ready

    for step in steps:
        kind = step.step_type
        if kind in self.unsupported_steps or kind not in self.cycle_available_steps:
            rejected_steps.add(kind)
            continue
        for dependency in self.steps_dependencies.get(kind, []):
            if isinstance(dependency, tuple):
                if all(alt not in self.components for alt in dependency):
                    rejected_steps.add(kind)
                    continue
                unready = {alt for alt in dependency if is_unready(alt)}
                if unready == set(dependency):
                    unavailable.add(" or ".join(dependency))
            elif dependency not in self.components:
                rejected_steps.add(kind)
            elif is_unready(dependency):
                unavailable.add(dependency)

    if rejected_steps:
        listing = ', '.join(sorted(rejected_steps))
        QMessageBox.critical(None, "Errore Ricetta",
                             f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{listing}\nModificare la ricetta adeguatamente.")
    if unavailable:
        listing = ', '.join(sorted(unavailable))
        QMessageBox.critical(None, "Errore Componenti Ricetta",
                             f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{listing}\nModificare la ricetta adeguatamente o collegare i componenti elencati.")
    if rejected_steps or unavailable:
        self.change_recipe()
def set_step(self, step_name, data=None):
    """Record the outcome of a step in ``self.data`` and move to the next step.

    Args:
        step_name: Key under which the step data is stored in ``self.data``.
        data: Optional dict with the step outcome. When given, the current
            step spec (without its ``"name"`` entry) is attached under
            ``data["step"]`` and the cycle-wide ``"ok"`` / ``"overridden"``
            flags in ``self.data`` are updated from it.
    """
    self.data.setdefault(step_name, {})

    if data is not None:
        # NOTE(review): the spec object is shared with self.step, so removing
        # "name" here also removes it from the live step spec - confirm intended.
        step_spec = self.step.spec
        data["step"] = step_spec
        step_spec.pop("name", None)

        # MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE
        # NOTE(review): the check counts the keys of the stored entry, not the
        # number of results recorded so far - confirm this is the intent.
        stored = self.data[step_name]
        if len(stored) > 1:
            stored[str(len(stored))] = data
        else:
            self.data[step_name] = data

        was_overridden = self.data["overridden"]
        self.data["overridden"] = was_overridden or data.get("overridden", False)
        still_ok = self.data["ok"]
        self.data["ok"] = still_ok and data.get("ok", False)

    self.next()
def done(self, ok=True):
    """Finalize the current cycle: consolidate results, archive them, update counters.

    In order, this method:
      1. drops the ``steps`` list from the recipe spec stored in ``self.data``;
      2. for each known leak entry, replaces ``results`` with a compact summary
         (rounded numeric values) and keeps the original under ``raw_results``;
      3. saves the vision frame through the ``vision_saver`` component and
         strips the bulky vision payload from the data;
      4. tags autotest cycles and renames their recipe to ``"AUTOTEST"``;
      5. archives the data through ``Archive.archive``;
      6. increments the ok/ko piece counters for normal cycles, or - after a
         successful ``"logout"`` autotest - logs the user out and opens the
         login screen.

    Args:
        ok: Outcome reported by the caller; combined with ``self.data["ok"]``
            for the archived result.

    Returns:
        The value returned by ``Archive.archive``.
    """
    self.log.info("cycle done, saving data...")
    # Remove useless info
    self.data.get("recipe", {}).get("spec", {}).pop("steps", None)

    key_prefix = "Running test:"
    result_key = "Running test: result"
    settling_key = "Running test: pressure at the end of settling"
    leak_key = "Running test: measured leak"
    end_key = "Running test: pressure at the end of measure"
    numeric_keys = ("Running test: filling pressure", leak_key, settling_key)

    def has_test_keys(mapping):
        return any(isinstance(key, str) and key.startswith(key_prefix) for key in mapping)

    # Consolidate and preserve leak results for saving; include also free-fall leak if present
    for leak in ("test_freefall_leak", "leak_1", "leak_2"):
        if leak not in self.data:
            continue
        leak_container = self.data[leak]
        if not isinstance(leak_container, dict) or "results" not in leak_container:
            self.log.warning(f"Leak step '{leak}' has no results; skipping consolidation...")
            continue

        raw_results = leak_container["results"]
        # A non-dict payload (e.g. None) must not abort the cycle before it is
        # archived: treat it as "no values available".
        raw_lookup = raw_results if isinstance(raw_results, dict) else {}

        # Locate the dictionary holding the "Running test:" keys.
        device_results = {}
        nested = raw_lookup.get(self.tester_component) if self.tester_component else None
        if isinstance(nested, dict):
            # Preferred: nested under the tester component key.
            device_results = nested
        else:
            # Fallback: first top-level value that looks like a result dict.
            for candidate in raw_lookup.values():
                if isinstance(candidate, dict) and has_test_keys(candidate):
                    device_results = candidate
                    break
        # Keys sitting directly at the top level are covered by the fallback
        # to raw_lookup in lookup() below.

        def lookup(key, default=None):
            if key in device_results:
                return device_results[key]
            return raw_lookup.get(key, default)

        # Build a compact, consistent summary while preserving raw data
        summary = {result_key: lookup(result_key, "N/A")}
        for key in numeric_keys:
            value = lookup(key)
            if value is None:
                continue
            try:
                summary[key] = round(float(value), 2)
            except (TypeError, ValueError, OverflowError):
                # Best effort: a non-numeric value is left out of the summary.
                self.log.warning(f"Leak step '{leak}': non-numeric value for '{key}': {value!r}")

        # Derive pressure at end of measure if possible
        if settling_key in summary and leak_key in summary:
            summary[end_key] = round(summary[settling_key] + summary[leak_key], 2)

        # Attach the cleaned summary, but keep the raw results for traceability
        leak_container["raw_results"] = raw_results
        leak_container["results"] = summary

    if "vision" in self.data:
        vision = self.data["vision"]
        # Save vision results if available
        out_paths = self.components["vision_saver"].save(
            save_time=vision.get("time", None),
            frame=vision.get("frame", None),
        )
        vision["files"] = out_paths
        self.log.info(f"cycle vision saved locally: {out_paths!r}")
        # Drop payload entries that are not kept in the archived data.
        for bulky in ("frame", "render", "detections"):
            vision.pop(bulky, None)
        if "results" in vision:
            vision["results"].pop("results", None)

    if self.autotesting:
        self.data["autotest"] = True
        self.data["autotest_reason"] = self.autotesting_reason
        self.data["recipe"]["name"] = "AUTOTEST"

    # Archive and update results
    archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
    self.log.info(f"cycle archived locally: {archived!r}")

    if not self.autotesting:
        # Count results based on success or failure
        # NOTE(review): counters follow `ok` only, while the archive uses
        # `ok and self.data["ok"]` - confirm the difference is intended.
        if ok:
            self.pieces["ok"] += 1
        else:
            self.pieces["ko"] += 1
    elif self.autotesting_reason == "logout" and ok:
        # Make sure to properly logout the user before opening login screen
        from lib.db.models.users import Users
        from components import ArchiveSynchronizer
        Users.logout()
        ArchiveSynchronizer.machine_status = "logged-out"
        self.main_window.open_login()

    return archived
@staticmethod
def labellify(v):
if v is None:
v = "-"
elif isinstance(v, float):
v = f"{v:.1f}"
if not isinstance(v, str):
v = str(v)
return v
def print(self, archived, label):
    """Build the label context for an archived cycle and print the main label.

    The context maps placeholder names (recipe data, leak step parameters,
    measured values, serial number variants, date/time parts, operator data,
    barcode and result texts) to the strings used to fill the label template.
    The barcode is produced from the print step's ``"barcode"`` format, with
    ``{M43:X:Y}`` patterns resolved by ``process_m43_patterns``.

    Args:
        archived: Archived cycle record; ``id``, ``time``, ``user``, ``label``
            and ``test_data`` are read. ``None`` aborts the print.
        label: Label template passed to the printer component's
            ``print_label``.

    Returns:
        The context dict used for the label, or ``None`` when ``archived`` is
        ``None``. The context is also returned when no printer component is
        available (nothing is printed in that case).
    """
    self.log.info("cycle print")
    if archived is None:
        self.log.error("attempting to print empty label")
        return None
    if archived.label is not None:
        # Not treated as an error: the label is compiled and printed again.
        self.log.info("printing already compiled label")

    # LABEL PRINT
    recipe = archived.test_data.get("recipe", {})
    # Define barcode format variables
    self.module_data_format = "1IT ECE{SN7}"
    # Barcode format string configured on the print step (may be missing)
    self.barcode_format = self.print_step.spec.get("barcode")

    leak_test_1 = archived.test_data.get("leak_1", {})
    leak_test_1_step = leak_test_1.get("step", {})
    leak_test_1_step_spec = leak_test_1_step.get("spec", {})
    leak_test_1_results = leak_test_1.get("results", {})
    leak_test_2 = archived.test_data.get("leak_2", {})
    leak_test_2_step = leak_test_2.get("step", {})
    leak_test_2_step_spec = leak_test_2_step.get("spec", {})
    leak_test_2_results = leak_test_2.get("results", {})

    # NOTE(review): `100 + q / 100` divides only the percentage by 100, and the
    # base field differs between the four values (test_pressure vs
    # settling_pressure_*_percent). This looks unintended, but the expected
    # formula cannot be determined from this file, so it is left unchanged.
    psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (
        100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100)
    psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (
        100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100)
    psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (
        100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100)
    psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (
        100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100)

    settling_key = "Running test: pressure at the end of settling"
    leak_key = "Running test: measured leak"
    end_key = "Running test: pressure at the end of measure"
    if self.tester_component is not None:
        for spec_key, results in (("leak_1", leak_test_1_results), ("leak_2", leak_test_2_results)):
            if not self.recipe.spec[spec_key]:
                continue
            settling = results.get(settling_key)
            measured = results.get(leak_key)
            if settling is None or measured is None:
                # Incomplete results (e.g. an interrupted test) must not
                # prevent the label from being printed.
                self.log.warning(f"{spec_key}: incomplete leak results, end pressure not derived")
                continue
            results[end_key] = settling + measured

    printer_fields = self.print_step.spec
    stamp = archived.time
    verdict = "CONFORME" if leak_test_1.get("ok", False) else "SCARTO"
    forced = " FORZATO" if self.data.get("overridden", False) else ""

    context = {
        # RECIPE DATA
        "RECIPE": self.labellify(recipe.get("name", "-")),
        "CLIENT": self.labellify(recipe.get("client", "-")),
        "PART": self.labellify(recipe.get("part_number", "-")),
        "DESCRIPTION": self.labellify(recipe.get("description", "-")),
        "RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')),
        # STEP SPEC
        "TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")),
        "PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")),
        "TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")),
        "TSET": self.labellify(leak_test_1_step.get("settling_time", "-")),
        "TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")),
        "PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")),
        "TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")),
        "TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")),
        "PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")),
        "PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")),
        "PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")),
        "PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")),
        "PSETMINP_A": self.labellify(psetminp_a),
        "PSETMAXP_A": self.labellify(psetmaxp_a),
        "PSETMINP2_A": self.labellify(psetminp2_a),
        "PSETMAXP2_A": self.labellify(psetmaxp2_a),
        "TTEST": self.labellify(leak_test_1_step.get("test_time", "-")),
        "TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")),
        "PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")),
        "PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")),
        "PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")),
        "PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")),
        "PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")),
        "TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")),
        "PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")),
        # ACTUAL TESTED VALUES
        "RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")),
        "RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")),
        "RESPSET": self.labellify(leak_test_1_results.get(settling_key, "-")),
        "RESPSET2": self.labellify(leak_test_2_results.get(settling_key, "-")),
        "RESPEND": self.labellify(leak_test_1_results.get(end_key, "-")),
        # Second leak test value (previously read from the first test's results)
        "RESPEND2": self.labellify(leak_test_2_results.get(end_key, "-")),
        "RESLEAK": self.labellify(leak_test_1_results.get(leak_key, "-")),
        "RESLEAK2": self.labellify(leak_test_2_results.get(leak_key, "-")),
        "RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")),
        "RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")),
        # SERIAL DEFINITION
        "SN": str(archived.id),
        "SN4": f"{archived.id:0>4}",
        "SN5": f"{archived.id:0>5}",
        "SN6": f"{archived.id:0>6}",
        "SN7": f"{archived.id:0>7}",
        # TIME DEFINITION
        "DATETIME": stamp.strftime("%d/%m/%Y %H:%M:%S"),
        "DATE": stamp.strftime("%d/%m/%Y"),
        "TIME": stamp.strftime("%H:%M:%S"),
        "YYYY": stamp.strftime("%Y"),
        "YY": stamp.strftime("%y"),
        "MO": stamp.strftime("%m"),
        "DD": stamp.strftime("%d"),
        "HH": stamp.strftime("%H"),
        "MI": stamp.strftime("%M"),
        "SS": stamp.strftime("%S"),
        "JJJ": stamp.strftime("%j"),
        "WW": stamp.strftime("%W"),
        # EXTRA DATA
        "SHIFT": str(get_shift(archived.time)),
        "STATION": str(self.config.machine_id),
        "OPERATOR": str(archived.user.username),
        "BADGE_NUM": str(archived.user.badge_number),
        # BARCODE
        "BCODE": str(self.step.spec.get("barcode", "")),
        # RESULT
        "RESULT": verdict + forced,
        "RESULT_L1": "ESITO" + forced,
        "RESULT_L2": verdict,
    }

    # Ensure any labeltxt_N fields from the recipe are available in context (both lower and upper case)
    for n in range(1, 6):
        field = f"labeltxt_{n}"
        if field in printer_fields and printer_fields[field] != "":
            value = printer_fields[field]
            context[field] = value  # e.g., 'labeltxt_1'
            context[field.upper()] = value  # e.g., 'LABELTXT_1'

    # Process any {M43:X:Y} patterns in the barcode format
    processed_barcode_format = self.process_m43_patterns(self.barcode_format, context)
    # A print step without a barcode format yields an empty barcode instead of
    # failing on None.
    formatted_barcode = (processed_barcode_format or "").format(**context)
    context['BCODE'] = formatted_barcode
    self.printed_barcode = formatted_barcode
    if self.archived is not None:
        self.archived.barcode = self.printed_barcode

    # PRINT MAIN PRODUCT LABEL
    # Determine which OS label printer to use based on per-recipe selection
    selected_printer = printer_fields.get("printer_selection", "")
    lp2_cfg = self.config.get("label_printer_2", {})
    lp2_printer = lp2_cfg.get("printer", "")
    if selected_printer and lp2_printer and selected_printer == lp2_printer:
        use_comp_name = "label_printer_2"
    else:
        use_comp_name = "label_printer"
    comp = (self.components.get(use_comp_name)
            or self.components.get("label_printer")
            or self.components.get("label_printer_2"))
    if comp is None:
        # No printer component available; log and skip printing safely
        self.log.warning("No label printer component available; skipping label print.")
        return context

    # Set the target device name to selected printer if provided
    if selected_printer:
        try:
            comp.printer = selected_printer
        except Exception:
            # Best effort: keep printing on the component's current device,
            # but leave a trace of the failed selection.
            self.log.warning(f"Could not select printer {selected_printer!r}", exc_info=True)

    comp.print_label(label, context=context)
    self.log.info(f"Main label printed: {context!r}")
    # return fields used to print label for saving into test archive
    return context
def print_extra_labels(self):
    """Print the extra labels listed in the print step, if any.

    Nothing happens unless an ``extra_label_printer`` component exists, a
    print step is set and the cycle is not an autotest. The print step's
    ``"extra_label"`` entry is a comma-separated list of names; each one is
    printed as ``<name>.prn`` without a context.
    """
    # PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST)
    if "extra_label_printer" not in self.components.keys():
        return
    if self.print_step is None or self.autotesting is not False:
        return

    names = self.print_step.spec["extra_label"]
    if len(names) == 0:
        return

    printer = self.components["extra_label_printer"]
    for name in names.split(","):
        printer.print_label(f"{name}.prn", context=None)
@pyqtSlot(str)
def load_recipe_from_rfid(self, data):
    """React to a value read from the RFID tag.

    A non-empty value is remembered in ``self.tag_loaded_recipe`` and, when
    the current step is ``"barcode_recipe_selection"``, forwarded to that
    step's widget. ``None`` or an empty string clears the remembered value,
    fails the running cycle and returns to recipe selection.

    Args:
        data: Value read from the tag; ``None`` or ``''`` when nothing is read.
    """
    if data in (None, ''):
        # fixture removed
        self.tag_loaded_recipe = None
        self.fail_cycle()
        self.change_recipe()
        return

    self.tag_loaded_recipe = data
    if self.step.step_type == "barcode_recipe_selection":
        selection_step = self.cycle_available_steps["barcode_recipe_selection"]
        selection_step.widget.get(data)
def add_error(self, message, is_error):
    """Register a message in the list of active errors.

    The ``(message, is_error)`` pair is appended to ``self.active_errors``
    unless it is already there, and ``self.error_timer`` is started when it
    is not running.

    Args:
        message (str): Text to show.
        is_error (bool): True for an error shown in red, False for a
            non-error message.
    """
    entry = (message, is_error)
    already_listed = entry in self.active_errors
    if not already_listed:
        self.active_errors.append(entry)

    if self.error_timer.isActive():
        return
    self.error_timer.start()
def remove_error(self, message):
    """Drop every active error whose text equals ``message``.

    The rotation index is reset to 0 when it no longer points inside the
    list. When no error is left, the error label is cleared and the timer is
    stopped.

    Args:
        message (str): Text of the error(s) to remove.
    """
    remaining = []
    for entry in self.active_errors:
        if entry[0] != message:
            remaining.append(entry)
    self.active_errors = remaining

    # Reset the error index if necessary
    if self.current_error_index >= len(remaining):
        self.current_error_index = 0

    if remaining:
        return

    # Nothing left to show: clear the label and stop rotating.
    self.error_label.setText("")
    self.error_label.setStyleSheet("")
    self.error_timer.stop()
def display_current_error(self):
    """Show the active error at the current index and advance the index.

    Each call displays one entry of ``self.active_errors`` (red for errors,
    green otherwise) and moves the index to the following entry, wrapping
    around at the end. With no active errors the label is cleared.
    """
    if not self.active_errors:
        # Clear the label if there are no errors
        self.error_label.setText("")
        self.error_label.setStyleSheet("")
        return

    message, is_error = self.active_errors[self.current_error_index]
    self.error_label.setText(message)
    self.error_label.setStyleSheet("color: red;" if is_error else "color: green;")

    # Move to the next error for the next call to this method
    following = self.current_error_index + 1
    self.current_error_index = following % len(self.active_errors)
@pyqtSlot(bool)
def handle_rfid_error(self, connected):
    """Add or remove the RFID error according to the connection state.

    Args:
        connected (bool): True if connected, False if not.
    """
    rfid_message = "Errore RFID."
    if not connected:
        # Shown as an error (red) until the connection is reported again.
        self.add_error(rfid_message, True)
        return
    self.remove_error(rfid_message)
@pyqtSlot(bool, str)
def handle_modbus_error(self, has_error, error_message):
    """Add or remove the Tecna/Modbus error in the active error list.

    Args:
        has_error (bool): True if there is an error, False otherwise.
        error_message (str): Error detail supplied with the signal; it is not
            used, a fixed text is displayed instead.
    """
    tecna_message = "Errore Tecna"
    if not has_error:
        self.remove_error(tecna_message)
        return
    self.add_error(tecna_message, True)
def update_label_with_args(self, extra_info=None):
    """Show the command-line arguments (plus optional extra info) on the flag label.

    The text is the space-joined ``sys.argv[1:]``, or a fixed
    "No system arguments provided." message when there are no arguments.
    The label is hidden only when that text ends up empty.

    Args:
        extra_info (str): Optional. Extra information appended to the text
            after a ``" | "`` separator.
    """
    cli_args = sys.argv[1:]
    if cli_args:
        args_text = " ".join(cli_args)
    else:
        args_text = "No system arguments provided."

    if not args_text:
        # Reached only when the joined arguments are an empty string.
        self.flag_label.setVisible(False)
        return

    if extra_info:
        args_text = f"{args_text} | {extra_info}"

    self.flag_label.setText(args_text)
    self.flag_label.setStyleSheet("QLabel { color: red; font-weight: bold; }")
    self.flag_label.setVisible(True)
def process_m43_patterns(self, barcode_format, context):
    """Resolve the ``{M43:X:Y}`` patterns of a barcode format string.

    Every ``{M43:X:Y}`` pattern is replaced by a ``{m43_check_<i>}``
    placeholder, and the matching check digit is stored in ``context`` under
    ``m43_check_<i>``. The check digit is the modulo 43 checksum of the ``Y``
    characters starting at position ``X`` of the base string, i.e. the format
    with all M43 patterns removed and then formatted with ``context``.
    Characters missing from ``MODULO43_ASSIGNMENT_TABLE`` are dropped before
    the checksum is computed.

    The check digit is ``"?"`` when the base string cannot be formatted, the
    requested range lies outside the base string, nothing valid is left to
    checksum, or the checksum calculation fails.

    Args:
        barcode_format: Format string that may contain ``{M43:X:Y}`` patterns.
        context: Dictionary used for formatting; it is updated in place with
            the ``m43_check_<i>`` entries.

    Returns:
        The format string with the M43 patterns replaced by placeholders.
        A falsy ``barcode_format`` or one without M43 patterns is returned
        unchanged.
    """
    import re

    # If barcode_format is None or empty, return it as is
    if not barcode_format:
        self.log.warning("Empty barcode format provided")
        return barcode_format

    self.log.info(f"Processing barcode format: '{barcode_format}'")
    self.log.info(f"Context keys: {list(context.keys())}")

    m43_pattern = re.compile(r'\{M43:(\d+):(\d+)\}')
    matches = list(m43_pattern.finditer(barcode_format))
    self.log.info(f"Found {len(matches)} M43 patterns in barcode format")
    if not matches:
        self.log.info("No M43 patterns found in barcode format")
        return barcode_format

    # The base string is the same for every pattern, so it is built once.
    try:
        base_string = m43_pattern.sub("", barcode_format).format(**context)
    except (KeyError, IndexError, ValueError) as e:
        # Unknown, positional or malformed fields: no checksum can be computed.
        self.log.error(f"Error when formatting base string: {e!r}")
        base_string = None
    else:
        self.log.info(f"Base string for checksum calculation: '{base_string}'")

    processed_format = barcode_format
    for i, match in enumerate(matches):
        x = int(match.group(1))
        y = int(match.group(2))
        self.log.info(f"Processing M43 pattern {i + 1}: X={x}, Y={y}")

        check_key = f"m43_check_{i}"
        # Replace the {M43:X:Y} pattern with its placeholder
        processed_format = processed_format.replace(match.group(0), f"{{{check_key}}}")

        if base_string is None:
            context[check_key] = "?"
            continue

        if not (x < len(base_string) and x + y <= len(base_string)):
            self.log.warning(
                f"M43 pattern with X={x}, Y={y} is out of range for string of length {len(base_string)}")
            context[check_key] = "?"
            continue

        substring = base_string[x:x + y]
        self.log.info(f"M43 substring for checksum calculation: '{substring}', length: {len(substring)}")

        # Drop characters that have no value in the assignment table
        invalid_chars = [c for c in substring if c not in self.MODULO43_ASSIGNMENT_TABLE]
        if invalid_chars:
            original_substring = substring
            substring = ''.join(c for c in substring if c in self.MODULO43_ASSIGNMENT_TABLE)
            self.log.info(
                f"Removed invalid characters {invalid_chars} from substring: '{original_substring}' -> '{substring}'")

        if not substring or substring.isspace():
            self.log.warning(f"Empty or whitespace-only substring for M43 pattern {i + 1}")
            context[check_key] = "?"
            continue

        try:
            check_digit = self.calculate_modulo43_checksum(substring)
        except ValueError as e:
            self.log.error(f"Error calculating checksum: {e}")
            context[check_key] = "?"
        else:
            self.log.info(f"Calculated check digit: '{check_digit}' for substring: '{substring}'")
            context[check_key] = check_digit

    self.log.info(f"Processed barcode format: '{processed_format}'")
    return processed_format
def calculate_modulo43_checksum(self, data_sequence: str) -> str:
    """Calculate the Modulo 43 check digit of a data sequence.

    Each character is mapped to its value in ``MODULO43_ASSIGNMENT_TABLE``;
    the values are summed and the character whose value equals the sum
    modulo 43 is the check digit.

    Args:
        data_sequence: The input string for which to calculate the checksum.

    Returns:
        The Modulo 43 check digit as a single character, or ``"?"`` when
        ``data_sequence`` is empty.

    Raises:
        ValueError: If ``data_sequence`` contains a character not found in the
            assignment table, or if the table has no character for the
            computed remainder.
    """
    # If the data sequence is empty, return a default value
    if not data_sequence:
        self.log.warning("Empty data sequence provided for checksum calculation")
        return "?"

    assignment_table = self.MODULO43_ASSIGNMENT_TABLE
    self.log.info(f"Calculating checksum for data sequence: '{data_sequence}'")

    char_values = []
    for char in data_sequence:
        if char not in assignment_table:
            self.log.error(f"Character '{char}' is not valid for checksum calculation.")
            raise ValueError(f"Character '{char}' is not valid for checksum calculation.")
        char_values.append((char, assignment_table[char]))
    total_sum = sum(value for _, value in char_values)

    self.log.info(f"Character values: {char_values}")
    self.log.info(f"Total sum: {total_sum}")
    remainder = total_sum % 43
    self.log.info(f"Remainder (total_sum % 43): {remainder}")

    # Map the remainder back to its character
    for char, value in assignment_table.items():
        if value == remainder:
            self.log.info(f"Check digit for remainder {remainder}: '{char}'")
            return char

    # Only reachable with an incomplete table: signal it instead of returning
    # a placeholder string that would end up inside a printed barcode.
    self.log.error(f"No character found for remainder {remainder}")
    raise ValueError(f"No character found for remainder {remainder}")