10 Commits
v0.2.7 ... dbus

Author SHA1 Message Date
Shin'ichiro Kawasaki
2482bba1d8 ble: Separate BLEDBusSession class to a separated new file ble.py
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-18 16:51:43 +09:00
Shin'ichiro Kawasaki
06df4af4a0 BLEDBusSession: Create Device class to gather device attributes
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-18 15:31:46 +09:00
Shin'ichiro Kawasaki
adb746995e BLEDbusSession: implement multiple devices support and disconnection
Also implemented service search retry to stabilize the first read
operation. Next work is to create a class to gather device attirbutes
that BLEDBusSession refers.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-10 20:32:03 +09:00
Shin'ichiro Kawasaki
157e3458b0 BLEDBusSession: implement write method
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-05 21:20:18 +09:00
Shin'ichiro Kawasaki
fe3cd35ac0 BLEDbusSession: implement notification reader
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-05 21:08:15 +09:00
Shin'ichiro Kawasaki
94a9555c4c BLEDBusSession: get GATT services and characteristics
Using DBus interface, get GATT services and characteristics of the
connected BLE device. Also implement read request to a characteristic.
2023-08-27 19:46:35 +09:00
Shin'ichiro Kawasaki
39b157e839 BLEDBusSession: introduce BTUUID class
bluepy provides its unique UUID class which handles UUIDs of Bluetooth
devices well. Not to depend on bluepy, introduce BTUUID which extends
python standard uuid.UUID class.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-08-27 11:27:51 +09:00
Shin'ichiro Kawasaki
f79e7ac889 BLEDBusSession: WIP
I implmented 'discovery' and 'connect' commands. Next action is to
implment 'read', 'write' and 'notify'. I'm not yet sure how DBus handles
'notify'.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-08-27 11:27:35 +09:00
Shin'ichiro Kawasaki
dc2ee5a22f Tag version 0.2.8
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-07-02 21:52:08 +09:00
Shin'ichiro Kawasaki
caa344ecbb scratch_link.py: Support COMPLETE_LOCAL_NAME for 'namePrefix' filter
Current implementation of match() does filter matching for 'namePrefix'
keyword only for SHORT_LOCAL_NAME of the device. When the device does
not provide SHORT_LOCAL_NAME but provides COMPLETE_LOCAL_NAME, match()
for 'namePrefix' fails. For example, MicroBit More v2 [1] does not
provide the SHORT_LOCAL_NAME.

To support 'namePrefix' filter match with COMPLETE_LOCAL_NAME, add
support for it. While at it, replace the SHORT_LOCAL_NAME identifier
constant with the definition in btle.ScanEntry.

[1] https://lab.yengawa.com/project/microbit-more-v2/

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-07-02 21:52:08 +09:00
4 changed files with 451 additions and 25 deletions

View File

@@ -170,6 +170,10 @@ Please file issues to [GitHub issue tracker](https://github.com/kawasaki/pyscrli
Releases Releases
-------- --------
Release 0.2.8
* Supported Microbit More v2
Release 0.2.7 Release 0.2.7
* Supported Snap Firefox and Chromium * Supported Snap Firefox and Chromium

374
pyscrlink/ble.py Normal file
View File

@@ -0,0 +1,374 @@
from sdbus import DbusInterfaceCommonAsync, SdBus, sd_bus_open_system
from sdbus.dbus_proxy_async_interfaces import DbusIntrospectableAsync
import xml.etree.ElementTree as ET
from sdbus_async.bluez.adapter_api import AdapterInterfaceAsync
from sdbus_async.bluez.device_api import DeviceInterfaceAsync
from sdbus_async.bluez.gatt_api import (
GattCharacteristicInterfaceAsync,
GattServiceInterfaceAsync,
)
import asyncio
import base64
from asyncio import sleep
from os import dup, fdopen, close
import pyscrlink.scratch_link
from pyscrlink.scratch_link import BTUUID
import logging
logger = logging.getLogger('pyscrlink.scratch_link')
class BLEDBusSession(pyscrlink.scratch_link.Session):
"""
Manage a session for Bluetooth Low Energy device such as micro:bit using
DBus as backend.
"""
INITIAL = 1
DISCOVERY = 2
CONNECTED = 3
DONE = 4
MAX_SCANNER_IF = 3
connected_devices = {}
class Device():
def __init__(self, interface, path, node_name, name, address):
self.interface = interface
self.path = path
self.node_name = node_name
self.name = name
self.address = address
class Notification():
def __init__(self, loop, acquired_fd, fd, fp, params):
self.loop = loop
self.acquired_fd = acquired_fd
self.fd = fd
self.fp = fp
self.params = params
def close(self):
self.loop.remove_reader(self.fd)
self.fp.close()
def _connect_to_adapters(self):
self.iface = None
self.adapter = None
self.adapter_introspect = None
adapter = AdapterInterfaceAsync()
for i in range(self.MAX_SCANNER_IF):
iface = '/org/bluez/hci' + str(i)
logger.debug(f"try connect to {iface}")
try:
adapter._connect('org.bluez', iface, bus=self.dbus)
logger.debug(f"connected to {iface}")
adapter_introspect = DbusIntrospectableAsync()
adapter_introspect._connect('org.bluez', iface, bus=self.dbus)
self.iface = iface
self.adapter = adapter
self.adapter_introspect = adapter_introspect
return
except Exception as e:
logger.error(e)
raise Exception("no adapter is available")
async def _start_discovery(self):
logger.debug(f"Starting discovery... {self.adapter}")
assert not self.discovery_running
await self.adapter.start_discovery()
self.discovery_running = True
asyncio.create_task(self._find_devices())
asyncio.create_task(self._stop_discovery())
logger.debug(f"Task to stop discovery has got created.")
async def _matches(self, dev, filters):
"""
Check if the found BLE device matches the filters Scratch specifies.
"""
logger.debug(f"in matches {dev} {filters}")
for f in filters:
if 'services' in f:
for s in f['services']:
logger.debug(f"service to check: {s}")
given_uuid = BTUUID(s)
logger.debug(f"given UUID: {given_uuid} hash={given_uuid.__hash__()}")
dev_uuids = await dev.interface.uuids
if not dev_uuids:
logger.debug(f"dev UUID not available")
continue
for uuid in dev_uuids:
u = BTUUID(uuid)
logger.debug(f"dev UUID: {u} hash={u.__hash__()}")
logger.debug(given_uuid == u)
if given_uuid == u:
logger.debug("match...")
return True
if 'namePrefix' in f:
logger.debug(f"given namePrefix: {f['namePrefix']}")
if dev.name:
logger.debug(f"name: {dev. name}")
if dev.name.startswith(f['namePrefix']):
logger.debug(f"match...")
return True
if 'name' in f or 'manufactureData' in f:
logger.error("name/manufactureData filters not implemented")
# TODO: implement other filters defined:
# ref: https://github.com/LLK/scratch-link/blob/develop/Documentation/BluetoothLE.md
return False
async def _notify_device(self, device) -> None:
params = { 'rssi': -80, 'name': 'Unknown' }
try:
params['rssi'] = await device.interface.rssi
except Exception:
None
if device.name:
params['name'] = device.name
params['peripheralId'] = device.node_name
await self._send_notification('didDiscoverPeripheral', params)
async def _find_devices(self) -> None:
assert self.discovery_running
while self.discovery_running:
await sleep(1)
s = await self.adapter_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if not nodes:
logger.info("device not found")
continue
logger.debug(f"{len(nodes)} device(s) found")
for node in nodes:
node_name = node.attrib['name']
logger.debug(f" {node_name}")
if self.found_devices.get(node_name):
continue
devpath = self.iface + "/" + node_name
if BLEDBusSession.connected_devices.get(devpath):
continue
interface = DeviceInterfaceAsync()
interface._connect('org.bluez', devpath, bus=self.dbus)
try:
devname = await interface.name
except Exception as e:
logger.debug(f"device {node_name} does not have name: {e}")
devaddr = await interface.address
device = self.Device(interface, devpath, node_name, devname,
devaddr)
if not await self._matches(device, self.discover_filters):
await interface.disconnect()
continue
self.found_devices[node_name] = device
await self._notify_device(device)
logger.debug("end _find_device.")
async def _stop_discovery(self) -> None:
assert self.discovery_running
logger.debug(f"Wait discovery for {self.scan_seconds} seconds")
await sleep(self.scan_seconds)
logger.debug(f"Stopping discovery... {self.adapter}")
self.discovery_running = False
await self.adapter.stop_discovery()
def __init__(self, websocket, loop, scan_seconds):
super().__init__(websocket, loop, scan_seconds)
logger.debug("dbus init")
self.status = self.INITIAL
self.dbus = sd_bus_open_system()
self.discovery_running = False
self.iface = None
self.services = {}
self.chars = {}
self.chars_cache = {}
self.notifications = {}
self._connect_to_adapters()
self.found_devices = {}
async def _get_characteristics(self, service_path):
service_introspect = DbusInterfaceCommonAsync()
service_introspect._connect('org.bluez', service_path, bus=self.dbus)
s = await service_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if not nodes:
logger.error(f"characteristic not found at {service_path}")
return
for node in nodes:
path = service_path + '/' + node.attrib['name']
if self.chars.get(path):
continue
logger.debug(f"getting GATT characteristic at {path}")
char = GattCharacteristicInterfaceAsync()
char._connect('org.bluez', path, bus=self.dbus)
self.chars[path] = char
cid = await char.uuid
logger.debug(f"found char {cid}")
async def _get_services(self):
# do D-Bus introspect to the device path and get service paths under it
for i in range(5):
dev_introspect = DbusInterfaceCommonAsync()
dev_introspect._connect('org.bluez', self.device.path,
bus=self.dbus)
s = await dev_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if nodes:
break
else:
logger.error("Service not found. Try again.")
await sleep(1)
if not nodes:
return []
for node in nodes:
path = self.device.path + '/' + node.attrib['name']
if self.services.get(path):
continue
logger.debug(f"getting GATT service at {path}")
service = GattServiceInterfaceAsync()
service._connect('org.bluez', path, bus=self.dbus)
self.services[path] = service
sid = await service.uuid
logger.debug(f"found service {sid}")
await self._get_characteristics(path)
async def _get_char(self, id):
char = self.chars_cache.get(id)
if char:
return char
for i in range(5):
await self._get_services()
btuuid = BTUUID(id)
for char in self.chars.values():
raw_uuid = await char.uuid
char_uuid = BTUUID(raw_uuid)
if char_uuid == btuuid:
self.chars_cache[id] = char
return char
logger.error(f"Can not get characteristic: {id}. Retry.")
logger.error(f"Abandoned to get characteristic: {id}.")
return None
async def _start_notification(self, sid, cid, char):
logger.debug('startNotification')
(acquired_fd, mtu) = await char.acquire_notify({})
fd = dup(acquired_fd)
fp = fdopen(fd, mode='rb', buffering=0, newline=None)
self.loop.add_reader(fd, self._read_notification, fd)
notification = self.Notification(self.loop, acquired_fd, fd, fp, {
'serviceId': sid,
'characteristicId': cid,
'encoding': 'base64'
})
self.notifications[fd] = notification
logger.debug(f'added notification reader: {notification}')
def _stop_notifications(self):
for n in self.notifications.values():
n.close()
def _read_notification(self, *args):
fd = args[0]
notification = self.notifications[fd]
data = notification.fp.read()
if len(data) == 0:
logger.debug(f'empty notification data')
asyncio.create_task(self.async_close())
return
params = notification.params.copy()
params['message'] = base64.standard_b64encode(data).decode('ascii')
self.loop.create_task(self._send_notification('characteristicDidChange', params))
def handle_request(self, method, params):
logger.debug("handle request")
async def async_handle_request(self, method, params):
logger.debug(f"async handle request: {method} {params}")
res = { "jsonrpc": "2.0" }
err_msg = None
if self.status == self.INITIAL and method == 'discover':
self.discover_filters = params['filters']
logger.debug(f"discover: {self.discover_filters}")
try:
await self._start_discovery()
logger.debug(f"discover started: {self.discover_filters}")
res["result"] = None
self.status = self.DISCOVERY
except Exception as e:
res["error"] = { "message": "Failed to start device discovery" }
self.status = self.DONE
elif self.status == self.DISCOVERY and method == 'connect':
logger.debug("connecting to the BLE device")
dev = self.found_devices[params['peripheralId']]
try:
logger.debug(f" {dev}")
await dev.interface.connect()
res["result"] = None
self.device = dev
self.status = self.CONNECTED
logger.info(f"Connected: '{dev.name}'@{dev.address}")
BLEDBusSession.connected_devices[dev.path] = dev
except NotImplementedError as e:
logger.error(e)
res["error"] = { "message": "Failed to connect to device" }
self.status = self.DONE
except Exception as e:
logger.error(f"failed to connect: {e}")
res["error"] = { "message": "Failed to connect to device" }
self.status = self.DONE
elif self.status == self.CONNECTED and method == 'read':
logger.debug("handle read request")
service_id = params['serviceId']
chara_id = params['characteristicId']
c = await self._get_char(chara_id)
value = await c.read_value({})
message = base64.standard_b64encode(value).decode('ascii')
res['result'] = { 'message': message, 'encode': 'base64' }
if params.get('startNotifications') == True:
await self._start_notification(service_id, chara_id, c)
elif self.status == self.CONNECTED and method == 'write':
logger.debug(f"handle write request {params}")
service_id = params['serviceId']
chara_id = params['characteristicId']
c = await self._get_char(chara_id)
if params['encoding'] != 'base64':
logger.error("encoding other than base 64 is not "
"yet supported: ", params['encoding'])
else:
msg_bstr = params['message'].encode('ascii')
data = base64.standard_b64decode(msg_bstr)
await c.write_value(data, {})
res['result'] = len(data)
logger.debug(res)
return res
def end_request(self):
logger.debug("end request")
return False
async def async_close(self):
if not self.device:
return
dev = self.device
logger.info(f"Disconnecting from '{dev.name}'@{dev.address}")
self._stop_notifications()
await dev.interface.disconnect()
BLEDBusSession.connected_devices.pop(dev.path)
logger.info(f"Disconnected from '{dev.name}'@{dev.address}")
self.device = None
await self.websocket.close()
return
def close(self):
logger.debug("close")
return

View File

@@ -10,6 +10,7 @@ import ssl
import websockets import websockets
import socket import socket
import json import json
import uuid
import base64 import base64
import logging import logging
import sys import sys
@@ -18,21 +19,24 @@ import traceback
import argparse import argparse
# for BLESession (e.g. BBC micro:bit) # for BLESession (e.g. BBC micro:bit)
from bluepy.btle import Scanner, UUID, Peripheral, DefaultDelegate from bluepy.btle import Scanner, UUID, Peripheral, DefaultDelegate, ScanEntry
from bluepy.btle import BTLEDisconnectError, BTLEManagementError from bluepy.btle import BTLEDisconnectError, BTLEManagementError
from pyscrlink import bluepy_helper_cap from pyscrlink import bluepy_helper_cap
import threading import threading
import time import time
import queue import queue
from asyncio import sleep
# for websockets certificate # for websockets certificate
from pyscrlink import gencert from pyscrlink import gencert
from pyscrlink import ble
logLevel = logging.INFO logLevel = logging.INFO
# for logging # for logging
logger = logging.getLogger(__name__) logger = logging.getLogger('pyscrlink.scratch_link')
formatter = logging.Formatter(fmt='%(asctime)s %(message)s') formatter = logging.Formatter(fmt='%(asctime)s %(message)s')
handler = logging.StreamHandler() handler = logging.StreamHandler()
handler.setLevel(logLevel) handler.setLevel(logLevel)
@@ -44,11 +48,30 @@ logger.propagate = False
HOSTNAME="device-manager.scratch.mit.edu" HOSTNAME="device-manager.scratch.mit.edu"
scan_seconds=10.0 scan_seconds=10.0
class BTUUID(uuid.UUID):
BLUETOOTH_BASE_UUID = "00001000800000805F9B34FB"
def __init__(self, val):
if isinstance(val, int):
if (val < 0) or (val > 0xFFFFFFFF):
raise ValueError(
"Short form UUIDs must be in range 0..0xFFFFFFFF")
val = "%04X" % val
else:
val = str(val)
val = val.replace("-", "")
if len(val) <= 8: # Short form
val = ("0" * (8 - len(val))) + val + self.BLUETOOTH_BASE_UUID
uuid.UUID.__init__(self, val)
class Session(): class Session():
"""Base class for BTSession and BLESession""" """Base class for BTSession and BLESession"""
def __init__(self, websocket, loop): def __init__(self, websocket, loop, scan_seconds):
self.websocket = websocket self.websocket = websocket
self.loop = loop self.loop = loop
self.scan_seconds = scan_seconds
self.lock = threading.RLock() self.lock = threading.RLock()
self.notification_queue = queue.Queue() self.notification_queue = queue.Queue()
@@ -67,6 +90,10 @@ class Session():
if jsonreq['jsonrpc'] != '2.0': if jsonreq['jsonrpc'] != '2.0':
logger.error("error: jsonrpc version is not 2.0") logger.error("error: jsonrpc version is not 2.0")
return True return True
if type(self) is ble.BLEDBusSession:
jsonres = await self.async_handle_request(jsonreq['method'],
jsonreq['params'])
else:
jsonres = self.handle_request(jsonreq['method'], jsonreq['params']) jsonres = self.handle_request(jsonreq['method'], jsonreq['params'])
if 'id' in jsonreq: if 'id' in jsonreq:
jsonres['id'] = jsonreq['id'] jsonres['id'] = jsonreq['id']
@@ -81,6 +108,10 @@ class Session():
"""Default request handler""" """Default request handler"""
logger.debug(f"default handle_request: {method}, {params}") logger.debug(f"default handle_request: {method}, {params}")
async def async_handle_request(self, method, params):
"""Default async request handler"""
logger.debug(f"default async handle_request: {method}, {params}")
def end_request(self): def end_request(self):
""" """
Default callback at request end. This callback is required to Default callback at request end. This callback is required to
@@ -120,11 +151,17 @@ class Session():
break break
await self._send_notifications() await self._send_notifications()
logger.debug("in handle loop") logger.debug("in handle loop")
except websockets.ConnectionClosedError as e: except (websockets.ConnectionClosedOK, websockets.ConnectionClosedError) as e:
logger.info("scratch closed session") logger.info("scratch closed session")
logger.debug(e) logger.debug(e)
if type(self) is ble.BLEDBusSession:
await self.async_close()
else:
self.close() self.close()
break break
except Exception as e:
t = type(e)
logger.info(f"scratch closed with unkown exception: {e}: {t}")
def close(self): def close(self):
""" """
@@ -137,7 +174,8 @@ class BTSession(Session):
class BLESession(Session): class BLESession(Session):
""" """
Manage a session for Bluetooth Low Energy device such as micro:bit Manage a session for Bluetooth Low Energy device such as micro:bit using
bluepy as backend.
""" """
INITIAL = 1 INITIAL = 1
@@ -185,6 +223,7 @@ class BLESession(Session):
time.sleep(1) time.sleep(1)
elif self.session.status == self.session.CONNECTED: elif self.session.status == self.session.CONNECTED:
logger.debug("in connected status:") logger.debug("in connected status:")
delegate = self.session.delegate delegate = self.session.delegate
if delegate and len(delegate.handles) > 0: if delegate and len(delegate.handles) > 0:
if not delegate.restart_notification_event.is_set(): if not delegate.restart_notification_event.is_set():
@@ -233,8 +272,8 @@ class BLESession(Session):
params['message'] = base64.standard_b64encode(data).decode('ascii') params['message'] = base64.standard_b64encode(data).decode('ascii')
self.session.notify('characteristicDidChange', params) self.session.notify('characteristicDidChange', params)
def __init__(self, websocket, loop): def __init__(self, websocket, loop, scan_seconds):
super().__init__(websocket, loop) super().__init__(websocket, loop, scan_seconds)
self.status = self.INITIAL self.status = self.INITIAL
self.device = None self.device = None
self.deviceName = None self.deviceName = None
@@ -275,7 +314,7 @@ class BLESession(Session):
""" """
Check if the found BLE device matches the filters Scratch specifies. Check if the found BLE device matches the filters Scratch specifies.
""" """
logger.debug(f"in matches {dev.addr} {filters}") logger.debug(f"in matches {dev.address} {filters}")
for f in filters: for f in filters:
if 'services' in f: if 'services' in f:
for s in f['services']: for s in f['services']:
@@ -292,15 +331,19 @@ class BLESession(Session):
logger.debug("match...") logger.debug("match...")
return True return True
if 'namePrefix' in f: if 'namePrefix' in f:
# 0x08: Shortened Local Name logger.debug(f"given namePrefix: {f['namePrefix']}")
deviceName = dev.getValueText(0x08) deviceName = dev.getValueText(ScanEntry.SHORT_LOCAL_NAME)
if not deviceName: if deviceName:
continue logger.debug(f"SHORT_LOCAL_NAME: {deviceName}")
logger.debug(f"Name of \"{deviceName}\" begins with: \"{f['namePrefix']}\"?") if deviceName.startswith(f['namePrefix']):
if(deviceName.startswith(f['namePrefix'])): logger.debug(f"match...")
logger.debug("Yes") return True
deviceName = dev.getValueText(ScanEntry.COMPLETE_LOCAL_NAME)
if deviceName:
logger.debug(f"COMPLETE_LOCAL_NAME: {deviceName}")
if deviceName.startswith(f['namePrefix']):
logger.debug(f"match...")
return True return True
logger.debug("No")
if 'name' in f or 'manufactureData' in f: if 'name' in f or 'manufactureData' in f:
logger.error("name/manufactureData filters not implemented") logger.error("name/manufactureData filters not implemented")
# TODO: implement other filters defined: # TODO: implement other filters defined:
@@ -308,7 +351,6 @@ class BLESession(Session):
return False return False
def _scan_devices(self, params): def _scan_devices(self, params):
global scan_seconds
if BLESession.nr_connected > 0: if BLESession.nr_connected > 0:
return len(BLESession.found_devices) > 0 return len(BLESession.found_devices) > 0
found = False found = False
@@ -320,8 +362,8 @@ class BLESession(Session):
scanner = Scanner(iface=i) scanner = Scanner(iface=i)
for j in range(scan_retry): for j in range(scan_retry):
try: try:
logger.debug(f"start BLE scan: {scan_seconds} seconds") logger.debug(f"start BLE scan: {self.scan_seconds} seconds")
devices = scanner.scan(scan_seconds) devices = scanner.scan(self.scan_seconds)
for dev in devices: for dev in devices:
if self.matches(dev, params['filters']): if self.matches(dev, params['filters']):
BLESession.found_devices.append(dev) BLESession.found_devices.append(dev)
@@ -514,11 +556,13 @@ class BLESession(Session):
return self.status == self.DONE return self.status == self.DONE
async def ws_handler(websocket, path): async def ws_handler(websocket, path):
sessionTypes = { '/scratch/ble': BLESession, '/scratch/bt': BTSession } global scan_seconds
sessionTypes = { '/scratch/bt': BTSession }
sessionTypes['/scratch/ble'] = ble.BLEDBusSession if dbus else BLESession
try: try:
logger.info(f"Start session for web socket path: {path}") logger.info(f"Start session for web socket path: {path}")
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
session = sessionTypes[path](websocket, loop) session = sessionTypes[path](websocket, loop, scan_seconds)
await session.handle() await session.handle()
except Exception as e: except Exception as e:
logger.error(f"Failure in session for web socket path: {path}") logger.error(f"Failure in session for web socket path: {path}")
@@ -542,6 +586,7 @@ def stack_trace():
def main(): def main():
global scan_seconds global scan_seconds
global scan_retry global scan_retry
global dbus
parser = argparse.ArgumentParser(description='start Scratch-link') parser = argparse.ArgumentParser(description='start Scratch-link')
parser.add_argument('-d', '--debug', action='store_true', parser.add_argument('-d', '--debug', action='store_true',
help='print debug messages') help='print debug messages')
@@ -549,6 +594,8 @@ def main():
help='specifiy duration to scan BLE devices in seconds') help='specifiy duration to scan BLE devices in seconds')
parser.add_argument('-r', '--scan_retry', type=int, default=1, parser.add_argument('-r', '--scan_retry', type=int, default=1,
help='specifiy retry times to scan BLE devices') help='specifiy retry times to scan BLE devices')
parser.add_argument('-b', '--dbus', action='store_true',
help='use DBus backend for BLE devices')
args = parser.parse_args() args = parser.parse_args()
if args.debug: if args.debug:
print("Print debug messages") print("Print debug messages")
@@ -557,6 +604,7 @@ def main():
logger.setLevel(logLevel) logger.setLevel(logLevel)
scan_seconds = args.scan_seconds scan_seconds = args.scan_seconds
scan_retry = args.scan_retry scan_retry = args.scan_retry
dbus = args.dbus
logger.debug(f"set scan_seconds: {scan_seconds}") logger.debug(f"set scan_seconds: {scan_seconds}")
logger.debug(f"set scan_retry: {scan_retry}") logger.debug(f"set scan_retry: {scan_retry}")

View File

@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="pyscrlink", name="pyscrlink",
version="0.2.7", version="0.2.8",
author="Shin'ichiro Kawasaki", author="Shin'ichiro Kawasaki",
author_email='kawasaki@juno.dti.ne.jp', author_email='kawasaki@juno.dti.ne.jp',
description='Scratch-link for Linux with Python', description='Scratch-link for Linux with Python',