import logging, os, gc, cv2, time, threading import communication as rabbitmq import detect_people_pb2 as detect_people_info from ultralytics import YOLO import google.protobuf.text_format as tf class RTSCapture(cv2.VideoCapture): _cur_frame = None _reading = False schemes = ["rtsp://", "rtmp://"] # 用于识别实时流 @staticmethod def create(url, *schemes): cap = RTSCapture(url) cap.frame_receiver = threading.Thread(target=cap.recv_frame, daemon=True) cap.schemes.extend(schemes) if isinstance(url, str) and url.startswith(tuple(cap.schemes)): cap._reading = True elif isinstance(url, int): # 这里可能是本机设备 pass return cap def isStarted(self): is_ok = self.isOpened() if is_ok and self._reading: is_ok = self.frame_receiver.is_alive() else: return False return is_ok def recv_frame(self): while self._reading and self.isOpened(): re_ok, re_frame = self.read() if not re_ok: del re_ok del re_frame break self._cur_frame = None self._cur_frame = re_frame del re_ok del re_frame self._reading = False def read2(self): rd_frame = self._cur_frame self._cur_frame = None return rd_frame is not None, rd_frame def start_read(self): self.frame_receiver.start() self.read_latest_frame = self.read2 if self._reading else self.read def stop_read(self): self._reading = False if self.frame_receiver.is_alive(): self.frame_receiver.join() if __name__ == '__main__': print(os.getpid()) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") model = YOLO("yolov8n.pt") rtsp = "rtsp://admin:zx123456@192.168.2.64/Streaming/Channels/1" logging.info(rtsp) rtscap = RTSCapture.create(rtsp) rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向 rabbitmq_client = rabbitmq.rabbitmq_client() time_lock = time.time() frame = None run_times = 0 while run_times < 1000: run_times += 0 # 维持循环最快0.1秒循环一次 if time.time() - time_lock < 0.1: time.sleep(0.1 - time.time() + time_lock) else: time.sleep(0.001) time_lock = time.time() # 视频流抓取 if rtscap.isStarted(): ok, frame = rtscap.read_latest_frame() if not ok: logging.info("rtsp 连接断开") continue results = model.predict(source=frame, show=True) message = detect_people_info.DetectPeopleResults() for index in range(len(results[0].boxes.cls)): name = results[0].names[results[0].boxes.cls[index].item()] if name == "person": result = message.results.add() result.cls = name result.conf = results[0].boxes.conf[index].item() message_str = tf.MessageToString(message, as_utf8=True) rabbitmq_client.sendPredict(message_str) else: rtscap.stop_read() rtscap.release() rtscap = RTSCapture.create(rtsp) rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向 cv2.destroyAllWindows()