import os import sys import math import heapq from heapdict import heapdict import time import numpy as np import matplotlib.pyplot as plt import scipy.spatial.kdtree as kd from planner.hybridastar import astar as astar from planner.hybridastar import planer_reeds_shepp as rs from input import make_car sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Onsite_Parking/") C = make_car.C class Node: def __init__(self, xind, yind, yawind, direction, x, y, yaw, directions, steer, cost, pind): self.xind = xind self.yind = yind self.yawind = yawind self.direction = direction self.x = x self.y = y self.yaw = yaw self.directions = directions self.steer = steer self.cost = cost self.pind = pind class Para: def __init__(self, minx, miny, minyaw, maxx, maxy, maxyaw, xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree): self.minx = minx self.miny = miny self.minyaw = minyaw self.maxx = maxx self.maxy = maxy self.maxyaw = maxyaw self.xw = xw self.yw = yw self.yaww = yaww self.xyreso = xyreso self.yawreso = yawreso self.ox = ox self.oy = oy self.kdtree = kdtree class QueuePrior: def __init__(self): self.queue = heapdict() def empty(self): return len(self.queue) == 0 # if Q is empty def put(self, item, priority): self.queue[item] = priority # push def get(self): return self.queue.popitem()[0] # pop out element with smallest priority class Path: def __init__(self, x, y, yaw, direction, cost): self.x = x self.y = y self.yaw = yaw self.direction = direction self.cost = cost def calc_parameters(ox, oy, xyreso, yawreso, kdtree): minx = round(min(ox) / xyreso) miny = round(min(oy) / xyreso) maxx = round(max(ox) / xyreso) maxy = round(max(oy) / xyreso) xw, yw = maxx - minx, maxy - miny minyaw = round(-C.PI / yawreso) - 1 maxyaw = round(C.PI / yawreso) yaww = maxyaw - minyaw return Para(minx, miny, minyaw, maxx, maxy, maxyaw, xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree) def calc_motion_set(): s = np.arange(C.MAX_STEER / C.N_STEER, C.MAX_STEER, C.MAX_STEER / C.N_STEER) steer = list(s) + [0.0] + list(-s) direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))] steer = steer + steer return steer, direc def calc_index(node, P): ind = (node.yawind - P.minyaw) * P.xw * P.yw + \ (node.yind - P.miny) * P.xw + \ (node.xind - P.minx) return ind def calc_hybrid_cost(node, hmap, P): cost = node.cost + \ C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny] return cost def update_node_with_analystic_expantion(n_curr, ngoal, P): path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal if not path: return False, None fx = path.x[1:-1] fy = path.y[1:-1] fyaw = path.yaw[1:-1] fd = path.directions[1:-1] fcost = n_curr.cost + calc_rs_path_cost(path) fpind = calc_index(n_curr, P) fsteer = 0.0 fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction, fx, fy, fyaw, fd, fsteer, fcost, fpind) return True, fpath def calc_rs_path_cost(rspath): cost = 0.0 for lr in rspath.lengths: if lr >= 0: cost += 1 else: cost += abs(lr) * C.BACKWARD_COST for i in range(len(rspath.lengths) - 1): if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0: cost += C.GEAR_COST for ctype in rspath.ctypes: if ctype != "S": cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER) nctypes = len(rspath.ctypes) ulist = [0.0 for _ in range(nctypes)] for i in range(nctypes): if rspath.ctypes[i] == "R": ulist[i] = -C.MAX_STEER elif rspath.ctypes[i] == "WB": ulist[i] = C.MAX_STEER for i in range(nctypes - 1): cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i]) return cost def calc_motion_set(): s = np.arange(C.MAX_STEER / C.N_STEER, C.MAX_STEER, C.MAX_STEER / C.N_STEER) steer = list(s) + [0.0] + list(-s) direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))] steer = steer + steer return steer, direc def is_same_grid(node1, node2): if node1.xind != node2.xind or \ node1.yind != node2.yind or \ node1.yawind != node2.yawind: return False return True def calc_index(node, P): ind = (node.yawind - P.minyaw) * P.xw * P.yw + \ (node.yind - P.miny) * P.xw + \ (node.xind - P.minx) return ind def is_collision(x, y, yaw, P): for ix, iy, iyaw in zip(x, y, yaw): d = 20 dl = (C.RF - C.RB) / 2.0 r = (C.RF + C.RB) / 2.0 + d cx = ix + dl * math.cos(iyaw) cy = iy + dl * math.sin(iyaw) ids = P.kdtree.query_ball_point([cx, cy], r) if not ids: continue for i in ids: xo = P.ox[i] - cx yo = P.oy[i] - cy dx = xo * math.cos(iyaw) + yo * math.sin(iyaw) dy = -xo * math.sin(iyaw) + yo * math.cos(iyaw) if abs(dx) < r and abs(dy) < C.W / 2 + d: return True return False def calc_parameters(ox, oy, xyreso, yawreso, kdtree): minx = round(min(ox) / xyreso) miny = round(min(oy) / xyreso) maxx = round(max(ox) / xyreso) maxy = round(max(oy) / xyreso) xw, yw = maxx - minx, maxy - miny minyaw = round(-C.PI / yawreso) - 1 maxyaw = round(C.PI / yawreso) yaww = maxyaw - minyaw return Para(minx, miny, minyaw, maxx, maxy, maxyaw, xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree) def analystic_expantion(node, ngoal, P): sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1] gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1] maxc = math.tan(C.MAX_STEER) / C.WB paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP) if not paths: return None pq = QueuePrior() for path in paths: pq.put(path, calc_rs_path_cost(path)) while not pq.empty(): path = pq.get() ind = range(0, len(path.x), C.COLLISION_CHECK_STEP) pathx = [path.x[k] for k in ind] pathy = [path.y[k] for k in ind] pathyaw = [path.yaw[k] for k in ind] if not is_collision(pathx, pathy, pathyaw, P): return path return None def is_index_ok(xind, yind, xlist, ylist, yawlist, P): if xind <= P.minx or \ xind >= P.maxx or \ yind <= P.miny or \ yind >= P.maxy: return False ind = range(0, len(xlist), C.COLLISION_CHECK_STEP) nodex = [xlist[k] for k in ind] nodey = [ylist[k] for k in ind] nodeyaw = [yawlist[k] for k in ind] if is_collision(nodex, nodey, nodeyaw, P): return False return True def extract_path(closed, ngoal, nstart): rx, ry, ryaw, direc = [], [], [], [] cost = 0.0 node = ngoal while True: rx += node.x[::-1] ry += node.y[::-1] ryaw += node.yaw[::-1] direc += node.directions[::-1] cost += node.cost if is_same_grid(node, nstart): break node = closed[node.pind] rx = rx[::-1] ry = ry[::-1] ryaw = ryaw[::-1] direc = direc[::-1] direc[0] = direc[1] path = Path(rx, ry, ryaw, direc, cost) return path def calc_next_node(n_curr, c_id, u, d, P): step = C.XY_RESO * 2 nlist = math.ceil(step / C.MOVE_STEP) xlist = [n_curr.x[-1] + d * C.MOVE_STEP * math.cos(n_curr.yaw[-1])] ylist = [n_curr.y[-1] + d * C.MOVE_STEP * math.sin(n_curr.yaw[-1])] yawlist = [rs.pi_2_pi(n_curr.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))] for i in range(nlist - 1): xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i])) ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i])) yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u))) xind = round(xlist[-1] / P.xyreso) yind = round(ylist[-1] / P.xyreso) yawind = round(yawlist[-1] / P.yawreso) if not is_index_ok(xind, yind, xlist, ylist, yawlist, P): return None cost = 0.0 if d > 0: direction = 1 cost += abs(step) else: direction = -1 cost += abs(step) * C.BACKWARD_COST if direction != n_curr.direction: # switch back penalty cost += C.GEAR_COST cost += C.STEER_ANGLE_COST * abs(u) # steer angle penalyty cost += C.STEER_CHANGE_COST * abs(n_curr.steer - u) # steer change penalty cost = n_curr.cost + cost directions = [direction for _ in range(len(xlist))] node = Node(xind, yind, yawind, direction, xlist, ylist, yawlist, directions, u, cost, c_id) return node def hybrid_astar_planning(sx, sy, syaw, gx, gy, gyaw, ox, oy, xyreso, yawreso): sxr, syr = round(sx / xyreso), round(sy / xyreso) gxr, gyr = round(gx / xyreso), round(gy / xyreso) syawr = round(rs.pi_2_pi(syaw) / yawreso) gyawr = round(rs.pi_2_pi(gyaw) / yawreso) nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [1], 0.0, 0.0, -1) ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [1], 0.0, 0.0, -1) kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)]) P = calc_parameters(ox, oy, xyreso, yawreso, kdtree) hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0) steer_set, direc_set = calc_motion_set() open_set, closed_set = {calc_index(nstart, P): nstart}, {} qp = QueuePrior() qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P)) while True: if not open_set: return None ind = qp.get() n_curr = open_set[ind] closed_set[ind] = n_curr open_set.pop(ind) update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, P) if update: fnode = fpath break for i in range(len(steer_set)): node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P) if not node: continue node_ind = calc_index(node, P) if node_ind in closed_set: continue if node_ind not in open_set: open_set[node_ind] = node qp.put(node_ind, calc_hybrid_cost(node, hmap, P)) else: if open_set[node_ind].cost > node.cost: open_set[node_ind] = node qp.put(node_ind, calc_hybrid_cost(node, hmap, P)) return extract_path(closed_set, fnode, nstart)