下記記事でビーコンデータを収集するアプリケーションを動作させましたが、
こちらを応用して、特定のビーコンがArmadillo-640に接近したら警報を出力するシステムを構築しました。
Armadillo-640:BT/THオプションモジュールを使用してビーコンデータを収集する
機器構成
- Armadillo-640
- BT/THオプションモジュール
- 警報装置:IWT120-USB(東京デバイセズ)
USBでArmadillo-640と接続します。 - USBハブ
USBの相性により、警報装置とArmadillo-640を直接接続しても使用出来なかったため、
HUBを介して接続しています。 - ビーコン:iBeaconタイプのもの
このデモでは、 Feasycom社製FSC-BP03を使用
アプリケーション
事前準備
警報出力処理の準備
以下をダウンロードする
IWT120-USB 制御コマンド(Linux) 0.0.1
解凍し、README.txtに従ってビルドする
*ビルドの際にはlibusb-devが必要になるので事前にインストールしておいてください。
BLE通信機能
以下を実行し、必要なライブラリ・モジュールをインストールしておく
armadillo:~# apt-get install python3-pip python3-dev ipython armadillo:~# apt-get install bluetooth libbluetooth-dev python3-yaml armadillo:~# pip3 install pybluez
機能概要
BLEパケット受信
BLEパケットを受信する
受信したパケットデータのフィルタリング
iBeaconタイプのアドバタイジングパケットのみをフィルタリングする
UUID・Majorを元に対象のビーコンのパケットのみをフィルタリングする
ビーコンとの距離推定
txPowerとRSSIから距離を推定する
d = 10 ^ ((TxPower - RSSI) / 20)
警報動作
距離が一定の閾値よりも近くなると警報を発報する
IWT120-USBの警報発報コマンドを実行する
警報発報後、一定時間ビーコンが近距離に無ければ警報を解除する
例外処理
侵入許可リストにあるビーコンについては、近づいても警報を発報しない
設定変更
UUID・Major・警報発報距離・警報解除までの時間は設定ファイルにて変更できるものとする
→起動時に読み込んで動作する。設定変更の場合はアプリケーション再起動が必要
侵入許可リストはCSVファイルにて作成できるものとする
侵入を許可してもよいビーコンのMinor値を入力しておく
設定
検出するビーコンのUUID・Major、警報を発報する距離、警報解除までの秒数を設定出来るようにしています。
以下のフォーマットでファイルへ記載すると読み込んで動作します。
下記サンプルコードでは、/root直下の「BeaconScan.conf」という名前のファイルを設定ファイルとして読み込むようにしています。
コードを変更することで設定ファイル名や配置箇所も変更できます。
uuid: - XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX major: - XX #距離(m) distance: - 0.1 #解除までの秒数 cancellation: - 30
許可リスト
同じUUID・Majorのビーコンのうち、接近を許可したいものについては予めMinorをリストに登録しておくことで、
接近しても警報を発報しないように出来ます。
カンマ区切りのCSVファイルに記載することで登録可能です。
サンプルコードでは/root直下の「confirm.csv」という名前のファイルを読み込むようにしています。
設定ファイルと同様、コードを変更することで設定ファイル名や配置箇所も変更できます。
ソースコード
import sys import struct import time import yaml import csv import os import bluetooth._bluetooth as bluez OGF_LE_CTL=0x08 OCF_LE_SET_SCAN_ENABLE=0x000C OGF_LE_SET_SCAN_PARAMETERS=0x000b #iBeaconを識別する値 IBEACONTYPE="4c000215" #設定ファイルのファイルパス CONFIGFILE='/root/BeaconScan.conf' #許可リストCSVファイルのファイルパス CONFIMCSV='/root/confirm.csv' DEV_ID=0 #グローバル変数定義 st = 0 chkuuid="" chkmajor=0 chkdistacnce=0 confirms=[] cancellation=0 def open_dev(dev_id): sock=bluez.hci_open_dev(dev_id) return sock def hci_enable_le_scan(sock): hci_toggle_le_scan(sock, 0x01) def hci_disable_le_scan(sock): hci_toggle_le_scan(sock, 0x00) def hci_toggle_le_scan(sock, enable): cmd_pkt = struct.pack("<BB", enable, 0x00) bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt) def hci_le_set_scan_parameters(sock): cmd_pkt = struct.pack("<BHHBB",0x01,0x0010,0x0010,0x00,0x00) bluez.hci_send_cmd(sock, OGF_LE_CTL, OGF_LE_SET_SCAN_PARAMETERS, cmd_pkt) def packetToString(packet): if sys.version_info > (3, 0): return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in packet) else: return ''.join('%02x' % struct.unpack("B", x)[0] for x in packet) #バイナリを8ビット符号付整数に変換 def s8(value): return -(value & 0b10000000) | (value & 0b01111111) def get_start_time(): global st st =time.time() #警報発報処理 #iwt120-USBの制御コマンド実行 def wake_alerm(): os.system('/root/iwt120ctl/iwt120ctl set ANY 32') #警報停止処理 #iwt120-USBの制御コマンド実行 def sleep_alerm(): os.system('/root/iwt120ctl/iwt120ctl set ANY 0') #confファイルからの設定情報読出処理 def get_setting_data(): global chkuuid global chkmajor global chkdistance with open(CONFIGFILE, 'r') as file: config = yaml.load(file, Loader=yaml.SafeLoader) chkuuid = config['uuid'][0] chkmajor = config['major'][0] chkdistance = config['distance'][0] #CSVファイルからのMinor値読出処理 def get_confirm_list(): with open(CONFIMCSV) as f: global confirms reader=csv.reader(f, quoting=csv.QUOTE_NONNUMERIC) confirms=[row for row in reader] #受信したパケットのparseと内容確認 def parse_and_check(sock, loop_count=100): old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14) flt = bluez.hci_filter_new() bluez.hci_filter_all_events(flt) bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT) sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt ) for i in range(0, loop_count): packet = sock.recv(255) packetOffset = 0 dataString = packetToString(packet) type = dataString[38:46] #iBeaconのパケットであるかを確認 if dataString[38:46] == IBEACONTYPE: #UUIDを取得 uuid = dataString[46:54] + "-" + dataString[54:58] + "-" + dataString[58:62] + "-" + dataString[62:66] + "-" + dataString[66:78] #Majorを取得 major = dataString[78:82] majorVal = int("".join(major.split()[::-1]), 16) #Minorを取得 minor = dataString[82:86] minorVal = int("".join(minor.split()[::-1]), 16) #txpowerを取得 txpower = dataString[86:88] txpowerVal = s8(int("".join(txpower.split()[::-1]),16)) #rssiを取得 if sys.version_info[0] == 3: rssi, = struct.unpack("b", bytes([packet[packetOffset-1]])) else: rssi, = struct.unpack("b", packet[packetOffset-1]) #取得したデータを確認する処理 chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi) def chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi): global chkuuid global chkmajor if uuid == chkuuid and majorVal == chkmajor: if chk_confirm(minorVal) == 0: chk_distance(txpowerVal,rssi) #Minorを事前に登録した許可済のものと照合 def chk_confirm(minorVal): confirm = 0 global confirms for i in confirms[0]: if minorVal == int(i): confirm = 1; return confirm #rssiとtxpowerから距離計算をして閾値と比較 def chk_distance(txpowerVal,rssi): distance = pow(10.0, (txpowerVal - rssi) / 20.0) global chkdistance global st if distance < chkdistance: get_start_time() wake_alerm() def get_setting(): get_setting_data() get_confirm_list() get_start_time() try: sock = open_dev(DEV_ID) return sock except: print ("Error accessing bluetooth") #警報発報からの経過時間を確認し、一定時間経過していたら解除する def chk_timer(): nt =time.time() global st pt = nt - st global cancellation if pt > cancellation: sleep_alerm() st = nt def start_scan(sock): hci_le_set_scan_parameters(sock) hci_enable_le_scan(sock) try: while True: parse_and_check(sock, 10) chk_timer() except KeyboardInterrupt: hci_disable_le_scan(sock) pass #メイン処理 sock = get_setting() start_scan(sock)
実行
上記ソースコードを任意のファイル名(拡張子py)で保存し、
以下のように実行すると動作します。(仮にscanner.pyというファイル名にしています。)
armadillo:~# python3 scanner.py