node.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import sys
  2. import json
  3. sys.path.append("..")
  4. import datetime
  5. import os
  6. import threading
  7. import time
  8. import async_communication as CM
  9. import message_pb2 as message
  10. import google.protobuf.text_format as tf
  11. import check_command_XmSgj as CHE_XmSgj
  12. import check_command_GyBhhy as CHE_GyBhhy
  13. import mytool.json_helper.parse_json as parse_json
  14. from http.server import HTTPServer, BaseHTTPRequestHandler
  15. from mytool.log_helper.logger import HandleLog
  16. def check_park_command(body):
  17. # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
  18. log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(),body[0:250]))
  19. cmd = checker.entrance_isOK(body)
  20. park_cmd = message.park_table()
  21. park_cmd.CopyFrom(cmd)
  22. park_cmd.car_number_info.plate_full_image = ''
  23. park_cmd.car_number_info.plate_clip_image = ''
  24. response_port = "user_park_command_%d_response_port" % cmd.terminal_id
  25. dispatch_key = ""
  26. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(park_cmd, as_utf8=True))
  27. if cmd.statu.execute_statu == message.eNormal:
  28. # 指令检查正常
  29. log.info("停车指令 %s OK: 车牌号:%s 检查成功!\n" % (datetime.datetime.now(), cmd.car_number))
  30. dispatch_key = "dispatch_park_command_%d_request_port" % cmd.unit_id
  31. if park_cmd.unit_id != 31 and park_cmd.unit_id != 32:
  32. request = checker.get_request_data(cmd, 'in')
  33. statu,response = checker.push_http_request(checker.get_in_url(), request)
  34. cmd.car_number_info.plate_full_image = ''
  35. cmd.car_number_info.plate_clip_image = ''
  36. if statu == message.eNormal:
  37. if ('Data' in response.keys()) and ('InPicUrl' in response['Data'].keys()):
  38. cmd.car_number_info.plate_full_image = response['Data']['InPicUrl']
  39. cmd.car_number_info.plate_clip_image = response['Data']['InPlateUrl']
  40. cmd.car_number_info.plate_full_image = ''
  41. cmd.car_number_info.plate_clip_image = ''
  42. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  43. else:
  44. cmd.car_number_info.plate_full_image = ''
  45. cmd.car_number_info.plate_clip_image = ''
  46. if cmd.statu.execute_statu == message.eWarning:
  47. log.warning("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  48. datetime.datetime.now(), cmd.car_number,
  49. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  50. tf.MessageToString(cmd, as_utf8=True)))
  51. elif cmd.statu.execute_statu == message.eError:
  52. log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  53. datetime.datetime.now(), cmd.car_number,
  54. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  55. tf.MessageToString(cmd, as_utf8=True)))
  56. else:
  57. log.critical("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  58. datetime.datetime.now(), cmd.car_number,
  59. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  60. tf.MessageToString(cmd, as_utf8=True)))
  61. def check_pick_command(body):
  62. log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
  63. cmd = checker.exit_isOK(body)
  64. response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
  65. dispatch_key = ""
  66. if cmd.statu.execute_statu == message.eNormal and cmd.statu.table_process_mod != message.PROCESS_ONLY_TO_PAY:
  67. # 指令检查正常
  68. log.info("取车指令 %s OK : 唯一码:%s 检查成功!\n" % (datetime.datetime.now(), cmd.primary_key))
  69. dispatch_key = "dispatch_pick_command_%d_request_port" % cmd.unit_id
  70. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  71. else:
  72. if cmd.statu.execute_statu == message.eWarning:
  73. log.warning("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  74. datetime.datetime.now(), cmd.primary_key,
  75. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  76. tf.MessageToString(cmd, as_utf8=True)))
  77. elif cmd.statu.execute_statu == message.eError:
  78. log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  79. datetime.datetime.now(), cmd.primary_key,
  80. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  81. tf.MessageToString(cmd, as_utf8=True)))
  82. else:
  83. log.critical("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  84. datetime.datetime.now(), cmd.primary_key,
  85. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  86. tf.MessageToString(cmd, as_utf8=True)))
  87. if cmd.terminal_id != 0:
  88. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  89. else:
  90. return cmd
  91. class MyHttpRequestHandler(BaseHTTPRequestHandler):
  92. def _send_cors_headers(self):
  93. """ Sets headers required for CORS """
  94. self.send_header('Content-type', 'application/json')
  95. self.send_header("Access-Control-Allow-Origin", "*")
  96. self.send_header("Access-Control-Allow-Methods", "*")
  97. self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type")
  98. def do_POST(self):
  99. print('do_post:')
  100. datas = self.rfile.read(int(self.headers['content-length'])) # 固定格式,获取表单提交的数据
  101. self.send_response(200) # 返回状态码
  102. self.end_headers() # 返回响应头结束
  103. try:
  104. st = datas.decode('utf-8')
  105. res = json.loads(st)
  106. print(res)
  107. if ('data' in res.keys()) and ('InRecordId' in res['data'].keys()):
  108. primary_key = res['data']['InRecordId']
  109. table = message.pick_table()
  110. table.primary_key = primary_key
  111. cmd = check_pick_command(tf.MessageToString(table, as_utf8=True))
  112. if cmd.statu.execute_statu == message.eNormal:
  113. buf = {"Message": "取车成功", "Tag": 1}
  114. else:
  115. buf = {"Message": cmd.statu.statu_description, "Tag": 0}
  116. else:
  117. buf = {"Message": "取车指令缺少关键字段Data或InRecordId!", "Tag": 0}
  118. # 以下是返回报文
  119. except Exception as e:
  120. buf = {"Message": "请求解析失败!", "Tag": 0}
  121. self.wfile.write(json.dumps(buf, ensure_ascii=False).encode('utf-8')) # 发送json格式的返回包体
  122. print(buf)
  123. def recv_post():
  124. ts = HTTPServer(('0.0.0.0', 8899), MyHttpRequestHandler)
  125. ts.serve_forever()
  126. if __name__ == '__main__':
  127. config = parse_json.parse_json_with_comments('./config.json')
  128. # 数据库配置
  129. db_config = config['db_config']
  130. # rabbitmq配置
  131. mq_config = config['mq_config']
  132. # 日志记录
  133. log = HandleLog.get_logger(os.getcwd()+'\\logs')
  134. # 消费指令消息
  135. cmd_callbacks = [["user_park_command_request_queue", check_park_command],
  136. ["user_pick_command_request_queue", check_pick_command]]
  137. if config['project_name'] == 'xm_sgj':
  138. checker = CHE_XmSgj.CommandChecker(db_config)
  139. elif config['project_name'] == 'gy_bhhy':
  140. checker = CHE_GyBhhy.CommandChecker(db_config)
  141. g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'], mq_config['mq_password'])
  142. g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'],"checker")
  143. for ex,key in mq_config['mq_statu_exchange_keys']:
  144. if key.find("dispatch") >= 0:
  145. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_dispatch_statu)
  146. if key.find("measure") >= 0:
  147. g_rabbitmq.bind_statu_callback(ex,key,checker.receive_measure_info)
  148. t1 = threading.Thread(target=recv_post)
  149. t1.start()
  150. g_rabbitmq.start()
  151. g_rabbitmq.join()
  152. t1.join()