mirror of
https://github.com/morgan9e/UxPlay
synced 2026-04-14 16:24:29 +09:00
uxplay-beacon.py: code cleanups
This commit is contained in:
@@ -5,30 +5,21 @@
|
||||
# a standalone python-3.6 or later DBus-based AirPlay Service-Discovery Bluetooth LE beacon for UxPlay
|
||||
# (c) F. Duncanh, October 2025
|
||||
|
||||
import argparse
|
||||
import gi
|
||||
import os
|
||||
import sys
|
||||
import psutil
|
||||
import struct
|
||||
import socket
|
||||
|
||||
from gi.repository import GLib
|
||||
try:
|
||||
from gi.repository import GLib
|
||||
except ImportError:
|
||||
print(f"ImportError: failed to import GLib")
|
||||
|
||||
|
||||
import dbus
|
||||
import dbus.exceptions
|
||||
import dbus.mainloop.glib
|
||||
import dbus.service
|
||||
import time
|
||||
import threading
|
||||
|
||||
ad_manager = None
|
||||
airplay_advertisement = None
|
||||
port = int(0)
|
||||
advmin = int(100)
|
||||
advmax = int(100)
|
||||
ipv4_str = "ipv4_address"
|
||||
index = int(0)
|
||||
server_address = None
|
||||
|
||||
BLUEZ_SERVICE_NAME = 'org.bluez'
|
||||
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
|
||||
@@ -112,7 +103,7 @@ class AirPlay_Service_Discovery_Advertisement(dbus.service.Object):
|
||||
in_signature='',
|
||||
out_signature='')
|
||||
def Release(self):
|
||||
print('%s: Released!' % self.path)
|
||||
print(f'{self.path}: Released!')
|
||||
|
||||
|
||||
class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement):
|
||||
@@ -134,14 +125,14 @@ class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement):
|
||||
|
||||
|
||||
def register_ad_cb():
|
||||
global ipv4_str
|
||||
global port
|
||||
print(f'AirPlay Service_Discovery Advertisement ({ipv4_str}:{port}) registered')
|
||||
global server_address
|
||||
print(f'AirPlay Service_Discovery Advertisement ({server_address}) registered')
|
||||
|
||||
|
||||
def register_ad_error_cb(error):
|
||||
print('Failed to register advertisement: ' + str(error))
|
||||
mainloop.quit()
|
||||
print(f'Failed to register advertisement: {error}')
|
||||
global ad_manager
|
||||
ad_manager = None
|
||||
|
||||
|
||||
def find_adapter(bus):
|
||||
@@ -159,11 +150,13 @@ def find_adapter(bus):
|
||||
def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
global server_address
|
||||
server_address = f"{ipv4_str}:{port}"
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
bus = dbus.SystemBus()
|
||||
adapter = find_adapter(bus)
|
||||
if not adapter:
|
||||
print('LEAdvertisingManager1 interface not found')
|
||||
print(f'LEAdvertisingManager1 interface not found')
|
||||
return
|
||||
adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
|
||||
"org.freedesktop.DBus.Properties")
|
||||
@@ -176,9 +169,16 @@ def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
||||
|
||||
def beacon_on():
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {},
|
||||
reply_handler=register_ad_cb,
|
||||
error_handler=register_ad_error_cb)
|
||||
if ad_manager is None:
|
||||
airplay_advertisement = None
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def beacon_off():
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
@@ -190,12 +190,24 @@ def beacon_off():
|
||||
|
||||
#==generic code (non-dbus) below here =============
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import psutil
|
||||
import struct
|
||||
import socket
|
||||
|
||||
# global variables
|
||||
beacon_is_running = False
|
||||
beacon_is_pending_on = False
|
||||
beacon_is_pending_off = False
|
||||
|
||||
port = int(0)
|
||||
advmin = int(100)
|
||||
advmax = int(100)
|
||||
ipv4_str = "ipv4_address"
|
||||
index = int(0)
|
||||
|
||||
def start_beacon():
|
||||
global beacon_is_running
|
||||
global port
|
||||
@@ -204,8 +216,7 @@ def start_beacon():
|
||||
global advmax
|
||||
global index
|
||||
setup_beacon(ipv4_str, port, advmin, advmax, index)
|
||||
beacon_on()
|
||||
beacon_is_running = True
|
||||
beacon_is_running = beacon_on()
|
||||
|
||||
def stop_beacon():
|
||||
global beacon_is_running
|
||||
@@ -227,12 +238,10 @@ def check_pending():
|
||||
global beacon_is_pending_on
|
||||
global beacon_is_pending_off
|
||||
if beacon_is_running:
|
||||
#print(f"beacon running")
|
||||
if beacon_is_pending_off:
|
||||
stop_beacon()
|
||||
beacon_is_pending_off = False
|
||||
else:
|
||||
#print(f"beacon not running")
|
||||
if beacon_is_pending_on:
|
||||
start_beacon()
|
||||
beacon_is_pending_on = False
|
||||
@@ -259,12 +268,12 @@ def check_file_exists(file_path):
|
||||
if not beacon_is_running:
|
||||
beacon_is_pending_on = True
|
||||
else:
|
||||
print(f"orphan beacon file {file_path} exists, but process {pname} (pid {pid}) is no longer active")
|
||||
print(f'orphan beacon file {file_path} exists, but process {pname} (pid {pid}) is no longer active')
|
||||
try:
|
||||
os.remove(file_path)
|
||||
print(f"File '{file_path}' deleted successfully.")
|
||||
print(f'File "{file_path}" deleted successfully.')
|
||||
except FileNotFoundError:
|
||||
print(f"File '{file_path}' not found.")
|
||||
print(f'File "{file_path}" not found.')
|
||||
if beacon_is_running:
|
||||
beacon_is_pending_off = True
|
||||
else:
|
||||
@@ -281,7 +290,7 @@ def process_input(value):
|
||||
my_integer = int(value)
|
||||
return my_integer
|
||||
except ValueError:
|
||||
printf(f"Error: could not convert '{value}' to integer: {my_integer}")
|
||||
print(f'Error: could not convert "{value}" to integer: {my_integer}')
|
||||
return None
|
||||
|
||||
|
||||
@@ -289,11 +298,11 @@ def process_input(value):
|
||||
#check AdvInterval
|
||||
def check_adv_intrvl(min, max):
|
||||
if not (100 <= min):
|
||||
raise ValueError("AdvMin was smaller than 100 msecs")
|
||||
raise ValueError('AdvMin was smaller than 100 msecs')
|
||||
if not (max >= min):
|
||||
raise ValueError("AdvMax was smaller than AdvMin")
|
||||
raise ValueError('AdvMax was smaller than AdvMin')
|
||||
if not (max <= 10240):
|
||||
raise ValueError("AdvMax was larger than 10240 msecs")
|
||||
raise ValueError('AdvMax was larger than 10240 msecs')
|
||||
|
||||
|
||||
def main(file_path, ipv4_str_in, advmin_in, advmax_in, index_in):
|
||||
@@ -311,7 +320,7 @@ def main(file_path, ipv4_str_in, advmin_in, advmax_in, index_in):
|
||||
try:
|
||||
check_adv_intrvl(advmin, advmax)
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
||||
print(f'Error: {e}')
|
||||
raise SystemExit(1)
|
||||
|
||||
GLib.timeout_add_seconds(5, on_timeout, file_path)
|
||||
@@ -319,12 +328,16 @@ def main(file_path, ipv4_str_in, advmin_in, advmax_in, index_in):
|
||||
mainloop = GLib.MainLoop()
|
||||
mainloop.run()
|
||||
except KeyboardInterrupt:
|
||||
print(f"\nExiting ...")
|
||||
print(f'\nExiting ...')
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
||||
if not sys.version_info >= (3,6):
|
||||
print("uxplay-beacon.py requires Python 3.6 or higher")
|
||||
|
||||
# Create an ArgumentParser object
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -383,7 +396,7 @@ if __name__ == '__main__':
|
||||
index = int(0)
|
||||
|
||||
if args.file:
|
||||
print(f"Using config file: {args.file}")
|
||||
print(f'Using config file: {args.file}')
|
||||
if os.path.exists(args.file):
|
||||
with open(args.file, 'r') as file:
|
||||
for line in file:
|
||||
@@ -403,22 +416,22 @@ if __name__ == '__main__':
|
||||
if value.isdigit():
|
||||
advmin = int(value)
|
||||
else:
|
||||
print(f"Invalid config file input (--AdvMin) {value} in {args.file}")
|
||||
print(f'Invalid config file input (--AdvMin) {value} in {args.file}')
|
||||
raise SystemExit(1)
|
||||
elif key == "--AdvMax":
|
||||
if value.isdigit():
|
||||
advmax = int(value)
|
||||
else:
|
||||
print(f"Invalid config file input (--AdvMax) {value} in {args.file}")
|
||||
print(f'Invalid config file input (--AdvMax) {value} in {args.file}')
|
||||
raise SystemExit(1)
|
||||
elif key == "--index":
|
||||
if value.isdigit():
|
||||
index = int(value)
|
||||
else:
|
||||
print(f"Invalid config file input (--index) {value} in {args.file}")
|
||||
print(f'Invalid config file input (--index) {value} in {args.file}')
|
||||
raise SystemExit(1)
|
||||
else:
|
||||
print(f"Unknown key \"{key}\" in config file {args.file}")
|
||||
print(f'Unknown key "{key}" in config file {args.file}')
|
||||
raise SystemExit(1)
|
||||
|
||||
if args.ipv4 == "use gethostbyname":
|
||||
@@ -431,26 +444,26 @@ if __name__ == '__main__':
|
||||
if args.AdvMin.isdigit():
|
||||
advmin = int(args.AdvMin)
|
||||
else:
|
||||
print("Invalid input (AdvMin) {args.AdvMin}")
|
||||
print(f'Invalid input (AdvMin) {args.AdvMin}')
|
||||
raise SystemExit(1)
|
||||
|
||||
if args.AdvMax != "0":
|
||||
if args.AdvMax.isdigit():
|
||||
advmax = int(args.AdvMax)
|
||||
else:
|
||||
print("Invalid input (AdvMin) {args.AdvMin}")
|
||||
print(f'Invalid input (AdvMin) {args.AdvMin}')
|
||||
raise SystemExit(1)
|
||||
|
||||
if args.index != "0":
|
||||
if args.index.isdigit():
|
||||
index = int(args.index)
|
||||
else:
|
||||
print("Invalid input (AdvMin) {args.AdvMin}")
|
||||
print(f'Invalid input (AdvMin) {args.AdvMin}')
|
||||
raise SystemExit(1)
|
||||
if index < 0:
|
||||
raise ValueError("index was negative (forbidden)")
|
||||
raise ValueError('index was negative (forbidden)')
|
||||
|
||||
print(f"AirPlay Service-Discovery Bluetooth LE beacon: using BLE file {args.path}, advmin:advmax {advmin}:{advmax} index:{index}")
|
||||
print(f"(Press Ctrl+C to exit)")
|
||||
print(f'AirPlay Service-Discovery Bluetooth LE beacon: using BLE file {args.path}, advmin:advmax {advmin}:{advmax} index:{index}')
|
||||
print(f'(Press Ctrl+C to exit)')
|
||||
main(args.path, ipv4_str, advmin, advmax, index)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user