vision wip

This commit is contained in:
matteo porta 2022-06-28 12:31:27 +02:00
parent f25e0c2db6
commit 37ca9d9d38
21 changed files with 713 additions and 1026 deletions

View File

@ -1,75 +1,70 @@
item {
id: 1
name: 'red_big'
color: 'rgb(255,0,0)'
name: 'flange'
color: '0x00ff00'
}
item {
id: 2
name: 'black_big'
color: 'rgb(50, 50, 50)'
name: 'empty'
color: '0xff0000'
}
item {
id: 3
name: 'blue_big'
color: 'rgb(0, 0, 255)'
name: 'd-blue'
color: '0x0000ff'
}
item {
id: 4
name: 'white_big'
color: 'rgb(255, 255, 255)'
name: 'd-white'
color: '0xffffff'
}
item {
id: 5
name: 'green_big'
color: 'rgb(0, 255, 0)'
name: 'd-green'
color: '0x00ff00'
}
item {
id: 6
name: 'yellow_big'
color: 'rgb(255, 255, 0)'
name: 'd-red'
color: '0xff0000'
}
item {
id: 7
name: 'orange_big'
color: 'rgb(255, 145, 0)'
name: 'ej-big'
color: '0x43ddff'
}
item {
id: 8
name: 'brown_big'
color: 'rgb(102, 40, 13)'
name: 'ej-med'
color: '0x4a4eff'
}
item {
id: 9
name: 'red_small'
color: 'rgb(225, 0, 0)'
name: 'ej-small'
color: '0xa8ff32'
}
item {
id: 10
name: 'black_small'
color: 'rgb(70, 70, 70)'
name: 'cap-jg'
color: '0xff1a37'
}
item {
id: 11
name: 'blue_small'
color: 'rgb(0, 0, 225)'
name: 'ej-big-ko'
color: '0xff0000'
}
item {
id: 12
name: 'white_small'
color: 'rgb(225, 225, 225)'
name: 'ej-med-ko'
color: '0xff0000'
}
item {
id: 13
name: 'green_small'
color: 'rgb(0, 225, 0)'
name: 'ej-small-ko'
color: '0xff0000'
}
item {
id: 14
name: 'yellow_small'
color: 'rgb(225, 225, 0)'
}
item {
id: 15
name: 'ko'
color: 'rgb(255, 0, 255)'
name: 'cap-jg-ko'
color: '0xff0000'
}

View File

@ -0,0 +1,34 @@
# LIEBHER RUBBER FLANGE
[general]
name: RICETTA 1
instruction: APPORRE I SEGNI CON IL PENNARELLO COME INDICATO IN FIGURA
# POINTS FORMAT:
# point_name: point_center point_size fill_color border_color border_thickness shape
# EXAMPLE:
# name: X,Y W,H 0xAARRGGBB 0xAARRGGBB T SHAPE CLASS
# ZONES FORMAT:
# region_name: region_center region_margin class
# margin can be a box (XM*2,YM*2) or a radius (R)
# EXAMPLES:
# name: X,Y XM,YM T SHAPE CLASS
# name: X,Y R T SHAPE CLASS
# LABELS FORMAT:
# label_name: label_start_location font_size fill_color border_color border_thickness text
# EXAMPLE:
# name: X,Y S 0xAARRGGBB 0xAARRGGBB T TEXT
[markers]
cross: 1100,1100 100,100 0x000000ff 0xff0000ff 25 cross
center: 1100,1100 2050,2050 0x0000ff00 0xff0000ff 50 ellipse
[zones]
p1: 610,550 200 d-white # TOP LEFT WHITE POINT
p2: 1790,1290 200 d-white # RIGHT SIDE WHITE POINT
p3: 350,1190 200 d-blue # LEFT SIDE BLUE POINT
[labels]
p1: 510,825 100 0xffffffff 0xff000000 10 BIANCO
p2: 1690,1565 100 0xffffffff 0xff000000 10 BIANCO
p3: 250,1465 100 0xff0000ff 0xff000000 10 BLU

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "ok",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "white_big",
"w1_4": "green_big",
"w1_5": "yellow_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -0,0 +1,34 @@
# LIEBHER RUBBER FLANGE
[general]
name: RICETTA 2
instruction: APPORRE I SEGNI CON IL PENNARELLO COME INDICATO IN FIGURA
# POINTS FORMAT:
# point_name: point_center point_size fill_color border_color border_thickness shape
# EXAMPLE:
# name: X,Y W,H 0xAARRGGBB 0xAARRGGBB T SHAPE CLASS
# ZONES FORMAT:
# region_name: region_center region_margin class
# margin can be a box (XM*2,YM*2) or a radius (R)
# EXAMPLES:
# name: X,Y XM,YM T SHAPE CLASS
# name: X,Y R T SHAPE CLASS
# LABELS FORMAT:
# label_name: label_start_location font_size fill_color border_color border_thickness text
# EXAMPLE:
# name: X,Y S 0xAARRGGBB 0xAARRGGBB T TEXT
[markers]
cross: 1100,1100 100,100 0x000000ff 0xff0000ff 25 cross
center: 1100,1100 2050,2050 0x0000ff00 0xff0000ff 50 ellipse
[zones]
p1: 610,550 200 d-white # TOP LEFT WHITE POINT
p2: 1790,1290 200 d-white # RIGHT SIDE WHITE POINT
p3: 350,1190 200 d-blue # LEFT SIDE BLUE POINT
[labels]
p1: 510,825 100 0xffffffff 0xff000000 10 BIANCO
p2: 1690,1565 100 0xffffffff 0xff000000 10 BIANCO
p3: 250,1465 100 0xff0000ff 0xff000000 10 BLU

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "blue_big",
"w1_5": "white_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "green_big",
"w1_5": "yellow_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "blue_big",
"w1_2": "white_big",
"w1_3": "no_detection",
"w1_4": "black_big",
"w1_5": "red_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ko",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ko",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ko",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "black_big",
"w1_2": "red_big",
"w1_3": "no_detection",
"w1_4": "white_big",
"w1_5": "blue_big",
"w2_1": "orange_big",
"w2_2": "brown_big",
"w2_3": "yellow_small",
"w2_4": "blue_small",
"w2_5": "white_small",
"w2_6": "green_small",
"w2_7": "red_small",
"w2_8": "black_small"
}
}

View File

@ -1,34 +0,0 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "blue_big",
"w1_5": "white_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -13,13 +13,15 @@ source "./venv/bin/activate" || source "./venv/Scripts/activate" || :
"${python}" -m pip install --upgrade pip
"${python}" -m pip install --upgrade -r "src/requirements.txt"
# echo "---------- get updated label-map-util ----------"
# wget "https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/utils/label_map_util.py" -O "./src/lib/helpers/object_detection/utils/label_map_util.py"
# sed -Ei "s/^(\s*from )(object_detection.protos import .*)$/\1lib.helpers.\2/" "./src/lib/helpers/object_detection/utils/label_map_util.py"
# sudo apt-get install -y protobuf-compiler
# wget "https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/protos/string_int_label_map.proto" -O "./src/lib/helpers/object_detection/protos/string_int_label_map.proto"
# insert="\n\1\/\/ Label color for rendering.\n\1optional string color = 9;"
# sed -Ei "s/^(\s*)(optional string display_name.*)$/\1\2\n${insert}\n/" "./src/lib/helpers/object_detection/protos/string_int_label_map.proto"
# protoc "./src/lib/helpers/object_detection/protos/"*.proto --python_out="."
# wget "https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/utils/label_map_util.py" -O "./src/lib/helpers/object_detection/utils/label_map_util.py"
# sed -Ei "s/^(\s*from )(object_detection.protos import .*)$/\1lib.helpers.\2/" "./src/lib/helpers/object_detection/utils/label_map_util.py"
# insert='\1if item.HasField("color"):\n\1 category["color"] = item.color\n\1else:\n\1 category["color"] = ""'
# sed -Ei "s/^(\s*)(categories\.append\(category\))$/${insert}\n\1\2/" "./src/lib/helpers/object_detection/utils/label_map_util.py"
# echo "---------- install libedgetpu ----------"
# # sudo apt-get install -y apt-transport-https curl gnupg
# # curl -fsSL https://bazel.build/bazel-release.pub.gpg | gpg --dearmor >bazel-archive-keyring.gpg

View File

@ -158,8 +158,8 @@ class Component(QObject):
signals might contain one optional argument that will be passed as data to _get
or None to disconnect all sources
"""
if sources is not None and not len(sources):
sources = None
if sources is None:
sources = {}
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit(sources)
@ -189,7 +189,7 @@ class Component(QObject):
if self_sources[n] is not sources[n]
}
if len(conflicting_sources):
raise AssertionError("\n\t" + "\n\t".join([f"source named {n!r}: {s[0]!r} will not be replaced with {s[1]!r}" for n, s in conflicting_sources]))
raise AssertionError("\n\t" + "\n\t".join([f"source named {n!r}: {s[0]!r} will not be replaced with {s[1]!r}" for n, s in conflicting_sources.items()]))
self.set_sources({**self_sources, **sources})
def remove_sources(self, sources=None):
@ -200,7 +200,7 @@ class Component(QObject):
or None if no sources are to be removed
this method calls set_sources, this is semplest but not the most efficient approach
"""
if sources is None:
if sources is None or self.sources is None:
return
sources = set(sources)
self.set_sources({n: s for n, s in self.sources.items() if n not in sources})
@ -282,6 +282,8 @@ class Component(QObject):
def _do_set_sources(self, sources):
if self._running:
self._disconnect_sources()
if sources is not None and not len(sources):
sources = None
self.sources = sources
if self._running:
self._connect_sources()

View File

@ -1,550 +0,0 @@
import copy
import gc
import os
import sys
import traceback
from configparser import ConfigParser
import lib.helpers.label_map_util as label_map_util
import numpy
import numpy as np
import tensorflow as tf
from pycoral.adapters import detect
if "--no-edgetpu" not in sys.argv:
from pycoral.utils.edgetpu import make_interpreter
else:
def make_interpreter(*args, **kwargs):
raise ValueError("\"--no-edgetpu\" in sys.argv")
from lib.helpers.log import log_msg
from PyQt5.QtCore import (QFileSystemWatcher, QMutex, QObject, QPointF, QRectF,
Qt, pyqtSignal)
from PyQt5.QtGui import QBrush, QColor, QPainter, QPen
from tflite_runtime.interpreter import Interpreter
from ui.test.test import CycleState
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Patch the location of gfile
tf.gfile = tf.io.gfile
IMG_SCALE = 0.75 # ORIGINAL IMAGE FRAME TO QT WINDOW RATIO
class LibVision(QObject):
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
def __init__(self, main_window):
super().__init__()
self.arrow = None
self.last_arrow = None
self.main_window = main_window
self.config = main_window.config
self.watcher = main_window.watcher
self.img_scale = IMG_SCALE
self.pixel_size = self.config["CAMERA"]["pixel size"]
self.image_width_qt = int(self.config["CAMERA"]["horizontal crop resolution"] * IMG_SCALE)
self.image_height_qt = int(self.config["CAMERA"]["vertical crop resolution"] * IMG_SCALE)
self.meas_tolerance = self.config["VISION"]["measurement tolerance"]
self.align_tolerance = self.config["VISION"]["alignment tolerance"]
self.autotest_tolerance = self.config["VISION"]["autotest tolerance"]
self.autotest_positions = self.config["VISION"]["autotest positions"]
self.detection_crop_qt = (0, 0, self.image_width_qt, self.image_height_qt)
self.img_center_x_qt = int(self.image_width_qt / 2)
self.img_center_y_qt = int(self.image_height_qt / 2)
self.img_q1_y_qt = 100
self.qtcolor = {"orange": QColor("#FF8400")}
# OBJECT DETECTION
self.detection_threshold = self.config["VISION"]["detection threshold"]
self.image_width_inf = 1024
self.image_height_inf = 128
self.measure_ok = False
self.edge_detected = False
self.edge_detection = None
self.edge_fitting_distance = None
self.edge_fitting_distance_real = None
self.detected_cal_positions = None
self.cal_detected = None
# recipe
self.zones = None
self.labels = None
self.recipes_dir = "config/vision_test_recipes"
self.recipe_watcher = QFileSystemWatcher([])
self.set_recipe(None)
self.recipe_watcher.fileChanged.connect(self.set_recipe)
self.NEURAL_NETWORK_MODEL = self.config["VISION"]["neural network"]
# MODEL
self.model = None
self.model_lock = QMutex()
self.tflite_mode = False
self.edgeTPU_mode = False
if "--tflite" in sys.argv:
self.tflite_mode = True
self.edgeTPU_mode = True
self.load_model(self.NEURAL_NETWORK_MODEL)
# CATEGORY INDEX
# label_map = label_map_util.load_labelmap("config/vision_test_labels/labels-onlydots.pbtxt")
label_map = label_map_util.load_labelmap("config/vision_test_labels/labels.pbtxt")
self.num_classes = len(label_map.item)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.config.category_index = label_map_util.create_category_index(categories)
self.config.classes_map = {c["name"]: k for k, c in self.config.category_index.items()}
def mm_to_qt(self, val):
return int(val * self.img_scale / self.pixel_size)
def set_recipe(self, recipe_path=None):
log_msg(f"LOADING RECIPE {recipe_path!r}")
watched = self.recipe_watcher.files()
if recipe_path == "" and len(watched) == 0: # skip bad watcher signals
return
if len(watched) > 0:
self.recipe_watcher.removePaths(watched)
self.recipe_path = recipe_path
if self.recipe_path is None:
self.zones = None
self.labels = None
else:
try:
if not os.path.isfile(self.recipe_path):
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be found.")
config = ConfigParser(inline_comment_prefixes="#")
read = config.read(self.recipe_path)
if len(read) != 1 or self.recipe_path not in read:
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be read.")
recipe = config._sections.get("camera_shapes", None)
if recipe is None:
raise AssertionError(
f"Recipe file {self.recipe_path!r} does not contain the 'camera_shapes' section.")
self.points = self.parse_points(recipe)
zones = config._sections.get("camera_zones", None)
if zones is None:
raise AssertionError(
f"Recipe file {self.recipe_path!r} does not contain the 'camera_zones' section.")
self.zones = self.parse_zones(zones)
labels = config._sections.get("labels", None)
if labels is not None:
self.labels = self.parse_labels(labels)
else:
self.labels = None
self.recipe_watcher.addPath(self.recipe_path)
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
self.zones = None
if self.zones is None and self.labels is None:
self.status_signal.emit({"vision_recipe": 0})
else:
self.status_signal.emit({"vision_recipe": os.path.splitext(os.path.basename(recipe_path))[0]})
@staticmethod
def parse_points(config):
points = {}
for point_name, point_spec in config.items():
try:
center, size, fill_color, border_color, border_thickness, shape = point_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
size = list(map(float, size.split(",")))
if len(size) == 1:
size = [size[0], size[0]]
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
shape = shape.lower()
points[point_name] = {
"center": center,
"size": size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"shape": shape,
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"point {point_name!r} could not be parsed. spec: {point_spec!r}", file=sys.stderr)
return points
@staticmethod
def draw_points(points, qimage, pixel_mm, mm_offset=None, painter=None):
if mm_offset is None:
mm_offset = [0, 0]
if painter is None:
painter = QPainter()
painter.begin(qimage)
avg_pixel_mm = (sum(pixel_mm) / len(pixel_mm))
for point_name, point in points.items():
try:
x = (point["center"][0] + mm_offset[0]) / pixel_mm[0] + qimage.width() / 2
y = -(point["center"][1] + mm_offset[1]) / pixel_mm[1] + qimage.height() / 2
x2 = x + (point["size"][0] + mm_offset[0]) / pixel_mm[0]
y2 = y - (point["size"][1] + mm_offset[1]) / pixel_mm[1]
w = point["size"][0] / pixel_mm[0]
h = point["size"][1] / pixel_mm[1]
painter.setBrush(QBrush(point["fill_color"], Qt.SolidPattern))
painter.setPen(
QPen(point["border_color"], point["border_thickness"] / avg_pixel_mm, Qt.SolidLine, Qt.SquareCap,
Qt.MiterJoin))
if point["shape"] == "ellipse":
painter.drawEllipse(QPointF(x, y), w / 2, h / 2)
elif point["shape"] == "cross":
painter.drawLine(x, y - h / 2, x, y + h / 2)
painter.drawLine(x - w / 2, y, x + w / 2, y)
elif point["shape"] == "line":
painter.drawLine(x, y, x2, y2)
else:
raise NotImplementedError(f"point {point_name!r} has an invalid shape: {point['shape']!r}")
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"point {point_name!r} could not be drawn.", file=sys.stderr)
def parse_zones(self, config):
zones = {}
for zone_name, zone_spec in config.items():
zone_name = zone_name.upper()
try:
center, margin, d_class = zone_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
if margin == "none":
margin = None
elif "," in margin:
margin = list(map(float, margin.split(",")))
else:
margin = float(margin)
zones[zone_name] = {
"center": center,
"margin": margin,
"class": self.config.category_index[self.config.classes_map[d_class]],
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"Region {zone_name!r} could not be parsed. spec: {zone_spec!r}", file=sys.stderr)
zones["fitting"] = {
"center": [0, 0],
"margin": [140, 140],
"class": self.config.category_index[self.config.classes_map["fitting"]],
}
return zones
def parse_labels(self, config):
labels = {}
for label_name, label_spec in config.items():
try:
location, font_size, fill_color, border_color, border_thickness, text = label_spec.split(" ", 5)
location = list(map(float, location.split(",")))
location[1] = -location[1]
font_size = float(font_size)
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
text = text.replace("\\n", "\n").replace("\\t", "\t")
labels[label_name] = {
"location": location,
"font_size": font_size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"text": text,
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"Label {label_name!r} could not be parsed. spec: {label_spec!r}", file=sys.stderr)
return labels
# def zone_center(self, zone):
# return (int((zone["xmax"] + zone["xmin"]) / 2), int((zone["ymax"] + zone["ymin"]) / 2))
def get_center(self, rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
def get_distance(self, p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def load_lodel_tflite(self, model=None):
# Creates tflite interpreter
try:
self.interpreter = make_interpreter(f"neural_networks/{model}/{model}_edgetpu.tflite")
self.interpreter.allocate_tensors()
self.interpreter.invoke() # warmup
self.tflite_input_details = self.interpreter.get_input_details()
self.tflite_output_details = self.interpreter.get_output_details()
self.inf_width = self.tflite_input_details[0]['shape'][2]
self.inf_height = self.tflite_input_details[0]['shape'][1]
log_msg("edge TPU model initialized")
except ValueError:
# edge TPU not found
log_msg("edge TPU not found")
self.edgeTPU_mode = False
self.interpreter = Interpreter(f"neural_networks/{model}/{model}.tflite")
self.interpreter.allocate_tensors()
self.interpreter.invoke() # warmup
self.tflite_input_details = self.interpreter.get_input_details()
self.tflite_output_details = self.interpreter.get_output_details()
self.inf_width = self.tflite_input_details[0]['shape'][2]
self.inf_height = self.tflite_input_details[0]['shape'][1]
log_msg("TFlite model on CPU initialized")
def run_inference_tflite(self, image, threshold=0.3):
self.interpreter.set_tensor(self.tflite_input_details[0]['index'], image)
self.interpreter.invoke()
objs = detect.get_objects(self.interpreter, threshold, (1, 1))
boxes = [(obj.bbox.ymin / self.inf_height, obj.bbox.xmin / self.inf_width, obj.bbox.ymax / self.inf_height, obj.bbox.xmax / self.inf_width) for obj in objs]
classes = [obj.id + 1 for obj in objs]
scores = [obj.score for obj in objs]
return {"detection_boxes": boxes, "detection_classes": classes, "detection_scores": scores,
"num_detections": 10}
def load_model(self, model=None):
log_msg("NEURAL NETWORK MODEL CHOSEN: {}".format(model))
if model.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model = \
sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d))],
reverse=True)[0]
self.NEURAL_NETWORK_MODEL = model
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg("LOADING NEURAL NETWORK MODEL: {}".format(model))
try:
if self.tflite_mode:
self.load_lodel_tflite(self.NEURAL_NETWORK_MODEL)
else:
model = tf.saved_model.load(f"neural_networks/{model}")
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
def check_features(self, image_np):
self.watcher.profile_start()
image = np.asarray(image_np)
self.watcher.profile_measure("INF_NP")
input_tensor = np.expand_dims(image, axis=0)
self.watcher.profile_measure("INF_SHAPE")
# Run inference
self.model_lock.lock()
if self.tflite_mode:
detections = self.run_inference_tflite(input_tensor, threshold=self.detection_threshold)
else:
detections = self.model(input_tensor)
num_detections = int(detections.pop("num_detections"))
detections = {key: value[0, :num_detections].numpy() for key, value in detections.items()}
detections["num_detections"] = num_detections
self.model_lock.unlock()
self.watcher.profile_measure("INF_INF")
parsed_detections = []
for d_class, d_score, d_box in zip(detections["detection_classes"], detections["detection_scores"],
detections["detection_boxes"]):
if d_score >= self.detection_threshold:
ymin, xmin, ymax, xmax = d_box # d_box.tolist()
box = [
xmin,
ymin,
xmax,
ymax,
]
center = self.get_center(box)
detection = {
"class": self.config.category_index[int(d_class)]["name"],
"color": self.config.category_index[int(d_class)]["color_qt"],
"score": float(d_score),
"box": box,
"center": center,
"pos_rel_mm": self.get_pos_rel_mm(center),
}
parsed_detections.append(detection)
self.watcher.profile_measure("INF_POST")
return parsed_detections
def get_pos_rel_mm(self, center):
x_rel_px = (center[0] - 0.5) * self.config["CAMERA"]["horizontal crop resolution"]
return x_rel_px * self.pixel_size
def process_detections(self, detections):
self.last_arrow = copy.deepcopy(self.arrow)
for detection in detections:
self.edge_detected = None
self.measure_ok = False
if detection["class"] == "edge-left":
self.edge_detected = True
self.edge_detection = detection
self.edge_fitting_distance = self.main_window.cam_offset_mm - self.main_window.drawing.after_insertion_offset - (self.main_window.drawing.fitting_offset + self.main_window.cutting_offset_mm) + detection["pos_rel_mm"]
self.edge_fitting_distance_real = self.main_window.cam_offset_mm - (self.main_window.test_fitting_offset + self.main_window.cutting_offset_mm) + detection["pos_rel_mm"]
# DECIDE ARROW DIRECTION (NORMAL IN FIRST TEST, INVERTED IN SECOND TEST)
if (self.edge_fitting_distance_real > self.main_window.drawing.dipstick_offset) != (self.main_window.test_widget.current_state == CycleState.VISION_2_VERIFY):
self.arrow = "left"
else:
self.arrow = "right"
if abs(self.edge_fitting_distance_real - self.main_window.drawing.dipstick_offset) < self.main_window.test_tolerance:
self.measure_ok = True
self.arrow = "ok"
return
else:
self.measure_ok = False
return
break
# NO EDGE DETECTED
self.arrow = "none"
self.edge_fitting_distance = None
self.edge_fitting_distance_real = None
self.edge_detection = None
def process_detections_calibration(self, detections):
self.detected_cal_positions = [{"ok": False, "pos": None} for x in enumerate(self.autotest_positions)]
cal_positions = []
for detection in detections:
self.edge_detected = False
self.measure_ok = False
if detection["class"] == "cal":
cal_positions.append({"pos": detection["pos_rel_mm"], "detection": detection, "ok": False})
for n, pos in enumerate(self.autotest_positions):
if len(cal_positions) == 0:
break
nearest_idx = min(range(len(cal_positions)), key=lambda i: abs(cal_positions[i]["pos"] - pos))
if abs(cal_positions[nearest_idx]["pos"] - pos) < 5:
self.detected_cal_positions[n] = cal_positions[nearest_idx]
del(cal_positions[nearest_idx])
self.edge_detected = True
self.measure_ok = True
self.arrow = "ok"
for i, detected, expected in zip(range(len(self.detected_cal_positions)), self.detected_cal_positions, self.autotest_positions):
if detected["pos"] is not None and abs(detected["pos"] - expected) < self.autotest_tolerance:
self.detected_cal_positions[i]["ok"] = True
else:
self.arrow = "ko"
self.measure_ok = False
if detected["pos"] is None:
self.edge_detected = False
def visualize_calibration(self, image):
painter = QPainter()
painter.begin(image)
if len(self.detected_cal_positions) > 0:
for detection in self.detected_cal_positions:
if detection["pos"] is not None:
center = detection["detection"]["center"]
det_x_qt = int(center[0] * self.image_width_qt)
det_y_qt = int(center[1] * self.image_height_qt)
# DRAW VERTICAL EDGE POSITION MARK
if detection["ok"]:
color = Qt.green
else:
color = Qt.red
painter.setPen(QPen(color, 4, Qt.DashLine))
refline = det_x_qt, det_y_qt + 40, det_x_qt, det_y_qt - 40
painter.drawLine(*refline)
# DRAW COLORED RECTANGLES IDENTIFYING DETECTIONS
def visualize_detections(self, image, detections):
painter = QPainter()
painter.begin(image)
# DRAW DETECTIONS
detections_thickness_px = 4
if detections is not None:
painter.setOpacity(1)
for detection in detections:
result_color = QColor(*detection["color"], 150)
# center = detection["center"]
xmin, ymin, xmax, ymax = detection["box"]
x = xmin * self.image_width_qt - detections_thickness_px
y = ymin * self.image_height_qt - detections_thickness_px
w = xmax * self.image_width_qt - x + detections_thickness_px
h = ymax * self.image_height_qt - y + detections_thickness_px
painter.setPen(QPen(result_color, detections_thickness_px, Qt.SolidLine, Qt.SquareCap, Qt.MiterJoin))
painter.setBrush(QBrush())
painter.drawRect(QRectF(x, y, w, h))
def visualize_ref(self, image):
painter = QPainter()
painter.begin(image)
ref_x_qt = self.main_window.ref_x_qt
# DRAW H/V CAMERA CENTER AXIS
if self.main_window.display_camera_center:
painter.setPen(QPen(Qt.yellow, 2, Qt.DashLine))
refline = self.img_center_x_qt, 0, self.img_center_x_qt, self.image_height_qt
painter.drawLine(*refline)
refline = 0, self.img_center_y_qt, self.image_width_qt, self.img_center_y_qt
painter.drawLine(*refline)
tolerance_px = self.mm_to_qt(self.main_window.test_tolerance)
if self.main_window.test_state in (CycleState.VISION_2_VERIFY, CycleState.VISION_1_ALIGNMENT):
# DRAW VERTICAL REFERENCE POSITION AXIS
painter.setPen(QPen(Qt.cyan, 4, Qt.DashLine))
refline = ref_x_qt, 0, ref_x_qt, self.image_height_qt
painter.drawLine(*refline)
# DRAW TOLERANCE LIMITS AXIS
painter.setPen(QPen(Qt.cyan, 2, Qt.DashDotLine))
refline = ref_x_qt + tolerance_px, 0, ref_x_qt + tolerance_px, self.image_height_qt
painter.drawLine(*refline)
refline = ref_x_qt - tolerance_px, 0, ref_x_qt - tolerance_px, self.image_height_qt
painter.drawLine(*refline)
# DRAW ORIGIN(out of screen) TO REFERENCE ARROW
painter.setPen(QPen(Qt.blue, 4, Qt.SolidLine))
painter.drawLine(0, self.img_q1_y_qt, ref_x_qt, self.img_q1_y_qt)
painter.drawLine(ref_x_qt - 20, self.img_q1_y_qt - 20, ref_x_qt, self.img_q1_y_qt)
painter.drawLine(ref_x_qt - 20, self.img_q1_y_qt + 20, ref_x_qt, self.img_q1_y_qt)
if self.main_window.test_state == CycleState.AUTOTEST:
# DRAW AUTOTEST MARKS
for mark in self.main_window.config["VISION"]["autotest positions"]:
mark_x_qt = self.img_center_x_qt + self.mm_to_qt(mark)
# DRAW MARK POSITION AXIS
painter.setPen(QPen(self.qtcolor["orange"], 4, Qt.DashLine))
refline = mark_x_qt, 0, mark_x_qt, self.image_height_qt
painter.drawLine(*refline)
# DRAW TOLERANCE LIMITS AXIS
painter.setPen(QPen(self.qtcolor["orange"], 2, Qt.DashDotLine))
refline = mark_x_qt + tolerance_px, 0, mark_x_qt + tolerance_px, self.image_height_qt
painter.drawLine(*refline)
refline = mark_x_qt - tolerance_px, 0, mark_x_qt - tolerance_px, self.image_height_qt
painter.drawLine(*refline)
def visualize_edge(self, image):
painter = QPainter()
painter.begin(image)
if self.edge_detection is not None:
center = self.edge_detection["center"]
det_x_qt = int(center[0] * self.image_width_qt)
det_y_qt = int(center[1] * self.image_height_qt)
# DRAW VERTICAL EDGE POSITION MARK
if self.measure_ok:
color = Qt.green
else:
color = Qt.red
painter.setPen(QPen(color, 4, Qt.DashLine))
refline = det_x_qt, det_y_qt + 40, det_x_qt, det_y_qt - 40
painter.drawLine(*refline)
# DRAW ORIGIN(out of screen) TO DETECTION ARROW
painter.setPen(QPen(color, 4, Qt.SolidLine))
painter.drawLine(0, det_y_qt, det_x_qt, det_y_qt)
painter.drawLine(det_x_qt - 20, det_y_qt - 20, det_x_qt, det_y_qt)
painter.drawLine(det_x_qt - 20, det_y_qt + 20, det_x_qt, det_y_qt)

View File

@ -9,9 +9,10 @@ import numpy
import numpy as np
import tensorflow as tf
from lib.helpers.object_detection.utils import label_map_util
from PyQt5.QtCore import (QFileSystemWatcher, QMutex, QThread, QTimer,
pyqtSignal)
from PyQt5.QtGui import QColor
from PyQt5.QtCore import (QFileSystemWatcher, QLineF, QMutex, QPointF, QRectF,
Qt, QThread, QTimer, pyqtSignal)
from PyQt5.QtGui import (QBrush, QColor, QFont, QImage, QPainter, QPainterPath,
QPen, QPixmap)
from .component import Component
from .consumer import Consumer
@ -38,6 +39,8 @@ else:
class Vision(Component):
"""everything is expected the have shape with height (y) first then width (x)"""
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
@ -48,14 +51,25 @@ class Vision(Component):
def start(self):
self.model = None
self.consumer = Consumer(work=self.check_features, work_fifo=True, drop_fifo=True, work_maxlen=1, name="vision_consumer", paused=False)
self.consumer_thread = QThread()
self.consumer_thread.setTerminationEnabled(True)
self.consumer.moveToThread(self.consumer_thread)
self.consumer_thread.started.connect(self.consumer.start)
self.consumer_thread.start()
self.consumer.wait_ready()
self.consumer.out.connect(self.process_consumed)
# VISION THREAD
self.vision_consumer = Consumer(work=self.vision_consumer_work, work_fifo=True, drop_fifo=True, work_maxlen=1, name="vision_consumer", paused=False)
self.vision_consumer_thread = QThread()
self.vision_consumer_thread.setTerminationEnabled(True)
self.vision_consumer.moveToThread(self.vision_consumer_thread)
self.vision_consumer_thread.started.connect(self.vision_consumer.start)
self.vision_consumer_thread.start()
self.vision_consumer.wait_ready()
# RENDER THREAD
self.render_consumer = Consumer(work=self.render_consumer_work, work_fifo=True, drop_fifo=True, work_maxlen=1, name="render_consumer", paused=False)
self.render_consumer_thread = QThread()
self.render_consumer_thread.setTerminationEnabled(True)
self.render_consumer.moveToThread(self.render_consumer_thread)
self.render_consumer_thread.started.connect(self.render_consumer.start)
self.render_consumer_thread.start()
self.render_consumer.wait_ready()
# CONNECT CONSUMERS
self.vision_consumer.out.connect(self.process_vision_consumed)
self.render_consumer.out.connect(self.process_render_consumed)
super().start()
def config_changed(self):
@ -88,10 +102,25 @@ class Vision(Component):
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
self.classes_map = {c["name"]: k for k, c in self.category_index.items()}
self.zone_detection_filter_mode = self.config[self.name].get("zone_detection_filter_mode", "box_touches")
self.zone_detection_preference_mode = self.config[self.name].get("zone_detection_preference_mode", "distance")
self.set_recipe("1.ini")
def get_center(self, rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
def get_size(self, rect):
return [rect[2] - rect[0], rect[3] - rect[1]]
def get_box(self, center, size):
return [center[0] - size[0] / 2, center[1] - size[1] / 2, center[0] + size[0] / 2, center[1] + size[1] / 2]
def get_distance(self, p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def clear_recipe(self):
self.recipe = None
self.points = {}
self.markers = {}
self.zones = {}
self.labels = {}
@ -112,9 +141,9 @@ class Vision(Component):
if len(read) != 1 or self.recipe_path not in read:
raise AssertionError("Recipe could not be read.")
os.path.splitext(os.path.basename(read[0]))[0]
self.points = self.parse_points(config.get["shapes"])
self.zones = self.parse_zones(config.get["zones"])
self.labels = self.parse_labels(config.get("labels", None))
self.markers = self.parse_markers(config._sections.get("markers", None))
self.zones = self.parse_zones(config._sections.get("zones", None))
self.labels = self.parse_labels(config._sections.get("labels", None))
self.recipe_watcher.addPath(str(self.recipe_path))
except Exception:
self.log.exception(traceback.format_exc())
@ -122,7 +151,7 @@ class Vision(Component):
self.clear_recipe()
self.status_signal.emit({
"recipe": self.recipe,
"points": self.points,
"markers": self.markers,
"zones": self.zones,
"labels": self.labels,
})
@ -133,35 +162,34 @@ class Vision(Component):
else:
self._set_recipe(self.recipes_dir / str(recipe))
def parse_points(self, config=None):
def parse_markers(self, config=None):
if config is None:
raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'shapes' section.")
raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'markers' section.")
config = {}
points = {}
for point_name, point_spec in config.items():
markers = {}
for marker_name, marker_spec in config.items():
try:
center, size, fill_color, border_color, border_thickness, shape = point_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
size = list(map(float, size.split(",")))
center, size, fill_color, border_color, border_thickness, shape = marker_spec.split(" ")
center = list(reversed(list(map(float, center.split(",")))))
size = list(reversed(list(map(float, size.split(",")))))
if len(size) == 1:
size = [size[0], size[0]]
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
shape = shape.lower()
points[point_name] = {
"center": center,
"size": size,
"fill_color": fill_color,
markers[marker_name] = {
"border_color": border_color,
"border_thickness": border_thickness,
"center": center,
"fill_color": fill_color,
"shape": shape,
"size": size,
}
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"point {point_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {point_spec!r}")
return points
self.log.exception(f"marker {marker_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {marker_spec!r}")
return markers
def parse_zones(self, config=None):
if config is None:
@ -171,19 +199,25 @@ class Vision(Component):
for zone_name, zone_spec in config.items():
zone_name = zone_name.upper()
try:
center, margin, d_class = zone_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
if margin == "none":
margin = None
elif "," in margin:
margin = list(map(float, margin.split(",")))
center, size, d_class = zone_spec.split(" ")
center = list(reversed(list(map(float, center.split(",")))))
if "," in size:
size = list(reversed(list(map(float, size.split(",")))))
shape = "rect"
else:
margin = float(margin)
size = [float(size)] * 2
shape = "ellipse"
d_class = self.category_index[self.classes_map[d_class]]
zones[zone_name] = {
"border_color": QColor(d_class["color"].replace("0x", "#")),
"border_thickness": 25,
"box": self.get_box(center, size),
"center": center,
"margin": margin,
"class": self.category_index[self.classes_map[d_class]],
"class": d_class,
"fill_color": QColor("#00000000"),
"pen_line": "DashLine",
"shape": shape,
"size": size,
}
except Exception:
self.log.exception(traceback.format_exc())
@ -197,19 +231,20 @@ class Vision(Component):
for label_name, label_spec in config.items():
try:
location, font_size, fill_color, border_color, border_thickness, text = label_spec.split(" ", 5)
location = list(map(float, location.split(",")))
location[1] = -location[1]
location = list(reversed(list(map(float, location.split(",")))))
font_size = float(font_size)
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
text = text.replace("\\n", "\n").replace("\\t", "\t")
labels[label_name] = {
"location": location,
"font_size": font_size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"fill_color": fill_color,
"font_size": font_size,
"location": location,
"opacity": 1,
"shape": "text",
"text": text,
}
except Exception:
@ -217,15 +252,6 @@ class Vision(Component):
self.log.exception(f"label {label_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {label_spec!r}")
return labels
def zone_center(self, zone):
return (int((zone["xmax"] + zone["xmin"]) / 2), int((zone["ymax"] + zone["ymin"]) / 2))
def get_center(self, rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
def get_distance(self, p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def load_model(self, model=None):
# print("VISION CONSUMER", str(int(QThread.currentThreadId())), flush=True)
self.log.info(f"requested neural network: {model!r}")
@ -294,10 +320,19 @@ class Vision(Component):
self.log.info(f"initialized model {self.model!r} with mode {self.tf_mode!r}")
self.loading_model_signal.emit({"status": "done"})
def check_features(self, image, lock=True):
if image.shape != self.inf_shape[1:3]:
image = cv2.resize(image, self.inf_shape[1:3], interpolation=cv2.INTER_LINEAR)
tensor = np.expand_dims(np.asarray(image), axis=0)
def check_features(self, frame, lock=True):
# test_box = [i * s for i, s in zip([0.2, 0.2, 0.8, 0.8], frame.shape[:2] * 2)]
# return [{
# "class": self.category_index[1],
# "score": 1.0,
# "box": test_box, # rescale detection to frame size,
# "center": self.get_center(test_box),
# "size": self.get_size(test_box),
# }]
if self.inf_shape is not None and frame.shape != self.inf_shape[1:3]:
tensor = np.expand_dims(cv2.resize(frame, self.inf_shape[1:3], interpolation=cv2.INTER_LINEAR), axis=0)
else:
tensor = np.expand_dims(frame, axis=0)
# Run inference
if lock:
self.lock.lock()
@ -331,30 +366,341 @@ class Vision(Component):
continue
box = list(d_box)
# box = d_box.numpy().tolist()
center = self.get_center(box)
box = [i * s for i, s in zip(box, frame.shape[:2] * 2)] # rescale detection to frame size
detection = {
"class": self.category_index[int(d_class)],
"score": float(d_score),
# "score": d_score.numpy().tolist(),
# "mask": d_mask.numpy().tolist(),
"box": box,
"center": center,
"center": self.get_center(box),
"size": self.get_size(box),
}
parsed_detections.append(detection)
return {"result": parsed_detections, "shape": tensor[-1].shape, "ok": False, "tensor": tensor[-1]}
return parsed_detections
def _get(self, data):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
self.consumer.add_consumable(data[-1][list(self.sources)[0]])
super()._get(emit=False)
def detections_to_items(self, detections):
# DRAW DETECTIONS
if detections is not None and len(detections):
style = {
"border_thickness": 25,
"fill_color": QColor("#00000000"),
"shape": "ellipse",
}
items = {}
for item_name, item in enumerate(detections):
items[str(item_name)] = {
"box": item["box"],
**style,
"border_color": QColor(item["class"]["color"].replace("0x", "#")),
}
return items
else:
return {}
def process_consumed(self, data=None):
def process_detections(self, detections):
if self.zones is None or not len(self.zones) or detections is None or not len(detections):
return
# MATCH DETECTIONS WITH RECIPE
results = dict.fromkeys(self.zones)
for detection in detections:
# find closest zone center to the detection
# filtering out those that do not contain the detection
min_distance = sys.maxsize
closest_zone = None
for zone_name, zone in self.zones.items():
if zone["shape"] == "rect":
if self.zone_detection_filter_mode == "center_inside":
outside_zone = any([
detection["center"][0] < zone["box"][0],
detection["center"][0] > zone["box"][2],
detection["center"][1] < zone["box"][1],
detection["center"][1] > zone["box"][3],
])
elif self.zone_detection_filter_mode == "box_inside":
outside_zone = any([
detection["box"][0] < zone["box"][0],
detection["box"][2] > zone["box"][2],
detection["box"][1] < zone["box"][1],
detection["box"][3] > zone["box"][3],
])
elif self.zone_detection_filter_mode == "box_touches":
outside_zone = any([
detection["box"][2] < zone["box"][0],
detection["box"][0] > zone["box"][2],
detection["box"][3] < zone["box"][1],
detection["box"][1] > zone["box"][3],
])
else:
raise NotImplementedError(f"invalid zone_detection_filter_mode: {self.zone_detection_filter_mode!r}")
elif zone["shape"] == "ellipse": # it's a circle
distance = self.get_distance(detection["center"], zone["center"])
if self.zone_detection_filter_mode == "center_inside":
outside_zone = distance > zone["size"][0] / 2
elif self.zone_detection_filter_mode == "box_inside":
outside_zone = distance + max(detection["size"]) / 2 > zone["size"][0] / 2
elif self.zone_detection_filter_mode == "box_touches":
outside_zone = distance - max(detection["size"]) / 2 > zone["size"][0] / 2
else:
raise NotImplementedError(f"invalid zone shape: {zone['shape']!r}")
if not outside_zone and distance < min_distance:
min_distance = distance
closest_zone = zone_name
if closest_zone is not None:
# if closest zone already has a matching detection
# replace it only if the current one is preferred
if results[closest_zone] is not None:
if self.zone_detection_preference_mode == "distance":
if min_distance >= results[closest_zone]["zone_distance"]:
continue
elif self.zone_detection_preference_mode == "score":
if detection["score"] <= results[closest_zone]["score"]:
continue
else:
raise NotImplementedError(f"invalid zone_detection_preference_mode: {self.zone_detection_preference_mode!r}")
results[closest_zone] = detection.copy()
results[closest_zone]["zone_distance"] = min_distance
# check detections against recipe
checked = {}
for zone_name, detection in results.items():
if detection is None:
# dummy empty detection if nothing detected
detection = {
"box": [0, 0, 0, 0],
"center": [0, 0],
"class": {
"id": None,
"name": "no_detection",
"color": "rgb(0,0,0)",
},
# "mask": [],
"score": 1,
"size": [0, 0],
}
if zone_name not in self.zones or self.zones[zone_name]["class"]["id"] in {"no_detection", "none", }:
expected_class = {
"id": None,
"name": "no_detection",
"color": "rgb(0,0,0)",
}
else:
expected_class = self.zones[zone_name]["class"]
checked[zone_name] = {
"ok": detection is not None and detection["class"]["id"] == expected_class["id"],
"expected": expected_class,
"detection": detection,
}
return {
"ok": all(map(lambda detection: detection["ok"] is True, checked.values())),
"results": checked,
}
def results_to_items(self, results, ):
# DRAW ZONES RESULTS
if self.zones is not None and len(self.zones) and results is not None and len(results):
style = {
"border_thickness": 50,
"fill_color": QColor("#00000000"),
}
items = {
"_global_result": {
"box": [
style["border_thickness"],
style["border_thickness"],
-style["border_thickness"] * 2,
-style["border_thickness"] * 2,
],
"shape": "rect",
**style,
"border_color": Qt.green if results["ok"] else Qt.red,
}
}
for item_name, item in results["results"].items():
zone = self.zones[item_name]
items[str(item_name)] = {
"center": zone["center"],
"size": [s + 100 for s in zone["size"]],
"shape": zone["shape"],
**style,
"border_color": Qt.green if item["ok"] else Qt.red,
}
return items
else:
return {}
def render_items(self, items, offset=None, qimage=None, painter=None):
if offset is None:
offset = [0, 0]
if painter is None:
if qimage is None:
raise AssertionError("one of 'qimage' or 'painter' parameter must not be None")
painter = QPainter()
painter.begin(qimage)
for item_name, item in items.items():
try:
v = {}
if "box" in item:
v["x1"], v["y1"], v["x2"], v["y2"] = item["box"][1] + offset[1], item["box"][0] + offset[0], item["box"][3] + offset[1], item["box"][2] + offset[0]
v["w"], v["h"] = v["x2"] - v["x1"], v["y2"] - v["y1"]
v["xc"], v["yc"] = self.get_center([v["x1"], v["y1"], v["x2"], v["y2"]])
elif "location" in item:
v["x1"], v["y1"] = item["location"][1] + offset[1], item["location"][0] + offset[0]
if "size" in item:
v["w"], v["h"] = item["size"][1], item["size"][0]
v["x2"], v["y2"] = v["x1"] + v["w"], v["y1"] + v["h"]
v["xc"], v["yc"] = self.get_center([v["x1"], v["y1"], v["x2"], v["y2"]])
else:
v["w"], v["h"] = 0, 0
v["x2"], v["y2"] = v["x1"], v["y1"]
v["xc"], v["yc"] = v["x1"], v["y1"]
elif "center" in item and "size" in item:
v["xc"], v["yc"] = item["center"][1] + offset[1], item["center"][0] + offset[0]
v["w"], v["h"] = item["size"][1], item["size"][0]
v["x1"], v["y1"], v["x2"], v["y2"] = self.get_box([v["xc"], v["yc"]], [v["w"], v["h"]])
else:
raise AssertionError("item has no valid positioning information")
for k in list(v):
if v[k] < 0:
if k.startswith("x") or k.startswith("w"):
v[k] = painter.device().width() + v[k]
elif k.startswith("y") or k.startswith("h"):
v[k] = painter.device().height() + v[k]
else:
raise AssertionError("could not detect variable direction")
painter.setOpacity(item.get("opacity", 0.5))
painter.setBrush(QBrush(item.get("fill_color", QColor("#ffffff")), getattr(Qt, item.get("brush_pattern", "SolidPattern"))))
painter.setPen(QPen(
item.get("border_color", QColor("#000000")),
item.get("border_thickness", 1),
getattr(Qt, item.get("pen_line", "SolidLine")),
getattr(Qt, item.get("pen_cap", "SquareCap")),
getattr(Qt, item.get("pen_join", "MiterJoin")),
))
if item["shape"] == "ellipse":
painter.drawEllipse(QPointF(v["xc"], v["yc"]), v["w"] / 2, v["h"] / 2)
elif item["shape"] == "cross":
painter.drawLine(QLineF(v["xc"], v["y1"], v["xc"], v["y2"]))
painter.drawLine(QLineF(v["x1"], v["yc"], v["x2"], v["yc"]))
elif item["shape"] == "line":
painter.drawLine(QLineF(v["xc"], v["yc"], v["x2"], v["y2"]))
elif item["shape"] == "rect":
painter.drawRect(QRectF(v["x1"], v["y1"], v["x2"], v["y2"]))
elif item["shape"] == "text":
old_render_hints = painter.renderHints()
painter.setRenderHints(QPainter.Antialiasing | QPainter.TextAntialiasing, True)
font = QFont()
font.setStyleHint(QFont.SansSerif, QFont.PreferDefault | QFont.PreferAntialias)
font.setBold(item.get("font_bold", False))
font.setItalic(item.get("font_italic", False))
font.setKerning(item.get("font_kerning", True))
font.setLetterSpacing(QFont.AbsoluteSpacing, item.get("font_letter_spacing", 0))
font.setWordSpacing(item.get("font_word_spacing", 0))
font.setPixelSize(item.get("font_size", 25))
path = QPainterPath()
path.addText(v["x1"], v["y1"], font, item["text"])
painter.drawPath(path)
painter.setRenderHints(old_render_hints, True)
else:
raise NotImplementedError(f"item {item_name!r} has an invalid shape: {item['shape']!r}")
except Exception:
self.log.exception("".join(traceback.format_stack()))
self.log.exception(traceback.format_exc())
self.log.error(f"item {item_name!r} could not be drawn.")
def render(self, frame, detections=None, vision_results=None, mask=True, offset=None):
if mask is True:
qframe = QImage(
frame.shape[1], # width
frame.shape[0], # height
QImage.Format_RGBA8888
)
else:
aframe = cv2.cvtColor(frame, cv2.COLOR_RGB2RGBA)
qframe = QImage(
aframe.data,
aframe.shape[1], # width
aframe.shape[0], # height
aframe.shape[2] * frame.shape[1], # width * channels
QImage.Format_RGBA8888
)
painter = QPainter()
painter.begin(qframe)
for items in [
self.markers,
self.zones,
self.labels,
self.detections_to_items(detections),
self.results_to_items(vision_results)
]:
if items is not None:
try:
self.render_items(
items,
offset=offset,
painter=painter,
)
except Exception:
self.log.exception(traceback.format_exc())
painter.end()
return qframe
def _get(self, data=None):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if data is None:
return
data = data[-1][self.consumer.name]
# ADD FRAMETO VISION_CONSUMER QUEUE
self.vision_consumer.add_consumable({"frame": data[-1][list(self.sources)[0]]})
super()._get(emit=False)
def vision_consumer_work(self, consumable=None):
# VISION_CONSUMER TASK
if consumable is None:
return
detections = self.check_features(consumable["frame"])
results = self.process_detections(detections)
return {"detections": detections, "results": results}
def process_vision_consumed(self, data=None):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if data is None:
return
# ADD VISION RETURNED FROM VISION_CONSUMER TO RENDER_CONSUMER QUEUE
data = data[-1][self.vision_consumer.name]
if data is not None:
# super()._get([{
# "frame": data["consumed"]["frame"],
# "detections": data["result"]["detections"],
# "results": data["result"]["results"],
# }])
self.render_consumer.add_consumable({
"frame": data["consumed"]["frame"],
"detections": data["result"]["detections"],
"results": data["result"]["results"],
})
def render_consumer_work(self, consumable=None):
# RENDER_CONSUMER TASK
if consumable is None:
return
render = QPixmap.fromImage(self.render(
consumable["frame"],
detections=consumable["detections"],
vision_results=consumable["results"],
mask=False,
offset=None,
))
return render
def process_render_consumed(self, data=None):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if data is None:
return
# EMIT VISION AND RENDER RESULTS RETURNED FROM RENDER_CONSUMER
data = data[-1][self.render_consumer.name]
if data is not None:
super()._get([{
"frame": data["consumed"],
"vision": data["result"],
"frame": data["consumed"]["frame"],
"detections": data["consumed"]["detections"],
"results": data["consumed"]["results"],
"render": data["result"],
}])

View File

@ -26,10 +26,6 @@ message StringIntLabelMapItem {
// Human readable string label.
optional string display_name = 3;
// Label color for rendering.
optional string color = 9;
// Name of class specific keypoints for each class object and their respective
// keypoint IDs.
message KeypointMap {

View File

@ -152,6 +152,10 @@ def convert_label_map_to_categories(label_map,
keypoints[kv.label] = kv.id
list_of_keypoint_ids.append(kv.id)
category['keypoints'] = keypoints
if item.HasField("color"):
category["color"] = item.color
else:
category["color"] = ""
categories.append(category)
return categories

View File

@ -21,6 +21,8 @@ class Recipe_Editor(Widget):
"tolerance": self.tolerance_sb,
"test_duration": self.test_duration_sb,
"flush_duration": self.flush_duration_sb,
# vision
"vision_recipe": self.vision_recipe_cb,
# stabilizarion
"stabilization_time": self.stabilization_time_sb,
"stabilization_level_min": self.stabilization_level_min_sb,
@ -33,7 +35,10 @@ class Recipe_Editor(Widget):
def set_readonly(self, readonly):
for w in self.spec.values():
w.setReadOnly(readonly)
if isinstance(w, QComboBox):
w.setDisabled(readonly)
else:
w.setReadOnly(readonly)
def do_autocomplete(self, autocomplete):
if autocomplete is None:

View File

@ -7,88 +7,11 @@
<x>0</x>
<y>0</y>
<width>494</width>
<height>654</height>
<height>733</height>
</rect>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="2">
<widget class="QGroupBox" name="groupBox_3">
<property name="title">
<string>Pressione</string>
</property>
<layout class="QGridLayout" name="gridLayout_4">
<item row="4" column="2">
<widget class="QSpinBox" name="pressure_max_sb"/>
</item>
<item row="2" column="0">
<widget class="QLabel" name="label_10">
<property name="text">
<string>Min</string>
</property>
</widget>
</item>
<item row="4" column="0">
<widget class="QLabel" name="label_12">
<property name="text">
<string>Max</string>
</property>
</widget>
</item>
<item row="2" column="2">
<widget class="QSpinBox" name="pressure_min_sb"/>
</item>
<item row="2" column="3">
<widget class="QLabel" name="label_22">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="3" column="2">
<widget class="QSpinBox" name="pressure_test_sb"/>
</item>
<item row="5" column="2">
<widget class="QSpinBox" name="pressure_ramp_sb"/>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_11">
<property name="text">
<string>Test</string>
</property>
</widget>
</item>
<item row="4" column="3">
<widget class="QLabel" name="label_24">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="5" column="0">
<widget class="QLabel" name="label_6">
<property name="text">
<string>Rampa di salita</string>
</property>
</widget>
</item>
<item row="3" column="3">
<widget class="QLabel" name="label_23">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="5" column="3">
<widget class="QLabel" name="label_18">
<property name="text">
<string>bar/min</string>
</property>
</widget>
</item>
</layout>
</widget>
</item>
<item row="2" column="1" colspan="2">
<item row="4" column="1" colspan="2">
<widget class="QGroupBox" name="groupBox_5">
<property name="title">
<string>Descrizione</string>
@ -100,12 +23,18 @@
</layout>
</widget>
</item>
<item row="1" column="1">
<item row="2" column="1">
<widget class="QGroupBox" name="groupBox_2">
<property name="title">
<string>Test</string>
</property>
<layout class="QGridLayout" name="gridLayout_3">
<item row="4" column="1">
<widget class="QSpinBox" name="test_duration_sb"/>
</item>
<item row="3" column="1">
<widget class="QSpinBox" name="tolerance_sb"/>
</item>
<item row="4" column="0">
<widget class="QLabel" name="label_8">
<property name="text">
@ -120,6 +49,12 @@
</property>
</widget>
</item>
<item row="5" column="1">
<widget class="QSpinBox" name="flush_duration_sb"/>
</item>
<item row="1" column="1">
<widget class="QSpinBox" name="cleaning_time_sb"/>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_5">
<property name="text">
@ -127,25 +62,13 @@
</property>
</widget>
</item>
<item row="4" column="1">
<widget class="QSpinBox" name="test_duration_sb"/>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_7">
<item row="3" column="2">
<widget class="QLabel" name="label_19">
<property name="text">
<string>Tolleranza</string>
<string>bar</string>
</property>
</widget>
</item>
<item row="5" column="1">
<widget class="QSpinBox" name="flush_duration_sb"/>
</item>
<item row="1" column="1">
<widget class="QSpinBox" name="cleaning_time_sb"/>
</item>
<item row="3" column="1">
<widget class="QSpinBox" name="tolerance_sb"/>
</item>
<item row="1" column="2">
<widget class="QLabel" name="label_3">
<property name="text">
@ -153,13 +76,6 @@
</property>
</widget>
</item>
<item row="3" column="2">
<widget class="QLabel" name="label_19">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="4" column="2">
<widget class="QLabel" name="label_20">
<property name="text">
@ -174,6 +90,13 @@
</property>
</widget>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_7">
<property name="text">
<string>Tolleranza</string>
</property>
</widget>
</item>
</layout>
</widget>
</item>
@ -216,7 +139,7 @@
</layout>
</widget>
</item>
<item row="1" column="2">
<item row="2" column="2">
<widget class="QGroupBox" name="groupBox_4">
<property name="title">
<string>Stabilizzazione</string>
@ -303,6 +226,102 @@
</layout>
</widget>
</item>
<item row="0" column="2">
<widget class="QGroupBox" name="groupBox_3">
<property name="title">
<string>Pressione</string>
</property>
<layout class="QGridLayout" name="gridLayout_4">
<item row="4" column="2">
<widget class="QSpinBox" name="pressure_max_sb"/>
</item>
<item row="2" column="0">
<widget class="QLabel" name="label_10">
<property name="text">
<string>Min</string>
</property>
</widget>
</item>
<item row="4" column="0">
<widget class="QLabel" name="label_12">
<property name="text">
<string>Max</string>
</property>
</widget>
</item>
<item row="2" column="2">
<widget class="QSpinBox" name="pressure_min_sb"/>
</item>
<item row="2" column="3">
<widget class="QLabel" name="label_22">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="3" column="2">
<widget class="QSpinBox" name="pressure_test_sb"/>
</item>
<item row="5" column="2">
<widget class="QSpinBox" name="pressure_ramp_sb"/>
</item>
<item row="3" column="0">
<widget class="QLabel" name="label_11">
<property name="text">
<string>Test</string>
</property>
</widget>
</item>
<item row="4" column="3">
<widget class="QLabel" name="label_24">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="5" column="0">
<widget class="QLabel" name="label_6">
<property name="text">
<string>Rampa di salita</string>
</property>
</widget>
</item>
<item row="3" column="3">
<widget class="QLabel" name="label_23">
<property name="text">
<string>bar</string>
</property>
</widget>
</item>
<item row="5" column="3">
<widget class="QLabel" name="label_18">
<property name="text">
<string>bar/min</string>
</property>
</widget>
</item>
</layout>
</widget>
</item>
<item row="3" column="1" colspan="2">
<widget class="QGroupBox" name="groupBox_6">
<property name="title">
<string>Visione</string>
</property>
<layout class="QFormLayout" name="formLayout">
<item row="0" column="0">
<widget class="QLabel" name="label_29">
<property name="text">
<string>Ricetta visione</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QComboBox" name="vision_recipe_cb"/>
</item>
</layout>
</widget>
</item>
</layout>
</widget>
<resources/>

View File

@ -1,4 +1,5 @@
import sys
from glob import iglob
from lib.db import Recipes, Users
from PyQt5.QtCore import Qt, QTimer, pyqtSignal
@ -100,7 +101,13 @@ class Recipe_Selection(Widget):
select=list(crud_aliases.keys()),
filters=filters,
fields_aliases=crud_aliases,
autocomplete={"archived": False},
autocomplete={
"archived": False,
"spec": {
# "vision_recipe": iglob("*.ini", root_dir="./config/vision/recipes/"), # only in python3.10
"vision_recipe": list(iglob("./config/vision/recipes/*.ini")),
},
},
row_upgrader=recipes_row_upgrader,
widget_classes={"spec": Json_Spec_External_Dialog_Cell_Widget, },
row_filter=recipes_row_filter,

View File

@ -1,11 +1,10 @@
import logging
import os
import sys
import time
from datetime import datetime
from lib.db import Archive, Recipes, Users
from PyQt5.QtCore import Qt, QTimer
from lib.db import Archive, Users
from PyQt5.QtCore import QTimer
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
@ -45,6 +44,7 @@ class Test(Widget):
"wait": Test_Assembly(self.select_step_img("wait"), u"ATTENDERE - PAUSA INTER CICLO"),
}
self.cycle_loop = ["vision", "done", "wait"]
self.cycle_index = -1
self.cycle_changing_state = False
# SETUP AUTOTEST
self.autotest_request = False
@ -53,6 +53,8 @@ class Test(Widget):
self.request_autotest("init")
else:
self.autotest_period = None
# INIT TEST DATA
self.data = None
# INIT PIECES COUNTER ([pieces_ok, pieces_failed])
self.pieces = [0, 0]
# CONNECT CYCLE CONTROLS
@ -65,8 +67,8 @@ class Test(Widget):
# custom ok handlers should call next again
if type(w.widget) is Recipe_Selection:
w.ok.connect(self.set_recipe)
# elif type(w) is Test_Camera:
# w.ok.connect(self.set_data)
elif type(w.widget) is Test_Vision:
w.ok.connect(self.set_vision)
else:
w.ok.connect(self.next)
# TESTING
@ -211,11 +213,17 @@ class Test(Widget):
def next(self, action=None):
self.log.debug(f"cycle next: cycle_state: {self.cycle_state!r} action: {action!r}")
current_w = self.cycle_states.get(self.cycle_state, None)
if current_w is not None and hasattr(current_w, "stop"):
current_w.stop()
if action == "change_recipe":
self.log.info(f"cycle next: action: {action!r}")
self.set_recipe(recipe=None)
self.cycle_changing_state = True
self.cycle_state = "select_recipe"
self.cycle_index = -1
# RESET TEST DATA
self.data = None
elif action == "fail":
self.log.info(f"cycle next: action: {action!r}")
if self.cycle_state in self.cycle_loop:
@ -225,6 +233,9 @@ class Test(Widget):
# FAIL AND RESTART TEST
self.cycle_changing_state = True
self.cycle_state = "fail"
self.cycle_index = -1
# RESET TEST DATA
self.data = None
elif action is not None:
raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")
# if action did not set the next cycle_state
@ -235,14 +246,7 @@ class Test(Widget):
# if recipe not set: select_recipe
self.cycle_state = "select_recipe"
else:
try:
# get current cycle_state index in cycle_loop
cycle_index = self.cycle_loop.index(self.cycle_state)
except ValueError:
# if current cycle_state not in cycle_loop
# start the cycle_loop
cycle_index = -1
if cycle_index == -1 and self.autotest_request is not False:
if self.cycle_index == -1 and self.autotest_request is not False:
# if cycle_loop is not started or has ended
# and autotest was requested
self.autotest_request = False
@ -251,7 +255,8 @@ class Test(Widget):
self.time_timer.start(self.autotest_period)
else:
# goto next step in cycle_loop
self.cycle_state = self.cycle_loop[(cycle_index + 1) % len(self.cycle_loop)]
self.cycle_index = (self.cycle_index + 1) % len(self.cycle_loop)
self.cycle_state = self.cycle_loop[self.cycle_index]
# enable/disable cycle controls
self.change_recipe_b.setEnabled(self.recipe is not None)
self.cancel_b.setEnabled(self.cycle_state not in {
@ -261,6 +266,9 @@ class Test(Widget):
"wait",
})
self.log.info(f"cycle next: next cycle_state: {self.cycle_state!r}")
# INIT TEST DATA IF STARTING CYCLE LOOP
if self.cycle_index == 0:
self.data = {}
if self.cycle_state == "done":
self.done()
w = self.cycle_states[self.cycle_state]
@ -282,9 +290,15 @@ class Test(Widget):
self.recipe_l.setText("NON SELEZIONATA")
self.recipe_l.setStyleSheet("QLabel { color: red; }")
def set_vision(self, vision=None):
self.data["vision"] = vision
self.data["overridden"] = self.data.get("overridden", False) or self.data["vision"].get("overridden", False)
self.data["ok"] = self.data.get("ok", True) and self.data["vision"].get("ok", False)
self.next()
def done(self, ok=False):
self.log.info("cycle done")
archived = Archive.archive(self.recipe, self.data, overridden=self.data["overridden"])
archived = Archive.archive(self.recipe, self.data, ok and self.data["ok"], overridden=self.data["overridden"])
self.log.info(f"cycle archived locally: {archived!r}")
# LABEL PRINT
# self.printer.print_label("1", archived)

View File

@ -33,7 +33,7 @@ class Test_Assembly(Widget):
if widget is not None:
replace_widget(self, "widget", widget)
# widget attributes passtrough passtrough
for attr in ["ok", "ko", "start"]:
for attr in ["ok", "ko", "start", "stop"]:
if hasattr(self.widget, attr):
setattr(self, attr, getattr(self.widget, attr))
else:

View File

@ -69,9 +69,6 @@ class Test_Vision(Widget):
self.components["galaxy_camera"].set_period(period=None) # only get frame on request
self.components["galaxy_camera"].add_sources({"test_vision": self.request_frame})
self.request_frame_connection = self.components["vision"].out.connect(self.request_frame) # request new frame as soon as vision finishes
# self.components["vision_renderer"].add_sources({"vision": self.components["vision"].out})
# self.process_vision_connection = self.components["vision_renderer"].out.connect(self.process_vision)
# self.components["vision_renderer"].resume()
self.process_vision_connection = self.components["vision"].out.connect(self.process_vision)
self.components["vision"].resume()
self.components["galaxy_camera"].resume()
@ -83,11 +80,9 @@ class Test_Vision(Widget):
# disable camera-vision loop
self.components["galaxy_camera"].pause()
self.components["vision"].pause()
# self.components["vision_renderer"].pause()
self.disconnect(self.process_vision_connection)
# self.components["vision_renderer"].remove_sources(["vision", ])
self.disconnect(self.request_frame_connection)
self.components["galaxy_camera"].remove_sources(["vision", ])
self.components["galaxy_camera"].remove_sources(["test_vision", ])
def process_vision(self, data=None, override=False):
if self.ok_timer.isActive():
@ -99,21 +94,24 @@ class Test_Vision(Widget):
time = data.get("time", None)
data = data["vision"]
frame = data.get("frame", None)
vision = data.get("vision", None)
if vision is not None:
tensor = vision.get("tensor", None)
detections = data.get("detections", None)
results = data.get("results", None)
render = data.get("render", None)
if not override:
result_ok = data.get("vision", {}).get("ok", False)
else:
tensor = None
rendered = data.get("rendered", None)
result_ok = True
self.last_vision = {
"time": time,
"frame": frame,
"tensor": tensor,
"vision": vision,
"rendered": rendered,
"detections": detections,
"results": results,
"render": render,
"overridden": override,
"vision_duration": timing() - self.start_time,
"ok": result_ok,
}
if not override:
result_ok = data.get("vision", {}).get("ok", False)
if result_ok is True:
self.vision_ok_counter += 1
else:
@ -127,19 +125,19 @@ class Test_Vision(Widget):
self.visualize_vision(
time=time,
frame=frame,
tensor=tensor,
vision=vision,
rendered=rendered,
detections=detections,
results=results,
render=render,
overridden=override,
)
def visualize_vision(self, time=None, frame=None, tensor=None, vision=None, rendered=None, overridden=False):
def visualize_vision(self, time=None, frame=None, detections=None, results=None, render=None, overridden=False):
self.save_frame_b.setEnabled(self.last_vision is not None)
if overridden:
self.state_l.setPixmap(self.status_imgs_small["warning"])
elif vision is None or vision.get("results", None) is None:
elif results is None:
self.state_l.setPixmap(self.status_imgs_small[None])
elif vision.get("ok", False) is True:
elif results.get("ok", False) is True:
self.state_l.setPixmap(self.status_imgs_small[True])
else:
self.state_l.setPixmap(self.status_imgs_small[False])
@ -151,8 +149,8 @@ class Test_Vision(Widget):
self.ok_counter_pb.setPalette(self.status_palettes[False])
if overridden:
self.img = self.status_imgs_full["warning"]
elif rendered is not None:
self.img = rendered
elif render is not None:
self.img = render
elif frame is not None:
self.img = QPixmap.fromImage(QImage(
frame.data,
@ -161,14 +159,6 @@ class Test_Vision(Widget):
frame.shape[2] * frame.shape[1], # width * channels
QImage.Format_RGB888
))
elif tensor is not None:
self.img = QPixmap.fromImage(QImage(
tensor.data,
tensor.shape[1], # width
tensor.shape[0], # height
tensor.shape[2] * tensor.shape[1], # width * channels
QImage.Format_RGB888
))
else:
self.img = self.status_imgs_full[None]
self.resizeEvent()
@ -200,11 +190,4 @@ class Test_Vision(Widget):
self.process_vision(override=True)
def emit_ok(self):
self.ok.emit({
"timestamp": self.frame[0],
"frame": self.frame[1],
"barcodes": self.barcodes,
"vision": self.vision,
"overridden": self.vision_overridden,
"vision_duration": timing() - self.start_time,
})
self.ok.emit(self.last_vision)