zhm-real 5 years ago
parent
commit
d4e622da39

+ 2 - 7
Search-based Planning/.idea/workspace.xml

@@ -2,15 +2,10 @@
 <project version="4">
   <component name="ChangeListManager">
     <list default="true" id="025aff36-a6aa-4945-ab7e-b2c625055f47" name="Default Changelist" comment="">
-      <change afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/motion_model.py" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/policy_iteration.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/a_star.py" beforeDir="false" afterPath="$PROJECT_DIR$/a_star.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/bfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/bfs.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/dfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/dfs.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/dijkstra.py" beforeDir="false" afterPath="$PROJECT_DIR$/dijkstra.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/env.py" beforeDir="false" afterPath="$PROJECT_DIR$/env.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/motion model.py" beforeDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/tools.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/tools.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" afterDir="false" />
     </list>
     <option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />

binary
Stochastic Shortest Path/__pycache__/env.cpython-37.pyc


binary
Stochastic Shortest Path/__pycache__/tools.cpython-37.pyc


+ 1 - 17
Stochastic Shortest Path/env.py

@@ -41,24 +41,8 @@ def obs_map():
 
 def lose_map():
     lose = []
-    for i in range(27, 34):
+    for i in range(25, 36):
         lose.append((i, 13))
     return lose
 
 
-def show_map(xI, xG, obs_map, lose_map, name):
-    obs_x = [obs_map[i][0] for i in range(len(obs_map))]
-    obs_y = [obs_map[i][1] for i in range(len(obs_map))]
-
-    lose_x = [lose_map[i][0] for i in range(len(lose_map))]
-    lose_y = [lose_map[i][1] for i in range(len(lose_map))]
-
-    plt.plot(xI[0], xI[1], "bs")
-    plt.plot(xG[0], xG[1], "gs")
-    plt.plot(obs_x, obs_y, "sk")
-    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')
-    plt.title(name, fontdict=None)
-    plt.grid(True)
-    plt.axis("equal")
-    plt.show()
-
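
The penalty strip returned by lose_map is widened from columns 27-33 to 25-35 of row 13. A quick check of what the updated function yields:

    # Cells produced by the updated lose_map(): (25, 13) through (35, 13).
    lose = [(i, 13) for i in range(25, 36)]
    assert len(lose) == 11
    assert lose[0] == (25, 13) and lose[-1] == (35, 13)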

+ 125 - 0
Stochastic Shortest Path/policy_iteration.py

@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+@author: huiming zhou
+"""
+
+import env
+import tools
+import motion_model
+
+import matplotlib.pyplot as plt
+import numpy as np
+import copy
+import sys
+
+
+class Policy_iteration:
+    def __init__(self, x_start, x_goal):
+        self.u_set = motion_model.motions  # feasible input set
+        self.xI, self.xG = x_start, x_goal
+        self.e = 0.001
+        self.gamma = 0.9
+        self.obs = env.obs_map()  # position of obstacles
+        self.lose = env.lose_map()
+        self.name1 = "policy_iteration, e=" + str(self.e) + ", gamma=" + str(self.gamma)
+        self.name2 = "convergence of error"
+
+
+    def policy_evaluation(self, policy, value):
+        delta = sys.maxsize
+
+        while delta > self.e:
+            x_value = 0
+            for x in value:
+                if x in self.xG: continue
+                else:
+                    [x_next, p_next] = motion_model.move_prob(x, policy[x], self.obs)
+                    v_Q = self.cal_Q_value(x_next, p_next, value)
+                    v_diff = abs(value[x] - v_Q)
+                    value[x] = v_Q
+                    if v_diff > 0:
+                        x_value = max(x_value, v_diff)
+            delta = x_value
+        return value
+
+
+    def policy_improvement(self, policy, value):
+        for x in value:
+            if x in self.xG: continue
+            else:
+                value_list = []
+                for u in self.u_set:
+                    [x_next, p_next] = motion_model.move_prob(x, u, self.obs)
+                    value_list.append(self.cal_Q_value(x_next, p_next, value))
+                policy[x] = self.u_set[int(np.argmax(value_list))]
+        return policy
+
+
+    def iteration(self):
+        value_table = {}
+        policy = {}
+
+        for i in range(env.x_range):
+            for j in range(env.y_range):
+                if (i, j) not in self.obs:
+                    value_table[(i, j)] = 0
+                    policy[(i, j)] = self.u_set[0]
+
+        while True:
+            policy_back = copy.deepcopy(policy)
+            value_table = self.policy_evaluation(policy, value_table)
+            policy = self.policy_improvement(policy, value_table)
+            if policy_back == policy: break
+        return value_table, policy
+
+
+    def simulation(self, xI, xG, policy):
+        path = []
+        x = xI
+        while x not in xG:
+            u = policy[x]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next not in self.obs:
+                x = x_next
+                path.append(x)
+        path.pop()
+        return path
+
+
+    def animation(self, path):
+        plt.figure(1)
+        tools.show_map(self.xI, self.xG, self.obs, self.lose, self.name1)
+        for x in path:
+            tools.plot_dots(x)
+        plt.show()
+
+
+    def cal_Q_value(self, x, p, table):
+        value = 0
+        reward = self.get_reward(x)
+        for i in range(len(x)):
+            value += p[i] * (reward[i] + self.gamma * table[x[i]])
+        return value
+
+    def get_reward(self, x_next):
+        reward = []
+        for x in x_next:
+            if x in self.xG:
+                reward.append(10)
+            elif x in self.lose:
+                reward.append(-10)
+            else:
+                reward.append(0)
+        return reward
+
+
+if __name__ == '__main__':
+    x_Start = (5, 5)
+    x_Goal = [(49, 5), (49, 25)]
+
+    PI = Policy_iteration(x_Start, x_Goal)
+    [value_PI, policy_PI] = PI.iteration()
+    path_PI = PI.simulation(x_Start, x_Goal, policy_PI)
+
+    PI.animation(path_PI)
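
The new Policy_iteration class alternates in-place policy evaluation (sweeping until the largest Bellman residual falls below e) with greedy improvement via cal_Q_value, and stops once the policy is unchanged. A minimal self-contained sketch of the same loop on a toy deterministic grid; every name below is illustrative, not from the repository:

    # Policy-iteration sketch on a 4x4 deterministic grid (illustrative only;
    # the repository version uses stochastic motion_model.move_prob and a
    # penalty region).
    import copy

    GOAL = (3, 3)
    MOVES = [(1, 0), (-1, 0), (0, 1), (0, -1)]
    GAMMA, EPS = 0.9, 1e-3
    STATES = [(i, j) for i in range(4) for j in range(4)]

    def step(x, u):
        nx = (x[0] + u[0], x[1] + u[1])
        return nx if nx in STATES else x  # stay put at the border

    def reward(x):
        return 10 if x == GOAL else 0

    def evaluate(policy, value):
        delta = float("inf")
        while delta > EPS:               # sweep until the residual is small
            delta = 0.0
            for x in STATES:
                if x == GOAL:
                    continue
                nx = step(x, policy[x])
                q = reward(nx) + GAMMA * value[nx]
                delta = max(delta, abs(value[x] - q))
                value[x] = q             # in-place (Gauss-Seidel) update
        return value

    def improve(policy, value):
        for x in STATES:
            if x == GOAL:
                continue
            qs = [reward(step(x, u)) + GAMMA * value[step(x, u)] for u in MOVES]
            policy[x] = MOVES[qs.index(max(qs))]
        return policy

    value = {x: 0.0 for x in STATES}
    policy = {x: MOVES[0] for x in STATES}
    while True:
        old = copy.deepcopy(policy)
        value = evaluate(policy, value)
        policy = improve(policy, value)
        if policy == old:                # stable policy, so we are done
            break
    print(policy[(0, 0)])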

+ 23 - 3
Stochastic Shortest Path/tools.py

@@ -5,6 +5,7 @@
 """
 
 import matplotlib.pyplot as plt
+import env
 
 def extract_path(xI, xG, parent, actions):
     """
@@ -44,10 +45,29 @@ def showPath(xI, xG, path):
     plt.show()
 
 
-def plot_dots(x, length):
+def show_map(xI, xG, obs_map, lose_map, name):
+    obs_x = [obs_map[i][0] for i in range(len(obs_map))]
+    obs_y = [obs_map[i][1] for i in range(len(obs_map))]
+
+    lose_x = [lose_map[i][0] for i in range(len(lose_map))]
+    lose_y = [lose_map[i][1] for i in range(len(lose_map))]
+
+    plt.plot(xI[0], xI[1], "bs")
+
+    for x in xG:
+        plt.plot(x[0], x[1], "gs")
+
+    plt.plot(obs_x, obs_y, "sk")
+    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')
+    plt.title(name, fontdict=None)
+    plt.grid(True)
+    plt.axis("equal")
+
+
+def plot_dots(x):
     plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
     plt.gcf().canvas.mpl_connect('key_release_event',
                                  lambda event: [exit(0) if event.key == 'escape' else None])
-    if length % 15 == 0:
-        plt.pause(0.001)
+    plt.pause(0.001)
+
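
A usage sketch for the updated helpers: show_map now takes a list of goal cells and no longer calls plt.show() itself, and plot_dots drops its length argument and pauses on every call. The path below is a stand-in for illustration, assuming this runs alongside the repository's env.py and tools.py:

    # Illustrative driver for the updated tools helpers.
    import matplotlib.pyplot as plt
    import env
    import tools

    x_start = (5, 5)
    x_goal = [(49, 5), (49, 25)]       # multiple goals, as in this commit
    path = [(6, 5), (7, 5), (8, 5)]    # stand-in path for illustration

    tools.show_map(x_start, x_goal, env.obs_map(), env.lose_map(), "demo")
    for x in path:
        tools.plot_dots(x)             # no 'length' argument any more
    plt.show()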
 

+ 80 - 11
Stochastic Shortest Path/value_iteration.py

@@ -8,37 +8,106 @@ import env
 import tools
 import motion_model
 
+import matplotlib.pyplot as plt
 import numpy as np
-import copy
+import sys
 
 class Value_iteration:
     def __init__(self, x_start, x_goal):
         self.u_set = motion_model.motions                      # feasible input set
         self.xI, self.xG = x_start, x_goal
-        self.T = 500
+        self.e = 0.001
         self.gamma = 0.9
         self.obs = env.obs_map()                               # position of obstacles
         self.lose = env.lose_map()
-        self.name = "value_iteration, T=" + str(self.T) + ", gamma=" + str(self.gamma)
+        self.name1 = "value_iteration, e=" + str(self.e) + ", gamma=" + str(self.gamma)
+        self.name2 = "convergence of error"
 
-        env.show_map(self.xI, self.xG, self.obs, self.lose, self.name)
 
     def iteration(self):
         value_table = {}
         policy = {}
+        diff = []
+        delta = sys.maxsize
 
         for i in range(env.x_range):
             for j in range(env.y_range):
                 if (i, j) not in self.obs:
                     value_table[(i, j)] = 0
 
-        for k in range(self.T):
-            value_table_update = copy.deepcopy(value_table)
-            for key in value_table:
-                
+        while delta > self.e:
+            x_value = 0
+            for x in value_table:
+                if x in self.xG: continue
+                else:
+                    value_list = []
+                    for u in self.u_set:
+                        [x_next, p_next] = motion_model.move_prob(x, u, self.obs)
+                        value_list.append(self.cal_Q_value(x_next, p_next, value_table))
+                    policy[x] = self.u_set[int(np.argmax(value_list))]
+                    v_diff = abs(value_table[x] - max(value_list))
+                    value_table[x] = max(value_list)
+                    if v_diff > 0:
+                        x_value = max(x_value, v_diff)
+            delta = x_value
+            diff.append(delta)
+        return value_table, policy, diff
+
+
+    def simulation(self, xI, xG, policy):
+        path = []
+        x = xI
+        while x not in xG:
+            u = policy[x]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next not in self.obs:
+                x = x_next
+                path.append(x)
+        path.pop()
+        return path
+
+
+    def animation(self, path, diff):
+        plt.figure(1)
+        tools.show_map(self.xI, self.xG, self.obs, self.lose, self.name1)
+        for x in path:
+            tools.plot_dots(x)
+        plt.show()
+
+        plt.figure(2)
+        plt.plot(diff, color='#808080', marker='o')
+        plt.title(self.name2, fontdict=None)
+        plt.xlabel('iterations')
+        plt.grid('on')
+        plt.show()
+
+
+    def cal_Q_value(self, x, p, table):
+        value = 0
+        reward = self.get_reward(x)
+        for i in range(len(x)):
+            value += p[i] * (reward[i] + self.gamma * table[x[i]])
+        return value
+
+
+    def get_reward(self, x_next):
+        reward = []
+        for x in x_next:
+            if x in self.xG:
+                reward.append(10)
+            elif x in self.lose:
+                reward.append(-10)
+            else:
+                reward.append(0)
+        return reward
 
 
 if __name__ == '__main__':
-    x_Start = (5, 5)                # Starting node
-    x_Goal = (49, 5)                # Goal node
-    VI = Value_iteration(x_Start, x_Goal)
+    x_Start = (5, 5)
+    x_Goal = [(49, 5), (49, 25)]
+
+    VI = Value_iteration(x_Start, x_Goal)
+    [value_VI, policy_VI, diff_VI] = VI.iteration()
+    path_VI = VI.simulation(x_Start, x_Goal, policy_VI)
+
+    VI.animation(path_VI, diff_VI)
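
The commit replaces the fixed sweep count (T = 500) with a Bellman-residual stopping rule (delta < e) and records the residual after each sweep for the "convergence of error" plot. A standalone sketch of that stopping criterion on a toy 1-D chain; all names are illustrative:

    # Value iteration with residual-based stopping on a chain of 10 states;
    # goal at state 9, moves left/right, reward 10 for reaching the goal.
    GAMMA, EPS = 0.9, 1e-3
    N, GOAL = 10, 9
    value = [0.0] * N
    diff = []                      # per-sweep residual, as plotted in figure 2

    delta = float("inf")
    while delta > EPS:             # stop when the largest update is below e
        delta = 0.0
        for x in range(N):
            if x == GOAL:
                continue
            best = max(
                (10 if nx == GOAL else 0) + GAMMA * value[nx]
                for nx in (max(x - 1, 0), min(x + 1, N - 1))
            )
            delta = max(delta, abs(value[x] - best))
            value[x] = best
        diff.append(delta)

    print(len(diff), "sweeps;", [round(v, 2) for v in value])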