BLEDBusSession: get GATT services and characteristics

Using DBus interface, get GATT services and characteristics of the
connected BLE device. Also implement read request to a characteristic.
This commit is contained in:
Shin'ichiro Kawasaki
2023-08-27 19:46:35 +09:00
parent 39b157e839
commit 94a9555c4c

View File

@@ -289,6 +289,7 @@ class BLEDBusSession(Session):
except Exception:
None
params['peripheralId'] = node_name
self.devpath = devpath
await self._send_notification('didDiscoverPeripheral', params)
logger.debug("end _find_device.")
@@ -306,8 +307,75 @@ class BLEDBusSession(Session):
self.status = self.INITIAL
self.dbus = sd_bus_open_system()
self.discovery_running = False
self.iface = None
self.devpath = None
self.services = {}
self.chars = {}
self.chars_cache = {}
self._connect_to_adapters()
async def _get_characteristics(self, service_path):
service_introspect = DbusInterfaceCommonAsync()
service_introspect._connect('org.bluez', service_path, bus=self.dbus)
s = await service_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if not nodes:
logger.error(f"characteristic not found at {service_path}")
return
for node in nodes:
path = service_path + '/' + node.attrib['name']
if self.chars.get(path):
continue
logger.debug(f"getting GATT characteristic at {path}")
char = GattCharacteristicInterfaceAsync()
char._connect('org.bluez', path, bus=self.dbus)
self.chars[path] = char
cid = await char.uuid
logger.debug(f"found char {cid}")
async def _get_services(self):
# do D-Bus introspect to the device path and get service paths under it
dev_introspect = DbusInterfaceCommonAsync()
dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus)
s = await dev_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if not nodes:
logger.error("service not found")
return []
for node in nodes:
path = self.devpath + '/' + node.attrib['name']
if self.services.get(path):
continue
logger.debug(f"getting GATT service at {path}")
service = GattServiceInterfaceAsync()
service._connect('org.bluez', path, bus=self.dbus)
self.services[path] = service
sid = await service.uuid
logger.debug(f"found service {sid}")
await self._get_characteristics(path)
async def _get_char(self, id):
char = self.chars_cache.get(id)
if char:
return char
await self._get_services()
btuuid = BTUUID(id)
for char in self.chars.values():
raw_uuid = await char.uuid
char_uuid = BTUUID(raw_uuid)
if char_uuid == btuuid:
self.chars_cache[id] = char
return char
logger.error(f"Can not get characteristic: {id}")
return None
async def _start_notification(self, char):
logger.debug('startNotification')
(fd, mtu) = await char.acquire_notify({})
logger.debug(f'fd={fd}')
def handle_request(self, method, params):
logger.debug("handle request")
@@ -347,6 +415,17 @@ class BLEDBusSession(Session):
res["error"] = { "message": "Failed to connect to device" }
self.status = self.DONE
elif self.status == self.CONNECTED and method == 'read':
logger.debug("handle read request")
service_id = params['serviceId']
chara_id = params['characteristicId']
c = await self._get_char(chara_id)
value = await c.read_value({})
message = base64.standard_b64encode(value).decode('ascii')
res['result'] = { 'message': message, 'encode': 'base64' }
if params.get('startNotifications') == True:
await self._start_notification(c)
logger.debug(res)
return res