This commit is contained in:
edo-neo 2025-08-21 11:05:30 +02:00
parent 2b70218fd4
commit 27870f9003

View File

@ -33,6 +33,97 @@ class HikrobotSmartCamera(Component):
self.progress_timer.timeout.connect(self.check_progress)
self.current_operation = None
def get_push_status(self):
"""
Get the current push status information from the camera.
As described in section 3.3.2 of the documentation, this retrieves:
- Push Type (ScPushType)
- Push Status (ScPushState)
- Push Progress (ScPushRate)
Returns:
dict: A dictionary containing the push status information or None if there was an error
"""
if not self.connected:
self.log.error("Cannot get push status: Camera not connected")
return None
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot get push status: No camera handles available")
return None
handle = self.cam_list[0]["handle"]
try:
# Get push type, state and rate
push_type = c_uint()
push_state = c_uint()
push_rate = c_uint()
nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type))
nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state))
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate))
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
# Map push type to a readable name
push_type_names = {
0: "UDMR", # Update individual module results
1: "CTRP", # Create Template Progress
2: "LPRP", # Protocol loading progress
3: "SPRP", # Protocol save progress
4: "FMUP", # Firmware update progress
5: "DFCS", # Format the verification state
6: "LTRP", # Upload Template Progress
7: "LTIP", # Upload Template Image Progress
8: "UDMS", # Update all module results
9: "LLGP", # Load log progress
10: "CMIS", # Detects module integrity
11: "UPRP", # Upload scenario progress
12: "DRTP", # Device reset progress
13: "UPMP", # Upload Teaching Model progress
14: "UPLI", # Upload local image progress
15: "ISTP", # Export stored images to test image progress
16: "NTPS", # NTP Time Check Status
17: "DCLP", # DI or COMMUNITION switch protocol load progress
18: "SRSP", # Insufficient system resources prompt
19: "UPUS", # Upload package progress
20: "UPUR", # Upload resource file progress
21: "CPRP", # New Scenario Progress
22: "EDRP", # Modify the scheme name
23: "TIMG", # Test map run aborted
24: "GLOG" # Get the device log
}
# Map push state to a readable name
push_state_names = {
0: "Success",
1: "Failed"
}
push_type_name = push_type_names.get(push_type.value, f"Unknown ({push_type.value})")
push_state_name = push_state_names.get(push_state.value, f"Unknown ({push_state.value})")
return {
"type": push_type.value,
"type_name": push_type_name,
"state": push_state.value,
"state_name": push_state_name,
"rate": push_rate.value
}
else:
if nRet1 != MV_VS_OK:
self.log.error(f"Failed to get ScPushType, error code: 0x{nRet1&0xFFFFFFFF:x}")
if nRet2 != MV_VS_OK:
self.log.error(f"Failed to get ScPushState, error code: 0x{nRet2&0xFFFFFFFF:x}")
if nRet3 != MV_VS_OK:
self.log.error(f"Failed to get ScPushRate, error code: 0x{nRet3&0xFFFFFFFF:x}")
return None
except Exception as e:
self.log.error(f"Error getting push status: {e}")
return None
def check_progress(self):
"""
Check the progress of the current operation by monitoring push status and progress.
@ -49,97 +140,100 @@ class HikrobotSmartCamera(Component):
self.progress_timer.stop()
return
handle = self.cam_list[0]["handle"]
# Get push status
push_status = self.get_push_status()
# Get push type, state and rate
push_type = c_uint()
push_state = c_uint()
push_rate = c_uint()
try:
self.log.debug("Getting push status information")
nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type))
nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state))
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate))
if push_status is None:
self.log.warning("Could not get push status information")
# After 10 seconds (20 checks at 500ms), force completion if we're still waiting
if not hasattr(self, '_progress_check_count'):
self._progress_check_count = 0
if nRet1 != MV_VS_OK:
self.log.error(f"Failed to get ScPushType, error code: 0x{nRet1&0xFFFFFFFF:x}")
if nRet2 != MV_VS_OK:
self.log.error(f"Failed to get ScPushState, error code: 0x{nRet2&0xFFFFFFFF:x}")
if nRet3 != MV_VS_OK:
self.log.error(f"Failed to get ScPushRate, error code: 0x{nRet3&0xFFFFFFFF:x}")
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
self.log.debug(f"Push status - Type: {push_type.value}, State: {push_state.value}, Rate: {push_rate.value}")
self._progress_check_count += 1
if self._progress_check_count >= 20:
self.log.warning(f"Force completing operation after timeout: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
self._progress_check_count = 0
# Check which operation we're monitoring
# If we were switching schemes, try to refresh the module list anyway
if self.current_operation == "switch_scheme":
if push_type.value == 2: # LPRP 2 protocol loading progress
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
else:
self.log.debug(f"Waiting for push type 2 (protocol loading), current type: {push_type.value}")
elif self.current_operation == "ntp_settings":
if push_type.value == 16: # NTPS 16 NTP Time Check Status
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
else:
self.log.debug(f"Waiting for push type 16 (NTP settings), current type: {push_type.value}")
else:
self.log.warning(f"Unknown operation type: {self.current_operation}")
else:
# If we can't get the push status after multiple attempts, try to proceed anyway
self.log.warning("Could not get push status information, continuing with operation")
self.log.info("Attempting to refresh module list after timeout")
self.refresh_module_list()
return
# Log push status
self.log.debug(f"Push status - Type: {push_status['type_name']} ({push_status['type']}), " +
f"State: {push_status['state_name']} ({push_status['state']}), " +
f"Rate: {push_status['rate']}%")
# Check which operation we're monitoring
if self.current_operation == "switch_scheme":
if push_status['type'] == 2: # LPRP 2 protocol loading progress
progress = push_status['rate']
status = push_status['state_name']
# After 10 seconds (20 checks at 500ms), force completion if we're still waiting
if not hasattr(self, '_progress_check_count'):
self._progress_check_count = 0
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
self._progress_check_count += 1
if self._progress_check_count >= 20:
self.log.warning(f"Force completing operation after timeout: {self.current_operation}")
# If progress is complete or failed, stop the timer
if progress >= 100 or push_status['state'] == 1:
self.progress_timer.stop()
self.current_operation = None
self._progress_check_count = 0
# If we were switching schemes, try to refresh the module list anyway
if self.current_operation == "switch_scheme":
self.log.info("Attempting to refresh module list after timeout")
if push_status['state'] == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
else:
self.log.debug(f"Waiting for push type 2 (protocol loading), current type: {push_status['type']}")
elif self.current_operation == "ntp_settings":
if push_status['type'] == 16: # NTPS 16 NTP Time Check Status
progress = push_status['rate']
status = push_status['state_name']
except Exception as e:
self.log.error(f"Error checking progress: {e}")
self.progress_timer.stop()
self.current_operation = None
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_status['state'] == 1:
self.progress_timer.stop()
self.current_operation = None
if push_status['state'] == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
else:
self.log.debug(f"Waiting for push type 16 (NTP settings), current type: {push_status['type']}")
elif self.current_operation == "switch_scheme_communications":
if push_status['type'] == 17: # DCLP 17 DI or COMMUNITION switch protocol load progress
progress = push_status['rate']
status = push_status['state_name']
self.log.info(f"Communications scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_status['state'] == 1:
self.progress_timer.stop()
self.current_operation = None
if push_status['state'] == 0: # Success
self.log.info("Communications scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Communications scheme switching failed")
else:
self.log.debug(f"Waiting for push type 17 (communications protocol loading), current type: {push_status['type']}")
else:
self.log.warning(f"Unknown operation type: {self.current_operation}")
def refresh_module_list(self):
"""
@ -176,8 +270,17 @@ class HikrobotSmartCamera(Component):
"""
Switch to a different scheme/solution using the API as described in the documentation.
As described in section 5.11 of the documentation:
1. Set the SrcOperateSolutionName node as the target schema name
2. Set the Load Scenario command
3. Get Switch Protocol Progress (monitor ScPushType 2)
4. Refresh Module List
Args:
solution_name: The name of the solution to switch to
Returns:
bool: True if the scheme switching process was started successfully, False otherwise
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
@ -212,16 +315,50 @@ class HikrobotSmartCamera(Component):
else:
self.log.info("Successfully sent project load command")
# Start monitoring progress
# 3. Start monitoring progress (ScPushType 2 - protocol loading progress)
self.log.info("Starting progress monitoring for scheme switching")
self.current_operation = "switch_scheme"
self.progress_timer.start(500) # Check every 500ms
# 4. Refresh Module List will be done automatically after the progress is complete
# in the check_progress method
return True
except Exception as e:
self.log.error(f"Error switching scheme: {e}")
return False
def switch_scheme_communications(self, solution_name, comm_str, comm_ret_str):
"""
Switch to a different scheme/solution using the Communications switching mode.
This method combines setting the Communications switching mode and then monitoring
the progress of the switch.
Args:
solution_name: The name of the solution to switch to
comm_str: The communication string
comm_ret_str: The communication return string
Returns:
bool: True if the scheme switching process was started successfully, False otherwise
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
return False
# First set the Communications switching mode
if not self.set_scheme_switch_mode_communications(solution_name, comm_str, comm_ret_str):
self.log.error("Failed to set Communications switching mode")
return False
# Start monitoring progress for Communications scheme switching
self.log.info("Starting progress monitoring for Communications scheme switching")
self.current_operation = "switch_scheme_communications"
self.progress_timer.start(500) # Check every 500ms
return True
def get_ntp_parameters(self):
"""
Get the current NTP parameters from the camera.
@ -544,6 +681,132 @@ class HikrobotSmartCamera(Component):
self.log.error(f"Error getting current scheme: {e}")
return None
def set_scheme_switch_mode_off(self, solution_name):
"""
Set the scheme switching mode to OFF.
As described in section 5.12.1 of the documentation:
1. Set the DstOperateSolutionName node as the target schema name
2. Set the protocol switch mode to off
3. Set protocol parameters command
Args:
solution_name: The name of the solution to set
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot set scheme switch mode: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot set scheme switch mode: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
self.log.info(f"Setting scheme switch mode to OFF for solution: {solution_name}")
# 1. Set the DstOperateSolutionName node as the target schema name
solution_name_bytes = solution_name.encode('utf-8')
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution name: {solution_name}, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 2. Set the protocol switch mode to off
nRet = mv_lib.MV_VS_SetEnumValueByString(handle, b"SolutionSwitchMode", b"off")
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch mode to OFF, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 3. Set protocol parameters command
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandSolutionSwitchParam")
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch parameters, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
self.log.info(f"Successfully set scheme switch mode to OFF for solution: {solution_name}")
return True
except Exception as e:
self.log.error(f"Error setting scheme switch mode to OFF: {e}")
return False
def set_scheme_switch_mode_communications(self, solution_name, comm_str, comm_ret_str):
"""
Set the scheme switching mode to Communications switching.
As described in section 5.12.2 of the documentation:
1. Set the DstOperateSolutionName node as the target schema name
2. Set the protocol switch mode to TriggerCommunication
3. Set the communication string
4. Set the communication return value
5. Set the protocol parameter command
Args:
solution_name: The name of the solution to set
comm_str: The communication string
comm_ret_str: The communication return string
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot set scheme switch mode: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot set scheme switch mode: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
self.log.info(f"Setting scheme switch mode to Communications for solution: {solution_name}")
# 1. Set the DstOperateSolutionName node as the target schema name
solution_name_bytes = solution_name.encode('utf-8')
nRet = mv_lib.MV_VS_SetStringValue(handle, b"DstOperateSolutionName", solution_name_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set destination solution name: {solution_name}, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 2. Set the protocol switch mode to TriggerCommunication
nRet = mv_lib.MV_VS_SetEnumValueByString(handle, b"SolutionSwitchMode", b"TriggerCommunication")
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch mode to TriggerCommunication, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 3. Set the communication string
comm_str_bytes = comm_str.encode('utf-8')
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SolutionSwitchString", comm_str_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch string: {comm_str}, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 4. Set the communication return value
comm_ret_str_bytes = comm_ret_str.encode('utf-8')
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SolutionSwitchRetString", comm_ret_str_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch return string: {comm_ret_str}, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
# 5. Set the protocol parameter command
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandSolutionSwitchParam")
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution switch parameters, error code: 0x{nRet&0xFFFFFFFF:x}")
return False
self.log.info(f"Successfully set scheme switch mode to Communications for solution: {solution_name}")
return True
except Exception as e:
self.log.error(f"Error setting scheme switch mode to Communications: {e}")
return False
def switch_scheme_sync(self, solution_name, timeout=None):
"""
Synchronously switch to a different scheme/solution and wait for completion.