fix: support POST requests in coqui-read

This commit is contained in:
2026-04-15 10:55:44 -07:00
committed by Kat Huang
parent ad449a3416
commit c4c2b1e8bb

View File

@@ -11,18 +11,26 @@ function coqui-read {
set -- --stdin-file "$stdin_file" set -- --stdin-file "$stdin_file"
fi fi
cat > "$script_file" <<'PY' cat > "$script_file" <<'PY'
import argparse import argparse
import json
import re import re
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from urllib.error import HTTPError, URLError
from pathlib import Path from pathlib import Path
DEFAULT_HOST = "http://[::1]:11115" DEFAULT_HOST = "http://[::1]:11115"
# Longest argument that read_input still treats as a file path; anything
# longer is rejected with a hint to pass the text on stdin instead.
MAX_FILE_PATH_CHARS = 255
# HTTP statuses for which post_tts raises FallbackToGet, so that
# synthesize_chunk retries the request as a query-string GET.
POST_FALLBACK_STATUS_CODES = {400, 404, 405}
class FallbackToGet(Exception):
    """Signal that the POST attempt was refused and GET should be tried instead."""
def split_sentences(text: str) -> list[str]: def split_sentences(text: str) -> list[str]:
@@ -66,22 +74,65 @@ def split_text(text: str, mode: str, max_chars: int) -> list[str]:
return chunks return chunks
def build_url(base_url: str, text: str, speaker: str | None, language: str | None) -> str: def build_payload(text: str, speaker: str | None, language: str | None) -> dict[str, str]:
params = {"text": text} payload: dict[str, str] = {"text": text}
if speaker: if speaker:
params["speaker_id"] = speaker payload["speaker_id"] = speaker
payload["speaker"] = speaker
if language: if language:
params["language_id"] = language payload["language_id"] = language
query = urllib.parse.urlencode(params) payload["language"] = language
return f"{base_url.rstrip('/')}/api/tts?{query}" return payload
def post_tts(base_url: str, text: str, speaker: str | None, language: str | None) -> bytes:
    """Synthesize *text* by POSTing a JSON payload to ``<base_url>/api/tts``.

    Args:
        base_url: Coqui server base URL; a trailing slash is tolerated.
        text: Text to synthesize.
        speaker: Optional speaker identifier.
        language: Optional language identifier.

    Returns:
        The raw response body.

    Raises:
        FallbackToGet: The server answered with a status listed in
            ``POST_FALLBACK_STATUS_CODES``; the caller should retry via GET.
        SystemExit: Any other HTTP error, or the server could not be reached.
    """
    body = json.dumps(build_payload(text, speaker, language), separators=(",", ":")).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url.rstrip('/')}/api/tts",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=300) as response:
            return response.read()
    except HTTPError as error:
        detail = error.read().decode("utf-8", errors="replace")
        if error.code in POST_FALLBACK_STATUS_CODES:
            raise FallbackToGet() from error
        if detail.lstrip().startswith("{"):
            # Pretty-print JSON error bodies; this is best-effort only.
            try:
                detail = json.dumps(json.loads(detail), indent=2)
            except (ValueError, RecursionError):
                # The body merely looked like JSON; report it verbatim.
                pass
        raise SystemExit(f"TTS request failed (HTTP {error.code}): {detail or error.reason}") from error
    except URLError as error:
        raise SystemExit(f"Could not connect to Coqui service at {base_url}: {error}") from error
def get_tts(base_url: str, text: str, speaker: str | None, language: str | None) -> bytes:
    """Synthesize *text* with a GET request carrying the fields in the query string.

    Returns the raw response body. Exits via ``SystemExit`` on an HTTP error
    or when the server cannot be reached.
    """
    # Keep GET fallback for servers that do not support POST.
    query_string = urllib.parse.urlencode(build_payload(text, speaker, language))
    endpoint = f"{base_url.rstrip('/')}/api/tts"
    get_request = urllib.request.Request(f"{endpoint}?{query_string}")
    try:
        with urllib.request.urlopen(get_request, timeout=300) as reply:
            audio = reply.read()
        return audio
    except HTTPError as http_error:
        message = http_error.read().decode("utf-8", errors="replace")
        raise SystemExit(f"TTS request failed (HTTP {http_error.code}): {message or http_error.reason}")
    except URLError as url_error:
        raise SystemExit(f"Could not connect to Coqui service at {base_url}: {url_error}")
def synthesize_chunk(base_url: str, text: str, speaker: str | None, language: str | None, chunk_index: int) -> Path:
    """Synthesize one chunk of text and store the audio in a temporary ``.wav`` file.

    POST is tried first; if ``post_tts`` raises ``FallbackToGet`` the request
    is repeated through ``get_tts``.

    Args:
        base_url: Coqui server base URL.
        text: Chunk of text to synthesize.
        speaker: Optional speaker identifier.
        language: Optional language identifier.
        chunk_index: Number embedded in the temp-file prefix.

    Returns:
        Path of the written file. It is created with ``delete=False``, so the
        caller is responsible for removing it.

    Raises:
        SystemExit: Propagated from ``post_tts`` / ``get_tts``.
        OSError: Writing the audio failed; the partial file is removed first.
    """
    try:
        wav_data = post_tts(base_url, text, speaker, language)
    except FallbackToGet:
        # Older Coqui server versions tend to require query-string inputs.
        wav_data = get_tts(base_url, text, speaker, language)

    temp_file = tempfile.NamedTemporaryFile(prefix=f"coqui-read-{chunk_index}-", suffix=".wav", delete=False)
    wav_path = Path(temp_file.name)
    try:
        # The context manager closes the handle even when the write fails.
        with temp_file:
            temp_file.write(wav_data)
    except OSError:
        # The caller only tracks returned paths, so a partial file left here
        # would never be cleaned up.
        wav_path.unlink(missing_ok=True)
        raise
    return wav_path
@@ -95,18 +146,23 @@ def play_file(path: Path, player: str) -> None:
subprocess.run(cmd, check=True) subprocess.run(cmd, check=True)
def read_input(path: str | None) -> str:
    """Return the text to speak, from the file at *path* or from stdin.

    A falsy *path* means "read stdin". Otherwise *path* must be at most
    ``MAX_FILE_PATH_CHARS`` characters, contain no newline characters, and
    name an existing regular file; any violation exits via ``SystemExit``.
    The file is decoded as UTF-8 with undecodable bytes replaced.
    """
    if not path:
        return sys.stdin.read()

    if len(path) > MAX_FILE_PATH_CHARS:
        raise SystemExit("Path too long. Pass text via stdin instead of an argument.")
    if any(newline in path for newline in ("\n", "\r")):
        raise SystemExit("Path contains newline characters. Pass text via stdin instead of an argument.")

    source = Path(path)
    if not source.is_file():
        raise SystemExit(f"No such file: {path!r}")
    return source.read_text(encoding="utf-8", errors="replace")
def main() -> int: def main() -> int:
parser = argparse.ArgumentParser(description="Read text incrementally through the local Coqui TTS service.") parser = argparse.ArgumentParser(description="Read text incrementally through the local Coqui TTS service.")
parser.add_argument("--stdin-file", default=None, help=argparse.SUPPRESS) parser.add_argument("--stdin-file", default=None, help=argparse.SUPPRESS)
parser.add_argument("inputs", nargs="*", help="Text to speak, or a single text-file path. Reads stdin when omitted.") parser.add_argument("--file", dest="file_path", default=None, help="Read text from a file path.")
parser.add_argument("path", nargs="?", help="Optional file path. Text from stdin is used when omitted.")
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Coqui server base URL. Default: {DEFAULT_HOST}") parser.add_argument("--host", default=DEFAULT_HOST, help=f"Coqui server base URL. Default: {DEFAULT_HOST}")
parser.add_argument("--speaker", default=None, help="Optional speaker_id value.") parser.add_argument("--speaker", default=None, help="Optional speaker_id value.")
parser.add_argument("--language", default=None, help="Optional language_id value.") parser.add_argument("--language", default=None, help="Optional language_id value.")
@@ -129,10 +185,17 @@ def main() -> int:
) )
args = parser.parse_args() args = parser.parse_args()
if args.file_path and args.path:
parser.error("Pass either --file or a positional path, not both.")
if args.player == "none" and not args.keep:
print("--player none implies --keep; preserving synthesized wav files.", file=sys.stderr)
args.keep = True
if args.stdin_file: if args.stdin_file:
text = Path(args.stdin_file).read_text() text = Path(args.stdin_file).read_text(encoding="utf-8", errors="replace")
else: else:
text = read_input(args.inputs) text = read_input(args.file_path or args.path)
chunks = split_text(text, args.chunk_mode, args.max_chars) chunks = split_text(text, args.chunk_mode, args.max_chars)
if not chunks: if not chunks:
print("No text to synthesize.", file=sys.stderr) print("No text to synthesize.", file=sys.stderr)
@@ -142,7 +205,7 @@ def main() -> int:
try: try:
for index, chunk in enumerate(chunks, start=1): for index, chunk in enumerate(chunks, start=1):
print(f"[{index}/{len(chunks)}] Synthesizing {len(chunk)} chars...", file=sys.stderr) print(f"[{index}/{len(chunks)}] Synthesizing {len(chunk)} chars...", file=sys.stderr)
wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language) wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language, index)
created_files.append(wav_path) created_files.append(wav_path)
print(wav_path) print(wav_path)
if args.player != "none": if args.player != "none":