3 Commits
dev ... v0.2.5

Author SHA1 Message Date
Shin'ichiro Kawasaki
7cc9ccac2e Tag version 0.2.5
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2021-11-21 14:25:56 +09:00
Shin'ichiro Kawasaki
4558ea43df BLESession: handle device UUIDs as list
In the debug for the issue #30, it turned out that the bluepy API
ScanEntry.getValueText() for adtypes may return multiple UUIDs
concatenated with comma. Current code assumes that the API returns
single UUID, then multiple UUIDs cause the "Error: Non-hexadecimal digit
found" for the comma.

To fix it, replace ScanEntry.getValueText() call with getValue(), and
handle the result as a list of UUIDs. Also rename the helper function
_get_dev_uuid() to _get_dev_uuids() accordingly.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2021-11-21 14:21:50 +09:00
Shin'ichiro Kawasaki
641b84a86e BLESession: Enrich logs for device UUID check
To debug the GitHub issue #30, add more debug logs to functions to check
UUID.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2021-11-21 14:21:42 +09:00
3 changed files with 13 additions and 46 deletions

View File

@@ -206,6 +206,10 @@ Please file issues to [GitHub issue tracker](https://github.com/kawasaki/pyscrli
Releases
--------
Release 0.2.5
* Fixed handling of multiple UUIDs for LEGO Boost
Release 0.2.4
* Added -s option to specify BLE scan duration

View File

@@ -47,8 +47,6 @@ logger.propagate = False
HOSTNAME="device-manager.scratch.mit.edu"
scan_seconds=10.0
CCCD_UUID = 0x2902
class Session():
"""Base class for BTSession and BLESession"""
def __init__(self, websocket, loop):
@@ -172,9 +170,6 @@ class BTSession(Session):
logger.debug(f"Found device {name} addr={address} class={device_class} rssi={rssi}")
major_class = (device_class & 0x1F00) >> 8
minor_class = (device_class & 0xFF) >> 2
if "LEGO Hub" in name:
minor_class = 1
logger.info(f"Pretend to be LEGO EV3 with LEGO Hub: class={major_class/minor_class}")
if major_class == self.major_class and minor_class == self.minor_class:
self.found_devices[address] = (name, device_class, rssi)
@@ -190,13 +185,7 @@ class BTSession(Session):
self.ping_time = None
def discover(self):
major = self.major_device_class
minor = self.minor_device_class
if major == 8 and minor == 1:
logger.info(f"Search LEGO Hub instead of LEGO EV3")
minor = 4
logger.debug(f"BT discover: class={major}/{minor}")
discoverer = self.BTDiscoverer(major, minor)
discoverer = self.BTDiscoverer(self.major_device_class, self.minor_device_class)
discoverer.find_devices(lookup_names=True)
while self.session.status == self.session.DISCOVERY and not discoverer.done and not self.cancel_discovery:
readable = select.select([discoverer], [], [], 0.5)[0]
@@ -214,7 +203,7 @@ class BTSession(Session):
def run(self):
while self.session.status != self.session.DONE:
logger.debug(f"loop in BT thread: session status={self.session.status}")
logger.debug("loop in BT thread")
current_time = int(round(time.time()))
if self.session.status == self.session.DISCOVERY and not self.cancel_discovery:
@@ -364,14 +353,6 @@ class BLESession(Session):
scan_lock = threading.RLock()
scan_started = False
@staticmethod
def _getDevName(dev):
"""
Get AdType 0x09 (Completed local name). If it is not available,
get AdType 0x08 (Shortened local name).
"""
return dev.getValueText(0x9) or dev.getValueText(0x8)
class BLEThread(threading.Thread):
"""
Separated thread to control notifications to Scratch.
@@ -391,7 +372,7 @@ class BLESession(Session):
for d in devices:
params = { 'rssi': d.rssi }
params['peripheralId'] = devices.index(d)
params['name'] = BLESession._getDevName(d)
params['name'] = d.getValueText(0x9) or d.getValueText(0x8)
self.session.notify('didDiscoverPeripheral', params)
time.sleep(1)
elif self.session.status == self.session.CONNECTED:
@@ -408,8 +389,7 @@ class BLESession(Session):
logger.debug("after waitForNotification")
logger.debug("released lock for waitForNotification")
except Exception as e:
logger.error(f"Exception in waitForNotifications: "
f"{type(e).__name__}: {e}")
logger.error(e)
self.session.close()
break
else:
@@ -433,8 +413,7 @@ class BLESession(Session):
self.restart_notification_event.set()
def add_handle(self, serviceId, charId, handle):
logger.debug(f"add handle for notification: "
f"{serviceId} {charId} {handle}")
logger.debug(f"add handle for notification: {handle}")
params = { 'serviceId': UUID(serviceId).getCommonName(),
'characteristicId': charId,
'encoding': 'base64' }
@@ -442,14 +421,6 @@ class BLESession(Session):
def handleNotification(self, handle, data):
logger.debug(f"BLE notification: {handle} {data}")
if handle not in self.handles:
logger.error(f"Notification with unknown handle: {handle}")
keys = list(self.handles.keys())
if keys and len(keys) == 1:
logger.debug(f"Debug: override {handle} with {keys[0]}")
handle = keys[0]
else:
return
params = self.handles[handle].copy()
params['message'] = base64.standard_b64encode(data).decode('ascii')
self.session.notify('characteristicDidChange', params)
@@ -514,7 +485,7 @@ class BLESession(Session):
return True
if 'namePrefix' in f:
# 0x08: Shortened Local Name
deviceName = self._getDevName(dev)
deviceName = dev.getValueText(0x08)
if not deviceName:
continue
logger.debug(f"Name of \"{deviceName}\" begins with: \"{f['namePrefix']}\"?")
@@ -630,7 +601,7 @@ class BLESession(Session):
elif self.status == self.DISCOVERY and method == 'connect':
logger.debug("connecting to the BLE device")
self.device = BLESession.found_devices[params['peripheralId']]
self.deviceName = self._getDevName(self.device)
self.deviceName = self.device.getValueText(0x9) or self.device.getValueText(0x8)
try:
self.perip = Peripheral(self.device)
logger.info(f"connected to the BLE peripheral: {self.deviceName}")
@@ -707,19 +678,11 @@ class BLESession(Session):
service = self._get_service(service_id)
c = self._get_characteristic(chara_id)
handle = c.getHandle()
# get CCCD or Client Characterstic Configuration Descriptor
cccd = None
for d in c.getDescriptors():
if d.uuid == UUID(CCCD_UUID):
cccd = d
if not cccd:
logger.error("Characteristic {char_id} does not have CCCD")
return
# prepare notification handler
self.delegate.add_handle(service_id, chara_id, handle)
# request notification to the BLE device
with self.lock:
self.perip.writeCharacteristic(cccd.handle, value, True)
self.perip.writeCharacteristic(handle + 1, value, True)
def startNotifications(self, service_id, chara_id):
logger.debug(f"start notification for {chara_id}")

View File

@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="pyscrlink",
version="0.2.4",
version="0.2.5",
author="Shin'ichiro Kawasaki",
author_email='kawasaki@juno.dti.ne.jp',
description='Scratch-link for Linux with Python',