node.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import sys
  2. import json
  3. sys.path.append("..")
  4. import datetime
  5. import os
  6. import threading
  7. import time
  8. import async_communication as CM
  9. import message_pb2 as message
  10. import google.protobuf.text_format as tf
  11. import check_command_XmSgj as CHE_XmSgj
  12. import check_command_GyBhhy as CHE_GyBhhy
  13. import check_command_ZKXY as CHE_ZKXY
  14. import mytool.json_helper.parse_json as parse_json
  15. from http.server import HTTPServer, BaseHTTPRequestHandler
  16. from mytool.log_helper.logger import HandleLog
  17. def check_park_command(body):
  18. # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
  19. log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(),body[0:260]))
  20. cmd = checker.entrance_isOK(body)
  21. table = message.park_table()
  22. table.CopyFrom(cmd)
  23. cmd.car_number_info.plate_full_image = ''
  24. cmd.car_number_info.plate_clip_image = ''
  25. response_port = "user_park_command_%d_response_port" % cmd.terminal_id
  26. dispatch_key = ""
  27. entrance_key = "entrance_update_%d_port" % cmd.terminal_id
  28. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  29. if cmd.statu.execute_statu == message.eNormal:
  30. # 指令检查正常
  31. dispatch_key = "dispatch_park_command_%d_request_port" % cmd.unit_id
  32. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(table, as_utf8=True))
  33. log.info("停车指令 %s OK:%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(table, as_utf8=True)))
  34. else:
  35. print("---------------刷新投影-----------------" + entrance_key)
  36. g_rabbitmq.publish(mq_config['mq_statu_exchange_name'], entrance_key,
  37. tf.MessageToString(cmd, as_utf8=True))
  38. if cmd.statu.execute_statu == message.eWarning:
  39. # 给终端发送指令,刷新投影
  40. log.warning("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  41. datetime.datetime.now(), cmd.car_number,
  42. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  43. tf.MessageToString(cmd, as_utf8=True)))
  44. elif cmd.statu.execute_statu == message.eError:
  45. log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  46. datetime.datetime.now(), cmd.car_number,
  47. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  48. tf.MessageToString(cmd, as_utf8=True)))
  49. else:
  50. log.critical("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  51. datetime.datetime.now(), cmd.car_number,
  52. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  53. tf.MessageToString(cmd, as_utf8=True)))
  54. def check_pick_command(body):
  55. log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
  56. cmd = checker.exit_isOK(body)
  57. response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
  58. dispatch_key = ""
  59. if cmd.statu.execute_statu == message.eNormal and cmd.statu.table_process_mod != message.PROCESS_ONLY_TO_PAY:
  60. # 指令检查正常
  61. log.info("取车指令 %s OK :%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(cmd, as_utf8=True)))
  62. dispatch_key = "dispatch_pick_command_%d_request_port" % cmd.unit_id
  63. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  64. else:
  65. if cmd.statu.execute_statu == message.eWarning:
  66. log.warning("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  67. datetime.datetime.now(), cmd.primary_key,
  68. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  69. tf.MessageToString(cmd, as_utf8=True)))
  70. elif cmd.statu.execute_statu == message.eError:
  71. log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  72. datetime.datetime.now(), cmd.primary_key,
  73. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  74. tf.MessageToString(cmd, as_utf8=True)))
  75. else:
  76. log.critical("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  77. datetime.datetime.now(), cmd.primary_key,
  78. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  79. tf.MessageToString(cmd, as_utf8=True)))
  80. if cmd.terminal_id != 0:
  81. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  82. else:
  83. return cmd
  84. class MyHttpRequestHandler(BaseHTTPRequestHandler):
  85. def _send_cors_headers(self):
  86. """ Sets headers required for CORS """
  87. self.send_header('Content-type', 'application/json')
  88. self.send_header("Access-Control-Allow-Origin", "*")
  89. self.send_header("Access-Control-Allow-Methods", "*")
  90. self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type")
  91. def do_POST(self):
  92. print('do_post:')
  93. datas = self.rfile.read(int(self.headers['content-length'])) # 固定格式,获取表单提交的数据
  94. self.send_response(200) # 返回状态码
  95. self.end_headers() # 返回响应头结束
  96. try:
  97. st = datas.decode('utf-8')
  98. res = json.loads(st)
  99. print(res)
  100. if ('data' in res.keys()) and ('InRecordId' in res['data'].keys()):
  101. primary_key = res['data']['InRecordId']
  102. table = message.pick_table()
  103. table.primary_key = primary_key
  104. # table.statu.table_process_mod = message.PROCESS_ONLY_TO_DISPATCH
  105. cmd = check_pick_command(tf.MessageToString(table, as_utf8=True))
  106. if cmd.statu.execute_statu == message.eNormal:
  107. buf = {"Message": "取车成功", "Tag": 1}
  108. else:
  109. buf = {"Message": cmd.statu.statu_description, "Tag": 0}
  110. else:
  111. buf = {"Message": "取车指令缺少关键字段Data或InRecordId!", "Tag": 0}
  112. # 以下是返回报文
  113. except Exception as e:
  114. buf = {"Message": "请求解析失败!", "Tag": 0}
  115. self.wfile.write(json.dumps(buf, ensure_ascii=False).encode('utf-8')) # 发送json格式的返回包体
  116. print(buf)
  117. def recv_post():
  118. ts = HTTPServer(('192.168.2.101', 8899), MyHttpRequestHandler)
  119. ts.serve_forever()
  120. if __name__ == '__main__':
  121. config = parse_json.parse_json_with_comments('./config.json')
  122. log = HandleLog.get_logger(os.getcwd() + '_logs')
  123. # 数据库配置
  124. db_config = config['db_config']
  125. # rabbitmq配置
  126. mq_config = config['mq_config']
  127. # 日志记录
  128. # 消费指令消息
  129. cmd_callbacks = [["user_park_command_request_queue", check_park_command],
  130. ["user_pick_command_request_queue", check_pick_command]]
  131. if config['project_name'] == 'xm_sgj':
  132. checker = CHE_XmSgj.CommandChecker(db_config)
  133. elif config['project_name'] == 'gy_bhhy':
  134. checker = CHE_GyBhhy.CommandChecker(db_config)
  135. elif config['project_name'] == 'zkxy':
  136. checker = CHE_ZKXY.CommandChecker(db_config)
  137. g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'], mq_config['mq_password'])
  138. g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'],"checker")
  139. for ex,key in mq_config['mq_statu_exchange_keys']:
  140. if key.find("dispatch") >= 0:
  141. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_dispatch_statu)
  142. if key.find("measure") >= 0:
  143. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_measure_info)
  144. # t1 = threading.Thread(target=recv_post)
  145. g_rabbitmq.start()
  146. # t1.start()
  147. g_rabbitmq.join()
  148. # t1.join()