From c4c2b1e8bbdca20da9fb517579350caeb04d1095 Mon Sep 17 00:00:00 2001
From: Ivan Malison
Date: Wed, 15 Apr 2026 10:55:44 -0700
Subject: [PATCH] fix: support POST requests in coqui-read

---
 dotfiles/lib/functions/coqui-read | 109 ++++++++++++++++++++++++------
 1 file changed, 89 insertions(+), 20 deletions(-)

diff --git a/dotfiles/lib/functions/coqui-read b/dotfiles/lib/functions/coqui-read
index 5caa72f2..a38f41e8 100755
--- a/dotfiles/lib/functions/coqui-read
+++ b/dotfiles/lib/functions/coqui-read
@@ -11,18 +11,26 @@ function coqui-read {
     set -- --stdin-file "$stdin_file"
   fi
 
-  cat > "$script_file" <<'PY'
+cat > "$script_file" <<'PY'
 import argparse
+import json
 import re
 import subprocess
 import sys
 import tempfile
 import urllib.parse
 import urllib.request
+from urllib.error import HTTPError, URLError
 from pathlib import Path
 
 
 DEFAULT_HOST = "http://[::1]:11115"
+MAX_FILE_PATH_CHARS = 255
+POST_FALLBACK_STATUS_CODES = {400, 404, 405}
+
+
+class FallbackToGet(Exception):
+    """Signal that the POST request was rejected and GET should be tried."""
 
 
 def split_sentences(text: str) -> list[str]:
@@ -66,22 +74,70 @@ def split_text(text: str, mode: str, max_chars: int) -> list[str]:
     return chunks
 
 
-def build_url(base_url: str, text: str, speaker: str | None, language: str | None) -> str:
-    params = {"text": text}
+def build_payload(text: str, speaker: str | None, language: str | None) -> dict[str, str]:
+    """Build the request fields, sending both *_id and bare speaker/language keys."""
+    payload: dict[str, str] = {"text": text}
     if speaker:
-        params["speaker_id"] = speaker
+        payload["speaker_id"] = speaker
+        payload["speaker"] = speaker
     if language:
-        params["language_id"] = language
+        payload["language_id"] = language
+        payload["language"] = language
+    return payload
+
+
+def post_tts(base_url: str, text: str, speaker: str | None, language: str | None) -> bytes:
+    """POST the text as JSON to /api/tts; raise FallbackToGet on 400/404/405."""
+    body = json.dumps(build_payload(text, speaker, language), separators=(",", ":")).encode("utf-8")
+    request = urllib.request.Request(
+        f"{base_url.rstrip('/')}/api/tts",
+        data=body,
+        method="POST",
+        headers={"Content-Type": "application/json"},
+    )
+    try:
+        with urllib.request.urlopen(request, timeout=300) as response:
+            return response.read()
+    except HTTPError as error:
+        detail = error.read().decode("utf-8", errors="replace")
+        if error.code in POST_FALLBACK_STATUS_CODES:
+            raise FallbackToGet()
+        if detail.lstrip().startswith("{"):
+            try:
+                detail = json.dumps(json.loads(detail), indent=2)
+            except (ValueError, RecursionError):
+                # Body only looked like JSON; report it verbatim.
+                pass
+        raise SystemExit(f"TTS request failed (HTTP {error.code}): {detail or error.reason}")
+    except URLError as error:
+        raise SystemExit(f"Could not connect to Coqui service at {base_url}: {error}")
+
+
+def get_tts(base_url: str, text: str, speaker: str | None, language: str | None) -> bytes:
+    """Fetch synthesized audio with a query-string GET to /api/tts."""
+    # Keep GET fallback for servers that do not support POST.
+    params = build_payload(text, speaker, language)
     query = urllib.parse.urlencode(params)
-    return f"{base_url.rstrip('/')}/api/tts?{query}"
+    request = urllib.request.Request(f"{base_url.rstrip('/')}/api/tts?{query}")
+    try:
+        with urllib.request.urlopen(request, timeout=300) as response:
+            return response.read()
+    except HTTPError as error:
+        detail = error.read().decode("utf-8", errors="replace")
+        raise SystemExit(f"TTS request failed (HTTP {error.code}): {detail or error.reason}")
+    except URLError as error:
+        raise SystemExit(f"Could not connect to Coqui service at {base_url}: {error}")
 
 
-def synthesize_chunk(base_url: str, text: str, speaker: str | None, language: str | None) -> Path:
-    request = urllib.request.Request(build_url(base_url, text, speaker, language))
-    with urllib.request.urlopen(request, timeout=300) as response:
-        wav_data = response.read()
+def synthesize_chunk(base_url: str, text: str, speaker: str | None, language: str | None, chunk_index: int) -> Path:
+    """Synthesize one chunk (POST first, GET on fallback) into a temporary .wav file."""
+    try:
+        wav_data = post_tts(base_url, text, speaker, language)
+    except FallbackToGet:
+        # Older Coqui server versions tend to require query-string inputs.
+        wav_data = get_tts(base_url, text, speaker, language)
 
-    temp_file = tempfile.NamedTemporaryFile(prefix="coqui-read-", suffix=".wav", delete=False)
+    temp_file = tempfile.NamedTemporaryFile(prefix=f"coqui-read-{chunk_index}-", suffix=".wav", delete=False)
     temp_file.write(wav_data)
     temp_file.close()
     return Path(temp_file.name)
@@ -95,18 +151,24 @@ def play_file(path: Path, player: str) -> None:
     subprocess.run(cmd, check=True)
 
 
-def read_input(inputs: list[str]) -> str:
-    if inputs:
-        if len(inputs) == 1 and Path(inputs[0]).exists():
-            return Path(inputs[0]).read_text()
-        return " ".join(inputs)
+def read_input(path: str | None) -> str:
+    """Return the text of the file at path, or all of stdin when no path is given."""
+    if path:
+        if len(path) > MAX_FILE_PATH_CHARS:
+            raise SystemExit("Path too long. Pass text via stdin instead of an argument.")
+        if "\n" in path or "\r" in path:
+            raise SystemExit("Path contains newline characters. Pass text via stdin instead of an argument.")
+        if not Path(path).is_file():
+            raise SystemExit(f"No such file: {path!r}")
+        return Path(path).read_text(encoding="utf-8", errors="replace")
     return sys.stdin.read()
 
 
 def main() -> int:
     parser = argparse.ArgumentParser(description="Read text incrementally through the local Coqui TTS service.")
     parser.add_argument("--stdin-file", default=None, help=argparse.SUPPRESS)
-    parser.add_argument("inputs", nargs="*", help="Text to speak, or a single text-file path. Reads stdin when omitted.")
+    parser.add_argument("--file", dest="file_path", default=None, help="Read text from a file path.")
+    parser.add_argument("path", nargs="?", help="Optional file path. Text from stdin is used when omitted.")
     parser.add_argument("--host", default=DEFAULT_HOST, help=f"Coqui server base URL. Default: {DEFAULT_HOST}")
     parser.add_argument("--speaker", default=None, help="Optional speaker_id value.")
     parser.add_argument("--language", default=None, help="Optional language_id value.")
@@ -129,10 +191,17 @@ def main() -> int:
     )
     args = parser.parse_args()
 
+    if args.file_path and args.path:
+        parser.error("Pass either --file or a positional path, not both.")
+
+    if args.player == "none" and not args.keep:
+        print("--player none implies --keep; preserving synthesized wav files.", file=sys.stderr)
+        args.keep = True
+
     if args.stdin_file:
-        text = Path(args.stdin_file).read_text()
+        text = Path(args.stdin_file).read_text(encoding="utf-8", errors="replace")
     else:
-        text = read_input(args.inputs)
+        text = read_input(args.file_path or args.path)
     chunks = split_text(text, args.chunk_mode, args.max_chars)
     if not chunks:
         print("No text to synthesize.", file=sys.stderr)
@@ -142,7 +211,7 @@ def main() -> int:
     try:
         for index, chunk in enumerate(chunks, start=1):
             print(f"[{index}/{len(chunks)}] Synthesizing {len(chunk)} chars...", file=sys.stderr)
-            wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language)
+            wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language, index)
             created_files.append(wav_path)
             print(wav_path)
             if args.player != "none":