node.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import datetime
  2. import time
  3. import async_communication as cmt
  4. import PushCommand as PC
  5. import message_pb2 as message
  6. import google.protobuf.text_format as tf
  7. #mq参数
  8. mq_ip="192.168.1.233"
  9. mq_port=5672
  10. mq_user="zx"
  11. mq_password="zx123456"
  12. ex_name="command_ex"
  13. #db参数
  14. db_ip="192.168.1.233"
  15. db_port=3306
  16. db_name="ct_project"
  17. db_user="zx"
  18. db_password="zx123456"
  19. #rabbitmq
  20. g_rabbitmq=cmt.RabbitAsyncCommunicator(mq_ip,mq_port,mq_user,mq_password)
  21. #DB
  22. g_DBcommand=PC.DBCommand(db_ip,db_port,db_name,db_user,db_password)
  23. def command_enqueue_callback(table):
  24. print("收到入队请求:%s"%table)
  25. table=g_DBcommand.push_command(table)
  26. if table==None:
  27. return
  28. command_type=g_DBcommand.table_type(table)
  29. if command_type=="park":
  30. cmd=message.park_table()
  31. tf.Parse(table,cmd)
  32. response_port="park_response_%d_port"%(cmd.terminal_id)
  33. else:
  34. cmd=message.pick_table()
  35. tf.Parse(table,cmd)
  36. response_port="pick_response_%d_port"%(cmd.terminal_id)
  37. if cmd.statu.execute_statu==message.eNormal:
  38. #指令入队正常
  39. port="count_command_signal_%d_port"%cmd.unit_id
  40. g_rabbitmq.publish(ex_name,port,table) #推送指令作为计数
  41. print("%s OK : %s指令 %s入队成功 key:%s 单元号:%d"%(datetime.datetime.now(),command_type,cmd.car_number,cmd.primary_key,cmd.unit_id))
  42. else:
  43. print("%s ERROR : %s指令 %s入队失败 : %s 单元号:%d"%(datetime.datetime.now(),command_type,cmd.car_number,cmd.statu.statu_description,cmd.unit_id))
  44. print("发送反馈 ex:%s,port:%s"%(ex_name,response_port))
  45. g_rabbitmq.publish(ex_name,response_port,table)
  46. if __name__ == '__main__':
  47. #消费指令消息
  48. print("入队节点启动----")
  49. cmd_callbacks=[["command_enqueue_queue",command_enqueue_callback]]
  50. g_rabbitmq.Init(cmd_callbacks,None)
  51. #发送自身状态
  52. # while True:
  53. # statu=message.STATU
  54. # statu=message.eNormal
  55. # g_sender.send(tf.MessageToString(statu,as_utf8=True),"statu_ex","enqueue_node_statu_port")
  56. # time.sleep(1)
  57. g_rabbitmq.start()
  58. g_rabbitmq.join()