diff --git a/app/audio/generate.py b/app/audio/generate.py index da4dfe4..e13ed73 100644 --- a/app/audio/generate.py +++ b/app/audio/generate.py @@ -21,27 +21,27 @@ def generate_rep_audio_files( 默认生成到: - app/audio/reps/0.wav - app/audio/reps/1.wav + resources/audio/reps/0.aiff # macOS + resources/audio/reps/0.wav # Windows / Linux ... - app/audio/reps/200.wav + resources/audio/reps/200.aiff 或 200.wav 服务启动时调用一次即可。 """ output_dir.mkdir(parents=True, exist_ok=True) + system = platform.system().lower() + suffix = ".aiff" if system == "darwin" else ".wav" missing_counts = [ count for count in range(0, max_count + 1) - if overwrite or not _audio_path(output_dir, count).exists() + if overwrite or not _audio_path(output_dir, count, suffix=suffix).exists() ] if not missing_counts: logger.info("Rep audio files already prepared: {}", output_dir) return - system = platform.system().lower() - logger.info( "Preparing rep audio files, system={}, count={}, output_dir={}", system, @@ -79,22 +79,27 @@ def _generate_with_macos_say( raise RuntimeError("macOS say command not found") for count in counts: - audio_file = _audio_path(output_dir, count) + audio_file = _audio_path(output_dir, count, suffix=".aiff") - subprocess.run( - [ - "say", - "-r", - str(rate), - "--file-format=WAVE", - "-o", - str(audio_file), - str(count), - ], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=True, - ) + try: + subprocess.run( + [ + "say", + "-r", + str(rate), + "--file-format=AIFF", + "-o", + str(audio_file), + str(count), + ], + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + text=True, + check=True, + ) + except subprocess.CalledProcessError as exc: + message = exc.stderr.strip() or f"exit status {exc.returncode}" + raise RuntimeError(f"Failed to generate {audio_file}: {message}") from exc def _generate_with_pyttsx3( @@ -114,11 +119,11 @@ def _generate_with_pyttsx3( engine.setProperty("volume", 1.0) for count in counts: - audio_file = _audio_path(output_dir, count) + audio_file = _audio_path(output_dir, count, suffix=".wav") engine.save_to_file(str(count), str(audio_file)) engine.runAndWait() -def _audio_path(output_dir: Path, count: int) -> Path: - return output_dir / f"{count}.wav" +def _audio_path(output_dir: Path, count: int, *, suffix: str) -> Path: + return output_dir / f"{count}{suffix}" diff --git a/app/exercises/dead_bug/detector.py b/app/exercises/dead_bug/detector.py index 670bcee..c51f8b1 100644 --- a/app/exercises/dead_bug/detector.py +++ b/app/exercises/dead_bug/detector.py @@ -110,6 +110,7 @@ class DeadBugDetector: annotated = bgr_frame.copy() if pose_result is None or not pose_result.pose_landmarks: + self._state.mark_no_pose() result = DeadBugResult( rep_count=self._state.rep_count, phase=DeadBugPhase.NO_POSE, @@ -125,6 +126,7 @@ class DeadBugDetector: draw_landmarks(annotated, landmarks, REQUIRED_LANDMARKS, visibility_threshold=self.visibility_threshold) if not has_required_visibility(landmarks, REQUIRED_LANDMARKS, self.visibility_threshold): + self._state.mark_no_pose() result = DeadBugResult( rep_count=self._state.rep_count, phase=DeadBugPhase.NO_POSE, diff --git a/app/exercises/dead_bug/rules.py b/app/exercises/dead_bug/rules.py index e477525..d966352 100644 --- a/app/exercises/dead_bug/rules.py +++ b/app/exercises/dead_bug/rules.py @@ -9,7 +9,7 @@ def has_required_visibility(landmarks: list[Point], required_indices: tuple[int, def detect_diagonal_extension(metrics: DeadBugMetrics) -> str | None: - """检测是否存在对角伸展(左臂+右腿 或 右臂+左腿)""" + """检测对角伸展(腿部只允许单侧伸展,手臂允许准备位上举带来的识别重叠)""" if metrics.left_leg_extended and metrics.right_leg_extended: return None @@ -21,7 +21,7 @@ def detect_diagonal_extension(metrics: DeadBugMetrics) -> str | None: def is_ready_position(metrics: DeadBugMetrics) -> bool: - """判断是否处于准备姿态(膝盖弯曲且四肢未伸展)""" + """判断是否处于准备姿态(双膝弯曲且双腿未伸展;dead bug 准备位允许手臂上举)""" knees_bent = metrics.left_knee_angle <= 140 and metrics.right_knee_angle <= 140 legs_not_extended = not metrics.left_leg_extended and not metrics.right_leg_extended - return knees_bent and legs_not_extended and detect_diagonal_extension(metrics) is None + return knees_bent and legs_not_extended diff --git a/app/exercises/dead_bug/state_machine.py b/app/exercises/dead_bug/state_machine.py index a5e48dc..9508a7b 100644 --- a/app/exercises/dead_bug/state_machine.py +++ b/app/exercises/dead_bug/state_machine.py @@ -3,6 +3,13 @@ from __future__ import annotations from app.exercises.dead_bug.rules import detect_diagonal_extension, is_ready_position from app.exercises.dead_bug.types import DeadBugMetrics, DeadBugPhase, DeadBugResult +_SMOOTHING_ALPHA = 0.45 +_TREND_DELTA_DEGREES = 1.5 +_SIDE_ANGLE_MARGIN = 10.0 +_EXTENSION_START_ANGLE = 125.0 +_EXTENSION_PEAK_ANGLE = 150.0 +_READY_KNEE_ANGLE = 140.0 + class DeadBugStateMachine: """死虫式动作状态机:管理READY/EXTENDING/NEED_RESET/NO_POSE状态转换""" @@ -17,11 +24,26 @@ class DeadBugStateMachine: self._candidate_side: str | None = None self._candidate_frames = 0 self._reset_frames = 0 + self._smooth_left_knee_angle: float | None = None + self._smooth_right_knee_angle: float | None = None + self._left_knee_delta = 0.0 + self._right_knee_delta = 0.0 + + def mark_no_pose(self) -> None: + """姿态丢失时清掉候选帧;已确认的半程动作保留,等待重新可见后完成回收。""" + if self.phase == DeadBugPhase.READY: + self.phase = DeadBugPhase.NO_POSE + self.active_side = None + self._candidate_side = None + self._candidate_frames = 0 + self._reset_frames = 0 def update(self, metrics: DeadBugMetrics) -> DeadBugResult: """根据传入指标更新状态机并返回本次结果""" - side = detect_diagonal_extension(metrics) - ready = is_ready_position(metrics) + self._update_knee_trends(metrics) + + side = self._detect_motion_side(metrics) + ready = self._is_stable_ready(metrics) if side is None: self._candidate_side = None @@ -33,13 +55,16 @@ class DeadBugStateMachine: self._candidate_frames = 1 if self.phase in (DeadBugPhase.READY, DeadBugPhase.NO_POSE): + if ready: + self.phase = DeadBugPhase.READY if self._candidate_frames >= self.extension_confirm_frames and side is not None: self.phase = DeadBugPhase.EXTENDING self.active_side = side self._reset_frames = 0 elif self.phase == DeadBugPhase.EXTENDING: - if side == self.active_side: + if ready or self._active_knee_retracting(): self.phase = DeadBugPhase.NEED_RESET + self._reset_frames = 1 if ready else 0 elif self.phase == DeadBugPhase.NEED_RESET: if ready: self._reset_frames += 1 @@ -54,21 +79,106 @@ class DeadBugStateMachine: self._reset_frames = 0 feedback = list(metrics.feedback) - if side is None and not ready: + display_side = detect_diagonal_extension(metrics) + if display_side is None and not ready: feedback.append("Extend opposite arm and leg only") if ready: feedback.append("Ready position") - elif side == "left_arm_right_leg": + elif display_side == "left_arm_right_leg": feedback.append("Left arm + right leg") - elif side == "right_arm_left_leg": + elif display_side == "right_arm_left_leg": feedback.append("Right arm + left leg") - is_standard = side is not None and not metrics.feedback + is_standard = display_side is not None and not metrics.feedback return DeadBugResult( rep_count=self.rep_count, phase=self.phase, - side=side, + side=display_side, is_standard=is_standard, feedback=feedback[:3], metrics=metrics, ) + + def _update_knee_trends(self, metrics: DeadBugMetrics) -> None: + """更新平滑膝角和本帧变化量,用连续趋势抵消单帧抖动。""" + previous_left = self._smooth_left_knee_angle + previous_right = self._smooth_right_knee_angle + + if previous_left is None: + self._smooth_left_knee_angle = metrics.left_knee_angle + self._left_knee_delta = 0.0 + else: + self._smooth_left_knee_angle = ( + _SMOOTHING_ALPHA * metrics.left_knee_angle + + (1.0 - _SMOOTHING_ALPHA) * previous_left + ) + self._left_knee_delta = self._smooth_left_knee_angle - previous_left + + if previous_right is None: + self._smooth_right_knee_angle = metrics.right_knee_angle + self._right_knee_delta = 0.0 + else: + self._smooth_right_knee_angle = ( + _SMOOTHING_ALPHA * metrics.right_knee_angle + + (1.0 - _SMOOTHING_ALPHA) * previous_right + ) + self._right_knee_delta = self._smooth_right_knee_angle - previous_right + + def _detect_motion_side(self, metrics: DeadBugMetrics) -> str | None: + """基于膝角趋势推断正在伸展的对角侧,手臂只作为辅助校验。""" + raw_side = detect_diagonal_extension(metrics) + if raw_side is not None and self._side_has_extension_motion(raw_side): + return raw_side + + left = self._smooth_left_knee_angle + right = self._smooth_right_knee_angle + if left is None or right is None: + return raw_side + + both_legs_high = left >= _EXTENSION_PEAK_ANGLE and right >= _EXTENSION_PEAK_ANGLE + if both_legs_high and abs(left - right) < _SIDE_ANGLE_MARGIN: + return None + + if ( + right >= _EXTENSION_START_ANGLE + and right - left >= _SIDE_ANGLE_MARGIN + and (self._right_knee_delta >= _TREND_DELTA_DEGREES or right >= _EXTENSION_PEAK_ANGLE) + ): + return "left_arm_right_leg" + if ( + left >= _EXTENSION_START_ANGLE + and left - right >= _SIDE_ANGLE_MARGIN + and (self._left_knee_delta >= _TREND_DELTA_DEGREES or left >= _EXTENSION_PEAK_ANGLE) + ): + return "right_arm_left_leg" + return raw_side + + def _side_has_extension_motion(self, side: str) -> bool: + """确认对应腿处于伸展区或仍在伸展趋势中。""" + if side == "left_arm_right_leg": + angle = self._smooth_right_knee_angle + delta = self._right_knee_delta + else: + angle = self._smooth_left_knee_angle + delta = self._left_knee_delta + if angle is None: + return True + return angle >= _EXTENSION_PEAK_ANGLE or ( + angle >= _EXTENSION_START_ANGLE and delta >= _TREND_DELTA_DEGREES + ) + + def _is_stable_ready(self, metrics: DeadBugMetrics) -> bool: + """准备位需要双腿回到屈膝区域;使用平滑膝角避免单帧阈值跳变。""" + left = self._smooth_left_knee_angle + right = self._smooth_right_knee_angle + if left is None or right is None: + return is_ready_position(metrics) + return left <= _READY_KNEE_ANGLE and right <= _READY_KNEE_ANGLE + + def _active_knee_retracting(self) -> bool: + """确认已伸展的那条腿开始回收。""" + if self.active_side == "left_arm_right_leg": + return self._right_knee_delta <= -_TREND_DELTA_DEGREES + if self.active_side == "right_arm_left_leg": + return self._left_knee_delta <= -_TREND_DELTA_DEGREES + return False diff --git a/app/webrtc/video_receiver.py b/app/webrtc/video_receiver.py index 78db8f0..ce32b10 100644 --- a/app/webrtc/video_receiver.py +++ b/app/webrtc/video_receiver.py @@ -33,7 +33,12 @@ class VideoReceiver: async def run(self) -> None: """持续接收视频帧并进行姿态检测、渲染和语音播报""" - logger.info("Start receiving video frames, process_every_n={}", config.video.process_every_n_frames) + log_every_n_frames = max(1, config.video.log_every_n_frames) + logger.info( + "Start receiving video frames, process_every_n={}, log_every_n={}", + config.video.process_every_n_frames, + log_every_n_frames, + ) frame_count = 0 processed_count = 0 @@ -70,7 +75,7 @@ class VideoReceiver: display_img = last_annotated if last_annotated is not None else raw_img show_frame(display_img) - if frame_count % 100 == 0: + if frame_count % log_every_n_frames == 0: logger.info( "Received {} frames, processed={}, raw_shape={}, reps={}, phase={}, feedback={}, {}", frame_count, diff --git a/config.yaml b/config.yaml index b97dc42..3732501 100644 --- a/config.yaml +++ b/config.yaml @@ -7,7 +7,8 @@ server: max_ws_size: 10485760 # 10 MB video: - process_every_n_frames: 1 + process_every_n_frames: 2 + log_every_n_frames: 2 model: path: "./pose_models/pose_landmarker_full.task" diff --git a/configs/models.py b/configs/models.py index bc8e910..a460d63 100644 --- a/configs/models.py +++ b/configs/models.py @@ -15,7 +15,8 @@ class ServerConfig: @dataclass class VideoConfig: """视频帧处理配置""" - process_every_n_frames: int = 1 + process_every_n_frames: int = 2 + log_every_n_frames: int = 2 @dataclass diff --git a/tests/test_dead_bug_rules.py b/tests/test_dead_bug_rules.py index 4c413de..7b318ed 100644 --- a/tests/test_dead_bug_rules.py +++ b/tests/test_dead_bug_rules.py @@ -49,6 +49,28 @@ class TestDeadBugRules: ) assert detect_diagonal_extension(metrics) == "left_arm_right_leg" + def test_detect_diagonal_extension_allows_ready_arm_overlap(self): + """测试:准备位双臂上举时,单腿对侧伸展仍应识别""" + metrics = DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=False, right_leg_extended=True, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=90, right_knee_angle=160, + feedback=[], + ) + assert detect_diagonal_extension(metrics) == "left_arm_right_leg" + + def test_detect_diagonal_extension_rejects_both_legs(self): + """测试:双腿同时伸展不应识别为可计数对角伸展""" + metrics = DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=True, right_leg_extended=True, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=160, right_knee_angle=160, + feedback=[], + ) + assert detect_diagonal_extension(metrics) is None + def test_is_ready_position(self): """测试:膝盖弯曲且四肢收缩应识别为准备姿态""" metrics = DeadBugMetrics( @@ -60,6 +82,17 @@ class TestDeadBugRules: ) assert is_ready_position(metrics) + def test_is_ready_allows_arms_extended(self): + """测试:dead bug 准备位允许双臂上举""" + metrics = DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=False, right_leg_extended=False, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=100, right_knee_angle=100, + feedback=[], + ) + assert is_ready_position(metrics) + def test_is_not_ready_legs_extended(self): """测试:腿部伸展时不识别为准备姿态""" metrics = DeadBugMetrics( diff --git a/tests/test_dead_bug_state_machine.py b/tests/test_dead_bug_state_machine.py index e187d79..9930d70 100644 --- a/tests/test_dead_bug_state_machine.py +++ b/tests/test_dead_bug_state_machine.py @@ -26,6 +26,36 @@ class TestDeadBugStateMachine: feedback=[], ) + def _both_legs_extended(self) -> DeadBugMetrics: + """构建双腿同时伸展的非标准姿态""" + return DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=True, right_leg_extended=True, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=160, right_knee_angle=160, + feedback=[], + ) + + def _arms_extended_ready_legs(self) -> DeadBugMetrics: + """构建腿已收回但手臂未收回的姿态""" + return DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=False, right_leg_extended=False, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=100, right_knee_angle=100, + feedback=[], + ) + + def _right_knee_angle(self, angle: float) -> DeadBugMetrics: + """构建右膝角连续变化但伸展布尔值尚未稳定的姿态""" + return DeadBugMetrics( + left_arm_extended=True, right_arm_extended=True, + left_leg_extended=False, right_leg_extended=False, + left_elbow_angle=160, right_elbow_angle=160, + left_knee_angle=90, right_knee_angle=angle, + feedback=[], + ) + def test_initial_state(self): """测试:状态机初始化后应为READY且计数为0""" sm = DeadBugStateMachine() @@ -46,3 +76,58 @@ class TestDeadBugStateMachine: assert sm.phase == DeadBugPhase.READY sm.update(self._extended_left()) assert sm.phase == DeadBugPhase.EXTENDING + + def test_confirm_extension_from_knee_angle_trend(self): + """测试:膝角连续上升时,不依赖单帧伸展布尔值也能确认伸展""" + sm = DeadBugStateMachine(extension_confirm_frames=2, reset_confirm_frames=2) + + sm.update(self._right_knee_angle(100)) + sm.update(self._right_knee_angle(130)) + assert sm.phase == DeadBugPhase.READY + sm.update(self._right_knee_angle(145)) + sm.update(self._right_knee_angle(150)) + + assert sm.phase == DeadBugPhase.EXTENDING + + def test_full_rep_counts_once_after_strict_reset(self): + """测试:确认伸展后,只有严格回到准备姿态才计一次""" + sm = DeadBugStateMachine(extension_confirm_frames=2, reset_confirm_frames=2) + + sm.update(self._extended_left()) + sm.update(self._extended_left()) + assert sm.phase == DeadBugPhase.EXTENDING + + sm.update(self._arms_extended_ready_legs()) + assert sm.rep_count == 0 + assert sm.phase == DeadBugPhase.NEED_RESET + + sm.update(self._ready_metrics()) + result = sm.update(self._ready_metrics()) + assert result.rep_count == 1 + assert sm.phase == DeadBugPhase.READY + + result = sm.update(self._ready_metrics()) + assert result.rep_count == 1 + + def test_both_legs_do_not_start_rep(self): + """测试:双腿同时伸展不进入计数流程""" + sm = DeadBugStateMachine(extension_confirm_frames=2, reset_confirm_frames=2) + sm.update(self._both_legs_extended()) + result = sm.update(self._both_legs_extended()) + + assert result.rep_count == 0 + assert sm.phase == DeadBugPhase.READY + + def test_no_pose_preserves_confirmed_rep_until_reset(self): + """测试:已确认伸展后短暂丢姿态,回到准备位仍能完成计数""" + sm = DeadBugStateMachine(extension_confirm_frames=2, reset_confirm_frames=2) + sm.update(self._extended_left()) + sm.update(self._extended_left()) + assert sm.phase == DeadBugPhase.EXTENDING + + sm.mark_no_pose() + sm.update(self._ready_metrics()) + result = sm.update(self._ready_metrics()) + + assert result.rep_count == 1 + assert sm.phase == DeadBugPhase.READY