LiuZe 1 vuosi sitten
vanhempi
commit
6a36ed92e2

BIN
lib/ZX/__pycache__/async_communication.cpython-310.pyc


BIN
lib/ZX/__pycache__/async_communication_etc_pb2.cpython-310.pyc


BIN
lib/ZX/__pycache__/async_communication_etc_pb2.cpython-38.pyc


BIN
lib/ZX/__pycache__/tool.cpython-310.pyc


BIN
lib/ZX/__pycache__/tool.cpython-38.pyc


+ 135 - 0
lib/ZX/async_communication.py

@@ -0,0 +1,135 @@
+import threading
+import time
+import asyncio
+import aio_pika
+import queue
+
+
+class TimeStatu:
+    def __init__(self, statu=None, timeout=3):
+        self.statu = statu
+        self.time = time.time()
+        self.timeout_ms = timeout
+
+    def timeout(self, timeout=0):
+        tm = time.time()
+        if timeout > self.timeout_ms:
+            return tm - self.time > self.timeout_ms
+        return tm - self.time > self.timeout_ms
+
+    def reset(self, statu, timeout):
+        self.statu = statu
+        self.time = time.time()
+        self.timeout_ms = timeout
+
+
+class RabbitAsyncCommunicator(threading.Thread):
+    def __init__(self, host, port, user, password):
+        threading.Thread.__init__(self)
+        self._host = host
+        self._port = port
+        self._user = user
+        self._password = password
+        self._connection = None
+        self._channel_consumer = None
+        self._channel_send = None
+        self._channel_statu = None
+
+        self._consumer_callbacks = None
+        self._recv_status = None
+        self._queue_callbacks = {}
+        self._publish_msg_queue = queue.Queue()
+        self._status = {}
+        self._statu_callbacks = {}
+        self._closing = False
+
+    def Init(self, consumer_callbacks, recv_status):
+        self._consumer_callbacks = consumer_callbacks
+        self._recv_status = recv_status
+        if self._recv_status == None:
+            return
+        for ex_name, key in self._recv_status:
+            self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)
+
+    def publish(self, ex_name, key, msg, timeout=None):
+        self._publish_msg_queue.put([ex_name, key, msg, timeout])
+
+    def bind_statu_callback(self, ex_name, key, callback):
+        self._statu_callbacks[ex_name + ":" + key] = callback
+
+    def close(self):
+        self._closing = True
+
+    async def init(self):
+        connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
+        self._connection = await aio_pika.connect_robust(connection_string)
+        self._channel_consumer = await self._connection.channel()
+        self._channel_send = await self._connection.channel()
+        self._channel_statu = await self._connection.channel()
+        # Will take no more than 10 messages in advance
+        await self._channel_consumer.set_qos(prefetch_count=1)
+        if self._consumer_callbacks == None:
+            return
+        for queue_name, callback in self._consumer_callbacks:
+            queue = await self._channel_consumer.get_queue(queue_name)
+            self._queue_callbacks[queue] = callback
+
+    def statu_callback(self, ex, key, msg):
+        id = ex + ":" + key
+        self._status[id] = TimeStatu(msg)
+        callback = self._statu_callbacks.get(id)
+        if not callback == None:
+            callback(self._status[id])
+
+    def __getitem__(self, key):
+        return self._status[key]
+
+    def __setitem__(self, key, value):
+        self._status[key] = value
+
+    async def recv(self, queue, callback):
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                if not callback == None:
+                    callback(message.body.decode())
+                    await message.ack()
+                if self._closing == True:
+                    return
+
+    async def recv_statu(self, ex_name, key, ttl=200):
+        statu_ex = await self._channel_statu.get_exchange(ex_name)
+        arg = {}
+        arg["x-message-ttl"] = ttl
+        queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg)
+        await queue.bind(statu_ex, routing_key=key)
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                async with message.process():
+                    self.statu_callback(ex_name, key, message.body.decode())
+                if self._closing == True:
+                    return
+
+    async def send(self):
+        while self._closing == False:
+            if self._publish_msg_queue.qsize() > 0:
+                msg_bag = self._publish_msg_queue.get(False)
+                if not msg_bag == None:
+                    ex_name, key, msg, ttl = msg_bag
+                    ex = await self._channel_send.get_exchange(ex_name)
+                    await ex.publish(aio_pika.Message(body=msg.encode(), expiration=ttl), routing_key=key)
+            await asyncio.sleep(0.001)
+
+    async def main(self):
+        await self.init()
+        tasks = []
+        tasks.append(self.send())
+        if not self._consumer_callbacks == None:
+            for queue, callback in self._queue_callbacks.items():
+                tasks.append(self.recv(queue, callback))
+        if not self._recv_status == None:
+            for ex_name, key in self._recv_status:
+                tasks.append(self.recv_statu(ex_name, key))
+        await asyncio.gather(*tasks)
+
+    def run(self):
+        asyncio.run(self.main())

+ 14 - 0
lib/ZX/async_communication_etc.proto

@@ -0,0 +1,14 @@
+syntax = "proto2";
+
+message RabbitmqBindEtc {
+    required string ex = 1;
+    required string route_key = 2;
+}
+
+message RabbitmqEtc {
+    required string ip = 1;
+    optional int32 port = 2 [default = 5672];
+    optional string user = 3 [default = "zx"];
+    optional string password = 4 [default = "zx123456"];
+    repeated RabbitmqBindEtc binds = 5;
+}

+ 27 - 0
lib/ZX/async_communication_etc_pb2.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: async_communication_etc.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1d\x61sync_communication_etc.proto\"0\n\x0fRabbitmqBindEtc\x12\n\n\x02\x65x\x18\x01 \x02(\t\x12\x11\n\troute_key\x18\x02 \x02(\t\"|\n\x0bRabbitmqEtc\x12\n\n\x02ip\x18\x01 \x02(\t\x12\x12\n\x04port\x18\x02 \x01(\x05:\x04\x35\x36\x37\x32\x12\x10\n\x04user\x18\x03 \x01(\t:\x02zx\x12\x1a\n\x08password\x18\x04 \x01(\t:\x08zx123456\x12\x1f\n\x05\x62inds\x18\x05 \x03(\x0b\x32\x10.RabbitmqBindEtc')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'async_communication_etc_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_RABBITMQBINDETC']._serialized_start=33
+  _globals['_RABBITMQBINDETC']._serialized_end=81
+  _globals['_RABBITMQETC']._serialized_start=83
+  _globals['_RABBITMQETC']._serialized_end=207
+# @@protoc_insertion_point(module_scope)

+ 9 - 0
lib/ZX/tool.py

@@ -0,0 +1,9 @@
+import json, os
+from google.protobuf import json_format
+
+
+def getProtobufJsonConfig(fileName, message):
+    with open(fileName, 'r') as jsonFile:
+        # return json_format.Parse(json.dumps(jsonFile), etc.Devices())
+        json_etc = json.load(jsonFile)
+        return json_format.Parse(json.dumps(json_etc), message)