from __future__ import annotations

import os
import queue
import shutil
import subprocess
import sys
import threading
from pathlib import Path
from typing import Any

from loguru import logger


class RepAnnouncer:
    """Announce exercise rep counts by playing pre-rendered audio files.

    Audio for every count in ``0..max_count`` is rendered into ``cache_dir``
    at start-up (only files that are missing are generated). At runtime a
    daemon thread plays the file that matches the most recently requested
    count.

    Args:
        enabled: When false, nothing is prepared and all calls are no-ops.
        rate: Speech rate passed to the TTS backend when rendering.
        volume: Volume passed to pyttsx3 when rendering. The macOS ``say``
            path does not use this value.
        max_count: Largest count that is rendered and can be announced.
        cache_dir: Directory that holds the rendered audio files.

    Note:
        Cached files are keyed by count only, so changing ``rate`` or
        ``volume`` does not re-render files that already exist.
    """

    # Seconds to wait for a terminated player process before killing it.
    _TERMINATE_TIMEOUT = 0.5

    def __init__(
        self,
        *,
        enabled: bool = True,
        rate: int = 185,
        volume: float = 1.0,
        max_count: int = 200,
        cache_dir: str | Path = "runtime/tts_cache/reps",
    ) -> None:
        self.enabled = enabled
        self.rate = rate
        self.volume = volume
        self.max_count = max_count
        self.cache_dir = Path(cache_dir)

        # ``None`` in the queue is the shutdown sentinel for the worker.
        self._queue: queue.Queue[int | None] = queue.Queue()
        self._thread: threading.Thread | None = None
        self._engine: Any | None = None
        self._current_process: subprocess.Popen | None = None
        self._closed = False

        self._use_macos_say = sys.platform == "darwin"
        self._use_windows_winsound = sys.platform.startswith("win")

        if self.enabled:
            self._start()

    def announce_count(self, count: int) -> None:
        """Queue ``count`` for playback on the background thread.

        Counts outside ``1..max_count`` are ignored (so the rendered file
        for 0 is never announced), as is every call made while the
        announcer is disabled or closed.
        """
        if not self.enabled or self._closed:
            return
        if count <= 0 or count > self.max_count:
            return

        # Keep the "latest count wins" policy so announcements do not pile up.
        self._clear_pending_counts()
        self._queue.put(count)

    def close(self) -> None:
        """Stop the playback thread and interrupt any sound still playing."""
        if not self.enabled or self._closed:
            return

        self._closed = True
        self._queue.put(None)
        if self._thread is not None:
            self._thread.join(timeout=1.0)
        self._stop_current_playback()
        logger.info("Rep announcer closed")

    def _start(self) -> None:
        """Prepare the audio cache and start the playback thread.

        Any failure while preparing the cache, including failing to create
        the cache directory, disables the announcer instead of raising.
        """
        try:
            # Inside the ``try`` so that an unwritable cache location
            # degrades to "disabled" like every other preparation failure.
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self._prepare_audio_cache()
        except Exception as exc:
            self.enabled = False
            logger.warning("Rep announcer disabled, failed to prepare audio cache: {}", exc)
            return

        self._thread = threading.Thread(
            target=self._run,
            name="RepAnnouncer",
            daemon=True,
        )
        self._thread.start()
        logger.info(
            "Rep announcer initialized with audio cache, platform={}, max_count={}, cache_dir={}",
            sys.platform,
            self.max_count,
            self.cache_dir,
        )

    def _prepare_audio_cache(self) -> None:
        """Render the audio files for ``0..max_count`` that are missing."""
        if self._use_macos_say:
            self._prepare_macos_say_cache()
        else:
            self._prepare_pyttsx3_cache()

    def _missing_audio_files(self) -> list[tuple[int, Path]]:
        """Return ``(count, path)`` for every count with no cached file."""
        candidates = (
            (count, self._audio_path(count))
            for count in range(self.max_count + 1)
        )
        return [(count, path) for count, path in candidates if not path.exists()]

    def _prepare_macos_say_cache(self) -> None:
        """macOS: render missing counts to AIFF files with ``say``.

        Raises:
            RuntimeError: Files are missing and ``say`` is not on PATH.
            subprocess.CalledProcessError: ``say`` exited with an error.
        """
        missing = self._missing_audio_files()
        # Only require the backend when there is something to render.
        if missing and shutil.which("say") is None:
            raise RuntimeError("macOS say command not found")

        for count, audio_file in missing:
            subprocess.run(
                [
                    "say",
                    "-r",
                    str(self.rate),
                    "-o",
                    str(audio_file),
                    str(count),
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=True,
            )

        logger.info("macOS say audio cache prepared")

    def _prepare_pyttsx3_cache(self) -> None:
        """Non-macOS: render missing counts to WAV files with pyttsx3.

        pyttsx3 is imported and initialised only when at least one file is
        missing, so a complete cache works without the TTS backend.

        Raises:
            RuntimeError: Files are missing and pyttsx3 cannot be imported.
        """
        missing = self._missing_audio_files()
        if missing:
            try:
                import pyttsx3
            except Exception as exc:
                raise RuntimeError(f"pyttsx3 unavailable: {exc}") from exc

            self._engine = pyttsx3.init()
            self._engine.setProperty("rate", self.rate)
            self._engine.setProperty("volume", self.volume)

            # save_to_file only queues the work; runAndWait processes it.
            for count, audio_file in missing:
                self._engine.save_to_file(str(count), str(audio_file))
            self._engine.runAndWait()

        logger.info("pyttsx3 audio cache prepared")

    def _run(self) -> None:
        """Worker loop: play the cached file for each queued count.

        Returns when the ``None`` sentinel is received. Playback errors are
        logged and do not stop the loop.
        """
        while True:
            count = self._queue.get()
            if count is None:
                return

            try:
                audio_file = self._audio_path(count)
                if not audio_file.exists():
                    logger.warning("Rep audio file missing: {}", audio_file)
                    continue
                self._play_audio(audio_file)
            except Exception as exc:
                logger.warning("Failed to play rep count {}: {}", count, exc)

    def _play_audio(self, audio_file: Path) -> None:
        """Start playing ``audio_file`` with the platform's player.

        Any sound that is still playing is interrupted first. Playback is
        started asynchronously; this method does not wait for it to finish.
        """
        self._stop_current_playback()

        if self._use_macos_say:
            self._current_process = subprocess.Popen(
                ["afplay", str(audio_file)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            return

        if self._use_windows_winsound:
            import winsound

            # SND_ASYNC returns immediately; the sound is interrupted later
            # by _stop_current_playback.
            winsound.PlaySound(str(audio_file), winsound.SND_FILENAME | winsound.SND_ASYNC)
            return

        # Linux: prefer paplay, fall back to aplay.
        player = shutil.which("paplay") or shutil.which("aplay")
        if player is None:
            logger.warning("No audio player found, expected paplay or aplay")
            return

        self._current_process = subprocess.Popen(
            [player, str(audio_file)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _stop_current_playback(self) -> None:
        """Interrupt the sound that is currently playing, if any."""
        if self._use_windows_winsound:
            try:
                import winsound

                winsound.PlaySound(None, winsound.SND_PURGE)
            except Exception as exc:
                # Best effort: a failure to stop must not break the caller.
                logger.debug("Failed to stop winsound playback: {}", exc)
            return

        process = self._current_process
        self._current_process = None
        if process is None or process.poll() is not None:
            return

        process.terminate()
        # Reap the child so interrupted players do not linger as zombies.
        try:
            process.wait(timeout=self._TERMINATE_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def _audio_path(self, count: int) -> Path:
        """Return the cache file path for ``count`` (.aiff on macOS, else .wav)."""
        suffix = ".aiff" if self._use_macos_say else ".wav"
        return self.cache_dir / f"{count}{suffix}"

    def _clear_pending_counts(self) -> None:
        """Drop counts that are queued but not yet played.

        If the shutdown sentinel is encountered it is put back so that the
        worker still receives it.
        """
        while True:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                return
            if item is None:
                # Do not swallow the close signal.
                self._queue.put(None)
                return