ブログ

Armadillo-640:BT/THオプションモジュールを使用してビーコンの接近を感知、警報を出力する

at_takuma.fukuda
2021年12月8日 13時52分

下記記事でビーコンデータを収集するアプリケーションを動作させましたが、
こちらを応用して、特定のビーコンがArmadillo-640に接近したら警報を出力するシステムを構築しました。
Armadillo-640:BT/THオプションモジュールを使用してビーコンデータを収集する

機器構成

  • Armadillo-640
  • BT/THオプションモジュール
  • 警報装置:IWT120-USB(東京デバイセズ)
    USBでArmadillo-640と接続します。
  • USBハブ
    USBの相性により、警報装置とArmadillo-640を直接接続しても使用出来なかったため、
    HUBを介して接続しています。
  • ビーコン:iBeaconタイプのもの
    このデモでは、 Feasycom社製FSC-BP03を使用

アプリケーション

事前準備

警報出力処理の準備

以下をダウンロードする
IWT120-USB 制御コマンド(Linux) 0.0.1
解凍し、README.txtに従ってビルドする
*ビルドの際にはlibusb-devが必要になるので事前にインストールしておいてください。

BLE通信機能

以下を実行し、必要なライブラリ・モジュールをインストールしておく

armadillo:~# apt-get install python3-pip python3-dev ipython
armadillo:~# apt-get install bluetooth libbluetooth-dev python3-yaml
armadillo:~# pip3 install pybluez

機能概要

BLEパケット受信

BLEパケットを受信する

受信したパケットデータのフィルタリング

iBeaconタイプのアドバタイジングパケットのみをフィルタリングする
UUID・Majorを元に対象のビーコンのパケットのみをフィルタリングする

ビーコンとの距離推定

txPowerとRSSIから距離を推定する

d = 10 ^ ((TxPower - RSSI) / 20)

警報動作

距離が一定の閾値よりも近くなると警報を発報する
IWT120-USBの警報発報コマンドを実行する
警報発報後、一定時間ビーコンが近距離に無ければ警報を解除する

例外処理

侵入許可リストにあるビーコンについては、近づいても警報を発報しない

設定変更

UUID・Major・警報発報距離・警報解除までの時間は設定ファイルにて変更できるものとする
→起動時に読み込んで動作する。設定変更の場合はアプリケーション再起動が必要
侵入許可リストはCSVファイルにて作成できるものとする
侵入を許可してもよいビーコンのMinor値を入力しておく

設定

検出するビーコンのUUID・Major、警報を発報する距離、警報解除までの秒数を設定出来るようにしています。
以下のフォーマットでファイルへ記載すると読み込んで動作します。
下記サンプルコードでは、/root直下の「BeaconScan.conf」という名前のファイルを設定ファイルとして読み込むようにしています。
コードを変更することで設定ファイル名や配置箇所も変更できます。

uuid:
- XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
major:
- XX
#距離(m)
distance:
- 0.1
#解除までの秒数
cancellation:
- 30

許可リスト

同じUUID・Majorのビーコンのうち、接近を許可したいものについては予めMinorをリストに登録しておくことで、
接近しても警報を発報しないように出来ます。
カンマ区切りのCSVファイルに記載することで登録可能です。 サンプルコードでは/root直下の「confirm.csv」という名前のファイルを読み込むようにしています。
設定ファイルと同様、コードを変更することで設定ファイル名や配置箇所も変更できます。

ソースコード

import sys
import struct

import time
import yaml
import csv

import os

import bluetooth._bluetooth as bluez

OGF_LE_CTL=0x08
OCF_LE_SET_SCAN_ENABLE=0x000C

OGF_LE_SET_SCAN_PARAMETERS=0x000b

#iBeaconを識別する値
IBEACONTYPE="4c000215"

#設定ファイルのファイルパス
CONFIGFILE='/root/BeaconScan.conf'

#許可リストCSVファイルのファイルパス
CONFIMCSV='/root/confirm.csv'

DEV_ID=0

#グローバル変数定義
st = 0
chkuuid=""
chkmajor=0
chkdistacnce=0
confirms=[]
cancellation=0

def open_dev(dev_id):
    sock=bluez.hci_open_dev(dev_id)
    return sock

def hci_enable_le_scan(sock):
    hci_toggle_le_scan(sock, 0x01)

def hci_disable_le_scan(sock):
    hci_toggle_le_scan(sock, 0x00)

def hci_toggle_le_scan(sock, enable):
    cmd_pkt = struct.pack("<BB", enable, 0x00)
    bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)

def hci_le_set_scan_parameters(sock):
    cmd_pkt = struct.pack("<BHHBB",0x01,0x0010,0x0010,0x00,0x00)
    bluez.hci_send_cmd(sock, OGF_LE_CTL, OGF_LE_SET_SCAN_PARAMETERS, cmd_pkt) 

def packetToString(packet):
    if sys.version_info > (3, 0):
        return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in packet)
    else:
        return ''.join('%02x' % struct.unpack("B", x)[0] for x in packet)
#バイナリを8ビット符号付整数に変換
def s8(value):
    return -(value & 0b10000000) | (value & 0b01111111)

def get_start_time():
    global st
    st =time.time()

#警報発報処理
#iwt120-USBの制御コマンド実行
def wake_alerm():
    os.system('/root/iwt120ctl/iwt120ctl set ANY 32')

#警報停止処理
#iwt120-USBの制御コマンド実行
def sleep_alerm():
    os.system('/root/iwt120ctl/iwt120ctl set ANY 0')

#confファイルからの設定情報読出処理
def get_setting_data():
    global chkuuid
    global chkmajor
    global chkdistance
    with open(CONFIGFILE, 'r') as file:
        config = yaml.load(file, Loader=yaml.SafeLoader)
        chkuuid = config['uuid'][0]
        chkmajor = config['major'][0]
        chkdistance = config['distance'][0]

#CSVファイルからのMinor値読出処理
def get_confirm_list():
    with open(CONFIMCSV) as f:
        global confirms
        reader=csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
        confirms=[row for row in reader]
#受信したパケットのparseと内容確認
def parse_and_check(sock, loop_count=100):
    old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
    flt = bluez.hci_filter_new()
    bluez.hci_filter_all_events(flt)
    bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
    sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
    for i in range(0, loop_count):
        packet = sock.recv(255)
        packetOffset = 0
        dataString = packetToString(packet)

        type = dataString[38:46]
#iBeaconのパケットであるかを確認
        if dataString[38:46] == IBEACONTYPE:
#UUIDを取得
            uuid = dataString[46:54] + "-" + dataString[54:58] + "-" + dataString[58:62] + "-" + dataString[62:66] + "-" + dataString[66:78]
#Majorを取得
            major = dataString[78:82]
            majorVal = int("".join(major.split()[::-1]), 16)
#Minorを取得
            minor = dataString[82:86]
            minorVal = int("".join(minor.split()[::-1]), 16)
#txpowerを取得
            txpower = dataString[86:88]
            txpowerVal = s8(int("".join(txpower.split()[::-1]),16))
#rssiを取得
            if sys.version_info[0] == 3:
                rssi, = struct.unpack("b", bytes([packet[packetOffset-1]]))
            else:
                rssi, = struct.unpack("b", packet[packetOffset-1])
#取得したデータを確認する処理
            chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi)

def chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi):
    global chkuuid
    global chkmajor
    if uuid == chkuuid and majorVal == chkmajor:
        if chk_confirm(minorVal) == 0:
            chk_distance(txpowerVal,rssi)
#Minorを事前に登録した許可済のものと照合                            
def chk_confirm(minorVal):
    confirm = 0
    global confirms
    for i in confirms[0]:
        if minorVal == int(i):
            confirm = 1; 
    return confirm
#rssiとtxpowerから距離計算をして閾値と比較
def chk_distance(txpowerVal,rssi):
    distance = pow(10.0, (txpowerVal - rssi) / 20.0)
    global chkdistance
    global st
    if distance < chkdistance:
        get_start_time()
        wake_alerm()

def get_setting():
    get_setting_data()
    get_confirm_list()
    get_start_time()
    try:
        sock = open_dev(DEV_ID)
        return sock
    except:
        print ("Error accessing bluetooth")
#警報発報からの経過時間を確認し、一定時間経過していたら解除する
def chk_timer():
    nt =time.time()
    global st
    pt = nt - st
    global cancellation
    if  pt > cancellation:
        sleep_alerm()
        st = nt

def start_scan(sock):
    hci_le_set_scan_parameters(sock)
    hci_enable_le_scan(sock)
    try:
        while True:
            parse_and_check(sock, 10)
            chk_timer()

    except KeyboardInterrupt:
        hci_disable_le_scan(sock)
        pass

#メイン処理
sock = get_setting()
start_scan(sock)

実行

上記ソースコードを任意のファイル名(拡張子py)で保存し、
以下のように実行すると動作します。(仮にscanner.pyというファイル名にしています。)

armadillo:~# python3 scanner.py