|
@@ -1,129 +1,181 @@
|
|
|
-
|
|
|
-
|
|
|
import time
|
|
|
import message_pb2 as message
|
|
|
from mqtt_async import MqttAsync
|
|
|
import google.protobuf.json_format as jtf
|
|
|
from dijkstra.Map import DijikstraMap
|
|
|
+
|
|
|
+
|
|
|
+class AGVTaskTag:
|
|
|
+ # ----------------------------------------------------------------
|
|
|
+ # DefaultTask
|
|
|
+ Default_Base =0
|
|
|
+ Default_Idle =Default_Base +1 #初始空闲状态
|
|
|
+ # ----------------------------------------------------------------
|
|
|
+ # ParkTask
|
|
|
+ Park_Base = 1000
|
|
|
+ Park_Idle = Park_Base + 1 # 等待
|
|
|
+ Park_Pause = Park_Base + 2 # 暂停
|
|
|
+ Park_Stop = Park_Base + 3 # 停止(非正常)
|
|
|
+ Park_End = Park_Base + 4 # 结束(正常)
|
|
|
+ Park_Ready2Park = Park_Base + 10 # 调整准备好存车
|
|
|
+ Park_Move2Entrance = Park_Base + 20 # 移动至入口
|
|
|
+ Park_LoadCar = Park_Base + 30 # 装车(夹持)
|
|
|
+ Park_Move2Space = Park_Base + 40 # 移动至车位
|
|
|
+ Park_UnloadCar = Park_Base + 50 # 卸车(松开夹持)
|
|
|
+ Park_Move2Safety = Park_Base + 60 # 移动至安全位置
|
|
|
+ # ----------------------------------------------------------------
|
|
|
+ # PickTask
|
|
|
+ Pick_Base = 2000
|
|
|
+ Pick_Idle = Pick_Base + 0 # 等待
|
|
|
+ Pick_Pause = Pick_Base + 1 # 暂停
|
|
|
+ Pick_Stop = Pick_Base + 2 # 停止(非正常)
|
|
|
+ Pick_End = Pick_Base + 3 # 结束(正常)
|
|
|
+ Pick_Ready2Pick = Pick_Base + 10 # 调整准备好取车
|
|
|
+ Pick_Move2Space = Pick_Base + 20 # 移动至车位
|
|
|
+ Pick_LoadCar = Pick_Base + 30 # 装车(夹持)
|
|
|
+ Pick_Move2Exit = Pick_Base + 40 # 移动至出口
|
|
|
+ Pick_UnloadCar = Pick_Base + 50 # 卸车(松开夹持)
|
|
|
+ Pick_Move2Safety = Pick_Base + 60 # 移动至安全位置
|
|
|
+
|
|
|
+
|
|
|
class TimeStatu:
|
|
|
- def __init__(self,statu=None,timeout=3):
|
|
|
- self.statu=statu
|
|
|
- self.time=time.time()
|
|
|
- self.timeout_ms=timeout
|
|
|
+ def __init__(self, statu=None, timeout=3.0):
|
|
|
+ self.statu = statu
|
|
|
+ self.time = time.time()
|
|
|
+ self.timeout_ms = timeout
|
|
|
|
|
|
def timeout(self):
|
|
|
- tm=time.time()
|
|
|
- return tm-self.time>self.timeout_ms or self.statu==None
|
|
|
+ tm = time.time()
|
|
|
+ return tm - self.time > self.timeout_ms or self.statu == None
|
|
|
+
|
|
|
|
|
|
class Robot(MqttAsync):
|
|
|
- def __init__(self,name=""):
|
|
|
- self.timedPose_=TimeStatu(message.AGVStatu,0)
|
|
|
- self.timedNavStatu_=TimeStatu(message.NavStatu,0)
|
|
|
- self.dataTopic_={}
|
|
|
- self.messageArrivedSignal=None
|
|
|
- self.currentNavData_=None
|
|
|
- self.navCmdTopic_=None
|
|
|
- self.currentNavPathNodes_=None
|
|
|
- self.currentNavPath_=None
|
|
|
-
|
|
|
- self.pathColor_=[1,1,0]
|
|
|
- self.robotColor_=[0.7,0.2,0.3] #agv当前颜色
|
|
|
- self.name_=name
|
|
|
-
|
|
|
- self.l_=0.8 #轮长
|
|
|
- self.L_=1.3 #轴距
|
|
|
- self.W_=2.5 #宽
|
|
|
+ def __init__(self, name=""):
|
|
|
+ self.timedPose_ = TimeStatu(message.AGVStatu, 0)
|
|
|
+ self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
|
|
|
+ self.dataTopic_ = {}
|
|
|
+ self.messageArrivedSignal = None
|
|
|
+ self.currentNavData_ = None
|
|
|
+ self.navCmdTopic_ = None
|
|
|
+ self.currentNavPathNodes_ = None
|
|
|
+ self.currentNavPath_ = None
|
|
|
+
|
|
|
+ self.pathColor_ = [1, 1, 0]
|
|
|
+ self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
|
|
|
+ self.name_ = name
|
|
|
+
|
|
|
+ self.l_ = 0.8 # 轮长
|
|
|
+ self.L_ = 1.3 # 轴距
|
|
|
+ self.W_ = 2.5 # 宽
|
|
|
+
|
|
|
+ # 地图参数设置
|
|
|
+ self.safety_location = "N8" # 测试用,安全位置
|
|
|
+ self.entrance_location = "N8" # 测试用,入口位置
|
|
|
+ self.exit_location = "N8" # 测试用,出口位置
|
|
|
+
|
|
|
+ # 流程控制相关
|
|
|
+ self.agv_task_tag = AGVTaskTag.Default_Idle
|
|
|
|
|
|
def Color(self):
|
|
|
if self.IsMainModeStatu():
|
|
|
- return [0,0,0]
|
|
|
+ return [0, 0, 0]
|
|
|
return self.robotColor_
|
|
|
+
|
|
|
def IsMainAgv(self):
|
|
|
- if self.timedNavStatu_.timeout()==False:
|
|
|
- if self.timedNavStatu_.statu.main_agv==True:
|
|
|
+ if self.timedNavStatu_.timeout() == False:
|
|
|
+ if self.timedNavStatu_.statu.main_agv == True:
|
|
|
return True
|
|
|
return False
|
|
|
+
|
|
|
def IsMainModeStatu(self):
|
|
|
if self.IsMainAgv():
|
|
|
if self.timedNavStatu_.timeout() == False:
|
|
|
- if self.timedNavStatu_.statu.move_mode==2:
|
|
|
+ if self.timedNavStatu_.statu.move_mode == 2:
|
|
|
return True
|
|
|
return False
|
|
|
- def SetSubDataTopic(self,match:dict,messageSignal=None):
|
|
|
- self.dataTopic_=match
|
|
|
- self.messageArrivedSignal=messageSignal
|
|
|
+
|
|
|
+ def SetSubDataTopic(self, match: dict, messageSignal=None):
|
|
|
+ self.dataTopic_ = match
|
|
|
+ self.messageArrivedSignal = messageSignal
|
|
|
for item in match.items():
|
|
|
- [topic,_]=item
|
|
|
- self.subscribe(topic,self.on_message)
|
|
|
- def SetCmdTopic(self,topic):
|
|
|
- self.navCmdTopic_=topic
|
|
|
- def SetColor(self,pathColor,robotColor):
|
|
|
- self.pathColor_=pathColor
|
|
|
- self.robotColor_=robotColor
|
|
|
- def ResetPose(self,agv : message.AGVStatu):
|
|
|
- self.timedPose_=TimeStatu(agv,0.5)
|
|
|
- def ResetNavStatu(self,statu:message.NavStatu):
|
|
|
- self.timedNavStatu_=TimeStatu(statu,0.5)
|
|
|
-
|
|
|
- def on_message(self,client, userdata, msg):
|
|
|
- topic=msg.topic
|
|
|
+ [topic, _] = item
|
|
|
+ self.subscribe(topic, self.on_message)
|
|
|
+
|
|
|
+ def SetCmdTopic(self, topic):
|
|
|
+ self.navCmdTopic_ = topic
|
|
|
+
|
|
|
+ def SetColor(self, pathColor, robotColor):
|
|
|
+ self.pathColor_ = pathColor
|
|
|
+ self.robotColor_ = robotColor
|
|
|
+
|
|
|
+ def ResetPose(self, agv: message.AGVStatu):
|
|
|
+ self.timedPose_ = TimeStatu(agv, 0.5)
|
|
|
+
|
|
|
+ def ResetNavStatu(self, statu: message.NavStatu):
|
|
|
+ self.timedNavStatu_ = TimeStatu(statu, 0.5)
|
|
|
+
|
|
|
+ def on_message(self, client, userdata, msg):
|
|
|
+ topic = msg.topic
|
|
|
if self.dataTopic_.get(topic) is not None:
|
|
|
- dtype=self.dataTopic_[topic]
|
|
|
- if dtype==message.AGVStatu.__name__:
|
|
|
- proto=message.AGVStatu()
|
|
|
- jtf.Parse(msg.payload.decode(),proto)
|
|
|
+ dtype = self.dataTopic_[topic]
|
|
|
+ if dtype == message.AGVStatu.__name__:
|
|
|
+ proto = message.AGVStatu()
|
|
|
+ jtf.Parse(msg.payload.decode(), proto)
|
|
|
self.ResetPose(proto)
|
|
|
- if dtype==message.NavStatu.__name__:
|
|
|
- self.last_running=self.IsNavigating()
|
|
|
- proto=message.NavStatu()
|
|
|
- jtf.Parse(msg.payload.decode(),proto)
|
|
|
+ if dtype == message.NavStatu.__name__:
|
|
|
+ self.last_running = self.IsNavigating()
|
|
|
+ proto = message.NavStatu()
|
|
|
+ jtf.Parse(msg.payload.decode(), proto)
|
|
|
self.ResetNavStatu(proto)
|
|
|
- if self.last_running==True and self.IsNavigating()==False:
|
|
|
- self.currentNavPathNodes_=None
|
|
|
- self.currentNavPath_=None
|
|
|
+ if self.last_running == True and self.IsNavigating() == False:
|
|
|
+ self.currentNavPathNodes_ = None
|
|
|
+ self.currentNavPath_ = None
|
|
|
if self.messageArrivedSignal is not None:
|
|
|
self.messageArrivedSignal()
|
|
|
|
|
|
def MpcSelectTraj(self):
|
|
|
- traj=[]
|
|
|
- if self.timedNavStatu_.timeout()==False:
|
|
|
- nav=self.timedNavStatu_.statu
|
|
|
+ traj = []
|
|
|
+ if self.timedNavStatu_.timeout() == False:
|
|
|
+ nav = self.timedNavStatu_.statu
|
|
|
for pose2d in nav.selected_traj.poses:
|
|
|
- traj.append([pose2d.x,pose2d.y,pose2d.theta])
|
|
|
+ traj.append([pose2d.x, pose2d.y, pose2d.theta])
|
|
|
return traj
|
|
|
|
|
|
def MpcPredictTraj(self):
|
|
|
- traj=[]
|
|
|
- if self.timedNavStatu_.timeout()==False:
|
|
|
- nav=self.timedNavStatu_.statu
|
|
|
+ traj = []
|
|
|
+ if self.timedNavStatu_.timeout() == False:
|
|
|
+ nav = self.timedNavStatu_.statu
|
|
|
for pose2d in nav.predict_traj.poses:
|
|
|
- traj.append([pose2d.x,pose2d.y,pose2d.theta])
|
|
|
+ traj.append([pose2d.x, pose2d.y, pose2d.theta])
|
|
|
return traj
|
|
|
|
|
|
def Connected(self):
|
|
|
- return self.timedNavStatu_.timeout()==False and self.timedPose_.timeout()==False
|
|
|
+ return self.timedNavStatu_.timeout() == False and self.timedPose_.timeout() == False
|
|
|
+
|
|
|
def IsNavigating(self):
|
|
|
- if self.timedNavStatu_.timeout()==False:
|
|
|
- key=self.timedNavStatu_.statu.key
|
|
|
- if key=="" or key==None:
|
|
|
+ if self.timedNavStatu_.timeout() == False:
|
|
|
+ key = self.timedNavStatu_.statu.key
|
|
|
+ if key == "" or key == None:
|
|
|
return False
|
|
|
return True
|
|
|
- def GeneratePath(self,begID,endID):
|
|
|
- self.currentNavPathNodes_=DijikstraMap().GetShortestPath(begID,endID)
|
|
|
- self.currentNavPath_=DijikstraMap().CreatePath(self.currentNavPathNodes_,0.5)
|
|
|
+
|
|
|
+ def GeneratePath(self, begID, endID):
|
|
|
+ self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID)
|
|
|
+ self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
|
|
|
+
|
|
|
def ExecNavtask(self):
|
|
|
if self.navCmdTopic_ == None:
|
|
|
print("Robot has not set nav cmd topic")
|
|
|
return
|
|
|
if self.IsNavigating():
|
|
|
- print(" robot is navigating ,key:%s"%(self.timedNavStatu_.statu.key))
|
|
|
+ print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
|
|
|
return
|
|
|
|
|
|
- cmd=None
|
|
|
- if self.currentNavPathNodes_ is not None and self.timedPose_.timeout()==False:
|
|
|
- statu=self.timedPose_.statu
|
|
|
+ cmd = None
|
|
|
+ if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() == False:
|
|
|
+ statu = self.timedPose_.statu
|
|
|
if statu is not None:
|
|
|
- cmd=DijikstraMap().CreateNavCmd([statu.x,statu.y,statu.theta],self.currentNavPathNodes_)
|
|
|
+ cmd = DijikstraMap().CreateNavCmd([statu.x, statu.y, statu.theta], self.currentNavPathNodes_)
|
|
|
else:
|
|
|
print("current path is none")
|
|
|
return
|
|
@@ -132,27 +184,144 @@ class Robot(MqttAsync):
|
|
|
print("Nav cmd is None")
|
|
|
return
|
|
|
|
|
|
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
|
|
|
+ self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
|
|
|
'''while self.IsNavigating()==False:
|
|
|
time.sleep(1)
|
|
|
self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
|
|
|
- def SwitchMoveMode(self,mode,wheelbase):
|
|
|
|
|
|
- cmd =message.NavCmd()
|
|
|
- if mode==2:
|
|
|
- cmd.wheelbase=wheelbase
|
|
|
- cmd.action=4
|
|
|
- if mode==1:
|
|
|
- cmd.action=5
|
|
|
+ def SwitchMoveMode(self, mode, wheelbase):
|
|
|
+
|
|
|
+ cmd = message.NavCmd()
|
|
|
+ if mode == 2:
|
|
|
+ cmd.wheelbase = wheelbase
|
|
|
+ cmd.action = 4
|
|
|
+ if mode == 1:
|
|
|
+ cmd.action = 5
|
|
|
|
|
|
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
|
|
|
+ self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
|
|
|
|
|
|
def CancelNavTask(self):
|
|
|
- cmd=message.NavCmd()
|
|
|
- cmd.action=3
|
|
|
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
|
|
|
+ cmd = message.NavCmd()
|
|
|
+ cmd.action = 3
|
|
|
+ self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
|
|
|
|
|
|
+ # 存车
|
|
|
+ def ParkTask(self, targetID=None, begID=None):
|
|
|
+ if targetID is None:
|
|
|
+ return
|
|
|
+ if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
|
|
|
+ begID = self.safety_location
|
|
|
+ if self.navCmdTopic_ == None:
|
|
|
+ print("Robot has not set nav cmd topic")
|
|
|
+ return
|
|
|
+ if self.IsNavigating():
|
|
|
+ print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
|
|
|
+ return
|
|
|
+ # 计算完整最短路径(分三段,最后合并)
|
|
|
+ ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, self.entrance_location)
|
|
|
+ ParkPathNodes_2 = DijikstraMap().GetShortestPath(self.entrance_location, targetID)
|
|
|
+ ParkPathNodes_3 = DijikstraMap().GetShortestPath(targetID, self.safety_location)
|
|
|
+ self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
|
|
|
+ ParkPathNodes_2) - 1] + ParkPathNodes_3
|
|
|
+ self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
|
|
|
+ # 生成完整指令
|
|
|
+ cmd = None
|
|
|
+ if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
|
|
|
+ # if self.currentNavPathNodes_ is not None:
|
|
|
+ statu = self.timedPose_.statu
|
|
|
+ if statu is not None:
|
|
|
+ cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Park_Base)
|
|
|
+ else:
|
|
|
+ print("current path is none")
|
|
|
+ return
|
|
|
+ if cmd is None:
|
|
|
+ print("Park cmd is None")
|
|
|
+ return
|
|
|
+
|
|
|
+ self.setAGVTaskTag(AGVTaskTag.Park_Base)
|
|
|
+ self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
|
|
|
+
|
|
|
+ # 取车
|
|
|
+ def PickTask(self, targetID=None, begID=None):
|
|
|
+ if targetID is None:
|
|
|
+ return
|
|
|
+ if begID is None: # 默认AGV当前位置为安全位置,否则需要给定位置
|
|
|
+ begID = self.safety_location
|
|
|
+
|
|
|
+ if self.navCmdTopic_ == None:
|
|
|
+ print("Robot has not set nav cmd topic")
|
|
|
+ return
|
|
|
+ if self.IsNavigating():
|
|
|
+ print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
|
|
|
+ return
|
|
|
+
|
|
|
+ # 计算完整最短路径(分三段,最后合并)
|
|
|
+ ParkPathNodes_1 = DijikstraMap().GetShortestPath(begID, targetID)
|
|
|
+ ParkPathNodes_2 = DijikstraMap().GetShortestPath(targetID, self.exit_location)
|
|
|
+ ParkPathNodes_3 = DijikstraMap().GetShortestPath(self.exit_location, self.safety_location)
|
|
|
+ self.currentNavPathNodes_ = ParkPathNodes_1[0:len(ParkPathNodes_1) - 1] + ParkPathNodes_2[0:len(
|
|
|
+ ParkPathNodes_2) - 1] + ParkPathNodes_3
|
|
|
+ self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
|
|
|
+
|
|
|
+ # 生成完整指令
|
|
|
+ cmd = None
|
|
|
+ if self.currentNavPathNodes_ is not None and self.timedPose_.timeout() is False:
|
|
|
+ statu = self.timedPose_.statu
|
|
|
+ if statu is not None:
|
|
|
+ cmd = self.CreateNavCmd(ParkPathNodes_1, ParkPathNodes_2, ParkPathNodes_3, AGVTaskTag.Pick_Base)
|
|
|
+ else:
|
|
|
+ print("current path is none")
|
|
|
+ return
|
|
|
+
|
|
|
+ if cmd is None:
|
|
|
+ print("Park cmd is None")
|
|
|
+ return
|
|
|
+
|
|
|
+ self.setAGVTaskTag(AGVTaskTag.Pick_Base)
|
|
|
+ self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
|
|
|
+
|
|
|
+ # parameter:
|
|
|
+ # task_type: 根据其值,分段生成对应的的cmd
|
|
|
+ def CreateNavCmd(self, path_1, path_2, path_3, task_type):
|
|
|
+ cmd_base = DijikstraMap().CreateNavCmd_part_base()
|
|
|
+ cmd_actions = list()
|
|
|
+
|
|
|
+ if task_type == AGVTaskTag.Park_Base:
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
|
|
|
+ act_clamp_1 = message.Action()
|
|
|
+ act_clamp_1.type = (3)
|
|
|
+ cmd_actions.append([act_clamp_1]) # 夹持
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
|
|
|
+ act_clamp_2 = message.Action()
|
|
|
+ act_clamp_2.type = (4)
|
|
|
+ cmd_actions.append([act_clamp_2]) # 松夹持
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
|
|
|
+
|
|
|
+ elif task_type == AGVTaskTag.Pick_Base:
|
|
|
+ cmd_base = DijikstraMap().CreateNavCmd_part_base()
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_1))
|
|
|
+ act_clamp_1 = message.Action()
|
|
|
+ act_clamp_1.type = (3)
|
|
|
+ cmd_actions.append([act_clamp_1]) # 夹持
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_2))
|
|
|
+ act_clamp_2 = message.Action()
|
|
|
+ act_clamp_2.type = (4)
|
|
|
+ cmd_actions.append([act_clamp_2]) # 松夹持
|
|
|
+ cmd_actions.append(DijikstraMap().CreateNavCmd_part_actions(path_3))
|
|
|
+
|
|
|
+ if len(cmd_actions) < 3:
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ for i in range(len(cmd_actions)):
|
|
|
+ if cmd_actions[i] is not None:
|
|
|
+ for action in cmd_actions[i]:
|
|
|
+ cmd_base.actions.add().CopyFrom(action)
|
|
|
|
|
|
+ return cmd_base
|
|
|
|
|
|
+ def setAGVTaskTag(self, task_tag):
|
|
|
+ self.agv_task_tag = task_tag
|
|
|
|
|
|
+ def getAGVTaskTag(self):
|
|
|
+ return self.agv_task_tag
|
|
|
|