Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef4e98cf6b | ||
| 506587283c | |||
| 44010dba02 | |||
| f8e5f5d454 |
|
|
@ -26,8 +26,12 @@ poll_time: 10
|
||||||
hold_time: 10
|
hold_time: 10
|
||||||
|
|
||||||
[tecna_t3]
|
[tecna_t3]
|
||||||
port: /dev/ttyUSB0
|
model = t3l
|
||||||
model: t3l
|
method = tcp
|
||||||
|
host = 10.10.10.3
|
||||||
|
port = 502
|
||||||
|
unit = 1
|
||||||
|
timeout = 3
|
||||||
|
|
||||||
[fixture_rfid]
|
[fixture_rfid]
|
||||||
port: USB1
|
port: USB1
|
||||||
|
|
@ -46,6 +50,7 @@ description_field: descrizione
|
||||||
[label_printer]
|
[label_printer]
|
||||||
platform: linux
|
platform: linux
|
||||||
printer: Zebra_Technologies_ZTC_ZD421-203dpi_ZPL
|
printer: Zebra_Technologies_ZTC_ZD421-203dpi_ZPL
|
||||||
|
risoluzione: 203
|
||||||
|
|
||||||
[recipes_defaults]
|
[recipes_defaults]
|
||||||
tester_discharge_enable: yes
|
tester_discharge_enable: yes
|
||||||
|
|
|
||||||
|
|
@ -3,24 +3,26 @@ import traceback
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
import pymodbus
|
import pymodbus
|
||||||
from PyQt5.QtWidgets import QMessageBox
|
|
||||||
|
|
||||||
|
# Conditionally import real or dummy serial library
|
||||||
if "--sim-serial" in sys.argv:
|
if "--sim-serial" in sys.argv:
|
||||||
from components.dummies.serial import serial
|
from components.dummies.serial import serial
|
||||||
else:
|
else:
|
||||||
import serial
|
import serial
|
||||||
|
|
||||||
from pymodbus.constants import Endian
|
from pymodbus.constants import Endian
|
||||||
# from pymodbus.exceptions import ModbusIOException
|
|
||||||
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
|
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
|
||||||
|
|
||||||
|
# Conditionally import real or dummy Modbus clients
|
||||||
if "--sim-modbus" not in sys.argv:
|
if "--sim-modbus" not in sys.argv:
|
||||||
from pymodbus.client import ModbusSerialClient as ModbusClient
|
from pymodbus.client import ModbusSerialClient
|
||||||
|
from pymodbus.client import ModbusTcpClient
|
||||||
else:
|
else:
|
||||||
from components.dummies.pymodbus import ModbusClient
|
from components.dummies.pymodbus import ModbusSerialClient
|
||||||
|
# Assuming a dummy TCP client exists for simulation purposes
|
||||||
|
from components.dummies.pymodbus import ModbusTcpClient
|
||||||
|
|
||||||
from PyQt5.QtCore import QMutex, pyqtSignal
|
from PyQt5.QtCore import QMutex, pyqtSignal
|
||||||
|
|
||||||
from .component import Component
|
from .component import Component
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -31,25 +33,34 @@ class ModbusComponent(Component):
|
||||||
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
|
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
|
||||||
self.registers = registers if registers is not None else {}
|
self.registers = registers if registers is not None else {}
|
||||||
self.lock = QMutex()
|
self.lock = QMutex()
|
||||||
|
self.client = None
|
||||||
|
self.unit = 1
|
||||||
|
|
||||||
def config_changed(self):
|
def config_changed(self):
|
||||||
|
# Read shared and distinguishing configuration values
|
||||||
|
self.method = self.config[self.name].get("method", "rtu").lower()
|
||||||
|
self.timeout = int(self.config[self.name].get("timeout", 1))
|
||||||
|
self.unit = int(self.config[self.name].get("unit", 1)) # Slave ID
|
||||||
|
self.byteorder = getattr(Endian, self.config[self.name].get("byteorder", "Big").upper())
|
||||||
|
self.wordorder = getattr(Endian, self.config[self.name].get("wordorder", "Little").upper())
|
||||||
|
|
||||||
# Read configuration values
|
# Lock to ensure thread safety during re-initialization
|
||||||
self.method = self.config[self.name].get("method", "rtu").lower()
|
self.lock.lock()
|
||||||
self.port = self.config[self.name]["port"]
|
try:
|
||||||
self.baudrate = int(self.config[self.name]["baudrate"])
|
# Close existing client connection if it exists
|
||||||
self.stopbits = getattr(serial, self.config[self.name].get("stopbits", "stopbits_one").upper())
|
if self.client and self.client.is_socket_open():
|
||||||
self.parity = getattr(serial, self.config[self.name].get("parity", "parity_none").upper())
|
self.client.close()
|
||||||
self.bytesize = getattr(serial, self.config[self.name].get("bytesize", "eightbits").upper())
|
|
||||||
self.byteorder = getattr(Endian, self.config[self.name].get("byteorder", "Big").upper())
|
|
||||||
self.wordorder = getattr(Endian, self.config[self.name].get("wordorder", "Little").upper())
|
|
||||||
self.timeout = int(self.config[self.name].get("timeout", 1))
|
|
||||||
|
|
||||||
# Lock the interaction to ensure thread safety
|
# Initialize the appropriate Modbus client based on the method
|
||||||
self.lock.lock()
|
if self.method == 'rtu':
|
||||||
try:
|
# Serial (RTU) specific configuration
|
||||||
# Initialize the Modbus client
|
self.port = self.config[self.name]["port"]
|
||||||
self.client = ModbusClient(
|
self.baudrate = int(self.config[self.name]["baudrate"])
|
||||||
|
self.stopbits = getattr(serial, self.config[self.name].get("stopbits", "stopbits_one").upper())
|
||||||
|
self.parity = getattr(serial, self.config[self.name].get("parity", "parity_none").upper())
|
||||||
|
self.bytesize = getattr(serial, self.config[self.name].get("bytesize", "eightbits").upper())
|
||||||
|
|
||||||
|
self.client = ModbusSerialClient(
|
||||||
method=self.method,
|
method=self.method,
|
||||||
port=self.port,
|
port=self.port,
|
||||||
stopbits=self.stopbits,
|
stopbits=self.stopbits,
|
||||||
|
|
@ -59,41 +70,56 @@ class ModbusComponent(Component):
|
||||||
timeout=self.timeout,
|
timeout=self.timeout,
|
||||||
strict=False,
|
strict=False,
|
||||||
)
|
)
|
||||||
if not self.client.connect():
|
connection_details = f"port {self.port}"
|
||||||
raise ConnectionError(f"Cannot connect to Modbus on port {self.port}")
|
elif self.method == 'tcp':
|
||||||
|
# Ethernet (TCP) specific configuration
|
||||||
|
self.host = self.config[self.name]["host"]
|
||||||
|
self.port = int(self.config[self.name].get("port", 502))
|
||||||
|
|
||||||
if not self.client.is_socket_open():
|
self.client = ModbusTcpClient(
|
||||||
raise ConnectionError(f"Connection socket not open on port {self.port}")
|
host=self.host,
|
||||||
|
port=self.port,
|
||||||
|
timeout=self.timeout,
|
||||||
|
)
|
||||||
|
connection_details = f"host {self.host}:{self.port}"
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(f"Modbus method '{self.method}' is not supported.")
|
||||||
|
|
||||||
except FileNotFoundError as e:
|
# Connect and verify the connection
|
||||||
error_message = f"Serial port error: {self.port} not found. Check your configuration."
|
if not self.client.connect():
|
||||||
#self.log.error(error_message, exc_info=True)
|
raise ConnectionError(f"Cannot connect to Modbus slave on {connection_details}")
|
||||||
self.modbus_error_signal.emit(error_message)
|
|
||||||
except Exception as e:
|
if not self.client.is_socket_open():
|
||||||
error_message = f"Configuration error: {str(e)}"
|
raise ConnectionError(f"Connection socket not open for {connection_details}")
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
except (FileNotFoundError, ConnectionRefusedError):
|
||||||
finally:
|
error_message = f"Modbus connection error: {connection_details} not found or connection refused. Check configuration and device status."
|
||||||
self.lock.unlock()
|
self.modbus_error_signal.emit(error_message)
|
||||||
|
except Exception as e:
|
||||||
|
error_message = f"Configuration error: {str(e)}"
|
||||||
|
self.modbus_error_signal.emit(error_message)
|
||||||
|
finally:
|
||||||
|
self.lock.unlock()
|
||||||
|
|
||||||
def _read(self, register, count=1, **kwargs):
|
def _read(self, register, count=1, **kwargs):
|
||||||
"""Read holding registers with error handling."""
|
"""Read holding registers with error handling."""
|
||||||
self.lock.lock()
|
self.lock.lock()
|
||||||
try:
|
try:
|
||||||
|
# Set the default slave unit ID for the transaction if not provided
|
||||||
|
kwargs.setdefault('unit', self.unit)
|
||||||
read = self.client.read_holding_registers(register, count=count, **kwargs)
|
read = self.client.read_holding_registers(register, count=count, **kwargs)
|
||||||
if read.isError():
|
if read.isError():
|
||||||
error_message = f"Modbus read failed: Could not read Modbus register {register}"
|
error_message = f"Modbus read failed: Could not read register {register} from unit {kwargs['unit']}"
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
raise ValueError(f"Modbus read error at register {register}")
|
raise ValueError(f"Modbus read error at register {register}")
|
||||||
return read
|
return read
|
||||||
except pymodbus.exceptions.ConnectionException:
|
except pymodbus.exceptions.ConnectionException:
|
||||||
error_message = f"Modbus read failed: Connection error at port {self.port}"
|
connection_details = self.port if self.method == 'rtu' else self.host
|
||||||
#self.log.error(error_message, exc_info=True)
|
error_message = f"Modbus read failed: Connection error at {connection_details}"
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None # Return None to signal failure
|
return None # Return None to signal failure
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error reading Modbus register {register}: {str(e)}"
|
error_message = f"Error reading Modbus register {register}: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None
|
return None
|
||||||
finally:
|
finally:
|
||||||
|
|
@ -103,19 +129,19 @@ class ModbusComponent(Component):
|
||||||
"""Write to holding registers with error handling."""
|
"""Write to holding registers with error handling."""
|
||||||
self.lock.lock()
|
self.lock.lock()
|
||||||
try:
|
try:
|
||||||
|
# Set the default slave unit ID for the transaction if not provided
|
||||||
|
kwargs.setdefault('unit', self.unit)
|
||||||
wrote = self.client.write_registers(register, value, skip_encode=True, **kwargs)
|
wrote = self.client.write_registers(register, value, skip_encode=True, **kwargs)
|
||||||
|
|
||||||
# Check if the response indicates an error
|
|
||||||
if wrote.isError():
|
if wrote.isError():
|
||||||
raise ValueError(f"Modbus write error at register {register}")
|
raise ValueError(f"Modbus write error at register {register} on unit {kwargs['unit']}")
|
||||||
return wrote
|
return wrote
|
||||||
except pymodbus.exceptions.ConnectionException as ce:
|
except pymodbus.exceptions.ConnectionException:
|
||||||
error_message = f"Modbus write failed: Connection error at port {self.port}"
|
connection_details = self.port if self.method == 'rtu' else self.host
|
||||||
#self.log.error(error_message, exc_info=True)
|
error_message = f"Modbus write failed: Connection error at {connection_details}"
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error writing Modbus register {register} with value {value}: {str(e)}"
|
error_message = f"Error writing Modbus register {register} with value {value}: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
finally:
|
finally:
|
||||||
self.lock.unlock()
|
self.lock.unlock()
|
||||||
|
|
@ -124,7 +150,6 @@ class ModbusComponent(Component):
|
||||||
"""Decode data safely."""
|
"""Decode data safely."""
|
||||||
if read is None:
|
if read is None:
|
||||||
error_message = "Error decoding Modbus data: No data to decode (read returned None)"
|
error_message = "Error decoding Modbus data: No data to decode (read returned None)"
|
||||||
#self.log.error(error_message)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None
|
return None
|
||||||
try:
|
try:
|
||||||
|
|
@ -136,11 +161,9 @@ class ModbusComponent(Component):
|
||||||
return int(abs(data)) if "uint" in data_type else int(data)
|
return int(abs(data)) if "uint" in data_type else int(data)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
error_message = "Modbus read returned invalid data (NoneType encountered)"
|
error_message = "Modbus read returned invalid data (NoneType encountered)"
|
||||||
#self.log.error(error_message)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error decoding Modbus data: {str(e)}"
|
error_message = f"Error decoding Modbus data: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -159,7 +182,6 @@ class ModbusComponent(Component):
|
||||||
return builder.build()
|
return builder.build()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error encoding Modbus data: {str(e)}"
|
error_message = f"Error encoding Modbus data: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -181,7 +203,6 @@ class ModbusComponent(Component):
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error inside Modbus read: {str(e)}"
|
error_message = f"Error inside Modbus read: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -193,16 +214,15 @@ class ModbusComponent(Component):
|
||||||
self._write(register, encoded_data, **kwargs)
|
self._write(register, encoded_data, **kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error inside Modbus write: {str(e)}"
|
error_message = f"Error inside Modbus write: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
|
||||||
self.modbus_error_signal.emit(error_message)
|
self.modbus_error_signal.emit(error_message)
|
||||||
|
|
||||||
def __del__(self, event=None):
|
def __del__(self, event=None):
|
||||||
try:
|
try:
|
||||||
self.lock.lock()
|
self.lock.lock()
|
||||||
if self.client.is_socket_open():
|
if self.client and self.client.is_socket_open():
|
||||||
self.client.close()
|
self.client.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error during Modbus cleanup: {str(e)}"
|
error_message = f"Error during Modbus cleanup: {str(e)}"
|
||||||
#self.log.error(error_message, exc_info=True)
|
# self.log.error(error_message, exc_info=True)
|
||||||
finally:
|
finally:
|
||||||
self.lock.unlock()
|
self.lock.unlock()
|
||||||
|
|
@ -20,7 +20,15 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
self.modbus_error_signal.connect(self.handle_modbus_error)
|
self.modbus_error_signal.connect(self.handle_modbus_error)
|
||||||
|
|
||||||
def config_changed(self):
|
def config_changed(self):
|
||||||
super().config_changed()
|
self.log.info("Configuring Tecna Marposs component...")
|
||||||
|
try:
|
||||||
|
super().config_changed()
|
||||||
|
self.log.info("Successfully connected to Modbus device")
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Failed to connect to Modbus device: {str(e)}")
|
||||||
|
# Re-raise to let the reconfig_on_error decorator handle it
|
||||||
|
raise
|
||||||
|
|
||||||
self._store_recipes_signal.connect(self._store_recipes)
|
self._store_recipes_signal.connect(self._store_recipes)
|
||||||
self.model = self.config[self.name]["model"].lower()
|
self.model = self.config[self.name]["model"].lower()
|
||||||
if self.model == "t3p":
|
if self.model == "t3p":
|
||||||
|
|
@ -29,18 +37,25 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
self.registers = t3l_registers
|
self.registers = t3l_registers
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
||||||
|
|
||||||
|
self.log.info("Setting measure units...")
|
||||||
self.set_measure_units()
|
self.set_measure_units()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
self.log.info("Getting measure units and configuration...")
|
||||||
self.units = self.get_measure_units()
|
self.units = self.get_measure_units()
|
||||||
self.max_program_number = self.read("Max number of programs")
|
self.max_program_number = self.read("Max number of programs")
|
||||||
self.saver_label_count = min(abs(int(self.config[self.name].get("saver_label_count", 1))), 0b1111)
|
self.saver_label_count = min(abs(int(self.config[self.name].get("saver_label_count", 1))), 0b1111)
|
||||||
self.saver_print_on_fail = 1 if self.config[self.name].get("saver_print_on_fail", "no").lower() in {"yes", "y", "on", "true", "1", "x"} else 0
|
self.saver_print_on_fail = 1 if self.config[self.name].get("saver_print_on_fail", "no").lower() in {"yes", "y", "on", "true", "1", "x"} else 0
|
||||||
self.saver_label_template = min(abs(int(self.config[self.name].get("saver_label_template", 1))), 0b11111111)
|
self.saver_label_template = min(abs(int(self.config[self.name].get("saver_label_template", 1))), 0b11111111)
|
||||||
self.model = self.config[self.name]["model"].lower()
|
self.model = self.config[self.name]["model"].lower()
|
||||||
self.log.info(f"units: {self.units}")
|
self.log.info(f"Configuration complete. Units: {self.units}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error during config_changed operation: {str(e)}"
|
error_message = f"Error during config_changed operation: {str(e)}"
|
||||||
|
self.log.error(error_message)
|
||||||
self.tecna_error_signal.emit(True, error_message)
|
self.tecna_error_signal.emit(True, error_message)
|
||||||
|
# Re-raise to let the reconfig_on_error decorator handle it
|
||||||
|
raise
|
||||||
|
|
||||||
_pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar)
|
_pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar)
|
||||||
_leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
|
_leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
|
||||||
|
|
@ -122,7 +137,10 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
if formatting is not None:
|
if formatting is not None:
|
||||||
# units = self.units[formatting]
|
# units = self.units[formatting]
|
||||||
# data = [data * units[0], units[1]]
|
# data = [data * units[0], units[1]]
|
||||||
data = data * self.units[formatting][0]
|
if data is not None:
|
||||||
|
data = data * self.units[formatting][0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def _convert_to_format(self, data, formatting=None, encoding_map=None):
|
def _convert_to_format(self, data, formatting=None, encoding_map=None):
|
||||||
|
|
@ -176,7 +194,7 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
self.tecna_error_signal.emit(True, error_message)
|
self.tecna_error_signal.emit(True, error_message)
|
||||||
raise # Re-raise the exception for further upstream handling if needed
|
raise # Re-raise the exception for further upstream handling if needed
|
||||||
|
|
||||||
@Component.reconfig_on_error
|
# @Component.reconfig_on_error
|
||||||
@Component.reconfig_on_error
|
@Component.reconfig_on_error
|
||||||
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None,
|
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
|
|
@ -254,12 +272,21 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
if round_me in info.keys():
|
if round_me in info.keys():
|
||||||
info.update({round_me: float(f"{info[round_me]:.2f}")})
|
info.update({round_me: float(f"{info[round_me]:.2f}")})
|
||||||
self.log.debug(str(info))
|
self.log.debug(str(info))
|
||||||
|
|
||||||
|
# If we got here, the connection is working
|
||||||
|
self._ready = True
|
||||||
super()._get([info])
|
super()._get([info])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = f"Error during _get operation: {str(e)}"
|
error_message = f"Error during _get operation: {str(e)}"
|
||||||
self.log.error(error_message, exc_info=True)
|
self.log.error(error_message, exc_info=True)
|
||||||
self.tecna_error_signal.emit(True, error_message)
|
self.tecna_error_signal.emit(True, error_message)
|
||||||
raise # Re-raise the exception for further upstream handling
|
|
||||||
|
# Mark component as not ready to trigger reconnection on next get call
|
||||||
|
self._ready = False
|
||||||
|
|
||||||
|
# Don't raise the exception, let the periodic get call handle reconnection
|
||||||
|
# The next call to get will attempt to reconfigure the component
|
||||||
|
# This avoids using while loops or explicit reconnection attempts
|
||||||
|
|
||||||
@Component.reconfig_on_error
|
@Component.reconfig_on_error
|
||||||
def _set(self, val=None):
|
def _set(self, val=None):
|
||||||
|
|
@ -302,8 +329,16 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii")
|
recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii")
|
||||||
recipe_barcode += b"\x00" * (24 - len(recipe_barcode))
|
recipe_barcode += b"\x00" * (24 - len(recipe_barcode))
|
||||||
test_flags = 0b0110100001010000 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010000
|
test_flags = 0b0110100001010000 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010000
|
||||||
pid_mode = int(self.config["recipes_defaults"]["pid_mode"])<<4
|
pid_mode_text = step.spec.get("pid_mod_config","AUTO") # Get the selected text from the combobox
|
||||||
test_flags = test_flags | pid_mode
|
pid_mode_value = { # Mapping of text to numeric values
|
||||||
|
"AUTO": 5,
|
||||||
|
"FAST": 0,
|
||||||
|
"MEDIUM": 1,
|
||||||
|
"SLOW": 2,
|
||||||
|
}.get(pid_mode_text, 5)
|
||||||
|
|
||||||
|
test_flags = (test_flags & ~(7 << 4)) | (pid_mode_value << 4)
|
||||||
|
|
||||||
pid_ramps=0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"])<<8 | int(self.config["recipes_defaults"]["pid_speed"])<<12
|
pid_ramps=0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"])<<8 | int(self.config["recipes_defaults"]["pid_speed"])<<12
|
||||||
spec = {
|
spec = {
|
||||||
"Flag: Instrument settings": 0b0000000000000000,
|
"Flag: Instrument settings": 0b0000000000000000,
|
||||||
|
|
@ -330,7 +365,7 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
"PSQ - Next sequence program PSOUT mode": 0,
|
"PSQ - Next sequence program PSOUT mode": 0,
|
||||||
"RAMPS: T1 configuration": pid_ramps,
|
"RAMPS: T1 configuration": pid_ramps,
|
||||||
"PID: pressure correction": step.spec.get("pid_pressure_correction",0),
|
"PID: pressure correction": step.spec.get("pid_pressure_correction",0),
|
||||||
"Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tester_discharge_enable"] == "yes" else 0b0000000000000000
|
"Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tester_discharge_enable"] in ("yes", "x") else 0b0000000000000000
|
||||||
|
|
||||||
}
|
}
|
||||||
if self.model == "t3p":
|
if self.model == "t3p":
|
||||||
|
|
@ -488,8 +523,19 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
||||||
# Emit the Tecna-specific error signal with True and the error message
|
# Emit the Tecna-specific error signal with True and the error message
|
||||||
#self.tecna_error_signal.emit(True, error_message)
|
#self.tecna_error_signal.emit(True, error_message)
|
||||||
self.log.error(f"Modbus error encountered: {error_message}")
|
self.log.error(f"Modbus error encountered: {error_message}")
|
||||||
|
|
||||||
|
# Mark component as not ready to trigger reconnection on next get call
|
||||||
|
# This is especially important for connection errors
|
||||||
|
if "Connection error" in error_message:
|
||||||
|
self.log.warning("Connection error detected, marking component as not ready for reconnection")
|
||||||
|
self._ready = False
|
||||||
|
|
||||||
return True, error_message
|
return True, error_message
|
||||||
else:
|
else:
|
||||||
# Emit no error (False)
|
# Emit no error (False)
|
||||||
self.tecna_error_signal.emit(False, "")
|
self.tecna_error_signal.emit(False, "")
|
||||||
return False, None
|
|
||||||
|
# If no error, the connection is working
|
||||||
|
self._ready = True
|
||||||
|
|
||||||
|
return False, None
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ class Test_Leak(Test_Test):
|
||||||
self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys())
|
self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys())
|
||||||
self.show_instruction_b.clicked.connect(self.show_instruction)
|
self.show_instruction_b.clicked.connect(self.show_instruction)
|
||||||
|
|
||||||
self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
#self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error)
|
||||||
|
|
||||||
def show_instruction(self):
|
def show_instruction(self):
|
||||||
dialog=Dialog()
|
dialog=Dialog()
|
||||||
|
|
@ -91,6 +91,10 @@ class Test_Leak(Test_Test):
|
||||||
self.simulate = True
|
self.simulate = True
|
||||||
else:
|
else:
|
||||||
self.simulate = False
|
self.simulate = False
|
||||||
|
if "--autostart" in sys.argv:
|
||||||
|
self.start_b.setEnabled(True)
|
||||||
|
self.start_b.click()
|
||||||
|
|
||||||
# /TESTING
|
# /TESTING
|
||||||
show = super().start(recipe=recipe, step=step, pieces=pieces)
|
show = super().start(recipe=recipe, step=step, pieces=pieces)
|
||||||
if show is False:
|
if show is False:
|
||||||
|
|
@ -103,13 +107,17 @@ class Test_Leak(Test_Test):
|
||||||
self.test_num_l.setText("2/2")
|
self.test_num_l.setText("2/2")
|
||||||
else:
|
else:
|
||||||
self.test_num_l.setText("1/1")
|
self.test_num_l.setText("1/1")
|
||||||
|
if self.step.spec.get("autotest", False):
|
||||||
|
self.template_print_l.setText(f"AUTOTEST")
|
||||||
|
else:
|
||||||
|
self.template_print_l.setText(f"{self.parent.print_template}")
|
||||||
self.recipe_pressure_l.setText(f"{self.step.spec['test_pressure']}")
|
self.recipe_pressure_l.setText(f"{self.step.spec['test_pressure']}")
|
||||||
self.leak_min_l.setText(f"{self.step.spec['test_pressure_qneg']}")
|
self.leak_min_l.setText(f"{self.step.spec['test_pressure_qneg']}")
|
||||||
self.leak_max_l.setText(f"{self.step.spec['test_pressure_qpos']}")
|
self.leak_max_l.setText(f"{self.step.spec['test_pressure_qpos']}")
|
||||||
self.fill_time_l.setText(f"{self.step.spec['filling_time']}")
|
self.fill_time_l.setText(f"{self.step.spec['filling_time']}")
|
||||||
self.settle_time_l.setText(f"{self.step.spec['settling_time']}")
|
self.settle_time_l.setText(f"{self.step.spec['settling_time']}")
|
||||||
self.meas_time_l.setText(f"{self.step.spec['test_time']}")
|
self.meas_time_l.setText(f"{self.step.spec['test_time']}")
|
||||||
|
self.valore_PID_l.setText(f"{self.step.spec.get('pid_pressure_correction','NA')}")
|
||||||
|
|
||||||
# SETUP TEST LOOP
|
# SETUP TEST LOOP
|
||||||
if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
|
if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
|
||||||
|
|
@ -130,17 +138,22 @@ class Test_Leak(Test_Test):
|
||||||
self.stop_b.setEnabled(False)
|
self.stop_b.setEnabled(False)
|
||||||
|
|
||||||
if self.step.spec.get("autotest", False) == "ok_check":
|
if self.step.spec.get("autotest", False) == "ok_check":
|
||||||
|
self.template_print_l.setVisible(False)
|
||||||
|
self.template_label.setVisible(False)
|
||||||
self.display_text(text="AUTOTEST: RIMUOVERE FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA",
|
self.display_text(text="AUTOTEST: RIMUOVERE FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA",
|
||||||
bg_color="blue", text_color="white")
|
bg_color="blue", text_color="white")
|
||||||
super().visualize(None, img=self.status_imgs_full["calibrated-leak-remove"])
|
super().visualize(None, img=self.status_imgs_full["calibrated-leak-remove"])
|
||||||
elif self.step.spec.get("autotest", False) == "ko_check":
|
elif self.step.spec.get("autotest", False) == "ko_check":
|
||||||
|
self.template_print_l.setVisible(False)
|
||||||
|
self.template_label.setVisible(False)
|
||||||
self.display_text(
|
self.display_text(
|
||||||
text="AUTOTEST: COLLEGARE TUBO-TUBO + FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA DI PROVA",
|
text="AUTOTEST: COLLEGARE TUBO-TUBO + FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA DI PROVA",
|
||||||
bg_color="blue", text_color="white")
|
bg_color="blue", text_color="white")
|
||||||
super().visualize(None, img=self.status_imgs_full["calibrated-leak"])
|
super().visualize(None, img=self.status_imgs_full["calibrated-leak"])
|
||||||
else:
|
else:
|
||||||
self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA")
|
self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA")
|
||||||
|
self.template_print_l.setVisible(True)
|
||||||
|
self.template_label.setVisible(True)
|
||||||
if self.simulate:
|
if self.simulate:
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user