# node.py (5.8 KB)
  1. import datetime
  2. import os
  3. import threading
  4. import time
  5. import async_communication as CM
  6. import message_pb2 as message
  7. import google.protobuf.text_format as tf
  8. import check_command_XmSgj as CHE_XmSgj
  9. import check_command_GyBhhy as CHE_GyBhhy
  10. import tool.db_helper.db_operation as spmng
  11. import tool.json_helper.parse_json as parse_json
  12. import sys
  13. sys.path.append("..")
  14. from tool.log_helper.logger import HandleLog
  15. def check_park_command(body):
  16. # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
  17. log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(),body[0:120]))
  18. cmd = checker.entrance_isOK(body)
  19. response_port = "user_park_command_%d_response_port" % cmd.terminal_id
  20. dispatch_key = ""
  21. if cmd.statu.execute_statu == message.eNormal:
  22. # 指令检查正常
  23. log.info("停车指令 %s OK: 车牌号:%s 检查成功!\n" % (datetime.datetime.now(), cmd.car_number))
  24. dispatch_key = "park_command_request_port"
  25. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  26. else:
  27. if cmd.statu.execute_statu == message.eWarning:
  28. log.warning("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  29. datetime.datetime.now(), cmd.car_number,
  30. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  31. tf.MessageToString(cmd, as_utf8=True)))
  32. elif cmd.statu.execute_statu == message.eError:
  33. log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  34. datetime.datetime.now(), cmd.car_number,
  35. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  36. tf.MessageToString(cmd, as_utf8=True)))
  37. else:
  38. log.critical("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  39. datetime.datetime.now(), cmd.car_number,
  40. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  41. tf.MessageToString(cmd, as_utf8=True)))
  42. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  43. def check_pick_command(body):
  44. log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
  45. cmd = checker.exit_isOK(body)
  46. response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
  47. dispatch_key = ""
  48. if cmd.statu.execute_statu == message.eNormal:
  49. # 指令检查正常
  50. log.info("取车指令 %s OK : 唯一码:%s 检查成功!\n" % (datetime.datetime.now(), cmd.primary_key))
  51. dispatch_key = "pick_command_request_port"
  52. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  53. else:
  54. if cmd.statu.execute_statu == message.eWarning:
  55. log.warning("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  56. datetime.datetime.now(), cmd.primary_key,
  57. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  58. tf.MessageToString(cmd, as_utf8=True)))
  59. elif cmd.statu.execute_statu == message.eError:
  60. log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  61. datetime.datetime.now(), cmd.primary_key,
  62. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  63. tf.MessageToString(cmd, as_utf8=True)))
  64. else:
  65. log.critical("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  66. datetime.datetime.now(), cmd.primary_key,
  67. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  68. tf.MessageToString(cmd, as_utf8=True)))
  69. if cmd.terminal_id != 0:
  70. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  71. if __name__ == '__main__':
  72. config = parse_json.parse_json_with_comments('./config.json')
  73. # 数据库配置
  74. db_config = config['db_config']
  75. # rabbitmq配置
  76. mq_config = config['mq_config']
  77. # 日志记录
  78. log = HandleLog.get_logger(os.getcwd()+'\\logs')
  79. g_space = spmng.DBOperation(db_config['db_ip'],db_config['db_port'],db_config['db_name'],db_config['db_user'],db_config['db_password'])
  80. # 消费指令消息
  81. cmd_callbacks = [["user_park_command_request_queue", check_park_command],
  82. ["user_pick_command_request_queue", check_pick_command]]
  83. print("系统初始化中...... %s " % (datetime.datetime.now()))
  84. if config['project_name'] == 'xm_sgj':
  85. checker = CHE_XmSgj.CommandChecker(g_space)
  86. elif config['project_name'] == 'gy_bhhy':
  87. checker = CHE_GyBhhy.CommandChecker(g_space)
  88. print("系统初始化完成! %s " % (datetime.datetime.now()))
  89. g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'], mq_config['mq_password'])
  90. g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'],"checker")
  91. for ex,key in mq_config['mq_statu_exchange_keys']:
  92. if key.find("dispatch") >= 0:
  93. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_dispatch_statu)
  94. if key.find("measure") >= 0:
  95. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_measureInfo)
  96. g_rabbitmq.start()
  97. g_rabbitmq.join()