123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- import time
- import message_pb2 as message
- from mqtt_async import MqttAsync
- import google.protobuf.json_format as jtf
- from dijkstra.Map import DijikstraMap
- class AGVTaskTag:
- # ----------------------------------------------------------------
- # DefaultTask
- Default_Base =0
- Default_Idle =Default_Base +1 #初始空闲状态
- # ----------------------------------------------------------------
- # ParkTask
- Park_Base = 1000
- Park_Idle = Park_Base + 1 # 等待
- Park_Pause = Park_Base + 2 # 暂停
- Park_Stop = Park_Base + 3 # 停止(非正常)
- Park_End = Park_Base + 4 # 结束(正常)
- Park_Ready2Park = Park_Base + 10 # 调整准备好存车
- Park_Move2Entrance = Park_Base + 20 # 移动至入口
- Park_LoadCar = Park_Base + 30 # 装车(夹持)
- Park_Move2Space = Park_Base + 40 # 移动至车位
- Park_UnloadCar = Park_Base + 50 # 卸车(松开夹持)
- Park_Move2Safety = Park_Base + 60 # 移动至安全位置
- # ----------------------------------------------------------------
- # PickTask
- Pick_Base = 2000
- Pick_Idle = Pick_Base + 0 # 等待
- Pick_Pause = Pick_Base + 1 # 暂停
- Pick_Stop = Pick_Base + 2 # 停止(非正常)
- Pick_End = Pick_Base + 3 # 结束(正常)
- Pick_Ready2Pick = Pick_Base + 10 # 调整准备好取车
- Pick_Move2Space = Pick_Base + 20 # 移动至车位
- Pick_LoadCar = Pick_Base + 30 # 装车(夹持)
- Pick_Move2Exit = Pick_Base + 40 # 移动至出口
- Pick_UnloadCar = Pick_Base + 50 # 卸车(松开夹持)
- Pick_Move2Safety = Pick_Base + 60 # 移动至安全位置
- class TimeStatu:
- def __init__(self, statu=None, timeout=3.0):
- self.statu = statu
- self.time = time.time()
- self.timeout_ms = timeout
- def timeout(self):
- tm = time.time()
- return tm - self.time > self.timeout_ms or self.statu == None
- class Robot(MqttAsync):
- def __init__(self, name=""):
- self.timedPose_ = TimeStatu(message.AGVStatu, 0)
- self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
- self.dataTopic_ = {}
- self.messageArrivedSignal = None
- self.currentNavData_ = None
- self.navCmdTopic_ = None
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- self.pathColor_ = [1, 1, 0]
- self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
- self.name_ = name
- self.l_ = 0.8 # 轮长
- self.L_ = 1.3 # 轴距
- self.W_ = 2.5 # 宽
- # 地图参数设置
- self.safety_location = "N8" # 测试用,安全位置
- self.entrance_location = "N8" # 测试用,入口位置
- self.exit_location = "N8" # 测试用,出口位置
- # 流程控制相关
- self.agv_task_tag = AGVTaskTag.Default_Idle
- def Color(self):
- if self.IsMainModeStatu():
- return [0, 0, 0]
- return self.robotColor_
- def IsMainAgv(self):
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.main_agv == True:
- return True
- return False
- def IsMainModeStatu(self):
- if self.IsMainAgv():
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.move_mode == 2:
- return True
- return False
- def SetSubDataTopic(self, match: dict, messageSignal=None):
- self.dataTopic_ = match
- self.messageArrivedSignal = messageSignal
- for item in match.items():
- [topic, _] = item
- self.subscribe(topic, self.on_message)
- def SetCmdTopic(self, topic):
- self.navCmdTopic_ = topic
- def SetColor(self, pathColor, robotColor):
- self.pathColor_ = pathColor
- self.robotColor_ = robotColor
- def ResetPose(self, agv: message.AGVStatu):
- self.timedPose_ = TimeStatu(agv, 0.5)
- def ResetNavStatu(self, statu: message.NavStatu):
- self.timedNavStatu_ = TimeStatu(statu, 0.5)
- def on_message(self, client, userdata, msg):
- topic = msg.topic
- if self.dataTopic_.get(topic) is not None:
- dtype = self.dataTopic_[topic]
- if dtype == message.AGVStatu.__name__:
- proto = message.AGVStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetPose(proto)
- if dtype == message.NavStatu.__name__:
- self.last_running = self.IsNavigating()
- proto = message.NavStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetNavStatu(proto)
- if self.last_running == True and self.IsNavigating() == False:
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- if self.messageArrivedSignal is not None:
- self.messageArrivedSignal()
- def MpcSelectTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.selected_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def MpcPredictTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.predict_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def Connected(self):
- return self.timedNavStatu_.timeout() == False and self.timedPose_.timeout() == False
- def IsNavigating(self):
- if self.timedNavStatu_.timeout() == False:
- key = self.timedNavStatu_.statu.key
- if key == "" or key == None:
- return False
- return True
- def GeneratePath(self, begID, endID):
- self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID)
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- def ExecNavtask(self):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return
- if self.IsNavigating():
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return
- cmd = None
- if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() == False:
- statu = self.timedPose_.statu
- if statu is not None:
- cmd = DijikstraMap().CreateNavCmd([statu.x, statu.y, statu.theta], self.currentNavPathNodes_)
- else:
- print("current path is none")
- return
- if cmd is None:
- print("Nav cmd is None")
- return
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- '''while self.IsNavigating()==False:
- time.sleep(1)
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
- def SwitchMoveMode(self, mode, wheelbase):
- cmd = message.NavCmd()
- if mode == 2:
- cmd.wheelbase = wheelbase
- cmd.action = 4
- if mode == 1:
- cmd.action = 5
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- def CancelNavTask(self):
- cmd = message.NavCmd()
- cmd.action = 3
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- # 存车
- def ParkTask(self, targetID=None, begID=None):
- if targetID is None:
- return
- if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
- begID = self.safety_location
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return
- if self.IsNavigating():
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return
- # 计算完整最短路径(分三段,最后合并)
- ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, self.entrance_location)
- ParkPathNodes_2 = DijikstraMap().GetShortestPath(self.entrance_location, targetID)
- ParkPathNodes_3 = DijikstraMap().GetShortestPath(targetID, self.safety_location)
- self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
- ParkPathNodes_2) - 1] + ParkPathNodes_3
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- # 生成完整指令
- cmd = None
- if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
- # if self.currentNavPathNodes_ is not None:
- statu = self.timedPose_.statu
- if statu is not None:
- cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Park_Base)
- else:
- print("current path is none")
- return
- if cmd is None:
- print("Park cmd is None")
- return
- self.setAGVTaskTag(AGVTaskTag.Park_Base)
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- # 取车
- def PickTask(self, targetID=None, begID=None):
- if targetID is None:
- return
- if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
- begID = self.safety_location
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return
- if self.IsNavigating():
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return
- # 计算完整最短路径(分三段,最后合并)
- ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, targetID)
- ParkPathNodes_2 = DijikstraMap().GetShortestPath(targetID, self.exit_location)
- ParkPathNodes_3 = DijikstraMap().GetShortestPath(self.exit_location, self.safety_location)
- self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
- ParkPathNodes_2) - 1] + ParkPathNodes_3
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- # 生成完整指令
- cmd = None
- if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
- statu = self.timedPose_.statu
- if statu is not None:
- cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Pick_Base)
- else:
- print("current path is none")
- return
- if cmd is None:
- print("Park cmd is None")
- return
- self.setAGVTaskTag(AGVTaskTag.Pick_Base)
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- # parameter:
- # task_type: 根据其值,分段生成对应的的cmd
- def CreateNavCmd(self, path_1, path_2, path_3, task_type):
- cmd_base = DijikstraMap().CreateNavCmd_part_base()
- cmd_actions = list()
- if task_type == AGVTaskTag.Park_Base:
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
- act_clamp_1 = message.Action()
- act_clamp_1.type = (3)
- cmd_actions.append([act_clamp_1]) # 夹持
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
- act_clamp_2 = message.Action()
- act_clamp_2.type = (4)
- cmd_actions.append([act_clamp_2]) # 松夹持
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
- elif task_type == AGVTaskTag.Pick_Base:
- cmd_base = DijikstraMap().CreateNavCmd_part_base()
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
- act_clamp_1 = message.Action()
- act_clamp_1.type = (3)
- cmd_actions.append([act_clamp_1]) # 夹持
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
- act_clamp_2 = message.Action()
- act_clamp_2.type = (4)
- cmd_actions.append([act_clamp_2]) # 松夹持
- cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
- if len(cmd_actions) < 3:
- return None
- else:
- for i in range(len(cmd_actions)):
- if cmd_actions[i] is not None:
- for action in cmd_actions[i]:
- cmd_base.actions.add().CopyFrom(action)
- return cmd_base
- def setAGVTaskTag(self, task_tag):
- self.agv_task_tag = task_tag
- def getAGVTaskTag(self):
- return self.agv_task_tag
|