dotfiles: add coqui-read helper
This commit is contained in:
171
dotfiles/lib/functions/coqui-read
Executable file
171
dotfiles/lib/functions/coqui-read
Executable file
@@ -0,0 +1,171 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
function coqui-read {
|
||||
# script_file receives the embedded Python program written by the heredoc
# below; stdin_file stays empty unless piped input is spooled to disk.
local script_file stdin_file=""

# Bail out early if no temp file can be created, instead of carrying on
# with an empty path.
script_file="$(mktemp)" || return 1

# No arguments and stdin is not a terminal: capture the piped text in a temp
# file and pass it to the Python program through its hidden --stdin-file flag.
if [[ "$#" -eq 0 && ! -t 0 ]]; then
  if ! stdin_file="$(mktemp)"; then
    rm -f "$script_file"
    return 1
  fi
  cat > "$stdin_file"
  set -- --stdin-file "$stdin_file"
fi
|
||||
|
||||
cat > "$script_file" <<'PY'
|
||||
import argparse
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
DEFAULT_HOST = "http://[::1]:11115"
|
||||
|
||||
|
||||
def split_sentences(text: str) -> list[str]:
    """Break *text* into sentences.

    A boundary is any run of whitespace that directly follows ``.``, ``!`` or
    ``?``; the punctuation stays attached to the sentence it terminates.
    Each piece is trimmed and empty pieces are dropped, so empty or
    whitespace-only input yields an empty list.
    """
    sentences: list[str] = []
    for piece in re.split(r"(?<=[.!?])\s+", text.strip()):
        trimmed = piece.strip()
        if trimmed:
            sentences.append(trimmed)
    return sentences
|
||||
|
||||
|
||||
def split_text(text: str, mode: str, max_chars: int) -> list[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    Line endings are normalized to ``\\n`` first. The text is then divided
    into units: sentences when *mode* is ``"sentences"``, otherwise
    paragraphs separated by blank lines. A unit that fits is emitted as is.
    A unit that is too long is re-split into sentences, which are greedily
    packed back together up to the limit. A single sentence that is still
    too long is hard-wrapped at word boundaries (over-long words are
    sliced), so no returned chunk ever exceeds *max_chars*.

    Args:
        text: Raw input text.
        mode: ``"sentences"`` for sentence units; any other value selects
            paragraph units.
        max_chars: Upper bound on the length of each chunk; must be >= 1.

    Returns:
        The chunks in reading order; empty when *text* has no content.

    Raises:
        ValueError: If *max_chars* is smaller than 1.
    """
    # Local import: textwrap is not among the script's top-level imports.
    import textwrap

    if max_chars < 1:
        raise ValueError(f"max_chars must be at least 1, got {max_chars}")

    normalized = re.sub(r"\r\n?", "\n", text).strip()
    if not normalized:
        return []

    if mode == "sentences":
        units = split_sentences(normalized)
    else:
        units = [chunk.strip() for chunk in re.split(r"\n\s*\n+", normalized) if chunk.strip()]

    chunks: list[str] = []
    for unit in units:
        if len(unit) <= max_chars:
            chunks.append(unit)
            continue

        current = ""
        for sentence in split_sentences(unit):
            if len(sentence) <= max_chars:
                pieces = [sentence]
            else:
                # One sentence alone exceeds the limit: wrap it on whitespace
                # (collapsing internal whitespace runs) rather than sending
                # an oversized request to the synthesizer.
                pieces = textwrap.wrap(
                    " ".join(sentence.split()),
                    width=max_chars,
                    break_long_words=True,
                    break_on_hyphens=False,
                )

            for piece in pieces:
                candidate = f"{current} {piece}" if current else piece
                if len(candidate) <= max_chars:
                    current = candidate
                else:
                    if current:
                        chunks.append(current)
                    current = piece

        if current:
            chunks.append(current)

    return chunks
|
||||
|
||||
|
||||
def build_url(base_url: str, text: str, speaker: str | None, language: str | None) -> str:
|
||||
params = {"text": text}
|
||||
if speaker:
|
||||
params["speaker_id"] = speaker
|
||||
if language:
|
||||
params["language_id"] = language
|
||||
query = urllib.parse.urlencode(params)
|
||||
return f"{base_url.rstrip('/')}/api/tts?{query}"
|
||||
|
||||
|
||||
def synthesize_chunk(base_url: str, text: str, speaker: str | None, language: str | None) -> Path:
    """Fetch synthesized speech for *text* and store it in a temporary wav file.

    Issues a GET request to the URL produced by ``build_url`` with a
    300-second timeout and writes the raw response body to a new
    ``coqui-read-*.wav`` temporary file.

    Args:
        base_url: Base URL of the TTS server.
        text: Text to synthesize.
        speaker: Optional speaker id forwarded to the server.
        language: Optional language id forwarded to the server.

    Returns:
        Path of the written file. The file is not removed automatically;
        the caller owns it.

    Raises:
        OSError: If the request fails (``urllib`` errors derive from
            ``OSError``) or the file cannot be written. In the latter case
            the partially written file is removed before re-raising.
    """
    request = urllib.request.Request(build_url(base_url, text, speaker, language))
    with urllib.request.urlopen(request, timeout=300) as response:
        wav_data = response.read()

    # delete=False: the file must outlive this function so that a player can
    # open it by name afterwards.
    temp_file = tempfile.NamedTemporaryFile(prefix="coqui-read-", suffix=".wav", delete=False)
    wav_path = Path(temp_file.name)
    try:
        with temp_file:
            temp_file.write(wav_data)
    except BaseException:
        # The caller never learns this path, so nothing else would clean up
        # a half-written file.
        wav_path.unlink(missing_ok=True)
        raise
    return wav_path
|
||||
|
||||
|
||||
def play_file(path: Path, player: str) -> None:
    """Play *path* with the *player* command and wait for it to finish.

    ``ffplay`` is invoked with ``-nodisp -autoexit -loglevel warning`` before
    the file name. Any other player receives the file path as its only
    argument. A non-zero exit status raises
    ``subprocess.CalledProcessError`` because the command runs with
    ``check=True``.
    """
    extra_flags: list[str] = []
    if player == "ffplay":
        extra_flags = ["-nodisp", "-autoexit", "-loglevel", "warning"]
    subprocess.run([player, *extra_flags, str(path)], check=True)
|
||||
|
||||
|
||||
def read_input(inputs: list[str]) -> str:
    """Turn the positional command-line arguments into the text to speak.

    * No arguments: everything available on standard input is returned.
    * Exactly one argument naming an existing regular file: the file's
      contents are returned.
    * Anything else: the arguments are joined with single spaces.

    Args:
        inputs: Positional arguments as parsed from the command line.

    Returns:
        The text to synthesize.
    """
    if not inputs:
        return sys.stdin.read()

    if len(inputs) == 1:
        candidate = Path(inputs[0])
        try:
            # is_file() rather than exists(): a directory is not readable
            # text and must fall through to the literal-text branch.
            is_text_file = candidate.is_file()
        except (OSError, ValueError):
            # Literal text passed as one argument may be longer than the OS
            # accepts for a file name; the filesystem probe then raises
            # instead of returning False. Such input is text, not a path.
            is_text_file = False
        if is_text_file:
            return candidate.read_text()

    return " ".join(inputs)
|
||||
|
||||
|
||||
def main() -> int:
    """Command-line entry point: chunk the input, then synthesize and play it.

    The text comes from the file named by the hidden ``--stdin-file`` option
    when it is set, otherwise from ``read_input``. It is split with
    ``split_text``; each chunk is synthesized with ``synthesize_chunk``, its
    wav path is printed to stdout, and it is played with ``play_file`` unless
    the player is ``none``.

    Generated wav files are deleted when the run ends (also on failure)
    unless ``--keep`` is given or the player is ``none``. In the latter case
    the printed paths are the only output, so deleting them would make the
    run pointless.

    Returns:
        0 on success; 1 when there is no text to synthesize or when
        synthesis or playback fails. Invalid options terminate through
        ``argparse`` with its usual exit status.
    """
    parser = argparse.ArgumentParser(description="Read text incrementally through the local Coqui TTS service.")
    # Hidden option: the shell wrapper spools piped stdin into a file and
    # passes that file's path here.
    parser.add_argument("--stdin-file", default=None, help=argparse.SUPPRESS)
    parser.add_argument("inputs", nargs="*", help="Text to speak, or a single text-file path. Reads stdin when omitted.")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Coqui server base URL. Default: {DEFAULT_HOST}")
    parser.add_argument("--speaker", default=None, help="Optional speaker_id value.")
    parser.add_argument("--language", default=None, help="Optional language_id value.")
    parser.add_argument(
        "--chunk-mode",
        choices=["paragraphs", "sentences"],
        default="paragraphs",
        help="Chunking strategy before synthesis.",
    )
    parser.add_argument("--max-chars", type=int, default=700, help="Maximum characters per synthesized chunk.")
    parser.add_argument(
        "--player",
        default="ffplay",
        help="Playback command. Use 'none' to only synthesize and print wav paths (the files are then kept).",
    )
    parser.add_argument(
        "--keep",
        action="store_true",
        help="Keep generated wav files on disk instead of deleting them after playback.",
    )
    args = parser.parse_args()

    if args.max_chars < 1:
        parser.error("--max-chars must be a positive integer")

    if args.stdin_file:
        text = Path(args.stdin_file).read_text()
    else:
        text = read_input(args.inputs)

    chunks = split_text(text, args.chunk_mode, args.max_chars)
    if not chunks:
        print("No text to synthesize.", file=sys.stderr)
        return 1

    # Without a player the printed paths are the result of the run, so the
    # files must survive it.
    keep_files = args.keep or args.player == "none"

    created_files: list[Path] = []
    try:
        for index, chunk in enumerate(chunks, start=1):
            print(f"[{index}/{len(chunks)}] Synthesizing {len(chunk)} chars...", file=sys.stderr)
            wav_path = synthesize_chunk(args.host, chunk, args.speaker, args.language)
            created_files.append(wav_path)
            print(wav_path)
            if args.player != "none":
                play_file(wav_path, args.player)
    except subprocess.CalledProcessError as exc:
        print(f"Player '{args.player}' exited with status {exc.returncode}.", file=sys.stderr)
        return 1
    except OSError as exc:
        # Covers an unreachable server or HTTP error (urllib errors derive
        # from OSError) as well as a player command that cannot be started.
        print(f"coqui-read: {exc}", file=sys.stderr)
        return 1
    finally:
        if not keep_files:
            for wav_path in created_files:
                wav_path.unlink(missing_ok=True)

    return 0
|
||||
|
||||
|
||||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||
PY
|
||||
|
||||
# Run the generated program with the (possibly rewritten) arguments and save
# its status before cleanup can overwrite $?.
python3 "$script_file" "$@"
local exit_code=$?

# Remove the temporary script, plus the stdin spool file if one was created.
rm -f "$script_file"
[[ -z "$stdin_file" ]] || rm -f "$stdin_file"

return "$exit_code"
|
||||
}
|
||||
|
||||
coqui-read "$@"
|
||||
Reference in New Issue
Block a user