1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| import base64 import asyncio import threading import time import cv2 import numpy as np import uvicorn from io import BytesIO from typing import List from PIL import Image from fastapi import FastAPI, WebSocket, WebSocketDisconnect from PyQt5.QtCore import QObject, pyqtSignal from collections import deque from IDQueue import IDQueue
# Capacity limit applied to both hand-off queues below.
max_len_queue = 5

# Websocket handler -> processing thread: (client_id, image_id, image) tuples.
# The deque is bounded; callers check its length before appending.
socket_to_ai_lock = threading.Lock()
socket_to_ai_queue = deque(maxlen=max_len_queue)

# Processing thread -> websocket handler: results stored per client_id.
# NOTE(review): IDQueue is a project-local container; its exact semantics
# (add/pop/len/clear keyed by client id) should be confirmed in IDQueue.py.
ai_to_socket_lock = threading.Lock()
ai_to_socket_queue = IDQueue()
class ConnectionManager:
    """Keep track of accepted websocket connections and send text to them."""

    def __init__(self):
        # Connections that have been accepted and not yet disconnected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the websocket handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Forget a connection; a connection that is not registered is ignored."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (e.g. disconnect reported twice): nothing to do.
            pass

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` as text to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` as text to every currently registered connection."""
        # Iterate over a snapshot: each ``await`` yields to the event loop, and
        # another handler may connect/disconnect meanwhile, which would mutate
        # the list under iteration and make the loop skip connections.
        for connection in tuple(self.active_connections):
            await connection.send_text(message)
class WebTask(QObject):
    """Qt worker object that serves the image websocket endpoint.

    ``long_running`` registers the ``/ws_bytes/{client_id}`` route on the
    FastAPI app and then runs uvicorn, so it is meant to be executed on a
    dedicated thread. Received frames are handed to the processing side via
    ``socket_to_ai_queue``; results are read back from ``ai_to_socket_queue``.
    """

    signal_text = pyqtSignal(str)
    signal_client_msg = pyqtSignal(int, int, np.ndarray)

    # Waiting for a result polls the result queue up to this many times and
    # sleeps this many seconds after every empty poll (roughly 3 s in total).
    _RESULT_POLL_ATTEMPTS = 1000
    _RESULT_POLL_INTERVAL = 0.003

    def __init__(self):
        super().__init__()
        self.app = FastAPI()
        self.manager = ConnectionManager()

    @staticmethod
    def _decode_image(img_b64):
        """Decode a base64-encoded image; return ``None`` if it cannot be decoded."""
        try:
            raw = base64.b64decode(img_b64)
            buffer = np.frombuffer(raw, dtype=np.uint8)
            # cv2.imdecode itself returns None for data it cannot decode.
            return cv2.imdecode(buffer, cv2.IMREAD_COLOR)
        except (TypeError, ValueError, cv2.error):
            # Invalid base64 (binascii.Error is a ValueError), a non-string
            # payload, or an empty buffer rejected by OpenCV.
            return None

    @staticmethod
    def _enqueue_frame(client_id, img_id, im):
        """Queue a frame for processing; return ``False`` when the queue is full."""
        # The capacity check is done under the lock so that check and append
        # are one atomic step; a full ``deque(maxlen=...)`` would otherwise
        # silently discard its oldest entry on append.
        with socket_to_ai_lock:
            if len(socket_to_ai_queue) < max_len_queue:
                print("socket_to_ai_queue append")
                socket_to_ai_queue.append((client_id, img_id, im))
                return True
        print("socket_to_ai_queue full")
        return False

    async def _wait_for_result(self, client_id, img_id_send):
        """Poll for the result of ``img_id_send``; return its count or ``None``.

        Results popped for this client that carry a different image id are
        logged and discarded. ``None`` is returned when no matching result
        shows up within the polling budget.
        """
        for _ in range(self._RESULT_POLL_ATTEMPTS):
            if not ai_to_socket_queue.len(client_id):
                await asyncio.sleep(self._RESULT_POLL_INTERVAL)
                continue
            with ai_to_socket_lock:
                print("ai_to_socket_queue pop")
                img_id_read, count = ai_to_socket_queue.pop(client_id)
            if img_id_send == img_id_read:
                return count
            print(f"img_id_send:{img_id_send} != img_id_read:{img_id_read}")
        return None

    def long_running(self):
        """Register the websocket route and run the uvicorn server.

        This call blocks until the server stops.
        """

        @self.app.websocket("/ws_bytes/{client_id}")
        async def websocket_client_msg(websocket: WebSocket, client_id: int):
            await self.manager.connect(websocket)
            # Drop results left over from an earlier session of this client.
            ai_to_socket_queue.clear(client_id)
            try:
                while True:
                    data = await websocket.receive_json()
                    images = data.get("images", {})
                    img_b64 = images.get("encoded", "")
                    img_id_send = images.get("image_id", -1)
                    if not img_b64:
                        # Message without image payload: nothing to process.
                        continue

                    # ``count`` stays None unless a matching result arrives, so
                    # the reply never reports a stale value from an earlier
                    # frame and never references an unbound name.
                    count = None
                    im = self._decode_image(img_b64)
                    if im is None:
                        print("image decode failed")
                    elif self._enqueue_frame(client_id, img_id_send, im):
                        # Only wait when the frame was actually queued; a
                        # dropped frame can never produce a result.
                        count = await self._wait_for_result(client_id, img_id_send)

                    await self.manager.send_personal_message(
                        f"You client_id:{client_id}, img_id:{img_id_send}, count:{count}",
                        websocket,
                    )
            except WebSocketDisconnect:
                pass
            finally:
                # Deregister on every exit path, not only on a clean disconnect,
                # so a failing handler does not leave a dead connection behind.
                self.manager.disconnect(websocket)

        # uvicorn.run() is a blocking call that creates and drives its own
        # event loop and returns None; it must not be wrapped in
        # asyncio.create_task(), which needs a coroutine and a running loop.
        uvicorn.run(self.app, host="0.0.0.0", port=8000)
class ProcessingTask(QObject):
    """Qt worker object that consumes queued frames and publishes results."""

    def __init__(self):
        super().__init__()

    def work(self):
        """Stand-in for the per-frame processing step: log, sleep 0.1 s, log."""
        print("work start")
        time.sleep(0.1)
        print("work finsh")

    def send_result(self, client_id, val):
        """Store ``val`` for ``client_id`` unless that client's queue is full."""
        if ai_to_socket_queue.len(client_id) >= max_len_queue:
            print("ai_to_socket_queue full")
            return
        with ai_to_socket_lock:
            print("ai_to_socket_queue append")
            ai_to_socket_queue.add(client_id, val)

    def long_running(self):
        """Process queued frames forever, polling every 3 ms while idle."""
        processed = 0
        print("ProcessingTask running")
        while True:
            if not socket_to_ai_queue:
                # Nothing queued yet: back off briefly instead of spinning.
                time.sleep(0.003)
                continue
            with socket_to_ai_lock:
                print("socket_to_ai_lock popleft")
                client_id, img_id, _frame = socket_to_ai_queue.popleft()
            print(f"thread count:{processed}")
            processed += 1
            self.work()
            # The result pairs the image id with the running frame counter.
            self.send_result(client_id, (img_id, processed))
|