Применение лучших практик MLOps к расширенным параметрам обслуживания

MLOps — это важная практика для продуктивной работы ваших рабочих процессов машинного обучения. С помощью MLOps вы можете создавать рабочие процессы, рассчитанные на жизненный цикл машинного обучения. Это упрощает централизованное обслуживание ресурсов, обновление/отслеживание моделей и в целом упрощает процесс по мере увеличения масштабов ваших экспериментов с машинным обучением.

Ключевым инструментом MLOps в рамках экосистемы Amazon SageMaker является SageMaker Pipelines. С помощью SageMaker Pipelines вы можете определять рабочие процессы, состоящие из различных определенных этапов машинного обучения. Вы также можете структурировать эти рабочие процессы, определив параметры, которые можно вводить в качестве переменных в конвейер. Более общее введение в SageMaker Pipelines см. в статье по ссылке.

Определение конвейера само по себе не очень сложно, но есть несколько расширенных вариантов использования, которые требуют дополнительной настройки. В частности, предположим, что вы обучаете несколько моделей, необходимых для логического вывода в вашем сценарии использования машинного обучения. В SageMaker есть вариант хостинга, известный как Мультимодельные конечные точки (MME), где вы можете размещать несколько моделей на одной конечной точке и вызывать целевую модель. Однако в SageMaker Pipelines на данный момент нет встроенной поддержки определения или развертывания MME. В этом сообщении блога мы рассмотрим, как мы можем использовать Лямбда-этап конвейеров для развертывания мультимодельной конечной точки настраиваемым образом, придерживаясь при этом лучших практик MLOP.

ПРИМЕЧАНИЕ. Для тех из вас, кто не знаком с AWS, убедитесь, что вы создали учетную запись по следующей ссылке, если хотите продолжить. Статья также предполагает промежуточное понимание развертывания SageMaker, я бы посоветовал следовать этой статье для более глубокого понимания развертывания/вывода. В частности, для SageMaker Multi-Model Endpoints я бы назвал следующий блог.

Настраивать

В этом примере мы будем работать в SageMaker Studio, где у нас есть доступ к визуальным интерфейсам для SageMaker Pipelines и других компонентов SageMaker. Для разработки мы будем использовать экземпляр Studio Notebook с ядром Data Science на экземпляре ml.t3.medium. Для начала нам нужно сначала импортировать необходимые библиотеки для различных шагов, которые мы будем использовать в SageMaker Pipelines.

import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd

from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator

# Custom Lambda Step
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.pipeline import Pipeline

Затем мы создаем конвейерную сессию, эта конвейерная сессия гарантирует, что ни одно из обучающих заданий не будет выполняться в блокноте до тех пор, пока не будет выполнен сам конвейер.

pipeline_session = PipelineSession()

В этом примере мы будем использовать набор данных Abalone (CC BY 4.0) и запустим на нем алгоритм SageMaker XGBoost для регрессионной модели. Вы можете загрузить набор данных из общедоступных наборов данных Amazon.

!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)

Затем мы можем параметризировать наш конвейер, определив значения по умолчанию как для набора обучающих данных, так и для типа экземпляра.

training_input_param = ParameterString(
    name = "training_input",
    default_value=training_path,
)

training_instance_param = ParameterString(
    name = "training_instance",
    default_value = "ml.c5.xlarge")

Затем мы также получаем контейнер, предоставленный AWS для XGBoost, который мы будем использовать для обучения и вывода.

model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_param,
)

image_uri

Настройка обучения

Для обучающей части нашего конвейера мы будем настраивать алгоритм SageMaker XGBoost для нашего регрессионного набора данных Abalone.

xgb_train_one = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_param,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

xgb_train_one.set_hyperparameters(
    objective="reg:linear",
    num_round=40,
    max_depth=4,
    eta=0.1,
    gamma=3,
    min_child_weight=5,
    subsample=0.6,
    silent=0,
)

Затем для нашего второго оценщика мы меняем наши гиперпараметры, чтобы настроить обучение нашей модели, чтобы у нас были две отдельные модели за нашей конечной точкой мультимодели.

xgb_train_two = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_param,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=pipeline_session,
    role=role
)

#adjusting hyperparams
xgb_train_two.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

Затем мы настраиваем наши обучающие входные данные для обоих оценщиков, чтобы они указывали на параметр, который мы определили для нашего обучающего набора данных S3.

train_args_one = xgb_train_one.fit(
    inputs={
        "train": TrainingInput(
            s3_data=training_input_param,
            content_type="text/csv",
        )
    }
)

train_args_two = xgb_train_two.fit(
    inputs={
        "train": TrainingInput(
            s3_data=training_input_param,
            content_type="text/csv",
        )
    }
)

Затем мы определяем два отдельных шага обучения, которые будут выполняться параллельно через наш конвейер.

step_train_one = TrainingStep(
    name="TrainOne",
    step_args=train_args_one,
)

step_train_two = TrainingStep(
    name = "TrainTwo",
    step_args= train_args_two
)

Лямбда-шаг

Лямбда-шаг по сути позволяет вам подключить лямбда-функцию к вашему конвейеру. Для каждого учебного задания SageMaker создается файл model.tar.gz, содержащий артефакты обученной модели. Здесь мы воспользуемся этапом Lambda, чтобы получить артефакты обученной модели и развернуть их на мультимодельной конечной точке SageMaker.

Прежде чем мы сможем это сделать, нам нужно предоставить нашей функции Lambda соответствующие разрешения для работы с SageMaker. Мы можем использовать следующий существующий скрипт для создания роли IAM для нашей лямбда-функции.

import boto3
import json

iam = boto3.client("iam")


def create_lambda_role(role_name):
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {"Service": "lambda.amazonaws.com"},
                            "Action": "sts:AssumeRole",
                        }
                    ],
                }
            ),
            Description="Role for Lambda to call SageMaker functions",
        )

        role_arn = response["Role"]["Arn"]

        response = iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )

        response = iam.attach_role_policy(
            PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", RoleName=role_name
        )

        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print(f"Using ARN from existing role: {role_name}")
        response = iam.get_role(RoleName=role_name)
        return response["Role"]["Arn"]
from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")

После того, как мы определили нашу роль Lambda, мы можем создать функцию Lambda, которая сделает за нас несколько вещей:

  • Берет каждый отдельный файл model.tar.gz из каждого учебного задания и помещает его в центральное расположение S3, содержащее оба архива. Для MME они ожидают, что все архивы моделей будут находиться в одном единственном пути S3.
  • Использует клиент boto3 с SageMaker для создания модели SageMaker, конфигурации конечной точки и конечной точки.

Мы можем использовать следующие вспомогательные функции для выполнения первой задачи, скопировав артефакты задания обучения в центральное расположение S3 с обоими архивами моделей.

sm_client = boto3.client("sagemaker")
s3 = boto3.resource('s3')

def extract_bucket_key(model_data):
    """
    Extracts the bucket and key from the model data tarballs that we are passing in
    """
    bucket = model_data.split('/', 3)[2]
    key = model_data.split('/', 3)[-1]
    return [bucket, key]

def create_mme_dir(model_data_dir):
    """
    Takes in a list of lists with the different trained models, 
    creates a central S3 bucket/key location with all model artifacts for MME.
    """
    bucket_name = model_data_dir[0][0]
    for i, model_data in enumerate(model_data_dir):
        copy_source = {
              'Bucket': bucket_name,
              'Key': model_data[1]
            }
        bucket = s3.Bucket(bucket_name)
        destination_key = 'xgboost-mme-pipelines/model-{}.tar.gz'.format(i)
        bucket.copy(copy_source, destination_key)
    mme_s3_path = 's3://{}/xgboost-mme-pipelines/'.format(bucket_name)
    return mme_s3_path

Следующими шагами для нашей функции Lambda будет создание необходимых сущностей SageMaker для создания конечной точки в реальном времени:

  • Модель SageMaker: содержит данные модели и образ контейнера, а также определяет конечные точки мультимоделей и одиночной модели.
  • Конфигурация конечной точки SageMaker: определяет аппаратное обеспечение конечной точки, тип экземпляра и количество.
  • Конечная точка SageMaker: ваша конечная точка REST, которую вы можете вызывать для логического вывода, для MME вы также указываете модель, относительно которой хотите выполнить логический вывод.
    model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    create_model_response = sm_client.create_model(
        ModelName=model_name,
        Containers=[
            {
                "Image": image_uri,
                "Mode": "MultiModel",
                "ModelDataUrl": model_url
            }
        ],
        #to-do parameterize this
        ExecutionRoleArn='arn:aws:iam::474422712127:role/sagemaker-role-BYOC',
    )
    print("Model Arn: " + create_model_response["ModelArn"])
    
    
    #Step 2: EPC Creation
    xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=xgboost_epc_name,
        ProductionVariants=[
            {
                "VariantName": "xgbvariant",
                "ModelName": model_name,
                "InstanceType": "ml.c5.large",
                "InitialInstanceCount": 1
            },
        ],
    )
    print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])
    
    #Step 3: EP Creation
    endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=xgboost_epc_name,
    )
    print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

Мы возвращаем успешное сообщение с помощью нашей функции Lambda, как только сможем начать создание конечной точки.

return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "endpoint_name": endpoint_name
    }

Затем мы определяем эту лямбда-функцию в необходимом формате лямбда-шага для нашего конвейера.

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="code/lambda_helper.py",
    handler="lambda_helper.lambda_handler",
)

Мы также определяем, что мы возвращаем из Lambda в виде выходных параметров.

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="endpoint_name", output_type=LambdaOutputTypeEnum.String)

Затем мы определяем наши входные данные с помощью двух разных артефактов обученной модели из этапов обучения, которые мы определили ранее в нашей записной книжке.

step_deploy_lambda = LambdaStep(
    name="LambdaStep",
    lambda_func=func,
    inputs={
        "model_artifacts_one": step_train_one.properties.ModelArtifacts.S3ModelArtifacts,
        "model_artifacts_two": step_train_two.properties.ModelArtifacts.S3ModelArtifacts
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

Выполнение конвейера и выборка вывода

Теперь, когда у нас настроены различные шаги, мы можем объединить все это в единый конвейер. Мы указываем на наши три разных шага и разные параметры, которые мы определили. Обратите внимание, что вы также можете определить дополнительные параметры, чем мы сделали здесь, в зависимости от вашего варианта использования.

pipeline = Pipeline(
    name="mme-pipeline",
    steps=[step_train_one, step_train_two, step_deploy_lambda],
    parameters= [training_input_param, training_instance_param]
)

Теперь мы можем запустить конвейер с помощью следующих команд.

pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()

После выполнения мы замечаем, что в пользовательском интерфейсе Studio для вкладки «Конвейеры» для вашего конвейера был создан Directed Acylic Graph (DAG) для отображения вашего рабочего процесса.

Через несколько минут вы также должны увидеть, что конечная точка была создана в консоли SageMaker.

Затем мы можем протестировать эту конечную точку с помощью примера вывода, чтобы убедиться, что она работает правильно.

import boto3
smr = boto3.client('sagemaker-runtime') #client for inference

#specify the tarball you are invoking in the TargetModel param
resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0', 
                           ContentType='text/csv', TargetModel = 'model-0.tar.gz')

print(resp['Body'].read())

Дополнительные ресурсы и заключение



Код для всего примера можно найти по ссылке выше (следите за новыми примерами Pipelines). В этом примере расширенный вариант хостинга сочетается с передовыми методами MLOP. Крайне важно использовать инструменты MLOP по мере того, как вы расширяете свои эксперименты с машинным обучением, поскольку это помогает упростить и параметризовать ваши усилия, чтобы командам было легче сотрудничать и отслеживать. Я надеюсь, что эта статья была хорошим обзором использования конвейеров для конкретного варианта использования хостинга в MME. Как всегда, все отзывы приветствуются, спасибо за чтение!

Если вам понравилась эта статья, не стесняйтесь связаться со мной в LinkedIn и подписаться на мою Информационную рассылку. Если вы новичок в Medium, зарегистрируйтесь с помощью моего Реферала для участников.