#!/usr/bin/env python3 import argparse import json import os import sys import threading import time import requests from pykefcontrol.kef_connector import KefConnector from zeroconf import IPVersion, ServiceBrowser, ServiceListener, Zeroconf SOURCE_ALIASES = {"optic": "optical"} SOURCES = ("wifi", "bluetooth", "tv", "optical", "coaxial", "analog", "standby") DISCOVERY_SERVICE_TYPES = ( "_kef-info._tcp.local.", "_sues800device._tcp.local.", "_http._tcp.local.", ) class KefDiscoveryListener(ServiceListener): def __init__(self): self._lock = threading.Lock() self.speakers = {} def add_service(self, zeroconf, service_type, name): self._record_service(zeroconf, service_type, name) def update_service(self, zeroconf, service_type, name): self._record_service(zeroconf, service_type, name) def remove_service(self, zeroconf, service_type, name): return None def _record_service(self, zeroconf, service_type, name): info = zeroconf.get_service_info(service_type, name, timeout=1000) if info is None: return properties = { key.decode("utf-8", errors="replace"): ( value.decode("utf-8", errors="replace") if value is not None else "" ) for key, value in info.properties.items() } addresses = [ address for address in info.parsed_addresses(IPVersion.V4Only) if not address.startswith("127.") ] if not addresses: return is_kef = ( service_type in ("_kef-info._tcp.local.", "_sues800device._tcp.local.") or properties.get("manufacturer", "").lower() == "kef" or "kef" in name.lower() or "ls50" in name.lower() ) if not is_kef: return key = addresses[0] with self._lock: existing = self.speakers.get(key, {}) self.speakers[key] = { **existing, "address": addresses[0], "hostname": (info.server.rstrip(".") if info.server else None) or existing.get("hostname"), "name": properties.get("name") or properties.get("fn") or existing.get("name") or strip_service_name(name), "model": properties.get("modelName") or properties.get("model") or properties.get("mn") or existing.get("model"), "version": properties.get("version") or existing.get("version"), "service": existing.get("service") or service_type.removesuffix(".local."), } def strip_service_name(name): for suffix in DISCOVERY_SERVICE_TYPES: if name.endswith("." + suffix): return name[: -(len(suffix) + 1)] return name.rstrip(".") def discover_speakers(timeout): listener = KefDiscoveryListener() zeroconf = Zeroconf(ip_version=IPVersion.V4Only) try: browsers = [ ServiceBrowser(zeroconf, service_type, listener) for service_type in DISCOVERY_SERVICE_TYPES ] deadline = time.monotonic() + timeout while time.monotonic() < deadline: if listener.speakers: time.sleep(0.25) break time.sleep(0.05) return sorted( listener.speakers.values(), key=lambda speaker: ( speaker.get("name") or "", speaker.get("address") or "", ), ) finally: for browser in locals().get("browsers", []): browser.cancel() zeroconf.close() def discover_host(timeout): speakers = discover_speakers(timeout) if not speakers: raise SystemExit( "Could not discover a KEF speaker. Pass --host or set KEF_HOST." ) if len(speakers) > 1: choices = ", ".join( f"{speaker.get('name') or 'KEF'} at {speaker['address']}" for speaker in speakers ) raise SystemExit(f"Multiple KEF speakers discovered: {choices}. Pass --host explicitly.") return speakers[0]["address"] def connector(args): host = ( args.host or os.environ.get("KEF_HOST") or os.environ.get("KEF_IP") or discover_host(args.discovery_timeout) ) return KefConnector(host, model=args.model) def print_status(speaker): info = { "name": speaker.speaker_name, "model": speaker.speaker_model, "firmware": speaker.firmware_version, "status": speaker.status, "source": speaker.source, "volume": speaker.volume, "playing": speaker.is_playing, } try: song = speaker.get_song_information() if any(song.values()): info["song"] = song except (KeyError, IndexError, TypeError, requests.RequestException): pass print(json.dumps(info, indent=2, sort_keys=True)) def bounded_volume(value): return max(0, min(100, value)) def normalize_source(source): if source is None: return None return SOURCE_ALIASES.get(source, source) def main(): parser = argparse.ArgumentParser(prog="kef", description="Control KEF W2 speakers.") parser.add_argument( "--host", help="Speaker IP address. Defaults to KEF_HOST/KEF_IP, then mDNS discovery.", ) parser.add_argument( "--model", default="LS50W2", help="Speaker model passed to pykefcontrol. Default: LS50W2.", ) parser.add_argument( "--discovery-timeout", default=2.0, type=float, help="Seconds to wait for mDNS discovery when --host/KEF_HOST is unset.", ) subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser("discover", help="List discovered KEF speakers as JSON.") subparsers.add_parser("status", help="Print speaker status as JSON.") subparsers.add_parser("on", help="Power on.") subparsers.add_parser("standby", help="Put the speaker in standby.") subparsers.add_parser("mute", help="Mute by setting volume to 0.") volume = subparsers.add_parser("volume", help="Get or set volume.") volume.add_argument("level", nargs="?", type=int, help="Volume level, 0-100.") up = subparsers.add_parser("up", help="Increase volume.") up.add_argument("step", nargs="?", default=3, type=int) down = subparsers.add_parser("down", help="Decrease volume.") down.add_argument("step", nargs="?", default=3, type=int) source = subparsers.add_parser("source", help="Get or set source.") source.add_argument("name", nargs="?", choices=SOURCES + tuple(SOURCE_ALIASES)) subparsers.add_parser("play-pause", help="Toggle play/pause.") subparsers.add_parser("next", help="Next track.") subparsers.add_parser("previous", help="Previous track.") args = parser.parse_args() if args.command == "discover": print(json.dumps(discover_speakers(args.discovery_timeout), indent=2, sort_keys=True)) return 0 speaker = connector(args) try: if args.command == "status": print_status(speaker) elif args.command == "on": speaker.power_on() elif args.command == "standby": speaker.shutdown() elif args.command == "mute": speaker.volume = 0 elif args.command == "volume": if args.level is None: print(speaker.volume) else: speaker.volume = bounded_volume(args.level) elif args.command == "up": speaker.volume = bounded_volume(speaker.volume + args.step) elif args.command == "down": speaker.volume = bounded_volume(speaker.volume - args.step) elif args.command == "source": if args.name is None: print(speaker.source) else: speaker.source = normalize_source(args.name) elif args.command == "play-pause": speaker.toggle_play_pause() elif args.command == "next": speaker.next_track() elif args.command == "previous": speaker.previous_track() except requests.RequestException as error: print(f"kef: failed to reach speaker: {error}", file=sys.stderr) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())