Оптимизация разработки машинного обучения с помощью многократно используемых, настраиваемых шагов и конвейеров

Введение

Разработка проектов машинного обучения часто требует от практиков управления несколькими этапами и компонентами в рамках жизненного цикла машинного обучения. Хорошо структурированная среда жизненного цикла машинного обучения может упростить этот процесс, помогая разработчикам создавать повторно используемые компоненты и поддерживать организованные кодовые базы. В этой статье мы обсудим модульный подход к разработке инфраструктуры жизненного цикла машинного обучения с использованием настраиваемых шагов и конвейеров, которые служат основой для ваших проектов. Подход здесь — это новый подход или руководство по реализации фрейма для жизненного цикла машинного обучения. Мы расширим два базовых класса, которые станут основой нашей структуры; то есть BaseStep и BasePipeline.

В следующих разделах мы рассмотрим следующее:

  • Базовый шаг
  • Базовый конвейер
  • Пример или вариант использования

BaseStep: строительный блок настраиваемых шагов

Класс BaseStep — это абстрактный базовый класс (ABC), который служит основой для всех этапов жизненного цикла машинного обучения. Каждый шаг в конвейере будет наследоваться от этого базового класса, гарантируя, что каждый шаг будет соответствовать согласованному интерфейсу. Этот базовый класс предоставляет методы для сохранения и загрузки артефактов шагов конвейера.

К основным методам и свойствам этого класса относятся:

  • run(self, *args, **kwargs): абстрактный метод, запускающий компонент конвейера. Каждый производный класс должен реализовать этот метод.
  • metadata(self): абстрактное свойство, которое возвращает любые метаданные, записанные во время выполнения компонента. Каждый производный класс должен реализовать этот метод.
  • get_init_args(cls): метод класса, возвращающий аргументы конструктора компонента.
  • get_init_values(self): метод экземпляра, который возвращает значения атрибутов для текущего экземпляра.
  • Метод __repr__ также реализован для обеспечения удобочитаемого представления шага.

Вот возможный такой базовый класс

"""Base class for all ML Lifecycle Steps"""

from abc import ABC, abstractmethod
import pickle
import inspect


class BaseStep(ABC):
    """Base Class for a ML pipeline step. Each step in ML life cycle will inherit from this Base class.
    Base clas methods mainly contains saving and loading of pipeline component artifacts .
    """

    _type = "step"

    @abstractmethod
    def run(self, *args, **kwargs):
        """Run the pipeline step"""
        return self

    @property
    @abstractmethod
    def metadata(self):
        """Return any metadata that was recorded during the step run"""

    @classmethod
    def get_init_args(cls):
        """Get constructor names"""
        # get the contructor arguments of the step
        init_signature = inspect.signature(cls.__init__)
        return sorted(
            [p.name for p in init_signature.parameters.values() if p.name != 'self']
        )

    def get_init_values(self):
        """
        Get instance attribute values
        """
        return {
            key: getattr(self, key) for key in self.get_init_args() 
        }

    def set_local_components(self, local_variables):
        for name, var in local_variables.items():
            if hasattr(var, 'run'):
                self.__dict__.update({name: var})

    def __repr__(self):
        return f"Completed {self.__class__.__name__} component run"

Например, компонент DataLoader наследуется от класса BaseStep и реализует метод run() для загрузки данных и их предварительной обработки при необходимости. Точно так же другие компоненты, такие как DataFeatureMapper, DataValidator, ModelTrainer, ModelEvaluator и ArtifactPusher, также наследуются от BaseStep и предоставляют пользовательские реализации для метода run().

Мы рассмотрим следующие шаги в конвейере:

  1. DataLoader
  2. DataFeatureMapper
  3. Валидатор данных
  4. МодельTrainer
  5. ModelEvaluator
  6. АртефактТолкач

BasePipeline: организация жизненного цикла машинного обучения

Класс BasePipeline служит основой для всех конвейеров в структуре жизненного цикла машинного обучения. Он содержит методы для регистрации метаданных и сохранения конечного состояния компонентов во время выполнения конвейера. Этот базовый класс позволяет разработчикам создавать конвейеры, которые эффективно управляют несколькими компонентами и их метаданными.

К основным методам и свойствам этого класса относятся:

  • metadata(self): свойство, которое возвращает метаданные всех компонентов конвейера выполнения.
  • Метод __repr__ также реализован для обеспечения удобочитаемого представления конвейера.
"""Base class for all pipelines"""

import pathlib
import json


class BasePipeline:
    """Base class for all Pipeline. Contains metadata registring and saving final component state
    during pipeline"""

    _type = "pipeline"

    @property
    def metadata(self):
        """Return metadata of all runned pipeline components
        """
        return {component.__class__.__name__: component.metadata
                for _, component in self.__dict__.items() if hasattr(component, 'run')
        }

    def __repr__(self):
        return f"Completed {self.__class__.__name__} pipeline run"

Внедрение модульной инфраструктуры жизненного цикла машинного обучения

Используя эти базовые классы, вы можете создавать модульные и многократно используемые шаги и конвейеры для своих проектов машинного обучения. Класс BaseStep гарантирует, что каждый шаг в конвейере соответствует согласованному интерфейсу, а класс BasePipeline обеспечивает основу для построения конвейеров, которые могут управлять несколькими шагами и их метаданными.

Вот общий обзор того, как реализовать эту модульную структуру жизненного цикла машинного обучения:

  1. Создавайте собственные компоненты для каждого шага жизненного цикла машинного обучения, наследуя от класса BaseStep и реализуя метод run() и свойство metadata. Например, шаги DataLoader, DataFeatureMapper и другие, упомянутые ранее.
  2. Создайте конвейер, используя класс BasePipeline для управления пользовательскими шагами и их метаданными.
  3. Создавайте высокоуровневые интерфейсы для обучения и логических выводов с помощью настраиваемых шагов.

Следуя этим шагам, разработчики могут создать модульную и многократно используемую структуру жизненного цикла машинного обучения, которая оптимизирует процесс разработки и поддерживает организованную кодовую базу.

Пример потока жизненного цикла машинного обучения

Приведенный ниже пример и предложенную выше структуру можно найти в следующем репозитории Github. Это новый подход. Другие пакеты, такие как конвейеры MLFlow, Tensorflow Extended (TFx), Kedro и т. д., могут использоваться для разработки надежных конвейеров.

Загрузчик данных:

DataLoader отвечает за загрузку данных и выполнение любых шагов предварительной обработки без сохранения состояния. Он может загружать данные из файла или URL-адреса и автоматически разбивать их на наборы для обучения, тестирования и оценки, используя указанную стратегию разделения (например, ShuffleSplit):

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.metrics import accuracy_score
from sklearn.model_selection import ShuffleSplit

# local imports
import mlxops
from mlxops.sklearn.components import (
  DataLoader,
  DataFeatureMapper,
  DataValidator,
  ModelTrainer,
  ModelEvaluator,
  ArtifactPusher
)
file_url = "http://storage.googleapis.com/download.tensorflow.org/data/heart.csv"
data_loader = DataLoader.from_file(
                    file_url,
                    target='target',
                    splitter=ShuffleSplit
)
data_loader.run()

DataLoader разбивает данные на обучающие, тестовые и оценочные наборы. Эти наборы могут быть получены из следующих свойств

data_loader.train_set
data_loader.test_set
data_loader.eval_set

DataFeatureMapper:

Этот компонент обрабатывает функции, такие как нормализация для числовых функций и горячее кодирование для категориальных функций. Пользователи могут создать сопоставитель объектов на основе предполагаемого конвейера, который автоматически определяет соответствующие преобразования на основе типов данных функций или путем предоставления собственного конвейера преобразования объектов.

feature_mapper = DataFeatureMapper.from_infered_pipeline()
feature_mapper.run(data_loader=data_loader)
mapped_train_features = feature_mapper.transform(data_loader.train_set[0])

Валидатор

Этот шаг компонента является необязательным и в качестве входных данных используется валидатор sklearn или детектор выбросов. Этот валидатор должен реализовать метод прогнозирования, который возвращает 1 для посторонних и -1 для посторонних. Компонент валидатора применяется к сопоставленным данным объектов на этапе feature_mapper. Метод run принимает в качестве входных данных запущенные DataLoader и DataValidator.

data_validator = DataValidator(
    validator=IsolationForest(contamination=0.001)
)
data_validator.run(data_loader=data_loader, feature_mapper=feature_mapper)

Маску для данных поезда можно получить из метода свойства. Эту маску можно использовать вместе с компонентом DataLoader для выбора только релевантных выборок. Например

train_data, train_targets = data_loader.train_set
valid_train_data, valid_train_targets = train_data[data_validator.trainset_valid],\
                                        train_targets[data_validator.trainset_valid]

ModelTrainer:

Компонент ModelTrainer обучает указанный пользователем оценщик (например, классификатор scikit-learn) с использованием обработанных данных. Это зависит от компонентов DataLoader, DataFeatureMapper и DataValidator.

base_trainer = ModelTrainer(
    estimator=LogisticRegression()
)
base_trainer.run(data_loader, feature_mapper, data_validator)

Давайте сохраним эту базовую модель и построим новую модель. Мы сравним новую модель с базовой моделью позже.

mlxops.saved_model.save(base_trainer, "lr_base_model")

Давайте построим вторую модель претендента, используя классификатор случайного леса.

new_trainer = ModelTrainer(
    estimator=RandomForestClassifier(n_estimators=50)
)
new_trainer.run(data_loader, feature_mapper, data_validator)

Оценщик модели:

Компонент ModelEvaluator сравнивает производительность двух моделей, используя указанные показатели (например, показатель точности от scikit-learn). В качестве входных данных он использует компоненты DataLoader, DataFeatureMapper, DataValidator и новую обученную модель.

evaluator = ModelEvaluator(base_model="lr_base_model", metrics=[accuracy_score])
evaluator.run(
    data_loader, feature_mapper, data_validator, new_trainer
)

АртефактПушер:

Компонент ArtifactPusher помещает артефакты (например, обученные модели) в указанный каталог, если текущая модель является улучшением по сравнению с существующей лучшей моделью. Толкатель артефактов принимает Evaluator в качестве входных данных в своем методе запуска. Оценщик имеет логический атрибут push_model, который будет иметь значение True, если модель работает лучше, чем текущая модель (модели).

pusher = ArtifactPusher(model_serving_dir='serving')
pusher.run(evaluator)

Высокоуровневые интерфейсы для обучения и логического вывода:

Для более рационального подхода пользователи могут создать ModelTrainingPipeline, объединяющий все компоненты в одном конвейере.

from mlxops.sklearn.pipeline import ModelTrainingPipeline


file_url = "http://storage.googleapis.com/download.tensorflow.org/data/heart.csv"

train_pipeline_arguments = {
    'data_loader': DataLoader.from_file(
        file_name=file_url, target='target', splitter=ShuffleSplit
    ),
    'data_validator': DataValidator(
        validator=IsolationForest(contamination=0.01)
    ),
    'feature_mapper': DataFeatureMapper.from_infered_pipeline(),
    'trainer': ModelTrainer(
        estimator=RandomForestClassifier(n_estimators=100, max_depth=3)
    ),
    'evaluator': ModelEvaluator(
        base_model='lr_base_model',
        metrics=[accuracy_score]
    ),
    'pusher': ArtifactPusher(model_serving_dir='serving'),
    "run_id": "0.0.0.1"
}

# execute pipeline
train_pipeline = ModelTrainingPipeline(
    **train_pipeline_arguments
)
train_pipeline.run()
mlxops.saved_model.save(train_pipeline, "base_model")

Pipeline будет работать локально. Хотя вы можете запустить его, например, в одном контейнере, но обычно вы хотите запускать каждый шаг отдельно в своем собственном экземпляре контейнера. Для выполнения каждого шага можно использовать оркестратор, такой как AWS stepfunctions или Airflow.

Шаги и конвейеры здесь используют Pandas и SKLearn или любую модель типа SKLearn, такую ​​как XGBoost, которая соответствует этой структуре с методами подбора и прогнозирования.

Можно также создать дополнительные базовые классы. DataLoader является таким примером, который реализует методы, основанные на Pandas. Подобные классы могут быть созданы для интеграции, скажем, с наборами данных Dask или Tensorflow. То же самое касается других шагов.

Заключение

Этот модульный подход к проектированию инфраструктуры жизненного цикла машинного обучения с использованием настраиваемых компонентов и конвейеров позволяет разработчикам сосредоточиться на важнейших аспектах своих проектов, таких как выбор модели и разработка функций. Создавая повторно используемые компоненты и высокоуровневые интерфейсы, пользователи могут быстро создавать прототипы. Это новый подход, который дает представление о том, как можно реализовать такую ​​структуру.

СТАНЬТЕ ПИСАТЕЛЕМ на MLearning.ai