Deep Q Learning **WITHOUT** OpenAI Gym

Does anyone have or know of any tutorials / courses that teach Q-learning without the use of OpenAI Gym? I'm trying to make a convolutional Q-learning model, and I have no problem doing this with PyTorch and OpenAI Gym, easy! But when I try to apply it all to an environment that isn't in OpenAI Gym, it's a whole different story. I'm trying to apply this to other games that aren't Atari, so I don't have access to env.reset() and all those other nice options, and it becomes a whole new ball game. If someone knows of a place to learn this, or is willing to teach me / help me with it, I'm more than willing to pay for any help as well. Thanks.



Solution 1:[1]

You can take the code of an environment, such as a game, and then implement your algorithms on top of it.

But make sure your code is explicit about the rewards and the actions: return a reward for each action.

If you want to start RL without Gym, try writing a simple game yourself and implementing the NEAT algorithm for it first. Then implement Q-learning and modify your code so that a reward is returned for each action, as in the sketch below.
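
For example, here is a minimal sketch of a hand-rolled environment with an explicit reset()/step() interface that returns a reward for every action, with tabular Q-learning on top (the toy game and all names here are made up for illustration):

import random
import numpy as np

# Hypothetical toy game: walk along a 1-D track and reach the rightmost cell
class SimpleTrackGame:
    def __init__(self, length=10):
        self.length = length
        self.position = 0

    def reset(self):
        self.position = 0
        return self.position  # initial observation

    def step(self, action):
        # action 0 = move left, action 1 = move right
        self.position += 1 if action == 1 else -1
        self.position = max(0, min(self.length - 1, self.position))
        done = self.position == self.length - 1
        reward = 1.0 if done else -0.01  # every action returns a reward
        return self.position, reward, done

env = SimpleTrackGame()
q_table = np.zeros((env.length, 2))  # one row per state, one column per action
alpha, gamma, epsilon = 0.1, 0.99, 0.1

for episode in range(500):
    state = env.reset()
    done = False
    while not done:
        # Epsilon-greedy action selection
        if random.random() < epsilon:
            action = random.randrange(2)
        else:
            action = int(np.argmax(q_table[state]))
        next_state, reward, done = env.step(action)
        # Q-learning update
        q_table[state, action] += alpha * (
            reward + gamma * np.max(q_table[next_state]) - q_table[state, action]
        )
        state = next_state

Once that works, swapping the table for a convolutional network over screenshots gives you the deep Q-learning setup described in the next solution.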

Solution 2:[2]

If you want to make deep-learning algorithms work with games outside the built-in environments, you can actually still use OpenAI Gym for that!

The workaround

You can use from PIL import ImageGrab to take a screenshot and control the game using pyautogui. Then load the screenshot with OpenCV and convert it to a grayscale image.
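
Here is a minimal sketch of that capture step (the 120x90 target size, the bounding box, and the grab_frame / press_key names are assumptions you should adapt to your game window):

import numpy as np
import cv2
import pyautogui
from PIL import ImageGrab

IMAGE_WIDTH, IMAGE_HEIGHT = 120, 90  # assumed downscaled frame size

def grab_frame(bbox=(0, 0, 800, 600)):
    # bbox is (left, top, right, bottom) of the game window -- adjust to your setup
    shot = np.array(ImageGrab.grab(bbox=bbox))      # RGB screenshot as a numpy array
    gray = cv2.cvtColor(shot, cv2.COLOR_RGB2GRAY)   # ImageGrab returns RGB
    gray = cv2.resize(gray, (IMAGE_WIDTH, IMAGE_HEIGHT))
    return gray[..., np.newaxis]                    # add a channel axis for the conv net

def press_key(key):
    # Send a keypress to the game, e.g. press_key('w')
    pyautogui.press(key)

Then you can use this code for the Q-learning agent: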

# imports for DQNAgent
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard
import tensorflow as tf
from collections import deque
import time
import random
import os

# Hide the GPU from TensorFlow
# (uncomment the next line to force CPU-only training)
#tf.config.set_visible_devices([], 'GPU')

DISCOUNT = 0.99
REPLAY_MEMORY_SIZE = 50_000  # How many last steps to keep for model training
MIN_REPLAY_MEMORY_SIZE = 1_000  # Minimum number of steps in a memory to start training
MINIBATCH_SIZE = 64  # How many steps (samples) to use for training
UPDATE_TARGET_EVERY = 5  # Terminal states (end of episodes)
MODEL_NAME = 'BOX'

# Exploration settings
EPSILON_DECAY = 0.999988877665
MIN_EPSILON = 0.0001

# For stats
ep_rewards = [-200]

# For more repetitive results
random.seed(1)
np.random.seed(1)

# Memory fraction, used mostly when training multiple agents
#gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=MEMORY_FRACTION)
#backend.set_session(tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)))

# Create models folder
if not os.path.isdir('models'):
    os.makedirs('models')


# Own Tensorboard class
class ModifiedTensorBoard(TensorBoard):

    # Overriding init to set initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.create_file_writer(self.log_dir)

    # Overriding this method to stop creating default log writer
    def set_model(self, model):
        pass

    # Overrided, saves logs with our step number
    # (otherwise every .fit() will start writing from 0th step)
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # Overrided
    # We train for one batch only, no need to save anything at epoch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Overrided, so won't close writer
    def on_train_end(self, _):
        pass

    # Custom method for saving own metrics
    # Writes custom metrics with our step number, using the writer created in __init__
    def update_stats(self, **stats):
        with self.writer.as_default():
            for key, value in stats.items():
                tf.summary.scalar(key, value, step=self.step)
            self.writer.flush()


# Agent class
class DQNAgent:
    def __init__(self, env):
        self.env = env

        # Main model
        self.model = self.create_model()

        # Target network
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        # An array with last n steps for training
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        # Custom tensorboard object
        self.tensorboard = ModifiedTensorBoard(log_dir="logs/{}-{}".format(MODEL_NAME, int(time.time())))

        # Used to count when to update target network with main network's weights
        self.target_update_counter = 0

    def create_model(self):
        model = Sequential()

        # Grayscale frames with a trailing channel axis: (height, width, 1)
        observation_space = (IMAGE_HEIGHT, IMAGE_WIDTH, 1)
        action_space = self.env.action_space.n

        model.add(Conv2D(32, (3, 3), input_shape=observation_space))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Conv2D(256, (3, 3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.2))

        model.add(Flatten())  # this converts our 3D feature maps to 1D feature vectors
        model.add(Dense(64))

        model.add(Dense(action_space, activation='linear'))  # one linear output Q value per action
        model.compile(loss="mse", optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
        return model

    # Adds step's data to a memory replay array
    # (observation space, action, reward, new observation space, done)
    def update_replay_memory(self, transition):
        self.replay_memory.append(transition)

    # Trains main network every step during episode
    def train(self, terminal_state, step):

        # Start training only if certain number of samples is already saved
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        # Get a minibatch of random samples from memory replay table
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        # Get current states from minibatch, then query NN model for Q values
        current_states = np.array([transition[0] for transition in minibatch])/255
        current_qs_list = self.model.predict(current_states)

        # Get future states from minibatch, then query NN model for Q values
        # When using target network, query it, otherwise main network should be queried
        new_current_states = np.array([transition[3] for transition in minibatch])/255
        future_qs_list = self.target_model.predict(new_current_states)

        X = []
        y = []

        # Now we need to enumerate our batches
        for index, (current_state, action, reward, new_current_state, done) in enumerate(minibatch):

            # If not a terminal state, get new q from future states, otherwise set it to 0
            # almost like with Q Learning, but we use just part of equation here
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            # Update Q value for given state
            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            # And append to our training data
            X.append(current_state)
            y.append(current_qs)

        # Fit on all samples as one batch, log only on terminal state
        self.model.fit(np.array(X)/255, np.array(y), batch_size=MINIBATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if terminal_state else None)

        # Update target network counter every episode
        if terminal_state:
            self.target_update_counter += 1

        # If counter reaches set value, update target network with weights of main network
        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    # Queries main network for Q values given current observation space (environment state)
    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

Don't forget to define IMAGE_WIDTH and IMAGE_HEIGHT (the width and height of the frames you feed the network), as in the capture sketch above!

In your environment's step function, take a screenshot and add it to the agent's replay memory with agent.update_replay_memory. Then, in the same function, call agent.train. To get the agent's next move, use agent.get_qs and take the argmax of the returned Q values.

import random, time, gym, cv2
import numpy as np
from PIL import ImageGrab
import pyautogui

class MyEnv(gym.Env):
    def __init__(self):
        super().__init__()

        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1), dtype=np.uint8)
        self.action_space = gym.spaces.Discrete(N_ACTIONS)  # number of controls

        self.model = DQNAgent(self)
        self.previous_observation = None

    def step(self, action):
        # Conditional logic for what to do with each action
        # an example
        reward = 0
        if action == 0:
            pyautogui.press('w')  # Go forwards
            reward = 1

        shot = np.array(ImageGrab.grab(bbox=("""left, top, right, bottom of game window""")))
        gray = cv2.cvtColor(shot, cv2.COLOR_RGB2GRAY)  # ImageGrab returns RGB
        # Resize and add a channel axis so frames match the network's input shape
        gray = cv2.resize(gray, (IMAGE_WIDTH, IMAGE_HEIGHT))[..., np.newaxis]

        # check if the player has lost, and if so set done = True and call self.reset()
        done = False

        if self.previous_observation is not None:
            self.model.update_replay_memory((self.previous_observation, action, reward, gray, done))
            self.model.train(done, 1)

        self.previous_observation = gray

        return gray, reward, done, {}

    def reset(self):
        # reset the game (re-open it, or something like that)
        self.previous_observation = None

env = MyEnv()
epsilon = 0.1
decay = 0.99998
min_epsilon = 0.001
steps = 60000

# open the game here
# ...

for i in range(steps):
    if env.previous_observation is None or random.random() < epsilon:
        # Explore: take a random action
        env.step(env.action_space.sample())
    else:
        # Exploit: take the action with the highest predicted Q value
        env.step(int(np.argmax(env.model.get_qs(env.previous_observation))))
    epsilon = max(min_epsilon, epsilon * decay)

env.reset()
env.model.model.save('models/player.h5')  # save the underlying Keras model
# close the game here
# ...
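
After training, here is a short sketch of how you might reload the saved network and let it play greedily with no exploration (the 'models/player.h5' path comes from the save call above; the 1000-step horizon is an arbitrary assumption):

import numpy as np
from tensorflow.keras.models import load_model

model = load_model('models/player.h5')
env = MyEnv()

# Take one arbitrary first step to get an initial frame, then always pick the
# action with the highest predicted Q value
state, _, done, _ = env.step(env.action_space.sample())
for _ in range(1000):
    qs = model.predict(np.array(state).reshape(-1, *state.shape) / 255, verbose=0)[0]
    state, reward, done, _ = env.step(int(np.argmax(qs)))
    if done:
        env.reset()
        state, _, done, _ = env.step(env.action_space.sample())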

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1
Solution 2: somePythonProgrammer