Refactor into modular app structure
Split monolithic files into focused modules: - app/core: settings, logging, lifecycle - app/signaling: websocket server, ICE parser, message models - app/webrtc: peer session, video receiver, frame source - app/vision: pose landmarker wrapper, model config, pose types - app/exercises/dead_bug: detector, metrics, rules, state machine, types - app/rendering: skeleton renderer, status overlay, window display - app/audio: rep announcer - app/diagnostics: perf timer, crash handler - configs: environment-based settings - tests: unit tests for rules, state machine, ICE parser - run.py: entry point
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.exercises.dead_bug.rules import detect_diagonal_extension, is_ready_position
|
||||
from app.exercises.dead_bug.types import DeadBugMetrics, DeadBugPhase, DeadBugResult
|
||||
|
||||
|
||||
class DeadBugStateMachine:
|
||||
def __init__(self, *, extension_confirm_frames: int = 4, reset_confirm_frames: int = 3) -> None:
|
||||
self.extension_confirm_frames = extension_confirm_frames
|
||||
self.reset_confirm_frames = reset_confirm_frames
|
||||
|
||||
self.rep_count = 0
|
||||
self.phase = DeadBugPhase.READY
|
||||
self.active_side: str | None = None
|
||||
self._candidate_side: str | None = None
|
||||
self._candidate_frames = 0
|
||||
self._reset_frames = 0
|
||||
|
||||
def update(self, metrics: DeadBugMetrics) -> DeadBugResult:
|
||||
side = detect_diagonal_extension(metrics)
|
||||
ready = is_ready_position(metrics)
|
||||
|
||||
if side is None:
|
||||
self._candidate_side = None
|
||||
self._candidate_frames = 0
|
||||
elif side == self._candidate_side:
|
||||
self._candidate_frames += 1
|
||||
else:
|
||||
self._candidate_side = side
|
||||
self._candidate_frames = 1
|
||||
|
||||
if self.phase in (DeadBugPhase.READY, DeadBugPhase.NO_POSE):
|
||||
if self._candidate_frames >= self.extension_confirm_frames and side is not None:
|
||||
self.phase = DeadBugPhase.EXTENDING
|
||||
self.active_side = side
|
||||
self._reset_frames = 0
|
||||
elif self.phase == DeadBugPhase.EXTENDING:
|
||||
if side == self.active_side:
|
||||
self.phase = DeadBugPhase.NEED_RESET
|
||||
elif self.phase == DeadBugPhase.NEED_RESET:
|
||||
if ready:
|
||||
self._reset_frames += 1
|
||||
if self._reset_frames >= self.reset_confirm_frames:
|
||||
self.rep_count += 1
|
||||
self.phase = DeadBugPhase.READY
|
||||
self.active_side = None
|
||||
self._candidate_side = None
|
||||
self._candidate_frames = 0
|
||||
self._reset_frames = 0
|
||||
else:
|
||||
self._reset_frames = 0
|
||||
|
||||
feedback = list(metrics.feedback)
|
||||
if side is None and not ready:
|
||||
feedback.append("Extend opposite arm and leg only")
|
||||
if ready:
|
||||
feedback.append("Ready position")
|
||||
elif side == "left_arm_right_leg":
|
||||
feedback.append("Left arm + right leg")
|
||||
elif side == "right_arm_left_leg":
|
||||
feedback.append("Right arm + left leg")
|
||||
|
||||
is_standard = side is not None and not metrics.feedback
|
||||
return DeadBugResult(
|
||||
rep_count=self.rep_count,
|
||||
phase=self.phase,
|
||||
side=side,
|
||||
is_standard=is_standard,
|
||||
feedback=feedback[:3],
|
||||
metrics=metrics,
|
||||
)
|
||||
Reference in New Issue
Block a user