# app/audio/generate.py
from __future__ import annotations

import platform
import shutil
import subprocess
import wave
from pathlib import Path

from loguru import logger


def generate_rep_audio_files(
    *,
    max_count: int,
    rate: int,
    output_dir: Path,
    overwrite: bool = False,
    trim_leading_silence: bool = True,
    trim_silence_threshold: int = 500,
    trim_silence_padding_ms: int = 20,
) -> None:
    """Ensure a spoken rep-count audio file exists for every count in 0..max_count.

    Files are written into ``output_dir`` as ``<count>.aiff`` on macOS (via the
    ``say`` command) and as ``<count>.wav`` on every other platform (via
    pyttsx3), for example::

        resources/audio/reps/0.aiff      # macOS
        resources/audio/reps/0.wav       # Windows / Linux
        ...
        resources/audio/reps/200.aiff    # or 200.wav

    Calling this once at service start-up is sufficient.

    Args:
        max_count: Highest count to generate (inclusive).
        rate: Speech rate handed to the text-to-speech backend.
        output_dir: Directory receiving the files; created if missing.
        overwrite: Regenerate every file even if it already exists.
        trim_leading_silence: Strip leading silence from the ``.wav`` files
            after generation. AIFF output is never trimmed.
        trim_silence_threshold: Level (16-bit sample scale) above which a
            10 ms chunk counts as audible.
        trim_silence_padding_ms: Amount of leading silence to keep in front of
            the first audible chunk.

    Raises:
        RuntimeError: If the platform's text-to-speech backend is unavailable
            or fails to produce a file.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    system = platform.system().lower()
    suffix = ".aiff" if system == "darwin" else ".wav"

    # With overwrite=True every count is treated as missing.
    missing_counts = [
        count
        for count in range(0, max_count + 1)
        if overwrite or not _audio_path(output_dir, count, suffix=suffix).exists()
    ]

    if not missing_counts:
        logger.info("Rep audio files already prepared: {}", output_dir)
    else:
        logger.info(
            "Preparing rep audio files, system={}, count={}, output_dir={}",
            system,
            len(missing_counts),
            output_dir,
        )

        if system == "darwin":
            _generate_with_macos_say(
                counts=missing_counts,
                output_dir=output_dir,
                rate=rate,
            )
        else:
            _generate_with_pyttsx3(
                counts=missing_counts,
                output_dir=output_dir,
                rate=rate,
            )

        logger.info("Rep audio files prepared: {}", output_dir)

    # Trimming covers every count, not only the freshly generated ones; files
    # that were already trimmed have nothing left to remove.
    if trim_leading_silence and suffix == ".wav":
        _trim_leading_silence_files(
            counts=list(range(0, max_count + 1)),
            output_dir=output_dir,
            suffix=suffix,
            threshold=trim_silence_threshold,
            padding_ms=trim_silence_padding_ms,
        )


def _generate_with_macos_say(
    *,
    counts: list[int],
    output_dir: Path,
    rate: int,
) -> None:
    """Generate one AIFF file per count with the macOS ``say`` command.

    Raises:
        RuntimeError: If not running on macOS, if ``say`` is not on PATH, or
            if ``say`` exits with a non-zero status for any count.
    """
    if platform.system().lower() != "darwin":
        raise RuntimeError("say command is only available on macOS")

    if shutil.which("say") is None:
        raise RuntimeError("macOS say command not found")

    for count in counts:
        audio_file = _audio_path(output_dir, count, suffix=".aiff")

        try:
            subprocess.run(
                [
                    "say",
                    "-r",
                    str(rate),
                    "--file-format=AIFF",
                    "-o",
                    str(audio_file),
                    str(count),
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            # Prefer the tool's own diagnostics; fall back to the exit status.
            message = exc.stderr.strip() or f"exit status {exc.returncode}"
            raise RuntimeError(f"Failed to generate {audio_file}: {message}") from exc


def _generate_with_pyttsx3(
    *,
    counts: list[int],
    output_dir: Path,
    rate: int,
) -> None:
    """Generate one WAV file per count with pyttsx3 (Windows / Linux).

    Raises:
        RuntimeError: If pyttsx3 cannot be imported.
    """
    # Imported lazily so the dependency is only needed on platforms that use it.
    try:
        import pyttsx3
    except Exception as exc:
        raise RuntimeError(f"pyttsx3 unavailable: {exc}") from exc

    engine = pyttsx3.init()
    engine.setProperty("rate", rate)
    engine.setProperty("volume", 1.0)

    # Queue every utterance, then let the engine process the queue in one run.
    for count in counts:
        audio_file = _audio_path(output_dir, count, suffix=".wav")
        engine.save_to_file(str(count), str(audio_file))

    engine.runAndWait()


def _audio_path(output_dir: Path, count: int, *, suffix: str) -> Path:
    """Return the path of the audio file for ``count`` inside ``output_dir``."""
    return output_dir / f"{count}{suffix}"


def _trim_leading_silence_files(
    *,
    counts: list[int],
    output_dir: Path,
    suffix: str,
    threshold: int,
    padding_ms: int,
) -> None:
    """Trim leading silence from every existing audio file in ``counts``.

    Counts whose file does not exist are skipped. A summary with the number of
    files changed and the total duration removed is logged at the end.
    """
    trimmed = 0
    total_removed_ms = 0.0

    for count in counts:
        audio_file = _audio_path(output_dir, count, suffix=suffix)
        if not audio_file.exists():
            continue

        removed_ms = _trim_leading_silence(audio_file, threshold=threshold, padding_ms=padding_ms)
        if removed_ms > 0:
            trimmed += 1
            total_removed_ms += removed_ms

    logger.info(
        "Rep audio leading silence trim complete: files_trimmed={}, total_removed_ms={:.1f}, threshold={}, padding_ms={}",
        trimmed,
        total_removed_ms,
        threshold,
        padding_ms,
    )


def _trim_leading_silence(audio_file: Path, *, threshold: int, padding_ms: int) -> float:
    """Remove leading silence from a WAV file in place.

    The audio is scanned in 10 ms chunks until the first chunk whose level
    (see ``_pcm_rms``) exceeds ``threshold``. Everything before that chunk,
    minus ``padding_ms`` of retained silence, is cut off.

    A file in which no chunk exceeds the threshold is left untouched: treating
    the whole recording as "leading silence" would discard all of its audio.

    Args:
        audio_file: WAV file to rewrite.
        threshold: Level above which a chunk counts as audible.
        padding_ms: Silence to keep before the first audible chunk; negative
            values are treated as 0.

    Returns:
        Duration removed in milliseconds, or 0.0 if the file was not modified.
    """
    with wave.open(str(audio_file), "rb") as reader:
        params = reader.getparams()
        frames = reader.readframes(params.nframes)

    frame_size = params.sampwidth * params.nchannels
    if params.nframes <= 0 or frame_size <= 0:
        return 0.0

    chunk_frames = max(1, params.framerate // 100)  # 10 ms per chunk
    chunk_size = chunk_frames * frame_size

    leading_frames = 0
    for offset in range(0, len(frames), chunk_size):
        chunk = frames[offset : offset + chunk_size]
        if _pcm_rms(chunk, params.sampwidth) > threshold:
            break
        leading_frames += len(chunk) // frame_size
    else:
        # The loop never hit an audible chunk, so there is no speech onset to
        # trim towards; keep the file exactly as it is.
        return 0.0

    padding_frames = int(params.framerate * max(0, padding_ms) / 1000)
    remove_frames = max(0, leading_frames - padding_frames)
    if remove_frames <= 0:
        return 0.0

    start = min(len(frames), remove_frames * frame_size)
    trimmed_frames = frames[start:]
    if not trimmed_frames:
        return 0.0

    # Write to a sibling temp file and swap it in, so an interrupted write can
    # never leave a truncated WAV under the real name (it would still "exist"
    # and therefore never be regenerated).
    tmp_file = audio_file.with_name(audio_file.name + ".tmp")
    try:
        with wave.open(str(tmp_file), "wb") as writer:
            # nframes in params is stale; wave patches the header on close.
            writer.setparams(params)
            writer.writeframes(trimmed_frames)
        tmp_file.replace(audio_file)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp_file.unlink(missing_ok=True)

    return remove_frames / params.framerate * 1000


def _pcm_rms(chunk: bytes, sample_width: int) -> float:
    """Return the loudness of a chunk of little-endian PCM data.

    * Sample widths of 2 bytes and more are decoded as signed little-endian
      integers and the root mean square is returned. Samples wider than
      16 bits are scaled down to the 16-bit range first, so one threshold
      means the same thing for 16-, 24- and 32-bit audio.
    * Any other width is treated as unsigned 8-bit audio centred on 128 and
      the peak deviation from that centre is returned.

    An empty chunk, or one shorter than a single sample, yields 0.0.
    """
    if not chunk:
        return 0.0

    if sample_width >= 2:
        sample_count = len(chunk) // sample_width
        if sample_count == 0:
            return 0.0

        shift = 8 * (sample_width - 2)  # 0 for 16-bit, i.e. no scaling
        total = 0
        # A trailing partial sample is ignored.
        for i in range(0, sample_count * sample_width, sample_width):
            sample = int.from_bytes(chunk[i : i + sample_width], "little", signed=True) >> shift
            total += sample * sample

        return (total / sample_count) ** 0.5

    peak = max(abs(byte - 128) for byte in chunk)
    return float(peak)