123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- import math
- from dijkstra.dijkstra_algorithm import Graph
- import numpy as np
- import scipy.spatial as spt
- import message_pb2 as message
- import uuid
- class Node(object):
- def __init__(self, id, x, y):
- self.id_ = id
- self.x_ = x
- self.y_ = y
- def distance(self, other):
- if not isinstance(other, (Node, StreetNode, SpaceNode)):
- print(" node must be Node,street_node,space_node")
- return -1
- return math.sqrt(math.pow(other.x_ - self.x_, 2) + math.pow(other.y_ - self.y_, 2))
- class StreetNode(Node):
- def __init__(self, id, x, y):
- Node.__init__(self, id, x, y)
- class SpaceNode(Node):
- def __init__(self, id, x, y, connect_nodeId):
- Node.__init__(self, id, x, y)
- self.connect_nodeId_ = connect_nodeId
- def singleton(cls):
- _instance = {}
- def inner():
- if cls not in _instance:
- _instance[cls] = cls()
- return _instance[cls]
- return inner
- '''
- map 采用单例
- '''
- @singleton
- class DijikstraMap(object):
- def __init__(self):
- self.nodes_ = {}
- self.graph_ = Graph()
- def AddVertex(self, node):
- if isinstance(node, (StreetNode)):
- print("add street node :%s " % (node.id_))
- self.nodes_[node.id_] = node
- if isinstance(node, (SpaceNode)):
- if self.nodes_.get(node.connect_nodeId_) == None:
- print("add space node failed ,connect street node not exist")
- return False
- print("add space node :%s connect_id:%s" % (node.id_, node.connect_nodeId_))
- self.nodes_[node.id_] = node
- self.graph_.AddVertex(node.id_, [node.x_, node.y_])
- return True
- def AddEdge(self, id1, id2, direct=False):
- if self.nodes_.get(id1) == None or self.nodes_.get(id2) == None:
- raise ("Add edge failed %s or %s node is not exist" % (id1, id2))
- print("Add Edge :%s-%s" % (id1, id2))
- self.graph_.AddEdge(id1, id2)
- if direct == False:
- self.graph_.AddEdge(id2, id1)
- def VertexDict(self):
- return self.nodes_
- def Edges(self):
- return self.graph_.graph_edges
- def findNeastNode(self, pt):
- labels = []
- pts = []
- for item in self.nodes_.items():
- [label, node] = item
- labels.append(label)
- pts.append([node.x_, node.y_])
- points = np.array(pts)
- ckt = spt.KDTree(data=points, leafsize=10)
- find_pt = np.array(pt)
- d, i = ckt.query(find_pt)
- if i >= 0 and i < len(pts):
- return [labels[i], pts[i]]
- def GetShortestPath(self, beg, end):
- [pathId, distance] = self.graph_.shortest_path(beg, end)
- print("distance:", distance)
- print("path:", pathId)
- path = []
- for nodeId in pathId:
- node = self.nodes_[nodeId]
- path.append(node)
- return path
- @staticmethod
- def CreatePath(pathNodes, delta):
- last_node = None
- trajectry = []
- for node in pathNodes:
- if last_node == None:
- last_node = node
- continue
- dis = last_node.distance(node)
- if dis < 0.5:
- last_node = node
- continue # 同一点
- else:
- vector = [node.x_ - last_node.x_, node.y_ - last_node.y_]
- dx = vector[0]
- dy = vector[1]
- yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
- if yaw >= 0:
- if dx < 0:
- yaw = math.pi - yaw
- if yaw < 0:
- if dx < 0:
- yaw = -math.pi - yaw
- len = int(math.sqrt(dx * dx + dy * dy) / delta)
- ax = math.cos(yaw) * delta
- ay = math.sin(yaw) * delta
- poses = []
- if isinstance(last_node, (SpaceNode)):
- yaw = yaw + math.pi
- for i in range(len + 1):
- pose = [last_node.x_ + i * ax, last_node.y_ + i * ay, yaw]
- poses.append(pose)
- trajectry.append(poses)
- last_node = node
- return trajectry
- @staticmethod
- def CreateNavCmd(pose, path):
- if len(path) <= 1:
- return None
- cmd = message.NavCmd()
- cmd.action = 0 # 新导航
- key = str(uuid.uuid4())
- cmd.key = (key)
- adjustdiff = message.Pose2d()
- node_mpcdiff = message.Pose2d()
- enddiff = message.Pose2d()
- lastAdjustDiff = message.Pose2d()
- # 目标点精度设置
- adjustdiff.x = (0.1)
- adjustdiff.y = (0.05)
- adjustdiff.theta = (0.5 * math.pi / 180.0)
- node_mpcdiff.x = (0.1)
- node_mpcdiff.y = (0.1)
- node_mpcdiff.theta = (10 * math.pi / 180.0)
- enddiff.x = (0.1)
- enddiff.y = (0.1)
- enddiff.theta = (5 * math.pi / 180.0)
- lastAdjustDiff.x = (0.03)
- lastAdjustDiff.y = (0.03)
- lastAdjustDiff.theta = (0.3 * math.pi / 180.0)
- # 速度限制
- v_limit = message.SpeedLimit()
- angular_limit = message.SpeedLimit()
- horize_limit = message.SpeedLimit()
- v_limit.min = (0.1)
- v_limit.max = (0.2)
- horize_limit.min = (0.05)
- horize_limit.max = (0.2)
- angular_limit.min = (1)
- angular_limit.max = (25.0)
- # mpc速度限制
- mpc_x_limit = message.SpeedLimit()
- mpc_angular_limit = message.SpeedLimit()
- mpc_x_limit.min = (0.05)
- mpc_x_limit.max = (1.5)
- mpc_angular_limit.min = (0 * math.pi / 180.0)
- mpc_angular_limit.max = (3 * math.pi / 180.0)
- # 创建动作集----------------------
- last_node = None
- count = 0
- for node in path:
- if last_node == None:
- last_node = node
- count += 1
- continue
- # 运动到上一点
- vector = [node.x_ - last_node.x_, node.y_ - last_node.y_]
- dx = vector[0]
- dy = vector[1]
- yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
- if yaw >= 0:
- if dx < 0:
- yaw = math.pi - yaw
- if yaw < 0:
- if dx < 0:
- yaw = -math.pi - yaw
- if isinstance(last_node, (SpaceNode)):
- yaw = yaw + math.pi
- # 添加调整动作
- act_adjust = message.Action()
- act_adjust.type = (1)
- act_adjust.target.x = (last_node.x_)
- act_adjust.target.y = (last_node.y_)
- act_adjust.target.theta = (yaw)
- # 最后一个调整点
- if count == len(path) - 2:
- act_adjust.target_diff.CopyFrom(lastAdjustDiff)
- else:
- act_adjust.target_diff.CopyFrom(adjustdiff)
- act_adjust.velocity_limit.CopyFrom(v_limit)
- act_adjust.horize_limit.CopyFrom(horize_limit)
- act_adjust.angular_limit.CopyFrom(angular_limit)
- cmd.actions.add().CopyFrom(act_adjust)
- # 添加mpc动作
- act_along = message.Action()
- act_along.type = (2)
- act_along.begin.x = (last_node.x_)
- act_along.begin.y = (last_node.y_)
- act_along.begin.theta = (yaw)
- act_along.target.x = (node.x_)
- act_along.target.y = (node.y_)
- act_along.target.theta = (yaw)
- if count == len(path) - 1:
- act_along.target_diff.CopyFrom(enddiff)
- else:
- act_along.target_diff.CopyFrom(node_mpcdiff)
- act_along.velocity_limit.CopyFrom(mpc_x_limit)
- act_along.angular_limit.CopyFrom(mpc_angular_limit)
- cmd.actions.add().CopyFrom(act_along)
- last_node = node
- count += 1
- return cmd
- # 生成指令的基础信息
- @staticmethod
- def CreateNavCmd_part_base():
- cmd = message.NavCmd()
- cmd.action = 0 # 新导航
- key = str(uuid.uuid4())
- cmd.key = (key)
- return cmd
- # 生成指令的动作集
- @staticmethod
- def CreateNavCmd_part_actions(path):
- if len(path) <= 1:
- return None
- adjustdiff = message.Pose2d()
- node_mpcdiff = message.Pose2d()
- enddiff = message.Pose2d()
- lastAdjustDiff = message.Pose2d()
- # 目标点精度设置
- adjustdiff.x = (0.1)
- adjustdiff.y = (0.05)
- adjustdiff.theta = (0.5 * math.pi / 180.0)
- node_mpcdiff.x = (0.1)
- node_mpcdiff.y = (0.1)
- node_mpcdiff.theta = (10 * math.pi / 180.0)
- enddiff.x = (0.1)
- enddiff.y = (0.1)
- enddiff.theta = (5 * math.pi / 180.0)
- lastAdjustDiff.x = (0.03)
- lastAdjustDiff.y = (0.03)
- lastAdjustDiff.theta = (0.3 * math.pi / 180.0)
- # 速度限制
- v_limit = message.SpeedLimit()
- angular_limit = message.SpeedLimit()
- horize_limit = message.SpeedLimit()
- v_limit.min = (0.1)
- v_limit.max = (0.2)
- horize_limit.min = (0.05)
- horize_limit.max = (0.2)
- angular_limit.min = (1)
- angular_limit.max = (25.0)
- # mpc速度限制
- mpc_x_limit = message.SpeedLimit()
- mpc_angular_limit = message.SpeedLimit()
- mpc_x_limit.min = (0.05)
- mpc_x_limit.max = (1.5)
- mpc_angular_limit.min = (0 * math.pi / 180.0)
- mpc_angular_limit.max = (3 * math.pi / 180.0)
- # 创建动作集----------------------
- actions = list()
- last_node = None
- count = 0
- for node in path:
- if last_node == None:
- last_node = node
- count += 1
- continue
- # 运动到上一点
- vector = [node.x_ - last_node.x_, node.y_ - last_node.y_]
- dx = vector[0]
- dy = vector[1]
- yaw = math.asin(dy / math.sqrt(dx * dx + dy * dy))
- if yaw >= 0:
- if dx < 0:
- yaw = math.pi - yaw
- if yaw < 0:
- if dx < 0:
- yaw = -math.pi - yaw
- if isinstance(last_node, (SpaceNode)):
- yaw = yaw + math.pi
- # 添加调整动作
- act_adjust = message.Action()
- act_adjust.type = (1)
- act_adjust.target.x = (last_node.x_)
- act_adjust.target.y = (last_node.y_)
- act_adjust.target.theta = (yaw)
- # 最后一个调整点
- if count == len(path) - 2:
- act_adjust.target_diff.CopyFrom(lastAdjustDiff)
- else:
- act_adjust.target_diff.CopyFrom(adjustdiff)
- act_adjust.velocity_limit.CopyFrom(v_limit)
- act_adjust.horize_limit.CopyFrom(horize_limit)
- act_adjust.angular_limit.CopyFrom(angular_limit)
- actions.append(act_adjust)
- # 添加mpc动作
- act_along = message.Action()
- act_along.type = (2)
- act_along.begin.x = (last_node.x_)
- act_along.begin.y = (last_node.y_)
- act_along.begin.theta = (yaw)
- act_along.target.x = (node.x_)
- act_along.target.y = (node.y_)
- act_along.target.theta = (yaw)
- if count == len(path) - 1:
- act_along.target_diff.CopyFrom(enddiff)
- else:
- act_along.target_diff.CopyFrom(node_mpcdiff)
- act_along.velocity_limit.CopyFrom(mpc_x_limit)
- act_along.angular_limit.CopyFrom(mpc_angular_limit)
- actions.append(act_along)
- last_node = node
- count += 1
- return actions
|