node.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import datetime
  2. import time
  3. import async_communication as CM
  4. import message_pb2 as message
  5. import google.protobuf.text_format as tf
  6. import CheckEntrance as CHE
  7. import threading
  8. # 状态消息接收器参数
  9. statu_ex_keys = [
  10. ["statu_ex","in_mcpu_1_statu_port"],
  11. ["statu_ex","in_mcpu_2_statu_port"],
  12. ["statu_ex","in_mcpu_3_statu_port"],
  13. ["statu_ex","in_mcpu_4_statu_port"],
  14. ["statu_ex","in_mcpu_5_statu_port"],
  15. ["statu_ex","in_mcpu_6_statu_port"],
  16. ["statu_ex","measure_1_statu_port"],
  17. ["statu_ex","measure_2_statu_port"],
  18. ["statu_ex","measure_3_statu_port"],
  19. ["statu_ex","measure_4_statu_port"],
  20. ["statu_ex","measure_5_statu_port"],
  21. ["statu_ex","measure_6_statu_port"]
  22. ]
  23. # mq参数
  24. mq_ip = "192.168.1.233"
  25. mq_port = 5672
  26. mq_user = "zx"
  27. mq_password = "zx123456"
  28. ex_name = "command_ex"
  29. def user_command_callback(body):
  30. if body.find("car_number")>=0:
  31. check_park_command(body)
  32. elif body.find("primary_key")>=0:
  33. check_pick_command(body)
  34. else:
  35. print("指令类型错误!\n%s" % body)
  36. def check_park_command(body):
  37. print("recieve park_command_queue message:%s" % body)
  38. park = message.park_table()
  39. tf.Parse(body, park)
  40. table = EntranceCheckers[park.terminal_id-1].entrance_isOK(park)
  41. cmd = message.park_table()
  42. tf.Parse(table, cmd)
  43. if cmd.statu.execute_statu == message.eNormal:
  44. # 指令检查正常
  45. print("停车指令 %s OK: 车牌号:%s 检查成功!" % (
  46. datetime.datetime.now(), cmd.car_number))
  47. response_port = "command_enqueue_port"
  48. else:
  49. print("停车指令 %s ERROR: 车牌号:%s 检查失败!" % (
  50. datetime.datetime.now(), cmd.car_number))
  51. response_port = "park_response_%d_port" % cmd.terminal_id
  52. g_rabbitmq.publish( ex_name, response_port,table)
  53. print("发送反馈端口:%s 反馈表单:%s"%(response_port,table))
  54. def check_pick_command(body):
  55. print("recv pick_command_queue message:%s" % body)
  56. pick = message.pick_table()
  57. tf.Parse(body, pick)
  58. table = EntranceCheckers[0].exit_isOK(pick)
  59. cmd = message.pick_table()
  60. tf.Parse(table, cmd)
  61. if cmd.statu.execute_statu == message.eNormal:
  62. # 指令检查正常
  63. print("取车指令 %s OK : 唯一码:%s 检查成功!" % (
  64. datetime.datetime.now(), cmd.primary_key))
  65. response_port = "command_enqueue_port"
  66. else:
  67. print("取车指令 %s ERROR : 唯一码:%s 检查失败!" % (
  68. datetime.datetime.now(), cmd.primary_key))
  69. response_port = "pick_response_%d_port" % cmd.terminal_id
  70. g_rabbitmq.publish( ex_name, response_port,table)
  71. print("发送反馈端口:%s 反馈表单:%s"%(response_port,table))
  72. def statu_msg_thread():
  73. statu = message.table_statu()
  74. statu.execute_statu = message.eNormal
  75. body = tf.MessageToString(statu, as_utf8=True)
  76. while True:
  77. g_rabbitmq.publish("statu_ex", "command_check_statu_port",body)
  78. time.sleep(1)
# Command queues to consume, each paired with the callback that handles it.
cmd_callbacks=[["user_command_queue",user_command_callback]]
# Module-wide communicator used by every function in this file to publish.
g_rabbitmq=CM.RabbitAsyncCommunicator(mq_ip,mq_port,mq_user,mq_password)
g_rabbitmq.Init(cmd_callbacks,statu_ex_keys)
g_rabbitmq.start()
# NOTE(review): if start() begins delivering commands immediately, a callback
# could run before EntranceCheckers below is defined or filled - confirm
# against the communicator's behaviour.
# One checker per terminal; check_park_command indexes this list with
# terminal_id - 1.
EntranceCheckers=[]
for i in range(6):
    checker=CHE.EntranceChecker()
    # Feed checker i+1 with the status messages of its own in_mcpu and
    # measure ports.
    g_rabbitmq.bind_statu_callback("statu_ex", "in_mcpu_%d_statu_port"%(i+1), checker.receive_icpu)
    g_rabbitmq.bind_statu_callback("statu_ex", "measure_%d_statu_port"%(i+1), checker.receive_measureInfo)
    EntranceCheckers.append(checker)
    checker.start()
  90. if __name__ == '__main__':
  91. # 消费指令消息
  92. statu = message.table_statu()
  93. statu.execute_statu = message.eNormal
  94. body = tf.MessageToString(statu, as_utf8=True)
  95. while True:
  96. g_rabbitmq.publish("statu_ex", "command_check_statu_port",body)
  97. time.sleep(1)