Ускорение операций машинного обучения с помощью конвейеров Kubeflow

Нажмите здесь, чтобы перейти к разделу Введение в пайплайны Kubeflow

MLOps, сокращение от Machine Learning Operations, представляет собой набор практик и методов, направленных на оптимизацию развертывания, управления и мониторинга моделей машинного обучения в рабочей среде. Обычно это достигается коллективными усилиями различных заинтересованных сторон, таких как специалисты по данным, инженеры по машинному обучению и инженеры-программисты.

MLOps включает в себя автоматизацию, используя технологии для автоматизации извлечения данных, обучения моделей, тестирования и процессов развертывания для обеспечения согласованности и воспроизводимости. Это также подчеркивает необходимость постоянного контроля и обслуживания моделей в производстве. Это включает в себя отслеживание производительности модели, обнаружение аномалий и адаптацию к изменениям данных и концепций.

Масштабируемость — еще один ключевой аспект MLOps. Он решает проблему обработки больших объемов данных, удовлетворения растущих требований пользователей и включения более сложных задач. MLOps стремится сделать модели машинного обучения более надежными, надежными и эффективными, применяя лучшие практики как из области разработки программного обеспечения, так и из области науки о данных.

Таким образом, MLOps позволяет организациям эффективно внедрять модели машинного обучения и управлять ими, обеспечивая их оптимальную производительность, масштабируемость и долгосрочный успех.

Kubeflow Pipelines (KFP) — отличный инструмент, который я выбрал для упрощения MLOps для своих проектов. KFP позволяет создавать, развертывать и запускать конвейеры машинного обучения масштабируемым и воспроизводимым образом, а простота использования — еще одно большое преимущество. Цель этой статьи — дать вам всестороннее представление о том, как интегрировать KFP в процесс разработки вашей модели и в полной мере использовать его преимущества.

В этой серии я хочу представить подробное руководство, демонстрирующее Конвейер непрерывного обучения машинному обучению. Этот конвейер включает в себя различные этапы, включая извлечение данных, обучение и оценку модели, регистрацию моделей, службы прогнозирования, оценку производительности и повторное обучение модели на основе показателей оценки. Хотя руководство в основном посвящено реализации KFP, меньше внимания будет уделяться моему личному мнению относительно набора данных и обоснованию выбора модели. Чтобы обеспечить полное освещение, я разделил учебник на три части следующим образом, так как это будет обширное путешествие:

MLOps с конвейерами Kubeflow (часть 1)

  • Извлечение и исследование данных
  • Обучение модели
  • Оценка модели

MLOps с конвейерами Kubeflow (часть 2)

  • Сохранить обученную модель в реестре моделей
  • Загрузить модель в среду из реестра моделей
  • Делайте прогнозы модели

MLOps с конвейерами Kubeflow (часть 3)

  • Оценка эффективности
  • Переобучение модели триггера

Без лишних слов, давайте приступим к делу.

Извлечение и исследование данных

Для целей этого руководства мы собираемся использовать набор данных breast cancer, который поставляется с предварительно загруженной библиотекой scikit-learn.

Этот набор данных содержит числовые измерения различных размеров отдельных опухолей, таких как периметр и текстура, из биопсий молочной железы и одно значение результата. Целевые метки каждой записи являются бинарными, указывая, является ли опухоль злокачественной или доброкачественной, что, по сути, и будет предсказывать наша модель машинного обучения.

Во-первых, давайте импортируем необходимые библиотеки, а также набор данных. Сначала мы собираемся импортировать весь набор данных в переменную data для быстрого исследовательского анализа данных, а также загрузим выборочные измерения в переменную X и целевые метки в переменную y.

# Importing libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import xgboost as xgb
from sklearn.metrics import roc_auc_score

# import the entire dataset into `data` for quick EDA
data = load_breast_cancer() 
df = pd.DataFrame(data = data.data, columns = data.feature_names) 
df['target'] = pd.Series(data.target) 

#import features into X and target into y for training
X, y = load_breast_cancer(return_X_y = True, as_frame = True) 

# Create the training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=123)

df.head()
#check the column names
df.columns
>>>>   ['mean radius', 'mean texture', 'mean perimeter', 'mean area',
       'mean smoothness', 'mean compactness', 'mean concavity',
       'mean concave points', 'mean symmetry', 'mean fractal dimension',
       'radius error', 'texture error', 'perimeter error', 'area error',
       'smoothness error', 'compactness error', 'concavity error',
       'concave points error', 'symmetry error', 'fractal dimension error',
       'worst radius', 'worst texture', 'worst perimeter', 'worst area',
       'worst smoothness', 'worst compactness', 'worst concavity',
       'worst concave points', 'worst symmetry', 'worst fractal dimension',
       'target']

Проверьте разделение целевой переменной:

# Check target names
print(data.target_names)
>>>> ['malignant' 'benign'] #0 is malignant 1 is benign

# Check value counts
print(df['target'].value_counts()) 
>>>> 0    212   
>>>> 1    357

# Analyzing the target variable
plt.title('Count of cancer type')
sns.countplot(x=df['target'])
plt.xlabel('Cancer Severity')
plt.ylabel('Count')
plt.show()

Постройте диаграмму некоторых функций, которые представляют ключевые измерения по сравнению с целью:

# Size the plot 
plt.figure(figsize=(15,5))
plt.subplot(1,3,1)
# Plotting correlation between target and mean radius
sns.boxplot(x="target", y="mean radius", data=df)
plt.subplot(1,3,2)
# Plotting correlation between target and mean perimeter
sns.boxplot(x="target", y="mean perimeter", data=df)
plt.subplot(1,3,3)
# Plotting correlation between target and mean concave points
sns.boxplot(x="target", y="mean concave points", data=df)
# Show plot
plt.show()

Давайте продолжим изучение набора данных breast cancer, взглянув на взаимосвязь между mean radius и mean concave points образца опухоли. Таким образом, мы можем определить, различаются ли эти свойства в зависимости от того, является ли опухоль злокачественной или доброкачественной.

# Create scatter plot of mean radius vs mean concave points
sns.relplot(x='mean radius', y='mean concave points', data=df, style='target', hue='target', kind='scatter')

# Show plot
plt.show()

Наконец, давайте построим парный график, чтобы увидеть корреляцию некоторых наших характеристик с тяжестью рака.

# Size the plot
plt.figure(figsize=(15,4))

# Plotting bivariate relations between each pair of features 
sns.pairplot(df, hue="target", vars = ["mean radius", "mean concave points", "mean texture", "mean smoothness"])

# Show plot
plt.show()

На основе нашего анализа стало очевидно, что у нас есть набор очень полезных функций, которые мы можем эффективно использовать для прогнозного моделирования. Я настоятельно рекомендую вам взять эти сценарии кода в свой собственный блокнот и изучить данные, чтобы раскрыть дополнительные идеи, поскольку из этого набора данных еще предстоит извлечь огромное количество информации. Более того, выполнив это упражнение, вы получите практический опыт работы с библиотеками Pandas и Seaborn, особенно если вы еще не знакомы с ними.

Обучение и оценка модели

Перемотаем вперед, давайте начнем обучение модели. В качестве модели для этого упражнения выбран классификатор XGBoost, который на сегодняшний день является одним из самых мощных широко используемых классификаторов. И с этого момента, помимо обучения и оценки модели, мы начнем создавать компоненты конвейера для нашего конечного рабочего процесса машинного обучения. Те из вас, кто не знаком с понятием компонентов, могут обратиться к моей предыдущей статье.

Я планирую инкапсулировать код, аналогичный показанному выше, который импортирует данные о раке молочной железы из scikit-learn в среду, а затем сохраняет данные в облачном хранилище в компоненте на основе Python. Это позволит нам включить его в качестве одного из шагов в конвейере.

Компонент № 1: Импорт данных

# Import Libraries
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
                        Metrics, component)
from google.cloud.aiplatform import pipeline_jobs
from typing import NamedTuple
import google
from google.oauth2 import credentials
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components.v1.batch_predict_job import \
    ModelBatchPredictOp as batch_prediction_op

# Import Dataset and Save in GCS
@component(
    base_image="python:3.9",
    output_component_file="import_data.yaml",
)
def import_data(
            dataset_id: str,
            file_bucket: str, 
    ) -> str: # this determine the output type as string

    # Import Libraries
    import pandas as pd
    import numpy as np
    from sklearn.datasets import load_breast_cancer

    # import the entire dataset into 'data'
    data = load_breast_cancer() 
    
    # save the data in df, including the targets
    df = pd.DataFrame(data = data.data, columns = data.feature_names) 
    df['target'] = pd.Series(data.target) 
    
    # save df in cloud storage 
    save_path = f'gs://{file_bucket}/{dataset_id}/{dataset_id}_data.csv'
    df.to_csv(save_path, index=True) 
    
    print(f'{dataset_id}_data.csv saved in {save_path}')
    
    return save_path

На следующем этапе мы приступим к обучению модели. Обратите внимание, что в целях этого руководства я использую довольно общие значения параметров модели в процессе обучения, поскольку целью этого руководства не является предоставление подробных инструкций по использованию XGBClassifier.

Компонент 2. Обучение моделей

# Load Dataset and Train Model
@component(
    base_image="python:3.9",
    output_component_file="train_model.yaml",
)
def model_training(
            dataset_id: str,
            file_bucket: str, 
            save_path: str,
            metrics: Output[Metrics],
            metricsc: Output[ClassificationMetrics]
    ) -> NamedTuple(
        "Outputs",
        [
            ("accuracy", float),  # Return parameters
            ("f1_score", float),
            ("roc_auc", float)
        ],
    ):
    # Import Libraries
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.datasets import load_breast_cancer
    from sklearn.model_selection import train_test_split
    import xgboost as xgb
    from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, roc_curve, confusion_matrix
    
    # Read csv that was saved in 'import_data' component
    df = pd.read_csv(save_path)  

    # X and y
    y = np.squeeze(df['target'].values)
    X = df.drop(columns='target')

    # Create the training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=123)

    # Instantiate the XGB Classifier: xgb_model
    xgb_model = xgb.XGBClassifier(
        learning_rate=0.01,
        n_estimators=100,
        max_depth=8,
        min_child_weight=1,
        max_delta_step=1, 
        colsample_bytree=0.9,
        subsample=0.9,
        objective='binary:logistic',
        nthread=4,
        scale_pos_weight=1, 
        eval_metric='auc', 
        base_score=0.5
    )

    # Fit the classifier to the training set
    xgb_model.fit(X_train, y_train)
    
    # Predict based on X_test
    y_pred = xgb_model.predict(X_test)
    y_pred_proba = xgb_model.predict_proba(X_test)[:, 1]
    
    # Model accuracy 
    accuracy = accuracy_score(y_test, y_pred)
    print("Accuracy:", accuracy)
    
    # Precision & Recall 
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    
    # F1 Score 
    f1_score = f1_score(y_test, y_pred)
    print("F1 Score:", f1_score)

    # ROC AUC Score
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    print("ROC AUC Score:", roc_auc)

    # Log eval metrics
    metrics.log_metric("Model", "XGBClassifier")
    metrics.log_metric("Size", df.shape[0])
    metrics.log_metric("Accuracy",(accuracy * 100.0))
    metrics.log_metric("AUC", roc_auc)
    metrics.log_metric("F1_Score", f1_score)

    # Compute fpr, tpr, thresholds for the ROC Curve
    fpr, tpr, thresholds = roc_curve(
        y_true=y_test, y_score=y_pred_proba, pos_label=True
    )
    
    # Log classification metrics
    metricsc.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    metricsc.log_confusion_matrix(['Malignant', 'Benign'], confusion_matrix(y_test, y_pred).tolist())

    return (accuracy, f1_score, roc_auc)

Как видно из приведенного выше сценария кода, компонент возвращает метрики оценки в качестве выходных данных. В последующем компоненте мы оценим эти показатели производительности, полученные из компонента model training, чтобы определить, указывают ли они на надежную или некачественную производительность модели.

Компонент № 3: Оценка модели

# Evaluate the model performance
@component(
    base_image="python:3.9",
    output_component_file="model_eval.yaml",
)
def model_evaluation(
            accuracy: float, 
            f1_score: float, 
            roc_auc: float, 
            accuracy_threshold: float, 
            f1_score_threshold: float, 
            roc_auc_threshold: float
            ) -> NamedTuple(
                "Output", [("pass_fail", str)]
            ):
    
    # Set checker to True
    checker = True
    
    # Set checker to False if any of the eval metrics is below threshold
    if accuracy < accuracy_threshold: 
        checker = False 
    if f1_score < f1_score_threshold: 
        checker = False 
    if roc_auc < roc_auc_threshold: 
        checker = False 
        
    # if checker == True, return "Pass", otherwise return "Fail"
    if checker == True: 
        return ("Pass",) 
    else: 
        return ("Fail",)

Компонент №3 получит показатели оценки, возвращенные компонентом model_training, в качестве входных параметров вместе с соответствующими пороговыми значениями. Этот простой компонент на основе функций Python будет оценивать, находятся ли показатели (точность, оценка F1 и ROC_AUC) ниже заданных пороговых значений. Затем эту информацию можно использовать в последующих компонентах, чтобы определить, следует ли регистрировать модель в реестре вершинных моделей и условно развертывать обученную модель в конечной точке.

Соедините компоненты в конвейер

Настало время связать все эти компоненты в единый конвейер. Однако есть одно предостережение. Я собираюсь замаскировать некоторые части кода, призывая вас попробовать его самостоятельно, потому что я твердо верю, что лучший способ научиться — это запачкать руки. После того, как вы попробуете это сами, поделитесь своим решением в комментарии или отправьте мне личное сообщение. Я также открыт для любых вопросов, которые могут у вас возникнуть относительно этого поста. При выполнении этого упражнения вы найдете статью Конвейеры Kubeflow: простая организация рабочих процессов машинного обучения, особенно раздел под названием Шаг 2. Объедините компоненты в конвейер.

# library imports
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs

#tag cell with parameters
DATASET_ID = 'breast_cancer'
FILE_BUCKET = '' # your GCS Bucket Name
REGION = '' # the region your environment belongs

@dsl.pipeline(
    name='breast-cancer-pipeline', 
    description='breast-cancer-pipeline'
    )
def pipeline(
        dataset_id: str = DATASET_ID, 
        file_bucket: str = FILE_BUCKET, 
        region: str = REGION
    ):
    
    # ----- create training set --------
    import_data_op = import_data(dataset_id=DATASET_ID,
                          file_bucket=FILE_BUCKET)
    
    model_training_op = model_training(**masked**) # try on your own!
    
    model_evaluation_op=  model_evaluation(
                          **masked** # try on your own!
                          accuracy_threshold=0.95, 
                          f1_score_threshold=0.95, 
                          roc_auc_threshold=0.95) 
    
    model_training_op.after(import_data_op)
    model_evaluation_op.after(model_training_op)

Скомпилируйте и запустите конвейер

Двигаясь вперед, мы приступим к компиляции и запуску задания конвейера. Этот шаг относительно прост, поэтому менять особо нечего. После выполнения этого блока кода ваш конвейер начнет работать в разделе Vertex AI → Pipelines.

import json

compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)

job = pipeline_jobs.PipelineJob(
   display_name='breast-cancer-pipeline',
   template_path="pipeline.json",
   pipeline_root = f"gs://{FILE_BUCKET}",
   location=REGION,
   enable_caching=False # I encourage you to enable caching when testing as it will reduce resource use
)

job.run()

Мониторинг трубопровода

Наконец-то мы подошли к последнему разделу статьи: мониторинг результатов и оценка моделей. Как вы можете видеть ниже, компонент model-training имеет 3 выходных параметра: точность модели, f1_score и показатель roc_auc. Учитывая, что я не тратил много усилий на настройку гиперпараметров, результаты просто потрясающие с точностью почти 99%.

Последний компонент, model-evaluation, имеет следующие входные и выходные параметры. Обратите внимание, что я использовал 0,95 в качестве порогового значения для всех наших показателей простоты, но эти пороговые значения могут быть установлены более тщательно в зависимости от характера вашего набора данных и конкретного варианта использования. Выходной параметр pass_failпоследнего компонента вернул «Пройдено». Это указывает на то, что модель работает очень хорошо, что позволяет нам перейти к следующим шагам: загрузить модель в реестр моделей и развернуть ее для служб прогнозирования.

Заключение

Как упоминалось ранее, на этом завершается Часть 1 этой серии. Я надеюсь, что это руководство дало вам четкое представление о том, как работают конвейеры Kubeflow. С помощью всего нескольких дополнительных строк кода KFP позволяет нам соединять индивидуально разработанные компоненты в единый конвейер, предлагая ряд преимуществ за счет взаимодействия компонентов. Одним из больших преимуществ KFP является его тесная интеграция с Google Cloud Platform, особенно на странице Vertex AI → Pipelines.

Если у вас есть какие-либо вопросы или если какая-либо часть статьи неясна, пожалуйста, не стесняйтесь оставлять комментарии. Ваше мнение очень ценится. Оставайтесь с нами, чтобы узнать много интересного во второй части, и мы очень ценим ваши хлопки и подписки.

Ссылки:







Повышение уровня кодирования

Спасибо, что являетесь частью нашего сообщества! Перед тем, как ты уйдешь:

🔔 Подписывайтесь на нас: Twitter | ЛинкедИн | "Новостная рассылка"

🚀👉 Присоединяйтесь к коллективу талантов Level Up и найдите прекрасную работу