import math from dijkstra.dijkstra_algorithm import Graph import numpy as np import scipy.spatial as spt import message_pb2 as message import uuid class Node(object): def __init__(self, id, x, y): self.id_ = id self.x_ = x self.y_ = y def distance(self, other): if not isinstance(other, (Node, StreetNode, SpaceNode)): print(" node must be Node,street_node,space_node") return -1 return math.sqrt(math.pow(other.x_ - self.x_, 2) + math.pow(other.y_ - self.y_, 2)) class StreetNode(Node): def __init__(self, id, x, y): Node.__init__(self, id, x, y) class SpaceNode(Node): def __init__(self, id, x, y, connect_nodeId): Node.__init__(self, id, x, y) self.connect_nodeId_ = connect_nodeId def singleton(cls): _instance = {} def inner(): if cls not in _instance: _instance[cls] = cls() return _instance[cls] return inner ''' map 采用单例 ''' @singleton class DijikstraMap(object): def __init__(self): self.nodes_ = {} self.graph_ = Graph() def AddVertex(self, node): if isinstance(node, (StreetNode)): print("add street node :%s " % (node.id_)) self.nodes_[node.id_] = node if isinstance(node, (SpaceNode)): if self.nodes_.get(node.connect_nodeId_) == None: print("add space node failed ,connect street node not exist") return False print("add space node :%s connect_id:%s" % (node.id_, node.connect_nodeId_)) self.nodes_[node.id_] = node self.graph_.AddVertex(node.id_, [node.x_, node.y_]) return True def AddEdge(self, id1, id2, direct=False): if self.nodes_.get(id1) == None or self.nodes_.get(id2) == None: raise ("Add edge failed %s or %s node is not exist" % (id1, id2)) print("Add Edge :%s-%s" % (id1, id2)) self.graph_.AddEdge(id1, id2) if direct == False: self.graph_.AddEdge(id2, id1) def VertexDict(self): return self.nodes_ def Edges(self): return self.graph_.graph_edges def findNeastNode(self, pt): labels = [] pts = [] for item in self.nodes_.items(): [label, node] = item labels.append(label) pts.append([node.x_, node.y_]) points = np.array(pts) ckt = spt.KDTree(data=points, leafsize=10) find_pt = np.array(pt) d, i = ckt.query(find_pt) if i >= 0 and i < len(pts): return [labels[i], pts[i]] def GetShortestPath(self, beg, end): [pathId, distance] = self.graph_.shortest_path(beg, end) print("distance:", distance) print("path:", pathId) path = [] for nodeId in pathId: node = self.nodes_[nodeId] path.append(node) return path @staticmethod def CreatePath(pathNodes, delta): last_node = None trajectry = [] for node in pathNodes: if last_node == None: last_node = node continue dis = last_node.distance(node) if dis < 0.5: last_node = node continue # 同一点 else: vector = [node.x_ - last_node.x_, node.y_ - last_node.y_] dx = vector[0] dy = vector[1] yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy)) if yaw >= 0: if dx < 0: yaw = math.pi - yaw if yaw < 0: if dx < 0: yaw = -math.pi - yaw len = int(math.sqrt(dx * dx + dy * dy) / delta) ax = math.cos(yaw) * delta ay = math.sin(yaw) * delta poses = [] if isinstance(last_node, (SpaceNode)): yaw = yaw + math.pi for i in range(len + 1): pose = [last_node.x_ + i * ax, last_node.y_ + i * ay, yaw] poses.append(pose) trajectry.append(poses) last_node = node return trajectry @staticmethod def CreateNavCmd(pose, path): if len(path) <= 1: return None cmd = message.NavCmd() cmd.action = 0 # 新导航 key = str(uuid.uuid4()) cmd.key = (key) adjustdiff = message.Pose2d() node_mpcdiff = message.Pose2d() enddiff = message.Pose2d() lastAdjustDiff = message.Pose2d() # 目标点精度设置 adjustdiff.x = (0.1) adjustdiff.y = (0.05) adjustdiff.theta = (0.5 * math.pi / 180.0) node_mpcdiff.x = (0.1) node_mpcdiff.y = (0.1) node_mpcdiff.theta = (10 * math.pi / 180.0) enddiff.x = (0.1) enddiff.y = (0.1) enddiff.theta = (5 * math.pi / 180.0) lastAdjustDiff.x = (0.03) lastAdjustDiff.y = (0.03) lastAdjustDiff.theta = (0.3 * math.pi / 180.0) # 速度限制 v_limit = message.SpeedLimit() angular_limit = message.SpeedLimit() horize_limit = message.SpeedLimit() v_limit.min = (0.1) v_limit.max = (0.2) horize_limit.min = (0.05) horize_limit.max = (0.2) angular_limit.min = (1) angular_limit.max = (25.0) # mpc速度限制 mpc_x_limit = message.SpeedLimit() mpc_angular_limit = message.SpeedLimit() mpc_x_limit.min = (0.05) mpc_x_limit.max = (1.5) mpc_angular_limit.min = (0 * math.pi / 180.0) mpc_angular_limit.max = (3 * math.pi / 180.0) # 创建动作集---------------------- last_node = None count = 0 for node in path: if last_node == None: last_node = node count += 1 continue # 运动到上一点 vector = [node.x_ - last_node.x_, node.y_ - last_node.y_] dx = vector[0] dy = vector[1] yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy)) if yaw >= 0: if dx < 0: yaw = math.pi - yaw if yaw < 0: if dx < 0: yaw = -math.pi - yaw if isinstance(last_node, (SpaceNode)): yaw = yaw + math.pi # 添加调整动作 act_adjust = message.Action() act_adjust.type = (1) act_adjust.target.x = (last_node.x_) act_adjust.target.y = (last_node.y_) act_adjust.target.theta = (yaw) # 最后一个调整点 if count == len(path) - 2: act_adjust.target_diff.CopyFrom(lastAdjustDiff) else: act_adjust.target_diff.CopyFrom(adjustdiff) act_adjust.velocity_limit.CopyFrom(v_limit) act_adjust.horize_limit.CopyFrom(horize_limit) act_adjust.angular_limit.CopyFrom(angular_limit) cmd.actions.add().CopyFrom(act_adjust) # 添加mpc动作 act_along = message.Action() act_along.type = (2) act_along.begin.x = (last_node.x_) act_along.begin.y = (last_node.y_) act_along.begin.theta = (yaw) act_along.target.x = (node.x_) act_along.target.y = (node.y_) act_along.target.theta = (yaw) if count == len(path) - 1: act_along.target_diff.CopyFrom(enddiff) else: act_along.target_diff.CopyFrom(node_mpcdiff) act_along.velocity_limit.CopyFrom(mpc_x_limit) act_along.angular_limit.CopyFrom(mpc_angular_limit) cmd.actions.add().CopyFrom(act_along) last_node = node count += 1 return cmd # 生成指令的基础信息 @staticmethod def CreateNavCmd_part_base(): cmd = message.NavCmd() cmd.action = 0 # 新导航 key = str(uuid.uuid4()) cmd.key = (key) return cmd # 生成指令的动作集 @staticmethod def CreateNavCmd_part_actions(path): if len(path) <= 1: return None adjustdiff = message.Pose2d() node_mpcdiff = message.Pose2d() enddiff = message.Pose2d() lastAdjustDiff = message.Pose2d() # 目标点精度设置 adjustdiff.x = (0.1) adjustdiff.y = (0.05) adjustdiff.theta = (0.5 * math.pi / 180.0) node_mpcdiff.x = (0.1) node_mpcdiff.y = (0.1) node_mpcdiff.theta = (10 * math.pi / 180.0) enddiff.x = (0.1) enddiff.y = (0.1) enddiff.theta = (5 * math.pi / 180.0) lastAdjustDiff.x = (0.03) lastAdjustDiff.y = (0.03) lastAdjustDiff.theta = (0.3 * math.pi / 180.0) # 速度限制 v_limit = message.SpeedLimit() angular_limit = message.SpeedLimit() horize_limit = message.SpeedLimit() v_limit.min = (0.1) v_limit.max = (0.2) horize_limit.min = (0.05) horize_limit.max = (0.2) angular_limit.min = (1) angular_limit.max = (25.0) # mpc速度限制 mpc_x_limit = message.SpeedLimit() mpc_angular_limit = message.SpeedLimit() mpc_x_limit.min = (0.05) mpc_x_limit.max = (1.5) mpc_angular_limit.min = (0 * math.pi / 180.0) mpc_angular_limit.max = (3 * math.pi / 180.0) # 创建动作集---------------------- actions = list() last_node = None count = 0 for node in path: if last_node == None: last_node = node count += 1 continue # 运动到上一点 vector = [node.x_ - last_node.x_, node.y_ - last_node.y_] dx = vector[0] dy = vector[1] yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy)) if yaw >= 0: if dx < 0: yaw = math.pi - yaw if yaw < 0: if dx < 0: yaw = -math.pi - yaw if isinstance(last_node, (SpaceNode)): yaw = yaw + math.pi # 添加调整动作 act_adjust = message.Action() act_adjust.type = (1) act_adjust.target.x = (last_node.x_) act_adjust.target.y = (last_node.y_) act_adjust.target.theta = (yaw) # 最后一个调整点 if count == len(path) - 2: act_adjust.target_diff.CopyFrom(lastAdjustDiff) else: act_adjust.target_diff.CopyFrom(adjustdiff) act_adjust.velocity_limit.CopyFrom(v_limit) act_adjust.horize_limit.CopyFrom(horize_limit) act_adjust.angular_limit.CopyFrom(angular_limit) actions.append(act_adjust) # 添加mpc动作 act_along = message.Action() act_along.type = (2) act_along.begin.x = (last_node.x_) act_along.begin.y = (last_node.y_) act_along.begin.theta = (yaw) act_along.target.x = (node.x_) act_along.target.y = (node.y_) act_along.target.theta = (yaw) if count == len(path) - 1: act_along.target_diff.CopyFrom(enddiff) else: act_along.target_diff.CopyFrom(node_mpcdiff) act_along.velocity_limit.CopyFrom(mpc_x_limit) act_along.angular_limit.CopyFrom(mpc_angular_limit) actions.append(act_along) last_node = node count += 1 return actions