123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- import logging, os, gc, cv2, time, threading
- import communication as rabbitmq
- import detect_people_pb2 as detect_people_info
- from ultralytics import YOLO
- import google.protobuf.text_format as tf
- class RTSCapture(cv2.VideoCapture):
- _cur_frame = None
- _reading = False
- schemes = ["rtsp://", "rtmp://"] # 用于识别实时流
- @staticmethod
- def create(url, *schemes):
- cap = RTSCapture(url)
- cap.frame_receiver = threading.Thread(target=cap.recv_frame, daemon=True)
- cap.schemes.extend(schemes)
- if isinstance(url, str) and url.startswith(tuple(cap.schemes)):
- cap._reading = True
- elif isinstance(url, int):
- # 这里可能是本机设备
- pass
- return cap
- def isStarted(self):
- is_ok = self.isOpened()
- if is_ok and self._reading:
- is_ok = self.frame_receiver.is_alive()
- else:
- return False
- return is_ok
- def recv_frame(self):
- while self._reading and self.isOpened():
- re_ok, re_frame = self.read()
- if not re_ok:
- del re_ok
- del re_frame
- break
- self._cur_frame = None
- self._cur_frame = re_frame
- del re_ok
- del re_frame
- self._reading = False
- def read2(self):
- rd_frame = self._cur_frame
- self._cur_frame = None
- return rd_frame is not None, rd_frame
- def start_read(self):
- self.frame_receiver.start()
- self.read_latest_frame = self.read2 if self._reading else self.read
- def stop_read(self):
- self._reading = False
- if self.frame_receiver.is_alive(): self.frame_receiver.join()
- if __name__ == '__main__':
- print(os.getpid())
- logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
- model = YOLO("yolov8n.pt")
- rtsp = "rtsp://admin:zx123456@192.168.2.64/Streaming/Channels/1"
- logging.info(rtsp)
- rtscap = RTSCapture.create(rtsp)
- rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向
- rabbitmq_client = rabbitmq.rabbitmq_client()
- time_lock = time.time()
- frame = None
- run_times = 0
- while run_times < 1000:
- run_times += 0
- # 维持循环最快0.1秒循环一次
- if time.time() - time_lock < 0.1:
- time.sleep(0.1 - time.time() + time_lock)
- else:
- time.sleep(0.001)
- time_lock = time.time()
- # 视频流抓取
- if rtscap.isStarted():
- ok, frame = rtscap.read_latest_frame()
- if not ok:
- logging.info("rtsp 连接断开")
- continue
- results = model.predict(source=frame, show=True)
- message = detect_people_info.DetectPeopleResults()
- for index in range(len(results[0].boxes.cls)):
- name = results[0].names[results[0].boxes.cls[index].item()]
- if name == "person":
- result = message.results.add()
- result.cls = name
- result.conf = results[0].boxes.conf[index].item()
- message_str = tf.MessageToString(message, as_utf8=True)
- rabbitmq_client.sendPredict(message_str)
- else:
- rtscap.stop_read()
- rtscap.release()
- rtscap = RTSCapture.create(rtsp)
- rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向
- cv2.destroyAllWindows()
|