ブログ

Armadillo-640:BT/THオプションモジュールを使用してビーコンの接近を感知、警報を出力する

at_takuma.fukuda
2021年12月8日 13時52分

下記記事でビーコンデータを収集するアプリケーションを動作させましたが、
こちらを応用して、特定のビーコンがArmadillo-640に接近したら警報を出力するシステムを構築しました。
Armadillo-640:BT/THオプションモジュールを使用してビーコンデータを収集する

機器構成

  • Armadillo-640
  • BT/THオプションモジュール
  • 警報装置:IWT120-USB(東京デバイセズ)
    USBでArmadillo-640と接続します。
  • USBハブ
    USBの相性により、警報装置とArmadillo-640を直接接続しても使用出来なかったため、
    HUBを介して接続しています。
  • ビーコン:iBeaconタイプのもの
    このデモでは、 Feasycom社製FSC-BP03を使用

アプリケーション

事前準備

警報出力処理の準備

以下をダウンロードする
IWT120-USB 制御コマンド(Linux) 0.0.1
解凍し、README.txtに従ってビルドする

BLE通信機能

以下を実行し、必要なライブラリ・モジュールをインストールしておく

armadillo:~# apt-get install python3-pip python3-dev ipython
armadillo:~# apt-get install bluetooth libbluetooth-dev
armadillo:~# pip3 install pybluez

機能概要

BLEパケット受信

BLEパケットを受信する

受信したパケットデータのフィルタリング

iBeaconタイプのアドバタイジングパケットのみをフィルタリングする
UUID・Majorを元に対象のビーコンのパケットのみをフィルタリングする

ビーコンとの距離推定

txPowerとRSSIから距離を推定する

d = 10 ^ ((TxPower - RSSI) / 20)

警報動作

距離が一定の閾値よりも近くなると警報を発報する
IWT120-USBの警報発報コマンドを実行する
警報発報後、一定時間ビーコンが近距離に無ければ警報を解除する

例外処理

侵入許可リストにあるビーコンについては、近づいても警報を発報しない

設定変更

UUID・Major・警報発報距離・警報解除までの時間は設定ファイルにて変更できるものとする
→起動時に読み込んで動作する。設定変更の場合はアプリケーション再起動が必要
侵入許可リストはCSVファイルにて作成できるものとする
侵入を許可してもよいビーコンのMinor値を入力しておく

設定

検出するビーコンのUUID・Major、警報を発報する距離、警報解除までの秒数を設定出来るようにしています。
以下のフォーマットでファイルへ記載すると読み込んで動作します。

uuid:
- XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
major:
- XX
#距離(m)
distance:
- 0.1
#解除までの秒数
cancellation:
- 30

許可リスト

同じUUID・Majorのビーコンのうち、接近を許可したいものについては予めMinorをリストに登録しておくことで、
接近しても警報を発報しないように出来ます。
カンマ区切りのCSVファイルに記載することで登録可能です。

ソースコード

import sys
import struct
 
import time
import yaml
import csv
 
import os
 
import bluetooth._bluetooth as bluez
 
OGF_LE_CTL=0x08
OCF_LE_SET_SCAN_ENABLE=0x000C
 
OGF_LE_SET_SCAN_PARAMETERS=0x000b
 
#iBeaconを識別する値
IBEACONTYPE="4c000215"
 
#設定ファイルのファイルパス
CONFIGFILE='/root/BeaconScan.conf'
 
#許可リストCSVファイルのファイルパス
CONFIMCSV='/root/confirm.csv'
 
DEV_ID=0
 
#グローバル変数定義
st = 0
chkuuid=""
chkmajor=0
chkdistacnce=0
confirms=[]
 
def open_dev(dev_id):
    sock=bluez.hci_open_dev(dev_id)
    return sock
 
def hci_enable_le_scan(sock):
    hci_toggle_le_scan(sock, 0x01)
 
def hci_disable_le_scan(sock):
    hci_toggle_le_scan(sock, 0x00)
 
def hci_toggle_le_scan(sock, enable):
    cmd_pkt = struct.pack("<BB", enable, 0x00)
    bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
 
def hci_le_set_scan_parameters(sock):
    cmd_pkt = struct.pack("<BHHBB",0x01,0x0010,0x0010,0x00,0x00)
    bluez.hci_send_cmd(sock, OGF_LE_CTL, OGF_LE_SET_SCAN_PARAMETERS, cmd_pkt) 
 
def packetToString(packet):
    if sys.version_info > (3, 0):
        return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in packet)
    else:
        return ''.join('%02x' % struct.unpack("B", x)[0] for x in packet)
#バイナリを8ビット符号付整数に変換
def s8(value):
    return -(value & 0b10000000) | (value & 0b01111111)
 
def get_start_time():
    global st
    st =time.time()
 
#警報発報処理
#iwt120-USBの制御コマンド実行
def wake_alerm():
    os.system('/root/iwt120ctl/iwt120ctl set ANY 32')
 
#警報停止処理
#iwt120-USBの制御コマンド実行
def sleep_alerm():
    os.system('/root/iwt120ctl/iwt120ctl set ANY 0')
 
#confファイルからの設定情報読出処理
def get_setting_data():
    global chkuuid
    global chkmajor
    global chkdistance
    with open(CONFIGFILE, 'r') as file:
        config = yaml.load(file, Loader=yaml.SafeLoader)
        chkuuid = config['uuid'][0]
        chkmajor = config['major'][0]
        chkdistance = config['distance'][0]
 
#CSVファイルからのMinor値読出処理
def get_confirm_list():
    with open(CONFIMCSV) as f:
        global confirms
        reader=csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
        confirms=[row for row in reader]
#受信したパケットのparseと内容確認
def parse_and_check(sock, loop_count=100):
    old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
    flt = bluez.hci_filter_new()
    bluez.hci_filter_all_events(flt)
    bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
    sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
    for i in range(0, loop_count):
        packet = sock.recv(255)
        packetOffset = 0
        dataString = packetToString(packet)
 
        type = dataString[38:46]
#iBeaconのパケットであるかを確認
        if dataString[38:46] == IBEACONTYPE:
#UUIDを取得
            uuid = dataString[46:54] + "-" + dataString[54:58] + "-" + dataString[58:62] + "-" + dataString[62:66] + "-" + dataString[66:78]
#Majorを取得
            major = dataString[78:82]
            majorVal = int("".join(major.split()[::-1]), 16)
#Minorを取得
            minor = dataString[82:86]
            minorVal = int("".join(minor.split()[::-1]), 16)
#txpowerを取得
            txpower = dataString[86:88]
            txpowerVal = s8(int("".join(txpower.split()[::-1]),16))
#rssiを取得
            if sys.version_info[0] == 3:
                rssi, = struct.unpack("b", bytes([packet[packetOffset-1]]))
            else:
                rssi, = struct.unpack("b", packet[packetOffset-1])
#取得したデータを確認する処理
            chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi)
 
def chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi):
    global chkuuid
    global chkmajor
    if uuid == chkuuid and majorVal == chkmajor:
        if chk_confirm(minorVal) == 0:
            chk_distance(txpowerVal,rssi)
#Minorを事前に登録した許可済のものと照合                            
def chk_confirm(minorVal):
    confirm = 0
    global confirms
    for i in confirms[0]:
        if minorVal == int(i):
            confirm = 1; 
    return confirm
#rssiとtxpowerから距離計算をして閾値と比較
def chk_distance(txpowerVal,rssi):
    distance = pow(10.0, (txpowerVal - rssi) / 20.0)
    global chkdistance
    global st
    if distance < chkdistance:
        get_start_time()
        wake_alerm()
 
def get_setting():
    get_setting_data()
    get_confirm_list()
    get_start_time()
    try:
        sock = open_dev(DEV_ID)
        return sock
    except:
        print ("Error accessing bluetooth")
#警報発報からの経過時間を確認し、一定時間経過していたら解除する
def chk_timer():
    nt =time.time()
    global st
    pt = nt - st
    global cancellation
    if  pt > cancellation:
        sleep_alerm()
        st = nt
 
def start_scan(sock):
    hci_le_set_scan_parameters(sock)
    hci_enable_le_scan(sock)
    try:
        while True:
            parse_and_check(sock, 10)
            chk_timer()
 
    except KeyboardInterrupt:
        hci_disable_le_scan(sock)
        pass
 
#メイン処理
sock = get_setting()
start_scan(sock)

実行

上記ソースコードを任意のファイル名(拡張子py)で保存し、
以下のように実行すると動作します。(仮にscanner.pyというファイル名にしています。)

armadillo:~# python3 scanner.py