This commit is contained in:
edo-neo 2025-08-21 09:48:38 +02:00
parent a6fbd4671a
commit 855964de0c

View File

@ -39,11 +39,13 @@ class HikrobotSmartCamera(Component):
This is called periodically by the timer when an operation is in progress.
"""
if not self.connected or not self.current_operation:
self.log.debug("Progress check stopped: not connected or no operation in progress")
self.progress_timer.stop()
return
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Progress check stopped: no camera handles available")
self.progress_timer.stop()
return
@ -55,47 +57,85 @@ class HikrobotSmartCamera(Component):
push_rate = c_uint()
try:
self.log.debug("Getting push status information")
nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type))
nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state))
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate))
if nRet1 != MV_VS_OK:
self.log.error(f"Failed to get ScPushType, error code: 0x{nRet1&0xFFFFFFFF:x}")
if nRet2 != MV_VS_OK:
self.log.error(f"Failed to get ScPushState, error code: 0x{nRet2&0xFFFFFFFF:x}")
if nRet3 != MV_VS_OK:
self.log.error(f"Failed to get ScPushRate, error code: 0x{nRet3&0xFFFFFFFF:x}")
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
self.log.debug(f"Push status - Type: {push_type.value}, State: {push_state.value}, Rate: {push_rate.value}")
# Check which operation we're monitoring
if self.current_operation == "switch_scheme" and push_type.value == 2: # LPRP 2 protocol loading progress
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if self.current_operation == "switch_scheme":
if push_type.value == 2: # LPRP 2 protocol loading progress
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
if push_state.value == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
elif self.current_operation == "ntp_settings" and push_type.value == 16: # NTPS 16 NTP Time Check Status
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
else:
self.log.debug(f"Waiting for push type 2 (protocol loading), current type: {push_type.value}")
elif self.current_operation == "ntp_settings":
if push_type.value == 16: # NTPS 16 NTP Time Check Status
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
if push_state.value == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
else:
self.log.debug(f"Waiting for push type 16 (NTP settings), current type: {push_type.value}")
else:
self.log.warning(f"Unknown operation type: {self.current_operation}")
else:
# If we can't get the push status after multiple attempts, try to proceed anyway
self.log.warning("Could not get push status information, continuing with operation")
# After 10 seconds (20 checks at 500ms), force completion if we're still waiting
if not hasattr(self, '_progress_check_count'):
self._progress_check_count = 0
self._progress_check_count += 1
if self._progress_check_count >= 20:
self.log.warning(f"Force completing operation after timeout: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
self._progress_check_count = 0
# If we were switching schemes, try to refresh the module list anyway
if self.current_operation == "switch_scheme":
self.log.info("Attempting to refresh module list after timeout")
self.refresh_module_list()
except Exception as e:
self.log.error(f"Error checking progress: {e}")
self.progress_timer.stop()
@ -151,20 +191,29 @@ class HikrobotSmartCamera(Component):
handle = self.cam_list[0]["handle"]
try:
self.log.info(f"Starting scheme switch to: {solution_name}")
# 1. Set the SrcOperateSolutionName node as the target schema name
solution_name_bytes = solution_name.encode('utf-8')
self.log.debug(f"Setting solution name: {solution_name} (bytes: {solution_name_bytes})")
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution name: {solution_name}")
self.log.error(f"Failed to set solution name: {solution_name}, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
else:
self.log.info(f"Successfully set solution name: {solution_name}")
# 2. Set the Load Scenario command
self.log.debug("Sending CommandProjectLoad command")
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandProjectLoad")
if nRet != MV_VS_OK:
self.log.error("Failed to load project")
self.log.error(f"Failed to load project, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
else:
self.log.info("Successfully sent project load command")
# Start monitoring progress
self.log.info("Starting progress monitoring for scheme switching")
self.current_operation = "switch_scheme"
self.progress_timer.start(500) # Check every 500ms
@ -284,7 +333,7 @@ class HikrobotSmartCamera(Component):
def update_solution_name(self, recipe_name):
"""
This method is called when the vision component's recipe changes.
It updates the solution_name attribute and triggers a reconfiguration.
It updates the solution_name attribute and triggers a scheme switch.
It also attempts to get the rotations value from the vision component if available.
"""
@ -300,11 +349,29 @@ class HikrobotSmartCamera(Component):
except Exception as e:
self.log.warning(f"Could not update rotations: {e}")
# Use the new switch_scheme method instead of reconfiguring
# Use the switch_scheme method to change the camera scheme
if self.connected:
self.switch_scheme(recipe_name)
# Stop any ongoing operation
if self.current_operation:
self.log.warning(f"Stopping ongoing operation: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
# Try to switch scheme
success = self.switch_scheme(recipe_name)
if not success:
self.log.error(f"Failed to switch scheme to {recipe_name}, trying to reconnect...")
# If switching failed, try to reconnect and then switch again
self.connected = False
self.config_changed() # This will reconnect
if self.connected:
self.log.info(f"Reconnected, trying to switch scheme again...")
self.switch_scheme(recipe_name)
else:
self.log.error("Failed to reconnect to camera")
else:
self.reconfigure()
self.log.info("Camera not connected, will switch scheme after connection")
self.reconfigure() # This will connect and then switch scheme
def config_changed(self):
self.connected = False
@ -450,3 +517,91 @@ class HikrobotSmartCamera(Component):
else:
QMessageBox.critical(None, "Errore", f"ERRORE CONNESSIONE TELECAMERE\n")
super().resume()
def get_current_scheme(self):
"""
Get the current scheme/solution name from the camera.
Returns:
str: The current scheme name or None if there was an error
"""
if not self.connected:
self.log.error("Cannot get current scheme: Camera not connected")
return None
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot get current scheme: No camera handles available")
return None
handle = self.cam_list[0]["handle"]
try:
# Get the current solution name
solution_name = create_string_buffer(256)
nRet = mv_lib.MV_VS_GetStringValue(handle, b"SrcOperateSolutionName", solution_name, 256)
if nRet != MV_VS_OK:
self.log.error(f"Failed to get current solution name, error code: 0x{nRet&0xFFFFFFFF:x}")
return None
current_scheme = solution_name.value.decode('utf-8')
self.log.info(f"Current scheme: {current_scheme}")
return current_scheme
except Exception as e:
self.log.error(f"Error getting current scheme: {e}")
return None
def test_scheme_switch(self, solution_name=None):
"""
Test method to manually trigger a scheme switch.
This can be used for debugging or testing the scheme switching functionality.
Args:
solution_name: The name of the solution to switch to. If None, uses the current solution_name.
Returns:
bool: True if successful, False otherwise
"""
if solution_name is None:
if self.solution_name is None:
self.log.error("No solution name provided and no current solution name set")
return False
solution_name = self.solution_name
self.log.info(f"Testing scheme switch to: {solution_name}")
# Get current scheme for comparison
current_scheme = self.get_current_scheme()
if current_scheme == solution_name:
self.log.info(f"Already using scheme: {solution_name}")
return True
if not self.connected:
self.log.info("Camera not connected, connecting first...")
self.config_changed()
if not self.connected:
self.log.error("Failed to connect to camera")
return False
# Stop any ongoing operation
if self.current_operation:
self.log.warning(f"Stopping ongoing operation: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
# Try to switch scheme
success = self.switch_scheme(solution_name)
# Verify the switch was successful
if success:
# Wait a bit for the switch to complete
time.sleep(2)
new_scheme = self.get_current_scheme()
if new_scheme == solution_name:
self.log.info(f"Successfully switched to scheme: {solution_name}")
return True
else:
self.log.warning(f"Scheme switch command succeeded but verification failed. Current scheme: {new_scheme}")
return False
return success