yue qi 5 gadi atpakaļ
vecāks
revīzija
5f20059cc3

+ 148 - 0
Sampling_based_Planning/rrt_3D/ABIT_star3D.py

@@ -0,0 +1,148 @@
+# This is Advanced Batched Informed Tree star 3D algorithm 
+# implementation
+"""
+This is ABIT* code for 3D
+@author: yue qi 
+source: M.P.Strub, J.D.Gammel. "Advanced BIT* (ABIT*):
+        Sampling-Based Planning with Advanced Graph-Search Techniques" 
+"""
+
+import numpy as np
+import matplotlib.pyplot as plt
+import time
+import copy
+
+
+import os
+import sys
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Sampling_based_Planning/")
+from rrt_3D.env3D import env
+from rrt_3D.utils3D import getDist, sampleFree, nearest, steer, isCollide
+from rrt_3D.plot_util3D import make_get_proj, draw_block_list, draw_Spheres, draw_obb, draw_line, make_transparent
+from rrt_3D.queue import MinheapPQ
+
+class ABIT_star:
+
+    def __init__(self):
+        self.env = env()
+        self.xstart, self.xgoal = tuple(self.env.start), tuple(self.env.goal)
+        self.maxiter = 1000
+        self.done = False
+        self.n = 1000# used in radius calc r(q)
+        self.lam = 10 # used in radius calc r(q)
+
+    def run(self):
+        V, E = {self.xstart}, set()
+        T = (V,E)
+        Xunconnected = {self.xgoal}
+        q = len(V) + len(Xunconnected)
+        eps_infl, eps_trunc = np.inf, np.inf
+        Vclosed, Vinconsistent = set(), set()
+        Q = self.expand(self.xstart, T, Xunconnected, np.inf)
+
+        ind = 0
+        while True:
+            if self.is_search_marked_finished():
+                if self.update_approximation(eps_infl, eps_trunc):
+                    T, Xunconnected = self.prune(T, Xunconnected, self.xgoal)
+                    Xunconnected.update(self.sample(m, self.xgoal))
+                    q = len(V) + len(Xunconnected)
+                    Q = self.expand({self.xstart}, T, Xunconnected, self.r(q))
+                else:
+                    Q.update(self.expand(Vinconsistent, T, Xunconnected, self.r(q)))
+                eps_infl = self.update_inflation_factor()
+                eps_trunc = self.update_truncation_factor()
+                Vclosed = set()
+                Vinconsistent = set()
+                self.mark_search_unfinished()
+            else:
+                state_tuple = list(Q)
+                (xp, xc) = state_tuple[np.argmin( [self.g_T[xi] + self.c_hat(xi,xj) + eps_infl * self.h_hat(xj) for (xi,xj) in Q] )]
+                Q = Q.difference({(xp, xc)})
+                if (xp, xc) in E:
+                    if xc in Vclosed:
+                        Vinconsistent.add(xc)
+                    else:
+                        Q.update(self.expand({xc}, T, Xunconnected, self.r(q)))
+                        Vclosed.add(xc)
+                elif eps_trunc * (self.g_T(xp) + self.c_hat(xi, xj) + self.h_hat(xc)) <= self.g_T(self.xgoal):
+                    if self.g_T(xp) + self.c_hat(xp, xc) < self.g_T(xc):
+                        if self.g_T(xp) + self.c(xp, xc) + self.h_hat(xc) < self.g_T(self.xgoal):
+                            if self.g_T(xp) + self.c(xp, xc) < self.g_T(xc):
+                                if xc in V:
+                                    E = E.difference({(xprev, xc)})
+                                else:
+                                    Xunconnected.difference_update({xc})
+                                    V.add(xc)
+                                    E.add((xp, xc))
+                                if xc in Vclosed:
+                                    Vinconsistent.add(xc)
+                                else:
+                                    Q.update(self.expand({xc}, T, Xunconnected, self.r(q)))
+                                    Vclosed.add(xc)
+                else: 
+                    self.mark_search_finished()
+            ind += 1
+            # until stop
+            if ind > self.maxiter:
+                break
+
+    def sample(self, m, xgoal):
+        pass
+
+    def expand(self, set_xi, T, Xunconnected, r):
+        V, E = T
+        Eout = set()
+        for xp in set_xi:
+            Eout.update({(x1, x2) for (x1, x2) in E if x1 == xp})
+            for xc in {x for x in Xunconnected.union(V) if getDist(xp, x) <= r}:
+                if self.g_hat(xp) + self.c_hat(xp, xc) + self.h_hat(xc) <= self.g_T(self.xgoal):
+                    if self.g_hat(xp) + self.c_hat(xp, xc) <= self.g_hat(xc):
+                        Eout.add((xp,xc))
+        return Eout
+
+    def prune(self, T, Xunconnected, xgoal):
+        V, E = T
+        Xunconnected.difference_update({x for x in Xunconnected if self.f_hat(x) >= self.g_T(xgoal)})
+        V.difference_update({x for x in V if self.f_hat(x) > self.g_T(xgoal)})
+        E.difference_update({(xp, xc) for (xp, xc) in E if self.f_hat(xp) > self.g_T(xgoal) or self.f_hat(xc) > self.g_T(xgoal)})
+        Xunconnected.update({xc for (xp, xc) in E if (xp not in V) and (xc in V)})
+        V.difference_update({xc for (xp, xc) in E if (xp not in V) and (xc in V)})
+        T = (V,E)
+        return T, Xunconnected
+
+    def g_hat(self, x):
+        pass
+    
+    def h_hat(self, x):
+        pass
+
+    def c_hat(self, x1, x2):
+        pass
+
+    def f_hat(self, x):
+        pass
+
+    def g_T(self, x):
+        pass
+
+    def r(self, q):
+        return self.eta * (2 * (1 + 1/self.n) * (self.Lambda(self.Xf_hat) / self.Zeta) * (np.log(q) / q)) ** (1 / self.n)
+
+    def Lambda(self, inputset):
+        pass
+
+    def Zeta(self):
+        pass
+
+    def is_search_marked_finished(self):
+        return self.done
+
+    def mark_search_unfinished(self):
+        self.done = False
+        return self.done
+
+    def mark_search_finished(self):
+        self.done = True
+        return self.done

+ 139 - 0
Sampling_based_Planning/rrt_3D/BIT_star3D.py

@@ -0,0 +1,139 @@
+# This is Batched Informed Tree star 3D algorithm 
+# implementation
+"""
+This is ABIT* code for 3D
+@author: yue qi 
+Algorithm 1
+source: Gammell, Jonathan D., Siddhartha S. Srinivasa, and Timothy D. Barfoot. "Batch informed trees (BIT*): 
+        Sampling-based optimal planning via the heuristically guided search of implicit random geometric graphs." 
+        2015 IEEE international conference on robotics and automation (ICRA). IEEE, 2015.  
+and 
+source: Gammell, Jonathan D., Timothy D. Barfoot, and Siddhartha S. Srinivasa. 
+        "Batch Informed Trees (BIT*): Informed asymptotically optimal anytime search." 
+        The International Journal of Robotics Research 39.5 (2020): 543-567.
+"""
+
+import numpy as np
+import matplotlib.pyplot as plt
+import time
+import copy
+
+
+import os
+import sys
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Sampling_based_Planning/")
+from rrt_3D.env3D import env
+from rrt_3D.utils3D import getDist, sampleFree, nearest, steer, isCollide
+from rrt_3D.plot_util3D import make_get_proj, draw_block_list, draw_Spheres, draw_obb, draw_line, make_transparent
+from rrt_3D.queue import MinheapPQ
+
+class BIT_star:
+
+    def __init__(self):
+        self.env = env()
+        self.xstart, self.xgoal = tuple(self.env.start), tuple(self.env.goal)
+        self.maxiter = 1000
+        # radius calc
+        self.eta = 1 # bigger or equal to 1
+        self.n = 1000
+        self.Xf_hat = 1 # TODO
+        self.nn = 1 # TODO
+
+    def run(self):
+        V = {self.xstart}
+        E = set()
+        T = (V, E) # tree
+        Xsamples = {self.xgoal}
+        QE = set()
+        QV = set()
+        r = np.inf
+        ind = 0
+        while True:
+            if len(QE) == 0 and len(QV) == 0:
+                Xsamples, V, E = self.Prune(self.g_T(self.xgoal), Xsamples, V, E)
+                Vold = copy.deepcopy(V)
+                QV = copy.deepcopy(V)
+                r = self.radius(len(V) + len(Xsamples))
+            while self.BestQueueValue(QV) <= self.BestQueueValue(QE):
+                QV, QE = self.ExpandVertex(self.BestInQueue(QV), QV, QE, Xsamples, Vold, E, V, r)
+            (vm, xm) = self.BestInQueue(QE)
+            QE.difference_update({(vm, xm)})
+            if self.g_T(vm) + self.c_hat(vm, xm) + self.h_hat(xm) < self.g_T(self.xgoal):
+                if self.g_hat(vm) + self.c(vm, xm) + self.h_hat(xm) < self.g_T(self.xgoal):
+                    if self.g_T(vm) + self.c(vm, xm) < self.g_T(xm):
+                        if xm in V:
+                            E.difference_update({(v, x) for (v, x) in E if x == xm})
+                        else:
+                            Xsamples.difference_update({xm})
+                            V.add(xm)
+                            QV.add(xm)
+                        E.add((vm, xm))
+                        QE.difference_update({(v, x) for (v, x) in QE if x == xm and self.g_T(v) + self.c_hat(v, x) >= self.g_T(x)})
+
+            else:
+                QE = set()
+                QV = set()
+            ind += 1
+            if ind > self.maxiter:
+                break
+            return T
+
+    def ExpandVertex(self, v , QV, QE, Xsamples, Vold, E, V, r):
+        QV.difference_update({v})
+        Xnear = {x for x in Xsamples if getDist(x, v) <= r}
+        QE = {(v, x) for v in V for x in Xnear if self.g_hat(v) + self.c_hat(v, x) + self.h_hat(x) < self.g_T(self.xgoal)}
+        if v not in Vold:
+            Vnear = {w for w in V if getDist(w, v) <= r}
+            QE.update({(v,w) for v in V for w in Vnear if \
+                ((v,w) not in E) and \
+                (self.g_hat(v) + self.c_hat(v, w) + self.h_hat(w) < self.g_T(self.xgoal)) and \
+                (self.g_T(v) + self.c_hat(v, w) < self.g_T(w))})
+        return QV, QE
+
+    def Prune(self, c, Xsamples, V, E):
+        Xsamples = {x for x in Xsamples if self.f_hat(x) >= c}
+        V.difference_update({v for v in V if self.f_hat(v) >=c})
+        E.difference_update({(v, w) for (v, w) in E if (self.f_hat(v) > c) or (self.f_hat(w) > c)})
+        Xsamples.update({v for v in V if self.g_T(v) == np.inf})
+        V.difference_update({v for v in V if self.g_T(v) == np.inf})
+        return Xsamples, V, E
+
+    def radius(self, q):
+        return 2 * self.eta * (1 + 1/self.n) ** (1/self.n) * \
+            (self.Lambda(self.Xf_hat) / self.Zeta ) ** (1/self.n) * \
+            (np.log(q) / q) ** (1/self.n)
+
+    def Lambda(self, inputset):
+        # lebesgue measure of a set, defined as 
+        # mu: L(Rn) --> [0, inf], e.g. volume
+        pass 
+
+    def Zeta(self):
+        # unit ball
+        pass
+
+    def BestInQueue(self, inputset):
+        pass
+
+    def BestQueueValue(self, inputset):
+        pass
+
+    def g_hat(self, v):
+        pass
+
+    def c(self, v, w):
+        pass
+
+    def c_hat(self, v, w):
+        pass
+
+    def f_hat(self, v):
+        pass
+
+    def h_hat(self, v):
+        pass
+
+    def g_T(self, v):
+        pass
+    

+ 4 - 4
Sampling_based_Planning/rrt_3D/FMT_star3D.py

@@ -22,14 +22,14 @@ from rrt_3D.queue import MinheapPQ
 
 class FMT_star:
 
-    def __init__(self):
+    def __init__(self, radius = 1, n = 1000):
         self.env = env()
         # init start and goal
             # note that the xgoal could be a region since this algorithm is a multiquery method
         self.xinit, self.xgoal = tuple(self.env.start), tuple(self.env.goal)
         self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal) # used for sample free
-        self.n = 1000 # number of samples
-        self.radius = 2.5 # radius of the ball
+        self.n = n # number of samples
+        self.radius = radius # radius of the ball
         # self.radius = 40 * np.sqrt((np.log(self.n) / self.n))
         # sets
         self.Vopen, self.Vopen_queue, self.Vclosed, self.V, self.Vunvisited, self.c = self.initNodeSets()
@@ -188,7 +188,7 @@ class FMT_star:
             plt.pause(0.0001)
 
 if __name__ == '__main__':
-    A = FMT_star()
+    A = FMT_star(radius = 1, n = 3000)
     A.FMTrun()
 
 

+ 0 - 0
Sampling_based_Planning/rrt_3D/rrt_star_smart3D.py