Меня повысили до пассажира

Как я научил виртуальную машину водить себя с помощью Reinforcement Learning

К сожалению, до моего 16-летия еще довольно много времени. А это значит, что мне придется подождать полгода, прежде чем я смогу отправиться в путь.

А пока, думаю, я подожду, пока этот день не наступит.

Есть и вторая вещь, которую я мог бы сделать. Недавно я услышал, что в Калифорнии появились автомобили с автоматическим управлением, и это может стать решением всех моих проблем. Waymo, Uber, все они делают это.

Поэтому я спросил себя, почему я не могу?

Обучение с подкреплением

Что еще за ажиотаж вокруг этого метода машинного обучения, кроме очередного длинного модного словечка?

По сути, думайте об этом как о более интуитивном способе обучения машин.

Наш мозг устроен таким образом, что отличается от нейронных сетей. Простые нейронные сети просто принимают входные данные и принимают решения. Они очень линейны. Мы точно говорим им, что делать, и они это делают.

Обучение с подкреплением, с другой стороны, похоже на обучение ребенка ходить. Мы его ничему не учим, пусть он начнет с чистого листа. Рано или поздно он во всем разберется сам, без какого-либо вмешательства с нашей стороны.

Как это работает?

Обучение с подкреплением имеет свой собственный набор жаргона. Упрощая процесс, мы можем получить эту диаграмму.

Агент – это объект, который выполняет действия внутри среды. В нашем случае агент — это машина, а среда — наш симулятор AirSim. Мы хотим, чтобы автомобиль выполнял правильные действия через наблюдение.

Многие из этих терминов говорят сами за себя.

Награда — это балл, который он получает в зависимости от того, насколько хорошо он работал. В нашем случае, если он врезается в меньшее количество вещей, он получает более высокий балл, потому что он безопаснее и стабильнее.

Состояние – это ситуация, в которой находится агент, и использует свою сеть принятия решений для выполнения определенных действий.

Соединяя все это вместе, мы, по сути, помещаем автомобиль в эту виртуальную среду и позволяем ему понять, как водить. Мы «наказываем» его, присваивая ему меньшую оценку всякий раз, когда он сталкивается с другим объектом (например, деревом), и «награждаем», когда он движется безопасно и стабильно.

Давайте построим!

Теперь, когда у нас есть общее представление о том, как работает обучение с подкреплением, давайте приступим к созданию реального проекта!

Вернемся к нашей исходной проблеме: как сказать машине, что делать? Ответ: нет. Мы только даем ему оценку, и мы берем самые высокие оценки, создаем их варианты, тестируем их на многих итерациях (поколениях) и, в конце концов, приходим к довольно приличной модели.

Для построения нашей модели мы будем использовать Keras, высокоуровневый API для разработки моделей машинного обучения.

Для начала мы импортируем все необходимые библиотеки.

# import useful Python libraries
import numpy as np
import threading
import os
import tensorflow as tf
from keras.models import Model, clone_model
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, Input
from keras.optimizers import Adam
import keras.backend as k
from keras.initializers import random_normal

После этого мы собираемся построить нейронную сеть для обработки изображений, которые получает наша модель. Это входные данные, используемые для принятия решений.

def __init__(self, weights_path, train_conv_layers):
   self.__angle_values = [-1, -0.5, 0, 0.5, 1]

   self.__nb_actions = 5
   self.__gamma = 0.99

   # Define the model
   activation = 'relu'
   pic_input = Input(shape=(59, 255, 3))

   img_stack = Conv2D(16, (3, 3), name='convolution0', padding='same', activation=activation,
                  trainable=train_conv_layers)(pic_input)
   img_stack = MaxPooling2D(pool_size=(2, 2))(img_stack)
   img_stack = Conv2D(32, (3, 3), activation=activation, padding='same', name='convolution1',
                  trainable=train_conv_layers)(img_stack)
   img_stack = MaxPooling2D(pool_size=(2, 2))(img_stack)
   img_stack = Conv2D(32, (3, 3), activation=activation, padding='same', name='convolution2',
                  trainable=train_conv_layers)(img_stack)
   img_stack = MaxPooling2D(pool_size=(2, 2))(img_stack)
   img_stack = Flatten()(img_stack)
   img_stack = Dropout(0.2)(img_stack)

   img_stack = Dense(128, name='rl_dense', kernel_initializer=random_normal(stddev=0.01))(img_stack)
   img_stack = Dropout(0.2)(img_stack)
   output = Dense(self.__nb_actions, name='rl_output', kernel_initializer=random_normal(stddev=0.01))(img_stack)

   opt = Adam()
   self.__action_model = Model(inputs=[pic_input], outputs=output)

   self.__action_model.compile(optimizer=opt, loss='mean_squared_error')
   self.__action_model.summary()

Обычно нашей модели требуется много времени для обучения на обычном компьютере. В среднем на обучение этой модели уйдет несколько дней. Однако, используя предварительно обученные веса, мы можем ускорить этот процесс до нескольких часов. Мы собираемся использовать предварительно обученные веса, чтобы сделать это быстрее.

if weights_path is not None and len(weights_path) > 0:
   print('Loading weights from my_model_weights.h5...')
   print('Current working dir is {0}'.format(os.getcwd()))
   self.__action_model.load_weights(weights_path, by_name=True)

   print('First layer: ')
   w = np.array(self.__action_model.get_weights()[0])
   print(w)
else:
   print('Not loading weights')

Далее мы собираемся настроить модель для обучения, а также создать функцию для чтения весов, которые мы импортируем в эту модель. Хотя мы импортируем веса, они представляют собой просто двоичные числа. Нам нужно написать функцию, которая преобразует файл и устанавливает веса в нашу модель.

# Set up the target model, allows the model to converge more rapidly.
   self.__action_context = tf.get_default_graph()
   self.__target_model = clone_model(self.__action_model)

   self.__target_context = tf.get_default_graph()
   self.__model_lock = threading.Lock()

# reads the trained model weights from the file
def from_packet(self, packet):
   with self.__action_context.as_default():
      self.__action_model.set_weights([np.array(w) for w in packet['action_model']])
      self.__action_context = tf.get_default_graph()
   if 'target_model' in packet:
      with self.__target_context.as_default():
         self.__target_model.set_weights([np.array(w) for w in packet['target_model']])
         self.__target_context = tf.get_default_graph()

После этого мы собираемся передать нашу модель агенту и позволить агенту тренироваться в среде (в нашем случае мы будем использовать AirSim).

# transfer finished model to agent
def to_packet(self, get_target=True):
   packet = {}
   with self.__action_context.as_default():
      packet['action_model'] = [w.tolist() for w in self.__action_model.get_weights()]
      self.__action_context = tf.get_default_graph()
   if get_target:
      with self.__target_context.as_default():
         packet['target_model'] = [w.tolist() for w in self.__target_model.get_weights()]
   return packet

Мы собираемся использовать функцию потерь и градиентный спуск, чтобы определить оптимальную модель, а приведенная ниже функция обновляет веса и смещения на основе итераций обновления обучения, отправленных агентом в качестве наблюдения.

# Updates the model with the supplied gradients
# This is used by the trainer to accept a training iteration update from the agent
def update_with_gradient(self, gradients, should_update_critic):
   with self.__action_context.as_default():
      action_weights = self.__action_model.get_weights()
      if len(action_weights) != len(gradients):
         raise ValueError('len of action_weights is {0}, but len gradients is {1}'.format(len(action_weights),
                                                                      len(gradients)))
      print('UDPATE GRADIENT DEBUG START')

      dx = 0
      for i in range(0, len(action_weights), 1):
         action_weights[i] += gradients[i]
         dx += np.sum(np.sum(np.abs(gradients[i])))
      print('Moved weights {0}'.format(dx))
      self.__action_model.set_weights(action_weights)
      self.__action_context = tf.get_default_graph()

      if should_update_critic:
         with self.__target_context.as_default():
            print('Updating critic')
            self.__target_model.set_weights([np.array(w, copy=True) for w in action_weights])

      print('UPDATE GRADIENT DEBUG END')

def update_critic(self):
   with self.__target_context.as_default():
      self.__target_model.set_weights([np.array(w, copy=True) for w in self.__action_model.get_weights()])

Когда каждая партия завершит обучение, мы собираемся обновить нашу модель и начать новую эпоху.

# Given a set of training data, trains the model and determine the gradients.
# The agent will use this to compute the model updates to send to the trainer
def get_gradient_update_from_batches(self, batches):
   pre_states = np.array(batches['pre_states'])
   post_states = np.array(batches['post_states'])
   rewards = np.array(batches['rewards'])
   actions = list(batches['actions'])
   is_not_terminal = np.array(batches['is_not_terminal'])

После того, как наш агент закончил обучение в окружающей среде, пришло время протестировать нашу модель, заставив ее двигаться по карте. Мы собираемся использовать следующую функцию для записи изображений в симуляторе и передачи их через нашу модель.

# Performs a state prediction given the model input
def predict_state(self, observation):
   if type(observation) == type([]):
      observation = np.array(observation)

   # Our model only predicts on a single state.
   # Take the latest image
   observation = observation[3, :, :, :]
   observation = observation.reshape(1, 59, 255, 3)
   with self.__action_context.as_default():
      predicted_qs = self.__action_model.predict([observation])

   # Select the action with the highest Q value
   predicted_state = np.argmax(predicted_qs)
   return predicted_state, predicted_qs[0][predicted_state]

Как только наша модель приняла решение, в качестве последнего шага мы преобразуем решение в исполняемые действия, которые можно запустить в симуляторе. Для этого проекта мы просто прогнозируем угол поворота, а логика ускорения очень проста. По сути, мы хотим, чтобы скорость нашего автомобиля была ниже определенной (в нашем примере 10 км/ч).

# Convert the current state to control signals to drive the car.
# As we are only predicting steering angle, we will use a simple controller to keep the car at a constant speed
def state_to_control_signals(self, state, car_state):
   if car_state.speed > 9:
      return self.__angle_values[state], 0, 1
   else:
      return self.__angle_values[state], 1, 0

Вот и все! Разве это не удивительно? Менее чем из 200 строк кода вы создали собственный автомобиль с автоматическим управлением. Теперь, хотя это еще не все, чтобы стать частью нашей повседневной жизни, это новая технология с большим потенциалом. Если эта технология когда-нибудь сможет стать частью нашей жизни, она предложит нам непревзойденное удобство за гранью нашего воображения.

Поздравляем, вы только что построили собственную беспилотную машину!

В итоге у вас должно получиться что-то похожее на это:

Если вам интересно, посмотрите исходный код здесь: https://github.com/DJDerie/selfdrivingcar2

Ключевые выводы

  1. Обучение с подкреплением учится, выясняя вещи самостоятельно
  2. CNN очень эффективны при обработке изображений и визуальных эффектов.
  3. Беспилотные автомобили имеют большой потенциал в будущем, ждите, чтобы увидеть некоторые из них на дорогах в вашем районе в будущем!

Перед тем, как ты уйдешь

  1. Нажмите эту кнопку несколько раз, если вы хотите видеть больше интересных статей в будущем.
  2. Свяжитесь со мной в LinkedIn
  3. Поделитесь этой статьей с друзьями
  4. Следите за новостями!