from __future__ import annotations import asyncio import time import cv2 from aiortc.mediastreams import MediaStreamError from loguru import logger from app.audio.rep_announcer import RepAnnouncer from app.exercises.dead_bug.detector import DeadBugDetector from app.rendering.window_display import close_window, is_esc_pressed, show_frame from configs.load import config def _format_pose_debug(pose_result) -> str: """格式化姿态检测结果用于调试日志输出""" metrics = pose_result.metrics if metrics is None: return "metrics=None" return ( f"side={pose_result.side}, standard={pose_result.is_standard}, " f"angles(le={metrics.left_elbow_angle:.1f}, re={metrics.right_elbow_angle:.1f}, " f"lk={metrics.left_knee_angle:.1f}, rk={metrics.right_knee_angle:.1f}), " f"extended(la={metrics.left_arm_extended}, ra={metrics.right_arm_extended}, " f"ll={metrics.left_leg_extended}, rl={metrics.right_leg_extended})" ) def _new_perf_window() -> dict: return { "frames": 0, "processed": 0, "loop_ms": 0.0, "to_ndarray_ms": 0.0, "detect_ms": 0.0, "show_ms": 0.0, "max_loop_ms": 0.0, "max_detect_ms": 0.0, "detector": {}, } def _add_detector_timing(perf: dict, timing: dict[str, float | bool]) -> None: detector = perf["detector"] for key, value in timing.items(): if key == "submitted": detector[key] = detector.get(key, 0) + (1 if value else 0) continue value = float(value) detector[key] = detector.get(key, 0.0) + value max_key = f"max_{key}" detector[max_key] = max(detector.get(max_key, 0.0), value) def _avg(perf: dict, key: str, denominator: int) -> float: if denominator <= 0: return 0.0 return perf.get(key, 0.0) / denominator class VideoReceiver: """视频轨道接收与运动检测流水线""" def __init__(self, track) -> None: self._track = track async def run(self) -> None: """持续接收视频帧并进行姿态检测、渲染和语音播报""" log_every_n_frames = max(1, config.video.log_every_n_frames) perf_log_every_n_frames = max(1, config.video.perf_log_every_n_frames) slow_frame_ms = max(0.0, config.video.slow_frame_ms) logger.info( "Start receiving video frames, process_every_n={}, log_every_n={}, perf_log_every_n={}, slow_frame_ms={}", config.video.process_every_n_frames, log_every_n_frames, perf_log_every_n_frames, slow_frame_ms, ) logger.info( "OpenCV OpenCL status: have_opencl={}, use_opencl={}", cv2.ocl.haveOpenCL(), cv2.ocl.useOpenCL(), ) frame_count = 0 processed_count = 0 detector = DeadBugDetector( model_path=config.model.resolved_path, visibility_threshold=config.dead_bug.visibility_threshold, extension_confirm_frames=config.dead_bug.extension_confirm_frames, reset_confirm_frames=config.dead_bug.reset_confirm_frames, prefer_gpu=config.model.prefer_gpu, ) announcer = RepAnnouncer( enabled=config.audio.rep_announcer_enabled, max_count=config.audio.rep_max_count, audio_dir=config.audio.resolved_audio_dir, ) last_announced_rep = 0 last_pose_result = None last_annotated = None perf = _new_perf_window() try: while True: loop_started = time.perf_counter() frame = await self._track.recv() frame_count += 1 recv_done = time.perf_counter() raw_img = frame.to_ndarray(format="bgr24") ndarray_done = time.perf_counter() timestamp_ms = int(frame.time * 1000) if frame.time is not None else frame_count * 33 detect_ms = 0.0 if frame_count % config.video.process_every_n_frames == 0 or last_pose_result is None: detect_started = time.perf_counter() processed_count += 1 last_annotated, last_pose_result = detector.process_frame(raw_img, timestamp_ms) detect_ms = (time.perf_counter() - detect_started) * 1000 perf["processed"] += 1 perf["detect_ms"] += detect_ms perf["max_detect_ms"] = max(perf["max_detect_ms"], detect_ms) _add_detector_timing(perf, detector.last_timing) if last_pose_result.rep_count > last_announced_rep: last_announced_rep = last_pose_result.rep_count announce_started = time.perf_counter() announcer.announce_count(last_announced_rep) logger.info( "Rep completed and audio requested: count={}, frame={}, announce_call_ms={:.1f}", last_announced_rep, frame_count, (time.perf_counter() - announce_started) * 1000, ) display_img = last_annotated if last_annotated is not None else raw_img show_started = time.perf_counter() show_frame(display_img) show_done = time.perf_counter() if frame_count % log_every_n_frames == 0: logger.info( "Received {} frames, processed={}, raw_shape={}, reps={}, phase={}, feedback={}, {}", frame_count, processed_count, raw_img.shape, last_pose_result.rep_count if last_pose_result is not None else 0, last_pose_result.phase.value if last_pose_result is not None else "none", " | ".join(last_pose_result.feedback) if last_pose_result is not None else "", _format_pose_debug(last_pose_result) if last_pose_result is not None else "metrics=None", ) loop_ms = (show_done - loop_started) * 1000 to_ndarray_ms = (ndarray_done - recv_done) * 1000 show_ms = (show_done - show_started) * 1000 perf["frames"] += 1 perf["loop_ms"] += loop_ms perf["to_ndarray_ms"] += to_ndarray_ms perf["show_ms"] += show_ms perf["max_loop_ms"] = max(perf["max_loop_ms"], loop_ms) if slow_frame_ms and loop_ms >= slow_frame_ms: logger.warning( "Slow video frame: frame={}, loop_ms={:.1f}, detect_ms={:.1f}, to_ndarray_ms={:.1f}, show_ms={:.1f}, shape={}", frame_count, loop_ms, detect_ms, to_ndarray_ms, show_ms, raw_img.shape, ) if frame_count % perf_log_every_n_frames == 0: frames = perf["frames"] processed = perf["processed"] detector_perf = perf["detector"] logger.info( "Perf window: frames={}, processed={}, avg_loop_ms={:.1f}, max_loop_ms={:.1f}, avg_to_ndarray_ms={:.1f}, " "avg_detect_ms={:.1f}, max_detect_ms={:.1f}, avg_show_ms={:.1f}, detector_avg_total_ms={:.1f}, " "detector_max_total_ms={:.1f}, detector_avg_wait_ms={:.1f}, detector_max_wait_ms={:.1f}, " "detector_avg_convert_ms={:.1f}, detector_avg_postprocess_draw_ms={:.1f}, detector_submitted={}", frames, processed, _avg(perf, "loop_ms", frames), perf["max_loop_ms"], _avg(perf, "to_ndarray_ms", frames), _avg(perf, "detect_ms", processed), perf["max_detect_ms"], _avg(perf, "show_ms", frames), _avg(detector_perf, "total_ms", processed), detector_perf.get("max_total_ms", 0.0), _avg(detector_perf, "wait_ms", processed), detector_perf.get("max_wait_ms", 0.0), _avg(detector_perf, "convert_ms", processed), _avg(detector_perf, "postprocess_draw_ms", processed), detector_perf.get("submitted", 0), ) perf = _new_perf_window() if is_esc_pressed(): logger.info("ESC pressed, closing display") break except asyncio.CancelledError: logger.info("Video receive task cancelled") except MediaStreamError: logger.info("Video track ended") except Exception as e: logger.exception(f"Video receive error: {e!r}") finally: announcer.close() detector.close() close_window()