RobotData.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import time
  2. import message_pb2 as message
  3. from mqtt_async import MqttAsync
  4. import google.protobuf.json_format as jtf
  5. from dijkstra.Map import DijikstraMap
  6. class TimeStatu:
  7. def __init__(self,statu=None,timeout=3):
  8. self.statu=statu
  9. self.time=time.time()
  10. self.timeout_ms=timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms or self.statu==None
  14. class Robot(MqttAsync):
  15. def __init__(self,name=""):
  16. self.timedPose_=TimeStatu(message.AGVStatu,0)
  17. self.timedNavStatu_=TimeStatu(message.NavStatu,0)
  18. self.dataTopic_={}
  19. self.messageArrivedSignal=None
  20. self.currentNavData_=None
  21. self.navCmdTopic_=None
  22. self.currentNavPathNodes_=None
  23. self.currentNavPath_=None
  24. self.pathColor_=[1,1,0]
  25. self.robotColor_=[0.7,0.2,0.3] #agv当前颜色
  26. self.name_=name
  27. def SetSubDataTopic(self,match:dict,messageSignal=None):
  28. self.dataTopic_=match
  29. self.messageArrivedSignal=messageSignal
  30. for item in match.items():
  31. [topic,_]=item
  32. self.subscribe(topic,self.on_message)
  33. def SetCmdTopic(self,topic):
  34. self.navCmdTopic_=topic
  35. def SetColor(self,pathColor,robotColor):
  36. self.pathColor_=pathColor
  37. self.robotColor_=robotColor
  38. def ResetPose(self,agv : message.AGVStatu):
  39. self.timedPose_=TimeStatu(agv,0.5)
  40. def ResetNavStatu(self,statu:message.NavStatu):
  41. self.timedNavStatu_=TimeStatu(statu,0.5)
  42. def on_message(self,client, userdata, msg):
  43. topic=msg.topic
  44. if self.dataTopic_.get(topic) is not None:
  45. dtype=self.dataTopic_[topic]
  46. if dtype==message.AGVStatu.__name__:
  47. proto=message.AGVStatu()
  48. jtf.Parse(msg.payload.decode(),proto)
  49. self.ResetPose(proto)
  50. if dtype==message.NavStatu.__name__:
  51. self.last_running=self.IsNavigating()
  52. proto=message.NavStatu()
  53. jtf.Parse(msg.payload.decode(),proto)
  54. self.ResetNavStatu(proto)
  55. if self.last_running==True and self.IsNavigating()==False:
  56. self.currentNavPathNodes_=None
  57. self.currentNavPath_=None
  58. if self.messageArrivedSignal is not None:
  59. self.messageArrivedSignal()
  60. def MpcSelectTraj(self):
  61. traj=[]
  62. if self.timedNavStatu_.timeout()==False:
  63. nav=self.timedNavStatu_.statu
  64. for pose2d in nav.selected_traj.poses:
  65. traj.append([pose2d.x,pose2d.y,pose2d.theta])
  66. return traj
  67. def MpcPredictTraj(self):
  68. traj=[]
  69. if self.timedNavStatu_.timeout()==False:
  70. nav=self.timedNavStatu_.statu
  71. for pose2d in nav.predict_traj.poses:
  72. traj.append([pose2d.x,pose2d.y,pose2d.theta])
  73. return traj
  74. def Connected(self):
  75. return self.timedNavStatu_.timeout()==False and self.timedPose_.timeout()==False
  76. def IsNavigating(self):
  77. if self.timedNavStatu_.timeout()==False:
  78. key=self.timedNavStatu_.statu.key
  79. if key=="" or key==None:
  80. return False
  81. return True
  82. def GeneratePath(self,begID,endID):
  83. self.currentNavPathNodes_=DijikstraMap().GetShortestPath(begID,endID)
  84. self.currentNavPath_=DijikstraMap().CreatePath(self.currentNavPathNodes_,0.5)
  85. def ExecNavtask(self):
  86. if self.navCmdTopic_ == None:
  87. print("Robot has not set nav cmd topic")
  88. return
  89. if self.IsNavigating():
  90. print(" robot is navigating ,key:%s"%(self.timedNavStatu_.statu.key))
  91. return
  92. cmd=None
  93. if self.currentNavPathNodes_ is not None and self.timedPose_.timeout()==False:
  94. statu=self.timedPose_.statu
  95. if statu is not None:
  96. cmd=DijikstraMap().CreateNavCmd([statu.x,statu.y,statu.theta],self.currentNavPathNodes_)
  97. else:
  98. print("current path is none")
  99. return
  100. if cmd is None:
  101. print("Nav cmd is None")
  102. return
  103. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
  104. '''while self.IsNavigating()==False:
  105. time.sleep(1)
  106. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
  107. def CancelNavTask(self):
  108. cmd=message.NavCmd()
  109. cmd.action=3
  110. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))