main.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import logging, os, gc, cv2, time, threading
  2. import communication as rabbitmq
  3. import detect_people_pb2 as detect_people_info
  4. from ultralytics import YOLO
  5. import google.protobuf.text_format as tf
  6. class RTSCapture(cv2.VideoCapture):
  7. _cur_frame = None
  8. _reading = False
  9. schemes = ["rtsp://", "rtmp://"] # 用于识别实时流
  10. @staticmethod
  11. def create(url, *schemes):
  12. cap = RTSCapture(url)
  13. cap.frame_receiver = threading.Thread(target=cap.recv_frame, daemon=True)
  14. cap.schemes.extend(schemes)
  15. if isinstance(url, str) and url.startswith(tuple(cap.schemes)):
  16. cap._reading = True
  17. elif isinstance(url, int):
  18. # 这里可能是本机设备
  19. pass
  20. return cap
  21. def isStarted(self):
  22. is_ok = self.isOpened()
  23. if is_ok and self._reading:
  24. is_ok = self.frame_receiver.is_alive()
  25. else:
  26. return False
  27. return is_ok
  28. def recv_frame(self):
  29. while self._reading and self.isOpened():
  30. re_ok, re_frame = self.read()
  31. if not re_ok:
  32. del re_ok
  33. del re_frame
  34. break
  35. self._cur_frame = None
  36. self._cur_frame = re_frame
  37. del re_ok
  38. del re_frame
  39. self._reading = False
  40. def read2(self):
  41. rd_frame = self._cur_frame
  42. self._cur_frame = None
  43. return rd_frame is not None, rd_frame
  44. def start_read(self):
  45. self.frame_receiver.start()
  46. self.read_latest_frame = self.read2 if self._reading else self.read
  47. def stop_read(self):
  48. self._reading = False
  49. if self.frame_receiver.is_alive(): self.frame_receiver.join()
  50. if __name__ == '__main__':
  51. print(os.getpid())
  52. logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
  53. model = YOLO("yolov8n.pt")
  54. rtsp = "rtsp://admin:zx123456@192.168.2.64/Streaming/Channels/1"
  55. logging.info(rtsp)
  56. rtscap = RTSCapture.create(rtsp)
  57. rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向
  58. rabbitmq_client = rabbitmq.rabbitmq_client()
  59. time_lock = time.time()
  60. frame = None
  61. run_times = 0
  62. while run_times < 1000:
  63. run_times += 0
  64. # 维持循环最快0.1秒循环一次
  65. if time.time() - time_lock < 0.1:
  66. time.sleep(0.1 - time.time() + time_lock)
  67. else:
  68. time.sleep(0.001)
  69. time_lock = time.time()
  70. # 视频流抓取
  71. if rtscap.isStarted():
  72. ok, frame = rtscap.read_latest_frame()
  73. if not ok:
  74. logging.info("rtsp 连接断开")
  75. continue
  76. results = model.predict(source=frame, show=True)
  77. message = detect_people_info.DetectPeopleResults()
  78. for index in range(len(results[0].boxes.cls)):
  79. name = results[0].names[results[0].boxes.cls[index].item()]
  80. if name == "person":
  81. result = message.results.add()
  82. result.cls = name
  83. result.conf = results[0].boxes.conf[index].item()
  84. message_str = tf.MessageToString(message, as_utf8=True)
  85. rabbitmq_client.sendPredict(message_str)
  86. else:
  87. rtscap.stop_read()
  88. rtscap.release()
  89. rtscap = RTSCapture.create(rtsp)
  90. rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向
  91. cv2.destroyAllWindows()