zhm-real il y a 5 ans
Parent
commit
4cbaa2367e

+ 17 - 3
Search-based Planning/.idea/workspace.xml

@@ -3,6 +3,19 @@
   <component name="ChangeListManager">
     <list default="true" id="025aff36-a6aa-4945-ab7e-b2c625055f47" name="Default Changelist" comment="">
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/a_star.py" beforeDir="false" afterPath="$PROJECT_DIR$/a_star.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/bfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/bfs.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/dfs.py" beforeDir="false" afterPath="$PROJECT_DIR$/dfs.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/dijkstra.py" beforeDir="false" afterPath="$PROJECT_DIR$/dijkstra.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/plotting.py" beforeDir="false" afterPath="$PROJECT_DIR$/plotting.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/queue.py" beforeDir="false" afterPath="$PROJECT_DIR$/queue.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-policy_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-policy_iteration.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-value_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/Q-value_iteration.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/env.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/gif/VI_E.gif" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/gif/VI_E.gif" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/motion_model.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/motion_model.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/policy_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/policy_iteration.py" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/tools.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/plotting.py" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" beforeDir="false" afterPath="$PROJECT_DIR$/../Stochastic Shortest Path/value_iteration.py" afterDir="false" />
     </list>
     <option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
@@ -26,6 +39,7 @@
     <ConfirmationsSetting value="2" id="Add" />
   </component>
   <component name="PropertiesComponent">
+    <property name="ASKED_ADD_EXTERNAL_FILES" value="true" />
     <property name="last_opened_file_path" value="$PROJECT_DIR$/../Stochastic Shortest Path" />
     <property name="restartRequiresConfirmation" value="false" />
     <property name="settings.editor.selected.configurable" value="com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable" />
@@ -42,7 +56,7 @@
       </list>
     </option>
   </component>
-  <component name="RunManager" selected="Python.a_star">
+  <component name="RunManager" selected="Python.dfs">
     <configuration name="a_star" type="PythonConfigurationType" factoryName="Python" temporary="true">
       <module name="Search-based Planning" />
       <option name="INTERPRETER_OPTIONS" value="" />
@@ -150,10 +164,10 @@
     </configuration>
     <recent_temporary>
       <list>
-        <item itemvalue="Python.a_star" />
-        <item itemvalue="Python.dijkstra" />
         <item itemvalue="Python.dfs" />
         <item itemvalue="Python.bfs" />
+        <item itemvalue="Python.dijkstra" />
+        <item itemvalue="Python.a_star" />
         <item itemvalue="Python.searching" />
       </list>
     </recent_temporary>

BIN
Search-based Planning/__pycache__/plotting.cpython-37.pyc


+ 3 - 3
Search-based Planning/a_star.py

@@ -8,20 +8,20 @@ import queue
 import plotting
 import env
 
-
 class Astar:
     def __init__(self, x_start, x_goal, heuristic_type):
         self.xI, self.xG = x_start, x_goal
 
         self.Env = env.Env()
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
         self.u_set = self.Env.motions                               # feasible input set
         self.obs = self.Env.obs                                     # position of obstacles
 
         [self.path, self.policy, self.visited] = self.searching(self.xI, self.xG, heuristic_type)
 
         self.fig_name = "A* Algorithm"
-        plotting.animation(self.xI, self.xG, self.obs,
-                           self.path, self.visited, self.fig_name)  # animation generate
+        self.plotting.animation(self.path, self.visited, self.fig_name)  # animation generate
 
 
     def searching(self, xI, xG, heuristic_type):

+ 4 - 3
Search-based Planning/bfs.py

@@ -8,19 +8,20 @@ import queue
 import plotting
 import env
 
-
 class BFS:
     def __init__(self, x_start, x_goal):
         self.xI, self.xG = x_start, x_goal
 
         self.Env = env.Env()
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
         self.u_set = self.Env.motions                               # feasible input set
         self.obs = self.Env.obs                                     # position of obstacles
+
         [self.path, self.policy, self.visited] = self.searching(self.xI, self.xG)
 
         self.fig_name = "Breadth-first Searching"
-        plotting.animation(self.xI, self.xG, self.obs,
-                           self.path, self.visited, self.fig_name)  # animation generate
+        self.plotting.animation(self.path, self.visited, self.fig_name)  # animation generate
 
 
     def searching(self, xI, xG):

+ 4 - 3
Search-based Planning/dfs.py

@@ -8,19 +8,20 @@ import queue
 import plotting
 import env
 
-
 class DFS:
     def __init__(self, x_start, x_goal):
         self.xI, self.xG = x_start, x_goal
 
         self.Env = env.Env()
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
         self.u_set = self.Env.motions                   # feasible input set
         self.obs = self.Env.obs                         # position of obstacles
+
         [self.path, self.policy, self.visited] = self.searching(self.xI, self.xG)
 
         self.fig_name = "Depth-first Searching"
-        plotting.animation(self.xI, self.xG, self.obs,
-                           self.path, self.visited, self.fig_name)      # animation generate
+        self.plotting.animation(self.path, self.visited, self.fig_name)      # animation generate
 
 
     def searching(self, xI, xG):

+ 4 - 3
Search-based Planning/dijkstra.py

@@ -8,19 +8,20 @@ import queue
 import env
 import plotting
 
-
 class Dijkstra:
     def __init__(self, x_start, x_goal):
         self.xI, self.xG = x_start, x_goal
 
         self.Env = env.Env()
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
         self.u_set = self.Env.motions                               # feasible input set
         self.obs = self.Env.obs                                     # position of obstacles
+
         [self.path, self.policy, self.visited] = self.searching(self.xI, self.xG)
 
         self.fig_name = "Dijkstra's Algorithm"
-        plotting.animation(self.xI, self.xG, self.obs,
-                           self.path, self.visited, self.fig_name)  # animation generate
+        self.plotting.animation(self.path, self.visited, self.fig_name)  # animation generate
 
 
     def searching(self, xI, xG):

+ 53 - 47
Search-based Planning/plotting.py

@@ -5,52 +5,58 @@
 """
 
 import matplotlib.pyplot as plt
+import env
 
+class Plotting():
+    def __init__(self, xI, xG):
+        self.xI, self.xG = xI, xG
+        self.env = env.Env()
+        self.obs = self.env.obs_map()
 
-def animation(xI, xG, obs, path, visited, name):
-    """
-    generate animation for exploring process of algorithm
-
-    :param xI: starting state
-    :param xG: goal state
-    :param obs: obstacle map
-    :param path: optimal path
-    :param visited: visited nodes
-    :param name: name of this figure
-    :return: animation
-    """
-
-    visited.remove(xI)
-    path.remove(xI)
-    path.remove(xG)
-
-    # plot gridworld
-    obs_x = [obs[i][0] for i in range(len(obs))]
-    obs_y = [obs[i][1] for i in range(len(obs))]
-    plt.plot(xI[0], xI[1], "bs")
-    plt.plot(xG[0], xG[1], "gs")
-    plt.plot(obs_x, obs_y, "sk")
-    plt.title(name)
-    plt.axis("equal")
-
-    # animation for the exploring order of visited nodes
-    count = 0
-    for x in visited:
-        count += 1
-        plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
-        plt.gcf().canvas.mpl_connect('key_release_event',
-                                     lambda event: [exit(0) if event.key == 'escape' else None])
-        if count < 500: length = 20
-        elif count < 700: length = 30
-        else: length = 50
-
-        if count % length == 0: plt.pause(0.001)
-
-    # plot optimal path
-    path_x = [path[i][0] for i in range(len(path))]
-    path_y = [path[i][1] for i in range(len(path))]
-    plt.plot(path_x, path_y, linewidth='3', color='r', marker='o')
-
-    # show animation
-    plt.pause(0.01)
-    plt.show()
+
+    def animation(self, path, visited, name):
+        self.plot_grid(name)
+        self.plot_visited(visited)
+        self.plot_path(path)
+
+
+    def plot_grid(self, name):
+        obs_x = [self.obs[i][0] for i in range(len(self.obs))]
+        obs_y = [self.obs[i][1] for i in range(len(self.obs))]
+
+        plt.plot(self.xI[0], self.xI[1], "bs")
+        plt.plot(self.xG[0], self.xG[1], "gs")
+        plt.plot(obs_x, obs_y, "sk")
+        plt.title(name)
+        plt.axis("equal")
+
+
+    def plot_visited(self, visited):
+        visited.remove(self.xI)
+        count = 0
+
+        for x in visited:
+            count += 1
+            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
+            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
+            [exit(0) if event.key == 'escape' else None])
+
+            if count < len(visited) / 3:
+                length = 15
+            elif count < len(visited) * 2 / 3:
+                length = 30
+            else:
+                length = 45
+
+            if count % length == 0: plt.pause(0.001)
+
+
+    def plot_path(self, path):
+        path.remove(self.xI)
+        path.remove(self.xG)
+        path_x = [path[i][0] for i in range(len(path))]
+        path_y = [path[i][1] for i in range(len(path))]
+
+        plt.plot(path_x, path_y, linewidth='3', color='r', marker='o')
+        plt.pause(0.01)
+        plt.show()

+ 9 - 15
Search-based Planning/queue.py

@@ -9,14 +9,12 @@
 import collections
 import heapq
 
-
-"""
+class QueueFIFO:
+    """
     Class: QueueFIFO
     Description: QueueFIFO is designed for First-in-First-out rule.
-"""
-
+    """
 
-class QueueFIFO:
     def __init__(self):
         self.queue = collections.deque()
 
@@ -29,14 +27,12 @@ class QueueFIFO:
     def get(self):
         return self.queue.popleft()        # leave from front
 
-
-"""
+class QueueLIFO:
+    """
     Class: QueueLIFO
     Description: QueueLIFO is designed for Last-in-First-out rule.
-"""
-
+    """
 
-class QueueLIFO:
     def __init__(self):
         self.queue = collections.deque()
 
@@ -49,14 +45,12 @@ class QueueLIFO:
     def get(self):
         return self.queue.pop()            # leave from back
 
-
-"""
+class QueuePrior:
+    """
     Class: QueuePrior
     Description: QueuePrior reorders elements using value [priority]
-"""
+    """
 
-
-class QueuePrior:
     def __init__(self):
         self.queue = []
 

+ 40 - 38
Stochastic Shortest Path/Q-policy_iteration.py

@@ -5,26 +5,33 @@
 """
 
 import env
-import tools
+import plotting
 import motion_model
 
-import matplotlib.pyplot as plt
 import numpy as np
 import copy
 import sys
 
-
 class Q_policy_iteration:
     def __init__(self, x_start, x_goal):
-        self.u_set = motion_model.motions                       # feasible input set
         self.xI, self.xG = x_start, x_goal
-        self.e = 0.001                                          # threshold for convergence
-        self.gamma = 0.9                                        # discount factor
-        self.obs = env.obs_map()                                # position of obstacles
-        self.lose = env.lose_map()                              # position of lose states
-        self.name1 = "Q-policy_iteration, e=" + str(self.e) \
-                     + ", gamma=" + str(self.gamma)
-        self.name2 = "convergence of error"
+        self.e = 0.001  # threshold for convergence
+        self.gamma = 0.9  # discount factor
+
+        self.env = env.Env(self.xI, self.xG)
+        self.motion = motion_model.Motion_model(self.xI, self.xG)
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
+        self.u_set = self.env.motions  # feasible input set
+        self.stateSpace = self.env.stateSpace  # state space
+        self.obs = self.env.obs_map()  # position of obstacles
+        self.lose = self.env.lose_map()  # position of lose states
+
+        self.name1 = "Q-policy_iteration, gamma=" + str(self.gamma)
+
+        [self.value, self.policy] = self.iteration()
+        self.path = self.extract_path(self.xI, self.xG, self.policy)
+        self.plotting.animation(self.path, self.name1)
 
 
     def policy_evaluation(self, policy, value):
@@ -43,7 +50,7 @@ class Q_policy_iteration:
             for x in value:
                 if x not in self.xG:
                     for k in range(len(self.u_set)):
-                        [x_next, p_next] = motion_model.move_prob(x, self.u_set[k], self.obs)
+                        [x_next, p_next] = self.motion.move_next(x, self.u_set[k])
                         v_Q = self.cal_Q_value(x_next, p_next, policy, value)
                         v_diff = abs(value[x][k] - v_Q)
                         value[x][k] = v_Q
@@ -63,7 +70,7 @@ class Q_policy_iteration:
         :return: improved policy
         """
 
-        for x in value:
+        for x in self.stateSpace:
             if x not in self.xG:
                 policy[x] = int(np.argmax(value[x]))
 
@@ -80,11 +87,9 @@ class Q_policy_iteration:
         policy = {}
         count = 0
 
-        for i in range(env.x_range):
-            for j in range(env.y_range):
-                if (i, j) not in self.obs:
-                    Q_table[(i, j)] = [0, 0, 0, 0]              # initialize Q_value table
-                    policy[(i, j)] = 0                          # initialize policy table
+        for x in self.stateSpace:
+            Q_table[x] = [0, 0, 0, 0]              # initialize Q_value table
+            policy[x] = 0                          # initialize policy table
 
         while True:
             count += 1
@@ -109,45 +114,44 @@ class Q_policy_iteration:
         """
 
         value = 0
-        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
+        reward = self.env.get_reward(x)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * table[x[i]][policy[x[i]]])
 
         return value
 
 
-    def simulation(self, xI, xG, policy):
+    def extract_path(self, xI, xG, policy):
         """
-        simulate a path using converged policy.
+        extract path from converged policy.
 
         :param xI: starting state
-        :param xG: goal state
+        :param xG: goal states
         :param policy: converged policy
-        :return: simulation path
+        :return: path
         """
 
-        plt.figure(1)                                               # path animation
-        tools.show_map(xI, xG, self.obs, self.lose, self.name1)     # show background
-
-        x, path = xI, []
-        while True:
+        x, path = xI, [xI]
+        while x not in xG:
             u = self.u_set[policy[x]]
             x_next = (x[0] + u[0], x[1] + u[1])
             if x_next in self.obs:
-                print("Collision!")                                 # collision: simulation failed
+                print("Collision! Please run again!")
+                break
             else:
+                path.append(x_next)
                 x = x_next
-                if x_next in xG:
-                    break
-                else:
-                    tools.plot_dots(x)                              # each state in optimal path
-                    path.append(x)
-        plt.show()
-
         return path
 
 
     def message(self, count):
+        """
+        print important message.
+
+        :param count: iteration numbers
+        :return: print
+        """
+
         print("starting state: ", self.xI)
         print("goal states: ", self.xG)
         print("condition for convergence: ", self.e)
@@ -160,5 +164,3 @@ if __name__ == '__main__':
     x_Goal = [(49, 5), (49, 25)]
 
     QPI = Q_policy_iteration(x_Start, x_Goal)
-    [value_QPI, policy_QPI] = QPI.iteration()
-    path_QPI = QPI.simulation(x_Start, x_Goal, policy_QPI)

+ 47 - 41
Stochastic Shortest Path/Q-value_iteration.py

@@ -5,28 +5,37 @@
 """
 
 import env
-import tools
+import plotting
 import motion_model
 
-import matplotlib.pyplot as plt
 import numpy as np
 import sys
 
-
 class Q_value_iteration:
     def __init__(self, x_start, x_goal):
-        self.u_set = motion_model.motions                           # feasible input set
         self.xI, self.xG = x_start, x_goal
-        self.e = 0.001                                              # threshold for convergence
-        self.gamma = 0.9                                            # discount factor
-        self.obs = env.obs_map()                                    # position of obstacles
-        self.lose = env.lose_map()                                  # position of lose states
-        self.name1 = "Q-value_iteration, e=" + str(self.e) \
-                     + ", gamma=" + str(self.gamma)
-        self.name2 = "convergence of error"
+        self.e = 0.001  # threshold for convergence
+        self.gamma = 0.9  # discount factor
+
+        self.env = env.Env(self.xI, self.xG)
+        self.motion = motion_model.Motion_model(self.xI, self.xG)
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
+        self.u_set = self.env.motions  # feasible input set
+        self.stateSpace = self.env.stateSpace  # state space
+        self.obs = self.env.obs_map()  # position of obstacles
+        self.lose = self.env.lose_map()  # position of lose states
 
+        self.name1 = "Q-value_iteration, gamma=" + str(self.gamma)
+        self.name2 = "converge process, e=" + str(self.e)
 
-    def iteration(self):
+        [self.value, self.policy, self.diff] = self.iteration(self.xI, self.xG)
+        self.path = self.extract_path(self.xI, self.xG, self.policy)
+        self.plotting.animation(self.path, self.name1)
+        self.plotting.plot_diff(self.diff, self.name2)
+
+
+    def iteration(self, xI, xG):
         """
         Q_value_iteration
         :return: converged Q table and policy
@@ -34,35 +43,35 @@ class Q_value_iteration:
 
         Q_table = {}
         policy = {}
+        diff = []
         delta = sys.maxsize
         count = 0
 
-        for i in range(env.x_range):
-            for j in range(env.y_range):
-                if (i, j) not in self.obs:
-                    Q_table[(i, j)] = [0, 0, 0, 0]          # initialize Q_table
+        for x in self.stateSpace:
+            Q_table[x] = [0, 0, 0, 0]          # initialize Q_table
 
         while delta > self.e:                               # convergence condition
             count += 1
             x_value = 0
-            for x in Q_table:
+            for x in self.stateSpace:
                 if x not in x_Goal:
                     for k in range(len(self.u_set)):
-                        [x_next, p_next] = motion_model.move_prob(x, self.u_set[k], self.obs)
+                        [x_next, p_next] = self.motion.move_next(x, self.u_set[k])
                         Q_value = self.cal_Q_value(x_next, p_next, Q_table)
                         v_diff = abs(Q_table[x][k] - Q_value)
                         Q_table[x][k] = Q_value
                         if v_diff > 0:
                             x_value = max(x_value, v_diff)
+            diff.append(x_value)
             delta = x_value
 
-        for x in Q_table:
-            if x not in x_Goal:
+        for x in self.stateSpace:
+            if x not in xG:
                 policy[x] = np.argmax(Q_table[x])
 
         self.message(count)
 
-        return Q_table, policy
+        return Q_table, policy, diff
 
 
     def cal_Q_value(self, x, p, table):
@@ -76,45 +85,44 @@ class Q_value_iteration:
         """
 
         value = 0
-        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
+        reward = self.env.get_reward(x)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * max(table[x[i]]))
 
         return value
 
 
-    def simulation(self, xI, xG, policy):
+    def extract_path(self, xI, xG, policy):
         """
-        simulate a path using converged policy.
+        extract path from converged policy.
 
         :param xI: starting state
-        :param xG: goal state
+        :param xG: goal states
         :param policy: converged policy
-        :return: simulation path
+        :return: path
         """
 
-        plt.figure(1)                                                   # path animation
-        tools.show_map(xI, xG, self.obs, self.lose, self.name1)         # show background
-
-        x, path = xI, []
-        while True:
+        x, path = xI, [xI]
+        while x not in xG:
             u = self.u_set[policy[x]]
             x_next = (x[0] + u[0], x[1] + u[1])
             if x_next in self.obs:
-                print("Collision!")                                     # collision: simulation failed
+                print("Collision! Please run again!")
+                break
             else:
+                path.append(x_next)
                 x = x_next
-                if x_next in xG:
-                    break
-                else:
-                    tools.plot_dots(x)                                  # each state in optimal path
-                    path.append(x)
-        plt.show()
-
         return path
 
 
     def message(self, count):
+        """
+        print important message.
+
+        :param count: iteration numbers
+        :return: print
+        """
+
         print("starting state: ", self.xI)
         print("goal states: ", self.xG)
         print("condition for convergence: ", self.e)
@@ -127,5 +135,3 @@ if __name__ == '__main__':
     x_Goal = [(49, 5), (49, 25)]
 
     QVI = Q_value_iteration(x_Start, x_Goal)
-    [value_QVI, policy_QVI] = QVI.iteration()
-    path_VI = QVI.simulation(x_Start, x_Goal, policy_QVI)

BIN
Stochastic Shortest Path/__pycache__/env.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/motion_model.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/plotting.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/tools.cpython-37.pyc


+ 91 - 65
Stochastic Shortest Path/env.py

@@ -4,68 +4,94 @@
 @author: huiming zhou
 """
 
-x_range, y_range = 51, 31     # size of background
-
-
-def obs_map():
-    """
-    Initialize obstacles' positions
-
-    :return: map of obstacles
-    """
-
-    obs = []
-    for i in range(x_range):
-        obs.append((i, 0))
-    for i in range(x_range):
-        obs.append((i, y_range - 1))
-
-    for i in range(y_range):
-        obs.append((0, i))
-    for i in range(y_range):
-        obs.append((x_range - 1, i))
-
-    for i in range(10, 21):
-        obs.append((i, 15))
-    for i in range(15):
-        obs.append((20, i))
-
-    for i in range(15, 30):
-        obs.append((30, i))
-    for i in range(16):
-        obs.append((40, i))
-
-    return obs
-
-
-def lose_map():
-    """
-    Initialize losing states' positions
-    :return: losing states
-    """
-
-    lose = []
-    for i in range(25, 36):
-        lose.append((i, 13))
-
-    return lose
-
-
-def get_reward(x_next, xG, lose):
-    """
-    calculate reward of next state
-
-    :param x_next: next state
-    :return: reward
-    """
-
-    reward = []
-    for x in x_next:
-        if x in xG:
-            reward.append(10)           # reward : 10, for goal states
-        elif x in lose:
-            reward.append(-10)          # reward : -10, for lose states
-        else:
-            reward.append(0)            # reward : 0, for other states
-
-    return reward
+class Env():
+    def __init__(self, xI, xG):
+        self.x_range = 51           # size of background
+        self.y_range = 31
+        self.motions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
+        self.xI = xI
+        self.xG = xG
+        self.obs = self.obs_map()
+        self.lose = self.lose_map()
+        self.stateSpace = self.state_space()
+
+
+    def obs_map(self):
+        """
+        Initialize obstacles' positions
+        :return: map of obstacles
+        """
+
+        x = self.x_range
+        y = self.y_range
+        obs = []
+
+        for i in range(x):
+            obs.append((i, 0))
+        for i in range(x):
+            obs.append((i, y - 1))
+
+        for i in range(y):
+            obs.append((0, i))
+        for i in range(y):
+            obs.append((x - 1, i))
+
+        for i in range(10, 21):
+            obs.append((i, 15))
+        for i in range(15):
+            obs.append((20, i))
+
+        for i in range(15, 30):
+            obs.append((30, i))
+        for i in range(16):
+            obs.append((40, i))
+
+        return obs
+
+
+    def lose_map(self):
+        """
+        Initialize losing states' positions
+        :return: losing states
+        """
+
+        lose = []
+        for i in range(25, 36):
+            lose.append((i, 13))
+
+        return lose
+
+
+    def state_space(self):
+        """
+        generate state space
+        :return: state space
+        """
+
+        state_space = []
+        for i in range(self.x_range):
+            for j in range(self.y_range):
+                if (i, j) not in self.obs:
+                    state_space.append((i, j))
+
+        return state_space
+
+
+    def get_reward(self, x_next):
+        """
+        calculate reward of next state
+
+        :param x_next: next state
+        :return: reward
+        """
+
+        reward = []
+        for x in x_next:
+            if x in self.xG:
+                reward.append(10)  # reward : 10, for goal states
+            elif x in self.lose:
+                reward.append(-10)  # reward : -10, for lose states
+            else:
+                reward.append(0)  # reward : 0, for other states
+
+        return reward

BIN
Stochastic Shortest Path/gif/VI_E.gif


+ 37 - 31
Stochastic Shortest Path/motion_model.py

@@ -3,36 +3,42 @@
 """
 @author: huiming zhou
 """
-import numpy as np
-
-motions = [(1, 0), (-1, 0), (0, 1), (0, -1)]                # feasible motion sets
-
-def move_prob(x, u, obs, eta = 0.2):
-    """
-    Motion model of robots,
-
-    :param x: current state (node)
-    :param u: input
-    :param obs: obstacle map
-    :param eta: noise in motion model
-    :return: next states and corresponding probability
-    """
-
-    p_next = [1 - eta, eta / 2, eta / 2]
-    x_next = []
-    if u == (0, 1):
-        u_real = [(0, 1), (-1, 0), (1, 0)]
-    elif u == (0, -1):
-        u_real = [(0, -1), (-1, 0), (1, 0)]
-    elif u == (-1, 0):
-        u_real = [(-1, 0), (0, 1), (0, -1)]
-    else:
-        u_real = [(1, 0), (0, 1), (0, -1)]
-
-    for act in u_real:
-        if (x[0] + act[0], x[1] + act[1]) in obs:
-            x_next.append(x)
+
+import env
+
+class Motion_model():
+    def __init__(self, xI, xG):
+        self.env = env.Env(xI, xG)
+        self.obs = self.env.obs_map()
+
+
+    def move_next(self, x, u, eta=0.2):
+        """
+        Motion model of robots,
+
+        :param x: current state (node)
+        :param u: input
+        :param obs: obstacle map
+        :param eta: noise in motion model
+        :return: next states and corresponding probability
+        """
+
+        p_next = [1 - eta, eta / 2, eta / 2]
+        x_next = []
+        if u == (0, 1):
+            u_real = [(0, 1), (-1, 0), (1, 0)]
+        elif u == (0, -1):
+            u_real = [(0, -1), (-1, 0), (1, 0)]
+        elif u == (-1, 0):
+            u_real = [(-1, 0), (0, 1), (0, -1)]
         else:
-            x_next.append((x[0] + act[0], x[1] + act[1]))
+            u_real = [(1, 0), (0, 1), (0, -1)]
+
+        for act in u_real:
+            x_check = (x[0] + act[0], x[1] + act[1])
+            if x_check in self.obs:
+                x_next.append(x)
+            else:
+                x_next.append(x_check)
 
-    return x_next, p_next
+        return x_next, p_next

+ 124 - 0
Stochastic Shortest Path/plotting.py

@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+@author: huiming zhou
+"""
+
+import matplotlib.pyplot as plt
+import env
+
+class Plotting():
+    def __init__(self, xI, xG):
+        self.xI, self.xG = xI, xG
+        self.env = env.Env(self.xI, self.xG)
+        self.obs = self.env.obs_map()
+        self.lose = self.env.lose_map()
+
+
+    def animation(self, path, name):
+        """
+        animation.
+
+        :param path: optimal path
+        :param name: title of figure
+        :return: an animation
+        """
+
+        plt.figure(1)
+        self.plot_grid(name)
+        self.plot_lose()
+        self.plot_path(path)
+
+
+    def plot_grid(self, name):
+        """
+        plot the obstacles in environment.
+
+        :param name: title of figure
+        :return: plot
+        """
+
+        obs_x = [self.obs[i][0] for i in range(len(self.obs))]
+        obs_y = [self.obs[i][1] for i in range(len(self.obs))]
+
+        plt.plot(self.xI[0], self.xI[1], "bs")
+        for x in self.xG:
+            plt.plot(x[0], x[1], "gs")
+
+        plt.plot(obs_x, obs_y, "sk")
+        plt.title(name)
+        plt.axis("equal")
+
+
+    def plot_lose(self):
+        """
+        plot losing states in environment.
+        :return: a plot
+        """
+
+        lose_x = [self.lose[i][0] for i in range(len(self.lose))]
+        lose_y = [self.lose[i][1] for i in range(len(self.lose))]
+
+        plt.plot(lose_x, lose_y, color = '#A52A2A', marker = 's')
+
+
+    def plot_visited(self, visited):
+        """
+        animation of order of visited nodes.
+
+        :param visited: visited nodes
+        :return: animation
+        """
+
+        visited.remove(self.xI)
+        count = 0
+
+        for x in visited:
+            count += 1
+            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
+            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
+            [exit(0) if event.key == 'escape' else None])
+
+            if count < len(visited) / 3:
+                length = 15
+            elif count < len(visited) * 2 / 3:
+                length = 30
+            else:
+                length = 45
+
+            if count % length == 0: plt.pause(0.001)
+
+
+    def plot_path(self, path):
+        path.remove(self.xI)
+        for x in self.xG:
+            if x in path:
+                path.remove(x)
+
+        for x in path:
+            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
+            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
+            [exit(0) if event.key == 'escape' else None])
+            plt.pause(0.001)
+        plt.show()
+        plt.pause(0.5)
+
+
+    def plot_diff(self, diff, name):
+        plt.figure(2)
+        plt.title(name, fontdict=None)
+        plt.xlabel('iterations')
+        plt.ylabel('difference of successive iterations')
+        plt.grid('on')
+
+        count = 0
+        for x in diff:
+            plt.plot(count, x, color='#808080', marker='o')  # plot dots for animation
+            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
+            [exit(0) if event.key == 'escape' else None])
+            plt.pause(0.07)
+            count += 1
+
+        plt.plot(diff, color='#808080')
+        plt.pause(0.01)
+        plt.show()

+ 43 - 41
Stochastic Shortest Path/policy_iteration.py

@@ -5,26 +5,33 @@
 """
 
 import env
-import tools
+import plotting
 import motion_model
 
-import matplotlib.pyplot as plt
 import numpy as np
-import copy
 import sys
-
+import copy
 
 class Policy_iteration:
     def __init__(self, x_start, x_goal):
-        self.u_set = motion_model.motions                       # feasible input set
         self.xI, self.xG = x_start, x_goal
-        self.e = 0.001                                          # threshold for convergence
-        self.gamma = 0.9                                        # discount factor
-        self.obs = env.obs_map()                                # position of obstacles
-        self.lose = env.lose_map()                              # position of lose states
-        self.name1 = "policy_iteration, e=" + str(self.e) \
-                     + ", gamma=" + str(self.gamma)
-        self.name2 = "convergence of error, e=" + str(self.e)
+        self.e = 0.001  # threshold for convergence
+        self.gamma = 0.9  # discount factor
+
+        self.env = env.Env(self.xI, self.xG)
+        self.motion = motion_model.Motion_model(self.xI, self.xG)
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
+        self.u_set = self.env.motions  # feasible input set
+        self.stateSpace = self.env.stateSpace  # state space
+        self.obs = self.env.obs_map()  # position of obstacles
+        self.lose = self.env.lose_map()  # position of lose states
+
+        self.name1 = "policy_iteration, gamma=" + str(self.gamma)
+
+        [self.value, self.policy] = self.iteration()
+        self.path = self.extract_path(self.xI, self.xG, self.policy)
+        self.plotting.animation(self.path, self.name1)
 
 
     def policy_evaluation(self, policy, value):
@@ -40,9 +47,9 @@ class Policy_iteration:
 
         while delta > self.e:           # convergence condition
             x_value = 0
-            for x in value:
+            for x in self.stateSpace:
                 if x not in self.xG:
-                    [x_next, p_next] = motion_model.move_prob(x, policy[x], self.obs)
+                    [x_next, p_next] = self.motion.move_next(x, policy[x])
                     v_Q = self.cal_Q_value(x_next, p_next, value)
                     v_diff = abs(value[x] - v_Q)
                     value[x] = v_Q
@@ -62,11 +69,11 @@ class Policy_iteration:
         :return: improved policy table
         """
 
-        for x in value:
+        for x in self.stateSpace:
             if x not in self.xG:
                 value_list = []
                 for u in self.u_set:
-                    [x_next, p_next] = motion_model.move_prob(x, u, self.obs)
+                    [x_next, p_next] = self.motion.move_next(x, u)
                     value_list.append(self.cal_Q_value(x_next, p_next, value))
                 policy[x] = self.u_set[int(np.argmax(value_list))]
 
@@ -83,11 +90,9 @@ class Policy_iteration:
         policy = {}
         count = 0
 
-        for i in range(env.x_range):
-            for j in range(env.y_range):
-                if (i, j) not in self.obs:
-                    value_table[(i, j)] = 0             # initialize value table
-                    policy[(i, j)] = self.u_set[0]      # initialize policy table
+        for x in self.stateSpace:
+            value_table[x] = 0             # initialize value table
+            policy[x] = self.u_set[0]      # initialize policy table
 
         while True:
             count += 1
@@ -112,45 +117,44 @@ class Policy_iteration:
         """
 
         value = 0
-        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
+        reward = self.env.get_reward(x)                  # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * table[x[i]])      # cal Q-value
 
         return value
 
 
-    def simulation(self, xI, xG, policy):
+    def extract_path(self, xI, xG, policy):
         """
-        simulate a path using converged policy.
+        extract path from converged policy.
 
         :param xI: starting state
-        :param xG: goal state
+        :param xG: goal states
         :param policy: converged policy
-        :return: simulation path
+        :return: path
         """
 
-        plt.figure(1)                                               # path animation
-        tools.show_map(xI, xG, self.obs, self.lose, self.name1)     # show background
-
-        x, path = xI, []
-        while True:
+        x, path = xI, [xI]
+        while x not in xG:
             u = policy[x]
             x_next = (x[0] + u[0], x[1] + u[1])
             if x_next in self.obs:
-                print("Collision!")                                 # collision: simulation failed
+                print("Collision! Please run again!")
+                break
             else:
+                path.append(x_next)
                 x = x_next
-                if x_next in xG:
-                    break
-                else:
-                    tools.plot_dots(x)                              # each state in optimal path
-                    path.append(x)
-        plt.show()
-
         return path
 
 
     def message(self, count):
+        """
+        print important message.
+
+        :param count: iteration numbers
+        :return: print
+        """
+
         print("starting state: ", self.xI)
         print("goal states: ", self.xG)
         print("condition for convergence: ", self.e)
@@ -163,5 +167,3 @@ if __name__ == '__main__':
     x_Goal = [(49, 5), (49, 25)]
 
     PI = Policy_iteration(x_Start, x_Goal)
-    [value_PI, policy_PI] = PI.iteration()
-    path_PI = PI.simulation(x_Start, x_Goal, policy_PI)

+ 0 - 94
Stochastic Shortest Path/tools.py

@@ -1,94 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-@author: huiming zhou
-"""
-
-import matplotlib.pyplot as plt
-
-
-def extract_path(xI, xG, parent, actions):
-    """
-    Extract the path based on the relationship of nodes.
-
-    :param xI: Starting node
-    :param xG: Goal node
-    :param parent: Relationship between nodes
-    :param actions: Action needed for transfer between two nodes
-    :return: The planning path
-    """
-
-    path_back = [xG]
-    acts_back = [actions[xG]]
-    x_current = xG
-    while True:
-        x_current = parent[x_current]
-        path_back.append(x_current)
-        acts_back.append(actions[x_current])
-        if x_current == xI: break
-
-    return list(reversed(path_back)), list(reversed(acts_back))
-
-
-def showPath(xI, xG, path):
-    """
-    Plot the path.
-
-    :param xI: Starting node
-    :param xG: Goal node
-    :param path: Planning path
-    :return: A plot
-    """
-
-    path.remove(xI)
-    path.remove(xG)
-    path_x = [path[i][0] for i in range(len(path))]
-    path_y = [path[i][1] for i in range(len(path))]
-    plt.plot(path_x, path_y, linewidth='5', color='r', linestyle='-')
-    plt.pause(0.001)
-    plt.show()
-
-
-def show_map(xI, xG, obs_map, lose_map, name):
-    """
-    Plot the background you designed.
-
-    :param xI: starting state
-    :param xG: goal states
-    :param obs_map: positions of obstacles
-    :param lose_map: positions of losing state
-    :param name: name of this figure
-    :return: a figure
-    """
-
-    obs_x = [obs_map[i][0] for i in range(len(obs_map))]
-    obs_y = [obs_map[i][1] for i in range(len(obs_map))]
-
-    lose_x = [lose_map[i][0] for i in range(len(lose_map))]
-    lose_y = [lose_map[i][1] for i in range(len(lose_map))]
-
-    plt.plot(xI[0], xI[1], "bs")                                    # plot starting state (blue)
-
-    for x in xG:
-        plt.plot(x[0], x[1], "gs")                                  # plot goal states (green)
-
-    plt.plot(obs_x, obs_y, "sk")                                    # plot obstacles (black)
-    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')       # plot losing states (grown)
-    plt.title(name, fontdict=None)
-    plt.axis("equal")
-
-
-def plot_dots(x):
-    """
-    Plot state x for animation
-
-    :param x: current node
-    :return: a plot
-    """
-
-    plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')    # plot dots for animation
-    plt.gcf().canvas.mpl_connect('key_release_event',
-                                 lambda event: [exit(0) if event.key == 'escape' else None])
-    plt.pause(0.001)
-
-

+ 49 - 68
Stochastic Shortest Path/value_iteration.py

@@ -5,27 +5,37 @@
 """
 
 import env
-import tools
+import plotting
 import motion_model
 
-import matplotlib.pyplot as plt
 import numpy as np
 import sys
 
-
 class Value_iteration:
     def __init__(self, x_start, x_goal):
-        self.u_set = motion_model.motions                      # feasible input set
         self.xI, self.xG = x_start, x_goal
-        self.e = 0.001                                         # threshold for convergence
-        self.gamma = 0.9                                       # discount factor
-        self.obs = env.obs_map()                               # position of obstacles
-        self.lose = env.lose_map()                             # position of lose states
+        self.e = 0.001                                              # threshold for convergence
+        self.gamma = 0.9                                            # discount factor
+
+        self.env = env.Env(self.xI, self.xG)
+        self.motion = motion_model.Motion_model(self.xI, self.xG)
+        self.plotting = plotting.Plotting(self.xI, self.xG)
+
+        self.u_set = self.env.motions                               # feasible input set
+        self.stateSpace = self.env.stateSpace                       # state space
+        self.obs = self.env.obs_map()                               # position of obstacles
+        self.lose = self.env.lose_map()                             # position of lose states
+
         self.name1 = "value_iteration, gamma=" + str(self.gamma)
         self.name2 = "converge process, e=" + str(self.e)
 
+        [self.value, self.policy, self.diff] = self.iteration(self.xI, self.xG)
+        self.path = self.extract_path(self.xI, self.xG, self.policy)
+        self.plotting.animation(self.path, self.name1)
+        self.plotting.plot_diff(self.diff, self.name2)
+
 
-    def iteration(self):
+    def iteration(self, xI, xG):
         """
         value_iteration.
 
@@ -38,28 +48,26 @@ class Value_iteration:
         delta = sys.maxsize                     # initialize maximum difference
         count = 0                               # iteration times
 
-        for i in range(env.x_range):
-            for j in range(env.y_range):
-                if (i, j) not in self.obs:
-                    value_table[(i, j)] = 0     # initialize value table for feasible states
+        for x in self.stateSpace:               # initialize value table for feasible states
+            value_table[x] = 0
 
         while delta > self.e:                   # converged condition
             count += 1
             x_value = 0
-            for x in value_table:
-                if x not in self.xG:
+            for x in self.stateSpace:
+                if x not in xG:
                     value_list = []
                     for u in self.u_set:
-                        [x_next, p_next] = motion_model.move_prob(x, u, self.obs)           # recall motion model
+                        [x_next, p_next] = self.motion.move_next(x, u)                      # recall motion model
                         value_list.append(self.cal_Q_value(x_next, p_next, value_table))    # cal Q value
                     policy[x] = self.u_set[int(np.argmax(value_list))]                      # update policy
                     v_diff = abs(value_table[x] - max(value_list))                          # maximum difference
                     value_table[x] = max(value_list)                                        # update value table
-                    if v_diff > 0:
-                        x_value = max(x_value, v_diff)
+                    x_value = max(x_value, v_diff)
             delta = x_value                                                                 # update delta
             diff.append(delta)
-        self.message(count)                                                                 # print key parameters
+
+        self.message(count)                                                                 # print messages
 
         return value_table, policy, diff
 
@@ -75,68 +83,44 @@ class Value_iteration:
         """
 
         value = 0
-        reward = env.get_reward(x, self.xG, self.lose)                  # get reward of next state
+        reward = self.env.get_reward(x)                                 # get reward of next state
         for i in range(len(x)):
             value += p[i] * (reward[i] + self.gamma * table[x[i]])      # cal Q-value
 
         return value
 
 
-    def simulation(self, xI, xG, policy, diff):
+    def extract_path(self, xI, xG, policy):
         """
-        simulate a path using converged policy.
+        extract path from converged policy.
 
         :param xI: starting state
-        :param xG: goal state
+        :param xG: goal states
         :param policy: converged policy
-        :return: simulation path
+        :return: path
         """
 
-        # plt.figure(1)                                               # path animation
-        # tools.show_map(xI, xG, self.obs, self.lose, self.name1)     # show background
-        #
-        # x, path = xI, []
-        # while True:
-        #     u = policy[x]
-        #     x_next = (x[0] + u[0], x[1] + u[1])
-        #     if x_next in self.obs:
-        #         print("Collision!")                                 # collision: simulation failed
-        #     else:
-        #         x = x_next
-        #         if x_next in xG: break
-        #         else:
-        #             tools.plot_dots(x)                              # each state in optimal path
-        #             path.append(x)
-        # plt.pause(1)
-
-        # plt.figure(2)                                               # difference between two successive iteration
-        # plt.plot(diff, color='#808080', marker='o')
-        fig, ax = plt.subplots()
-
-        ax.set_xlim(-5, 60)
-        ax.set_ylim(-1, 9)
-        plt.title(self.name2, fontdict=None)
-        plt.xlabel('iterations')
-        plt.ylabel('difference of successive iterations')
-        plt.grid('on')
-
-        count = 0
-
-        for x in diff:
-            plt.plot(count, x, color='#808080', marker='o')  # plot dots for animation
-            plt.gcf().canvas.mpl_connect('key_release_event',
-                                         lambda event: [exit(0) if event.key == 'escape' else None])
-            plt.pause(0.07)
-            count += 1
+        x, path = xI, [xI]
+        while x not in xG:
+            u = policy[x]
+            x_next = (x[0] + u[0], x[1] + u[1])
+            if x_next in self.obs:
+                print("Collision! Please run again!")
+                break
+            else:
+                path.append(x_next)
+                x = x_next
+        return path
 
-        plt.plot(diff, color='#808080')
-        plt.pause(0.01)
-        plt.show()
 
-        return
+    def message(self, count):
+        """
+        print important message.
 
+        :param count: iteration numbers
+        :return: print
+        """
 
-    def message(self, count):
         print("starting state: ", self.xI)
         print("goal states: ", self.xG)
         print("condition for convergence: ", self.e)
@@ -149,6 +133,3 @@ if __name__ == '__main__':
     x_Goal = [(49, 5), (49, 25)]        # goal states
 
     VI = Value_iteration(x_Start, x_Goal)
-    [value_VI, policy_VI, diff_VI] = VI.iteration()
-    path_VI = VI.simulation(x_Start, x_Goal, policy_VI, diff_VI)
-