# NOTE(review): this file reached review with its line breaks and indentation
# collapsed. The layout below was rebuilt from the statement order; where a
# statement follows an `if`/`for` body, its nesting level was inferred from
# what makes the code work — confirm against version control.
import math
from dijkstra.dijkstra_algorithm import Graph
import numpy as np
import scipy.spatial as spt
import message_pb2 as message
import uuid


class Node(object):
    """A 2-D map vertex identified by `id`, located at (`x`, `y`)."""

    def __init__(self, id, x, y):
        self.id_ = id
        self.x_ = x
        self.y_ = y

    def distance(self, other):
        """Return the Euclidean distance from this node to `other`.

        If `other` is not a Node (or subclass), a message is printed and -1 is
        returned instead of raising.
        """
        # StreetNode and SpaceNode derive from Node, so the first tuple entry
        # already covers all three.
        if not isinstance(other, (Node, StreetNode, SpaceNode)):
            print(" node must be Node,street_node,space_node")
            return -1
        return math.sqrt(math.pow(other.x_ - self.x_, 2) + math.pow(other.y_ - self.y_, 2))


class StreetNode(Node):
    """Node subtype with no extra state; distinguished only via isinstance checks."""

    def __init__(self, id, x, y):
        Node.__init__(self, id, x, y)


class SpaceNode(Node):
    """Node subtype that additionally carries a heading `yaw` (radians, as it is
    passed straight to math.cos / math.sin)."""

    def __init__(self, id, x, y, yaw):
        Node.__init__(self, id, x, y)
        self.yaw_ = yaw

    def frontPoint(self,wheelBase):
        """Return [x, y] located wheelBase/2 ahead of the node along yaw_."""
        x=self.x_+wheelBase/2.0*math.cos(self.yaw_)
        y=self.y_+wheelBase/2.0*math.sin(self.yaw_)
        return [x,y]

    def backPoint(self,wheelBase):
        """Return [x, y] located wheelBase/2 behind the node along yaw_."""
        x=self.x_-wheelBase/2.0*math.cos(self.yaw_)
        y=self.y_-wheelBase/2.0*math.sin(self.yaw_)
        return [x,y]


def singleton(cls):
    """Class decorator: replace `cls` with a zero-argument factory that creates
    one instance on first call and returns that same instance afterwards.

    The decorated name is bound to `inner` (a function), not to the class, so
    class-level access such as static methods must go through the instance
    returned by calling it.
    """
    _instance = {}

    def inner():
        if cls not in _instance:
            _instance[cls] = cls()
        return _instance[cls]

    return inner


# The map is a singleton.
''' map 采用单例 '''
@singleton
class DijikstraMap(object):
    """Registry of map nodes plus the `Graph` used for path search."""

    def __init__(self):
        self.nodes_ = {}        # node id -> StreetNode / SpaceNode
        self.graph_ = Graph()

    def GetVertex(self,id):
        """Return the registered node for `id`, or None if it is unknown."""
        return self.nodes_.get(id)

    def AddVertex(self, node):
        """Register `node` and add it to the graph at [x_, y_]; always returns True.

        Only StreetNode / SpaceNode instances are stored in `nodes_`.
        """
        if isinstance(node, (StreetNode)):
            print("add street node :%s " % (node.id_))
            self.nodes_[node.id_] = node
        if isinstance(node, (SpaceNode)):
            print("add space node :%s " % (node.id_))
            self.nodes_[node.id_] = node
        # NOTE(review): as laid out here, a plain Node is added to graph_ but not
        # to nodes_, so AddEdge would later reject it — confirm intended.
        self.graph_.AddVertex(node.id_, [node.x_, node.y_])
        return True

    def AddEdge(self, id1, id2, direct=False):
        """Add an edge id1 -> id2, plus id2 -> id1 unless `direct` is set.

        Both ids must already be present in `nodes_`.
        """
        if self.nodes_.get(id1) == None or self.nodes_.get(id2) == None:
            # NOTE(review): this raises a plain str. On Python 3 that surfaces as
            # "TypeError: exceptions must derive from BaseException" and the
            # message below is lost; wrap it in an exception class.
            raise ("Add edge failed %s or %s node is not exist" % (id1, id2))
        print("Add Edge :%s-%s" % (id1, id2))
        self.graph_.AddEdge(id1, id2)
        if direct == False:
            self.graph_.AddEdge(id2, id1)

    def VertexDict(self):
        """Return the id -> node dict itself (not a copy)."""
        return self.nodes_

    def Edges(self):
        """Return the graph's `graph_edges` attribute."""
        return self.graph_.graph_edges

    def findNeastNode(self,pt):
        """Query a KD-tree of all registered nodes for the one nearest to `pt`.

        The tree is rebuilt from `nodes_` on every call. The return statement
        is not visible in this copy of the file (see the note below).
        """
        labels=[]
        pts=[]
        for item in self.nodes_.items():
            [label,node]=item
            labels.append(label)
            pts.append([node.x_,node.y_])
        points=np.array(pts)
        ckt=spt.KDTree(data=points,leafsize=10)
        find_pt=np.array(pt)
        # query returns (distance, index into `points`)
        d,i=ckt.query(find_pt)
        # NOTE(review): SOURCE is corrupted on the next line. Everything between
        # `i<` and `>=0:` is missing (it reads like a stripped `<...>` span), so
        # the end of findNeastNode and the header and opening statements of the
        # routine that returns `trajectry` are not visible. The tokens are kept
        # verbatim; restore the lost text from version control before use.
        if i>=0 and i=0:
                if dx<0:
                    yaw=math.pi-yaw
            # Presumably dx, dy, yaw, delta, last_node, trajectry and an enclosing
            # `for node in ...` loop are set up in the lost text — TODO confirm.
            if yaw<0:
                if dx<0:
                    yaw=-math.pi-yaw
            # Number of whole `delta` steps along the segment.
            # NOTE(review): `len` shadows the builtin inside this routine.
            len=int(math.sqrt(dx*dx+dy*dy)/delta)
            # Per-step displacement; computed before the heading flip below, so
            # positions still advance from last_node toward node.
            ax=math.cos(yaw)*delta
            ay=math.sin(yaw)*delta
            poses=[]
            # Leaving a SpaceNode: the stored heading is rotated by pi.
            if isinstance(last_node,(SpaceNode)):
                yaw=yaw+math.pi
            for i in range(len+1):
                pose=[last_node.x_+i*ax,last_node.y_+i*ay,yaw]
                poses.append(pose)
            # One list of [x, y, yaw] poses per segment.
            trajectry.append(poses)
            last_node=node
        return trajectry

    @staticmethod
    def CreateNavCmd(pose, path):
        """Build a `message.NavCmd` that drives along `path` (a sequence of nodes).

        Returns None when `path` has fewer than two nodes. For every consecutive
        pair (last_node, node) two actions are appended to `cmd.actions`:
        a type-1 action targeting last_node with the segment heading, then a
        type-2 action from last_node to node. `pose` is not read in this body.
        """
        if len(path) <= 1:
            return None
        cmd = message.NavCmd()
        cmd.action=0 # new navigation
        key = str(uuid.uuid4())
        cmd.key=(key)
        adjustdiff = message.Pose2d()
        node_mpcdiff = message.Pose2d()
        enddiff = message.Pose2d()
        lastAdjustDiff=message.Pose2d()
        # Target tolerances (theta values are converted from degrees to radians)
        # In-place adjustment tolerance
        adjustdiff.x=(0.1)
        adjustdiff.y=(0.1)
        adjustdiff.theta=(0.5 * math.pi / 180.0)
        # Line-following tolerance for intermediate points
        node_mpcdiff.x=(0.05)
        node_mpcdiff.y=(0.05)
        node_mpcdiff.theta=(10 * math.pi / 180.0)
        # Line-following tolerance for the final target point
        enddiff.x=(0.02)
        enddiff.y=(0.02)
        enddiff.theta=(0.5 * math.pi / 180.0)
        # Tolerance for the last in-place adjustment
        lastAdjustDiff.x=(0.03)
        lastAdjustDiff.y=(0.01)
        lastAdjustDiff.theta=(0.7 * math.pi / 180.0)
        # Speed limits
        v_limit = message.SpeedLimit()
        angular_limit = message.SpeedLimit()
        horize_limit = message.SpeedLimit()
        v_limit.min=(0.1)
        v_limit.max=(0.2)
        horize_limit.min=(0.05)
        horize_limit.max=(0.2)
        # NOTE(review): unlike mpc_angular_limit below, these two values are not
        # converted to radians — confirm the unit the consumer expects.
        angular_limit.min=(2)
        angular_limit.max=(40.0)
        # MPC speed limits
        mpc_x_limit = message.SpeedLimit()
        last_MPC_v=message.SpeedLimit()
        mpc_angular_limit = message.SpeedLimit()
        mpc_x_limit.min=(0.05)
        mpc_x_limit.max=(1.2)
        last_MPC_v.min=0.03
        last_MPC_v.max=0.4
        mpc_angular_limit.min=(0 * math.pi / 180.0)
        mpc_angular_limit.max=(3 * math.pi / 180.0)
        # Build the action list ----------------------
        last_node = None
        # While a segment is being processed, `count` equals the index of `node`
        # in `path` (it is bumped to 1 when the first node is skipped).
        count = 0
        for node in path:
            if last_node == None:
                last_node = node
                count+=1
                continue
            # Move to the previous point
            vector = [node.x_ - last_node.x_, node.y_ - last_node.y_]
            dx = vector[0]
            dy = vector[1]
            # Segment heading: asin covers [-pi/2, pi/2]; the dx<0 branches below
            # mirror it into the left half-plane.
            # NOTE(review): raises ZeroDivisionError when node and last_node share
            # the same coordinates.
            yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
            if yaw >= 0:
                if dx < 0:
                    yaw = math.pi - yaw
            if yaw < 0:
                if dx < 0:
                    yaw = -math.pi - yaw
            # Leaving a SpaceNode: the commanded heading is rotated by pi.
            if isinstance(last_node, (SpaceNode)):
                yaw = yaw + math.pi
            # Add the adjust action
            act_adjust = message.Action()
            act_adjust.type=(1)
            act_adjust.target.x=(last_node.x_)
            act_adjust.target.y=(last_node.y_)
            act_adjust.target.theta=(yaw)
            # Last adjust point
            # NOTE(review): this matches the segment ending at path[len(path)-2],
            # i.e. the second-to-last segment, and never matches for a two-node
            # path. The final-segment check below uses len(path)-1 — confirm
            # whether the -2 is intended.
            if count==len(path)-2:
                act_adjust.target_diff.CopyFrom(lastAdjustDiff)
            else:
                act_adjust.target_diff.CopyFrom(adjustdiff)
            act_adjust.velocity_limit.CopyFrom(v_limit)
            act_adjust.horize_limit.CopyFrom(horize_limit)
            act_adjust.angular_limit.CopyFrom(angular_limit)
            cmd.actions.add().CopyFrom(act_adjust)
            # Add the MPC action
            act_along = message.Action()
            act_along.type=(2)
            act_along.begin.x=(last_node.x_)
            act_along.begin.y=(last_node.y_)
            act_along.begin.theta=(yaw)
            act_along.target.x=(node.x_)
            act_along.target.y=(node.y_)
            act_along.target.theta=(yaw)
            # The final segment gets the tighter end tolerance.
            if count==len(path)-1:
                act_along.target_diff.CopyFrom(enddiff)
            else:
                act_along.target_diff.CopyFrom(node_mpcdiff)
            # Segments touching a SpaceNode use the lower last_MPC_v limits.
            if isinstance(node, (SpaceNode)) or isinstance(last_node, (SpaceNode)):
                act_along.velocity_limit.CopyFrom(last_MPC_v)
            else:
                act_along.velocity_limit.CopyFrom(mpc_x_limit)
            act_along.angular_limit.CopyFrom(mpc_angular_limit)
            cmd.actions.add().CopyFrom(act_along)
            last_node = node
            count+=1
        return cmd