1
0

2 Коммиты ff836f1163 ... 6a36ed92e2

Автор SHA1 Сообщение Дата
  LiuZe 6a36ed92e2 补充代码 1 год назад
  LiuZe a1dc768f5f 第一次测试完成 1 год назад

+ 43 - 0
communication.py

@@ -0,0 +1,43 @@
+import time
+
+import lib.ZX.async_communication as cm
+import lib.ZX.async_communication_etc_pb2 as rce
+import lib.ZX.tool as zx_tool
+import os, logging
+import detect_people_pb2 as detect_people_info
+import google.protobuf.text_format as tf
+
+default_rabbitmq_etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/rabbitmq.json"
+
+
+class rabbitmq_client:
+    def rabbitmq_client_exex(self, message):
+        info = detect_people_info.DetectPeopleResults()
+        tf.Parse(message.statu, info)
+        # print("-------------------\n", info)
+
+    def __init__(self, file=default_rabbitmq_etc_file):
+        # 加载配置
+        rabbitmq_etc = zx_tool.getProtobufJsonConfig(file, rce.RabbitmqEtc())
+        print(rabbitmq_etc)
+        self.g_rabbitmq = cm.RabbitAsyncCommunicator(rabbitmq_etc.ip, rabbitmq_etc.port,
+                                                rabbitmq_etc.user, rabbitmq_etc.password)
+        self.statu_ex_keys = []
+        for bind in rabbitmq_etc.binds:
+            key = [bind.ex, bind.route_key]
+            self.statu_ex_keys.append(key)
+
+        self.g_rabbitmq.Init(None, self.statu_ex_keys)
+        self.g_rabbitmq.start()
+
+        self.g_rabbitmq.bind_statu_callback(self.statu_ex_keys[0][0], self.statu_ex_keys[0][1], self.rabbitmq_client_exex)
+
+    def sendPredict(self, message):
+        return self.g_rabbitmq.publish(self.statu_ex_keys[0][0], self.statu_ex_keys[0][1], message, timeout=1)
+
+
+if __name__ == '__main__':
+    client = rabbitmq_client()
+    while True:
+        client.sendPredict("test_msg")
+        time.sleep(1)

+ 10 - 0
detect_people.proto

@@ -0,0 +1,10 @@
+syntax = "proto2";
+
+message DetectPeopleResult {
+    required string cls = 1;
+    required float conf = 2;
+}
+message DetectPeopleResults {
+    repeated DetectPeopleResult results = 1;
+}
+

+ 27 - 0
detect_people_pb2.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: detect_people.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x64\x65tect_people.proto\"/\n\x12\x44\x65tectPeopleResult\x12\x0b\n\x03\x63ls\x18\x01 \x02(\t\x12\x0c\n\x04\x63onf\x18\x02 \x02(\x02\";\n\x13\x44\x65tectPeopleResults\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.DetectPeopleResult')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'detect_people_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_DETECTPEOPLERESULT']._serialized_start=23
+  _globals['_DETECTPEOPLERESULT']._serialized_end=70
+  _globals['_DETECTPEOPLERESULTS']._serialized_start=72
+  _globals['_DETECTPEOPLERESULTS']._serialized_end=131
+# @@protoc_insertion_point(module_scope)

+ 12 - 0
etc/rabbitmq.json

@@ -0,0 +1,12 @@
+{
+    "ip": "192.168.2.55",
+    "port": 5672,
+    "user": "zx",
+    "password": "zx123456",
+    "binds": [
+        {
+            "ex": "statu_ex",
+            "route_key": "detect_people_1101_port"
+        }
+    ]
+}

BIN
lib/ZX/__pycache__/async_communication.cpython-310.pyc


BIN
lib/ZX/__pycache__/async_communication_etc_pb2.cpython-310.pyc


BIN
lib/ZX/__pycache__/async_communication_etc_pb2.cpython-38.pyc


BIN
lib/ZX/__pycache__/tool.cpython-310.pyc


BIN
lib/ZX/__pycache__/tool.cpython-38.pyc


+ 135 - 0
lib/ZX/async_communication.py

@@ -0,0 +1,135 @@
+import threading
+import time
+import asyncio
+import aio_pika
+import queue
+
+
+class TimeStatu:
+    def __init__(self, statu=None, timeout=3):
+        self.statu = statu
+        self.time = time.time()
+        self.timeout_ms = timeout
+
+    def timeout(self, timeout=0):
+        tm = time.time()
+        if timeout > self.timeout_ms:
+            return tm - self.time > self.timeout_ms
+        return tm - self.time > self.timeout_ms
+
+    def reset(self, statu, timeout):
+        self.statu = statu
+        self.time = time.time()
+        self.timeout_ms = timeout
+
+
+class RabbitAsyncCommunicator(threading.Thread):
+    def __init__(self, host, port, user, password):
+        threading.Thread.__init__(self)
+        self._host = host
+        self._port = port
+        self._user = user
+        self._password = password
+        self._connection = None
+        self._channel_consumer = None
+        self._channel_send = None
+        self._channel_statu = None
+
+        self._consumer_callbacks = None
+        self._recv_status = None
+        self._queue_callbacks = {}
+        self._publish_msg_queue = queue.Queue()
+        self._status = {}
+        self._statu_callbacks = {}
+        self._closing = False
+
+    def Init(self, consumer_callbacks, recv_status):
+        self._consumer_callbacks = consumer_callbacks
+        self._recv_status = recv_status
+        if self._recv_status == None:
+            return
+        for ex_name, key in self._recv_status:
+            self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)
+
+    def publish(self, ex_name, key, msg, timeout=None):
+        self._publish_msg_queue.put([ex_name, key, msg, timeout])
+
+    def bind_statu_callback(self, ex_name, key, callback):
+        self._statu_callbacks[ex_name + ":" + key] = callback
+
+    def close(self):
+        self._closing = True
+
+    async def init(self):
+        connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
+        self._connection = await aio_pika.connect_robust(connection_string)
+        self._channel_consumer = await self._connection.channel()
+        self._channel_send = await self._connection.channel()
+        self._channel_statu = await self._connection.channel()
+        # Will take no more than 10 messages in advance
+        await self._channel_consumer.set_qos(prefetch_count=1)
+        if self._consumer_callbacks == None:
+            return
+        for queue_name, callback in self._consumer_callbacks:
+            queue = await self._channel_consumer.get_queue(queue_name)
+            self._queue_callbacks[queue] = callback
+
+    def statu_callback(self, ex, key, msg):
+        id = ex + ":" + key
+        self._status[id] = TimeStatu(msg)
+        callback = self._statu_callbacks.get(id)
+        if not callback == None:
+            callback(self._status[id])
+
+    def __getitem__(self, key):
+        return self._status[key]
+
+    def __setitem__(self, key, value):
+        self._status[key] = value
+
+    async def recv(self, queue, callback):
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                if not callback == None:
+                    callback(message.body.decode())
+                    await message.ack()
+                if self._closing == True:
+                    return
+
+    async def recv_statu(self, ex_name, key, ttl=200):
+        statu_ex = await self._channel_statu.get_exchange(ex_name)
+        arg = {}
+        arg["x-message-ttl"] = ttl
+        queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg)
+        await queue.bind(statu_ex, routing_key=key)
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                async with message.process():
+                    self.statu_callback(ex_name, key, message.body.decode())
+                if self._closing == True:
+                    return
+
+    async def send(self):
+        while self._closing == False:
+            if self._publish_msg_queue.qsize() > 0:
+                msg_bag = self._publish_msg_queue.get(False)
+                if not msg_bag == None:
+                    ex_name, key, msg, ttl = msg_bag
+                    ex = await self._channel_send.get_exchange(ex_name)
+                    await ex.publish(aio_pika.Message(body=msg.encode(), expiration=ttl), routing_key=key)
+            await asyncio.sleep(0.001)
+
+    async def main(self):
+        await self.init()
+        tasks = []
+        tasks.append(self.send())
+        if not self._consumer_callbacks == None:
+            for queue, callback in self._queue_callbacks.items():
+                tasks.append(self.recv(queue, callback))
+        if not self._recv_status == None:
+            for ex_name, key in self._recv_status:
+                tasks.append(self.recv_statu(ex_name, key))
+        await asyncio.gather(*tasks)
+
+    def run(self):
+        asyncio.run(self.main())

+ 14 - 0
lib/ZX/async_communication_etc.proto

@@ -0,0 +1,14 @@
+syntax = "proto2";
+
+message RabbitmqBindEtc {
+    required string ex = 1;
+    required string route_key = 2;
+}
+
+message RabbitmqEtc {
+    required string ip = 1;
+    optional int32 port = 2 [default = 5672];
+    optional string user = 3 [default = "zx"];
+    optional string password = 4 [default = "zx123456"];
+    repeated RabbitmqBindEtc binds = 5;
+}

+ 27 - 0
lib/ZX/async_communication_etc_pb2.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: async_communication_etc.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1d\x61sync_communication_etc.proto\"0\n\x0fRabbitmqBindEtc\x12\n\n\x02\x65x\x18\x01 \x02(\t\x12\x11\n\troute_key\x18\x02 \x02(\t\"|\n\x0bRabbitmqEtc\x12\n\n\x02ip\x18\x01 \x02(\t\x12\x12\n\x04port\x18\x02 \x01(\x05:\x04\x35\x36\x37\x32\x12\x10\n\x04user\x18\x03 \x01(\t:\x02zx\x12\x1a\n\x08password\x18\x04 \x01(\t:\x08zx123456\x12\x1f\n\x05\x62inds\x18\x05 \x03(\x0b\x32\x10.RabbitmqBindEtc')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'async_communication_etc_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_RABBITMQBINDETC']._serialized_start=33
+  _globals['_RABBITMQBINDETC']._serialized_end=81
+  _globals['_RABBITMQETC']._serialized_start=83
+  _globals['_RABBITMQETC']._serialized_end=207
+# @@protoc_insertion_point(module_scope)

+ 9 - 0
lib/ZX/tool.py

@@ -0,0 +1,9 @@
+import json, os
+from google.protobuf import json_format
+
+
+def getProtobufJsonConfig(fileName, message):
+    with open(fileName, 'r') as jsonFile:
+        # return json_format.Parse(json.dumps(jsonFile), etc.Devices())
+        json_etc = json.load(jsonFile)
+        return json_format.Parse(json.dumps(json_etc), message)

+ 104 - 0
main.py

@@ -0,0 +1,104 @@
+import logging, os, gc, cv2, time, threading
+import communication as rabbitmq
+import detect_people_pb2 as detect_people_info
+from ultralytics import YOLO
+import google.protobuf.text_format as tf
+
+
+class RTSCapture(cv2.VideoCapture):
+    _cur_frame = None
+    _reading = False
+    schemes = ["rtsp://", "rtmp://"]  # 用于识别实时流
+
+    @staticmethod
+    def create(url, *schemes):
+        cap = RTSCapture(url)
+        cap.frame_receiver = threading.Thread(target=cap.recv_frame, daemon=True)
+        cap.schemes.extend(schemes)
+        if isinstance(url, str) and url.startswith(tuple(cap.schemes)):
+            cap._reading = True
+        elif isinstance(url, int):
+            # 这里可能是本机设备
+            pass
+
+        return cap
+
+    def isStarted(self):
+        is_ok = self.isOpened()
+        if is_ok and self._reading:
+            is_ok = self.frame_receiver.is_alive()
+        else:
+            return False
+        return is_ok
+
+    def recv_frame(self):
+        while self._reading and self.isOpened():
+            re_ok, re_frame = self.read()
+            if not re_ok:
+                del re_ok
+                del re_frame
+                break
+            self._cur_frame = None
+            self._cur_frame = re_frame
+            del re_ok
+            del re_frame
+        self._reading = False
+
+    def read2(self):
+        rd_frame = self._cur_frame
+        self._cur_frame = None
+        return rd_frame is not None, rd_frame
+
+    def start_read(self):
+        self.frame_receiver.start()
+        self.read_latest_frame = self.read2 if self._reading else self.read
+
+    def stop_read(self):
+        self._reading = False
+        if self.frame_receiver.is_alive(): self.frame_receiver.join()
+
+
+if __name__ == '__main__':
+    print(os.getpid())
+    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
+    model = YOLO("yolov8n.pt")
+    rtsp = "rtsp://admin:zx123456@192.168.2.64/Streaming/Channels/1"
+    logging.info(rtsp)
+    rtscap = RTSCapture.create(rtsp)
+    rtscap.start_read()  # 启动子线程并改变 read_latest_frame 的指向
+    rabbitmq_client = rabbitmq.rabbitmq_client()
+    time_lock = time.time()
+    frame = None
+    run_times = 0
+    while run_times < 1000:
+        run_times += 0
+        # 维持循环最快0.1秒循环一次
+        if time.time() - time_lock < 0.1:
+            time.sleep(0.1 - time.time() + time_lock)
+        else:
+            time.sleep(0.001)
+        time_lock = time.time()
+
+        # 视频流抓取
+        if rtscap.isStarted():
+            ok, frame = rtscap.read_latest_frame()
+            if not ok:
+                logging.info("rtsp 连接断开")
+                continue
+            results = model.predict(source=frame, show=True)
+            message = detect_people_info.DetectPeopleResults()
+            for index in range(len(results[0].boxes.cls)):
+                name = results[0].names[results[0].boxes.cls[index].item()]
+                if name == "person":
+                    result = message.results.add()
+                    result.cls = name
+                    result.conf = results[0].boxes.conf[index].item()
+            message_str = tf.MessageToString(message, as_utf8=True)
+            rabbitmq_client.sendPredict(message_str)
+        else:
+            rtscap.stop_read()
+            rtscap.release()
+            rtscap = RTSCapture.create(rtsp)
+            rtscap.start_read()  # 启动子线程并改变 read_latest_frame 的指向
+
+    cv2.destroyAllWindows()

+ 2 - 0
proto.sh

@@ -0,0 +1,2 @@
+protoc -I=./ detect_people.proto --python_out=./
+protoc -I=./lib/ZX/ async_communication_etc.proto --python_out=./lib/ZX/

+ 69 - 0
setup.py

@@ -0,0 +1,69 @@
+from setuptools import setup
+
+
+def readme():
+    with open('README.md', encoding='utf-8') as f:
+        content = f.read()
+    return content
+
+
+setup(
+    # 包名称
+    name='DataAcquisition',
+
+    # 版本
+    version='1.0.0',
+
+    # 作者
+    author='LiuZe',
+
+    # 作者邮箱
+    author_email='lz2297519360@outlook.com',
+
+    # 长文描述
+    long_description=readme(),
+
+    # 长文描述的文本格式
+    long_description_content_type='text/markdown',
+
+    # 关键词
+    keywords='DataAcquisition',
+
+    # 包的分类信息,见https://pypi.org/pypi?%3Aaction=list_classifiers
+    classifiers=[
+    ],
+    # 许可证
+    license='Apache License 2.0',
+
+    # python版本要求
+    python_requires='>=3.7',
+
+    # 表明当前模块依赖哪些包,若环境中没有,则会从pypi中自动下载安装!!!
+    install_requires=[
+        # vzense tof3d-sdk need
+        "numpy",
+        "opencv-python",     # 如果无法自动安装,可以尝试在终端调用 pip install opencv-python 进行安装
+        # zx sdk
+        'ultralytics',
+        'aio-pika >= 9.0.0',
+        'protobuf == 4.23.4',
+        # 工具
+        # 'pyqt5',
+        # 'PyQt5-tools'
+    ],
+
+    # setup.py 本身要依赖的包,这通常是为一些setuptools的插件准备的配置,这里列出的包,不会自动安装。
+    setup_requires=[],
+
+    # 仅在测试时需要使用的依赖,在正常发布的代码中是没有用的。
+    # 在执行python setup.py test时,可以自动安装这三个库,确保测试的正常运行。
+    tests_require=[
+    ],
+
+    # install_requires 在安装模块时会自动安装依赖包
+    # 而 extras_require 不会,这里仅表示该模块会依赖这些包
+    # 但是这些包通常不会使用到,只有当你深度使用模块时,才会用到,这里需要你手动安装
+    extras_require={
+    }
+
+)