node.py 13 KB


  1. import hashlib
  2. import socketserver
  3. import sys
  4. import json
  5. sys.path.append("..")
  6. import datetime
  7. import os
  8. import threading
  9. import time
  10. import async_communication as CM
  11. import message_pb2 as message
  12. import google.protobuf.text_format as tf
  13. import check_command_XmSgj as CHE_XmSgj
  14. import check_command_GyBhhy as CHE_GyBhhy
  15. import check_command_ZKXY as CHE_ZKXY
  16. import mytool.json_helper.parse_json as parse_json
  17. from http.server import HTTPServer, BaseHTTPRequestHandler
  18. from mytool.log_helper.logger import HandleLog
  19. def check_park_command(body):
  20. # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
  21. log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(), body[0:260]))
  22. cmd = checker.entrance_isOK(body)
  23. table = message.park_table()
  24. table.CopyFrom(cmd)
  25. cmd.car_number_info.plate_full_image = ''
  26. cmd.car_number_info.plate_clip_image = ''
  27. response_port = "user_park_command_%d_response_port" % cmd.terminal_id
  28. dispatch_key = ""
  29. entrance_key = "entrance_update_%d_port" % cmd.terminal_id
  30. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  31. if cmd.statu.execute_statu == message.eNormal:
  32. # 指令检查正常
  33. dispatch_key = "dispatch_park_command_%d_request_port" % cmd.unit_id
  34. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(table, as_utf8=True))
  35. log.info(
  36. "停车指令 %s OK:%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(table, as_utf8=True)))
  37. else:
  38. # print("---------------刷新投影-----------------" + entrance_key)
  39. # g_rabbitmq.publish(mq_config['mq_statu_exchange_name'], entrance_key,
  40. # tf.MessageToString(cmd, as_utf8=True))
  41. if cmd.statu.execute_statu == message.eWarning:
  42. # 给终端发送指令,刷新投影
  43. log.warning(
  44. "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  45. datetime.datetime.now(), cmd.car_number,
  46. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  47. tf.MessageToString(cmd, as_utf8=True)))
  48. elif cmd.statu.execute_statu == message.eError:
  49. log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  50. datetime.datetime.now(), cmd.car_number,
  51. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  52. tf.MessageToString(cmd, as_utf8=True)))
  53. else:
  54. log.critical(
  55. "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  56. datetime.datetime.now(), cmd.car_number,
  57. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  58. tf.MessageToString(cmd, as_utf8=True)))
  59. def check_pick_command(body):
  60. log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
  61. cmd = checker.exit_isOK(body)
  62. response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
  63. dispatch_key = ""
  64. if cmd.statu.execute_statu == message.eNormal and cmd.statu.table_process_mod != message.PROCESS_ONLY_TO_PAY:
  65. # 指令检查正常
  66. log.info("取车指令 %s OK :%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(cmd, as_utf8=True)))
  67. dispatch_key = "dispatch_pick_command_%d_request_port" % cmd.unit_id
  68. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
  69. else:
  70. if cmd.statu.execute_statu == message.eWarning:
  71. log.warning(
  72. "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  73. datetime.datetime.now(), cmd.primary_key,
  74. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  75. tf.MessageToString(cmd, as_utf8=True)))
  76. elif cmd.statu.execute_statu == message.eError:
  77. log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  78. datetime.datetime.now(), cmd.primary_key,
  79. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  80. tf.MessageToString(cmd, as_utf8=True)))
  81. else:
  82. log.critical(
  83. "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
  84. datetime.datetime.now(), cmd.primary_key,
  85. mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
  86. tf.MessageToString(cmd, as_utf8=True)))
  87. if cmd.terminal_id != 0:
  88. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
  89. else:
  90. return cmd
  91. class MyHttpRequestHandler(BaseHTTPRequestHandler):
  92. def _send_cors_headers(self):
  93. """ Sets headers required for CORS """
  94. self.send_header('Content-type', 'application/json')
  95. self.send_header("Access-Control-Allow-Origin", "*")
  96. self.send_header("Access-Control-Allow-Methods", "*")
  97. self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type")
  98. def check(self,Appid,Time,Sign):
  99. self.Appid = '14249167070745034'
  100. self.AppSecret = 'vN2dZq2C3f3EAoiURrIDbjFWtPPlTetB'
  101. sign_iphone = Appid + Time + self.AppSecret
  102. hl_iphone = hashlib.md5()
  103. hl_iphone.update(sign_iphone.encode(encoding='utf-8'))
  104. sign_iphone = hl_iphone.hexdigest()
  105. if sign_iphone == Sign:
  106. return True
  107. return False
  108. def do_POST(self):
  109. print('do_post:')
  110. path = str(self.path)
  111. if path != '/api/operateOutScene/sgjQueryRankMsg' :
  112. code, msg, data = -200, "error", "地址错误!"
  113. self.send_write(code, msg, data)
  114. return
  115. try:
  116. Appid = self.headers['Appid']
  117. Time = self.headers['Time']
  118. Sign = self.headers['Sign']
  119. if self.check(Appid,Time,Sign) is False:
  120. code, msg, data = -200, "error", "验签失败!"
  121. self.send_write(code, msg, data)
  122. return
  123. except Exception:
  124. code,msg,data = -200,"error","验签失败!"
  125. self.send_write(code,msg,data)
  126. return
  127. datas = self.rfile.read(int(self.headers['content-length'])) # 固定格式,获取表单提交的数据
  128. self.send_response(200) # 返回状态码
  129. self.end_headers() # 返回响应头结束
  130. code = 200
  131. msg = "error"
  132. data = "初始化"
  133. try:
  134. # utf-8解码解析json
  135. st = datas.decode('utf-8')
  136. res = json.loads(st)
  137. print(res)
  138. except Exception:
  139. code,msg,data = -200,"error","解析失败!"
  140. self.send_write(code,msg,data)
  141. return
  142. # 查看是否有指令类型字段
  143. if 'commandType' in res.keys():
  144. # 取车指令
  145. if res['commandType'] == 'PickCommand':
  146. try:
  147. # 查看是否有唯一码字段
  148. primary_key = res['inRecordId']
  149. except Exception:
  150. code, msg, data = -20, "error", "PickCommand,解析失败,缺少关键字段inRecordId!"
  151. self.send_write(code, msg, data)
  152. return
  153. # 生成取车表单,走正常取车指令
  154. table = message.pick_table()
  155. table.primary_key = primary_key
  156. # 执行取车指令
  157. cmd = check_pick_command(tf.MessageToString(table, as_utf8=True))
  158. if cmd.statu.execute_statu == message.eNormal:
  159. code, msg, data = 0, "success", "取车指令,接收成功!"
  160. else:
  161. code, msg, data = -1, "error", "取车失败!%s" % cmd.statu.statu_description
  162. # 查询排队
  163. elif res['commandType'] == 'RankCommand':
  164. try:
  165. garageNo = res['garageNo']
  166. unit = checker.get_unit_int(garageNo)
  167. car_number = res['licenesePlate']
  168. primary_key = res['inRecordId']
  169. except Exception:
  170. code, msg, data = -20, "error", "RankCommand,解析失败,缺少关键字段!"
  171. self.send_write(code, msg, data)
  172. return
  173. # 查询排队信息
  174. print("%%%%%%%%%%%%%%%%%%%%%%%%%%%% 查询排队信息 %%%%%%%%%%%%%%%%%%%%%%%%%%%%" + str(datetime.datetime.now()))
  175. command_list = checker.db.query_queue_condition_in_unit(unit)
  176. i = 0
  177. for dict in command_list:
  178. if dict['car_number'] == car_number:
  179. command_count = dict['row_number() over(order by queue_time)']-1
  180. pickup_queue_dict = {
  181. "licenesePlate": car_number,
  182. "inRecordId": primary_key,
  183. "garageNo": garageNo,
  184. "waitNum": command_count,
  185. "remainingTime": command_count * 2
  186. }
  187. code, msg, data = 0, "success",pickup_queue_dict
  188. break
  189. i = i+1
  190. # 没找到对应车辆
  191. if i == len(command_list):
  192. code, msg, data = -1, "error", '未查询到该车辆,车牌号:%s,单元号:%s,流水号:%s'%(car_number,garageNo,primary_key)
  193. # 预约车位
  194. elif res['commandType'] == 'ApplyCommand':
  195. try:
  196. apply_table = message.apply_table()
  197. apply_table.car_number = res['licenesePlate']
  198. apply_table.apply_end_time = res['endTime']
  199. apply_table.operation_type = res['operationType']
  200. except Exception:
  201. code, msg, data = -20, "error", "ApplyCommand,解析失败,缺少关键字段!"
  202. self.send_write(code, msg, data)
  203. return
  204. #预约车位查重
  205. sql = "Select * from space where height>1.799 and wheel_base>2.899 and unit>30 and car_number is null and statu=0"
  206. c_empty_space_list = checker.db.query(sql)
  207. space_list = checker.db.query_space_in_car_number(apply_table.car_number)
  208. if len(space_list)>0:
  209. code, msg, data = -2, "error", "预约失败,车辆已在库内!"
  210. elif len(c_empty_space_list) < 0:
  211. code, msg, data = -1, "error", "预约失败,无空闲车位!"
  212. else:
  213. g_rabbitmq.publish(mq_config['mq_command_exchange_name'], "dispatch_apply_command_request_port",tf.MessageToString(apply_table, as_utf8=True))
  214. code, msg, data = 0, "success", "预约成功!"
  215. else:
  216. code, msg, data = -20, "error", "解析失败,缺少关键字段commandType!"
  217. # 发送答复
  218. self.send_write(code, msg, data)
  219. # 对接受到的post请求发送答复
  220. def send_write(self,code,msg,data):
  221. buf = {"code": code,
  222. "msg": msg,
  223. "data": data}
  224. self.wfile.write(json.dumps(buf, ensure_ascii=False).encode('utf-8')) # 发送json格式的返回包体
  225. print(buf)
  226. def recv_post():
  227. # 绑定IP
  228. ts = HTTPServer(('192.168.2.57', 8899), MyHttpRequestHandler)
  229. # 启动服务
  230. ts.serve_forever()
  231. if __name__ == '__main__':
  232. config = parse_json.parse_json_with_comments('./config.json')
  233. log = HandleLog.get_logger(os.getcwd() + '_logs')
  234. # 数据库配置
  235. db_config = config['db_config']
  236. # rabbitmq配置
  237. mq_config = config['mq_config']
  238. # 日志记录
  239. # 消费指令消息
  240. cmd_callbacks = [["user_park_command_request_queue", check_park_command],
  241. ["user_pick_command_request_queue", check_pick_command]]
  242. if config['project_name'] == 'xm_sgj':
  243. checker = CHE_XmSgj.CommandChecker(db_config)
  244. elif config['project_name'] == 'gy_bhhy':
  245. checker = CHE_GyBhhy.CommandChecker(db_config)
  246. elif config['project_name'] == 'zkxy':
  247. checker = CHE_ZKXY.CommandChecker(db_config)
  248. # 初始化rabbitmq
  249. g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'],
  250. mq_config['mq_password'])
  251. # g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'], "checker")
  252. # # 绑定队列
  253. # for ex, key in mq_config['mq_statu_exchange_keys']:
  254. # if key.find("dispatch") >= 0:
  255. # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_dispatch_statu)
  256. # if key.find("measure") >= 0:
  257. # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_measure_info)
  258. # 收费系统云端通信
  259. http_community = threading.Thread(target=recv_post)
  260. g_rabbitmq.start()
  261. http_community.start()
  262. g_rabbitmq.join()
  263. http_community.join()