import time import message_pb2 as message from mqtt_async import MqttAsync import google.protobuf.json_format as jtf from dijkstra.Map import DijikstraMap class AGVTaskTag: # ---------------------------------------------------------------- # DefaultTask Default_Base =0 Default_Idle =Default_Base +1 #初始空闲状态 # ---------------------------------------------------------------- # ParkTask Park_Base = 1000 Park_Idle = Park_Base + 1 # 等待 Park_Pause = Park_Base + 2 # 暂停 Park_Stop = Park_Base + 3 # 停止(非正常) Park_End = Park_Base + 4 # 结束(正常) Park_Ready2Park = Park_Base + 10 # 调整准备好存车 Park_Move2Entrance = Park_Base + 20 # 移动至入口 Park_LoadCar = Park_Base + 30 # 装车(夹持) Park_Move2Space = Park_Base + 40 # 移动至车位 Park_UnloadCar = Park_Base + 50 # 卸车(松开夹持) Park_Move2Safety = Park_Base + 60 # 移动至安全位置 # ---------------------------------------------------------------- # PickTask Pick_Base = 2000 Pick_Idle = Pick_Base + 0 # 等待 Pick_Pause = Pick_Base + 1 # 暂停 Pick_Stop = Pick_Base + 2 # 停止(非正常) Pick_End = Pick_Base + 3 # 结束(正常) Pick_Ready2Pick = Pick_Base + 10 # 调整准备好取车 Pick_Move2Space = Pick_Base + 20 # 移动至车位 Pick_LoadCar = Pick_Base + 30 # 装车(夹持) Pick_Move2Exit = Pick_Base + 40 # 移动至出口 Pick_UnloadCar = Pick_Base + 50 # 卸车(松开夹持) Pick_Move2Safety = Pick_Base + 60 # 移动至安全位置 class TimeStatu: def __init__(self, statu=None, timeout=3.0): self.statu = statu self.time = time.time() self.timeout_ms = timeout def timeout(self): tm = time.time() return tm - self.time > self.timeout_ms or self.statu == None class Robot(MqttAsync): def __init__(self, name=""): self.timedPose_ = TimeStatu(message.AGVStatu, 0) self.timedNavStatu_ = TimeStatu(message.NavStatu, 0) self.dataTopic_ = {} self.messageArrivedSignal = None self.currentNavData_ = None self.navCmdTopic_ = None self.currentNavPathNodes_ = None self.currentNavPath_ = None self.pathColor_ = [1, 1, 0] self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色 self.name_ = name self.l_ = 0.8 # 轮长 self.L_ = 1.3 # 轴距 self.W_ = 2.5 # 宽 # 地图参数设置 self.safety_location = "N8" # 测试用,安全位置 self.entrance_location = "N8" # 测试用,入口位置 self.exit_location = "N8" # 测试用,出口位置 # 流程控制相关 self.agv_task_tag = AGVTaskTag.Default_Idle def Color(self): if self.IsMainModeStatu(): return [0, 0, 0] return self.robotColor_ def IsMainAgv(self): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.main_agv == True: return True return False def IsMainModeStatu(self): if self.IsMainAgv(): if self.timedNavStatu_.timeout() == False: if self.timedNavStatu_.statu.move_mode == 2: return True return False def SetSubDataTopic(self, match: dict, messageSignal=None): self.dataTopic_ = match self.messageArrivedSignal = messageSignal for item in match.items(): [topic, _] = item self.subscribe(topic, self.on_message) def SetCmdTopic(self, topic): self.navCmdTopic_ = topic def SetColor(self, pathColor, robotColor): self.pathColor_ = pathColor self.robotColor_ = robotColor def ResetPose(self, agv: message.AGVStatu): self.timedPose_ = TimeStatu(agv, 0.5) def ResetNavStatu(self, statu: message.NavStatu): self.timedNavStatu_ = TimeStatu(statu, 0.5) def on_message(self, client, userdata, msg): topic = msg.topic if self.dataTopic_.get(topic) is not None: dtype = self.dataTopic_[topic] if dtype == message.AGVStatu.__name__: proto = message.AGVStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetPose(proto) if dtype == message.NavStatu.__name__: self.last_running = self.IsNavigating() proto = message.NavStatu() jtf.Parse(msg.payload.decode(), proto) self.ResetNavStatu(proto) if self.last_running == True and self.IsNavigating() == False: self.currentNavPathNodes_ = None self.currentNavPath_ = None if self.messageArrivedSignal is not None: self.messageArrivedSignal() def MpcSelectTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.selected_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def MpcPredictTraj(self): traj = [] if self.timedNavStatu_.timeout() == False: nav = self.timedNavStatu_.statu for pose2d in nav.predict_traj.poses: traj.append([pose2d.x, pose2d.y, pose2d.theta]) return traj def Connected(self): return self.timedNavStatu_.timeout() == False and self.timedPose_.timeout() == False def IsNavigating(self): if self.timedNavStatu_.timeout() == False: key = self.timedNavStatu_.statu.key if key == "" or key == None: return False return True def GeneratePath(self, begID, endID): self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID) self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5) def ExecNavtask(self): if self.navCmdTopic_ == None: print("Robot has not set nav cmd topic") return if self.IsNavigating(): print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key)) return cmd = None if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() == False: statu = self.timedPose_.statu if statu is not None: cmd = DijikstraMap().CreateNavCmd([statu.x, statu.y, statu.theta], self.currentNavPathNodes_) else: print("current path is none") return if cmd is None: print("Nav cmd is None") return self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd)) '''while self.IsNavigating()==False: time.sleep(1) self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))''' def SwitchMoveMode(self, mode, wheelbase): cmd = message.NavCmd() if mode == 2: cmd.wheelbase = wheelbase cmd.action = 4 if mode == 1: cmd.action = 5 self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd)) def CancelNavTask(self): cmd = message.NavCmd() cmd.action = 3 self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd)) # 存车 def ParkTask(self, targetID=None, begID=None): if targetID is None: return if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置 begID = self.safety_location if self.navCmdTopic_ == None: print("Robot has not set nav cmd topic") return if self.IsNavigating(): print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key)) return # 计算完整最短路径(分三段,最后合并) ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, self.entrance_location) ParkPathNodes_2 = DijikstraMap().GetShortestPath(self.entrance_location, targetID) ParkPathNodes_3 = DijikstraMap().GetShortestPath(targetID, self.safety_location) self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len( ParkPathNodes_2) - 1] + ParkPathNodes_3 self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5) # 生成完整指令 cmd = None if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False: # if self.currentNavPathNodes_ is not None: statu = self.timedPose_.statu if statu is not None: cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Park_Base) else: print("current path is none") return if cmd is None: print("Park cmd is None") return self.setAGVTaskTag(AGVTaskTag.Park_Base) self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd)) # 取车 def PickTask(self, targetID=None, begID=None): if targetID is None: return if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置 begID = self.safety_location if self.navCmdTopic_ == None: print("Robot has not set nav cmd topic") return if self.IsNavigating(): print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key)) return # 计算完整最短路径(分三段,最后合并) ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, targetID) ParkPathNodes_2 = DijikstraMap().GetShortestPath(targetID, self.exit_location) ParkPathNodes_3 = DijikstraMap().GetShortestPath(self.exit_location, self.safety_location) self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len( ParkPathNodes_2) - 1] + ParkPathNodes_3 self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5) # 生成完整指令 cmd = None if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False: statu = self.timedPose_.statu if statu is not None: cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Pick_Base) else: print("current path is none") return if cmd is None: print("Park cmd is None") return self.setAGVTaskTag(AGVTaskTag.Pick_Base) self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd)) # parameter: # task_type: 根据其值,分段生成对应的的cmd def CreateNavCmd(self, path_1, path_2, path_3, task_type): cmd_base = DijikstraMap().CreateNavCmd_part_base() cmd_actions = list() if task_type == AGVTaskTag.Park_Base: cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1)) act_clamp_1 = message.Action() act_clamp_1.type = (3) cmd_actions.append([act_clamp_1]) # 夹持 cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2)) act_clamp_2 = message.Action() act_clamp_2.type = (4) cmd_actions.append([act_clamp_2]) # 松夹持 cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3)) elif task_type == AGVTaskTag.Pick_Base: cmd_base = DijikstraMap().CreateNavCmd_part_base() cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1)) act_clamp_1 = message.Action() act_clamp_1.type = (3) cmd_actions.append([act_clamp_1]) # 夹持 cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2)) act_clamp_2 = message.Action() act_clamp_2.type = (4) cmd_actions.append([act_clamp_2]) # 松夹持 cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3)) if len(cmd_actions) < 3: return None else: for i in range(len(cmd_actions)): if cmd_actions[i] is not None: for action in cmd_actions[i]: cmd_base.actions.add().CopyFrom(action) return cmd_base def setAGVTaskTag(self, task_tag): self.agv_task_tag = task_tag def getAGVTaskTag(self): return self.agv_task_tag