from __future__ import annotations

import asyncio
import json

import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription
from loguru import logger

from app.signaling.ice_parser import parse_ice
from app.webrtc.video_receiver import VideoReceiver


class PeerSession:
    """Manage one WebRTC peer connection driven by WebSocket signaling.

    A session owns a single ``RTCPeerConnection`` and, once a video track
    arrives, a background task running a ``VideoReceiver`` for that track.
    """

    def __init__(self) -> None:
        self._pc = RTCPeerConnection()
        # Background task running the VideoReceiver; None until a video
        # track has been received (and again after cleanup).
        self._video_task: asyncio.Task | None = None

    async def handle(self, websocket) -> None:
        """Process signaling messages until the WebSocket closes.

        Each message is expected to be a JSON object with a ``"type"`` key:

        * ``"offer"``: the ``"sdp"`` value is applied as the remote
          description and an ``"answer"`` message carrying the local SDP
          is sent back.
        * ``"candidate"``: the message is passed to ``parse_ice`` and the
          result, if truthy, is added to the peer connection.

        Messages of any other type are ignored. Messages that are not valid
        JSON objects are logged and skipped instead of ending the session.
        The peer connection and video task are always cleaned up on exit.

        Args:
            websocket: An async-iterable connection yielding messages and
                providing an awaitable ``send``.
        """
        self._setup_events()
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                except ValueError as exc:
                    # Covers json.JSONDecodeError and UnicodeDecodeError
                    # (both ValueError subclasses) for undecodable frames.
                    logger.warning("Ignoring malformed signaling message: {}", exc)
                    continue
                if not isinstance(data, dict):
                    logger.warning(
                        "Ignoring non-object signaling message of type {}",
                        type(data).__name__,
                    )
                    continue

                msg_type = data.get("type")
                if msg_type == "offer":
                    offer = RTCSessionDescription(sdp=data["sdp"], type="offer")
                    await self._pc.setRemoteDescription(offer)
                    answer = await self._pc.createAnswer()
                    await self._pc.setLocalDescription(answer)
                    await websocket.send(json.dumps({
                        "type": "answer",
                        "sdp": self._pc.localDescription.sdp,
                    }))
                elif msg_type == "candidate":
                    cand = parse_ice(data)
                    # parse_ice may return a falsy value; skip those.
                    if cand:
                        await self._pc.addIceCandidate(cand)
        except websockets.ConnectionClosed:
            # The peer hung up; nothing to report.
            pass
        except Exception as e:
            logger.exception(f"Error: {e}")
        finally:
            await self._cleanup()

    def _setup_events(self) -> None:
        """Register the track and ICE-connection-state handlers on the peer connection."""

        @self._pc.on("track")
        async def on_track(track):
            logger.info(f"Track received: kind={track.kind}")
            if track.kind == "video":
                receiver = VideoReceiver(track)
                # NOTE(review): a second video track would overwrite this
                # reference, so only the latest task is cancelled in
                # _cleanup -- confirm a single video track is expected.
                self._video_task = asyncio.ensure_future(receiver.run())

        @self._pc.on("iceconnectionstatechange")
        async def on_iceconnectionstatechange():
            logger.info(f"ICE state: {self._pc.iceConnectionState}")
            # NOTE(review): "disconnected" is treated as terminal here;
            # verify that closing on it (rather than waiting for recovery)
            # is intended.
            if self._pc.iceConnectionState in ("failed", "closed", "disconnected"):
                await self._pc.close()

    async def _cleanup(self) -> None:
        """Cancel the video task, if any, and close the peer connection.

        The peer connection is closed even when the video task finished
        with an error; that error is logged rather than propagated so it
        cannot prevent the connection from being released.
        """
        task = self._video_task
        # Drop the reference so a repeated cleanup does not re-await it.
        self._video_task = None
        try:
            if task is not None:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                except Exception:
                    # The task had already failed before cancellation.
                    logger.exception("Video receiver task failed")
        finally:
            await self._pc.close()