123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import time
- import message_pb2 as message
- from mqtt_async import MqttAsync
- import google.protobuf.json_format as jtf
- from dijkstra.Map import DijikstraMap
- class TimeStatu:
- def __init__(self,statu=None,timeout=3):
- self.statu=statu
- self.time=time.time()
- self.timeout_ms=timeout
- def timeout(self):
- tm=time.time()
- return tm-self.time>self.timeout_ms or self.statu==None
- class Robot(MqttAsync):
- def __init__(self,name=""):
- self.timedPose_=TimeStatu(message.AGVStatu,0)
- self.timedNavStatu_=TimeStatu(message.NavStatu,0)
- self.dataTopic_={}
- self.messageArrivedSignal=None
- self.currentNavData_=None
- self.navCmdTopic_=None
- self.currentNavPathNodes_=None
- self.currentNavPath_=None
- self.pathColor_=[1,1,0]
- self.robotColor_=[0.7,0.2,0.3] #agv当前颜色
- self.name_=name
- self.l_=0.8 #轮长
- self.L_=1.3 #轴距
- self.W_=2.5 #宽
- def Color(self):
- if self.IsMainModeStatu():
- return [0,0,0]
- return self.robotColor_
- def IsMainAgv(self):
- if self.timedNavStatu_.timeout()==False:
- if self.timedNavStatu_.statu.main_agv==True:
- return True
- return False
- def IsMainModeStatu(self):
- if self.IsMainAgv():
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.move_mode==2:
- return True
- return False
- def SetSubDataTopic(self,match:dict,messageSignal=None):
- self.dataTopic_=match
- self.messageArrivedSignal=messageSignal
- for item in match.items():
- [topic,_]=item
- self.subscribe(topic,self.on_message)
- def SetCmdTopic(self,topic):
- self.navCmdTopic_=topic
- def SetColor(self,pathColor,robotColor):
- self.pathColor_=pathColor
- self.robotColor_=robotColor
- def ResetPose(self,agv : message.AGVStatu):
- self.timedPose_=TimeStatu(agv,0.5)
- def ResetNavStatu(self,statu:message.NavStatu):
- self.timedNavStatu_=TimeStatu(statu,0.5)
- def on_message(self,client, userdata, msg):
- topic=msg.topic
- if self.dataTopic_.get(topic) is not None:
- dtype=self.dataTopic_[topic]
- if dtype==message.AGVStatu.__name__:
- proto=message.AGVStatu()
- jtf.Parse(msg.payload.decode(),proto)
- self.ResetPose(proto)
- if dtype==message.NavStatu.__name__:
- self.last_running=self.IsNavigating()
- proto=message.NavStatu()
- jtf.Parse(msg.payload.decode(),proto)
- self.ResetNavStatu(proto)
- if self.last_running==True and self.IsNavigating()==False:
- self.currentNavPathNodes_=None
- self.currentNavPath_=None
- if self.messageArrivedSignal is not None:
- self.messageArrivedSignal()
- def MpcSelectTraj(self):
- traj=[]
- if self.timedNavStatu_.timeout()==False:
- nav=self.timedNavStatu_.statu
- for pose2d in nav.selected_traj.poses:
- traj.append([pose2d.x,pose2d.y,pose2d.theta])
- return traj
- def MpcPredictTraj(self):
- traj=[]
- if self.timedNavStatu_.timeout()==False:
- nav=self.timedNavStatu_.statu
- for pose2d in nav.predict_traj.poses:
- traj.append([pose2d.x,pose2d.y,pose2d.theta])
- return traj
- def Connected(self):
- return self.timedNavStatu_.timeout()==False and self.timedPose_.timeout()==False
- def IsNavigating(self):
- if self.timedNavStatu_.timeout()==False:
- key=self.timedNavStatu_.statu.key
- if key=="" or key==None:
- return False
- return True
- def GeneratePath(self,begID,endID):
- self.currentNavPathNodes_=DijikstraMap().GetShortestPath(begID,endID)
- self.currentNavPath_=DijikstraMap().CreatePath(self.currentNavPathNodes_,0.5)
- def ExecNavtask(self):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return
- if self.IsNavigating():
- print(" robot is navigating ,key:%s"%(self.timedNavStatu_.statu.key))
- return
- cmd=None
- if self.currentNavPathNodes_ is not None and self.timedPose_.timeout()==False:
- statu=self.timedPose_.statu
- if statu is not None:
- cmd=DijikstraMap().CreateNavCmd([statu.x,statu.y,statu.theta],self.currentNavPathNodes_)
- else:
- print("current path is none")
- return
- if cmd is None:
- print("Nav cmd is None")
- return
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
- '''while self.IsNavigating()==False:
- time.sleep(1)
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))'''
- def SwitchMoveMode(self,mode,wheelbase):
- cmd =message.NavCmd()
- if mode==2:
- cmd.wheelbase=wheelbase
- cmd.action=4
- if mode==1:
- cmd.action=5
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
- def CancelNavTask(self):
- cmd=message.NavCmd()
- cmd.action=3
- self.publish(self.navCmdTopic_,jtf.MessageToJson(cmd))
|