LP_Astar3D.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import numpy as np
  2. import matplotlib.pyplot as plt
  3. import os
  4. import sys
  5. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search-based Planning/")
  6. from Search_3D.env3D import env
  7. from Search_3D import Astar3D
  8. from Search_3D.utils3D import getDist, getRay, g_Space, Heuristic, getNearest, isinbound, isinball, \
  9. cost, obstacleFree
  10. from Search_3D.plot_util3D import visualization
  11. import queue
  12. import pyrr
  13. import time
  14. class Lifelong_Astar(object):
  15. def __init__(self,resolution = 1):
  16. self.Alldirec = {(1, 0, 0): 1, (0, 1, 0): 1, (0, 0, 1): 1, \
  17. (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
  18. (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
  19. (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
  20. (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
  21. (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
  22. (1, 1, 1): np.sqrt(3), (-1, -1, -1) : np.sqrt(3), \
  23. (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
  24. (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
  25. self.env = env(resolution=resolution)
  26. self.g = g_Space(self)
  27. self.start, self.goal = getNearest(self.g, self.env.start), getNearest(self.g, self.env.goal)
  28. self.x0, self.xt = self.start, self.goal
  29. self.rhs = g_Space(self) # rhs(.) = g(.) = inf
  30. self.rhs[self.start] = 0 # rhs(x0) = 0
  31. self.h = Heuristic(self.g, self.goal)
  32. self.OPEN = queue.MinheapPQ() # store [point,priority]
  33. self.OPEN.put(self.x0, [self.h[self.x0],0])
  34. self.CLOSED = set()
  35. # used for A*
  36. self.done = False
  37. self.Path = []
  38. self.V = []
  39. self.ind = 0
  40. # initialize children list
  41. self.CHILDREN = {}
  42. self.getCHILDRENset()
  43. # initialize cost list
  44. self.COST = {}
  45. _ = self.costset()
  46. def costset(self):
  47. NodeToChange = set()
  48. for xi in self.CHILDREN.keys():
  49. children = self.CHILDREN[xi]
  50. toUpdate = [self.cost(xj,xi) for xj in children]
  51. if xi in self.COST:
  52. # if the old cost not equal to new cost
  53. diff = np.not_equal(self.COST[xi],toUpdate)
  54. cd = np.array(children)[diff]
  55. for i in cd:
  56. NodeToChange.add(tuple(i))
  57. self.COST[xi] = toUpdate
  58. else:
  59. self.COST[xi] = toUpdate
  60. return NodeToChange
  61. def getCOSTset(self,xi,xj):
  62. ind, children = 0, self.CHILDREN[xi]
  63. for i in children:
  64. if i == xj:
  65. return self.COST[xi][ind]
  66. ind += 1
  67. def children(self, x):
  68. allchild = []
  69. resolution = self.env.resolution
  70. for direc in self.Alldirec:
  71. child = tuple(map(np.add,x,np.multiply(direc,resolution)))
  72. if isinbound(self.env.boundary,child):
  73. allchild.append(child)
  74. return allchild
  75. def getCHILDRENset(self):
  76. for xi in self.g.keys():
  77. self.CHILDREN[xi] = self.children(xi)
  78. def isCollide(self, x, child):
  79. ray , dist = getRay(x, child) , getDist(x, child)
  80. if not isinbound(self.env.boundary,child):
  81. return True, dist
  82. for i in self.env.AABB_pyrr:
  83. shot = pyrr.geometric_tests.ray_intersect_aabb(ray, i)
  84. if shot is not None:
  85. dist_wall = getDist(x, shot)
  86. if dist_wall <= dist: # collide
  87. return True, dist
  88. for i in self.env.balls:
  89. if isinball(i, child):
  90. return True, dist
  91. shot = pyrr.geometric_tests.ray_intersect_sphere(ray, i)
  92. if shot != []:
  93. dists_ball = [getDist(x, j) for j in shot]
  94. if all(dists_ball <= dist): # collide
  95. return True, dist
  96. return False, dist
  97. def cost(self, x, y):
  98. collide, dist = self.isCollide(x, y)
  99. if collide: return np.inf
  100. else: return dist
  101. def key(self,xi,epsilion = 1):
  102. return [min(self.g[xi],self.rhs[xi]) + epsilion*self.h[xi],min(self.g[xi],self.rhs[xi])]
  103. def path(self):
  104. path = []
  105. x = self.xt
  106. start = self.x0
  107. ind = 0
  108. while x != start:
  109. j = x
  110. nei = self.CHILDREN[x]
  111. gset = [self.g[xi] for xi in nei]
  112. # collision check and make g cost inf
  113. for i in range(len(nei)):
  114. if self.isCollide(nei[i],j)[0]:
  115. gset[i] = np.inf
  116. parent = nei[np.argmin(gset)]
  117. path.append([x, parent])
  118. x = parent
  119. if ind > 100:
  120. break
  121. ind += 1
  122. return path
  123. #------------------Lifelong Plannning A*
  124. def UpdateMembership(self, xi, xparent=None):
  125. if xi != self.x0:
  126. self.rhs[xi] = min([self.g[j] + self.getCOSTset(xi,j) for j in self.CHILDREN[xi]])
  127. self.OPEN.check_remove(xi)
  128. if self.g[xi] != self.rhs[xi]:
  129. self.OPEN.put(xi,self.key(xi))
  130. def ComputePath(self):
  131. print('computing path ...')
  132. while self.key(self.xt) > self.OPEN.top_key() or self.rhs[self.xt] != self.g[self.xt]:
  133. xi = self.OPEN.get()
  134. # if g > rhs, overconsistent
  135. if self.g[xi] > self.rhs[xi]:
  136. self.g[xi] = self.rhs[xi]
  137. # add xi to expanded node set
  138. if xi not in self.CLOSED:
  139. self.V.append(xi)
  140. self.CLOSED.add(xi)
  141. else: # underconsistent and consistent
  142. self.g[xi] = np.inf
  143. self.UpdateMembership(xi)
  144. for xj in self.CHILDREN[xi]:
  145. self.UpdateMembership(xj)
  146. # visualization(self)
  147. self.ind += 1
  148. self.Path = self.path()
  149. self.done = True
  150. visualization(self)
  151. plt.pause(2)
  152. def change_env(self):
  153. self.env.New_block()
  154. self.done = False
  155. self.Path = []
  156. self.CLOSED = set()
  157. N = self.costset()
  158. for xi in N:
  159. self.UpdateMembership(xi)
  160. if __name__ == '__main__':
  161. sta = time.time()
  162. Astar = Lifelong_Astar(1)
  163. Astar.ComputePath()
  164. Astar.change_env()
  165. Astar.ComputePath()
  166. plt.show()
  167. print(time.time() - sta)