erro handling tecna t3 sim ok tbt on production
This commit is contained in:
parent
a4a2b3a94e
commit
e28a0aa46c
|
|
@ -2,6 +2,7 @@ import sys
|
|||
import traceback
|
||||
import warnings
|
||||
|
||||
import pymodbus
|
||||
from PyQt5.QtWidgets import QMessageBox
|
||||
|
||||
if "--sim-serial" in sys.argv:
|
||||
|
|
@ -18,113 +19,188 @@ if "--sim-modbus" not in sys.argv:
|
|||
else:
|
||||
from components.dummies.pymodbus import ModbusClient
|
||||
|
||||
from PyQt5.QtCore import QMutex
|
||||
from PyQt5.QtCore import QMutex, pyqtSignal
|
||||
|
||||
from .component import Component
|
||||
|
||||
|
||||
class ModbusComponent(Component):
|
||||
modbus_error_signal = pyqtSignal(str)
|
||||
|
||||
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None):
|
||||
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
|
||||
self.registers = registers if registers is not None else {}
|
||||
self.lock = QMutex()
|
||||
|
||||
def config_changed(self):
|
||||
self.method = self.config[self.name].get("method", "rtu").lower()
|
||||
self.port = self.config[self.name]["port"]
|
||||
self.baudrate = int(self.config[self.name]["baudrate"])
|
||||
self.stopbits = getattr(serial, self.config[self.name].get("stopbits", "stopbits_one").upper())
|
||||
self.parity = getattr(serial, self.config[self.name].get("parity", "parity_none").upper())
|
||||
self.bytesize = getattr(serial, self.config[self.name].get("bytesize", "eightbits").upper())
|
||||
self.byteorder = getattr(Endian, self.config[self.name].get("byteorder", "Big").upper())
|
||||
self.wordorder = getattr(Endian, self.config[self.name].get("wordorder", "Little").upper())
|
||||
self.timeout = int(self.config[self.name].get("timeout", 1))
|
||||
try:
|
||||
# Read configuration values
|
||||
self.method = self.config[self.name].get("method", "rtu").lower()
|
||||
self.port = self.config[self.name]["port"]
|
||||
self.baudrate = int(self.config[self.name]["baudrate"])
|
||||
self.stopbits = getattr(serial, self.config[self.name].get("stopbits", "stopbits_one").upper())
|
||||
self.parity = getattr(serial, self.config[self.name].get("parity", "parity_none").upper())
|
||||
self.bytesize = getattr(serial, self.config[self.name].get("bytesize", "eightbits").upper())
|
||||
self.byteorder = getattr(Endian, self.config[self.name].get("byteorder", "Big").upper())
|
||||
self.wordorder = getattr(Endian, self.config[self.name].get("wordorder", "Little").upper())
|
||||
self.timeout = int(self.config[self.name].get("timeout", 1))
|
||||
|
||||
# Lock the interaction to ensure thread safety
|
||||
self.lock.lock()
|
||||
try:
|
||||
# Initialize the Modbus client
|
||||
self.client = ModbusClient(
|
||||
method=self.method,
|
||||
port=self.port,
|
||||
stopbits=self.stopbits,
|
||||
bytesize=self.bytesize,
|
||||
parity=self.parity,
|
||||
baudrate=self.baudrate,
|
||||
timeout=self.timeout,
|
||||
strict=False,
|
||||
)
|
||||
if not self.client.connect():
|
||||
raise ConnectionError(f"Cannot connect to Modbus on port {self.port}")
|
||||
|
||||
if not self.client.is_socket_open():
|
||||
raise ConnectionError(f"Connection socket not open on port {self.port}")
|
||||
|
||||
finally:
|
||||
self.lock.unlock()
|
||||
except FileNotFoundError as e:
|
||||
error_message = f"Serial port error: {self.port} not found. Check your configuration."
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
except Exception as e:
|
||||
error_message = f"Configuration error: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
|
||||
def _read(self, register, count=1, **kwargs):
|
||||
"""Read holding registers with error handling."""
|
||||
self.lock.lock()
|
||||
self.client = ModbusClient(
|
||||
method=self.method,
|
||||
port=self.port,
|
||||
stopbits=self.stopbits,
|
||||
bytesize=self.bytesize,
|
||||
parity=self.parity,
|
||||
baudrate=self.baudrate,
|
||||
timeout=self.timeout,
|
||||
strict=False,
|
||||
)
|
||||
if not self.client.connect():
|
||||
QMessageBox.critical(None, "ERRORE", f"ERRORE MODBUS - VERIFICARE CONNESSIONE USB")
|
||||
exit(-1)
|
||||
#raise ConnectionError("device not reachable (could not connect): {} ({})".format(self.name, self.port))
|
||||
|
||||
if not self.client.is_socket_open():
|
||||
raise ConnectionError("device not reachable (socket not open): {} ({})".format(self.name, self.port))
|
||||
self.lock.unlock()
|
||||
|
||||
def _read(self, register, count=1,**kwargs):
|
||||
while True:
|
||||
self.lock.lock()
|
||||
read = self.client.read_holding_registers(register, count=count,**kwargs)
|
||||
self.lock.unlock()
|
||||
try:
|
||||
read = self.client.read_holding_registers(register, count=count, **kwargs)
|
||||
if read.isError():
|
||||
self.log.exception(traceback.format_exception(type(read), read, read.__traceback__))
|
||||
# raise read
|
||||
else:
|
||||
break
|
||||
return read
|
||||
|
||||
def _write(self, register, value,**kwargs):
|
||||
while True:
|
||||
self.lock.lock()
|
||||
wrote = self.client.write_registers(register, value, skip_encode=True,**kwargs)
|
||||
raise ValueError(f"Modbus read error at register {register}")
|
||||
return read
|
||||
except pymodbus.exceptions.ConnectionException:
|
||||
error_message = f"Modbus read failed: Connection error at port {self.port}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None # Return None to signal failure
|
||||
except Exception as e:
|
||||
error_message = f"Error reading Modbus register {register}: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None
|
||||
finally:
|
||||
self.lock.unlock()
|
||||
|
||||
def _write(self, register, value, **kwargs):
|
||||
"""Write to holding registers with error handling."""
|
||||
self.lock.lock()
|
||||
try:
|
||||
wrote = self.client.write_registers(register, value, skip_encode=True, **kwargs)
|
||||
|
||||
# Check if the response indicates an error
|
||||
if wrote.isError():
|
||||
self.log.exception(traceback.format_exception(type(wrote), wrote, wrote.__traceback__))
|
||||
# raise wrote
|
||||
else:
|
||||
break
|
||||
raise ValueError(f"Modbus write error at register {register}")
|
||||
return wrote
|
||||
except pymodbus.exceptions.ConnectionException as ce:
|
||||
error_message = f"Modbus write failed: Connection error at port {self.port}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
except Exception as e:
|
||||
error_message = f"Error writing Modbus register {register} with value {value}: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
finally:
|
||||
self.lock.unlock()
|
||||
|
||||
def _decode(self, read, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
|
||||
decoder = BinaryPayloadDecoder.fromRegisters(read.registers, byteorder=self.byteorder, wordorder=self.wordorder)
|
||||
data = getattr(decoder, f"decode_{data_type}")(*args, **kwargs)
|
||||
data = (data - offset) / gain
|
||||
if data_type.endswith("uint"):
|
||||
data = int(abs(data))
|
||||
elif data_type.endswith("int"):
|
||||
data = int(data)
|
||||
else:
|
||||
raise NotImplementedError(f"data_type {data_type!r} is not supported")
|
||||
return data
|
||||
"""Decode data safely."""
|
||||
if read is None:
|
||||
error_message = "Error decoding Modbus data: No data to decode (read returned None)"
|
||||
self.log.error(error_message)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None
|
||||
try:
|
||||
decoder = BinaryPayloadDecoder.fromRegisters(
|
||||
read.registers, byteorder=self.byteorder, wordorder=self.wordorder
|
||||
)
|
||||
data = getattr(decoder, f"decode_{data_type}")(*args, **kwargs)
|
||||
data = (data - offset) / gain
|
||||
return int(abs(data)) if "uint" in data_type else int(data)
|
||||
except AttributeError:
|
||||
error_message = "Modbus read returned invalid data (NoneType encountered)"
|
||||
self.log.error(error_message)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
except Exception as e:
|
||||
error_message = f"Error decoding Modbus data: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None
|
||||
|
||||
def _encode(self, data, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
|
||||
builder = BinaryPayloadBuilder(byteorder=self.byteorder, wordorder=self.wordorder)
|
||||
data = data * gain + offset
|
||||
if data_type.endswith("uint"):
|
||||
data = int(abs(data))
|
||||
elif data_type.endswith("int"):
|
||||
data = int(data)
|
||||
else:
|
||||
raise NotImplementedError(f"data_type {data_type!r} is not supported")
|
||||
getattr(builder, f"add_{data_type}")(data, *args, **kwargs)
|
||||
return builder.build()
|
||||
"""Encode data for Modbus write with error handling."""
|
||||
try:
|
||||
builder = BinaryPayloadBuilder(byteorder=self.byteorder, wordorder=self.wordorder)
|
||||
data = data * gain + offset
|
||||
if data_type.endswith("uint"):
|
||||
data = int(abs(data))
|
||||
elif data_type.endswith("int"):
|
||||
data = int(data)
|
||||
else:
|
||||
raise NotImplementedError(f"Data type {data_type!r} is not supported")
|
||||
getattr(builder, f"add_{data_type}")(data, *args, **kwargs)
|
||||
return builder.build()
|
||||
except Exception as e:
|
||||
error_message = f"Error encoding Modbus data: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None
|
||||
|
||||
def read(self, register, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
|
||||
if data_type.startswith("16bit_"):
|
||||
count = 1
|
||||
elif data_type.startswith("32bit_"):
|
||||
count = 2
|
||||
else:
|
||||
raise NotImplementedError(f"data_type {data_type!r} is not supported")
|
||||
|
||||
return self._decode(self._read(register, count=count, **kwargs), *args, data_type=data_type, gain=gain,offset=offset)
|
||||
"""Read and decode Modbus register data with error handling."""
|
||||
try:
|
||||
if data_type.startswith("16bit_"):
|
||||
count = 1
|
||||
elif data_type.startswith("32bit_"):
|
||||
count = 2
|
||||
else:
|
||||
raise NotImplementedError(f"Data type {data_type!r} is not supported")
|
||||
return self._decode(
|
||||
self._read(register, count=count, **kwargs),
|
||||
*args,
|
||||
data_type=data_type,
|
||||
gain=gain,
|
||||
offset=offset,
|
||||
)
|
||||
except Exception as e:
|
||||
error_message = f"Error inside Modbus read: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
return None
|
||||
|
||||
def write(self, register, data, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
|
||||
self._write(register, self._encode(data, *args, data_type=data_type, gain=gain, offset=offset),**kwargs)
|
||||
|
||||
# def _get(self, data):
|
||||
# # print("MODBUS", str(int(QThread.currentThreadId())), flush=True)
|
||||
# super()._get(data)
|
||||
"""Encode and write data to Modbus registers with error handling."""
|
||||
try:
|
||||
encoded_data = self._encode(data, *args, data_type=data_type, gain=gain, offset=offset)
|
||||
if encoded_data is not None:
|
||||
self._write(register, encoded_data, **kwargs)
|
||||
except Exception as e:
|
||||
error_message = f"Error inside Modbus write: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.modbus_error_signal.emit(error_message)
|
||||
|
||||
def __del__(self, event=None):
|
||||
self.lock.lock()
|
||||
if self.client.is_socket_open():
|
||||
self.client.close()
|
||||
self.lock.unlock()
|
||||
try:
|
||||
self.lock.lock()
|
||||
if self.client.is_socket_open():
|
||||
self.client.close()
|
||||
except Exception as e:
|
||||
error_message = f"Error during Modbus cleanup: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
finally:
|
||||
self.lock.unlock()
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
from lib.db import Recipes, db
|
||||
from PyQt5.QtCore import QSemaphore, pyqtSignal
|
||||
from PyQt5.QtCore import QSemaphore, pyqtSignal, pyqtSlot
|
||||
from PyQt5.QtWidgets import QMessageBox
|
||||
|
||||
from .component import Component
|
||||
|
|
@ -11,9 +11,11 @@ from .tecna_marposs_provaset_t3p_registers import registers as t3p_registers
|
|||
class TecnaMarpossProvasetT3(ModbusComponent):
|
||||
_store_recipes_signal = pyqtSignal(object)
|
||||
_store_recipes_lock = QSemaphore(0)
|
||||
tecna_error_signal = pyqtSignal(bool, str) # Emits (True, error_message) if error exists, else (False, "")
|
||||
|
||||
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
|
||||
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded, registers=None)
|
||||
self.modbus_error_signal.connect(self.handle_modbus_error)
|
||||
|
||||
def config_changed(self):
|
||||
super().config_changed()
|
||||
|
|
@ -125,111 +127,133 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
|||
return data
|
||||
|
||||
@Component.reconfig_on_error
|
||||
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs):
|
||||
if type(register) is str:
|
||||
register, s = self.registers[register]
|
||||
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None,
|
||||
**kwargs):
|
||||
try:
|
||||
if type(register) is str:
|
||||
register, s = self.registers[register]
|
||||
if data_type is None:
|
||||
data_type = s.get("dt", None)
|
||||
if gain is None:
|
||||
gain = s.get("g", None)
|
||||
if offset is None:
|
||||
offset = s.get("o", None)
|
||||
if formatting is None:
|
||||
formatting = s.get("f", None)
|
||||
if decoding_map is None:
|
||||
decoding_map = s.get("decoding", None)
|
||||
if not len(args):
|
||||
args = s.get("a", [])
|
||||
if not len(kwargs):
|
||||
kwargs = s.get("k", {})
|
||||
if data_type is None:
|
||||
data_type = s.get("dt", None)
|
||||
data_type = "16bit_uint"
|
||||
if gain is None:
|
||||
gain = s.get("g", None)
|
||||
gain = 1
|
||||
if offset is None:
|
||||
offset = s.get("o", None)
|
||||
if formatting is None:
|
||||
formatting = s.get("f", None)
|
||||
if decoding_map is None:
|
||||
decoding_map = s.get("decoding", None)
|
||||
if not len(args):
|
||||
args = s.get("a", [])
|
||||
if not len(kwargs):
|
||||
kwargs = s.get("k", {})
|
||||
if data_type is None:
|
||||
data_type = "16bit_uint"
|
||||
if gain is None:
|
||||
gain = 1
|
||||
if offset is None:
|
||||
offset = 0
|
||||
return self._convert_from_format(
|
||||
super().read(
|
||||
offset = 0
|
||||
return self._convert_from_format(
|
||||
super().read(
|
||||
register,
|
||||
*args,
|
||||
data_type=data_type,
|
||||
gain=gain,
|
||||
offset=offset,
|
||||
**kwargs,
|
||||
),
|
||||
formatting=formatting,
|
||||
decoding_map=decoding_map,
|
||||
)
|
||||
except Exception as e:
|
||||
error_message = f"Error during read operation on register {register}: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.tecna_error_signal.emit(True, error_message)
|
||||
raise # Re-raise the exception for further upstream handling if needed
|
||||
|
||||
@Component.reconfig_on_error
|
||||
@Component.reconfig_on_error
|
||||
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None,
|
||||
**kwargs):
|
||||
try:
|
||||
if type(register) is str:
|
||||
register, s = self.registers[register]
|
||||
if data_type is None:
|
||||
data_type = s.get("dt", None)
|
||||
if gain is None:
|
||||
gain = s.get("g", None)
|
||||
if offset is None:
|
||||
offset = s.get("o", None)
|
||||
if formatting is None:
|
||||
formatting = s.get("f", None)
|
||||
if encoding_map is None:
|
||||
encoding_map = s.get("encoding", None)
|
||||
if not len(args):
|
||||
args = s.get("a", [])
|
||||
if not len(kwargs):
|
||||
kwargs = s.get("k", {})
|
||||
if data_type is None:
|
||||
data_type = "16bit_uint"
|
||||
if gain is None:
|
||||
gain = 1
|
||||
if offset is None:
|
||||
offset = 0
|
||||
return super().write(
|
||||
register,
|
||||
self._convert_to_format(
|
||||
data,
|
||||
formatting=formatting,
|
||||
encoding_map=encoding_map,
|
||||
),
|
||||
*args,
|
||||
data_type=data_type,
|
||||
gain=gain,
|
||||
offset=offset,
|
||||
**kwargs,
|
||||
),
|
||||
formatting=formatting,
|
||||
decoding_map=decoding_map,
|
||||
)
|
||||
|
||||
@Component.reconfig_on_error
|
||||
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs):
|
||||
if type(register) is str:
|
||||
register, s = self.registers[register]
|
||||
if data_type is None:
|
||||
data_type = s.get("dt", None)
|
||||
if gain is None:
|
||||
gain = s.get("g", None)
|
||||
if offset is None:
|
||||
offset = s.get("o", None)
|
||||
if formatting is None:
|
||||
formatting = s.get("f", None)
|
||||
if encoding_map is None:
|
||||
encoding_map = s.get("encoding", None)
|
||||
if not len(args):
|
||||
args = s.get("a", [])
|
||||
if not len(kwargs):
|
||||
kwargs = s.get("k", {})
|
||||
if data_type is None:
|
||||
data_type = "16bit_uint"
|
||||
if gain is None:
|
||||
gain = 1
|
||||
if offset is None:
|
||||
offset = 0
|
||||
return super().write(
|
||||
register,
|
||||
self._convert_to_format(
|
||||
data,
|
||||
formatting=formatting,
|
||||
encoding_map=encoding_map,
|
||||
),
|
||||
*args,
|
||||
data_type=data_type,
|
||||
gain=gain,
|
||||
offset=offset,
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
error_message = f"Error during write operation on register {register} with data {data}: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.tecna_error_signal.emit(True, error_message)
|
||||
raise # Re-raise the exception for further upstream handling if needed
|
||||
|
||||
@Component.reconfig_on_error
|
||||
def _get(self):
|
||||
# print("TECNA", str(int(QThread.currentThreadId())), flush=True)
|
||||
# READ INFO
|
||||
info = {r: self.read(r) for r in [
|
||||
"Real time test pressure output",
|
||||
"Real time differential pressure output",
|
||||
"Real time pressure line regulator",
|
||||
"Active alarm flags",
|
||||
"Active test program number",
|
||||
"Running test: active phase",
|
||||
"Running test: test type",
|
||||
"Running test: sequence index",
|
||||
"Digital inputs status (mask)",
|
||||
# "Digital outputs status (mask)",
|
||||
]}
|
||||
if self.model == "t3p":
|
||||
pass
|
||||
elif self.model == "t3l":
|
||||
info.update({r: self.read(r) for r in [
|
||||
"Active not severe alarm flags",
|
||||
]})
|
||||
else:
|
||||
raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")
|
||||
if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST":
|
||||
info.update(self.get_test_results())
|
||||
for round_me in ["measured leak"]:
|
||||
if round_me in info.keys():
|
||||
info.update({round_me:float(f"{info[round_me]:.2f}")})
|
||||
self.log.debug(str(info))
|
||||
super()._get([info])
|
||||
try:
|
||||
# READ INFO
|
||||
info = {r: self.read(r) for r in [
|
||||
"Real time test pressure output",
|
||||
"Real time differential pressure output",
|
||||
"Real time pressure line regulator",
|
||||
"Active alarm flags",
|
||||
"Active test program number",
|
||||
"Running test: active phase",
|
||||
"Running test: test type",
|
||||
"Running test: sequence index",
|
||||
"Digital inputs status (mask)",
|
||||
# "Digital outputs status (mask)",
|
||||
]}
|
||||
if self.model == "t3p":
|
||||
pass
|
||||
elif self.model == "t3l":
|
||||
info.update({r: self.read(r) for r in [
|
||||
"Active not severe alarm flags",
|
||||
]})
|
||||
else:
|
||||
raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")
|
||||
|
||||
if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST":
|
||||
info.update(self.get_test_results())
|
||||
|
||||
for round_me in ["measured leak"]:
|
||||
if round_me in info.keys():
|
||||
info.update({round_me: float(f"{info[round_me]:.2f}")})
|
||||
self.log.debug(str(info))
|
||||
super()._get([info])
|
||||
except Exception as e:
|
||||
error_message = f"Error during _get operation: {str(e)}"
|
||||
self.log.error(error_message, exc_info=True)
|
||||
self.tecna_error_signal.emit(True, error_message)
|
||||
raise # Re-raise the exception for further upstream handling
|
||||
|
||||
@Component.reconfig_on_error
|
||||
def _set(self, val=None):
|
||||
|
|
@ -446,3 +470,19 @@ class TecnaMarpossProvasetT3(ModbusComponent):
|
|||
break
|
||||
result+=char2
|
||||
return result
|
||||
|
||||
@pyqtSlot(str)
|
||||
def handle_modbus_error(self, error_message):
|
||||
"""
|
||||
Handle errors received from the ModbusComponent and send
|
||||
appropriate error signals for Tecna Marposs.
|
||||
"""
|
||||
if error_message:
|
||||
# Emit the Tecna-specific error signal with True and traceback
|
||||
self.tecna_error_signal.emit(True, error_message)
|
||||
#print(f"{error_message}-Tecna Marposs")
|
||||
return True, error_message
|
||||
else:
|
||||
# Emit no error (False)
|
||||
self.tecna_error_signal.emit(False, "")
|
||||
return False, None
|
||||
|
|
@ -121,7 +121,7 @@ try:
|
|||
"fixture_id": {"c": RFID_PN532, "k": {"paused": False, "lazy": False}},
|
||||
}
|
||||
# VISION COMPONENT IS OPTIONAL AND DISABLED BY DEFAULT
|
||||
if "--vision" in sys.argv:
|
||||
if self.config["hardware_config"]["vision"]== "present":
|
||||
# merge dicts
|
||||
self.components_specs = {**self.components_specs,
|
||||
**{
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ from ui.test_vision import Test_Vision
|
|||
from ui.test_warning_img import Test_Warning_Img
|
||||
from ui.widget import Widget
|
||||
from components import ArchiveSynchronizer
|
||||
from src.components.rfid_pn532 import RFID_PN532
|
||||
|
||||
|
||||
class Test(Widget):
|
||||
|
|
@ -58,17 +57,26 @@ class Test(Widget):
|
|||
else:
|
||||
self.flag_label.setVisible(False)
|
||||
|
||||
self.active_errors = [] # List to hold current errors (type: tuples of (message, is_error))
|
||||
self.current_error_index = 0 # Keeps track of the current error index during alternation
|
||||
|
||||
# Timer for alternating errors
|
||||
self.error_timer = QTimer()
|
||||
self.error_timer.setInterval(2000) # Fire every 2 seconds
|
||||
self.error_timer.timeout.connect(self.display_current_error) # Connect the timer to the display logic
|
||||
|
||||
# SHOW AND UPDATE TIME CLOCK
|
||||
self.refresh_time(init=True)
|
||||
# INIT RECIPE
|
||||
self.recipe = None
|
||||
self.auto_import = Recipe_Selection
|
||||
self.archive_synch = ArchiveSynchronizer(config=config)
|
||||
|
||||
self.rfid = RFID_PN532(config=config)
|
||||
self.rfid = self.components["fixture_id"]
|
||||
self.tecna = self.components["tecna_t3"]
|
||||
self.error_label.setText("")
|
||||
self.error_label.setStyleSheet("QLabel { color: red; }")
|
||||
self.rfid.rfid_error_signal.connect(self.handle_rfid_error)
|
||||
self.tecna.tecna_error_signal.connect(self.handle_modbus_error)
|
||||
|
||||
|
||||
if self.config["hardware_config"]["barcode_recipe_selection"] == "present":
|
||||
self.recipe_selection_mode = "barcode"
|
||||
|
|
@ -78,6 +86,7 @@ class Test(Widget):
|
|||
self.tester_component = None
|
||||
if self.config["hardware_config"]["tecna_t3"] == "present":
|
||||
self.tester_component = "tecna_t3"
|
||||
self.components["tecna_t3"].tecna_error_signal.connect(self.handle_modbus_error)
|
||||
elif self.config["hardware_config"]["furness_controls"] == "present":
|
||||
self.tester_component = "furness_control"
|
||||
|
||||
|
|
@ -828,13 +837,89 @@ class Test(Widget):
|
|||
self.fail_cycle()
|
||||
self.change_recipe()
|
||||
|
||||
def add_error(self, message, is_error):
|
||||
"""
|
||||
Add a new error message to the list of active errors.
|
||||
|
||||
Args:
|
||||
message (str): The error message to add.
|
||||
is_error (bool): True if it's an error that should show in red, False for non-errors.
|
||||
"""
|
||||
# Avoid duplicates
|
||||
if (message, is_error) not in self.active_errors:
|
||||
self.active_errors.append((message, is_error))
|
||||
|
||||
# Start the timer if it's not already active
|
||||
if not self.error_timer.isActive():
|
||||
self.error_timer.start()
|
||||
|
||||
def remove_error(self, message):
|
||||
"""
|
||||
Remove an error message from the list of active errors.
|
||||
|
||||
Args:
|
||||
message (str): The error message to remove.
|
||||
"""
|
||||
self.active_errors = [err for err in self.active_errors if err[0] != message]
|
||||
|
||||
# Reset the error index if necessary
|
||||
if self.current_error_index >= len(self.active_errors):
|
||||
self.current_error_index = 0
|
||||
|
||||
# Stop the timer if there are no more errors
|
||||
if not self.active_errors:
|
||||
self.error_label.setText("")
|
||||
self.error_label.setStyleSheet("")
|
||||
self.error_timer.stop()
|
||||
|
||||
def display_current_error(self):
|
||||
"""
|
||||
Display the current error from the active errors list. If there are multiple errors,
|
||||
it alternates between them every time this function is called.
|
||||
"""
|
||||
if self.active_errors:
|
||||
# Get the current error message and update the label
|
||||
message, is_error = self.active_errors[self.current_error_index]
|
||||
self.error_label.setText(message)
|
||||
if is_error:
|
||||
self.error_label.setStyleSheet("color: red;")
|
||||
else:
|
||||
self.error_label.setStyleSheet("color: green;")
|
||||
|
||||
# Move to the next error for the next call to this method
|
||||
self.current_error_index = (self.current_error_index + 1) % len(self.active_errors)
|
||||
else:
|
||||
# Clear the label if there are no errors
|
||||
self.error_label.setText("")
|
||||
self.error_label.setStyleSheet("")
|
||||
|
||||
@pyqtSlot(bool)
|
||||
def handle_rfid_error(self, connected):
|
||||
"""
|
||||
Handle errors related to the RFID system.
|
||||
|
||||
Args:
|
||||
connected (bool): True if connected, False if not.
|
||||
"""
|
||||
if connected:
|
||||
self.error_label.setText("") # Update the GUI label
|
||||
self.remove_error("Errore RFID.") # Remove the RFID error
|
||||
else:
|
||||
self.error_label.setText("Errore RFID.") # Update the GUI label
|
||||
self.error_label.setStyleSheet("color: red;")
|
||||
self.add_error("Errore RFID.", True) # Add the RFID error
|
||||
|
||||
@pyqtSlot(bool, str)
|
||||
def handle_modbus_error(self, has_error, error_message):
|
||||
"""
|
||||
Handle Tecna/Modbus errors and manage the error list.
|
||||
|
||||
Args:
|
||||
has_error (bool): True if there is an error, False otherwise.
|
||||
error_message (str): The error message to add.
|
||||
"""
|
||||
#print(f"DEBUG: Modbus error handler called - has_error={has_error}, error_message={error_message}") # Debugging
|
||||
if has_error:
|
||||
self.add_error(f"Errore Tecna", True) # Add the Modbus error
|
||||
else:
|
||||
self.remove_error(f"Errore Tecna") # Remove the Modbus error
|
||||
|
||||
def update_label_with_args(self, extra_info=None):
|
||||
"""
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user