mirror of
https://github.com/morgan9e/UxPlay
synced 2026-04-15 00:34:05 +09:00
windows beacon seems to be working
This commit is contained in:
@@ -4,25 +4,98 @@
|
|||||||
# a standalone python-3.6 or later winrt-based AirPlay Service-Discovery Bluetooth LE beacon for UxPlay
|
# a standalone python-3.6 or later winrt-based AirPlay Service-Discovery Bluetooth LE beacon for UxPlay
|
||||||
# (c) F. Duncanh, October 2025
|
# (c) F. Duncanh, October 2025
|
||||||
|
|
||||||
|
|
||||||
import gi
|
import gi
|
||||||
try:
|
try:
|
||||||
from gi.repository import GLib
|
from gi.repository import GLib
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print(f"ImportError: failed to import GLib")
|
print(f"ImportError: failed to import GLib")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Import WinRT APIs
|
||||||
|
|
||||||
|
try:
|
||||||
|
import winrt.windows.foundation.collections
|
||||||
|
except ImportError:
|
||||||
|
print(f"ImportError from winrt-Windows.Foundation.Collections")
|
||||||
|
print(f"Install with 'pip install winrt-Windows.Foundation.Collections'")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import winrt.windows.devices.bluetooth.advertisement as ble_adv
|
||||||
|
except ImportError:
|
||||||
|
print(f"ImportError from winrt-Windows.Devices.Bluetooth.Advertisement")
|
||||||
|
print(f"Install with 'pip install winrt-Windows.Devices.Bluetooth.Advertisement'")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import winrt.windows.storage.streams as streams
|
||||||
|
except ImportError:
|
||||||
|
print(f"ImportError from winrt-Windows.Storage.Streams")
|
||||||
|
print(f"Install with 'pip install winrt-Windows.Storage.Streams'")
|
||||||
|
|
||||||
|
import struct
|
||||||
|
import ipaddress
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
#global variables used by winrt.windows.devices.bluetooth.advertisement code
|
||||||
|
publisher = None
|
||||||
|
|
||||||
|
def on_status_changed(sender, args):
|
||||||
|
print(f"Publisher status: {args.status}")
|
||||||
|
|
||||||
|
|
||||||
|
def create_airplay_service_discovery_advertisement_publisher(ipv4_str, port):
|
||||||
|
assert port > 0
|
||||||
|
assert port <= 65535
|
||||||
|
mfg_data = bytearray([0x09, 0x08, 0x13, 0x30]) # Apple Data Unit type 9 (Airplay), length 8, flags 0001 0011, seed 30
|
||||||
|
ipv4_address = ipaddress.ip_address(ipv4_str)
|
||||||
|
ipv4 = bytearray(ipv4_address.packed)
|
||||||
|
mfg_data.extend(ipv4)
|
||||||
|
port_bytes = port.to_bytes(2, 'big')
|
||||||
|
mfg_data.extend(port_bytes)
|
||||||
|
writer = streams.DataWriter()
|
||||||
|
writer.write_bytes(mfg_data)
|
||||||
|
manufacturer_data = ble_adv.BluetoothLEManufacturerData()
|
||||||
|
manufacturer_data.company_id = 0x004C #Apple
|
||||||
|
manufacturer_data.data = writer.detach_buffer()
|
||||||
|
advertisement = ble_adv.BluetoothLEAdvertisement()
|
||||||
|
advertisement.manufacturer_data.append(manufacturer_data)
|
||||||
|
global publisher
|
||||||
|
publisher = ble_adv.BluetoothLEAdvertisementPublisher(advertisement)
|
||||||
|
publisher.add_status_changed(on_status_changed)
|
||||||
|
|
||||||
|
async def publish_advertisement():
|
||||||
|
try:
|
||||||
|
publisher.start()
|
||||||
|
print(f"Publisher started successfully")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to start Publihser: {e}")
|
||||||
|
ptint(f"Publisher Status: {publisher.status}")
|
||||||
|
|
||||||
|
|
||||||
def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
||||||
print(f"setup_beacon port {ipv4_str}:{port} [{advmin}:{advmax}] ({index})")
|
#index will be ignored
|
||||||
|
print(f"setup_beacon port {ipv4_str}:{port} Note: min, max advertising interval and index are not used)")
|
||||||
|
create_airplay_service_discovery_advertisement_publisher(ipv4_str, port)
|
||||||
|
|
||||||
def beacon_on():
|
def beacon_on():
|
||||||
print(f"beacon_on")
|
print(f"beacon_on")
|
||||||
return True
|
global publisher
|
||||||
|
try:
|
||||||
|
asyncio.run( publish_advertisement())
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to start publisher: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def beacon_off():
|
def beacon_off():
|
||||||
print(f"beacon off")
|
print(f"beacon off")
|
||||||
|
global publisher
|
||||||
|
publisher.stop()
|
||||||
|
publisher = None
|
||||||
|
|
||||||
#==generic code (non-dbus) below here =============
|
#==generic code (non-winrt) below here =============
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import os
|
import os
|
||||||
|
|||||||
Reference in New Issue
Block a user