vision wip

This commit is contained in:
matteo porta 2022-06-22 17:18:29 +02:00
parent dec264e054
commit 4537c9fbc1
28 changed files with 591 additions and 391 deletions

View File

@ -15,7 +15,7 @@ export QT_NO_WARNING_OUTPUT=0
python -B -u "./src/main.py" \
--auto-login-admin \
--auto-select \
--camera-edits \
--no-autotest \
--no-edgetpu \
--no-tflite \
--sim-camera \
@ -27,6 +27,7 @@ $* 2> >(sed $'s/.*/\e[31m&\e[m/' >&2) # &
# --archive \
# --auto-login-user \
# --autotests-archive \
# --camera-edits \
# --sim-archiver \
# --sim-serial-label-printer \
# --users-management \

View File

@ -1,4 +1,5 @@
from .archive_synchronizer import ArchiveSynchronizer
from .consumer import Consumer
from .galaxy_camera import GalaxyCamera
from .modbus_component import ModbusComponent
from .os_label_printer import Os_Label_Printer

View File

@ -39,7 +39,7 @@ class Component(QObject):
self._paused = paused
self._started = False
self._running = False
self.sources = {}
self.sources = None
if self._threaded:
self._lock = QSemaphore(1)
self._lock.acquire(max(self._lock.available(), 1))
@ -78,7 +78,8 @@ class Component(QObject):
self._resume.connect(self._do_resume)
self._set_sources.connect(self._do_set_sources)
self._set_period.connect(self._do_set_period)
self.config.updated.connect(self._config_changed)
if self.config is not None:
self.config.updated.connect(self._config_changed)
self._config_changed()
self._init_periodic()
self._started = True
@ -149,13 +150,16 @@ class Component(QObject):
else:
self._do_resume()
def set_sources(self, sources=None): # sources should be {"source_name": signal_to_connect}
def set_sources(self, sources=None): #
"""
connect the given sources to trigger a call to _get
connect the given sources to trigger a call to _get,
the sources parameter should be:
a dict of signals might containing one optional argument that will be passed as data to _get
a dict of signals ({"<source_name>": <signal_to_connect>, ...})
signals might contain one optional argument that will be passed as data to _get
or None to disconnect all sources
"""
if sources is not None and not len(sources):
sources = None
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit(sources)
@ -163,6 +167,44 @@ class Component(QObject):
else:
self._do_set_sources(sources)
def add_sources(self, sources=None, overwrite_conflicting_sources=False): # sources should be {"source_name": signal_to_connect, ...}
"""
add the given sources to the current ones,
the sources parameter should be:
a dict of signals ({"<source_name>": <signal_to_connect>, ...})
signals might contain one optional argument that will be passed as data to _get
or None if no sources are to be added
this method calls set_sources, this is semplest but not the most efficient approach
"""
if self.sources is None:
self_sources = {}
else:
self_sources = self.sources
if sources is None:
sources = {}
if not overwrite_conflicting_sources:
conflicting_sources = {
n: [self_sources[n], sources[n]]
for n in self_sources.keys() & sources.keys()
if self_sources[n] is not sources[n]
}
if len(conflicting_sources):
raise AssertionError("\n\t" + "\n\t".join([f"source named {n!r}: {s[0]!r} will not be replaced with {s[1]!r}" for n, s in conflicting_sources]))
self.set_sources({**self_sources, **sources})
def remove_sources(self, sources=None):
"""
remove the given sources to the current ones by name,
the sources parameter should be:
an iterable of source names (["<source_name>", ...])
or None if no sources are to be removed
this method calls set_sources, this is semplest but not the most efficient approach
"""
if sources is None:
return
sources = set(sources)
self.set_sources({n: s for n, s in self.sources.items() if n not in sources})
def _init_periodic(self):
if self._period is not None:
if self._timer is None:
@ -178,7 +220,7 @@ class Component(QObject):
"""will set the period for periodic calls to _get and whether or not those are lazy (see init parameters)"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit({"period": period, "lazy": lazy})
self._set_period.emit({"period": period, "lazy": lazy})
self.wait_ready()
else:
self._do_set_period({"period": period, "lazy": lazy})
@ -187,7 +229,7 @@ class Component(QObject):
if self._timer is not None:
self._timer.timeout.connect(self._get)
self._timer.start()
self.log.debug(f"started periodic: {list(self.sources)}")
self.log.debug("started periodic")
else:
self.log.debug("no started periodic")
@ -198,7 +240,7 @@ class Component(QObject):
self._timer.timeout.disconnect()
except TypeError:
pass
self.log.debug(f"stopped periodic: {list(self.sources)}")
self.log.debug("stopped periodic")
else:
self.log.debug("no stopped periodic")
@ -255,18 +297,24 @@ class Component(QObject):
if self._threaded:
self._lock.release()
def _get(self, data=None):
def _get(self, data=None, emit=True):
"""
this method should be overridden when inheriting from the Component class
the overriding method should retrive all the data and then call super()._get(data)
this will emit the data in the proper format
this will emit the data in the proper format if the emit parameter is not set to False
"""
if data is None:
data = [None]
t = timing()
got = [{"time": t, self.name: d} for d in data]
self.out.emit(got)
self.log.debug(f"_get: {got}")
if emit:
if data is None:
data = [None]
t = timing()
got = [{
"time": t if type(d) is not dict or "time" not in d else d["time"],
self.name: d,
} for d in data]
self.out.emit(got)
self.log.debug(f"_get: {got}")
else:
self.log.debug("_get")
if self._timer is not None and self._single_shot:
self._timer.start()

View File

@ -0,0 +1,50 @@
from collections import deque
from PyQt5.QtCore import QMutex
from .component import Component
class Consumer(Component):
def __init__(self, work, work_fifo=True, drop_fifo=True, work_maxlen=None, config=None, name=None, holdoff=0.1, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=holdoff, lazy=lazy, paused=paused, threaded=threaded)
self.work = work
self.work_fifo = work_fifo
self.drop_fifo = drop_fifo
self.work_queue = deque(maxlen=work_maxlen)
self.lock = QMutex()
def add_consumable(self, consumable): # should be called from another thread
self.lock.lock()
# check work queue is not full
if self.work_queue.maxlen is not None and len(self.work_queue) >= self.work_queue.maxlen:
if self.drop_fifo:
skipped = self.work_queue.popleft()
else:
skipped = self.work_queue.pop()
# self.log.debug(f"skipped consumable: {skipped!r}")
self.log.debug("skipped consumable")
# add consumable to work queue
self.work_queue.append(consumable)
self.lock.unlock()
def _get(self):
self.lock.lock()
if len(self.work_queue):
if self.work_fifo:
consumed = self.work_queue.popleft()
else:
consumed = self.work_queue.pop()
self.lock.unlock()
self.log.debug("working...")
result = None
result = self.work(consumed)
# self.log.debug(f"result: {result!r}")
self.log.debug("done working")
super()._get([{
"consumed": consumed,
"result": result,
}])
else:
self.lock.unlock()
super()._get(emit=False)

View File

@ -1 +1 @@
from .modbus_client import *
from .modbus_client import ModbusClient

View File

@ -1,6 +1,7 @@
class ModbusClient:
def __init__(self, *args, **kwargs):
print(__name__, "initialized with", args, kwargs, flush=True)
# print(__name__, "initialized with", args, kwargs, flush=True)
pass
def connect(self):
self._is_socket_open = True

View File

@ -146,6 +146,13 @@ class GalaxyCamera(Component):
self.edits_dialog = EditsDialog(self.roi)
self.edits_dialog.edits_changed.connect(self.set_edits)
self._edits_new_frame.connect(self.edits_dialog.save_and_show_edits_new_frame)
self.edits_dialog.edits_pause.connect(self.toggle_paused)
def toggle_paused(self):
if self.running:
self.pause()
else:
self.resume()
def set_edits(self, edits=None):
self.edits = edits
@ -173,6 +180,7 @@ class GalaxyCamera(Component):
class EditsDialog(QDialog):
edits_changed = pyqtSignal(dict)
edits_pause = pyqtSignal()
def __init__(self, roi):
super().__init__()
@ -197,6 +205,9 @@ class EditsDialog(QDialog):
self.edits_frame_l = QLabel()
self.edits_frame_l.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
layout.addRow(self.edits_frame_l)
self.edits_pause_b = QPushButton("toggle paused")
layout.addRow(self.edits_pause_b)
self.edits_pause_b.clicked.connect(self.edits_pause)
for edit, limits in self.edits_specs.items():
limit, multiplier, slider, label = limits
slider.setRange(limit[0] * multiplier, limit[1] * multiplier)

View File

@ -1,171 +0,0 @@
import gc
import os
import random
import re
import sys
import numpy as np
import tensorflow as tf
from lib.helpers import log_msg
from lib.object_detection.utils import label_map_util
from PyQt5.QtCore import QMutex, pyqtSignal, pyqtSlot
from .input import Input
# Patch the location of gfile
tf.gfile = tf.io.gfile
class Terminals(Input):
loading_model_signal = pyqtSignal(dict)
def __init__(self, bench, name, config):
super().__init__(bench, name, config)
self.simulate = "--sim-vision" in sys.argv
self.source = self.bench.inputs[self.config["source"]]
self.model_name = self.config["model"]
self.num_classes = self.config["num_classes"]
self.matching_distance = self.config["matching_distance"]
self.crop = self.bench.zones["terminals"]["box"]
self.subzones = {k: self.bench.zones[k] for k in self.bench.zones if re.search("t[0-9].*", k)}
self.threshold = self.config["threshold"]
if not self.simulate:
# MODEL
self.model = None
self.model_lock = QMutex()
self.load_model(self.model_name)
label_map = label_map_util.load_labelmap("config/vision_test_labels/terminals.pbtxt")
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
# TO INITIALIZE TENSORFLOW MODELS
if not self.simulate:
img = np.zeros([1, 1, 3], dtype=np.uint8)
input_tensor = tf.convert_to_tensor(img)
input_tensor = input_tensor[tf.newaxis, ...]
self.model_lock.lock()
self.bench.gpu_mutex.lock()
self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
self.last_frame = None
def load_model(self, model_name=None):
log_msg("TERMINALS NEURAL NETWORK MODEL CHOSEN: {}".format(model_name))
if model_name.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model_name = sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d)) and d.startswith("t")], reverse=True)[0]
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg(f"LOADING TERMINALS NEURAL NETWORK MODEL: {model_name}", msg_type="tensorflow")
try:
with tf.device('/device:GPU:0'):
model = tf.saved_model.load(f"neural_networks/{model_name}")
# if "--terminals-tf-default-signature" in sys.argv:
# model = model.signatures["serving_default"]
self.model_name = model_name
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
@pyqtSlot(list)
def _get(self, frame):
img = frame[1]
img = img[self.crop[1]:self.crop[3], self.crop[0]:self.crop[2]]
self.last_frame = img
if self.simulate:
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {}
for zone_name, zone in self.subzones.items():
detected_zones[zone_name] = {
"class": random.choice(list(self.category_index.values())),
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])
return
input_tensor = tf.convert_to_tensor(np.asarray(img))
input_tensor = input_tensor[tf.newaxis, ...]
if not self.simulate:
# RUN INFERENCE
self.model_lock.lock()
self.bench.gpu_mutex.lock()
output = self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
else:
output = {"detection_classes": [[None]], "detection_scores": [[0]], "detection_boxes": [[None]]}
# create detections
detections = []
for d_class, d_score, d_box in zip(output["detection_classes"][0], output["detection_scores"][0], output["detection_boxes"][0]):
if d_score < self.threshold:
continue
detections.append({
"class": self.category_index[int(d_class)],
"score": d_score.numpy().tolist(),
"box": d_box.numpy().tolist(),
"center": self.bench.get_center(d_box.numpy().tolist()),
})
# match detections with zones
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {zone_name: None for zone_name in self.subzones}
for detection in detections:
d_center = [self.crop[0] + detection["center"][1] * w, self.crop[1] + detection["center"][0] * h]
min_distance = sys.maxsize
closest_zone = None
for zone_name, zone in self.subzones.items():
distance = self.bench.get_distance(d_center, zone["center"])
if distance < min_distance and distance <= self.matching_distance:
min_distance = distance
closest_zone = zone_name
if closest_zone is None:
continue
if detected_zones[closest_zone] is not None:
old_center = [self.crop[0] + detected_zones[closest_zone]["center"][1] * w, self.crop[1] + detected_zones[closest_zone]["center"][0] * h]
if self.bench.get_distance(old_center, zone["center"]) <= self.bench.get_distance(d_center, zone["center"]):
continue
detected_zones[closest_zone] = detection
for zone_name, zone in self.subzones.items():
if detected_zones[zone_name] is None:
detected_zones[zone_name] = {
"class": {"id": None, "name": "no_detection", "color": "rgb(0,0,0)", "color_qt": [0, 0, 0]},
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])

View File

@ -8,10 +8,12 @@ import numpy
import numpy as np
import tensorflow as tf
from lib.helpers.object_detection.utils import label_map_util
from PyQt5.QtCore import QFileSystemWatcher, QMutex, pyqtSignal
from PyQt5.QtCore import (QFileSystemWatcher, QMutex, QThread, QTimer,
pyqtSignal)
from PyQt5.QtGui import QColor
from .component import Component
from .consumer import Consumer
if "--no-edgetpu" not in sys.argv:
if "--no-tflite" not in sys.argv:
@ -38,11 +40,22 @@ class Vision(Component):
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
def __init__(self, config=None, name=None, period=None, lazy=True, paused=False, threaded=True, registers=None):
def __init__(self, config=None, name=None, period=None, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.lock = QMutex()
self.simulate = "--sim-vision" in sys.argv
def start(self):
self.model = None
self.consumer = Consumer(work=self.check_features, work_fifo=True, drop_fifo=True, work_maxlen=1, name="vision_consumer", paused=False)
self.consumer_thread = QThread()
self.consumer_thread.setTerminationEnabled(True)
self.consumer.moveToThread(self.consumer_thread)
self.consumer_thread.started.connect(self.consumer.start)
self.consumer_thread.start()
self.consumer.wait_ready()
self.consumer.out.connect(self.process_consumed)
super().start()
def config_changed(self):
# OBJECT DETECTION
@ -213,6 +226,7 @@ class Vision(Component):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def load_model(self, model=None):
# print("VISION CONSUMER", str(int(QThread.currentThreadId())), flush=True)
self.log.info(f"requested neural network: {model!r}")
if model is None or model.lower() in [
"",
@ -324,15 +338,20 @@ class Vision(Component):
"pos_rel_mm": self.get_pos_rel_mm(center),
}
parsed_detections.append(detection)
return parsed_detections
return {"result": parsed_detections, "ok": False}
def _get(self, data):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if not self.lock.tryLock():
self.log.debug("skipped frame")
self.consumer.add_consumable(data[-1][list(self.sources)[0]])
super()._get(emit=False)
def process_consumed(self, data=None):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if data is None:
return
self.log.debug("detecting...")
detections = self.check_features(data[-1][list(self.sources)[0]], lock=False)
self.lock.unlock()
self.log.debug(f"detected {detections}")
super()._get([detections])
data = data[-1][self.consumer.name]
if data is not None:
super()._get([{
"frame": data["consumed"],
"vision": data["result"],
}])

View File

@ -30,7 +30,7 @@ class VisionSaver(Component):
out_path = save_dir / f"{timestamp}.png"
self.log.info(f"saving {out_path}")
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
if mask:
if mask and self.mask_zones is not None:
height, width, channels = img.shape
out = np.full(
[height, width, channels],

View File

@ -1,172 +0,0 @@
import gc
import os
import random
import re
import sys
import numpy as np
import tensorflow as tf
from lib.helpers import log_msg
from lib.object_detection.utils import label_map_util
from PyQt5.QtCore import QMutex, pyqtSignal, pyqtSlot
from .input import Input
# Patch the location of gfile
tf.gfile = tf.io.gfile
class Wires(Input):
loading_model_signal = pyqtSignal(dict)
def __init__(self, bench, name, config):
super().__init__(bench, name, config)
self.simulate = "--sim-vision" in sys.argv
self.source = self.bench.inputs[self.config["source"]]
self.model_name = self.config["model"]
self.num_classes = self.config["num_classes"]
self.matching_distance = self.config["matching_distance"]
self.crop = self.bench.zones["wires"]["box"]
self.subzones = {k: self.bench.zones[k] for k in self.bench.zones if re.search("w[0-9].*", k)}
self.threshold = self.config["threshold"]
if not self.simulate:
# MODEL
self.model = None
self.model_lock = QMutex()
self.load_model(self.model_name)
label_map = label_map_util.load_labelmap("config/vision_test_labels/wires.pbtxt")
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
# TO INITIALIZE TENSORFLOW MODELS
if not self.simulate:
img = np.zeros([1, 1, 3], dtype=np.uint8)
input_tensor = tf.convert_to_tensor(img)
input_tensor = input_tensor[tf.newaxis, ...]
self.model_lock.lock()
self.bench.gpu_mutex.lock()
self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
self.last_frame = None
def load_model(self, model_name=None):
log_msg("WIRES NEURAL NETWORK MODEL CHOSEN: {}".format(model_name))
if model_name.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model_name = sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d)) and d.startswith("w")], reverse=True)[0]
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg(f"LOADING WIRES NEURAL NETWORK MODEL: {model_name}", msg_type="tensorflow")
try:
with tf.device('/device:GPU:1'):
model = tf.saved_model.load(f"neural_networks/{model_name}")
# if "--wires-tf-default-signature" in sys.argv:
# model = model.signatures["serving_default"]
self.model_name = model_name
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
@pyqtSlot(list)
def _get(self, frame):
img = frame[1]
img = img[self.crop[1]:self.crop[3], self.crop[0]:self.crop[2]]
self.last_frame = img
if self.simulate:
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {}
for zone_name, zone in self.subzones.items():
detected_zones[zone_name] = {
"class": random.choice(list(self.category_index.values())),
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"mask": np.full((15, 15), 0, dtype=np.float32).tolist(),
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])
return
input_tensor = tf.convert_to_tensor(np.asarray(img))
input_tensor = input_tensor[tf.newaxis, ...]
if not self.simulate:
# RUN INFERENCE
self.bench.gpu_mutex.lock()
output = self.model(input_tensor)
self.bench.gpu_mutex.unlock()
else:
output = {"detection_classes": [[None]], "detection_scores": [[0]], "detection_boxes": [[None]], "detection_masks": [[None]]}
# create detections
detections = []
for d_class, d_score, d_box, d_mask in zip(output["detection_classes"][0], output["detection_scores"][0], output["detection_boxes"][0], output["detection_masks"][0]):
if d_score < self.threshold:
continue
detections.append({
"class": self.category_index[int(d_class)],
"score": d_score.numpy().tolist(),
"box": d_box.numpy().tolist(),
"mask": d_mask.numpy().tolist(),
"center": self.bench.get_center(d_box.numpy().tolist()),
})
# match detections with zones
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {zone_name: None for zone_name in self.subzones}
for detection in detections:
d_center = [self.crop[0] + detection["center"][1] * w, self.crop[1] + detection["center"][0] * h]
min_distance = sys.maxsize
closest_zone = None
for zone_name, zone in self.subzones.items():
distance = self.bench.get_distance(d_center, zone["center"])
if distance < min_distance and distance <= self.matching_distance:
min_distance = distance
closest_zone = zone_name
if closest_zone is None:
continue
if detected_zones[closest_zone] is not None:
old_center = [self.crop[0] + detected_zones[closest_zone]["center"][1] * w, self.crop[1] + detected_zones[closest_zone]["center"][0] * h]
if self.bench.get_distance(old_center, zone["center"]) <= self.bench.get_distance(d_center, zone["center"]):
continue
detected_zones[closest_zone] = detection
for zone_name, zone in self.subzones.items():
if detected_zones[zone_name] is None:
detected_zones[zone_name] = {
"class": {"id": None, "name": "no_detection", "color": "rgb(0,0,0)", "color_qt": [0, 0, 0]},
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"mask": np.full((15, 15), 0, dtype=np.float32).tolist(),
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])

View File

@ -76,13 +76,13 @@ try:
# INIT COMPONENT
self.components_specs = {
"archive_synchronizer": {"c": ArchiveSynchronizer},
"galaxy_camera": {"c": GalaxyCamera},
"galaxy_camera": {"c": GalaxyCamera, "k": {"paused": True}},
"label_printer": {"c": Os_Label_Printer, "t": False},
"remote_api": {"c": RemoteAPI, "k": {"main": self}},
"tecna_marposs_provaset_t3": {"c": TecnaMarpossProvasetT3},
"test_component": {"c": TestComponent},
"vision_saver": {"c": VisionSaver, "t": False},
"vision": {"c": Vision},
"vision": {"c": Vision, "k": {"paused": True}},
}
self.components = {}
self.threads = {}
@ -104,9 +104,8 @@ try:
logging.exception(traceback.format_exc())
QMessageBox.critical(None, "Errore Banco", f"Non e stato possibile connettersi al banco:\n\n{e}")
quit()
# CONNECT VISION WORKFLOW
# connect camera frames to vision
self.components["vision"].set_sources({"galaxy_camera": self.components["galaxy_camera"].out})
self.components["vision"].wait_ready()
# GUI INIT
# self.main_window = Main_Window(self.bench)
self.main_window = Main_Window()
@ -164,7 +163,7 @@ try:
self.open_test()
def open_test(self):
self.main_window.open_tab(Test(self.config.machine_id))
self.main_window.open_tab(Test(self.config.machine_id, self.components))
if __name__ == "__main__":
app = QApplication(sys.argv)

View File

@ -11,8 +11,10 @@ from .qml_widget import Qml_Widget
from .recipe_editor import Recipe_Editor
from .recipe_selection import Recipe_Selection
from .test import Test
from .test_admin_permission import Test_Admin_Permission
from .test_autotest import Test_Autotest
from .test_home import Test_Home
from .test_vision import Test_Vision
from .users_management import Users_Management
from .widget import Widget
from .window import Window

View File

@ -1,2 +1,4 @@
from .CopyPastableCrudQTableWidget import CopyPastableCrudQTableWidget
from .crud import *
from .crud import (Cell, Combo_Box_Cell_Widget, Crud,
External_Dialog_Cell_Widget,
Json_External_Dialog_Cell_Widget, Line_Edit_Cell_Widget)

BIN
src/ui/imgs/warning.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

View File

@ -1 +1 @@
from .login import *
from .login import Login

View File

@ -1 +1,2 @@
from .recipe_selection import *
from .recipe_selection import (Json_Spec_External_Dialog_Cell_Widget,
Recipe_Selection)

View File

@ -1 +1 @@
from .test import *
from .test import Test

View File

@ -10,13 +10,15 @@ from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_autotest import Test_Autotest
from ui.test_vision import Test_Vision
from ui.widget import Widget
class Test(Widget):
def __init__(self, system_name=None):
def __init__(self, system_name=None, components=None):
super().__init__()
self.system_name = system_name
self.components = components
# GET LOGGER
self.log = logging.getLogger("Test")
# SHOW USERNAME
@ -33,15 +35,16 @@ class Test(Widget):
# INIT CYCLE STATES
self.cycle_state = None
self.cycle_states = {
"select_recipe": Test_Assembly(None, u"SELEZIONARE IL CODICE DA COLLAUDARE", Recipe_Selection()),
# "assembly_1": Test_Assembly(self.select_step_img("assembly_1"), u"INSERIRE SENSORE"),
"autotest": Test_Assembly(None, u"ESEGUIRE PROCEDURA DI AUTOTEST", Test_Autotest()),
"assembly_1": Test_Assembly(self.select_step_img("assembly_1"), u"INSERIRE SENSORE"),
"done": Test_Assembly(self.select_step_img("success"), u"COLLAUDO COMPLETATO - RIMUOVERE IL SENSORE"),
"wait": Test_Assembly(self.select_step_img("wait"), u"ATTENDERE - PAUSA INTER CICLO"),
"fail": Test_Assembly(self.select_step_img("fail"), u"CICLO INTERROTTO - RIMUOVERE IL SENSORE"),
"emergency": Test_Assembly(self.select_step_img("reset_emergency"), u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\""),
"fail": Test_Assembly(self.select_step_img("fail"), u"CICLO INTERROTTO - RIMUOVERE IL SENSORE"),
"select_recipe": Test_Assembly(None, u"SELEZIONARE IL CODICE DA COLLAUDARE", Recipe_Selection()),
"vision": Test_Assembly(None, u"ESEGUIRE PROCEDURA DI AUTOTEST", Test_Vision(self.components, None)),
"wait": Test_Assembly(self.select_step_img("wait"), u"ATTENDERE - PAUSA INTER CICLO"),
}
self.cycle_loop = ["assembly_1", "done", "wait"]
self.cycle_loop = ["vision", "done", "wait"]
self.cycle_changing_state = False
# SETUP AUTOTEST
self.autotest_request = False
@ -261,6 +264,8 @@ class Test(Widget):
if self.cycle_state == "done":
self.done()
w = self.cycle_states[self.cycle_state]
if hasattr(w, "start"):
w.start()
# UPDATE PIECES DISPLAY
self.pieces_count_l.setText(f"{self.pieces[0]} OK / {self.pieces[1]} NOK / {sum(self.pieces)} TOT")
self.setCentralWidget(w)

View File

@ -0,0 +1 @@
from .test_admin_permission import Test_Admin_Permission

View File

@ -0,0 +1,38 @@
import sys
from lib.db import Users
from PyQt5.QtCore import QTimer
from PyQt5.QtGui import QKeySequence
from PyQt5.QtWidgets import QMessageBox, QShortcut
from ui.widget import Widget
class Test_Admin_Permission(Widget):
txt = u"È necessario il permesso di un amministratore.\nInserire password per continuare"
def __init__(self, info=""):
super().__init__()
QShortcut(QKeySequence("Return"), self).activated.connect(self.continue_b.click)
if len(info):
self.info_l.setText(f"{info}\n{self.txt}")
else:
self.info_l.setText(self.txt)
self.continue_b.clicked.connect(self.verify)
self.cancel_b.clicked.connect(self.cancel)
# TESTING
if "--test" in sys.argv:
self.password_le.setText("123123")
self.test_timer = QTimer()
self.test_timer.setSingleShot(True)
self.test_timer.timeout.connect(self.continue_b.clicked.emit)
self.test_timer.start(500)
# /TESTING
def verify(self):
if Users.get_user("ADMIN").verify(self.password_le.text()):
self.parentWidget().accept()
else:
QMessageBox.critical(None, "Errore password", "la password inserita non e corretta")
def cancel(self):
self.parentWidget().reject()

View File

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Admin permission</class>
<widget class="QWidget" name="Admin permission">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>422</width>
<height>139</height>
</rect>
</property>
<property name="windowTitle">
<string>Admin permission</string>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<item row="6" column="1">
<widget class="QPushButton" name="cancel_b">
<property name="text">
<string>Annulla</string>
</property>
</widget>
</item>
<item row="6" column="0">
<widget class="QPushButton" name="continue_b">
<property name="text">
<string>Continua</string>
</property>
</widget>
</item>
<item row="4" column="0" colspan="2">
<widget class="QLineEdit" name="password_le">
<property name="echoMode">
<enum>QLineEdit::Password</enum>
</property>
<property name="placeholderText">
<string>password</string>
</property>
</widget>
</item>
<item row="3" column="0" colspan="2">
<widget class="QLabel" name="info_l">
<property name="text">
<string>-</string>
</property>
</widget>
</item>
</layout>
</widget>
<tabstops>
<tabstop>password_le</tabstop>
<tabstop>continue_b</tabstop>
<tabstop>cancel_b</tabstop>
</tabstops>
<resources/>
<connections/>
</ui>

View File

@ -1 +1 @@
from .test_assembly import *
from .test_assembly import Test_Assembly

View File

@ -33,7 +33,7 @@ class Test_Assembly(Widget):
if widget is not None:
replace_widget(self, "widget", widget)
# widget attributes passtrough passtrough
for attr in ["ok", "ko"]:
for attr in ["ok", "ko", "start"]:
if hasattr(self.widget, attr):
setattr(self, attr, getattr(self.widget, attr))
else:

View File

@ -1 +1 @@
from .test_autotest import *
from .test_autotest import Test_Autotest

View File

@ -0,0 +1 @@
from .test_vision import Test_Vision

View File

@ -0,0 +1,196 @@
import sys
from lib.helpers import timing
from PyQt5.QtCore import Qt, QTimer, pyqtSignal
from PyQt5.QtGui import QColor, QImage, QPalette, QPixmap
from ui import Dialog
from ui.test_admin_permission import Test_Admin_Permission
from ui.widget import Widget
class Test_Vision(Widget):
ok = pyqtSignal(dict)
request_frame = pyqtSignal()
def __init__(self, components, recipe):
super().__init__()
self.components = components
self.recipe = recipe
# setup vision variables
self.vision = {}
self.vision_ok_counter = 0
if "--sim-camera" not in sys.argv:
self.vision_ok_counter_limit = 2
else:
self.vision_ok_counter_limit = 1
# setup vision controls
self.ok_timer = QTimer()
self.ok_timer.setSingleShot(True)
self.ok_timer.setInterval(2000)
self.ok_timer.timeout.connect(self.emit_ok)
# setup save frame button
self.last_vision = None
self.save_frame_b.setEnabled(False)
self.save_frame_b.clicked.connect(self.save_last_vision)
# setup vision override
self.admin_challenged = False
self.vision_overridden = False
self.override_b.clicked.connect(self.override_vision)
self.override_b.setEnabled(True)
# setup vision staus gui
self.status_imgs_full = {
True: QPixmap("src/ui/imgs/success.png"),
"": QPixmap("src/ui/imgs/neo.ico"),
"warning": QPixmap("src/ui/imgs/warning.png"),
False: QPixmap("src/ui/imgs/fail.png"),
None: QPixmap("src/ui/imgs/wait.png"),
}
self.status_imgs_small = {k: i.scaled(32, 32, Qt.KeepAspectRatio, Qt.SmoothTransformation) for k, i in self.status_imgs_full.items()}
self.status_palettes = {
True: QPalette(),
"": QPalette(),
"warning": QPalette(),
False: QPalette(),
None: QPalette(),
}
self.status_palettes[True].setColor(QPalette.Base, Qt.green)
self.status_palettes[False].setColor(QPalette.Base, Qt.red)
self.status_palettes["warning"].setColor(QPalette.Base, QColor(255, 165, 0))
self.status_palettes[""].setColor(QPalette.Base, QColor(255, 255, 0))
self.visualize_vision()
def start(self):
self.visualize_vision()
# TESTING
if "--test" in sys.argv:
self.override_b.click()
# /TESTING
# setup camera-vision loop
self.components["galaxy_camera"].set_period(period=None) # only get frame on request
self.components["galaxy_camera"].add_sources({"test_vision": self.request_frame})
self.request_frame_connection = self.components["vision"].out.connect(self.request_frame) # request new frame as soon as vision finishes
# self.components["vision_renderer"].add_sources({"vision": self.components["vision"].out})
# self.process_vision_connection = self.components["vision_renderer"].out.connect(self.process_vision)
# self.components["vision_renderer"].resume()
self.process_vision_connection = self.components["vision"].out.connect(self.process_vision)
self.components["vision"].resume()
self.components["galaxy_camera"].resume()
# start test
self.start_time = timing()
self.request_frame.emit() # request first frame
def stop(self):
# disable camera-vision loop
self.components["galaxy_camera"].pause()
self.components["vision"].pause()
# self.components["vision_renderer"].pause()
self.disconnect(self.process_vision_connection)
# self.components["vision_renderer"].remove_sources(["vision", ])
self.disconnect(self.request_frame_connection)
self.components["galaxy_camera"].remove_sources(["vision", ])
def process_vision(self, data=None, override=False):
if self.ok_timer.isActive():
# avoid proccessing if vision was completed
return
if data is None:
data = [{"vision": {}}]
data = data[-1]
time = data.get("time", None)
data = data["vision"]
frame = data.get("frame", None)
vision = data.get("vision", None)
rendered = data.get("rendered", None)
self.last_vision = {
"time": time,
"frame": frame,
"vision": vision,
"rendered": rendered,
}
if not override:
result_ok = data.get("vision", {}).get("ok", False)
if result_ok is True:
self.vision_ok_counter += 1
else:
self.vision_ok_counter = 0
else:
self.vision_ok_counter = self.vision_ok_counter_limit
# check if completed
if self.vision_ok_counter >= self.vision_ok_counter_limit:
self.stop()
self.ok_timer.start()
self.visualize_vision(
time=time,
frame=frame,
vision=vision,
rendered=rendered,
overridden=override,
)
def visualize_vision(self, time=None, frame=None, vision=None, rendered=None, overridden=False):
self.save_frame_b.setEnabled(self.last_vision is not None)
if overridden:
self.state_l.setPixmap(self.status_imgs_small["warning"])
elif vision is None or vision.get("results", None) is None:
self.state_l.setPixmap(self.status_imgs_small[None])
elif vision.get("ok", False) is True:
self.state_l.setPixmap(self.status_imgs_small[True])
else:
self.state_l.setPixmap(self.status_imgs_small[False])
self.ok_counter_pb.setMaximum(self.vision_ok_counter_limit)
self.ok_counter_pb.setValue(min(self.vision_ok_counter, self.vision_ok_counter_limit))
if self.vision_ok_counter >= self.vision_ok_counter_limit:
self.ok_counter_pb.setPalette(self.status_palettes[True])
else:
self.ok_counter_pb.setPalette(self.status_palettes[False])
if overridden:
self.img = self.status_imgs_full["warning"]
elif rendered is not None:
self.img = rendered
elif frame is not None:
self.img = QPixmap.fromImage(QImage(
frame.data,
frame.shape[1], # width
frame.shape[0], # height
frame.shape[2] * frame.shape[1], # width * channels
QImage.Format_RGB888
))
else:
self.img = self.status_imgs_full[None]
self.resizeEvent()
def save_last_vision(self):
if self.last_vision is None:
return
def resizeEvent(self, event=None):
self.frame_l.setPixmap(self.img.scaled(self.frame_l.width(), self.frame_l.height(), Qt.KeepAspectRatio, Qt.SmoothTransformation))
def challenge_admin(self, info):
if not self.admin_challenged:
d = Dialog()
d.setCentralWidget(Test_Admin_Permission(info))
d.setModal(True)
d.show()
r = d.exec()
if r == d.Accepted:
self.admin_challenged = True
elif r == d.Rejected:
self.admin_challenged ^= False
else:
raise AssertionError("Bad admin challenge dialog return code")
return self.admin_challenged
def override_vision(self):
if self.challenge_admin("Si sta tentando di bypassare il test di visione"):
self.process_vision(override=True)
def emit_ok(self):
self.ok.emit({
"timestamp": self.frame[0],
"frame": self.frame[1],
"barcodes": self.barcodes,
"vision": self.vision,
"overridden": self.vision_overridden,
"vision_duration": timing() - self.start_time,
})

View File

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Vision</class>
<widget class="QWidget" name="Vision">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>618</width>
<height>835</height>
</rect>
</property>
<property name="windowTitle">
<string>Vision</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="2" column="0">
<widget class="QPushButton" name="override_b">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Maximum">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>FORZA ACCETTAZIONE</string>
</property>
</widget>
</item>
<item row="1" column="0" colspan="2">
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>Risultato</string>
</property>
<layout class="QFormLayout" name="formLayout">
<item row="0" column="0">
<widget class="QProgressBar" name="ok_counter_pb">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="value">
<number>50</number>
</property>
<property name="format">
<string>%v / %m</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QLabel" name="state_l">
<property name="sizePolicy">
<sizepolicy hsizetype="Maximum" vsizetype="Maximum">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>32</width>
<height>32</height>
</size>
</property>
<property name="text">
<string>-</string>
</property>
</widget>
</item>
</layout>
</widget>
</item>
<item row="0" column="0" colspan="2">
<widget class="QLabel" name="frame_l">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>600</width>
<height>700</height>
</size>
</property>
<property name="text">
<string>-</string>
</property>
</widget>
</item>
<item row="2" column="1">
<widget class="QPushButton" name="save_frame_b">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Maximum">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="text">
<string>SALVA IMMAGINE</string>
</property>
</widget>
</item>
</layout>
</widget>
<resources/>
<connections/>
</ui>