Programme en Python pour lire les valeurs
#!/usr/bin/env python3
# -*- coding: utf_8 -*-
"""
Modbus TestKit: Implementation of Modbus protocol in python
(C)2009 - Luc Jean - luc.jean@gmail.com
(C)2009 - Apidev - http://www.apidev.fr
This is distributed under GNU LGPL license, see license.txt
"""
# https://code.google.com/archive/p/modbus-tk/wikis/ModbusMasterExample.wiki
# To install dependencies:
# pip install modbus-tk
# pip install pyserial
import serial
import struct
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
from datetime import datetime
from pymodbus.pdu import ModbusRequest
from pymodbus.client.sync import ModbusSerialClient as ModbusClient #initialize a serial RTU client instance
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.constants import Endian # For 32-bit float numbers (2 registers / 4 bytes)
from pymodbus.payload import BinaryPayloadDecoder # For 32-bit float numers (2 registers / 4 bytes)
from pymodbus.payload import BinaryPayloadBuilder # Write 32-bit floats in register
# Settings for SDM120M
METHOD = "rtu"
PORT = 'COM5'
PARITY = 'N'
BAUDRATE = 2400
STOPBITS = 1
BYTESIZE = 8
TIMEOUT = 1
RETRIES = 2
DEBUG = False
address = 1
address = int(input("Enter the address of your module: "))
print ("Asking values for address:?", address)
def main():
"""main"""
now = datetime.now()
logger = modbus_tk.utils.create_logger("console")
print ("Testing of Eastron SDM120M")
print("by Electro-Tech s.à r.l.")
print("15, rue Centrale")
print("L-4499 Limpach")
print("email: serge.klein AT electro-tech.lu")
print("Date of test: ",now.strftime("%B %d, %Y %H:%M:%S"))
#Connect to the slave
master = modbus_rtu.RtuMaster(
serial.Serial(port=PORT, baudrate=BAUDRATE, bytesize=8, parity=PARITY, stopbits=1, xonxoff=0)
)
master.set_timeout(5.0)
master.set_verbose(False)
logger.info("connected")
# Get Voltage version 1
#https://forum-raspberrypi.de/forum/thread/37754-umwandlung-array-of-int-4-byte-in-float-nach-ieee-754/?postID=317842#post317842
try:
print ("Get voltage version 1")
print ("---------------------")
if DEBUG == True : logger.info(master.execute(address, cst.READ_INPUT_REGISTERS, 0, 2))
temp=master.execute(address, cst.READ_INPUT_REGISTERS, 0, 2)
inputArray=[temp[1], temp[0]]
int32Val = temp[1] + (temp[0] << 16)
newFloat = struct.unpack('f', struct.pack('i', int32Val))[0]
print("inputArray='{}' ==> int32Val='{}' ==> newFloat='{:.2f}' Volt".format(inputArray, int32Val, newFloat))
master.close()
except modbus_tk.modbus.ModbusError as exc:
logger.error("%s- Code=%d", exc, exc.get_exception_code())
try:
#Connect to the slave
client = ModbusClient(method = METHOD, port = PORT, stopbits = STOPBITS, bytesize = BYTESIZE, parity = PARITY, baudrate = BAUDRATE, timeout = TIMEOUT, retries = RETRIES)
connection = client.connect()
print("Connected.")
### Get Voltage version 2
print ("Get voltage version 2")
print ("---------------------")
data = client.read_input_registers(0, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big) # endian=Endian.Little / endian=Endian.Big
temp = round(decoder.decode_32bit_float(),3)
print("Voltage [V]", temp)
### Get Current
data = client.read_input_registers(6, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Current [A]", temp)
### Get active power
data = client.read_input_registers(12, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Active power [W]", temp)
### Get apparent power
data = client.read_input_registers(18, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Apparent power [VA]", temp)
### Get reactive power
data = client.read_input_registers(14, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Reactive power [VAr]", temp)
### Get power factor
data = client.read_input_registers(12, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Power factor", temp)
### Get Frequency
data = client.read_input_registers(70, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Frequency [Hz]", temp)
### Import active energy
data = client.read_input_registers(72, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Import active energy [kWh]", temp)
### Export active energy
data = client.read_input_registers(74, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Export active energy [kWh]", temp)
### Import reactive energy
data = client.read_input_registers(76, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Import reactive energy [kvarh]", temp)
### Export reactive energy
data = client.read_input_registers(78, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Export reactive energy [kvarh]", temp)
### Get Total active energy
data = client.read_input_registers(342, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float(),3)
print("Total active energy [kWh]", temp)
### Get relay pulse width
data = client.read_holding_registers(12, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float())
print("Relay pulse width [ms]", temp)
### Get network parity Stop
data = client.read_holding_registers(18, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float())
print("Network parity/stop:")
if temp == 0: print("->One stop bit and no parity")
if temp == 1: print("->One stop bit and even parity")
if temp == 2: print("->One stop bit and odd parity")
if temp == 3: print("->Two stop bits and no parity")
### Get device ID
data = client.read_holding_registers(20, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float())
print("Device ID", temp)
### Get Baudrate
data = client.read_holding_registers(28, 2, unit = address)
if DEBUG == True : print ("Data registers", data.registers)
decoder = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
temp = round(decoder.decode_32bit_float())
print("Baudrate:")
if temp == 0: print("->2400 (default)")
if temp == 1: print("->4800")
if temp == 2: print("->9600")
if temp == 3: print("->1200")
client.close()
except:
print ("Modbus connection error.")
if __name__ == "__main__":
main()