# dynamic_prog.py
# ----------
# User Instructions:
#
# Create a function compute_value which returns
# a grid of values. The value of a cell is the minimum
# number of moves required to get from the cell to the goal.
#
# If a cell is a wall or it is impossible to reach the goal from a cell,
# assign that cell a value of 99.
#
# NOTE: compute_value in this file marks such cells with float('inf')
# instead of 99.
# ----------
  11. import heapq
  12. from collections import deque
  13. import numpy as np
  14. from icecream import ic
  15. from first_search import check_navigable
  16. grid = [[0, 1, 0, 0, 0, 0],
  17. [0, 1, 0, 0, 0, 0],
  18. [0, 1, 0, 0, 0, 0],
  19. [0, 1, 0, 0, 0, 0],
  20. [0, 0, 0, 0, 1, 0]]
  21. grid = np.array([[0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
  22. [0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
  23. [0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0],
  24. [0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
  25. [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0],
  26. [0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0],
  27. [0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0],
  28. [0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0],
  29. [0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0],
  30. [0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0]])
  31. # init = [0,0]
  32. goal = [len(grid)-1, len(grid[0])-1]
  33. cost = 1 # the cost associated with moving from a cell to an adjacent one
  34. delta = [[-1, 0], # go up
  35. [0, -1], # go left
  36. [1, 0], # go down
  37. [0, 1]] # go right
  38. delta_name = ['^', '<', 'v', '>']
  39. def heuristic(*args):
  40. return 0
  41. def compute_value(grid, goal, cost):
  42. """Compute the reversed cost map. The goal point will have zero cost and
  43. each grid has a cost value representing the cost from here to the goal.
  44. Parameters
  45. ----------
  46. grid : np.ndarray
  47. Map of the env.
  48. goal : np.ndarray
  49. Goal position.
  50. cost : float
  51. Cost value for each step.
  52. Returns
  53. -------
  54. np.ndarray
  55. A reversed cost map.
  56. """
  57. grid = np.array(grid)
  58. goal = np.array(goal)
  59. open_list = []
  60. heapq.heappush(open_list, np.concatenate(
  61. ([heuristic(goal, goal), 0], goal)))
  62. closed_list = dict()
  63. closed_list[tuple(goal)] = 0
  64. cost_map = np.zeros([len(grid), len(grid[0])]) + float('inf')
  65. cost_map[goal[0], goal[1]] = 0
  66. expand_map = np.zeros([len(grid), len(grid[0])]) - float('inf')
  67. current_expand_num = 0
  68. expand_map[goal[0], goal[1]] = current_expand_num
  69. while open_list:
  70. current_center_grid = heapq.heappop(open_list)
  71. current_center_grid = current_center_grid[2:4] # type: list[int]
  72. if check_navigable(grid, current_center_grid):
  73. for move_num in range(len(delta)):
  74. next = np.array(current_center_grid)+delta[move_num]
  75. next = next.astype('int') # type: np.ndarray[int]
  76. if check_navigable(grid, next) and tuple(next) not in closed_list:
  77. current_expand_num += 1
  78. current_cost = closed_list[current_center_grid[0],
  79. current_center_grid[1]]+cost
  80. expand_grid = [heuristic(next, goal)+current_expand_num,
  81. current_expand_num] + list(next)
  82. heapq.heappush(open_list, expand_grid)
  83. closed_list[tuple(next)] = current_cost
  84. cost_map[next[0], next[1]] = current_cost
  85. expand_map[next[0], next[1]] = current_expand_num
  86. return cost_map
  87. def generate_policy(cost_map, goal):
  88. def check_navigable(cost_map, pos):
  89. goal = np.array(pos).astype(int)
  90. if all(goal >= np.array([0, 0])) and all(goal <= np.array([len(cost_map)-1, len(cost_map[0])-1])):
  91. if cost_map[goal[0], goal[1]] != float('inf'):
  92. return True
  93. return False
  94. delta = np.array([[-1, 0], # type: np.ndarray[int]
  95. [0, -1], # go left
  96. [1, 0], # go down
  97. [0, 1]]) # go right
  98. cost_map = np.array(cost_map)
  99. policy_map = np.array([['+' for _ in range(len(cost_map[0]))]
  100. for _ in range(len(cost_map))])
  101. policy_map[goal[0], goal[1]] = '*'
  102. open_list = deque()
  103. checked = set([tuple(goal)])
  104. neighbor_grids = np.array([goal for _ in range(4)]) + delta
  105. neighbor_grids = map(lambda pos: pos if check_navigable(
  106. cost_map, pos) else False, neighbor_grids)
  107. # print(list(neighbor_grids))
  108. for pos in neighbor_grids:
  109. if type(pos) == np.ndarray:
  110. open_list.append(pos)
  111. while open_list:
  112. current_grid = open_list.popleft()
  113. checked.add(tuple(current_grid))
  114. neighbor_grids = np.array([current_grid for _ in range(4)]) + delta
  115. neighbor_costs = list(map(lambda pos: cost_map[pos[0], pos[1]] if check_navigable(
  116. cost_map, pos) else float('inf'), neighbor_grids))
  117. # expand the open list
  118. for i in range(4):
  119. if neighbor_costs[i] != float('inf') and tuple(neighbor_grids[i]) not in checked:
  120. open_list.append(neighbor_grids[i])
  121. minimum_cost_grid_index = np.argmin(neighbor_costs)
  122. dlt_opst = ['^', '<', 'v', '>']
  123. policy_map[current_grid[0], current_grid[1]
  124. ] = dlt_opst[minimum_cost_grid_index]
  125. return policy_map
# Script section: runs whenever this module is executed or imported.
# Build the cost-to-goal map for the module-level grid and goal.
cost_map = compute_value(grid, goal, cost)
# Print the resulting policy map with icecream. ic() echoes the source text
# of its argument next to the value, so the expression is kept inline.
ic(generate_policy(cost_map, goal))