Howto

Armadillo-IoT G4 でAWS IoT Greengrass V2 を使用した Pub/Sub AWS IoT Core MQTT メッセージの確認方法

本 Howto ではすでに Armadillo-IoT G4 のコンテナ内で AWS IoT Greengrass V2 の動作確認と、カスタムコンポーネントのデプロイを終えているものとして説明します。Armadillo-IoT G4 での AWS IoT Greengrass V2 の使用がまだの方は、先にArmadillo-IoT G4 で AWS IoT Greengrass V2 を使用する方法をご確認ください。

Armadillo-IoT G4 で AWS IoT Greengrass V2 を使用した Pub/Sub AWS IoT Core MQTT メッセージの確認方法について説明します。

本 Howto の構成は以下です。

システム構成

以下にシステム構成図を示します。

前回は lambda 関数に対して Publish のみ確認しましたが、今回はカスタムコンポーネントを作成し Publish / Subscribe 両方について確認します。今回のカスタムコンポーネントは Publish/Subscribe に AWS IoT Core MQTT メッセージを使用します。

AWS IoT Core MQTT メッセージとは

AWS IoT Core MQTT メッセージは、AWS IoT Core が使用するデバイス通信プロトコルの一つです。詳細は、MQTTMQTT トピックをご確認ください。本 Howto では AWS IoT Core MQTT メッセージを使用して Pub/Sub コンポーネントの動作確認を行います。

Pub/Sub AWS IoT Core MQTT メッセージの動作確認

Pub/Sub AWS IoT Core MQTT メッセージの動作確認についてはパブリッシュ/サブスクライブAWS IoT CoreMQTT メッセージを参考に Publish コンポーネントと Subscribe コンポーネントを作成し、動作確認を行います。

Publish コンポ―ネントの作成と確認

最初に Publish コンポーネントの動作確認を行います。パブリッシュ/サブスクライブAWS IoT CoreMQTT メッセージを参考に Publish コンポーネントを作成します。

1. Publish コンポ―ネントの作成

以下のコンポーネントを作成します。

  • コンポーネント名: "com.example.Publisher"
  • バージョン: "1.0.0"

2. Amazon S3 へのアーティファクトのアップロード

カスタムコンポーネントの作成とデプロイ を参考に Amazon S3 へアーティファクトをアップロードします。今回は例として "<S3 バケット名 >/artifacts/com.example.Publisher/1.0.0/pub.py" に配置します。以下はトピック名 "my/topic" に対して、json メッセージを発行するサンプルプログラムになります。

import json
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)

TIMEOUT = 10

ipc_client = awsiot.greengrasscoreipc.connect()

topic = "my/topic" 
message = { "message": "Hello, World" }
message = json.dumps(message).encode('utf-8')
qos = QOS.AT_LEAST_ONCE

request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = message
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)

3. AWS IoT Greengrass にコンポーネント作成

カスタムコンポーネントの作成とデプロイ を参考に以下の json 記述のレシピを用いてコンポーネントを作成します。今回は例として 2 で配置した "<S3 バケット名 >/artifacts/com.example.Publisher/1.0.0/pub.py" を指定します。

{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.Publisher",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that publishes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.Publisher:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToIoTCore"
            ],
            "resources": [
              "my/topic" 
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux" 
      },
      "Lifecycle": {
        "Install": "pip3 install awsiotsdk",
        "Run": "python3 -u {artifacts:path}/pub.py" 
      },
    "Artifacts": [
            {
                  "URI": "s3://<S3 バケット名>/artifacts/com.example.Publisher/1.0.0/pub.py",
                  "Unarchive": "NONE" 
            }
      ]
    }
  ]
}

4. IoT AWS console からの確認準備

デプロイされた直後に Publish コンポーネントが動くため、あらかじめ確認準備を行います。AWS IoT Core console を用いて動作確認を行います。AWS IoT Core console の "テスト" 内の "MQTT テストクライアント" を選択します。タブで "トピックをサブスクライブする" を選び、トピックのフィルターにトピック名 "my/topic" を入力し "サブスクライブ" ボタンを押して追加します。

5. IoT AWS Greengrassからのコンポーネントのデプロイ

カスタムコンポーネントの作成とデプロイを参考にコアデバイスにコンポーネントをデプロイします。

6. デプロイの確認

コンポーネントの一覧の中にコンポーネント名 "com.example.Publisher" が見つかればデプロイに成功しています。

7. Publish コンポ―ネントの動作確認

4 で設定した AWS IoT Core console の "テスト" 内の "MQTT テストクライアント" を確認します。以下のメッセージが確認できれば受信成功です。

{ "message": "Hello, World" }

Subscribe コンポ―ネントの作成と確認

次に Subscribe コンポーネントの動作確認を行います。パブリッシュ/サブスクライブAWS IoT CoreMQTT メッセージを参考に Subscribe コンポーネントを作成します。

1. Subscribe コンポ―ネントの作成

以下のコンポーネントを作成します。

  • コンポーネント名: "com.example.Subscriber"
  • バージョン: "1.0.0"

2. Amazon S3 へのアーティファクトのアップロード

カスタムコンポーネントの作成とデプロイ を参考に Amazon S3 へアーティファクトをアップロードします。今回は例として "<S3 バケット名 >/artifacts/com.example.Subscriber/1.0.0/sub.py" に配置します。以下はトピック名 "my/topic" を受信してメッセージ内容をログに表示するサンプルプログラムになります。

import time
import traceback

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 10

ipc_client = awsiot.greengrasscoreipc.connect()

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            # Handle message.
            print("payload: {0}".format(message))
        except:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass

topic = "my/topic" 
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
while True:
    time.sleep(10)

# To stop subscribing, close the operation stream.
operation.close()

3. AWS IoT Greengrass にコンポーネント作成

カスタムコンポーネントの作成とデプロイ を参考に以下の json 記述のレシピを用いてコンポーネントを作成します。今回は例として 2 で配置した "<S3 バケット名 >/artifacts/com.example.Subscriber/1.0.0/sub.py" を指定します。

{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.Subscriber",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "A component that subscribes messages.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.Subscriber:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#SubscribeToIoTCore" 
            ],
            "resources": [
              "my/topic" 
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux" 
      },
      "Lifecycle": {
        "Install": "pip3 install awsiotsdk",
        "Run": "python3 -u {artifacts:path}/sub.py" 
      },
    "Artifacts": [
            {
                  "URI": "s3://<S3 バケット名>/artifacts/com.example.Subscriber/1.0.0/sub.py",
                  "Unarchive": "NONE" 
            }
      ]
    }
  ]
}

4. IoT AWS Greengrassからのコンポーネントのデプロイ

カスタムコンポーネントの作成とデプロイ を参考にコアデバイスにコンポーネントをデプロイします。

5. デプロイの確認

コンポーネントの一覧の中にコンポーネント名 "com.example.Subscriber" が見つかればデプロイに成功しています。

6. Subscribe コンポ―ネントの動作確認

AWS IoT Core console を用いて動作確認を行います。AWS IoT Core console の "テスト" 内の "MQTT テストクライアント" を選択します。タブで "トピックに公開する" を選びます。以下のように内容を設定します。

  • トピック名: my/topic
  • メッセージペイロード
{ "message": "hello!" }

上記設定後 "発行" を選択します。Subscribe コンポーネントが動作しているかコンテナ上でログを確認します。

[container ~]# sudo tail -f /greengrass/v2/logs/com.example.Subscriber.log

以下のようなログが確認できれば、動作確認は終了です。

2021-12-23T06:25:54.933Z [INFO] (Copier) com.example.Subscriber: stdout. payload: {. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
2021-12-23T06:25:54.934Z [INFO] (Copier) com.example.Subscriber: stdout. "message": "hello!". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
2021-12-23T06:25:54.934Z [INFO] (Copier) com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}

"MQTTテストクライアント" で発行したメッセージを確認できたので、受信も問題ないことがわかります。

以上で、Armadillo-IoT G4 で AWS IoT Greengrass V2 を使用した Pub/Sub AWS IoT Core MQTT メッセージの確認方法についての説明を終了します。