1427 lines
71 KiB
Python
Executable File
1427 lines
71 KiB
Python
Executable File
import copy
|
|
import logging
|
|
import os
|
|
import sys
|
|
import weakref
|
|
from datetime import datetime, timedelta
|
|
#from distutils.util import change_root
|
|
|
|
from PyQt5.QtCore import QTimer, pyqtSlot, pyqtSignal
|
|
from PyQt5.QtWidgets import QMessageBox
|
|
from lib.db import Archive, Recipes, Users
|
|
from lib.helpers import get_shift
|
|
from lib.helpers.step import Step
|
|
from playhouse.shortcuts import model_to_dict
|
|
from ui.barcode_recipe_selection import Barcode_Recipe_Selection
|
|
from ui.helpers import replace_widget
|
|
from ui.recipe_selection import Recipe_Selection
|
|
from ui.test_assembly import Test_Assembly
|
|
from ui.test_barcodes import Test_Barcodes
|
|
from ui.test_connector import Test_Connector
|
|
from ui.test_count import Test_Count
|
|
from ui.test_count_end import Test_Count_End
|
|
from ui.test_fail import Test_Fail
|
|
from ui.test_instructions import Test_Instructions
|
|
from src.ui.test_pipe_cutter import Test_Pipe_Cutter
|
|
from ui.test_leak import Test_Leak
|
|
from ui.test_resistance import Test_Resistance
|
|
from ui.test_screws import Test_Screws
|
|
from ui.test_vision import Test_Vision
|
|
from ui.test_warning_img import Test_Warning_Img
|
|
from ui.widget import Widget
|
|
from components import ArchiveSynchronizer
|
|
|
|
|
|
class Test(Widget):
|
|
# Modulo 43 assignment table for checksum calculation
|
|
MODULO43_ASSIGNMENT_TABLE = {
|
|
'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9,
|
|
'A': 10, 'B': 11, 'C': 12, 'D': 13, 'E': 14, 'F': 15, 'G': 16, 'H': 17, 'I': 18,
|
|
'J': 19, 'K': 20, 'L': 21, 'M': 22, 'N': 23, 'O': 24, 'P': 25, 'Q': 26, 'R': 27,
|
|
'S': 28, 'T': 29, 'U': 30, 'V': 31, 'W': 32, 'X': 33, 'Y': 34, 'Z': 35,
|
|
'-': 36, '.': 37, ' ': 38, '$': 39, '/': 40, '+': 41, '%': 42
|
|
}
|
|
|
|
def __init__(self, config, components=None, main_window=None):
|
|
super().__init__()
|
|
self.autotest_timer = None
|
|
self.main_window = main_window
|
|
self.config = config
|
|
self.components = components
|
|
# GET LOGGER
|
|
self.log = logging.getLogger("Test")
|
|
# SHOW MACHINE DESCRIPTION
|
|
self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A"))
|
|
# SHOW USERNAME
|
|
session = Users.get_session()
|
|
self.original_username = session.username
|
|
|
|
# Check if we should set the label to "ADMIN"
|
|
if session.username.upper() == "ADMIN":
|
|
self.user_l.setText("ADMIN")
|
|
session._is_admin = True
|
|
else:
|
|
self.user_l.setText(session.username)
|
|
session._is_admin = False
|
|
|
|
if session.is_admin:
|
|
self.user_l.setStyleSheet("QLabel { color: red; }")
|
|
else:
|
|
self.user_l.setStyleSheet("")
|
|
# Store original admin status
|
|
self.had_admin = session.is_admin
|
|
|
|
self.flag_label.setVisible(False)
|
|
if len(sys.argv) > 1:
|
|
self.flag_label.setVisible(True)
|
|
self.update_label_with_args() # Initial update
|
|
else:
|
|
self.flag_label.setVisible(False)
|
|
|
|
# Initialize barcode formatting variables
|
|
self.barcode_prefix = ""
|
|
self.barcode_suffix = "*="
|
|
|
|
self.active_errors = [] # List to hold current errors (type: tuples of (message, is_error))
|
|
self.current_error_index = 0 # Keeps track of the current error index during alternation
|
|
|
|
# Timer for alternating errors
|
|
self.error_timer = QTimer()
|
|
self.error_timer.setInterval(2000) # Fire every 2 seconds
|
|
self.error_timer.timeout.connect(self.display_current_error) # Connect the timer to the display logic
|
|
|
|
# SHOW AND UPDATE TIME CLOCK
|
|
self.refresh_time(init=True)
|
|
# INIT RECIPE
|
|
self.recipe = None
|
|
if "fixture_id" in self.components:
|
|
self.rfid = self.components["fixture_id"]
|
|
self.rfid.rfid_error_signal.connect(self.handle_rfid_error)
|
|
if "tecna_t3" in self.components:
|
|
self.tecna = self.components["tecna_t3"]
|
|
#self.tecna.tecna_error_signal.connect(self.handle_modbus_error)
|
|
self.error_label.setText("")
|
|
self.error_label.setStyleSheet("QLabel { color: red; }")
|
|
|
|
|
|
|
|
if self.config["hardware_config"]["barcode_recipe_selection"] == "present":
|
|
self.recipe_selection_mode = "barcode"
|
|
else:
|
|
self.recipe_selection_mode = "table"
|
|
self.step = None
|
|
self.tester_component = None
|
|
if self.config["hardware_config"]["tecna_t3"] == "present":
|
|
self.tester_component = "tecna_t3"
|
|
#self.components["tecna_t3"].tecna_error_signal.connect(self.handle_modbus_error)
|
|
elif self.config["hardware_config"]["furness_controls"] == "present":
|
|
self.tester_component = "furness_control"
|
|
|
|
|
|
|
|
self.unsupported_steps = set()
|
|
self.steps_dependencies = {
|
|
"count": set(),
|
|
"connector": {"multicomp", },
|
|
"instruction": {"digital_io"},
|
|
"screws": {"screwdriver", "tecna_t3", },
|
|
"resistance": {"multicomp", },
|
|
"leak_1": {self.tester_component, },
|
|
"test_freefall_leak": {self.tester_component, },
|
|
"leak_2": {self.tester_component, },
|
|
"pipe_cutter": {"pipe_cutter"},
|
|
"vision": {("uvc_camera", "galaxy_camera","hikrobot_sc"), "vision", "vision_saver", }, # "neo_pixels", },
|
|
"print": {"label_printer_2"} if self.config["hardware_config"]["label_printer"] != "present" else {"label_printer"},
|
|
}
|
|
self.unsupported_steps = set()
|
|
for step_name, dependencies in self.steps_dependencies.items():
|
|
for dependency in dependencies:
|
|
if isinstance(dependency, tuple):
|
|
# if all([d not in self.components or not self.components[d].ready for d in dependency]):
|
|
if all([d not in self.components for d in dependency]):
|
|
self.unsupported_steps.add(step_name)
|
|
else:
|
|
# if dependency not in self.components or not self.components[dependency].ready:
|
|
if dependency not in self.components:
|
|
self.unsupported_steps.add(step_name)
|
|
# Enforce second leak test hardware flag
|
|
if self.config["hardware_config"].get("second_leak_test", "absent") != "present":
|
|
self.unsupported_steps.add("leak_2")
|
|
# INIT PIECES COUNTER
|
|
self.pieces = {"ok": 0, "ko": 0}
|
|
# INIT CYCLE STATES
|
|
self.cycle_available_steps = {
|
|
# "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None),
|
|
"barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()),
|
|
"connector": Test_Assembly(img_path=self.select_step_img("scan"), text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE", widget=Test_Connector(run_once=True)),
|
|
"count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO",
|
|
widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, run_once=True)),
|
|
"warning_img": Test_Assembly(img_path=None, text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA",
|
|
widget=Test_Warning_Img(components=self.components, recipe=self.recipe,bench_name=self.config["machine"]["image_for_warning"], step=self.step, run_once=True)),
|
|
"count_end": Test_Assembly(img_path=None, text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO",
|
|
widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None),
|
|
"emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"),
|
|
text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None),
|
|
"fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO", widget=Test_Fail(parent=self)),
|
|
"blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
|
|
"test_freefall_leak": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self))
|
|
if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] !="absent" else None,
|
|
"leak_1": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self))
|
|
if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] !="absent" else None,
|
|
"leak_2": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self))
|
|
if ((self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] != "absent")
|
|
and self.config["hardware_config"].get("second_leak_test", "absent") == "present") else None,
|
|
"flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
|
|
"instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA",
|
|
widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)),
|
|
"pipe_cutter": Test_Assembly(img_path=None, text=u"ATTENZIONE TAGLIO CORRUGATO IN CORSO",widget=Test_Pipe_Cutter(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)),
|
|
"instruction_extra": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA",
|
|
widget=Test_Instructions(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)),
|
|
"piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE",
|
|
widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)),
|
|
"print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None),
|
|
"resistance": Test_Assembly(img_path=None, text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA",
|
|
widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO", widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps)),
|
|
"barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE",
|
|
widget=Barcode_Recipe_Selection(parent=self)),
|
|
"vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None),
|
|
None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None),
|
|
}
|
|
self.cycle_steps = None
|
|
self.cycle_index = -1
|
|
self.print_step = None
|
|
self.last_label = None
|
|
self.require_discard_piece = False
|
|
# SETUP AUTOTEST
|
|
self.autotest_request = False
|
|
self.autotesting = False
|
|
self.autotesting_reason = None
|
|
self.autotest_cycle_steps = None
|
|
if "--no-autotest" in sys.argv:
|
|
self.setStyleSheet("background-color: red;")
|
|
else:
|
|
self.setStyleSheet("background-color: white;")
|
|
if "--no-autotest" not in sys.argv:
|
|
if "--test-autotest" in sys.argv:
|
|
self.autotest_period = int(60 * 1000) # 1 min
|
|
else:
|
|
#self.autotest_period = int(8.5 * 60 * 60 * 1000)# 8.5 HOURS
|
|
self.autotest_period = int(4 * 60 * 60 * 1000)# 4 HOURS
|
|
# self.autotest_period = 12 * 60 * 60 * 1000 # 12 HOURS
|
|
# if not self.config["autotest_done"]:
|
|
# self.request_autotest("init")
|
|
else:
|
|
self.autotest_period = None
|
|
# INIT TEST DATA
|
|
self.data = {"ok": True, "overridden": False}
|
|
self.archived = None
|
|
# CONNECT CYCLE CONTROLS
|
|
self.cancel_b.clicked.connect(self.fail_cycle)
|
|
self.change_recipe_b.clicked.connect(self.change_recipe)
|
|
self.reset_count_b.clicked.connect(self.reset_count)
|
|
for step_name, w in self.cycle_available_steps.items():
|
|
if hasattr(w, "ok"):
|
|
# custom ok handlers should call next again
|
|
if isinstance(w.widget, Recipe_Selection):
|
|
w.ok.connect(self.set_recipe)
|
|
else:
|
|
w.ok.connect(lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel, data))
|
|
if hasattr(w, "ko"):
|
|
w.ko.connect(self.fail_cycle)
|
|
# CUSTOM STEP CONNECTIONS
|
|
self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount)
|
|
# self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done)
|
|
|
|
if "fixture_id" in self.components.keys():
|
|
self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
|
|
self.components["fixture_id"].rfid_error_signal.connect(self.handle_rfid_error)
|
|
self.tag_loaded_recipe = self.main_window.tag_loaded_recipe
|
|
|
|
# TESTING
|
|
if "--test" in sys.argv:
|
|
self.testing = True
|
|
else:
|
|
self.testing = False
|
|
# /TESTING
|
|
# START CYCLE
|
|
self.next_timer = QTimer()
|
|
self.next_timer.setSingleShot(True)
|
|
self.next_timer.timeout.connect(self.next)
|
|
self.next()
|
|
|
|
def refresh_time(self, init=False):
|
|
if init and not hasattr(self, 'time_timer'):
|
|
self.time_timer = QTimer()
|
|
self.time_timer.setSingleShot(False)
|
|
self.time_timer.timeout.connect(self.refresh_time)
|
|
|
|
t = datetime.now()
|
|
self.time_l.setText("{d}/{mo}/{y}\n{h}:{m:02d}".format(
|
|
y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute
|
|
))
|
|
|
|
# Always restart the timer, ensuring intervals are set correctly
|
|
if hasattr(self, 'time_timer'): # Safeguard for uninitialized timer
|
|
self.time_timer.start(30000)
|
|
|
|
def select_step_img(self, step, suffix=None):
|
|
img_path = "./src/ui/imgs"
|
|
names = []
|
|
if suffix is not None:
|
|
names.append(f"{step}_{suffix}_{self.config.machine_id}")
|
|
names.append(f"{step}_{suffix}")
|
|
names.append(f"{step}_{self.config.machine_id}")
|
|
names.append(f"{step}")
|
|
for name in names:
|
|
for ext in ["png", "jpg"]:
|
|
path = f"{img_path}/{name}.{ext}"
|
|
if os.path.isfile(path):
|
|
return path
|
|
raise FileNotFoundError(f"No image was found for step {step}")
|
|
|
|
def change_recipe(self):
|
|
self.next(action="change_recipe")
|
|
|
|
def set_recipe_mode_table(self):
|
|
self.recipe_selection_mode = "table"
|
|
self.change_recipe()
|
|
|
|
def set_recipe_mode_barcode(self):
|
|
self.recipe_selection_mode = "barcode"
|
|
self.change_recipe()
|
|
|
|
def cut_tube(self):
|
|
self.components["pipe_cutter"].to_calibrate()
|
|
self.components["pipe_cutter"].start_cutting()
|
|
|
|
def reprint_label(self):
|
|
self.print(self.last_label, self.print_step.spec.get("template", "EtichettaR5"))
|
|
|
|
def fail_cycle(self):
|
|
self.next(action="fail")
|
|
|
|
def setCentralWidget(self, widget):
|
|
replace_widget(self, "centralWidget", widget)
|
|
|
|
def enable_temp_admin(self):
|
|
"""Enable temporary admin privileges for the current user"""
|
|
session = Users.get_session()
|
|
if session is None:
|
|
self.log.warning("Cannot enable temporary admin privileges: No active session")
|
|
return False
|
|
|
|
# Save the current label text before changing it
|
|
self.saved_user_label_text = self.user_l.text()
|
|
|
|
# Enable temporary admin privileges using the Session class method
|
|
session.enable_temp_admin()
|
|
|
|
# Update UI to reflect admin status
|
|
self.user_l.setText("ADMIN")
|
|
self.user_l.setStyleSheet("QLabel { color: red; }")
|
|
|
|
#Refresh Recipe_Selection UI to show admin buttons
|
|
if "select_recipe" in self.cycle_available_steps and self.cycle_available_steps["select_recipe"].widget:
|
|
recipe_selection = self.cycle_available_steps["select_recipe"].widget
|
|
if hasattr(recipe_selection, "refresh"):
|
|
recipe_selection.refresh()
|
|
self.log.info("Recipe Selection UI refreshed to show admin buttons")
|
|
|
|
self.log.info(f"Temporary admin privileges enabled for user: {session.username}")
|
|
return True
|
|
|
|
def disable_temp_admin(self):
|
|
"""Disable temporary admin privileges and restore original user status"""
|
|
session = Users.get_session()
|
|
if session is None:
|
|
self.log.warning("Cannot disable temporary admin privileges: No active session")
|
|
return False
|
|
|
|
# Disable temporary admin privileges using the Session class method
|
|
session.disable_temp_admin()
|
|
|
|
# Restore original UI
|
|
# Use the saved label text if available, otherwise fall back to original username
|
|
if hasattr(self, 'saved_user_label_text'):
|
|
self.user_l.setText(self.saved_user_label_text)
|
|
else:
|
|
self.user_l.setText(self.original_username)
|
|
|
|
# Set style based on original admin status
|
|
if self.had_admin:
|
|
self.user_l.setStyleSheet("QLabel { color: red; }")
|
|
else:
|
|
self.user_l.setStyleSheet("")
|
|
|
|
# Refresh Recipe_Selection UI to hide admin buttons
|
|
if "select_recipe" in self.cycle_available_steps and self.cycle_available_steps["select_recipe"].widget:
|
|
recipe_selection = self.cycle_available_steps["select_recipe"].widget
|
|
if hasattr(recipe_selection, "refresh"):
|
|
recipe_selection.refresh()
|
|
self.log.info("Recipe Selection UI refreshed to hide admin buttons")
|
|
|
|
self.log.info(f"Temporary admin privileges disabled for user: {session.username}")
|
|
return True
|
|
|
|
def request_autotest(self, reason): # you can cancel the request calling request_autotest(False)
|
|
if "--no-autotest" not in sys.argv:
|
|
|
|
self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
|
|
if reason in ("init", "login"):
|
|
self.autotest_timer = QTimer()
|
|
self.autotest_timer.setSingleShot(False)
|
|
self.autotest_timer.timeout.connect(self.request_periodic_autotest)
|
|
if self.autotest_period is not None:
|
|
self.autotest_timer.start(self.autotest_period)
|
|
reason = "boot"
|
|
|
|
if not hasattr(self, "refresh_timer"):
|
|
self.refresh_timer = QTimer()
|
|
self.refresh_timer.setSingleShot(False)
|
|
self.refresh_timer.timeout.connect(self.refresh_time)
|
|
self.refresh_timer.start(30000)
|
|
|
|
if self.config["autotest_leak"]["enabled"] == "true":
|
|
self.autotest_request = reason
|
|
else:
|
|
self.autotest_request = False
|
|
else:
|
|
self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected")
|
|
if reason == "logout":
|
|
self.next(action="abort")
|
|
|
|
def request_periodic_autotest(self):
|
|
self.request_autotest("periodic")
|
|
|
|
def next(self, action=None):
|
|
if self.step is not None:
|
|
self.log.info(f"cycle step: {self.step.step_type!r} action: {action!r} current index:{self.cycle_index}")
|
|
else:
|
|
self.log.info(f"cycle step: {self.step!r} action: {action!r} current index:{self.cycle_index}")
|
|
|
|
# Track the previous step type to enable transition hooks (e.g., Free Fall -> Leak_1)
|
|
prev_step_type = self.step.step_type if self.step is not None else None
|
|
|
|
current_w = self.centralWidget
|
|
|
|
if hasattr(current_w, "stop"):
|
|
self.log.info(f"stopping widget {self.step.step_type}")
|
|
current_w.stop()
|
|
|
|
if action == "change_recipe":
|
|
self.log.info(f"cycle next: action: {action!r}")
|
|
self.set_recipe(recipe=None)
|
|
if self.config["hardware_config"]["tecna_t3"] == "present" or self.config["hardware_config"][
|
|
"furness_controls"] == "present":
|
|
# Reset recipe_written flags for leak widgets if they exist
|
|
leak1 = self.cycle_available_steps.get("leak_1")
|
|
if leak1 is not None and getattr(leak1, "widget", None) is not None:
|
|
leak1.widget.recipe_written = False
|
|
leak2 = self.cycle_available_steps.get("leak_2")
|
|
if leak2 is not None and getattr(leak2, "widget", None) is not None:
|
|
leak2.widget.recipe_written = False
|
|
self.step = Step(step_type="select_recipe")
|
|
self.cycle_index = -1
|
|
self.recipe = None
|
|
self.cycle_steps = None
|
|
# COUNT RESET
|
|
self.pieces["ok"] = 0
|
|
self.pieces["ko"] = 0
|
|
|
|
elif action in ("fail", "abort"):
|
|
self.log.info(f"cycle next: action: {action!r}")
|
|
# FAIL AND RESTART TEST
|
|
self.step = Step(step_type="fail")
|
|
self.cycle_index = -1
|
|
# COUNT FAIL
|
|
if action == "fail":
|
|
self.done(ok=False)
|
|
|
|
elif action is not None:
|
|
raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")
|
|
|
|
# Set next cycle step normally if no action sets it explicitly
|
|
if self.recipe is None or self.cycle_steps is None:
|
|
# If recipe not set: select_recipe
|
|
if self.recipe_selection_mode == "barcode":
|
|
self.log.info(f"returning to barcode recipe selection")
|
|
self.step = Step(step_type="barcode_recipe_selection")
|
|
else:
|
|
self.log.info(f"returning to recipe selection table")
|
|
self.step = Step(step_type="select_recipe")
|
|
|
|
elif action is None:
|
|
if (
|
|
self.autotest_request is not False
|
|
and self.autotest_cycle_steps is not None
|
|
and not self.autotesting
|
|
and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps))
|
|
):
|
|
# If autotest was requested
|
|
# and if cycle_steps is not started or has ended
|
|
self.cycle_index = -1
|
|
self.autotesting = True
|
|
self.autotesting_reason = self.autotest_request
|
|
self.autotest_request = False
|
|
self.log.info(f"Autotest requested (reason: {self.autotesting_reason})")
|
|
if self.autotest_period is not None: # Reset periodic autotest timer
|
|
self.time_timer.start(self.autotest_period)
|
|
self.require_discard_piece = False
|
|
|
|
if self.autotesting:
|
|
if self.cycle_index + 1 < len(self.autotest_cycle_steps):
|
|
# Go to next step in autotest_cycle_steps
|
|
self.cycle_index += 1
|
|
self.step = self.autotest_cycle_steps[self.cycle_index]
|
|
else:
|
|
# When autotest ends, check if it needs to restart
|
|
if not self.data.get("ok"): # Autotest failed
|
|
self.log.warning("Restarting autotest from the first step due to failure.")
|
|
self.cycle_index = 0
|
|
self.step = self.autotest_cycle_steps[self.cycle_index] # Restart from the first step
|
|
else:
|
|
# Autotest succeeded; proceed to post-autotest actions
|
|
self.autotesting = False
|
|
if self.autotesting_reason == "logout":
|
|
Users.logout()
|
|
self.main_window.open_login()
|
|
else:
|
|
t = datetime.now()
|
|
self.last_at_l.setText(
|
|
"{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
|
|
t += timedelta(seconds=int(self.autotest_period / 1000))
|
|
self.next_at_l.setText(
|
|
"{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
|
|
self.autotesting_reason = None
|
|
self.cycle_index = -1
|
|
self.config["autotest_done"] = True
|
|
|
|
if not self.autotesting:
|
|
if len(self.cycle_steps):
|
|
# Go to next step in cycle_steps
|
|
self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
|
|
self.step = self.cycle_steps[self.cycle_index]
|
|
else:
|
|
self.cycle_index = -1
|
|
self.step = Step(step_type=None)
|
|
|
|
# Enable/disable cycle controls
|
|
self.change_recipe_b.setEnabled(self.recipe is not None)
|
|
self.cancel_b.setEnabled(self.step.step_type is not None and self.step.step_type not in {
|
|
"emergency",
|
|
"fail",
|
|
"select_recipe",
|
|
"wait",
|
|
})
|
|
|
|
self.log.info(f"next cycle step: {self.step.step_type!r}")
|
|
# INIT TEST DATA IF STARTING CYCLE OR RESET IS NEEDED
|
|
if self.cycle_index == 0 and not self.autotesting: # Initialize test data for normal cycles
|
|
self.data = {"ok": True, "overridden": False}
|
|
self.archived = None
|
|
if self.recipe is not None and "recipe" not in self.data:
|
|
self.data["recipe"] = model_to_dict(self.recipe)
|
|
|
|
# If transitioning from Free Fall to Leak 1, preload Leak 1 parameters onto the tester
|
|
try:
|
|
if (
|
|
prev_step_type == "test_freefall_leak"
|
|
and self.step is not None
|
|
and self.step.step_type == "leak_1"
|
|
and hasattr(self, "tester_component")
|
|
and self.tester_component in (self.components or {})
|
|
):
|
|
self.log.info("Transition detected: Free Fall -> Leak_1. Pre-writing Leak_1 recipe to tester.")
|
|
self.components[self.tester_component].write_recipe(self.recipe, self.step)
|
|
leak1 = self.cycle_available_steps.get("leak_1")
|
|
if leak1 is not None and getattr(leak1, "widget", None) is not None:
|
|
leak1.widget.recipe_written = True
|
|
except Exception as e:
|
|
try:
|
|
self.log.exception(f"Failed to pre-write Leak_1 recipe after Free Fall: {e}")
|
|
except Exception:
|
|
pass
|
|
|
|
# Remaining logic for updating widgets, starting timers, etc.
|
|
w = self.cycle_available_steps[self.step.step_type]
|
|
show = None
|
|
|
|
if self.step.step_type == "leak_2":
|
|
self.setCentralWidget(w) # Pre-show UI for leak_2
|
|
if hasattr(w, "start"):
|
|
show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
|
|
if show is not False and w is not current_w:
|
|
self.setCentralWidget(w)
|
|
elif show is False:
|
|
self.next_timer.start(0)
|
|
if self.step.step_type == "done":
|
|
self.archived = self.done()
|
|
self.last_label = copy.deepcopy(self.archived)
|
|
self.next_timer.start(500)
|
|
elif self.step.step_type == "print":
|
|
compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
|
|
self.archived.test_data.update({"print": compiled_label})
|
|
self.archived.test_data.update({"print_template": self.print_template})
|
|
self.archived.test_data.update({"barcode_stampato": self.printed_barcode})
|
|
self.archived.label = compiled_label
|
|
self.log.info(f"Label printed. Saving...")
|
|
# self.archived.save()
|
|
self.main_window.main_window.run_request.emit(self.archived.save, [], {})
|
|
self.next_timer.start(500)
|
|
elif self.step.step_type == "wait":
|
|
self.next_timer.start(500)
|
|
|
|
# Update display
|
|
self.update_count_display()
|
|
|
|
def reset_count(self):
|
|
# COUNT RESET
|
|
self.pieces["ok"] = 0
|
|
self.pieces["ko"] = 0
|
|
self.update_count_display()
|
|
|
|
def update_count_display(self):
|
|
self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT")
|
|
|
|
def set_recipe(self, recipe=None):
|
|
self.recipe = recipe
|
|
self.config.active_recipe = recipe
|
|
inserted_instruction = False
|
|
self.require_discard_piece = False
|
|
if self.recipe is None:
|
|
self.cycle_steps = None
|
|
self.autotest_cycle_steps = None
|
|
else:
|
|
steps = self.recipe.get_steps()
|
|
skip = set()
|
|
print_found = False
|
|
count_found = False
|
|
# create step sequence list
|
|
barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5']
|
|
for i, step in enumerate(steps):
|
|
if step.step_type == "barcodes":
|
|
n_pieces_value = step.spec.get("n_pieces")
|
|
# Fix: Handle empty string and None
|
|
n_pieces = 1 if n_pieces_value in (None, '') else n_pieces_value
|
|
try:
|
|
n_pieces_adapted = int(n_pieces)
|
|
except ValueError:
|
|
self.log.error(f"Invalid value for n_pieces: {n_pieces}") # Log the error
|
|
n_pieces_adapted = 1 # Default to 1 if conversion fails
|
|
|
|
if n_pieces_adapted == 1:
|
|
step.spec["barcode_name"] = 'serial'
|
|
else:
|
|
step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)]
|
|
n_pieces_adapted -= 1
|
|
new_barcode_step = copy.deepcopy(step)
|
|
new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted)
|
|
steps.insert(i + 1, new_barcode_step)
|
|
if i in skip:
|
|
continue
|
|
if step.step_type == "vision":
|
|
self.components["vision"].config_changed(vision_recipe=self.recipe.name)
|
|
if step.step_type == "count":
|
|
count_found = True
|
|
if "warning_img" in step.spec:
|
|
if step.spec["warning_img"]:
|
|
steps.insert(i, Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]}))
|
|
skip.add(i + 1)
|
|
if "assembly" in step.spec:
|
|
if step.spec["assembly"]:
|
|
steps.insert(i, Step(step_type="instructions", spec={}))
|
|
skip.add(i + 1)
|
|
if "require_discard_piece" in step.spec:
|
|
if step.spec["require_discard_piece"]:
|
|
self.require_discard_piece = True
|
|
if step.step_type == "resistance": # ADD STEP TO ENSURE REMOVAL OF CONNECTOR
|
|
steps.insert(i + 1, Step(step_type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
}))
|
|
skip.add(i + 1)
|
|
if step.step_type == "print":
|
|
self.print_template = step.spec.get("template", "EtichettaR5") # Store the template
|
|
if print_found:
|
|
continue
|
|
steps.insert(i, Step(step_type="done", spec={}))
|
|
print_found = True
|
|
self.print_step = step
|
|
if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes":
|
|
if recipe.spec.get("instruction",False) is not False:
|
|
steps.append(Step(step_type="piece_removal", spec={}))
|
|
skip.add(i + 1)
|
|
if count_found:
|
|
steps.append(Step(step_type="count_end", spec={}))
|
|
skip.add(i + 1)
|
|
if step.step_type in ("leak_1", "leak_2"):
|
|
self.leak_step = step
|
|
|
|
# Ensure Free Fall leak test appears before leak_1 when both are enabled
|
|
step_types = [step.step_type for step in steps]
|
|
if "test_freefall_leak" in step_types and "leak_1" in step_types:
|
|
ff_index = step_types.index("test_freefall_leak")
|
|
l1_index = step_types.index("leak_1")
|
|
if ff_index > l1_index:
|
|
# Move Free Fall step to be immediately before leak_1
|
|
steps.insert(l1_index, steps.pop(ff_index))
|
|
# Recompute step_types after reordering
|
|
step_types = [step.step_type for step in steps]
|
|
|
|
if "leak_1" in step_types and "leak_2" in step_types:
|
|
leak1_index = step_types.index("leak_1")
|
|
leak2_index = step_types.index("leak_2")
|
|
if leak1_index + 1 == leak2_index: # Ensure 'leak_1' is immediately followed by 'leak_2'
|
|
if recipe and getattr(recipe, 'spec', None) and recipe.spec.get("instruction_extra") and "instruction_extra" not in self.unsupported_steps:
|
|
steps.insert(leak2_index, Step(step_type="instruction_extra", spec={}))
|
|
inserted_instruction = True
|
|
|
|
# Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks
|
|
if not inserted_instruction and recipe and getattr(recipe, 'spec', None) and recipe.spec.get("instruction_extra") and "instruction_extra" not in self.unsupported_steps:
|
|
for i, step in enumerate(steps):
|
|
if step.step_type == "instructions":
|
|
steps.insert(i + 1, Step(step_type="instruction_extra", spec={}))
|
|
inserted_instruction = True
|
|
break
|
|
|
|
if not print_found:
|
|
steps.append(Step(step_type="done"))
|
|
if count_found:
|
|
steps.append(Step(step_type="count_end"))
|
|
steps.append(Step(step_type="wait"))
|
|
self.cycle_steps = steps
|
|
self.check_steps_dependencies(self.cycle_steps)
|
|
leak_autotest_steps = []
|
|
# CONFIGURE LEAK AUTOTEST PARAMETERS
|
|
if self.config["autotest_leak"]["enabled"] == "true":
|
|
l_at_1 = copy.deepcopy(self.config["autotest_leak"])
|
|
l_at_1.pop("enabled")
|
|
l_at_1 = {a: float(x) for a, x in l_at_1.items()}
|
|
l_at_1["autotest"] = "ko_check"
|
|
l_at_2 = copy.deepcopy(self.config["autotest_leak"])
|
|
l_at_2.pop("enabled")
|
|
l_at_2 = {a: float(x) for a, x in l_at_2.items()}
|
|
l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"]
|
|
l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"]
|
|
l_at_2["autotest"] = "ok_check"
|
|
leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)]
|
|
|
|
self.autotest_cycle_steps = [
|
|
*([
|
|
Step(step_type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": 1,
|
|
"tolerance_pos": 5,
|
|
"tolerance_neg": 5,
|
|
"autotest": True,
|
|
}),
|
|
Step(step_type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
"autotest": True,
|
|
}),
|
|
Step(step_type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": 10,
|
|
"tolerance_pos": 1,
|
|
"tolerance_neg": 1,
|
|
"autotest": True,
|
|
}),
|
|
Step(step_type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
"autotest": True,
|
|
}),
|
|
] if "resistance" not in self.unsupported_steps else []),
|
|
*(leak_autotest_steps),
|
|
Step(step_type="done"),
|
|
Step(step_type="wait"),
|
|
]
|
|
for w in self.cycle_available_steps.values():
|
|
if hasattr(w, "reset"):
|
|
w.reset()
|
|
# UPDATE RECIPE DISPLAY
|
|
if self.recipe is not None:
|
|
self.log.info(f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}")
|
|
self.recipe_l.setText(self.recipe.name)
|
|
self.recipe_l.setStyleSheet("")
|
|
self.cycle_index = -1
|
|
self.next()
|
|
else:
|
|
self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
|
|
self.recipe_l.setText("NON SELEZIONATA")
|
|
self.recipe_l.setStyleSheet("QLabel { color: red; }")
|
|
|
|
def check_steps_dependencies(self, steps):
|
|
unsupported_steps = set()
|
|
missing_components = set()
|
|
for step in steps:
|
|
if step.step_type in self.unsupported_steps or step.step_type not in self.cycle_available_steps:
|
|
unsupported_steps.add(step.step_type)
|
|
else:
|
|
for dependency in self.steps_dependencies.get(step.step_type, []):
|
|
if isinstance(dependency, tuple):
|
|
if all([d not in self.components for d in dependency]):
|
|
unsupported_steps.add(step.step_type)
|
|
else:
|
|
unready = set()
|
|
for d in dependency:
|
|
if d in self.components:
|
|
if not self.components[d].ready:
|
|
self.components[d].reconfigure()
|
|
if not self.components[d].ready:
|
|
unready.add(d)
|
|
else:
|
|
unready.add(d)
|
|
if unready == set(dependency):
|
|
missing_components.add(" or ".join(dependency))
|
|
else:
|
|
if dependency not in self.components:
|
|
unsupported_steps.add(step.step_type)
|
|
else:
|
|
if not self.components[dependency].ready:
|
|
self.components[dependency].reconfigure()
|
|
if not self.components[dependency].ready:
|
|
missing_components.add(dependency)
|
|
if len(unsupported_steps):
|
|
QMessageBox.critical(None, "Errore Ricetta",
|
|
f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.")
|
|
if len(missing_components):
|
|
QMessageBox.critical(None, "Errore Componenti Ricetta",
|
|
f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.")
|
|
if len(unsupported_steps) or len(missing_components):
|
|
self.change_recipe()
|
|
|
|
def set_step(self, step_name, data=None):
|
|
if step_name not in self.data:
|
|
self.data[step_name] = {}
|
|
if data is not None:
|
|
data["step"] = self.step.spec
|
|
data["step"].pop("name", None)
|
|
|
|
# MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE
|
|
if len(self.data[step_name]) > 1:
|
|
self.data[step_name][str(len(self.data[step_name]))] = data
|
|
else:
|
|
self.data[step_name] = data
|
|
|
|
self.data["overridden"] = self.data["overridden"] or data.get("overridden", False)
|
|
self.data["ok"] = self.data["ok"] and data.get("ok", False)
|
|
self.next()
|
|
|
|
def done(self, ok=True):
|
|
self.log.info("cycle done, saving data...")
|
|
|
|
# Remove useless info
|
|
self.data.get("recipe", {}).get("spec", {}).pop("steps", None)
|
|
|
|
# Consolidate and preserve leak results for saving; include also free-fall leak if present
|
|
for leak in ["test_freefall_leak", "leak_1", "leak_2"]:
|
|
if leak in self.data and isinstance(self.data[leak], dict) and "results" in self.data[leak]:
|
|
leak_container = self.data[leak]
|
|
raw_results = leak_container.get("results", {})
|
|
|
|
# Try to find the dictionary that contains the "Running test:" keys
|
|
device_results = {}
|
|
if isinstance(raw_results, dict):
|
|
# 1) Preferred: nested under tester_component key
|
|
if self.tester_component and self.tester_component in raw_results and isinstance(raw_results[self.tester_component], dict):
|
|
device_results = raw_results[self.tester_component]
|
|
else:
|
|
# 2) Fallback: scan top-level values to find a dict with expected keys
|
|
for v in raw_results.values():
|
|
if isinstance(v, dict) and any(k.startswith("Running test:") for k in v.keys()):
|
|
device_results = v
|
|
break
|
|
# 3) As a last resort, maybe the keys are already at this level
|
|
if not device_results and any(k.startswith("Running test:") for k in raw_results.keys()):
|
|
device_results = raw_results
|
|
|
|
# Build a compact, consistent summary while preserving raw data
|
|
summary = {}
|
|
# Preserve textual result (if any)
|
|
if isinstance(device_results, dict):
|
|
summary["Running test: result"] = device_results.get("Running test: result", raw_results.get("Running test: result", "N/A"))
|
|
# Numeric metrics (rounded)
|
|
for k in [
|
|
"Running test: filling pressure",
|
|
"Running test: measured leak",
|
|
"Running test: pressure at the end of settling",
|
|
]:
|
|
try:
|
|
v = device_results.get(k, raw_results.get(k, None))
|
|
if v is not None:
|
|
summary[k] = round(float(v), 2)
|
|
except Exception:
|
|
pass
|
|
|
|
# Derive pressure at end of measure if possible
|
|
try:
|
|
if "Running test: pressure at the end of settling" in summary and "Running test: measured leak" in summary:
|
|
summary["Running test: pressure at the end of measure"] = round(
|
|
float(summary["Running test: pressure at the end of settling"]) + float(summary["Running test: measured leak"]), 2
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# Attach the cleaned summary, but keep the raw results for traceability
|
|
leak_container["raw_results"] = raw_results
|
|
leak_container["results"] = summary
|
|
elif leak in self.data:
|
|
self.log.warning(f"Leak step '{leak}' has no results; skipping consolidation...")
|
|
|
|
if "vision" in self.data:
|
|
vision = self.data.get("vision", {})
|
|
# Save vision results if available
|
|
out_paths = self.components["vision_saver"].save(
|
|
save_time=vision.get("time", None),
|
|
frame=vision.get("frame", None),
|
|
)
|
|
vision["files"] = out_paths
|
|
self.log.info(f"cycle vision saved locally: {out_paths!r}")
|
|
vision.pop("frame", None)
|
|
vision.pop("render", None)
|
|
vision.pop("detections", None)
|
|
if "results" in vision.keys():
|
|
vision["results"].pop("results", None)
|
|
if self.autotesting:
|
|
self.data["autotest"] = True
|
|
self.data["autotest_reason"] = self.autotesting_reason
|
|
self.data["recipe"]["name"] = "AUTOTEST"
|
|
|
|
# Archive and update results
|
|
archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
|
|
self.log.info(f"cycle archived locally: {archived!r}")
|
|
if not self.autotesting:
|
|
# Count results based on success or failure
|
|
if ok:
|
|
self.pieces["ok"] += 1
|
|
else:
|
|
self.pieces["ko"] += 1
|
|
else:
|
|
if self.autotesting_reason == "logout":
|
|
if ok:
|
|
# Make sure to properly logout the user before opening login screen
|
|
from lib.db.models.users import Users
|
|
from components import ArchiveSynchronizer
|
|
Users.logout()
|
|
ArchiveSynchronizer.machine_status = "logged-out"
|
|
self.main_window.open_login()
|
|
|
|
return archived
|
|
|
|
@staticmethod
|
|
def labellify(v):
|
|
if v is None:
|
|
v = "-"
|
|
elif isinstance(v, float):
|
|
v = f"{v:.1f}"
|
|
if not isinstance(v, str):
|
|
v = str(v)
|
|
return v
|
|
|
|
def print(self, archived, label):
|
|
self.log.info("cycle print")
|
|
if archived is None:
|
|
self.log.error("attempting to print empty label")
|
|
return None
|
|
|
|
if archived.label is not None:
|
|
# raise AssertionError("this should never happen")
|
|
self.log.info("printing already compiled label")
|
|
|
|
# LABEL PRINT
|
|
recipe = archived.test_data.get("recipe", {})
|
|
|
|
# Define barcode format variables
|
|
self.module_data_format = "1IT ECE{SN7}"
|
|
# Use suffix_mod_43 from print step editor if available, otherwise use default
|
|
self.barcode_format = self.print_step.spec.get("barcode")
|
|
leak_test_1 = archived.test_data.get("leak_1", {})
|
|
leak_test_1_step = leak_test_1.get("step", {})
|
|
leak_test_1_step_spec = leak_test_1_step.get("spec", {})
|
|
leak_test_1_results = leak_test_1.get("results", {})
|
|
leak_test_2 = archived.test_data.get("leak_2", {})
|
|
leak_test_2_step = leak_test_2.get("step", {})
|
|
leak_test_2_step_spec = leak_test_2_step.get("spec", {})
|
|
leak_test_2_results = leak_test_2.get("results", {})
|
|
|
|
psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100)
|
|
psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100)
|
|
psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100)
|
|
psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100)
|
|
if self.tester_component is not None:
|
|
if self.recipe.spec["leak_1"]:
|
|
leak_test_1_results["Running test: pressure at the end of measure"] = (
|
|
leak_test_1_results["Running test: pressure at the end of settling"]
|
|
+ leak_test_1_results["Running test: measured leak"])
|
|
if self.recipe.spec["leak_2"]:
|
|
leak_test_2_results["Running test: pressure at the end of measure"] = (
|
|
leak_test_2_results["Running test: pressure at the end of settling"]
|
|
+ leak_test_2_results["Running test: measured leak"])
|
|
|
|
printer_fields = self.print_step.spec
|
|
context = {
|
|
# RECIPE DATA
|
|
"RECIPE": self.labellify(recipe.get("name", "-")),
|
|
"CLIENT": self.labellify(recipe.get("client", "-")),
|
|
"PART": self.labellify(recipe.get("part_number", "-")),
|
|
"DESCRIPTION": self.labellify(recipe.get("description", "-")),
|
|
"RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')),
|
|
# STEP SPEC
|
|
"TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")),
|
|
"PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")),
|
|
"TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")),
|
|
"TSET": self.labellify(leak_test_1_step.get("settling_time", "-")),
|
|
"TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")),
|
|
"PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")),
|
|
"TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")),
|
|
"TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")),
|
|
"PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")),
|
|
"PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")),
|
|
"PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")),
|
|
"PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")),
|
|
"PSETMINP_A": self.labellify(psetminp_a),
|
|
"PSETMAXP_A": self.labellify(psetmaxp_a),
|
|
"PSETMINP2_A": self.labellify(psetminp2_a),
|
|
"PSETMAXP2_A": self.labellify(psetmaxp2_a),
|
|
"TTEST": self.labellify(leak_test_1_step.get("test_time", "-")),
|
|
"TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")),
|
|
"PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")),
|
|
"PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")),
|
|
"PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")),
|
|
"PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")),
|
|
"PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")),
|
|
"TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")),
|
|
"PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")),
|
|
# ACTUAL TESTED VALUES
|
|
"RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")),
|
|
"RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")),
|
|
"RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")),
|
|
"RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")),
|
|
"RESPEND": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")),
|
|
"RESPEND2": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")),
|
|
"RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")),
|
|
"RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")),
|
|
"RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")),
|
|
"RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")),
|
|
# SERIAL DEFINITION
|
|
"SN": str(archived.id),
|
|
"SN4": f"{archived.id:0>4}",
|
|
"SN5": f"{archived.id:0>5}",
|
|
"SN6": f"{archived.id:0>6}",
|
|
"SN7": f"{archived.id:0>7}",
|
|
# TIME DEFINITION
|
|
"DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"),
|
|
"DATE": archived.time.strftime("%d/%m/%Y"),
|
|
"TIME": archived.time.strftime("%H:%M:%S"),
|
|
"YYYY": archived.time.strftime("%Y"),
|
|
"YY": archived.time.strftime("%y"),
|
|
"MO": archived.time.strftime("%m"),
|
|
"DD": archived.time.strftime("%d"),
|
|
"HH": archived.time.strftime("%H"),
|
|
"MI": archived.time.strftime("%M"),
|
|
"SS": archived.time.strftime("%S"),
|
|
"JJJ": archived.time.strftime("%j"),
|
|
"WW": archived.time.strftime("%W"),
|
|
# EXTRA DATA
|
|
"SHIFT": str(get_shift(archived.time)),
|
|
"STATION": str(self.config.machine_id),
|
|
"OPERATOR": str(archived.user.username),
|
|
"BADGE_NUM": str(archived.user.badge_number),
|
|
# BARCODE
|
|
"BCODE": str(self.step.spec.get("barcode","")),
|
|
|
|
# RESULT
|
|
"RESULT": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO") + str(" FORZATO" if self.data.get("overridden", False) else ""),
|
|
"RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""),
|
|
"RESULT_L2": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO"),
|
|
}
|
|
#TESTING BROTHER
|
|
label_brother = context.get("RECIPE_TO_PRINT", "-") + context.get("DD","-") + context.get("MO","-") + context.get("YY","-") + context.get("SN5","-")
|
|
barcode = str(label_brother)
|
|
|
|
# Ensure any labeltxt_N fields from the recipe are available in context (both lower and upper case)
|
|
for n in range(5):
|
|
field = f"labeltxt_{n + 1}"
|
|
if field in printer_fields.keys() and printer_fields[field] != "":
|
|
value = printer_fields[field]
|
|
context[field] = value # e.g., 'labeltxt_1'
|
|
context[field.upper()] = value # e.g., 'LABELTXT_1'
|
|
|
|
# Process any {M43:X:Y} patterns in the barcode format
|
|
processed_barcode_format = self.process_m43_patterns(self.barcode_format, context)
|
|
formatted_barcode = processed_barcode_format.format(**context)
|
|
context['BCODE'] = formatted_barcode
|
|
self.printed_barcode = formatted_barcode
|
|
if self.archived is not None:
|
|
self.archived.barcode = self.printed_barcode
|
|
|
|
# PRINT MAIN PRODUCT LABEL
|
|
# Determine which label printer component to use based on per-recipe selection.
|
|
# The recipe now stores the printer by resolution string ("203"/"300").
|
|
sel_val = str(printer_fields.get("printer_selection", "")).strip()
|
|
lp1_cfg = self.config.get("label_printer", {}) or {}
|
|
lp2_cfg = self.config.get("label_printer_2", {}) or {}
|
|
# Parse selected resolution; also support legacy OS printer names for backward compatibility
|
|
def parse_resolution(val: str):
|
|
if not val:
|
|
return None
|
|
if val.isdigit():
|
|
try:
|
|
return int(val)
|
|
except Exception:
|
|
return None
|
|
# Legacy: match configured printer names
|
|
if val == str(lp1_cfg.get("printer", "")):
|
|
try:
|
|
return int(lp1_cfg.get("risoluzione", 300))
|
|
except Exception:
|
|
return 300
|
|
if val == str(lp2_cfg.get("printer", "")):
|
|
try:
|
|
return int(lp2_cfg.get("risoluzione", 300))
|
|
except Exception:
|
|
return 300
|
|
return None
|
|
sel_res = parse_resolution(sel_val)
|
|
# Read configured resolutions
|
|
try:
|
|
lp1_res = int(str(lp1_cfg.get("risoluzione", 300)).strip())
|
|
except Exception:
|
|
lp1_res = 300
|
|
try:
|
|
lp2_res = int(str(lp2_cfg.get("risoluzione", 0)).strip()) if lp2_cfg else 0
|
|
except Exception:
|
|
lp2_res = 0
|
|
# Choose component by resolution match; default to primary
|
|
use_comp_name = "label_printer"
|
|
if sel_res is not None:
|
|
if lp2_cfg and lp2_res and sel_res == lp2_res:
|
|
use_comp_name = "label_printer_2"
|
|
elif sel_res == lp1_res:
|
|
use_comp_name = "label_printer"
|
|
comp = self.components.get(use_comp_name) or self.components.get("label_printer") or self.components.get("label_printer_2")
|
|
if comp is None:
|
|
# No printer component available; log and skip printing safely
|
|
self.log.warning("No label printer component available; skipping label print.")
|
|
return context
|
|
# Set the target device name to the configured OS printer for the chosen component
|
|
try:
|
|
comp.printer = (lp2_cfg.get("printer") if use_comp_name == "label_printer_2" else lp1_cfg.get("printer")) or comp.printer
|
|
except Exception:
|
|
pass
|
|
compiled_label = comp.print_label(label, context=context)
|
|
self.log.info(f"Main label printed via {use_comp_name}: {context!r}")
|
|
# return fields used to print label for saving into test archive
|
|
return context
|
|
|
|
def print_extra_labels(self):
|
|
# PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST)
|
|
if "extra_label_printer" in self.components.keys() and self.print_step is not None and self.autotesting is False:
|
|
printer_fields = self.print_step.spec
|
|
if len(printer_fields["extra_label"]) > 0:
|
|
labels = printer_fields["extra_label"].split(",")
|
|
for label in labels:
|
|
self.components["extra_label_printer"].print_label(f"{label}.prn", context=None)
|
|
|
|
@pyqtSlot(str)
|
|
def load_recipe_from_rfid(self, data):
|
|
if data not in(None,''):
|
|
self.tag_loaded_recipe = data
|
|
if self.step.step_type == "barcode_recipe_selection":
|
|
if data is not None:
|
|
self.cycle_available_steps["barcode_recipe_selection"].widget.get(data)
|
|
else:
|
|
pass
|
|
|
|
else:
|
|
# fixture removed
|
|
self.tag_loaded_recipe = None
|
|
self.fail_cycle()
|
|
self.change_recipe()
|
|
|
|
def add_error(self, message, is_error):
|
|
"""
|
|
Add a new error message to the list of active errors.
|
|
|
|
Args:
|
|
message (str): The error message to add.
|
|
is_error (bool): True if it's an error that should show in red, False for non-errors.
|
|
"""
|
|
# Avoid duplicates
|
|
if (message, is_error) not in self.active_errors:
|
|
self.active_errors.append((message, is_error))
|
|
|
|
# Start the timer if it's not already active
|
|
if not self.error_timer.isActive():
|
|
self.error_timer.start()
|
|
|
|
def remove_error(self, message):
|
|
"""
|
|
Remove an error message from the list of active errors.
|
|
|
|
Args:
|
|
message (str): The error message to remove.
|
|
"""
|
|
self.active_errors = [err for err in self.active_errors if err[0] != message]
|
|
|
|
# Reset the error index if necessary
|
|
if self.current_error_index >= len(self.active_errors):
|
|
self.current_error_index = 0
|
|
|
|
# Stop the timer if there are no more errors
|
|
if not self.active_errors:
|
|
self.error_label.setText("")
|
|
self.error_label.setStyleSheet("")
|
|
self.error_timer.stop()
|
|
|
|
def display_current_error(self):
|
|
"""
|
|
Display the current error from the active errors list. If there are multiple errors,
|
|
it alternates between them every time this function is called.
|
|
"""
|
|
if self.active_errors:
|
|
# Get the current error message and update the label
|
|
message, is_error = self.active_errors[self.current_error_index]
|
|
self.error_label.setText(message)
|
|
if is_error:
|
|
self.error_label.setStyleSheet("color: red;")
|
|
else:
|
|
self.error_label.setStyleSheet("color: green;")
|
|
|
|
# Move to the next error for the next call to this method
|
|
self.current_error_index = (self.current_error_index + 1) % len(self.active_errors)
|
|
else:
|
|
# Clear the label if there are no errors
|
|
self.error_label.setText("")
|
|
self.error_label.setStyleSheet("")
|
|
|
|
@pyqtSlot(bool)
|
|
def handle_rfid_error(self, connected):
|
|
"""
|
|
Handle errors related to the RFID system.
|
|
|
|
Args:
|
|
connected (bool): True if connected, False if not.
|
|
"""
|
|
if connected:
|
|
self.remove_error("Errore RFID.") # Remove the RFID error
|
|
else:
|
|
self.add_error("Errore RFID.", True) # Add the RFID error
|
|
|
|
@pyqtSlot(bool, str)
|
|
def handle_modbus_error(self, has_error, error_message):
|
|
"""
|
|
Handle Tecna/Modbus errors and manage the error list.
|
|
|
|
Args:
|
|
has_error (bool): True if there is an error, False otherwise.
|
|
error_message (str): The error message to add.
|
|
"""
|
|
#print(f"DEBUG: Modbus error handler called - has_error={has_error}, error_message={error_message}") # Debugging
|
|
if has_error:
|
|
self.add_error(f"Errore Tecna", True) # Add the Modbus error
|
|
else:
|
|
self.remove_error(f"Errore Tecna") # Remove the Modbus error
|
|
|
|
def update_label_with_args(self, extra_info=None):
|
|
"""
|
|
Updates the flag label with the string representation of current command-line arguments
|
|
and optional extra info, directly displaying it on the label.
|
|
|
|
Args:
|
|
extra_info (str): Optional. Extra information to append to the label text.
|
|
"""
|
|
# Combine command-line arguments
|
|
args_text = " ".join(sys.argv[1:]) if len(
|
|
sys.argv) > 1 else "No system arguments provided." # Default to No args
|
|
|
|
if args_text: # If there are CLI arguments (or default message)
|
|
# Combine CLI args and extra info for label text
|
|
if extra_info:
|
|
args_text += f" | {extra_info}"
|
|
|
|
# Update the label text directly with args_text
|
|
self.flag_label.setText(args_text)
|
|
self.flag_label.setStyleSheet("QLabel { color: red; font-weight: bold; }") # Customize color if needed
|
|
self.flag_label.setVisible(True) # Ensure the label is visible
|
|
else: # No args provided
|
|
self.flag_label.setVisible(False) # Hide label
|
|
|
|
def process_m43_patterns(self, barcode_format, context):
|
|
"""
|
|
Process any {M43:X:Y} patterns in the barcode format string.
|
|
|
|
The pattern {M43:X:Y} is replaced with the modulo 43 check digit calculated
|
|
for the substring of the formatted barcode starting at position X and
|
|
containing Y characters.
|
|
|
|
Args:
|
|
barcode_format: The barcode format string that may contain {M43:X:Y} patterns.
|
|
context: The context dictionary used for formatting.
|
|
|
|
Returns:
|
|
The processed barcode format string with {M43:X:Y} patterns replaced by
|
|
placeholders that will be filled with the calculated check digits.
|
|
"""
|
|
import re
|
|
|
|
# If barcode_format is None or empty, return it as is
|
|
if not barcode_format:
|
|
self.log.warning("Empty barcode format provided")
|
|
return barcode_format
|
|
|
|
# Log the input barcode format and context
|
|
self.log.info(f"Processing barcode format: '{barcode_format}'")
|
|
self.log.info(f"Context keys: {list(context.keys())}")
|
|
|
|
# Create a temporary copy of the barcode format for processing
|
|
processed_format = barcode_format
|
|
|
|
# Find all {M43:X:Y} patterns in the barcode format
|
|
pattern = r'\{M43:(\d+):(\d+)\}'
|
|
matches = list(re.finditer(pattern, barcode_format))
|
|
self.log.info(f"Found {len(matches)} M43 patterns in barcode format")
|
|
|
|
# If no M43 patterns found, return the original format
|
|
if not matches:
|
|
self.log.info("No M43 patterns found in barcode format")
|
|
return barcode_format
|
|
|
|
# Process each match
|
|
for i, match in enumerate(matches):
|
|
# Extract X and Y values
|
|
x = int(match.group(1))
|
|
y = int(match.group(2))
|
|
self.log.info(f"Processing M43 pattern {i+1}: X={x}, Y={y}")
|
|
|
|
# Create a placeholder for this check digit
|
|
placeholder = f"{{m43_check_{i}}}"
|
|
|
|
# Replace the {M43:X:Y} pattern with the placeholder
|
|
processed_format = processed_format.replace(match.group(0), placeholder)
|
|
|
|
# Format the barcode without the M43 patterns to get the base string
|
|
# First, create a temporary format string with all M43 patterns removed
|
|
temp_format = barcode_format
|
|
for m in matches:
|
|
temp_format = temp_format.replace(m.group(0), "")
|
|
|
|
# Format the temporary string with the context
|
|
try:
|
|
base_string = temp_format.format(**context)
|
|
self.log.info(f"Base string for checksum calculation: '{base_string}'")
|
|
except KeyError as e:
|
|
self.log.error(f"KeyError when formatting base string: {e}")
|
|
context[f"m43_check_{i}"] = "?"
|
|
continue
|
|
|
|
# Extract the substring for checksum calculation
|
|
if x < len(base_string) and x + y <= len(base_string):
|
|
substring = base_string[x:x+y]
|
|
# Log the substring and its length for debugging
|
|
self.log.info(f"M43 substring for checksum calculation: '{substring}', length: {len(substring)}")
|
|
|
|
# Remove any characters that are not valid for checksum calculation
|
|
|
|
# Check if the substring contains any invalid characters
|
|
invalid_chars = [c for c in substring if c not in self.MODULO43_ASSIGNMENT_TABLE]
|
|
if invalid_chars:
|
|
original_substring = substring
|
|
substring = ''.join(c for c in substring if c in self.MODULO43_ASSIGNMENT_TABLE)
|
|
self.log.info(f"Removed invalid characters {invalid_chars} from substring: '{original_substring}' -> '{substring}'")
|
|
|
|
# Check if the substring is empty or contains only whitespace
|
|
if not substring or substring.isspace():
|
|
self.log.warning(f"Empty or whitespace-only substring for M43 pattern {i+1}")
|
|
context[f"m43_check_{i}"] = "?"
|
|
continue
|
|
|
|
# Calculate the check digit
|
|
try:
|
|
|
|
check_digit = self.calculate_modulo43_checksum(substring)
|
|
# Log the calculated check digit
|
|
self.log.info(f"Calculated check digit: '{check_digit}' for substring: '{substring}'")
|
|
# Add the check digit to the context
|
|
context[f"m43_check_{i}"] = check_digit
|
|
except ValueError as e:
|
|
self.log.error(f"Error calculating checksum: {e}")
|
|
context[f"m43_check_{i}"] = "?"
|
|
else:
|
|
# Handle out-of-range indices
|
|
self.log.warning(f"M43 pattern with X={x}, Y={y} is out of range for string of length {len(base_string)}")
|
|
context[f"m43_check_{i}"] = "?"
|
|
|
|
self.log.info(f"Processed barcode format: '{processed_format}'")
|
|
return processed_format
|
|
|
|
def calculate_modulo43_checksum(self,data_sequence: str) -> str:
|
|
"""
|
|
Calculates the Modulo 43 checksum for a given data sequence.
|
|
|
|
The function determines the checksum value for each character based on a
|
|
predefined assignment table, calculates the total sum of these values,
|
|
and then finds the remainder of the division by 43. The character
|
|
corresponding to this remainder is the check digit.
|
|
|
|
Args:
|
|
data_sequence: The input string for which to calculate the checksum.
|
|
|
|
Returns:
|
|
The Modulo 43 check digit as a single character.
|
|
|
|
Raises:
|
|
ValueError: If the data_sequence contains a character not found in the
|
|
assignment table.
|
|
"""
|
|
# If the data sequence is empty, return a default value
|
|
if not data_sequence:
|
|
self.log.warning("Empty data sequence provided for checksum calculation")
|
|
return "?"
|
|
|
|
# Use the class variable for the assignment table
|
|
assignment_table = self.MODULO43_ASSIGNMENT_TABLE
|
|
|
|
# Log the input data sequence
|
|
self.log.info(f"Calculating checksum for data sequence: '{data_sequence}'")
|
|
|
|
total_sum = 0
|
|
char_values = []
|
|
for char in data_sequence:
|
|
if char in assignment_table:
|
|
char_value = assignment_table[char]
|
|
total_sum += char_value
|
|
char_values.append((char, char_value))
|
|
else:
|
|
self.log.error(f"Character '{char}' is not valid for checksum calculation.")
|
|
raise ValueError(f"Character '{char}' is not valid for checksum calculation.")
|
|
|
|
# Log the character values and total sum
|
|
self.log.info(f"Character values: {char_values}")
|
|
self.log.info(f"Total sum: {total_sum}")
|
|
|
|
remainder = total_sum % 43
|
|
self.log.info(f"Remainder (total_sum % 43): {remainder}")
|
|
|
|
# Invert the assignment_table to map the remainder back to the character
|
|
for char, value in assignment_table.items():
|
|
if value == remainder:
|
|
self.log.info(f"Check digit for remainder {remainder}: '{char}'")
|
|
return char
|
|
|
|
# This part of the code should be unreachable given the logic
|
|
self.log.error(f"No character found for remainder {remainder}")
|
|
return "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
|