zhm-real 5 年 前
コミット
4fbfa21dbb
36 ファイル変更0 行追加1436 行削除
  1. 0 3
      Model-free Control/.idea/.gitignore
  2. 0 8
      Model-free Control/.idea/Model-free Control.iml
  3. 0 7
      Model-free Control/.idea/dictionaries/Huiming_Zhou.xml
  4. 0 6
      Model-free Control/.idea/inspectionProfiles/profiles_settings.xml
  5. 0 4
      Model-free Control/.idea/misc.xml
  6. 0 8
      Model-free Control/.idea/modules.xml
  7. 0 6
      Model-free Control/.idea/vcs.xml
  8. 0 166
      Model-free Control/Q-learning.py
  9. 0 167
      Model-free Control/Sarsa.py
  10. BIN
      Model-free Control/__pycache__/env.cpython-37.pyc
  11. BIN
      Model-free Control/__pycache__/motion_model.cpython-37.pyc
  12. BIN
      Model-free Control/__pycache__/plotting.cpython-37.pyc
  13. 0 70
      Model-free Control/env.py
  14. BIN
      Model-free Control/gif/Qlearning.gif
  15. BIN
      Model-free Control/gif/SARSA.gif
  16. 0 38
      Model-free Control/motion_model.py
  17. 0 110
      Model-free Control/plotting.py
  18. 0 3
      Stochastic Shortest Path/.idea/.gitignore
  19. 0 11
      Stochastic Shortest Path/.idea/Stochastic Shortest Path.iml
  20. 0 6
      Stochastic Shortest Path/.idea/inspectionProfiles/profiles_settings.xml
  21. 0 4
      Stochastic Shortest Path/.idea/misc.xml
  22. 0 8
      Stochastic Shortest Path/.idea/modules.xml
  23. 0 6
      Stochastic Shortest Path/.idea/vcs.xml
  24. 0 155
      Stochastic Shortest Path/Q-policy_iteration.py
  25. 0 128
      Stochastic Shortest Path/Q-value_iteration.py
  26. BIN
      Stochastic Shortest Path/__pycache__/env.cpython-37.pyc
  27. BIN
      Stochastic Shortest Path/__pycache__/motion_model.cpython-37.pyc
  28. BIN
      Stochastic Shortest Path/__pycache__/plotting.cpython-37.pyc
  29. 0 87
      Stochastic Shortest Path/env.py
  30. BIN
      Stochastic Shortest Path/gif/VI.gif
  31. BIN
      Stochastic Shortest Path/gif/VI.jpeg
  32. BIN
      Stochastic Shortest Path/gif/VI_E.gif
  33. 0 38
      Stochastic Shortest Path/motion_model.py
  34. 0 113
      Stochastic Shortest Path/plotting.py
  35. 0 158
      Stochastic Shortest Path/policy_iteration.py
  36. 0 126
      Stochastic Shortest Path/value_iteration.py

+ 0 - 3
Model-free Control/.idea/.gitignore

@@ -1,3 +0,0 @@
-
-# Default ignored files
-/workspace.xml

+ 0 - 8
Model-free Control/.idea/Model-free Control.iml

@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module type="PYTHON_MODULE" version="4">
-  <component name="NewModuleRootManager">
-    <content url="file://$MODULE_DIR$" />
-    <orderEntry type="jdk" jdkName="Python 3.7 (Search-based Planning)" jdkType="Python SDK" />
-    <orderEntry type="sourceFolder" forTests="false" />
-  </component>
-</module>

+ 0 - 7
Model-free Control/.idea/dictionaries/Huiming_Zhou.xml

@@ -1,7 +0,0 @@
-<component name="ProjectDictionaryState">
-  <dictionary name="Huiming Zhou">
-    <words>
-      <w>sarsa</w>
-    </words>
-  </dictionary>
-</component>

+ 0 - 6
Model-free Control/.idea/inspectionProfiles/profiles_settings.xml

@@ -1,6 +0,0 @@
-<component name="InspectionProjectProfileManager">
-  <settings>
-    <option name="USE_PROJECT_PROFILE" value="false" />
-    <version value="1.0" />
-  </settings>
-</component>

+ 0 - 4
Model-free Control/.idea/misc.xml

@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (Search-based Planning)" project-jdk-type="Python SDK" />
-</project>

+ 0 - 8
Model-free Control/.idea/modules.xml

@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="ProjectModuleManager">
-    <modules>
-      <module fileurl="file://$PROJECT_DIR$/.idea/Model-free Control.iml" filepath="$PROJECT_DIR$/.idea/Model-free Control.iml" />
-    </modules>
-  </component>
-</project>

+ 0 - 6
Model-free Control/.idea/vcs.xml

@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="VcsDirectoryMappings">
-    <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
-  </component>
-</project>

+ 0 - 166
Model-free Control/Q-learning.py

@@ -1,166 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-
-
-class QLEARNING:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.M = 500  # iteration numbers
-        self.gamma = 0.9  # discount factor
-        self.alpha = 0.5
-        self.epsilon = 0.1
-
-        self.env = env.Env(self.xI, self.xG)
-        self.motion = motion_model.Motion_model(self.xI, self.xG)
-        self.plotting = plotting.Plotting(self.xI, self.xG)
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "SARSA, M=" + str(self.M)
-
-        [self.value, self.policy] = self.Monte_Carlo(self.xI, self.xG)
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-
-    def Monte_Carlo(self, xI, xG):
-        """
-        Monte_Carlo experiments
-
-        :return: Q_table, policy
-        """
-
-        Q_table = self.table_init()  # Q_table initialization
-        policy = {}  # policy table
-
-        for k in range(self.M):  # iterations
-            x = self.state_init()  # initial state
-            while x != xG:  # stop condition
-                u = self.epsilon_greedy(int(np.argmax(Q_table[x])), self.epsilon)  # epsilon_greedy policy
-                x_next = self.move_next(x, self.u_set[u])  # next state
-                reward = self.env.get_reward(x_next)  # reward observed
-                Q_table[x][u] = (1 - self.alpha) * Q_table[x][u] + \
-                                self.alpha * (reward + self.gamma * max(Q_table[x_next]))
-                x = x_next
-
-        for x in Q_table:
-            policy[x] = int(np.argmax(Q_table[x]))  # extract policy
-
-        return Q_table, policy
-
-    def table_init(self):
-        """
-        Initialize Q_table: Q(s, a)
-        :return: Q_table
-        """
-
-        Q_table = {}
-
-        for x in self.stateSpace:
-            u = []
-            if x not in self.obs:
-                for k in range(len(self.u_set)):
-                    if x == self.xG:
-                        u.append(0)
-                    else:
-                        u.append(np.random.random_sample())
-                    Q_table[x] = u
-        return Q_table
-
-    def state_init(self):
-        """
-        initialize a starting state
-        :return: starting state
-        """
-        while True:
-            i = np.random.randint(0, self.env.x_range - 1)
-            j = np.random.randint(0, self.env.y_range - 1)
-            if (i, j) not in self.obs:
-                return (i, j)
-
-    def epsilon_greedy(self, u, error):
-        """
-        generate a policy using epsilon_greedy algorithm
-
-        :param u: original input
-        :param error: epsilon value
-        :return: epsilon policy
-        """
-
-        if np.random.random_sample() < 3 / 4 * error:
-            u_e = u
-            while u_e == u:
-                p = np.random.random_sample()
-                if p < 0.25:
-                    u_e = 0
-                elif p < 0.5:
-                    u_e = 1
-                elif p < 0.75:
-                    u_e = 2
-                else:
-                    u_e = 3
-            return u_e
-        return u
-
-    def move_next(self, x, u):
-        """
-        get next state.
-
-        :param x: current state
-        :param u: input
-        :return: next state
-        """
-
-        x_next = (x[0] + u[0], x[1] + u[1])
-        if x_next in self.obs:
-            return x
-        return x_next
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x != xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                break
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal state: ", self.xG)
-        print("iteration numbers: ", self.M)
-        print("discount factor: ", self.gamma)
-        print("epsilon error: ", self.epsilon)
-        print("alpha: ", self.alpha)
-
-
-if __name__ == '__main__':
-    x_Start = (1, 1)
-    x_Goal = (12, 1)
-
-    Q_CALL = QLEARNING(x_Start, x_Goal)

+ 0 - 167
Model-free Control/Sarsa.py

@@ -1,167 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-
-
-class SARSA:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.M = 500  # iteration numbers
-        self.gamma = 0.9  # discount factor
-        self.alpha = 0.5
-        self.epsilon = 0.1
-
-        self.env = env.Env(self.xI, self.xG)
-        self.motion = motion_model.Motion_model(self.xI, self.xG)
-        self.plotting = plotting.Plotting(self.xI, self.xG)
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "Q-learning, M=" + str(self.M)
-
-        [self.value, self.policy] = self.Monte_Carlo(self.xI, self.xG)
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-
-    def Monte_Carlo(self, xI, xG):
-        """
-        Monte_Carlo experiments
-
-        :return: Q_table, policy
-        """
-
-        Q_table = self.table_init()  # Q_table initialization
-        policy = {}  # policy table
-
-        for k in range(self.M):  # iterations
-            x = self.state_init()  # initial state
-            u = self.epsilon_greedy(int(np.argmax(Q_table[x])), self.epsilon)
-            while x != xG:  # stop condition
-                x_next = self.move_next(x, self.u_set[u])  # next state
-                reward = self.env.get_reward(x_next)  # reward observed
-                u_next = self.epsilon_greedy(int(np.argmax(Q_table[x_next])), self.epsilon)
-                Q_table[x][u] = (1 - self.alpha) * Q_table[x][u] + \
-                                self.alpha * (reward + self.gamma * Q_table[x_next][u_next])
-                x, u = x_next, u_next
-
-        for x in Q_table:
-            policy[x] = int(np.argmax(Q_table[x]))  # extract policy
-
-        return Q_table, policy
-
-    def table_init(self):
-        """
-        Initialize Q_table: Q(s, a)
-        :return: Q_table
-        """
-
-        Q_table = {}
-
-        for x in self.stateSpace:
-            u = []
-            if x not in self.obs:
-                for k in range(len(self.u_set)):
-                    if x == self.xG:
-                        u.append(0)
-                    else:
-                        u.append(np.random.random_sample())
-                    Q_table[x] = u
-        return Q_table
-
-    def state_init(self):
-        """
-        initialize a starting state
-        :return: starting state
-        """
-        while True:
-            i = np.random.randint(0, self.env.x_range - 1)
-            j = np.random.randint(0, self.env.y_range - 1)
-            if (i, j) not in self.obs:
-                return (i, j)
-
-    def epsilon_greedy(self, u, error):
-        """
-        generate a policy using epsilon_greedy algorithm
-
-        :param u: original input
-        :param error: epsilon value
-        :return: epsilon policy
-        """
-
-        if np.random.random_sample() < 3 / 4 * error:
-            u_e = u
-            while u_e == u:
-                p = np.random.random_sample()
-                if p < 0.25:
-                    u_e = 0
-                elif p < 0.5:
-                    u_e = 1
-                elif p < 0.75:
-                    u_e = 2
-                else:
-                    u_e = 3
-            return u_e
-        return u
-
-    def move_next(self, x, u):
-        """
-        get next state.
-
-        :param x: current state
-        :param u: input
-        :return: next state
-        """
-
-        x_next = (x[0] + u[0], x[1] + u[1])
-        if x_next in self.obs:
-            return x
-        return x_next
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x != xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                return path
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal state: ", self.xG)
-        print("iteration numbers: ", self.M)
-        print("discount factor: ", self.gamma)
-        print("epsilon error: ", self.epsilon)
-        print("alpha: ", self.alpha)
-
-
-if __name__ == '__main__':
-    x_Start = (1, 1)
-    x_Goal = (12, 1)
-
-    SARSA_CALL = SARSA(x_Start, x_Goal)

BIN
Model-free Control/__pycache__/env.cpython-37.pyc


BIN
Model-free Control/__pycache__/motion_model.cpython-37.pyc


BIN
Model-free Control/__pycache__/plotting.cpython-37.pyc


+ 0 - 70
Model-free Control/env.py

@@ -1,70 +0,0 @@
-class Env:
-    def __init__(self, xI, xG):
-        self.x_range = 14                           # size of background
-        self.y_range = 6
-        self.motions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
-        self.xI = xI
-        self.xG = xG
-        self.obs = self.obs_map()
-        self.lose = self.lose_map()
-        self.stateSpace = self.state_space()
-
-    def obs_map(self):
-        """
-        Initialize obstacles' positions
-
-        :return: map of obstacles
-        """
-        x = self.x_range
-        y = self.y_range
-        obs = []
-
-        for i in range(x):
-            obs.append((i, 0))
-        for i in range(x):
-            obs.append((i, y - 1))
-
-        for i in range(y):
-            obs.append((0, i))
-        for i in range(y):
-            obs.append((x - 1, i))
-
-        return obs
-
-    def lose_map(self):
-        """
-        Initialize losing states' positions
-        :return: losing states
-        """
-
-        lose = []
-        for i in range(2, 12):
-            lose.append((i, 1))
-
-        return lose
-
-    def state_space(self):
-        """
-        generate state space
-        :return: state space
-        """
-
-        state_space = []
-        for i in range(self.x_range):
-            for j in range(self.y_range):
-                if (i, j) not in self.obs:
-                    state_space.append((i, j))
-
-        return state_space
-
-    def get_reward(self, x_next):
-        """
-        calculate reward of next state
-
-        :param x_next: next state
-        :return: reward
-        """
-
-        if x_next in self.lose:
-            return -100                         # reward : -100, for lose states
-        return -1                               # reward : -1, for other states

BIN
Model-free Control/gif/Qlearning.gif


BIN
Model-free Control/gif/SARSA.gif


+ 0 - 38
Model-free Control/motion_model.py

@@ -1,38 +0,0 @@
-import env
-
-
-class Motion_model():
-    def __init__(self, xI, xG):
-        self.env = env.Env(xI, xG)
-        self.obs = self.env.obs_map()
-
-    def move_next(self, x, u, eta=0.2):
-        """
-        Motion model of robots,
-
-        :param x: current state (node)
-        :param u: input
-        :param obs: obstacle map
-        :param eta: noise in motion model
-        :return: next states and corresponding probability
-        """
-
-        p_next = [1 - eta, eta / 2, eta / 2]
-        x_next = []
-        if u == (0, 1):
-            u_real = [(0, 1), (-1, 0), (1, 0)]
-        elif u == (0, -1):
-            u_real = [(0, -1), (-1, 0), (1, 0)]
-        elif u == (-1, 0):
-            u_real = [(-1, 0), (0, 1), (0, -1)]
-        else:
-            u_real = [(1, 0), (0, 1), (0, -1)]
-
-        for act in u_real:
-            x_check = (x[0] + act[0], x[1] + act[1])
-            if x_check in self.obs:
-                x_next.append(x)
-            else:
-                x_next.append(x_check)
-
-        return x_next, p_next

+ 0 - 110
Model-free Control/plotting.py

@@ -1,110 +0,0 @@
-import matplotlib.pyplot as plt
-import env
-
-
-class Plotting():
-    def __init__(self, xI, xG):
-        self.xI, self.xG = xI, xG
-        self.env = env.Env(self.xI, self.xG)
-        self.obs = self.env.obs_map()
-        self.lose = self.env.lose_map()
-
-    def animation(self, path, name):
-        """
-        animation.
-
-        :param path: optimal path
-        :param name: tile of figure
-        :return: an animation
-        """
-
-        plt.figure(1)
-        self.plot_grid(name)
-        self.plot_lose()
-        self.plot_path(path)
-
-    def plot_grid(self, name):
-        """
-        plot the obstacles in environment.
-
-        :param name: title of figure
-        :return: plot
-        """
-
-        obs_x = [self.obs[i][0] for i in range(len(self.obs))]
-        obs_y = [self.obs[i][1] for i in range(len(self.obs))]
-
-        plt.plot(self.xI[0], self.xI[1], "bs", ms=24)
-        plt.plot(self.xG[0], self.xG[1], "gs", ms=24)
-
-        plt.plot(obs_x, obs_y, "sk", ms=24)
-        plt.title(name)
-        plt.axis("equal")
-
-    def plot_lose(self):
-        """
-        plot losing states in environment.
-        :return: a plot
-        """
-
-        lose_x = [self.lose[i][0] for i in range(len(self.lose))]
-        lose_y = [self.lose[i][1] for i in range(len(self.lose))]
-
-        plt.plot(lose_x, lose_y, color='#A52A2A', marker='s', ms=24)
-
-    def plot_visited(self, visited):
-        """
-        animation of order of visited nodes.
-
-        :param visited: visited nodes
-        :return: animation
-        """
-
-        visited.remove(self.xI)
-        count = 0
-
-        for x in visited:
-            count += 1
-            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-
-            if count < len(visited) / 3:
-                length = 15
-            elif count < len(visited) * 2 / 3:
-                length = 30
-            else:
-                length = 45
-
-            if count % length == 0: plt.pause(0.001)
-
-    def plot_path(self, path):
-        path.remove(self.xI)
-        path.remove(self.xG)
-
-        for x in path:
-            plt.plot(x[0], x[1], color='#808080', marker='o', ms=23)
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-            plt.pause(0.001)
-        plt.show()
-        plt.pause(0.5)
-
-    def plot_diff(self, diff, name):
-        plt.figure(2)
-        plt.title(name, fontdict=None)
-        plt.xlabel('iterations')
-        plt.ylabel('difference of successive iterations')
-        plt.grid('on')
-
-        count = 0
-        for x in diff:
-            plt.plot(count, x, color='#808080', marker='o')  # plot dots for animation
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-            plt.pause(0.07)
-            count += 1
-
-        plt.plot(diff, color='#808080')
-        plt.pause(0.01)
-        plt.show()

+ 0 - 3
Stochastic Shortest Path/.idea/.gitignore

@@ -1,3 +0,0 @@
-
-# Default ignored files
-/workspace.xml

+ 0 - 11
Stochastic Shortest Path/.idea/Stochastic Shortest Path.iml

@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module type="PYTHON_MODULE" version="4">
-  <component name="NewModuleRootManager">
-    <content url="file://$MODULE_DIR$" />
-    <orderEntry type="jdk" jdkName="Python 3.7 (Search-based Planning)" jdkType="Python SDK" />
-    <orderEntry type="sourceFolder" forTests="false" />
-  </component>
-  <component name="TestRunnerService">
-    <option name="PROJECT_TEST_RUNNER" value="pytest" />
-  </component>
-</module>

+ 0 - 6
Stochastic Shortest Path/.idea/inspectionProfiles/profiles_settings.xml

@@ -1,6 +0,0 @@
-<component name="InspectionProjectProfileManager">
-  <settings>
-    <option name="USE_PROJECT_PROFILE" value="false" />
-    <version value="1.0" />
-  </settings>
-</component>

+ 0 - 4
Stochastic Shortest Path/.idea/misc.xml

@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (Search-based Planning)" project-jdk-type="Python SDK" />
-</project>

+ 0 - 8
Stochastic Shortest Path/.idea/modules.xml

@@ -1,8 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="ProjectModuleManager">
-    <modules>
-      <module fileurl="file://$PROJECT_DIR$/.idea/Stochastic Shortest Path.iml" filepath="$PROJECT_DIR$/.idea/Stochastic Shortest Path.iml" />
-    </modules>
-  </component>
-</project>

+ 0 - 6
Stochastic Shortest Path/.idea/vcs.xml

@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="VcsDirectoryMappings">
-    <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
-  </component>
-</project>

+ 0 - 155
Stochastic Shortest Path/Q-policy_iteration.py

@@ -1,155 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-import copy
-import sys
-
-
-class Q_policy_iteration:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.e = 0.001  # threshold for convergence
-        self.gamma = 0.9  # discount factor
-
-        self.env = env.Env(self.xI, self.xG)  # class Env
-        self.motion = motion_model.Motion_model(self.xI, self.xG)  # class Motion_model
-        self.plotting = plotting.Plotting(self.xI, self.xG)  # class Plotting
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "Q-policy_iteration, gamma=" + str(self.gamma)
-
-        [self.value, self.policy] = self.iteration()
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-
-    def policy_evaluation(self, policy, value):
-        """
-        evaluation process using current policy.
-
-        :param policy: current policy
-        :param value: value table
-        :return: converged value table
-        """
-
-        delta = sys.maxsize
-
-        while delta > self.e:  # convergence condition
-            x_value = 0
-            for x in value:
-                if x not in self.xG:
-                    for k in range(len(self.u_set)):
-                        [x_next, p_next] = self.motion.move_next(x, self.u_set[k])
-                        v_Q = self.cal_Q_value(x_next, p_next, policy, value)
-                        v_diff = abs(value[x][k] - v_Q)
-                        value[x][k] = v_Q
-                        if v_diff > 0:
-                            x_value = max(x_value, v_diff)
-            delta = x_value
-
-        return value
-
-    def policy_improvement(self, policy, value):
-        """
-        policy improvement process.
-
-        :param policy: policy table
-        :param value: current value table
-        :return: improved policy
-        """
-
-        for x in self.stateSpace:
-            if x not in self.xG:
-                policy[x] = int(np.argmax(value[x]))
-
-        return policy
-
-    def iteration(self):
-        """
-        Q-policy iteration
-        :return: converged policy and its value table.
-        """
-
-        Q_table = {}
-        policy = {}
-        count = 0
-
-        for x in self.stateSpace:
-            Q_table[x] = [0, 0, 0, 0]  # initialize Q_value table
-            policy[x] = 0  # initialize policy table
-
-        while True:
-            count += 1
-            policy_back = copy.deepcopy(policy)
-            Q_table = self.policy_evaluation(policy, Q_table)  # evaluation process
-            policy = self.policy_improvement(policy, Q_table)  # improvement process
-            if policy_back == policy: break  # convergence condition
-
-        self.message(count)
-
-        return Q_table, policy
-
-    def cal_Q_value(self, x, p, policy, table):
-        """
-        cal Q_value.
-
-        :param x: next state vector
-        :param p: probability of each state
-        :param table: value table
-        :return: Q-value
-        """
-
-        value = 0
-        reward = self.env.get_reward(x)  # get reward of next state
-        for i in range(len(x)):
-            value += p[i] * (reward[i] + self.gamma * table[x[i]][policy[x[i]]])
-
-        return value
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x not in xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                break
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self, count):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal states: ", self.xG)
-        print("condition for convergence: ", self.e)
-        print("discount factor: ", self.gamma)
-        print("iteration times: ", count)
-
-
-if __name__ == '__main__':
-    x_Start = (5, 5)
-    x_Goal = [(49, 5), (49, 25)]
-
-    QPI = Q_policy_iteration(x_Start, x_Goal)

+ 0 - 128
Stochastic Shortest Path/Q-value_iteration.py

@@ -1,128 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-import sys
-
-
-class Q_value_iteration:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.e = 0.001  # threshold for convergence
-        self.gamma = 0.9  # discount factor
-
-        self.env = env.Env(self.xI, self.xG)  # class Env
-        self.motion = motion_model.Motion_model(self.xI, self.xG)  # class Motion_model
-        self.plotting = plotting.Plotting(self.xI, self.xG)  # class Plotting
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "Q-value_iteration, gamma=" + str(self.gamma)
-        self.name2 = "converge process, e=" + str(self.e)
-
-        [self.value, self.policy, self.diff] = self.iteration(self.xI, self.xG)
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-        self.plotting.plot_diff(self.diff, self.name2)
-
-    def iteration(self, xI, xG):
-        """
-        Q_value_iteration
-        :return: converged Q table and policy
-        """
-
-        Q_table = {}
-        policy = {}
-        diff = []
-        delta = sys.maxsize
-        count = 0
-
-        for x in self.stateSpace:
-            Q_table[x] = [0, 0, 0, 0]  # initialize Q_table
-
-        while delta > self.e:  # convergence condition
-            count += 1
-            x_value = 0
-            for x in self.stateSpace:
-                if x not in x_Goal:
-                    for k in range(len(self.u_set)):
-                        [x_next, p_next] = self.motion.move_next(x, self.u_set[k])
-                        Q_value = self.cal_Q_value(x_next, p_next, Q_table)
-                        v_diff = abs(Q_table[x][k] - Q_value)
-                        Q_table[x][k] = Q_value
-                        if v_diff > 0:
-                            x_value = max(x_value, v_diff)
-            diff.append(x_value)
-            delta = x_value
-
-        for x in self.stateSpace:
-            if x not in xG:
-                policy[x] = np.argmax(Q_table[x])
-
-        self.message(count)
-
-        return Q_table, policy, diff
-
-    def cal_Q_value(self, x, p, table):
-        """
-        cal Q_value.
-
-        :param x: next state vector
-        :param p: probability of each state
-        :param table: value table
-        :return: Q-value
-        """
-
-        value = 0
-        reward = self.env.get_reward(x)  # get reward of next state
-        for i in range(len(x)):
-            value += p[i] * (reward[i] + self.gamma * max(table[x[i]]))
-
-        return value
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x not in xG:
-            u = self.u_set[policy[x]]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                break
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self, count):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal states: ", self.xG)
-        print("condition for convergence: ", self.e)
-        print("discount factor: ", self.gamma)
-        print("iteration times: ", count)
-
-
-if __name__ == '__main__':
-    x_Start = (5, 5)
-    x_Goal = [(49, 5), (49, 25)]
-
-    QVI = Q_value_iteration(x_Start, x_Goal)

BIN
Stochastic Shortest Path/__pycache__/env.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/motion_model.cpython-37.pyc


BIN
Stochastic Shortest Path/__pycache__/plotting.cpython-37.pyc


+ 0 - 87
Stochastic Shortest Path/env.py

@@ -1,87 +0,0 @@
-class Env:
-    def __init__(self, xI, xG):
-        self.x_range = 51  # size of background
-        self.y_range = 31
-        self.motions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
-        self.xI = xI
-        self.xG = xG
-        self.obs = self.obs_map()
-        self.lose = self.lose_map()
-        self.stateSpace = self.state_space()
-
-    def obs_map(self):
-        """
-        Initialize obstacles' positions
-        :return: map of obstacles
-        """
-
-        x = self.x_range
-        y = self.y_range
-        obs = []
-
-        for i in range(x):
-            obs.append((i, 0))
-        for i in range(x):
-            obs.append((i, y - 1))
-
-        for i in range(y):
-            obs.append((0, i))
-        for i in range(y):
-            obs.append((x - 1, i))
-
-        for i in range(10, 21):
-            obs.append((i, 15))
-        for i in range(15):
-            obs.append((20, i))
-
-        for i in range(15, 30):
-            obs.append((30, i))
-        for i in range(16):
-            obs.append((40, i))
-
-        return obs
-
-    def lose_map(self):
-        """
-        Initialize losing states' positions
-        :return: losing states
-        """
-
-        lose = []
-        for i in range(25, 36):
-            lose.append((i, 13))
-
-        return lose
-
-    def state_space(self):
-        """
-        generate state space
-        :return: state space
-        """
-
-        state_space = []
-        for i in range(self.x_range):
-            for j in range(self.y_range):
-                if (i, j) not in self.obs:
-                    state_space.append((i, j))
-
-        return state_space
-
-    def get_reward(self, x_next):
-        """
-        calculate reward of next state
-
-        :param x_next: next state
-        :return: reward
-        """
-
-        reward = []
-        for x in x_next:
-            if x in self.xG:
-                reward.append(10)  # reward : 10, for goal states
-            elif x in self.lose:
-                reward.append(-10)  # reward : -10, for lose states
-            else:
-                reward.append(0)  # reward : 0, for other states
-
-        return reward

BIN
Stochastic Shortest Path/gif/VI.gif


BIN
Stochastic Shortest Path/gif/VI.jpeg


BIN
Stochastic Shortest Path/gif/VI_E.gif


+ 0 - 38
Stochastic Shortest Path/motion_model.py

@@ -1,38 +0,0 @@
-import env
-
-
-class Motion_model():
-    def __init__(self, xI, xG):
-        self.env = env.Env(xI, xG)
-        self.obs = self.env.obs_map()
-
-    def move_next(self, x, u, eta=0.2):
-        """
-        Motion model of robots,
-
-        :param x: current state (node)
-        :param u: input
-        :param obs: obstacle map
-        :param eta: noise in motion model
-        :return: next states and corresponding probability
-        """
-
-        p_next = [1 - eta, eta / 2, eta / 2]
-        x_next = []
-        if u == (0, 1):
-            u_real = [(0, 1), (-1, 0), (1, 0)]
-        elif u == (0, -1):
-            u_real = [(0, -1), (-1, 0), (1, 0)]
-        elif u == (-1, 0):
-            u_real = [(-1, 0), (0, 1), (0, -1)]
-        else:
-            u_real = [(1, 0), (0, 1), (0, -1)]
-
-        for act in u_real:
-            x_check = (x[0] + act[0], x[1] + act[1])
-            if x_check in self.obs:
-                x_next.append(x)
-            else:
-                x_next.append(x_check)
-
-        return x_next, p_next

+ 0 - 113
Stochastic Shortest Path/plotting.py

@@ -1,113 +0,0 @@
-import matplotlib.pyplot as plt
-import env
-
-
-class Plotting():
-    def __init__(self, xI, xG):
-        self.xI, self.xG = xI, xG
-        self.env = env.Env(self.xI, self.xG)
-        self.obs = self.env.obs_map()
-        self.lose = self.env.lose_map()
-
-    def animation(self, path, name):
-        """
-        animation.
-
-        :param path: optimal path
-        :param name: tile of figure
-        :return: an animation
-        """
-
-        plt.figure(1)
-        self.plot_grid(name)
-        self.plot_lose()
-        self.plot_path(path)
-
-    def plot_grid(self, name):
-        """
-        plot the obstacles in environment.
-
-        :param name: title of figure
-        :return: plot
-        """
-
-        obs_x = [self.obs[i][0] for i in range(len(self.obs))]
-        obs_y = [self.obs[i][1] for i in range(len(self.obs))]
-
-        plt.plot(self.xI[0], self.xI[1], "bs")
-        for x in self.xG:
-            plt.plot(x[0], x[1], "gs")
-
-        plt.plot(obs_x, obs_y, "sk")
-        plt.title(name)
-        plt.axis("equal")
-
-    def plot_lose(self):
-        """
-        plot losing states in environment.
-        :return: a plot
-        """
-
-        lose_x = [self.lose[i][0] for i in range(len(self.lose))]
-        lose_y = [self.lose[i][1] for i in range(len(self.lose))]
-
-        plt.plot(lose_x, lose_y, color='#A52A2A', marker='s')
-
-    def plot_visited(self, visited):
-        """
-        animation of order of visited nodes.
-
-        :param visited: visited nodes
-        :return: animation
-        """
-
-        visited.remove(self.xI)
-        count = 0
-
-        for x in visited:
-            count += 1
-            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-
-            if count < len(visited) / 3:
-                length = 15
-            elif count < len(visited) * 2 / 3:
-                length = 30
-            else:
-                length = 45
-
-            if count % length == 0: plt.pause(0.001)
-
-    def plot_path(self, path):
-        path.remove(self.xI)
-        for x in self.xG:
-            if x in path:
-                path.remove(x)
-
-        for x in path:
-            plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-            plt.pause(0.001)
-        plt.show()
-        plt.pause(0.5)
-
-    def plot_diff(self, diff, name):
-        plt.figure(2)
-        plt.title(name, fontdict=None)
-        plt.xlabel('iterations')
-        plt.ylabel('difference of successive iterations')
-        plt.grid('on')
-
-        count = 0
-        for x in diff:
-            plt.plot(count, x, color='#808080', marker='o')  # plot dots for animation
-            plt.gcf().canvas.mpl_connect('key_release_event', lambda event:
-            [exit(0) if event.key == 'escape' else None])
-            plt.pause(0.07)
-            count += 1
-
-        plt.plot(diff, color='#808080')
-        plt.pause(0.01)
-        plt.show()

+ 0 - 158
Stochastic Shortest Path/policy_iteration.py

@@ -1,158 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-import sys
-import copy
-
-
-class Policy_iteration:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.e = 0.001  # threshold for convergence
-        self.gamma = 0.9  # discount factor
-
-        self.env = env.Env(self.xI, self.xG)
-        self.motion = motion_model.Motion_model(self.xI, self.xG)
-        self.plotting = plotting.Plotting(self.xI, self.xG)
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "policy_iteration, gamma=" + str(self.gamma)
-
-        [self.value, self.policy] = self.iteration()
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-
-    def policy_evaluation(self, policy, value):
-        """
-        Evaluate current policy.
-
-        :param policy: current policy
-        :param value: value table
-        :return: new value table generated by current policy
-        """
-
-        delta = sys.maxsize
-
-        while delta > self.e:  # convergence condition
-            x_value = 0
-            for x in self.stateSpace:
-                if x not in self.xG:
-                    [x_next, p_next] = self.motion.move_next(x, policy[x])
-                    v_Q = self.cal_Q_value(x_next, p_next, value)
-                    v_diff = abs(value[x] - v_Q)
-                    value[x] = v_Q
-                    if v_diff > 0:
-                        x_value = max(x_value, v_diff)
-            delta = x_value
-
-        return value
-
-    def policy_improvement(self, policy, value):
-        """
-        Improve policy using current value table.
-
-        :param policy: policy table
-        :param value: current value table
-        :return: improved policy table
-        """
-
-        for x in self.stateSpace:
-            if x not in self.xG:
-                value_list = []
-                for u in self.u_set:
-                    [x_next, p_next] = self.motion.move_next(x, u)
-                    value_list.append(self.cal_Q_value(x_next, p_next, value))
-                policy[x] = self.u_set[int(np.argmax(value_list))]
-
-        return policy
-
-    def iteration(self):
-        """
-        polity iteration: using evaluate and improvement process until convergence.
-        :return: value table and converged policy.
-        """
-
-        value_table = {}
-        policy = {}
-        count = 0
-
-        for x in self.stateSpace:
-            value_table[x] = 0  # initialize value table
-            policy[x] = self.u_set[0]  # initialize policy table
-
-        while True:
-            count += 1
-            policy_back = copy.deepcopy(policy)
-            value_table = self.policy_evaluation(policy, value_table)  # evaluation process
-            policy = self.policy_improvement(policy, value_table)  # policy improvement process
-            if policy_back == policy: break  # convergence condition
-
-        self.message(count)
-
-        return value_table, policy
-
-    def cal_Q_value(self, x, p, table):
-        """
-        cal Q_value.
-
-        :param x: next state vector
-        :param p: probability of each state
-        :param table: value table
-        :return: Q-value
-        """
-
-        value = 0
-        reward = self.env.get_reward(x)  # get reward of next state
-        for i in range(len(x)):
-            value += p[i] * (reward[i] + self.gamma * table[x[i]])  # cal Q-value
-
-        return value
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x not in xG:
-            u = policy[x]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                break
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self, count):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal states: ", self.xG)
-        print("condition for convergence: ", self.e)
-        print("discount factor: ", self.gamma)
-        print("iteration times: ", count)
-
-
-if __name__ == '__main__':
-    x_Start = (5, 5)
-    x_Goal = [(49, 5), (49, 25)]
-
-    PI = Policy_iteration(x_Start, x_Goal)

+ 0 - 126
Stochastic Shortest Path/value_iteration.py

@@ -1,126 +0,0 @@
-import env
-import plotting
-import motion_model
-
-import numpy as np
-import sys
-
-
-class Value_iteration:
-    def __init__(self, x_start, x_goal):
-        self.xI, self.xG = x_start, x_goal
-        self.e = 0.001  # threshold for convergence
-        self.gamma = 0.9  # discount factor
-
-        self.env = env.Env(self.xI, self.xG)  # class Env
-        self.motion = motion_model.Motion_model(self.xI, self.xG)  # class Motion_model
-        self.plotting = plotting.Plotting(self.xI, self.xG)  # class Plotting
-
-        self.u_set = self.env.motions  # feasible input set
-        self.stateSpace = self.env.stateSpace  # state space
-        self.obs = self.env.obs_map()  # position of obstacles
-        self.lose = self.env.lose_map()  # position of lose states
-
-        self.name1 = "value_iteration, gamma=" + str(self.gamma)
-        self.name2 = "converge process, e=" + str(self.e)
-
-        [self.value, self.policy, self.diff] = self.iteration(self.xI, self.xG)
-        self.path = self.extract_path(self.xI, self.xG, self.policy)
-        self.plotting.animation(self.path, self.name1)
-        self.plotting.plot_diff(self.diff, self.name2)
-
-    def iteration(self, xI, xG):
-        """
-        value_iteration.
-
-        :return: converged value table, optimal policy and variation of difference,
-        """
-
-        value_table = {}  # value table
-        policy = {}  # policy
-        diff = []  # maximum difference between two successive iteration
-        delta = sys.maxsize  # initialize maximum difference
-        count = 0  # iteration times
-
-        for x in self.stateSpace:  # initialize value table for feasible states
-            value_table[x] = 0
-
-        while delta > self.e:  # converged condition
-            count += 1
-            x_value = 0
-            for x in self.stateSpace:
-                if x not in xG:
-                    value_list = []
-                    for u in self.u_set:
-                        [x_next, p_next] = self.motion.move_next(x, u)  # recall motion model
-                        value_list.append(self.cal_Q_value(x_next, p_next, value_table))  # cal Q value
-                    policy[x] = self.u_set[int(np.argmax(value_list))]  # update policy
-                    v_diff = abs(value_table[x] - max(value_list))  # maximum difference
-                    value_table[x] = max(value_list)  # update value table
-                    x_value = max(x_value, v_diff)
-            delta = x_value  # update delta
-            diff.append(delta)
-
-        self.message(count)  # print messages
-
-        return value_table, policy, diff
-
-    def cal_Q_value(self, x, p, table):
-        """
-        cal Q_value.
-
-        :param x: next state vector
-        :param p: probability of each state
-        :param table: value table
-        :return: Q-value
-        """
-
-        value = 0
-        reward = self.env.get_reward(x)  # get reward of next state
-        for i in range(len(x)):
-            value += p[i] * (reward[i] + self.gamma * table[x[i]])  # cal Q-value
-
-        return value
-
-    def extract_path(self, xI, xG, policy):
-        """
-        extract path from converged policy.
-
-        :param xI: starting state
-        :param xG: goal states
-        :param policy: converged policy
-        :return: path
-        """
-
-        x, path = xI, [xI]
-        while x not in xG:
-            u = policy[x]
-            x_next = (x[0] + u[0], x[1] + u[1])
-            if x_next in self.obs:
-                print("Collision! Please run again!")
-                break
-            else:
-                path.append(x_next)
-                x = x_next
-        return path
-
-    def message(self, count):
-        """
-        print important message.
-
-        :param count: iteration numbers
-        :return: print
-        """
-
-        print("starting state: ", self.xI)
-        print("goal states: ", self.xG)
-        print("condition for convergence: ", self.e)
-        print("discount factor: ", self.gamma)
-        print("iteration times: ", count)
-
-
-if __name__ == '__main__':
-    x_Start = (5, 5)  # starting state
-    x_Goal = [(49, 5), (49, 25)]  # goal states
-
-    VI = Value_iteration(x_Start, x_Goal)