import datetime import time from async_communication import rabbit_async_communicator as g_rabbitmq import message_pb2 as message import google.protobuf.text_format as tf from CheckCommand import check_command as g_check import threading # 状态消息接收器参数 statu_ex_keys = [ ["statu_ex","in_mcpu_1_statu_port"], ["statu_ex","in_mcpu_2_statu_port"], ["statu_ex","in_mcpu_3_statu_port"], ["statu_ex","in_mcpu_4_statu_port"], ["statu_ex","in_mcpu_5_statu_port"], ["statu_ex","in_mcpu_6_statu_port"], ["statu_ex","measure_1_statu_port"], ["statu_ex","measure_2_statu_port"], ["statu_ex","measure_3_statu_port"], ["statu_ex","measure_4_statu_port"], ["statu_ex","measure_5_statu_port"], ["statu_ex","measure_6_statu_port"], ["statu_ex","dispatch_1_statu_port"], ["statu_ex","dispatch_2_statu_port"], ["statu_ex","dispatch_3_statu_port"] ] # mq参数 mq_ip = "192.168.1.233" mq_port = 5672 mq_user = "zx" mq_password = "zx123456" ex_name = "command_ex" def user_command_callback(body): if body.find("car_number")>=0: check_park_command(body) elif body.find("primary_key")>=0: check_pick_command(body) else: print("指令类型错误!\n%s" % body) def check_park_command(body): print("recieve park_command_queue message:%s" % body) table = g_check.check_park_command(body) cmd = message.park_table() tf.Parse(table, cmd) if cmd.statu.execute_statu == message.eNormal: # 指令检查正常 print("停车指令 %s OK: 车牌号:%s 检查成功!" % ( datetime.datetime.now(), cmd.car_number)) response_port = "command_enqueue_port" else: print("停车指令 %s ERROR: 车牌号:%s 检查失败!" % ( datetime.datetime.now(), cmd.car_number)) response_port = "park_response_%d_port" % cmd.terminal_id g_rabbitmq.publish( ex_name, response_port,table) print("发送反馈端口:%s 反馈表单:%s"%(response_port,table)) def check_pick_command(body): print("recv pick_command_queue message:%s" % body) table = g_check.check_pick_command(body) cmd = message.pick_table() tf.Parse(table, cmd) if cmd.statu.execute_statu == message.eNormal: # 指令检查正常 print("取车指令 %s OK : 唯一码:%s 检查成功!" % ( datetime.datetime.now(), cmd.primary_key)) response_port = "command_enqueue_port" else: print("取车指令 %s ERROR : 唯一码:%s 检查失败!" % ( datetime.datetime.now(), cmd.primary_key)) response_port = "pick_response_port" g_rabbitmq.publish( ex_name, response_port,table) print("发送反馈端口:%s 反馈表单:%s"%(response_port,table)) def statu_msg_thread(): statu = message.table_statu() statu.execute_statu = message.eNormal body = tf.MessageToString(statu, as_utf8=True) while True: g_rabbitmq.publish("statu_ex", "command_check_statu_port",body) time.sleep(1) if __name__ == '__main__': # 消费指令消息 cmd_callbacks=[["user_command_queue",user_command_callback]] g_rabbitmq.InitRabbitmq(mq_ip,mq_port,mq_user,mq_password) g_rabbitmq.InitConsumer(cmd_callbacks,statu_ex_keys) g_rabbitmq.start() statu = message.table_statu() statu.execute_statu = message.eNormal body = tf.MessageToString(statu, as_utf8=True) while True: g_rabbitmq.publish("statu_ex", "command_check_statu_port",body) time.sleep(1) g_rabbitmq.join()