vision wip

This commit is contained in:
matteo porta 2022-06-21 14:10:52 +02:00
parent e309021671
commit ca947434f8
25 changed files with 2700 additions and 15 deletions

View File

@ -1,6 +1,17 @@
[test]
parameter: default
[galaxy_camera]
exposure time: 10000
horizontal crop offset: 0
horizontal crop resolution: 2448
vertical crop offset: 724
vertical crop resolution: 800
rotate 90 clockwise times: 0
balance red: 1.0
balance green: 1.0
balance blue: 1.0
[vision_saver]
time_format: %Y-%m-%d_%H-%M-%S
path: ./data/images
@ -22,3 +33,7 @@ printer:
[tecna_marposs_provaset_t3]
address: COM3
baudrate: 115200
[vision]
detection_threshold: 0.3
; recipes_path: ./config/vision_test_recipes

View File

@ -0,0 +1,75 @@
item {
id: 1
name: 'red_big'
color: 'rgb(255,0,0)'
}
item {
id: 2
name: 'black_big'
color: 'rgb(50, 50, 50)'
}
item {
id: 3
name: 'blue_big'
color: 'rgb(0, 0, 255)'
}
item {
id: 4
name: 'white_big'
color: 'rgb(255, 255, 255)'
}
item {
id: 5
name: 'green_big'
color: 'rgb(0, 255, 0)'
}
item {
id: 6
name: 'yellow_big'
color: 'rgb(255, 255, 0)'
}
item {
id: 7
name: 'orange_big'
color: 'rgb(255, 145, 0)'
}
item {
id: 8
name: 'brown_big'
color: 'rgb(102, 40, 13)'
}
item {
id: 9
name: 'red_small'
color: 'rgb(225, 0, 0)'
}
item {
id: 10
name: 'black_small'
color: 'rgb(70, 70, 70)'
}
item {
id: 11
name: 'blue_small'
color: 'rgb(0, 0, 225)'
}
item {
id: 12
name: 'white_small'
color: 'rgb(225, 225, 225)'
}
item {
id: 13
name: 'green_small'
color: 'rgb(0, 225, 0)'
}
item {
id: 14
name: 'yellow_small'
color: 'rgb(225, 225, 0)'
}
item {
id: 15
name: 'ko'
color: 'rgb(255, 0, 255)'
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "ok",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "white_big",
"w1_4": "green_big",
"w1_5": "yellow_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "blue_big",
"w1_5": "white_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "green_big",
"w1_5": "yellow_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "blue_big",
"w1_2": "white_big",
"w1_3": "no_detection",
"w1_4": "black_big",
"w1_5": "red_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ko",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ko",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ko",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "black_big",
"w1_2": "red_big",
"w1_3": "no_detection",
"w1_4": "white_big",
"w1_5": "blue_big",
"w2_1": "orange_big",
"w2_2": "brown_big",
"w2_3": "yellow_small",
"w2_4": "blue_small",
"w2_5": "white_small",
"w2_6": "green_small",
"w2_7": "red_small",
"w2_8": "black_small"
}
}

View File

@ -0,0 +1,34 @@
{
"terminals": {
"t1_1": "ok",
"t1_2": "ok",
"t1_3": "empty",
"t1_4": "ok",
"t1_5": "ok",
"t2_1": "ok",
"t2_2": "ok",
"t2_3": "ok",
"t2_4": "ok",
"t2_5": "ok",
"t2_6": "ok",
"t2_7": "ok",
"t2_8": "ok"
},
"wires": {
"w1_1": "red_big",
"w1_2": "black_big",
"w1_3": "no_detection",
"w1_4": "blue_big",
"w1_5": "white_big",
"w2_1": "brown_big",
"w2_2": "orange_big",
"w2_3": "blue_small",
"w2_4": "yellow_small",
"w2_5": "green_small",
"w2_6": "white_small",
"w2_7": "black_small",
"w2_8": "red_small"
}
}

54
init.sh
View File

@ -6,10 +6,56 @@ cd "$here"
echo "---------- initialize venv ----------"
lsof "./venv/bin/python" | awk 'NR > 1 {print $2}' | xargs kill || :
lsof "./venv/Scripts/activate" | awk 'NR > 1 {print $2}' | xargs kill || :
python -m pip install --upgrade pip
python -m venv venv
python="python3.9"
"${python}" -m pip install --upgrade pip
"${python}" -m venv venv
source "./venv/bin/activate" || source "./venv/Scripts/activate" || :
python -m pip install --upgrade pip
python -m pip install --upgrade -r "src/requirements.txt"
"${python}" -m pip install --upgrade pip
"${python}" -m pip install --upgrade -r "src/requirements.txt"
# echo "---------- get updated label-map-util ----------"
# wget "https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/utils/label_map_util.py" -O "./src/lib/helpers/object_detection/utils/label_map_util.py"
# sed -Ei "s/^(\s*from )(object_detection.protos import .*)$/\1lib.helpers.\2/" "./src/lib/helpers/object_detection/utils/label_map_util.py"
# sudo apt-get install -y protobuf-compiler
# wget "https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/protos/string_int_label_map.proto" -O "./src/lib/helpers/object_detection/protos/string_int_label_map.proto"
# insert="\n\1\/\/ Label color for rendering.\n\1optional string color = 9;"
# sed -Ei "s/^(\s*)(optional string display_name.*)$/\1\2\n${insert}\n/" "./src/lib/helpers/object_detection/protos/string_int_label_map.proto"
# protoc "./src/lib/helpers/object_detection/protos/"*.proto --python_out="."
# echo "---------- install libedgetpu ----------"
# # sudo apt-get install -y apt-transport-https curl gnupg
# # curl -fsSL https://bazel.build/bazel-release.pub.gpg | gpg --dearmor >bazel-archive-keyring.gpg
# # sudo mv bazel-archive-keyring.gpg /usr/share/keyrings
# # echo "deb [arch=amd64 signed-by=/usr/share/keyrings/bazel-archive-keyring.gpg] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.list
# # sudo apt-get update
# sudo apt install -y build-essential docker # bazel libusb-1.0-0-dev libabsl-dev libflatbuffers-dev
# mkdir -p "$here/tmp"
# cd "$here/tmp"
# # git clone https://github.com/tensorflow/tensorflow || :
# # cd tensorflow
# # git pull
# # cd ..
# git clone "https://github.com/google-coral/libedgetpu" || :
# cd libedgetpu
# git pull
# DOCKER_IMAGE="ubuntu:18.04" DOCKER_TARGETS=libedgetpu make docker-build
# sudo make install
# cd "$here"
echo "---------- install gxlpy ----------"
sudo apt-get install -y g++ libc-bin
mkdir -p "$here/tmp"
cd "$here/tmp"
wget --continue --timestamping "http://downloads.get-cameras.com/Galaxy_Linux-x86_Gige-U3_32bits-64bits_1.2.1911.9122.tar.gz"
7z x -y "Galaxy_Linux-x86_Gige-U3_32bits-64bits_1.2.1911.9122.tar.gz"
7z x -y "Galaxy_Linux-x86_Gige-U3_32bits-64bits_1.2.1911.9122.tar"
cd "Galaxy_Linux-x86_Gige-U3_32bits-64bits_1.2.1911.9122"
chmod +x "Galaxy_camera.run"
echo -en "\nY\nY\nEn\nY\n" | ./Galaxy_camera.run
cd "$here/tmp"
wget --continue --timestamping "http://downloads.get-cameras.com/Galaxy_Linux_Python_1.0.1905.9081.tar.gz"
7z x -y "Galaxy_Linux_Python_1.0.1905.9081.tar.gz"
7z x -y "Galaxy_Linux_Python_1.0.1905.9081.tar"
cd "Galaxy_Linux_Python_1.0.1905.9081/api"
python3 setup.py build
python3 setup.py install
cd "$here"
cd "$here"

View File

@ -15,6 +15,10 @@ export QT_NO_WARNING_OUTPUT=0
python -B -u "./src/main.py" \
--auto-login-admin \
--auto-select \
--camera-edits \
--no-edgetpu \
--no-tflite \
--sim-camera \
--sim-modbus \
--sim-os-label-printer \
--style windows \

View File

@ -1,8 +1,10 @@
from .archive_synchronizer import ArchiveSynchronizer
from .galaxy_camera import GalaxyCamera
from .modbus_component import ModbusComponent
from .os_label_printer import Os_Label_Printer
from .remote_api import RemoteAPI
from .serial_label_printer import Serial_Label_Printer
from .tecna_marposs_provaset_t3 import TecnaMarpossProvasetT3
from .test_component import TestComponent
from .vision import Vision
from .vision_saver import VisionSaver

View File

@ -5,6 +5,7 @@ from PyQt5.QtCore import QObject, QSemaphore, Qt, QTimer, pyqtSignal
class Component(QObject):
"""emitted with data from the _get method"""
out = pyqtSignal(list)
_pause = pyqtSignal()
_resume = pyqtSignal()
@ -15,11 +16,20 @@ class Component(QObject):
self,
config=None,
name=None,
period=None, # period to call _get
lazy=True, # whether or not accumulate periodic _get calls if falling behind
period=None,
lazy=True,
paused=False,
threaded=True,
):
"""
parameters:
config: value for self.config, should be instance of lib.helpers.config_reader.ConfigReader
name: value for self.name, should be used to retrive component configuration section (self.config[self.name])
period: period in seconds for periodic calls to _get, set to None to disable
lazy: whether or not skip periodic _get calls if falling behind
paused: whether or not periodic calls to_get are paused
threaded: set this to tell the component if it should be thread synchronized or not(shoul be true if calls to methods are done from threads different from the component's one)
"""
super().__init__()
self.config = config
self.name = name if name is not None else str(id(self))
@ -44,9 +54,26 @@ class Component(QObject):
self.log.debug(f"config: {self.config}")
def config_changed(self):
"""
this method should be overridden when inheriting from the Component class
and should contain all the initialization code that needs to access self.config
so that the component will reinitialize if configuration changes
this method will be called on start and when self.config (ConfigReader) emits the updated signal
"""
pass
def start(self):
"""
this method is automatically called if threaded is set to False at object creation
otherwise if the component is in a thread this method should be used like this:
component = ComponentSubclass(threaded=True)
thread = QThread()
thread.setTerminationEnabled(True)
component.moveToThread(thread)
thread.started.connect(component.start)
thread.start()
component.wait_ready() # this is optional and will wait untill the component has finished started
"""
self._pause.connect(self._do_pause)
self._resume.connect(self._do_resume)
self._set_sources.connect(self._do_set_sources)
@ -63,6 +90,7 @@ class Component(QObject):
@property
def started(self):
"""returns True if the component has been started"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
started = self._started
@ -72,6 +100,7 @@ class Component(QObject):
@property
def running(self):
"""returns True if the periodic calls to _get are not paused"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
running = self._running
@ -80,6 +109,10 @@ class Component(QObject):
return running
def wait_ready(self, timeout=5):
"""
waits untill the requested action has been completed by the component
this will return immediately if threaded=False was passed at component initialization
"""
if self._threaded:
timeout = round(timeout * 1000)
if self._lock.tryAcquire(max(self._lock.available(), 1), timeout):
@ -89,6 +122,7 @@ class Component(QObject):
raise RuntimeError(f"{self.name} was not ready before timeout of {timeout}ms")
def pause(self):
"""will pause periodic calls to _get and sources trigghers"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
if self._running is False:
@ -102,6 +136,7 @@ class Component(QObject):
self._do_pause()
def resume(self):
"""will resume periodic calls to _get and sources trigghers"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
if self._running is True:
@ -115,6 +150,12 @@ class Component(QObject):
self._do_resume()
def set_sources(self, sources=None): # sources should be {"source_name": signal_to_connect}
"""
connect the given sources to trigger a call to _get
the sources parameter should be:
a dict of signals might containing one optional argument that will be passed as data to _get
or None to disconnect all sources
"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit(sources)
@ -134,6 +175,7 @@ class Component(QObject):
self.log.debug("no init periodic")
def set_period(self, period=None, lazy=True):
"""will set the period for periodic calls to _get and whether or not those are lazy (see init parameters)"""
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit({"period": period, "lazy": lazy})
@ -214,13 +256,24 @@ class Component(QObject):
self._lock.release()
def _get(self, data=None):
"""
this method should be overridden when inheriting from the Component class
the overriding method should retrive all the data and then call super()._get(data)
this will emit the data in the proper format
"""
if data is None:
data = [None]
got = [{"time": timing(), self.name: d} for d in data]
t = timing()
got = [{"time": t, self.name: d} for d in data]
self.out.emit(got)
self.log.debug(f"_get: {got}")
if self._single_shot:
if self._timer is not None and self._single_shot:
self._timer.start()
def set(self, val):
"""
this method should be overridden when inheriting from the Component class
the overriding method should set the requested val and then call super()._set(set_value)
this will log the value that has been set
"""
self.log.debug(f"set: {val}")

View File

@ -0,0 +1 @@
from .dummy_camera import DummyCamera

View File

@ -0,0 +1,106 @@
class DummyCamera:
# class Imager:
# def get_image(self):
# pass
#
# data_stream = [Imager()]
class BalanceRatioSelectorClass:
def __init__(self, balance_selected, balance_values):
self.balance_selected = balance_selected
self.balance_values = balance_values
def get(self):
print(f"{self.__class__.__name__}.get() -> {self.balance_selected}")
return self.balance_selected
def set(self, selector):
self.balance_selected = selector
print(f"{self.__class__.__name__}.set({selector}) -> {self.balance_selected}")
class BalanceRatioClass:
def __init__(self, balance_selected, balance_values):
self.balance_selected = balance_selected
self.balance_values = balance_values
def get(self):
if self.balance_selected in self.balance_values:
print(f"{self.__class__.__name__}.get({self.balance_selected}) -> {self.balance_values[self.balance_selected]}")
return self.balance_values[self.balance_selected]
else:
print(f"{self.__class__.__name__}.get({self.balance_selected}) -> Not present -> None")
return None
def set(self, value):
self.balance_values[self.balance_selected] = value
print(f"{self.__class__.__name__}.set({self.balance_selected}, {value}) -> {self.balance_values[self.balance_selected]}")
balance_selected = None
balance_values = {}
BalanceRatioSelector = BalanceRatioSelectorClass(balance_selected, balance_values)
BalanceRatio = BalanceRatioClass(balance_selected, balance_values)
class GetSetReadable:
value = None
@classmethod
def get(cls):
print(f"{cls.__name__}.get() -> {cls.value}")
return cls.value
@classmethod
def set(cls, value):
cls.value = value
print(f"{cls.__name__}.set({value}) -> {cls.value}")
@classmethod
def is_readable(cls):
print(f"{cls.__name__}.is_readable() -> {cls.value is not None}")
return cls.value is not None
class TriggerSoftware(GetSetReadable):
@classmethod
def send_command(cls, *args, **kwargs):
pass
class OffsetX(GetSetReadable):
pass
class OffsetY(GetSetReadable):
pass
class Width(GetSetReadable):
pass
class Height(GetSetReadable):
pass
class TriggerMode(GetSetReadable):
pass
class ExposureTime(GetSetReadable):
pass
class Gain(GetSetReadable):
pass
class GammaParam(GetSetReadable):
pass
class ContrastParam(GetSetReadable):
pass
class ColorCorrectionParam(GetSetReadable):
pass
class TriggerSource(GetSetReadable):
pass
@classmethod
def stream_on(cls):
pass
@classmethod
def stream_off(cls):
pass

View File

@ -0,0 +1,249 @@
import pathlib
import sys
from itertools import cycle
import cv2
import gxipy as gx
import imutils
import numpy as np
from PyQt5.QtCore import QMutex, Qt, QThread, pyqtSignal
from PyQt5.QtGui import QImage, QPixmap
from PyQt5.QtWidgets import (QDialog, QFormLayout, QLabel, QMessageBox,
QPushButton, QSizePolicy, QSlider)
if "--sim-camera" in sys.argv:
from components.dummies.gxpy import DummyCamera
from datetime import datetime
from .component import Component
class GalaxyCamera(Component):
_edits_new_frame = pyqtSignal(list)
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.lock = QMutex()
self.simulate = "--sim-camera" in sys.argv
self._last_frame = None
def config_changed(self):
self._period = int(self.config[self.name].get("frame time ms", 300)) / 1000
self.exposure_time = int(self.config[self.name]["exposure time"])
self.roi = {
"x": int(self.round_4(self.config[self.name]["horizontal crop offset"])),
"w": int(self.round_4(self.config[self.name]["horizontal crop resolution"])),
"y": int(self.round_4(self.config[self.name]["vertical crop offset"])),
"h": int(self.round_4(self.config[self.name]["vertical crop resolution"])),
"r": int(self.config[self.name]["rotate 90 clockwise times"]),
}
self.auto_white_balance = bool(self.config[self.name].get("auto white balance", False))
self.balance = {
"r": float(self.config[self.name].get("balance red", 1)),
"g": float(self.config[self.name].get("balance green", 1)),
"b": float(self.config[self.name].get("balance blue", 1)),
}
self.lock.lock()
self.camera.stream_off()
self.camera.TriggerMode.set(gx.GxSwitchEntry.OFF)
self.camera.ExposureTime.set(self.exposure_time)
self.camera.Gain.set(1.0)
self.camera.OffsetX.set(self.roi["x"])
self.camera.Width.set(self.roi["w"])
self.camera.OffsetY.set(self.roi["y"])
self.camera.Height.set(self.roi["h"])
if self.auto_white_balance:
self.camera.BalanceWhiteAuto.set(gx.GxAutoEntry.ONCE)
QThread.msleep(3000)
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.RED)
self.balance["r"] = self.camera.BalanceRatio.get()
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.GREEN)
self.balance["g"] = self.camera.BalanceRatio.get()
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.BLUE)
self.balance["b"] = self.camera.BalanceRatio.get()
else:
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.RED)
self.camera.BalanceRatio.set(self.balance["r"])
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.GREEN)
self.camera.BalanceRatio.set(self.balance["g"])
self.camera.BalanceRatioSelector.set(gx.GxBalanceRatioSelectorEntry.BLUE)
self.camera.BalanceRatio.set(self.balance["b"])
if self.camera.GammaParam.is_readable():
self.gamma_lut = gx.Utility.get_gamma_lut(self.camera.GammaParam.get())
else:
self.gamma_lut = None
if self.camera.ContrastParam.is_readable():
self.contrast_lut = gx.Utility.get_contrast_lut(self.camera.ContrastParam.get())
else:
self.contrast_lut = None
if self.camera.ColorCorrectionParam.is_readable():
self.color_correction = self.camera.ColorCorrectionParam.get()
else:
self.color_correction = 0
self.camera.TriggerMode.set(gx.GxSwitchEntry.ON)
self.camera.TriggerSource.set(gx.GxTriggerSourceEntry.SOFTWARE)
self.camera.stream_on()
self.edits = None
self.lock.unlock()
self.edits_enabled = "--camera-edits" in sys.argv
if self.edits_enabled:
self.init_edits()
@staticmethod
def round_4(x):
return 4 * round(int(x) / 4)
def start(self):
if self.simulate:
self.camera = DummyCamera()
self.sim_imgs = cycle(sorted(pathlib.Path("data/simulation_images/").glob("*.png")))
else:
self.device_manager = gx.DeviceManager()
# create a device manager
dev_num, dev_info_list = self.device_manager.update_device_list()
if dev_num == 0:
self.log.exception("camera not detected")
QMessageBox.critical(None, "Errore hardware", "Telecamera non rilevata.\nControllare connessione usb")
quit()
self.camera = self.device_manager.open_device_by_index(1)
super().start()
def _get(self):
# print("GALAXY CAMERA", str(int(QThread.currentThreadId())), flush=True)
frame = None
self.lock.lock()
if self.simulate:
img_path = next(self.sim_imgs)
self.log.debug(f"loading image {img_path}")
frame = cv2.cvtColor(cv2.imread(str(img_path)), cv2.COLOR_BGR2RGB)
else:
self.camera.TriggerSoftware.send_command()
frame = self.camera.data_stream[0].get_image()
if frame is not None:
if frame.get_status() == gx.GxFrameStatusList.INCOMPLETE:
self.log.error("incomplete frame")
frame = None
else:
frame = frame.convert("RGB")
frame.image_improvement(self.color_correction, self.contrast_lut, self.gamma_lut)
frame = frame.get_numpy_array()
frame = np.rot90(frame, self.roi["r"])
self.lock.unlock()
if frame is None:
self.log.error("failed to get frame")
elif self.edits_enabled:
frame = self.edit(frame, self.edits)
self._edits_new_frame.emit([frame])
super()._get([frame])
def __del__(self, event=None):
self.camera.stream_off()
self.camera.close_device()
def init_edits(self):
self.edits_enabled = True
self.edits_dialog = EditsDialog(self.roi)
self.edits_dialog.edits_changed.connect(self.set_edits)
self._edits_new_frame.connect(self.edits_dialog.save_and_show_edits_new_frame)
def set_edits(self, edits=None):
self.edits = edits
def edit(self, img, edits=None):
if edits is None:
return img
# BRIGHTNESS AND CONTRAST
contrast = float(edits.get("contrast", 0))
brightness = float(edits.get("brightness", 0))
if not (contrast == 0 and brightness == 0):
img = imutils.adjust_brightness_contrast(img, brightness=brightness, contrast=contrast / 255 * 500)
# ROTATE AND SCALE
rotation = -float(edits.get("rotation", 0))
scale = float(edits.get("scale", 1))
if not (rotation == 0 and scale == 1):
img = imutils.rotate(img, rotation, scale=scale)
# TRANSLATE
translation_x = float(edits.get("translation_x", 0))
translation_y = float(edits.get("translation_y", 0))
if not (translation_x == 0 and translation_y == 0):
img = imutils.translate(img, translation_x, translation_y)
return img
class EditsDialog(QDialog):
edits_changed = pyqtSignal(dict)
def __init__(self, roi):
super().__init__()
self.frame = None
self.edits = {
"brightness": 0,
"contrast": 0,
"rotation": 0,
"scale": 1,
"translation_x": 0,
"translation_y": 0,
}
self.edits_specs = {
"brightness": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()],
"contrast": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()],
"rotation": [[-180, 180], 1, QSlider(Qt.Horizontal), QLabel()],
"scale": [[0, 5], 100, QSlider(Qt.Horizontal), QLabel()],
"translation_x": [[-roi["w"], roi["w"]], 1, QSlider(Qt.Horizontal), QLabel()],
"translation_y": [[-roi["h"], roi["h"]], 1, QSlider(Qt.Horizontal), QLabel()],
}
layout = QFormLayout()
self.edits_frame_l = QLabel()
self.edits_frame_l.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
layout.addRow(self.edits_frame_l)
for edit, limits in self.edits_specs.items():
limit, multiplier, slider, label = limits
slider.setRange(limit[0] * multiplier, limit[1] * multiplier)
slider.setSingleStep(1)
slider.setValue(self.edits[edit] * multiplier)
label.setText(f"{edit}: {self.edits[edit]}")
layout.addRow(label, slider)
slider.valueChanged.connect(self.update_edits)
self.edits_save_frame_b = QPushButton("save frame")
layout.addRow(self.edits_save_frame_b)
self.edits_save_frame_b.clicked.connect(self.edits_save_frame)
self.setLayout(layout)
self.update_edits()
self.show()
def update_edits(self):
for edit, limits in self.edits_specs.items():
limit, multiplier, slider, label = limits
self.edits[edit] = slider.value() / multiplier
label.setText(f"{edit}: {self.edits[edit]}")
self.edits_changed.emit(self.edits)
def save_and_show_edits_new_frame(self, frame):
self.frame = frame[0]
if self.frame is not None:
self.edits_frame_l.setPixmap(
QPixmap.fromImage(
QImage(
self.frame.data,
self.frame.shape[1], # width
self.frame.shape[0], # height
self.frame.shape[2] * self.frame.shape[1], # width * channels
QImage.Format_RGB888
)
).scaled(
max(self.edits_frame_l.width(), 640),
max(self.edits_frame_l.height(), 480),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
)
)
def edits_save_frame(self):
if self.frame is None:
return
out_path = f"./{datetime.now().isoformat()}.png"
print(f"saving frame: {out_path!r}")
img = cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR)
cv2.imwrite(out_path, img)
return out_path

550
src/components/renderer.py Normal file
View File

@ -0,0 +1,550 @@
import copy
import gc
import os
import sys
import traceback
from configparser import ConfigParser
import lib.helpers.label_map_util as label_map_util
import numpy
import numpy as np
import tensorflow as tf
from pycoral.adapters import detect
if "--no-edgetpu" not in sys.argv:
from pycoral.utils.edgetpu import make_interpreter
else:
def make_interpreter(*args, **kwargs):
raise ValueError("\"--no-edgetpu\" in sys.argv")
from lib.helpers.log import log_msg
from PyQt5.QtCore import (QFileSystemWatcher, QMutex, QObject, QPointF, QRectF,
Qt, pyqtSignal)
from PyQt5.QtGui import QBrush, QColor, QPainter, QPen
from tflite_runtime.interpreter import Interpreter
from ui.test.test import CycleState
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Patch the location of gfile
tf.gfile = tf.io.gfile
IMG_SCALE = 0.75 # ORIGINAL IMAGE FRAME TO QT WINDOW RATIO
class LibVision(QObject):
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
def __init__(self, main_window):
super().__init__()
self.arrow = None
self.last_arrow = None
self.main_window = main_window
self.config = main_window.config
self.watcher = main_window.watcher
self.img_scale = IMG_SCALE
self.pixel_size = self.config["CAMERA"]["pixel size"]
self.image_width_qt = int(self.config["CAMERA"]["horizontal crop resolution"] * IMG_SCALE)
self.image_height_qt = int(self.config["CAMERA"]["vertical crop resolution"] * IMG_SCALE)
self.meas_tolerance = self.config["VISION"]["measurement tolerance"]
self.align_tolerance = self.config["VISION"]["alignment tolerance"]
self.autotest_tolerance = self.config["VISION"]["autotest tolerance"]
self.autotest_positions = self.config["VISION"]["autotest positions"]
self.detection_crop_qt = (0, 0, self.image_width_qt, self.image_height_qt)
self.img_center_x_qt = int(self.image_width_qt / 2)
self.img_center_y_qt = int(self.image_height_qt / 2)
self.img_q1_y_qt = 100
self.qtcolor = {"orange": QColor("#FF8400")}
# OBJECT DETECTION
self.detection_threshold = self.config["VISION"]["detection threshold"]
self.image_width_inf = 1024
self.image_height_inf = 128
self.measure_ok = False
self.edge_detected = False
self.edge_detection = None
self.edge_fitting_distance = None
self.edge_fitting_distance_real = None
self.detected_cal_positions = None
self.cal_detected = None
# recipe
self.zones = None
self.labels = None
self.recipes_dir = "config/vision_test_recipes"
self.recipe_watcher = QFileSystemWatcher([])
self.set_recipe(None)
self.recipe_watcher.fileChanged.connect(self.set_recipe)
self.NEURAL_NETWORK_MODEL = self.config["VISION"]["neural network"]
# MODEL
self.model = None
self.model_lock = QMutex()
self.tflite_mode = False
self.edgeTPU_mode = False
if "--tflite" in sys.argv:
self.tflite_mode = True
self.edgeTPU_mode = True
self.load_model(self.NEURAL_NETWORK_MODEL)
# CATEGORY INDEX
# label_map = label_map_util.load_labelmap("config/vision_test_labels/labels-onlydots.pbtxt")
label_map = label_map_util.load_labelmap("config/vision_test_labels/labels.pbtxt")
self.num_classes = len(label_map.item)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.config.category_index = label_map_util.create_category_index(categories)
self.config.classes_map = {c["name"]: k for k, c in self.config.category_index.items()}
def mm_to_qt(self, val):
return int(val * self.img_scale / self.pixel_size)
def set_recipe(self, recipe_path=None):
log_msg(f"LOADING RECIPE {recipe_path!r}")
watched = self.recipe_watcher.files()
if recipe_path == "" and len(watched) == 0: # skip bad watcher signals
return
if len(watched) > 0:
self.recipe_watcher.removePaths(watched)
self.recipe_path = recipe_path
if self.recipe_path is None:
self.zones = None
self.labels = None
else:
try:
if not os.path.isfile(self.recipe_path):
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be found.")
config = ConfigParser(inline_comment_prefixes="#")
read = config.read(self.recipe_path)
if len(read) != 1 or self.recipe_path not in read:
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be read.")
recipe = config._sections.get("camera_shapes", None)
if recipe is None:
raise AssertionError(
f"Recipe file {self.recipe_path!r} does not contain the 'camera_shapes' section.")
self.points = self.parse_points(recipe)
zones = config._sections.get("camera_zones", None)
if zones is None:
raise AssertionError(
f"Recipe file {self.recipe_path!r} does not contain the 'camera_zones' section.")
self.zones = self.parse_zones(zones)
labels = config._sections.get("labels", None)
if labels is not None:
self.labels = self.parse_labels(labels)
else:
self.labels = None
self.recipe_watcher.addPath(self.recipe_path)
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
self.zones = None
if self.zones is None and self.labels is None:
self.status_signal.emit({"vision_recipe": 0})
else:
self.status_signal.emit({"vision_recipe": os.path.splitext(os.path.basename(recipe_path))[0]})
@staticmethod
def parse_points(config):
points = {}
for point_name, point_spec in config.items():
try:
center, size, fill_color, border_color, border_thickness, shape = point_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
size = list(map(float, size.split(",")))
if len(size) == 1:
size = [size[0], size[0]]
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
shape = shape.lower()
points[point_name] = {
"center": center,
"size": size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"shape": shape,
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"point {point_name!r} could not be parsed. spec: {point_spec!r}", file=sys.stderr)
return points
@staticmethod
def draw_points(points, qimage, pixel_mm, mm_offset=None, painter=None):
if mm_offset is None:
mm_offset = [0, 0]
if painter is None:
painter = QPainter()
painter.begin(qimage)
avg_pixel_mm = (sum(pixel_mm) / len(pixel_mm))
for point_name, point in points.items():
try:
x = (point["center"][0] + mm_offset[0]) / pixel_mm[0] + qimage.width() / 2
y = -(point["center"][1] + mm_offset[1]) / pixel_mm[1] + qimage.height() / 2
x2 = x + (point["size"][0] + mm_offset[0]) / pixel_mm[0]
y2 = y - (point["size"][1] + mm_offset[1]) / pixel_mm[1]
w = point["size"][0] / pixel_mm[0]
h = point["size"][1] / pixel_mm[1]
painter.setBrush(QBrush(point["fill_color"], Qt.SolidPattern))
painter.setPen(
QPen(point["border_color"], point["border_thickness"] / avg_pixel_mm, Qt.SolidLine, Qt.SquareCap,
Qt.MiterJoin))
if point["shape"] == "ellipse":
painter.drawEllipse(QPointF(x, y), w / 2, h / 2)
elif point["shape"] == "cross":
painter.drawLine(x, y - h / 2, x, y + h / 2)
painter.drawLine(x - w / 2, y, x + w / 2, y)
elif point["shape"] == "line":
painter.drawLine(x, y, x2, y2)
else:
raise NotImplementedError(f"point {point_name!r} has an invalid shape: {point['shape']!r}")
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"point {point_name!r} could not be drawn.", file=sys.stderr)
def parse_zones(self, config):
zones = {}
for zone_name, zone_spec in config.items():
zone_name = zone_name.upper()
try:
center, margin, d_class = zone_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
if margin == "none":
margin = None
elif "," in margin:
margin = list(map(float, margin.split(",")))
else:
margin = float(margin)
zones[zone_name] = {
"center": center,
"margin": margin,
"class": self.config.category_index[self.config.classes_map[d_class]],
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"Region {zone_name!r} could not be parsed. spec: {zone_spec!r}", file=sys.stderr)
zones["fitting"] = {
"center": [0, 0],
"margin": [140, 140],
"class": self.config.category_index[self.config.classes_map["fitting"]],
}
return zones
def parse_labels(self, config):
labels = {}
for label_name, label_spec in config.items():
try:
location, font_size, fill_color, border_color, border_thickness, text = label_spec.split(" ", 5)
location = list(map(float, location.split(",")))
location[1] = -location[1]
font_size = float(font_size)
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
text = text.replace("\\n", "\n").replace("\\t", "\t")
labels[label_name] = {
"location": location,
"font_size": font_size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"text": text,
}
except Exception:
print(*traceback.format_stack(), sep="", file=sys.stderr, flush=True)
traceback.print_exc(file=sys.stderr)
log_msg(f"Label {label_name!r} could not be parsed. spec: {label_spec!r}", file=sys.stderr)
return labels
# def zone_center(self, zone):
# return (int((zone["xmax"] + zone["xmin"]) / 2), int((zone["ymax"] + zone["ymin"]) / 2))
def get_center(self, rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
def get_distance(self, p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def load_lodel_tflite(self, model=None):
# Creates tflite interpreter
try:
self.interpreter = make_interpreter(f"neural_networks/{model}/{model}_edgetpu.tflite")
self.interpreter.allocate_tensors()
self.interpreter.invoke() # warmup
self.tflite_input_details = self.interpreter.get_input_details()
self.tflite_output_details = self.interpreter.get_output_details()
self.inf_width = self.tflite_input_details[0]['shape'][2]
self.inf_height = self.tflite_input_details[0]['shape'][1]
log_msg("edge TPU model initialized")
except ValueError:
# edge TPU not found
log_msg("edge TPU not found")
self.edgeTPU_mode = False
self.interpreter = Interpreter(f"neural_networks/{model}/{model}.tflite")
self.interpreter.allocate_tensors()
self.interpreter.invoke() # warmup
self.tflite_input_details = self.interpreter.get_input_details()
self.tflite_output_details = self.interpreter.get_output_details()
self.inf_width = self.tflite_input_details[0]['shape'][2]
self.inf_height = self.tflite_input_details[0]['shape'][1]
log_msg("TFlite model on CPU initialized")
def run_inference_tflite(self, image, threshold=0.3):
self.interpreter.set_tensor(self.tflite_input_details[0]['index'], image)
self.interpreter.invoke()
objs = detect.get_objects(self.interpreter, threshold, (1, 1))
boxes = [(obj.bbox.ymin / self.inf_height, obj.bbox.xmin / self.inf_width, obj.bbox.ymax / self.inf_height, obj.bbox.xmax / self.inf_width) for obj in objs]
classes = [obj.id + 1 for obj in objs]
scores = [obj.score for obj in objs]
return {"detection_boxes": boxes, "detection_classes": classes, "detection_scores": scores,
"num_detections": 10}
def load_model(self, model=None):
log_msg("NEURAL NETWORK MODEL CHOSEN: {}".format(model))
if model.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model = \
sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d))],
reverse=True)[0]
self.NEURAL_NETWORK_MODEL = model
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg("LOADING NEURAL NETWORK MODEL: {}".format(model))
try:
if self.tflite_mode:
self.load_lodel_tflite(self.NEURAL_NETWORK_MODEL)
else:
model = tf.saved_model.load(f"neural_networks/{model}")
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
def check_features(self, image_np):
self.watcher.profile_start()
image = np.asarray(image_np)
self.watcher.profile_measure("INF_NP")
input_tensor = np.expand_dims(image, axis=0)
self.watcher.profile_measure("INF_SHAPE")
# Run inference
self.model_lock.lock()
if self.tflite_mode:
detections = self.run_inference_tflite(input_tensor, threshold=self.detection_threshold)
else:
detections = self.model(input_tensor)
num_detections = int(detections.pop("num_detections"))
detections = {key: value[0, :num_detections].numpy() for key, value in detections.items()}
detections["num_detections"] = num_detections
self.model_lock.unlock()
self.watcher.profile_measure("INF_INF")
parsed_detections = []
for d_class, d_score, d_box in zip(detections["detection_classes"], detections["detection_scores"],
detections["detection_boxes"]):
if d_score >= self.detection_threshold:
ymin, xmin, ymax, xmax = d_box # d_box.tolist()
box = [
xmin,
ymin,
xmax,
ymax,
]
center = self.get_center(box)
detection = {
"class": self.config.category_index[int(d_class)]["name"],
"color": self.config.category_index[int(d_class)]["color_qt"],
"score": float(d_score),
"box": box,
"center": center,
"pos_rel_mm": self.get_pos_rel_mm(center),
}
parsed_detections.append(detection)
self.watcher.profile_measure("INF_POST")
return parsed_detections
def get_pos_rel_mm(self, center):
x_rel_px = (center[0] - 0.5) * self.config["CAMERA"]["horizontal crop resolution"]
return x_rel_px * self.pixel_size
def process_detections(self, detections):
self.last_arrow = copy.deepcopy(self.arrow)
for detection in detections:
self.edge_detected = None
self.measure_ok = False
if detection["class"] == "edge-left":
self.edge_detected = True
self.edge_detection = detection
self.edge_fitting_distance = self.main_window.cam_offset_mm - self.main_window.drawing.after_insertion_offset - (self.main_window.drawing.fitting_offset + self.main_window.cutting_offset_mm) + detection["pos_rel_mm"]
self.edge_fitting_distance_real = self.main_window.cam_offset_mm - (self.main_window.test_fitting_offset + self.main_window.cutting_offset_mm) + detection["pos_rel_mm"]
# DECIDE ARROW DIRECTION (NORMAL IN FIRST TEST, INVERTED IN SECOND TEST)
if (self.edge_fitting_distance_real > self.main_window.drawing.dipstick_offset) != (self.main_window.test_widget.current_state == CycleState.VISION_2_VERIFY):
self.arrow = "left"
else:
self.arrow = "right"
if abs(self.edge_fitting_distance_real - self.main_window.drawing.dipstick_offset) < self.main_window.test_tolerance:
self.measure_ok = True
self.arrow = "ok"
return
else:
self.measure_ok = False
return
break
# NO EDGE DETECTED
self.arrow = "none"
self.edge_fitting_distance = None
self.edge_fitting_distance_real = None
self.edge_detection = None
def process_detections_calibration(self, detections):
self.detected_cal_positions = [{"ok": False, "pos": None} for x in enumerate(self.autotest_positions)]
cal_positions = []
for detection in detections:
self.edge_detected = False
self.measure_ok = False
if detection["class"] == "cal":
cal_positions.append({"pos": detection["pos_rel_mm"], "detection": detection, "ok": False})
for n, pos in enumerate(self.autotest_positions):
if len(cal_positions) == 0:
break
nearest_idx = min(range(len(cal_positions)), key=lambda i: abs(cal_positions[i]["pos"] - pos))
if abs(cal_positions[nearest_idx]["pos"] - pos) < 5:
self.detected_cal_positions[n] = cal_positions[nearest_idx]
del(cal_positions[nearest_idx])
self.edge_detected = True
self.measure_ok = True
self.arrow = "ok"
for i, detected, expected in zip(range(len(self.detected_cal_positions)), self.detected_cal_positions, self.autotest_positions):
if detected["pos"] is not None and abs(detected["pos"] - expected) < self.autotest_tolerance:
self.detected_cal_positions[i]["ok"] = True
else:
self.arrow = "ko"
self.measure_ok = False
if detected["pos"] is None:
self.edge_detected = False
def visualize_calibration(self, image):
painter = QPainter()
painter.begin(image)
if len(self.detected_cal_positions) > 0:
for detection in self.detected_cal_positions:
if detection["pos"] is not None:
center = detection["detection"]["center"]
det_x_qt = int(center[0] * self.image_width_qt)
det_y_qt = int(center[1] * self.image_height_qt)
# DRAW VERTICAL EDGE POSITION MARK
if detection["ok"]:
color = Qt.green
else:
color = Qt.red
painter.setPen(QPen(color, 4, Qt.DashLine))
refline = det_x_qt, det_y_qt + 40, det_x_qt, det_y_qt - 40
painter.drawLine(*refline)
# DRAW COLORED RECTANGLES IDENTIFYING DETECTIONS
def visualize_detections(self, image, detections):
painter = QPainter()
painter.begin(image)
# DRAW DETECTIONS
detections_thickness_px = 4
if detections is not None:
painter.setOpacity(1)
for detection in detections:
result_color = QColor(*detection["color"], 150)
# center = detection["center"]
xmin, ymin, xmax, ymax = detection["box"]
x = xmin * self.image_width_qt - detections_thickness_px
y = ymin * self.image_height_qt - detections_thickness_px
w = xmax * self.image_width_qt - x + detections_thickness_px
h = ymax * self.image_height_qt - y + detections_thickness_px
painter.setPen(QPen(result_color, detections_thickness_px, Qt.SolidLine, Qt.SquareCap, Qt.MiterJoin))
painter.setBrush(QBrush())
painter.drawRect(QRectF(x, y, w, h))
def visualize_ref(self, image):
painter = QPainter()
painter.begin(image)
ref_x_qt = self.main_window.ref_x_qt
# DRAW H/V CAMERA CENTER AXIS
if self.main_window.display_camera_center:
painter.setPen(QPen(Qt.yellow, 2, Qt.DashLine))
refline = self.img_center_x_qt, 0, self.img_center_x_qt, self.image_height_qt
painter.drawLine(*refline)
refline = 0, self.img_center_y_qt, self.image_width_qt, self.img_center_y_qt
painter.drawLine(*refline)
tolerance_px = self.mm_to_qt(self.main_window.test_tolerance)
if self.main_window.test_state in (CycleState.VISION_2_VERIFY, CycleState.VISION_1_ALIGNMENT):
# DRAW VERTICAL REFERENCE POSITION AXIS
painter.setPen(QPen(Qt.cyan, 4, Qt.DashLine))
refline = ref_x_qt, 0, ref_x_qt, self.image_height_qt
painter.drawLine(*refline)
# DRAW TOLERANCE LIMITS AXIS
painter.setPen(QPen(Qt.cyan, 2, Qt.DashDotLine))
refline = ref_x_qt + tolerance_px, 0, ref_x_qt + tolerance_px, self.image_height_qt
painter.drawLine(*refline)
refline = ref_x_qt - tolerance_px, 0, ref_x_qt - tolerance_px, self.image_height_qt
painter.drawLine(*refline)
# DRAW ORIGIN(out of screen) TO REFERENCE ARROW
painter.setPen(QPen(Qt.blue, 4, Qt.SolidLine))
painter.drawLine(0, self.img_q1_y_qt, ref_x_qt, self.img_q1_y_qt)
painter.drawLine(ref_x_qt - 20, self.img_q1_y_qt - 20, ref_x_qt, self.img_q1_y_qt)
painter.drawLine(ref_x_qt - 20, self.img_q1_y_qt + 20, ref_x_qt, self.img_q1_y_qt)
if self.main_window.test_state == CycleState.AUTOTEST:
# DRAW AUTOTEST MARKS
for mark in self.main_window.config["VISION"]["autotest positions"]:
mark_x_qt = self.img_center_x_qt + self.mm_to_qt(mark)
# DRAW MARK POSITION AXIS
painter.setPen(QPen(self.qtcolor["orange"], 4, Qt.DashLine))
refline = mark_x_qt, 0, mark_x_qt, self.image_height_qt
painter.drawLine(*refline)
# DRAW TOLERANCE LIMITS AXIS
painter.setPen(QPen(self.qtcolor["orange"], 2, Qt.DashDotLine))
refline = mark_x_qt + tolerance_px, 0, mark_x_qt + tolerance_px, self.image_height_qt
painter.drawLine(*refline)
refline = mark_x_qt - tolerance_px, 0, mark_x_qt - tolerance_px, self.image_height_qt
painter.drawLine(*refline)
def visualize_edge(self, image):
painter = QPainter()
painter.begin(image)
if self.edge_detection is not None:
center = self.edge_detection["center"]
det_x_qt = int(center[0] * self.image_width_qt)
det_y_qt = int(center[1] * self.image_height_qt)
# DRAW VERTICAL EDGE POSITION MARK
if self.measure_ok:
color = Qt.green
else:
color = Qt.red
painter.setPen(QPen(color, 4, Qt.DashLine))
refline = det_x_qt, det_y_qt + 40, det_x_qt, det_y_qt - 40
painter.drawLine(*refline)
# DRAW ORIGIN(out of screen) TO DETECTION ARROW
painter.setPen(QPen(color, 4, Qt.SolidLine))
painter.drawLine(0, det_y_qt, det_x_qt, det_y_qt)
painter.drawLine(det_x_qt - 20, det_y_qt - 20, det_x_qt, det_y_qt)
painter.drawLine(det_x_qt - 20, det_y_qt + 20, det_x_qt, det_y_qt)

171
src/components/terminals.py Normal file
View File

@ -0,0 +1,171 @@
import gc
import os
import random
import re
import sys
import numpy as np
import tensorflow as tf
from lib.helpers import log_msg
from lib.object_detection.utils import label_map_util
from PyQt5.QtCore import QMutex, pyqtSignal, pyqtSlot
from .input import Input
# Patch the location of gfile
tf.gfile = tf.io.gfile
class Terminals(Input):
loading_model_signal = pyqtSignal(dict)
def __init__(self, bench, name, config):
super().__init__(bench, name, config)
self.simulate = "--sim-vision" in sys.argv
self.source = self.bench.inputs[self.config["source"]]
self.model_name = self.config["model"]
self.num_classes = self.config["num_classes"]
self.matching_distance = self.config["matching_distance"]
self.crop = self.bench.zones["terminals"]["box"]
self.subzones = {k: self.bench.zones[k] for k in self.bench.zones if re.search("t[0-9].*", k)}
self.threshold = self.config["threshold"]
if not self.simulate:
# MODEL
self.model = None
self.model_lock = QMutex()
self.load_model(self.model_name)
label_map = label_map_util.load_labelmap("config/vision_test_labels/terminals.pbtxt")
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
# TO INITIALIZE TENSORFLOW MODELS
if not self.simulate:
img = np.zeros([1, 1, 3], dtype=np.uint8)
input_tensor = tf.convert_to_tensor(img)
input_tensor = input_tensor[tf.newaxis, ...]
self.model_lock.lock()
self.bench.gpu_mutex.lock()
self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
self.last_frame = None
def load_model(self, model_name=None):
log_msg("TERMINALS NEURAL NETWORK MODEL CHOSEN: {}".format(model_name))
if model_name.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model_name = sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d)) and d.startswith("t")], reverse=True)[0]
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg(f"LOADING TERMINALS NEURAL NETWORK MODEL: {model_name}", msg_type="tensorflow")
try:
with tf.device('/device:GPU:0'):
model = tf.saved_model.load(f"neural_networks/{model_name}")
# if "--terminals-tf-default-signature" in sys.argv:
# model = model.signatures["serving_default"]
self.model_name = model_name
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
@pyqtSlot(list)
def _get(self, frame):
img = frame[1]
img = img[self.crop[1]:self.crop[3], self.crop[0]:self.crop[2]]
self.last_frame = img
if self.simulate:
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {}
for zone_name, zone in self.subzones.items():
detected_zones[zone_name] = {
"class": random.choice(list(self.category_index.values())),
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])
return
input_tensor = tf.convert_to_tensor(np.asarray(img))
input_tensor = input_tensor[tf.newaxis, ...]
if not self.simulate:
# RUN INFERENCE
self.model_lock.lock()
self.bench.gpu_mutex.lock()
output = self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
else:
output = {"detection_classes": [[None]], "detection_scores": [[0]], "detection_boxes": [[None]]}
# create detections
detections = []
for d_class, d_score, d_box in zip(output["detection_classes"][0], output["detection_scores"][0], output["detection_boxes"][0]):
if d_score < self.threshold:
continue
detections.append({
"class": self.category_index[int(d_class)],
"score": d_score.numpy().tolist(),
"box": d_box.numpy().tolist(),
"center": self.bench.get_center(d_box.numpy().tolist()),
})
# match detections with zones
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {zone_name: None for zone_name in self.subzones}
for detection in detections:
d_center = [self.crop[0] + detection["center"][1] * w, self.crop[1] + detection["center"][0] * h]
min_distance = sys.maxsize
closest_zone = None
for zone_name, zone in self.subzones.items():
distance = self.bench.get_distance(d_center, zone["center"])
if distance < min_distance and distance <= self.matching_distance:
min_distance = distance
closest_zone = zone_name
if closest_zone is None:
continue
if detected_zones[closest_zone] is not None:
old_center = [self.crop[0] + detected_zones[closest_zone]["center"][1] * w, self.crop[1] + detected_zones[closest_zone]["center"][0] * h]
if self.bench.get_distance(old_center, zone["center"]) <= self.bench.get_distance(d_center, zone["center"]):
continue
detected_zones[closest_zone] = detection
for zone_name, zone in self.subzones.items():
if detected_zones[zone_name] is None:
detected_zones[zone_name] = {
"class": {"id": None, "name": "no_detection", "color": "rgb(0,0,0)", "color_qt": [0, 0, 0]},
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])

338
src/components/vision.py Normal file
View File

@ -0,0 +1,338 @@
import os
import sys
import traceback
from configparser import ConfigParser
from pathlib import Path
import numpy
import numpy as np
import tensorflow as tf
from lib.helpers.object_detection.utils import label_map_util
from PyQt5.QtCore import QFileSystemWatcher, QMutex, pyqtSignal
from PyQt5.QtGui import QColor
from .component import Component
if "--no-edgetpu" not in sys.argv:
if "--no-tflite" not in sys.argv:
from pycoral.utils.edgetpu import make_interpreter
else:
def make_interpreter(*args, **kwargs):
raise ValueError("\"--no-edgetpu\" in sys.argv")
if "--no-tflite" not in sys.argv:
from pycoral.adapters import detect
from tflite_runtime.interpreter import Interpreter
else:
def Interpreter(*args, **kwargs):
raise ValueError("\"--no-tflite\" in sys.argv")
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
#
# # Patch the location of gfile
# tf.gfile = tf.io.gfile
class Vision(Component):
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
def __init__(self, config=None, name=None, period=None, lazy=True, paused=False, threaded=True, registers=None):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.lock = QMutex()
self.simulate = "--sim-vision" in sys.argv
self.model = None
def config_changed(self):
# OBJECT DETECTION
self.detection_threshold = float(self.config[self.name]["detection_threshold"])
# recipe
self.zones = None
self.labels = None
# LOAD RECIPE
self.recipes_dir = Path(self.config[self.name].get("recipes_dir", "./config/vision/recipes"))
self.set_recipe(None)
self.recipe_watcher = QFileSystemWatcher([])
self.recipe_watcher.fileChanged.connect(self._set_recipe)
# LOAD MODEL
self.models_dir = Path(self.config[self.name].get("models_dir", "./data/neural_networks"))
self.allowed_modes = dict.fromkeys([
"edgetpu",
"tflite_cpu",
"normal",
])
if "--no-edgetpu" in sys.argv:
self.allowed_modes.pop("edgetpu", None)
if "--no-tflite" in sys.argv:
self.allowed_modes.pop("edgetpu", None)
self.allowed_modes.pop("tflite_cpu", None)
self.load_model(self.config[self.name].get("neural_network", None))
# LOAD LABELS
label_map = label_map_util.load_labelmap("./config/vision/labels/labels.pbtxt")
self.num_classes = len(label_map.item)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
self.classes_map = {c["name"]: k for k, c in self.category_index.items()}
def clear_recipe(self):
self.recipe = None
self.points = {}
self.zones = {}
self.labels = {}
def _set_recipe(self, recipe_path):
recipe_path = str(recipe_path)
self.log.info(f"changing recipe to {recipe_path!r}")
watched = self.recipe_watcher.files()
if recipe_path == "" and len(watched) == 0: # skip bad watcher signals
return
if len(watched) > 0:
self.recipe_watcher.removePaths(watched)
self.recipe_path = recipe_path
try:
if not os.path.isfile(self.recipe_path):
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be found.")
config = ConfigParser(inline_comment_prefixes="#")
read = config.read(self.recipe_path)
if len(read) != 1 or self.recipe_path not in read:
raise AssertionError("Recipe could not be read.")
os.path.splitext(os.path.basename(read[0]))[0]
self.points = self.parse_points(config.get["shapes"])
self.zones = self.parse_zones(config.get["zones"])
self.labels = self.parse_labels(config.get("labels", None))
self.recipe_watcher.addPath(str(self.recipe_path))
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"Error reading {self.recipe_path!r}:")
self.clear_recipe()
self.status_signal.emit({
"recipe": self.recipe,
"points": self.points,
"zones": self.zones,
"labels": self.labels,
})
def set_recipe(self, recipe=None):
if recipe is None:
self.clear_recipe()
else:
self._set_recipe(self.recipes_dir / str(recipe))
def parse_points(self, config=None):
if config is None:
raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'shapes' section.")
config = {}
points = {}
for point_name, point_spec in config.items():
try:
center, size, fill_color, border_color, border_thickness, shape = point_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
size = list(map(float, size.split(",")))
if len(size) == 1:
size = [size[0], size[0]]
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
shape = shape.lower()
points[point_name] = {
"center": center,
"size": size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"shape": shape,
}
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"point {point_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {point_spec!r}")
return points
def parse_zones(self, config=None):
if config is None:
raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'zones' section.")
config = {}
zones = {}
for zone_name, zone_spec in config.items():
zone_name = zone_name.upper()
try:
center, margin, d_class = zone_spec.split(" ")
center = list(map(float, center.split(",")))
center[1] = -center[1]
if margin == "none":
margin = None
elif "," in margin:
margin = list(map(float, margin.split(",")))
else:
margin = float(margin)
zones[zone_name] = {
"center": center,
"margin": margin,
"class": self.category_index[self.classes_map[d_class]],
}
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"region {zone_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {zone_spec!r}")
return zones
def parse_labels(self, config=None):
if config is None:
config = {}
labels = {}
for label_name, label_spec in config.items():
try:
location, font_size, fill_color, border_color, border_thickness, text = label_spec.split(" ", 5)
location = list(map(float, location.split(",")))
location[1] = -location[1]
font_size = float(font_size)
fill_color = QColor(fill_color.replace("0x", "#"))
border_color = QColor(border_color.replace("0x", "#"))
border_thickness = float(border_thickness)
text = text.replace("\\n", "\n").replace("\\t", "\t")
labels[label_name] = {
"location": location,
"font_size": font_size,
"fill_color": fill_color,
"border_color": border_color,
"border_thickness": border_thickness,
"text": text,
}
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"label {label_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {label_spec!r}")
return labels
def zone_center(self, zone):
return (int((zone["xmax"] + zone["xmin"]) / 2), int((zone["ymax"] + zone["ymin"]) / 2))
def get_center(self, rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
def get_distance(self, p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def load_model(self, model=None):
self.log.info(f"requested neural network: {model!r}")
if model is None or model.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
]:
model_name = sorted([d for d in os.listdir(self.models_dir) if os.path.isdir(self.models_dir / d)], reverse=True)[0]
self.log.info(f"loading neural network: {model_name!r}")
self.loading_model_signal.emit({"status": "loading"})
self.lock.lock()
tf_mode = None
# reset tflite variables
interpreter = None
if tf_mode is None and "edgetpu" in self.allowed_modes:
try:
# create tflite edgetpu interpreter
interpreter = make_interpreter(self.models_dir / model_name / f"{model_name}_edgetpu.tflite")
tf_mode = "edgetpu"
except Exception:
self.log.exception(traceback.format_exc())
if tf_mode is None and "tflite_cpu" in self.allowed_modes:
try:
# create tflite cpu interpreter
interpreter = Interpreter(self.models_dir / model_name / f"{model_name}.tflite")
tf_mode = "tflite_cpu"
except Exception:
self.log.exception(traceback.format_exc())
# reset tensorflow variables
model = None
if tf_mode is None and "normal" in self.allowed_modes:
try:
# create tensorflow model
model = tf.saved_model.load(self.models_dir / model_name)
tf_mode = "normal"
except Exception:
self.log.exception(traceback.format_exc())
self.lock.unlock()
if tf_mode is None:
raise RuntimeError("failed initialize any neural network model")
self.tf_mode = tf_mode
self.model_name = model_name
if interpreter is not None:
# if there is a new tflite interpreter initialize it and the related values
interpreter.allocate_tensors()
interpreter.invoke() # warmup
self.tflite_input_details = interpreter.get_input_details()
self.tflite_output_details = interpreter.get_output_details()
self.inf_index = self.tflite_input_details[0]["index"]
self.inf_width = self.tflite_input_details[0]["shape"][2]
self.inf_height = self.tflite_input_details[0]["shape"][1]
else:
self.tflite_input_details = None
self.tflite_output_details = None
self.inf_index = None
self.inf_width = None
self.inf_height = None
self.interpreter = interpreter
# if there is a new model to be used, remove previous model if present
if model is not None and self.model is not None:
tf.keras.backend.clear_session()
self.model = model
self.log.info(f"initialized model {self.model!r} with mode {self.tf_mode!r}")
self.loading_model_signal.emit({"status": "done"})
def check_features(self, image, lock=True):
tensor = np.expand_dims(np.asarray(image), axis=0)
# Run inference
if lock:
self.lock.lock()
if self.tf_mode in {"edgetpu", "tflite_cpu"}:
self.interpreter.set_tensor(self.inf_index, tensor)
self.interpreter.invoke()
objs = detect.get_objects(self.interpreter, self.detection_threshold, (1, 1))
if lock:
self.lock.unlock()
boxes = [(obj.bbox.ymin / self.inf_height, obj.bbox.xmin / self.inf_width, obj.bbox.ymax / self.inf_height, obj.bbox.xmax / self.inf_width) for obj in objs]
classes = [obj.id + 1 for obj in objs]
scores = [obj.score for obj in objs]
detections = {
"detection_boxes": boxes,
"detection_classes": classes,
"detection_scores": scores,
"num_detections": 10,
}
else:
detections = self.model(tensor)
if lock:
self.lock.unlock()
parsed_detections = []
for d_box, d_class, d_score, d_mask in zip(
detections["detection_boxes"][0],
detections["detection_classes"][0],
detections["detection_scores"][0],
detections["detection_masks"][0],
):
if d_score < self.detection_threshold:
continue
center = self.get_center(d_box.numpy().tolist())
detection = {
"class": self.category_index[int(d_class)]["name"],
"color": self.category_index[int(d_class)]["color_qt"],
"score": d_score.numpy().tolist(),
"mask": d_mask.numpy().tolist(),
"box": d_box.numpy().tolist(),
"center": center,
"pos_rel_mm": self.get_pos_rel_mm(center),
}
parsed_detections.append(detection)
return parsed_detections
def _get(self, data):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if not self.lock.tryLock():
self.log.debug("skipped frame")
return
self.log.debug("detecting...")
detections = self.check_features(data[-1][list(self.sources)[0]], lock=False)
self.lock.unlock()
self.log.debug(f"detected {detections}")
super()._get([detections])

172
src/components/wires.py Normal file
View File

@ -0,0 +1,172 @@
import gc
import os
import random
import re
import sys
import numpy as np
import tensorflow as tf
from lib.helpers import log_msg
from lib.object_detection.utils import label_map_util
from PyQt5.QtCore import QMutex, pyqtSignal, pyqtSlot
from .input import Input
# Patch the location of gfile
tf.gfile = tf.io.gfile
class Wires(Input):
loading_model_signal = pyqtSignal(dict)
def __init__(self, bench, name, config):
super().__init__(bench, name, config)
self.simulate = "--sim-vision" in sys.argv
self.source = self.bench.inputs[self.config["source"]]
self.model_name = self.config["model"]
self.num_classes = self.config["num_classes"]
self.matching_distance = self.config["matching_distance"]
self.crop = self.bench.zones["wires"]["box"]
self.subzones = {k: self.bench.zones[k] for k in self.bench.zones if re.search("w[0-9].*", k)}
self.threshold = self.config["threshold"]
if not self.simulate:
# MODEL
self.model = None
self.model_lock = QMutex()
self.load_model(self.model_name)
label_map = label_map_util.load_labelmap("config/vision_test_labels/wires.pbtxt")
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
# TO INITIALIZE TENSORFLOW MODELS
if not self.simulate:
img = np.zeros([1, 1, 3], dtype=np.uint8)
input_tensor = tf.convert_to_tensor(img)
input_tensor = input_tensor[tf.newaxis, ...]
self.model_lock.lock()
self.bench.gpu_mutex.lock()
self.model(input_tensor)
self.bench.gpu_mutex.unlock()
self.model_lock.unlock()
self.last_frame = None
def load_model(self, model_name=None):
log_msg("WIRES NEURAL NETWORK MODEL CHOSEN: {}".format(model_name))
if model_name.lower() in [
"",
"any",
"last",
"latest",
"newest",
"none",
None,
]:
model_name = sorted([d for d in os.listdir("neural_networks") if os.path.isdir(os.path.join("neural_networks", d)) and d.startswith("w")], reverse=True)[0]
self.loading_model_signal.emit({"status": "loading"})
self.model_lock.lock()
log_msg(f"LOADING WIRES NEURAL NETWORK MODEL: {model_name}", msg_type="tensorflow")
try:
with tf.device('/device:GPU:1'):
model = tf.saved_model.load(f"neural_networks/{model_name}")
# if "--wires-tf-default-signature" in sys.argv:
# model = model.signatures["serving_default"]
self.model_name = model_name
except Exception as e:
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "aborted"})
raise e
if self.model is not None:
self.model = None
tf.keras.backend.clear_session()
gc.collect()
self.model = model
self.model_lock.unlock()
self.loading_model_signal.emit({"status": "done"})
@pyqtSlot(list)
def _get(self, frame):
img = frame[1]
img = img[self.crop[1]:self.crop[3], self.crop[0]:self.crop[2]]
self.last_frame = img
if self.simulate:
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {}
for zone_name, zone in self.subzones.items():
detected_zones[zone_name] = {
"class": random.choice(list(self.category_index.values())),
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"mask": np.full((15, 15), 0, dtype=np.float32).tolist(),
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])
return
input_tensor = tf.convert_to_tensor(np.asarray(img))
input_tensor = input_tensor[tf.newaxis, ...]
if not self.simulate:
# RUN INFERENCE
self.bench.gpu_mutex.lock()
output = self.model(input_tensor)
self.bench.gpu_mutex.unlock()
else:
output = {"detection_classes": [[None]], "detection_scores": [[0]], "detection_boxes": [[None]], "detection_masks": [[None]]}
# create detections
detections = []
for d_class, d_score, d_box, d_mask in zip(output["detection_classes"][0], output["detection_scores"][0], output["detection_boxes"][0], output["detection_masks"][0]):
if d_score < self.threshold:
continue
detections.append({
"class": self.category_index[int(d_class)],
"score": d_score.numpy().tolist(),
"box": d_box.numpy().tolist(),
"mask": d_mask.numpy().tolist(),
"center": self.bench.get_center(d_box.numpy().tolist()),
})
# match detections with zones
w = self.crop[2] - self.crop[0]
h = self.crop[3] - self.crop[1]
detected_zones = {zone_name: None for zone_name in self.subzones}
for detection in detections:
d_center = [self.crop[0] + detection["center"][1] * w, self.crop[1] + detection["center"][0] * h]
min_distance = sys.maxsize
closest_zone = None
for zone_name, zone in self.subzones.items():
distance = self.bench.get_distance(d_center, zone["center"])
if distance < min_distance and distance <= self.matching_distance:
min_distance = distance
closest_zone = zone_name
if closest_zone is None:
continue
if detected_zones[closest_zone] is not None:
old_center = [self.crop[0] + detected_zones[closest_zone]["center"][1] * w, self.crop[1] + detected_zones[closest_zone]["center"][0] * h]
if self.bench.get_distance(old_center, zone["center"]) <= self.bench.get_distance(d_center, zone["center"]):
continue
detected_zones[closest_zone] = detection
for zone_name, zone in self.subzones.items():
if detected_zones[zone_name] is None:
detected_zones[zone_name] = {
"class": {"id": None, "name": "no_detection", "color": "rgb(0,0,0)", "color_qt": [0, 0, 0]},
"score": 1,
"box": [
(zone["box"][1] - self.crop[1]) / h,
(zone["box"][0] - self.crop[0]) / w,
(zone["box"][3] - self.crop[1]) / h,
(zone["box"][2] - self.crop[0]) / w,
],
"mask": np.full((15, 15), 0, dtype=np.float32).tolist(),
"center": [
(zone["center"][1] - self.crop[1]) / h,
(zone["center"][0] - self.crop[0]) / w,
],
}
# self.update.emit([frame[0], detected_zones, self.name])
self.out.emit([frame[0], detected_zones, self.name])

View File

@ -0,0 +1,61 @@
// Message to store the mapping from class label strings to class id. Datasets
// use string labels to represent classes while the object detection framework
// works with class ids. This message maps them so they can be converted back
// and forth as needed.
syntax = "proto2";
package object_detection.protos;
// LVIS frequency:
enum LVISFrequency {
UNSPECIFIED = 0;
FREQUENT = 1;
COMMON = 2;
RARE = 3;
}
message StringIntLabelMapItem {
// String name. The most common practice is to set this to a MID or synsets
// id.
optional string name = 1;
// Integer id that maps to the string name above. Label ids should start from
// 1.
optional int32 id = 2;
// Human readable string label.
optional string display_name = 3;
// Label color for rendering.
optional string color = 9;
// Name of class specific keypoints for each class object and their respective
// keypoint IDs.
message KeypointMap {
// Id for the keypoint. Id must be unique within a given class, however, it
// could be shared across classes. For example "nose" keypoint can occur
// in both "face" and "person" classes. Hence they can be mapped to the same
// id.
//
// Note: It is advised to assign ids in range [1, num_unique_keypoints] to
// encode keypoint targets efficiently.
optional int32 id = 1;
// Label for the keypoint.
optional string label = 2;
}
repeated KeypointMap keypoints = 4;
// Label ids for the elements that are connected in the hierarchy with the
// current element. Value should correspond to another label id element.
repeated int32 ancestor_ids = 5;
repeated int32 descendant_ids = 6;
// LVIS specific label map fields
optional LVISFrequency frequency = 7;
optional int32 instance_count = 8;
};
message StringIntLabelMap {
repeated StringIntLabelMapItem item = 1;
};

View File

@ -0,0 +1,258 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: src/lib/helpers/object_detection/protos/string_int_label_map.proto
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='src/lib/helpers/object_detection/protos/string_int_label_map.proto',
package='object_detection.protos',
syntax='proto2',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\nBsrc/lib/helpers/object_detection/protos/string_int_label_map.proto\x12\x17object_detection.protos\"\xd0\x02\n\x15StringIntLabelMapItem\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x14\n\x0c\x64isplay_name\x18\x03 \x01(\t\x12\r\n\x05\x63olor\x18\t \x01(\t\x12M\n\tkeypoints\x18\x04 \x03(\x0b\x32:.object_detection.protos.StringIntLabelMapItem.KeypointMap\x12\x14\n\x0c\x61ncestor_ids\x18\x05 \x03(\x05\x12\x16\n\x0e\x64\x65scendant_ids\x18\x06 \x03(\x05\x12\x39\n\tfrequency\x18\x07 \x01(\x0e\x32&.object_detection.protos.LVISFrequency\x12\x16\n\x0einstance_count\x18\x08 \x01(\x05\x1a(\n\x0bKeypointMap\x12\n\n\x02id\x18\x01 \x01(\x05\x12\r\n\x05label\x18\x02 \x01(\t\"Q\n\x11StringIntLabelMap\x12<\n\x04item\x18\x01 \x03(\x0b\x32..object_detection.protos.StringIntLabelMapItem*D\n\rLVISFrequency\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x0c\n\x08\x46REQUENT\x10\x01\x12\n\n\x06\x43OMMON\x10\x02\x12\x08\n\x04RARE\x10\x03'
)
_LVISFREQUENCY = _descriptor.EnumDescriptor(
name='LVISFrequency',
full_name='object_detection.protos.LVISFrequency',
filename=None,
file=DESCRIPTOR,
create_key=_descriptor._internal_create_key,
values=[
_descriptor.EnumValueDescriptor(
name='UNSPECIFIED', index=0, number=0,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='FREQUENT', index=1, number=1,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='COMMON', index=2, number=2,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
_descriptor.EnumValueDescriptor(
name='RARE', index=3, number=3,
serialized_options=None,
type=None,
create_key=_descriptor._internal_create_key),
],
containing_type=None,
serialized_options=None,
serialized_start=517,
serialized_end=585,
)
_sym_db.RegisterEnumDescriptor(_LVISFREQUENCY)
LVISFrequency = enum_type_wrapper.EnumTypeWrapper(_LVISFREQUENCY)
UNSPECIFIED = 0
FREQUENT = 1
COMMON = 2
RARE = 3
_STRINGINTLABELMAPITEM_KEYPOINTMAP = _descriptor.Descriptor(
name='KeypointMap',
full_name='object_detection.protos.StringIntLabelMapItem.KeypointMap',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='object_detection.protos.StringIntLabelMapItem.KeypointMap.id', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='label', full_name='object_detection.protos.StringIntLabelMapItem.KeypointMap.label', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=392,
serialized_end=432,
)
_STRINGINTLABELMAPITEM = _descriptor.Descriptor(
name='StringIntLabelMapItem',
full_name='object_detection.protos.StringIntLabelMapItem',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='object_detection.protos.StringIntLabelMapItem.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='id', full_name='object_detection.protos.StringIntLabelMapItem.id', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='display_name', full_name='object_detection.protos.StringIntLabelMapItem.display_name', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='color', full_name='object_detection.protos.StringIntLabelMapItem.color', index=3,
number=9, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='keypoints', full_name='object_detection.protos.StringIntLabelMapItem.keypoints', index=4,
number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='ancestor_ids', full_name='object_detection.protos.StringIntLabelMapItem.ancestor_ids', index=5,
number=5, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='descendant_ids', full_name='object_detection.protos.StringIntLabelMapItem.descendant_ids', index=6,
number=6, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='frequency', full_name='object_detection.protos.StringIntLabelMapItem.frequency', index=7,
number=7, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='instance_count', full_name='object_detection.protos.StringIntLabelMapItem.instance_count', index=8,
number=8, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[_STRINGINTLABELMAPITEM_KEYPOINTMAP, ],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=96,
serialized_end=432,
)
_STRINGINTLABELMAP = _descriptor.Descriptor(
name='StringIntLabelMap',
full_name='object_detection.protos.StringIntLabelMap',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='item', full_name='object_detection.protos.StringIntLabelMap.item', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=434,
serialized_end=515,
)
_STRINGINTLABELMAPITEM_KEYPOINTMAP.containing_type = _STRINGINTLABELMAPITEM
_STRINGINTLABELMAPITEM.fields_by_name['keypoints'].message_type = _STRINGINTLABELMAPITEM_KEYPOINTMAP
_STRINGINTLABELMAPITEM.fields_by_name['frequency'].enum_type = _LVISFREQUENCY
_STRINGINTLABELMAP.fields_by_name['item'].message_type = _STRINGINTLABELMAPITEM
DESCRIPTOR.message_types_by_name['StringIntLabelMapItem'] = _STRINGINTLABELMAPITEM
DESCRIPTOR.message_types_by_name['StringIntLabelMap'] = _STRINGINTLABELMAP
DESCRIPTOR.enum_types_by_name['LVISFrequency'] = _LVISFREQUENCY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
StringIntLabelMapItem = _reflection.GeneratedProtocolMessageType('StringIntLabelMapItem', (_message.Message,), {
'KeypointMap' : _reflection.GeneratedProtocolMessageType('KeypointMap', (_message.Message,), {
'DESCRIPTOR' : _STRINGINTLABELMAPITEM_KEYPOINTMAP,
'__module__' : 'src.lib.helpers.object_detection.protos.string_int_label_map_pb2'
# @@protoc_insertion_point(class_scope:object_detection.protos.StringIntLabelMapItem.KeypointMap)
})
,
'DESCRIPTOR' : _STRINGINTLABELMAPITEM,
'__module__' : 'src.lib.helpers.object_detection.protos.string_int_label_map_pb2'
# @@protoc_insertion_point(class_scope:object_detection.protos.StringIntLabelMapItem)
})
_sym_db.RegisterMessage(StringIntLabelMapItem)
_sym_db.RegisterMessage(StringIntLabelMapItem.KeypointMap)
StringIntLabelMap = _reflection.GeneratedProtocolMessageType('StringIntLabelMap', (_message.Message,), {
'DESCRIPTOR' : _STRINGINTLABELMAP,
'__module__' : 'src.lib.helpers.object_detection.protos.string_int_label_map_pb2'
# @@protoc_insertion_point(class_scope:object_detection.protos.StringIntLabelMap)
})
_sym_db.RegisterMessage(StringIntLabelMap)
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,366 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Label map utility functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import logging
import numpy as np
from six import string_types
from six.moves import range
import tensorflow.compat.v1 as tf
from google.protobuf import text_format
from lib.helpers.object_detection.protos import string_int_label_map_pb2
_LABEL_OFFSET = 1
def _validate_label_map(label_map):
"""Checks if a label map is valid.
Args:
label_map: StringIntLabelMap to validate.
Raises:
ValueError: if label map is invalid.
"""
for item in label_map.item:
if item.id < 0:
raise ValueError('Label map ids should be >= 0.')
if (item.id == 0 and item.name != 'background' and
item.display_name != 'background'):
raise ValueError('Label map id 0 is reserved for the background label')
def create_category_index(categories):
"""Creates dictionary of COCO compatible categories keyed by category id.
Args:
categories: a list of dicts, each of which has the following keys:
'id': (required) an integer id uniquely identifying this category.
'name': (required) string representing category name
e.g., 'cat', 'dog', 'pizza'.
Returns:
category_index: a dict containing the same entries as categories, but keyed
by the 'id' field of each category.
"""
category_index = {}
for cat in categories:
category_index[cat['id']] = cat
return category_index
def get_max_label_map_index(label_map):
"""Get maximum index in label map.
Args:
label_map: a StringIntLabelMapProto
Returns:
an integer
"""
return max([item.id for item in label_map.item])
def convert_label_map_to_categories(label_map,
max_num_classes,
use_display_name=True):
"""Given label map proto returns categories list compatible with eval.
This function converts label map proto and returns a list of dicts, each of
which has the following keys:
'id': (required) an integer id uniquely identifying this category.
'name': (required) string representing category name
e.g., 'cat', 'dog', 'pizza'.
'keypoints': (optional) a dictionary of keypoint string 'label' to integer
'id'.
We only allow class into the list if its id-label_id_offset is
between 0 (inclusive) and max_num_classes (exclusive).
If there are several items mapping to the same id in the label map,
we will only keep the first one in the categories list.
Args:
label_map: a StringIntLabelMapProto or None. If None, a default categories
list is created with max_num_classes categories.
max_num_classes: maximum number of (consecutive) label indices to include.
use_display_name: (boolean) choose whether to load 'display_name' field as
category name. If False or if the display_name field does not exist, uses
'name' field as category names instead.
Returns:
categories: a list of dictionaries representing all possible categories.
"""
categories = []
list_of_ids_already_added = []
if not label_map:
label_id_offset = 1
for class_id in range(max_num_classes):
categories.append({
'id': class_id + label_id_offset,
'name': 'category_{}'.format(class_id + label_id_offset)
})
return categories
for item in label_map.item:
if not 0 < item.id <= max_num_classes:
logging.info(
'Ignore item %d since it falls outside of requested '
'label range.', item.id)
continue
if use_display_name and item.HasField('display_name'):
name = item.display_name
else:
name = item.name
if item.id not in list_of_ids_already_added:
list_of_ids_already_added.append(item.id)
category = {'id': item.id, 'name': name}
if item.HasField('frequency'):
if item.frequency == string_int_label_map_pb2.LVISFrequency.Value(
'FREQUENT'):
category['frequency'] = 'f'
elif item.frequency == string_int_label_map_pb2.LVISFrequency.Value(
'COMMON'):
category['frequency'] = 'c'
elif item.frequency == string_int_label_map_pb2.LVISFrequency.Value(
'RARE'):
category['frequency'] = 'r'
if item.HasField('instance_count'):
category['instance_count'] = item.instance_count
if item.keypoints:
keypoints = {}
list_of_keypoint_ids = []
for kv in item.keypoints:
if kv.id in list_of_keypoint_ids:
raise ValueError('Duplicate keypoint ids are not allowed. '
'Found {} more than once'.format(kv.id))
keypoints[kv.label] = kv.id
list_of_keypoint_ids.append(kv.id)
category['keypoints'] = keypoints
categories.append(category)
return categories
def load_labelmap(path):
"""Loads label map proto.
Args:
path: path to StringIntLabelMap proto text file.
Returns:
a StringIntLabelMapProto
"""
with tf.io.gfile.GFile(path, 'r') as fid:
label_map_string = fid.read()
label_map = string_int_label_map_pb2.StringIntLabelMap()
try:
text_format.Merge(label_map_string, label_map)
except text_format.ParseError:
label_map.ParseFromString(label_map_string)
_validate_label_map(label_map)
return label_map
def get_label_map_dict(label_map_path_or_proto,
use_display_name=False,
fill_in_gaps_and_background=False):
"""Reads a label map and returns a dictionary of label names to id.
Args:
label_map_path_or_proto: path to StringIntLabelMap proto text file or the
proto itself.
use_display_name: whether to use the label map items' display names as keys.
fill_in_gaps_and_background: whether to fill in gaps and background with
respect to the id field in the proto. The id: 0 is reserved for the
'background' class and will be added if it is missing. All other missing
ids in range(1, max(id)) will be added with a dummy class name
("class_<id>") if they are missing.
Returns:
A dictionary mapping label names to id.
Raises:
ValueError: if fill_in_gaps_and_background and label_map has non-integer or
negative values.
"""
if isinstance(label_map_path_or_proto, string_types):
label_map = load_labelmap(label_map_path_or_proto)
else:
_validate_label_map(label_map_path_or_proto)
label_map = label_map_path_or_proto
label_map_dict = {}
for item in label_map.item:
if use_display_name:
label_map_dict[item.display_name] = item.id
else:
label_map_dict[item.name] = item.id
if fill_in_gaps_and_background:
values = set(label_map_dict.values())
if 0 not in values:
label_map_dict['background'] = 0
if not all(isinstance(value, int) for value in values):
raise ValueError('The values in label map must be integers in order to'
'fill_in_gaps_and_background.')
if not all(value >= 0 for value in values):
raise ValueError('The values in the label map must be positive.')
if len(values) != max(values) + 1:
# there are gaps in the labels, fill in gaps.
for value in range(1, max(values)):
if value not in values:
# TODO(rathodv): Add a prefix 'class_' here once the tool to generate
# teacher annotation adds this prefix in the data.
label_map_dict[str(value)] = value
return label_map_dict
def get_keypoint_label_map_dict(label_map_path_or_proto):
"""Reads a label map and returns a dictionary of keypoint names to ids.
Note that the keypoints belong to different classes will be merged into a
single dictionary. It is expected that there is no duplicated keypoint names
or ids from different classes.
Args:
label_map_path_or_proto: path to StringIntLabelMap proto text file or the
proto itself.
Returns:
A dictionary mapping keypoint names to the keypoint id (not the object id).
Raises:
ValueError: if there are duplicated keyoint names or ids.
"""
if isinstance(label_map_path_or_proto, string_types):
label_map = load_labelmap(label_map_path_or_proto)
else:
label_map = label_map_path_or_proto
label_map_dict = {}
for item in label_map.item:
for kpts in item.keypoints:
if kpts.label in label_map_dict.keys():
raise ValueError('Duplicated keypoint label: %s' % kpts.label)
if kpts.id in label_map_dict.values():
raise ValueError('Duplicated keypoint ID: %d' % kpts.id)
label_map_dict[kpts.label] = kpts.id
return label_map_dict
def get_label_map_hierarchy_lut(label_map_path_or_proto,
include_identity=False):
"""Reads a label map and returns ancestors and descendants in the hierarchy.
The function returns the ancestors and descendants as separate look up tables
(LUT) numpy arrays of shape [max_id, max_id] where lut[i,j] = 1 when there is
a hierarchical relationship between class i and j.
Args:
label_map_path_or_proto: path to StringIntLabelMap proto text file or the
proto itself.
include_identity: Boolean to indicate whether to include a class element
among its ancestors and descendants. Setting this will result in the lut
diagonal being set to 1.
Returns:
ancestors_lut: Look up table with the ancestors.
descendants_lut: Look up table with the descendants.
"""
if isinstance(label_map_path_or_proto, string_types):
label_map = load_labelmap(label_map_path_or_proto)
else:
_validate_label_map(label_map_path_or_proto)
label_map = label_map_path_or_proto
hierarchy_dict = {
'ancestors': collections.defaultdict(list),
'descendants': collections.defaultdict(list)
}
max_id = -1
for item in label_map.item:
max_id = max(max_id, item.id)
for ancestor in item.ancestor_ids:
hierarchy_dict['ancestors'][item.id].append(ancestor)
for descendant in item.descendant_ids:
hierarchy_dict['descendants'][item.id].append(descendant)
def get_graph_relations_tensor(graph_relations):
graph_relations_tensor = np.zeros([max_id, max_id])
for id_val, ids_related in graph_relations.items():
id_val = int(id_val) - _LABEL_OFFSET
for id_related in ids_related:
id_related -= _LABEL_OFFSET
graph_relations_tensor[id_val, id_related] = 1
if include_identity:
graph_relations_tensor += np.eye(max_id)
return graph_relations_tensor
ancestors_lut = get_graph_relations_tensor(hierarchy_dict['ancestors'])
descendants_lut = get_graph_relations_tensor(hierarchy_dict['descendants'])
return ancestors_lut, descendants_lut
def create_categories_from_labelmap(label_map_path, use_display_name=True):
"""Reads a label map and returns categories list compatible with eval.
This function converts label map proto and returns a list of dicts, each of
which has the following keys:
'id': an integer id uniquely identifying this category.
'name': string representing category name e.g., 'cat', 'dog'.
'keypoints': a dictionary of keypoint string label to integer id. It is only
returned when available in label map proto.
Args:
label_map_path: Path to `StringIntLabelMap` proto text file.
use_display_name: (boolean) choose whether to load 'display_name' field
as category name. If False or if the display_name field does not exist,
uses 'name' field as category names instead.
Returns:
categories: a list of dictionaries representing all possible categories.
"""
label_map = load_labelmap(label_map_path)
max_num_classes = max(item.id for item in label_map.item)
return convert_label_map_to_categories(label_map, max_num_classes,
use_display_name)
def create_category_index_from_labelmap(label_map_path, use_display_name=True):
"""Reads a label map and returns a category index.
Args:
label_map_path: Path to `StringIntLabelMap` proto text file.
use_display_name: (boolean) choose whether to load 'display_name' field
as category name. If False or if the display_name field does not exist,
uses 'name' field as category names instead.
Returns:
A category index, which is a dictionary that maps integer ids to dicts
containing categories, e.g.
{1: {'id': 1, 'name': 'dog'}, 2: {'id': 2, 'name': 'cat'}, ...}
"""
categories = create_categories_from_labelmap(label_map_path, use_display_name)
return create_category_index(categories)
def create_class_agnostic_category_index():
"""Creates a category index with a single `object` class."""
return {1: {'id': 1, 'name': 'object'}}

View File

@ -48,8 +48,10 @@ logging.basicConfig(
try:
# IMPORT PROJECT ONLY AFTER SETTING UP SIGNAL, FAULTHANDLER AND LOGGHING
from components import (ArchiveSynchronizer, Os_Label_Printer, RemoteAPI,
TecnaMarpossProvasetT3, TestComponent, VisionSaver)
from components import (ArchiveSynchronizer, GalaxyCamera,
Os_Label_Printer, RemoteAPI,
TecnaMarpossProvasetT3, TestComponent, Vision,
VisionSaver)
from lib.db import Users
from lib.helpers import ConfigReader
from PyQt5.QtCore import QObject, QThread, pyqtSignal
@ -74,11 +76,13 @@ try:
# INIT COMPONENT
self.components_specs = {
"archive_synchronizer": {"c": ArchiveSynchronizer},
"remote_api": {"c": RemoteAPI, "k": {"main": self}},
"test_component": {"c": TestComponent},
"vision_savert": {"c": VisionSaver, "t": False},
"galaxy_camera": {"c": GalaxyCamera},
"label_printer": {"c": Os_Label_Printer, "t": False},
"remote_api": {"c": RemoteAPI, "k": {"main": self}},
"tecna_marposs_provaset_t3": {"c": TecnaMarpossProvasetT3},
"test_component": {"c": TestComponent},
"vision_saver": {"c": VisionSaver, "t": False},
"vision": {"c": Vision},
}
self.components = {}
self.threads = {}
@ -92,11 +96,17 @@ try:
component = self.components[component_name]
thread.started.connect(component.start)
thread.start()
component.wait_ready()
if component_name == "vision":
component.wait_ready(timeout=60)
else:
component.wait_ready()
except Exception as e:
logging.exception(traceback.format_exc())
QMessageBox.critical(None, "Errore Banco", f"Non e stato possibile connettersi al banco:\n\n{e}")
quit()
# CONNECT VISION WORKFLOW
self.components["vision"].set_sources({"galaxy_camera": self.components["galaxy_camera"].out})
self.components["vision"].wait_ready()
# GUI INIT
# self.main_window = Main_Window(self.bench)
self.main_window = Main_Window()
@ -124,7 +134,7 @@ try:
elif "--full-screen" in sys.argv:
self.main_window.showFullScreen()
else:
self.main_window.show()
self.main_window.showFullScreen()
def open_archive(self):
self.main_window.open_dialog(Archive())

View File

@ -1,12 +1,16 @@
--extra-index-url https://google-coral.github.io/py-repo/
argon2-cffi
bottle
google-cloud-storage
imutils
numpy
opencv-python-headless
peewee
pillow
pycoral
pymodbus
pyqt5
pyserial
requests
tensorflow
zebra

BIN
src/ui/imgs/camera.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB