cam recipe tbt

This commit is contained in:
neo-nb3 2025-08-21 22:32:06 +02:00
parent 7321aecd85
commit 9193b2cc6d

View File

@ -5,528 +5,29 @@ import time
from ctypes import *
import sys
from PyQt5.QtCore import pyqtSignal, QMutex, QTimer
from PyQt5.QtCore import pyqtSignal, QMutex
from PyQt5.QtWidgets import QMessageBox
from ..component import Component
from components.component import Component
from .hikrobot_dll import *
class HikrobotSmartCamera(Component):
# Signal to notify about progress updates
progress_signal = pyqtSignal(int, str)
def __init__(self, config=None, name=None, period=0.2, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.simulate = "--sim-camera" in sys.argv
self.num_cameras=1
self.num_cameras = 1
self.connected = False
self.ok_memory = True
self.ok_frames = []
self.ok_results =[]
self.solution_name = None
self.rotations = "0" # Default rotation value
# Progress monitoring
self.progress_timer = QTimer()
self.progress_timer.timeout.connect(self.check_progress)
self.current_operation = None
def check_progress(self):
"""
Check the progress of the current operation by monitoring push status and progress.
This is called periodically by the timer when an operation is in progress.
"""
if not self.connected or not self.current_operation:
self.log.debug("Progress check stopped: not connected or no operation in progress")
self.progress_timer.stop()
return
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Progress check stopped: no camera handles available")
self.progress_timer.stop()
return
handle = self.cam_list[0]["handle"]
# Get push type, state and rate
push_type = c_uint()
push_state = c_uint()
push_rate = c_uint()
try:
self.log.info("Getting push status information for operation: " + str(self.current_operation))
# Get additional camera status information for debugging
self.log.info("Current camera state before getting push status:")
self._debug_camera_state(handle)
nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type))
nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state))
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate))
# Check for specific error codes
has_generic_error = False
if nRet1 != MV_VS_OK:
error_code = nRet1 & 0xFFFFFFFF
self.log.error(f"Failed to get ScPushType, error code: 0x{error_code:x}")
if error_code == 0x80030100: # MV_VS_E_GC_GENERIC
has_generic_error = True
if nRet2 != MV_VS_OK:
error_code = nRet2 & 0xFFFFFFFF
self.log.error(f"Failed to get ScPushState, error code: 0x{error_code:x}")
if error_code == 0x80030100: # MV_VS_E_GC_GENERIC
has_generic_error = True
if nRet3 != MV_VS_OK:
error_code = nRet3 & 0xFFFFFFFF
self.log.error(f"Failed to get ScPushRate, error code: 0x{error_code:x}")
if error_code == 0x80030100: # MV_VS_E_GC_GENERIC
has_generic_error = True
# If we encountered the generic error, log additional information
if has_generic_error:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error")
self.log.info("This error often occurs during scheme switching when the camera is busy")
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
self.log.info(f"Push status - Type: {push_type.value}, State: {push_state.value}, Rate: {push_rate.value}")
# Check which operation we're monitoring
if self.current_operation == "switch_scheme":
# For scheme switching, we expect push_type.value == 2
if push_type.value == 2: # LPRP 2 protocol loading progress
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"Scheme switching progress: {progress}%, Status: {status}")
# Always emit progress updates, even if no progress is made
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("Scheme switching completed successfully")
# Refresh module list after successful scheme switching
self.refresh_module_list()
else: # Failed
self.log.error("Scheme switching failed")
else:
# If we're not getting the expected push type, log it and still update the UI
self.log.info(f"Waiting for push type 2 (protocol loading), current type: {push_type.value}")
# Send a progress update with the current status
status_message = f"Initializing (Type: {push_type.value})"
self.progress_signal.emit(10, status_message) # Use 10% as a placeholder for initialization
elif self.current_operation == "ntp_settings":
if push_type.value == 16: # NTPS 16 NTP Time Check Status
progress = push_rate.value
status = "Success" if push_state.value == 0 else "Failed"
self.log.info(f"NTP settings progress: {progress}%, Status: {status}")
self.progress_signal.emit(progress, status)
# If progress is complete or failed, stop the timer
if progress >= 100 or push_state.value == 1:
self.progress_timer.stop()
self.current_operation = None
if push_state.value == 0: # Success
self.log.info("NTP settings completed successfully")
else: # Failed
self.log.error("NTP settings failed")
else:
self.log.info(f"Waiting for push type 16 (NTP settings), current type: {push_type.value}")
# Send a progress update with the current status
status_message = f"Initializing (Type: {push_type.value})"
self.progress_signal.emit(10, status_message)
else:
self.log.warning(f"Unknown operation type: {self.current_operation}")
else:
# If we can't get the push status, log detailed error information
self.log.warning(f"Could not get push status information, error codes: 0x{nRet1&0xFFFFFFFF:x}, 0x{nRet2&0xFFFFFFFF:x}, 0x{nRet3&0xFFFFFFFF:x}")
# Try to get additional camera status for debugging
self._debug_camera_state(handle)
# Check if we have the generic error
if has_generic_error:
# For the generic error, we'll send a more specific status message
status_message = "Initializing (Camera busy)"
self.progress_signal.emit(15, status_message)
else:
# Send a progress update to the UI
status_message = "Initializing (Status retrieval failed)"
self.progress_signal.emit(5, status_message)
# After 10 seconds (20 checks at 500ms), force completion if we're still waiting
if not hasattr(self, '_progress_check_count'):
self._progress_check_count = 0
self._progress_check_count += 1
self.log.info(f"Progress check count: {self._progress_check_count}/20")
if self._progress_check_count >= 20:
current_op = self.current_operation
self.log.warning(f"Force completing operation after timeout: {current_op}")
self.progress_timer.stop()
self.current_operation = None
self._progress_check_count = 0
# If we were switching schemes, try to refresh the module list anyway
if current_op == "switch_scheme":
self.log.info("Attempting to refresh module list after timeout")
self.refresh_module_list()
except Exception as e:
self.log.error(f"Error checking progress: {e}")
self.progress_timer.stop()
self.current_operation = None
def _debug_camera_state(self, handle):
"""
Get additional camera state information for debugging purposes.
Args:
handle: Camera handle
"""
try:
# Try to get the current solution name
solution_name = create_string_buffer(256)
nRet = mv_lib.MV_VS_GetStringValue(handle, b"SrcOperateSolutionName", solution_name, 256)
if nRet == MV_VS_OK:
self.log.info(f"Current solution name: {solution_name.value.decode('utf-8')}")
else:
self.log.info(f"Failed to get solution name, error code: 0x{nRet&0xFFFFFFFF:x}")
# Try to get the device status
device_status = c_uint()
nRet = mv_lib.MV_VS_GetEnumValue(handle, b"DeviceStatus", byref(device_status))
if nRet == MV_VS_OK:
self.log.info(f"Device status: {device_status.value}")
else:
error_code = nRet & 0xFFFFFFFF
self.log.info(f"Failed to get device status, error code: 0x{error_code:x}")
# Special handling for 0x80030100 (MV_VS_E_GC_GENERIC)
if error_code == 0x80030100:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error")
self.log.info("This error often occurs during scheme switching when the camera is busy")
# Try to get the acquisition status
acquisition_status = c_uint()
nRet = mv_lib.MV_VS_GetEnumValue(handle, b"AcquisitionStatus", byref(acquisition_status))
if nRet == MV_VS_OK:
self.log.info(f"Acquisition status: {acquisition_status.value}")
else:
error_code = nRet & 0xFFFFFFFF
self.log.info(f"Failed to get acquisition status, error code: 0x{error_code:x}")
# Special handling for 0x80030100 (MV_VS_E_GC_GENERIC)
if error_code == 0x80030100:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error")
self.log.info("This error often occurs during scheme switching when the camera is busy")
except Exception as e:
self.log.error(f"Error getting debug camera state: {e}")
def refresh_module_list(self):
"""
Refresh the module list after switching schemes.
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot refresh module list: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot refresh module list: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
# Command to refresh module list
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandRefreshModuleList")
if nRet != MV_VS_OK:
error_code = nRet & 0xFFFFFFFF
self.log.error(f"Failed to refresh module list, error code: 0x{error_code:x}")
# Special handling for 0x80030100 (MV_VS_E_GC_GENERIC)
if error_code == 0x80030100:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during module list refresh")
self.log.info("This is a non-critical error, continuing operation")
# We'll return True here to avoid treating this as a fatal error
return True
return False
self.log.info("Module list refreshed successfully")
return True
except Exception as e:
self.log.error(f"Error refreshing module list: {e}")
# Log the full exception traceback for debugging
import traceback
self.log.error(f"Exception traceback: {traceback.format_exc()}")
return False
def switch_scheme(self, solution_name, retry_count=0, max_retries=2):
"""
Switch to a different scheme/solution using the API as described in the documentation.
Args:
solution_name: The name of the solution to switch to
retry_count: Current retry attempt (used internally for recursion)
max_retries: Maximum number of retry attempts for error recovery
Returns:
bool: True if the scheme switching process was successfully initiated, False otherwise
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot switch scheme: No camera handles available")
return False
# Check if an operation is already in progress
if self.current_operation is not None:
self.log.warning(f"Cannot switch scheme: Another operation is already in progress: {self.current_operation}")
# Try to stop the current operation
if self.progress_timer.isActive():
self.log.warning(f"Stopping ongoing operation: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
handle = self.cam_list[0]["handle"]
# Get current camera state for debugging
self.log.info("Getting camera state before scheme switch:")
self._debug_camera_state(handle)
try:
# Check if the solution name is valid (not empty)
if not solution_name or solution_name.strip() == "":
self.log.error("Cannot switch scheme: Solution name is empty")
return False
# 1. Set the SrcOperateSolutionName node as the target schema name
solution_name_bytes = solution_name.encode('utf-8')
self.log.info(f"Setting solution name: {solution_name} (bytes: {solution_name_bytes})")
nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes)
if nRet != MV_VS_OK:
error_code = nRet & 0xFFFFFFFF
self.log.error(f"Failed to set solution name: {solution_name}, error code: 0x{error_code:x}")
# Try to get more information about the error
self._debug_camera_state(handle)
else:
self.log.info(f"Successfully set solution name: {solution_name}")
# 2. Set the Load Scenario command
self.log.info("Sending CommandProjectLoad command")
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandProjectLoad")
if nRet != MV_VS_OK:
error_code = nRet & 0xFFFFFFFF
self.log.error(f"Failed to load project, error code: 0x{error_code:x}")
# Try to get more information about the error
self._debug_camera_state(handle)
return False
else:
self.log.info("Successfully sent project load command")
# Reset progress check count
self._progress_check_count = 0
# Start monitoring progress
self.log.info("Starting progress monitoring for scheme switching")
self.current_operation = "switch_scheme"
self.progress_timer.start(500) # Check every 500ms
return True
except Exception as e:
self.log.error(f"Error switching scheme: {e}")
# Log the full exception traceback for debugging
import traceback
self.log.error(f"Exception traceback: {traceback.format_exc()}")
return False
def get_ntp_parameters(self):
"""
Get the current NTP parameters from the camera.
Returns:
dict: A dictionary containing the NTP parameters (server_ip, port, interval)
or None if there was an error
"""
if not self.connected:
self.log.error("Cannot get NTP parameters: Camera not connected")
return None
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot get NTP parameters: No camera handles available")
return None
handle = self.cam_list[0]["handle"]
try:
# Get NTP server IP
server_ip = c_uint()
nRet1 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPServerIP", byref(server_ip))
# Get NTP port
port = c_uint()
nRet2 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPPort", byref(port))
# Get NTP interval
interval = c_uint()
nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPInterval", byref(interval))
if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK:
# Convert IP from integer to string format
ip_str = f"{(server_ip.value >> 24) & 0xFF}.{(server_ip.value >> 16) & 0xFF}.{(server_ip.value >> 8) & 0xFF}.{server_ip.value & 0xFF}"
return {
"server_ip": ip_str,
"port": port.value,
"interval": interval.value
}
else:
self.log.error("Failed to get NTP parameters")
return None
except Exception as e:
self.log.error(f"Error getting NTP parameters: {e}")
return None
def set_ntp_parameters(self, server_ip, port, interval):
"""
Set the NTP parameters on the camera.
Args:
server_ip (str): The NTP server IP address in format "xxx.xxx.xxx.xxx"
port (int): The NTP server port
interval (int): The NTP sync interval
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot set NTP parameters: Camera not connected")
return False
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot set NTP parameters: No camera handles available")
return False
handle = self.cam_list[0]["handle"]
try:
# Convert IP string to integer format
ip_parts = server_ip.split('.')
if len(ip_parts) != 4:
self.log.error(f"Invalid IP address format: {server_ip}")
return False
ip_int = (int(ip_parts[0]) << 24) + (int(ip_parts[1]) << 16) + (int(ip_parts[2]) << 8) + int(ip_parts[3])
# Set NTP server IP
nRet1 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPServerIP", ip_int)
# Set NTP port
nRet2 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPPort", port)
# Set NTP interval
nRet3 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPInterval", interval)
if nRet1 != MV_VS_OK or nRet2 != MV_VS_OK or nRet3 != MV_VS_OK:
self.log.error("Failed to set NTP parameters")
return False
# Confirm the settings
nRet = mv_lib.MV_VS_SetCommandValue(handle, b"ScDeviceSettingTimeCommand")
if nRet != MV_VS_OK:
self.log.error("Failed to confirm NTP settings")
return False
# Start monitoring progress for NTP settings
self.current_operation = "ntp_settings"
self.progress_timer.start(500) # Check every 500ms
return True
except Exception as e:
self.log.error(f"Error setting NTP parameters: {e}")
return False
def update_solution_name(self, recipe_name):
"""
This method is called when the vision component's recipe changes.
It updates the solution_name attribute and triggers a scheme switch.
It also attempts to get the rotations value from the vision component if available.
"""
self.log.info(f"Updating solution name to: {recipe_name}")
self.solution_name = recipe_name
# Try to get rotations from vision component if available
try:
if hasattr(self, "components") and "vision" in self.components and hasattr(self.components["vision"], "vision_config"):
if self.components["vision"].vision_config is not None and "rotations" in self.components["vision"].vision_config:
self.rotations = self.components["vision"].vision_config["rotations"]
self.log.info(f"Updated rotations to: {self.rotations}")
except Exception as e:
self.log.warning(f"Could not update rotations: {e}")
# Store the solution name for later use, but don't switch schemes yet
# The actual scheme switching will be handled by the Test_Vision component
# when it calls start() and checks if a scheme switch is needed
if not self.connected:
self.log.info("Camera not connected, will switch scheme after connection")
self.reconfigure() # This will connect and then switch scheme
else:
# Stop any ongoing operation
if self.current_operation:
self.log.warning(f"Stopping ongoing operation: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
# Start the scheme switching process (asynchronously)
# The Test_Vision component will wait for this to complete
self.switch_scheme(recipe_name)
self.ok_results = []
def config_changed(self):
self.connected = False
global MV_VS_OK
device_info_list = MV_VS_DEVICE_INFO_LIST()
nRet = mv_lib.MV_VS_EnumDevices(ctypes.byref(device_info_list))
enum_ok = False
for n in range(3):
if nRet != MV_VS_OK:
@ -534,7 +35,6 @@ class HikrobotSmartCamera(Component):
return -2
else:
print("Enum devices OK")
if device_info_list.nDeviceNum > 0:
print_device_info(device_info_list)
if device_info_list.nDeviceNum >= self.num_cameras:
@ -572,6 +72,79 @@ class HikrobotSmartCamera(Component):
if not login_ok:
return -2
time.sleep(0.2)
# --- Start of Added Code: Switch Scheme ---
# As per SDK PDF Chapter 5.11, this section switches the camera's active scheme.
# It reads a comma-separated list of scheme names from the config file.
try:
solution_name=self.config.vision_config.get("solution_name", None)
if solution_name is not None:
scheme_name = solution_name
self.log.info(f"CAM{cam_idx} Attempting to switch to scheme: {scheme_name.decode()}")
# Step 1: Set the scheme name to be switched using MV_VS_SetStringValue.
# The node name is "SolutionName".
nRet = mv_lib.MV_VS_SetStringValue(pHandle, b"SolutionName", c_char_p(scheme_name))
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} Set SolutionName failed! nRet [{nRet & 0xFFFFFFFF:#x}]")
# Clean up resources before returning
mv_lib.MV_VS_Logout(pHandle)
mv_lib.MV_VS_DestroyHandle(pHandle)
return -2
# Step 2: Call the scheme switch command using MV_VS_SetCommandValue.
# The command is "CommandSolutionSwitch".
nRet = mv_lib.MV_VS_SetCommandValue(pHandle, b"CommandSolutionSwitch")
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} CommandSolutionSwitch failed! nRet [{nRet & 0xFFFFFFFF:#x}]")
# Clean up resources before returning
mv_lib.MV_VS_Logout(pHandle)
mv_lib.MV_VS_DestroyHandle(pHandle)
return -2
self.log.info(f"CAM{cam_idx} Switched to scheme '{scheme_name.decode()}' successfully.")
# Step 3: Wait for the scheme switch to complete.
# As per SDK PDF Chapter 3.3.2, we poll "SolutionSwitchStatus".
# A value of 0 indicates the switch is complete.
self.log.info(f"CAM{cam_idx} Waiting for scheme switch to complete...")
switch_timeout = 30 # 30-second timeout for the switch
start_time = time.time()
while True:
if time.time() - start_time > switch_timeout:
self.log.error(f"CAM{cam_idx} Timed out waiting for scheme switch.")
mv_lib.MV_VS_Logout(pHandle)
mv_lib.MV_VS_DestroyHandle(pHandle)
return -2
# Create a variable to store the status
status = ctypes.c_int64()
nRet = mv_lib.MV_VS_GetIntValue(pHandle, b"SolutionSwitchStatus", byref(status))
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} Get SolutionSwitchStatus failed! nRet [{nRet & 0xFFFFFFFF:#x}]")
mv_lib.MV_VS_Logout(pHandle)
mv_lib.MV_VS_DestroyHandle(pHandle)
return -2
if status.value == 0:
self.log.info(f"CAM{cam_idx} Scheme switch completed successfully.")
break
else:
self.log.info(f"CAM{cam_idx} Scheme switch not yet complete. Waiting...")
time.sleep(1)
else:
self.log.warning(f"CAM{cam_idx} No scheme name defined in config for this camera index. Using default scheme.")
except KeyError:
self.log.warning("'schemes' key not found in vision_config. Using camera's default/current scheme.")
except Exception as e:
self.log.error(f"CAM{cam_idx} An unexpected error occurred during scheme switching: {e}")
# Clean up resources before returning
mv_lib.MV_VS_Logout(pHandle)
mv_lib.MV_VS_DestroyHandle(pHandle)
return -2
# --- End of Added Code ---
nRet = mv_lib.MV_VS_SetBoolValue(pHandle, b"CommandImageMode", 0)
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} CommandImageMode failed! [{nRet&0xFFFFFFFF:#x}]")
@ -581,17 +154,12 @@ class HikrobotSmartCamera(Component):
self.log.error(f"CAM{cam_idx} AcquisitionMode failed! [{nRet&0xFFFFFFFF:#x}]")
return -2
# Set the camera recipe - we'll use the switch_scheme method after connection is established
# This will be handled after the camera is fully initialized
# Start the camera test
nRet = mv_lib.MV_VS_StartRun(pHandle)
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} Start run failed! [{nRet & 0xFFFFFFFF:#x}]")
self.connected = True
@Component.reconfig_on_error
def _get(self):
if not self.connected:
@ -600,142 +168,47 @@ class HikrobotSmartCamera(Component):
return
concat_frame = None
concat_results = []
try:
rot = self.rotations.split(",")
for cam_idx in range(self.num_cameras):
try:
cam = self.cam_list[cam_idx]
self.log.info(f"GET FRAME CAMERA # {cam_idx}")
# Initialize default values in case of error
frame = None
res = True # Default to error condition
if self.ok_results[cam_idx]:
# keep last good result
frame = self.ok_frames[cam_idx]
res = False
else:
try:
# Trigger acquisition & test execution
nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart")
if nRet != MV_VS_OK:
error_code = nRet & 0xFFFFFFFF
self.log.error(f"CAM{cam_idx} AcquisitionStart failed! [0x{error_code:x}]")
# Special handling for 0x80030100 (MV_VS_E_GC_GENERIC)
if error_code == 0x80030100:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during acquisition")
self.log.info("This is a non-critical error, using last good frame if available")
# Use last good frame if available
if self.ok_frames[cam_idx] is not None:
frame = self.ok_frames[cam_idx]
res = False
else:
# No good frame available, mark as disconnected
self.log.error("No previous good frame available")
self.connected = False
frame_res = None
else:
# For other errors, mark as disconnected
self.connected = False
frame_res = None
else:
# get frame
frame_res = MV_VS_GetFrame(cam["handle"])
rot = self.config.vision_config["rotations"].split(",")
for cam_idx in range(self.num_cameras):
cam = self.cam_list[cam_idx]
self.log.info(f"GET FRAME CAMERA # {cam_idx}")
if self.ok_results[cam_idx]:
# keep last good result
frame = self.ok_frames[cam_idx]
res = False
else:
# Trigger acquisition & test execution
nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart")
if nRet != MV_VS_OK:
self.log.error(f"CAM{cam_idx} AcquisitionStart failed! [{nRet&0xFFFFFFFF:#x}]")
self.connected = False
frame_res = None
else:
# get frame
frame_res = MV_VS_GetFrame(cam["handle"])
if frame_res is not None:
try:
# Safely access the result data with error checking
if "res" in frame_res and "ScDeviceSolutionRunningResult" in frame_res["res"]:
res = frame_res["res"]["ScDeviceSolutionRunningResult"]
frame = frame_res["frame"]
# rotate frame
if len(rot) > cam_idx: # Check if rotation info exists for this camera
if rot[cam_idx] == "1":
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
if rot[cam_idx] == "2":
frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
if rot[cam_idx] == "3":
frame = cv2.rotate(frame, cv2.ROTATE_180)
else:
self.log.warning(f"CAM{cam_idx} Invalid frame result format")
# Use last good frame if available
if self.ok_frames[cam_idx] is not None:
frame = self.ok_frames[cam_idx]
res = False
else:
self.connected = False
except Exception as e:
self.log.error(f"Error processing frame for CAM{cam_idx}: {e}")
# Use last good frame if available
if self.ok_frames[cam_idx] is not None:
frame = self.ok_frames[cam_idx]
res = False
else:
self.connected = False
else:
self.log.warning(f"CAM{cam_idx} Failed to get frame")
# Use last good frame if available
if self.ok_frames[cam_idx] is not None:
frame = self.ok_frames[cam_idx]
res = False
else:
self.connected = False
except Exception as e:
self.log.error(f"Error during frame acquisition for CAM{cam_idx}: {e}")
# Log the full exception traceback for debugging
import traceback
self.log.error(f"Exception traceback: {traceback.format_exc()}")
# Use last good frame if available
if self.ok_frames[cam_idx] is not None:
frame = self.ok_frames[cam_idx]
res = False
else:
self.connected = False
# Skip this camera if we couldn't get a valid frame
if frame is None:
self.log.warning(f"Skipping CAM{cam_idx} due to missing frame")
continue
# Process the frame
if concat_frame is None:
concat_frame = copy.deepcopy(frame)
else:
try:
concat_frame = cv2.hconcat([concat_frame, frame])
except Exception as e:
self.log.error(f"Error concatenating frames: {e}")
# Use only the first frame if concatenation fails
if concat_frame is None and frame is not None:
concat_frame = copy.deepcopy(frame)
concat_results.append(not res)
if not res:
self.ok_results[cam_idx] = True
self.ok_frames[cam_idx] = copy.deepcopy(frame)
except Exception as e:
self.log.error(f"Error processing camera {cam_idx}: {e}")
# Continue with next camera
continue
# If we couldn't get any frames, return
if frame_res is not None:
res = frame_res["res"]["ScDeviceSolutionRunningResult"]
frame = frame_res["frame"]
# rotate frame
if rot[cam_idx] == "1":
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
if rot[cam_idx] == "2":
frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
if rot[cam_idx] == "3":
frame = cv2.rotate(frame, cv2.ROTATE_180)
else:
self.connected = False
if concat_frame is None:
self.log.error("No valid frames obtained from any camera")
return
super()._get([concat_frame, self.ok_results])
except Exception as e:
self.log.error(f"Error in _get method: {e}")
# Log the full exception traceback for debugging
import traceback
self.log.error(f"Exception traceback: {traceback.format_exc()}")
# Don't propagate the exception to avoid crashing the application
concat_frame = copy.deepcopy(frame)
else:
concat_frame = cv2.hconcat([concat_frame, frame])
concat_results.append(not res)
if not res:
self.ok_results[cam_idx] = True
self.ok_frames[cam_idx] = copy.deepcopy(frame)
super()._get([concat_frame, self.ok_results])
def resume(self):
self.log.info(f"RESUMING")
@ -749,122 +222,9 @@ class HikrobotSmartCamera(Component):
if len(self.cam_list) == self.num_cameras:
# Dummy acquisition for each camera to avoid reading past images
for cam_idx in range(self.num_cameras):
try:
cam = self.cam_list[cam_idx]
nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart")
if nRet != MV_VS_OK:
error_code = nRet & 0xFFFFFFFF
self.log.warning(f"CAM{cam_idx} AcquisitionStart warning, error code: 0x{error_code:x}")
# Special handling for 0x80030100 (MV_VS_E_GC_GENERIC)
if error_code == 0x80030100:
self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during acquisition start")
self.log.info("This is a non-critical error, continuing operation")
# Skip frame acquisition for this camera
continue
# Only try to get a frame if AcquisitionStart was successful
frame_res = MV_VS_GetFrame(cam["handle"])
# Check if frame acquisition was successful
if frame_res is None:
self.log.warning(f"CAM{cam_idx} Failed to get initial frame, but continuing operation")
except Exception as e:
self.log.error(f"Error during camera {cam_idx} initialization: {e}")
# Log the full exception traceback for debugging
import traceback
self.log.error(f"Exception traceback: {traceback.format_exc()}")
# Continue with other cameras
continue
cam = self.cam_list[cam_idx]
nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart")
frame_res = MV_VS_GetFrame(cam["handle"])
else:
QMessageBox.critical(None, "Errore", f"ERRORE CONNESSIONE TELECAMERE\n")
super().resume()
def get_current_scheme(self):
"""
Get the current scheme/solution name from the camera.
Returns:
str: The current scheme name or None if there was an error
"""
if not self.connected:
self.log.error("Cannot get current scheme: Camera not connected")
return None
# Get the first camera handle
if len(self.cam_list) == 0:
self.log.error("Cannot get current scheme: No camera handles available")
return None
handle = self.cam_list[0]["handle"]
try:
# Get the current solution name
solution_name = create_string_buffer(256)
nRet = mv_lib.MV_VS_GetStringValue(handle, b"SrcOperateSolutionName", solution_name, 256)
if nRet != MV_VS_OK:
self.log.error(f"Failed to get current solution name, error code: 0x{nRet&0xFFFFFFFF:x}")
return None
current_scheme = solution_name.value.decode('utf-8')
self.log.info(f"Current scheme: {current_scheme}")
return current_scheme
except Exception as e:
self.log.error(f"Error getting current scheme: {e}")
return None
def switch_scheme_sync(self, solution_name, timeout=30):
"""
Synchronously switch to a different scheme/solution and wait for completion.
Args:
solution_name: The name of the solution to switch to
timeout: Maximum time in seconds to wait for scheme switching to complete (default: 30)
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
return False
# Get current scheme for comparison
current_scheme = self.get_current_scheme()
if current_scheme == solution_name:
self.log.info(f"Already using scheme: {solution_name}")
return True
# Start the scheme switching process
success = self.switch_scheme(solution_name)
if not success:
self.log.error(f"Failed to initiate scheme switch to: {solution_name}")
return False
# Wait for the scheme switching to complete with timeout
self.log.info(f"Waiting for scheme switch to complete (timeout: {timeout}s)")
start_time = time.time()
while self.current_operation == "switch_scheme":
# Sleep a bit to avoid busy waiting
time.sleep(0.5)
# Check if we've exceeded the timeout
if timeout is not None and (time.time() - start_time) > timeout:
self.log.warning(f"Scheme switch timeout after {timeout} seconds")
# Force stop the progress monitoring
if self.progress_timer.isActive():
self.progress_timer.stop()
self.current_operation = None
break
# Verify the switch was successful by checking the current scheme
new_scheme = self.get_current_scheme()
if new_scheme == solution_name:
self.log.info(f"Successfully switched to scheme: {solution_name}")
return True
else:
self.log.warning(f"Scheme switch verification failed. Expected: {solution_name}, Got: {new_scheme}")
return False