Merge branch 'adam-6066'

This commit is contained in:
eduar 2025-09-22 11:39:25 +02:00
commit 67af92339a
2 changed files with 335 additions and 0 deletions

249
src/components/adam_6066.py Normal file
View File

@ -0,0 +1,249 @@
import time
import enum
from typing import List, Optional
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
class Edges(enum.IntEnum):
RISING = enum.auto()
FALLING = enum.auto()
class ADAM6XXX:
"""
Base class for the Advantech ADAM-6000 series I/O modules using Modbus TCP.
This class provides a foundation for communication and common operations.
Specific module implementations (e.g., ADAM-6066) should inherit from this class
and define their unique register addresses.
"""
ip: str = '10.0.0.1'
name: str = ''
_connected: bool = False
_client: Optional[ModbusTcpClient] = None
digital_inputs: List[int] = []
digital_outputs: List[int] = []
def __init__(self, ip: str = '10.0.0.1', port: int = 502, connect: bool = False):
"""
Create a new ADAM-6000 object.
This should not be called directly; use one of the module-specific objects.
Args:
ip (str, optional): IP address of module. Defaults to '10.0.0.1'.
port (int, optional): The Modbus TCP port (default is 502).
connect (bool, optional): Whether to connect to the module on construction. Defaults to False.
"""
self.ip = ip
self.port = port
self._client = ModbusTcpClient(self.ip, self.port)
if connect:
self.connect()
def connect(self) -> bool:
"""
Connect to the module using Modbus TCP.
Returns:
bool: Whether the connection was successful.
"""
try:
self._connected = self._client.connect()
if not self._connected:
print(f"Error: Could not connect to the module at {self.ip}:{self.port}")
return self._connected
except Exception as e:
print(f"Connection failed: {e}")
self._connected = False
return False
def disconnect(self):
"""
Closes the connection to the module.
"""
if self._connected:
self._client.close()
self._connected = False
print(f"Disconnected from {self.ip}")
def get_digital_input(self, index: int) -> Optional[bool]:
"""
Read the state of a single digital input channel.
This method uses Modbus Function Code 02 (Read Input Status).
Args:
index (int): The index of the input to read (e.g., 0-5).
Returns:
Optional[bool]: Input state, True (HIGH) or False (LOW), or None on failure.
"""
if not self._connected and not self.connect():
return None
try:
address = self.digital_inputs[index]
response = self._client.read_discrete_inputs(address)
if response.isError():
print(f"Modbus Error: {response}")
return None
return response.bits[0]
except (IndexError, ModbusException) as e:
print(f"Error reading digital input {index}: {e}")
return None
def get_digital_output(self, index: int) -> Optional[bool]:
"""
Read the state of a single digital output channel.
This method uses Modbus Function Code 01 (Read Coil Status).
Args:
index (int): The index of the output to read (e.g., 0-5).
Returns:
Optional[bool]: Output state, True (HIGH) or False (LOW), or None on failure.
"""
if not self._connected and not self.connect():
return None
try:
address = self.digital_outputs[index]
response = self._client.read_coils(address)
if response.isError():
print(f"Modbus Error: {response}")
return None
return response.bits[0]
except (IndexError, ModbusException) as e:
print(f"Error reading digital output {index}: {e}")
return None
def set_digital_output(self, index: int, state: bool) -> bool:
"""
Set a digital output on the module.
This method uses Modbus Function Code 05 (Force Single Coil).
Args:
index (int): Which output to set (e.g., 0-5).
state (bool): Whether to set the output HIGH (True) or LOW (False).
Returns:
bool: True if the write operation was successful, False otherwise.
"""
if not self._connected and not self.connect():
return False
try:
address = self.digital_outputs[index]
response = self._client.write_coil(address=address, value=state)
if response.isError():
print(f"Modbus Error: {response}")
return False
return True
except (IndexError, ModbusException) as e:
print(f"Error setting digital output {index}: {e}")
return False
def pulse_digital_output(self, index: int, polarity: Edges = Edges.RISING, duration: float = 0.1) -> None:
"""
Performs a software-controlled pulse of a single digital output.
Args:
index (int): Which output to pulse.
polarity (Edges, optional): Which direction the pulse should be. Defaults to Edges.RISING.
duration (float, optional): Duration in seconds for the pulse. Defaults to 0.1.
"""
if polarity == Edges.RISING:
self.set_digital_output(index, False)
self.set_digital_output(index, True)
time.sleep(duration)
self.set_digital_output(index, False)
elif polarity == Edges.FALLING:
self.set_digital_output(index, True)
self.set_digital_output(index, False)
time.sleep(duration)
self.set_digital_output(index, True)
class ADAM6066(ADAM6XXX):
"""
Class covering the ADAM-6066 6-input/6-relay-output digital module.
This class inherits from the ADAM6XXX base class and defines the specific
register addresses for the ADAM-6066 module.
"""
digital_inputs = list(range(6))
digital_outputs = [16, 17, 18, 19, 20, 21]
def __init__(self, ip: str, port: int = 502, connect: bool = False):
super().__init__(ip, port, connect)
def read_all_digital_inputs(self) -> Optional[List[bool]]:
"""
Reads the status of all 6 digital inputs (DI0-DI5).
This method uses Modbus Function Code 02 (Read Input Status).
Returns:
Optional[List[bool]]: A list of Booleans representing the state of each DI,
or None if the read operation fails.
"""
if not self._connected and not self.connect():
return None
try:
# The start address is the address of the first digital input
start_address = self.digital_inputs[0]
# Read 6 discrete inputs starting from the DI_START_ADDRESS
response = self._client.read_discrete_inputs(start_address, count=6)
if response.isError():
print(f"Modbus Error: {response}")
return None
# The response.bits is a list of booleans
return response.bits
except Exception as e:
print(f"Error reading all digital inputs: {e}")
return None
def read_all_digital_outputs(self) -> Optional[List[bool]]:
"""
Reads the status of all 6 digital outputs (DO0-DO5).
This method uses Modbus Function Code 01 (Read Coil Status).
Returns:
Optional[List[bool]]: A list of Booleans representing the state of each DO,
or None if the read operation fails.
"""
if not self._connected and not self.connect():
return None
try:
# The start address is the address of the first digital output
start_address = self.digital_outputs[0]
# Read 6 coils starting from the DO_START_ADDRESS
response = self._client.read_coils(start_address, count=6)
if response.isError():
print(f"Modbus Error: {response}")
return None
return response.bits
except Exception as e:
print(f"Error reading all digital outputs: {e}")
return None

View File

@ -0,0 +1,86 @@
import os
import platform
import sys
import time
from typing import List
from src.components.adam_6066 import ADAM6066
if platform.system() == "Windows":
sys.path.append(f"{os.getcwd()}\\src\\components")
else:
sys.path.append(f"{os.getcwd()}/src/components")
# --- Configuration ---
# Replace with the actual IP address of your ADAM-6066 module
ADAM_IP = '10.0.0.1'
OUTPUT_CHANNELS = 6
DELAY_SECONDS = 0.5
def format_bits_to_string(bits: List[bool]) -> str:
"""
Helper function to convert a list of booleans to a string of 0s and 1s.
"""
return ''.join(['1' if bit else '0' for bit in bits])
def main():
"""
Main function to continuously test the ADAM-6066 driver by cycling through
digital outputs and reading digital inputs.
"""
print(f"Initializing connection to ADAM-6066 at {ADAM_IP}...")
adam_module = ADAM6066(ip=ADAM_IP)
if not adam_module.connect():
print("Failed to connect. Please check the IP address and network connection.")
return
out_num = 0
try:
while True:
# --- Cycle through Digital Outputs ---
# Set the current output channel to ON
print(f"Setting DO{out_num} to ON.")
if not adam_module.set_digital_output(index=out_num, state=True):
print(f"Failed to set DO{out_num}. Exiting loop.")
break
time.sleep(DELAY_SECONDS)
# Read and print the current state of all inputs and outputs
di_states = adam_module.read_all_digital_inputs()
do_states = adam_module.read_all_digital_outputs()
if di_states is not None and do_states is not None:
di_str = format_bits_to_string(di_states)
do_str = format_bits_to_string(do_states)
print(f"in:{di_str} out:{do_str}")
else:
print("Failed to read I/O states. Exiting loop.")
break
# Set the current output channel to OFF
print(f"Setting DO{out_num} to OFF.")
if not adam_module.set_digital_output(index=out_num, state=False):
print(f"Failed to set DO{out_num}. Exiting loop.")
break
time.sleep(DELAY_SECONDS)
# Move to the next output channel
out_num += 1
if out_num >= OUTPUT_CHANNELS:
out_num = 0
except KeyboardInterrupt:
print("\nTest interrupted by user. Cleaning up...")
except Exception as e:
print(f"\nAn unexpected error occurred: {e}")
finally:
# Always make sure to disconnect from the module
adam_module.disconnect()
if __name__ == "__main__":
main()