Files

75 lines
2.5 KiB
Python

from __future__ import annotations
import asyncio
import json
import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription
from loguru import logger
from app.signaling.ice_parser import parse_ice
from app.webrtc.video_receiver import VideoReceiver
class PeerSession:
"""WebRTC对等连接会话管理"""
def __init__(self) -> None:
self._pc = RTCPeerConnection()
self._video_task: asyncio.Task | None = None
async def handle(self, websocket) -> None:
"""处理WebSocket信令交互与WebRTC连接建立"""
self._setup_events()
try:
async for message in websocket:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "offer":
offer = RTCSessionDescription(sdp=data["sdp"], type="offer")
await self._pc.setRemoteDescription(offer)
answer = await self._pc.createAnswer()
await self._pc.setLocalDescription(answer)
await websocket.send(json.dumps({
"type": "answer",
"sdp": self._pc.localDescription.sdp,
}))
elif msg_type == "candidate":
cand = parse_ice(data)
if cand:
await self._pc.addIceCandidate(cand)
except websockets.ConnectionClosed:
pass
except Exception as e:
logger.exception(f"Error: {e}")
finally:
await self._cleanup()
def _setup_events(self) -> None:
"""注册ICE连接状态变化和视频轨道接收事件处理器"""
@self._pc.on("track")
async def on_track(track):
logger.info(f"Track received: kind={track.kind}")
if track.kind == "video":
receiver = VideoReceiver(track)
self._video_task = asyncio.ensure_future(receiver.run())
@self._pc.on("iceconnectionstatechange")
async def on_iceconnectionstatechange():
logger.info(f"ICE state: {self._pc.iceConnectionState}")
if self._pc.iceConnectionState in ("failed", "closed", "disconnected"):
await self._pc.close()
async def _cleanup(self) -> None:
"""清理视频任务并关闭对等连接"""
if self._video_task:
self._video_task.cancel()
try:
await self._video_task
except asyncio.CancelledError:
pass
await self._pc.close()