st-ten-1/src/components/usb_586x.py

233 lines
7.6 KiB
Python

import ctypes
import sys
import platform
import time
from enum import Enum
from PyQt5.QtCore import QMutex, Qt, QTimer, pyqtSlot, pyqtSignal
from PyQt5.QtWidgets import QMessageBox, QApplication
from .component import Component
if "--sim-io" not in sys.argv:
from components.Automation.BDaq import *
from components.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
from components.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
else:
from components.dummies.Automation.BDaq import *
from components.dummies.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
from components.dummies.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
from components.dummies.libbiodaq import ErrorCode
class USB_586x(Component):
    """Component driving a USB-586x digital I/O module (ids "5860" / "5862").

    Inputs are polled by a QTimer and debounced before being forwarded to
    ``Component._get``; outputs are written one bit at a time. Hardware access
    goes through the ``InstantDiCtrl`` / ``InstantDoCtrl`` wrappers, or their
    dummy counterparts when the process was started with ``--sim-io``.
    """

    class DeviceInformation(ctypes.Structure):
        """Device descriptor; ``__init__`` fills Description, DeviceMode and ModuleIndex."""
        _fields_ = [
            ('DeviceNumber', ctypes.c_uint32),
            ('DeviceMode', ctypes.c_uint32),
            ('ModuleIndex', ctypes.c_uint32),
            ('Description', ctypes.c_wchar_p),
        ]

    update = pyqtSignal(object)
    # Bit masks for one byte, least-significant bit first (index n <-> bit n).
    masks = [1 << n for n in range(8)]

    # (substring looked up in the configured id, number of I/O bytes).
    # Checked in order; a later match overrides an earlier one.
    _MODELS = (("5860", 1), ("5862", 2))
    # Maximum attempts for a read or a verified write before giving up.
    _MAX_RETRY = 3
    # Pause (seconds) before the device is reopened after a failed access.
    _RETRY_DELAY_S = 1
    # Input polling frequency used by the acquisition timer.
    _POLL_HZ = 20

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Read the device id from ``config``, open the device and clear all outputs.

        Args:
            config: mapping that must provide ``config["digital_io"]["id"]``.
            name, period, lazy, paused, threaded: forwarded unchanged to
                ``Component.__init__``.

        Raises:
            ValueError: if the configured id names neither a 5860 nor a 5862.
        """
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.buffer = None
        self.io_ok = False
        self.mutex = QMutex()
        self.simulate = "--sim-io" in sys.argv

        # Debounce state used by _get().
        self.state_delay = 1
        self.last_get = None
        self.state_count = None
        self.last_out = None
        # Simulated input bytes (two bytes of eight bits), written by set_in_bit().
        self.sim_in = ([0] * 8, [0] * 8)

        # DEVICE INFORMATION
        self.id = config["digital_io"]["id"]
        recognised = False
        for model, n_bytes in self._MODELS:
            if model in self.id:
                self.type = model
                self.in_size = n_bytes
                self.out_size = n_bytes
                recognised = True
        if not recognised:
            # Fail before touching the hardware instead of crashing later on a
            # missing in_size / out_size attribute.
            raise ValueError(f"Unsupported digital I/O device id: {self.id!r}")

        self.info = self.DeviceInformation()
        self.info.Description = self.id
        self.info.DeviceMode = 1
        self.info.ModuleIndex = 0
        self.open_device()

        # NOTE(review): get_in() / set_out() read self.in_bits / self.out_bits,
        # which are not assigned anywhere in this class (the assignments below
        # are disabled) - confirm that a subclass or the caller provides them.
        #self.out_bits = self.config["digital_outputs"]
        #self.in_bits = self.config["digital_inputs"]

        # SET ALL RELAYS OFF
        for byte in range(self.out_size):
            for bit in range(8):
                self.set_bit(byte, bit, False)

    def _attach_controllers(self):
        """Create the DI/DO controllers and bind the read / write entry points."""
        self.di_ctrl = InstantDiCtrl(self.info.Description)
        self.do_ctrl = InstantDoCtrl(self.info.Description)
        self.di_read = self.di_ctrl.readAny
        self.do_write_bit = self.do_ctrl.writeBit
        self.buffer = ctypes.create_string_buffer(2)
        self.io_ok = True

    def open_device(self):
        """Open the digital I/O controllers and mark the I/O as usable.

        On real hardware a ``ValueError`` raised while creating the controllers
        is reported in a message box and the process exits with status -1. In
        simulation mode the controllers are created without that handling.
        """
        # DIGITAL I/O CLASS
        if self.simulate:
            self._attach_controllers()
            return

        self.log.info("OPENING USB MODULE...")
        try:
            self._attach_controllers()
        except ValueError:
            QMessageBox.critical(None, "ERRORE", "ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
            # sys.exit rather than the site-provided exit(), which is meant for
            # the interactive shell and is not guaranteed to exist.
            sys.exit(-1)

    def close_device(self):
        """Placeholder: nothing is released here."""
        pass

    @pyqtSlot()
    def start(self):
        """Start the input polling timer, then start the base component."""
        # ACQUISITION TIMER
        self.timer = QTimer()
        self.timer.setTimerType(Qt.PreciseTimer)
        self.timer.setInterval(1000 // self._POLL_HZ)
        self.timer.timeout.connect(self._get)
        self.timer.start()
        super().start()

    # Read data for buffer
    @pyqtSlot()
    def _get(self):
        """Poll the inputs, debounce them and pass the result to ``Component._get``.

        A bit is copied into ``last_out`` only after it has been read with the
        same value more than ``state_delay`` consecutive times. When the read
        fails (``get()`` returns an empty result) the cycle is skipped and the
        previous state is kept.
        """
        sample = self.get()
        if not sample:
            return

        if self.state_count is None or self.last_get is None:
            # First successful sample: take it as the initial stable state.
            self.last_out = sample
            self.state_count = [[1 for _ in byte] for byte in sample]
        else:
            for byte_n, byte in enumerate(sample):
                previous = self.last_get[byte_n]
                counts = self.state_count[byte_n]
                for bit_n, bit in enumerate(byte):
                    counts[bit_n] = counts[bit_n] + 1 if bit == previous[bit_n] else 1
                    if counts[bit_n] > self.state_delay:
                        self.last_out[byte_n][bit_n] = bit
        self.last_get = sample
        #self.update.emit([time.time(), self.last_out])
        super()._get([self.last_out])

    def get_all(self):
        """Return the current input state; identical to ``get()``."""
        return self.get()

    # Read input bytes
    def get(self):
        """Read all input bytes.

        Returns:
            In simulation mode, ``self.sim_in`` itself. Otherwise a list with
            one entry per input byte, each a list of eight booleans (bit 0
            first). An empty list is returned when every attempt failed.

        After a failed read both controllers are disposed and the device is
        reopened after a short pause; at most ``_MAX_RETRY`` attempts are made.
        The mutex is held for the whole call, including the pauses.
        """
        self.mutex.lock()
        try:
            if self.simulate:
                return self.sim_in

            for _ in range(self._MAX_RETRY):
                if self.io_ok:
                    ret = self.di_read(0, self.in_size)
                    if ret[0].value == ErrorCode.Success.value:
                        self.buffer = ret[1]
                        return [[bool(byte & m) for m in self.masks] for byte in self.buffer]
                    self.buffer = None
                    self.log.error("READ ERROR")
                    self.di_ctrl.dispose()
                    self.do_ctrl.dispose()
                    self.io_ok = False
                time.sleep(self._RETRY_DELAY_S)
                self.open_device()
            return []
        finally:
            # Always release, even if the driver raises.
            self.mutex.unlock()

    # Read data bit
    def get_bit(self, byte, bit):
        """Read the inputs and return one bit.

        Raises:
            IndexError: if the position is out of range or the read failed.
        """
        return self.get()[byte][bit]

    def get_in(self, input_name, data=None):
        """Return the value of the named input.

        Args:
            input_name: key into ``self.in_bits``, whose value is the flat bit
                index (byte * 8 + bit).
            data: input state to look into; defaults to ``self.last_get``.

        Returns:
            The selected bit, or 0 when no input state is available yet.
        """
        if data is None:
            data = self.last_get
        sensor_index = int(self.in_bits[input_name])
        if data is None:
            return 0
        byte_idx, bit_idx = divmod(sensor_index, 8)
        return data[byte_idx][bit_idx]

    # Send data byte
    def set(self, start_byte, val):
        """Write whole bytes, bit by bit, starting at output byte ``start_byte``.

        Args:
            start_byte: index of the first output byte to write.
            val: iterable of integers, one per consecutive output byte.
        """
        for byte, v in enumerate(val, start=start_byte):
            for bit, mask in enumerate(self.masks):
                self.set_bit(byte, bit, bool(v & mask))

    # Write single bit
    def set_bit(self, byte, bit, val):
        """Write a single output bit.

        Returns:
            ``False`` when the I/O is not usable, ``ErrorCode.Success`` in
            simulation mode, otherwise whatever the controller's ``writeBit``
            returns.
        """
        self.mutex.lock()
        try:
            # print("set", byte, bit, not val, flush=True)
            if not self.io_ok:
                return False
            if self.simulate:
                return ErrorCode.Success
            return self.do_write_bit(byte, bit, int(val))
        finally:
            # Always release, even if the driver raises.
            self.mutex.unlock()

    def set_bit_verify(self, byte, bit, val):
        """Write a single output bit, reopening the device and retrying on failure.

        Returns:
            ``True`` once a write reports success (always in simulation mode),
            ``False`` after ``_MAX_RETRY`` failed attempts.
        """
        for _ in range(self._MAX_RETRY):
            ret = self.set_bit(byte, bit, val)
            # set_bit() returns plain False when the I/O is down; that value
            # has no `.value`, so it must count as a failure, not crash here.
            succeeded = getattr(ret, "value", None) == ErrorCode.Success.value
            if succeeded or self.simulate:
                return True
            self.log.error("SET BIT ERROR")
            time.sleep(self._RETRY_DELAY_S)
            self.open_device()
        return False

    # val from buffer channel
    def vfbc(self, buffer, channel):
        """Return ``buffer[channel[0]][channel[1]]`` (channel is a (byte, bit) pair)."""
        return buffer[channel[0]][channel[1]]

    def set_out(self, output_name, value):
        """Set the named output via ``set_bit_verify``; unknown names are ignored.

        Args:
            output_name: key into ``self.out_bits``, whose value is the flat
                bit index (byte * 8 + bit).
            value: state to write.
        """
        if output_name in self.out_bits:
            sensor_index = int(self.out_bits[output_name])
            byte_idx, bit_idx = divmod(sensor_index, 8)
            self.set_bit_verify(byte_idx, bit_idx, value)

    def set_in_bit(self, byte, bit, val):
        """Set one bit of the simulated input state; always returns ``True``."""
        self.sim_in[byte][bit] = val
        return True