"""Road-map graph utilities: nodes, a Dijkstra-backed map and a map manager.

The module defines plain 2-D nodes (``StreetNode`` / ``SpaceNode``), a
``DijikstraMap`` that pairs a node dictionary with a ``Graph`` for shortest
path queries, helpers that turn a node path into a sampled trajectory or a
``NavCmd`` protobuf message, and a singleton ``MapManager`` that keeps the
"Base", "Front" and "Back" maps.
"""
import json
import math
import uuid
from copy import deepcopy

import numpy as np
import scipy.spatial as spt

from dijkstra.dijkstra_algorithm import Graph
import message_pb2 as message


class Node(object):
    """A 2-D point with an identifier."""

    def __init__(self, id, x, y):
        self.id_ = id
        self.x_ = x
        self.y_ = y

    def distance(self, other):
        """Return the Euclidean distance to ``other``.

        If ``other`` is not a ``Node`` (or subclass) a message is printed and
        ``-1`` is returned instead of raising.
        """
        # StreetNode and SpaceNode derive from Node, so one check covers all.
        if not isinstance(other, Node):
            print(" node must be Node,street_node,space_node")
            return -1
        return math.sqrt(math.pow(other.x_ - self.x_, 2) + math.pow(other.y_ - self.y_, 2))


class StreetNode(Node):
    """A node lying on a street (position only)."""

    def __init__(self, id, x, y):
        super().__init__(id, x, y)


class SpaceNode(Node):
    """A node with an orientation ``yaw_`` (radians, as fed to ``math.cos``)."""

    def __init__(self, id, x, y, yaw):
        super().__init__(id, x, y)
        self.yaw_ = yaw

    def frontPoint(self, wheelBase):
        """Return ``[x, y]`` half a ``wheelBase`` ahead of the node along ``yaw_``."""
        half = wheelBase / 2.0
        return [self.x_ + half * math.cos(self.yaw_),
                self.y_ + half * math.sin(self.yaw_)]

    def backPoint(self, wheelBase):
        """Return ``[x, y]`` half a ``wheelBase`` behind the node along ``yaw_``."""
        half = wheelBase / 2.0
        return [self.x_ - half * math.cos(self.yaw_),
                self.y_ - half * math.sin(self.yaw_)]


def singleton(cls):
    """Class decorator returning a zero-argument factory for one shared instance.

    The decorated name is rebound to the factory function, so calling it
    always yields the same ``cls()`` object (created on first call).
    """
    _instance = {}

    def inner():
        if cls not in _instance:
            _instance[cls] = cls()
        return _instance[cls]

    return inner


def _segment_yaw(dx, dy):
    """Return the heading in radians of the vector ``(dx, dy)``.

    The angle is computed with ``asin`` and then moved into the correct
    quadrant when ``dx`` is negative, giving a value in ``[-pi, pi]``.

    Raises:
        ZeroDivisionError: if both ``dx`` and ``dy`` are zero.
    """
    yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
    if dx < 0:
        # asin only covers the right half-plane; mirror for the left one.
        yaw = math.pi - yaw if yaw >= 0 else -math.pi - yaw
    return yaw


def _make_pose2d(x, y, theta):
    """Build a ``message.Pose2d`` with the given field values."""
    pose = message.Pose2d()
    pose.x = x
    pose.y = y
    pose.theta = theta
    return pose


def _make_speed_limit(lower, upper):
    """Build a ``message.SpeedLimit`` with ``min=lower`` and ``max=upper``."""
    limit = message.SpeedLimit()
    limit.min = lower
    limit.max = upper
    return limit


# The map manager (MapManager below) is used as a singleton.
class DijikstraMap(object):
    """A node dictionary paired with a ``Graph`` for shortest-path queries."""

    def __init__(self):
        self.nodes_ = {}  # dict: {id: StreetNode/SpaceNode}
        self.graph_ = Graph()

    def GetVertex(self, id):
        """Return the node stored under ``id`` or ``None`` if absent."""
        return self.nodes_.get(id)

    def AddVertex(self, node):
        """Register ``node`` in the graph; street/space nodes are also indexed.

        Always returns ``True``.
        """
        if isinstance(node, StreetNode):
            print("add street node :%s " % (node.id_))
            self.nodes_[node.id_] = node
        if isinstance(node, SpaceNode):
            print("add space node :%s " % (node.id_))
            self.nodes_[node.id_] = node
        self.graph_.AddVertex(node.id_, [node.x_, node.y_])
        return True

    def ResetVertexValue(self, node):
        """Move the vertex ``node.id_`` to ``node``'s coordinates.

        Raises:
            KeyError: if ``node.id_`` is not in this map's node dictionary.
        """
        self.graph_.ResetVertexValue(node.id_, [node.x_, node.y_])
        stored = self.nodes_[node.id_]
        stored.x_ = node.x_
        stored.y_ = node.y_

    def AddEdge(self, id1, id2, direct=False):
        """Connect ``id1`` -> ``id2``; also ``id2`` -> ``id1`` unless ``direct``.

        Raises:
            ValueError: if either id is not a known node. (The original code
                raised a plain string, which Python 3 rejects with a
                ``TypeError`` that hides the intended message.)
        """
        if self.nodes_.get(id1) is None or self.nodes_.get(id2) is None:
            print("Exceptin: Add edge failed")
            print(id1, id2)
            raise ValueError("Add edge failed %s or %s node is not exist" % (id1, id2))
        print("Add Edge :%s-%s" % (id1, id2))
        self.graph_.AddEdge(id1, id2)
        if not direct:
            self.graph_.AddEdge(id2, id1)

    def VertexDict(self):
        """Return the ``{id: node}`` dictionary (not a copy)."""
        return self.nodes_

    def Edges(self):
        """Return the underlying graph's ``graph_edges``."""
        return self.graph_.graph_edges

    def findNeastNode(self, pt):
        """Return ``[id, [x, y]]`` of the node nearest to ``pt``.

        A KD-tree is rebuilt from all nodes on every call. Returns ``None``
        when the map has no nodes or the query yields no valid index.
        """
        if not self.nodes_:
            # Nothing to search; avoid building a KD-tree from empty data.
            return None
        labels = list(self.nodes_)
        pts = [[node.x_, node.y_] for node in self.nodes_.values()]
        tree = spt.KDTree(data=np.array(pts), leafsize=10)
        _, index = tree.query(np.array(pt))
        if 0 <= index < len(pts):
            return [labels[index], pts[index]]
        return None

    def GetShortestPath(self, beg, end):
        """Return the list of nodes on the shortest path from ``beg`` to ``end``.

        The path ids and distance reported by the graph are printed.
        """
        pathId, distance = self.graph_.shortest_path(beg, end)
        print("distance:", distance)
        print("path:", pathId)
        return [self.nodes_[nodeId] for nodeId in pathId]

    @staticmethod
    def CreatePath(pathNodes, delta):
        """Sample each segment of ``pathNodes`` every ``delta`` along its heading.

        Returns a list with one entry per kept segment; each entry is a list
        of ``[x, y, yaw]`` poses starting at the segment's first node.
        Segments shorter than 0.5 are skipped. When a segment starts at a
        ``SpaceNode`` the reported yaw is rotated by pi while the sampled
        positions still advance toward the next node.
        """
        trajectry = []
        last_node = None
        for node in pathNodes:
            if last_node is None:
                last_node = node
                continue
            if last_node.distance(node) < 0.5:
                # Treated as the same point.
                last_node = node
                continue

            dx = node.x_ - last_node.x_
            dy = node.y_ - last_node.y_
            yaw = _segment_yaw(dx, dy)
            steps = int(math.sqrt(dx * dx + dy * dy) / delta)
            # Step vector uses the travel heading, before any pi flip below.
            ax = math.cos(yaw) * delta
            ay = math.sin(yaw) * delta
            if isinstance(last_node, SpaceNode):
                yaw = yaw + math.pi
            poses = [[last_node.x_ + i * ax, last_node.y_ + i * ay, yaw]
                     for i in range(steps + 1)]
            trajectry.append(poses)
            last_node = node
        return trajectry

    @staticmethod
    def CreateNavCmd(pose, path):
        """Build a ``message.NavCmd`` that follows ``path`` node by node.

        For every consecutive node pair two actions are appended: an adjust
        action (type 1) at the segment start and an MPC action (type 2) from
        the segment start to its end. ``pose`` is currently unused.

        Returns ``None`` when ``path`` has fewer than two nodes.

        Raises:
            ZeroDivisionError: if two consecutive nodes have identical
                coordinates.
        """
        if len(path) <= 1:
            return None

        cmd = message.NavCmd()
        cmd.action = 0  # new navigation
        cmd.key = str(uuid.uuid4())

        # Target accuracy settings.
        # In-place adjustment accuracy.
        adjustdiff = _make_pose2d(0.1, 0.1, 0.5 * math.pi / 180.0)
        # Line-following accuracy for intermediate points.
        node_mpcdiff = _make_pose2d(0.05, 0.05, 10 * math.pi / 180.0)
        # Accuracy for the final line-following target.
        enddiff = _make_pose2d(0.02, 0.02, 0.5 * math.pi / 180.0)
        # Accuracy for the final in-place adjustment.
        lastAdjustDiff = _make_pose2d(0.03, 0.01, 0.7 * math.pi / 180.0)

        # Speed limits for adjust actions.
        v_limit = _make_speed_limit(0.1, 0.2)
        horize_limit = _make_speed_limit(0.05, 0.2)
        angular_limit = _make_speed_limit(2, 40.0)

        # Speed limits for MPC actions.
        mpc_x_limit = _make_speed_limit(0.05, 1.2)
        last_MPC_v = _make_speed_limit(0.03, 0.4)
        mpc_angular_limit = _make_speed_limit(0 * math.pi / 180.0, 3 * math.pi / 180.0)

        # Build the action list.
        node_total = len(path)
        last_node = None
        for count, node in enumerate(path):
            if last_node is None:
                last_node = node
                continue

            # Heading of the segment from the previous node to this one.
            yaw = _segment_yaw(node.x_ - last_node.x_, node.y_ - last_node.y_)
            if isinstance(last_node, SpaceNode):
                yaw = yaw + math.pi

            # Adjust action at the segment start.
            act_adjust = message.Action()
            act_adjust.type = 1
            act_adjust.target.x = last_node.x_
            act_adjust.target.y = last_node.y_
            act_adjust.target.theta = yaw
            # "Last adjustment point" per the original comment.
            # NOTE(review): count equals the index of `node`, so this fires on
            # the second-to-last segment, not the last - confirm intent.
            if count == node_total - 2:
                act_adjust.target_diff.CopyFrom(lastAdjustDiff)
            else:
                act_adjust.target_diff.CopyFrom(adjustdiff)
            act_adjust.velocity_limit.CopyFrom(v_limit)
            act_adjust.horize_limit.CopyFrom(horize_limit)
            act_adjust.angular_limit.CopyFrom(angular_limit)
            cmd.actions.add().CopyFrom(act_adjust)

            # MPC action along the segment.
            act_along = message.Action()
            act_along.type = 2
            act_along.begin.x = last_node.x_
            act_along.begin.y = last_node.y_
            act_along.begin.theta = yaw
            act_along.target.x = node.x_
            act_along.target.y = node.y_
            act_along.target.theta = yaw
            if count == node_total - 1:
                act_along.target_diff.CopyFrom(enddiff)
            else:
                act_along.target_diff.CopyFrom(node_mpcdiff)
            # Segments touching a SpaceNode use the lower MPC speed limit.
            if isinstance(node, SpaceNode) or isinstance(last_node, SpaceNode):
                act_along.velocity_limit.CopyFrom(last_MPC_v)
            else:
                act_along.velocity_limit.CopyFrom(mpc_x_limit)
            act_along.angular_limit.CopyFrom(mpc_angular_limit)
            cmd.actions.add().CopyFrom(act_along)

            last_node = node
        return cmd


@singleton
class MapManager(object):
    """Holds the named maps "Base", "Front" and "Back" and forwards calls to them."""

    def __init__(self):
        self.default_wheel_base = 2.6
        self.maps = {}
        self.maps["Base"] = DijikstraMap()
        self.maps["Front"] = DijikstraMap()
        self.maps["Back"] = DijikstraMap()

    def readJson(self, file):
        """Load and return the JSON document stored at path ``file`` (UTF-8)."""
        with open(file, 'r', encoding='utf-8') as fp:
            return json.load(fp)

    def LoadJson(self, file):
        """Rebuild all three maps from the JSON file at ``file``.

        Reads ``street_nodes`` (id -> ``[x, y, ...]``), ``space_nodes``
        (id -> ``[x, y, yaw]``) and ``roads`` (name -> list of node ids).
        Every pair of distinct ids on a road is connected in "Base". The
        "Front" and "Back" maps are then derived with ``ResetMap`` using
        plus/minus half of ``default_wheel_base`` as the y offset.
        """
        self.maps["Base"] = DijikstraMap()
        self.maps["Front"] = DijikstraMap()
        self.maps["Back"] = DijikstraMap()
        base = self.maps["Base"]

        map_data = self.readJson(file)
        for node_id, point in map_data['street_nodes'].items():
            base.AddVertex(StreetNode(node_id, point[0], point[1]))
        for node_id, point in map_data['space_nodes'].items():
            x, y, yaw = point
            base.AddVertex(SpaceNode(node_id, x, y, yaw))
        for points in map_data['roads'].values():
            for pt1 in points:
                for pt2 in points:
                    if not pt1 == pt2:
                        base.AddEdge(pt1, pt2)

        self.ResetMap("Front", 0, self.default_wheel_base / 2)
        self.ResetMap("Back", 0, -self.default_wheel_base / 2)

    def ResetMap(self, mapName, dx, dy):
        """Replace ``mapName`` with a deep copy of "Base", shifting "R_" nodes.

        Every node whose id contains ``"R_"`` is moved by ``(dx, dy)``
        relative to its "Base" position. Returns the new map.
        """
        derived = deepcopy(self.maps["Base"])
        self.maps[mapName] = derived
        for node_id in derived.nodes_:
            if "R_" in node_id:
                node = self.maps["Base"].GetVertex(node_id)
                # A StreetNode is used only as a carrier for the new position.
                derived.ResetVertexValue(StreetNode(node.id_, node.x_ + dx, node.y_ + dy))
        return derived

    def GetVertex(self, mapName, id):
        """Return node ``id`` from map ``mapName`` (or ``None``)."""
        return self.maps[mapName].GetVertex(id)

    def AddVertex(self, mapName, node):
        """Add ``node`` to map ``mapName``."""
        self.maps[mapName].AddVertex(node)

    def AddEdge(self, mapName, id1, id2, direct=False):
        """Add an edge to map ``mapName``; see ``DijikstraMap.AddEdge``."""
        self.maps[mapName].AddEdge(id1, id2, direct)

    def Reset(self, mapName):
        """Replace map ``mapName`` with an empty ``DijikstraMap``."""
        self.maps[mapName] = DijikstraMap()

    def VertexDict(self, mapName):
        """Return the node dictionary of map ``mapName``."""
        return self.maps[mapName].VertexDict()

    def Edges(self, mapName):
        """Return the edges of map ``mapName``."""
        return self.maps[mapName].Edges()

    def findNeastNode(self, mapName, pt):
        """Return the node of map ``mapName`` nearest to ``pt``."""
        return self.maps[mapName].findNeastNode(pt)

    def GetShortestPath(self, mapName, beg, end):
        """Print both endpoints' graph points, then return the shortest path."""
        target_map = self.maps[mapName]
        print("beg: ", target_map.graph_.points[beg])
        print("end: ", target_map.graph_.points[end])
        return target_map.GetShortestPath(beg, end)