st-ten-1/src/main.py
2024-10-24 13:34:14 +02:00

302 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
import faulthandler
import logging
import os
import platform
import signal
import sys
import traceback
import weakref
from datetime import datetime
from pathlib import Path
from ui.diagnostics import Diagnostics
from lib.helpers.single_process import SingleProcess
# Make the project's "src/components" directory importable.
# os.path.join inserts the native separator, so a single statement covers
# both Windows and POSIX. The previous Windows-only literal depended on the
# invalid escape sequences "\s" and "\c", which raise a SyntaxWarning on
# recent Python versions.
sys.path.append(os.path.join(os.getcwd(), "src", "components"))
# Placeholder for the QApplication instance; the __main__ section assigns it
# and quit_app() checks it before asking Qt to quit.
app = None

# Command line definition. Only the system id is declared as a real option;
# the other "--flag" switches used in this file are looked up directly in
# sys.argv, so unknown arguments are collected instead of rejected.
parser = argparse.ArgumentParser(
    description="Leak test system",
    prog="ST-TEN",
)
parser.add_argument("-s", "--system-id")

# args   -> namespace holding the declared options (args.system_id)
# unspec -> list of every argument the parser did not recognise
args, unspec = parser.parse_known_args(sys.argv[1:])
def quit_app(signalnum=None, handler=None):
    """Stop the Qt application (when one exists) and terminate the process.

    This function is installed as the SIGINT handler and is also connected
    to the main window's quit action, so it must accept whatever positional
    values those callers pass. Both values are only written to the log.

    Args:
        signalnum: First value supplied by the caller (the signal number when
            invoked as a signal handler). Logged only.
        handler: Second value supplied by the caller (the second signal
            handler argument when invoked as a signal handler). Logged only.

    Raises:
        SystemExit: Always, after the Qt application has been asked to quit.
    """
    logging.info(f"quitting app. signal: {signalnum!r}, handler: {handler!r}")
    # `app` is the module-level QApplication reference; it stays None until
    # the __main__ section creates the application.
    if app is not None:
        app.quit()
    # sys.exit() rather than the site-provided quit(): the latter is meant
    # for the interactive shell and is not guaranteed to exist (for example
    # with "python -S" or in frozen builds).
    sys.exit()
# SETUP QUITTING ON CTRL+C
signal.signal(signal.SIGINT, quit_app)

# SETUP FAULTHANDLER
# Write the tracebacks of every thread to stderr if the interpreter crashes.
faulthandler.enable(file=sys.stderr, all_threads=True)

# SETUP LOGS
# One log file per launch, named after the start time, under ./data/logs.
logs_dir = Path(".") / "data" / "logs"
os.makedirs(logs_dir, exist_ok=True)

# The extra encoding / error-handling keywords are only passed on
# Python 3.10 and newer. Comparing the version tuple as a whole (instead of
# testing major and minor separately) keeps the gate correct for any later
# major release, where the minor number starts again from 0.
_py310_plus = sys.version_info >= (3, 10)
_encoding_kwargs = {"encoding": "utf-8"} if _py310_plus else {}
_errors_kwargs = {"errors": "surrogateescape"} if _py310_plus else {}

logging.basicConfig(
    format="{asctime}:{name}:{levelname}:{message}",
    datefmt="%Y-%m-%d_%H-%M-%S",
    style="{",
    # "--debug" on the command line lowers the threshold from INFO to DEBUG.
    level="DEBUG" if "--debug" in sys.argv else "INFO",
    handlers=[
        logging.StreamHandler(stream=sys.stderr),
        logging.FileHandler(
            logs_dir / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log",
            mode="a",
            encoding="utf-8",
            delay=False,
            **_errors_kwargs,
        ),
    ],
    # Replace any handler that was attached to the root logger earlier.
    force=True,
    **_encoding_kwargs,
    **_errors_kwargs,
)
try:
# IMPORT PROJECT ONLY AFTER SETTING UP SIGNAL, FAULTHANDLER AND LOGGING
from components import (ArchiveSynchronizer, Multicomp730424,
Os_Label_Printer, RemoteAPI,
TecnaMarpossProvasetT3, FurnessControlsLeakTester, TecnaScrewdriver, USB_586x, RFID_PN532,BrotherLabelPrinter)
from lib.db import Users
from lib.helpers import ConfigReader
from PyQt6.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, Qt
from PyQt6.QtWidgets import QApplication, QMessageBox
from ui import About, Archive, Login, Main_Window, Test, Users_Management, Recipe_Selection, \
Barcode_Recipe_Selection
if "--vision" in sys.argv:
from components import GalaxyCamera, NeoPixels, UVCCamera, Vision, VisionSaver
class Main(QObject):
    """Top-level controller created once by the ``__main__`` section.

    Construction reads the configuration, instantiates every component the
    configuration marks as ``"present"``, starts the component threads, links
    the components that feed each other and, unless ``--no-gui`` is on the
    command line, builds the main window and opens the login tab.
    """

    # Carries a call description handled by _do():
    # {"f": callable, "a": positional args (optional), "k": keyword args (optional)}
    do = pyqtSignal(dict)

    @staticmethod
    def _do(config):
        """Call ``config["f"]`` with the optional ``"a"`` args and ``"k"`` kwargs.

        Returns:
            Whatever the called object returns.
        """
        return config["f"](*config.get("a", []), **config.get("k", {}))

    def __init__(self, parent=None):
        """Build the components and, when enabled, the GUI.

        Args:
            parent: Accepted for signature compatibility; it is not forwarded
                to ``QObject``.

        Raises:
            SystemExit: If reading the configuration or creating/starting a
                component raises; the error is logged and shown in a message
                box first.
        """
        # print(f"MAIN {int(QThread.currentThreadId())}", flush=True)
        super().__init__()
        self.tag_loaded_recipe = None
        self.do.connect(self._do)
        try:
            self._load_config()
            self._select_component_specs()
            self._create_components()
            self._start_component_threads()
        except Exception as e:
            # logging.exception() appends the active traceback by itself, so
            # the message must not contain the formatted traceback again.
            logging.exception("Startup of the test program failed")
            QMessageBox.critical(None, "Errore", f"Errore di avvio del programma di collaudo:\n\n{e}")
            # sys.exit() instead of the interactive-shell helper quit().
            sys.exit()
        self._link_components()
        # GUI INIT
        if "--no-gui" not in sys.argv:
            self._init_gui()

    def _load_config(self):
        """Read the configuration for the system id given on the command line."""
        # READ CONFIG
        system_id = getattr(args, "system_id", None)
        self.config = ConfigReader(system_id=system_id)
        logging.info(f"STARTING SESSION ON MACHINE {self.config['machine']['description']}")
        self.config["autotest_done"] = False

    def _select_component_specs(self):
        """Fill ``self.components_specs`` with the components marked as present.

        Each spec is a dict with the keys:
            "c": class to instantiate (required)
            "a": positional arguments (optional)
            "k": keyword arguments (optional)
            "t": whether the component is moved to its own QThread
                 (optional, defaults to True)
        """
        # INIT COMPONENT
        specs = {
            "archive_synchronizer": {"c": ArchiveSynchronizer},
            "label_printer": {"c": Os_Label_Printer, "t": False},
            "extra_label_printer": {"c": Os_Label_Printer, "t": False},
            "label_printer_2": {"c": BrotherLabelPrinter, "t": False},
            "multicomp": {"c": Multicomp730424, "k": {"paused": True}},
            "remote_api": {"c": RemoteAPI, "k": {"main": self}},
            "screwdriver": {"c": TecnaScrewdriver, "k": {"paused": True}},
            "tecna_t3": {"c": TecnaMarpossProvasetT3, "k": {"paused": True}},
            "furness_controls": {"c": FurnessControlsLeakTester, "k": {"paused": True}},
            "digital_io": {"c": USB_586x, "k": {"paused": True}},
            "digital_io_flush_blow": {"c": USB_586x, "k": {"paused": True}},
            "fixture_id": {"c": RFID_PN532, "k": {"paused": False, "lazy": False}},
        }
        # VISION COMPONENT IS OPTIONAL AND DISABLED BY DEFAULT
        # (the vision classes are only imported when "--vision" is given)
        if "--vision" in sys.argv:
            specs.update({
                "galaxy_camera": {"c": GalaxyCamera, "k": {"paused": True}},
                "neo_pixels": {"c": NeoPixels, "t": False},
                "uvc_camera": {"c": UVCCamera, "k": {"paused": True}},
                "vision_saver": {"c": VisionSaver, "t": False},
                "vision": {"c": Vision, "k": {"paused": True}},
            })
        # Keep only the components whose hardware_config entry is "present".
        # A former "not a valid component name" check was dropped here: it
        # tested names taken from the spec table itself and could never fire.
        hardware_config = self.config.get("hardware_config", {})
        self.components_specs = {
            component_name: spec
            for component_name, spec in specs.items()
            if hardware_config.get(component_name, None) == "present"
        }

    def _create_components(self):
        """Instantiate the selected components and assign threads to them."""
        self.components = {}
        self.threads = {}
        for component_name, spec in self.components_specs.items():
            self.components[component_name] = spec["c"](
                *spec.get("a", []),
                config=self.config,
                name=component_name,
                **spec.get("k", {}),
            )
            if spec.get("t", True):
                self.threads[component_name] = QThread()
                self.threads[component_name].setTerminationEnabled(True)
                self.components[component_name].moveToThread(self.threads[component_name])
        if "fixture_id" in self.components:
            self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)

    def _start_component_threads(self):
        """Start every component thread and wait for the component to complete."""
        for component_name, thread in self.threads.items():
            component = self.components[component_name]
            thread.started.connect(component.start)
            thread.start()
            # DEBUGGER WORKAROUND
            QApplication.processEvents()
            QThread.msleep(1000)
            QApplication.processEvents()
            if component_name == "vision":
                component.wait_completion(timeout=60)
            else:
                component.wait_completion()

    def _link_components(self):
        """Connect the outputs/requests of components that work together."""
        # connect camera frames to vision
        if "vision" in self.components and "uvc_camera" in self.components:
            self.components["vision"].set_sources({"uvc_camera": self.components["uvc_camera"].out})
        elif "vision" in self.components and "galaxy_camera" in self.components:
            self.components["vision"].set_sources({"galaxy_camera": self.components["galaxy_camera"].out})
        # connect tecna to screwdriver
        if "screwdriver" in self.components and "tecna_t3" in self.components:
            self.components["tecna_t3"].set_requestors({"screwdriver": self.components["screwdriver"].request})
            self.components["screwdriver"].set_sources({"tecna_t3": self.components["tecna_t3"].out})

    def _init_gui(self):
        """Create the main window, connect its actions and show it."""
        # self.main_window = Main_Window(self.bench)
        self.main_window = Main_Window()
        window = self.main_window
        # The lambdas hold only a weak reference to this object.
        me = weakref.ref(self)

        # CONNECT MAIN WINDOW ACTIONS
        window.logout_a.triggered.connect(lambda checked, selfie=me: selfie().logout())
        window.archive_a.triggered.connect(
            lambda checked, selfie=me: selfie().main_window.open_dialog(
                Archive(hide_cloud_image="vision_saver" not in selfie().components)))
        if "--archive" in sys.argv:
            window.archive_a.trigger()
        # admin menu should not be visible before an admin logs in
        window.admin_m.menuAction().setVisible(False)
        window.about_a.triggered.connect(
            lambda checked, selfie=me: selfie().main_window.open_dialog(About()))
        # Trigger only after the slot above is connected; triggering earlier
        # would find no slot connected by this class and "--about" would do
        # nothing.
        if "--about" in sys.argv:
            window.about_a.trigger()
        window.quit_a.triggered.connect(quit_app)
        window.users_management_a.triggered.connect(
            lambda checked, selfie=me: selfie().main_window.open_dialog(Users_Management()))
        window.table_selection_a.triggered.connect(self.set_recipe_mode_table)
        window.barcode_selection_a.triggered.connect(self.set_recipe_mode_barcode)
        window.ristampa_etichetta_a.triggered.connect(self.reprint_label)
        window.diagnostics_a.triggered.connect(
            lambda checked, selfie=me: selfie().main_window.open_dialog(Diagnostics(selfie())))
        if "--users-management" in sys.argv:
            window.users_management_a.trigger()

        # CONFIG-SPECIFIC MENU ENTRY ACTIVATION
        tecna_saving_enabled = (
            "--enable-saving-tecna-recipes" in sys.argv
            or self.config.get("tecna_t3", {}).get("saver", None) == "present"
        )
        if "tecna_t3" in self.components and tecna_saving_enabled:
            window.save_tecna_recipes_a.triggered.connect(self.components["tecna_t3"].store_recipes)
            window.save_tecna_recipes_a.setVisible(True)
            if "--save-tecna-recipes" in sys.argv:
                window.save_tecna_recipes_a.trigger()
        else:
            window.save_tecna_recipes_a.setVisible(False)
        # Tolerate a missing entry (same .get() style as the other
        # hardware_config lookups): the barcode entry is then simply hidden.
        window.barcode_selection_a.setVisible(
            self.config.get("hardware_config", {}).get("barcode_recipe_selection", None) == "present")

        # OPEN LOGIN TAB
        self.open_login()

        # SHOW MAIN WINDOW
        if "--panel" in sys.argv:
            window.show()
        elif "--maximized" in sys.argv:
            window.showMaximized()
        else:
            # "--full-screen" is also the default when no mode flag is given.
            window.showFullScreen()

    def open_login(self):
        """Open a new login tab and route its successful-login signal to logged_in()."""
        tab = Login()
        tab.successful_login.connect(self.logged_in)
        self.main_window.open_tab(tab)

    def logged_in(self):
        """Update the admin menu and open the test tab for the current session.

        Without a session only the admin menu is hidden.
        """
        session = Users.get_session()
        if session is None:
            self.main_window.admin_m.menuAction().setVisible(False)
            return
        self.main_window.admin_m.menuAction().setVisible(bool(session.is_admin))
        # open test
        self.main_window.open_tab(Test(self.config, self.components, self))
        self.main_window.centralWidget().request_autotest("login")

    def logout(self):
        """Hide the admin menu and log out now or after an autotest.

        The choice depends on what the central widget is currently showing.
        """
        self.main_window.admin_m.menuAction().setVisible(False)
        central = self.main_window.centralWidget()
        if type(central.centralWidget.widget) in (Recipe_Selection, Barcode_Recipe_Selection):
            # LOGOUT IMMEDIATELY IF NOT TESTING
            Users.logout()
            ArchiveSynchronizer.machine_status = "logged-out"
            self.open_login()
        elif not central.autotesting:
            # LOGOUT AFTER AUTOTEST IF TESTING
            central.request_autotest("logout")
        else:
            # LOGOUT IMMEDIATELY IF AUTOTESTING
            # NOTE(review): machine_status is not updated on this path,
            # unlike the first branch - confirm this is intended.
            Users.logout()
            self.open_login()

    def set_recipe_mode_table(self):
        """Forward the menu action to the central widget."""
        self.main_window.centralWidget().set_recipe_mode_table()

    def set_recipe_mode_barcode(self):
        """Forward the menu action to the central widget."""
        self.main_window.centralWidget().set_recipe_mode_barcode()

    def reprint_label(self):
        """Forward the menu action to the central widget."""
        self.main_window.centralWidget().reprint_label()

    @pyqtSlot(str)
    def load_recipe_from_rfid(self, data):
        """Store the string received from the fixture-id component's new_id_signal."""
        self.tag_loaded_recipe = data
if __name__ == "__main__":
    # The QApplication must exist before any widget or message box is
    # created; quit_app() uses this module-level reference.
    app = QApplication(sys.argv)
    # Hold the single-process lock for the whole lifetime of the program.
    with SingleProcess() as single_process_lock:
        if not single_process_lock and "--no-lock" not in sys.argv:
            logging.error("Program already opened, exiting...")
            QMessageBox.critical(None, "ERRORE", "IL PROGRAMMA E' GIA' IN ESECUZIONE")
            # sys.exit() instead of the interactive-shell helper exit().
            sys.exit(0)
        main = Main()
        if "--no-gui" not in sys.argv:
            app.exec()
        if "--interact" in sys.argv:
            import code
            try:
                # Imported only for its side effect (line editing in the
                # console); the module is not available on every platform,
                # so its absence must not prevent the console from opening.
                import readline  # noqa: F401
            except ImportError:
                logging.debug("readline is not available; interactive console runs without line editing")
            variables = globals().copy()
            variables.update(locals())
            shell = code.InteractiveConsole(variables)
            shell.interact()
except Exception:
logging.exception(traceback.format_exc())
# extype, value, tb = sys.exc_info()
# pdb.post_mortem(tb)