"""Scan for a BLE environment sensor and log its readings to a CSV file.

The script scans with bluepy until the device whose address was given on the
command line is seen, starts an ``EnvSensor`` thread for it, and that thread
periodically reads one GATT characteristic and appends the decoded record to
``<script dir>/log/gatt_data_output_<date>.csv``.
"""
import argparse
import csv
import datetime
import os
import struct
import sys
import time
from threading import Thread, Timer

import bluepy.btle
from bluepy.btle import BTLEException, DefaultDelegate, Peripheral, Scanner, UUID

# UUID(s) of the GATT characteristic read by EnvSensor (the first is used).
ids = {'0c4c3001-7700-46f4-aa96-d5e974e32a54'}

CSV_DIR_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log")

ADDR_TYPE_PUBLIC = "public"
ADDR_TYPE_RANDOM = "random"

# Seconds one connect/read cycle may take before the watchdog timer fires.
_WATCHDOG_TIMEOUT_SEC = 30
# Seconds to wait after a successful read before reading again.
_READ_INTERVAL_SEC = 60.0
# Duration of one scanner pass in the main loop.
_SCAN_DURATION_SEC = 5.0

# Record layout: one unsigned byte followed by eight signed 16-bit values and
# one unsigned 16-bit value, packed without padding (19 bytes).
# NOTE(review): the byte order is pinned to little-endian here; the previous
# code used the host's native order, which is identical on little-endian
# hosts -- confirm against the sensor's specification.
_RECORD = struct.Struct('<BhhhhhhhhH')

Debugging = False


def DBG(*args):
    """Print the space-joined arguments when ``Debugging`` is enabled."""
    if Debugging:
        msg = " ".join([str(a) for a in args])
        print(msg)
        sys.stdout.flush()


Verbose = False


def MSG(*args):
    """Print the space-joined arguments when ``Verbose`` is enabled."""
    if Verbose:
        msg = " ".join([str(a) for a in args])
        print(msg)
        sys.stdout.flush()


def timeoutRetry(addr):
    """Watchdog callback: force the sensor thread registered for *addr* to disconnect."""
    MSG('timer expired (%s)' % addr)
    devThread = scannedDevs[addr]
    devThread.forceDisconnect()
    MSG('Thread disconnected (%s)' % addr)


def csvHeader():
    """Return the CSV column names as a tuple of strings."""
    return (
        "Sequence_number",
        "Temperature",
        "Relative_humidity",
        "Ambient_light",
        "UV",
        "Barometric_pressure",
        "Sound_noise",
        "Discomfort_index",
        "Heat_stroke_risk",
        "Batt",
    )


def csvFormat(seq, temp, humid, light, uv, press, noise, discomfort, heat,
              batt):
    """Return the given values, in column order, as a tuple of strings."""
    return tuple(
        str(value)
        for value in (seq, temp, humid, light, uv, press, noise, discomfort,
                      heat, batt)
    )


def writeCSV(dataRow):
    """Decode one raw sensor record and append it to the CSV at ``csvPath``.

    Args:
        dataRow: bytes-like object holding at least 19 bytes laid out as
            described by ``_RECORD``.

    Raises:
        struct.error: if *dataRow* is shorter than the record layout.
    """
    print("-----------------------------------")
    # Decode before touching the file so a malformed record cannot leave a
    # file handle open.
    (seq, temp, humid, light, uv, press, noise, discomfort, heat,
     batt) = _RECORD.unpack_from(dataRow, 0)
    data = csvFormat(seq, temp / 100, humid / 100, light, uv / 100,
                     press / 10, noise / 100, discomfort / 100, heat / 100,
                     batt)
    with open(csvPath, 'a') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(data)
    print('write to csv.')
    print("-----------------------------------")
    return


class EnvSensor(Thread, Peripheral):
    """Daemon thread that repeatedly reads one characteristic from one device."""

    def __init__(self, dev, uuid):
        """Store the scanned device entry and the characteristic UUID to read.

        Args:
            dev: scan entry of the target device; its ``addr`` is used for
                logging and for the watchdog lookup.
            uuid: UUID of the characteristic to read.
        """
        Peripheral.__init__(self)
        Thread.__init__(self)
        self.daemon = True
        self.dev = dev
        self.isConnected = False
        self.uuid = uuid

    def run(self):
        """Connect, read the characteristic, log it, and repeat forever.

        Each cycle is guarded by a watchdog timer that calls ``timeoutRetry``
        for this device if the cycle has not finished in time.
        """
        while True:
            watchdog = Timer(_WATCHDOG_TIMEOUT_SEC, timeoutRetry,
                             [self.dev.addr])
            watchdog.start()

            # Retry until the connection is established.
            while not self.isConnected:
                try:
                    self.connect(self.dev, ADDR_TYPE_RANDOM)
                    self.isConnected = True
                except BTLEException as e:
                    MSG('BTLE ExcePtion while connect on ', self.dev.addr)
                    MSG('type:' + str(type(e)))
                    MSG('args:' + str(e.args))

            MSG('connected to ', self.dev.addr)

            try:
                latestDataRow = self.getCharacteristics(uuid=self.uuid)[0]
                dataRow = latestDataRow.read()
                writeCSV(dataRow)
            except BTLEException as e:
                MSG('BTLE Exception while getCharacteristics on ',
                    self.dev.addr)
                MSG('type:' + str(type(e)))
                MSG('args:' + str(e.args))
                # Drop the link so the next cycle reconnects from scratch.
                self.disconnect()
                self.isConnected = False
                watchdog.cancel()
            else:
                watchdog.cancel()
                time.sleep(_READ_INTERVAL_SEC)

    def forceDisconnect(self):
        """Disconnect if currently connected; called by the watchdog timer."""
        if self.isConnected:
            self.disconnect()
            self.isConnected = False


# Sensor threads that have been started, keyed by device address.
scannedDevs = {}


class OmronScan(DefaultDelegate):
    """Scan delegate that starts an ``EnvSensor`` thread for the target device."""

    def __init__(self):
        """Build the list of characteristic UUIDs from ``ids``."""
        DefaultDelegate.__init__(self)
        self.devs = [UUID(i) for i in ids]

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Start a sensor thread the first time the target device is seen.

        The device qualifies when its address equals the global ``BDaddr``
        and one of its scan-data entries has the value ``'EP'``.
        """
        if BDaddr != dev.addr:
            return
        if dev.addr in scannedDevs:
            # A thread for this device is already running.
            return
        for entry in dev.getScanData():
            if entry[2] == 'EP':
                MSG('New %s' % (dev.addr))
                devThread = EnvSensor(dev, self.devs[0])
                scannedDevs[dev.addr] = devThread
                devThread.start()
                return


def fileCreate():
    """Create (or truncate) the CSV at ``csvPath`` and write the header row."""
    with open(csvPath, 'w') as f:
        # Use the same line terminator as writeCSV so the file is consistent.
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(csvHeader())


def main():
    """Parse the command line, prepare the CSV file, and scan forever."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', action='store_true', help='debug msg on')
    parser.add_argument('-v', action='store_true', help='verbose msg on')
    parser.add_argument('BDaddr', help='target BDaddr')
    args = parser.parse_args(sys.argv[1:])

    global Debugging
    Debugging = args.d
    bluepy.btle.Debugging = args.d

    global Verbose
    if args.v:
        Verbose = True

    # Lower-cased before it is compared with dev.addr in handleDiscovery.
    global BDaddr
    BDaddr = args.BDaddr.lower()

    if not os.path.isdir(CSV_DIR_PATH):
        os.makedirs(CSV_DIR_PATH)

    global csvPath
    filename = ('gatt_data_output_%s.csv' % (datetime.date.today()))
    csvPath = os.path.join(CSV_DIR_PATH, filename)
    fileCreate()

    scanner = Scanner().withDelegate(OmronScan())
    while True:
        try:
            scanner.scan(_SCAN_DURATION_SEC)
        except BTLEException:
            MSG('BTLE Exception while scannning.')


if __name__ == "__main__":
    main()