8 Commits
dbus2 ... dbus

Author SHA1 Message Date
Shin'ichiro Kawasaki
2482bba1d8 ble: Separate BLEDBusSession class to a separated new file ble.py
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-18 16:51:43 +09:00
Shin'ichiro Kawasaki
06df4af4a0 BLEDBusSession: Create Device class to gather device attributes
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-18 15:31:46 +09:00
Shin'ichiro Kawasaki
adb746995e BLEDbusSession: implement multiple devices support and disconnection
Also implemented service search retry to stabilize the first read
operation. Next work is to create a class to gather device attirbutes
that BLEDBusSession refers.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-10 20:32:03 +09:00
Shin'ichiro Kawasaki
157e3458b0 BLEDBusSession: implement write method
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-05 21:20:18 +09:00
Shin'ichiro Kawasaki
fe3cd35ac0 BLEDbusSession: implement notification reader
Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-09-05 21:08:15 +09:00
Shin'ichiro Kawasaki
94a9555c4c BLEDBusSession: get GATT services and characteristics
Using DBus interface, get GATT services and characteristics of the
connected BLE device. Also implement read request to a characteristic.
2023-08-27 19:46:35 +09:00
Shin'ichiro Kawasaki
39b157e839 BLEDBusSession: introduce BTUUID class
bluepy provides its unique UUID class which handles UUIDs of Bluetooth
devices well. Not to depend on bluepy, introduce BTUUID which extends
python standard uuid.UUID class.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-08-27 11:27:51 +09:00
Shin'ichiro Kawasaki
f79e7ac889 BLEDBusSession: WIP
I implmented 'discovery' and 'connect' commands. Next action is to
implment 'read', 'write' and 'notify'. I'm not yet sure how DBus handles
'notify'.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
2023-08-27 11:27:35 +09:00
3 changed files with 25 additions and 26 deletions

View File

@@ -102,7 +102,7 @@ Installation
5. For micro:bit, install Scratch-link hex on your device.
* Download and unzip the [micro:bit Scratch Hex file](https://downloads.scratch.mit.edu/microbit/scratch-microbit.hex.zip).
* Download and unzip the [micro:bit Scratch Hex file](https://downloads.scratch.mit.edu/microbit/scratch-microbit-1.1.0.hex.zip).
* Flash the micro:bit over USB with the Scratch Hex File, you will see the
five character name of the micro:bit scroll across the screen such as
'zo9ev'.

View File

@@ -34,8 +34,8 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
connected_devices = {}
class Device():
def __init__(self, iface, path, node_name, name, address):
self.iface = iface
def __init__(self, interface, path, node_name, name, address):
self.interface = interface
self.path = path
self.node_name = node_name
self.name = name
@@ -95,7 +95,7 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
logger.debug(f"service to check: {s}")
given_uuid = BTUUID(s)
logger.debug(f"given UUID: {given_uuid} hash={given_uuid.__hash__()}")
dev_uuids = await dev.iface.uuids
dev_uuids = await dev.interface.uuids
if not dev_uuids:
logger.debug(f"dev UUID not available")
continue
@@ -122,7 +122,7 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
async def _notify_device(self, device) -> None:
params = { 'rssi': -80, 'name': 'Unknown' }
try:
params['rssi'] = await device.iface.rssi
params['rssi'] = await device.interface.rssi
except Exception:
None
if device.name:
@@ -149,17 +149,17 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
devpath = self.iface + "/" + node_name
if BLEDBusSession.connected_devices.get(devpath):
continue
iface = DeviceInterfaceAsync()
iface._connect('org.bluez', devpath, bus=self.dbus)
interface = DeviceInterfaceAsync()
interface._connect('org.bluez', devpath, bus=self.dbus)
try:
devname = await iface.name
devname = await interface.name
except Exception as e:
logger.debug(f"device {node_name} does not have name: {e}")
devaddr = await iface.address
device = self.Device(iface, devpath, node_name, devname,
devaddr = await interface.address
device = self.Device(interface, devpath, node_name, devname,
devaddr)
if not await self._matches(device, self.discover_filters):
await iface.disconnect()
await interface.disconnect()
continue
self.found_devices[node_name] = device
await self._notify_device(device)
@@ -309,7 +309,7 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
dev = self.found_devices[params['peripheralId']]
try:
logger.debug(f" {dev}")
await dev.iface.connect()
await dev.interface.connect()
res["result"] = None
self.device = dev
self.status = self.CONNECTED
@@ -362,7 +362,7 @@ class BLEDBusSession(pyscrlink.scratch_link.Session):
dev = self.device
logger.info(f"Disconnecting from '{dev.name}'@{dev.address}")
self._stop_notifications()
await dev.iface.disconnect()
await dev.interface.disconnect()
BLEDBusSession.connected_devices.pop(dev.path)
logger.info(f"Disconnected from '{dev.name}'@{dev.address}")
self.device = None

View File

@@ -33,8 +33,17 @@ from pyscrlink import gencert
from pyscrlink import ble
logLevel = logging.INFO
# for logging
logger = logging.getLogger('pyscrlink.scratch_link')
formatter = logging.Formatter(fmt='%(asctime)s %(message)s')
handler = logging.StreamHandler()
handler.setLevel(logLevel)
handler.setFormatter(formatter)
logger.setLevel(logLevel)
logger.addHandler(handler)
logger.propagate = False
HOSTNAME="device-manager.scratch.mit.edu"
scan_seconds=10.0
@@ -588,24 +597,14 @@ def main():
parser.add_argument('-b', '--dbus', action='store_true',
help='use DBus backend for BLE devices')
args = parser.parse_args()
logLevel = logging.INFO
if args.debug:
print("Print debug messages")
logLevel = logging.DEBUG
formatter = logging.Formatter(fmt='%(asctime)s %(message)s')
handler = logging.StreamHandler()
handler.setLevel(logLevel)
handler.setFormatter(formatter)
logger.setLevel(logLevel)
logger.addHandler(handler)
logger.propagate = False
handler.setLevel(logLevel)
logger.setLevel(logLevel)
scan_seconds = args.scan_seconds
scan_retry = args.scan_retry
dbus = args.dbus
if args.debug:
logger.debug("Print debug messages")
logger.debug(f"set scan_seconds: {scan_seconds}")
logger.debug(f"set scan_retry: {scan_retry}")