Как создавать повторно используемые компоненты, которые интегрируются с Scikit-learn, Dask и RAPIDS

При создании многоразового кода для науки о данных и машинного обучения разработчикам часто требуется добавить настраиваемую бизнес-логику вокруг существующих библиотек с открытым исходным кодом, таких как scikit-learn. Эти настройки могут выполнять предварительную обработку данных, сегментировать эти данные определенным образом или реализовывать собственный алгоритм. Пользовательская логика приводит к большему количеству кода, который нужно понимать и поддерживать, что добавляет сложности и повышает риск. В этом сообщении блога будет обсуждаться, как использовать API библиотеки scikit-learn для добавления таких настроек таким образом, чтобы минимизировать код, сократить обслуживание, облегчить повторное использование и обеспечить возможность масштабирования с помощью таких технологий, как Dask и БЫСТРЫЕ.

Почему сложно следовать API scikit-learn при моделировании кода?

Конечная цель любого машинного обучения обычно - как можно быстрее получить лучшую модель для данного набора данных. В машинном обучении так много времени тратится на обработку данных, обучение модели и проверку, что иногда упускается из виду дизайн и ремонтопригодность кода, созданного в процессе. В мире ограниченных ресурсов, возможно, такая приоритезация имеет смысл. В конце концов, в производство будет запущена именно модель, а не код, который ее создал. В банках и других финансовых учреждениях модели должны пройти процесс проверки - там, где воспроизводимость критически важна - прежде, чем они будут внедрены в производство. После развертывания модели код, который ее создал, часто остается нетронутым в репозитории git в течение недель, месяцев или даже лет. До тех пор, пока однажды разработчик модели, который может быть или не быть первоначальным автором, не вернется к коду, чтобы обновить модель. Они получат значительную выгоду - по времени и с меньшими затратами - благодаря хорошо протестированному и документированному репозиторию, который легко понять, чтобы они могли быстро приступить к работе.

На практике соблюдение стандартов и их применение может быть сложной задачей, потому что

  • Стандарты необходимо понять, прежде чем их можно будет применять.
  • Применение стандарта к вашей проблеме требует усилий.

Преодоление этих проблем требует тщательного обдумывания и требует драгоценного времени, которого может не хватить на проект по науке о данных. Экосистема PyData состоит из множества развивающихся стандартов и API-интерфейсов, что затрудняет отслеживание изменений. Все это может замедлить процесс разработки модели, поэтому путь наименьшего сопротивления - использовать библиотеку, заключенную в ваш собственный дизайн, который лучше всего подходит для вашего приложения. Однако это часто приводит к таким проблемам, как утечка данных из-за ошибки пользователя или повторяющегося кода, поскольку конструкция не была достаточно модульной, чтобы допускать расширения. В последнем случае накладные расходы на обслуживание высоки из-за все более большой и сложной базы кода, где исправление ошибок или масштабирование, даже с Dask и RAPIDS, становится сложной задачей.

Пример Scikit-learn

Мы будем использовать следующий пример из документации scikit-learn, чтобы проиллюстрировать пункты этого поста.

from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, train_test_split
import numpy as np

X_digits, y_digits = datasets.load_digits(return_X_y=True)

# Define a pipeline to search for the best combination of PCA truncation
# and classifier regularization.
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}

search = GridSearchCV(pipe, param_grid, n_jobs=-1)

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)

search.fit(X_train, y_train)

best = search.best_estimator_

print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")

Выход:

Training set score: 1.0
Test set score: 0.9666666666666667

Настройка примера Scikit-learn

Предположим, мы хотим изменить этот пример, чтобы изменить данные после шага PCA. Возможно, у нас есть набор функций, которые мы всегда хотим включить из другого источника данных или нам нужно токенизировать данные. Мы могли бы сделать это, удалив конвейер и добавив функцию для выполнения мутации X.

def mutate(X):
    """Mutates X"""
    # ... do something ...
    return X

pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(
    max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])

X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123)

X_train = pca.fit_transform(X_train, y_train)
X_train = mutate(X_train)
logistic = logistic.fit(X_train, y_train)

X_test = pca.transform(X_test) # <- Don't call fit again!
X_test = mutate(X_test) # <-Don’t forget to call mutate on X_test!

print(f"Training set score: {logistic.score(X_train, y_train)}")
print(f"Test set score: {logistic.score(X_test, y_test)}")

Выход:

Training set score: 1.0
Test set score: 0.9666666666666667

Это позволяет настраивать, не требуя, чтобы пользователь узнал о конвейерах scikit-learn. Однако, если вы сотрудничаете с другими командами или разрабатываете библиотеку для них, этот шаблон быстро становится проблематичным. Этот код можно инкапсулировать в класс, а затем многократно копировать. Некоторые из этих классов можно даже немного изменить, чтобы добавить расширенную функциональность или масштабирование. Кроме того, если вы обнаружите ошибку в одном из классов, у вас потенциально будет много мест, где ее можно исправить. Удачной охоты!

Рассмотрим альтернативу, которая расширяет API scikit-learn, добавляя настраиваемый оценщик, содержащий логику мутации.

from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta

class Mutate(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        """Mutates X"""
        # ... do something ...
        return X

pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])

pipe = Pipeline(steps=[('pca', pca), ('mutate', Mutate()), ('logistic', logistic)])

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)

pipe.fit(X_train, y_train)
print(f"Training set score: {pipe.score(X_train, y_train)}")
print(f"Test set score: {pipe.score(X_test, y_test)}")

Выход:

Training set score: 1.0
Test set score: 0.9666666666666667

Добавив настраиваемый оценщик, мы инкапсулировали мутацию таким образом, чтобы мы могли вставить ее в конкретный шаг конвейера. Дополнительным преимуществом этого подхода является то, что нам нужно поддерживать только один класс, который со временем может расти, чтобы содержать больше функциональных возможностей и масштабирования.

def transform(self, X):
    """Mutates X"""
    # ... do something ...
    if is_dask_collection(X):
        # Do Dask things
    elif is_rapids_collection(X):
        # Do RAPIDS things
    return X

Основной недостаток этого подхода и, вероятно, то, почему он не соблюдается, заключается в том, что разработчику требуется достаточное понимание scikit-learn, чтобы уместить проблему в API.

Существующие масштабируемые пользовательские оценщики в Dask и RAPIDS

На самом деле это не новый паттерн. Фактически, у нас уже есть множество примеров настраиваемых масштабируемых оценщиков в сообществе PyData. Dask-ml - это библиотека расширений scikit-learn, которые масштабируют данные и выполняют параллельные вычисления с использованием Dask. Он предоставляет множество незаменимых замен для оценщиков scikit-learn.

Вот как выглядит примерный конвейер игрушки с dask-ml.

from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='3GB')
from dask_ml.datasets import make_classification
from dask_ml.decomposition import PCA
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline  # <-- using the sklearn pipeline
import numpy as np

X, y = make_classification(n_samples=1000, n_features=20,
                           chunks=100, n_informative=4,
                           random_state=0)

# Define a pipeline to search for the best combination of PCA truncation
# and classifier regularization.
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(fit_intercept=False, max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}

search = GridSearchCV(pipe, param_grid, n_jobs=-1)

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=123)

search.fit(X_train, y_train)

best = search.best_estimator_

print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")

В качестве альтернативы можно использовать заменяющие элементы cuML для масштабирования на графических процессорах NVIDIA с помощью RAPIDS.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.datasets.classification import make_classification

cluster = LocalCUDACluster()
client = Client(cluster)

X, y = make_classification(n_samples=1000, n_features=20,
                           chunks=100, n_informative=4,
                           random_state=0)
from cuml.dask.decomposition import PCA
from cuml.linear_model import LogisticRegression

pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])

Создание собственных оценщиков scikit-learn, масштабируемых с помощью Dask & RAPIDS

На этом этапе мы показали, как один и тот же конвейер можно масштабировать двумя способами, и вы можете увидеть, как возникает закономерность. Структуры данных и алгоритмы моделирования зависят от одних и тех же соответствующих базовых библиотек, но слабо связаны друг с другом. Другими словами, мы отделили логику загрузки данных от вычислений, которые опираются либо на массивный, либо на dataframe API. Под капотом мы видим, что оценщики Dask знают, как работать с коллекциями Dask, а оценщики cuML знают, как работать с коллекциями RAPIDS. Все работает, если мы читаем данные с помощью библиотеки, которая соответствует оценщику. Можем ли мы создать наши собственные оценщики, следуя этому шаблону, чтобы инкапсулировать пользовательскую бизнес-логику таким образом, чтобы она масштабировалась как на Dask, так и на RAPIDS?

Сначала давайте рассмотрим способы чтения данных для фреймворков в оперативной памяти, распределенных и ускоренных сред.

import pandas as pd
df = pd.read_csv(...)

import dask.dataframe as dd
df = dd.read_csv(...)

import cudf as cdf
df = cdf.read_csv(...)

Обратите внимание, что все три из этих библиотек предоставляют заменяющие элементы, которые можно использовать в качестве уровня абстракции, который обобщает чтение данных в память и операции с данными. Когда у нас есть структура, подобная фрейму данных, мы можем определить оценщик, который делает осторожные предположения об API данных.

from mylib import CustomEstimator
est = CustomEstimator(**params)
est.fit(df)

В качестве альтернативы, если мы хотим стандартизировать интерфейс массива, метод подгонки нашего оценщика может принимать значения X и Y. Мы обнаружили, что подобный массиву API более согласован для задач scikit-learn в трех фреймворках, что делает наш пользовательский код оценки более универсальным для масштабирования. Ниже мы показываем слегка измененную версию нашего CustomEstimator использования для подобного массиву интерфейса.

from mylib import CustomEstimator
est = CustomEstimator(**params)
X, y = df[features].values, df[target].values
est.fit(X, y)

Пользовательский оценщик scikit-learn

Понимая базовую схему отделения чтения данных и манипуляций от бизнес-логики, давайте рассмотрим пример того, как это может выглядеть на практике. Ниже мы определяем наш собственный класс перекрестной проверки CustomSearchCV, который реализует версию логики обучения модели в памяти, следуя API scikit-learn.

from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import train_test_split
import copy
import pandas as pd
import numpy as np

class CustomSearchCV(BaseEstimator):
    
    def __init__(self, estimator, cv, logger):
        self.estimator = estimator
        self.cv = cv
        self.logger = logger
    
    def fit(self, X, y=None, **fit_kws):
        if isinstance(X, pd.DataFrame):
            X = X.values
        # Insert more guards here!
            
        X_base, X_holdout, y_base, y_holdout = train_test_split(
            X, y, random_state=123)
        
        self.split_scores_ = []
        self.holdout_scores_ = []
        self.estimators_ = []            
        
        for train_idx, test_idx in self.cv.split(X_base, y_base):
            X_test, y_test = X_base[test_idx], y_base[test_idx]
            X_train, y_train = X_base[train_idx], y_base[train_idx]

            estimator_ = clone(self.estimator)
            estimator_.fit(X_train, y_train, **fit_kws)

            self.logger.info("... log things ...")
            self.estimators_.append(estimator_)
            self.split_scores_.append(estimator_.score(X_test, y_test))            
            self.holdout_scores_.append(
                estimator_.score(X_holdout, y_holdout))
    
        self.best_estimator_ = \
                self.estimators_[np.argmax(self.holdout_scores_)]
        return self

CustomSearchCV хорошо работает с существующими оценщиками, такими как sklearn.model_selection.RepeatedKFold и xgboost.XGBRegressor. Пользователи могут даже определить свой собственный класс сворачивания и внедрить его в наш оценщик. Пример использования показан ниже.

from sklearn.model_selection import RepeatedKFold
import xgboost as xgb

from mylib import make_classifier_data, logger

X, y = make_classifier_data(
    n_samples=100_000,
    n_features=100,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
)

cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
clf = CustomSearchCV(xgb.XGBClassifier(n_jobs=-1), cv, logger)

clf.fit(X, y)
clf.best_estimator_

Масштабировать с помощью Dask

CustomSearchCV может работать с коллекциями Dask с небольшими изменениями в методе подгонки. Сначала создайте клиент Dask для подключения к вашему кластеру.

from dask.distributed import Client, progress

client = Client()
client

Затем добавьте следующую логику для проверки входных данных, чтобы определить, является ли это коллекцией Dask.

from dask.base import is_dask_collection
import dask.dataframe as dd
...
    def fit(self, X, y=None, **fit_kws):
        if isinstance(X, dd.DataFrame):
            X = X.to_dask_array(lengths=True)
        elif isinstance(X, pd.DataFrame):
            X = X.values
        if is_dask_collection(X):
            from dask_ml.model_selection import train_test_split
        else:
            from sklearn.model_selection import train_test_split
        ...

Теперь мы можем читать (или генерировать) данные с помощью Dask и внедрять оценщик, поддерживающий Dask, в наш CustomSearchCV объект. В этом случае мы вводим xgb.dask.DaskXGBClassifier.

from sklearn.model_selection import RepeatedKFold
import xgboost as xgb

from mylib import logger, make_classifier_data

X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
    dask_ml=True,
    chunks=1000
)

cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
clf = CustomSearchCV(xgb.dask.DaskXGBClassifier(), cv, logger)

clf.fit(X, y)
clf.best_estimator_

Масштабирование с помощью графических процессоров

Один графический процессор

После внесения аналогичных изменений в CustomSearchCV мы можем выполнять обучение на одном графическом процессоре, инициализировав xgb.XGBClassifier с помощью tree_method="gpu_hist". В этом случае нам не нужно изменять чтение (или генерацию) данных, поскольку XGBoost знает, как перемещать данные на GPU.

from sklearn.model_selection import RepeatedKFold
from mylib import make_classifier_data, logger
import xgboost as xgb

X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123
)
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
xgb_clf = xgb.XGBClassifier(n_jobs=-1, tree_method="gpu_hist")
clf = CustomSearchCV(xgb_clf, cv, logger)
clf.fit(X, y)
clf.best_estimator_

Один узел, несколько графических процессоров

Многие системы имеют несколько графических процессоров, которые можно объединить в кластер с одним хостом, используя Dask и RAPIDS. Ниже мы инициализируем xgb.dask.DaskXGBClassifier с tree_method="gpu_hist" и подключаем его к dask_cuda.LocalCUDACluster. По умолчанию LocalCUDACluster добавит cuda-worker (GPU worker) для каждого GPU на хосте. Если мы запустим этот код в системе с восемью графическими процессорами, у нас будет восемь рабочих кластеров. NVLink и Apache Arrow обеспечивают чрезвычайно эффективный распределенный доступ к данным между графическими процессорами.

Кроме того, по мере заполнения памяти графического процессора данные будут перетекать в системную память, которая обычно намного больше, чем то, что доступно на графических процессорах. Это делает вычисления с одним узлом и несколькими графическими процессорами хорошо подходящими для решения многих задач в области науки о данных и машинного обучения. В приведенном ниже примере показано использование без изменений класса CustomSearchCV.

from sklearn.model_selection import RepeatedKFold
from mylib import make_classifier_data, logger

import xgboost as xgb

cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
dclf = xgb.dask.DaskXGBClassifier(tree_method="gpu_hist")
clf = CustomSearchCV(dclf, cv, logger)
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)
X, y = make_classifier_data(
    n_samples=100_000,
    n_features=1000,
    response_rate=0.25,
    predictability=0.25,
    random_state=123,
    dask_ml=True,
    chunks=1000
)
X = X.persist()

dclf.client = client
clf.fit(X, y)
clf.best_estimator_

Проверяйте панель управления Dask и использование графического процессора во время выполнения этого кода. Обратите внимание, что кластер Dask занят выполнением работы, а затем приостанавливается, пока загрузка графического процессора резко возрастает. Здесь мы разделяем данные в Dask на системную память и CPU, а затем обучаем модель XGBoost на графических процессорах. В будущем улучшением будет выполнение обучающих разделений на графическом процессоре.

Оценщик CustomSearchCV может содержать настраиваемую логику для масштабируемой настройки гиперпараметров или выполнять некоторые из техник отсечения, регуляризации и ранней остановки, которые обсуждались в предыдущем посте Управление моделями XGBoost.

Примечания

Вот несколько дополнительных замечаний, которые следует учитывать:

  • Стандартизация интерфейса, подобного массиву, для внутренних структур данных на ранней стадии процедуры подбора позволяет снизить сложность, связанную с масштабированием с помощью Dask и RAPIDS.
  • Убедитесь, что значения X, извлеченные из фреймов данных, содержат только функции для обучения и разделяют метки как 1d-массивы или pd.Series.
  • Убедитесь, что значения X не содержат столбцов, используемых для сегментации, например дат.
  • Сведите к минимуму преобразование фрейма данных в массив, чтобы избежать узких мест в производительности.
  • Методы должны возвращать коллекции, соответствующие типу ввода.
  • Разработчикам следует использовать вспомогательную функцию check_estimator в sklearn.utils.estimator_checks, чтобы убедиться, что их пользовательские оценщики соответствуют API.

Заключение

В этом посте мы обсудили шаблоны для добавления пользовательских функций в код моделирования scikit-learn. Мы обнаружили, что, следуя API scikit-learn, мы можем минимизировать настройки и инкапсулировать логику масштабирования в одном месте. Это снижает стоимость обслуживания с течением времени и предоставляет разработчикам примеры того, как интегрировать свой код в экосистему. Следование стандартному API позволяет нам делиться оценщиками и комбинировать их для решения многих задач.

Первоначально опубликовано на https://www.capitalone.com.

РАСКРЫТИЕ ИНФОРМАЦИИ: © 2021 Capital One. Мнения принадлежат отдельному автору. Если в этом посте не указано иное, Capital One не является аффилированным лицом и не поддерживается ни одной из упомянутых компаний. Все используемые или отображаемые товарные знаки и другая интеллектуальная собственность являются собственностью соответствующих владельцев.