Armadillo-IoT G3(以下Armadillo)よりModbus/TCPを用いてモジュールを操作する方法についてご紹介します。 この記事では例としてADVANTECH社のI/Oモジュール「ADAM-6051」を用います。 Linux 4.9およびDebian GNU/Linux 9 Stretchにて動作確認しています。
ModbusとModbus/TCP
Modbusはシリアルポートやイーサネット等を通して各種機器を操作するためのプロトコルです。 その仕様が公開されており、また容易であるため、広く普及しています。 この記事では、TCP/IP上の接続のプロトコルであるModbus/TCPを使用して、ADAM-6051を操作します。
使用するパッケージのインストール
今回はPython3のパッケージであるpyModbusTCPを利用します。 このライブラリを利用することで、数行のコードでModbus/TCPを使用することができます。 pipよりインストールすることができます。
[armadillo ~]# pip install pyModbusTCP
ADAM-6051と通信するサンプル
以下にADAM-6051を操作するためのpythonサンプルソースコードを示します。
from pyModbusTCP.client import ModbusClient class Adam: def __init__(self, host = "10.0.0.1", port = 502): self.client = ModbusClient(host = host, port = port) def read_coil(self, addr, length): if not self.client.open(): raise Exception("Unable to open client") bits = self.client.read_coils(addr, length) self.client.close() return bits def write_coil(self, addr, value): if not self.client.open(): raise Exception("Unable to open client") bits = self.client.write_single_coil(addr, value) self.client.close() return bits def read_register(self, addr, length): if not self.client.open(): raise Exception("Unable to open client") regs = self.client.read_holding_registers(addr, length) self.client.close() return regs def write_register(self, addr, value): if not self.client.open(): raise Exception("Unable to open client") regs = self.client.write_single_register(addr, value) self.client.close() return regs class Adam6051: DI_CHANNEL_COUNT = 12 DO_CHANNEL_COUNT = 2 COUNTER_CHANNEL_COUNT = 2 def __init__(self, host = "10.0.0.1", port = 502): self.adam = Adam(host, port) self.output = [False] * self.DO_CHANNEL_COUNT self.apply_output() # 変数outputの内容を、ADAM本体に適用します。(クラスから自動で呼び出されます。) def apply_output(self): result = 0 for channel in range(2): if self.output[channel]: result |= 1 << channel # もしoutput[channel]が真なら、channelビット目を立てます。 self.adam.write_register(302, result) # DIへの入力があるかを返します。(チャンネル0~11以外が指定された場合、Exceptionを返します。) def get_input(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT: return self.adam.read_coil(channel, 1)[0] else: raise Exception("Out of bounds") # 全てのDIへの入力をリストで返します。 def get_all_input(self): return self.adam.read_coil(0, self.DI_CHANNEL_COUNT) # DOを操作します。True: 接続、False: 開放 def set_output(self, channel, flag): channel = int(channel) if channel >= 0 and channel < self.DO_CHANNEL_COUNT: if flag: self.output[channel] = True else: self.output[channel] = False self.apply_output() else: raise Exception("Out of bounds") # DI及びカウンターへ入力された回数を取得します。12および13がC0およびC1に対応しています。 def get_counter(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: regs = self.adam.read_register(channel * 2, 2) return regs[0] + regs[1] * 65536 else: raise Exception("Out of bounds") # 入力された回数のカウントを開始します。 def start_counter(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: return self.adam.write_coil(32 + channel * 4, True) else: raise Exception("Out of bounds") # 入力された回数のカウントを停止します。 def stop_counter(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: return self.adam.write_coil(32 + channel * 4, False) else: raise Exception("Out of bounds") # 指定されたチャンネルのカウンターが動作状態を返します。 def counter_status(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: return self.adam.read_coil(32 + channel * 4, 1)[0] else: raise Exception("Out of bounds") # カウンターの回数をクリアします。 def clear_counter(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: return self.adam.write_coil(33 + channel * 4, True) else: raise Exception("Out of bounds") # カウンターがオーバーフローフラグを返します。 def is_counter_overflowed(self, channel): channel = int(channel) if channel >= 0 and channel < self.DI_CHANNEL_COUNT + self.COUNTER_CHANNEL_COUNT: return self.adam.read_coil(34 + channel * 4, 1)[0] else: raise Exception("Out of bounds") def main(): import time adam6051 = Adam6051(host = "10.0.0.1") adam6051.clear_counter(12) while True: print(adam6051.get_counter(12)) time.sleep(1) if __name__ == "__main__": main()
ADAM-6051の標準のIPは10.0.0.1です。
これはAdamApax .NET Utilityで変更することができます。そうした場合、main()
内の3行目のhost = "10.0.0.1"
を適宜変更してください。
このサンプルソースコードは、ADAM-6051のC0のカウンターをクリアし、その後1秒ごとに現在の回数を表示します。
また、モジュールとしてインポートすることで各種関数を利用してADAM-6051と通信することができます。
動作確認
パルス発生器等をADAM-6051のC0+とC0-に接続し、C言語サンプルコードを実行して動作することを確認してください。 Armadilloに「Armadillo-IoT 絶縁デジタル入出力/アナログ入力アドオンモジュール DA00」を搭載している場合、以下のCのソースコードを実行することで、電源を接続した際に開放と短絡を繰り返してパルスを発生させられます。 CON2のピン1と2に接続してください。
#include <stdio.h> #include <unistd.h> int main() { int isoutputopen = 0; FILE *fp; while(1) { fp = fopen("/sys/class/gpio/gpio89/value", "w"); if (fp == NULL) { return 1; } isoutputopen = 1 - isoutputopen; if (isoutputopen) { fputc('1', fp); } else { fputc('0', fp); } fclose(fp); usleep(500000); } }
以上がArmadilloからModbus/TCPでADAM-6051を操作する手順です。 pyModbusTCPは他のModbus/TCPを利用する機器を操作する際にも非常に役立ちます。 ぜひご活用ください。