Compare commits

...

6 Commits

Author SHA1 Message Date
Eduardo
93f579086b fix dual chan wip 2026-05-12 09:03:55 +02:00
Eduardo
3e57f4b559 fix dual chan wip 2026-05-11 13:30:34 +02:00
eduar
accf5dc72b dual 2026-05-06 06:34:27 +02:00
eduar
4d4769a7be dual 2026-04-21 06:14:24 +02:00
eduar
afdee1335e carieri label 2026-04-16 07:00:46 +02:00
046b9fd712 dual channel 2026-04-08 12:06:30 +02:00
6 changed files with 258 additions and 95 deletions

View File

@ -0,0 +1,28 @@
CT~~CD,~CC^~CT~
^XA
~TA000
~JSN
^LT0
^MNW
^MTT
^PON
^PMN
^LH0,0
^JMA
^PR2,2
~SD20
^JUS
^LRN
^CI27
^PA0,1,1,0
^XZ
^XA
^MMT
^PW240
^LL128
^LS0
^FT75,107^BQN,2,2
^FH\^FDLA,<CN>HOSE<PN>5803647057|<RV>000|<BL>0000|<SC>B1|<AC>YES|<PC>YES|<TN>Errecinque srl|<SN>00001|<OCG>OCG36006208|<DT>16-16-2026|<LT>000000PART|<LV>000^FS
^FT20,124^A0N,17,25^FH\^CI28^FD{PART}_C1^FS^CI27
^PQ1,0,1,Y
^XZ

View File

@ -17,8 +17,9 @@ digital_io: absent
digital_io_flush_blow: present
second_leak_test: present
barcode_recipe_selection: present
external_flush_blow: present # EXTERNAL BOX CONTROLLING MULTI-CHANNEL TEST (IF PRESENT), BLOW-CLEANING AND EXTERNAL FLUSH
external_flush_blow: absent # EXTERNAL BOX CONTROLLING MULTI-CHANNEL TEST (IF PRESENT), BLOW-CLEANING AND EXTERNAL FLUSH
dual_channel: present
freefall: absent
#fixture_id: present
[tecna_t3]
@ -55,6 +56,7 @@ blow_led:7 # CLEAN INDICATOR
ch1_led:6 # CHANNEL 1 ACTIVE INDICATOR
ch2_led:5 # CHANNEL 2 ACTIVE INDICATOR
flush_led:4 # FLUSH INDICATOR
#first_output: 8 # Added first_output for freefall
[recipe]
recipe_name_field: codice_ricetta
@ -63,6 +65,7 @@ label_template_field: modello_etichetta
description_field: descrizione
[recipes_defaults]
tester_discharge_enable: yes
dimensione_lotto_abilitata:
tempo_soffiaggio: 5
tempo_pre_riempimento: 0
@ -109,7 +112,7 @@ settling_pressure_max_percent: 5
test_pressure: 7000
test_time: 10
test_pressure_qpos: 5 #Q+ Upper test leak limit
test_pressure_qneg: 15 #Q- Lower test leak limit
test_pressure_qneg: 15 #Q- Lower test leak limit (tube-tube)
test_pressure_tt_qpos: 1 # Q+ Upper test leak limit (tube-tube)
test_pressure_tt_qneg: 5 # Q- Lower test leak limit (tube-tube)
flush_time: 1

View File

@ -1,4 +1,4 @@
import csv
import sys
import time
from datetime import datetime
@ -7,38 +7,87 @@ from PyQt5.QtWidgets import QApplication
from src.components.os_label_printer import *
from src.lib.helpers import ConfigReader
SYSTEM_ID = "test-linux"
CSV_PATH="tmp/promatec.csv"
TEMPLATE="ferrari_flag_qr_only.prn"
#TEMPLATE="ferrari_30x16_203.prn"
# --- Product Data ---
PRODUCTS = {
"5803647057": {"CN": "pressure-hose_SG", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803647058": {"CN": "Return-hose-SG", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803647061": {"CN": "pressure-hose-fn4", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803648415": {"CN": "NVH-HOSE", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803688601": {"CN": "NVH-HOSE-xc13-fn4", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803610921": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803610922": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
"5803610923": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
}
config = ConfigReader(system_id=SYSTEM_ID)
printer=Os_Label_Printer(config=config,name="label_printer")
def generate_qr_code(product_pn, product_data, ocg, supplier_name, date_obj, serial_number):
# Format date as day/week_number/year, e.g., 09/15/2026
date_str = date_obj.strftime("%d/%W/%Y")
qr_string = (
f"<CN>{product_data['CN']}|<PN>{product_pn}|<RV>{product_data['RV']}|"
f"<BL>{product_data['BL']}|<SC>{product_data['SC']}|<AC>{product_data['AC']}|"
f"<PC>{product_data['PC']}|<TN>{supplier_name}|<SN>{serial_number:05d}|"
f"<OCG>{ocg}|<DT>{date_str}|<LT>{product_data['LT']}|<LV>{product_data['LV']}"
)
return qr_string
# timenow = datetime.now()
app = QApplication(sys.argv)
def main():
SYSTEM_ID = "st-ten-7"
TEMPLATE = "LabelCARRIERI.prn"
FIXED_OCG = "OCG36006208"
with open(CSV_PATH, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
START_SN = 1
STOP_SN = int(row["quantity"])
PN = row["part_number"]
print(f"PART NUMBER # {PN}")
config = ConfigReader(system_id=SYSTEM_ID)
printer = Os_Label_Printer(config=config, name="label_printer")
app = QApplication(sys.argv)
while True:
print("\n--- New Label Printing Session ---")
print("Available Part Numbers:")
for pn in PRODUCTS.keys():
print(f"- {pn}")
selected_pn = ""
while selected_pn not in PRODUCTS:
selected_pn = input("Enter the Part Number to print: ")
if selected_pn not in PRODUCTS:
print("Invalid Part Number. Please choose from the list above.")
product_info = PRODUCTS[selected_pn]
supplier_name = input("Enter the supplier name: ")
while True:
try:
quantity_str = input("Enter the number of labels to print [15]: ")
if not quantity_str:
quantity = 15
break
quantity = int(quantity_str)
if quantity > 0:
break
else:
print("Please enter a positive number.")
except ValueError:
print("Invalid number.")
print(f"Printing {quantity} labels for PART NUMBER # {selected_pn}")
input("Press Enter to continue...")
for sn in range(START_SN,STOP_SN+2):
last_label_context = None
for sn in range(1, quantity + 1):
timenow = datetime.now()
print(f"PRINTING LABEL # {sn}")
# Pass the incrementing serial number 'sn' to the QR code generator
qr_code = generate_qr_code(selected_pn, product_info, FIXED_OCG, supplier_name, timenow, sn)
# The context now contains the full QR_CODE string with the correct SN,
# and a separate SN for any human-readable fields on the label.
context = {
# RECIPE DATA
"PART": PN,
# SERIAL DEFINITION
"SN": str(sn),
"SN4": f"{sn:0>4}",
"SN5": f"{sn:0>5}",
"SN6": f"{sn:0>6}",
# TIME DEFINITION
"QR_CODE": qr_code,
"PART": selected_pn,
"SN": f"{sn:05d}",
"DATETIME": timenow.strftime("%d/%m/%Y %H:%M:%S"),
"DATE": timenow.strftime("%d/%m/%Y"),
"TIME": timenow.strftime("%H:%M:%S"),
@ -51,6 +100,29 @@ with open(CSV_PATH, "r", encoding="utf-8-sig") as f:
"SS": timenow.strftime("%S"),
"JJJ": timenow.strftime("%j"),
}
printer.print_label(TEMPLATE,context)
printer.print_label(TEMPLATE, context)
last_label_context = context
time.sleep(0.5)
time.sleep(.5)
while True:
action = input("\nReprint last label (r), print another product (a), or exit (e)? ").lower()
if action == 'r':
if last_label_context:
print("Reprinting last label...")
printer.print_label(TEMPLATE, last_label_context)
time.sleep(0.5)
else:
print("No label has been printed in this session to reprint.")
elif action == 'a':
break
elif action == 'e':
print("Exiting.")
return
else:
print("Invalid option. Please enter 'r', 'a', or 'e'.")
if action == 'a':
continue
if __name__ == "__main__":
main()

View File

@ -1122,7 +1122,12 @@ class Test(Widget):
# Process any {M43:X:Y} patterns in the barcode format
processed_barcode_format = self.process_m43_patterns(self.barcode_format, context)
if processed_barcode_format is None:
processed_barcode_format = self.print_step.spec.get("barcode", "-")
formatted_barcode = processed_barcode_format.format(**context)
context['BCODE'] = formatted_barcode
self.printed_barcode = formatted_barcode
if self.archived is not None:

View File

@ -24,22 +24,27 @@ class Test_Leak(Test_Test):
self.recipe_written = False
self.start_b.clicked.connect(self.start_test)
self.stop_b.clicked.connect(self.stop_test)
self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys())
hardware_config = self.config.get("hardware_config", {})
self.show_instruction_b.setVisible("show_instructions" in hardware_config)
self.show_instruction_b.clicked.connect(self.show_instruction)
# Connect to the tecna_error_signal to handle connection issues
if self.tester_component in self.components:
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
def show_instruction(self):
dialog=Dialog()
dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.parent.recipe,bench_name=self.parent.config.machine_id))
dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.recipe,bench_name=self.config.get("machine_id", "")))
dialog.show()
def reset(self):
if self.tester_component in self.components:
self.components[self.tester_component].stop_test()
super().reset()
def stop_test(self):
if self.tester_component in self.components:
self.components[self.tester_component].stop_test()
self.display_text(text="PROVA INTERROTTA", bg_color="yellow")
time.sleep(1)
@ -47,45 +52,62 @@ class Test_Leak(Test_Test):
self.stop_b.setEnabled(False)
def start_test(self):
if self.step.step_type == "test_freefall_leak":
self.set_digital_out("first_output", 1) # Set high
# print extra labels
# 1. HANDLE FREEFALL SPECIFIC OUTPUTS
hardware_config = self.config.get("hardware_config", [])
if self.step.step_type == "test_freefall_leak" and hardware_config.get("freefall") == "present":
self.set_digital_out("first_output", 1, component_name="digital_io_flush_blow")
else:
self.log.info("Skipping Bit 8: Freefall hardware is absent.")
# 2. PRINT LABELS IF NECESSARY
if self.step.step_type == "leak_1":
self.parent.print_extra_labels()
# SELECT TEST CHANNEL
if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present":
if self.parent.config["hardware_config"].get("dual_channel", None) == "present":
chan_sel = self.step.spec["chan_sel"] # 0=CH1, 1=CH2
self.set_digital_out("out_channel_select", chan_sel)
self.set_digital_out("in_channel_select", chan_sel)
time.sleep(VALVE_TIME)
# 3. CHANNEL SELECTION (with defensive checks)
hardware_config = self.config.get("hardware_config", {})
if hardware_config.get("dual_channel") == "present":
is_freefall_machine = hardware_config.get("freefall") == "present"
chan_sel = self.step.spec.get("chan_sel", 0)
if is_freefall_machine:
chan_sel = 1 - chan_sel
self.set_digital_out("out_channel_select", chan_sel, component_name="digital_io_flush_blow")
self.set_digital_out("in_channel_select", chan_sel, component_name="digital_io_flush_blow")
# SET LED INDICATORS
if chan_sel == 0:
self.set_digital_out("ch1_led", True)
self.set_digital_out("ch1_led", True, component_name="digital_io_flush_blow")
self.set_digital_out("ch2_led", False, component_name="digital_io_flush_blow")
else:
self.set_digital_out("ch2_led", True)
self.set_digital_out("ch2_led", True, component_name="digital_io_flush_blow")
self.set_digital_out("ch1_led", False, component_name="digital_io_flush_blow")
self.blow_on=True
time.sleep(VALVE_TIME)
# 4. EXTERNAL FLUSH / BLOW LOGIC
if hardware_config.get("external_flush_blow") == "present":
self.blow_on = True
self.display_text("SOFFIAGGIO IN CORSO...")
self.set_digital_out("blow_led",True)
self.set_digital_out("blow_on",True)
self.set_digital_out("blow_led", True, component_name="digital_io_flush_blow")
self.set_digital_out("blow_on", True, component_name="digital_io_flush_blow")
time.sleep(VALVE_TIME)
self.set_digital_out("flush_on", True)
blow_time=int(self.step.spec.get('ext_blow_time',3))
self.set_digital_out("blow_led", True)
self.set_digital_out("flush_on", True, component_name="digital_io_flush_blow")
blow_time = int(self.step.spec.get('ext_blow_time', 3))
time.sleep(blow_time)
self.set_digital_out("blow_led", False)
self.set_digital_out("blow_on", False)
self.set_digital_out("blow_led", False, component_name="digital_io_flush_blow")
self.set_digital_out("blow_on", False, component_name="digital_io_flush_blow")
time.sleep(VALVE_TIME)
self.set_digital_out("flush_on", False)
self.set_digital_out("flush_on", False, component_name="digital_io_flush_blow")
if self.parent.config["hardware_config"].get("dual_channel", None) != "present":
self.set_digital_out("ch1_led", True)
# 5. FALLBACK LED FOR SINGLE CHANNEL MACHINES
elif hardware_config.get("dual_channel") != "present":
self.set_digital_out("ch1_led", True, component_name="digital_io")
# 6. TRIGGER PHYSICAL TESTER
self.blow_on = False
if not self.simulate:
if self.tester_component in self.components:
self.components[self.tester_component].start_test()
@ -138,13 +160,16 @@ class Test_Leak(Test_Test):
if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
self.recipe_written = False
if self.parent.config["hardware_config"].get("second_leak_test", "absent") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME
hardware_config = self.config.get("hardware_config", {})
if hardware_config.get("second_leak_test") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME
self.recipe_written = False
if not self.recipe_written:
if self.tester_component in self.components:
self.components[self.tester_component].write_recipe(self.recipe, self.step)
self.recipe_written=True
if self.tester_component in self.components:
self.get_connection = self.components[self.tester_component].out.connect(self.get)
self.components[self.tester_component].resume()
if self.parent_assembly_widget is not None:
@ -216,7 +241,8 @@ class Test_Leak(Test_Test):
# AUTO START SECOND TEST
if step.step_type == "leak_2":
if self.config["hardware_config"].get("dual_channel", "absent") == "present":
hardware_config = self.config.get("hardware_config", {})
if hardware_config.get("dual_channel") == "present":
self.recipe_written = False
time.sleep(1)
self.start_b.setEnabled(True)
@ -230,10 +256,12 @@ class Test_Leak(Test_Test):
def stop(self):
# disable test loop
if self.tester_component in self.components:
self.components[self.tester_component].stop_test()
self.components[self.tester_component].pause()
if self.get_connection:
self.disconnect(self.get_connection)
self.set_digital_out("first_output", 0) # Set low when test stops
self.set_digital_out("first_output", 0, component_name="digital_io_flush_blow") # Set low when test stops
super().stop()
self.start_b.setEnabled(False)
self.stop_b.setEnabled(False)
@ -305,7 +333,7 @@ class Test_Leak(Test_Test):
# Let end-of-measure be derived by saver when possible
}
if "Running test: result" in data[self.tester_component]:
if self.tester_component in data and "Running test: result" in data[self.tester_component]:
# TEST ENDED, CHECK RESULT
result = data[self.tester_component]["Running test: result"]
step=self.step.spec.get("autotest", "")
@ -333,26 +361,27 @@ class Test_Leak(Test_Test):
self.parent.autostart_next_step = True
# SET DIGITAL OUTPUTS
if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present":
hardware_config = self.config.get("hardware_config", {})
if hardware_config.get("external_flush_blow") == "present":
self.blow_on = True
if self.parent.config["hardware_config"].get("dual_channel", None) != "present":
self.set_digital_out("ch1_led", False)
if hardware_config.get("dual_channel") != "present":
# Corrected: Use 'digital_io' for single-channel machines
self.set_digital_out("ch1_led", False, component_name="digital_io")
self.display_text("SCARICO ESTERNO IN CORSO...")
self.set_digital_out("flush_led", True)
self.set_digital_out("flush_on", True)
self.set_digital_out("flush_led", True, component_name="digital_io_flush_blow")
self.set_digital_out("flush_on", True, component_name="digital_io_flush_blow")
time.sleep(VALVE_TIME)
flush_time = int(self.step.spec.get('ext_flush_time',3))
time.sleep(flush_time)
self.set_digital_out("flush_led", False)
self.set_digital_out("flush_led", False, component_name="digital_io_flush_blow")
#self.set_digital_out("flush_on", False)
if self.parent.config["hardware_config"].get("dual_channel", None) == "present":
self.set_digital_out("out_channel_select", False)
self.set_digital_out("in_channel_select", False)
self.set_digital_out("ch1_led", False)
self.set_digital_out("ch2_led", False)
if hardware_config.get("dual_channel") == "present":
self.set_digital_out("out_channel_select", False, component_name="digital_io_flush_blow")
self.set_digital_out("in_channel_select", False, component_name="digital_io_flush_blow")
self.set_digital_out("ch1_led", False, component_name="digital_io_flush_blow")
self.set_digital_out("ch2_led", False, component_name="digital_io_flush_blow")
else:
#result = None
ok = None
@ -389,7 +418,7 @@ class Test_Leak(Test_Test):
l.setText(str(v))
# Check if there's a connection issue before displaying test status
if hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
if self.tester_component in self.components and hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
# Connection is lost, don't display test status
# The handle_modbus_error method will display the appropriate message
# Just ensure buttons are in the correct state
@ -398,7 +427,7 @@ class Test_Leak(Test_Test):
return
# Check if the connection was just restored
if hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
if self.tester_component in self.components and hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
# Connection was just restored, don't display test status yet
# The handle_modbus_error method will display the appropriate message
# Just ensure buttons are in the correct state
@ -410,7 +439,7 @@ class Test_Leak(Test_Test):
if d.get("Running test: active phase", None) in {
"WAITING START",
"ATTESA START",
"END TEST, WAITING THE START OF A NEW TEST"
"END TEST, WAITING THE START OF A NEW TEST",
"FINE TEST",
"STANDBY",
"PRESSIONE BASSA",
@ -469,12 +498,26 @@ class Test_Leak(Test_Test):
QApplication.processEvents()
def set_digital_out(self,out_name=None,state=1,component_name="digital_io"):
# First, check if the component is even loaded for this machine
if component_name not in self.components:
# If not, just ignore the call. This is the expected behavior for machines
# without this specific hardware.
return
if self.io_ok:
bit = int(self.parent.config[component_name][out_name])
component_config = self.config.get(component_name, {})
bit_val = component_config.get(out_name)
if bit_val is not None:
bit = int(bit_val)
# Now we can safely access the component
ret = self.components[component_name].set_bit_verify(0,bit,state)
if not ret:
QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
self.io_ok = False
else:
# This warning is still useful for debugging configurations
print(f"Warning: Output '{out_name}' not found in '{component_name}' config for this machine.")
def save_last(self):
if self.last is None:
@ -502,6 +545,7 @@ class Test_Leak(Test_Test):
bg_color="red", text_color="white"
)
# Stop the test when a connection error is detected
if self.parent:
self.parent.fail_cycle()
# Always disable start button and enable stop button during connection issues
self.start_b.setEnabled(False)
@ -525,6 +569,7 @@ class Test_Leak(Test_Test):
bg_color="green", text_color="white"
)
# Reset the flag immediately to ensure the reconnection message is displayed
if self.tester_component in self.components:
self.components[self.tester_component]._previous_connection_lost = False
# Force a UI update to ensure the message is displayed
QApplication.processEvents()

View File

@ -236,8 +236,18 @@ class Test_Test(Widget):
return
def resizeEvent(self, event=None):
if hasattr(self, "img_l"):
self.img_l.setPixmap(self.img.scaled(self.img_l.width(), self.img_l.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
# 1. Check if the label widget exists
# 2. Check if the image attribute has been initialized
# 3. Check if the image is not None
if hasattr(self, "img_l") and hasattr(self, "img") and self.img is not None:
# Ensure the label has a valid width/height to avoid scaling errors
if self.img_l.width() > 0 and self.img_l.height() > 0:
self.img_l.setPixmap(self.img.scaled(
self.img_l.width(),
self.img_l.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
))
def challenge_admin(self, info):
if not self.admin_challenged: