RobotData.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import time
  2. import message_pb2 as message
  3. from mqtt_async import MqttAsync
  4. import google.protobuf.json_format as jtf
  5. from dijkstra.Map import DijikstraMap
  6. class TimeStatu:
  7. def __init__(self,statu=None,timeout=3):
  8. self.statu=statu
  9. self.time=time.time()
  10. self.timeout_ms=timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms or self.statu==None
  14. class Robot(MqttAsync):
  15. def __init__(self,name=""):
  16. self.timedPose_=TimeStatu(message.AGVStatu,0)
  17. self.timedNavStatu_=TimeStatu(message.NavStatu,0)
  18. self.dataTopic_={}
  19. self.messageArrivedSignal=None
  20. self.currentNavData_=None
  21. self.navCmdTopic_=None
  22. self.currentNavPathNodes_=None
  23. self.currentNavPath_=None
  24. self.pathColor_=[1,1,0]
  25. self.robotColor_=[0.7,0.2,0.3] #agv当前颜色
  26. self.name_=name
  27. self.l_=0.8 #轮长
  28. self.L_=1.3 #轴距
  29. self.W_=2.5 #宽
  30. def Color(self):
  31. if self.IsMainModeStatu():
  32. return [0,0,0]
  33. return self.robotColor_
  34. def IsMainAgv(self):
  35. if self.timedNavStatu_.timeout()==False:
  36. if self.timedNavStatu_.statu.main_agv==True:
  37. return True
  38. return False
  39. def IsMainModeStatu(self):
  40. if self.IsMainAgv():
  41. if self.timedNavStatu_.timeout() == False:
  42. if self.timedNavStatu_.statu.move_mode==2:
  43. return True
  44. return False
  45. def SetSubDataTopic(self,match:dict,messageSignal=None):
  46. self.dataTopic_=match
  47. self.messageArrivedSignal=messageSignal
  48. for item in match.items():
  49. [topic,_]=item
  50. self.subscribe(topic,self.on_message)
  51. def SetCmdTopic(self,topic):
  52. self.navCmdTopic_=topic
  53. def SetColor(self,pathColor,robotColor):
  54. self.pathColor_=pathColor
  55. self.robotColor_=robotColor
  56. def ResetPose(self,agv : message.AGVStatu):
  57. self.timedPose_=TimeStatu(agv,0.5)
  58. def ResetNavStatu(self,statu:message.NavStatu):
  59. self.timedNavStatu_=TimeStatu(statu,0.5)
  60. def on_message(self,client, userdata, msg):
  61. topic=msg.topic
  62. if self.dataTopic_.get(topic) is not None:
  63. dtype=self.dataTopic_[topic]
  64. if dtype==message.AGVStatu.__name__:
  65. proto=message.AGVStatu()
  66. jtf.Parse(msg.payload.decode(),proto)
  67. self.ResetPose(proto)
  68. if dtype==message.NavStatu.__name__:
  69. self.last_running=self.IsNavigating()
  70. proto=message.NavStatu()
  71. jtf.Parse(msg.payload.decode(),proto)
  72. self.ResetNavStatu(proto)
  73. if self.last_running==True and self.IsNavigating()==False:
  74. self.currentNavPathNodes_=None
  75. self.currentNavPath_=None
  76. if self.messageArrivedSignal is not None:
  77. self.messageArrivedSignal()
  78. def MpcSelectTraj(self):
  79. traj=[]
  80. if self.timedNavStatu_.timeout()==False:
  81. nav=self.timedNavStatu_.statu
  82. for pose2d in nav.selected_traj.poses:
  83. traj.append([pose2d.x,pose2d.y,pose2d.theta])
  84. return traj
  85. def MpcPredictTraj(self):
  86. traj=[]
  87. if self.timedNavStatu_.timeout()==False:
  88. nav=self.timedNavStatu_.statu
  89. for pose2d in nav.predict_traj.poses:
  90. traj.append([pose2d.x,pose2d.y,pose2d.theta])
  91. return traj
  92. def Connected(self):
  93. return self.timedNavStatu_.timeout()==False and self.timedPose_.timeout()==False
  94. def IsNavigating(self):
  95. if self.timedNavStatu_.timeout()==False:
  96. key=self.timedNavStatu_.statu.key
  97. if key=="" or key==None:
  98. return False
  99. return True
  100. def GeneratePath(self,begID,endID):
  101. self.currentNavPathNodes_=DijikstraMap().GetShortestPath(begID,endID)
  102. self.currentNavPath_=DijikstraMap().CreatePath(self.currentNavPathNodes_,0.5)
  103. def ExecNavtask(self):
  104. if self.navCmdTopic_ == None:
  105. print("Robot has not set nav cmd topic")
  106. return
  107. if self.IsNavigating():
  108. print(" robot is navigating ,key:%s"%(self.timedNavStatu_.statu.key))
  109. return
  110. cmd=None
  111. if self.currentNavPathNodes_ is not None and self.timedPose_.timeout()==False:
  112. statu=self.timedPose_.statu
  113. if statu is not None:
  114. cmd=DijikstraMap().CreateNavCmd([statu.x,statu.y,statu.theta],self.currentNavPathNodes_)
  115. else:
  116. print("current path is none")
  117. return
  118. if cmd is None:
  119. print("Nav cmd is None")
  120. return
  121. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
  122. '''while self.IsNavigating()==False:
  123. time.sleep(1)
  124. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
  125. def SwitchMoveMode(self,mode,wheelbase):
  126. cmd =message.NavCmd()
  127. if mode==2:
  128. cmd.wheelbase=wheelbase
  129. cmd.action=4
  130. if mode==1:
  131. cmd.action=5
  132. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
  133. def CancelNavTask(self):
  134. cmd=message.NavCmd()
  135. cmd.action=3
  136. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))