Add Roborock vacuum control

Add a python-roborock based CLI wrapper and package it for the NixOS system profile.
This commit is contained in:
2026-04-28 14:52:57 -07:00
parent 1696845579
commit 8e2128b8d4
4 changed files with 307 additions and 0 deletions

275
dotfiles/lib/bin/roborock-control Executable file
View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
CONFIG_PATH = Path.home() / ".config" / "roborock-control" / "config.json"
PYTHON_ENV_EXPR = (
"with import <nixpkgs> {}; "
"python313.withPackages (ps: [ ps.python-roborock ps.pyyaml ps.pyshark ])"
)
COMMON_COMMANDS = {
"start": ("app_start", None),
"pause": ("app_pause", None),
"stop": ("app_stop", None),
"dock": ("app_charge", None),
"find": ("find_me", None),
"dust": ("app_start_collect_dust", None),
"stop-dust": ("app_stop_collect_dust", None),
"wash": ("app_start_wash", None),
"stop-wash": ("app_stop_wash", None),
}
def load_config():
if not CONFIG_PATH.exists():
return {}
return json.loads(CONFIG_PATH.read_text())
def save_config(config):
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(config, indent=2, sort_keys=True) + "\n")
def roborock_args(*args):
if os.getenv("ROBOROCK_CONTROL_RUNNER") == "direct":
return ["roborock", *args]
return [
"nix",
"shell",
"--impure",
"--expr",
PYTHON_ENV_EXPR,
"--command",
"roborock",
*args,
]
def run_roborock(*args, capture=False, check=True):
kwargs = {
"text": True,
"check": check,
}
if capture:
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE
return subprocess.run(roborock_args(*args), **kwargs)
def configured_device_id(args, config):
return args.device_id or os.getenv("ROBOROCK_DEVICE_ID") or config.get("device_id")
def infer_single_device_id():
result = run_roborock("list-devices", capture=True)
devices = json.loads(result.stdout)
if len(devices) != 1:
names = ", ".join(sorted(devices)) or "none"
raise SystemExit(
"Could not infer a default device. "
f"Found {len(devices)} devices: {names}. "
"Pass --device-id or run 'roborock-control config --device-id <id>'."
)
return next(iter(devices.values()))
def get_device_id(args, config):
return configured_device_id(args, config) or infer_single_device_id()
def handle_config(args):
config = load_config()
changed = False
if args.clear:
config = {}
changed = True
if args.device_id is not None:
config["device_id"] = args.device_id
changed = True
if args.email is not None:
config["email"] = args.email
changed = True
if changed:
save_config(config)
print(json.dumps(config, indent=2, sort_keys=True))
def handle_login(args):
config = load_config()
email = args.email or os.getenv("ROBOROCK_EMAIL") or config.get("email")
if not email:
raise SystemExit(
"Email is required. Pass --email or run "
"'roborock-control config --email <address>'."
)
cli_args = ["login", "--email", email]
if args.reauth:
cli_args.append("--reauth")
if args.password_command:
password = subprocess.check_output(args.password_command, shell=True, text=True).strip()
cli_args.extend(["--password", password])
run_roborock(*cli_args)
def handle_devices(args):
if args.refresh:
run_roborock("discover")
run_roborock("list-devices")
def command_with_device(args, upstream_command):
config = load_config()
return [upstream_command, "--device_id", get_device_id(args, config)]
def handle_home(args):
cli_args = command_with_device(args, "home")
if args.refresh:
cli_args.append("--refresh")
run_roborock(*cli_args)
def handle_maps(args):
run_roborock(*command_with_device(args, "maps"))
def handle_rooms(args):
run_roborock(*command_with_device(args, "rooms"))
def handle_map_data(args):
cli_args = command_with_device(args, "map-data")
if args.include_path:
cli_args.append("--include_path")
run_roborock(*cli_args)
def handle_map_image(args):
run_roborock(
*command_with_device(args, "map-image"),
"--output-file",
args.output_file,
)
def handle_command(args):
config = load_config()
params = args.params
cli_args = [
"command",
"--device_id",
get_device_id(args, config),
"--cmd",
args.command_name,
]
if params is not None:
cli_args.extend(["--params", params])
run_roborock(*cli_args)
def handle_common_command(args):
command_name, params = COMMON_COMMANDS[args.action]
config = load_config()
cli_args = [
"command",
"--device_id",
get_device_id(args, config),
"--cmd",
command_name,
]
if params is not None:
cli_args.extend(["--params", params])
run_roborock(*cli_args)
def handle_status(args):
config = load_config()
run_roborock("status", "--device_id", get_device_id(args, config))
def handle_upstream(args):
run_roborock(*args.args)
def build_parser():
parser = argparse.ArgumentParser(
description="Control Roborock vacuums through python-roborock."
)
parser.add_argument(
"--device-id",
help="Roborock device id. Defaults to ROBOROCK_DEVICE_ID, saved config, or the only cached device.",
)
subparsers = parser.add_subparsers(dest="subcommand", required=True)
config_parser = subparsers.add_parser("config", help="Show or update saved defaults")
config_parser.add_argument("--device-id")
config_parser.add_argument("--email")
config_parser.add_argument("--clear", action="store_true")
config_parser.set_defaults(func=handle_config)
login_parser = subparsers.add_parser("login", help="Login with email code or password")
login_parser.add_argument("--email")
login_parser.add_argument("--reauth", action="store_true")
login_parser.add_argument(
"--password-command",
help="Shell command that prints the Roborock password, e.g. 'pass show path/to/login'.",
)
login_parser.set_defaults(func=handle_login)
devices_parser = subparsers.add_parser("devices", help="List cached devices")
devices_parser.add_argument("--refresh", action="store_true", help="Refresh discovery first")
devices_parser.set_defaults(func=handle_devices)
subparsers.add_parser("status", help="Show vacuum status").set_defaults(func=handle_status)
home_parser = subparsers.add_parser("home", help="Discover and cache home layout")
home_parser.add_argument("--refresh", action="store_true")
home_parser.set_defaults(func=handle_home)
subparsers.add_parser("maps", help="Show map metadata").set_defaults(func=handle_maps)
subparsers.add_parser("rooms", help="Show room metadata").set_defaults(func=handle_rooms)
map_data_parser = subparsers.add_parser("map-data", help="Show parsed map data as JSON")
map_data_parser.add_argument("--include-path", action="store_true")
map_data_parser.set_defaults(func=handle_map_data)
map_image_parser = subparsers.add_parser("map-image", help="Save the map image")
map_image_parser.add_argument("output_file")
map_image_parser.set_defaults(func=handle_map_image)
raw_parser = subparsers.add_parser("raw", help="Send a raw python-roborock command")
raw_parser.add_argument("command_name")
raw_parser.add_argument("params", nargs="?")
raw_parser.set_defaults(func=handle_command)
upstream_parser = subparsers.add_parser("cli", help="Pass arguments to the upstream CLI")
upstream_parser.add_argument("args", nargs=argparse.REMAINDER)
upstream_parser.set_defaults(func=handle_upstream)
for action in sorted(COMMON_COMMANDS):
subparsers.add_parser(action).set_defaults(func=handle_common_command, action=action)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
try:
args.func(args)
except subprocess.CalledProcessError as error:
if error.stdout:
print(error.stdout, end="")
if error.stderr:
print(error.stderr, end="", file=sys.stderr)
raise SystemExit(error.returncode)
if __name__ == "__main__":
main()

View File

@@ -11,6 +11,7 @@ makeEnable config "myModules.extra" false {
gource gource
gimp gimp
kef kef
roborock-control
texlive.combined.scheme-full texlive.combined.scheme-full
tor tor
yt-dlp yt-dlp

View File

@@ -115,6 +115,7 @@
git-sync-rs = inputs.git-sync-rs.packages.${prev.stdenv.hostPlatform.system}.default; git-sync-rs = inputs.git-sync-rs.packages.${prev.stdenv.hostPlatform.system}.default;
kef = final.callPackage ./packages/kef {}; kef = final.callPackage ./packages/kef {};
pykefcontrol = final.python3Packages.callPackage ./packages/pykefcontrol {}; pykefcontrol = final.python3Packages.callPackage ./packages/pykefcontrol {};
roborock-control = final.callPackage ./packages/roborock-control {};
}) })
] ]
++ ( ++ (

View File

@@ -0,0 +1,30 @@
{
lib,
python3,
writeShellApplication,
}:
let
python = python3.withPackages (ps: [
ps.python-roborock
ps.pyshark
ps.pyyaml
]);
in
writeShellApplication {
name = "roborock-control";
runtimeInputs = [ python ];
text = ''
export ROBOROCK_CONTROL_RUNNER=direct
exec python ${../../../dotfiles/lib/bin/roborock-control} "$@"
'';
meta = {
description = "Command-line controller for Roborock vacuums";
license = lib.licenses.mit;
maintainers = with lib.maintainers; [ imalison ];
mainProgram = "roborock-control";
};
}