From 1019a43ebba9e10d4a84eeadbeada20c794c6ced Mon Sep 17 00:00:00 2001 From: eduar Date: Fri, 19 Sep 2025 16:01:31 +0200 Subject: [PATCH] Bugatti proto label --- src/components/adam_6066.py | 249 ++++++++++++++++++++++++++++++++++++ src/test/test_adam_6066.py | 86 +++++++++++++ 2 files changed, 335 insertions(+) create mode 100644 src/components/adam_6066.py create mode 100644 src/test/test_adam_6066.py diff --git a/src/components/adam_6066.py b/src/components/adam_6066.py new file mode 100644 index 0000000..f217cb7 --- /dev/null +++ b/src/components/adam_6066.py @@ -0,0 +1,249 @@ +import time +import enum +from typing import List, Optional + +from pymodbus.client import ModbusTcpClient +from pymodbus.exceptions import ModbusException + + +class Edges(enum.IntEnum): + RISING = enum.auto() + FALLING = enum.auto() + + +class ADAM6XXX: + """ + Base class for the Advantech ADAM-6000 series I/O modules using Modbus TCP. + + This class provides a foundation for communication and common operations. + Specific module implementations (e.g., ADAM-6066) should inherit from this class + and define their unique register addresses. + """ + ip: str = '10.0.0.1' + name: str = '' + _connected: bool = False + _client: Optional[ModbusTcpClient] = None + + digital_inputs: List[int] = [] + digital_outputs: List[int] = [] + + def __init__(self, ip: str = '10.0.0.1', port: int = 502, connect: bool = False): + """ + Create a new ADAM-6000 object. + + This should not be called directly; use one of the module-specific objects. + + Args: + ip (str, optional): IP address of module. Defaults to '10.0.0.1'. + port (int, optional): The Modbus TCP port (default is 502). + connect (bool, optional): Whether to connect to the module on construction. Defaults to False. + """ + self.ip = ip + self.port = port + self._client = ModbusTcpClient(self.ip, self.port) + if connect: + self.connect() + + def connect(self) -> bool: + """ + Connect to the module using Modbus TCP. + + Returns: + bool: Whether the connection was successful. + """ + try: + self._connected = self._client.connect() + if not self._connected: + print(f"Error: Could not connect to the module at {self.ip}:{self.port}") + return self._connected + except Exception as e: + print(f"Connection failed: {e}") + self._connected = False + return False + + def disconnect(self): + """ + Closes the connection to the module. + """ + if self._connected: + self._client.close() + self._connected = False + print(f"Disconnected from {self.ip}") + + def get_digital_input(self, index: int) -> Optional[bool]: + """ + Read the state of a single digital input channel. + + This method uses Modbus Function Code 02 (Read Input Status). + + Args: + index (int): The index of the input to read (e.g., 0-5). + + Returns: + Optional[bool]: Input state, True (HIGH) or False (LOW), or None on failure. + """ + if not self._connected and not self.connect(): + return None + + try: + address = self.digital_inputs[index] + response = self._client.read_discrete_inputs(address) + + if response.isError(): + print(f"Modbus Error: {response}") + return None + + return response.bits[0] + except (IndexError, ModbusException) as e: + print(f"Error reading digital input {index}: {e}") + return None + + def get_digital_output(self, index: int) -> Optional[bool]: + """ + Read the state of a single digital output channel. + + This method uses Modbus Function Code 01 (Read Coil Status). + + Args: + index (int): The index of the output to read (e.g., 0-5). + + Returns: + Optional[bool]: Output state, True (HIGH) or False (LOW), or None on failure. + """ + if not self._connected and not self.connect(): + return None + + try: + address = self.digital_outputs[index] + response = self._client.read_coils(address) + + if response.isError(): + print(f"Modbus Error: {response}") + return None + + return response.bits[0] + except (IndexError, ModbusException) as e: + print(f"Error reading digital output {index}: {e}") + return None + + def set_digital_output(self, index: int, state: bool) -> bool: + """ + Set a digital output on the module. + + This method uses Modbus Function Code 05 (Force Single Coil). + + Args: + index (int): Which output to set (e.g., 0-5). + state (bool): Whether to set the output HIGH (True) or LOW (False). + + Returns: + bool: True if the write operation was successful, False otherwise. + """ + if not self._connected and not self.connect(): + return False + + try: + address = self.digital_outputs[index] + response = self._client.write_coil(address=address, value=state) + + if response.isError(): + print(f"Modbus Error: {response}") + return False + + return True + except (IndexError, ModbusException) as e: + print(f"Error setting digital output {index}: {e}") + return False + + def pulse_digital_output(self, index: int, polarity: Edges = Edges.RISING, duration: float = 0.1) -> None: + """ + Performs a software-controlled pulse of a single digital output. + + Args: + index (int): Which output to pulse. + polarity (Edges, optional): Which direction the pulse should be. Defaults to Edges.RISING. + duration (float, optional): Duration in seconds for the pulse. Defaults to 0.1. + """ + if polarity == Edges.RISING: + self.set_digital_output(index, False) + self.set_digital_output(index, True) + time.sleep(duration) + self.set_digital_output(index, False) + elif polarity == Edges.FALLING: + self.set_digital_output(index, True) + self.set_digital_output(index, False) + time.sleep(duration) + self.set_digital_output(index, True) + + +class ADAM6066(ADAM6XXX): + """ + Class covering the ADAM-6066 6-input/6-relay-output digital module. + + This class inherits from the ADAM6XXX base class and defines the specific + register addresses for the ADAM-6066 module. + """ + digital_inputs = list(range(6)) + digital_outputs = [16, 17, 18, 19, 20, 21] + + def __init__(self, ip: str, port: int = 502, connect: bool = False): + super().__init__(ip, port, connect) + + def read_all_digital_inputs(self) -> Optional[List[bool]]: + """ + Reads the status of all 6 digital inputs (DI0-DI5). + + This method uses Modbus Function Code 02 (Read Input Status). + + Returns: + Optional[List[bool]]: A list of Booleans representing the state of each DI, + or None if the read operation fails. + """ + if not self._connected and not self.connect(): + return None + + try: + # The start address is the address of the first digital input + start_address = self.digital_inputs[0] + # Read 6 discrete inputs starting from the DI_START_ADDRESS + response = self._client.read_discrete_inputs(start_address, count=6) + + if response.isError(): + print(f"Modbus Error: {response}") + return None + + # The response.bits is a list of booleans + return response.bits + except Exception as e: + print(f"Error reading all digital inputs: {e}") + return None + + def read_all_digital_outputs(self) -> Optional[List[bool]]: + """ + Reads the status of all 6 digital outputs (DO0-DO5). + + This method uses Modbus Function Code 01 (Read Coil Status). + + Returns: + Optional[List[bool]]: A list of Booleans representing the state of each DO, + or None if the read operation fails. + """ + if not self._connected and not self.connect(): + return None + + try: + # The start address is the address of the first digital output + start_address = self.digital_outputs[0] + # Read 6 coils starting from the DO_START_ADDRESS + response = self._client.read_coils(start_address, count=6) + + if response.isError(): + print(f"Modbus Error: {response}") + return None + + return response.bits + except Exception as e: + print(f"Error reading all digital outputs: {e}") + return None + + diff --git a/src/test/test_adam_6066.py b/src/test/test_adam_6066.py new file mode 100644 index 0000000..51c947c --- /dev/null +++ b/src/test/test_adam_6066.py @@ -0,0 +1,86 @@ +import os +import platform +import sys +import time +from typing import List +from src.components.adam_6066 import ADAM6066 + +if platform.system() == "Windows": + sys.path.append(f"{os.getcwd()}\\src\\components") +else: + sys.path.append(f"{os.getcwd()}/src/components") + +# --- Configuration --- +# Replace with the actual IP address of your ADAM-6066 module +ADAM_IP = '10.0.0.1' +OUTPUT_CHANNELS = 6 +DELAY_SECONDS = 0.5 + + +def format_bits_to_string(bits: List[bool]) -> str: + """ + Helper function to convert a list of booleans to a string of 0s and 1s. + """ + return ''.join(['1' if bit else '0' for bit in bits]) + + +def main(): + """ + Main function to continuously test the ADAM-6066 driver by cycling through + digital outputs and reading digital inputs. + """ + print(f"Initializing connection to ADAM-6066 at {ADAM_IP}...") + adam_module = ADAM6066(ip=ADAM_IP) + + if not adam_module.connect(): + print("Failed to connect. Please check the IP address and network connection.") + return + + out_num = 0 + try: + while True: + # --- Cycle through Digital Outputs --- + # Set the current output channel to ON + print(f"Setting DO{out_num} to ON.") + if not adam_module.set_digital_output(index=out_num, state=True): + print(f"Failed to set DO{out_num}. Exiting loop.") + break + + time.sleep(DELAY_SECONDS) + + # Read and print the current state of all inputs and outputs + di_states = adam_module.read_all_digital_inputs() + do_states = adam_module.read_all_digital_outputs() + + if di_states is not None and do_states is not None: + di_str = format_bits_to_string(di_states) + do_str = format_bits_to_string(do_states) + print(f"in:{di_str} out:{do_str}") + else: + print("Failed to read I/O states. Exiting loop.") + break + + # Set the current output channel to OFF + print(f"Setting DO{out_num} to OFF.") + if not adam_module.set_digital_output(index=out_num, state=False): + print(f"Failed to set DO{out_num}. Exiting loop.") + break + + time.sleep(DELAY_SECONDS) + + # Move to the next output channel + out_num += 1 + if out_num >= OUTPUT_CHANNELS: + out_num = 0 + + except KeyboardInterrupt: + print("\nTest interrupted by user. Cleaning up...") + except Exception as e: + print(f"\nAn unexpected error occurred: {e}") + finally: + # Always make sure to disconnect from the module + adam_module.disconnect() + + +if __name__ == "__main__": + main()