From ad449a34164e59ed7575b7004ecea503db3af3a0 Mon Sep 17 00:00:00 2001 From: Ivan Malison Date: Wed, 15 Apr 2026 10:55:28 -0700 Subject: [PATCH] feat: add KEF speaker control command --- dotfiles/lib/bin/kef-control | 283 +++++++++++++++++++++++++++++++++++ nixos/overlay.nix | 35 +++++ 2 files changed, 318 insertions(+) create mode 100755 dotfiles/lib/bin/kef-control diff --git a/dotfiles/lib/bin/kef-control b/dotfiles/lib/bin/kef-control new file mode 100755 index 00000000..697d2ade --- /dev/null +++ b/dotfiles/lib/bin/kef-control @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import sys +from pathlib import Path + + +CONFIG_PATH = Path.home() / ".config" / "kef-control" / "config.json" +STATE_DIR = Path.home() / ".local" / "state" / "kef-control" +DEFAULT_UNMUTE_VOLUME = 30 +MODEL_ALIASES = { + "LS50W2": "LS50WII", + "LS50WII": "LS50WII", +} +SOURCE_ALIASES = { + "aux": "analog", + "optical": "optic", +} + + +def normalize_model(model): + if not model: + return None + return MODEL_ALIASES.get(model.upper(), model) + + +def normalize_source(source): + if not source: + return None + return SOURCE_ALIASES.get(source.lower(), source.lower()) + + +def load_json(path, default): + if not path.exists(): + return default + return json.loads(path.read_text()) + + +def save_json(path, data): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n") + + +def load_config(): + return load_json(CONFIG_PATH, {}) + + +def save_config(config): + save_json(CONFIG_PATH, config) + + +def state_path(host): + safe_host = host.replace("/", "_").replace(":", "_") + return STATE_DIR / f"{safe_host}.json" + + +def load_state(host): + return load_json(state_path(host), {}) + + +def save_state(host, state): + save_json(state_path(host), state) + + +def remember_volume(host, volume): + if volume <= 0: + return + state = load_state(host) + state["last_nonzero_volume"] = volume + save_state(host, state) + + +def restore_volume(host): + state = load_state(host) + volume = state.get("last_nonzero_volume", DEFAULT_UNMUTE_VOLUME) + return max(1, min(100, int(volume))) + + +def resolve_host(args, config, parser): + host = args.host or os.getenv("KEF_SPEAKER_IP") or config.get("host") + if host: + return host + parser.error( + "speaker host is required; pass --host, set KEF_SPEAKER_IP, or run " + "'kef-control config --host --model LS50WII'" + ) + + +def resolve_model(args, config): + return normalize_model( + args.model or os.getenv("KEF_SPEAKER_MODEL") or config.get("model") + ) + + +def connector_for(args, config, parser): + from pykefcontrol.kef_connector import KefConnector + + host = resolve_host(args, config, parser) + model = resolve_model(args, config) + return host, KefConnector(host, model=model) + + +def print_json(data): + print(json.dumps(data, indent=2, sort_keys=True)) + + +def handle_config(args, config): + should_save = args.clear or args.host_value is not None or args.model_value is not None + if args.clear: + config = {} + if args.host_value is not None: + config["host"] = args.host_value + if args.model_value is not None: + config["model"] = normalize_model(args.model_value) + if should_save: + save_config(config) + print_json(config) + + +def handle_status(connector, host): + status = { + "host": host, + "speaker_name": connector.speaker_name, + "speaker_model": connector.speaker_model, + "firmware_version": connector.firmware_version, + "power_status": connector.status, + "source": connector.source, + "volume": connector.volume, + "is_playing": connector.is_playing, + } + remember_volume(host, status["volume"]) + if status["is_playing"]: + status["song_info"] = connector.get_song_information() + status["song_length_ms"] = connector.song_length + status["song_position_ms"] = connector.song_status + codec_info = connector.get_audio_codec_information() + if codec_info: + status["audio_codec"] = codec_info + wifi_info = connector.get_wifi_information() + if wifi_info: + status["wifi"] = wifi_info + print_json(status) + + +def handle_source(connector, host, source): + if source is None: + print(connector.source) + return + connector.source = normalize_source(source) + print(connector.source) + remember_volume(host, connector.volume) + + +def handle_volume(connector, host, volume): + if volume is None: + print(connector.volume) + return + if not 0 <= volume <= 100: + raise ValueError("volume must be between 0 and 100") + connector.set_volume(volume) + if volume > 0: + remember_volume(host, volume) + print(connector.volume) + + +def build_parser(): + parser = argparse.ArgumentParser(description="Control KEF wireless speakers") + parser.add_argument("--host", help="Speaker IP or hostname") + parser.add_argument( + "--model", + help="Optional model override, e.g. LS50WII", + ) + + subparsers = parser.add_subparsers(dest="command", required=True) + + config_parser = subparsers.add_parser( + "config", + help="Show or update saved defaults in ~/.config/kef-control/config.json", + ) + config_parser.add_argument("--host", dest="host_value") + config_parser.add_argument("--model", dest="model_value") + config_parser.add_argument( + "--clear", + action="store_true", + help="Clear saved host and model", + ) + + subparsers.add_parser("status", help="Show current speaker status as JSON") + + power_parser = subparsers.add_parser("power", help="Turn speaker power on or off") + power_parser.add_argument("state", choices=["on", "off"]) + + source_parser = subparsers.add_parser( + "source", + help="Get or set source (wifi, bluetooth, tv, optic, coaxial, analog)", + ) + source_parser.add_argument("source", nargs="?") + + volume_parser = subparsers.add_parser("volume", help="Get or set volume") + volume_parser.add_argument("volume", nargs="?", type=int) + + subparsers.add_parser("mute", help="Set volume to 0 and remember the prior level") + subparsers.add_parser( + "unmute", + help="Restore the last remembered non-zero volume level", + ) + subparsers.add_parser("play-pause", help="Toggle play/pause") + subparsers.add_parser("next", help="Skip to the next track") + subparsers.add_parser("previous", help="Go to the previous track") + subparsers.add_parser("song-info", help="Show current song metadata as JSON") + subparsers.add_parser("codec-info", help="Show audio codec information as JSON") + subparsers.add_parser("wifi-info", help="Show current Wi-Fi information as JSON") + + return parser + + +def main(): + parser = build_parser() + args = parser.parse_args() + config = load_config() + + try: + if args.command == "config": + handle_config(args, config) + return 0 + + host, connector = connector_for(args, config, parser) + + if args.command == "status": + handle_status(connector, host) + elif args.command == "power": + if args.state == "on": + connector.power_on() + else: + connector.shutdown() + print(connector.status) + elif args.command == "source": + handle_source(connector, host, args.source) + elif args.command == "volume": + handle_volume(connector, host, args.volume) + elif args.command == "mute": + current_volume = connector.volume + remember_volume(host, current_volume) + connector.set_volume(0) + print(connector.volume) + elif args.command == "unmute": + connector.set_volume(restore_volume(host)) + print(connector.volume) + elif args.command == "play-pause": + connector.toggle_play_pause() + print("ok") + elif args.command == "next": + connector.next_track() + print("ok") + elif args.command == "previous": + connector.previous_track() + print("ok") + elif args.command == "song-info": + print_json( + { + "is_playing": connector.is_playing, + "song_info": connector.get_song_information(), + "song_length_ms": connector.song_length, + "song_position_ms": connector.song_status, + } + ) + elif args.command == "codec-info": + print_json(connector.get_audio_codec_information()) + elif args.command == "wifi-info": + print_json(connector.get_wifi_information()) + else: + parser.error(f"unsupported command: {args.command}") + except KeyboardInterrupt: + return 130 + except Exception as exc: + print(f"kef-control: {exc}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/nixos/overlay.nix b/nixos/overlay.nix index 3278fa6a..0d4d8bc7 100644 --- a/nixos/overlay.nix +++ b/nixos/overlay.nix @@ -318,6 +318,8 @@ from transformers import (/' \ }; }); + pykefcontrol = final.python3Packages.pykefcontrol; + python-with-my-packages = let my-python-packages = python-packages: with python-packages; [ @@ -329,6 +331,7 @@ from transformers import (/' \ numpy openpyxl pip + pykefcontrol requests ]; in @@ -337,6 +340,38 @@ from transformers import (/' \ pythonPackagesExtensions = prev.pythonPackagesExtensions ++ [ ( python-final: python-prev: { + pykefcontrol = python-prev.buildPythonPackage rec { + pname = "pykefcontrol"; + version = "0.9"; + pyproject = true; + + src = final.fetchFromGitHub { + owner = "N0ciple"; + repo = "pykefcontrol"; + rev = "530a7d15cf692c35a7c181c8f5e28edc0f1e085a"; + hash = "sha256-V/uYzzUv/PslfZ/zSSAK4j6kI9lLQOXBN1AG0rjRrpg="; + }; + + postPatch = '' + substituteInPlace pyproject.toml \ + --replace-fail 'version = "0.8"' 'version = "0.9"' + ''; + + build-system = with python-final; [ hatchling ]; + propagatedBuildInputs = with python-final; [ + aiohttp + requests + ]; + + pythonImportsCheck = [ "pykefcontrol" ]; + + meta = with final.lib; { + description = "Python library for controlling KEF LS50 Wireless II, LSX II, and LS60 speakers"; + homepage = "https://github.com/N0ciple/pykefcontrol"; + license = licenses.mit; + platforms = platforms.linux ++ platforms.darwin; + }; + }; pysilero-vad = python-prev.pysilero-vad.overridePythonAttrs (_: { src = final.fetchFromGitHub { owner = "colonelpanic8";