zhm-real пре 5 година
родитељ
комит
baa8bd1900

+ 2 - 6
Search-based Planning/.idea/workspace.xml

@@ -3,15 +3,11 @@
   <component name="ChangeListManager">
     <list default="true" id="025aff36-a6aa-4945-ab7e-b2c625055f47" name="Default Changelist" comment="">
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/a_star.py" beforeDir="false" afterPath="$PROJECT_DIR$/a_star.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/bfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/bfs.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/dfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/dfs.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/dijkstra.py" beforeDir="false" afterPath="$PROJECT_DIR$/dijkstra.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/env.py" beforeDir="false" afterPath="$PROJECT_DIR$/env.py" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/tools.py" beforeDir="false" afterPath="$PROJECT_DIR$/tools.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-policy_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-policy_iteration.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-value_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-value_iteration.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/policy_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/policy_iteration.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/tools.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/tools.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" afterDir="false" />
     </list>
     <option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />

+ 80 - 38
Stochastic Shortest Path/Q-policy_iteration.py

@@ -22,14 +22,23 @@ class Q_policy_iteration:
         self.gamma = 0.9                                        # discount factor
         self.obs = env.obs_map()                                # position of obstacles
         self.lose = env.lose_map()                              # position of lose states
-        self.name1 = "Q-policy_iteration, e=" + str(self.e) + ", gamma=" + str(self.gamma)
+        self.name1 = "Q-policy_iteration, e=" + str(self.e) \
+                     + ", gamma=" + str(self.gamma)
         self.name2 = "convergence of error"
 
 
     def policy_evaluation(self, policy, value):
+        """
+        evaluation process using current policy.
+
+        :param policy: current policy
+        :param value: value table
+        :return: converged value table
+        """
+
         delta = sys.maxsize
 
-        while delta > self.e:
+        while delta > self.e:               # convergence condition
             x_value = 0
             for x in value:
                 if x not in self.xG:
@@ -41,73 +50,108 @@ class Q_policy_iteration:
                         if v_diff > 0:
                             x_value = max(x_value, v_diff)
             delta = x_value
+
         return value
 
 
     def policy_improvement(self, policy, value):
+        """
+        policy improvement process.
+
+        :param policy: policy table
+        :param value: current value table
+        :return: improved policy
+        """
+
         for x in value:
             if x not in self.xG:
                 policy[x] = int(np.argmax(value[x]))
+
         return policy
 
 
     def iteration(self):
+        """
+        Q-policy iteration
+        :return: converged policy and its value table.
+        """
+
         Q_table = {}
         policy = {}
+        count = 0
 
         for i in range(env.x_range):
             for j in range(env.y_range):
                 if (i, j) not in self.obs:
-                    Q_table[(i, j)] = [0, 0, 0, 0]
-                    policy[(i, j)] = 0
+                    Q_table[(i, j)] = [0, 0, 0, 0]              # initialize Q_value table
+                    policy[(i, j)] = 0                          # initialize policy table
 
         while True:
+            count += 1
             policy_back = copy.deepcopy(policy)
-            Q_table = self.policy_evaluation(policy, Q_table)
-            policy = self.policy_improvement(policy, Q_table)
-            if policy_back == policy: break
-        return Q_table, policy
+            Q_table = self.policy_evaluation(policy, Q_table)   # evaluation process
+            policy = self.policy_improvement(policy, Q_table)   # improvement process
+            if policy_back == policy: break                     # convergence condition
 
+        self.message(count)
 
-    def simulation(self, xI, xG, policy):
-        path = []
-        x = xI
-        while x not in xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next not in self.obs:
-                x = x_next
-                path.append(x)
-        path.pop()
-        return path
+        return Q_table, policy
 
 
-    def animation(self, path):
-        plt.figure(1)
-        tools.show_map(self.xI, self.xG, self.obs, self.lose, self.name1)
-        for x in path:
-            tools.plot_dots(x)
-        plt.show()
+    def cal_Q_value(self, x, p, policy, table):
+        """
+        cal Q_value.
 
+        :param x: next state vector
+        :param p: probability of each state
+        :param table: value table
+        :return: Q-value
+        """
 
-    def cal_Q_value(self, x, p, policy, table):
         value = 0
-        reward = self.get_reward(x)
+        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * table[x[i]][policy[x[i]]])
         return value
 
 
-    def get_reward(self, x_next):
-        reward = []
-        for x in x_next:
-            if x in self.xG:
-                reward.append(10)
-            elif x in self.lose:
-                reward.append(-10)
+    def simulation(self, xI, xG, policy):
+        """
+        simulate a path using converged policy.
+
+        :param xI: starting state
+        :param xG: goal state
+        :param policy: converged policy
+        :return: simulation path
+        """
+
+        plt.figure(1)                                               # path animation
+        tools.show_map(xI, xG, self.obs, self.lose, self.name1)     # show background
+
+        x, path = xI, []
+        while True:
+            u = self.u_set[policy[x]]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next in self.obs:
+                print("Collision!")                                 # collision: simulation failed
             else:
-                reward.append(0)
-        return reward
+                x = x_next
+                if x_next in xG:
+                    break
+                else:
+                    tools.plot_dots(x)                              # each state in optimal path
+                    path.append(x)
+        plt.show()
+
+        return path
+
+
+    def message(self, count):
+        print("starting state: ", self.xI)
+        print("goal states: ", self.xG)
+        print("condition for convergence: ", self.e)
+        print("discount factor: ", self.gamma)
+        print("iteration times: ", count)
 
 
 if __name__ == '__main__':
@@ -117,5 +161,3 @@ if __name__ == '__main__':
     QPI = Q_policy_iteration(x_Start, x_Goal)
     [value_QPI, policy_QPI] = QPI.iteration()
     path_QPI = QPI.simulation(x_Start, x_Goal, policy_QPI)
-
-    QPI.animation(path_QPI)

+ 61 - 35
Stochastic Shortest Path/Q-value_iteration.py

@@ -12,6 +12,7 @@ import matplotlib.pyplot as plt
 import numpy as np
 import sys
 
+
 class Q_value_iteration:
     def __init__(self, x_start, x_goal):
         self.u_set = motion_model.motions                           # feasible input set
@@ -20,21 +21,29 @@ class Q_value_iteration:
         self.gamma = 0.9                                            # discount factor
         self.obs = env.obs_map()                                    # position of obstacles
         self.lose = env.lose_map()                                  # position of lose states
-        self.name1 = "Q-value_iteration, e=" + str(self.e) + ", gamma=" + str(self.gamma)
+        self.name1 = "Q-value_iteration, e=" + str(self.e) \
+                     + ", gamma=" + str(self.gamma)
         self.name2 = "convergence of error"
 
 
     def iteration(self):
+        """
+        Q_value_iteration
+        :return: converged Q table and policy
+        """
+
         Q_table = {}
         policy = {}
         delta = sys.maxsize
+        count = 0
 
         for i in range(env.x_range):
             for j in range(env.y_range):
                 if (i, j) not in self.obs:
-                    Q_table[(i, j)] = [0, 0, 0, 0]
+                    Q_table[(i, j)] = [0, 0, 0, 0]          # initialize Q_table
 
-        while delta > self.e:
+        while delta > self.e:                               # convergence condition
+            count += 1
             x_value = 0
             for x in Q_table:
                 if x not in x_Goal:
@@ -50,48 +59,67 @@ class Q_value_iteration:
         for x in Q_table:
             if x not in x_Goal:
                 policy[x] = np.argmax(Q_table[x])
-        return policy
 
+        self.message(count)
 
-    def simulation(self, xI, xG, policy):
-        path = []
-        x = xI
-        while x not in xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next not in self.obs:
-                x = x_next
-                path.append(x)
-        path.pop()
-        return path
+        return Q_table, policy
 
 
-    def animation(self, path):
-        plt.figure(1)
-        tools.show_map(self.xI, self.xG, self.obs, self.lose, self.name1)
-        for x in path:
-            tools.plot_dots(x)
-        plt.show()
+    def cal_Q_value(self, x, p, table):
+        """
+        cal Q_value.
 
+        :param x: next state vector
+        :param p: probability of each state
+        :param table: value table
+        :return: Q-value
+        """
 
-    def cal_Q_value(self, x, p, table):
         value = 0
-        reward = self.get_reward(x)
+        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * max(table[x[i]]))
+
         return value
 
 
-    def get_reward(self, x_next):
-        reward = []
-        for x in x_next:
-            if x in self.xG:
-                reward.append(10)
-            elif x in self.lose:
-                reward.append(-10)
+    def simulation(self, xI, xG, policy):
+        """
+        simulate a path using converged policy.
+
+        :param xI: starting state
+        :param xG: goal state
+        :param policy: converged policy
+        :return: simulation path
+        """
+
+        plt.figure(1)                                                   # path animation
+        tools.show_map(xI, xG, self.obs, self.lose, self.name1)         # show background
+
+        x, path = xI, []
+        while True:
+            u = self.u_set[policy[x]]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next in self.obs:
+                print("Collision!")                                     # collision: simulation failed
             else:
-                reward.append(0)
-        return reward
+                x = x_next
+                if x_next in xG:
+                    break
+                else:
+                    tools.plot_dots(x)                                  # each state in optimal path
+                    path.append(x)
+        plt.show()
+
+        return path
+
+
+    def message(self, count):
+        print("starting state: ", self.xI)
+        print("goal states: ", self.xG)
+        print("condition for convergence: ", self.e)
+        print("discount factor: ", self.gamma)
+        print("iteration times: ", count)
 
 
 if __name__ == '__main__':
@@ -99,7 +127,5 @@ if __name__ == '__main__':
     x_Goal = [(49, 5), (49, 25)]
 
     QVI = Q_value_iteration(x_Start, x_Goal)
-    policy_QVI = QVI.iteration()
+    [value_QVI, policy_QVI] = QVI.iteration()
     path_VI = QVI.simulation(x_Start, x_Goal, policy_QVI)
-
-    QVI.animation(path_VI)

BIN
Stochastic Shortest Path/__pycache__/env.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/tools.cpython-37.pyc


+ 24 - 2
Stochastic Shortest Path/env.py

@@ -4,10 +4,9 @@
 @author: huiming zhou
 """
 
-import matplotlib.pyplot as plt
-
 x_range, y_range = 51, 31     # size of background
 
+
 def obs_map():
     """
     Initialize obstacles' positions
@@ -40,9 +39,32 @@ def obs_map():
 
 
 def lose_map():
+    """
+    Initialize losing states' positions
+    :return: losing states
+    """
+
     lose = []
     for i in range(25, 36):
         lose.append((i, 13))
     return lose
 
 
+def get_reward(x_next, xG, lose):
+    """
+    calculate reward of next state
+
+    :param x_next: next state
+    :return: reward
+    """
+
+    reward = []
+    for x in x_next:
+        if x in xG:
+            reward.append(10)           # reward : 10, for goal states
+        elif x in lose:
+            reward.append(-10)          # reward : -10, for lose states
+        else:
+            reward.append(0)            # reward : 0, for other states
+
+    return reward

+ 85 - 44
Stochastic Shortest Path/policy_iteration.py

@@ -22,18 +22,26 @@ class Policy_iteration:
         self.gamma = 0.9                                        # discount factor
         self.obs = env.obs_map()                                # position of obstacles
         self.lose = env.lose_map()                              # position of lose states
-        self.name1 = "policy_iteration, e=" + str(self.e) + ", gamma=" + str(self.gamma)
-        self.name2 = "convergence of error"
+        self.name1 = "policy_iteration, e=" + str(self.e) \
+                     + ", gamma=" + str(self.gamma)
+        self.name2 = "convergence of error, e=" + str(self.e)
 
 
     def policy_evaluation(self, policy, value):
+        """
+        Evaluate current policy.
+
+        :param policy: current policy
+        :param value: value table
+        :return: new value table generated by current policy
+        """
+
         delta = sys.maxsize
 
-        while delta > self.e:
+        while delta > self.e:           # convergence condition
             x_value = 0
             for x in value:
-                if x in self.xG: continue
-                else:
+                if x not in self.xG:
                     [x_next, p_next] = motion_model.move_prob(x, policy[x], self.obs)
                     v_Q = self.cal_Q_value(x_next, p_next, value)
                     v_diff = abs(value[x] - v_Q)
@@ -41,78 +49,113 @@ class Policy_iteration:
                     if v_diff > 0:
                         x_value = max(x_value, v_diff)
             delta = x_value
+
         return value
 
 
     def policy_improvement(self, policy, value):
+        """
+        Improve policy using current value table.
+
+        :param policy: policy table
+        :param value: current value table
+        :return: improved policy table
+        """
+
         for x in value:
-            if x in self.xG: continue
-            else:
+            if x not in self.xG:
                 value_list = []
                 for u in self.u_set:
                     [x_next, p_next] = motion_model.move_prob(x, u, self.obs)
                     value_list.append(self.cal_Q_value(x_next, p_next, value))
                 policy[x] = self.u_set[int(np.argmax(value_list))]
+
         return policy
 
 
     def iteration(self):
+        """
+        polity iteration: using evaluate and improvement process until convergence.
+        :return: value table and converged policy.
+        """
+
         value_table = {}
         policy = {}
+        count = 0
 
         for i in range(env.x_range):
             for j in range(env.y_range):
                 if (i, j) not in self.obs:
-                    value_table[(i, j)] = 0
-                    policy[(i, j)] = self.u_set[0]
+                    value_table[(i, j)] = 0             # initialize value table
+                    policy[(i, j)] = self.u_set[0]      # initialize policy table
 
         while True:
+            count += 1
             policy_back = copy.deepcopy(policy)
-            value_table = self.policy_evaluation(policy, value_table)
-            policy = self.policy_improvement(policy, value_table)
-            if policy_back == policy: break
-        return value_table, policy
+            value_table = self.policy_evaluation(policy, value_table)       # evaluation process
+            policy = self.policy_improvement(policy, value_table)           # policy improvement process
+            if policy_back == policy: break                                 # convergence condition
 
+        self.message(count)
 
-    def simulation(self, xI, xG, policy):
-        path = []
-        x = xI
-        while x not in xG:
-            u = policy[x]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next not in self.obs:
-                x = x_next
-                path.append(x)
-        path.pop()
-        return path
+        return value_table, policy
 
 
-    def animation(self, path):
-        plt.figure(1)
-        tools.show_map(self.xI, self.xG, self.obs, self.lose, self.name1)
-        for x in path:
-            tools.plot_dots(x)
-        plt.show()
+    def cal_Q_value(self, x, p, table):
+        """
+        cal Q_value.
 
+        :param x: next state vector
+        :param p: probability of each state
+        :param table: value table
+        :return: Q-value
+        """
 
-    def cal_Q_value(self, x, p, table):
         value = 0
-        reward = self.get_reward(x)
+        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
         for i in range(len(x)):
-            value += p[i] * (reward[i] + self.gamma * table[x[i]])
+            value += p[i] * (reward[i] + self.gamma * table[x[i]])      # cal Q-value
+
         return value
 
 
-    def get_reward(self, x_next):
-        reward = []
-        for x in x_next:
-            if x in self.xG:
-                reward.append(10)
-            elif x in self.lose:
-                reward.append(-10)
+    def simulation(self, xI, xG, policy):
+        """
+        simulate a path using converged policy.
+
+        :param xI: starting state
+        :param xG: goal state
+        :param policy: converged policy
+        :return: simulation path
+        """
+
+        plt.figure(1)                                               # path animation
+        tools.show_map(xI, xG, self.obs, self.lose, self.name1)     # show background
+
+        x, path = xI, []
+        while True:
+            u = policy[x]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next in self.obs:
+                print("Collision!")                                 # collision: simulation failed
             else:
-                reward.append(0)
-        return reward
+                x = x_next
+                if x_next in xG:
+                    break
+                else:
+                    tools.plot_dots(x)                              # each state in optimal path
+                    path.append(x)
+        plt.show()
+
+        return path
+
+
+    def message(self, count):
+        print("starting state: ", self.xI)
+        print("goal states: ", self.xG)
+        print("condition for convergence: ", self.e)
+        print("discount factor: ", self.gamma)
+        print("iteration times: ", count)
 
 
 if __name__ == '__main__':
@@ -122,5 +165,3 @@ if __name__ == '__main__':
     PI = Policy_iteration(x_Start, x_Goal)
     [value_PI, policy_PI] = PI.iteration()
     path_PI = PI.simulation(x_Start, x_Goal, policy_PI)
-
-    PI.animation(path_PI)

+ 28 - 7
Stochastic Shortest Path/tools.py

@@ -5,11 +5,12 @@
 """
 
 import matplotlib.pyplot as plt
-import env
+
 
 def extract_path(xI, xG, parent, actions):
     """
     Extract the path based on the relationship of nodes.
+
     :param xI: Starting node
     :param xG: Goal node
     :param parent: Relationship between nodes
@@ -25,17 +26,20 @@ def extract_path(xI, xG, parent, actions):
         path_back.append(x_current)
         acts_back.append(actions[x_current])
         if x_current == xI: break
+
     return list(reversed(path_back)), list(reversed(acts_back))
 
 
 def showPath(xI, xG, path):
     """
     Plot the path.
+
     :param xI: Starting node
     :param xG: Goal node
     :param path: Planning path
     :return: A plot
     """
+
     path.remove(xI)
     path.remove(xG)
     path_x = [path[i][0] for i in range(len(path))]
@@ -46,26 +50,43 @@ def showPath(xI, xG, path):
 
 
 def show_map(xI, xG, obs_map, lose_map, name):
+    """
+    Plot the background you designed.
+
+    :param xI: starting state
+    :param xG: goal states
+    :param obs_map: positions of obstacles
+    :param lose_map: positions of losing state
+    :param name: name of this figure
+    :return: a figure
+    """
+
     obs_x = [obs_map[i][0] for i in range(len(obs_map))]
     obs_y = [obs_map[i][1] for i in range(len(obs_map))]
 
     lose_x = [lose_map[i][0] for i in range(len(lose_map))]
     lose_y = [lose_map[i][1] for i in range(len(lose_map))]
 
-    plt.plot(xI[0], xI[1], "bs")
+    plt.plot(xI[0], xI[1], "bs")                                    # plot starting state (blue)
 
     for x in xG:
-        plt.plot(x[0], x[1], "gs")
+        plt.plot(x[0], x[1], "gs")                                  # plot goal states (green)
 
-    plt.plot(obs_x, obs_y, "sk")
-    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')
+    plt.plot(obs_x, obs_y, "sk")                                    # plot obstacles (black)
+    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')       # plot losing states (grown)
     plt.title(name, fontdict=None)
-    plt.grid(True)
     plt.axis("equal")
 
 
 def plot_dots(x):
-    plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
+    """
+    Plot state x for animation
+
+    :param x: current node
+    :return: a plot
+    """
+
+    plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')    # plot dots for animation
     plt.gcf().canvas.mpl_connect('key_release_event',
                                  lambda event: [exit(0) if event.key == 'escape' else None])
     plt.pause(0.001)

+ 6 - 25
Stochastic Shortest Path/value_iteration.py

@@ -23,7 +23,7 @@ class Value_iteration:
         self.lose = env.lose_map()                             # position of lose states
         self.name1 = "value_iteration, e=" + str(self.e) \
                      + ", gamma=" + str(self.gamma)
-        self.name2 = "convergence of error, e=0.001"
+        self.name2 = "convergence of error, e=" + str(self.e)
 
 
     def iteration(self):
@@ -60,7 +60,7 @@ class Value_iteration:
                         x_value = max(x_value, v_diff)
             delta = x_value                                                                 # update delta
             diff.append(delta)
-        self.message(count)
+        self.message(count)                                                                 # print key parameters
 
         return value_table, policy, diff
 
@@ -76,33 +76,13 @@ class Value_iteration:
         """
 
         value = 0
-        reward = self.get_reward(x)                                     # get reward of next state
+        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * table[x[i]])      # cal Q-value
 
         return value
 
 
-    def get_reward(self, x_next):
-        """
-        calculate reward of next state
-
-        :param x_next: next state
-        :return: reward
-        """
-
-        reward = []
-        for x in x_next:
-            if x in self.xG:
-                reward.append(10)           # reward : 10, for goal states
-            elif x in self.lose:
-                reward.append(-10)          # reward : -10, for lose states
-            else:
-                reward.append(0)            # reward : 0, for other states
-
-        return reward
-
-
     def simulation(self, xI, xG, policy, diff):
         """
         simulate a path using converged policy.
@@ -128,6 +108,7 @@ class Value_iteration:
                 else:
                     tools.plot_dots(x)                              # each state in optimal path
                     path.append(x)
+        plt.pause(1)
 
         plt.figure(2)                                               # difference between two successive iteration
         plt.plot(diff, color='#808080', marker='o')
@@ -148,8 +129,8 @@ class Value_iteration:
 
 
 if __name__ == '__main__':
-    x_Start = (5, 5)
-    x_Goal = [(49, 5), (49, 25)]
+    x_Start = (5, 5)                    # starting state
+    x_Goal = [(49, 5), (49, 25)]        # goal states
 
     VI = Value_iteration(x_Start, x_Goal)
     [value_VI, policy_VI, diff_VI] = VI.iteration()