import asyncio import json import re import websockets import cv2 from loguru import logger from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate async def handle_client(websocket): client = websocket.remote_address logger.info(f"Client connected: {client}") pc = RTCPeerConnection() video_task = None def parse_ice(data): match = re.match( r'candidate:(\S+) (\d) (\S+) (\d+) (\S+) (\d+) typ (\S+)(?: raddr (\S+) rport (\d+))?', data["candidate"] ) if not match: return None g = match.groups() cand = RTCIceCandidate( foundation=g[0], component=int(g[1]), protocol=g[2].lower(), priority=int(g[3]), ip=g[4], port=int(g[5]), type=g[6], relatedAddress=g[7], relatedPort=int(g[8]) if g[8] else None, ) cand.sdpMid = data.get("sdpMid") cand.sdpMLineIndex = data.get("sdpMLineIndex", 0) return cand async def receive_video(track): logger.info("Start receiving video frames") frame_count = 0 try: while True: frame = await track.recv() frame_count += 1 img = frame.to_ndarray(format="bgr24") cv2.imshow("Android Camera (WebRTC)", img) if frame_count % 100 == 0: logger.info(f"Received {frame_count} frames, shape={img.shape}") if cv2.waitKey(1) & 0xFF == 27: logger.info("ESC pressed, closing display") break except asyncio.CancelledError: logger.info("Video receive task cancelled") except Exception as e: logger.error(f"Video receive error: {e}") @pc.on("track") async def on_track(track): logger.info(f"Track received: kind={track.kind}") if track.kind == "video": nonlocal video_task video_task = asyncio.ensure_future(receive_video(track)) @pc.on("iceconnectionstatechange") async def on_iceconnectionstatechange(): logger.info(f"ICE state: {pc.iceConnectionState}") if pc.iceConnectionState in ("failed", "closed", "disconnected"): await pc.close() try: async for message in websocket: data = json.loads(message) msg_type = data.get("type") if msg_type == "offer": offer = RTCSessionDescription(sdp=data["sdp"], type="offer") await pc.setRemoteDescription(offer) answer = await pc.createAnswer() await pc.setLocalDescription(answer) await websocket.send(json.dumps({ "type": "answer", "sdp": pc.localDescription.sdp, })) elif msg_type == "candidate": cand = parse_ice(data) if cand: await pc.addIceCandidate(cand) except websockets.ConnectionClosed: logger.info(f"Client disconnected: {client}") except Exception as e: logger.exception(f"Error: {e}") finally: if video_task: video_task.cancel() try: await video_task except asyncio.CancelledError: pass await pc.close() cv2.destroyAllWindows() logger.info(f"Connection closed: {client}") async def main(): host = "0.0.0.0" port = 8765 logger.info(f"WebRTC signaling server: ws://{host}:{port}") async with websockets.serve(handle_client, host, port, max_size=10 * 1024 * 1024): await asyncio.Future() if __name__ == "__main__": asyncio.run(main())