|
|
@@ -25,7 +25,7 @@ import sys
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Sampling_based_Planning/")
|
|
|
from rrt_3D.env3D import env
|
|
|
-from rrt_3D.utils3D import getDist, sampleFree, nearest, steer, isCollide, isinside
|
|
|
+from rrt_3D.utils3D import getDist, sampleFree, nearest, steer, isCollide, isinside, isinbound
|
|
|
from rrt_3D.plot_util3D import make_get_proj, draw_block_list, draw_Spheres, draw_obb, draw_line, make_transparent
|
|
|
from rrt_3D.queue import MinheapPQ
|
|
|
|
|
|
@@ -53,63 +53,84 @@ class BIT_star:
|
|
|
self.env = env()
|
|
|
self.xstart, self.xgoal = tuple(self.env.start), tuple(self.env.goal)
|
|
|
self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal)
|
|
|
- self.maxiter = 3000 # used for determining how many batches needed
|
|
|
- # radius calc
|
|
|
- self.eta = 20 # bigger or equal to 1
|
|
|
+ self.maxiter = 5000 # used for determining how many batches needed
|
|
|
+
|
|
|
+ # radius calc parameter:
|
|
|
+ # larger value makes better 1-time-performance, but longer time trade off
|
|
|
+ self.eta = 7 # bigger or equal to 1
|
|
|
+
|
|
|
+ # sampling
|
|
|
self.m = 1000 # number of samples for one time sample
|
|
|
self.d = 3 # dimension we work with
|
|
|
- self.Path = []
|
|
|
|
|
|
- self.edgeCost = {} # corresponding to c
|
|
|
- self.heuristic_edgeCost = {} # correspoinding to c_hat
|
|
|
+ # instance of the cost to come gT
|
|
|
+ self.g = {self.xstart:0, self.xgoal:np.inf}
|
|
|
|
|
|
# draw ellipse
|
|
|
self.show_ellipse = show_ellipse
|
|
|
|
|
|
+ # denote if the path is found
|
|
|
+ self.done = False
|
|
|
+ self.Path = []
|
|
|
+
|
|
|
def run(self):
|
|
|
- self.V = {self.xstart}
|
|
|
- self.E = set()
|
|
|
- self.Parent = {}
|
|
|
- self.T = (self.V, self.E) # tree
|
|
|
- self.Xsamples = {self.xgoal}
|
|
|
- self.QE = set()
|
|
|
- self.QV = set()
|
|
|
- self.r = np.inf
|
|
|
+ self.V = {self.xstart} # node expanded
|
|
|
+ self.E = set() # edge set
|
|
|
+ self.Parent = {} # Parent relation
|
|
|
+ # self.T = (self.V, self.E) # tree
|
|
|
+ self.Xsamples = {self.xgoal} # sampled set
|
|
|
+ self.QE = set() # edges in queue
|
|
|
+ self.QV = set() # nodes in queue
|
|
|
+ self.r = np.inf # radius for evaluation
|
|
|
self.ind = 0
|
|
|
while True:
|
|
|
# for the first round
|
|
|
- print(self.ind)
|
|
|
- print(self.r)
|
|
|
+ print('round '+str(self.ind))
|
|
|
self.visualization()
|
|
|
# print(len(self.V))
|
|
|
if len(self.QE) == 0 and len(self.QV) == 0:
|
|
|
self.Prune(self.g_T(self.xgoal))
|
|
|
self.Xsamples = self.Sample(self.m, self.g_T(self.xgoal)) # sample function
|
|
|
- self.Vold = copy.deepcopy(self.V)
|
|
|
- self.QV = copy.deepcopy(self.V)
|
|
|
- self.r = self.radius(len(self.V) + len(self.Xsamples))
|
|
|
+ self.Xsamples.add(self.xgoal) # adding goal into the sample
|
|
|
+ self.Vold = {v for v in self.V}
|
|
|
+ self.QV = {v for v in self.V}
|
|
|
+ # setting the radius
|
|
|
+ if self.done:
|
|
|
+ self.r = 1
|
|
|
+ else:
|
|
|
+ self.r = self.radius(len(self.V) + len(self.Xsamples))
|
|
|
while self.BestQueueValue(self.QV, mode = 'QV') <= self.BestQueueValue(self.QE, mode = 'QE'):
|
|
|
self.ExpandVertex(self.BestInQueue(self.QV, mode = 'QV'))
|
|
|
(vm, xm) = self.BestInQueue(self.QE, mode = 'QE')
|
|
|
- self.QE.difference_update({(vm, xm)})
|
|
|
+ self.QE.remove((vm, xm))
|
|
|
if self.g_T(vm) + self.c_hat(vm, xm) + self.h_hat(xm) < self.g_T(self.xgoal):
|
|
|
- if self.g_hat(vm) + self.c(vm, xm) + self.h_hat(xm) < self.g_T(self.xgoal):
|
|
|
- if self.g_T(vm) + self.c(vm, xm) < self.g_T(xm):
|
|
|
+ cost = self.c(vm, xm)
|
|
|
+ if self.g_hat(vm) + cost + self.h_hat(xm) < self.g_T(self.xgoal):
|
|
|
+ if self.g_T(vm) + cost < self.g_T(xm):
|
|
|
if xm in self.V:
|
|
|
self.E.difference_update({(v, x) for (v, x) in self.E if x == xm})
|
|
|
else:
|
|
|
- self.Xsamples.difference_update({xm})
|
|
|
+ self.Xsamples.remove(xm)
|
|
|
self.V.add(xm)
|
|
|
self.QV.add(xm)
|
|
|
+ self.g[xm] = self.g[vm] + cost
|
|
|
self.E.add((vm, xm))
|
|
|
- self.Parent[vm] = xm # add parent or update parent
|
|
|
- self.QE.difference_update({(v, x) for (v, x) in self.QE if x == xm and self.g_T(v) + self.c_hat(v, x) >= self.g_T(x)})
|
|
|
-
|
|
|
+ self.Parent[xm] = vm # add parent or update parent
|
|
|
+ self.QE.difference_update({(v, x) for (v, x) in self.QE if x == xm and (self.g_T(v) + self.c_hat(v, xm)) >= self.g_T(xm)})
|
|
|
+
|
|
|
+ # reinitializing sampling
|
|
|
else:
|
|
|
self.QE = set()
|
|
|
self.QV = set()
|
|
|
self.ind += 1
|
|
|
+
|
|
|
+ # if the goal is reached
|
|
|
+ if self.xgoal in self.Parent:
|
|
|
+ print('locating path...')
|
|
|
+ self.done = True
|
|
|
+ self.Path = self.path()
|
|
|
|
|
|
+ # if the iteration is bigger
|
|
|
if self.ind > self.maxiter:
|
|
|
break
|
|
|
return self.T
|
|
|
@@ -133,7 +154,7 @@ class BIT_star:
|
|
|
self.C = C # save to global var
|
|
|
self.xcenter = xcenter
|
|
|
self.L = L
|
|
|
- x2 = set(map(tuple, x[np.array([not isinside(self, state) for state in x])])) # intersection with the state space
|
|
|
+ x2 = set(map(tuple, x[np.array([not isinside(self, state) and isinbound(self.env.boundary, state) for state in x])])) # intersection with the state space
|
|
|
xrand.update(x2)
|
|
|
# if there are samples inside obstacle: recursion
|
|
|
if len(x2) < m:
|
|
|
@@ -166,12 +187,12 @@ class BIT_star:
|
|
|
|
|
|
#----------BIT_star particular
|
|
|
def ExpandVertex(self, v):
|
|
|
- self.QV.difference_update({v})
|
|
|
+ self.QV.remove(v)
|
|
|
Xnear = {x for x in self.Xsamples if getDist(x, v) <= self.r}
|
|
|
- self.QE.update({(v, x) for v in self.V for x in Xnear if self.g_hat(v) + self.c_hat(v, x) + self.h_hat(x) < self.g_T(self.xgoal)})
|
|
|
+ self.QE.update({(v, x) for x in Xnear if self.g_hat(v) + self.c_hat(v, x) + self.h_hat(x) < self.g_T(self.xgoal)})
|
|
|
if v not in self.Vold:
|
|
|
Vnear = {w for w in self.V if getDist(w, v) <= self.r}
|
|
|
- self.QE.update({(v,w) for v in self.V for w in Vnear if \
|
|
|
+ self.QE.update({(v,w) for w in Vnear if \
|
|
|
((v,w) not in self.E) and \
|
|
|
(self.g_hat(v) + self.c_hat(v, w) + self.h_hat(w) < self.g_T(self.xgoal)) and \
|
|
|
(self.g_T(v) + self.c_hat(v, w) < self.g_T(w))})
|
|
|
@@ -207,26 +228,26 @@ class BIT_star:
|
|
|
def BestInQueue(self, inputset, mode):
|
|
|
# returns the best vertex in the vertex queue given this ordering
|
|
|
# mode = 'QE' or 'QV'
|
|
|
- _, best_state = self.find_best(inputset, mode)
|
|
|
- return best_state
|
|
|
+ if mode == 'QV':
|
|
|
+ V = {state: self.g_T(state) + self.h_hat(state) for state in self.QV}
|
|
|
+ if mode == 'QE':
|
|
|
+ V = {state: self.g_T(state[0]) + self.c_hat(state[0], state[1]) + self.h_hat(state[1]) for state in self.QE}
|
|
|
+ if len(V) == 0:
|
|
|
+ print(mode + 'empty')
|
|
|
+ return None
|
|
|
+ return min(V, key = V.get)
|
|
|
|
|
|
def BestQueueValue(self, inputset, mode):
|
|
|
# returns the best value in the vertex queue given this ordering
|
|
|
# mode = 'QE' or 'QV'
|
|
|
- best_val, _ = self.find_best(inputset, mode)
|
|
|
- return best_val
|
|
|
-
|
|
|
- def find_best(self, inputset, mode):
|
|
|
- min_val, min_state = np.inf, None
|
|
|
- for state in inputset:
|
|
|
- if mode == 'QE':
|
|
|
- curr_val = self.g_T(state[0]) + self.c_hat(state[0], state[1]) + self.h_hat(state[1])
|
|
|
- elif mode == 'QV':
|
|
|
- curr_val = self.g_T(state) + self.h_hat(state)
|
|
|
- if curr_val < min_val:
|
|
|
- min_val, min_state = curr_val, state
|
|
|
- return min_val, min_state
|
|
|
-
|
|
|
+ if mode == 'QV':
|
|
|
+ V = {self.g_T(state) + self.h_hat(state) for state in self.QV}
|
|
|
+ if mode == 'QE':
|
|
|
+ V = {self.g_T(state[0]) + self.c_hat(state[0], state[1]) + self.h_hat(state[1]) for state in self.QE}
|
|
|
+ if len(V) == 0:
|
|
|
+ return np.inf
|
|
|
+ return min(V)
|
|
|
+
|
|
|
def g_hat(self, v):
|
|
|
return getDist(self.xstart, v)
|
|
|
|
|
|
@@ -239,42 +260,40 @@ class BIT_star:
|
|
|
|
|
|
def c(self, v, w):
|
|
|
# admissible estimate of the cost of an edge between state v, w
|
|
|
- if (v,w) in self.edgeCost:
|
|
|
- pass
|
|
|
- else:
|
|
|
- collide, dist = isCollide(self, v, w)
|
|
|
- if collide:
|
|
|
- self.edgeCost[(v,w)] = np.inf
|
|
|
- else:
|
|
|
- self.edgeCost[(v,w)] = dist
|
|
|
- return self.edgeCost[(v,w)]
|
|
|
+ collide, dist = isCollide(self, v, w)
|
|
|
+ if collide:
|
|
|
+ return np.inf
|
|
|
+ else:
|
|
|
+ return dist
|
|
|
|
|
|
def c_hat(self, v, w):
|
|
|
# c_hat < c < np.inf
|
|
|
# heuristic estimate of the edge cost, since c is expensive
|
|
|
- if (v,w) in self.heuristic_edgeCost:
|
|
|
- pass
|
|
|
- else:
|
|
|
- self.heuristic_edgeCost[(v,w)] = getDist(v, w)
|
|
|
- return self.heuristic_edgeCost[(v,w)]
|
|
|
+ return getDist(v, w)
|
|
|
|
|
|
def g_T(self, v):
|
|
|
# represent cost-to-come from the start in the tree,
|
|
|
# if the state is not in tree, or unreachable, return inf
|
|
|
- if v in self.Parent:
|
|
|
- cost_to_come = 0
|
|
|
- while v != self.xstart:
|
|
|
- cost_to_come += self.c(v, self.Parent[v])
|
|
|
- v = self.Parent[v]
|
|
|
- return cost_to_come
|
|
|
- elif v == self.xstart:
|
|
|
- return 0
|
|
|
- else:
|
|
|
- return np.inf
|
|
|
+ if v not in self.g:
|
|
|
+ self.g[v] = np.inf
|
|
|
+ return self.g[v]
|
|
|
+
|
|
|
+ def path(self):
|
|
|
+ path = []
|
|
|
+ s = self.xgoal
|
|
|
+ i = 0
|
|
|
+ while s != self.xstart:
|
|
|
+ path.append((s, self.Parent[s]))
|
|
|
+ s = self.Parent[s]
|
|
|
+ if i > self.m:
|
|
|
+ break
|
|
|
+ i += 1
|
|
|
+ return path
|
|
|
|
|
|
def visualization(self):
|
|
|
if self.ind % 20 == 0:
|
|
|
V = np.array(list(self.V))
|
|
|
+ Xsample = np.array(list(self.Xsamples))
|
|
|
edges = list(map(list, self.E))
|
|
|
Path = np.array(self.Path)
|
|
|
start = self.env.start
|
|
|
@@ -305,6 +324,8 @@ class BIT_star:
|
|
|
draw_ellipsoid(ax, self.C, self.L, self.xcenter) # beware, depending on start and goal position, this might be bad for vis
|
|
|
if len(V) > 0:
|
|
|
ax.scatter3D(V[:, 0], V[:, 1], V[:, 2], s=2, color='g', )
|
|
|
+ if len(Xsample) > 0: # plot the sampled points
|
|
|
+ ax.scatter3D(Xsample[:, 0], Xsample[:, 1], Xsample[:, 2], s=2, color='b', )
|
|
|
ax.plot(start[0:1], start[1:2], start[2:], 'go', markersize=7, markeredgecolor='k')
|
|
|
ax.plot(goal[0:1], goal[1:2], goal[2:], 'ro', markersize=7, markeredgecolor='k')
|
|
|
# adjust the aspect ratio
|