diff --git a/config/machine_settings/st-ten-9.ini b/config/machine_settings/st-ten-9.ini index 51ae392..5a18ece 100644 --- a/config/machine_settings/st-ten-9.ini +++ b/config/machine_settings/st-ten-9.ini @@ -50,15 +50,15 @@ description_field: descrizione tecna_discharge_enable: yes dimensione_lotto_abilitata: x tempo_pre_riempimento: 0 -pressione_pre_riempimento: 1000 -tempo_riempimento: 15 -tempo_assestamento: 15 +pressione_pre_riempimento: 0 +tempo_riempimento: 5 +tempo_assestamento: 10 tempo_di_test: 10 percentuale_minima_pressione_assestamento: 5 percentuale_massima_pressione_assestamento: 5 -pressione_di_test_delta_minimo: 30 -pressione_di_test: 7000 -pressione_di_test_delta_massimo: 30 +pressione_di_test_delta_minimo: 20 # +mbar +pressione_di_test: 5000 # mbar +pressione_di_test_delta_massimo: 20 # -mbar tempo_svuotamento: 1 pressione_svuotamento: 100 canale_di_prova: 1 diff --git a/runme.bat b/runme.bat index a1c7f25..f2ba63c 100644 --- a/runme.bat +++ b/runme.bat @@ -1,4 +1,4 @@ echo on SET mypath=%~dp0 cd %mypath% -.\venv\Scripts\activate.bat && python -O "./src/main.py" --no-edgetpu --no-tflite/ +.\venv\Scripts\activate.bat && python -O "./src/main.py" diff --git a/runme_noautotest.bat b/runme_noautotest.bat index 0e1d578..b689c06 100644 --- a/runme_noautotest.bat +++ b/runme_noautotest.bat @@ -1,4 +1,4 @@ echo on SET mypath=%~dp0 cd %mypath% -.\venv\Scripts\activate.bat && python -O "./src/main.py" --no-edgetpu --no-tflite --no-autotest +.\venv\Scripts\activate.bat && python -O "./src/main.py" --no-autotest diff --git a/src/components/furness_controls_fco730_registers.py b/src/components/furness_controls_fco730_registers.py index d88403a..09a81ac 100644 --- a/src/components/furness_controls_fco730_registers.py +++ b/src/components/furness_controls_fco730_registers.py @@ -1,9 +1,10 @@ registers = { "program_data":b'A000', - "product_data":b'B000', + "product_data":b'B050', "function_data":b'D000', "counters":b'E000', "change_cur_prod":b'F000', + "change_cur_prod_50":b'F050', "start_test": b'G000', "reset_test": b'H000', "version_num_range": b'I000', @@ -16,20 +17,27 @@ registers = { "printer_settings": b'S000', "comm_settings": b'x000' } -tags = { +product_tags = { "product_id": b'a', - "test_press": b'b', + "test_pressure": b'b', "tolerance": b'c', "+fail": b'd', "-fail": b'e', "fill_time": b'f', "stab_time": b'g', "test_time": b'h', - "prefill_time": b'i', + "prefill_time": b'l', "vent_time": b'm', "test_type": b'n', "fail_high": b'o', "fail_low": b'p', "outputs_a_h": b'q', - + "prefill_pressure": b'r', } + +program_tags={ + "leak_units":b'a', + "press_units": b'b', + "leak_at_eot":b'c', + "product":b'l', +} \ No newline at end of file diff --git a/src/components/furness_controls_fco780_registers.py b/src/components/furness_controls_fco780_registers.py index cb9cc6a..328b879 100644 --- a/src/components/furness_controls_fco780_registers.py +++ b/src/components/furness_controls_fco780_registers.py @@ -1,6 +1,10 @@ registers = { } -settings = { +product_tags = { + +} + +program_tags = { } diff --git a/src/components/furness_controls_leak_tester.py b/src/components/furness_controls_leak_tester.py index 6ec90d6..0ad3630 100644 --- a/src/components/furness_controls_leak_tester.py +++ b/src/components/furness_controls_leak_tester.py @@ -3,8 +3,8 @@ import sys from components.component import Component from components.furness_controls_fco730_registers import registers as fco730_registers from components.furness_controls_fco780_registers import registers as fco780_registers -from components.furness_controls_fco730_registers import settings as fco730_settings -from components.furness_controls_fco780_registers import settings as fco780_settings +from components.furness_controls_fco730_registers import product_tags as fco730_product_tags +from components.furness_controls_fco780_registers import product_tags as fco780_product_tags if "--sim-furness-controls" in sys.argv: from components.dummies.serial import serial else: @@ -21,6 +21,7 @@ class FurnessControlsLeakTester(Component): def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) + self.product_tags = None self.bytesize = None self.timeout = None self.parity = None @@ -39,10 +40,10 @@ class FurnessControlsLeakTester(Component): self.model = self.config[self.name]["model"].lower() if self.model == "fco730": self.registers = fco730_registers - self.settings = fco730_settings - elif self.model == "fco730": + self.product_tags = fco730_product_tags + elif self.model == "fco780": self.registers = fco780_registers - self.settings = fco780_settings + self.product_tags = fco780_product_tags else: raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.") self.port = self.config[self.name]["port"] @@ -50,7 +51,7 @@ class FurnessControlsLeakTester(Component): self.stopbits = 1 self.parity = serial.PARITY_NONE self.bytesize = serial.EIGHTBITS - self.timeout = 0.5 + self.timeout = 0.2 if self.conn is not None: self.conn.close() self.conn = serial.Serial( @@ -73,126 +74,13 @@ class FurnessControlsLeakTester(Component): data = data * self.units[formatting][0] return data - def _convert_to_format(self, data, formatting=None, encoding_map=None): - if formatting is not None: - data = int(data / self.units[formatting][0]) - if encoding_map is not None and data in encoding_map: - data = encoding_map[data] - return data - - @Component.reconfig_on_error - def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs): - if type(register) is str: - register, s = self.registers[register] - if data_type is None: - data_type = s.get("dt", None) - if gain is None: - gain = s.get("g", None) - if offset is None: - offset = s.get("o", None) - if formatting is None: - formatting = s.get("f", None) - if decoding_map is None: - decoding_map = s.get("decoding", None) - if not len(args): - args = s.get("a", []) - if not len(kwargs): - kwargs = s.get("k", {}) - if data_type is None: - data_type = "16bit_uint" - if gain is None: - gain = 1 - if offset is None: - offset = 0 - return self._convert_from_format( - super().read( - register, - *args, - data_type=data_type, - gain=gain, - offset=offset, - **kwargs, - ), - formatting=formatting, - decoding_map=decoding_map, - ) - - @Component.reconfig_on_error - def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs): - if type(register) is str: - register, s = self.registers[register] - if data_type is None: - data_type = s.get("dt", None) - if gain is None: - gain = s.get("g", None) - if offset is None: - offset = s.get("o", None) - if formatting is None: - formatting = s.get("f", None) - if encoding_map is None: - encoding_map = s.get("encoding", None) - if not len(args): - args = s.get("a", []) - if not len(kwargs): - kwargs = s.get("k", {}) - if data_type is None: - data_type = "16bit_uint" - if gain is None: - gain = 1 - if offset is None: - offset = 0 - return super().write( - register, - self._convert_to_format( - data, - formatting=formatting, - encoding_map=encoding_map, - ), - *args, - data_type=data_type, - gain=gain, - offset=offset, - **kwargs, - ) - @Component.reconfig_on_error def _get(self): # READ INFO - info = {r: self.read(r) for r in [ - "Real time test pressure output", - "Real time differential pressure output", - "Real time pressure line regulator", - "Active alarm flags", - "Active test program number", - "Running test: active phase", - "Running test: test type", - "Running test: sequence index", - "Digital inputs status (mask)", - # "Digital outputs status (mask)", - ]} - if self.model == "t3p": - pass - elif self.model == "t3l": - info.update({r: self.read(r) for r in [ - "Active not severe alarm flags", - ]}) - else: - raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.") - if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST": - info.update(self.get_test_results()) - for round_me in ["measured leak"]: - if round_me in info.keys(): - info.update({round_me:float(f"{info[round_me]:.2f}")}) - self.log.debug(str(info)) - super()._get([info]) + current_status=self.send_enquiry("current_status") - @Component.reconfig_on_error - def _set(self, val=None): - if val is not None: # handle request: - for register, value in val.items(): - print(register, value) - self.write(register, value) - super()._set(val) + self.log.info(str(current_status)) + #super()._get([]) def start_test(self, table=None): if table is None: @@ -204,65 +92,60 @@ class FurnessControlsLeakTester(Component): def stop_test(self): self.log.warning("stopping test") - pass + self.current_status = self.send_enquiry("current_status") + status = self.get_status_param("state") + if status =='0': + self.log.info("ready to start") + else: + self.log.warning("error state, performing self check to reset") + self.send_command("self_check") + + def get_status_param(self,param): + par_len = 1 + if param=="state": + tag="f" + tag_idx=self.current_status.find(b'f') + data=self.current_status[tag_idx+1:tag_idx+1+par_len] + return str(data,encoding="ascii") def get_test_results(self): self.log.info("getting test results") - return {r: self.read(r) for r in [ - #"Running test: phase backwards time", - "Running test: filling pressure", - "Running test: pressure at the end of settling", - #"Running test: burst pressure", - "Running test: measured leak", - #"Running test: calculated leak flow rate", - #"Running test: calculate RVP%", - "Running test: result", - ]} def write_recipe(self, recipe, step, table=None): - if table is None: - table = self.max_program_number - recipe_name = recipe.part_number[:16].encode("ascii") - recipe_name += b"\x00" * (16 - len(recipe_name)) - recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii") - recipe_barcode += b"\x00" * (24 - len(recipe_barcode)) - test_flags = 0b0110100001010100 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010100 - pid_mode = int(self.config["recipes_defaults"]["pid_mode"])<<4 - test_flags = test_flags | pid_mode - pid_ramps=0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"])<<8 | int(self.config["recipes_defaults"]["pid_speed"])<<12 - spec = { - "Flag: Instrument settings": 0b0000000000000000, - "Test program for read/write operation": table, - **{719 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # program name - **{727 - 1 + i: (recipe_barcode[i * 2 + 1] << 8) + recipe_barcode[i * 2] for i in range(12)}, # program associated bar-code - **{761 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 1 - # **{769 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 2 - "Print options": 0b0000000000000000 | self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template, - "Test type": "Leak Test", - "Test flags": test_flags, - "T0 - Pre-filling time": step.spec["pre_filling_time"], - "P0 - Pre-filling pressure": step.spec["pre_filling_pressure"], - "T1 - Filling time": step.spec["filling_time"], - "T2 - Settling time": step.spec["settling_time"], - "PR- - Min pressure tolerance %": step.spec["settling_pressure_min_percent"], - "PR+ - Max pressure tolerance % (P+)": step.spec["settling_pressure_max_percent"], - "T3 - Measure time": step.spec["test_time"], - "Q- Lower test leak limit": step.spec["test_pressure_qneg"], - "PREL - Nominal test pressure": step.spec["test_pressure"], - "Q+ Upper test leak limit": step.spec["test_pressure_qpos"], - "FST - Discharge time": step.spec["flush_time"], - "FSL - Discharge limit": step.spec["flush_pressure"], - "PSQ - Next sequence program PSOUT mode": 0, - "RAMPS: T1 configuration": pid_ramps, - "PID: pressure correction": 100, - "Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tecna_discharge_enable"]=="yes" else 0b0000000000000000 + # PREPARE DATA + product_id='"'+recipe.part_number[:16]+'"' + product_id=product_id.encode("ascii") + test_press_bar=step.spec["test_pressure"]/1000 + prefill_press_bar=step.spec["pre_filling_pressure"]/1000 + tolerance=int(step.spec["settling_pressure_min_percent"]) + fail_pos=test_press_bar+(int(step.spec["test_pressure_qpos"])/1000) + fail_neg=test_press_bar+(int(step.spec["test_pressure_qneg"])/1000) + fill_time=float(step.spec["filling_time"]) + stab_time=float(step.spec["settling_time"]) + test_time=float(step.spec["test_time"]) + prefill_time=float(step.spec["pre_filling_time"]) + vent_time=float(step.spec["flush_time"]) + # SEND RECIPE PARAMETERS + self.send_command("change_cur_prod_50") + self.send_product_tag("product_id", product_id) + self.send_product_tag("prefill_pressure", f"{prefill_press_bar:2.3f}") + self.send_product_tag("test_pressure", f"{test_press_bar:2.3f}") + self.send_product_tag("tolerance", f"{tolerance:3.1f}") + self.send_product_tag("+fail", f"{fail_pos:3.1f}") + self.send_product_tag("-fail", f"{fail_neg:3.1f}") + self.send_product_tag("fill_time", f"{fill_time:3.1f}") + self.send_product_tag("stab_time", f"{stab_time:3.1f}") + self.send_product_tag("test_time", f"{test_time:3.1f}") + self.send_product_tag("prefill_time", f"{prefill_time:3.1f}") + self.send_product_tag("vent_time", f"{vent_time:3.1f}") + self.send_product_tag("fail_high", f"{fail_pos:3.1f}") + self.send_product_tag("fail_low", f"{fail_neg:3.1f}") + self.send_product_tag("outputs_a_h", f"{int(0b01000000)}") - } - self.log.debug(str(spec)) - for register, value in spec.items(): - self.write(register, value) def send_command(self,command): + if type(command) is str: + command=self.registers[command] out_bytes= bytearray() out_bytes.extend(EOT) out_bytes.extend(self.id1) @@ -277,31 +160,45 @@ class FurnessControlsLeakTester(Component): if response == ACK: return True else: - self.log.error(f"SEND COMMAND:{response}") + self.log.error(f"SEND COMMAND({command}):{response}") return None def send_enquiry(self,enquiry): + if type(enquiry) is str: + enquiry=self.registers[enquiry] out_bytes=bytearray(EOT) out_bytes.extend(self.id1) out_bytes.extend(self.id2) out_bytes.extend(enquiry) out_bytes.extend(ENQ) - checksum=self.calc_checksum(out_bytes) + checksum = self.calc_checksum(out_bytes) out_bytes.append(checksum) self.conn.write(out_bytes) response = self.conn.read(100) - read_checksum = response[-1] + if len(response): + read_checksum = response[-1] + else: + read_checksum = None calculated_checksum = self.calc_checksum(response[0:-1],start_idx=0) if read_checksum != calculated_checksum: self.log.error(f"SEND COMMAND:{response}") else: return response - def calc_checksum(self,data,start_idx=1): - checksum=0 - for i,data_byte in enumerate(data): - if i