123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- import enum
- import time
- import message_pb2 as message
- from mqtt_async import MqttAsync
- import google.protobuf.json_format as jtf
- from dijkstra.Map import DijikstraMap,SpaceNode,StreetNode
- import uuid
- from auto_test import TestCount
- import random
- import math
- class ActType(enum.Enum):
- eReady=0
- eRotation=1
- eHorizon=2
- eVertical=3
- eMPC=4
- eClampOpen=5
- eClampClose=6
- class TimeStatu:
- def __init__(self, statu=None, timeout=3):
- self.statu = statu
- self.time = time.time()
- self.timeout_ms = timeout
- def timeout(self):
- tm = time.time()
- return tm - self.time > self.timeout_ms or self.statu == None
- class Robot(MqttAsync):
- def __init__(self, name=""):
- MqttAsync.__init__(self)
- self.timedRobotStatu_ = TimeStatu(message.RobotStatu, 0)
- self.timedNavStatu_ = TimeStatu(message.NavStatu, 0)
- self.dataTopic_ = {}
- self.messageArrivedSignal = None
- self.currentNavData_ = None
- self.navCmdTopic_ = None
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- self.pathColor_ = [1, 1, 0]
- self.robotColor_ = [0.7, 0.2, 0.3] # agv当前颜色
- self.name_ = name
- self.begId_="------"
- self.targetId_="------"
- self.autoTest_=False
- self.l_ = 0.8 # 轮长
- self.L_ = 1.3 # 轴距
- self.W_ = 2.5 # 宽
- def Color(self):
- if self.IsMainModeStatu():
- return [0, 0, 0]
- return self.robotColor_
- def IsMainAgv(self):
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.main_agv == True:
- return True
- return False
- def IsMainModeStatu(self):
- if self.IsMainAgv():
- if self.timedNavStatu_.timeout() == False:
- if self.timedNavStatu_.statu.move_mode == 2:
- return True
- return False
- def SetSubDataTopic(self, match: dict, messageSignal=None):
- self.dataTopic_ = match
- self.messageArrivedSignal = messageSignal
- for item in match.items():
- [topic, _] = item
- self.subscribe(topic, self.on_message)
- def SetCmdTopic(self, topic):
- self.navCmdTopic_ = topic
- def SetColor(self, pathColor, robotColor):
- self.pathColor_ = pathColor
- self.robotColor_ = robotColor
- def ResetPose(self, agv: message.RobotStatu):
- self.timedRobotStatu_ = TimeStatu(agv, 0.5)
- def ResetNavStatu(self, statu: message.NavStatu):
- self.timedNavStatu_ = TimeStatu(statu, 0.5)
- def on_message(self, client, userdata, msg):
- topic = msg.topic
- if self.dataTopic_.get(topic) is not None:
- dtype = self.dataTopic_[topic]
- if dtype == message.RobotStatu.__name__:
- proto = message.RobotStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetPose(proto)
- if dtype == message.NavStatu.__name__:
- self.last_running = self.IsNavigating()
- proto = message.NavStatu()
- jtf.Parse(msg.payload.decode(), proto)
- self.ResetNavStatu(proto)
- if self.last_running == True and self.IsNavigating() == False:
- self.currentNavPathNodes_ = None
- self.currentNavPath_ = None
- if self.messageArrivedSignal is not None:
- self.messageArrivedSignal()
- def MpcSelectTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.selected_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def MpcPredictTraj(self):
- traj = []
- if self.timedNavStatu_.timeout() == False:
- nav = self.timedNavStatu_.statu
- for pose2d in nav.predict_traj.poses:
- traj.append([pose2d.x, pose2d.y, pose2d.theta])
- return traj
- def Connected(self):
- return self.timedNavStatu_.timeout() == False and self.timedRobotStatu_.timeout() == False
- def IsNavigating(self):
- return not self.ActionType()==ActType.eReady
- '''if self.timedNavStatu_.timeout() == False:
- key = self.timedNavStatu_.statu.key
- if key == "" or key == None:
- return False
- return True'''
- def GeneratePath(self, begID, endID):
- self.currentNavPathNodes_ = DijikstraMap().GetShortestPath(begID, endID)
- self.currentNavPath_ = DijikstraMap().CreatePath(self.currentNavPathNodes_, 0.5)
- if self.currentNavPathNodes_ is not None and self.currentNavPath_ is not None:
- self.begId_=begID
- self.targetId_=endID
- return True
- return False
- def PositionId(self):
- if self.timedRobotStatu_.timeout():
- return None
- x=self.timedRobotStatu_.statu.x
- y=self.timedRobotStatu_.statu.y
- djks_map = DijikstraMap()
- [label, pt] = djks_map.findNeastNode([x, y])
- return [label,pt]
- '''
- 阻塞直到导航完成
- '''
- def Navigatting(self, begId, targetId):
- print("nav")
- self.GeneratePath(begId, targetId)
- self.ExecPathNodes()
- #self.ExecNavtask()
- while self.IsNavigating() == True:
- '''if self.Connected() == False:
- print("robot disconnected cancel task")
- self.CancelNavTask()
- return False'''
- time.sleep(0.5)
- print(" Nav completed!!!")
- return True
- def TestNavClampOnce(self,begId,targetId):
- if self.IsClampClosed()==False:
- print("closed")
- if self.ClampClose()==False:
- return False
- if self.Navigatting(begId,targetId):
- if self.ClampOpen():
- TestCount().loadCountAdd()
- return True
- return False
- def AutoTestNavClamp(self,begId,targetId):
- beg=begId
- target=targetId
- while self.autoTest_:
- if self.TestNavClampOnce(beg,target)==False:
- print(" quit auto Test")
- break
- print("1111")
- posInfo=self.PositionId()
- if posInfo is not None:
- [label,pt]=posInfo
- beg=label
- print("beg",beg)
- node=DijikstraMap().GetVertex(beg)
- if node is not None:
- if isinstance(node,(SpaceNode)):
- target="O"
- if isinstance(node,(StreetNode)):
- end_nodes=["E","B","C","D"]
- id=random.randint(0,1000)%4
- target=end_nodes[id]
- print("3333",target)
- print(" Next nav clamp cmd,%s to %s"%(beg,target))
- def ActionType(self):
- if self.timedNavStatu_.timeout() == False:
- runningStatu=self.timedNavStatu_.statu
- if runningStatu.statu==0:
- return ActType.eReady
- if runningStatu.statu==1:
- return ActType.eRotation
- if runningStatu.statu==2:
- return ActType.eHorizon
- if runningStatu.statu==3:
- return ActType.eVertical
- if runningStatu.statu==4:
- return ActType.eMPC
- if runningStatu.statu==5:
- return ActType.eClampOpen
- if runningStatu.statu==6:
- return ActType.eClampClose
- else:
- return ActType.eReady
- def ExecNavtask(self):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return False
- if not self.ActionType()==ActType.eReady:
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return False
- cmd = None
- if self.currentNavPathNodes_ is not None and self.timedRobotStatu_.timeout() == False:
- statu = self.timedRobotStatu_.statu
- if statu is not None:
- cmd = DijikstraMap().CreateNavCmd([statu.x, statu.y, statu.theta], self.currentNavPathNodes_)
- else:
- print("current path is none")
- return False
- if cmd is None:
- print("Nav cmd is None")
- return False
- published=False
- while self.IsNavigating() == False:
- if not self.ActionType() == ActType.eReady:
- published=True
- if published==False:
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- time.sleep(1)
- print("send nav cmd completed!!!")
- return True
- def ExecPathNodes(self,autoDirect=False):
- if self.navCmdTopic_ == None:
- print("Robot has not set nav cmd topic")
- return False
- if not self.ActionType()==ActType.eReady:
- print(" robot is navigating ,key:%s" % (self.timedNavStatu_.statu.key))
- return False
- cmd = message.NavCmd()
- cmd.action=6
- if self.currentNavPathNodes_ is not None and self.timedRobotStatu_.timeout() == False:
- statu = self.timedRobotStatu_.statu
- if statu is not None:
- newAction=message.NewAction()
- limitMPC_v=message.SpeedLimit()
- limitRotate=message.SpeedLimit()
- limAdjustV=message.SpeedLimit()
- limitAdjustH=message.SpeedLimit()
- limitMPC_v.min=0.05
- limitMPC_v.max=1.2
- limitRotate.min=3*math.pi/180.0
- limitRotate.max=25*math.pi/180.0
- limAdjustV.min=0.03
- limAdjustV.max=0.3
- limitAdjustH.min=0.03
- limitAdjustH.max=0.3
- count=0
- size=len(self.currentNavPathNodes_)
- for i in range(size):
- node=self.currentNavPathNodes_[i]
- pose=message.Pose2d()
- accuracy=message.Pose2d()
- pose.x=node.x_
- pose.y=node.y_
- if i+1<size:
- next=self.currentNavPathNodes_[i+1]
- vector = [next.x_ - node.x_, next.y_ - node.y_]
- dx = vector[0]
- dy = vector[1]
- yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
- if yaw >= 0:
- if dx < 0:
- yaw = math.pi - yaw
- if yaw < 0:
- if dx < 0:
- yaw = -math.pi - yaw
- pose.theta=yaw
- else:
- pose.theta=0
- if count==size-1:
- accuracy.x=0.02
- accuracy.y=0.02
- accuracy.theta=0.5*math.pi/(180.0)
- else:
- accuracy.x=0.05
- accuracy.y=0.1
- accuracy.theta=3*math.pi/180.0
- pathNode=message.PathNode()
- pathNode.pose.CopyFrom(pose)
- pathNode.accuracy.CopyFrom(accuracy)
- newAction.type=4
- if autoDirect:
- newAction.type=3
- newAction.pathNodes.add().CopyFrom(pathNode)
- newAction.NodeVelocityLimit.CopyFrom(limitMPC_v)
- newAction.NodeAngularLimit.CopyFrom(limitRotate)
- newAction.adjustVelocitylimit.CopyFrom(limAdjustV)
- newAction.adjustHorizonLimit.CopyFrom(limitAdjustH)
- cmd.newActions.add().CopyFrom(newAction)
- count+=1
- else:
- print("statu is None")
- return False
- else:
- print("current path is none")
- return False
- if cmd is None:
- print("Nav cmd is None")
- return False
- print(cmd)
- published=False
- while self.IsNavigating() == False:
- if not self.ActionType() == ActType.eReady:
- published=True
- if published==False:
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- time.sleep(1)
- print("send nav cmd completed!!!")
- return True
- def SwitchMoveMode(self, mode, wheelbase):
- if self.IsMainAgv() == False:
- print(" this agv is single can not switch mode")
- return False
- cmd = message.NavCmd()
- if mode == 2:
- cmd.wheelbase = wheelbase
- cmd.action = 4
- if mode == 1:
- cmd.action = 5
- loop=3
- while loop>0:
- if mode == 2:
- if self.IsMainModeStatu():
- return True
- if mode == 1:
- if self.IsMainModeStatu() == False:
- return True
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- loop-=1
- time.sleep(0.5)
- return False
- def IsClampClosed(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 1 and clamp_other == 1)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 1
- return False
- def IsClampRunning(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 0 or clamp_other == 0)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 0
- return False
- def IsClampOpened(self):
- if self.timedRobotStatu_.timeout() == False:
- if self.IsMainModeStatu():
- clamp = self.timedRobotStatu_.statu.agvStatu.clamp
- clamp_other = self.timedRobotStatu_.statu.agvStatu.clamp_other
- return (clamp == 2 and clamp_other == 2)
- else:
- return self.timedRobotStatu_.statu.agvStatu.clamp == 2
- return False
- def ClampClose(self):
- if self.IsClampClosed() == True:
- print("clamp closed")
- return True
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.Action()
- act.type = 3
- cmd.actions.add().CopyFrom(act)
- published=False
- while self.IsClampClosed()==False:
- if self.ActionType()==ActType.eClampClose:
- published=True
- if published==False:
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- time.sleep(0.5)
- return True
- def ClampOpen(self):
- if self.IsClampOpened() == True:
- print("clamp opended")
- return True
- cmd = message.NavCmd()
- key = str(uuid.uuid4())
- cmd.key = (key)
- act = message.Action()
- act.type = 4
- cmd.actions.add().CopyFrom(act)
- published=False
- while self.IsClampOpened()==False:
- if self.ActionType()==ActType.eClampOpen:
- published=True
- if published==False:
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- time.sleep(0.5)
- return True
- def CancelNavTask(self):
- cmd = message.NavCmd()
- cmd.action = 3
- while self.IsNavigating() == True:
- self.publish(self.navCmdTopic_, jtf.MessageToJson(cmd))
- time.sleep(1)
- print(" Cancel task completed!!!")
- return True
|