下記記事でビーコンデータを収集するアプリケーションを動作させましたが、
こちらを応用して、特定のビーコンがArmadillo-640に接近したら警報を出力するシステムを構築しました。
Armadillo-640:BT/THオプションモジュールを使用してビーコンデータを収集する
機器構成
- Armadillo-640
- BT/THオプションモジュール
- 警報装置:IWT120-USB(東京デバイセズ)
USBでArmadillo-640と接続します。 - USBハブ
USBの相性により、警報装置とArmadillo-640を直接接続しても使用出来なかったため、
HUBを介して接続しています。 - ビーコン:iBeaconタイプのもの
このデモでは、 Feasycom社製FSC-BP03を使用
アプリケーション
事前準備
警報出力処理の準備
以下をダウンロードする
IWT120-USB 制御コマンド(Linux) 0.0.1
解凍し、README.txtに従ってビルドする
*ビルドの際にはlibusb-devが必要になるので事前にインストールしておいてください。
BLE通信機能
以下を実行し、必要なライブラリ・モジュールをインストールしておく
armadillo:~# apt-get install python3-pip python3-dev ipython
armadillo:~# apt-get install bluetooth libbluetooth-dev python3-yaml
armadillo:~# pip3 install pybluez
機能概要
BLEパケット受信
BLEパケットを受信する
受信したパケットデータのフィルタリング
iBeaconタイプのアドバタイジングパケットのみをフィルタリングする
UUID・Majorを元に対象のビーコンのパケットのみをフィルタリングする
ビーコンとの距離推定
txPowerとRSSIから距離を推定する
d = 10 ^ ((TxPower - RSSI) / 20)
警報動作
距離が一定の閾値よりも近くなると警報を発報する
IWT120-USBの警報発報コマンドを実行する
警報発報後、一定時間ビーコンが近距離に無ければ警報を解除する
例外処理
侵入許可リストにあるビーコンについては、近づいても警報を発報しない
設定変更
UUID・Major・警報発報距離・警報解除までの時間は設定ファイルにて変更できるものとする
→起動時に読み込んで動作する。設定変更の場合はアプリケーション再起動が必要
侵入許可リストはCSVファイルにて作成できるものとする
侵入を許可してもよいビーコンのMinor値を入力しておく
設定
検出するビーコンのUUID・Major、警報を発報する距離、警報解除までの秒数を設定出来るようにしています。
以下のフォーマットでファイルへ記載すると読み込んで動作します。
下記サンプルコードでは、/root直下の「BeaconScan.conf」という名前のファイルを設定ファイルとして読み込むようにしています。
コードを変更することで設定ファイル名や配置箇所も変更できます。
uuid:
- XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
major:
- XX
#距離(m)
distance:
- 0.1
#解除までの秒数
cancellation:
- 30
許可リスト
同じUUID・Majorのビーコンのうち、接近を許可したいものについては予めMinorをリストに登録しておくことで、
接近しても警報を発報しないように出来ます。
カンマ区切りのCSVファイルに記載することで登録可能です。
サンプルコードでは/root直下の「confirm.csv」という名前のファイルを読み込むようにしています。
設定ファイルと同様、コードを変更することで設定ファイル名や配置箇所も変更できます。
ソースコード
import sys
import struct
import time
import yaml
import csv
import os
import bluetooth._bluetooth as bluez
OGF_LE_CTL=0x08
OCF_LE_SET_SCAN_ENABLE=0x000C
OGF_LE_SET_SCAN_PARAMETERS=0x000b
#iBeaconを識別する値
IBEACONTYPE="4c000215"
#設定ファイルのファイルパス
CONFIGFILE='/root/BeaconScan.conf'
#許可リストCSVファイルのファイルパス
CONFIMCSV='/root/confirm.csv'
DEV_ID=0
#グローバル変数定義
st = 0
chkuuid=""
chkmajor=0
chkdistacnce=0
confirms=[]
cancellation=0
def open_dev(dev_id):
sock=bluez.hci_open_dev(dev_id)
return sock
def hci_enable_le_scan(sock):
hci_toggle_le_scan(sock, 0x01)
def hci_disable_le_scan(sock):
hci_toggle_le_scan(sock, 0x00)
def hci_toggle_le_scan(sock, enable):
cmd_pkt = struct.pack("<BB", enable, 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def hci_le_set_scan_parameters(sock):
cmd_pkt = struct.pack("<BHHBB",0x01,0x0010,0x0010,0x00,0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OGF_LE_SET_SCAN_PARAMETERS, cmd_pkt)
def packetToString(packet):
if sys.version_info > (3, 0):
return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in packet)
else:
return ''.join('%02x' % struct.unpack("B", x)[0] for x in packet)
#バイナリを8ビット符号付整数に変換
def s8(value):
return -(value & 0b10000000) | (value & 0b01111111)
def get_start_time():
global st
st =time.time()
#警報発報処理
#iwt120-USBの制御コマンド実行
def wake_alerm():
os.system('/root/iwt120ctl/iwt120ctl set ANY 32')
#警報停止処理
#iwt120-USBの制御コマンド実行
def sleep_alerm():
os.system('/root/iwt120ctl/iwt120ctl set ANY 0')
#confファイルからの設定情報読出処理
def get_setting_data():
global chkuuid
global chkmajor
global chkdistance
with open(CONFIGFILE, 'r') as file:
config = yaml.load(file, Loader=yaml.SafeLoader)
chkuuid = config['uuid'][0]
chkmajor = config['major'][0]
chkdistance = config['distance'][0]
#CSVファイルからのMinor値読出処理
def get_confirm_list():
with open(CONFIMCSV) as f:
global confirms
reader=csv.reader(f, quoting=csv.QUOTE_NONNUMERIC)
confirms=[row for row in reader]
#受信したパケットのparseと内容確認
def parse_and_check(sock, loop_count=100):
old_filter = sock.getsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, 14)
flt = bluez.hci_filter_new()
bluez.hci_filter_all_events(flt)
bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
sock.setsockopt( bluez.SOL_HCI, bluez.HCI_FILTER, flt )
for i in range(0, loop_count):
packet = sock.recv(255)
packetOffset = 0
dataString = packetToString(packet)
type = dataString[38:46]
#iBeaconのパケットであるかを確認
if dataString[38:46] == IBEACONTYPE:
#UUIDを取得
uuid = dataString[46:54] + "-" + dataString[54:58] + "-" + dataString[58:62] + "-" + dataString[62:66] + "-" + dataString[66:78]
#Majorを取得
major = dataString[78:82]
majorVal = int("".join(major.split()[::-1]), 16)
#Minorを取得
minor = dataString[82:86]
minorVal = int("".join(minor.split()[::-1]), 16)
#txpowerを取得
txpower = dataString[86:88]
txpowerVal = s8(int("".join(txpower.split()[::-1]),16))
#rssiを取得
if sys.version_info[0] == 3:
rssi, = struct.unpack("b", bytes([packet[packetOffset-1]]))
else:
rssi, = struct.unpack("b", packet[packetOffset-1])
#取得したデータを確認する処理
chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi)
def chk_beacondata(uuid,majorVal,minorVal,txpowerVal,rssi):
global chkuuid
global chkmajor
if uuid == chkuuid and majorVal == chkmajor:
if chk_confirm(minorVal) == 0:
chk_distance(txpowerVal,rssi)
#Minorを事前に登録した許可済のものと照合
def chk_confirm(minorVal):
confirm = 0
global confirms
for i in confirms[0]:
if minorVal == int(i):
confirm = 1;
return confirm
#rssiとtxpowerから距離計算をして閾値と比較
def chk_distance(txpowerVal,rssi):
distance = pow(10.0, (txpowerVal - rssi) / 20.0)
global chkdistance
global st
if distance < chkdistance:
get_start_time()
wake_alerm()
def get_setting():
get_setting_data()
get_confirm_list()
get_start_time()
try:
sock = open_dev(DEV_ID)
return sock
except:
print ("Error accessing bluetooth")
#警報発報からの経過時間を確認し、一定時間経過していたら解除する
def chk_timer():
nt =time.time()
global st
pt = nt - st
global cancellation
if pt > cancellation:
sleep_alerm()
st = nt
def start_scan(sock):
hci_le_set_scan_parameters(sock)
hci_enable_le_scan(sock)
try:
while True:
parse_and_check(sock, 10)
chk_timer()
except KeyboardInterrupt:
hci_disable_le_scan(sock)
pass
#メイン処理
sock = get_setting()
start_scan(sock)
実行
上記ソースコードを任意のファイル名(拡張子py)で保存し、
以下のように実行すると動作します。(仮にscanner.pyというファイル名にしています。)
armadillo:~# python3 scanner.py