Files
posefit-server/dead_bug_detector.py
T

432 lines
16 KiB
Python

from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
import cv2
import mediapipe as mp
import numpy as np
from loguru import logger
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode
BaseOptions = mp.tasks.BaseOptions
class DeadBugPhase(str, Enum):
READY = "ready"
EXTENDING = "extending"
NEED_RESET = "need_reset"
NO_POSE = "no_pose"
@dataclass(frozen=True)
class Point:
x: float
y: float
z: float
visibility: float
@dataclass
class DeadBugMetrics:
left_arm_extended: bool
right_arm_extended: bool
left_leg_extended: bool
right_leg_extended: bool
left_elbow_angle: float
right_elbow_angle: float
left_knee_angle: float
right_knee_angle: float
feedback: list[str]
@dataclass
class DeadBugResult:
rep_count: int
phase: DeadBugPhase
side: str | None
is_standard: bool
feedback: list[str]
metrics: DeadBugMetrics | None
class DeadBugDetector:
"""MediaPipe Pose based dead bug detector and counter.
The rules are intentionally conservative because a phone stream only gives
us 2D landmarks. A rep is counted when one diagonal pair extends cleanly and
the body returns to the bent-knee ready position.
"""
LEFT_SHOULDER = 11
RIGHT_SHOULDER = 12
LEFT_ELBOW = 13
RIGHT_ELBOW = 14
LEFT_WRIST = 15
RIGHT_WRIST = 16
LEFT_HIP = 23
RIGHT_HIP = 24
LEFT_KNEE = 25
RIGHT_KNEE = 26
LEFT_ANKLE = 27
RIGHT_ANKLE = 28
REQUIRED_LANDMARKS = (
LEFT_SHOULDER,
RIGHT_SHOULDER,
LEFT_ELBOW,
RIGHT_ELBOW,
LEFT_WRIST,
RIGHT_WRIST,
LEFT_HIP,
RIGHT_HIP,
LEFT_KNEE,
RIGHT_KNEE,
LEFT_ANKLE,
RIGHT_ANKLE,
)
def __init__(
self,
model_path: str | Path | None = None,
*,
visibility_threshold: float = 0.45,
extension_confirm_frames: int = 4,
reset_confirm_frames: int = 3,
prefer_gpu: bool = True,
) -> None:
if model_path is None:
model_path = Path(__file__).resolve().parent / "pose_models" / "pose_landmarker_full.task"
self.model_path = str(model_path)
self.visibility_threshold = visibility_threshold
self.extension_confirm_frames = extension_confirm_frames
self.reset_confirm_frames = reset_confirm_frames
self.delegate = BaseOptions.Delegate.GPU if prefer_gpu else BaseOptions.Delegate.CPU
self._latest_result = None
self._result_lock = threading.Lock()
self._result_event = threading.Event()
self._inflight = False
self._inflight_started_at = 0.0
def on_result(pose_result, _image, _timestamp_ms):
with self._result_lock:
self._latest_result = pose_result
self._inflight = False
self._inflight_started_at = 0.0
self._result_event.set()
self._landmarker = self._create_landmarker(on_result)
self.rep_count = 0
self.phase = DeadBugPhase.READY
self.active_side: str | None = None
self._candidate_side: str | None = None
self._candidate_frames = 0
self._reset_frames = 0
self._last_timestamp_ms = -1
def close(self) -> None:
self._landmarker.close()
def _create_landmarker(self, result_callback):
try:
landmarker = PoseLandmarker.create_from_options(
self._build_options(self.delegate, result_callback)
)
logger.info("MediaPipe PoseLandmarker initialized with {} delegate", self.delegate.name)
return landmarker
except Exception as exc:
if self.delegate == BaseOptions.Delegate.CPU:
raise
logger.warning("MediaPipe GPU delegate unavailable, falling back to CPU: {}", exc)
self.delegate = BaseOptions.Delegate.CPU
landmarker = PoseLandmarker.create_from_options(
self._build_options(self.delegate, result_callback)
)
logger.info("MediaPipe PoseLandmarker initialized with CPU delegate")
return landmarker
def _build_options(self, delegate, result_callback):
return PoseLandmarkerOptions(
base_options=BaseOptions(model_asset_path=self.model_path, delegate=delegate),
running_mode=VisionRunningMode.LIVE_STREAM,
result_callback=result_callback,
num_poses=1,
min_pose_detection_confidence=0.5,
min_pose_presence_confidence=0.5,
min_tracking_confidence=0.5,
)
def process_frame(self, bgr_frame: np.ndarray, timestamp_ms: int) -> tuple[np.ndarray, DeadBugResult]:
timestamp_ms = self._normalize_timestamp(timestamp_ms)
with self._result_lock:
if self._inflight and time.monotonic() - self._inflight_started_at > 0.5:
logger.warning("MediaPipe detect_async timed out; allowing next frame submission")
self._inflight = False
self._inflight_started_at = 0.0
should_submit = not self._inflight
if should_submit:
self._inflight = True
self._inflight_started_at = time.monotonic()
if should_submit:
rgba_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGBA)
mp_image = mp.Image(image_format=mp.ImageFormat.SRGBA, data=rgba_frame)
self._result_event.clear()
try:
self._landmarker.detect_async(mp_image, timestamp_ms)
except Exception:
with self._result_lock:
self._inflight = False
self._inflight_started_at = 0.0
raise
self._result_event.wait(timeout=0.08)
with self._result_lock:
pose_result = self._latest_result
annotated = bgr_frame.copy()
if pose_result is None or not pose_result.pose_landmarks:
result = DeadBugResult(
rep_count=self.rep_count,
phase=DeadBugPhase.NO_POSE,
side=self.active_side,
is_standard=False,
feedback=["No full body detected"],
metrics=None,
)
self._draw_status(annotated, result)
return annotated, result
landmarks = [Point(lm.x, lm.y, lm.z, getattr(lm, "visibility", 1.0)) for lm in pose_result.pose_landmarks[0]]
self._draw_landmarks(annotated, landmarks)
if not self._has_required_visibility(landmarks):
result = DeadBugResult(
rep_count=self.rep_count,
phase=DeadBugPhase.NO_POSE,
side=self.active_side,
is_standard=False,
feedback=["Keep shoulders, elbows, wrists, hips, knees, ankles visible"],
metrics=None,
)
self._draw_status(annotated, result)
return annotated, result
metrics = self._calculate_metrics(landmarks)
result = self._update_state(metrics)
self._draw_status(annotated, result)
return annotated, result
def _normalize_timestamp(self, timestamp_ms: int) -> int:
if timestamp_ms <= self._last_timestamp_ms:
timestamp_ms = self._last_timestamp_ms + 1
self._last_timestamp_ms = timestamp_ms
return timestamp_ms
def _has_required_visibility(self, landmarks: list[Point]) -> bool:
return all(landmarks[i].visibility >= self.visibility_threshold for i in self.REQUIRED_LANDMARKS)
def _calculate_metrics(self, lm: list[Point]) -> DeadBugMetrics:
left_elbow = angle(lm[self.LEFT_SHOULDER], lm[self.LEFT_ELBOW], lm[self.LEFT_WRIST])
right_elbow = angle(lm[self.RIGHT_SHOULDER], lm[self.RIGHT_ELBOW], lm[self.RIGHT_WRIST])
left_knee = angle(lm[self.LEFT_HIP], lm[self.LEFT_KNEE], lm[self.LEFT_ANKLE])
right_knee = angle(lm[self.RIGHT_HIP], lm[self.RIGHT_KNEE], lm[self.RIGHT_ANKLE])
shoulder_width = distance(lm[self.LEFT_SHOULDER], lm[self.RIGHT_SHOULDER])
hip_width = distance(lm[self.LEFT_HIP], lm[self.RIGHT_HIP])
scale = max(shoulder_width, hip_width, 0.08)
left_arm_extended = (
left_elbow >= 145
and distance(lm[self.LEFT_SHOULDER], lm[self.LEFT_WRIST]) >= scale * 1.15
and lm[self.LEFT_WRIST].y <= lm[self.LEFT_SHOULDER].y + scale * 0.35
)
right_arm_extended = (
right_elbow >= 145
and distance(lm[self.RIGHT_SHOULDER], lm[self.RIGHT_WRIST]) >= scale * 1.15
and lm[self.RIGHT_WRIST].y <= lm[self.RIGHT_SHOULDER].y + scale * 0.35
)
left_leg_extended = (
left_knee >= 150
and distance(lm[self.LEFT_HIP], lm[self.LEFT_ANKLE]) >= scale * 1.55
and lm[self.LEFT_ANKLE].y >= lm[self.LEFT_KNEE].y - scale * 0.2
)
right_leg_extended = (
right_knee >= 150
and distance(lm[self.RIGHT_HIP], lm[self.RIGHT_ANKLE]) >= scale * 1.55
and lm[self.RIGHT_ANKLE].y >= lm[self.RIGHT_KNEE].y - scale * 0.2
)
feedback: list[str] = []
if left_arm_extended and left_elbow < 160:
feedback.append("Straighten left arm")
if right_arm_extended and right_elbow < 160:
feedback.append("Straighten right arm")
if left_leg_extended and left_knee < 165:
feedback.append("Straighten left leg")
if right_leg_extended and right_knee < 165:
feedback.append("Straighten right leg")
return DeadBugMetrics(
left_arm_extended=left_arm_extended,
right_arm_extended=right_arm_extended,
left_leg_extended=left_leg_extended,
right_leg_extended=right_leg_extended,
left_elbow_angle=left_elbow,
right_elbow_angle=right_elbow,
left_knee_angle=left_knee,
right_knee_angle=right_knee,
feedback=feedback,
)
def _update_state(self, metrics: DeadBugMetrics) -> DeadBugResult:
side = self._detect_diagonal_extension(metrics)
ready = self._is_ready_position(metrics)
if side is None:
self._candidate_side = None
self._candidate_frames = 0
elif side == self._candidate_side:
self._candidate_frames += 1
else:
self._candidate_side = side
self._candidate_frames = 1
if self.phase in (DeadBugPhase.READY, DeadBugPhase.NO_POSE):
if self._candidate_frames >= self.extension_confirm_frames and side is not None:
self.phase = DeadBugPhase.EXTENDING
self.active_side = side
self._reset_frames = 0
elif self.phase == DeadBugPhase.EXTENDING:
if side == self.active_side:
self.phase = DeadBugPhase.NEED_RESET
elif self.phase == DeadBugPhase.NEED_RESET:
if ready:
self._reset_frames += 1
if self._reset_frames >= self.reset_confirm_frames:
self.rep_count += 1
self.phase = DeadBugPhase.READY
self.active_side = None
self._candidate_side = None
self._candidate_frames = 0
self._reset_frames = 0
else:
self._reset_frames = 0
feedback = list(metrics.feedback)
if side is None and not ready:
feedback.append("Extend opposite arm and leg only")
if ready:
feedback.append("Ready position")
elif side == "left_arm_right_leg":
feedback.append("Left arm + right leg")
elif side == "right_arm_left_leg":
feedback.append("Right arm + left leg")
is_standard = side is not None and not metrics.feedback
return DeadBugResult(
rep_count=self.rep_count,
phase=self.phase,
side=side,
is_standard=is_standard,
feedback=feedback[:3],
metrics=metrics,
)
def _detect_diagonal_extension(self, metrics: DeadBugMetrics) -> str | None:
if metrics.left_leg_extended and metrics.right_leg_extended:
return None
# Dead bug starts with both arms raised, so the non-moving arm may also
# look "extended" in 2D. Infer the rep from the single extended leg and
# require the opposite arm to be extended, instead of rejecting both-arm
# frames as same-side noise.
if metrics.right_leg_extended and metrics.left_arm_extended:
return "left_arm_right_leg"
if metrics.left_leg_extended and metrics.right_arm_extended:
return "right_arm_left_leg"
return None
def _is_ready_position(self, metrics: DeadBugMetrics) -> bool:
knees_bent = metrics.left_knee_angle <= 140 and metrics.right_knee_angle <= 140
legs_not_extended = not metrics.left_leg_extended and not metrics.right_leg_extended
return knees_bent and legs_not_extended and self._detect_diagonal_extension(metrics) is None
def _draw_landmarks(self, image: np.ndarray, landmarks: list[Point]) -> None:
h, w = image.shape[:2]
connections = getattr(getattr(mp, "solutions", None), "pose", None)
pose_connections = getattr(connections, "POSE_CONNECTIONS", _POSE_CONNECTIONS)
for start, end in pose_connections:
if start >= len(landmarks) or end >= len(landmarks):
continue
p1 = landmarks[start]
p2 = landmarks[end]
if p1.visibility < self.visibility_threshold or p2.visibility < self.visibility_threshold:
continue
cv2.line(
image,
(int(p1.x * w), int(p1.y * h)),
(int(p2.x * w), int(p2.y * h)),
(65, 180, 255),
2,
)
for idx in self.REQUIRED_LANDMARKS:
p = landmarks[idx]
if p.visibility >= self.visibility_threshold:
cv2.circle(image, (int(p.x * w), int(p.y * h)), 4, (80, 255, 120), -1)
def _draw_status(self, image: np.ndarray, result: DeadBugResult) -> None:
color = (60, 220, 90) if result.is_standard else (50, 180, 255)
cv2.rectangle(image, (12, 12), (520, 142), (20, 20, 20), -1)
cv2.putText(image, f"Dead bug reps: {result.rep_count}", (28, 48), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
cv2.putText(image, f"phase: {result.phase.value}", (28, 82), cv2.FONT_HERSHEY_SIMPLEX, 0.68, (230, 230, 230), 2)
status = "standard" if result.is_standard else "adjust"
cv2.putText(image, f"status: {status}", (28, 116), cv2.FONT_HERSHEY_SIMPLEX, 0.68, color, 2)
y = 170
for text in result.feedback:
cv2.putText(image, text, (28, y), cv2.FONT_HERSHEY_SIMPLEX, 0.68, (255, 255, 255), 2)
y += 30
def angle(a: Point, b: Point, c: Point) -> float:
ba = np.array([a.x - b.x, a.y - b.y], dtype=np.float32)
bc = np.array([c.x - b.x, c.y - b.y], dtype=np.float32)
denom = float(np.linalg.norm(ba) * np.linalg.norm(bc))
if denom == 0:
return 0.0
cos_value = float(np.dot(ba, bc) / denom)
return float(np.degrees(np.arccos(np.clip(cos_value, -1.0, 1.0))))
def distance(a: Point, b: Point) -> float:
return float(np.hypot(a.x - b.x, a.y - b.y))
_POSE_CONNECTIONS = (
(11, 12),
(11, 13),
(13, 15),
(12, 14),
(14, 16),
(11, 23),
(12, 24),
(23, 24),
(23, 25),
(25, 27),
(24, 26),
(26, 28),
)