from __future__ import annotations import queue import shutil import subprocess import sys import threading from pathlib import Path from loguru import logger class RepAnnouncer: """运动次数语音播报器:读取预生成的音频文件直接播放""" def __init__( self, *, enabled: bool = True, max_count: int = 200, audio_dir: str | Path = "resources/audio/reps", ) -> None: self.enabled = enabled self.max_count = max_count self.audio_dir = Path(audio_dir) self._queue: queue.Queue[int | None] = queue.Queue() self._thread: threading.Thread | None = None self._current_process: subprocess.Popen | None = None self._closed = False self._platform = sys.platform if self.enabled: self._start() def announce_count(self, count: int) -> None: """将次数放入队列,后台线程播放对应音频""" if not self.enabled or self._closed: return if count <= 0 or count > self.max_count: return self._clear_pending_counts() self._queue.put(count) def close(self) -> None: """停止播报线程并释放资源""" if not self.enabled or self._closed: return self._closed = True self._queue.put(None) if self._thread is not None: self._thread.join(timeout=1.0) self._stop_current_playback() logger.info("Rep announcer closed") def _start(self) -> None: """启动后台播报线程""" self.audio_dir.mkdir(parents=True, exist_ok=True) self._thread = threading.Thread(target=self._run, name="RepAnnouncer", daemon=True) self._thread.start() logger.info("Rep announcer initialized, audio_dir={}", self.audio_dir) def _run(self) -> None: """后台线程:从队列取次数,播放对应音频文件""" while True: count = self._queue.get() if count is None: return audio_file = self._audio_path(count) if not audio_file.exists(): logger.warning("Rep audio file missing: {}", audio_file) continue try: self._play(audio_file) except Exception as exc: logger.warning("Failed to play rep count {}: {}", count, exc) def _play(self, audio_file: Path) -> None: """播放音频文件(平台自适应)""" self._stop_current_playback() if self._platform == "darwin": self._current_process = subprocess.Popen( ["afplay", str(audio_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) elif self._platform.startswith("win"): import winsound winsound.PlaySound(str(audio_file), winsound.SND_FILENAME | winsound.SND_ASYNC) else: player = shutil.which("paplay") or shutil.which("aplay") if player is None: logger.warning("No audio player found") return self._current_process = subprocess.Popen( [player, str(audio_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def _stop_current_playback(self) -> None: """中断当前正在播放的声音""" if self._platform.startswith("win"): try: import winsound winsound.PlaySound(None, winsound.SND_PURGE) except Exception: pass return if self._current_process is not None and self._current_process.poll() is None: self._current_process.terminate() self._current_process = None def _audio_path(self, count: int) -> Path: """获取某个次数对应的音频文件路径""" suffix = ".aiff" if self._platform == "darwin" else ".wav" return self.audio_dir / f"{count}{suffix}" def _clear_pending_counts(self) -> None: """清空队列中等待播放的次数,避免语音堆积""" while True: try: item = self._queue.get_nowait() if item is None: self._queue.put(None) return except queue.Empty: return