freelance-project-34-market.../python/online/fxreader/pr34/tasks/ble.py
LLM c2bfca5550 [+] ruff line-length 100, add meson:install:list, bump pr34 v0.1.5.66
1. set ruff line-length to 100, reformat 18 files;
  2. move pr34 version to pyproject.common.toml, read via toml in meson.build;
  3. fix meson install_subdir to only include meson/toolchains, not entire meson/;
  4. add meson:install:list command with --mode meson|pyproject for dry-run;
  5. add .venv-whl-cache to .gitignore exceptions and .gitattributes lfs tracking;
  6. release pr34 v0.1.5.66 .whl;
  7. commit python/.venv-whl-cache and archlinux/.venv-whl-cache via lfs;
2026-04-06 09:56:11 +00:00

118 lines
2.0 KiB
Python

import bleak
import inspect
import traceback
import pprint
async def f1():
    """Run one BLE discovery pass and return what the scanner reports.

    Delegates to ``bleak.BleakScanner.discover()`` with its default arguments
    and hands the result back unchanged.
    """
    return await bleak.BleakScanner.discover()
async def f2(device, timeout=None):
    """Connect to ``device`` and return the connected ``bleak.BleakClient``.

    The client is intentionally left connected: the caller owns it and is
    responsible for calling ``disconnect()`` when finished.

    Args:
        device: Passed through as the first argument of ``bleak.BleakClient``.
        timeout: Connection timeout in seconds. ``None`` selects ``1.0``. Any
            real number (``int`` or ``float``) of at least ``1e-8`` is accepted.

    Returns:
        The connected client.

    Raises:
        TypeError: ``timeout`` is not a real number.
        ValueError: ``timeout`` is below ``1e-8`` (or is NaN).
    """
    if timeout is None:
        timeout = 1.0
    # bool is an int subclass; reject it explicitly so True is never read as "1 second".
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
        raise TypeError('timeout must be a number of seconds, got %r' % (timeout,))
    # Negated comparison (rather than `<`) so that NaN is rejected as well.
    if not timeout >= 1e-8:
        raise ValueError('timeout must be at least 1e-8 seconds, got %r' % (timeout,))

    client = bleak.BleakClient(device, timeout=float(timeout))
    # Explicit connect() instead of invoking the __aenter__ dunder by hand.
    await client.connect()
    return client
async def f3(client):
    """Dump the client's services and characteristics as plain dictionaries.

    For every entry of ``client.services`` the result holds one dict with the
    entry's own ``__dict__`` under ``'service'`` and the ``__dict__`` of each
    item of its ``characteristics`` under ``'characteristics'``.

    The ``__dict__`` objects are returned as-is (live references, not copies).
    """
    summary = []
    for service in client.services:
        service_dump = service.__dict__
        characteristic_dumps = []
        for characteristic in service.characteristics:
            characteristic_dumps.append(characteristic.__dict__)
        summary.append(
            {
                'service': service_dump,
                'characteristics': characteristic_dumps,
            }
        )
    return summary
async def f5(
    name_check=None,
):
    """Scan repeatedly until at least one (matching) device has been found.

    Each round calls ``f1()``, pretty-prints the ``__dict__`` of every
    discovered device and collects the devices that pass ``name_check``.
    The loop has no upper bound: it ends only once something was collected.

    Args:
        name_check: Optional callable that receives a device name (``str``)
            and returns a truthy value for devices to keep. Any callable is
            accepted (plain function, bound method, ``functools.partial``...).
            ``None`` keeps every discovered device.

    Returns:
        The non-empty list of devices collected in the first successful round.

    Raises:
        TypeError: ``name_check`` is neither ``None`` nor callable.
    """
    # Validate once, before the first scan, instead of after every scan.
    if name_check is not None and not callable(name_check):
        raise TypeError('name_check must be callable or None, got %r' % (name_check,))

    matches = []
    attempt = 0
    while True:
        devices = await f1()
        pprint.pprint([device.__dict__ for device in devices])
        if name_check is None:
            matches.extend(devices)
        else:
            # NOTE(review): `details[0].name()` looks specific to one bleak
            # backend's native peripheral object -- confirm before running on
            # another platform.
            names = [device.details[0].name() for device in devices]
            matches.extend(
                device
                for device, name in zip(devices, names)
                # Unnamed devices report a non-str name; skip them.
                if isinstance(name, str) and name_check(name)
            )
        if matches:
            break
        attempt += 1
        # Carriage return keeps the progress counter on a single console line.
        print('\rattempt #%d' % attempt, end='')
    return matches
async def f4(
    timeout=None,
    characteristics=None,
    operations=None,
    name_check=None,
):
    """Find a device by name, connect to it, then run ``operations`` or read characteristics.

    Steps: scan via ``f5`` until a device matches, connect to the first match
    via ``f2``, pretty-print the layout produced by ``f3``, then either await
    ``operations`` or read and pretty-print each requested characteristic.
    The client is always disconnected afterwards.

    Args:
        timeout: Forwarded to ``f2`` as the connection timeout.
        characteristics: Iterable of characteristic identifiers to read when
            no ``operations`` callable is given. ``None`` selects
            ``['0000ffd1-0000-1000-8000-00805f9b34fb']``.
        operations: Optional callable awaited as
            ``operations(client=..., t4=...)`` where ``t4`` is the ``f3``
            result. Any callable is honoured; a non-callable value falls back
            to the default characteristic reads.
        name_check: Required. Either one of the known name strings (currently
            only ``'watch fit'``), matched as a case-insensitive substring of
            the device name, or a callable taking the device name.

    Returns:
        ``None``.

    Raises:
        ValueError: ``name_check`` is ``None`` or an unknown name string.
    """
    known_names = ('watch fit',)
    if isinstance(name_check, str):
        if name_check not in known_names:
            raise ValueError(
                'unknown device name %r, expected one of %r' % (name_check, known_names)
            )
        wanted = name_check.lower()

        def name_matches(current_name):
            return wanted in current_name.lower()
    else:
        name_matches = name_check
    if name_matches is None:
        raise ValueError('name_check is required')

    if characteristics is None:
        characteristics = [
            '0000ffd1-0000-1000-8000-00805f9b34fb',
        ]

    devices = await f5(
        name_check=name_matches,
    )
    if not devices:
        print('not found')
        return

    client = None
    try:
        client = await f2(devices[0], timeout=timeout)
        layout = await f3(client)
        pprint.pprint(layout)
        # callable() rather than inspect.isfunction(): bound methods, partials
        # and callable objects must not be silently ignored.
        if callable(operations):
            await operations(
                client=client,
                t4=layout,
            )
        else:
            values = {}
            for characteristic in characteristics:
                try:
                    value = await client.read_gatt_char(characteristic)
                except Exception:
                    # Best effort: report the failure and keep reading the rest.
                    print(traceback.format_exc())
                    value = None
                values[characteristic] = value
            pprint.pprint(values)
    finally:
        # client stays None when connecting itself failed.
        if client is not None:
            await client.disconnect()