BLEDBusSession: Create Device class to gather device attributes

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
This commit is contained in:
Shin'ichiro Kawasaki
2023-09-18 15:31:46 +09:00
parent adb746995e
commit 06df4af4a0

View File

@@ -196,6 +196,14 @@ class BLEDBusSession(Session):
connected_devices = {}
class Device():
def __init__(self, interface, path, node_name, name, address):
self.interface = interface
self.path = path
self.node_name = node_name
self.name = name
self.address = address
class Notification():
def __init__(self, loop, acquired_fd, fd, fp, params):
self.loop = loop
@@ -251,7 +259,7 @@ class BLEDBusSession(Session):
logger.debug(f"service to check: {s}")
given_uuid = BTUUID(s)
logger.debug(f"given UUID: {given_uuid} hash={given_uuid.__hash__()}")
dev_uuids = await dev.uuids
dev_uuids = await dev.interface.uuids
if not dev_uuids:
logger.debug(f"dev UUID not available")
continue
@@ -264,10 +272,9 @@ class BLEDBusSession(Session):
return True
if 'namePrefix' in f:
logger.debug(f"given namePrefix: {f['namePrefix']}")
deviceName = await dev.name
if deviceName:
logger.debug(f"name: {deviceName}")
if deviceName.startswith(f['namePrefix']):
if dev.name:
logger.debug(f"name: {dev. name}")
if dev.name.startswith(f['namePrefix']):
logger.debug(f"match...")
return True
if 'name' in f or 'manufactureData' in f:
@@ -276,23 +283,20 @@ class BLEDBusSession(Session):
# ref: https://github.com/LLK/scratch-link/blob/develop/Documentation/BluetoothLE.md
return False
async def _notify_device(self, device, node_name) -> None:
async def _notify_device(self, device) -> None:
params = { 'rssi': -80, 'name': 'Unknown' }
try:
params['rssi'] = await device.rssi
params['rssi'] = await device.interface.rssi
except Exception:
None
try:
params['name'] = await device.name
except Exception:
None
params['peripheralId'] = node_name
if device.name:
params['name'] = device.name
params['peripheralId'] = device.node_name
await self._send_notification('didDiscoverPeripheral', params)
async def _find_devices(self) -> None:
assert self.discovery_running
while self.discovery_running:
logger.info("in _find_devices: wait 1 second")
await sleep(1)
s = await self.adapter_introspect.dbus_introspect()
parser = ET.fromstring(s)
@@ -304,20 +308,25 @@ class BLEDBusSession(Session):
for node in nodes:
node_name = node.attrib['name']
logger.debug(f" {node_name}")
if self.found_devices.get(node_name):
continue
devpath = self.iface + "/" + node_name
if BLEDBusSession.connected_devices.get(devpath):
continue
device = DeviceInterfaceAsync()
device._connect('org.bluez', devpath, bus=self.dbus)
interface = DeviceInterfaceAsync()
interface._connect('org.bluez', devpath, bus=self.dbus)
try:
devname = await interface.name
except Exception as e:
logger.debug(f"device {node_name} does not have name: {e}")
devaddr = await interface.address
device = self.Device(interface, devpath, node_name, devname,
devaddr)
if not await self._matches(device, self.discover_filters):
continue
if self.found_devices.get(node_name):
await interface.disconnect()
continue
self.found_devices[node_name] = device
self.devpath = devpath
self.devname = await device.name
self.devaddr = await device.address
await self._notify_device(device, node_name)
await self._notify_device(device)
logger.debug("end _find_device.")
@@ -336,9 +345,6 @@ class BLEDBusSession(Session):
self.dbus = sd_bus_open_system()
self.discovery_running = False
self.iface = None
self.devpath = None
self.devname = None
self.devaddr = None
self.services = {}
self.chars = {}
self.chars_cache = {}
@@ -370,7 +376,8 @@ class BLEDBusSession(Session):
# do D-Bus introspect to the device path and get service paths under it
for i in range(5):
dev_introspect = DbusInterfaceCommonAsync()
dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus)
dev_introspect._connect('org.bluez', self.device.path,
bus=self.dbus)
s = await dev_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
@@ -382,7 +389,7 @@ class BLEDBusSession(Session):
if not nodes:
return []
for node in nodes:
path = self.devpath + '/' + node.attrib['name']
path = self.device.path + '/' + node.attrib['name']
if self.services.get(path):
continue
logger.debug(f"getting GATT service at {path}")
@@ -463,14 +470,15 @@ class BLEDBusSession(Session):
elif self.status == self.DISCOVERY and method == 'connect':
logger.debug("connecting to the BLE device")
self.device = self.found_devices[params['peripheralId']]
dev = self.found_devices[params['peripheralId']]
try:
logger.debug(f" {self.device}")
await self.device.connect()
logger.debug(f" {dev}")
await dev.interface.connect()
res["result"] = None
self.device = dev
self.status = self.CONNECTED
logger.info(f"Connected: '{self.devname}'@{self.devaddr}")
BLEDBusSession.connected_devices[self.devpath] = self.device
logger.info(f"Connected: '{dev.name}'@{dev.address}")
BLEDBusSession.connected_devices[dev.path] = dev
except NotImplementedError as e:
logger.error(e)
res["error"] = { "message": "Failed to connect to device" }
@@ -515,15 +523,13 @@ class BLEDBusSession(Session):
async def async_close(self):
if not self.device:
return
logger.info(f"Disconnecting from '{self.devname}'@{self.devaddr}")
dev = self.device
logger.info(f"Disconnecting from '{dev.name}'@{dev.address}")
self._stop_notifications()
await self.device.disconnect()
BLEDBusSession.connected_devices.pop(self.devpath)
logger.info(f"Disconnected from '{self.devname}'@{self.devaddr}")
await dev.interface.disconnect()
BLEDBusSession.connected_devices.pop(dev.path)
logger.info(f"Disconnected from '{dev.name}'@{dev.address}")
self.device = None
self.devpath = None
self.devname = None
self.devaddr = None
await self.websocket.close()
return
@@ -582,6 +588,7 @@ class BLESession(Session):
time.sleep(1)
elif self.session.status == self.session.CONNECTED:
logger.debug("in connected status:")
delegate = self.session.delegate
if delegate and len(delegate.handles) > 0:
if not delegate.restart_notification_event.is_set():
@@ -672,7 +679,7 @@ class BLESession(Session):
"""
Check if the found BLE device matches the filters Scratch specifies.
"""
logger.debug(f"in matches {dev.addr} {filters}")
logger.debug(f"in matches {dev.address} {filters}")
for f in filters:
if 'services' in f:
for s in f['services']: