|
|
@@ -5,6 +5,7 @@ A_star 2D
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
+import math
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
|
|
|
"/../../Search-based Planning/")
|
|
|
@@ -15,165 +16,166 @@ from Search_2D import env
|
|
|
|
|
|
|
|
|
class Astar:
|
|
|
- def __init__(self, x_start, x_goal, e, heuristic_type):
|
|
|
- self.xI, self.xG = x_start, x_goal
|
|
|
+ def __init__(self, start, goal, heuristic_type):
|
|
|
+ self.s_start, self.s_goal = start, goal
|
|
|
self.heuristic_type = heuristic_type
|
|
|
|
|
|
- self.Env = env.Env() # class Env
|
|
|
+ self.Env = env.Env() # class Env
|
|
|
|
|
|
- self.e = e # weighted A*: e >= 1
|
|
|
- self.u_set = self.Env.motions # feasible input set
|
|
|
- self.obs = self.Env.obs # position of obstacles
|
|
|
+ self.u_set = self.Env.motions # feasible input set
|
|
|
+ self.obs = self.Env.obs # position of obstacles
|
|
|
|
|
|
- self.g = {self.xI: 0, self.xG: float("inf")} # cost to come
|
|
|
- self.OPEN = queue.QueuePrior() # priority queue / U set
|
|
|
- self.OPEN.put(self.xI, self.fvalue(self.xI))
|
|
|
- self.CLOSED = set() # closed set & visited
|
|
|
- self.VISITED = []
|
|
|
- self.PARENT = {self.xI: self.xI} # relations
|
|
|
+ self.g = {self.s_start: 0, self.s_goal: float("inf")} # cost to come
|
|
|
+ self.OPEN = queue.QueuePrior() # priority queue / OPEN set
|
|
|
+ self.OPEN.put(self.s_start, self.fvalue(self.s_start))
|
|
|
+ self.CLOSED = [] # CLOSED set / VISITED order
|
|
|
+ self.PARENT = {self.s_start: self.s_start}
|
|
|
|
|
|
def searching(self):
|
|
|
"""
|
|
|
- Searching using A_star.
|
|
|
-
|
|
|
- :return: path, order of visited nodes in the planning
|
|
|
+ A_star Searching.
|
|
|
+ :return: path, order of visited nodes
|
|
|
"""
|
|
|
|
|
|
while not self.OPEN.empty():
|
|
|
s = self.OPEN.get()
|
|
|
- self.CLOSED.add(s)
|
|
|
- self.VISITED.append(s)
|
|
|
+ self.CLOSED.append(s)
|
|
|
|
|
|
- if s == self.xG: # stop condition
|
|
|
+ if s == self.s_goal: # stop condition
|
|
|
break
|
|
|
|
|
|
- for u in self.u_set: # explore neighborhoods of current node
|
|
|
- s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
- if s_next not in self.obs and s_next not in self.CLOSED:
|
|
|
- new_cost = self.g[s] + self.get_cost(s, u)
|
|
|
- if s_next not in self.g:
|
|
|
- self.g[s_next] = float("inf")
|
|
|
- if new_cost < self.g[s_next]: # conditions for updating cost
|
|
|
- self.g[s_next] = new_cost
|
|
|
- self.PARENT[s_next] = s
|
|
|
- self.OPEN.put(s_next, self.fvalue(s_next))
|
|
|
+ for s_n in self.get_neighbor(s):
|
|
|
+ if s_n not in self.CLOSED:
|
|
|
+ new_cost = self.g[s] + self.cost(s, s_n)
|
|
|
+ if s_n not in self.g:
|
|
|
+ self.g[s_n] = float("inf")
|
|
|
+ if new_cost < self.g[s_n]: # conditions for updating cost
|
|
|
+ self.g[s_n] = new_cost
|
|
|
+ self.PARENT[s_n] = s
|
|
|
+ self.OPEN.put(s_n, self.fvalue(s_n))
|
|
|
|
|
|
- return self.extract_path(self.PARENT), self.VISITED
|
|
|
+ return self.extract_path(self.PARENT), self.CLOSED
|
|
|
|
|
|
- def repeated_Searching(self, xI, xG, e):
|
|
|
+ def repeated_searching(self, e):
|
|
|
path, visited = [], []
|
|
|
|
|
|
while e >= 1:
|
|
|
- p_k, v_k = self.repeated_Astar(xI, xG, e)
|
|
|
+ p_k, v_k = self.repeated_Astar(self.s_start, self.s_goal, e)
|
|
|
path.append(p_k)
|
|
|
visited.append(v_k)
|
|
|
e -= 0.5
|
|
|
|
|
|
return path, visited
|
|
|
|
|
|
- def repeated_Astar(self, xI, xG, e):
|
|
|
- g = {xI: 0, xG: float("inf")}
|
|
|
+ def repeated_Astar(self, s_start, s_goal, e):
|
|
|
+ g = {s_start: 0, s_goal: float("inf")}
|
|
|
OPEN = queue.QueuePrior()
|
|
|
- OPEN.put(xI, g[xI] + e * self.Heuristic(xI))
|
|
|
- CLOSED = set()
|
|
|
- PARENT = {xI: xI}
|
|
|
- VISITED = []
|
|
|
+ OPEN.put(s_start, g[s_start] + e * self.Heuristic(s_start))
|
|
|
+ CLOSED = []
|
|
|
+ PARENT = {s_start: s_start}
|
|
|
|
|
|
while OPEN:
|
|
|
s = OPEN.get()
|
|
|
- CLOSED.add(s)
|
|
|
- VISITED.append(s)
|
|
|
+ CLOSED.append(s)
|
|
|
|
|
|
- if s == xG:
|
|
|
+ if s == s_goal:
|
|
|
break
|
|
|
|
|
|
- for u in self.u_set: # explore neighborhoods of current node
|
|
|
- s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
- if s_next not in self.obs and s_next not in CLOSED:
|
|
|
- new_cost = g[s] + self.get_cost(s, u)
|
|
|
- if s_next not in g:
|
|
|
- g[s_next] = float("inf")
|
|
|
- if new_cost < g[s_next]: # conditions for updating cost
|
|
|
- g[s_next] = new_cost
|
|
|
- PARENT[s_next] = s
|
|
|
- OPEN.put(s_next, g[s_next] + e * self.Heuristic(s_next))
|
|
|
+ for s_n in self.get_neighbor(s):
|
|
|
+ if s_n not in CLOSED:
|
|
|
+ new_cost = g[s] + self.cost(s, s_n)
|
|
|
+ if s_n not in g:
|
|
|
+ g[s_n] = float("inf")
|
|
|
+ if new_cost < g[s_n]: # conditions for updating cost
|
|
|
+ g[s_n] = new_cost
|
|
|
+ PARENT[s_n] = s
|
|
|
+ OPEN.put(s_n, g[s_n] + e * self.Heuristic(s_n))
|
|
|
+
|
|
|
+ return self.extract_path(PARENT), CLOSED
|
|
|
+
|
|
|
+ def get_neighbor(self, s):
|
|
|
+ """
|
|
|
+ find neighbors of state s that not in obstacles.
|
|
|
+ :param s: state
|
|
|
+ :return: neighbors
|
|
|
+ """
|
|
|
+
|
|
|
+ s_list = set()
|
|
|
+
|
|
|
+ for u in self.u_set:
|
|
|
+ s_next = tuple([s[i] + u[i] for i in range(2)])
|
|
|
+ if s_next not in self.obs:
|
|
|
+ s_list.add(s_next)
|
|
|
|
|
|
- return self.extract_path(PARENT), VISITED
|
|
|
+ return s_list
|
|
|
|
|
|
- def fvalue(self, x, e=1):
|
|
|
+ def fvalue(self, x):
|
|
|
"""
|
|
|
f = g + h. (g: cost to come, h: heuristic function)
|
|
|
:param x: current state
|
|
|
:return: f
|
|
|
"""
|
|
|
|
|
|
- return self.g[x] + e * self.Heuristic(x)
|
|
|
+ return self.g[x] + self.Heuristic(x)
|
|
|
|
|
|
def extract_path(self, PARENT):
|
|
|
"""
|
|
|
- Extract the path based on the relationship of nodes.
|
|
|
-
|
|
|
+ Extract the path based on the PARENT set.
|
|
|
:return: The planning path
|
|
|
"""
|
|
|
|
|
|
- path_back = [self.xG]
|
|
|
- x_current = self.xG
|
|
|
+ path = [self.s_goal]
|
|
|
+ s = self.s_goal
|
|
|
|
|
|
while True:
|
|
|
- x_current = PARENT[x_current]
|
|
|
- path_back.append(x_current)
|
|
|
+ s = PARENT[s]
|
|
|
+ path.append(s)
|
|
|
|
|
|
- if x_current == self.xI:
|
|
|
+ if s == self.s_start:
|
|
|
break
|
|
|
|
|
|
- return list(path_back)
|
|
|
+ return list(path)
|
|
|
|
|
|
@staticmethod
|
|
|
- def get_cost(x, u):
|
|
|
+ def cost(s_start, s_goal):
|
|
|
"""
|
|
|
Calculate cost for this motion
|
|
|
-
|
|
|
- :param x: current node
|
|
|
- :param u: current input
|
|
|
+ :param s_start: starting node
|
|
|
+ :param s_goal: end node
|
|
|
:return: cost for this motion
|
|
|
:note: cost function could be more complicate!
|
|
|
"""
|
|
|
|
|
|
return 1
|
|
|
|
|
|
- def Heuristic(self, state):
|
|
|
+ def Heuristic(self, s):
|
|
|
"""
|
|
|
Calculate heuristic.
|
|
|
-
|
|
|
- :param state: current node (state)
|
|
|
+ :param s: current node (state)
|
|
|
:return: heuristic function value
|
|
|
"""
|
|
|
|
|
|
- heuristic_type = self.heuristic_type # heuristic type
|
|
|
- goal = self.xG # goal node
|
|
|
+ heuristic_type = self.heuristic_type # heuristic type
|
|
|
+ goal = self.s_goal # goal node
|
|
|
|
|
|
if heuristic_type == "manhattan":
|
|
|
- return abs(goal[0] - state[0]) + abs(goal[1] - state[1])
|
|
|
- elif heuristic_type == "euclidean":
|
|
|
- return ((goal[0] - state[0]) ** 2 + (goal[1] - state[1]) ** 2) ** (1 / 2)
|
|
|
+ return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
|
|
|
else:
|
|
|
- print("Please choose right heuristic type!")
|
|
|
+ return math.hypot(goal[0] - s[0], goal[1] - s[1])
|
|
|
|
|
|
|
|
|
def main():
|
|
|
- x_start = (5, 5)
|
|
|
- x_goal = (45, 25)
|
|
|
+ s_start = (5, 5)
|
|
|
+ s_goal = (45, 25)
|
|
|
|
|
|
- astar = Astar(x_start, x_goal, 1, "euclidean") # weight e = 1
|
|
|
- plot = plotting.Plotting(x_start, x_goal) # class Plotting
|
|
|
+ astar = Astar(s_start, s_goal, "euclidean")
|
|
|
+ plot = plotting.Plotting(s_start, s_goal)
|
|
|
|
|
|
- fig_name = "A*"
|
|
|
path, visited = astar.searching()
|
|
|
- plot.animation(path, visited, fig_name) # animation generate
|
|
|
+ plot.animation(path, visited, "A*") # animation
|
|
|
|
|
|
- # fig_name = "Repeated A*"
|
|
|
- # path, visited = astar.repeated_Searching(x_start, x_goal, 2.5)
|
|
|
- # plot.animation_ara_star(path, visited, fig_name)
|
|
|
+ # path, visited = astar.repeated_searching(2.5) # initial weight e = 2.5
|
|
|
+ # plot.animation_ara_star(path, visited, "Repeated A*")
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|