123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- import hashlib
- import socketserver
- import sys
- import json
- sys.path.append("..")
- import datetime
- import os
- import threading
- import time
- import async_communication as CM
- import message_pb2 as message
- import google.protobuf.text_format as tf
- import check_command_XmSgj as CHE_XmSgj
- import check_command_GyBhhy as CHE_GyBhhy
- import check_command_ZKXY as CHE_ZKXY
- import mytool.json_helper.parse_json as parse_json
- from http.server import HTTPServer, BaseHTTPRequestHandler
- from mytool.log_helper.logger import HandleLog
- def check_park_command(body):
- # print("接收停车指令 %s message:%s" % (datetime.datetime.now(), body))
- log.info("接收停车指令 %s message:%s" % (datetime.datetime.now(), body[0:260]))
- cmd = checker.entrance_isOK(body)
- table = message.park_table()
- table.CopyFrom(cmd)
- cmd.car_number_info.plate_full_image = ''
- cmd.car_number_info.plate_clip_image = ''
- response_port = "user_park_command_%d_response_port" % cmd.terminal_id
- dispatch_key = ""
- entrance_key = "entrance_update_%d_port" % cmd.terminal_id
- g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
- if cmd.statu.execute_statu == message.eNormal:
- # 指令检查正常
- dispatch_key = "dispatch_park_command_%d_request_port" % cmd.unit_id
- g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(table, as_utf8=True))
- log.info(
- "停车指令 %s OK:%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(table, as_utf8=True)))
- else:
- # print("---------------刷新投影-----------------" + entrance_key)
- # g_rabbitmq.publish(mq_config['mq_statu_exchange_name'], entrance_key,
- # tf.MessageToString(cmd, as_utf8=True))
- if cmd.statu.execute_statu == message.eWarning:
- # 给终端发送指令,刷新投影
- log.warning(
- "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.car_number,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- elif cmd.statu.execute_statu == message.eError:
- log.error("停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.car_number,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- else:
- log.critical(
- "停车指令 %s 车牌号:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.car_number,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- def check_pick_command(body):
- log.info("接收取车指令 %s message:%s" % (datetime.datetime.now(), body))
- cmd = checker.exit_isOK(body)
- response_port = "user_pick_command_%d_response_port" % cmd.terminal_id
- dispatch_key = ""
- if cmd.statu.execute_statu == message.eNormal and cmd.statu.table_process_mod != message.PROCESS_ONLY_TO_PAY:
- # 指令检查正常
- log.info("取车指令 %s OK :%s 检查成功!\n" % (datetime.datetime.now(), tf.MessageToString(cmd, as_utf8=True)))
- dispatch_key = "dispatch_pick_command_%d_request_port" % cmd.unit_id
- g_rabbitmq.publish(mq_config['mq_command_exchange_name'], dispatch_key, tf.MessageToString(cmd, as_utf8=True))
- else:
- if cmd.statu.execute_statu == message.eWarning:
- log.warning(
- "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.primary_key,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- elif cmd.statu.execute_statu == message.eError:
- log.error("取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.primary_key,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- else:
- log.critical(
- "取车指令 %s 唯一码:%s 检查失败!\n 发送反馈 交换机:%s 发送反馈端口:%s 反馈表单:%s" % (
- datetime.datetime.now(), cmd.primary_key,
- mq_config['mq_command_exchange_name'], response_port + "," + dispatch_key,
- tf.MessageToString(cmd, as_utf8=True)))
- if cmd.terminal_id != 0:
- g_rabbitmq.publish(mq_config['mq_command_exchange_name'], response_port, tf.MessageToString(cmd, as_utf8=True))
- else:
- return cmd
- class MyHttpRequestHandler(BaseHTTPRequestHandler):
- def _send_cors_headers(self):
- """ Sets headers required for CORS """
- self.send_header('Content-type', 'application/json')
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header("Access-Control-Allow-Methods", "*")
- self.send_header("Access-Control-Allow-Headers", "Authorization, Content-Type")
- def check(self,Appid,Time,Sign):
- self.Appid = '14249167070745034'
- self.AppSecret = 'vN2dZq2C3f3EAoiURrIDbjFWtPPlTetB'
- sign_iphone = Appid + Time + self.AppSecret
- hl_iphone = hashlib.md5()
- hl_iphone.update(sign_iphone.encode(encoding='utf-8'))
- sign_iphone = hl_iphone.hexdigest()
- if sign_iphone == Sign:
- return True
- return False
- def do_POST(self):
- print('do_post:')
- path = str(self.path)
- if path != '/api/operateOutScene/sgjQueryRankMsg' :
- code, msg, data = -200, "error", "地址错误!"
- self.send_write(code, msg, data)
- return
- try:
- Appid = self.headers['Appid']
- Time = self.headers['Time']
- Sign = self.headers['Sign']
- if self.check(Appid,Time,Sign) is False:
- code, msg, data = -200, "error", "验签失败!"
- self.send_write(code, msg, data)
- return
- except Exception:
- code,msg,data = -200,"error","验签失败!"
- self.send_write(code,msg,data)
- return
- datas = self.rfile.read(int(self.headers['content-length'])) # 固定格式,获取表单提交的数据
- self.send_response(200) # 返回状态码
- self.end_headers() # 返回响应头结束
- code = 200
- msg = "error"
- data = "初始化"
- try:
- # utf-8解码解析json
- st = datas.decode('utf-8')
- res = json.loads(st)
- print(res)
- except Exception:
- code,msg,data = -200,"error","解析失败!"
- self.send_write(code,msg,data)
- return
- # 查看是否有指令类型字段
- if 'commandType' in res.keys():
- # 取车指令
- if res['commandType'] == 'PickCommand':
- try:
- # 查看是否有唯一码字段
- primary_key = res['inRecordId']
- except Exception:
- code, msg, data = -20, "error", "PickCommand,解析失败,缺少关键字段inRecordId!"
- self.send_write(code, msg, data)
- return
- # 生成取车表单,走正常取车指令
- table = message.pick_table()
- table.primary_key = primary_key
- # 执行取车指令
- cmd = check_pick_command(tf.MessageToString(table, as_utf8=True))
- if cmd.statu.execute_statu == message.eNormal:
- code, msg, data = 0, "success", "取车指令,接收成功!"
- else:
- code, msg, data = -1, "error", "取车失败!%s" % cmd.statu.statu_description
- # 查询排队
- elif res['commandType'] == 'RankCommand':
- try:
- garageNo = res['garageNo']
- unit = checker.get_unit_int(garageNo)
- car_number = res['licenesePlate']
- primary_key = res['inRecordId']
- except Exception:
- code, msg, data = -20, "error", "RankCommand,解析失败,缺少关键字段!"
- self.send_write(code, msg, data)
- return
- # 查询排队信息
- print("%%%%%%%%%%%%%%%%%%%%%%%%%%%% 查询排队信息 %%%%%%%%%%%%%%%%%%%%%%%%%%%%" + str(datetime.datetime.now()))
- command_list = checker.db.query_queue_condition_in_unit(unit)
- i = 0
- for dict in command_list:
- if dict['car_number'] == car_number:
- command_count = dict['row_number() over(order by queue_time)']-1
- pickup_queue_dict = {
- "licenesePlate": car_number,
- "inRecordId": primary_key,
- "garageNo": garageNo,
- "waitNum": command_count,
- "remainingTime": command_count * 2
- }
- code, msg, data = 0, "success",pickup_queue_dict
- break
- i = i+1
- # 没找到对应车辆
- if i == len(command_list):
- code, msg, data = -1, "error", '未查询到该车辆,车牌号:%s,单元号:%s,流水号:%s'%(car_number,garageNo,primary_key)
- # 预约车位
- elif res['commandType'] == 'ApplyCommand':
- try:
- apply_table = message.apply_table()
- apply_table.car_number = res['licenesePlate']
- apply_table.apply_end_time = res['endTime']
- apply_table.operation_type = res['operationType']
- except Exception:
- code, msg, data = -20, "error", "ApplyCommand,解析失败,缺少关键字段!"
- self.send_write(code, msg, data)
- return
- #预约车位查重
- sql = "Select * from space where height>1.799 and wheel_base>2.899 and unit>30 and car_number is null and statu=0"
- c_empty_space_list = checker.db.query(sql)
- space_list = checker.db.query_space_in_car_number(apply_table.car_number)
- if len(space_list)>0:
- code, msg, data = -2, "error", "预约失败,车辆已在库内!"
- elif len(c_empty_space_list) < 0:
- code, msg, data = -1, "error", "预约失败,无空闲车位!"
- else:
- g_rabbitmq.publish(mq_config['mq_command_exchange_name'], "dispatch_apply_command_request_port",tf.MessageToString(apply_table, as_utf8=True))
- code, msg, data = 0, "success", "预约成功!"
- else:
- code, msg, data = -20, "error", "解析失败,缺少关键字段commandType!"
- # 发送答复
- self.send_write(code, msg, data)
- # 对接受到的post请求发送答复
- def send_write(self,code,msg,data):
- buf = {"code": code,
- "msg": msg,
- "data": data}
- self.wfile.write(json.dumps(buf, ensure_ascii=False).encode('utf-8')) # 发送json格式的返回包体
- print(buf)
- def recv_post():
- # 绑定IP
- ts = HTTPServer(('192.168.2.57', 8899), MyHttpRequestHandler)
- # 启动服务
- ts.serve_forever()
- if __name__ == '__main__':
- config = parse_json.parse_json_with_comments('./config.json')
- log = HandleLog.get_logger(os.getcwd() + '_logs')
- # 数据库配置
- db_config = config['db_config']
- # rabbitmq配置
- mq_config = config['mq_config']
- # 日志记录
- # 消费指令消息
- cmd_callbacks = [["user_park_command_request_queue", check_park_command],
- ["user_pick_command_request_queue", check_pick_command]]
- if config['project_name'] == 'xm_sgj':
- checker = CHE_XmSgj.CommandChecker(db_config)
- elif config['project_name'] == 'gy_bhhy':
- checker = CHE_GyBhhy.CommandChecker(db_config)
- elif config['project_name'] == 'zkxy':
- checker = CHE_ZKXY.CommandChecker(db_config)
- # 初始化rabbitmq
- g_rabbitmq = CM.RabbitAsyncCommunicator(mq_config['mq_ip'], mq_config['mq_port'], mq_config['mq_user'],
- mq_config['mq_password'])
- # g_rabbitmq.Init(cmd_callbacks, mq_config['mq_statu_exchange_keys'], "checker")
- # # 绑定队列
- # for ex, key in mq_config['mq_statu_exchange_keys']:
- # if key.find("dispatch") >= 0:
- # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_dispatch_statu)
- # if key.find("measure") >= 0:
- # g_rabbitmq.bind_statu_callback(ex, key, checker.receive_measure_info)
- # 收费系统云端通信
- http_community = threading.Thread(target=recv_post)
- g_rabbitmq.start()
- http_community.start()
- g_rabbitmq.join()
- http_community.join()
|