Из этой истории вы узнаете, как автоматически обрабатывать данные в конвейере машинного обучения на AWS.

В настоящее время каждый специалист по данным должен знать, как интегрировать свои модели в облачную платформу, чтобы улучшить свою работу и стать более ценным специалистом по данным. К сожалению, концепция интеграции немного сложна, когда вы новичок, но, к счастью, эта история для вас, если вы хотите построить свой первый конвейер машинного обучения в облаке, а точнее на Amazon Web Services (AWS).

Как вы можете видеть на схеме, вход конвейера - это загрузка некоторых данных на S3, а выход конвейера - это предварительно обработанные данные, записанные на S3. Все в конвейере автоматизировано.

Для этого конвейера я буду использовать следующие сервисы AWS:

  • S3 (Simple Storage Service): Сервис, обеспечивающий хранение объектов через интерфейс веб-сервиса.
  • Лямбда: позволяет запускать код без предоставления серверов и управления ими.
  • SQS (Simple Queue Service): полностью управляемая служба очередей сообщений, которая позволяет разделять и масштабировать микросервисы, распределенные системы и бессерверные приложения.
  • ECS (Elastic Container Service): полностью управляемый сервис оркестровки контейнеров. Вы можете запускать свои кластеры ECS с помощью AWS Fargate, который представляет собой бессерверные вычисления для контейнеров. Fargate избавляет от необходимости выделять серверы и управлять ими, позволяет указывать и оплачивать ресурсы для каждого приложения, а также повышает безопасность за счет изолированности приложений по дизайну.
  • ECR (эластичный реестр контейнеров): полностью управляемый реестр контейнеров Docker, который упрощает разработчикам хранение, управление и развертывание образов контейнеров Docker.
  • Пошаговая функция: позволяет координировать несколько сервисов AWS в бессерверные рабочие процессы, чтобы вы могли быстро создавать и обновлять приложения.

Если вы уже знакомы с некоторыми сервисами AWS и просто заинтересованы в какой-то части руководства, я пишу это оглавление, чтобы вы могли сразу перейти к той части, которая вас интересует.

  1. Создайте корзину S3.
  2. Создайте очередь SQS.
  3. Закодируйте лямбда-функцию, запускаемую по событию S3, чтобы отправить сообщение в очередь.
  4. Создайте свои сценарии для предварительной обработки ваших данных.
  5. Создайте образ Docker для запуска ваших скриптов.
  6. Отправьте образ Docker в ECR.
  7. Создайте задачу и кластер Fargate.
  8. Организуйте свой конвейер с помощью функции Step.

Для этого руководства требуется, чтобы учетная запись AWS была настроена в интерфейсе командной строки AWS (или пользователь IAM с разрешениями) и Docker был установлен локально на вашем компьютере.

Создайте корзину S3

Первым шагом является создание корзины S3, которая позволит вам загружать такие документы, как данные (json, csv, xlsx,…).

Зайдите в Консоль AWS, в сервисе S3 и нажмите Create Bucket. Вам будет предложено указать имя сегмента, которое должно быть уникальным, и указать, где будет размещаться ваша корзина. Ваше местоположение должно быть одинаковым для всех сервисов в одном проекте. Затем вы можете просто нажимать Далее до тех пор, пока не будет создана корзина.

Создать очередь SQS

Теперь перейдите в службу SQS на консоли aws, создайте новую очередь, дайте ей желаемое имя и выберите «стандартная очередь». Затем нажмите «Очередь быстрого создания».

Закодируйте свою лямбда-функцию, запускаемую по событию S3, чтобы отправить сообщение в очередь.

Теперь, когда у вас есть корзина S3 и очередь SQS, цель состоит в том, чтобы отправить сообщение в очереди в службу SQS, когда файл загружен в S3. Задача fargate спросит очередь SQS, что ей делать.

Перейдите в службу Lambda и создайте новую функцию. Укажите имя функции и язык программирования, который вы хотите запустить. (Я буду использовать Python: 3.8.0)

Затем в разделе роль исполнения вы должны указать роль с политикой полного доступа S3 и политикой полного доступа пошаговых функций. Если у вас его нет, перейдите к его созданию в консоли IAM.

Пришло время писать код. Как я уже сказал, этот учебник будет написан исключительно на Python, но вы можете свободно использовать тот язык, который вам нужен.

В Python библиотека, которая полезна для взаимодействия с сервисами AWS, называется Boto3.

Код, который вы вставляете в лямбда-функцию, должен выглядеть так:

import boto3
s3 = boto3.resource('s3')
sqs = boto3.client('sqs')
queue_url = 'your_queue_url'
def lambda_handler(s3_event, context):
    # Get the s3 event
    for record in s3_event.get("Records"):
        bucket = record.get("s3").get("bucket").get("name")
        key = record.get("s3").get("object").get("key")
        
        # Send message to SQS queue
        response = sqs.send_message(
            QueueUrl=queue_url,
            DelaySeconds=1,
            MessageAttributes={
                'key': {
                    'DataType': 'String',
                    'StringValue': key
                }
            },
            MessageBody=(bucket)
        )

В этом коде вы получаете ведро и ключ файла из события s3 и отправляете эти 2 информации в очередь.

После того, как функция закодирована, вы должны запустить ее с помощью события s3. В разделе «Конструктор» нажмите «Добавить триггер», выберите S3 и укажите сегмент, в котором вы хотите, чтобы ваша лямбда-функция запускалась при загрузке в него чего-либо. Префикс и суффикс полезны для фильтрации файлов, которые должны запускать функцию. Если вы хотите запускать только тогда, когда файл csv загружается в каталог с именем data, просто укажите суффикс .csv и префикс /data. Затем нажмите «добавить», и ваш триггер готов.

Обратите внимание, что если вы пишете в том же сегменте, в котором запускаете триггер, вы должны создать каталоги и указать префикс, в противном случае триггер будет бесконечным.

Создайте свои скрипты для предварительной обработки ваших данных

Пришло время написать ваши скрипты Python (или другие), которые будут обрабатывать данные. Для примера я напишу сценарий, который загружает данные как pandas DataFrame, перетасовывает строки и записывает обработанные данные в корзину s3.

Я советую вам в первый раз писать код, не думая о среде AWS, чтобы протестировать его локально, а во второй части - добавить строки boto3.

import pandas as pd
if __name__ == "__main__":
    #read your dataframe (locally)
    df = pd.read_csv('dataframe.csv')
    #compute the process you want
    shuffled_df = df.sample(frac=1)
    #print df's head in console to check it works
    print(shuffled_df.head())

Этот первый сценарий предназначен для локального тестирования, вам необходимо предоставить файл dataframe.csv в свой локальный каталог для его запуска.

Как только вы убедитесь, что он работает, цель состоит в том, чтобы написать тот же сценарий с помощью boto3 для управления другими сервисами AWS. В нашем случае нам нужно управлять тем, как получить информацию об очереди, чтобы узнать, какой фрейм данных, загруженный на S3, будет обрабатывать Fargate Task. Нам также необходимо записать этот фрейм данных после его обработки в корзину S3.

Этот код обрабатывает данные точно так же, как предыдущий. Единственная разница в том, что мы должны думать о том, как получить данные и как их записать с помощью boto3.

Поскольку мы помнили, что ECS fargate - это средство запуска контейнеров, мы должны создать образ контейнера благодаря Docker. Работайте в чистом каталоге, который на этом этапе содержит только файл fargate_task.py.

Создайте файл requirements.txt, в котором вы указываете библиотеки Python, необходимые для запуска вашего скрипта. Если вы работаете в виртуальной среде, вы можете ввести следующую команду в терминале UNIX: pip freeze > requirements.txt, в противном случае вы можете написать ее вручную следующим образом:

pandas == 1.0.4
boto3 == 1.13.20
s3fs

Обратите внимание: если вы все же укажете версию, будет загружена последняя версия.

Затем создайте файл с именем Dockerfile. Этот файл будет содержать инструкции для выполнения при создании образа Docker.

FROM python:3.8.0
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD python main.py

В этом файле докеров выполняются следующие действия:

  • FROM создает слой контейнера из python:3.8.0 образа Docker.
  • COPY добавляет файлы из текущего каталога вашего Docker-клиента.
  • WORKDIR устанавливает рабочий каталог.
  • RUN выполнить команду во время сборки.
  • CMD выполняйте команду не при сборке, а при запуске.

Команда для создания образа из файла Docker: docker build -t imagename ., параметр -t используется для указания имени образа докера, а '.' указывает, что вы создаете образ в соответствии с файлом докера в текущем каталоге.

Отправьте образ Docker в ECR.

Теперь, когда вы подготовили образ докера для сборки, сервис AWS ECR упростит отправку образа. Перейдите в консоль ECR и create repository укажите имя, которое вы хотите дать репозиторию, и завершите создание репозитория. После его создания щелкните свое репо и нажмите кнопку View push commands. Он покажет вам 4 точных команды, которые вам нужно скопировать и вставить в локальный терминал, чтобы отправить образ докера в ECR.

Первая команда зарегистрирует ваш Docker с помощью ECR, вторая создаст образ, как я объяснял ранее, третья пометит изображение, чтобы дать ему хорошее имя для отправки в ECR, а последняя отправит в ECR. После завершения отправки вы сможете увидеть свое изображение в интерфейсе ECR.

Создайте задачу и кластер Fargate.

Чтобы запустить контейнер, вы должны создать задачу, которая будет выполняться в кластере. Перейдите в консоль ECS и щелкните Определение задачи ›Создать определение задачи. Выберите запуск Fargate и дайте своей задаче имя, роль с полным доступом SQS и S3, размер памяти задачи и, наконец, образ ECR контейнера.

После создания задачи, все еще находясь на консоли ECS, создайте кластер. Дайте ему имя, нажмите «Создать VPC» и оставьте подсети по умолчанию.

Проверьте, все ли работает на этом шаге руководства.

Прежде чем говорить об оркестраторе, на этом этапе руководства вы должны иметь возможность протестировать задачу fargate, запустив ее вручную. Для этого загрузите файл данных в корзину S3, удовлетворяющую условиям триггера. Он должен отправить сообщение в вашу очередь SQS (вы можете проверить на консоли SQS количество сообщений в очереди). Если действительно есть сообщение в очереди, вы можете вручную запустить свою задачу. Перейдите в консоль ECS, щелкните свою задачу и щелкните Action > Run task. Нажмите «Перейти к типу запуска» и выберите fargate, выберите кластер, который будет запускать его, и подсети, связанные с этим кластером.

После выполнения задачи запуск кластера занимает минуту. Вы можете проверить журналы, если появляется ошибка. В противном случае процесс обработки данных должен быть записан в вашу корзину S3.

Организуйте свой конвейер с помощью функции Step.

В настоящее время ваш конвейер работает, но не полностью автоматизирован, вам нужно решение для автоматического запуска задачи Fargate. Пошаговая функция - это одно из решений, которое вы можете использовать. Это конечный автомат, который позволяет вам выполнять ваш конвейер шаг за шагом.

Перейдите в консоль функции Step и create state machine, и в качестве определения напишите json, похожий на:

{
  "StartAt": "Modelling_task",
  "States": {
    "Modelling_task": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "YOUR_CLUSTER_ARN",
        "TaskDefinition": "YOUR_TASK_DEFINITION_ARN",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "YOUR_SUBNET1",
              "YOUR_SUBNET2"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "End": true
    }
  }
}

Затем нажмите «Далее» и укажите роль с полным доступом S3, полным доступом SQS, полным доступом ECS и полным доступом к функции Step.

Ваш конвейер выглядит примерно так:

После создания конечного автомата лучше всего попросить первую лямбду вашего конвейера запустить конечный автомат, чтобы конвейер мог выполняться автоматически.

Перейдите к первому коду лямбды, добавьте к нему код с глобальными переменными:

stepFunction = boto3.client('stepfunctions')

и в конце основной функции:

#RUN stepfunction new_company
response = stepFunction.start_execution(
    stateMachineArn='YOUR_STATE_MACHINE_ARN'
    )

Теперь вы можете протестировать весь конвейер, загрузив новый файл данных в корзину S3 и следуя инструкциям в консоли Step-function.

Заключение

Теперь вы можете создать свой первый конвейер машинного обучения с помощью AWS. Очевидно, вы можете расширить этот базовый конвейер до чего-то более сложного, повторив некоторые шаги руководства. Например, более сложный проект машинного обучения будет выглядеть примерно так.