Dstar3D.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import numpy as np
  2. import matplotlib.pyplot as plt
  3. import os
  4. import sys
  5. from collections import defaultdict
  6. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../Search_based_Planning/")
  7. from Search_3D.env3D import env
  8. from Search_3D import Astar3D
  9. from Search_3D.utils3D import StateSpace, getDist, getNearest, getRay, isinbound, isinball, isCollide, children, cost, \
  10. initcost
  11. from Search_3D.plot_util3D import visualization
  12. class D_star(object):
  13. def __init__(self, resolution=1):
  14. self.Alldirec = {(1, 0, 0): 1, (0, 1, 0): 1, (0, 0, 1): 1, \
  15. (-1, 0, 0): 1, (0, -1, 0): 1, (0, 0, -1): 1, \
  16. (1, 1, 0): np.sqrt(2), (1, 0, 1): np.sqrt(2), (0, 1, 1): np.sqrt(2), \
  17. (-1, -1, 0): np.sqrt(2), (-1, 0, -1): np.sqrt(2), (0, -1, -1): np.sqrt(2), \
  18. (1, -1, 0): np.sqrt(2), (-1, 1, 0): np.sqrt(2), (1, 0, -1): np.sqrt(2), \
  19. (-1, 0, 1): np.sqrt(2), (0, 1, -1): np.sqrt(2), (0, -1, 1): np.sqrt(2), \
  20. (1, 1, 1): np.sqrt(3), (-1, -1, -1) : np.sqrt(3), \
  21. (1, -1, -1): np.sqrt(3), (-1, 1, -1): np.sqrt(3), (-1, -1, 1): np.sqrt(3), \
  22. (1, 1, -1): np.sqrt(3), (1, -1, 1): np.sqrt(3), (-1, 1, 1): np.sqrt(3)}
  23. self.settings = 'CollisionChecking'
  24. self.env = env(resolution=resolution)
  25. self.X = StateSpace(self.env)
  26. self.x0, self.xt = getNearest(self.X, self.env.start), getNearest(self.X, self.env.goal)
  27. # self.x0, self.xt = tuple(self.env.start), tuple(self.env.goal)
  28. self.b = defaultdict(lambda: defaultdict(dict)) # back pointers every state has one except xt.
  29. self.OPEN = {} # OPEN list, here use a hashmap implementation. hash is point, key is value
  30. self.h = {} # estimate from a point to the end point
  31. self.tag = {} # set all states to new
  32. self.V = set() # vertice in closed
  33. # for visualization
  34. self.ind = 0
  35. self.Path = []
  36. self.done = False
  37. self.Obstaclemap = {}
  38. def checkState(self, y):
  39. if y not in self.h:
  40. self.h[y] = 0
  41. if y not in self.tag:
  42. self.tag[y] = 'New'
  43. def get_kmin(self):
  44. # get the minimum of the k val in OPEN
  45. # -1 if it does not exist
  46. if self.OPEN:
  47. return min(self.OPEN.values())
  48. return -1
  49. def min_state(self):
  50. # returns the state in OPEN with min k(.)
  51. # if empty, returns None and -1
  52. # it also removes this min value form the OPEN set.
  53. if self.OPEN:
  54. minvalue = min(self.OPEN.values())
  55. for k in self.OPEN.keys():
  56. if self.OPEN[k] == minvalue:
  57. return k, self.OPEN.pop(k)
  58. return None, -1
  59. def insert(self, x, h_new):
  60. # inserting a key and value into OPEN list (s, kx)
  61. # depending on following situations
  62. if self.tag[x] == 'New':
  63. kx = h_new
  64. if self.tag[x] == 'Open':
  65. kx = min(self.OPEN[x], h_new)
  66. if self.tag[x] == 'Closed':
  67. kx = min(self.h[x], h_new)
  68. self.OPEN[x] = kx
  69. self.h[x], self.tag[x] = h_new, 'Open'
  70. def process_state(self):
  71. # main function of the D star algorithm, perform the process state
  72. # around the old path when needed.
  73. x, kold = self.min_state()
  74. self.tag[x] = 'Closed'
  75. self.V.add(x)
  76. if x is None:
  77. return -1
  78. # check if 1st timer s
  79. self.checkState(x)
  80. if kold < self.h[x]: # raised states
  81. for y in children(self, x):
  82. # check y
  83. self.checkState(y)
  84. a = self.h[y] + cost(self, y, x)
  85. if self.h[y] <= kold and self.h[x] > a:
  86. self.b[x], self.h[x] = y, a
  87. if kold == self.h[x]: # lower
  88. for y in children(self, x):
  89. # check y
  90. self.checkState(y)
  91. bb = self.h[x] + cost(self, x, y)
  92. if self.tag[y] == 'New' or \
  93. (self.b[y] == x and self.h[y] != bb) or \
  94. (self.b[y] != x and self.h[y] > bb):
  95. self.b[y] = x
  96. self.insert(y, bb)
  97. else:
  98. for y in children(self, x):
  99. # check y
  100. self.checkState(y)
  101. bb = self.h[x] + cost(self, x, y)
  102. if self.tag[y] == 'New' or \
  103. (self.b[y] == x and self.h[y] != bb):
  104. self.b[y] = x
  105. self.insert(y, bb)
  106. else:
  107. if self.b[y] != x and self.h[y] > bb:
  108. self.insert(x, self.h[x])
  109. else:
  110. if self.b[y] != x and self.h[y] > bb and \
  111. self.tag[y] == 'Closed' and self.h[y] == kold:
  112. self.insert(y, self.h[y])
  113. return self.get_kmin()
  114. def modify_cost(self, x):
  115. xparent = self.b[x]
  116. if self.tag[x] == 'Closed':
  117. self.insert(x, self.h[xparent] + cost(self, x, xparent))
  118. def modify(self, x):
  119. self.modify_cost(x)
  120. while True:
  121. kmin = self.process_state()
  122. # visualization(self)
  123. if kmin >= self.h[x]:
  124. break
  125. def path(self, goal=None):
  126. path = []
  127. if not goal:
  128. x = self.x0
  129. else:
  130. x = goal
  131. start = self.xt
  132. while x != start:
  133. path.append([np.array(x), np.array(self.b[x])])
  134. x = self.b[x]
  135. return path
  136. def run(self):
  137. # put G (ending state) into the OPEN list
  138. self.OPEN[self.xt] = 0
  139. self.tag[self.x0] = 'New'
  140. # first run
  141. while True:
  142. # TODO: self.x0 =
  143. self.process_state()
  144. # visualization(self)
  145. if self.tag[self.x0] == "Closed":
  146. break
  147. self.ind += 1
  148. self.Path = self.path()
  149. self.done = True
  150. visualization(self)
  151. plt.pause(0.2)
  152. # plt.show()
  153. # when the environemnt changes over time
  154. for i in range(5):
  155. self.env.move_block(a=[0, -0.50, 0], s=0.5, block_to_move=1, mode='translation')
  156. self.env.move_block(a=[-0.25, 0, 0], s=0.5, block_to_move=0, mode='translation')
  157. # travel from end to start
  158. s = tuple(self.env.start)
  159. # self.V = set()
  160. while s != self.xt:
  161. if s == tuple(self.env.start):
  162. sparent = self.b[self.x0]
  163. else:
  164. sparent = self.b[s]
  165. # if there is a change of Cost, or a collision.
  166. if cost(self, s, sparent) == np.inf:
  167. self.modify(s)
  168. continue
  169. self.ind += 1
  170. s = sparent
  171. self.Path = self.path()
  172. visualization(self)
  173. plt.show()
  174. if __name__ == '__main__':
  175. D = D_star(1)
  176. D.run()