This commit is contained in:
edo-neo 2025-08-21 09:38:45 +02:00
parent 9a514bc064
commit a6fbd4671a

View File

@ -5,7 +5,7 @@ import time
from ctypes import *
import sys
from PyQt5.QtCore import pyqtSignal, QMutex
from PyQt5.QtCore import pyqtSignal, QMutex, QTimer
from PyQt5.QtWidgets import QMessageBox
from ..component import Component
@ -14,6 +14,9 @@ from .hikrobot_dll import *
class HikrobotSmartCamera(Component):
# Signal to notify about progress updates
progress_signal = pyqtSignal(int, str)
def __init__(self, config=None, name=None, period=0.2, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.simulate = "--sim-camera" in sys.argv
@ -25,6 +28,259 @@ class HikrobotSmartCamera(Component):
self.solution_name = None
self.rotations = "0" # Default rotation value
# Progress monitoring
self.progress_timer = QTimer()
self.progress_timer.timeout.connect(self.check_progress)
self.current_operation = None
def check_progress(self):
"""
Check the progress of the current operation by monitoring push status and progress.
This is called periodically by the timer when an operation is in progress.
"""
if not self.connected or not self.current_operation:
self.progress_timer.stop()
return
# Get the first camera handle
if len(self.cam_list) == 0:
self.progress_timer.stop()
return
handle = self.cam_list[0]["handle"]
# Get push type, state and rate
push_type = c_uint()
push_state = c_uint()
push_rate = c_uint()
try:
nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type))
nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state))
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate))
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
# Check which operation we're monitoring
if self.current_operation == "switch_scheme" and push_type.value == 2: # LPRP 2 protocol loading progress
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
elif self.current_operation == "ntp_settings" and push_type.value == 16: # NTPS 16 NTP Time Check Status
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
except Exception as e:
self.log.error(f"Error checking progress: {e}")
self.progress_timer.stop()
self.current_operation = None
def refresh_module_list(self):
"""
Refresh the module list after switching schemes.
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot refresh module list: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot refresh module list: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
# Command to refresh module list
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandRefreshModuleList")
if nRet != MV_VS_OK:
self.log.error("Failed to refresh module list")
return False
self.log.info("Module list refreshed successfully")
return True
except Exception as e:
self.log.error(f"Error refreshing module list: {e}")
return False
def switch_scheme(self, solution_name):
"""
Switch to a different scheme/solution using the API as described in the documentation.
Args:
solution_name: The name of the solution to switch to
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot switch scheme: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
# 1. Set the SrcOperateSolutionName node as the target schema name
solution_name_bytes = solution_name.encode('utf-8')
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes)
if nRet != MV_VS_OK:
self.log.error(f"Failed to set solution name: {solution_name}")
return False
# 2. Set the Load Scenario command
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandProjectLoad")
if nRet != MV_VS_OK:
self.log.error("Failed to load project")
return False
# Start monitoring progress
self.current_operation = "switch_scheme"
self.progress_timer.start(500) # Check every 500ms
return True
except Exception as e:
self.log.error(f"Error switching scheme: {e}")
return False
def get_ntp_parameters(self):
"""
Get the current NTP parameters from the camera.
Returns:
dict: A dictionary containing the NTP parameters (server_ip, port, interval)
or None if there was an error
"""
if not self.connected:
self.log.error("Cannot get NTP parameters: Camera not connected")
return None
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot get NTP parameters: No camera handles available")
return None
handle = self.cam_list[0]["handle"]
try:
# Get NTP server IP
server_ip = c_uint()
nRet1 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPServerIP", byref(server_ip))
# Get NTP port
port = c_uint()
nRet2 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPPort", byref(port))
# Get NTP interval
interval = c_uint()
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPInterval", byref(interval))
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
# Convert IP from integer to string format
ip_str = f"{(server_ip.value >> 24) & 0xFF}.{(server_ip.value >> 16) & 0xFF}.{(server_ip.value >> 8) & 0xFF}.{server_ip.value & 0xFF}"
return {
"server_ip": ip_str,
"port": port.value,
"interval": interval.value
}
else:
self.log.error("Failed to get NTP parameters")
return None
except Exception as e:
self.log.error(f"Error getting NTP parameters: {e}")
return None
def set_ntp_parameters(self, server_ip, port, interval):
"""
Set the NTP parameters on the camera.
Args:
server_ip (str): The NTP server IP address in format "xxx.xxx.xxx.xxx"
port (int): The NTP server port
interval (int): The NTP sync interval
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot set NTP parameters: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot set NTP parameters: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
# Convert IP string to integer format
ip_parts = server_ip.split('.')
if len(ip_parts) != 4:
self.log.error(f"Invalid IP address format: {server_ip}")
return False
ip_int = (int(ip_parts[0]) << 24) + (int(ip_parts[1]) << 16) + (int(ip_parts[2]) << 8) + int(ip_parts[3])
# Set NTP server IP
nRet1 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPServerIP", ip_int)
# Set NTP port
nRet2 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPPort", port)
# Set NTP interval
nRet3 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPInterval", interval)
if nRet1 != MV_VS_OK or nRet2 != MV_VS_OK or nRet3 != MV_VS_OK:
self.log.error("Failed to set NTP parameters")
return False
# Confirm the settings
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"ScDeviceSettingTimeCommand")
if nRet != MV_VS_OK:
self.log.error("Failed to confirm NTP settings")
return False
# Start monitoring progress for NTP settings
self.current_operation = "ntp_settings"
self.progress_timer.start(500) # Check every 500ms
return True
except Exception as e:
self.log.error(f"Error setting NTP parameters: {e}")
return False
def update_solution_name(self, recipe_name):
"""
This method is called when the vision component's recipe changes.
@ -43,8 +299,12 @@ class HikrobotSmartCamera(Component):
self.log.info(f"Updated rotations to: {self.rotations}")
except Exception as e:
self.log.warning(f"Could not update rotations: {e}")
self.reconfigure()
# Use the new switch_scheme method instead of reconfiguring
if self.connected:
self.switch_scheme(recipe_name)
else:
self.reconfigure()
def config_changed(self):
self.connected = False
@ -107,17 +367,20 @@ class HikrobotSmartCamera(Component):
self.log.error(f"CAM{cam_idx} AcquisitionMode failed! [{nRet&0xFFFFFFFF:#x}]")
return -2
# Set the camera recipe
if self.solution_name is not None:
mv_lib.MV_VS_SetStringValue(pHandle, "SrcOperateSolutionName", self.solution_name)
mv_lib.MV_VS_SetCommandValue(pHandle, "CommandProjectLoad")
time.sleep(5)
# Set the camera recipe - we'll use the switch_scheme method after connection is established
# This will be handled after the camera is fully initialized
# Start the camera test
nRet = mv_lib.MV_VS_StartRun(pHandle)
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} Start run failed! [{nRet & 0xFFFFFFFF:#x}]")
self.connected = True
# Now that the camera is connected, switch to the solution if one is set
if self.solution_name is not None:
self.log.info(f"Switching to solution: {self.solution_name}")
self.switch_scheme(self.solution_name)
@Component.reconfig_on_error
def _get(self):