Рабочий процесс SageMaker ML с использованием настраиваемых контейнеров Docker

Перенос алгоритма машинного обучения (ML) или рабочего процесса, созданного в среде Python, в облако AWS включает в себя сложный набор шагов. Для начинающего специалиста по данным это непростая процедура. Несмотря на то, что в Интернете есть много документации, в большинстве из них нет подробностей и простых руководств, что делает его непонятным для первого пользователя AWS. В частности, казалось, что они были разработаны для тех, кто имеет некоторый опыт работы с данными. Эта серия руководств представляет собой попытку объединить различные элементы рабочего процесса машинного обучения (ML) в среде AWS.

  • Прием данных и EDA
  • Предварительная обработка данных / разработка функций в SageMaker
  • Обучение модели и оптимизация гиперпараметров с помощью SageMaker
  • Вывод с помощью пакетного преобразования SageMaker
  • Развертывание

Чтобы реализовать это, я использовал сервис AWS SageMaker с настраиваемым док-контейнером для создания автоматизированного конвейера. Включение всего содержимого на одну страницу было бы довольно громоздким, поэтому в этой первой части я расскажу о создании докера и использовании скелета SageMaker для обучения и логического вывода. Во второй части этой серии статей я расскажу, как рабочий процесс, созданный с помощью SageMaker, можно организовать с помощью AWS StepFunctions. И, наконец, в части III я сосредоточусь на автоматизации этих шагов с помощью AWS EventBridge и/или функции AWS Lambda.

Прежде чем приступить к работе с этим учебным пособием, вам потребуются следующие разрешения в вашем AWS IAM:

  • SageMakerExecutionRole или SageMakerFullAccess
  • AmazonEC2ContainerRegistryFullAccess

Прикрепите политику к роли выполнения SageMaker, связанной с экземпляром Notebook.

Перейдите в IAM -> выберите роли -> выберите AmazonSageMaker-ExecutionPolicy (или любую другую политику, которая у вас есть)

Добавить разрешения -› Прикрепить политику-› введите имя созданной выше политики в строку поиска и прикрепите политику

Теперь давайте настроим Docker-контейнер.

Структура папки Docker:

В Jupiter Lab структура папок должна выглядеть следующим образом.

|--Container
   |--sub-folder
      |--nginx.conf 
      |--predictor.py
      |--preprocessing.py
      |--serve
      |--train
      |--wsgi.py
      |--misc files
   |--build_and_push.sh
   |--Dockerfile
   |--setup.sh

У нас есть основная папка «Контейнер», в которой находятся все необходимые файлы. Внутри контейнера у нас есть подпапка и три файла сценария оболочки. Вы можете изменить название «подпапка» на свое усмотрение. Нам нужно только обновить содержимое файлов Dockerfile и build_and_push.sh. Внутри «подпапки» у нас есть несколько файлов. Вам не нужно изменять следующее: nginx.conf, serve, wsgi.py. Остальные файлы — это коды Python для вашего рабочего процесса машинного обучения. Также обратите внимание, что train — это не .py, а текстовый файл, вам просто нужно скопировать и вставить сюда содержимое вашего обучающего сценария. Все содержимое кода файлов предоставлено ниже и для вашей помощи.

Докерфайл:

# Build an image that can do data ingestion from Snowflake and preprocessing in SageMaker
# This is a Python 3 image that uses the nginx, gunicorn, flask stack
# for serving inferences in a stable way.
FROM python:3.7-slim-buster
# FROM ubuntu:18.04
# MAINTAINER Amazon AI <[email protected]>
RUN apt-get -y update && apt-get install -y --no-install-recommends \
      wget \
      python3-pip \
      python3-setuptools \
      nginx \
      ca-certificates \
 && rm -rf /var/lib/apt/lists/*
# RUN ln -s /usr/bin/python3 /usr/bin/python
# RUN ln -s /usr/bin/pip3 /usr/bin/pip

# Here we get all python packages.
RUN pip3 install pandas scikit-learn nltk  # (you may include versions)
RUN pip3 install flask gunicorn. # (make sure these are not disabled!)

# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files which are unnecessary in this case. We also update
# PATH so that the train and serve programs are found when the container is invoked.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

# Set up the program in the image
# Also, get familiar with docker file system, I have an example scrrenshot coming up later
COPY sub-folder /opt/program
WORKDIR /opt/program
RUN chmod +x /opt/program/preprocessing.py
RUN chmod +x /opt/program/predictor.py
RUN chmod +x /opt/program/train
RUN chmod +x /opt/program/serve

Последние три строки с функцией chmod добавляют аналогичные строки для дополнительных кодов Python, которые вы собираетесь выполнять в рабочем процессе ML. БУДЬТЕ ОСТОРОЖНЫ, когда вы комментируете коды здесь, у вас может возникнуть соблазн закомментировать одну или две строки, чтобы увидеть, что эти коды делают, но если вы забудете раскомментировать их позже, вы столкнетесь со странной ошибкой, из-за которой вы не будете знать, где что происходит. пошло не так.

Затем мы создаем сценарий оболочки для отправки контейнера докеров.

build_and_push.sh

%%sh
# The base name of our container image
algorithm_name= <your algorithm name>
cd <folder>
chmod +x <folder>/preprocessing.py
chmod +x <folder>/predictor.py
chmod +x <folder>/train
chmod +x <folder>/serve
account=$(aws sts get-caller-identity --query Account --output text)
# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:<your region>}
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi
# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${fullname}
# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build  -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}
# Push to ECR, after you've tested locally
docker push ${fullname}
echo ${fullname}

Вам нужно обновить только строки внутри ‹...›.

Скопируйте содержимое файла setup.sh ниже, здесь ничего менять не нужно.

setup.sh

#!/bin/bash
sudo -n true
if [ $? -eq 0 ]; then
  echo "The user has root access."
else
  echo "The user does not have root access. Everything required to run the notebook is already installed and setup. We are good to go!"
  exit 0
fi
# Do we have GPU support?
nvidia-smi > /dev/null 2>&1
if [ $? -eq 0 ]; then
  # check if we have nvidia-docker
  NVIDIA_DOCKER=`rpm -qa | grep -c nvidia-docker2`
  if [ $NVIDIA_DOCKER -eq 0 ]; then
    # Install nvidia-docker2
    #sudo pkill -SIGHUP dockerd
    sudo yum -y remove docker
    sudo yum -y install docker-17.09.1ce-1.111.amzn1
sudo /etc/init.d/docker start
curl -s -L https://nvidia.github.io/nvidia-docker/amzn1/nvidia-docker.repo | sudo tee /etc/yum.repos.d/nvidia-docker.repo
    sudo yum install -y nvidia-docker2
    sudo cp daemon.json /etc/docker/daemon.json
    sudo pkill -SIGHUP dockerd
    echo "installed nvidia-docker2"
  else
    echo "nvidia-docker2 already installed. We are good to go!"
  fi
fi
# This is common for both GPU and CPU instances
# check if we have docker-compose
docker-compose version >/dev/null 2>&1
if [ $? -ne 0 ]; then
  # install docker compose
  pip install docker-compose
fi
# check if we need to configure our docker interface
SAGEMAKER_NETWORK=`docker network ls | grep -c sagemaker-local`
if [ $SAGEMAKER_NETWORK -eq 0 ]; then
  docker network create --driver bridge sagemaker-local
fi
# Notebook instance Docker networking fixes
RUNNING_ON_NOTEBOOK_INSTANCE=`sudo iptables -S OUTPUT -t nat | grep -c 169.254.0.2`
# Get the Docker Network CIDR and IP for the sagemaker-local docker interface.
SAGEMAKER_INTERFACE=br-`docker network ls | grep sagemaker-local | cut -d' ' -f1`
DOCKER_NET=`ip route | grep $SAGEMAKER_INTERFACE | cut -d" " -f1`
DOCKER_IP=`ip route | grep $SAGEMAKER_INTERFACE | cut -d" " -f12`
# check if both IPTables and the Route Table are OK.
IPTABLES_PATCHED=`sudo iptables -S PREROUTING -t nat | grep -c 169.254.0.2`
ROUTE_TABLE_PATCHED=`sudo ip route show table agent | grep -c $SAGEMAKER_INTERFACE`
if [ $RUNNING_ON_NOTEBOOK_INSTANCE -gt 0 ]; then
if [ $ROUTE_TABLE_PATCHED -eq 0 ]; then
    # fix routing
    sudo ip route add $DOCKER_NET via $DOCKER_IP dev $SAGEMAKER_INTERFACE table agent
  else
    echo "SageMaker instance route table setup is ok. We are good to go."
  fi
if [ $IPTABLES_PATCHED -eq 0 ]; then
    sudo iptables -t nat -A PREROUTING  -i $SAGEMAKER_INTERFACE -d 169.254.169.254/32 -p tcp -m tcp --dport 80 -j DNAT --to-destination 169.254.0.2:9081
    echo "iptables for Docker setup done"
  else
    echo "SageMaker instance routing for Docker is ok. We are good to go!"
  fi
fi

Теперь мы заходим в «подпапку» и копируем содержимое следующего:

nginx.conf

worker_processes 1;
daemon off; # Prevent forking
pid /tmp/nginx.pid;
error_log /var/log/nginx/error.log;
events {
  # defaults
}
http {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /var/log/nginx/access.log combined;
  
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
server {
    listen 8080 deferred;
    client_max_body_size 5m;
keepalive_timeout 5;
    proxy_read_timeout 1200s;
location ~ ^/(ping|invocations) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_pass http://gunicorn;
    }
location / {
      return 404 "{}";
    }
  }
}

Иногда, если ваш размер ввода слишком велик, вы можете обновить «client_max_body_size».

обслуживать

#!/usr/bin/env python
# This file implements the scoring service shell. You don't necessarily need to modify it for various
# algorithms. It starts nginx and gunicorn with the correct configurations and then simply waits until
# gunicorn exits.
#
# The flask server is specified to be the app object in wsgi.py
#
# We set the following parameters:
#
# Parameter                Environment Variable              Default Value
# ---------                --------------------              -------------
# number of workers        MODEL_SERVER_WORKERS              the number of CPU cores
# timeout                  MODEL_SERVER_TIMEOUT              60 seconds
import multiprocessing
import os
import signal
import subprocess
import sys
cpu_count = multiprocessing.cpu_count()
model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 60)
model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))
def sigterm_handler(nginx_pid, gunicorn_pid):
    try:
        os.kill(nginx_pid, signal.SIGQUIT)
    except OSError:
        pass
    try:
        os.kill(gunicorn_pid, signal.SIGTERM)
    except OSError:
        pass
sys.exit(0)
def start_server():
    print('Starting the inference server with {} workers.'.format(model_server_workers))
# link the log streams to stdout/err so they will be logged to the container logs
    subprocess.check_call(['ln', '-sf', '/dev/stdout', '/var/log/nginx/access.log'])
    subprocess.check_call(['ln', '-sf', '/dev/stderr', '/var/log/nginx/error.log'])
nginx = subprocess.Popen(['nginx', '-c', '/opt/program/nginx.conf'])
    gunicorn = subprocess.Popen(['gunicorn',
                                 '--timeout', str(model_server_timeout),
                                 '-k', 'sync',
                                 '-b', 'unix:/tmp/gunicorn.sock',
                                 '-w', str(model_server_workers),
                                 'wsgi:app'])
signal.signal(signal.SIGTERM, lambda a, b: sigterm_handler(nginx.pid, gunicorn.pid))
# If either subprocess exits, so do we.
    pids = set([nginx.pid, gunicorn.pid])
    while True:
        pid, _ = os.wait()
        if pid in pids:
            break
sigterm_handler(nginx.pid, gunicorn.pid)
    print('Inference server exiting')
# The main routine just invokes the start function.
if __name__ == '__main__':
    start_server()

всги

from predictor import app
# This is just a simple wrapper for gunicorn to find your app.
# If you want to change the algorithm file, simply change "predictor" above to the
# new file.

Выполнение сценариев Docker из Jupyter Notebooks:

# Build the docker container
!/bin/bash container/setup.sh
# Push the container to the AWS ECR
!/bin/bash container/build_and_push.sh

После успешного выполнения вы должны получить ECR-адрес контейнера, который должен выглядеть так:

<########>.dkr.ecr.region.amazonaws.com/container_name:latest

Теперь мы готовы выполнить рабочий процесс машинного обучения с помощью SageMaker.

В этом разделе мы обсудим следующие три шага: предварительная обработка, обучение и вывод.

Библиотеки, необходимые для следующих шагов:

import boto3
import re
import json
import os
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
sess = sagemaker.Session()
region = boto3.session.Session().region_name
role = get_execution_role()

Чтобы определить различные методы SageMaker с помощью контейнера докера, важно понимать, как файлы структурируются внутри докера при выполнении. Ниже скриншот из AWS:

Этап предварительной обработки:

С этой целью я собираюсь обсудить, как использовать метод SageMaker ScriptProcessor для предварительной обработки. ScriptProcessor полезен, когда вам нужно использовать пользовательские функции или некоторые библиотеки, недоступные в SageMaker. Например, в этом упражнении я использовал библиотеки NLTK для обработки текста, которых не было в SageMaker.

В качестве альтернативы вы также можете использовать SKLearn Processor, однако вам нужно добавить еще один файл .txt в качестве требований, показывающих, какие библиотеки необходимо установить во время выполнения.

Использование ScriptProcessor:

input_data_path = <s3 input data location>
output_data_path = <s3 location after processing>
image_uri = <###.dkr.ecr.region.amazonaws.com/container_name:latest>
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri,
    role=role,
    instance_count=1,
    #instance_type="ml.m5.xlarge",
    instance_type='local',    (local mode to execute quickly)
)
script_processor.run(
    code="container/sub-folder/preprocessing.py",
    inputs=[ProcessingInput(source=input_data_path, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name="train_data",
                         source="/opt/ml/processing/output/train",
                         destination="{}/{}".format(output_data_path, "train_data")),
        ProcessingOutput(output_name="test_data",
                         source="/opt/ml/processing/output/test",
                         destination="{}/{}".format(output_data_path, "test_data")),
    ],
    arguments=["--train-test-split-ratio", "0.2"])

После успешного выполнения SageMaker выведет два файла: функции и метки отдельно для обучающих и тестовых данных.

Обратите внимание, что источник и назначение, описанные в разделе ввода и вывода в приведенном выше коде, являются источником и местом назначения в докере, которые создаются динамически во время выполнения. путь_входных_данных и путь_выходных_данных — это фактическое физическое расположение внутри корзины S3.

Вы можете проверить вывод после успешного выполнения шага предварительной обработки следующим образом:

preprocessing_job_description = script_processor.jobs[-1].describe()
output_config = preprocessing_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        preprocessed_training_data_path = output["S3Output"]["S3Uri"]
    if output["OutputName"] == "test_data":
        preprocessed_test_data_path = output["S3Output"]["S3Uri"]

Использование SKLearnProcessor:

Прежде чем использовать эту опцию, создайте папку (например, extra_libraries), в которой вы сохраните файл с именем ‘requirements.txt’. Внутри этого файла вы укажете названия библиотек, которые необходимо установить.

pandas==1.3.4 
scikit-learn==1.0.2 
nltk==3.6.5

В методе sklearn_processor.run() вы указываете дополнительные входные данные для загрузки requirement.txt в место, куда загружаются коды.

Вам также потребуется добавить следующие строки кода в файл preprocessing.py и убедиться, что путь opt/ml совпадает с тем, который вы определите в методе SKLearnProcessor.

#update preprocessing.py file
import sys
import subprocess
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/input/code/extra_libraries/requirements.txt",
])
.
.
.

SKLearnProcessor:

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    #instance_type="ml.m5.xlarge",
    instance_type='local',
    instance_count=1,
)
sklearn_processor.run(
    code="container/sub-folder/preprocessing.py",      
    inputs=[
        ProcessingInput(source=input_data_path, destination="/opt/ml/processing/input"),
        ProcessingInput(source="extra_libraries/", destination="/opt/ml/processing/input/code/extra_libraries")
    ],
    outputs=[
        ProcessingOutput(output_name="train_data",
                         source="/opt/ml/processing/output/train",
                         destination="{}/{}".format(output_data_path, "train_data")),
        ProcessingOutput(output_name="test_data",
                         source="/opt/ml/processing/output/test",
                         destination="{}/{}".format(output_data_path, "test_data")),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)
Note: 
The code files will be uploaded at /opt/ml/processing/input/code/... at runtime.
One benefit of using SKLearnProcessor over ScriptProcessor is that you can 
continuously change the codes inside the .py file and execute immediately. 
Whereas with the ScriptProcessor, you have to update the docker file and push 
the docker container before you can execute the preprocessing steps. 
And this will take extra minutes!

SageMaker будет выводить данные функции и метки отдельно для обучающего и тестового набора. Кроме того, внутри метода sklearn_processor.run() можно определить столько каналов ProcessingInput и ProcessingOutput.

Кроме того, использование instance_type для «local» ускорит выполнение вашего кода. Вы должны использовать этот режим при тестировании и, возможно, тестировать на меньшем фрагменте данных. По мере роста размера ваших данных вы обнаружите, что используете более крупный тип экземпляра.

Этап обучения:

Мы использовали метод SageMaker Estimator для построения рабочего процесса обучения.

sklearn = sagemaker.estimator.Estimator(
    image_uri=<########>.dkr.ecr.region.amazonaws.com/container_name:latest,
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",    
    #instance_type='local',   (I think local mode is not supported for training)
    sagemaker_session=sagemaker.Session(),
    output_path="s3://{}/output".format(<bucket>),
    hyperparameters={
                    "clf__max_iter":1000,
                    "clf__C": 1,
                    "clf__solver": "saga",
                    "clf__class_weight": "balanced",
                    "clf__n_jobs": -1,
                    "clf__penalty": "l1",
                    "clf__tol": 0.001,
                    "tfidf__tfidf__max_df": 0.06,
                    "tfidf__tfidf__min_df": 5
                    }, 
    base_job_name='base_train_hp'
)
sklearn.fit({"train": <preprocessed_training_data_path>})

Пример обучающего скрипта внутри файла «train»:

from __future__ import print_function

import json
import os
import pickle
import sys
import traceback
import argparse

import pandas as pd
import numpy as np

from imblearn.over_sampling import RandomOverSampler
from imblearn.pipeline import Pipeline

from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, fbeta_score

# These are the paths to where SageMaker mounts interesting things 
# in your container.

prefix = '/opt/ml/'

input_path = prefix + 'input/data'
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')

# the huperparameters are supplied with the sklearn.estimator() method as shown
# int the previous block.


# This algorithm has a single channel of input data called 'train'. 
# Since we run in file mode, the input files are copied to the directory specified here.

channel_name='train'
training_path = os.path.join(input_path, channel_name)


def train_and_save(X_train, y_train, args, model_name_prefix=''):
    '''
    An example of binary classification from text input
    '''
    # Vectorize text
    tfidf = TfidfVectorizer()    

    # transform the 0th column 
    column_transformer = ColumnTransformer([('tfidf', tfidf, 'description_clean')],
                                           remainder='passthrough')

    # Random oversampler for unbalanced class
    ros = RandomOverSampler()

    # Create the model object    
    model_obj = LogisticRegression()

    # Initialize Pipeline
    pipe = Pipeline([
        ('tfidf', column_transformer),
        ('oversamp', ros),
        ('clf', model_obj)
        ])
    
    # only set hyperparameters that Pipeline will take
    pipe_params = pipe.get_params()
    for key, val in args.items():
        if key in pipe_params:
            pipe = pipe.set_params(**{key: val})
    
    # fit model
    pipe.fit(X_train, y_train)
    
    # score the model
    metric_score = roc_auc_score(y_train, pipe.predict_proba(X_train)[:, 1])
    print('roc_auc: {}'.format(metric_score))
          
    # save the model
    with open(os.path.join(model_path, '{}_logreg_model.pkl'.format(model_name_prefix)), 'wb') as out:
        pickle.dump(pipe, out)
    print('Training for {} {} complete.'.format(args['category'], model_name_prefix))


# The function to execute the training.
def train(args, training_path):
    print('Starting the training.')
    
    try:        
        X_train = pd.read_csv('{}/train_features.csv'.format(training_path))
        y_train = pd.read_csv('{}/train_labels.csv'.format(training_path))
       
        # Set features and/or label
        X_train = X_train[['input_columnn']]            
        y_train = y_train['target_column']

        train_and_save(X_train, y_train, args)
            

        
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs, as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

def _decode(obj):
    '''
    This function is to decode the argement parameter supplied
    '''
    # convert floats if possible
    rv = {}
    for k, v in obj.items():
        if isinstance(v, str):
            try:
                rv[k] = int(v)
            except ValueError:
                try:
                    rv[k] = float(v)
                except ValueError:
                    rv[k] = v
        else:
            rv[k] = v
    return rv


if __name__ == '__main__':    
    with open(param_path, 'r') as tc:
        args = json.load(tc)
        
    args = _decode(args)    
    train(args, training_path)

    # A zero exit code causes the job to be marked a Succeeded.
    sys.exit(0)

Шаг вывода:

В SageMaker вывод выполняется двумя способами: путем создания конечной точки для непрерывного прогнозирования/прогнозирования в реальном времени или пакетного преобразования для однократного прогнозирования. В этом примере я буду обсуждать только вариант пакетного преобразования, поскольку мой рабочий процесс был разработан для автоматизации прогнозирования один раз в неделю.

model = sagemaker.model.Model(
    model_data='<s3 bucket model location>/.../model.tar.gz',
    image_uri,
    role=role)
transformer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    output_path=batch_output_path,
    assemble_with="Line",
    accept="text/csv",
)
transformer.transform(
    batch_input_path, 
    content_type="text/csv", 
    split_type="Line",
    #join_source="Input",       # to join the prediction with input data
    #input_filter="$[1:]",      # filter the input columns
    #output_filter='$[0,-1]',   # filter the output columns
)
transformer.wait()

Пакетное преобразование SageMaker должно позволять фильтровать входные данные и выводить прогнозы, связанные с входными данными. В моей первой попытке это еще не сработало, но я оставил их в комментариях, чтобы вы могли попробовать, если это сработает для вас.

При создании объекта модели мы должны определить док-контейнер, используемый для обучения, иначе задание не будет выполнено. Что делать, если у вас есть только модель, но нет образа контейнера или образ контейнера был обновлен? Это то, что мне нужно для исследования. Если у вас есть предложение, не стесняйтесь комментировать.

Скрипт предсказания выглядит так:

from __future__ import print_function

import io
import json
import os
import pickle
import signal
import sys
import traceback

import flask
import pandas as pd

prefix = "/opt/ml/"
model_path = os.path.join(prefix, "model")

# A singleton for holding the model. This simply loads the model and holds it.
# It has a predict function that does a prediction based on the model and the input data.


class ScoringService(object):
    model = None  # Where we keep the model when it's loaded

    @classmethod
    def get_model(cls):
        """Get the model object for this instance, loading it if it's not already loaded."""
        if cls.model == None:
            with open(os.path.join(model_path, "_logreg_model.pkl"), "rb") as inp:
                           cls.model = pickle.load(inp)
        return cls.model

    @classmethod
    def predict(cls, input):
        """For the input, do the predictions and return them.

        Args:
            input (a pandas dataframe): The data on which to do the predictions. There will be
                one prediction per row in the dataframe"""
        clf = cls.get_model()
        return clf.predict(input)


# The flask app for serving predictions
app = flask.Flask(__name__)


@app.route("/ping", methods=["GET"])
def ping():
    """Determine if the container is working and healthy. In this sample container, we declare
    it healthy if we can load the model successfully."""
    health = ScoringService.get_model() is not None  # You can insert a health check here

    status = 200 if health else 404
    return flask.Response(response="\n", status=status, mimetype="application/json")


@app.route("/invocations", methods=["POST"])
def transformation():
    """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert
    it to a pandas data frame for internal use and then convert the predictions back to CSV (which really
    just means one prediction per line, since there's a single column.
    """
    data = None

    # Convert from CSV to pandas
    if flask.request.content_type == "text/csv":
        data = flask.request.data.decode("utf-8")
        s = io.StringIO(data)
        data = pd.read_csv(s) 
        
    else:
        return flask.Response(
            response="This predictor only supports CSV data", status=415, mimetype="text/plain"
        )
    
    print("Invoked with {} records".format(data.shape[0]))

    # Do the prediction
    predictions = ScoringService.predict(data)

    # Convert from numpy back to CSV
    out = io.StringIO()
    pd.DataFrame({"results": predictions}).to_csv(out, header=False, index=False)
    result = out.getvalue()

    return flask.Response(response=result, status=200, mimetype="text/csv")

В этом упражнении результаты моего моделирования состояли из нескольких моделей, сжатых вместе в папку. Чтобы увидеть, как SageMaker сохранил обученную модель в корзине S3, был полезен следующий код на основе Linux, который вы можете выполнить из блокнота Jupyter.

!aws s3 cp <s3 bucket model location>/.../model.tar.gz <new_location>/model.tar.gz
!tar -xvf <new_location>/model.tar.gz

После этого обычная оценка модели!

Следующим шагом является объединение этих шагов в конвейер для автоматизации. С этой целью я буду обсуждать AWS StepFunctions во второй части этой серии руководств.

Спасибо за чтение!

Ссылка: