mirror of
https://github.com/kawasaki/pyscrlink.git
synced 2025-09-06 17:50:20 +02:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
2d489321d0 | ||
|
48e460b9c6 | ||
|
3cf61a145b | ||
|
3ffbde35b1 | ||
|
18e0818e98 | ||
|
266bcc98bd | ||
|
80b99f84a1 | ||
|
f8ce9e3089 |
@@ -47,6 +47,8 @@ logger.propagate = False
|
||||
HOSTNAME="device-manager.scratch.mit.edu"
|
||||
scan_seconds=10.0
|
||||
|
||||
CCCD_UUID = 0x2902
|
||||
|
||||
class Session():
|
||||
"""Base class for BTSession and BLESession"""
|
||||
def __init__(self, websocket, loop):
|
||||
@@ -170,6 +172,9 @@ class BTSession(Session):
|
||||
logger.debug(f"Found device {name} addr={address} class={device_class} rssi={rssi}")
|
||||
major_class = (device_class & 0x1F00) >> 8
|
||||
minor_class = (device_class & 0xFF) >> 2
|
||||
if "LEGO Hub" in name:
|
||||
minor_class = 1
|
||||
logger.info(f"Pretend to be LEGO EV3 with LEGO Hub: class={major_class/minor_class}")
|
||||
if major_class == self.major_class and minor_class == self.minor_class:
|
||||
self.found_devices[address] = (name, device_class, rssi)
|
||||
|
||||
@@ -185,7 +190,13 @@ class BTSession(Session):
|
||||
self.ping_time = None
|
||||
|
||||
def discover(self):
|
||||
discoverer = self.BTDiscoverer(self.major_device_class, self.minor_device_class)
|
||||
major = self.major_device_class
|
||||
minor = self.minor_device_class
|
||||
if major == 8 and minor == 1:
|
||||
logger.info(f"Search LEGO Hub instead of LEGO EV3")
|
||||
minor = 4
|
||||
logger.debug(f"BT discover: class={major}/{minor}")
|
||||
discoverer = self.BTDiscoverer(major, minor)
|
||||
discoverer.find_devices(lookup_names=True)
|
||||
while self.session.status == self.session.DISCOVERY and not discoverer.done and not self.cancel_discovery:
|
||||
readable = select.select([discoverer], [], [], 0.5)[0]
|
||||
@@ -203,7 +214,7 @@ class BTSession(Session):
|
||||
def run(self):
|
||||
while self.session.status != self.session.DONE:
|
||||
|
||||
logger.debug("loop in BT thread")
|
||||
logger.debug(f"loop in BT thread: session status={self.session.status}")
|
||||
current_time = int(round(time.time()))
|
||||
|
||||
if self.session.status == self.session.DISCOVERY and not self.cancel_discovery:
|
||||
@@ -353,6 +364,14 @@ class BLESession(Session):
|
||||
scan_lock = threading.RLock()
|
||||
scan_started = False
|
||||
|
||||
@staticmethod
|
||||
def _getDevName(dev):
|
||||
"""
|
||||
Get AdType 0x09 (Completed local name). If it is not available,
|
||||
get AdType 0x08 (Shortened local name).
|
||||
"""
|
||||
return dev.getValueText(0x9) or dev.getValueText(0x8)
|
||||
|
||||
class BLEThread(threading.Thread):
|
||||
"""
|
||||
Separated thread to control notifications to Scratch.
|
||||
@@ -372,7 +391,7 @@ class BLESession(Session):
|
||||
for d in devices:
|
||||
params = { 'rssi': d.rssi }
|
||||
params['peripheralId'] = devices.index(d)
|
||||
params['name'] = d.getValueText(0x9) or d.getValueText(0x8)
|
||||
params['name'] = BLESession._getDevName(d)
|
||||
self.session.notify('didDiscoverPeripheral', params)
|
||||
time.sleep(1)
|
||||
elif self.session.status == self.session.CONNECTED:
|
||||
@@ -389,7 +408,8 @@ class BLESession(Session):
|
||||
logger.debug("after waitForNotification")
|
||||
logger.debug("released lock for waitForNotification")
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(f"Exception in waitForNotifications: "
|
||||
f"{type(e).__name__}: {e}")
|
||||
self.session.close()
|
||||
break
|
||||
else:
|
||||
@@ -413,7 +433,8 @@ class BLESession(Session):
|
||||
self.restart_notification_event.set()
|
||||
|
||||
def add_handle(self, serviceId, charId, handle):
|
||||
logger.debug(f"add handle for notification: {handle}")
|
||||
logger.debug(f"add handle for notification: "
|
||||
f"{serviceId} {charId} {handle}")
|
||||
params = { 'serviceId': UUID(serviceId).getCommonName(),
|
||||
'characteristicId': charId,
|
||||
'encoding': 'base64' }
|
||||
@@ -421,6 +442,14 @@ class BLESession(Session):
|
||||
|
||||
def handleNotification(self, handle, data):
|
||||
logger.debug(f"BLE notification: {handle} {data}")
|
||||
if handle not in self.handles:
|
||||
logger.error(f"Notification with unknown handle: {handle}")
|
||||
keys = list(self.handles.keys())
|
||||
if keys and len(keys) == 1:
|
||||
logger.debug(f"Debug: override {handle} with {keys[0]}")
|
||||
handle = keys[0]
|
||||
else:
|
||||
return
|
||||
params = self.handles[handle].copy()
|
||||
params['message'] = base64.standard_b64encode(data).decode('ascii')
|
||||
self.session.notify('characteristicDidChange', params)
|
||||
@@ -453,12 +482,14 @@ class BLESession(Session):
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def _get_dev_uuid(self, dev):
|
||||
def _get_dev_uuids(self, dev):
|
||||
for adtype in self.SERVICE_CLASS_UUID_ADTYPES:
|
||||
service_class_uuid = dev.getValueText(adtype)
|
||||
if service_class_uuid:
|
||||
logger.debug(self.SERVICE_CLASS_UUID_ADTYPES[adtype])
|
||||
return UUID(service_class_uuid)
|
||||
service_class_uuids = dev.getValue(adtype)
|
||||
if service_class_uuids:
|
||||
for u in service_class_uuids:
|
||||
a = self.SERVICE_CLASS_UUID_ADTYPES[adtype]
|
||||
logger.debug(f"service class uuid for {a}/{adtype}: {u}")
|
||||
return service_class_uuids
|
||||
return None
|
||||
|
||||
def matches(self, dev, filters):
|
||||
@@ -471,18 +502,19 @@ class BLESession(Session):
|
||||
for s in f['services']:
|
||||
logger.debug(f"service to check: {s}")
|
||||
given_uuid = s
|
||||
logger.debug(f"given: {given_uuid}")
|
||||
dev_uuid = self._get_dev_uuid(dev)
|
||||
if not dev_uuid:
|
||||
logger.debug(f"given UUID: {given_uuid} hash={UUID(given_uuid).__hash__()}")
|
||||
dev_uuids = self._get_dev_uuids(dev)
|
||||
if not dev_uuids:
|
||||
continue
|
||||
logger.debug(f"dev: {dev_uuid}")
|
||||
logger.debug(given_uuid == dev_uuid)
|
||||
if given_uuid == dev_uuid:
|
||||
for u in dev_uuids:
|
||||
logger.debug(f"dev UUID: {u} hash={u.__hash__()}")
|
||||
logger.debug(given_uuid == u)
|
||||
if given_uuid == u:
|
||||
logger.debug("match...")
|
||||
return True
|
||||
if 'namePrefix' in f:
|
||||
# 0x08: Shortened Local Name
|
||||
deviceName = dev.getValueText(0x08)
|
||||
deviceName = self._getDevName(dev)
|
||||
if not deviceName:
|
||||
continue
|
||||
logger.debug(f"Name of \"{deviceName}\" begins with: \"{f['namePrefix']}\"?")
|
||||
@@ -598,7 +630,7 @@ class BLESession(Session):
|
||||
elif self.status == self.DISCOVERY and method == 'connect':
|
||||
logger.debug("connecting to the BLE device")
|
||||
self.device = BLESession.found_devices[params['peripheralId']]
|
||||
self.deviceName = self.device.getValueText(0x9) or self.device.getValueText(0x8)
|
||||
self.deviceName = self._getDevName(self.device)
|
||||
try:
|
||||
self.perip = Peripheral(self.device)
|
||||
logger.info(f"connected to the BLE peripheral: {self.deviceName}")
|
||||
@@ -675,11 +707,19 @@ class BLESession(Session):
|
||||
service = self._get_service(service_id)
|
||||
c = self._get_characteristic(chara_id)
|
||||
handle = c.getHandle()
|
||||
# get CCCD or Client Characterstic Configuration Descriptor
|
||||
cccd = None
|
||||
for d in c.getDescriptors():
|
||||
if d.uuid == UUID(CCCD_UUID):
|
||||
cccd = d
|
||||
if not cccd:
|
||||
logger.error("Characteristic {char_id} does not have CCCD")
|
||||
return
|
||||
# prepare notification handler
|
||||
self.delegate.add_handle(service_id, chara_id, handle)
|
||||
# request notification to the BLE device
|
||||
with self.lock:
|
||||
self.perip.writeCharacteristic(handle + 1, value, True)
|
||||
self.perip.writeCharacteristic(cccd.handle, value, True)
|
||||
|
||||
def startNotifications(self, service_id, chara_id):
|
||||
logger.debug(f"start notification for {chara_id}")
|
||||
|
Reference in New Issue
Block a user