264 lines
9.3 KiB
Python
264 lines
9.3 KiB
Python
import os.path
|
|
import sys
|
|
import weakref
|
|
|
|
from PyQt5 import QtSvg
|
|
from PyQt5.QtCore import Qt
|
|
|
|
from lib.helpers import timing
|
|
from PyQt5.QtCore import QTimer
|
|
from PyQt5.QtGui import QKeySequence, QPixmap
|
|
from PyQt5.QtWidgets import QShortcut, QApplication, QVBoxLayout
|
|
from ui.test_test import Test_Test
|
|
from lxml import etree
|
|
|
|
|
|
class Test_Instructions(Test_Test):
|
|
def __init__(self,config ,components=None, recipe=None, step=None, run_once=False, reset_on_start=False, enable_override=True,bench_name="generic"):
|
|
super().__init__(components=components, recipe=recipe, step=step, run_once=run_once, reset_on_start=reset_on_start, enable_override=enable_override)
|
|
self.get_connection = None
|
|
self.bench_name = bench_name
|
|
self.config = config
|
|
self.recipe=recipe
|
|
self.svgWidget=None
|
|
self.svg_root=None
|
|
self.flag=False
|
|
self.inputs={}
|
|
self.svg_widget=QtSvg.QSvgWidget()
|
|
self.layout = QVBoxLayout()
|
|
self.layout.addWidget(self.svg_widget)
|
|
self.svg_w.setLayout(self.layout)
|
|
self.svg_path=os.path.join("config","instruction_images",self.config["machine"].get("instruction_folder","generic"),"")
|
|
self.timer = QTimer()
|
|
self.timer.timeout.connect(self.toggle_icons)
|
|
self.expected_input_state=True
|
|
|
|
if self.config["hardware_config"]["tecna_t3"] == "absent" and self.config["hardware_config"]["furness_controls"] == "absent":
|
|
if hasattr(self, "print_b"):
|
|
self.print_b.setVisible(True)
|
|
self.print_b.setEnabled(True)
|
|
self.print_b.clicked.connect(self.print_button)
|
|
# Read the keep_true configuration flag (defaults to disabled)
|
|
self.keep_true = str(self.config.get("digital_io", {}).get("keep_true", "no")).lower() in ["yes", "true", "1"]
|
|
|
|
def start(self, recipe=None, step=None, pieces=None):
|
|
show = super().start(recipe=recipe, step=step)
|
|
|
|
self.timer.start(1000)
|
|
self.inputs = {}
|
|
# setup test loop
|
|
if "digital_io" in self.components.keys():
|
|
self.get_connection = self.components["digital_io"].out.connect(self.get)
|
|
svg_path = f"{self.svg_path}{self.recipe}.svg"
|
|
if step is not None:
|
|
if step.step_type == "instruction":
|
|
svg_path = f"{self.svg_path}{self.recipe}.svg"
|
|
if step.step_type == "instruction_extra":
|
|
svg_path = f"{self.svg_path}{self.recipe}-extra.svg"
|
|
|
|
if not os.path.exists(svg_path):
|
|
svg_path=f"{self.svg_path}DEFAULT.svg"
|
|
self.svg_root = etree.parse(svg_path)
|
|
self.svg_str = etree.tostring(self.svg_root)
|
|
self.svg_str=etree.tostring(self.svg_root)
|
|
self.expected_input_state = True if step.step_type in ("instruction","instruction_extra") else False
|
|
|
|
self.monitored_ids=self.svg_root.xpath(f'''.//*[starts-with(@id, 'sensor_')]''')
|
|
self.warning_ids=self.svg_root.xpath(f'''.//*[starts-with(@id, 'warning_')]''')
|
|
self.tape_ids=self.svg_root.xpath(f'''.//*[starts-with(@id, 'tape_')]''')
|
|
|
|
self.done = False
|
|
return show
|
|
|
|
def toggle_icons(self, step=None):
|
|
self.flag = not self.flag
|
|
|
|
# Handle tape_ids
|
|
for elem in self.tape_ids:
|
|
if self.flag:
|
|
self.show_tape(elem)
|
|
else:
|
|
self.hide_tape(elem)
|
|
|
|
for elem in self.warning_ids:
|
|
warning_name = elem.attrib['id']
|
|
warning_index_part = warning_name.split("_")[1]
|
|
|
|
try:
|
|
warning_index = int(warning_index_part) - 3
|
|
except ValueError:
|
|
warning_index = warning_index_part
|
|
|
|
if warning_index not in self.inputs:
|
|
self.inputs[warning_index] = False
|
|
|
|
if self.inputs[warning_index]:
|
|
self.show_ok(elem)
|
|
self.show_icon(elem)
|
|
else:
|
|
self.show_warning_ko(elem)
|
|
if self.flag:
|
|
self.show_icon(elem)
|
|
else:
|
|
self.hide_icon(elem)
|
|
|
|
for elem in self.monitored_ids:
|
|
sensor_name = elem.attrib['id']
|
|
sensor_index_part = sensor_name.split("_")[1]
|
|
|
|
try:
|
|
sensor_index = int(sensor_index_part) - 3
|
|
except ValueError:
|
|
sensor_index = sensor_index_part
|
|
|
|
inverse = '_inv' in sensor_name
|
|
stat = '_stat' in sensor_name
|
|
|
|
if sensor_index not in self.inputs:
|
|
self.inputs[sensor_index] = False
|
|
if stat:
|
|
if inverse and not self.inputs[sensor_index]:
|
|
self.show_ok(elem)
|
|
self.show_icon(elem)
|
|
else:
|
|
self.show_ok(elem)
|
|
self.show_icon(elem)
|
|
else:
|
|
if inverse:
|
|
if not self.inputs[sensor_index]:
|
|
self.show_ko(elem)
|
|
self.show_icon(elem)
|
|
else:
|
|
self.show_ok(elem)
|
|
if self.flag:
|
|
self.show_icon(elem)
|
|
else:
|
|
self.hide_icon(elem)
|
|
else:
|
|
if self.inputs[sensor_index]:
|
|
self.show_ok(elem)
|
|
self.show_icon(elem)
|
|
else:
|
|
self.show_ko(elem)
|
|
if self.flag:
|
|
self.show_icon(elem)
|
|
else:
|
|
self.hide_icon(elem)
|
|
|
|
self.show_svg()
|
|
def print_button(self):
|
|
super().get([{
|
|
"time": timing(),
|
|
"results": {
|
|
"ok": True,
|
|
},
|
|
}], override=False, fail=False, skip_delay=False)
|
|
|
|
self.set_done()
|
|
|
|
self.toggle_icons()
|
|
|
|
def show_ok(self,id):
|
|
id.attrib["{http://www.w3.org/1999/xlink}href"]= f"{self.svg_path}img/ok.png"
|
|
def show_ko(self,id):
|
|
id.attrib["{http://www.w3.org/1999/xlink}href"]= f"{self.svg_path}img/arw-yel-down.png"
|
|
|
|
def show_icon(self,id):
|
|
id.set("display", "inline")
|
|
|
|
def hide_icon(self,id):
|
|
id.set("display", "none")
|
|
|
|
def show_warning_ko(self,id):
|
|
id.attrib["{http://www.w3.org/1999/xlink}href"]= f"{self.svg_path}img/warning.png"
|
|
|
|
def show_tape(self,id):
|
|
id.attrib["{http://www.w3.org/1999/xlink}href"]= f"{self.svg_path}img/tape_black.png"
|
|
id.set("display", "inline")
|
|
|
|
def hide_tape(self,id):
|
|
id.set("display", "none")
|
|
|
|
def show_svg(self):
|
|
self.svg_str = etree.tostring(self.svg_root)
|
|
self.svg_widget.load(self.svg_str)
|
|
QApplication.processEvents()
|
|
|
|
def get(self, data=None, override=False):
|
|
if self.parent_assembly_widget().parent().step.step_type == "select_recipe":
|
|
self.stop()
|
|
return
|
|
if self.done: # avoid proccessing if completed
|
|
return
|
|
if data is None:
|
|
super().get(None, override=override)
|
|
return
|
|
|
|
# check if all sensor are ok
|
|
ok = True
|
|
if len(data[0]["digital_io"]) == 0:
|
|
return
|
|
|
|
for sensor in self.monitored_ids:
|
|
sensor_name = sensor.attrib['id']
|
|
sensor_index_part = sensor.attrib['id'].split("_")[1]
|
|
|
|
try:
|
|
sensor_index = int(sensor_index_part) - 3
|
|
except ValueError:
|
|
sensor_index = sensor_index_part
|
|
|
|
inverse = 'inv' in sensor_name
|
|
byte_idx = int(sensor_index / 8) if isinstance(sensor_index, int) else None
|
|
bit_idx = sensor_index % 8 if isinstance(sensor_index, int) else None
|
|
current_bitstate = data[0]["digital_io"][byte_idx][bit_idx] if isinstance(sensor_index, int) else None
|
|
|
|
if 'stat' in sensor_name:
|
|
continue
|
|
|
|
if isinstance(sensor_index, int):
|
|
is_expected = (inverse and current_bitstate != self.expected_input_state) or (not inverse and current_bitstate == self.expected_input_state)
|
|
|
|
if self.keep_true:
|
|
# Memorize the True state if the condition is met
|
|
if is_expected:
|
|
self.inputs[sensor_index] = True
|
|
elif sensor_index not in self.inputs:
|
|
self.inputs[sensor_index] = False
|
|
else:
|
|
# Live tracking (Original behavior expectation when keep_true is not enabled)
|
|
self.inputs[sensor_index] = is_expected
|
|
|
|
if not self.inputs.get(sensor_index, False):
|
|
ok = False
|
|
else:
|
|
ok = False
|
|
|
|
if ok:
|
|
self.stop()
|
|
super().get([{
|
|
"time": timing(),
|
|
"results": {
|
|
"ok": ok,
|
|
},
|
|
}], override=override, fail=ok is False, skip_delay=False)
|
|
self.set_done()
|
|
|
|
self.toggle_icons()
|
|
|
|
def set_done(self):
|
|
self.done=True
|
|
self.done_ok=True
|
|
|
|
|
|
def stop(self):
|
|
#self.done=True
|
|
#self.done_ok = True
|
|
self.timer.stop()
|
|
if self.get_connection is not None:
|
|
self.disconnect(self.get_connection)
|
|
super().stop()
|
|
|
|
def reset(self):
|
|
super().reset()
|