node.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import sys
  2. import json
  3. sys.path.append("..")
  4. import datetime
  5. import os
  6. import threading
  7. import time
  8. import async_communication as CM
  9. import message_pb2 as message
  10. import google.protobuf.text_format as tf
  11. import check_command_XmSgj as CHE_XmSgj
  12. import check_command_GyBhhy as CHE_GyBhhy
  13. import check_command_ZKXY as CHE_ZKXY
  14. import mytool.json_helper.parse_json as parse_json
  15. from http.server import HTTPServer, BaseHTTPRequestHandler
  16. from mytool.log_helper.logger import HandleLog
  17. def check_park_command(body):
  18. # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
  19. log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(), body[0:260]))
  20. cmd = checker.entrance_isOK(body)
  21. table = message.park_table()
  22. table.CopyFrom(cmd)
  23. cmd.car_number_info.plate_full_image = ''
  24. cmd.car_number_info.plate_clip_image = ''
  25. response_port = "user_park_command_%d_response_port" % cmd.terminal_id
  26. dispatch_key = ""
  27. entrance_key = "entrance_update_%d_port" % cmd.terminal_id
  28. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  29. if cmd.statu.execute_statu == message.eNormal:
  30. # 指令检查正常
  31. dispatch_key = "dispatch_park_command_%d_request_port" % cmd.unit_id
  32. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(table, as_utf8=True))
  33. log.info(
  34. "停车指令 %s OK:%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(table, as_utf8=True)))
  35. else:
  36. # print("---------------刷新投影-----------------" + entrance_key)
  37. # g_rabbitmq.publish(mq_config['mq_statu_exchange_name'], entrance_key,
  38. # tf.MessageToString(cmd, as_utf8=True))
  39. if cmd.statu.execute_statu == message.eWarning:
  40. # 给终端发送指令,刷新投影
  41. log.warning(
  42. "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  43. datetime.datetime.now(), cmd.car_number,
  44. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  45. tf.MessageToString(cmd, as_utf8=True)))
  46. elif cmd.statu.execute_statu == message.eError:
  47. log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  48. datetime.datetime.now(), cmd.car_number,
  49. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  50. tf.MessageToString(cmd, as_utf8=True)))
  51. else:
  52. log.critical(
  53. "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  54. datetime.datetime.now(), cmd.car_number,
  55. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  56. tf.MessageToString(cmd, as_utf8=True)))
  57. def check_pick_command(body):
  58. log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
  59. cmd = checker.exit_isOK(body)
  60. response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
  61. dispatch_key = ""
  62. if cmd.statu.execute_statu == message.eNormal and cmd.statu.table_process_mod != message.PROCESS_ONLY_TO_PAY:
  63. # 指令检查正常
  64. log.info("取车指令 %s OK :%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(cmd, as_utf8=True)))
  65. dispatch_key = "dispatch_pick_command_%d_request_port" % cmd.unit_id
  66. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  67. else:
  68. if cmd.statu.execute_statu == message.eWarning:
  69. log.warning(
  70. "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  71. datetime.datetime.now(), cmd.primary_key,
  72. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  73. tf.MessageToString(cmd, as_utf8=True)))
  74. elif cmd.statu.execute_statu == message.eError:
  75. log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  76. datetime.datetime.now(), cmd.primary_key,
  77. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  78. tf.MessageToString(cmd, as_utf8=True)))
  79. else:
  80. log.critical(
  81. "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  82. datetime.datetime.now(), cmd.primary_key,
  83. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  84. tf.MessageToString(cmd, as_utf8=True)))
  85. if cmd.terminal_id != 0:
  86. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  87. else:
  88. return cmd
  89. class MyHttpRequestHandler(BaseHTTPRequestHandler):
  90. def _send_cors_headers(self):
  91. """ Sets headers required for CORS """
  92. self.send_header('Content-type', 'application/json')
  93. self.send_header("Access-Control-Allow-Origin", "*")
  94. self.send_header("Access-Control-Allow-Methods", "*")
  95. self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type")
  96. def do_POST(self):
  97. print('do_post:')
  98. datas = self.rfile.read(int(self.headers['content-length'])) # 固定格式,获取表单提交的数据
  99. self.send_response(200) # 返回状态码
  100. self.end_headers() # 返回响应头结束
  101. code = 200
  102. msg = "error"
  103. data = "初始化"
  104. try:
  105. # utf-8解码解析json
  106. st = datas.decode('utf-8')
  107. res = json.loads(st)
  108. print(res)
  109. except Exception:
  110. code,msg,data = -200,"error","解析失败!"
  111. self.send_write(code,msg,data)
  112. return
  113. # 查看是否有指令类型字段
  114. if 'commandType' in res.keys():
  115. # 取车指令
  116. if res['commandType'] == 'PickCommand':
  117. try:
  118. # 查看是否有唯一码字段
  119. primary_key = res['inRecordId']
  120. except Exception:
  121. code, msg, data = -2, "error", "PickCommand,解析失败,缺少关键字段inRecordId!"
  122. self.send_write(code, msg, data)
  123. return
  124. # 生成取车表单,走正常取车指令
  125. table = message.pick_table()
  126. table.primary_key = primary_key
  127. # 执行取车指令
  128. cmd = check_pick_command(tf.MessageToString(table, as_utf8=True))
  129. if cmd.statu.execute_statu == message.eNormal:
  130. code, msg, data = 0, "success", "取车指令,接收成功!"
  131. else:
  132. code, msg, data = -1, "error", "取车失败!%s" % cmd.statu.statu_description
  133. # 查询排队
  134. elif res['commandType'] == 'RankCommand':
  135. try:
  136. garageNo = res['garageNo']
  137. unit = checker.get_unit_int(garageNo)
  138. car_number = res['licenesePlate']
  139. primary_key = res['inRecordId']
  140. except Exception:
  141. code, msg, data = -2, "error", "RankCommand,解析失败,缺少关键字段!"
  142. self.send_write(code, msg, data)
  143. return
  144. # 查询排队信息
  145. print("%%%%%%%%%%%%%%%%%%%%%%%%%%%% 查询排队信息 %%%%%%%%%%%%%%%%%%%%%%%%%%%%" + str(datetime.datetime.now()))
  146. command_list = checker.db.query_queue_condition_in_unit(unit)
  147. i = 0
  148. for dict in command_list:
  149. if dict['car_number'] == car_number:
  150. command_count = dict['row_number() over(order by queue_time)']-1
  151. pickup_queue_dict = {
  152. "licenesePlate": car_number,
  153. "inRecordId": primary_key,
  154. "garageNo": garageNo,
  155. "waitNum": command_count,
  156. "remainingTime": command_count * 2
  157. }
  158. code, msg, data = 0, "success",pickup_queue_dict
  159. break
  160. i = i+1
  161. # 没找到对应车辆
  162. if i == len(command_list):
  163. code, msg, data = -1, "error", '未查询到该车辆,车牌号:%s,单元号:%s,流水号:%s'%(car_number,garageNo,primary_key)
  164. # 预约车位
  165. elif res['commandType'] == 'ApplyCommand':
  166. try:
  167. apply_table = message.apply_table()
  168. apply_table.car_number = res['licenesePlate']
  169. apply_table.apply_end_time = res['endTime']
  170. except Exception:
  171. code, msg, data = -2, "error", "ApplyCommand,解析失败,缺少关键字段!"
  172. self.send_write(code, msg, data)
  173. return
  174. #预约车位查重
  175. sql = "Select * from space where height>1.799 and wheel_base>2.899 and unit>30 and car_number is null and statu=0"
  176. c_empty_space_list = checker.db.query(sql)
  177. space_list = checker.db.query_space_in_car_number(apply_table.car_number)
  178. if len(space_list)>0:
  179. code, msg, data = -1, "error", "预约失败,车辆已在库内!"
  180. elif len(c_empty_space_list) < 0:
  181. code, msg, data = -1, "error", "预约失败,无空闲车位!"
  182. else:
  183. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], "dispatch_apply_command_request_port",tf.MessageToString(apply_table, as_utf8=True))
  184. code, msg, data = 0, "success", "预约成功!"
  185. else:
  186. code, msg, data = -2, "error", "解析失败,缺少关键字段commandType!"
  187. # 发送答复
  188. self.send_write(code, msg, data)
  189. # 对接受到的post请求发送答复
  190. def send_write(self,code,msg,data):
  191. buf = {"code": code,
  192. "msg": msg,
  193. "data": data}
  194. self.wfile.write(json.dumps(buf, ensure_ascii=False).encode('utf-8')) # 发送json格式的返回包体
  195. print(buf)
  196. def recv_post():
  197. # 绑定IP
  198. ts = HTTPServer(('192.168.2.57', 8899), MyHttpRequestHandler)
  199. # 启动服务
  200. ts.serve_forever()
  201. if __name__ == '__main__':
  202. config = parse_json.parse_json_with_comments('./config.json')
  203. log = HandleLog.get_logger(os.getcwd() + '_logs')
  204. # 数据库配置
  205. db_config = config['db_config']
  206. # rabbitmq配置
  207. mq_config = config['mq_config']
  208. # 日志记录
  209. # 消费指令消息
  210. cmd_callbacks = [["user_park_command_request_queue", check_park_command],
  211. ["user_pick_command_request_queue", check_pick_command]]
  212. if config['project_name'] == 'xm_sgj':
  213. checker = CHE_XmSgj.CommandChecker(db_config)
  214. elif config['project_name'] == 'gy_bhhy':
  215. checker = CHE_GyBhhy.CommandChecker(db_config)
  216. elif config['project_name'] == 'zkxy':
  217. checker = CHE_ZKXY.CommandChecker(db_config)
  218. # 初始化rabbitmq
  219. g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'],
  220. mq_config['mq_password'])
  221. # g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'], "checker")
  222. # # 绑定队列
  223. # for ex, key in mq_config['mq_statu_exchange_keys']:
  224. # if key.find("dispatch") >= 0:
  225. # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_dispatch_statu)
  226. # if key.find("measure") >= 0:
  227. # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_measure_info)
  228. # 收费系统云端通信
  229. http_community = threading.Thread(target=recv_post)
  230. g_rabbitmq.start()
  231. http_community.start()
  232. g_rabbitmq.join()
  233. http_community.join()