|
|
@@ -3,11 +3,12 @@ import matplotlib.pyplot as plt
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
+from collections import defaultdict
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search-based Planning/")
|
|
|
from Search_3D.env3D import env
|
|
|
from Search_3D import Astar3D
|
|
|
-from Search_3D.utils3D import getDist, getRay
|
|
|
+from Search_3D.utils3D import getDist, getRay, isinbound, isinball
|
|
|
import pyrr
|
|
|
|
|
|
def StateSpace(env, factor = 0):
|
|
|
@@ -19,29 +20,70 @@ def StateSpace(env, factor = 0):
|
|
|
xarr = np.arange(xmin,xmax,resolution).astype(float)
|
|
|
yarr = np.arange(ymin,ymax,resolution).astype(float)
|
|
|
zarr = np.arange(zmin,zmax,resolution).astype(float)
|
|
|
- g = {}
|
|
|
+ g = set()
|
|
|
for x in xarr:
|
|
|
for y in yarr:
|
|
|
for z in zarr:
|
|
|
- g[(x,y,z)] = np.inf
|
|
|
+ g.add((x,y,z))
|
|
|
return g
|
|
|
|
|
|
-def Heuristic(initparams,x):
|
|
|
- h = {}
|
|
|
- x = np.array(x)
|
|
|
- for xi in initparams.g.keys():
|
|
|
- h[xi] = max(abs(x-np.array(xi)))
|
|
|
- return h
|
|
|
-
|
|
|
def getNearest(Space,pt):
|
|
|
'''get the nearest point on the grid'''
|
|
|
mindis,minpt = 1000,None
|
|
|
- for pts in Space.keys():
|
|
|
+ for pts in Space:
|
|
|
dis = getDist(pts,pt)
|
|
|
if dis < mindis:
|
|
|
mindis,minpt = dis,pts
|
|
|
return minpt
|
|
|
|
|
|
+def isCollide(initparams, x, child):
|
|
|
+ '''see if line intersects obstacle'''
|
|
|
+ ray , dist = getRay(x, child) , getDist(x, child)
|
|
|
+ if not isinbound(initparams.env.boundary,child):
|
|
|
+ return True, dist
|
|
|
+ for i in initparams.env.AABB:
|
|
|
+ shot = pyrr.geometric_tests.ray_intersect_aabb(ray, i)
|
|
|
+ if shot is not None:
|
|
|
+ dist_wall = getDist(x, shot)
|
|
|
+ if dist_wall <= dist: # collide
|
|
|
+ return True, dist
|
|
|
+ for i in initparams.env.balls:
|
|
|
+ if isinball(i, child):
|
|
|
+ return True, dist
|
|
|
+ shot = pyrr.geometric_tests.ray_intersect_sphere(ray, i)
|
|
|
+ if shot != []:
|
|
|
+ dists_ball = [getDist(x, j) for j in shot]
|
|
|
+ if all(dists_ball <= dist): # collide
|
|
|
+ return True, dist
|
|
|
+ return False, dist
|
|
|
+
|
|
|
+def children(initparams, x):
|
|
|
+ # get the neighbor of a specific state
|
|
|
+ allchild = []
|
|
|
+ resolution = initparams.env.resolution
|
|
|
+ for direc in initparams.Alldirec:
|
|
|
+ child = tuple(map(np.add,x,np.multiply(direc,resolution)))
|
|
|
+ if isinbound(initparams.env.boundary,child):
|
|
|
+ allchild.append(child)
|
|
|
+ return allchild
|
|
|
+
|
|
|
+def cost(initparams, x, y):
|
|
|
+ # get the cost between two points,
|
|
|
+ # do collision check here
|
|
|
+ collide, dist = isCollide(initparams,x,y)
|
|
|
+ if collide: return np.inf
|
|
|
+ else: return dist
|
|
|
+
|
|
|
+def initcost(initparams):
|
|
|
+ # initialize cost dictionary, could be modifed lateron
|
|
|
+ c = defaultdict(lambda: defaultdict(dict)) # two key dicionary
|
|
|
+ for xi in initparams.X:
|
|
|
+ cdren = children(initparams, xi)
|
|
|
+ for child in cdren:
|
|
|
+ c[xi][child] = cost(initparams, xi, child)
|
|
|
+ return c
|
|
|
+
|
|
|
+
|
|
|
class D_star(object):
|
|
|
def __init__(self,resolution = 1):
|
|
|
self.Alldirec = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 1, 0], [1, 0, 1], [0, 1, 1], [1, 1, 1],
|
|
|
@@ -50,10 +92,110 @@ class D_star(object):
|
|
|
[1, -1, 0], [-1, 1, 0], [1, 0, -1], [-1, 0, 1], [0, 1, -1], [0, -1, 1],
|
|
|
[1, -1, -1], [-1, 1, -1], [-1, -1, 1], [1, 1, -1], [1, -1, 1], [-1, 1, 1]])
|
|
|
self.env = env(resolution = resolution)
|
|
|
- self.g = StateSpace(self.env)
|
|
|
- self.x0, self.xt = getNearest(self.g, self.env.start), getNearest(self.g, self.env.goal)
|
|
|
- self.h = Heuristic(self,self.x0) # getting heuristic for x0
|
|
|
+ self.X = StateSpace(self.env)
|
|
|
+ self.x0, self.xt = getNearest(self.X, self.env.start), getNearest(self.X, self.env.goal)
|
|
|
+ self.b = {} # back pointers every state has one except xt.
|
|
|
+ self.OPEN = {} # OPEN list, here use a hashmap implementation. hash is point, key is value
|
|
|
+ self.h = self.initH() # estimate from a point to the end point
|
|
|
+ self.tag = self.initTag() # set all states to new
|
|
|
+
|
|
|
+ # initialize cost set
|
|
|
+ self.c = initcost(self)
|
|
|
+
|
|
|
+ # put G (ending state) into the OPEN list
|
|
|
+ self.OPEN[self.xt] = 0
|
|
|
+
|
|
|
+ def initH(self):
|
|
|
+ # h set, all initialzed h vals are 0 for all states.
|
|
|
+ h = {}
|
|
|
+ for xi in self.X:
|
|
|
+ h[xi] = 0
|
|
|
+ return h
|
|
|
+
|
|
|
+ def initTag(self):
|
|
|
+ # tag , New point (never been in the OPEN list)
|
|
|
+ # Open point ( currently in OPEN )
|
|
|
+ # Closed (currently in CLOSED)
|
|
|
+ t = {}
|
|
|
+ for xi in self.X:
|
|
|
+ t[xi] = 'New'
|
|
|
+ return t
|
|
|
+
|
|
|
+ def get_kmin(self):
|
|
|
+ # get the minimum of the k val in OPEN
|
|
|
+ # -1 if it does not exist
|
|
|
+ if self.OPEN:
|
|
|
+ minv = np.inf
|
|
|
+ for k,v in enumerate(self.OPEN):
|
|
|
+ if v < minv: minv = v
|
|
|
+ return minv
|
|
|
+ return -1
|
|
|
+
|
|
|
+ def min_state(self):
|
|
|
+ # returns the state in OPEN with min k(.)
|
|
|
+ # if empty, returns None and -1
|
|
|
+ # it also removes this min value form the OPEN set.
|
|
|
+ if self.OPEN:
|
|
|
+ minv = np.inf
|
|
|
+ for k,v in enumerate(self.OPEN):
|
|
|
+ if v < minv: mink, minv = k, v
|
|
|
+ return mink, self.OPEN.pop(mink)
|
|
|
+ return None, -1
|
|
|
+
|
|
|
+ def insert(self, x, h_new):
|
|
|
+ # inserting a key and value into OPEN list (x, kx)
|
|
|
+ # depending on following situations
|
|
|
+ if self.tag[x] == 'New':
|
|
|
+ kx = h_new
|
|
|
+ if self.tag[x] == 'Open':
|
|
|
+ kx = min(self.OPEN[x],h_new)
|
|
|
+ if self.tag[x] == 'Closed':
|
|
|
+ kx = min(self.h[x], h_new)
|
|
|
+ self.OPEN[x] = kx
|
|
|
+ self.h[x],self.tag[x] = h_new, 'Open'
|
|
|
+
|
|
|
+ def process_state(self):
|
|
|
+ x, kold = self.min_state()
|
|
|
+ self.tag[x] = 'Closed'
|
|
|
+ if x == None: return -1
|
|
|
+ if kold < self.h[x]: # raised states
|
|
|
+ for y in children(self,x):
|
|
|
+ a = self.h[y] + self.c[y][x]
|
|
|
+ if self.h[y] <= kold and self.h[x] > a:
|
|
|
+ self.b[x], self.h[x] = y , a
|
|
|
+ elif kold == self.h[x]:# lower
|
|
|
+ for y in children(self,x):
|
|
|
+ bb = self.h[x] + self.c[x][y]
|
|
|
+ if self.tag[y] == 'New' or \
|
|
|
+ (self.b[y] == x and self.h[y] != bb) or \
|
|
|
+ (self.b[y] != x and self.h[y] > bb):
|
|
|
+ self.b[y] = x
|
|
|
+ self.insert(y, bb)
|
|
|
+ else:
|
|
|
+ for y in children(self,x):
|
|
|
+ bb = self.h[x] + self.c[x][y]
|
|
|
+ if self.tag[y] == 'New' or \
|
|
|
+ (self.b[y] == x and self.h[y] != bb):
|
|
|
+ self.b[y] = x
|
|
|
+ self.insert(y, bb)
|
|
|
+ else:
|
|
|
+ if self.b[y] != x and self.h[y] > bb:
|
|
|
+ self.insert(x, self.h[x])
|
|
|
+ else:
|
|
|
+ if self.b[y] != x and self.h[y] > bb and \
|
|
|
+ self.tag[y] == 'Closed' and self.h[y] == kold:
|
|
|
+ self.insert(y, self.h[y])
|
|
|
+ return self.get_kmin()
|
|
|
+
|
|
|
+ def modify_cost(self,x,y,cval):
|
|
|
+ self.c[x][y] = cval # set the new cost to the cval
|
|
|
+ if self.tag[x] == 'Closed': self.insert(x,self.h[x])
|
|
|
+ return self.get_kmin()
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ # TODO: implementation of changing obstable in process
|
|
|
+ pass
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- D = D_star(1)
|
|
|
- print(D.h[D.x0])
|
|
|
+ D = D_star(1)
|