123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- import os
- import sys
- import math
- import heapq
- from heapdict import heapdict
- import time
- import numpy as np
- import matplotlib.pyplot as plt
- import scipy.spatial.kdtree as kd
- from planner.hybridastar import astar as astar
- from planner.hybridastar import planer_reeds_shepp as rs
- from input import make_car
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
- "/../../Onsite_Parking/")
- C = make_car.C
- class Node:
- def __init__(self, xind, yind, yawind, direction, x, y,
- yaw, directions, steer, cost, pind):
- self.xind = xind
- self.yind = yind
- self.yawind = yawind
- self.direction = direction
- self.x = x
- self.y = y
- self.yaw = yaw
- self.directions = directions
- self.steer = steer
- self.cost = cost
- self.pind = pind
- class Para:
- def __init__(self, minx, miny, minyaw, maxx, maxy, maxyaw,
- xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree):
- self.minx = minx
- self.miny = miny
- self.minyaw = minyaw
- self.maxx = maxx
- self.maxy = maxy
- self.maxyaw = maxyaw
- self.xw = xw
- self.yw = yw
- self.yaww = yaww
- self.xyreso = xyreso
- self.yawreso = yawreso
- self.ox = ox
- self.oy = oy
- self.kdtree = kdtree
- class QueuePrior:
- def __init__(self):
- self.queue = heapdict()
- def empty(self):
- return len(self.queue) == 0 # if Q is empty
- def put(self, item, priority):
- self.queue[item] = priority # push
- def get(self):
- return self.queue.popitem()[0] # pop out element with smallest priority
- class Path:
- def __init__(self, x, y, yaw, direction, cost):
- self.x = x
- self.y = y
- self.yaw = yaw
- self.direction = direction
- self.cost = cost
- def calc_parameters(ox, oy, xyreso, yawreso, kdtree):
- minx = round(min(ox) / xyreso)
- miny = round(min(oy) / xyreso)
- maxx = round(max(ox) / xyreso)
- maxy = round(max(oy) / xyreso)
- xw, yw = maxx - minx, maxy - miny
- minyaw = round(-C.PI / yawreso) - 1
- maxyaw = round(C.PI / yawreso)
- yaww = maxyaw - minyaw
- return Para(minx, miny, minyaw, maxx, maxy, maxyaw,
- xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree)
- def calc_motion_set():
- s = np.arange(C.MAX_STEER / C.N_STEER,
- C.MAX_STEER, C.MAX_STEER / C.N_STEER)
- steer = list(s) + [0.0] + list(-s)
- direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))]
- steer = steer + steer
- return steer, direc
- def calc_index(node, P):
- ind = (node.yawind - P.minyaw) * P.xw * P.yw + \
- (node.yind - P.miny) * P.xw + \
- (node.xind - P.minx)
- return ind
- def calc_hybrid_cost(node, hmap, P):
- cost = node.cost + \
- C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny]
- return cost
- def update_node_with_analystic_expantion(n_curr, ngoal, P):
- path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal
- if not path:
- return False, None
- fx = path.x[1:-1]
- fy = path.y[1:-1]
- fyaw = path.yaw[1:-1]
- fd = path.directions[1:-1]
- fcost = n_curr.cost + calc_rs_path_cost(path)
- fpind = calc_index(n_curr, P)
- fsteer = 0.0
- fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction,
- fx, fy, fyaw, fd, fsteer, fcost, fpind)
- return True, fpath
- def calc_rs_path_cost(rspath):
- cost = 0.0
- for lr in rspath.lengths:
- if lr >= 0:
- cost += 1
- else:
- cost += abs(lr) * C.BACKWARD_COST
- for i in range(len(rspath.lengths) - 1):
- if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0:
- cost += C.GEAR_COST
- for ctype in rspath.ctypes:
- if ctype != "S":
- cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER)
- nctypes = len(rspath.ctypes)
- ulist = [0.0 for _ in range(nctypes)]
- for i in range(nctypes):
- if rspath.ctypes[i] == "R":
- ulist[i] = -C.MAX_STEER
- elif rspath.ctypes[i] == "WB":
- ulist[i] = C.MAX_STEER
- for i in range(nctypes - 1):
- cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i])
- return cost
- def calc_motion_set():
- s = np.arange(C.MAX_STEER / C.N_STEER,
- C.MAX_STEER, C.MAX_STEER / C.N_STEER)
- steer = list(s) + [0.0] + list(-s)
- direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))]
- steer = steer + steer
- return steer, direc
- def is_same_grid(node1, node2):
- if node1.xind != node2.xind or \
- node1.yind != node2.yind or \
- node1.yawind != node2.yawind:
- return False
- return True
- def calc_index(node, P):
- ind = (node.yawind - P.minyaw) * P.xw * P.yw + \
- (node.yind - P.miny) * P.xw + \
- (node.xind - P.minx)
- return ind
- def is_collision(x, y, yaw, P):
- for ix, iy, iyaw in zip(x, y, yaw):
- d = 20
- dl = (C.RF - C.RB) / 2.0
- r = (C.RF + C.RB) / 2.0 + d
- cx = ix + dl * math.cos(iyaw)
- cy = iy + dl * math.sin(iyaw)
- ids = P.kdtree.query_ball_point([cx, cy], r)
- if not ids:
- continue
- for i in ids:
- xo = P.ox[i] - cx
- yo = P.oy[i] - cy
- dx = xo * math.cos(iyaw) + yo * math.sin(iyaw)
- dy = -xo * math.sin(iyaw) + yo * math.cos(iyaw)
- if abs(dx) < r and abs(dy) < C.W / 2 + d:
- return True
- return False
- def calc_parameters(ox, oy, xyreso, yawreso, kdtree):
- minx = round(min(ox) / xyreso)
- miny = round(min(oy) / xyreso)
- maxx = round(max(ox) / xyreso)
- maxy = round(max(oy) / xyreso)
- xw, yw = maxx - minx, maxy - miny
- minyaw = round(-C.PI / yawreso) - 1
- maxyaw = round(C.PI / yawreso)
- yaww = maxyaw - minyaw
- return Para(minx, miny, minyaw, maxx, maxy, maxyaw,
- xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree)
- def analystic_expantion(node, ngoal, P):
- sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1]
- gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1]
- maxc = math.tan(C.MAX_STEER) / C.WB
- paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP)
- if not paths:
- return None
- pq = QueuePrior()
- for path in paths:
- pq.put(path, calc_rs_path_cost(path))
- while not pq.empty():
- path = pq.get()
- ind = range(0, len(path.x), C.COLLISION_CHECK_STEP)
- pathx = [path.x[k] for k in ind]
- pathy = [path.y[k] for k in ind]
- pathyaw = [path.yaw[k] for k in ind]
- if not is_collision(pathx, pathy, pathyaw, P):
- return path
- return None
- def is_index_ok(xind, yind, xlist, ylist, yawlist, P):
- if xind <= P.minx or \
- xind >= P.maxx or \
- yind <= P.miny or \
- yind >= P.maxy:
- return False
- ind = range(0, len(xlist), C.COLLISION_CHECK_STEP)
- nodex = [xlist[k] for k in ind]
- nodey = [ylist[k] for k in ind]
- nodeyaw = [yawlist[k] for k in ind]
- if is_collision(nodex, nodey, nodeyaw, P):
- return False
- return True
- def extract_path(closed, ngoal, nstart):
- rx, ry, ryaw, direc = [], [], [], []
- cost = 0.0
- node = ngoal
- while True:
- rx += node.x[::-1]
- ry += node.y[::-1]
- ryaw += node.yaw[::-1]
- direc += node.directions[::-1]
- cost += node.cost
- if is_same_grid(node, nstart):
- break
- node = closed[node.pind]
- rx = rx[::-1]
- ry = ry[::-1]
- ryaw = ryaw[::-1]
- direc = direc[::-1]
- direc[0] = direc[1]
- path = Path(rx, ry, ryaw, direc, cost)
- return path
- def calc_next_node(n_curr, c_id, u, d, P):
- step = C.XY_RESO * 2
- nlist = math.ceil(step / C.MOVE_STEP)
- xlist = [n_curr.x[-1] + d * C.MOVE_STEP * math.cos(n_curr.yaw[-1])]
- ylist = [n_curr.y[-1] + d * C.MOVE_STEP * math.sin(n_curr.yaw[-1])]
- yawlist = [rs.pi_2_pi(n_curr.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))]
- for i in range(nlist - 1):
- xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i]))
- ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i]))
- yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u)))
- xind = round(xlist[-1] / P.xyreso)
- yind = round(ylist[-1] / P.xyreso)
- yawind = round(yawlist[-1] / P.yawreso)
- if not is_index_ok(xind, yind, xlist, ylist, yawlist, P):
- return None
- cost = 0.0
- if d > 0:
- direction = 1
- cost += abs(step)
- else:
- direction = -1
- cost += abs(step) * C.BACKWARD_COST
- if direction != n_curr.direction: # switch back penalty
- cost += C.GEAR_COST
- cost += C.STEER_ANGLE_COST * abs(u) # steer angle penalyty
- cost += C.STEER_CHANGE_COST * abs(n_curr.steer - u) # steer change penalty
- cost = n_curr.cost + cost
- directions = [direction for _ in range(len(xlist))]
- node = Node(xind, yind, yawind, direction, xlist, ylist,
- yawlist, directions, u, cost, c_id)
- return node
- def hybrid_astar_planning(sx, sy, syaw, gx, gy, gyaw, ox, oy, xyreso, yawreso):
- sxr, syr = round(sx / xyreso), round(sy / xyreso)
- gxr, gyr = round(gx / xyreso), round(gy / xyreso)
- syawr = round(rs.pi_2_pi(syaw) / yawreso)
- gyawr = round(rs.pi_2_pi(gyaw) / yawreso)
- nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [1], 0.0, 0.0, -1)
- ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [1], 0.0, 0.0, -1)
- kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)])
- P = calc_parameters(ox, oy, xyreso, yawreso, kdtree)
- hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0)
- steer_set, direc_set = calc_motion_set()
- open_set, closed_set = {calc_index(nstart, P): nstart}, {}
- qp = QueuePrior()
- qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P))
- while True:
- if not open_set:
- return None
- ind = qp.get()
- n_curr = open_set[ind]
- closed_set[ind] = n_curr
- open_set.pop(ind)
- update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, P)
- if update:
- fnode = fpath
- break
- for i in range(len(steer_set)):
- node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P)
- if not node:
- continue
- node_ind = calc_index(node, P)
- if node_ind in closed_set:
- continue
- if node_ind not in open_set:
- open_set[node_ind] = node
- qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
- else:
- if open_set[node_ind].cost > node.cost:
- open_set[node_ind] = node
- qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
- return extract_path(closed_set, fnode, nstart)
|