Compare commits
6 Commits
master
...
dual-chann
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93f579086b | ||
|
|
3e57f4b559 | ||
|
|
accf5dc72b | ||
|
|
4d4769a7be | ||
|
|
afdee1335e | ||
| 046b9fd712 |
28
config/label_templates/203/LabelCARRIERI.prn
Normal file
28
config/label_templates/203/LabelCARRIERI.prn
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
CT~~CD,~CC^~CT~
|
||||||
|
^XA
|
||||||
|
~TA000
|
||||||
|
~JSN
|
||||||
|
^LT0
|
||||||
|
^MNW
|
||||||
|
^MTT
|
||||||
|
^PON
|
||||||
|
^PMN
|
||||||
|
^LH0,0
|
||||||
|
^JMA
|
||||||
|
^PR2,2
|
||||||
|
~SD20
|
||||||
|
^JUS
|
||||||
|
^LRN
|
||||||
|
^CI27
|
||||||
|
^PA0,1,1,0
|
||||||
|
^XZ
|
||||||
|
^XA
|
||||||
|
^MMT
|
||||||
|
^PW240
|
||||||
|
^LL128
|
||||||
|
^LS0
|
||||||
|
^FT75,107^BQN,2,2
|
||||||
|
^FH\^FDLA,<CN>HOSE<PN>5803647057|<RV>000|<BL>0000|<SC>B1|<AC>YES|<PC>YES|<TN>Errecinque srl|<SN>00001|<OCG>OCG36006208|<DT>16-16-2026|<LT>000000PART|<LV>000^FS
|
||||||
|
^FT20,124^A0N,17,25^FH\^CI28^FD{PART}_C1^FS^CI27
|
||||||
|
^PQ1,0,1,Y
|
||||||
|
^XZ
|
||||||
|
|
@ -17,8 +17,9 @@ digital_io: absent
|
||||||
digital_io_flush_blow: present
|
digital_io_flush_blow: present
|
||||||
second_leak_test: present
|
second_leak_test: present
|
||||||
barcode_recipe_selection: present
|
barcode_recipe_selection: present
|
||||||
external_flush_blow: present # EXTERNAL BOX CONTROLLING MULTI-CHANNEL TEST (IF PRESENT), BLOW-CLEANING AND EXTERNAL FLUSH
|
external_flush_blow: absent # EXTERNAL BOX CONTROLLING MULTI-CHANNEL TEST (IF PRESENT), BLOW-CLEANING AND EXTERNAL FLUSH
|
||||||
dual_channel: present
|
dual_channel: present
|
||||||
|
freefall: absent
|
||||||
#fixture_id: present
|
#fixture_id: present
|
||||||
|
|
||||||
[tecna_t3]
|
[tecna_t3]
|
||||||
|
|
@ -55,6 +56,7 @@ blow_led:7 # CLEAN INDICATOR
|
||||||
ch1_led:6 # CHANNEL 1 ACTIVE INDICATOR
|
ch1_led:6 # CHANNEL 1 ACTIVE INDICATOR
|
||||||
ch2_led:5 # CHANNEL 2 ACTIVE INDICATOR
|
ch2_led:5 # CHANNEL 2 ACTIVE INDICATOR
|
||||||
flush_led:4 # FLUSH INDICATOR
|
flush_led:4 # FLUSH INDICATOR
|
||||||
|
#first_output: 8 # Added first_output for freefall
|
||||||
|
|
||||||
[recipe]
|
[recipe]
|
||||||
recipe_name_field: codice_ricetta
|
recipe_name_field: codice_ricetta
|
||||||
|
|
@ -63,6 +65,7 @@ label_template_field: modello_etichetta
|
||||||
description_field: descrizione
|
description_field: descrizione
|
||||||
|
|
||||||
[recipes_defaults]
|
[recipes_defaults]
|
||||||
|
tester_discharge_enable: yes
|
||||||
dimensione_lotto_abilitata:
|
dimensione_lotto_abilitata:
|
||||||
tempo_soffiaggio: 5
|
tempo_soffiaggio: 5
|
||||||
tempo_pre_riempimento: 0
|
tempo_pre_riempimento: 0
|
||||||
|
|
@ -109,7 +112,7 @@ settling_pressure_max_percent: 5
|
||||||
test_pressure: 7000
|
test_pressure: 7000
|
||||||
test_time: 10
|
test_time: 10
|
||||||
test_pressure_qpos: 5 #Q+ Upper test leak limit
|
test_pressure_qpos: 5 #Q+ Upper test leak limit
|
||||||
test_pressure_qneg: 15 #Q- Lower test leak limit
|
test_pressure_qneg: 15 #Q- Lower test leak limit (tube-tube)
|
||||||
test_pressure_tt_qpos: 1 # Q+ Upper test leak limit (tube-tube)
|
test_pressure_tt_qpos: 1 # Q+ Upper test leak limit (tube-tube)
|
||||||
test_pressure_tt_qneg: 5 # Q- Lower test leak limit (tube-tube)
|
test_pressure_tt_qneg: 5 # Q- Lower test leak limit (tube-tube)
|
||||||
flush_time: 1
|
flush_time: 1
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
import csv
|
import sys
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
@ -7,38 +7,87 @@ from PyQt5.QtWidgets import QApplication
|
||||||
from src.components.os_label_printer import *
|
from src.components.os_label_printer import *
|
||||||
from src.lib.helpers import ConfigReader
|
from src.lib.helpers import ConfigReader
|
||||||
|
|
||||||
SYSTEM_ID = "test-linux"
|
# --- Product Data ---
|
||||||
CSV_PATH="tmp/promatec.csv"
|
PRODUCTS = {
|
||||||
TEMPLATE="ferrari_flag_qr_only.prn"
|
"5803647057": {"CN": "pressure-hose_SG", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
#TEMPLATE="ferrari_30x16_203.prn"
|
"5803647058": {"CN": "Return-hose-SG", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803647061": {"CN": "pressure-hose-fn4", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803648415": {"CN": "NVH-HOSE", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803688601": {"CN": "NVH-HOSE-xc13-fn4", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803610921": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803610922": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
"5803610923": {"CN": "NVH-HOSE-ESRA", "RV": "000", "BL": "00001", "SC": "B1", "AC": "YES", "PC": "NO", "LT": "000000PART", "LV": "000"},
|
||||||
|
}
|
||||||
|
|
||||||
config = ConfigReader(system_id=SYSTEM_ID)
|
def generate_qr_code(product_pn, product_data, ocg, supplier_name, date_obj, serial_number):
|
||||||
printer=Os_Label_Printer(config=config,name="label_printer")
|
# Format date as day/week_number/year, e.g., 09/15/2026
|
||||||
|
date_str = date_obj.strftime("%d/%W/%Y")
|
||||||
|
|
||||||
|
qr_string = (
|
||||||
|
f"<CN>{product_data['CN']}|<PN>{product_pn}|<RV>{product_data['RV']}|"
|
||||||
|
f"<BL>{product_data['BL']}|<SC>{product_data['SC']}|<AC>{product_data['AC']}|"
|
||||||
|
f"<PC>{product_data['PC']}|<TN>{supplier_name}|<SN>{serial_number:05d}|"
|
||||||
|
f"<OCG>{ocg}|<DT>{date_str}|<LT>{product_data['LT']}|<LV>{product_data['LV']}"
|
||||||
|
)
|
||||||
|
return qr_string
|
||||||
|
|
||||||
# timenow = datetime.now()
|
def main():
|
||||||
app = QApplication(sys.argv)
|
SYSTEM_ID = "st-ten-7"
|
||||||
|
TEMPLATE = "LabelCARRIERI.prn"
|
||||||
|
FIXED_OCG = "OCG36006208"
|
||||||
|
|
||||||
|
config = ConfigReader(system_id=SYSTEM_ID)
|
||||||
|
printer = Os_Label_Printer(config=config, name="label_printer")
|
||||||
|
|
||||||
|
app = QApplication(sys.argv)
|
||||||
|
|
||||||
with open(CSV_PATH, "r", encoding="utf-8-sig") as f:
|
while True:
|
||||||
reader = csv.DictReader(f)
|
print("\n--- New Label Printing Session ---")
|
||||||
for row in reader:
|
print("Available Part Numbers:")
|
||||||
START_SN = 1
|
for pn in PRODUCTS.keys():
|
||||||
STOP_SN = int(row["quantity"])
|
print(f"- {pn}")
|
||||||
PN = row["part_number"]
|
|
||||||
print(f"PART NUMBER # {PN}")
|
selected_pn = ""
|
||||||
|
while selected_pn not in PRODUCTS:
|
||||||
|
selected_pn = input("Enter the Part Number to print: ")
|
||||||
|
if selected_pn not in PRODUCTS:
|
||||||
|
print("Invalid Part Number. Please choose from the list above.")
|
||||||
|
|
||||||
|
product_info = PRODUCTS[selected_pn]
|
||||||
|
|
||||||
|
supplier_name = input("Enter the supplier name: ")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
quantity_str = input("Enter the number of labels to print [15]: ")
|
||||||
|
if not quantity_str:
|
||||||
|
quantity = 15
|
||||||
|
break
|
||||||
|
quantity = int(quantity_str)
|
||||||
|
if quantity > 0:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print("Please enter a positive number.")
|
||||||
|
except ValueError:
|
||||||
|
print("Invalid number.")
|
||||||
|
|
||||||
|
print(f"Printing {quantity} labels for PART NUMBER # {selected_pn}")
|
||||||
input("Press Enter to continue...")
|
input("Press Enter to continue...")
|
||||||
for sn in range(START_SN,STOP_SN+2):
|
|
||||||
|
last_label_context = None
|
||||||
|
for sn in range(1, quantity + 1):
|
||||||
timenow = datetime.now()
|
timenow = datetime.now()
|
||||||
print(f"PRINTING LABEL # {sn}")
|
print(f"PRINTING LABEL # {sn}")
|
||||||
|
|
||||||
|
# Pass the incrementing serial number 'sn' to the QR code generator
|
||||||
|
qr_code = generate_qr_code(selected_pn, product_info, FIXED_OCG, supplier_name, timenow, sn)
|
||||||
|
|
||||||
|
# The context now contains the full QR_CODE string with the correct SN,
|
||||||
|
# and a separate SN for any human-readable fields on the label.
|
||||||
context = {
|
context = {
|
||||||
# RECIPE DATA
|
"QR_CODE": qr_code,
|
||||||
"PART": PN,
|
"PART": selected_pn,
|
||||||
# SERIAL DEFINITION
|
"SN": f"{sn:05d}",
|
||||||
"SN": str(sn),
|
|
||||||
"SN4": f"{sn:0>4}",
|
|
||||||
"SN5": f"{sn:0>5}",
|
|
||||||
"SN6": f"{sn:0>6}",
|
|
||||||
# TIME DEFINITION
|
|
||||||
"DATETIME": timenow.strftime("%d/%m/%Y %H:%M:%S"),
|
"DATETIME": timenow.strftime("%d/%m/%Y %H:%M:%S"),
|
||||||
"DATE": timenow.strftime("%d/%m/%Y"),
|
"DATE": timenow.strftime("%d/%m/%Y"),
|
||||||
"TIME": timenow.strftime("%H:%M:%S"),
|
"TIME": timenow.strftime("%H:%M:%S"),
|
||||||
|
|
@ -51,6 +100,29 @@ with open(CSV_PATH, "r", encoding="utf-8-sig") as f:
|
||||||
"SS": timenow.strftime("%S"),
|
"SS": timenow.strftime("%S"),
|
||||||
"JJJ": timenow.strftime("%j"),
|
"JJJ": timenow.strftime("%j"),
|
||||||
}
|
}
|
||||||
printer.print_label(TEMPLATE,context)
|
printer.print_label(TEMPLATE, context)
|
||||||
|
last_label_context = context
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
time.sleep(.5)
|
while True:
|
||||||
|
action = input("\nReprint last label (r), print another product (a), or exit (e)? ").lower()
|
||||||
|
if action == 'r':
|
||||||
|
if last_label_context:
|
||||||
|
print("Reprinting last label...")
|
||||||
|
printer.print_label(TEMPLATE, last_label_context)
|
||||||
|
time.sleep(0.5)
|
||||||
|
else:
|
||||||
|
print("No label has been printed in this session to reprint.")
|
||||||
|
elif action == 'a':
|
||||||
|
break
|
||||||
|
elif action == 'e':
|
||||||
|
print("Exiting.")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
print("Invalid option. Please enter 'r', 'a', or 'e'.")
|
||||||
|
|
||||||
|
if action == 'a':
|
||||||
|
continue
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
|
||||||
|
|
@ -1122,7 +1122,12 @@ class Test(Widget):
|
||||||
|
|
||||||
# Process any {M43:X:Y} patterns in the barcode format
|
# Process any {M43:X:Y} patterns in the barcode format
|
||||||
processed_barcode_format = self.process_m43_patterns(self.barcode_format, context)
|
processed_barcode_format = self.process_m43_patterns(self.barcode_format, context)
|
||||||
|
|
||||||
|
if processed_barcode_format is None:
|
||||||
|
processed_barcode_format = self.print_step.spec.get("barcode", "-")
|
||||||
|
|
||||||
formatted_barcode = processed_barcode_format.format(**context)
|
formatted_barcode = processed_barcode_format.format(**context)
|
||||||
|
|
||||||
context['BCODE'] = formatted_barcode
|
context['BCODE'] = formatted_barcode
|
||||||
self.printed_barcode = formatted_barcode
|
self.printed_barcode = formatted_barcode
|
||||||
if self.archived is not None:
|
if self.archived is not None:
|
||||||
|
|
|
||||||
|
|
@ -24,69 +24,91 @@ class Test_Leak(Test_Test):
|
||||||
self.recipe_written = False
|
self.recipe_written = False
|
||||||
self.start_b.clicked.connect(self.start_test)
|
self.start_b.clicked.connect(self.start_test)
|
||||||
self.stop_b.clicked.connect(self.stop_test)
|
self.stop_b.clicked.connect(self.stop_test)
|
||||||
self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys())
|
|
||||||
|
hardware_config = self.config.get("hardware_config", {})
|
||||||
|
self.show_instruction_b.setVisible("show_instructions" in hardware_config)
|
||||||
self.show_instruction_b.clicked.connect(self.show_instruction)
|
self.show_instruction_b.clicked.connect(self.show_instruction)
|
||||||
|
|
||||||
# Connect to the tecna_error_signal to handle connection issues
|
# Connect to the tecna_error_signal to handle connection issues
|
||||||
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
||||||
|
|
||||||
def show_instruction(self):
|
def show_instruction(self):
|
||||||
dialog=Dialog()
|
dialog=Dialog()
|
||||||
dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.parent.recipe,bench_name=self.parent.config.machine_id))
|
dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.recipe,bench_name=self.config.get("machine_id", "")))
|
||||||
dialog.show()
|
dialog.show()
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
self.components[self.tester_component].stop_test()
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component].stop_test()
|
||||||
super().reset()
|
super().reset()
|
||||||
|
|
||||||
def stop_test(self):
|
def stop_test(self):
|
||||||
self.components[self.tester_component].stop_test()
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component].stop_test()
|
||||||
self.display_text(text="PROVA INTERROTTA", bg_color="yellow")
|
self.display_text(text="PROVA INTERROTTA", bg_color="yellow")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.start_b.setEnabled(True)
|
self.start_b.setEnabled(True)
|
||||||
self.stop_b.setEnabled(False)
|
self.stop_b.setEnabled(False)
|
||||||
|
|
||||||
def start_test(self):
|
def start_test(self):
|
||||||
if self.step.step_type == "test_freefall_leak":
|
# 1. HANDLE FREEFALL SPECIFIC OUTPUTS
|
||||||
self.set_digital_out("first_output", 1) # Set high
|
hardware_config = self.config.get("hardware_config", [])
|
||||||
# print extra labels
|
if self.step.step_type == "test_freefall_leak" and hardware_config.get("freefall") == "present":
|
||||||
|
self.set_digital_out("first_output", 1, component_name="digital_io_flush_blow")
|
||||||
|
else:
|
||||||
|
self.log.info("Skipping Bit 8: Freefall hardware is absent.")
|
||||||
|
# 2. PRINT LABELS IF NECESSARY
|
||||||
if self.step.step_type == "leak_1":
|
if self.step.step_type == "leak_1":
|
||||||
self.parent.print_extra_labels()
|
self.parent.print_extra_labels()
|
||||||
|
|
||||||
# SELECT TEST CHANNEL
|
# 3. CHANNEL SELECTION (with defensive checks)
|
||||||
if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present":
|
hardware_config = self.config.get("hardware_config", {})
|
||||||
if self.parent.config["hardware_config"].get("dual_channel", None) == "present":
|
if hardware_config.get("dual_channel") == "present":
|
||||||
chan_sel = self.step.spec["chan_sel"] # 0=CH1, 1=CH2
|
is_freefall_machine = hardware_config.get("freefall") == "present"
|
||||||
self.set_digital_out("out_channel_select", chan_sel)
|
chan_sel = self.step.spec.get("chan_sel", 0)
|
||||||
self.set_digital_out("in_channel_select", chan_sel)
|
|
||||||
time.sleep(VALVE_TIME)
|
|
||||||
|
|
||||||
# SET LED INDICATORS
|
if is_freefall_machine:
|
||||||
if chan_sel == 0:
|
chan_sel = 1 - chan_sel
|
||||||
self.set_digital_out("ch1_led", True)
|
|
||||||
else:
|
|
||||||
self.set_digital_out("ch2_led", True)
|
|
||||||
|
|
||||||
self.blow_on=True
|
self.set_digital_out("out_channel_select", chan_sel, component_name="digital_io_flush_blow")
|
||||||
|
self.set_digital_out("in_channel_select", chan_sel, component_name="digital_io_flush_blow")
|
||||||
|
|
||||||
|
if chan_sel == 0:
|
||||||
|
self.set_digital_out("ch1_led", True, component_name="digital_io_flush_blow")
|
||||||
|
self.set_digital_out("ch2_led", False, component_name="digital_io_flush_blow")
|
||||||
|
else:
|
||||||
|
self.set_digital_out("ch2_led", True, component_name="digital_io_flush_blow")
|
||||||
|
self.set_digital_out("ch1_led", False, component_name="digital_io_flush_blow")
|
||||||
|
|
||||||
|
time.sleep(VALVE_TIME)
|
||||||
|
|
||||||
|
# 4. EXTERNAL FLUSH / BLOW LOGIC
|
||||||
|
if hardware_config.get("external_flush_blow") == "present":
|
||||||
|
self.blow_on = True
|
||||||
self.display_text("SOFFIAGGIO IN CORSO...")
|
self.display_text("SOFFIAGGIO IN CORSO...")
|
||||||
self.set_digital_out("blow_led",True)
|
self.set_digital_out("blow_led", True, component_name="digital_io_flush_blow")
|
||||||
self.set_digital_out("blow_on",True)
|
self.set_digital_out("blow_on", True, component_name="digital_io_flush_blow")
|
||||||
time.sleep(VALVE_TIME)
|
time.sleep(VALVE_TIME)
|
||||||
self.set_digital_out("flush_on", True)
|
self.set_digital_out("flush_on", True, component_name="digital_io_flush_blow")
|
||||||
blow_time=int(self.step.spec.get('ext_blow_time',3))
|
|
||||||
self.set_digital_out("blow_led", True)
|
blow_time = int(self.step.spec.get('ext_blow_time', 3))
|
||||||
time.sleep(blow_time)
|
time.sleep(blow_time)
|
||||||
self.set_digital_out("blow_led", False)
|
|
||||||
self.set_digital_out("blow_on", False)
|
self.set_digital_out("blow_led", False, component_name="digital_io_flush_blow")
|
||||||
|
self.set_digital_out("blow_on", False, component_name="digital_io_flush_blow")
|
||||||
time.sleep(VALVE_TIME)
|
time.sleep(VALVE_TIME)
|
||||||
self.set_digital_out("flush_on", False)
|
self.set_digital_out("flush_on", False, component_name="digital_io_flush_blow")
|
||||||
|
|
||||||
if self.parent.config["hardware_config"].get("dual_channel", None) != "present":
|
# 5. FALLBACK LED FOR SINGLE CHANNEL MACHINES
|
||||||
self.set_digital_out("ch1_led", True)
|
elif hardware_config.get("dual_channel") != "present":
|
||||||
|
self.set_digital_out("ch1_led", True, component_name="digital_io")
|
||||||
|
|
||||||
|
# 6. TRIGGER PHYSICAL TESTER
|
||||||
self.blow_on = False
|
self.blow_on = False
|
||||||
if not self.simulate:
|
if not self.simulate:
|
||||||
self.components[self.tester_component].start_test()
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component].start_test()
|
||||||
|
|
||||||
|
|
||||||
def start(self, recipe=None, step=None, pieces=None):
|
def start(self, recipe=None, step=None, pieces=None):
|
||||||
|
|
@ -138,15 +160,18 @@ class Test_Leak(Test_Test):
|
||||||
if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
|
if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
|
||||||
self.recipe_written = False
|
self.recipe_written = False
|
||||||
|
|
||||||
if self.parent.config["hardware_config"].get("second_leak_test", "absent") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME
|
hardware_config = self.config.get("hardware_config", {})
|
||||||
|
if hardware_config.get("second_leak_test") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME
|
||||||
self.recipe_written = False
|
self.recipe_written = False
|
||||||
|
|
||||||
if not self.recipe_written:
|
if not self.recipe_written:
|
||||||
self.components[self.tester_component].write_recipe(self.recipe, self.step)
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component].write_recipe(self.recipe, self.step)
|
||||||
self.recipe_written=True
|
self.recipe_written=True
|
||||||
|
|
||||||
self.get_connection = self.components[self.tester_component].out.connect(self.get)
|
if self.tester_component in self.components:
|
||||||
self.components[self.tester_component].resume()
|
self.get_connection = self.components[self.tester_component].out.connect(self.get)
|
||||||
|
self.components[self.tester_component].resume()
|
||||||
if self.parent_assembly_widget is not None:
|
if self.parent_assembly_widget is not None:
|
||||||
self.display_text(text="ATTENDERE")
|
self.display_text(text="ATTENDERE")
|
||||||
self.start_b.setEnabled(False)
|
self.start_b.setEnabled(False)
|
||||||
|
|
@ -216,7 +241,8 @@ class Test_Leak(Test_Test):
|
||||||
|
|
||||||
# AUTO START SECOND TEST
|
# AUTO START SECOND TEST
|
||||||
if step.step_type == "leak_2":
|
if step.step_type == "leak_2":
|
||||||
if self.config["hardware_config"].get("dual_channel", "absent") == "present":
|
hardware_config = self.config.get("hardware_config", {})
|
||||||
|
if hardware_config.get("dual_channel") == "present":
|
||||||
self.recipe_written = False
|
self.recipe_written = False
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.start_b.setEnabled(True)
|
self.start_b.setEnabled(True)
|
||||||
|
|
@ -230,10 +256,12 @@ class Test_Leak(Test_Test):
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
# disable test loop
|
# disable test loop
|
||||||
self.components[self.tester_component].stop_test()
|
if self.tester_component in self.components:
|
||||||
self.components[self.tester_component].pause()
|
self.components[self.tester_component].stop_test()
|
||||||
self.disconnect(self.get_connection)
|
self.components[self.tester_component].pause()
|
||||||
self.set_digital_out("first_output", 0) # Set low when test stops
|
if self.get_connection:
|
||||||
|
self.disconnect(self.get_connection)
|
||||||
|
self.set_digital_out("first_output", 0, component_name="digital_io_flush_blow") # Set low when test stops
|
||||||
super().stop()
|
super().stop()
|
||||||
self.start_b.setEnabled(False)
|
self.start_b.setEnabled(False)
|
||||||
self.stop_b.setEnabled(False)
|
self.stop_b.setEnabled(False)
|
||||||
|
|
@ -305,7 +333,7 @@ class Test_Leak(Test_Test):
|
||||||
# Let end-of-measure be derived by saver when possible
|
# Let end-of-measure be derived by saver when possible
|
||||||
}
|
}
|
||||||
|
|
||||||
if "Running test: result" in data[self.tester_component]:
|
if self.tester_component in data and "Running test: result" in data[self.tester_component]:
|
||||||
# TEST ENDED, CHECK RESULT
|
# TEST ENDED, CHECK RESULT
|
||||||
result = data[self.tester_component]["Running test: result"]
|
result = data[self.tester_component]["Running test: result"]
|
||||||
step=self.step.spec.get("autotest", "")
|
step=self.step.spec.get("autotest", "")
|
||||||
|
|
@ -333,26 +361,27 @@ class Test_Leak(Test_Test):
|
||||||
self.parent.autostart_next_step = True
|
self.parent.autostart_next_step = True
|
||||||
|
|
||||||
# SET DIGITAL OUTPUTS
|
# SET DIGITAL OUTPUTS
|
||||||
|
hardware_config = self.config.get("hardware_config", {})
|
||||||
if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present":
|
if hardware_config.get("external_flush_blow") == "present":
|
||||||
self.blow_on = True
|
self.blow_on = True
|
||||||
if self.parent.config["hardware_config"].get("dual_channel", None) != "present":
|
if hardware_config.get("dual_channel") != "present":
|
||||||
self.set_digital_out("ch1_led", False)
|
# Corrected: Use 'digital_io' for single-channel machines
|
||||||
|
self.set_digital_out("ch1_led", False, component_name="digital_io")
|
||||||
|
|
||||||
self.display_text("SCARICO ESTERNO IN CORSO...")
|
self.display_text("SCARICO ESTERNO IN CORSO...")
|
||||||
self.set_digital_out("flush_led", True)
|
self.set_digital_out("flush_led", True, component_name="digital_io_flush_blow")
|
||||||
self.set_digital_out("flush_on", True)
|
self.set_digital_out("flush_on", True, component_name="digital_io_flush_blow")
|
||||||
time.sleep(VALVE_TIME)
|
time.sleep(VALVE_TIME)
|
||||||
flush_time = int(self.step.spec.get('ext_flush_time',3))
|
flush_time = int(self.step.spec.get('ext_flush_time',3))
|
||||||
time.sleep(flush_time)
|
time.sleep(flush_time)
|
||||||
self.set_digital_out("flush_led", False)
|
self.set_digital_out("flush_led", False, component_name="digital_io_flush_blow")
|
||||||
#self.set_digital_out("flush_on", False)
|
#self.set_digital_out("flush_on", False)
|
||||||
|
|
||||||
if self.parent.config["hardware_config"].get("dual_channel", None) == "present":
|
if hardware_config.get("dual_channel") == "present":
|
||||||
self.set_digital_out("out_channel_select", False)
|
self.set_digital_out("out_channel_select", False, component_name="digital_io_flush_blow")
|
||||||
self.set_digital_out("in_channel_select", False)
|
self.set_digital_out("in_channel_select", False, component_name="digital_io_flush_blow")
|
||||||
self.set_digital_out("ch1_led", False)
|
self.set_digital_out("ch1_led", False, component_name="digital_io_flush_blow")
|
||||||
self.set_digital_out("ch2_led", False)
|
self.set_digital_out("ch2_led", False, component_name="digital_io_flush_blow")
|
||||||
else:
|
else:
|
||||||
#result = None
|
#result = None
|
||||||
ok = None
|
ok = None
|
||||||
|
|
@ -389,7 +418,7 @@ class Test_Leak(Test_Test):
|
||||||
l.setText(str(v))
|
l.setText(str(v))
|
||||||
|
|
||||||
# Check if there's a connection issue before displaying test status
|
# Check if there's a connection issue before displaying test status
|
||||||
if hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
|
if self.tester_component in self.components and hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
|
||||||
# Connection is lost, don't display test status
|
# Connection is lost, don't display test status
|
||||||
# The handle_modbus_error method will display the appropriate message
|
# The handle_modbus_error method will display the appropriate message
|
||||||
# Just ensure buttons are in the correct state
|
# Just ensure buttons are in the correct state
|
||||||
|
|
@ -398,7 +427,7 @@ class Test_Leak(Test_Test):
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check if the connection was just restored
|
# Check if the connection was just restored
|
||||||
if hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
|
if self.tester_component in self.components and hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
|
||||||
# Connection was just restored, don't display test status yet
|
# Connection was just restored, don't display test status yet
|
||||||
# The handle_modbus_error method will display the appropriate message
|
# The handle_modbus_error method will display the appropriate message
|
||||||
# Just ensure buttons are in the correct state
|
# Just ensure buttons are in the correct state
|
||||||
|
|
@ -410,7 +439,7 @@ class Test_Leak(Test_Test):
|
||||||
if d.get("Running test: active phase", None) in {
|
if d.get("Running test: active phase", None) in {
|
||||||
"WAITING START",
|
"WAITING START",
|
||||||
"ATTESA START",
|
"ATTESA START",
|
||||||
"END TEST, WAITING THE START OF A NEW TEST"
|
"END TEST, WAITING THE START OF A NEW TEST",
|
||||||
"FINE TEST",
|
"FINE TEST",
|
||||||
"STANDBY",
|
"STANDBY",
|
||||||
"PRESSIONE BASSA",
|
"PRESSIONE BASSA",
|
||||||
|
|
@ -469,12 +498,26 @@ class Test_Leak(Test_Test):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
|
|
||||||
def set_digital_out(self,out_name=None,state=1,component_name="digital_io"):
|
def set_digital_out(self,out_name=None,state=1,component_name="digital_io"):
|
||||||
|
# First, check if the component is even loaded for this machine
|
||||||
|
if component_name not in self.components:
|
||||||
|
# If not, just ignore the call. This is the expected behavior for machines
|
||||||
|
# without this specific hardware.
|
||||||
|
return
|
||||||
|
|
||||||
if self.io_ok:
|
if self.io_ok:
|
||||||
bit = int(self.parent.config[component_name][out_name])
|
component_config = self.config.get(component_name, {})
|
||||||
ret = self.components[component_name].set_bit_verify(0,bit,state)
|
bit_val = component_config.get(out_name)
|
||||||
if not ret:
|
if bit_val is not None:
|
||||||
QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
|
bit = int(bit_val)
|
||||||
self.io_ok = False
|
# Now we can safely access the component
|
||||||
|
ret = self.components[component_name].set_bit_verify(0,bit,state)
|
||||||
|
if not ret:
|
||||||
|
QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
|
||||||
|
self.io_ok = False
|
||||||
|
else:
|
||||||
|
# This warning is still useful for debugging configurations
|
||||||
|
print(f"Warning: Output '{out_name}' not found in '{component_name}' config for this machine.")
|
||||||
|
|
||||||
|
|
||||||
def save_last(self):
|
def save_last(self):
|
||||||
if self.last is None:
|
if self.last is None:
|
||||||
|
|
@ -502,7 +545,8 @@ class Test_Leak(Test_Test):
|
||||||
bg_color="red", text_color="white"
|
bg_color="red", text_color="white"
|
||||||
)
|
)
|
||||||
# Stop the test when a connection error is detected
|
# Stop the test when a connection error is detected
|
||||||
self.parent.fail_cycle()
|
if self.parent:
|
||||||
|
self.parent.fail_cycle()
|
||||||
# Always disable start button and enable stop button during connection issues
|
# Always disable start button and enable stop button during connection issues
|
||||||
self.start_b.setEnabled(False)
|
self.start_b.setEnabled(False)
|
||||||
self.stop_b.setEnabled(True)
|
self.stop_b.setEnabled(True)
|
||||||
|
|
@ -525,7 +569,8 @@ class Test_Leak(Test_Test):
|
||||||
bg_color="green", text_color="white"
|
bg_color="green", text_color="white"
|
||||||
)
|
)
|
||||||
# Reset the flag immediately to ensure the reconnection message is displayed
|
# Reset the flag immediately to ensure the reconnection message is displayed
|
||||||
self.components[self.tester_component]._previous_connection_lost = False
|
if self.tester_component in self.components:
|
||||||
|
self.components[self.tester_component]._previous_connection_lost = False
|
||||||
# Force a UI update to ensure the message is displayed
|
# Force a UI update to ensure the message is displayed
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
# Always disable start button and enable stop button during connection issues
|
# Always disable start button and enable stop button during connection issues
|
||||||
|
|
|
||||||
|
|
@ -236,8 +236,18 @@ class Test_Test(Widget):
|
||||||
return
|
return
|
||||||
|
|
||||||
def resizeEvent(self, event=None):
|
def resizeEvent(self, event=None):
|
||||||
if hasattr(self, "img_l"):
|
# 1. Check if the label widget exists
|
||||||
self.img_l.setPixmap(self.img.scaled(self.img_l.width(), self.img_l.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
|
# 2. Check if the image attribute has been initialized
|
||||||
|
# 3. Check if the image is not None
|
||||||
|
if hasattr(self, "img_l") and hasattr(self, "img") and self.img is not None:
|
||||||
|
# Ensure the label has a valid width/height to avoid scaling errors
|
||||||
|
if self.img_l.width() > 0 and self.img_l.height() > 0:
|
||||||
|
self.img_l.setPixmap(self.img.scaled(
|
||||||
|
self.img_l.width(),
|
||||||
|
self.img_l.height(),
|
||||||
|
Qt.KeepAspectRatio,
|
||||||
|
Qt.SmoothTransformation
|
||||||
|
))
|
||||||
|
|
||||||
def challenge_admin(self, info):
|
def challenge_admin(self, info):
|
||||||
if not self.admin_challenged:
|
if not self.admin_challenged:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user