from __future__ import annotations from app.exercises.dead_bug.rules import detect_diagonal_extension, is_ready_position from app.exercises.dead_bug.types import DeadBugMetrics, DeadBugPhase, DeadBugResult _SMOOTHING_ALPHA = 0.45 _TREND_DELTA_DEGREES = 1.5 _SIDE_ANGLE_MARGIN = 10.0 _EXTENSION_START_ANGLE = 125.0 _EXTENSION_PEAK_ANGLE = 150.0 _READY_KNEE_ANGLE = 140.0 class DeadBugStateMachine: """死虫式动作状态机:管理READY/EXTENDING/NEED_RESET/NO_POSE状态转换""" def __init__(self, *, extension_confirm_frames: int = 4, reset_confirm_frames: int = 3) -> None: """初始化并设置状态转换确认帧数""" self.extension_confirm_frames = extension_confirm_frames self.reset_confirm_frames = reset_confirm_frames self.rep_count = 0 self.phase = DeadBugPhase.READY self.active_side: str | None = None self._candidate_side: str | None = None self._candidate_frames = 0 self._reset_frames = 0 self._smooth_left_knee_angle: float | None = None self._smooth_right_knee_angle: float | None = None self._left_knee_delta = 0.0 self._right_knee_delta = 0.0 def mark_no_pose(self) -> None: """姿态丢失时清掉候选帧;已确认的半程动作保留,等待重新可见后完成回收。""" if self.phase == DeadBugPhase.READY: self.phase = DeadBugPhase.NO_POSE self.active_side = None self._candidate_side = None self._candidate_frames = 0 self._reset_frames = 0 def update(self, metrics: DeadBugMetrics) -> DeadBugResult: """根据传入指标更新状态机并返回本次结果""" self._update_knee_trends(metrics) side = self._detect_motion_side(metrics) ready = self._is_stable_ready(metrics) if side is None: self._candidate_side = None self._candidate_frames = 0 elif side == self._candidate_side: self._candidate_frames += 1 else: self._candidate_side = side self._candidate_frames = 1 if self.phase in (DeadBugPhase.READY, DeadBugPhase.NO_POSE): if ready: self.phase = DeadBugPhase.READY if self._candidate_frames >= self.extension_confirm_frames and side is not None: self.phase = DeadBugPhase.EXTENDING self.active_side = side self._reset_frames = 0 elif self.phase == DeadBugPhase.EXTENDING: if ready or self._active_knee_retracting(): self.phase = DeadBugPhase.NEED_RESET self._reset_frames = 1 if ready else 0 elif self.phase == DeadBugPhase.NEED_RESET: if ready: self._reset_frames += 1 if self._reset_frames >= self.reset_confirm_frames: self.rep_count += 1 self.phase = DeadBugPhase.READY self.active_side = None self._candidate_side = None self._candidate_frames = 0 self._reset_frames = 0 else: self._reset_frames = 0 feedback = list(metrics.feedback) display_side = detect_diagonal_extension(metrics) if display_side is None and not ready: feedback.append("Extend opposite arm and leg only") if ready: feedback.append("Ready position") elif display_side == "left_arm_right_leg": feedback.append("Left arm + right leg") elif display_side == "right_arm_left_leg": feedback.append("Right arm + left leg") is_standard = display_side is not None and not metrics.feedback return DeadBugResult( rep_count=self.rep_count, phase=self.phase, side=display_side, is_standard=is_standard, feedback=feedback[:3], metrics=metrics, ) def _update_knee_trends(self, metrics: DeadBugMetrics) -> None: """更新平滑膝角和本帧变化量,用连续趋势抵消单帧抖动。""" previous_left = self._smooth_left_knee_angle previous_right = self._smooth_right_knee_angle if previous_left is None: self._smooth_left_knee_angle = metrics.left_knee_angle self._left_knee_delta = 0.0 else: self._smooth_left_knee_angle = ( _SMOOTHING_ALPHA * metrics.left_knee_angle + (1.0 - _SMOOTHING_ALPHA) * previous_left ) self._left_knee_delta = self._smooth_left_knee_angle - previous_left if previous_right is None: self._smooth_right_knee_angle = metrics.right_knee_angle self._right_knee_delta = 0.0 else: self._smooth_right_knee_angle = ( _SMOOTHING_ALPHA * metrics.right_knee_angle + (1.0 - _SMOOTHING_ALPHA) * previous_right ) self._right_knee_delta = self._smooth_right_knee_angle - previous_right def _detect_motion_side(self, metrics: DeadBugMetrics) -> str | None: """基于膝角趋势推断正在伸展的对角侧,手臂只作为辅助校验。""" raw_side = detect_diagonal_extension(metrics) if raw_side is not None and self._side_has_extension_motion(raw_side): return raw_side left = self._smooth_left_knee_angle right = self._smooth_right_knee_angle if left is None or right is None: return raw_side both_legs_high = left >= _EXTENSION_PEAK_ANGLE and right >= _EXTENSION_PEAK_ANGLE if both_legs_high and abs(left - right) < _SIDE_ANGLE_MARGIN: return None if ( right >= _EXTENSION_START_ANGLE and right - left >= _SIDE_ANGLE_MARGIN and (self._right_knee_delta >= _TREND_DELTA_DEGREES or right >= _EXTENSION_PEAK_ANGLE) ): return "left_arm_right_leg" if ( left >= _EXTENSION_START_ANGLE and left - right >= _SIDE_ANGLE_MARGIN and (self._left_knee_delta >= _TREND_DELTA_DEGREES or left >= _EXTENSION_PEAK_ANGLE) ): return "right_arm_left_leg" return raw_side def _side_has_extension_motion(self, side: str) -> bool: """确认对应腿处于伸展区或仍在伸展趋势中。""" if side == "left_arm_right_leg": angle = self._smooth_right_knee_angle delta = self._right_knee_delta else: angle = self._smooth_left_knee_angle delta = self._left_knee_delta if angle is None: return True return angle >= _EXTENSION_PEAK_ANGLE or ( angle >= _EXTENSION_START_ANGLE and delta >= _TREND_DELTA_DEGREES ) def _is_stable_ready(self, metrics: DeadBugMetrics) -> bool: """准备位需要双腿回到屈膝区域;使用平滑膝角避免单帧阈值跳变。""" left = self._smooth_left_knee_angle right = self._smooth_right_knee_angle if left is None or right is None: return is_ready_position(metrics) return left <= _READY_KNEE_ANGLE and right <= _READY_KNEE_ANGLE def _active_knee_retracting(self) -> bool: """确认已伸展的那条腿开始回收。""" if self.active_side == "left_arm_right_leg": return self._right_knee_delta <= -_TREND_DELTA_DEGREES if self.active_side == "right_arm_left_leg": return self._left_knee_delta <= -_TREND_DELTA_DEGREES return False