DstarLite3D.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import numpy as np
  2. import matplotlib.pyplot as plt
  3. import os
  4. import sys
  5. from collections import defaultdict
  6. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search-based Planning/")
  7. from Search_3D.env3D import env
  8. from Search_3D.utils3D import getDist, heuristic_fun, getNearest, isinbound, \
  9. cost, children, StateSpace
  10. from Search_3D.plot_util3D import visualization
  11. from Search_3D import queue
  12. import time
  13. class D_star_Lite(object):
  14. # Original version of the D*lite
  15. def __init__(self, resolution = 1):
  16. self.Alldirec = {(1, 0, 0): 1, (0, 1, 0): 1, (0, 0, 1): 1, \
  17. (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
  18. (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
  19. (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
  20. (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
  21. (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
  22. (1, 1, 1): np.sqrt(3), (-1, -1, -1) : np.sqrt(3), \
  23. (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
  24. (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
  25. self.env = env(resolution=resolution)
  26. #self.X = StateSpace(self.env)
  27. #self.x0, self.xt = getNearest(self.X, self.env.start), getNearest(self.X, self.env.goal)
  28. self.settings = 'CollisionChecking' # for collision checking
  29. self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal)
  30. # self.OPEN = queue.QueuePrior()
  31. self.OPEN = queue.MinheapPQ()
  32. self.km = 0
  33. self.g = {} # all g initialized at inf
  34. self.rhs = {self.xt:0} # rhs(x0) = 0
  35. self.h = {}
  36. self.OPEN.put(self.xt, self.CalculateKey(self.xt))
  37. self.CLOSED = set()
  38. # init children set:
  39. self.CHILDREN = {}
  40. # init cost set
  41. self.COST = defaultdict(lambda: defaultdict(dict))
  42. # for visualization
  43. self.V = set() # vertice in closed
  44. self.ind = 0
  45. self.Path = []
  46. self.done = False
  47. def updatecost(self,range_changed=None, new=None, old=None, mode=False):
  48. # scan graph for changed cost, if cost is changed update it
  49. CHANGED = set()
  50. for xi in self.CLOSED:
  51. if xi in self.CHILDREN:
  52. oldchildren = self.CHILDREN[xi]# A
  53. if isinbound(old, xi, mode) or isinbound(new, xi, mode):
  54. newchildren = set(children(self,xi))# B
  55. removed = oldchildren.difference(newchildren)
  56. intersection = oldchildren.intersection(newchildren)
  57. added = newchildren.difference(oldchildren)
  58. self.CHILDREN[xi] = newchildren
  59. for xj in removed:
  60. self.COST[xi][xj] = cost(self, xi, xj)
  61. for xj in intersection.union(added):
  62. self.COST[xi][xj] = cost(self, xi, xj)
  63. CHANGED.add(xi)
  64. else:
  65. if isinbound(old, xi, mode) or isinbound(new, xi, mode):
  66. CHANGED.add(xi)
  67. children_added = set(children(self,xi))
  68. self.CHILDREN[xi] = children_added
  69. for xj in children_added:
  70. self.COST[xi][xj] = cost(self, xi, xj)
  71. return CHANGED
  72. def getcost(self, xi, xj):
  73. # use a LUT for getting the costd
  74. if xi not in self.COST:
  75. for (xj,xjcost) in children(self, xi, settings=1):
  76. self.COST[xi][xj] = cost(self, xi, xj, xjcost)
  77. # this might happen when there is a node changed.
  78. if xj not in self.COST[xi]:
  79. self.COST[xi][xj] = cost(self, xi, xj)
  80. return self.COST[xi][xj]
  81. def getchildren(self, xi):
  82. if xi not in self.CHILDREN:
  83. allchild = children(self, xi)
  84. self.CHILDREN[xi] = set(allchild)
  85. return self.CHILDREN[xi]
  86. def geth(self, xi):
  87. # when the heurisitic is first calculated
  88. if xi not in self.h:
  89. self.h[xi] = heuristic_fun(self, xi, self.x0)
  90. return self.h[xi]
  91. def getg(self, xi):
  92. if xi not in self.g:
  93. self.g[xi] = np.inf
  94. return self.g[xi]
  95. def getrhs(self, xi):
  96. if xi not in self.rhs:
  97. self.rhs[xi] = np.inf
  98. return self.rhs[xi]
  99. #-------------main functions for D*Lite-------------
  100. def CalculateKey(self, s, epsilion = 1):
  101. return [min(self.getg(s), self.getrhs(s)) + epsilion * self.geth(s) + self.km, min(self.getg(s), self.getrhs(s))]
  102. def UpdateVertex(self, u):
  103. # if still in the hunt
  104. if not getDist(self.xt, u) <= self.env.resolution: # originally: u != s_goal
  105. if u in self.CHILDREN and len(self.CHILDREN[u]) == 0:
  106. self.rhs[u] = np.inf
  107. else:
  108. self.rhs[u] = min([self.getcost(s, u) + self.getg(s) for s in self.getchildren(u)])
  109. # if u is in OPEN, remove it
  110. self.OPEN.check_remove(u)
  111. # if rhs(u) not equal to g(u)
  112. if self.getg(u) != self.getrhs(u):
  113. self.OPEN.put(u, self.CalculateKey(u))
  114. def ComputeShortestPath(self):
  115. while self.OPEN.top_key() < self.CalculateKey(self.x0) or self.getrhs(self.x0) != self.getg(self.x0) :
  116. kold = self.OPEN.top_key()
  117. u = self.OPEN.get()
  118. self.V.add(u)
  119. self.CLOSED.add(u)
  120. if not self.done: # first time running, we need to stop on this condition
  121. if getDist(self.x0,u) < 1*self.env.resolution:
  122. self.x0 = u
  123. break
  124. if kold < self.CalculateKey(u):
  125. self.OPEN.put(u, self.CalculateKey(u))
  126. if self.getg(u) > self.getrhs(u):
  127. self.g[u] = self.rhs[u]
  128. else:
  129. self.g[u] = np.inf
  130. self.UpdateVertex(u)
  131. for s in self.getchildren(u):
  132. self.UpdateVertex(s)
  133. # visualization(self)
  134. self.ind += 1
  135. def main(self):
  136. s_last = self.x0
  137. print('first run ...')
  138. self.ComputeShortestPath()
  139. self.Path = self.path()
  140. self.done = True
  141. visualization(self)
  142. plt.pause(0.5)
  143. # plt.show()
  144. print('running with map update ...')
  145. t = 0 # count time
  146. ischanged = False
  147. self.V = set()
  148. while getDist(self.x0, self.xt) > 2*self.env.resolution:
  149. #---------------------------------- at specific times, the environment is changed and cost is updated
  150. if t % 2 == 0:
  151. new0,old0 = self.env.move_block(a=[-0.1, 0, -0.2], s=0.5, block_to_move=1, mode='translation')
  152. new1,old1 = self.env.move_block(a=[0, 0, -0.2], s=0.5, block_to_move=0, mode='translation')
  153. new2,old2 = self.env.move_block(theta = [0,0,0.1*t], mode='rotation')
  154. #new2,old2 = self.env.move_block(a=[-0.3, 0, -0.1], s=0.5, block_to_move=1, mode='translation')
  155. ischanged = True
  156. self.Path = []
  157. #----------------------------------- traverse the route as originally planned
  158. if t == 0:
  159. children_new = [i for i in self.CLOSED if getDist(self.x0, i) <= self.env.resolution*np.sqrt(3)]
  160. else:
  161. children_new = list(children(self,self.x0))
  162. self.x0 = children_new[np.argmin([self.getcost(self.x0,s_p) + self.getg(s_p) for s_p in children_new])]
  163. # TODO add the moving robot position codes
  164. self.env.start = self.x0
  165. # ---------------------------------- if any cost changed, update km, reset slast,
  166. # for all directed edgees (u,v) with chaged edge costs,
  167. # update the edge cost c(u,v) and update vertex u. then replan
  168. if ischanged:
  169. self.km += heuristic_fun(self, self.x0, s_last)
  170. s_last = self.x0
  171. CHANGED = self.updatecost(True, new0, old0)
  172. CHANGED1 = self.updatecost(True, new1, old1)
  173. CHANGED2 = self.updatecost(True, new2, old2, mode='obb')
  174. CHANGED = CHANGED.union(CHANGED1, CHANGED2)
  175. # self.V = set()
  176. for u in CHANGED:
  177. self.UpdateVertex(u)
  178. self.ComputeShortestPath()
  179. ischanged = False
  180. self.Path = self.path(self.x0)
  181. visualization(self)
  182. t += 1
  183. plt.show()
  184. def path(self, s_start=None):
  185. '''After ComputeShortestPath()
  186. returns, one can then follow a shortest path from s_start to
  187. s_goal by always moving from the current vertex s, starting
  188. at s_start. , to any successor s' that minimizes c(s,s') + g(s')
  189. until s_goal is reached (ties can be broken arbitrarily).'''
  190. path = []
  191. s_goal = self.xt
  192. if not s_start:
  193. s = self.x0
  194. else:
  195. s= s_start
  196. ind = 0
  197. while s != s_goal:
  198. if s == self.x0:
  199. children = [i for i in self.CLOSED if getDist(s, i) <= self.env.resolution*np.sqrt(3)]
  200. else:
  201. children = list(self.CHILDREN[s])
  202. snext = children[np.argmin([self.getcost(s,s_p) + self.getg(s_p) for s_p in children])]
  203. path.append([s, snext])
  204. s = snext
  205. if ind > 100:
  206. break
  207. ind += 1
  208. return path
  209. if __name__ == '__main__':
  210. D_lite = D_star_Lite(1)
  211. a = time.time()
  212. D_lite.main()
  213. print('used time (s) is ' + str(time.time() - a))