diff --git a/src/components/furness_controls_fco730_registers.py b/src/components/furness_controls_fco730_registers.py index 09a81ac..a8fd113 100644 --- a/src/components/furness_controls_fco730_registers.py +++ b/src/components/furness_controls_fco730_registers.py @@ -1,43 +1,73 @@ registers = { - "program_data":b'A000', - "product_data":b'B050', - "function_data":b'D000', - "counters":b'E000', - "change_cur_prod":b'F000', - "change_cur_prod_50":b'F050', - "start_test": b'G000', - "reset_test": b'H000', - "version_num_range": b'I000', - "reset_counters": b'J000', - "self_check": b'K000', - "last_test_result": b'L000', - "current_status": b'M000', - "zero_pressure": b'O000', - "security_data": b'P000', - "printer_settings": b'S000', - "comm_settings": b'x000' -} -product_tags = { - "product_id": b'a', - "test_pressure": b'b', - "tolerance": b'c', - "+fail": b'd', - "-fail": b'e', - "fill_time": b'f', - "stab_time": b'g', - "test_time": b'h', - "prefill_time": b'l', - "vent_time": b'm', - "test_type": b'n', - "fail_high": b'o', - "fail_low": b'p', - "outputs_a_h": b'q', - "prefill_pressure": b'r', -} + "commands": { + "program_data": b'A000', + "product_data": b'B050', + "function_data": b'D000', + "counters": b'E000', + "change_cur_prod": b'F000', + "change_cur_prod_50": b'F050', + "start_test": b'G000', + "reset_test": b'H000', + "version_num_range": b'I000', + "reset_counters": b'J000', + "self_check": b'K000', + "last_test_result": b'L000', + "current_status": b'M000', + "zero_pressure": b'O000', + "security_data": b'P000', + "printer_settings": b'S000', + "comm_settings": b'x000' + }, + "product_tags": { + "product_id": b'a', + "test_pressure": b'b', + "tolerance": b'c', + "+fail": b'd', + "-fail": b'e', + "fill_time": b'f', + "stab_time": b'g', + "test_time": b'h', + "prefill_time": b'l', + "vent_time": b'm', + "test_type": b'n', + "fail_high": b'o', + "fail_low": b'p', + "outputs_a_h": b'q', + "prefill_pressure": b'r', + }, -program_tags={ - "leak_units":b'a', - "press_units": b'b', - "leak_at_eot":b'c', - "product":b'l', -} \ No newline at end of file + "program_tags": { + "leak_units": b'a', + "press_units": b'b', + "leak_at_eot": b'c', + "product": b'l', + }, + "enums": { + "status_status": { + '0': 'ready_to_start', + '1': 'fault', + '2':'awaiting_reset', + '3': 'pressure_high', + '4': 'pressure_low', + '5': 'testing', + }, + "status_current_stage": { + '0': 'STANDBY', + '1': 'RIEMPIMENTO', + '2': 'ASSESTAMENTO', + '3': 'MISURA', + '4': 'ATTESA RESET', + '5': 'ATTESA START', + '6': 'ERRORE', + '7': 'pressure_high', + '8': 'pressure_low', + '9': 'jig_delay', + '10': 'prefilling', + '11': 'SCARICO', + '12': 'reserverd', + '13': 'sequence_delay', + '14': 'jig_open' + } + } + +} diff --git a/src/components/furness_controls_fco780_registers.py b/src/components/furness_controls_fco780_registers.py index 328b879..c93ddae 100644 --- a/src/components/furness_controls_fco780_registers.py +++ b/src/components/furness_controls_fco780_registers.py @@ -1,10 +1,45 @@ registers = { + "commands": { + "program_data": b'A000', + "product_data": b'B050', + "function_data": b'D000', + "counters": b'E000', + "change_cur_prod": b'F000', + "change_cur_prod_50": b'F050', + "start_test": b'G000', + "reset_test": b'H000', + "version_num_range": b'I000', + "reset_counters": b'J000', + "self_check": b'K000', + "last_test_result": b'L000', + "current_status": b'M000', + "zero_pressure": b'O000', + "security_data": b'P000', + "printer_settings": b'S000', + "comm_settings": b'x000' + }, + "product_tags": { + "product_id": b'a', + "test_pressure": b'b', + "tolerance": b'c', + "+fail": b'd', + "-fail": b'e', + "fill_time": b'f', + "stab_time": b'g', + "test_time": b'h', + "prefill_time": b'l', + "vent_time": b'm', + "test_type": b'n', + "fail_high": b'o', + "fail_low": b'p', + "outputs_a_h": b'q', + "prefill_pressure": b'r', + }, -} -product_tags = { - -} - -program_tags = { - + "program_tags": { + "leak_units": b'a', + "press_units": b'b', + "leak_at_eot": b'c', + "product": b'l', + } } diff --git a/src/components/furness_controls_leak_tester.py b/src/components/furness_controls_leak_tester.py index 0ad3630..578645e 100644 --- a/src/components/furness_controls_leak_tester.py +++ b/src/components/furness_controls_leak_tester.py @@ -1,10 +1,11 @@ +import re import sys +import time +from collections import OrderedDict from components.component import Component from components.furness_controls_fco730_registers import registers as fco730_registers from components.furness_controls_fco780_registers import registers as fco780_registers -from components.furness_controls_fco730_registers import product_tags as fco730_product_tags -from components.furness_controls_fco780_registers import product_tags as fco780_product_tags if "--sim-furness-controls" in sys.argv: from components.dummies.serial import serial else: @@ -21,6 +22,8 @@ class FurnessControlsLeakTester(Component): def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) + self.enums = None + self.current_status = None self.product_tags = None self.bytesize = None self.timeout = None @@ -31,7 +34,7 @@ class FurnessControlsLeakTester(Component): self.port = None self.model = None self.settings = None - self.registers = None + self.commands = None self.id1=b'0' self.id2=b'1' @@ -39,11 +42,9 @@ class FurnessControlsLeakTester(Component): super().config_changed() self.model = self.config[self.name]["model"].lower() if self.model == "fco730": - self.registers = fco730_registers - self.product_tags = fco730_product_tags - elif self.model == "fco780": - self.registers = fco780_registers - self.product_tags = fco780_product_tags + self.commands = fco730_registers["commands"] + self.product_tags = fco730_registers["product_tags"] + self.enums = fco730_registers["enums"] else: raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.") self.port = self.config[self.name]["port"] @@ -51,7 +52,7 @@ class FurnessControlsLeakTester(Component): self.stopbits = 1 self.parity = serial.PARITY_NONE self.bytesize = serial.EIGHTBITS - self.timeout = 0.2 + self.timeout = 0.1 if self.conn is not None: self.conn.close() self.conn = serial.Serial( @@ -62,7 +63,6 @@ class FurnessControlsLeakTester(Component): bytesize=self.bytesize, timeout=self.timeout, write_timeout=self.timeout, - #inter_byte_timeout=self.timeout, ) def _convert_from_format(self, data, formatting=None, decoding_map=None): @@ -77,39 +77,72 @@ class FurnessControlsLeakTester(Component): @Component.reconfig_on_error def _get(self): # READ INFO - current_status=self.send_enquiry("current_status") - - self.log.info(str(current_status)) - #super()._get([]) + current_status=self.get_status() + info = { + "Real time test pressure output":current_status["pressure_reading"], + "Real time differential pressure output":0, + "Real time pressure line regulator":0, + "Active alarm flags":0, + "Active test program number":0, + "Running test: active phase":current_status["current_stage"], + "Running test: measured leak":current_status["leak_reading"], + "Running test: test type":0, + "Running test: sequence index":0, + "Digital inputs status (mask)":0, + } + for round_me in ["measured leak"]: + if round_me in info.keys(): + info.update({round_me:float(f"{info[round_me]:.2f}")}) + self.log.debug(str(info)) + super()._get([info]) def start_test(self, table=None): - if table is None: - table = self.max_program_number - self.log.info(f"starting test table {table!r}") - self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)") - self.write("Selected program", table) - self.write("Start test", table) + self.log.info(f"starting test") + self.send_command("start_test") def stop_test(self): self.log.warning("stopping test") - self.current_status = self.send_enquiry("current_status") - status = self.get_status_param("state") - if status =='0': + self.current_status = self.get_status() + if self.current_status['current_stage'] in ('standby','awaiting_start'): self.log.info("ready to start") else: - self.log.warning("error state, performing self check to reset") + self.log.warning("not ready to start, performing self check to reset") self.send_command("self_check") + time.sleep(2) - def get_status_param(self,param): - par_len = 1 - if param=="state": - tag="f" - tag_idx=self.current_status.find(b'f') - data=self.current_status[tag_idx+1:tag_idx+1+par_len] - return str(data,encoding="ascii") + def get_status(self): + status_str=str(self.send_enquiry("current_status"),encoding="ascii") + status_str+='z' # dummy terminator + status_vars=OrderedDict( + { + 'a':'counter', + 'b':'product_number', + 'c':'step_number', + 'd':'new_result_available', + 'e':'mode', + 'f':'status', + 'g':'pressure_reading', + 'i':'leak_reading', + 'k':'current_stage', + 'z':'dummy' + }) + status_decoded={} + for tag,param in status_vars.items(): + next_tag=list(status_vars)[list(status_vars.keys()).index(tag) + 1] + match=re.search(f"{tag}([0-9.-]+){next_tag}",status_str) + value=None + if match is not None: + value=match.group(1) + if param == 'status': + value = self.enums['status_status'][value] + if param == 'current_stage': + value = self.enums['status_current_stage'][value] - def get_test_results(self): - self.log.info("getting test results") + status_decoded[param]=value + if next_tag =='z': + break + + return status_decoded def write_recipe(self, recipe, step, table=None): # PREPARE DATA @@ -145,7 +178,7 @@ class FurnessControlsLeakTester(Component): def send_command(self,command): if type(command) is str: - command=self.registers[command] + command=self.commands[command] out_bytes= bytearray() out_bytes.extend(EOT) out_bytes.extend(self.id1) @@ -165,7 +198,7 @@ class FurnessControlsLeakTester(Component): def send_enquiry(self,enquiry): if type(enquiry) is str: - enquiry=self.registers[enquiry] + enquiry=self.commands[enquiry] out_bytes=bytearray(EOT) out_bytes.extend(self.id1) out_bytes.extend(self.id2) @@ -182,18 +215,21 @@ class FurnessControlsLeakTester(Component): read_checksum = None calculated_checksum = self.calc_checksum(response[0:-1],start_idx=0) if read_checksum != calculated_checksum: - self.log.error(f"SEND COMMAND:{response}") + self.log.error(f"SEND ENQUIRY:{response}") + return None else: + response = response[:-2] #strip checksum & ETX return response def send_product_tag(self,tag,tag_data): self.log.info(f"Sending tag:{tag}={tag_data}") - command=bytearray(self.registers["product_data"]) + command=bytearray(self.commands["product_data"]) if type(tag_data) is str: tag_data=bytearray(tag_data,encoding="ascii") command.extend(self.product_tags[tag]) command.extend(tag_data) self.send_command(command) + @staticmethod def calc_checksum(data, start_idx=1): checksum = 0