RobotData.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. import time
  2. import message_pb2 as message
  3. from mqtt_async import MqttAsync
  4. import google.protobuf.json_format as jtf
  5. from dijkstra.Map import DijikstraMap
  6. class AGVTaskTag:
  7. # ----------------------------------------------------------------
  8. # DefaultTask
  9. Default_Base =0
  10. Default_Idle =Default_Base +1 #初始空闲状态
  11. # ----------------------------------------------------------------
  12. # ParkTask
  13. Park_Base = 1000
  14. Park_Idle = Park_Base + 1 # 等待
  15. Park_Pause = Park_Base + 2 # 暂停
  16. Park_Stop = Park_Base + 3 # 停止(非正常)
  17. Park_End = Park_Base + 4 # 结束(正常)
  18. Park_Ready2Park = Park_Base + 10 # 调整准备好存车
  19. Park_Move2Entrance = Park_Base + 20 # 移动至入口
  20. Park_LoadCar = Park_Base + 30 # 装车(夹持)
  21. Park_Move2Space = Park_Base + 40 # 移动至车位
  22. Park_UnloadCar = Park_Base + 50 # 卸车(松开夹持)
  23. Park_Move2Safety = Park_Base + 60 # 移动至安全位置
  24. # ----------------------------------------------------------------
  25. # PickTask
  26. Pick_Base = 2000
  27. Pick_Idle = Pick_Base + 0 # 等待
  28. Pick_Pause = Pick_Base + 1 # 暂停
  29. Pick_Stop = Pick_Base + 2 # 停止(非正常)
  30. Pick_End = Pick_Base + 3 # 结束(正常)
  31. Pick_Ready2Pick = Pick_Base + 10 # 调整准备好取车
  32. Pick_Move2Space = Pick_Base + 20 # 移动至车位
  33. Pick_LoadCar = Pick_Base + 30 # 装车(夹持)
  34. Pick_Move2Exit = Pick_Base + 40 # 移动至出口
  35. Pick_UnloadCar = Pick_Base + 50 # 卸车(松开夹持)
  36. Pick_Move2Safety = Pick_Base + 60 # 移动至安全位置
  37. class TimeStatu:
  38. def __init__(self, statu=None, timeout=3.0):
  39. self.statu = statu
  40. self.time = time.time()
  41. self.timeout_ms = timeout
  42. def timeout(self):
  43. tm = time.time()
  44. return tm - self.time > self.timeout_ms or self.statu == None
  45. class Robot(MqttAsync):
  46. def __init__(self, name=""):
  47. self.timedPose_ = TimeStatu(message.AGVStatu, 0)
  48. self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
  49. self.dataTopic_ = {}
  50. self.messageArrivedSignal = None
  51. self.currentNavData_ = None
  52. self.navCmdTopic_ = None
  53. self.currentNavPathNodes_ = None
  54. self.currentNavPath_ = None
  55. self.pathColor_ = [1, 1, 0]
  56. self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
  57. self.name_ = name
  58. self.l_ = 0.8 # 轮长
  59. self.L_ = 1.3 # 轴距
  60. self.W_ = 2.5 # 宽
  61. # 地图参数设置
  62. self.safety_location = "N8" # 测试用,安全位置
  63. self.entrance_location = "N8" # 测试用,入口位置
  64. self.exit_location = "N8" # 测试用,出口位置
  65. # 流程控制相关
  66. self.agv_task_tag = AGVTaskTag.Default_Idle
  67. def Color(self):
  68. if self.IsMainModeStatu():
  69. return [0, 0, 0]
  70. return self.robotColor_
  71. def IsMainAgv(self):
  72. if self.timedNavStatu_.timeout() == False:
  73. if self.timedNavStatu_.statu.main_agv == True:
  74. return True
  75. return False
  76. def IsMainModeStatu(self):
  77. if self.IsMainAgv():
  78. if self.timedNavStatu_.timeout() == False:
  79. if self.timedNavStatu_.statu.move_mode == 2:
  80. return True
  81. return False
  82. def SetSubDataTopic(self, match: dict, messageSignal=None):
  83. self.dataTopic_ = match
  84. self.messageArrivedSignal = messageSignal
  85. for item in match.items():
  86. [topic, _] = item
  87. self.subscribe(topic, self.on_message)
  88. def SetCmdTopic(self, topic):
  89. self.navCmdTopic_ = topic
  90. def SetColor(self, pathColor, robotColor):
  91. self.pathColor_ = pathColor
  92. self.robotColor_ = robotColor
  93. def ResetPose(self, agv: message.AGVStatu):
  94. self.timedPose_ = TimeStatu(agv, 0.5)
  95. def ResetNavStatu(self, statu: message.NavStatu):
  96. self.timedNavStatu_ = TimeStatu(statu, 0.5)
  97. def on_message(self, client, userdata, msg):
  98. topic = msg.topic
  99. if self.dataTopic_.get(topic) is not None:
  100. dtype = self.dataTopic_[topic]
  101. if dtype == message.AGVStatu.__name__:
  102. proto = message.AGVStatu()
  103. jtf.Parse(msg.payload.decode(), proto)
  104. self.ResetPose(proto)
  105. if dtype == message.NavStatu.__name__:
  106. self.last_running = self.IsNavigating()
  107. proto = message.NavStatu()
  108. jtf.Parse(msg.payload.decode(), proto)
  109. self.ResetNavStatu(proto)
  110. if self.last_running == True and self.IsNavigating() == False:
  111. self.currentNavPathNodes_ = None
  112. self.currentNavPath_ = None
  113. if self.messageArrivedSignal is not None:
  114. self.messageArrivedSignal()
  115. def MpcSelectTraj(self):
  116. traj = []
  117. if self.timedNavStatu_.timeout() == False:
  118. nav = self.timedNavStatu_.statu
  119. for pose2d in nav.selected_traj.poses:
  120. traj.append([pose2d.x, pose2d.y, pose2d.theta])
  121. return traj
  122. def MpcPredictTraj(self):
  123. traj = []
  124. if self.timedNavStatu_.timeout() == False:
  125. nav = self.timedNavStatu_.statu
  126. for pose2d in nav.predict_traj.poses:
  127. traj.append([pose2d.x, pose2d.y, pose2d.theta])
  128. return traj
  129. def Connected(self):
  130. return self.timedNavStatu_.timeout() == False and self.timedPose_.timeout() == False
  131. def IsNavigating(self):
  132. if self.timedNavStatu_.timeout() == False:
  133. key = self.timedNavStatu_.statu.key
  134. if key == "" or key == None:
  135. return False
  136. return True
  137. def GeneratePath(self, begID, endID):
  138. self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID)
  139. self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
  140. def ExecNavtask(self):
  141. if self.navCmdTopic_ == None:
  142. print("Robot has not set nav cmd topic")
  143. return
  144. if self.IsNavigating():
  145. print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
  146. return
  147. cmd = None
  148. if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() == False:
  149. statu = self.timedPose_.statu
  150. if statu is not None:
  151. cmd = DijikstraMap().CreateNavCmd([statu.x, statu.y, statu.theta], self.currentNavPathNodes_)
  152. else:
  153. print("current path is none")
  154. return
  155. if cmd is None:
  156. print("Nav cmd is None")
  157. return
  158. self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
  159. '''while self.IsNavigating()==False:
  160. time.sleep(1)
  161. self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
  162. def SwitchMoveMode(self, mode, wheelbase):
  163. cmd = message.NavCmd()
  164. if mode == 2:
  165. cmd.wheelbase = wheelbase
  166. cmd.action = 4
  167. if mode == 1:
  168. cmd.action = 5
  169. self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
  170. def CancelNavTask(self):
  171. cmd = message.NavCmd()
  172. cmd.action = 3
  173. self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
  174. # 存车
  175. def ParkTask(self, targetID=None, begID=None):
  176. if targetID is None:
  177. return
  178. if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
  179. begID = self.safety_location
  180. if self.navCmdTopic_ == None:
  181. print("Robot has not set nav cmd topic")
  182. return
  183. if self.IsNavigating():
  184. print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
  185. return
  186. # 计算完整最短路径(分三段,最后合并)
  187. ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, self.entrance_location)
  188. ParkPathNodes_2 = DijikstraMap().GetShortestPath(self.entrance_location, targetID)
  189. ParkPathNodes_3 = DijikstraMap().GetShortestPath(targetID, self.safety_location)
  190. self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
  191. ParkPathNodes_2) - 1] + ParkPathNodes_3
  192. self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
  193. # 生成完整指令
  194. cmd = None
  195. if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
  196. # if self.currentNavPathNodes_ is not None:
  197. statu = self.timedPose_.statu
  198. if statu is not None:
  199. cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Park_Base)
  200. else:
  201. print("current path is none")
  202. return
  203. if cmd is None:
  204. print("Park cmd is None")
  205. return
  206. self.setAGVTaskTag(AGVTaskTag.Park_Base)
  207. self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
  208. # 取车
  209. def PickTask(self, targetID=None, begID=None):
  210. if targetID is None:
  211. return
  212. if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
  213. begID = self.safety_location
  214. if self.navCmdTopic_ == None:
  215. print("Robot has not set nav cmd topic")
  216. return
  217. if self.IsNavigating():
  218. print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
  219. return
  220. # 计算完整最短路径(分三段,最后合并)
  221. ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, targetID)
  222. ParkPathNodes_2 = DijikstraMap().GetShortestPath(targetID, self.exit_location)
  223. ParkPathNodes_3 = DijikstraMap().GetShortestPath(self.exit_location, self.safety_location)
  224. self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
  225. ParkPathNodes_2) - 1] + ParkPathNodes_3
  226. self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
  227. # 生成完整指令
  228. cmd = None
  229. if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
  230. statu = self.timedPose_.statu
  231. if statu is not None:
  232. cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Pick_Base)
  233. else:
  234. print("current path is none")
  235. return
  236. if cmd is None:
  237. print("Park cmd is None")
  238. return
  239. self.setAGVTaskTag(AGVTaskTag.Pick_Base)
  240. self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
  241. # parameter:
  242. # task_type: 根据其值,分段生成对应的的cmd
  243. def CreateNavCmd(self, path_1, path_2, path_3, task_type):
  244. cmd_base = DijikstraMap().CreateNavCmd_part_base()
  245. cmd_actions = list()
  246. if task_type == AGVTaskTag.Park_Base:
  247. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
  248. act_clamp_1 = message.Action()
  249. act_clamp_1.type = (3)
  250. cmd_actions.append([act_clamp_1]) # 夹持
  251. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
  252. act_clamp_2 = message.Action()
  253. act_clamp_2.type = (4)
  254. cmd_actions.append([act_clamp_2]) # 松夹持
  255. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
  256. elif task_type == AGVTaskTag.Pick_Base:
  257. cmd_base = DijikstraMap().CreateNavCmd_part_base()
  258. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
  259. act_clamp_1 = message.Action()
  260. act_clamp_1.type = (3)
  261. cmd_actions.append([act_clamp_1]) # 夹持
  262. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
  263. act_clamp_2 = message.Action()
  264. act_clamp_2.type = (4)
  265. cmd_actions.append([act_clamp_2]) # 松夹持
  266. cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
  267. if len(cmd_actions) < 3:
  268. return None
  269. else:
  270. for i in range(len(cmd_actions)):
  271. if cmd_actions[i] is not None:
  272. for action in cmd_actions[i]:
  273. cmd_base.actions.add().CopyFrom(action)
  274. return cmd_base
  275. def setAGVTaskTag(self, task_tag):
  276. self.agv_task_tag = task_tag
  277. def getAGVTaskTag(self):
  278. return self.agv_task_tag