6dee2a2ff3
- 调整视频处理频率从每帧处理改为每2帧处理 - 添加膝角趋势平滑算法减少单帧抖动误判 - 改进对角伸展检测逻辑支持准备位手臂上举 - 优化状态机确保严格回到准备姿态才计数 - 添加姿态丢失时的候选帧清理机制 - 更新音频文件生成路径至resources目录 - 改进macOS音频生成使用AIFF格式提高质量 - 添加详细的帧处理日志输出间隔配置
185 lines
7.5 KiB
Python
185 lines
7.5 KiB
Python
from __future__ import annotations
|
|
|
|
from app.exercises.dead_bug.rules import detect_diagonal_extension, is_ready_position
|
|
from app.exercises.dead_bug.types import DeadBugMetrics, DeadBugPhase, DeadBugResult
|
|
|
|
_SMOOTHING_ALPHA = 0.45
|
|
_TREND_DELTA_DEGREES = 1.5
|
|
_SIDE_ANGLE_MARGIN = 10.0
|
|
_EXTENSION_START_ANGLE = 125.0
|
|
_EXTENSION_PEAK_ANGLE = 150.0
|
|
_READY_KNEE_ANGLE = 140.0
|
|
|
|
class DeadBugStateMachine:
|
|
"""死虫式动作状态机:管理READY/EXTENDING/NEED_RESET/NO_POSE状态转换"""
|
|
|
|
def __init__(self, *, extension_confirm_frames: int = 4, reset_confirm_frames: int = 3) -> None:
|
|
"""初始化并设置状态转换确认帧数"""
|
|
self.extension_confirm_frames = extension_confirm_frames
|
|
self.reset_confirm_frames = reset_confirm_frames
|
|
|
|
self.rep_count = 0
|
|
self.phase = DeadBugPhase.READY
|
|
self.active_side: str | None = None
|
|
self._candidate_side: str | None = None
|
|
self._candidate_frames = 0
|
|
self._reset_frames = 0
|
|
self._smooth_left_knee_angle: float | None = None
|
|
self._smooth_right_knee_angle: float | None = None
|
|
self._left_knee_delta = 0.0
|
|
self._right_knee_delta = 0.0
|
|
|
|
def mark_no_pose(self) -> None:
|
|
"""姿态丢失时清掉候选帧;已确认的半程动作保留,等待重新可见后完成回收。"""
|
|
if self.phase == DeadBugPhase.READY:
|
|
self.phase = DeadBugPhase.NO_POSE
|
|
self.active_side = None
|
|
self._candidate_side = None
|
|
self._candidate_frames = 0
|
|
self._reset_frames = 0
|
|
|
|
def update(self, metrics: DeadBugMetrics) -> DeadBugResult:
|
|
"""根据传入指标更新状态机并返回本次结果"""
|
|
self._update_knee_trends(metrics)
|
|
|
|
side = self._detect_motion_side(metrics)
|
|
ready = self._is_stable_ready(metrics)
|
|
|
|
if side is None:
|
|
self._candidate_side = None
|
|
self._candidate_frames = 0
|
|
elif side == self._candidate_side:
|
|
self._candidate_frames += 1
|
|
else:
|
|
self._candidate_side = side
|
|
self._candidate_frames = 1
|
|
|
|
if self.phase in (DeadBugPhase.READY, DeadBugPhase.NO_POSE):
|
|
if ready:
|
|
self.phase = DeadBugPhase.READY
|
|
if self._candidate_frames >= self.extension_confirm_frames and side is not None:
|
|
self.phase = DeadBugPhase.EXTENDING
|
|
self.active_side = side
|
|
self._reset_frames = 0
|
|
elif self.phase == DeadBugPhase.EXTENDING:
|
|
if ready or self._active_knee_retracting():
|
|
self.phase = DeadBugPhase.NEED_RESET
|
|
self._reset_frames = 1 if ready else 0
|
|
elif self.phase == DeadBugPhase.NEED_RESET:
|
|
if ready:
|
|
self._reset_frames += 1
|
|
if self._reset_frames >= self.reset_confirm_frames:
|
|
self.rep_count += 1
|
|
self.phase = DeadBugPhase.READY
|
|
self.active_side = None
|
|
self._candidate_side = None
|
|
self._candidate_frames = 0
|
|
self._reset_frames = 0
|
|
else:
|
|
self._reset_frames = 0
|
|
|
|
feedback = list(metrics.feedback)
|
|
display_side = detect_diagonal_extension(metrics)
|
|
if display_side is None and not ready:
|
|
feedback.append("Extend opposite arm and leg only")
|
|
if ready:
|
|
feedback.append("Ready position")
|
|
elif display_side == "left_arm_right_leg":
|
|
feedback.append("Left arm + right leg")
|
|
elif display_side == "right_arm_left_leg":
|
|
feedback.append("Right arm + left leg")
|
|
|
|
is_standard = display_side is not None and not metrics.feedback
|
|
return DeadBugResult(
|
|
rep_count=self.rep_count,
|
|
phase=self.phase,
|
|
side=display_side,
|
|
is_standard=is_standard,
|
|
feedback=feedback[:3],
|
|
metrics=metrics,
|
|
)
|
|
|
|
def _update_knee_trends(self, metrics: DeadBugMetrics) -> None:
|
|
"""更新平滑膝角和本帧变化量,用连续趋势抵消单帧抖动。"""
|
|
previous_left = self._smooth_left_knee_angle
|
|
previous_right = self._smooth_right_knee_angle
|
|
|
|
if previous_left is None:
|
|
self._smooth_left_knee_angle = metrics.left_knee_angle
|
|
self._left_knee_delta = 0.0
|
|
else:
|
|
self._smooth_left_knee_angle = (
|
|
_SMOOTHING_ALPHA * metrics.left_knee_angle
|
|
+ (1.0 - _SMOOTHING_ALPHA) * previous_left
|
|
)
|
|
self._left_knee_delta = self._smooth_left_knee_angle - previous_left
|
|
|
|
if previous_right is None:
|
|
self._smooth_right_knee_angle = metrics.right_knee_angle
|
|
self._right_knee_delta = 0.0
|
|
else:
|
|
self._smooth_right_knee_angle = (
|
|
_SMOOTHING_ALPHA * metrics.right_knee_angle
|
|
+ (1.0 - _SMOOTHING_ALPHA) * previous_right
|
|
)
|
|
self._right_knee_delta = self._smooth_right_knee_angle - previous_right
|
|
|
|
def _detect_motion_side(self, metrics: DeadBugMetrics) -> str | None:
|
|
"""基于膝角趋势推断正在伸展的对角侧,手臂只作为辅助校验。"""
|
|
raw_side = detect_diagonal_extension(metrics)
|
|
if raw_side is not None and self._side_has_extension_motion(raw_side):
|
|
return raw_side
|
|
|
|
left = self._smooth_left_knee_angle
|
|
right = self._smooth_right_knee_angle
|
|
if left is None or right is None:
|
|
return raw_side
|
|
|
|
both_legs_high = left >= _EXTENSION_PEAK_ANGLE and right >= _EXTENSION_PEAK_ANGLE
|
|
if both_legs_high and abs(left - right) < _SIDE_ANGLE_MARGIN:
|
|
return None
|
|
|
|
if (
|
|
right >= _EXTENSION_START_ANGLE
|
|
and right - left >= _SIDE_ANGLE_MARGIN
|
|
and (self._right_knee_delta >= _TREND_DELTA_DEGREES or right >= _EXTENSION_PEAK_ANGLE)
|
|
):
|
|
return "left_arm_right_leg"
|
|
if (
|
|
left >= _EXTENSION_START_ANGLE
|
|
and left - right >= _SIDE_ANGLE_MARGIN
|
|
and (self._left_knee_delta >= _TREND_DELTA_DEGREES or left >= _EXTENSION_PEAK_ANGLE)
|
|
):
|
|
return "right_arm_left_leg"
|
|
return raw_side
|
|
|
|
def _side_has_extension_motion(self, side: str) -> bool:
|
|
"""确认对应腿处于伸展区或仍在伸展趋势中。"""
|
|
if side == "left_arm_right_leg":
|
|
angle = self._smooth_right_knee_angle
|
|
delta = self._right_knee_delta
|
|
else:
|
|
angle = self._smooth_left_knee_angle
|
|
delta = self._left_knee_delta
|
|
if angle is None:
|
|
return True
|
|
return angle >= _EXTENSION_PEAK_ANGLE or (
|
|
angle >= _EXTENSION_START_ANGLE and delta >= _TREND_DELTA_DEGREES
|
|
)
|
|
|
|
def _is_stable_ready(self, metrics: DeadBugMetrics) -> bool:
|
|
"""准备位需要双腿回到屈膝区域;使用平滑膝角避免单帧阈值跳变。"""
|
|
left = self._smooth_left_knee_angle
|
|
right = self._smooth_right_knee_angle
|
|
if left is None or right is None:
|
|
return is_ready_position(metrics)
|
|
return left <= _READY_KNEE_ANGLE and right <= _READY_KNEE_ANGLE
|
|
|
|
def _active_knee_retracting(self) -> bool:
|
|
"""确认已伸展的那条腿开始回收。"""
|
|
if self.active_side == "left_arm_right_leg":
|
|
return self._right_knee_delta <= -_TREND_DELTA_DEGREES
|
|
if self.active_side == "right_arm_left_leg":
|
|
return self._left_knee_delta <= -_TREND_DELTA_DEGREES
|
|
return False
|