from __future__ import annotations import queue import shutil import subprocess import sys import threading import time from pathlib import Path from loguru import logger class RepAnnouncer: """运动次数语音播报器:读取预生成的音频文件直接播放""" def __init__( self, *, enabled: bool = True, max_count: int = 200, audio_dir: str | Path = "resources/audio/reps", ) -> None: self.enabled = enabled self.max_count = max_count self.audio_dir = Path(audio_dir) self._queue: queue.Queue[tuple[int, float] | None] = queue.Queue() self._thread: threading.Thread | None = None self._current_process: subprocess.Popen | None = None self._closed = False self._play_lock = threading.Lock() self._platform = sys.platform self._direct_playback = self._platform.startswith("win") if self.enabled: self._start() def announce_count(self, count: int) -> None: """将次数放入队列,后台线程播放对应音频""" if not self.enabled or self._closed: return if count <= 0 or count > self.max_count: return requested_at = time.perf_counter() if self._direct_playback: audio_file = self._audio_path(count) if not audio_file.exists(): logger.warning("Rep audio file missing: {}", audio_file) return try: self._play(audio_file) logger.info( "Rep audio submitted immediately: count={}, submit_ms={:.1f}", count, (time.perf_counter() - requested_at) * 1000, ) except Exception as exc: logger.warning("Failed to play rep count {}: {}", count, exc) return self._clear_pending_counts() self._queue.put((count, requested_at)) logger.info("Rep audio queued: count={}", count) def close(self) -> None: """停止播报线程并释放资源""" if not self.enabled or self._closed: return self._closed = True self._queue.put(None) if self._thread is not None: self._thread.join(timeout=1.0) self._stop_current_playback() logger.info("Rep announcer closed") def _start(self) -> None: """启动后台播报线程""" self.audio_dir.mkdir(parents=True, exist_ok=True) if self._direct_playback: import winsound logger.info("Rep announcer initialized in direct Windows mode, audio_dir={}", self.audio_dir) return self._thread = threading.Thread(target=self._run, name="RepAnnouncer", daemon=True) self._thread.start() logger.info("Rep announcer initialized in queued mode, audio_dir={}", self.audio_dir) def _run(self) -> None: """后台线程:从队列取次数,播放对应音频文件""" while True: item = self._queue.get() if item is None: return count, requested_at = item audio_file = self._audio_path(count) if not audio_file.exists(): logger.warning("Rep audio file missing: {}", audio_file) continue try: self._play(audio_file) logger.info( "Rep audio submitted from queue: count={}, queue_ms={:.1f}", count, (time.perf_counter() - requested_at) * 1000, ) except Exception as exc: logger.warning("Failed to play rep count {}: {}", count, exc) def _play(self, audio_file: Path) -> None: """播放音频文件(平台自适应)""" with self._play_lock: self._stop_current_playback() if self._platform == "darwin": self._current_process = subprocess.Popen( ["afplay", str(audio_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) elif self._platform.startswith("win"): import winsound winsound.PlaySound( str(audio_file), winsound.SND_FILENAME | winsound.SND_ASYNC | winsound.SND_NODEFAULT, ) else: player = shutil.which("paplay") or shutil.which("aplay") if player is None: logger.warning("No audio player found") return self._current_process = subprocess.Popen( [player, str(audio_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def _stop_current_playback(self) -> None: """中断当前正在播放的声音""" if self._platform.startswith("win"): try: import winsound winsound.PlaySound(None, winsound.SND_PURGE) except Exception: pass return if self._current_process is not None and self._current_process.poll() is None: self._current_process.terminate() self._current_process = None def _audio_path(self, count: int) -> Path: """获取某个次数对应的音频文件路径""" suffix = ".aiff" if self._platform == "darwin" else ".wav" return self.audio_dir / f"{count}{suffix}" def _clear_pending_counts(self) -> None: """清空队列中等待播放的次数,避免语音堆积""" while True: try: item = self._queue.get_nowait() if item is None: self._queue.put(None) return except queue.Empty: return