LiuZe 1 год назад
Родитель
Сommit
a1dc768f5f
8 измененных файлов с 267 добавлено и 0 удалено
  1. 43 0
      communication.py
  2. 10 0
      detect_people.proto
  3. 27 0
      detect_people_pb2.py
  4. 12 0
      etc/rabbitmq.json
  5. 104 0
      main.py
  6. 2 0
      proto.sh
  7. 69 0
      setup.py
  8. BIN
      yolov8n.pt

+ 43 - 0
communication.py

@@ -0,0 +1,43 @@
+import time
+
+import lib.ZX.async_communication as cm
+import lib.ZX.async_communication_etc_pb2 as rce
+import lib.ZX.tool as zx_tool
+import os, logging
+import detect_people_pb2 as detect_people_info
+import google.protobuf.text_format as tf
+
+default_rabbitmq_etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/rabbitmq.json"
+
+
+class rabbitmq_client:
+    def rabbitmq_client_exex(self, message):
+        info = detect_people_info.DetectPeopleResults()
+        tf.Parse(message.statu, info)
+        # print("-------------------\n", info)
+
+    def __init__(self, file=default_rabbitmq_etc_file):
+        # 加载配置
+        rabbitmq_etc = zx_tool.getProtobufJsonConfig(file, rce.RabbitmqEtc())
+        print(rabbitmq_etc)
+        self.g_rabbitmq = cm.RabbitAsyncCommunicator(rabbitmq_etc.ip, rabbitmq_etc.port,
+                                                rabbitmq_etc.user, rabbitmq_etc.password)
+        self.statu_ex_keys = []
+        for bind in rabbitmq_etc.binds:
+            key = [bind.ex, bind.route_key]
+            self.statu_ex_keys.append(key)
+
+        self.g_rabbitmq.Init(None, self.statu_ex_keys)
+        self.g_rabbitmq.start()
+
+        self.g_rabbitmq.bind_statu_callback(self.statu_ex_keys[0][0], self.statu_ex_keys[0][1], self.rabbitmq_client_exex)
+
+    def sendPredict(self, message):
+        return self.g_rabbitmq.publish(self.statu_ex_keys[0][0], self.statu_ex_keys[0][1], message, timeout=1)
+
+
+if __name__ == '__main__':
+    client = rabbitmq_client()
+    while True:
+        client.sendPredict("test_msg")
+        time.sleep(1)

+ 10 - 0
detect_people.proto

@@ -0,0 +1,10 @@
+syntax = "proto2";
+
+message DetectPeopleResult {
+    required string cls = 1;
+    required float conf = 2;
+}
+message DetectPeopleResults {
+    repeated DetectPeopleResult results = 1;
+}
+

+ 27 - 0
detect_people_pb2.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: detect_people.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x64\x65tect_people.proto\"/\n\x12\x44\x65tectPeopleResult\x12\x0b\n\x03\x63ls\x18\x01 \x02(\t\x12\x0c\n\x04\x63onf\x18\x02 \x02(\x02\";\n\x13\x44\x65tectPeopleResults\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.DetectPeopleResult')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'detect_people_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_DETECTPEOPLERESULT']._serialized_start=23
+  _globals['_DETECTPEOPLERESULT']._serialized_end=70
+  _globals['_DETECTPEOPLERESULTS']._serialized_start=72
+  _globals['_DETECTPEOPLERESULTS']._serialized_end=131
+# @@protoc_insertion_point(module_scope)

+ 12 - 0
etc/rabbitmq.json

@@ -0,0 +1,12 @@
+{
+    "ip": "192.168.2.55",
+    "port": 5672,
+    "user": "zx",
+    "password": "zx123456",
+    "binds": [
+        {
+            "ex": "statu_ex",
+            "route_key": "detect_people_1101_port"
+        }
+    ]
+}

+ 104 - 0
main.py

@@ -0,0 +1,104 @@
+import logging, os, gc, cv2, time, threading
+import communication as rabbitmq
+import detect_people_pb2 as detect_people_info
+from ultralytics import YOLO
+import google.protobuf.text_format as tf
+
+
+class RTSCapture(cv2.VideoCapture):
+    _cur_frame = None
+    _reading = False
+    schemes = ["rtsp://", "rtmp://"]  # 用于识别实时流
+
+    @staticmethod
+    def create(url, *schemes):
+        cap = RTSCapture(url)
+        cap.frame_receiver = threading.Thread(target=cap.recv_frame, daemon=True)
+        cap.schemes.extend(schemes)
+        if isinstance(url, str) and url.startswith(tuple(cap.schemes)):
+            cap._reading = True
+        elif isinstance(url, int):
+            # 这里可能是本机设备
+            pass
+
+        return cap
+
+    def isStarted(self):
+        is_ok = self.isOpened()
+        if is_ok and self._reading:
+            is_ok = self.frame_receiver.is_alive()
+        else:
+            return False
+        return is_ok
+
+    def recv_frame(self):
+        while self._reading and self.isOpened():
+            re_ok, re_frame = self.read()
+            if not re_ok:
+                del re_ok
+                del re_frame
+                break
+            self._cur_frame = None
+            self._cur_frame = re_frame
+            del re_ok
+            del re_frame
+        self._reading = False
+
+    def read2(self):
+        rd_frame = self._cur_frame
+        self._cur_frame = None
+        return rd_frame is not None, rd_frame
+
+    def start_read(self):
+        self.frame_receiver.start()
+        self.read_latest_frame = self.read2 if self._reading else self.read
+
+    def stop_read(self):
+        self._reading = False
+        if self.frame_receiver.is_alive(): self.frame_receiver.join()
+
+
+if __name__ == '__main__':
+    print(os.getpid())
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
+    model = YOLO("yolov8n.pt")
+    rtsp = "rtsp://admin:zx123456@192.168.2.64/Streaming/Channels/1"
+    logging.info(rtsp)
+    rtscap = RTSCapture.create(rtsp)
+    rtscap.start_read()  # 启动子线程并改变 read_latest_frame 的指向
+    rabbitmq_client = rabbitmq.rabbitmq_client()
+    time_lock = time.time()
+    frame = None
+    run_times = 0
+    while run_times < 1000:
+        run_times += 0
+        # 维持循环最快0.1秒循环一次
+        if time.time() - time_lock < 0.1:
+            time.sleep(0.1 - time.time() + time_lock)
+        else:
+            time.sleep(0.001)
+        time_lock = time.time()
+
+        # 视频流抓取
+        if rtscap.isStarted():
+            ok, frame = rtscap.read_latest_frame()
+            if not ok:
+                logging.info("rtsp 连接断开")
+                continue
+            results = model.predict(source=frame, show=True)
+            message = detect_people_info.DetectPeopleResults()
+            for index in range(len(results[0].boxes.cls)):
+                name = results[0].names[results[0].boxes.cls[index].item()]
+                if name == "person":
+                    result = message.results.add()
+                    result.cls = name
+                    result.conf = results[0].boxes.conf[index].item()
+            message_str = tf.MessageToString(message, as_utf8=True)
+            rabbitmq_client.sendPredict(message_str)
+        else:
+            rtscap.stop_read()
+            rtscap.release()
+            rtscap = RTSCapture.create(rtsp)
+            rtscap.start_read()  # 启动子线程并改变 read_latest_frame 的指向
+
+    cv2.destroyAllWindows()

+ 2 - 0
proto.sh

@@ -0,0 +1,2 @@
+protoc -I=./ detect_people.proto --python_out=./
+protoc -I=./lib/ZX/ async_communication_etc.proto --python_out=./lib/ZX/

+ 69 - 0
setup.py

@@ -0,0 +1,69 @@
+from setuptools import setup
+
+
+def readme():
+    with open('README.md', encoding='utf-8') as f:
+        content = f.read()
+    return content
+
+
+setup(
+    # 包名称
+    name='DataAcquisition',
+
+    # 版本
+    version='1.0.0',
+
+    # 作者
+    author='LiuZe',
+
+    # 作者邮箱
+    author_email='lz2297519360@outlook.com',
+
+    # 长文描述
+    long_description=readme(),
+
+    # 长文描述的文本格式
+    long_description_content_type='text/markdown',
+
+    # 关键词
+    keywords='DataAcquisition',
+
+    # 包的分类信息,见https://pypi.org/pypi?%3Aaction=list_classifiers
+    classifiers=[
+    ],
+    # 许可证
+    license='Apache License 2.0',
+
+    # python版本要求
+    python_requires='>=3.7',
+
+    # 表明当前模块依赖哪些包,若环境中没有,则会从pypi中自动下载安装!!!
+    install_requires=[
+        # vzense tof3d-sdk need
+        "numpy",
+        "opencv-python",     # 如果无法自动安装,可以尝试在终端调用 pip install opencv-python 进行安装
+        # zx sdk
+        'ultralytics',
+        'aio-pika >= 9.0.0',
+        'protobuf == 4.23.4',
+        # 工具
+        # 'pyqt5',
+        # 'PyQt5-tools'
+    ],
+
+    # setup.py 本身要依赖的包,这通常是为一些setuptools的插件准备的配置,这里列出的包,不会自动安装。
+    setup_requires=[],
+
+    # 仅在测试时需要使用的依赖,在正常发布的代码中是没有用的。
+    # 在执行python setup.py test时,可以自动安装这三个库,确保测试的正常运行。
+    tests_require=[
+    ],
+
+    # install_requires 在安装模块时会自动安装依赖包
+    # 而 extras_require 不会,这里仅表示该模块会依赖这些包
+    # 但是这些包通常不会使用到,只有当你深度使用模块时,才会用到,这里需要你手动安装
+    extras_require={
+    }
+
+)