st-ten-1/src/components/usb_586x.py
2023-04-11 17:01:17 +02:00

225 lines
8.4 KiB
Python

import ctypes
import sys
import platform
import time
from enum import Enum
from PyQt5.QtCore import QMutex, Qt, QTimer, pyqtSlot, pyqtSignal
from .component import Component
# Backend selection, done once at import time:
#   * "--sim-io" present on the command line -> dummy BDaq classes are imported,
#   * otherwise on Windows                    -> the BDaq Python package is imported,
#   * otherwise                               -> libbiodaq.so is loaded through ctypes.
is_win = platform.system() == "Windows"
if "--sim-io" not in sys.argv:
    if is_win:
        from components.Automation.BDaq import *
        from components.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
        from components.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
    else:
        # Shared library is loaded from a fixed absolute path.
        libbiodaq = ctypes.CDLL("/opt/advantech/libs/libbiodaq.so")
else:
    from components.dummies.Automation.BDaq import *
    from components.dummies.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
    from components.dummies.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
    #is_win=False
    #import components.dummies.libbiodaq as libbiodaq
# NOTE(review): the original nesting of this import was lost; it is assumed to
# be unconditional because ErrorCode is referenced by the class below in every
# mode (and is compared by ``.value``) -- confirm against the repository.
from components.dummies.libbiodaq import ErrorCode
class USB_586x(Component):
    """Component that polls and drives a USB-586x digital I/O module.

    Three back-ends are supported, chosen from the module-level ``is_win``
    flag and the ``--sim-io`` command-line switch:

    * simulation  -- dummy ``InstantDiCtrl`` / ``InstantDoCtrl`` objects,
    * Windows     -- ``InstantDiCtrl`` / ``InstantDoCtrl`` wrapper objects,
    * otherwise   -- raw ``libbiodaq`` entry points bound through ctypes.

    Inputs are polled by a timer (see ``start``) and debounced in ``_get``
    before being handed to ``Component._get``.
    """

    class DeviceInformation(ctypes.Structure):
        """Device selector passed by reference to the ``setSelectedDevice`` calls."""
        _fields_ = [
            ('DeviceNumber', ctypes.c_uint32),
            ('DeviceMode', ctypes.c_uint32),
            ('ModuleIndex', ctypes.c_uint32),
            ('Description', ctypes.c_wchar_p)
        ]

    update = pyqtSignal(object)
    # Bit masks for bit 0..7 of a byte, least significant bit first.
    masks = [1 << n for n in range(8)]

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Open the device named in ``config["digital_io"]["id"]`` and clear all outputs.

        Raises:
            ValueError: if the configured id contains neither "5860" nor "5862".
        """
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.mutex = QMutex()
        self.simulate = "--sim-io" in sys.argv

        # Debounce state used by _get(); initialised before any hardware access
        # so the object is consistent even if opening the device fails.
        self.state_delay = 1
        self.last_get = None
        self.state_count = None
        self.last_out = None

        # DEVICE INFORMATION
        self.id = config["digital_io"]["id"]
        self.type = None
        # Checked in this order so that, as before, "5862" wins if both match.
        for model, port_bytes in (("5860", 1), ("5862", 2)):
            if model in self.id:
                self.type = model
                self.in_size = port_bytes
                self.out_size = port_bytes
        if self.type is None:
            # Previously this surfaced later as an AttributeError on out_size,
            # after the device had already been opened.
            raise ValueError(f"Unsupported digital_io id {self.id!r}: expected a 5860 or 5862 device")

        self.info = self.DeviceInformation()
        self.info.Description = self.id
        # self.info.DeviceNumber = -1
        self.info.DeviceMode = 1
        self.info.ModuleIndex = 0
        self.open_device()

        # SET ALL RELAYS OFF
        for byte in range(self.out_size):
            for bit in range(8):
                self.set_bit(byte, bit, False)

    def open_device(self):
        """Create the input/output controllers and bind ``di_read`` / ``do_write_bit``."""
        if self.simulate or is_win:
            # The Windows wrappers and the simulation dummies are used through
            # the same calls, so both modes share this branch.
            self.di_ctrl = InstantDiCtrl(self.info.Description)
            self.do_ctrl = InstantDoCtrl(self.info.Description)
            self.di_read = self.di_ctrl.readAny
            self.do_write_bit = self.do_ctrl.writeBit
            self.buffer = ctypes.create_string_buffer(2)
            return

        # DIGITAL INPUTS CLASS
        self.di_create = libbiodaq.AdxInstantDiCtrlCreate
        self.di_create.restype = ctypes.c_void_p
        self.di_setSelectedDevice = libbiodaq.InstantDiCtrl_setSelectedDevice
        self.di_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
        self.di_setSelectedDevice.restype = ctypes.c_uint32
        # DIGITAL INPUTS READ FUNCTION
        self.di_read = libbiodaq.InstantDiCtrl_ReadAny
        self.di_read.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char_p]
        self.di_read.restype = ctypes.c_int32
        # DIGITAL OUTPUTS CLASS
        self.do_create = libbiodaq.AdxInstantDoCtrlCreate
        self.do_create.restype = ctypes.c_void_p
        # SET SELECTED DEVICE
        self.do_setSelectedDevice = libbiodaq.InstantDoCtrl_setSelectedDevice
        self.do_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
        self.do_setSelectedDevice.restype = ctypes.c_uint32
        # get ports
        self.get_ports = libbiodaq.InstantDoCtrl_getPortDirection
        self.get_ports.argtypes = [ctypes.c_void_p]
        self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p)
        # DIGITAL OUTPUTS WRITE FUNCTION
        self.do_write_bit = libbiodaq.InstantDoCtrl_WriteBit
        self.do_write_bit.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char]
        self.do_write_bit.restype = ctypes.c_int32
        self.buffer = ctypes.create_string_buffer(2)
        # INIT OBJECTS
        self.di_ctrl = self.di_create()
        self.do_ctrl = self.do_create()
        self.di_init_status = self.di_setSelectedDevice(self.di_ctrl, ctypes.byref(self.info))
        self.do_init_status = self.do_setSelectedDevice(self.do_ctrl, ctypes.byref(self.info))

    def close_device(self):
        """Placeholder: nothing is released at the moment."""
        pass

    @pyqtSlot()
    def start(self):
        """Start the 20 Hz acquisition timer, then start the base component."""
        # ACQUISITION TIMER
        self.timer = QTimer()
        self.timer.setTimerType(Qt.PreciseTimer)
        self.timer.setInterval(int(1000 / 20))
        self.timer.timeout.connect(self._get)
        self.timer.start()
        super().start()

    # Read data for buffer
    @pyqtSlot()
    def _get(self):
        """Poll the inputs, debounce them and forward the result to ``Component._get``.

        A bit of ``last_out`` only follows the raw input once the same raw value
        has been seen more than ``state_delay`` times in a row.
        """
        sample = self.get()
        if not sample:
            # Failed read (get() returned nothing). Keep the debounce history
            # untouched; storing the empty sample used to make the next
            # successful poll fail with IndexError.
            if self.last_out is None:
                self.last_out = sample
        elif (self.state_count is None or self.last_get is None
                or len(self.last_get) != len(sample)):
            # First usable sample (or shape changed): start counting from here.
            self.last_out = sample
            self.state_count = [[1 for _ in byte] for byte in sample]
            self.last_get = sample
        else:
            for byte_n, byte in enumerate(sample):
                for bit_n, bit in enumerate(byte):
                    if bit == self.last_get[byte_n][bit_n]:
                        self.state_count[byte_n][bit_n] += 1
                    else:
                        self.state_count[byte_n][bit_n] = 1
                    if self.state_count[byte_n][bit_n] > self.state_delay:
                        self.last_out[byte_n][bit_n] = bit
            self.last_get = sample
        #self.update.emit([time.time(), self.last_out])
        super()._get([self.last_out])

    def get_all(self):
        """Return the raw (non-debounced) state of all inputs; same as ``get``."""
        return self.get()

    # Read input bytes
    def get(self):
        """Read all input ports.

        Returns:
            One sequence of 8 bit values per port byte, bit 0 first. An empty
            list is returned when the Windows read reports an error.
        """
        self.mutex.lock()
        try:
            if self.simulate:
                # Fixed pattern: only bit 0 of the first byte is set.
                return [1, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0]

            read = []
            if is_win:
                ret = self.di_read(0, self.in_size)
                if ret[0].value == ErrorCode.Success.value:
                    self.buffer = ret[1]
                    for byte in self.buffer:
                        read.append([bool(byte & m) for m in self.masks])
                else:
                    self.buffer = None
            else:
                self.di_read(self.di_ctrl, 0, self.in_size, self.buffer)
                # .raw yields the buffer content as ints, one per byte.
                for byte in self.buffer.raw:
                    read.append([bool(byte & m) for m in self.masks])
            return read
        finally:
            # Always release the mutex, even if the driver call raised;
            # otherwise every later get()/set_bit() would block forever.
            self.mutex.unlock()

    # Read data bit
    def get_bit(self, byte, bit):
        """Return the raw state of a single input bit."""
        return self.get()[byte][bit]

    # Send data byte
    def set(self, start_byte, val):
        """Write the integers in ``val`` to consecutive output bytes from ``start_byte``."""
        for byte, v in enumerate(val, start=start_byte):
            for bit, mask in enumerate(self.masks):
                self.set_bit(byte, bit, bool(v & mask))

    # Write single bit
    def set_bit(self, byte, bit, val):
        """Write one output bit and return the back-end's status.

        The returned object differs per back-end: ``ErrorCode.Success`` in
        simulation, whatever ``writeBit`` returns on Windows, and a plain
        ``int`` from the ctypes call otherwise.
        """
        self.mutex.lock()
        try:
            # print("set", byte, bit, not val, flush=True)
            if self.simulate:
                return ErrorCode.Success
            if is_win:
                return self.do_write_bit(byte, bit, int(val))
            return self.do_write_bit(self.do_ctrl, byte, bit, int(val))
        finally:
            # Release the mutex on every path, including driver exceptions.
            self.mutex.unlock()

    def set_bit_verify(self, byte, bit, val):
        """Write one output bit, reopening the device and retrying on error.

        Returns:
            True if a write succeeded within 3 attempts, False otherwise.
        """
        max_retry = 3
        for _ in range(max_retry):
            ret = self.set_bit(byte, bit, val)
            # The ctypes path returns a bare int (c_int32 restype is converted
            # to a Python int), which has no ``.value``; enum-like results do.
            # NOTE(review): assumes the int status uses the same numbering as
            # ErrorCode -- confirm against the SDK.
            code = getattr(ret, "value", ret)
            if self.simulate or code == ErrorCode.Success.value:
                return True
            self.log.error("SET BIT ERROR")
            self.open_device()
        return False

    # val from buffer channel
    def vfbc(self, buffer, channel):
        """Return ``buffer[byte][bit]`` for ``channel == (byte, bit)``."""
        return buffer[channel[0]][channel[1]]