Quellcode durchsuchen

unify bfs, dfs, Dijkstra, best_first, Astar

Shilong vor 5 Jahren
Ursprung
Commit
cfed23253d

+ 3 - 1
Search_based_Planning/Search_2D/Astar.py

@@ -11,10 +11,12 @@ import heapq
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                 "/../../Search_based_Planning/")
 
-from Search_based_Planning.Search_2D import plotting, env
+from Search_2D import plotting, env
 
 
 class AStar:
+    """AStar set the cost + heuristics as the priority
+    """
     def __init__(self, s_start, s_goal, heuristic_type):
         self.s_start = s_start
         self.s_goal = s_goal

+ 20 - 83
Search_based_Planning/Search_2D/Best_First.py

@@ -11,116 +11,53 @@ import heapq
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                 "/../../Search_based_Planning/")
 
-from Search_based_Planning.Search_2D import plotting, env
+from Search_2D import plotting, env
+from Search_2D.Astar import AStar
 
 
-class BestFirst:
-    def __init__(self, s_start, s_goal):
-        self.s_start = s_start
-        self.s_goal = s_goal
-
-        self.Env = env.Env()
-        self.plotting = plotting.Plotting(self.s_start, self.s_goal)
-
-        self.u_set = self.Env.motions  # feasible input set
-        self.obs = self.Env.obs  # position of obstacles
-
-        self.OPEN = []  # OPEN set: visited nodes
-        self.CLOSED = []  # CLOSED set / visited order
-        self.PARENT = dict()  # recorded parent
-
+class BestFirst(AStar):
+    """BestFirst set the heuristics as the priority 
+    """
     def searching(self):
         """
-        Best-first Searching
-        :return: planning path, visited order
+        Breadth-first Searching.
+        :return: path, visited order
         """
 
         self.PARENT[self.s_start] = self.s_start
+        self.g[self.s_start] = 0
+        self.g[self.s_goal] = math.inf
         heapq.heappush(self.OPEN,
                        (self.heuristic(self.s_start), self.s_start))
 
         while self.OPEN:
             _, s = heapq.heappop(self.OPEN)
+            self.CLOSED.append(s)
 
             if s == self.s_goal:
                 break
-            self.CLOSED.append(s)
 
             for s_n in self.get_neighbor(s):
-                if self.is_collision(s, s_n):
-                    continue
-
-                if s_n not in self.PARENT:  # node not explored
-                    heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
-                    self.PARENT[s_n] = s
-
-        return self.extract_path(), self.CLOSED
-
-    def heuristic(self, s):
-        """
-        estimated distance between current state and goal state.
-        :param s: current state
-        :return: Euclidean distance
-        """
-
-        return math.hypot(s[0] - self.s_goal[0], s[1] - self.s_goal[1])
-
-    def get_neighbor(self, s):
-        """
-        find neighbors of state s that not in obstacles.
-        :param s: state
-        :return: neighbors
-        """
-
-        return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
-
-    def is_collision(self, s_start, s_end):
-        """
-        check if the line segment (s_start, s_end) is collision.
-        :param s_start: start node
-        :param s_end: end node
-        :return: True: is collision / False: not collision
-        """
-
-        if s_start in self.obs or s_end in self.obs:
-            return True
-
-        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
-            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
-                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-            else:
-                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
+                new_cost = self.g[s] + self.cost(s, s_n)
 
-            if s1 in self.obs or s2 in self.obs:
-                return True
+                if s_n not in self.g:
+                    self.g[s_n] = math.inf
 
-        return False
-
-    def extract_path(self):
-        """
-        Extract the path based on the relationship of nodes.
-        :return: The planning path
-        """
-
-        path = [self.s_goal]
-        s = self.s_goal
+                if new_cost < self.g[s_n]:  # conditions for updating Cost
+                    self.g[s_n] = new_cost
+                    self.PARENT[s_n] = s
 
-        while True:
-            s = self.PARENT[s]
-            path.append(s)
-            if s == self.s_start:
-                break
+                    # best first set the heuristics as the priority 
+                    heapq.heappush(self.OPEN, (self.heuristic(s_n), s_n))
 
-        return list(path)
+        return self.extract_path(self.PARENT), self.CLOSED
 
 
 def main():
     s_start = (5, 5)
     s_goal = (45, 25)
 
-    BF = BestFirst(s_start, s_goal)
+    BF = BestFirst(s_start, s_goal, 'euclidean')
     plot = plotting.Plotting(s_start, s_goal)
 
     path, visited = BF.searching()

+ 15 - 85
Search_based_Planning/Search_2D/Dijkstra.py

@@ -11,35 +11,25 @@ import heapq
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                 "/../../Search_based_Planning/")
 
-from Search_based_Planning.Search_2D import plotting, env
+from Search_2D import plotting, env
 
+from Search_2D.Astar import AStar
 
-class Dijkstra:
-    def __init__(self, s_start, s_goal):
-        self.s_start = s_start
-        self.s_goal = s_goal
-
-        self.Env = env.Env()
-        self.plotting = plotting.Plotting(self.s_start, self.s_goal)
-
-        self.u_set = self.Env.motions  # feasible input set
-        self.obs = self.Env.obs  # position of obstacles
-
-        self.OPEN = []  # priority queue / OPEN set
-        self.CLOSED = []  # closed set & visited
-        self.PARENT = dict()  # record parent
-        self.g = dict()  # Cost to come
 
+class Dijkstra(AStar):
+    """Dijkstra set the cost as the priority 
+    """
     def searching(self):
         """
-        Dijkstra Searching.
+        Breadth-first Searching.
         :return: path, visited order
         """
 
         self.PARENT[self.s_start] = self.s_start
         self.g[self.s_start] = 0
         self.g[self.s_goal] = math.inf
-        heapq.heappush(self.OPEN, (0, self.s_start))
+        heapq.heappush(self.OPEN,
+                       (0, self.s_start))
 
         while self.OPEN:
             _, s = heapq.heappop(self.OPEN)
@@ -50,85 +40,25 @@ class Dijkstra:
 
             for s_n in self.get_neighbor(s):
                 new_cost = self.g[s] + self.cost(s, s_n)
+
                 if s_n not in self.g:
                     self.g[s_n] = math.inf
-                if new_cost < self.g[s_n]:
+
+                if new_cost < self.g[s_n]:  # conditions for updating Cost
                     self.g[s_n] = new_cost
-                    heapq.heappush(self.OPEN, (new_cost, s_n))
                     self.PARENT[s_n] = s
 
-        return self.extract_path(), self.CLOSED
-
-    def get_neighbor(self, s):
-        """
-        find neighbors of state s that not in obstacles.
-        :param s: state
-        :return: neighbors
-        """
-
-        return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
-
-    def extract_path(self):
-        """
-        Extract the path based on PARENT set.
-        :return: The planning path
-        """
-
-        path = [self.s_goal]
-        s = self.s_goal
-
-        while True:
-            s = self.PARENT[s]
-            path.append(s)
-
-            if s == self.s_start:
-                break
-
-        return list(path)
-
-    def cost(self, s_start, s_goal):
-        """
-        Calculate Cost for this motion
-        :param s_start: starting node
-        :param s_goal: end node
-        :return:  Cost for this motion
-        """
-
-        if self.is_collision(s_start, s_goal):
-            return math.inf
-
-        return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
-
-    def is_collision(self, s_start, s_end):
-        """
-        check if the line segment (s_start, s_end) is collision.
-        :param s_start: start node
-        :param s_end: end node
-        :return: True: is collision / False: not collision
-        """
-
-        if s_start in self.obs or s_end in self.obs:
-            return True
-
-        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
-            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
-                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-            else:
-                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-
-            if s1 in self.obs or s2 in self.obs:
-                return True
+                    # best first set the heuristics as the priority 
+                    heapq.heappush(self.OPEN, (new_cost, s_n))
 
-        return False
+        return self.extract_path(self.PARENT), self.CLOSED
 
 
 def main():
     s_start = (5, 5)
     s_goal = (45, 25)
 
-    dijkstra = Dijkstra(s_start, s_goal)
+    dijkstra = Dijkstra(s_start, s_goal, 'None')
     plot = plotting.Plotting(s_start, s_goal)
 
     path, visited = dijkstra.searching()

+ 25 - 76
Search_based_Planning/Search_2D/bfs.py

@@ -10,24 +10,14 @@ from collections import deque
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                 "/../../Search_based_Planning/")
 
-from Search_based_Planning.Search_2D import plotting, env
-
-
-class BFS:
-    def __init__(self, s_start, s_goal):
-        self.s_start = s_start
-        self.s_goal = s_goal
-
-        self.Env = env.Env()
-        self.plotting = plotting.Plotting(self.s_start, self.s_goal)
-
-        self.u_set = self.Env.motions  # feasible input set
-        self.obs = self.Env.obs  # position of obstacles
-
-        self.OPEN = deque()  # OPEN set: visited nodes
-        self.PARENT = dict()  # recorded parent
-        self.CLOSED = []  # CLOSED set: explored nodes
-
+from Search_2D import plotting, env
+from Search_2D.Astar import AStar
+import math
+import heapq
+
+class BFS(AStar):
+    """BFS add the new visited node in the end of the openset
+    """
     def searching(self):
         """
         Breadth-first Searching.
@@ -35,81 +25,40 @@ class BFS:
         """
 
         self.PARENT[self.s_start] = self.s_start
-        self.OPEN.append(self.s_start)
+        self.g[self.s_start] = 0
+        self.g[self.s_goal] = math.inf
+        heapq.heappush(self.OPEN,
+                       (0, self.s_start))
 
         while self.OPEN:
-            s = self.OPEN.popleft()
+            _, s = heapq.heappop(self.OPEN)
+            self.CLOSED.append(s)
 
             if s == self.s_goal:
                 break
-            self.CLOSED.append(s)
 
             for s_n in self.get_neighbor(s):
-                if self.is_collision(s, s_n):
-                    continue
-                if s_n not in self.PARENT:  # node not explored
-                    self.OPEN.append(s_n)
-                    self.PARENT[s_n] = s
-
-        return self.extract_path(), self.CLOSED
-
-    def get_neighbor(self, s):
-        """
-        find neighbors of state s that not in obstacles.
-        :param s: state
-        :return: neighbors : [nodes]
-        """
+                new_cost = self.g[s] + self.cost(s, s_n)
 
-        return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
+                if s_n not in self.g:
+                    self.g[s_n] = math.inf
 
-    def is_collision(self, s_start, s_end):
-        """
-        check if the line segment (s_start, s_end) is collision.
-        :param s_start: start node
-        :param s_end: end node
-        :return: True: is collision / False: not collision
-        """
-
-        if s_start in self.obs or s_end in self.obs:
-            return True
-
-        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
-            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
-                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-            else:
-                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-
-            if s1 in self.obs or s2 in self.obs:
-                return True
-
-        return False
-
-    def extract_path(self):
-        """
-        Extract the path based on the PARENT set.
-        :return: The planning path : [nodes]
-        """
-
-        path = [self.s_goal]
-        s = self.s_goal
-
-        while True:
-            s = self.PARENT[s]
-            path.append(s)
+                if new_cost < self.g[s_n]:  # conditions for updating Cost
+                    self.g[s_n] = new_cost
+                    self.PARENT[s_n] = s
 
-            if s == self.s_start:
-                break
+                    # bfs, add new node to the end of the openset
+                    prior = self.OPEN[-1][0]+1 if len(self.OPEN)>0 else 0
+                    heapq.heappush(self.OPEN, (prior, s_n))
 
-        return list(path)
+        return self.extract_path(self.PARENT), self.CLOSED
 
 
 def main():
     s_start = (5, 5)
     s_goal = (45, 25)
 
-    bfs = BFS(s_start, s_goal)
+    bfs = BFS(s_start, s_goal, 'None')
     plot = plotting.Plotting(s_start, s_goal)
 
     path, visited = bfs.searching()

+ 27 - 81
Search_based_Planning/Search_2D/dfs.py

@@ -1,117 +1,63 @@
-"""
-Depth-first Searching_2D (DFS)
-@author: huiming zhou
-"""
 
 import os
 import sys
-from collections import deque
+import math
+import heapq
 
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                 "/../../Search_based_Planning/")
 
-from Search_based_Planning.Search_2D import plotting, env
-
-
-class DFS:
-    def __init__(self, s_start, s_goal):
-        self.s_start = s_start
-        self.s_goal = s_goal
-
-        self.Env = env.Env()
-        self.plotting = plotting.Plotting(self.s_start, self.s_goal)
-
-        self.u_set = self.Env.motions  # feasible input set
-        self.obs = self.Env.obs  # position of obstacles
-
-        self.OPEN = deque()  # OPEN set: visited nodes
-        self.PARENT = dict()  # recorded parent
-        self.CLOSED = []  # CLOSED set / visited order
+from Search_2D import plotting, env
+from Search_2D.Astar import AStar
 
+class DFS(AStar):
+    """DFS add the new visited node in the front of the openset
+    """
     def searching(self):
         """
-        Depth-first Searching
-        :return: planning path, visited order
+        Breadth-first Searching.
+        :return: path, visited order
         """
 
         self.PARENT[self.s_start] = self.s_start
-        self.OPEN.append(self.s_start)
+        self.g[self.s_start] = 0
+        self.g[self.s_goal] = math.inf
+        heapq.heappush(self.OPEN,
+                       (0, self.s_start))
 
         while self.OPEN:
-            s = self.OPEN.pop()
+            _, s = heapq.heappop(self.OPEN)
+            self.CLOSED.append(s)
 
             if s == self.s_goal:
                 break
-            self.CLOSED.append(s)
 
             for s_n in self.get_neighbor(s):
-                if self.is_collision(s, s_n):
-                    continue
-                if s_n not in self.PARENT:  # node not explored
-                    self.OPEN.append(s_n)
-                    self.PARENT[s_n] = s
-
-        return self.extract_path(), self.CLOSED
-
-    def get_neighbor(self, s):
-        """
-        find neighbors of state s that not in obstacles.
-        :param s: state
-        :return: neighbors : [nodes]
-        """
+                new_cost = self.g[s] + self.cost(s, s_n)
 
-        return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]
+                if s_n not in self.g:
+                    self.g[s_n] = math.inf
 
-    def is_collision(self, s_start, s_end):
-        """
-        check if the line segment (s_start, s_end) is collision.
-        :param s_start: start node
-        :param s_end: end node
-        :return: True: is collision / False: not collision
-        """
-
-        if s_start in self.obs or s_end in self.obs:
-            return True
-
-        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
-            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
-                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-            else:
-                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
-                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
-
-            if s1 in self.obs or s2 in self.obs:
-                return True
-
-        return False
-
-    def extract_path(self):
-        """
-        Extract the path based on the relationship of nodes.
-        :return: The planning path
-        """
-
-        path = [self.s_goal]
-        s = self.s_goal
+                if new_cost < self.g[s_n]:  # conditions for updating Cost
+                    self.g[s_n] = new_cost
+                    self.PARENT[s_n] = s
 
-        while True:
-            s = self.PARENT[s]
-            path.append(s)
-            if s == self.s_start:
-                break
+                    # dfs, add new node to the front of the openset
+                    prior = self.OPEN[0][0]-1 if len(self.OPEN)>0 else 0
+                    heapq.heappush(self.OPEN, (prior, s_n))
 
-        return list(path)
+        return self.extract_path(self.PARENT), self.CLOSED
 
 
 def main():
     s_start = (5, 5)
     s_goal = (45, 25)
 
-    dfs = DFS(s_start, s_goal)
+    dfs = DFS(s_start, s_goal, 'None')
     plot = plotting.Plotting(s_start, s_goal)
 
     path, visited = dfs.searching()
+    visited = list(dict.fromkeys(visited))
     plot.animation(path, visited, "Depth-first Searching (DFS)")  # animation