|
|
@@ -7,6 +7,8 @@ import os
|
|
|
import sys
|
|
|
import math
|
|
|
import numpy as np
|
|
|
+import matplotlib.pyplot as plt
|
|
|
+import matplotlib.patches as patches
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
|
|
"/../../Sampling-based Planning/")
|
|
|
@@ -14,30 +16,33 @@ sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
|
|
from rrt_2D import env
|
|
|
from rrt_2D import plotting
|
|
|
from rrt_2D import utils
|
|
|
+from rrt_2D import queue
|
|
|
|
|
|
|
|
|
class Node:
|
|
|
def __init__(self, n):
|
|
|
self.x = n[0]
|
|
|
self.y = n[1]
|
|
|
- self.cost = 0.0
|
|
|
self.parent = None
|
|
|
|
|
|
|
|
|
class RrtStar:
|
|
|
def __init__(self, x_start, x_goal, step_len,
|
|
|
goal_sample_rate, search_radius, iter_max):
|
|
|
- self.xI = Node(x_start)
|
|
|
- self.xG = Node(x_goal)
|
|
|
+ self.s_start = Node(x_start)
|
|
|
+ self.s_goal = Node(x_goal)
|
|
|
self.step_len = step_len
|
|
|
self.goal_sample_rate = goal_sample_rate
|
|
|
self.search_radius = search_radius
|
|
|
self.iter_max = iter_max
|
|
|
- self.vertex = [self.xI]
|
|
|
+ self.vertex = [self.s_start]
|
|
|
+ self.path = []
|
|
|
+ self.visited = []
|
|
|
|
|
|
self.env = env.Env()
|
|
|
self.plotting = plotting.Plotting(x_start, x_goal)
|
|
|
self.utils = utils.Utils()
|
|
|
+ # self.fig, self.ax = plt.subplots()
|
|
|
|
|
|
self.x_range = self.env.x_range
|
|
|
self.y_range = self.env.y_range
|
|
|
@@ -47,36 +52,29 @@ class RrtStar:
|
|
|
|
|
|
def planning(self):
|
|
|
for k in range(self.iter_max):
|
|
|
- if k % 500 == 0:
|
|
|
- print(k)
|
|
|
-
|
|
|
node_rand = self.generate_random_node(self.goal_sample_rate)
|
|
|
node_near = self.nearest_neighbor(self.vertex, node_rand)
|
|
|
node_new = self.new_state(node_near, node_rand)
|
|
|
|
|
|
+ if k % 500 == 0:
|
|
|
+ print(k)
|
|
|
+
|
|
|
if node_new and not self.utils.is_collision(node_near, node_new):
|
|
|
- self.vertex.append(node_new)
|
|
|
neighbor_index = self.find_near_neighbor(node_new)
|
|
|
+ self.vertex.append(node_new)
|
|
|
+
|
|
|
+ # if k % 20 == 0:
|
|
|
+ # self.visited.append([[[node.x, node.parent.x], [node.y, node.parent.y]]
|
|
|
+ # for node in self.vertex[1: len(self.vertex)]])
|
|
|
+
|
|
|
if neighbor_index:
|
|
|
- node_new = self.choose_parent(node_new, neighbor_index)
|
|
|
- self.vertex.append(node_new)
|
|
|
+ self.choose_parent(node_new, neighbor_index)
|
|
|
self.rewire(node_new, neighbor_index)
|
|
|
|
|
|
index = self.search_goal_parent()
|
|
|
- return self.extract_path(self.vertex[index])
|
|
|
-
|
|
|
- def generate_random_node(self, goal_sample_rate):
|
|
|
- delta = self.utils.delta
|
|
|
-
|
|
|
- if np.random.random() > goal_sample_rate:
|
|
|
- return Node((np.random.uniform(self.x_range[0] + delta, self.x_range[1] - delta),
|
|
|
- np.random.uniform(self.y_range[0] + delta, self.y_range[1] - delta)))
|
|
|
-
|
|
|
- return self.xG
|
|
|
+ self.path = self.extract_path(self.vertex[index])
|
|
|
|
|
|
- def nearest_neighbor(self, node_list, n):
|
|
|
- return self.vertex[int(np.argmin([math.hypot(nd.x - n.x, nd.y - n.y)
|
|
|
- for nd in node_list]))]
|
|
|
+ self.plotting.animation(self.vertex, self.path, "rrt*")
|
|
|
|
|
|
def new_state(self, node_start, node_goal):
|
|
|
dist, theta = self.get_distance_and_angle(node_start, node_goal)
|
|
|
@@ -84,67 +82,149 @@ class RrtStar:
|
|
|
dist = min(self.step_len, dist)
|
|
|
node_new = Node((node_start.x + dist * math.cos(theta),
|
|
|
node_start.y + dist * math.sin(theta)))
|
|
|
+
|
|
|
node_new.parent = node_start
|
|
|
|
|
|
return node_new
|
|
|
|
|
|
- def find_near_neighbor(self, node_new):
|
|
|
- n = len(self.vertex) + 1
|
|
|
- r = min(self.search_radius * math.sqrt((math.log(n) / n)), self.step_len)
|
|
|
-
|
|
|
- dist_table = [math.hypot(nd.x - node_new.x, nd.y - node_new.y) for nd in self.vertex]
|
|
|
- dist_table_index = [dist_table.index(d) for d in dist_table if d <= r]
|
|
|
- dist_table_index = [ind for ind in dist_table_index
|
|
|
- if not self.utils.is_collision(node_new, self.vertex[ind])]
|
|
|
-
|
|
|
- return dist_table_index
|
|
|
-
|
|
|
def choose_parent(self, node_new, neighbor_index):
|
|
|
- cost = []
|
|
|
+ cost = [self.get_new_cost(self.vertex[i], node_new) for i in neighbor_index]
|
|
|
+
|
|
|
+ cost_min_index = neighbor_index[int(np.argmin(cost))]
|
|
|
+ node_new.parent = self.vertex[cost_min_index]
|
|
|
|
|
|
+ def rewire(self, node_new, neighbor_index):
|
|
|
for i in neighbor_index:
|
|
|
node_neighbor = self.vertex[i]
|
|
|
- cost.append(self.get_new_cost(node_neighbor, node_new))
|
|
|
|
|
|
- cost_min_index = neighbor_index[int(np.argmin(cost))]
|
|
|
- node_new = self.new_state(self.vertex[cost_min_index], node_new)
|
|
|
- node_new.cost = min(cost)
|
|
|
-
|
|
|
- return node_new
|
|
|
+ if self.cost(node_neighbor) > self.get_new_cost(node_new, node_neighbor):
|
|
|
+ node_neighbor.parent = node_new
|
|
|
|
|
|
def search_goal_parent(self):
|
|
|
- dist_list = [math.hypot(n.x - self.xG.x, n.y - self.xG.y) for n in self.vertex]
|
|
|
+ dist_list = [math.hypot(n.x - self.s_goal.x, n.y - self.s_goal.y) for n in self.vertex]
|
|
|
node_index = [dist_list.index(i) for i in dist_list if i <= self.step_len]
|
|
|
|
|
|
if node_index:
|
|
|
- cost_list = [dist_list[i] + self.vertex[i].cost for i in node_index
|
|
|
- if not self.utils.is_collision(self.vertex[i], self.xG)]
|
|
|
+ cost_list = [dist_list[i] + self.cost(self.vertex[i]) for i in node_index
|
|
|
+ if not self.utils.is_collision(self.vertex[i], self.s_goal)]
|
|
|
return node_index[int(np.argmin(cost_list))]
|
|
|
|
|
|
- return None
|
|
|
-
|
|
|
- def rewire(self, node_new, neighbor_index):
|
|
|
- for i in neighbor_index:
|
|
|
- node_neighbor = self.vertex[i]
|
|
|
- new_cost = self.get_new_cost(node_new, node_neighbor)
|
|
|
-
|
|
|
- if node_neighbor.cost > new_cost:
|
|
|
- self.vertex[i] = self.new_state(node_new, node_neighbor)
|
|
|
- self.propagate_cost_to_leaves(node_new)
|
|
|
+ return len(self.vertex) - 1
|
|
|
|
|
|
def get_new_cost(self, node_start, node_end):
|
|
|
dist, _ = self.get_distance_and_angle(node_start, node_end)
|
|
|
- return node_start.cost + dist
|
|
|
|
|
|
- def propagate_cost_to_leaves(self, parent_node):
|
|
|
- for node in self.vertex:
|
|
|
- if node.parent == parent_node:
|
|
|
- node.cost = self.get_new_cost(parent_node, node)
|
|
|
- self.propagate_cost_to_leaves(node)
|
|
|
+ return self.cost(node_start) + dist
|
|
|
+
|
|
|
+ def generate_random_node(self, goal_sample_rate):
|
|
|
+ delta = self.utils.delta
|
|
|
+
|
|
|
+ if np.random.random() > goal_sample_rate:
|
|
|
+ return Node((np.random.uniform(self.x_range[0] + delta, self.x_range[1] - delta),
|
|
|
+ np.random.uniform(self.y_range[0] + delta, self.y_range[1] - delta)))
|
|
|
+
|
|
|
+ return self.s_goal
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def nearest_neighbor(node_list, n):
|
|
|
+ return node_list[int(np.argmin([math.hypot(nd.x - n.x, nd.y - n.y)
|
|
|
+ for nd in node_list]))]
|
|
|
+
|
|
|
+ def find_near_neighbor(self, node_new):
|
|
|
+ n = len(self.vertex) + 1
|
|
|
+ r = min(self.search_radius * math.sqrt((math.log(n) / n)), self.step_len)
|
|
|
+
|
|
|
+ dist_table = [math.hypot(nd.x - node_new.x, nd.y - node_new.y) for nd in self.vertex]
|
|
|
+ dist_table_index = [ind for ind in range(len(dist_table)) if dist_table[ind] <= r and
|
|
|
+ not self.utils.is_collision(node_new, self.vertex[ind])]
|
|
|
+
|
|
|
+ return dist_table_index
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def cost(node_p):
|
|
|
+ node = node_p
|
|
|
+ cost = 0.0
|
|
|
+
|
|
|
+ while node.parent:
|
|
|
+ cost += math.hypot(node.x - node.parent.x, node.y - node.parent.y)
|
|
|
+ node = node.parent
|
|
|
+
|
|
|
+ return cost
|
|
|
+
|
|
|
+ def animation(self, name):
|
|
|
+ self.plot_grid(name)
|
|
|
+ plt.pause(4)
|
|
|
+ for edge_set in self.visited:
|
|
|
+ plt.cla()
|
|
|
+
|
|
|
+ self.plot_grid(name)
|
|
|
+ for edges in edge_set:
|
|
|
+ plt.plot(edges[0], edges[1], "-g")
|
|
|
+
|
|
|
+ plt.pause(0.0001)
|
|
|
+
|
|
|
+ if self.path:
|
|
|
+ plt.plot([x[0] for x in self.path], [x[1] for x in self.path], '-r', linewidth=2)
|
|
|
+
|
|
|
+ plt.pause(0.5)
|
|
|
+ plt.show()
|
|
|
+
|
|
|
+ def plot_grid(self, name):
|
|
|
+
|
|
|
+ for (ox, oy, w, h) in self.obs_boundary:
|
|
|
+ self.ax.add_patch(
|
|
|
+ patches.Rectangle(
|
|
|
+ (ox, oy), w, h,
|
|
|
+ edgecolor='black',
|
|
|
+ facecolor='black',
|
|
|
+ fill=True
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ for (ox, oy, w, h) in self.obs_rectangle:
|
|
|
+ self.ax.add_patch(
|
|
|
+ patches.Rectangle(
|
|
|
+ (ox, oy), w, h,
|
|
|
+ edgecolor='black',
|
|
|
+ facecolor='gray',
|
|
|
+ fill=True
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ for (ox, oy, r) in self.obs_circle:
|
|
|
+ self.ax.add_patch(
|
|
|
+ patches.Circle(
|
|
|
+ (ox, oy), r,
|
|
|
+ edgecolor='black',
|
|
|
+ facecolor='gray',
|
|
|
+ fill=True
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ plt.plot(self.s_start.x, self.s_start.y, "bs", linewidth=3)
|
|
|
+ plt.plot(self.s_goal.x, self.s_goal.y, "gs", linewidth=3)
|
|
|
+
|
|
|
+ plt.title(name)
|
|
|
+ plt.axis("equal")
|
|
|
+
|
|
|
+ def update_cost(self, parent_node):
|
|
|
+ OPEN = queue.QueueFIFO()
|
|
|
+ OPEN.put(parent_node)
|
|
|
+
|
|
|
+ while not OPEN.empty():
|
|
|
+ node = OPEN.get()
|
|
|
+
|
|
|
+ if len(node.child) == 0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ for node_c in node.child:
|
|
|
+ node_c.cost = self.get_new_cost(node, node_c)
|
|
|
+ OPEN.put(node_c)
|
|
|
|
|
|
def extract_path(self, node_end):
|
|
|
- path = [[self.xG.x, self.xG.y]]
|
|
|
+ path = [[self.s_goal.x, self.s_goal.y]]
|
|
|
node = node_end
|
|
|
+
|
|
|
while node.parent is not None:
|
|
|
path.append([node.x, node.y])
|
|
|
node = node.parent
|
|
|
@@ -163,13 +243,8 @@ def main():
|
|
|
x_start = (2, 2) # Starting node
|
|
|
x_goal = (49, 24) # Goal node
|
|
|
|
|
|
- rrt_star = RrtStar(x_start, x_goal, 10, 0.10, 20, 10000)
|
|
|
- path = rrt_star.planning()
|
|
|
-
|
|
|
- if path:
|
|
|
- rrt_star.plotting.animation(rrt_star.vertex, path, "RRT*")
|
|
|
- else:
|
|
|
- print("No Path Found!")
|
|
|
+ rrt_star = RrtStar(x_start, x_goal, 10, 0.10, 20, 15000)
|
|
|
+ rrt_star.planning()
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|