From 18c8e0324f80e3c4405f3505637767f33f0f168a Mon Sep 17 00:00:00 2001
From: Ivan Malison
Date: Tue, 14 Apr 2026 00:45:26 -0700
Subject: [PATCH] dotfiles: add coqui-read helper

---
 dotfiles/lib/functions/coqui-read | 231 ++++++++++++++++++++++++++++++
 1 file changed, 231 insertions(+)
 create mode 100755 dotfiles/lib/functions/coqui-read

diff --git a/dotfiles/lib/functions/coqui-read b/dotfiles/lib/functions/coqui-read
new file mode 100755
index 00000000..5caa72f2
--- /dev/null
+++ b/dotfiles/lib/functions/coqui-read
@@ -0,0 +1,231 @@
+#!/usr/bin/env bash
+
+# Speak text through the local Coqui TTS server, one chunk at a time.
+#
+# Usage: coqui-read [options] [TEXT... | FILE]   (or pipe text on stdin)
+function coqui-read {
+  local script_file stdin_file
+  script_file="$(mktemp)"
+  stdin_file=""
+
+  # With no arguments and a non-terminal stdin, spool stdin to a temp file
+  # and pass its path to the helper via the hidden --stdin-file option.
+  if [[ "$#" -eq 0 && ! -t 0 ]]; then
+    stdin_file="$(mktemp)"
+    cat > "$stdin_file"
+    set -- --stdin-file "$stdin_file"
+  fi
+
+  # Write the helper to a temp file rather than piping it to python3, so
+  # the helper's own stdin stays free for reading text.
+  cat > "$script_file" <<'PY'
+"""Read text aloud in chunks through a Coqui TTS HTTP server."""
+
+import argparse
+import re
+import subprocess
+import sys
+import tempfile
+import urllib.parse
+import urllib.request
+from pathlib import Path
+
+
+DEFAULT_HOST = "http://[::1]:11115"
+
+
+def split_sentences(text: str) -> list[str]:
+    """Split text after '.', '!' or '?' when followed by whitespace."""
+    parts = re.split(r"(?<=[.!?])\s+", text.strip())
+    return [part.strip() for part in parts if part.strip()]
+
+
+def split_text(text: str, mode: str, max_chars: int) -> list[str]:
+    """Break text into chunks for synthesis.
+
+    Units are sentences when mode is "sentences", otherwise paragraphs
+    separated by blank lines. A unit longer than max_chars is re-packed
+    sentence by sentence; a single sentence is never split, so a chunk
+    can still exceed max_chars.
+    """
+    normalized = re.sub(r"\r\n?", "\n", text).strip()
+    if not normalized:
+        return []
+
+    if mode == "sentences":
+        units = split_sentences(normalized)
+    else:
+        units = [chunk.strip() for chunk in re.split(r"\n\s*\n+", normalized) if chunk.strip()]
+
+    chunks: list[str] = []
+    for unit in units:
+        if len(unit) <= max_chars:
+            chunks.append(unit)
+            continue
+
+        sentences = split_sentences(unit)
+        if len(sentences) <= 1:
+            chunks.append(unit)
+            continue
+
+        # Greedily pack sentences until the next one would overflow.
+        current = ""
+        for sentence in sentences:
+            candidate = sentence if not current else f"{current} {sentence}"
+            if len(candidate) <= max_chars:
+                current = candidate
+            else:
+                if current:
+                    chunks.append(current)
+                current = sentence
+        if current:
+            chunks.append(current)
+
+    return chunks
+
+
+def build_url(base_url: str, text: str, speaker: str | None, language: str | None) -> str:
+    """Return the /api/tts GET URL for text, adding speaker/language if set."""
+    params = {"text": text}
+    if speaker:
+        params["speaker_id"] = speaker
+    if language:
+        params["language_id"] = language
+    query = urllib.parse.urlencode(params)
+    return f"{base_url.rstrip('/')}/api/tts?{query}"
+
+
+def synthesize_chunk(base_url: str, text: str, speaker: str | None, language: str | None) -> Path:
+    """Fetch audio for text and save it to a new temp .wav file.
+
+    The caller owns the returned file and must delete it.
+    """
+    request = urllib.request.Request(build_url(base_url, text, speaker, language))
+    with urllib.request.urlopen(request, timeout=300) as response:
+        wav_data = response.read()
+
+    # delete=False: the file must outlive this function so it can be played.
+    with tempfile.NamedTemporaryFile(prefix="coqui-read-", suffix=".wav", delete=False) as temp_file:
+        temp_file.write(wav_data)
+    return Path(temp_file.name)
+
+
+def play_file(path: Path, player: str) -> None:
+    """Play path with the given command, blocking until it exits.
+
+    Raises subprocess.CalledProcessError on a non-zero exit status.
+    """
+    if player == "ffplay":
+        # No video window, quit at end of file, quiet logging.
+        cmd = [player, "-nodisp", "-autoexit", "-loglevel", "warning", str(path)]
+    else:
+        cmd = [player, str(path)]
+    subprocess.run(cmd, check=True)
+
+
+def read_input(inputs: list[str]) -> str:
+    """Return the text to speak.
+
+    A single argument naming an existing regular file is read from disk;
+    any other arguments are joined with spaces; no arguments means stdin.
+    """
+    if not inputs:
+        return sys.stdin.read()
+    if len(inputs) == 1:
+        candidate = Path(inputs[0])
+        try:
+            is_file = candidate.is_file()
+        except OSError:
+            # Literal text that is not a usable path (e.g. longer than the
+            # filesystem's name limit) must be spoken, not crash the probe.
+            is_file = False
+        if is_file:
+            return candidate.read_text()
+    return " ".join(inputs)
+
+
+def main() -> int:
+    """Parse arguments, then synthesize and play each chunk in order.
+
+    Returns the exit status: 0 on success, 1 on empty input or a
+    synthesis/playback failure, 130 when interrupted.
+    """
+    parser = argparse.ArgumentParser(description="Read text incrementally through the local Coqui TTS service.")
+    parser.add_argument("--stdin-file", default=None, help=argparse.SUPPRESS)
+    parser.add_argument("inputs", nargs="*", help="Text to speak, or a single text-file path. Reads stdin when omitted.")
+    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Coqui server base URL. Default: {DEFAULT_HOST}")
+    parser.add_argument("--speaker", default=None, help="Optional speaker_id value.")
+    parser.add_argument("--language", default=None, help="Optional language_id value.")
+    parser.add_argument(
+        "--chunk-mode",
+        choices=["paragraphs", "sentences"],
+        default="paragraphs",
+        help="Chunking strategy before synthesis.",
+    )
+    parser.add_argument("--max-chars", type=int, default=700, help="Maximum characters per synthesized chunk.")
+    parser.add_argument(
+        "--player",
+        default="ffplay",
+        help="Playback command. Use 'none' to only synthesize and print wav paths.",
+    )
+    parser.add_argument(
+        "--keep",
+        action="store_true",
+        help="Keep generated wav files on disk instead of deleting them after playback.",
+    )
+    args = parser.parse_args()
+    if args.max_chars < 1:
+        parser.error("--max-chars must be a positive integer")
+
+    if args.stdin_file:
+        text = Path(args.stdin_file).read_text()
+    else:
+        text = read_input(args.inputs)
+    chunks = split_text(text, args.chunk_mode, args.max_chars)
+    if not chunks:
+        print("No text to synthesize.", file=sys.stderr)
+        return 1
+
+    play = args.player != "none"
+    # Without playback the printed paths are the only output, so the files
+    # must survive the run even when --keep was not given.
+    keep = args.keep or not play
+
+    created_files: list[Path] = []
+    try:
+        for index, chunk in enumerate(chunks, start=1):
+            print(f"[{index}/{len(chunks)}] Synthesizing {len(chunk)} chars...", file=sys.stderr)
+            wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language)
+            created_files.append(wav_path)
+            print(wav_path)
+            if play:
+                play_file(wav_path, args.player)
+    except (OSError, subprocess.CalledProcessError) as error:
+        # Covers an unreachable server or HTTP error (URLError is an
+        # OSError), a missing player binary, and a failing player.
+        print(f"coqui-read: {error}", file=sys.stderr)
+        return 1
+    except KeyboardInterrupt:
+        return 130
+    finally:
+        if not keep:
+            for wav_path in created_files:
+                wav_path.unlink(missing_ok=True)
+
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
+PY
+
+  python3 "$script_file" "$@"
+  local exit_code=$?
+  rm -f "$script_file"
+  if [[ -n "$stdin_file" ]]; then
+    rm -f "$stdin_file"
+  fi
+  return "$exit_code"
+}
+
+coqui-read "$@"