|
|
@@ -10,39 +10,40 @@ from collections import defaultdict
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search-based Planning/")
|
|
|
from Search_3D.env3D import env
|
|
|
from Search_3D.utils3D import getDist, heuristic_fun, getNearest, isinbound, \
|
|
|
- cost, children, StateSpace
|
|
|
+ cost, children, StateSpace
|
|
|
from Search_3D.plot_util3D import visualization
|
|
|
from Search_3D import queue
|
|
|
import time
|
|
|
|
|
|
+
|
|
|
class Anytime_Dstar(object):
|
|
|
|
|
|
def __init__(self, resolution=1):
|
|
|
self.Alldirec = {(1, 0, 0): 1, (0, 1, 0): 1, (0, 0, 1): 1, \
|
|
|
- (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
|
|
|
- (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
|
|
|
- (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
|
|
|
- (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
|
|
|
- (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
|
|
|
- (1, 1, 1): np.sqrt(3), (-1, -1, -1) : np.sqrt(3), \
|
|
|
- (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
|
|
|
- (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
|
|
|
+ (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
|
|
|
+ (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
|
|
|
+ (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
|
|
|
+ (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
|
|
|
+ (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
|
|
|
+ (1, 1, 1): np.sqrt(3), (-1, -1, -1): np.sqrt(3), \
|
|
|
+ (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
|
|
|
+ (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
|
|
|
self.env = env(resolution=resolution)
|
|
|
- self.settings = 'CollisionChecking' # for collision checking
|
|
|
+ self.settings = 'CollisionChecking' # for collision checking
|
|
|
self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal)
|
|
|
self.OPEN = queue.MinheapPQ()
|
|
|
- self.g = {} # all g initialized at inf
|
|
|
+ self.g = {} # all g initialized at inf
|
|
|
self.h = {}
|
|
|
- self.rhs = {self.xt:0} # rhs(x0) = 0
|
|
|
+ self.rhs = {self.xt: 0} # rhs(x0) = 0
|
|
|
self.OPEN.put(self.xt, self.key(self.xt))
|
|
|
self.INCONS = set()
|
|
|
self.CLOSED = set()
|
|
|
-
|
|
|
+
|
|
|
# init children set:
|
|
|
self.CHILDREN = {}
|
|
|
# init cost set
|
|
|
self.COST = defaultdict(lambda: defaultdict(dict))
|
|
|
-
|
|
|
+
|
|
|
# for visualization
|
|
|
self.V = set() # vertice in closed
|
|
|
self.ind = 0
|
|
|
@@ -57,7 +58,7 @@ class Anytime_Dstar(object):
|
|
|
def getcost(self, xi, xj):
|
|
|
# use a LUT for getting the costd
|
|
|
if xi not in self.COST:
|
|
|
- for (xj,xjcost) in children(self, xi, settings=1):
|
|
|
+ for (xj, xjcost) in children(self, xi, settings=1):
|
|
|
self.COST[xi][xj] = cost(self, xi, xj, xjcost)
|
|
|
# this might happen when there is a node changed.
|
|
|
if xj not in self.COST[xi]:
|
|
|
@@ -86,31 +87,33 @@ class Anytime_Dstar(object):
|
|
|
self.rhs[xi] = np.inf
|
|
|
return self.rhs[xi]
|
|
|
|
|
|
- def updatecost(self,range_changed=None, new=None, old=None, mode=False):
|
|
|
+ def updatecost(self, range_changed=None, new=None, old=None, mode=False):
|
|
|
# scan graph for changed cost, if cost is changed update it
|
|
|
CHANGED = set()
|
|
|
for xi in self.CLOSED:
|
|
|
if xi in self.CHILDREN:
|
|
|
- oldchildren = self.CHILDREN[xi]# A
|
|
|
+ oldchildren = self.CHILDREN[xi] # A
|
|
|
if isinbound(old, xi, mode) or isinbound(new, xi, mode):
|
|
|
- newchildren = set(children(self,xi))# B
|
|
|
+ newchildren = set(children(self, xi)) # B
|
|
|
removed = oldchildren.difference(newchildren)
|
|
|
intersection = oldchildren.intersection(newchildren)
|
|
|
added = newchildren.difference(oldchildren)
|
|
|
+ self.CHILDREN[xi] = newchildren
|
|
|
for xj in removed:
|
|
|
self.COST[xi][xj] = cost(self, xi, xj)
|
|
|
for xj in intersection.union(added):
|
|
|
self.COST[xi][xj] = cost(self, xi, xj)
|
|
|
CHANGED.add(xi)
|
|
|
- else:
|
|
|
+ else:
|
|
|
if isinbound(old, xi, mode) or isinbound(new, xi, mode):
|
|
|
CHANGED.add(xi)
|
|
|
- children_added = set(children(self,xi))
|
|
|
+ children_added = set(children(self, xi))
|
|
|
self.CHILDREN[xi] = children_added
|
|
|
for xj in children_added:
|
|
|
self.COST[xi][xj] = cost(self, xi, xj)
|
|
|
return CHANGED
|
|
|
-#--------------main functions for Anytime D star
|
|
|
+
|
|
|
+ # --------------main functions for Anytime D star
|
|
|
|
|
|
def key(self, s, epsilon=1):
|
|
|
if self.getg(s) > self.getrhs(s):
|
|
|
@@ -120,10 +123,10 @@ class Anytime_Dstar(object):
|
|
|
|
|
|
def UpdateState(self, s):
|
|
|
if s not in self.CLOSED:
|
|
|
- # TODO if s is not visited before
|
|
|
+ # TODO if s is not visited before
|
|
|
self.g[s] = np.inf
|
|
|
if s != self.xt:
|
|
|
- self.rhs[s] = min([self.getcost(s, s_p) + self.getg(s_p) for s_p in self.getchildren(s)])
|
|
|
+ self.rhs[s] = min([self.getcost(s, s_p) + self.getg(s_p) for s_p in self.getchildren(s)])
|
|
|
self.OPEN.check_remove(s)
|
|
|
if self.getg(s) != self.getrhs(s):
|
|
|
if s not in self.CLOSED:
|
|
|
@@ -132,7 +135,7 @@ class Anytime_Dstar(object):
|
|
|
self.INCONS.add(s)
|
|
|
|
|
|
def ComputeorImprovePath(self):
|
|
|
- while self.OPEN.top_key() < self.key(self.x0,self.epsilon) or self.rhs[self.x0] != self.g[self.x0]:
|
|
|
+ while self.OPEN.top_key() < self.key(self.x0, self.epsilon) or self.rhs[self.x0] != self.g[self.x0]:
|
|
|
s = self.OPEN.get()
|
|
|
|
|
|
if getDist(s, tuple(self.env.start)) < self.env.resolution:
|
|
|
@@ -156,7 +159,7 @@ class Anytime_Dstar(object):
|
|
|
islargelychanged = False
|
|
|
t = 0
|
|
|
self.ComputeorImprovePath()
|
|
|
- #TODO publish current epsilon sub-optimal solution
|
|
|
+ # TODO publish current epsilon sub-optimal solution
|
|
|
self.done = True
|
|
|
self.ind = 0
|
|
|
self.Path = self.path()
|
|
|
@@ -167,7 +170,7 @@ class Anytime_Dstar(object):
|
|
|
break
|
|
|
# change environment
|
|
|
# new2,old2 = self.env.move_block(theta = [0,0,0.1*t], mode='rotation')
|
|
|
- new2,old2 = self.env.move_block(a = [0,0,-0.2], mode='translation')
|
|
|
+ new2, old2 = self.env.move_block(a=[0, 0, -0.2], mode='translation')
|
|
|
ischanged = True
|
|
|
# islargelychanged = True
|
|
|
self.Path = []
|
|
|
@@ -181,23 +184,23 @@ class Anytime_Dstar(object):
|
|
|
self.ComputeorImprovePath()
|
|
|
ischanged = False
|
|
|
|
|
|
- if islargelychanged:
|
|
|
- self.epsilon += self.increment # or replan from scratch
|
|
|
+ if islargelychanged:
|
|
|
+ self.epsilon += self.increment # or replan from scratch
|
|
|
elif self.epsilon > 1:
|
|
|
self.epsilon -= self.decrement
|
|
|
-
|
|
|
+
|
|
|
# move states from the INCONS to OPEN
|
|
|
# update priorities in OPEN
|
|
|
Allnodes = self.INCONS.union(self.OPEN.allnodes())
|
|
|
for node in Allnodes:
|
|
|
- self.OPEN.put(node, self.key(node, self.epsilon))
|
|
|
+ self.OPEN.put(node, self.key(node, self.epsilon))
|
|
|
self.INCONS = set()
|
|
|
self.CLOSED = set()
|
|
|
self.ComputeorImprovePath()
|
|
|
- #TODO publish current epsilon sub optimal solution
|
|
|
+ # TODO publish current epsilon sub optimal solution
|
|
|
self.Path = self.path()
|
|
|
# if epsilon == 1:
|
|
|
- # wait for change to occur
|
|
|
+ # wait for change to occur
|
|
|
t += 1
|
|
|
|
|
|
def path(self, s_start=None):
|
|
|
@@ -212,10 +215,10 @@ class Anytime_Dstar(object):
|
|
|
ind = 0
|
|
|
while getDist(s, s_goal) > self.env.resolution:
|
|
|
if s == self.x0:
|
|
|
- children = [i for i in self.CLOSED if getDist(s, i) <= self.env.resolution*np.sqrt(3)]
|
|
|
- else:
|
|
|
+ children = [i for i in self.CLOSED if getDist(s, i) <= self.env.resolution * np.sqrt(3)]
|
|
|
+ else:
|
|
|
children = list(self.CHILDREN[s])
|
|
|
- snext = children[np.argmin([self.getcost(s,s_p) + self.getg(s_p) for s_p in children])]
|
|
|
+ snext = children[np.argmin([self.getcost(s, s_p) + self.getg(s_p) for s_p in children])]
|
|
|
path.append([s, snext])
|
|
|
s = snext
|
|
|
if ind > 100:
|
|
|
@@ -223,6 +226,7 @@ class Anytime_Dstar(object):
|
|
|
ind += 1
|
|
|
return path
|
|
|
|
|
|
+
|
|
|
if __name__ == '__main__':
|
|
|
- AD = Anytime_Dstar(resolution = 1)
|
|
|
- AD.Main()
|
|
|
+ AD = Anytime_Dstar(resolution=0.5)
|
|
|
+ AD.Main()
|