|
@@ -6,6 +6,7 @@ RTAAstar 2D (Real-time Adaptive A*)
|
|
|
import os
|
|
import os
|
|
|
import sys
|
|
import sys
|
|
|
import copy
|
|
import copy
|
|
|
|
|
+import math
|
|
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
|
|
"/../../Search-based Planning/")
|
|
"/../../Search-based Planning/")
|
|
@@ -16,8 +17,8 @@ from Search_2D import env
|
|
|
|
|
|
|
|
|
|
|
|
|
class RtaAstar:
|
|
class RtaAstar:
|
|
|
- def __init__(self, x_start, x_goal, N, heuristic_type):
|
|
|
|
|
- self.xI, self.xG = x_start, x_goal
|
|
|
|
|
|
|
+ def __init__(self, s_start, s_goal, N, heuristic_type):
|
|
|
|
|
+ self.s_start, self.s_goal = s_start, s_goal
|
|
|
self.heuristic_type = heuristic_type
|
|
self.heuristic_type = heuristic_type
|
|
|
|
|
|
|
|
self.Env = env.Env()
|
|
self.Env = env.Env()
|
|
@@ -35,7 +36,7 @@ class RtaAstar:
|
|
|
self.h_table[(i, j)] = self.h((i, j)) # initialize h_value
|
|
self.h_table[(i, j)] = self.h((i, j)) # initialize h_value
|
|
|
|
|
|
|
|
def searching(self):
|
|
def searching(self):
|
|
|
- s_start = self.xI # initialize start node
|
|
|
|
|
|
|
+ s_start = self.s_start # initialize start node
|
|
|
|
|
|
|
|
while True:
|
|
while True:
|
|
|
OPEN, CLOSED, g_table, PARENT = \
|
|
OPEN, CLOSED, g_table, PARENT = \
|
|
@@ -59,29 +60,12 @@ class RtaAstar:
|
|
|
for (_, x) in OPEN.enumerate():
|
|
for (_, x) in OPEN.enumerate():
|
|
|
v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x]
|
|
v_open[x] = g_table[PARENT[x]] + 1 + self.h_table[x]
|
|
|
s_open = min(v_open, key=v_open.get)
|
|
s_open = min(v_open, key=v_open.get)
|
|
|
- f_min = min(v_open.values())
|
|
|
|
|
|
|
+ f_min = v_open[s_open]
|
|
|
for x in CLOSED:
|
|
for x in CLOSED:
|
|
|
h_value[x] = f_min - g_table[x]
|
|
h_value[x] = f_min - g_table[x]
|
|
|
|
|
|
|
|
return s_open, h_value
|
|
return s_open, h_value
|
|
|
|
|
|
|
|
- def extract_path_in_CLOSE(self, s_end, s_start, h_value):
|
|
|
|
|
- path = [s_start]
|
|
|
|
|
- s = s_start
|
|
|
|
|
-
|
|
|
|
|
- while True:
|
|
|
|
|
- h_list = {}
|
|
|
|
|
- for u in self.u_set:
|
|
|
|
|
- s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
|
|
- if s_next not in self.obs and s_next in h_value:
|
|
|
|
|
- h_list[s_next] = h_value[s_next]
|
|
|
|
|
- s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value
|
|
|
|
|
- path.append(s_key) # generate path
|
|
|
|
|
- s = s_key # use end of this iteration as the start of next
|
|
|
|
|
-
|
|
|
|
|
- if s_key == s_end: # reach the expected node in U set
|
|
|
|
|
- return s_start, list(reversed(path))
|
|
|
|
|
-
|
|
|
|
|
def iteration(self, CLOSED):
|
|
def iteration(self, CLOSED):
|
|
|
h_value = {}
|
|
h_value = {}
|
|
|
|
|
|
|
@@ -92,13 +76,11 @@ class RtaAstar:
|
|
|
h_value_rec = copy.deepcopy(h_value)
|
|
h_value_rec = copy.deepcopy(h_value)
|
|
|
for s in CLOSED:
|
|
for s in CLOSED:
|
|
|
h_list = []
|
|
h_list = []
|
|
|
- for u in self.u_set:
|
|
|
|
|
- s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
|
|
- if s_next not in self.obs:
|
|
|
|
|
- if s_next not in CLOSED:
|
|
|
|
|
- h_list.append(self.get_cost(s, s_next) + self.h_table[s_next])
|
|
|
|
|
- else:
|
|
|
|
|
- h_list.append(self.get_cost(s, s_next) + h_value[s_next])
|
|
|
|
|
|
|
+ for s_n in self.get_neighbor(s):
|
|
|
|
|
+ if s_n not in CLOSED:
|
|
|
|
|
+ h_list.append(self.cost(s, s_n) + self.h_table[s_n])
|
|
|
|
|
+ else:
|
|
|
|
|
+ h_list.append(self.cost(s, s_n) + h_value[s_n])
|
|
|
h_value[s] = min(h_list) # update h_value of current node
|
|
h_value[s] = min(h_list) # update h_value of current node
|
|
|
|
|
|
|
|
if h_value == h_value_rec: # h_value table converged
|
|
if h_value == h_value_rec: # h_value table converged
|
|
@@ -107,77 +89,106 @@ class RtaAstar:
|
|
|
def Astar(self, x_start, N):
|
|
def Astar(self, x_start, N):
|
|
|
OPEN = queue.QueuePrior() # U set
|
|
OPEN = queue.QueuePrior() # U set
|
|
|
OPEN.put(x_start, self.h_table[x_start])
|
|
OPEN.put(x_start, self.h_table[x_start])
|
|
|
- CLOSED = set() # CLOSED set
|
|
|
|
|
- g_table = {x_start: 0, self.xG: float("inf")} # cost to come
|
|
|
|
|
|
|
+ CLOSED = [] # CLOSED set
|
|
|
|
|
+ g_table = {x_start: 0, self.s_goal: float("inf")} # cost to come
|
|
|
PARENT = {x_start: x_start} # relations
|
|
PARENT = {x_start: x_start} # relations
|
|
|
- visited = [] # order of visited nodes
|
|
|
|
|
count = 0 # counter
|
|
count = 0 # counter
|
|
|
|
|
|
|
|
while not OPEN.empty():
|
|
while not OPEN.empty():
|
|
|
count += 1
|
|
count += 1
|
|
|
s = OPEN.get()
|
|
s = OPEN.get()
|
|
|
- CLOSED.add(s)
|
|
|
|
|
- visited.append(s)
|
|
|
|
|
|
|
+ CLOSED.append(s)
|
|
|
|
|
|
|
|
- if s == self.xG: # reach the goal node
|
|
|
|
|
- self.visited.append(visited)
|
|
|
|
|
|
|
+ if s == self.s_goal: # reach the goal node
|
|
|
|
|
+ self.visited.append(CLOSED)
|
|
|
return "FOUND", self.extract_path(x_start, PARENT), [], []
|
|
return "FOUND", self.extract_path(x_start, PARENT), [], []
|
|
|
|
|
|
|
|
- for u in self.u_set:
|
|
|
|
|
- s_next = tuple([s[i] + u[i] for i in range(len(s))])
|
|
|
|
|
- if s_next not in self.obs and s_next not in CLOSED:
|
|
|
|
|
- new_cost = g_table[s] + self.get_cost(s, u)
|
|
|
|
|
- if s_next not in g_table:
|
|
|
|
|
- g_table[s_next] = float("inf")
|
|
|
|
|
- if new_cost < g_table[s_next]: # conditions for updating cost
|
|
|
|
|
- g_table[s_next] = new_cost
|
|
|
|
|
- PARENT[s_next] = s
|
|
|
|
|
- OPEN.put(s_next, g_table[s_next] + self.h_table[s_next])
|
|
|
|
|
|
|
+ for s_n in self.get_neighbor(s):
|
|
|
|
|
+ if s_n not in CLOSED:
|
|
|
|
|
+ new_cost = g_table[s] + self.cost(s, s_n)
|
|
|
|
|
+ if s_n not in g_table:
|
|
|
|
|
+ g_table[s_n] = float("inf")
|
|
|
|
|
+ if new_cost < g_table[s_n]: # conditions for updating cost
|
|
|
|
|
+ g_table[s_n] = new_cost
|
|
|
|
|
+ PARENT[s_n] = s
|
|
|
|
|
+ OPEN.put(s_n, g_table[s_n] + self.h_table[s_n])
|
|
|
|
|
|
|
|
if count == N: # expand needed CLOSED nodes
|
|
if count == N: # expand needed CLOSED nodes
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
- self.visited.append(visited) # visited nodes in each iteration
|
|
|
|
|
|
|
+ self.visited.append(CLOSED) # visited nodes in each iteration
|
|
|
|
|
|
|
|
return OPEN, CLOSED, g_table, PARENT
|
|
return OPEN, CLOSED, g_table, PARENT
|
|
|
|
|
|
|
|
|
|
+ def get_neighbor(self, s):
|
|
|
|
|
+ """
|
|
|
|
|
+ find neighbors of state s that not in obstacles.
|
|
|
|
|
+ :param s: state
|
|
|
|
|
+ :return: neighbors
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ s_list = set()
|
|
|
|
|
+
|
|
|
|
|
+ for u in self.u_set:
|
|
|
|
|
+ s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
|
|
+ if s_next not in self.obs:
|
|
|
|
|
+ s_list.add(s_next)
|
|
|
|
|
+
|
|
|
|
|
+ return s_list
|
|
|
|
|
+
|
|
|
|
|
+ def extract_path_in_CLOSE(self, s_end, s_start, h_value):
|
|
|
|
|
+ path = [s_start]
|
|
|
|
|
+ s = s_start
|
|
|
|
|
+
|
|
|
|
|
+ while True:
|
|
|
|
|
+ h_list = {}
|
|
|
|
|
+ for s_n in self.get_neighbor(s):
|
|
|
|
|
+ if s_n in h_value:
|
|
|
|
|
+ h_list[s_n] = h_value[s_n]
|
|
|
|
|
+ s_key = max(h_list, key=h_list.get) # move to the smallest node with min h_value
|
|
|
|
|
+ path.append(s_key) # generate path
|
|
|
|
|
+ s = s_key # use end of this iteration as the start of next
|
|
|
|
|
+
|
|
|
|
|
+ if s_key == s_end: # reach the expected node in U set
|
|
|
|
|
+ return s_start, list(reversed(path))
|
|
|
|
|
+
|
|
|
def extract_path(self, x_start, parent):
|
|
def extract_path(self, x_start, parent):
|
|
|
"""
|
|
"""
|
|
|
Extract the path based on the relationship of nodes.
|
|
Extract the path based on the relationship of nodes.
|
|
|
-
|
|
|
|
|
:return: The planning path
|
|
:return: The planning path
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
- path_back = [self.xG]
|
|
|
|
|
- x_current = self.xG
|
|
|
|
|
|
|
+ path = [self.s_goal]
|
|
|
|
|
+ s = self.s_goal
|
|
|
|
|
|
|
|
while True:
|
|
while True:
|
|
|
- x_current = parent[x_current]
|
|
|
|
|
- path_back.append(x_current)
|
|
|
|
|
-
|
|
|
|
|
- if x_current == x_start:
|
|
|
|
|
|
|
+ s = parent[s]
|
|
|
|
|
+ path.append(s)
|
|
|
|
|
+ if s == x_start:
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
- return list(reversed(path_back))
|
|
|
|
|
|
|
+ return list(reversed(path))
|
|
|
|
|
|
|
|
def h(self, s):
|
|
def h(self, s):
|
|
|
- heuristic_type = self.heuristic_type
|
|
|
|
|
- goal = self.xG
|
|
|
|
|
|
|
+ """
|
|
|
|
|
+ Calculate heuristic.
|
|
|
|
|
+ :param s: current node (state)
|
|
|
|
|
+ :return: heuristic function value
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ heuristic_type = self.heuristic_type # heuristic type
|
|
|
|
|
+ goal = self.s_goal # goal node
|
|
|
|
|
|
|
|
if heuristic_type == "manhattan":
|
|
if heuristic_type == "manhattan":
|
|
|
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
|
return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
|
|
- elif heuristic_type == "euclidean":
|
|
|
|
|
- return ((goal[0] - s[0]) ** 2 + (goal[1] - s[1]) ** 2) ** (1 / 2)
|
|
|
|
|
else:
|
|
else:
|
|
|
- print("Please choose right heuristic type!")
|
|
|
|
|
|
|
+ return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
|
|
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def get_cost(x, u):
|
|
|
|
|
|
|
+ def cost(self, s_start, s_end):
|
|
|
"""
|
|
"""
|
|
|
Calculate cost for this motion
|
|
Calculate cost for this motion
|
|
|
-
|
|
|
|
|
- :param x: current node
|
|
|
|
|
- :param u: input
|
|
|
|
|
|
|
+ :param s_start: starting node
|
|
|
|
|
+ :param s_end: end node
|
|
|
:return: cost for this motion
|
|
:return: cost for this motion
|
|
|
:note: cost function could be more complicate!
|
|
:note: cost function could be more complicate!
|
|
|
"""
|
|
"""
|
|
@@ -186,15 +197,15 @@ class RtaAstar:
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
def main():
|
|
|
- x_start = (10, 5)
|
|
|
|
|
- x_goal = (45, 25)
|
|
|
|
|
|
|
+ s_start = (10, 5)
|
|
|
|
|
+ s_goal = (45, 25)
|
|
|
|
|
|
|
|
- rtaa = RtaAstar(x_start, x_goal, 200, "euclidean")
|
|
|
|
|
- plot = plotting.Plotting(x_start, x_goal)
|
|
|
|
|
- fig_name = "Real-time Adaptive A* (RTAA*)"
|
|
|
|
|
|
|
+ rtaa = RtaAstar(s_start, s_goal, 220, "euclidean")
|
|
|
|
|
+ plot = plotting.Plotting(s_start, s_goal)
|
|
|
|
|
|
|
|
rtaa.searching()
|
|
rtaa.searching()
|
|
|
- plot.animation_lrta(rtaa.path, rtaa.visited, fig_name)
|
|
|
|
|
|
|
+ plot.animation_lrta(rtaa.path, rtaa.visited,
|
|
|
|
|
+ "Real-time Adaptive A* (RTAA*)")
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|