dual
This commit is contained in:
parent
4d4769a7be
commit
accf5dc72b
|
|
@ -30,7 +30,8 @@ class Test_Leak(Test_Test):
|
|||
self.show_instruction_b.clicked.connect(self.show_instruction)
|
||||
|
||||
# Connect to the tecna_error_signal to handle connection issues
|
||||
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
||||
|
||||
def show_instruction(self):
|
||||
dialog=Dialog()
|
||||
|
|
@ -38,11 +39,13 @@ class Test_Leak(Test_Test):
|
|||
dialog.show()
|
||||
|
||||
def reset(self):
|
||||
self.components[self.tester_component].stop_test()
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].stop_test()
|
||||
super().reset()
|
||||
|
||||
def stop_test(self):
|
||||
self.components[self.tester_component].stop_test()
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].stop_test()
|
||||
self.display_text(text="PROVA INTERROTTA", bg_color="yellow")
|
||||
time.sleep(1)
|
||||
self.start_b.setEnabled(True)
|
||||
|
|
@ -97,12 +100,13 @@ class Test_Leak(Test_Test):
|
|||
|
||||
# 5. FALLBACK LED FOR SINGLE CHANNEL MACHINES
|
||||
elif hardware_config.get("dual_channel") != "present":
|
||||
self.set_digital_out("ch1_led", True, component_name="digital_io_flush_blow")
|
||||
self.set_digital_out("ch1_led", True, component_name="digital_io")
|
||||
|
||||
# 6. TRIGGER PHYSICAL TESTER
|
||||
self.blow_on = False
|
||||
if not self.simulate:
|
||||
self.components[self.tester_component].start_test()
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].start_test()
|
||||
|
||||
|
||||
def start(self, recipe=None, step=None, pieces=None):
|
||||
|
|
@ -159,11 +163,13 @@ class Test_Leak(Test_Test):
|
|||
self.recipe_written = False
|
||||
|
||||
if not self.recipe_written:
|
||||
self.components[self.tester_component].write_recipe(self.recipe, self.step)
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].write_recipe(self.recipe, self.step)
|
||||
self.recipe_written=True
|
||||
|
||||
self.get_connection = self.components[self.tester_component].out.connect(self.get)
|
||||
self.components[self.tester_component].resume()
|
||||
if self.tester_component in self.components:
|
||||
self.get_connection = self.components[self.tester_component].out.connect(self.get)
|
||||
self.components[self.tester_component].resume()
|
||||
if self.parent_assembly_widget is not None:
|
||||
self.display_text(text="ATTENDERE")
|
||||
self.start_b.setEnabled(False)
|
||||
|
|
@ -248,9 +254,11 @@ class Test_Leak(Test_Test):
|
|||
|
||||
def stop(self):
|
||||
# disable test loop
|
||||
self.components[self.tester_component].stop_test()
|
||||
self.components[self.tester_component].pause()
|
||||
self.disconnect(self.get_connection)
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component].stop_test()
|
||||
self.components[self.tester_component].pause()
|
||||
if self.get_connection:
|
||||
self.disconnect(self.get_connection)
|
||||
self.set_digital_out("first_output", 0, component_name="digital_io_flush_blow") # Set low when test stops
|
||||
super().stop()
|
||||
self.start_b.setEnabled(False)
|
||||
|
|
@ -323,7 +331,7 @@ class Test_Leak(Test_Test):
|
|||
# Let end-of-measure be derived by saver when possible
|
||||
}
|
||||
|
||||
if "Running test: result" in data[self.tester_component]:
|
||||
if self.tester_component in data and "Running test: result" in data[self.tester_component]:
|
||||
# TEST ENDED, CHECK RESULT
|
||||
result = data[self.tester_component]["Running test: result"]
|
||||
step=self.step.spec.get("autotest", "")
|
||||
|
|
@ -355,7 +363,8 @@ class Test_Leak(Test_Test):
|
|||
if hardware_config.get("external_flush_blow") == "present":
|
||||
self.blow_on = True
|
||||
if hardware_config.get("dual_channel") != "present":
|
||||
self.set_digital_out("ch1_led", False, component_name="digital_io_flush_blow")
|
||||
# Corrected: Use 'digital_io' for single-channel machines
|
||||
self.set_digital_out("ch1_led", False, component_name="digital_io")
|
||||
|
||||
self.display_text("SCARICO ESTERNO IN CORSO...")
|
||||
self.set_digital_out("flush_led", True, component_name="digital_io_flush_blow")
|
||||
|
|
@ -407,7 +416,7 @@ class Test_Leak(Test_Test):
|
|||
l.setText(str(v))
|
||||
|
||||
# Check if there's a connection issue before displaying test status
|
||||
if hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
|
||||
if self.tester_component in self.components and hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost:
|
||||
# Connection is lost, don't display test status
|
||||
# The handle_modbus_error method will display the appropriate message
|
||||
# Just ensure buttons are in the correct state
|
||||
|
|
@ -416,7 +425,7 @@ class Test_Leak(Test_Test):
|
|||
return
|
||||
|
||||
# Check if the connection was just restored
|
||||
if hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
|
||||
if self.tester_component in self.components and hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost:
|
||||
# Connection was just restored, don't display test status yet
|
||||
# The handle_modbus_error method will display the appropriate message
|
||||
# Just ensure buttons are in the correct state
|
||||
|
|
@ -487,18 +496,25 @@ class Test_Leak(Test_Test):
|
|||
QApplication.processEvents()
|
||||
|
||||
def set_digital_out(self,out_name=None,state=1,component_name="digital_io"):
|
||||
# First, check if the component is even loaded for this machine
|
||||
if component_name not in self.components:
|
||||
# If not, just ignore the call. This is the expected behavior for machines
|
||||
# without this specific hardware.
|
||||
return
|
||||
|
||||
if self.io_ok:
|
||||
component_config = self.config.get(component_name, {})
|
||||
bit_val = component_config.get(out_name)
|
||||
if bit_val is not None:
|
||||
bit = int(bit_val)
|
||||
# Now we can safely access the component
|
||||
ret = self.components[component_name].set_bit_verify(0,bit,state)
|
||||
if not ret:
|
||||
QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
|
||||
self.io_ok = False
|
||||
else:
|
||||
# Handle case where out_name is not in the config to avoid a crash
|
||||
print(f"Warning: Output '{out_name}' not found in '{component_name}' config.")
|
||||
# This warning is still useful for debugging configurations
|
||||
print(f"Warning: Output '{out_name}' not found in '{component_name}' config for this machine.")
|
||||
|
||||
|
||||
def save_last(self):
|
||||
|
|
@ -527,7 +543,8 @@ class Test_Leak(Test_Test):
|
|||
bg_color="red", text_color="white"
|
||||
)
|
||||
# Stop the test when a connection error is detected
|
||||
self.parent.fail_cycle()
|
||||
if self.parent:
|
||||
self.parent.fail_cycle()
|
||||
# Always disable start button and enable stop button during connection issues
|
||||
self.start_b.setEnabled(False)
|
||||
self.stop_b.setEnabled(True)
|
||||
|
|
@ -550,7 +567,8 @@ class Test_Leak(Test_Test):
|
|||
bg_color="green", text_color="white"
|
||||
)
|
||||
# Reset the flag immediately to ensure the reconnection message is displayed
|
||||
self.components[self.tester_component]._previous_connection_lost = False
|
||||
if self.tester_component in self.components:
|
||||
self.components[self.tester_component]._previous_connection_lost = False
|
||||
# Force a UI update to ensure the message is displayed
|
||||
QApplication.processEvents()
|
||||
# Always disable start button and enable stop button during connection issues
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user