"""Command-check service for park/pick commands received over RabbitMQ.

Commands arriving on ``user_command_queue`` are parsed as protobuf text,
checked by an ``EntranceChecker`` and the resulting table is re-published on
the ``command_ex`` exchange.  When run as a script the process also publishes
its own status once per second.
"""
import datetime
import time
import async_communication as CM
import message_pb2 as message
import google.protobuf.text_format as tf
import CheckEntrance as CHE
import threading

# Status-message receiver parameters: [exchange name, port key] pairs handed
# to RabbitAsyncCommunicator.Init() below.
statu_ex_keys = [
    ["statu_ex", "in_mcpu_1_statu_port"],
    ["statu_ex", "in_mcpu_2_statu_port"],
    ["statu_ex", "in_mcpu_3_statu_port"],
    ["statu_ex", "in_mcpu_4_statu_port"],
    ["statu_ex", "in_mcpu_5_statu_port"],
    ["statu_ex", "in_mcpu_6_statu_port"],
    ["statu_ex", "measure_1_statu_port"],
    ["statu_ex", "measure_2_statu_port"],
    ["statu_ex", "measure_3_statu_port"],
    ["statu_ex", "measure_4_statu_port"],
    ["statu_ex", "measure_5_statu_port"],
    ["statu_ex", "measure_6_statu_port"],
    ["statu_ex", "dispatch_1_statu_port"],
    ["statu_ex", "dispatch_2_statu_port"],
    ["statu_ex", "dispatch_3_statu_port"],
]

# Message-queue connection parameters.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or the environment.
mq_ip = "192.168.1.233"
mq_port = 5672
mq_user = "zx"
mq_password = "zx123456"
ex_name = "command_ex"


async def user_command_callback(body):
    """Route a raw user command to the park or pick checker.

    A body containing ``car_number`` is treated as a park command, one
    containing ``primary_key`` as a pick command; anything else is reported
    as an invalid command type and dropped.

    Args:
        body: Command text (protobuf text format) taken from the queue.
    """
    if "car_number" in body:
        await check_park_command(body)
    elif "primary_key" in body:
        await check_pick_command(body)
    else:
        print("指令类型错误!\n%s" % body)


async def check_park_command(body):
    """Check a park command and publish the checked table.

    The command is checked by the ``EntranceChecker`` that belongs to its
    ``terminal_id`` (1-based).  A table whose status is ``eNormal`` is sent to
    ``command_enqueue_port``; any other status is sent back to
    ``park_response_<terminal_id>_port``.

    Args:
        body: Park command in protobuf text format.
    """
    print("recieve park_command_queue message:%s" % body)
    park = message.park_table()
    tf.Parse(body, park)

    # Validate the terminal id before indexing: without this check a value of
    # 0 would index -1 and silently use the last entrance's checker, and a
    # value past the end would raise IndexError inside the callback.
    terminal_id = park.terminal_id
    if not 1 <= terminal_id <= len(EntranceCheckers):
        # NOTE(review): no response is published here because there is no
        # valid terminal to address; confirm whether clients need a rejection.
        print("停车指令 %s ERROR: 终端号:%s 无效!" % (
            datetime.datetime.now(), terminal_id))
        return

    table = await EntranceCheckers[terminal_id - 1].entrance_isOK(park)
    cmd = message.park_table()
    tf.Parse(table, cmd)

    if cmd.statu.execute_statu == message.eNormal:
        # Command check passed: forward to the command queue.
        print("停车指令 %s OK: 车牌号:%s 检查成功!" % (
            datetime.datetime.now(), cmd.car_number))
        response_port = "command_enqueue_port"
    else:
        # Command check failed: answer the originating terminal.
        print("停车指令 %s ERROR: 车牌号:%s 检查失败!" % (
            datetime.datetime.now(), cmd.car_number))
        response_port = "park_response_%d_port" % cmd.terminal_id

    g_rabbitmq.publish(ex_name, response_port, table)
    print("发送反馈端口:%s 反馈表单:%s" % (response_port, table))


async def check_pick_command(body):
    """Check a pick command and publish the checked table.

    Pick commands are always checked by the first ``EntranceChecker``.  A
    table whose status is ``eNormal`` is sent to ``command_enqueue_port``;
    any other status is sent back to ``pick_response_<terminal_id>_port``.

    Args:
        body: Pick command in protobuf text format.
    """
    print("recv pick_command_queue message:%s" % body)
    pick = message.pick_table()
    tf.Parse(body, pick)

    table = await EntranceCheckers[0].exit_isOK(pick)
    cmd = message.pick_table()
    tf.Parse(table, cmd)

    if cmd.statu.execute_statu == message.eNormal:
        # Command check passed: forward to the command queue.
        print("取车指令 %s OK : 唯一码:%s 检查成功!" % (
            datetime.datetime.now(), cmd.primary_key))
        response_port = "command_enqueue_port"
    else:
        # Command check failed: answer the originating terminal.
        print("取车指令 %s ERROR : 唯一码:%s 检查失败!" % (
            datetime.datetime.now(), cmd.primary_key))
        response_port = "pick_response_%d_port" % cmd.terminal_id

    g_rabbitmq.publish(ex_name, response_port, table)
    print("发送反馈端口:%s 反馈表单:%s" % (response_port, table))


def statu_msg_thread():
    """Publish this service's ``eNormal`` status once per second, forever.

    The status text is built once and re-sent to ``command_check_statu_port``
    on the ``statu_ex`` exchange.  This function never returns.
    """
    statu = message.table_statu()
    statu.execute_statu = message.eNormal
    body = tf.MessageToString(statu, as_utf8=True)
    while True:
        g_rabbitmq.publish("statu_ex", "command_check_statu_port", body)
        time.sleep(1)


# Queue name -> callback pairs for incoming commands.
cmd_callbacks = [["user_command_queue", user_command_callback]]

g_rabbitmq = CM.RabbitAsyncCommunicator(mq_ip, mq_port, mq_user, mq_password)
g_rabbitmq.Init(cmd_callbacks, statu_ex_keys)
g_rabbitmq.start()

# One checker per entrance; index i serves terminal_id i + 1.
EntranceCheckers = []
for i in range(6):
    checker = CHE.EntranceChecker()
    g_rabbitmq.bind_statu_callback(
        "statu_ex", "in_mcpu_%d_statu_port" % (i + 1), checker.receive_icpu)
    g_rabbitmq.bind_statu_callback(
        "statu_ex", "measure_%d_statu_port" % (i + 1),
        checker.receive_measureInfo)
    EntranceCheckers.append(checker)
    checker.start()

if __name__ == '__main__':
    # Keep the main thread alive by publishing the status heartbeat; this
    # reuses statu_msg_thread() instead of repeating the same loop inline.
    statu_msg_thread()