diff --git a/config/csv_import/st-ten-9.csv b/config/csv_import/st-ten-9.csv new file mode 100644 index 0000000..b29b8b6 --- /dev/null +++ b/config/csv_import/st-ten-9.csv @@ -0,0 +1,2 @@ +codice_ricetta,cliente,codice_prodotto,descrizione +R54967,Errecinque,R54967, Tubo in metallo \ No newline at end of file diff --git a/config/machine_settings/st-ten-9.ini b/config/machine_settings/st-ten-9.ini index c15f052..a19c0f2 100644 --- a/config/machine_settings/st-ten-9.ini +++ b/config/machine_settings/st-ten-9.ini @@ -5,6 +5,7 @@ description = ST-TEN-9 SATIL SRL - BANCO COLLAUDO TUBI R5 archive_synchronizer: present label_printer: present remote_api: absent +tecna_t3: absent furness_controls: present [archive_synchronizer] diff --git a/make_desktop_file_noautotest.bat b/make_desktop_file_noautotest.bat new file mode 100644 index 0000000..64137d9 --- /dev/null +++ b/make_desktop_file_noautotest.bat @@ -0,0 +1,9 @@ +set SCRIPT="%TEMP%\%RANDOM%-%RANDOM%-%RANDOM%-%RANDOM%.vbs" +echo Set oWS = WScript.CreateObject("WScript.Shell") >> %SCRIPT% +echo sLinkFile = "%USERPROFILE%\Desktop\NO AUTOTEST.lnk" >> %SCRIPT% +echo Set oLink = oWS.CreateShortcut(sLinkFile) >> %SCRIPT% +echo oLink.TargetPath = "%userprofile%\PycharmProjects\st-ten-1\runme_noautotest.bat" >> %SCRIPT% +echo oLink.IconLocation = "%userprofile%\PycharmProjects\st-ten-1\src\ui\imgs\neo_red.ico" +echo oLink.Save >> %SCRIPT% +cscript /nologo %SCRIPT% +del %SCRIPT% diff --git a/src/components/furness_controls_fco730_registers.py b/src/components/furness_controls_fco730_registers.py new file mode 100644 index 0000000..e804068 --- /dev/null +++ b/src/components/furness_controls_fco730_registers.py @@ -0,0 +1,3 @@ +registers = { + +} diff --git a/src/components/furness_controls_fco780_registers.py b/src/components/furness_controls_fco780_registers.py new file mode 100644 index 0000000..e804068 --- /dev/null +++ b/src/components/furness_controls_fco780_registers.py @@ -0,0 +1,3 @@ +registers = { + +} diff --git a/src/components/furness_controls_leak_tester.py b/src/components/furness_controls_leak_tester.py new file mode 100644 index 0000000..96ef1d9 --- /dev/null +++ b/src/components/furness_controls_leak_tester.py @@ -0,0 +1,311 @@ +from lib.db import Recipes, Steps, db +from PyQt5.QtCore import QSemaphore, pyqtSignal +from PyQt5.QtWidgets import QMessageBox + +from .component import Component +from .modbus_component import ModbusComponent +from .furness_controls_fco730_registers import registers as fco730_registers +from .furness_controls_fco780_registers import registers as fco780_registers + + +class FurnessControlsLeakTester(Component): + + def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True): + super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded, registers=None) + + def config_changed(self): + super().config_changed() + self.model = self.config[self.name]["model"].lower() + if self.model == "fco730": + self.registers = fco730_registers + elif self.model == "fco730": + self.registers = fco780_registers + else: + raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.") + self.set_measure_units() + self.units = self.get_measure_units() + self.max_program_number = self.read("Max number of programs") + self.model = self.config[self.name]["model"].lower() + self.log.info(f"units: {self.units}") + + _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar) + _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, } + _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, } + _volume_units = {"litri": 0, "cm3": 1, } + _time_units = {"seconds": 0, } + _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, "cfm": 3} + _pressure_units_map = {v: k for k, v in _pressure_units.items()} + _leak_units_map = {v: k for k, v in _leak_units.items()} + _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()} + _volume_units_map = {v: k for k, v in _volume_units.items()} + _time_units_map = {v: k for k, v in _time_units.items()} + _flow_units_map = {v: k for k, v in _flow_units.items()} + + def set_measure_units(self): + if self.model == "t3p": + for register, [unit, decimals] in { + "MEASURE UNITS: Relative pressure": [self._pressure_units["mbar"], 0], # red, purple + "MEASURE UNITS: Differential (leak) pressure": [self._leak_units["mbar"], 0], # yellow + "MEASURE UNITS: Calculated leak flow rate": [self._leak_flow_units["cm3/min"], 0], # blue + "MEASURE UNITS: Volume": [self._volume_units["litri"], 0], # green + "MEASURE UNITS: Flow rate": [self._flow_units["liters/min"], 0], # orange + }.items(): + self.write(register, (decimals << 8) + unit) + elif self.model == "t3l": + for register, [unit, decimals] in { + "MEASURE UNITS: Relative pressure": [self._pressure_units["mbar"], 0], # red, purple + "MEASURE UNITS: Differential (leak) pressure": [self._leak_units["mbar"], 0], # yellow + "MEASURE UNITS: Calculated leak flow rate": [self._leak_flow_units["cm3/min"], 0], # blue + "MEASURE UNITS: Volume": [self._volume_units["litri"], 0], # green + "MEASURE UNITS: Flow rate": [self._flow_units["liters/min"], 0], # orange + }.items(): + self.write(register, unit) # (decimals << 8) + unit) + else: + raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.") + + def get_measure_units(self): + units = {} + if self.model == "t3p": + for [register, unit_map, unit_names] in [ + ["Running test: relative pressure format", self._pressure_units_map, ["relative_pressure", "red", "r", 21, ]], # also by documentation color and register number + ["Running test: differential pressure format", self._pressure_units_map, ["differential_pressure", "purple", "p", 22, ]], # also by documentation color and register number + ["Running test: relative pressure format (low resolution)", self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 23, ]], # also by documentation color and register number + ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]], # also by documentation color and register number + ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 25, ]], # also by documentation color and register number + ["Running test: time format", self._time_units_map, ["time", "orange", "t", 26, ]], # also by documentation color and register number + ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 27, ]], # also by documentation color and register number + ]: + v = self.read(register) + unit_spec = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]] + for unit_name in unit_names: + units[unit_name] = unit_spec + elif self.model == "t3l": + for [register, unit_map, unit_names] in [ + [["Running test: relative pressure scale", "Running test: relative pressure decimals"], self._pressure_units_map, ["relative_pressure", "red", "r", 1501, ]], # also by documentation color and register number + [["Running test: differential pressure scale", "Running test: differential pressure decimals"], self._pressure_units_map, ["differential_pressure", "purple", "p", 1503, ]], # also by documentation color and register number + [["Running test: relative pressure scale (low resolution)", "Running test: relative pressure decimals (low resolution)"], self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 1505, ]], # also by documentation color and register number + ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 1507, ]], # also by documentation color and register number + ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 1508, ]], # also by documentation color and register number + ["Running test: time format", self._time_units_map, ["time", "orange", "t", 1509, ]], # also by documentation color and register number + ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 1510, ]], # also by documentation color and register number + ["Running test: line pressure format", self._pressure_units_map, ["line_pressure", "lp", "l", 1511, ]], # also by documentation color and register number + ]: + if type(register) is list: + v = [self.read(r) for r in register] + unit_spec = [10**(-(v[1] & 0xff)), v[0]] + else: + v = self.read(register) + unit_spec = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]] + for unit_name in unit_names: + units[unit_name] = unit_spec + else: + raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.") + return units + + def _convert_from_format(self, data, formatting=None, decoding_map=None): + if decoding_map is not None and data in decoding_map: + data = decoding_map[data] + if formatting is not None: + # units = self.units[formatting] + # data = [data * units[0], units[1]] + data = data * self.units[formatting][0] + return data + + def _convert_to_format(self, data, formatting=None, encoding_map=None): + if formatting is not None: + data = int(data / self.units[formatting][0]) + if encoding_map is not None and data in encoding_map: + data = encoding_map[data] + return data + + @Component.reconfig_on_error + def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs): + if type(register) is str: + register, s = self.registers[register] + if data_type is None: + data_type = s.get("dt", None) + if gain is None: + gain = s.get("g", None) + if offset is None: + offset = s.get("o", None) + if formatting is None: + formatting = s.get("f", None) + if decoding_map is None: + decoding_map = s.get("decoding", None) + if not len(args): + args = s.get("a", []) + if not len(kwargs): + kwargs = s.get("k", {}) + if data_type is None: + data_type = "16bit_uint" + if gain is None: + gain = 1 + if offset is None: + offset = 0 + return self._convert_from_format( + super().read( + register, + *args, + data_type=data_type, + gain=gain, + offset=offset, + **kwargs, + ), + formatting=formatting, + decoding_map=decoding_map, + ) + + @Component.reconfig_on_error + def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs): + if type(register) is str: + register, s = self.registers[register] + if data_type is None: + data_type = s.get("dt", None) + if gain is None: + gain = s.get("g", None) + if offset is None: + offset = s.get("o", None) + if formatting is None: + formatting = s.get("f", None) + if encoding_map is None: + encoding_map = s.get("encoding", None) + if not len(args): + args = s.get("a", []) + if not len(kwargs): + kwargs = s.get("k", {}) + if data_type is None: + data_type = "16bit_uint" + if gain is None: + gain = 1 + if offset is None: + offset = 0 + return super().write( + register, + self._convert_to_format( + data, + formatting=formatting, + encoding_map=encoding_map, + ), + *args, + data_type=data_type, + gain=gain, + offset=offset, + **kwargs, + ) + + @Component.reconfig_on_error + def _get(self): + # READ INFO + info = {r: self.read(r) for r in [ + "Real time test pressure output", + "Real time differential pressure output", + "Real time pressure line regulator", + "Active alarm flags", + "Active test program number", + "Running test: active phase", + "Running test: test type", + "Running test: sequence index", + "Digital inputs status (mask)", + # "Digital outputs status (mask)", + ]} + if self.model == "t3p": + pass + elif self.model == "t3l": + info.update({r: self.read(r) for r in [ + "Active not severe alarm flags", + ]}) + else: + raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.") + if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST": + info.update(self.get_test_results()) + for round_me in ["measured leak"]: + if round_me in info.keys(): + info.update({round_me:float(f"{info[round_me]:.2f}")}) + self.log.debug(str(info)) + super()._get([info]) + + @Component.reconfig_on_error + def _set(self, val=None): + if val is not None: # handle request: + for register, value in val.items(): + print(register, value) + self.write(register, value) + super()._set(val) + + def start_test(self, table=None): + if table is None: + table = self.max_program_number + self.log.info(f"starting test table {table!r}") + self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)") + self.write("Selected program", table) + self.write("Start test", table) + + def stop_test(self): + self.log.warning("stopping test") + self.write("Reset running test", 0) + + def get_test_results(self): + self.log.info("getting test results") + return {r: self.read(r) for r in [ + #"Running test: phase backwards time", + "Running test: filling pressure", + "Running test: pressure at the end of settling", + #"Running test: burst pressure", + "Running test: measured leak", + #"Running test: calculated leak flow rate", + #"Running test: calculate RVP%", + "Running test: result", + ]} + + def write_recipe(self, recipe, step, table=None): + if table is None: + table = self.max_program_number + recipe_name = recipe.part_number[:16].encode("ascii") + recipe_name += b"\x00" * (16 - len(recipe_name)) + recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii") + recipe_barcode += b"\x00" * (24 - len(recipe_barcode)) + test_flags = 0b0110100001010100 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010100 + pid_mode = int(self.config["recipes_defaults"]["pid_mode"])<<4 + test_flags = test_flags | pid_mode + pid_ramps=0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"])<<8 | int(self.config["recipes_defaults"]["pid_speed"])<<12 + spec = { + "Flag: Instrument settings": 0b0000000000000000, + "Test program for read/write operation": table, + **{719 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # program name + **{727 - 1 + i: (recipe_barcode[i * 2 + 1] << 8) + recipe_barcode[i * 2] for i in range(12)}, # program associated bar-code + **{761 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 1 + # **{769 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 2 + "Print options": 0b0000000000000000 | self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template, + "Test type": "Leak Test", + "Test flags": test_flags, + "T0 - Pre-filling time": step.spec["pre_filling_time"], + "P0 - Pre-filling pressure": step.spec["pre_filling_pressure"], + "T1 - Filling time": step.spec["filling_time"], + "T2 - Settling time": step.spec["settling_time"], + "PR- - Min pressure tolerance %": step.spec["settling_pressure_min_percent"], + "PR+ - Max pressure tolerance % (P+)": step.spec["settling_pressure_max_percent"], + "T3 - Measure time": step.spec["test_time"], + "Q- Lower test leak limit": step.spec["test_pressure_qneg"], + "PREL - Nominal test pressure": step.spec["test_pressure"], + "Q+ Upper test leak limit": step.spec["test_pressure_qpos"], + "FST - Discharge time": step.spec["flush_time"], + "FSL - Discharge limit": step.spec["flush_pressure"], + "PSQ - Next sequence program PSOUT mode": 0, + "RAMPS: T1 configuration": pid_ramps, + "PID: pressure correction": 100, + "Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tecna_discharge_enable"]=="yes" else 0b0000000000000000 + + } + if self.model == "t3p": + pass + elif self.model == "t3l": + spec.update({ + "Use programs or use products": 0, + "Nominal peak pressure": step.spec["test_pressure"], + "Pn - Nominal test pressure": step.spec["test_pressure"], + }) + else: + raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.") + self.log.debug(str(spec)) + for register, value in spec.items(): + self.write(register, value) diff --git a/src/ui/imgs/neo_red.ico b/src/ui/imgs/neo_red.ico new file mode 100644 index 0000000..82aa4f6 Binary files /dev/null and b/src/ui/imgs/neo_red.ico differ