#!/usr/bin/env python3
"""Entry point of the ST-TEN leak test system.

At import time the script parses the command line, installs a SIGINT handler,
enables ``faulthandler`` and configures logging (stderr plus a timestamped
file under ``data/logs``).  Only then are the project packages and PyQt5
imported, so that any failure while importing them is already logged.
"""
import argparse
import faulthandler
import logging
import os
import platform
# import pdb
import signal
import sys
import traceback
import weakref
from datetime import datetime
from pathlib import Path

if platform.system() == "Windows":
    # Built with os.path.join instead of a literal containing "\s" / "\c",
    # which are invalid escape sequences; the resulting path is the same.
    sys.path.append(os.path.join(os.getcwd(), "src", "components"))

# NOTE(review): kept at module level because USB_586x is referenced
# unconditionally in Main's component table - confirm this matches the
# original nesting, which cannot be recovered from the collapsed source.
from components.usb_586x import USB_586x

# The QApplication instance; assigned by the launcher at the bottom of the file.
app = None

parser = argparse.ArgumentParser(prog='ST-TEN', description='Leak test system')
parser.add_argument('-s', '--system-id')
# parse_known_args() is used because many extra flags are looked up directly
# in sys.argv further down; they end up in `unspec`.
args, unspec = parser.parse_known_args()


def quit_app(signalnum=None, handler=None):
    """Quit the Qt application (if one exists) and terminate the interpreter.

    Registered as the SIGINT handler and also connected to the main window's
    quit action, so the two positional arguments are whatever the caller
    passes (signal number and frame, or the action's ``checked`` flag); they
    are only logged.

    Raises:
        SystemExit: always.
    """
    logging.info(f"quitting app. signal: {signalnum!r}, handler: {handler!r}")
    global app
    if app is not None:
        app.quit()
    # sys.exit() rather than the site-provided quit(), which is meant for the
    # interactive shell and is missing when the site module is not loaded.
    sys.exit()


# SETUP QUITTING ON CTRL+C
signal.signal(signal.SIGINT, quit_app)

# SETUP FAULTHANDLER
faulthandler.enable(file=sys.stderr, all_threads=True)

# SETUP LOGS
logs_dir = Path(".") / "data" / "logs"
os.makedirs(logs_dir, exist_ok=True)

# The extra text-handling keyword arguments are only passed on Python >= 3.10.
_PY310_PLUS = sys.version_info >= (3, 10)
_ERRORS_KWARGS = {"errors": "surrogateescape"} if _PY310_PLUS else {}
_ENCODING_KWARGS = {"encoding": "utf-8"} if _PY310_PLUS else {}

logging.basicConfig(
    format="{asctime}:{name}:{levelname}:{message}",
    datefmt="%Y-%m-%dT%H-%M-%S%z",
    style="{",
    level="INFO",
    handlers=[
        logging.StreamHandler(stream=sys.stderr),
        logging.FileHandler(
            # ':' is replaced by ';' in the ISO timestamp used as file name.
            logs_dir / f"{datetime.now().isoformat().replace(':', ';')}.log",
            mode="a",
            encoding="utf-8",
            delay=False,
            **_ERRORS_KWARGS,
        ),
    ],
    force=True,
    **_ENCODING_KWARGS,
    **_ERRORS_KWARGS,
)

try:
    # IMPORT PROJECT ONLY AFTER SETTING UP SIGNAL, FAULTHANDLER AND LOGGING
    from components import (ArchiveSynchronizer, Multicomp730424, Os_Label_Printer, RemoteAPI,
                            TecnaMarpossProvasetT3, TecnaScrewdriver)
    from lib.db import Users
    from lib.helpers import ConfigReader
    from PyQt5.QtCore import QObject, QThread, pyqtSignal
    from PyQt5.QtWidgets import QApplication, QMessageBox
    from ui import About, Archive, Login, Main_Window, Test, Users_Management

    if "--vision" in sys.argv:
        from components import GalaxyCamera, NeoPixels, UVCCamera, Vision, VisionSaver


    class Main(QObject):
        """Application controller: builds the components and the main window.

        Component spec dictionaries use these keys:
            "c": class to instantiate,
            "a": optional positional arguments,
            "k": optional keyword arguments,
            "t": whether the component is moved to its own QThread
                 (defaults to True).
        """

        # Emitting `do` with {"f": callable, "a": [...], "k": {...}} invokes
        # the callable through the connected `_do` slot.
        do = pyqtSignal(dict)

        @staticmethod
        def _do(config):
            """Call ``config["f"]`` with the optional "a" args and "k" kwargs."""
            return config["f"](*config.get("a", []), **config.get("k", {}))

        def __init__(self, parent=None):
            """Read the configuration, start the components and build the GUI.

            Args:
                parent: accepted but not forwarded to ``QObject.__init__``.

            Any exception raised while reading the configuration or starting
            the components is logged, shown in a message box, and the process
            exits.
            """
            # print(f"MAIN {int(QThread.currentThreadId())}", flush=True)
            super().__init__()
            self.do.connect(self._do)

            try:
                # READ CONFIG
                system_id = args.system_id if "system_id" in args else None
                self.config = ConfigReader(system_id=system_id)
                self.config["autotest_done"] = False

                # INIT COMPONENT
                self.components_specs = self._select_component_specs()
                self._create_components()
                self._start_component_threads()
            except Exception as e:
                logging.exception(traceback.format_exc())
                QMessageBox.critical(None, "Errore", f"Errore di avvio del programma di collaudo:\n\n{e}")
                sys.exit()

            self._connect_components()

            # GUI INIT
            if "--no-gui" not in sys.argv:
                self._init_gui()

        def _select_component_specs(self):
            """Return the specs of the components marked "present" in the config."""
            specs = {
                "archive_synchronizer": {"c": ArchiveSynchronizer},
                "label_printer": {"c": Os_Label_Printer, "t": False},
                "extra_label_printer": {"c": Os_Label_Printer, "t": False},
                "multicomp": {"c": Multicomp730424, "k": {"paused": True}},
                "remote_api": {"c": RemoteAPI, "k": {"main": self}},
                "screwdriver": {"c": TecnaScrewdriver, "k": {"paused": True}},
                "tecna_t3": {"c": TecnaMarpossProvasetT3, "k": {"paused": True}},
                "digital_io": {"c": USB_586x, "k": {"paused": True}},
            }

            # VISION COMPONENT IS OPTIONAL AND DISABLED BY DEFAULT
            if "--vision" in sys.argv:
                specs.update({
                    "galaxy_camera": {"c": GalaxyCamera, "k": {"paused": True}},
                    "neo_pixels": {"c": NeoPixels, "t": False},
                    "uvc_camera": {"c": UVCCamera, "k": {"paused": True}},
                    "vision_saver": {"c": VisionSaver, "t": False},
                    "vision": {"c": Vision, "k": {"paused": True}},
                })

            # Keep only what the hardware configuration declares as "present".
            # (The former "is not a valid component name" check iterated over
            # this very dict and therefore could never fire; it was removed.)
            hardware_config = self.config.get("hardware_config", {})
            return {
                name: spec
                for name, spec in specs.items()
                if hardware_config.get(name, None) == "present"
            }

        def _create_components(self):
            """Instantiate every selected component and its QThread, if any."""
            self.components = {}
            self.threads = {}
            for component_name, spec in self.components_specs.items():
                component = spec["c"](
                    *spec.get("a", []),
                    config=self.config,
                    name=component_name,
                    **spec.get("k", {}),
                )
                self.components[component_name] = component
                if spec.get("t", True):
                    thread = QThread()
                    thread.setTerminationEnabled(True)
                    component.moveToThread(thread)
                    self.threads[component_name] = thread

        def _start_component_threads(self):
            """Start each component thread and wait on its ``wait_completion``."""
            debugger_workaround = "--debugger-workaround" in sys.argv
            for component_name, thread in self.threads.items():
                component = self.components[component_name]
                thread.started.connect(component.start)
                thread.start()
                if debugger_workaround:
                    QApplication.processEvents()
                    QThread.msleep(1000)
                    QApplication.processEvents()
                # Only the vision component is given an explicit timeout.
                if component_name == "vision":
                    component.wait_completion(timeout=60)
                else:
                    component.wait_completion()

        def _connect_components(self):
            """Wire the outputs and requests of components that work together."""
            components = self.components

            # connect camera frames to vision (the UVC camera takes precedence)
            if "vision" in components and "uvc_camera" in components:
                components["vision"].set_sources({"uvc_camera": components["uvc_camera"].out})
            elif "vision" in components and "galaxy_camera" in components:
                components["vision"].set_sources({"galaxy_camera": components["galaxy_camera"].out})

            # connect tecna to screwdriver
            if "screwdriver" in components and "tecna_t3" in components:
                components["tecna_t3"].set_requestors({"screwdriver": components["screwdriver"].request})
                components["screwdriver"].set_sources({"tecna_t3": components["tecna_t3"].out})

        def _init_gui(self):
            """Create the main window, connect its actions and show it."""
            # self.main_window = Main_Window(self.bench)
            self.main_window = Main_Window()
            window = self.main_window
            # The slots only hold a weak reference to this object.
            main_ref = weakref.ref(self)

            # CONNECT MAIN WINDOW ACTIONS
            window.logout_a.triggered.connect(lambda checked: main_ref().logout())

            window.archive_a.triggered.connect(
                lambda checked: main_ref().main_window.open_dialog(
                    Archive(hide_cloud_image="vision_saver" not in main_ref().components)))
            if "--archive" in sys.argv:
                window.archive_a.trigger()

            window.about_a.triggered.connect(
                lambda checked: main_ref().main_window.open_dialog(About()))
            if "--about" in sys.argv:
                window.about_a.trigger()

            # admin menu should not be visible before an admin logs in
            window.admin_m.menuAction().setVisible(False)

            window.quit_a.triggered.connect(quit_app)

            window.users_management_a.triggered.connect(
                lambda checked: main_ref().main_window.open_dialog(Users_Management()))
            if "--users-management" in sys.argv:
                window.users_management_a.trigger()

            # self.main_window.recipes_management_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(Recipes_Management()))
            # if "--recipes-management" in sys.argv:
            #     self.main_window.recipes_management_a.trigger()
            # self.main_window.steps_management_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(Steps_Management()))
            # if "--steps-management" in sys.argv:
            #     self.main_window.steps_management_a.trigger()

            # The "save tecna recipes" action is offered only when the tecna
            # component exists and saving is enabled by flag or by config.
            saving_enabled = (
                "--enable-saving-tecna-recipes" in sys.argv
                or self.config.get("tecna_t3", {}).get("saver", None) == "present"
            ) if "tecna_t3" in self.components else False
            if saving_enabled:
                window.save_tecna_recipes_a.triggered.connect(self.components["tecna_t3"].store_recipes)
                window.save_tecna_recipes_a.setVisible(True)
                if "--save-tecna-recipes" in sys.argv:
                    window.save_tecna_recipes_a.trigger()
            else:
                window.save_tecna_recipes_a.setVisible(False)

            # OPEN LOGIN TAB
            self.open_login()

            # SHOW MAIN WINDOW
            if "--panel" in sys.argv:
                window.show()
            elif "--maximized" in sys.argv:
                window.showMaximized()
            else:
                # "--full-screen" is also the default.
                window.showFullScreen()

        def open_login(self):
            """Open a Login tab whose successful login calls ``logged_in``."""
            tab = Login()
            tab.successful_login.connect(self.logged_in)
            self.main_window.open_tab(tab)

        def logged_in(self):
            """Show the admin menu for admin sessions and open the Test tab.

            Without a session the admin menu is hidden and no tab is opened.
            """
            session = Users.get_session()
            admin_menu = self.main_window.admin_m.menuAction()
            if session is None:
                admin_menu.setVisible(False)
                return
            if session.is_admin:
                admin_menu.setVisible(True)
            else:
                admin_menu.setVisible(False)
            # open test
            self.main_window.open_tab(Test(self.config, self.components))

        def logout(self):
            """Log the user out, hide the admin menu and reopen the login tab."""
            # NOTE(review): `centralWidget` is not called here. If Main_Window
            # inherits QMainWindow.centralWidget unchanged this is a bound
            # method and the check never matches - confirm against ui.Main_Window.
            if type(self.main_window.centralWidget) is Test:
                self.main_window.centralWidget.change_recipe()
            Users.logout()
            self.main_window.admin_m.menuAction().setVisible(False)
            self.open_login()


    if __name__ == "__main__":
        app = QApplication(sys.argv)
        main = Main()
        if "--no-gui" not in sys.argv:
            app.exec()
        if "--interact" in sys.argv:
            import code
            try:
                # Imported only for its side effect on console line editing.
                import readline  # noqa: F401
            except ImportError:
                # readline is not available on every platform (e.g. Windows);
                # the console still works without it.
                pass
            variables = globals().copy()
            variables.update(locals())
            shell = code.InteractiveConsole(variables)
            shell.interact()
except Exception:
    logging.exception(traceback.format_exc())
    # extype, value, tb = sys.exc_info()
    # pdb.post_mortem(tb)