diff --git a/nixos/extra.nix b/nixos/extra.nix index ae2ff1de..92ce0e3b 100644 --- a/nixos/extra.nix +++ b/nixos/extra.nix @@ -10,6 +10,7 @@ makeEnable config "myModules.extra" false { signal-desktop gource gimp + kef texlive.combined.scheme-full tor yt-dlp diff --git a/nixos/nix.nix b/nixos/nix.nix index 4170fd3e..6c6e9dee 100644 --- a/nixos/nix.nix +++ b/nixos/nix.nix @@ -113,6 +113,8 @@ codex = inputs.codex-cli-nix.packages.${prev.stdenv.hostPlatform.system}.default; claude-code = inputs.claude-code-nix.packages.${prev.stdenv.hostPlatform.system}.default; git-sync-rs = inputs.git-sync-rs.packages.${prev.stdenv.hostPlatform.system}.default; + kef = final.callPackage ./packages/kef {}; + pykefcontrol = final.python3Packages.callPackage ./packages/pykefcontrol {}; }) ] ++ ( diff --git a/nixos/packages/kef/default.nix b/nixos/packages/kef/default.nix new file mode 100644 index 00000000..cd41548c --- /dev/null +++ b/nixos/packages/kef/default.nix @@ -0,0 +1,30 @@ +{ + lib, + python3, + python3Packages, + writeShellApplication, +}: + +let + pykefcontrol = python3Packages.callPackage ../pykefcontrol {}; + python = python3.withPackages (ps: [ + pykefcontrol + ps.zeroconf + ]); +in +writeShellApplication { + name = "kef"; + + runtimeInputs = [ python ]; + + text = '' + exec python ${./kef.py} "$@" + ''; + + meta = { + description = "Command-line controller for KEF W2 speakers"; + license = lib.licenses.mit; + maintainers = with lib.maintainers; [ imalison ]; + mainProgram = "kef"; + }; +} diff --git a/nixos/packages/kef/kef.py b/nixos/packages/kef/kef.py new file mode 100644 index 00000000..148fa760 --- /dev/null +++ b/nixos/packages/kef/kef.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import sys +import threading +import time + +import requests +from pykefcontrol.kef_connector import KefConnector +from zeroconf import IPVersion, ServiceBrowser, ServiceListener, Zeroconf + + +SOURCES = ("wifi", "bluetooth", "tv", "optic", "coaxial", "analog", "standby") +DISCOVERY_SERVICE_TYPES = ( + "_kef-info._tcp.local.", + "_sues800device._tcp.local.", + "_http._tcp.local.", +) + + +class KefDiscoveryListener(ServiceListener): + def __init__(self): + self._lock = threading.Lock() + self.speakers = {} + + def add_service(self, zeroconf, service_type, name): + self._record_service(zeroconf, service_type, name) + + def update_service(self, zeroconf, service_type, name): + self._record_service(zeroconf, service_type, name) + + def remove_service(self, zeroconf, service_type, name): + return None + + def _record_service(self, zeroconf, service_type, name): + info = zeroconf.get_service_info(service_type, name, timeout=1000) + if info is None: + return + + properties = { + key.decode("utf-8", errors="replace"): ( + value.decode("utf-8", errors="replace") if value is not None else "" + ) + for key, value in info.properties.items() + } + addresses = [ + address + for address in info.parsed_addresses(IPVersion.V4Only) + if not address.startswith("127.") + ] + if not addresses: + return + + is_kef = ( + service_type in ("_kef-info._tcp.local.", "_sues800device._tcp.local.") + or properties.get("manufacturer", "").lower() == "kef" + or "kef" in name.lower() + or "ls50" in name.lower() + ) + if not is_kef: + return + + key = addresses[0] + with self._lock: + existing = self.speakers.get(key, {}) + self.speakers[key] = { + **existing, + "address": addresses[0], + "hostname": (info.server.rstrip(".") if info.server else None) + or existing.get("hostname"), + "name": properties.get("name") + or properties.get("fn") + or existing.get("name") + or strip_service_name(name), + "model": properties.get("modelName") + or properties.get("model") + or properties.get("mn") + or existing.get("model"), + "version": properties.get("version") or existing.get("version"), + "service": existing.get("service") or service_type.removesuffix(".local."), + } + + +def strip_service_name(name): + for suffix in DISCOVERY_SERVICE_TYPES: + if name.endswith("." + suffix): + return name[: -(len(suffix) + 1)] + return name.rstrip(".") + + +def discover_speakers(timeout): + listener = KefDiscoveryListener() + zeroconf = Zeroconf(ip_version=IPVersion.V4Only) + try: + browsers = [ + ServiceBrowser(zeroconf, service_type, listener) + for service_type in DISCOVERY_SERVICE_TYPES + ] + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if listener.speakers: + time.sleep(0.25) + break + time.sleep(0.05) + return sorted( + listener.speakers.values(), + key=lambda speaker: ( + speaker.get("name") or "", + speaker.get("address") or "", + ), + ) + finally: + for browser in locals().get("browsers", []): + browser.cancel() + zeroconf.close() + + +def discover_host(timeout): + speakers = discover_speakers(timeout) + if not speakers: + raise SystemExit( + "Could not discover a KEF speaker. Pass --host or set KEF_HOST." + ) + if len(speakers) > 1: + choices = ", ".join( + f"{speaker.get('name') or 'KEF'} at {speaker['address']}" for speaker in speakers + ) + raise SystemExit(f"Multiple KEF speakers discovered: {choices}. Pass --host explicitly.") + return speakers[0]["address"] + + +def connector(args): + host = ( + args.host + or os.environ.get("KEF_HOST") + or os.environ.get("KEF_IP") + or discover_host(args.discovery_timeout) + ) + return KefConnector(host, model=args.model) + + +def print_status(speaker): + info = { + "name": speaker.speaker_name, + "model": speaker.speaker_model, + "firmware": speaker.firmware_version, + "status": speaker.status, + "source": speaker.source, + "volume": speaker.volume, + "playing": speaker.is_playing, + } + try: + song = speaker.get_song_information() + if any(song.values()): + info["song"] = song + except (KeyError, IndexError, TypeError, requests.RequestException): + pass + print(json.dumps(info, indent=2, sort_keys=True)) + + +def bounded_volume(value): + return max(0, min(100, value)) + + +def main(): + parser = argparse.ArgumentParser(prog="kef", description="Control KEF W2 speakers.") + parser.add_argument( + "--host", + help="Speaker IP address. Defaults to KEF_HOST/KEF_IP, then mDNS discovery.", + ) + parser.add_argument( + "--model", + default="LS50W2", + help="Speaker model passed to pykefcontrol. Default: LS50W2.", + ) + parser.add_argument( + "--discovery-timeout", + default=2.0, + type=float, + help="Seconds to wait for mDNS discovery when --host/KEF_HOST is unset.", + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + subparsers.add_parser("discover", help="List discovered KEF speakers as JSON.") + subparsers.add_parser("status", help="Print speaker status as JSON.") + subparsers.add_parser("on", help="Power on.") + subparsers.add_parser("standby", help="Put the speaker in standby.") + subparsers.add_parser("mute", help="Mute by setting volume to 0.") + + volume = subparsers.add_parser("volume", help="Get or set volume.") + volume.add_argument("level", nargs="?", type=int, help="Volume level, 0-100.") + + up = subparsers.add_parser("up", help="Increase volume.") + up.add_argument("step", nargs="?", default=3, type=int) + + down = subparsers.add_parser("down", help="Decrease volume.") + down.add_argument("step", nargs="?", default=3, type=int) + + source = subparsers.add_parser("source", help="Get or set source.") + source.add_argument("name", nargs="?", choices=SOURCES) + + subparsers.add_parser("play-pause", help="Toggle play/pause.") + subparsers.add_parser("next", help="Next track.") + subparsers.add_parser("previous", help="Previous track.") + + args = parser.parse_args() + if args.command == "discover": + print(json.dumps(discover_speakers(args.discovery_timeout), indent=2, sort_keys=True)) + return 0 + + speaker = connector(args) + + try: + if args.command == "status": + print_status(speaker) + elif args.command == "on": + speaker.power_on() + elif args.command == "standby": + speaker.shutdown() + elif args.command == "mute": + speaker.volume = 0 + elif args.command == "volume": + if args.level is None: + print(speaker.volume) + else: + speaker.volume = bounded_volume(args.level) + elif args.command == "up": + speaker.volume = bounded_volume(speaker.volume + args.step) + elif args.command == "down": + speaker.volume = bounded_volume(speaker.volume - args.step) + elif args.command == "source": + if args.name is None: + print(speaker.source) + else: + speaker.source = args.name + elif args.command == "play-pause": + speaker.toggle_play_pause() + elif args.command == "next": + speaker.next_track() + elif args.command == "previous": + speaker.previous_track() + except requests.RequestException as error: + print(f"kef: failed to reach speaker: {error}", file=sys.stderr) + return 1 + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/nixos/packages/pykefcontrol/default.nix b/nixos/packages/pykefcontrol/default.nix new file mode 100644 index 00000000..df38921c --- /dev/null +++ b/nixos/packages/pykefcontrol/default.nix @@ -0,0 +1,37 @@ +{ + lib, + buildPythonPackage, + fetchPypi, + hatchling, + aiohttp, + requests, +}: + +buildPythonPackage rec { + pname = "pykefcontrol"; + version = "0.9.2"; + pyproject = true; + + src = fetchPypi { + inherit pname version; + hash = "sha256-3kGhN+E7driiE6ePyF0EZOEnUhTm07sxHCKdzrn/MxM="; + }; + + build-system = [ + hatchling + ]; + + dependencies = [ + aiohttp + requests + ]; + + pythonImportsCheck = [ "pykefcontrol" ]; + + meta = { + description = "Python library for controlling KEF LS50WII, LSX II, and LS60 speakers"; + homepage = "https://github.com/N0ciple/pykefcontrol"; + license = lib.licenses.mit; + maintainers = with lib.maintainers; [ imalison ]; + }; +}