zhm-real 5 년 전
부모
커밋
7f38cc3a21

+ 7 - 0
Model-free Control/.idea/dictionaries/Huiming_Zhou.xml

@@ -0,0 +1,7 @@
+<component name="ProjectDictionaryState">
+  <dictionary name="Huiming Zhou">
+    <words>
+      <w>sarsa</w>
+    </words>
+  </dictionary>
+</component>

+ 165 - 0
Model-free Control/Q-learning.py

@@ -0,0 +1,165 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+@author: huiming zhou
+"""
+
+import env
+import tools
+import motion_model
+
+import matplotlib.pyplot as plt
+import numpy as np
+import sys
+
+
class QLEARNING:
    """Tabular Q-learning agent for the grid world defined in ``env``.

    Despite the historical method name ``Monte_Carlo`` (kept for backward
    compatibility with existing callers), training uses the off-policy
    TD(0) Q-learning update, not Monte-Carlo returns.
    """

    def __init__(self, x_start, x_goal):
        self.u_set = motion_model.motions                       # feasible input set
        self.xI, self.xG = x_start, x_goal                      # start / goal state
        self.M = 500                                            # number of training episodes
        self.gamma = 0.9                                        # discount factor
        self.alpha = 0.5                                        # learning rate
        self.epsilon = 0.1                                      # exploration rate
        self.obs = env.obs_map()                                # position of obstacles
        self.lose = env.lose_map()                              # position of lose states
        self.name1 = "Qlearning, M=" + str(self.M)
        self.name2 = "convergence of error"

    def Monte_Carlo(self):
        """Run ``self.M`` episodes of Q-learning.

        :return: (Q_table, policy) where policy maps each free state to
                 the index of its greedy action in ``self.u_set``.
        """
        Q_table = self.table_init()

        for _ in range(self.M):
            x = self.state_init()
            while x != self.xG:
                u = self.epsilon_greedy(int(np.argmax(Q_table[x])), self.epsilon)
                x_next = self.move_next(x, self.u_set[u])
                reward = env.get_reward(x_next, self.lose)
                # off-policy TD target: value of the best action at x_next
                Q_table[x][u] = (1 - self.alpha) * Q_table[x][u] + \
                                self.alpha * (reward + self.gamma * max(Q_table[x_next]))
                x = x_next

        # greedy policy extracted from the learned table
        policy = {x: int(np.argmax(q)) for x, q in Q_table.items()}

        return Q_table, policy

    def table_init(self):
        """Initialize Q(s, a) for every free (non-obstacle) state.

        Goal-state entries start at 0 (terminal); all other entries get
        small random values to break ties between actions.

        :return: dict mapping state -> list of action values
        """
        Q_table = {}

        for i in range(env.x_range):
            for j in range(env.y_range):
                if (i, j) in self.obs:
                    continue
                if (i, j) == self.xG:
                    Q_table[(i, j)] = [0] * len(self.u_set)
                else:
                    Q_table[(i, j)] = [np.random.random_sample()
                                       for _ in range(len(self.u_set))]

        return Q_table

    def state_init(self):
        """Sample a uniformly random non-obstacle starting state.

        :return: starting state (i, j)
        """
        while True:
            # border cells are obstacles, so the rejection loop only
            # ever returns interior states
            i = np.random.randint(0, env.x_range - 1)
            j = np.random.randint(0, env.y_range - 1)
            if (i, j) not in self.obs:
                return (i, j)

    def epsilon_greedy(self, u, error):
        """Epsilon-greedy action selection.

        With probability (n-1)/n * error pick a uniformly random
        non-greedy action; otherwise keep the greedy input ``u``.
        Generalized from the original hard-coded 4-action version;
        behavior is identical when len(self.u_set) == 4.

        :param u: greedy action index
        :param error: epsilon value
        :return: chosen action index
        """
        n = len(self.u_set)
        if np.random.random_sample() < (n - 1) / n * error:
            u_e = int(np.random.randint(0, n - 1))
            if u_e >= u:
                u_e += 1                # skip u -> uniform over the other actions
            return u_e
        return u

    def move_next(self, x, u):
        """Apply input ``u`` to state ``x``.

        :param x: current state
        :param u: input (dx, dy)
        :return: next state; unchanged if the move hits an obstacle
        """
        x_next = (x[0] + u[0], x[1] + u[1])
        if x_next in self.obs:
            return x
        return x_next

    def simulation(self, xI, xG, policy):
        """Simulate and animate a path under the converged policy.

        :param xI: starting state
        :param xG: goal state
        :param policy: dict mapping state -> action index
        :return: list of intermediate states (excluding xI and xG)
        """
        plt.figure(1)                                            # path animation
        tools.show_map(xI, xG, self.obs, self.lose, self.name1)  # show background

        x, path = xI, []
        while True:
            u = self.u_set[policy[x]]
            x_next = (x[0] + u[0], x[1] + u[1])
            if x_next in self.obs:
                # bug fix: the original kept x unchanged and looped
                # forever printing "Collision!"; abort instead
                print("Collision!")
                break
            x = x_next
            if x == xG:
                break
            tools.plot_dots(x)                                   # each state in optimal path
            path.append(x)
        plt.show()

        return path
+
+
if __name__ == '__main__':
    x_Start = (1, 1)                            # starting state
    x_Goal = (12, 1)                            # goal state

    Q_CALL = QLEARNING(x_Start, x_Goal)
    # fixed copy-paste naming: this script runs Q-learning, not SARSA/VI
    value_Q, policy_Q = Q_CALL.Monte_Carlo()
    path_Q = Q_CALL.simulation(x_Start, x_Goal, policy_Q)

+ 166 - 0
Model-free Control/Sarsa.py

@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+@author: huiming zhou
+"""
+
+import env
+import tools
+import motion_model
+
+import matplotlib.pyplot as plt
+import numpy as np
+import sys
+
+
class SARSA:
    """Tabular SARSA agent for the grid world defined in ``env``.

    Despite the historical method name ``Monte_Carlo`` (kept for backward
    compatibility with existing callers), training uses the on-policy
    TD(0) SARSA update, not Monte-Carlo returns.
    """

    def __init__(self, x_start, x_goal):
        self.u_set = motion_model.motions                       # feasible input set
        self.xI, self.xG = x_start, x_goal                      # start / goal state
        self.M = 500                                            # number of training episodes
        self.gamma = 0.9                                        # discount factor
        self.alpha = 0.5                                        # learning rate
        self.epsilon = 0.1                                      # exploration rate
        self.obs = env.obs_map()                                # position of obstacles
        self.lose = env.lose_map()                              # position of lose states
        self.name1 = "SARSA, M=" + str(self.M)
        self.name2 = "convergence of error"

    def Monte_Carlo(self):
        """Run ``self.M`` episodes of SARSA.

        :return: (Q_table, policy) where policy maps each free state to
                 the index of its greedy action in ``self.u_set``.
        """
        Q_table = self.table_init()

        for _ in range(self.M):
            x = self.state_init()
            u = self.epsilon_greedy(int(np.argmax(Q_table[x])), self.epsilon)
            while x != self.xG:
                x_next = self.move_next(x, self.u_set[u])
                reward = env.get_reward(x_next, self.lose)
                u_next = self.epsilon_greedy(int(np.argmax(Q_table[x_next])), self.epsilon)
                # on-policy TD target: value of the action actually taken next
                Q_table[x][u] = (1 - self.alpha) * Q_table[x][u] + \
                                self.alpha * (reward + self.gamma * Q_table[x_next][u_next])
                x, u = x_next, u_next

        # greedy policy extracted from the learned table
        policy = {x: int(np.argmax(q)) for x, q in Q_table.items()}

        return Q_table, policy

    def table_init(self):
        """Initialize Q(s, a) for every free (non-obstacle) state.

        Goal-state entries start at 0 (terminal); all other entries get
        small random values to break ties between actions.

        :return: dict mapping state -> list of action values
        """
        Q_table = {}

        for i in range(env.x_range):
            for j in range(env.y_range):
                if (i, j) in self.obs:
                    continue
                if (i, j) == self.xG:
                    Q_table[(i, j)] = [0] * len(self.u_set)
                else:
                    Q_table[(i, j)] = [np.random.random_sample()
                                       for _ in range(len(self.u_set))]

        return Q_table

    def state_init(self):
        """Sample a uniformly random non-obstacle starting state.

        :return: starting state (i, j)
        """
        while True:
            # border cells are obstacles, so the rejection loop only
            # ever returns interior states
            i = np.random.randint(0, env.x_range - 1)
            j = np.random.randint(0, env.y_range - 1)
            if (i, j) not in self.obs:
                return (i, j)

    def epsilon_greedy(self, u, error):
        """Epsilon-greedy action selection.

        With probability (n-1)/n * error pick a uniformly random
        non-greedy action; otherwise keep the greedy input ``u``.
        Generalized from the original hard-coded 4-action version;
        behavior is identical when len(self.u_set) == 4.

        :param u: greedy action index
        :param error: epsilon value
        :return: chosen action index
        """
        n = len(self.u_set)
        if np.random.random_sample() < (n - 1) / n * error:
            u_e = int(np.random.randint(0, n - 1))
            if u_e >= u:
                u_e += 1                # skip u -> uniform over the other actions
            return u_e
        return u

    def move_next(self, x, u):
        """Apply input ``u`` to state ``x``.

        :param x: current state
        :param u: input (dx, dy)
        :return: next state; unchanged if the move hits an obstacle
        """
        x_next = (x[0] + u[0], x[1] + u[1])
        if x_next in self.obs:
            return x
        return x_next

    def simulation(self, xI, xG, policy):
        """Simulate and animate a path under the converged policy.

        :param xI: starting state
        :param xG: goal state
        :param policy: dict mapping state -> action index
        :return: list of intermediate states (excluding xI and xG)
        """
        plt.figure(1)                                            # path animation
        tools.show_map(xI, xG, self.obs, self.lose, self.name1)  # show background

        x, path = xI, []
        while True:
            u = self.u_set[policy[x]]
            x_next = (x[0] + u[0], x[1] + u[1])
            if x_next in self.obs:
                # bug fix: the original kept x unchanged and looped
                # forever printing "Collision!"; abort instead
                print("Collision!")
                break
            x = x_next
            if x == xG:
                break
            tools.plot_dots(x)                                   # each state in optimal path
            path.append(x)
        plt.show()

        return path
+
+
if __name__ == '__main__':
    x_Start = (1, 1)                            # starting state
    x_Goal = (12, 1)                            # goal state

    SARSA_CALL = SARSA(x_Start, x_Goal)
    value_SARSA, policy_SARSA = SARSA_CALL.Monte_Carlo()
    # fixed copy-paste naming: this path comes from SARSA, not value iteration
    path_SARSA = SARSA_CALL.simulation(x_Start, x_Goal, policy_SARSA)

BIN
Model-free Control/__pycache__/env.cpython-37.pyc


BIN
Model-free Control/__pycache__/motion_model.cpython-37.pyc


BIN
Model-free Control/__pycache__/tools.cpython-37.pyc


+ 8 - 23
Model-free Control/env.py

@@ -4,7 +4,7 @@
 @author: huiming zhou
 """
 
-x_range, y_range = 51, 31     # size of background
+x_range, y_range = 14, 6     # size of background
 
 
 def obs_map():
@@ -25,16 +25,6 @@ def obs_map():
     for i in range(y_range):
         obs.append((x_range - 1, i))
 
-    for i in range(10, 21):
-        obs.append((i, 15))
-    for i in range(15):
-        obs.append((20, i))
-
-    for i in range(15, 30):
-        obs.append((30, i))
-    for i in range(16):
-        obs.append((40, i))
-
     return obs
 
 
@@ -45,13 +35,13 @@ def lose_map():
     """
 
     lose = []
-    for i in range(25, 36):
-        lose.append((i, 13))
+    for i in range(2, 12):
+        lose.append((i, 1))
 
     return lose
 
 
-def get_reward(x_next, xG, lose):
+def get_reward(x_next, lose):
     """
     calculate reward of next state
 
@@ -59,13 +49,8 @@ def get_reward(x_next, xG, lose):
     :return: reward
     """
 
-    reward = []
-    for x in x_next:
-        if x in xG:
-            reward.append(10)           # reward : 10, for goal states
-        elif x in lose:
-            reward.append(-10)          # reward : -10, for lose states
-        else:
-            reward.append(0)            # reward : 0, for other states
+    if x_next in lose:
+        return -100                      # reward : -100, for lose states
+    return -1                            # reward : -1, for other states
+
 
-    return reward

+ 1 - 0
Model-free Control/motion_model.py

@@ -7,6 +7,7 @@ import numpy as np
 
 motions = [(1, 0), (-1, 0), (0, 1), (0, -1)]                # feasible motion sets
 
+
 def move_prob(x, u, obs, eta = 0.2):
     """
     Motion model of robots,

+ 5 - 7
Model-free Control/tools.py

@@ -67,13 +67,11 @@ def show_map(xI, xG, obs_map, lose_map, name):
     lose_x = [lose_map[i][0] for i in range(len(lose_map))]
     lose_y = [lose_map[i][1] for i in range(len(lose_map))]
 
-    plt.plot(xI[0], xI[1], "bs")                                    # plot starting state (blue)
+    plt.plot(xI[0], xI[1], "bs", ms = 24)                                    # plot starting state (blue)
+    plt.plot(xG[0], xG[1], "gs", ms = 24)                                    # plot goal states (green)
 
-    for x in xG:
-        plt.plot(x[0], x[1], "gs")                                  # plot goal states (green)
-
-    plt.plot(obs_x, obs_y, "sk")                                    # plot obstacles (black)
-    plt.plot(lose_x, lose_y, marker = 's', color = '#A52A2A')       # plot losing states (grown)
+    plt.plot(obs_x, obs_y, "sk", ms = 24)                                    # plot obstacles (black)
+    plt.plot(lose_x, lose_y, marker = 's', color = '#808080', ms = 24)       # plot losing states (gray)
     plt.title(name, fontdict=None)
     plt.axis("equal")
 
@@ -86,7 +84,7 @@ def plot_dots(x):
     :return: a plot
     """
 
-    plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o')    # plot dots for animation
+    plt.plot(x[0], x[1], linewidth='3', color='#808080', marker='o', ms = 24)    # plot dots for animation
     plt.gcf().canvas.mpl_connect('key_release_event',
                                  lambda event: [exit(0) if event.key == 'escape' else None])
     plt.pause(0.001)