| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- # check paper of
- # [Likhachev2005]
- import numpy as np
- import matplotlib.pyplot as plt
- import os
- import sys
- from collections import defaultdict
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search-based Planning/")
- from Search_3D.env3D import env
- from Search_3D.utils3D import getDist, heuristic_fun, getNearest, isinbound, \
- cost, children, StateSpace
- from Search_3D.plot_util3D import visualization
- from Search_3D import queue
- import time
- class Anytime_Dstar(object):
- def __init__(self, resolution=1):
- self.Alldirec = {(1, 0, 0): 1, (0, 1, 0): 1, (0, 0, 1): 1, \
- (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
- (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
- (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
- (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
- (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
- (1, 1, 1): np.sqrt(3), (-1, -1, -1): np.sqrt(3), \
- (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
- (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
- self.env = env(resolution=resolution)
- self.settings = 'CollisionChecking' # for collision checking
- self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal)
- self.OPEN = queue.MinheapPQ()
- self.g = {} # all g initialized at inf
- self.h = {}
- self.rhs = {self.xt: 0} # rhs(x0) = 0
- self.OPEN.put(self.xt, self.key(self.xt))
- self.INCONS = set()
- self.CLOSED = set()
- # init children set:
- self.CHILDREN = {}
- # init cost set
- self.COST = defaultdict(lambda: defaultdict(dict))
- # for visualization
- self.V = set() # vertice in closed
- self.ind = 0
- self.Path = []
- self.done = False
- # epsilon in the key caculation
- self.epsilon = 1
- self.increment = 0.1
- self.decrement = 0.2
- def getcost(self, xi, xj):
- # use a LUT for getting the costd
- if xi not in self.COST:
- for (xj, xjcost) in children(self, xi, settings=1):
- self.COST[xi][xj] = cost(self, xi, xj, xjcost)
- # this might happen when there is a node changed.
- if xj not in self.COST[xi]:
- self.COST[xi][xj] = cost(self, xi, xj)
- return self.COST[xi][xj]
- def getchildren(self, xi):
- if xi not in self.CHILDREN:
- allchild = children(self, xi)
- self.CHILDREN[xi] = set(allchild)
- return self.CHILDREN[xi]
- def geth(self, xi):
- # when the heurisitic is first calculated
- if xi not in self.h:
- self.h[xi] = heuristic_fun(self, xi, self.x0)
- return self.h[xi]
- def getg(self, xi):
- if xi not in self.g:
- self.g[xi] = np.inf
- return self.g[xi]
- def getrhs(self, xi):
- if xi not in self.rhs:
- self.rhs[xi] = np.inf
- return self.rhs[xi]
- def updatecost(self, range_changed=None, new=None, old=None, mode=False):
- # scan graph for changed cost, if cost is changed update it
- CHANGED = set()
- for xi in self.CLOSED:
- if isinbound(old, xi, mode) or isinbound(new, xi, mode):
- newchildren = set(children(self, xi)) # B
- self.CHILDREN[xi] = newchildren
- for xj in newchildren:
- self.COST[xi][xj] = cost(self, xi, xj)
- CHANGED.add(xi)
- return CHANGED
- # def updateGraphCost(self, range_changed=None, new=None, old=None, mode=False):
- # # TODO scan graph for changed cost, if cost is changed update it
- # # make the graph cost via vectorization
- # CHANGED = set()
- # Allnodes = np.array(list(self.CLOSED))
- # isChanged = isinbound(old, Allnodes, mode = mode, isarray = True) & \
- # isinbound(new, Allnodes, mode = mode, isarray = True)
- # Changednodes = Allnodes[isChanged]
- # for xi in Changednodes:
- # xi = tuple(xi)
- # CHANGED.add(xi)
- # self.CHILDREN[xi] = set(children(self, xi))
- # for xj in self.CHILDREN:
- # self.COST[xi][xj] = cost(self, xi, xj)
-
- # --------------main functions for Anytime D star
- def key(self, s, epsilon=1):
- if self.getg(s) > self.getrhs(s):
- return [self.rhs[s] + epsilon * heuristic_fun(self, s, self.x0), self.rhs[s]]
- else:
- return [self.getg(s) + heuristic_fun(self, s, self.x0), self.getg(s)]
- def UpdateState(self, s):
- if s not in self.CLOSED:
- # TODO if s is not visited before
- self.g[s] = np.inf
- if s != self.xt:
- self.rhs[s] = min([self.getcost(s, s_p) + self.getg(s_p) for s_p in self.getchildren(s)])
- self.OPEN.check_remove(s)
- if self.getg(s) != self.getrhs(s):
- if s not in self.CLOSED:
- self.OPEN.put(s, self.key(s))
- else:
- self.INCONS.add(s)
- def ComputeorImprovePath(self):
- while self.OPEN.top_key() < self.key(self.x0, self.epsilon) or self.rhs[self.x0] != self.g[self.x0]:
- s = self.OPEN.get()
- if getDist(s, tuple(self.env.start)) < self.env.resolution:
- break
- if self.g[s] > self.rhs[s]:
- self.g[s] = self.rhs[s]
- self.CLOSED.add(s)
- self.V.add(s)
- for s_p in self.getchildren(s):
- self.UpdateState(s_p)
- else:
- self.g[s] = np.inf
- self.UpdateState(s)
- for s_p in self.getchildren(s):
- self.UpdateState(s_p)
- self.ind += 1
- def Main(self):
- ischanged = False
- islargelychanged = False
- t = 0
- self.ComputeorImprovePath()
- # TODO publish current epsilon sub-optimal solution
- self.done = True
- self.ind = 0
- self.Path = self.path()
- visualization(self)
- while True:
- visualization(self)
- if t == 20:
- break
- # change environment
- # new2,old2 = self.env.move_block(theta = [0,0,0.1*t], mode='rotation')
- new2, old2 = self.env.move_block(a=[0, 0, -0.2], mode='translation')
- ischanged = True
- # islargelychanged = True
- self.Path = []
- # update cost with changed environment
- if ischanged:
- # CHANGED = self.updatecost(True, new2, old2, mode='obb')
- CHANGED = self.updatecost(True, new2, old2)
- for u in CHANGED:
- self.UpdateState(u)
- self.ComputeorImprovePath()
- ischanged = False
- if islargelychanged:
- self.epsilon += self.increment # or replan from scratch
- elif self.epsilon > 1:
- self.epsilon -= self.decrement
- # move states from the INCONS to OPEN
- # update priorities in OPEN
- Allnodes = self.INCONS.union(self.OPEN.allnodes())
- for node in Allnodes:
- self.OPEN.put(node, self.key(node, self.epsilon))
- self.INCONS = set()
- self.CLOSED = set()
- self.ComputeorImprovePath()
- # publish current epsilon sub optimal solution
- self.Path = self.path()
- # if epsilon == 1:
- # wait for change to occur
- t += 1
- def path(self, s_start=None):
- '''After ComputeShortestPath()
- returns, one can then follow a shortest path from s_start to
- s_goal by always moving from the current vertex s, starting
- at s_start. , to any successor s' that minimizes c(s,s') + g(s')
- until s_goal is reached (ties can be broken arbitrarily).'''
- path = []
- s_goal = self.xt
- s = self.x0
- ind = 0
- while getDist(s, s_goal) > self.env.resolution:
- if s == self.x0:
- children = [i for i in self.CLOSED if getDist(s, i) <= self.env.resolution * np.sqrt(3)]
- else:
- children = list(self.CHILDREN[s])
- snext = children[np.argmin([self.getcost(s, s_p) + self.getg(s_p) for s_p in children])]
- path.append([s, snext])
- s = snext
- if ind > 100:
- break
- ind += 1
- return path
- if __name__ == '__main__':
- AD = Anytime_Dstar(resolution=1)
- AD.Main()
|