Armadillo-IoT A6E のゲートウェイコンテナのカスタマイズ方法について説明します。既存のゲートウェイコンテナの動作確認を行っていない場合は、Armadillo-IoT ゲートウェイ A6E製品マニュアル「10.2. ゲートウェイコンテナを動かす」をご確認ください。また、本Howで使用しているコンテナイメージのバージョンについては、記載日現在 2023/02/13 リリースしている v2.0.0 となります。
本 Howto の構成は以下のとおりです。
- 1. システム概要
- 2. 開発環境をセットアップする
- 3. 設定ファイルをカスタマイズする
- 4. ゲートウェイアプリケーションのソースコードをカスタマイズする
- 5. ゲートウェイアプリケーションの動作確認
- 6. リリース版ゲートウェイアプリケーションをビルド、書き込みする
1. システム概要
本HowToの例では、下図のようにセンサや可変抵抗から取得したアナログ値をArmadillo-IoT A6E 内蔵のADCを用いてデジタル値に変換し、変換した値をクラウドにアップロードするシステムをゲートウェイアプリケーションに追加します。ADCの有効化方法については、[Howto] Armadillo-IoT A6E で ADC を使用する方法をご確認ください。
2. 開発環境をセットアップする
ゲートウェイアプリケーションのカスタマイズは、ATDE 上の VSCode にて行います。開発環境のセットアップ方法については、Armadillo-IoT ゲートウェイ A6E製品マニュアル「10.1.2. ATDE 上でのセットアップ」、「10.1.3. Armadillo 上でのセットアップ」をご確認ください。
3. 設定ファイルをカスタマイズする
ATDE上に展開した gw-app-project-[VERSION] ディレクトリ下の "app/config/sensing_mgr.conf" を編集し、ADCの動作に関する設定値を設定ファイルに追加します。 編集済みの sensing_mgr.conf は、以下のURLからダウンロードすることもできます。
sensing_mgr.confの追加内容は以下のとおりです。
[ADC] ; polling_interval[sec] polling_interval=1
上記設定内容についての詳細は以下のとおりです。
項目 | 概要 | 設定値 | 内容 |
---|---|---|---|
polling_interval | データ取得間隔[sec] | 1~3600 | この値に従って、ADCの値を読み出します |
設定ファイルに上記の内容が記載されていることを確認して保存します。また、デフォルトの場合はクラウド接続せず、定期的にADCのデータを取得するようになっています。必要に応じて設定ファイルを編集してください。設定ファイルの詳細についてはArmadillo-IoT ゲートウェイ A6E製品マニュアル「10.2.5. 設定ファイルの編集」をご確認ください。
4. ゲートウェイアプリケーションのソースコードをカスタマイズする
本Howtoで使用するソースコードを使用してゲートウェイアプリケーションのソースコードをカスタマイズする場合は、以下のようにダウンロードし、ATDE上に展開した gw-app-project-[VERSION] ディレクトリ下の "app/src/" に配置します。
ここにソースコードを配置することで、カスタマイズしたゲートウェイアプリケーションが動かせるようになります。カスタマイズしたゲートウェイアプリケーションの動かし方については 5. ゲートウェイアプリケーションの動作確認 をご確認ください。
[ATDE ~]# wget https://download.atmark-techno.com/sample/armadillo_iotg_a6e-gw-container-customize-howto/customize.tar [ATDE ~]# tar xf customize.tar [ATDE ~]# ls customize gw-app-project-[VERSION] cp -r customize/* gw-app-project-[VERSION]/app/src/ [ATDE ~]# ls gw-app-project-[VERSION]/app/src/ app main.py
上記手順でダウンロードした customize.tar の中身は以下のようになっています。
ファイル名 | 説明 |
---|---|
customize/main.py | デバイスモデル、デバイスコンフィグ、インターフェース設定の設定ファイル、接続先のクラウド情報の設定ファイルを指定します |
customize/app/sensing_mgr/model_config_base.py | コンテナアプリケーションの動作に関する設定値を、設定ファイル sensing_mgr.conf からパース・取得します |
customize/app/sensing_mgr/model_dev_a6e_base.py | ADC のReporterのインスタンスを作成します |
customize/app/sensing_mgr/reporters/adc_reporter.py | ADCから値を取得します |
ゲートウェイアプリケーションにセンサを追加する場合は以下のソースコードの説明を参考にカスタマイズしてください。
カスタマイズするソースコードの説明
main.py
main 関数
使用するデバイスモデル、デバイスコンフィグ、インターフェース設定の設定ファイル、接続先のクラウド情報の設定ファイルをここで指定します。センサを追加するだけであれば main.py はこのまま使用することが出来ます。
def main(): asyncio.run(mainloop(ModelDevA6EBase, ModelConfigA6EBase, "/config/sensing_mgr.conf", "/config/cloud_agent.conf"))
model_config_base.py
ModelConfigBase クラス
sensing_mgr.conf から取得するセクション名やパラメータ名、設定値のパース・取得等の処理を行う関数を持ちます。
- DEFAULT セクションは、ゲートウェイコンテナの全体的な動作設定を行うために必要となります。
- LOG セクションはログの出力設定を行うために必要となります。 これらのセクションの詳細についてはArmadillo-IoT ゲートウェイ A6E製品マニュアル「10.2.5.1. インターフェース設定」をご確認ください。
- ここでは、ADC セクションについてはセクション名 "ADC " 及び ADC のデータ取得間隔を設定する "POLLING_INTERVAL" パラメータを定義しています。
- "MAX_INTERVAL" では、ADC のデータ取得間隔(POLLING_INTERVAL)として設定できる最大値を定義しています。
class ModelConfigBase: # DEFAULT section … [1] CLOUD_CONFIG = "cloud_config" SEND_CLOUD = "send_cloud" CACHE = "cache" SEND_INTERVAL = "send_interval" DATA_SEND_ONESHOT = "data_send_oneshot" WAIT_CONTAINER_STOP = "wait_container_stop" # LOG section … [2] LOG = "LOG" FILE = "file" STREAM = "stream" # ADC section … [3] ADC = "ADC" POLLING_INTERVAL = "polling_interval" MAX_INTERVAL = 3600 … [4] ︙ (略)
load 関数
sensing_mgr.conf から設定値のパース、取得を行います。また、ADC セクションのセクション名が見つからない場合には、"[ADC] section is invalid." とエラーを出力します。
def load(self, config_file_path): config = configparser.ConfigParser() config.read(config_file_path) if not self.parse_default_config(config): self._logging.error("config file not found [DEFAULT] section.") return False self.parse_log_config(config) try: self.parse_adc_config(config) except Exception as e: self._logging.error("[ADC] section is invalid.") self._logging.error(e.args) return True
check_max_interval 関数
ADCからのデータ取得間隔(POLLING_INTERVAL)が最大値(MAX_INTERVAL)を超えていないかを確認します。最大値を超えていた場合には、"max interval check error." とエラーを出力します。
def check_max_interval(self, val): try: if val == "": return 1 if isinstance(val, str): val = int(val) if val > self.MAX_INTERVAL: val = self.MAX_INTERVAL msg = print( "max interval set {}[s].".format(self.MAX_INTERVAL)) self._logging.info(msg) return val except Exception as e: self._logging.error("max interval check error.") self._logging.error(e.args) return 1
parse_adc_config 関数
ADC セクションのパラメータのパースを行います。この関数は load 関数内で呼び出されます。
def parse_adc_config(self, config): adc_dict = {} section = ModelConfigBase.ADC interval = self.check_max_interval( config[section].getint(ModelConfigBase.POLLING_INTERVAL, 1)) adc_dict[ModelConfigBase.POLLING_INTERVAL] = interval self._configs[section] = adc_dict
get_adc_conf 関数
ADC セクションのパラメータを取得します。この関数は次に説明する update_adc_conf 関数内で呼び出されます。
def get_adc_conf(self): return self._configs[ModelConfigBase.ADC]
update_adc_conf 関数・update_config 関数
ADC セクションに設定された値を更新します。update_config 関数は、次に説明する model_dev_a6e_base.py の update_reporter 関数内で呼び出されます。
def update_adc_conf(self, key, dict): old_conf = None section = "" if key == ModelConfigBase.ADC: old_conf = self.get_adc_conf() section = ModelConfigBase.ADC else: self._logging.error("ADC config key is invalid.") return None for key, value in dict.items(): old_conf[key] = value self._configs[section] = old_conf return section def update_config(self, data): update_list = {} for key, value in data.items(): section = None if "_config" not in key: continue index = key.find('_config') key = key[:index] if "ADC" in key: section = self.update_adc_conf(key, value) if section is not None: update_list[section] = self._configs[section] return update_list
model_dev_a6e_base.py
ModelDevA6EBase クラス
Reporterのインスタンスを作成する関数、クラウドからのコンフィグ更新を行う関数を持ちます。
class ModelDevA6EBase(ModelDevBase): def __init__(self, modelConfig, cloud_config, send_cloud): super().__init__(modelConfig, send_cloud) self._adc_reporter = None self._model_config = modelConfig self._cloud_config = cloud_config self._logging = logging.getLogger(Const.LOGGER_NAME) ︙ (略)
on_recv_data 関数
クラウドからのデータ送信があるかどうかを調べ、データがある場合に設定値を更新します。
def on_recv_data(self, data): if data is None: self._logging.info("data from cloud is empty.") return False if self._cloud_config: self._logging.info("update config from cloud.") self.update_reporter(self._model_config, data) else: self._logging.info("config don't update from cloud.") return
update_reporter 関数
ADC セクションに設定された値を更新します。
def update_reporter(self, config, data): update_list = config.update_config(data) if not update_list: return for section, conf in update_list.items(): if section == config.ADC: self._adc_reporter.set_interval(conf.get(config.POLLING_INTERVAL))
_make_active_reporters 関数・_make_reporters 関数
Reporterのインスタンスを作成します。
def _make_active_reporters(self, config, report_queue, report_repository): reporters = [] return reporters def _make_reporters(self, config): reporters = [] conf = config.get_adc_conf() adcReporter = ADCReporter( start=True, interval=conf.get(config.POLLING_INTERVAL) ) adcReporter.set_interval(conf.get(config.POLLING_INTERVAL)) reporters.append(adcReporter) self._adc_reporter = adcReporter return reporters
adc_reporter.py
ADCReporter クラス
ADCから値を取得する関数を持ちます。
class ADCReporter(Reporter): def __init__(self, start, interval): super().__init__() self._prop = {} self._prop["ADC_polling_interval"] = interval self._oneshot = False self._oneshot_send = False self._start = start self._logging = logging.getLogger(Const.LOGGER_NAME) ︙ (略)
data_type 関数
取得するデータタイプの説明を返り値として返します。
def data_type(self): return 'adc_value'
oneshot_report 関数
1回だけデータを取得し、コンテナを終了するように _oneshot を True とします。
def oneshot_report(self): self._oneshot = True
report 関数
ADCReporter の主要な処理部分です。
- reporter が取得開始していない場合、空の report を作成します。
- oneshot の場合は一回だけデータを取得して、 report を作成します。
- ADC1(IN1) から値を読み込みます。また、値の取得に失敗した場合は、"ADC value read error." とエラーを出力します。
- report の作成します。
def report(self): report = None if self._start is False: … [1] return (report, None) if self._oneshot is False or \ (self._oneshot is True and self._oneshot_send is False): … [2] value = None try: … [3] with open('/sys/bus/iio/devices/iio:device0/in_voltage1_raw') as file: value = int(file.read()) except Exception as e: self._logging.error("ADC value read error.") self._logging.error(e.args) report = Report.report_now( … [4] 'measurement', ADC_value=value ) self._oneshot_send = True return (report, None)
5. カスタマイズしたゲートウェイアプリケーションの動作確認
カスタマイズしたゲートウェイアプリケーションを実行するには、VSCode の [Terminal] メニューから [Run Task…] を選択し更に表示された項目から、 [App run on Armadillo] を選択します。
ゲートウェイアプリケーションのログはArmadillo上から以下の様に確認することが出来ます。
[armadillo ~]# tail -F /var/app/volumes/gw_container/log/sensing_mgr.log
以下のような内容が表示されていれば、ADCからの値の取得に成功しています。
2022-11-30 15:26:07,973 : {'data': {'ADC_value': 1046, 'timestamp': 1669789560}}
ゲートウェイアプリケーションのログについての詳細は Armadillo-IoT ゲートウェイ A6E製品マニュアル「10.2.9. ログ内容確認」をご確認ください。
実行したゲートウェイアプリケーションを停止する場合は、VSCode の [Terminal] メニューから [Run Task…] を選択し更に表示された項目から、 [App stop on Armadillo] を選択します。
6. リリース版ゲートウェイアプリケーションをビルド、書き込みする
完成したゲートウェイアプリケーションをリリース版としてビルドし、書き込む方法についてはArmadillo-IoT ゲートウェイ A6E製品マニュアル「10.1.6. リリース版のビルド」、「10.1.7. 製品への書き込み」をご確認ください。