Развязка конвейера машинного обучения: мне удалось написать конвейер машинного обучения, не зависящий от фреймворка, с помощью DVC, Rust и Python

Большинство оркестраторов конвейеров машинного обучения считают Python основным поддерживаемым языком. Кроме того, большинство из них (Kedro, Metaflow и т. д.) являются конвейерами на основе декораторов, где вам нужно использовать python с каким-либо декоратором для определения конвейера. Но что, если вы хотите иметь более экономичное и быстрое решение, в котором некоторые части (например, разработка функций) вы хотите написать на другом языке, а некоторые — на Python? Затем вам понадобится инструмент конвейерной обработки мл на основе конфигурации, где вы будете записывать шаги своего конвейера, например, в yaml, и будет независимый интерфейс командной строки, способный управлять конвейером.

DVC в качестве конвейера:

DVC в основном рассматривают инструмент для управления версиями данных и моделей. Хотя в последних версиях им удалось добавить некоторые удивительные функции MLOps. Теперь вы можете определить свой конвейер как шаги в файле конфигурации и использовать интерфейс командной строки DVC для запуска обучающего конвейера. Вы можете установить DVC как двоичный файл (в Docker или на локальном компьютере). В Главном документе есть инструкция по установке DVC глобально без использования pip или python.

Polars как платформа предварительной обработки данных:

Нет особой причины выбирать Polars. Вместо этого вы можете использовать Pandas, spark, scala, DataFusion, Modin или Julia DataFrame. Я просто хочу продемонстрировать, что могу использовать несколько языков в проекте и писать некоторые части проекта, которые могут быть неэффективны в Python.

Scikit-Learn для обучения моделей:

Scikit-learn — наиболее часто используемый и удобный пакет, доступный на данный момент для обучения простой модели. но можно использовать любой другой обучающий фреймворк, такой как Julia MLJ, Flux, привязка Rust libtorch.

Вы уже можете видеть преимущества разделения рабочего процесса. Настройка проста, шаги не зависят друг от друга. Это означает, что в будущем вы сможете переписать любые шаги конвейера в другую структуру, если входные и выходные данные совпадают.

Отказ от ответственности: сам DVC написан на Python, поэтому вы не собираетесь полностью избавляться от python, но этот python управляется внутри DVC. Ваш собственный код может быть независимым от Framework, если вы используете его.

Приступим к настройке проекта!

Данные, которые я использую в проекте, довольно малы, и использование Rust может показаться слишком сложным. Но моя цель здесь — показать, как мы можем иметь независимый от Framework конвейер данных, который определенно можно использовать в проектах с большим объемом данных и ресурсоемкой обработкой с помощью spark, scala или rust.

После того, как вы успешно установите DVC на свой компьютер, вы можете проверить это с помощью следующей команды в терминале:

datapsycho@dataops  ~ $ dvc --version
2.34.1

Теперь поместите cd в каталог и создайте новый проект:

datapsycho@dataops  ~ $ cd RustProjects/
datapsycho@dataops  ~/RustProjects $ cargo new torch-mlops
     Created binary (application) `torch-mlops` package
datapsycho@dataops  ~/RustProjects $ cd torch-mlops/

Не путайте с названием проекта. Я выбрал это имя, потому что в будущем я планирую расширить этот проект для обучения моделей с использованием привязки libtorch для ржавчины под названием tch-rs. Отсюда и такое имя.

Здесь я создал новый проект с помощью Cargo (менеджер пакетов для Rust). Давайте создадим среду Python с Poetry:

datapsycho@dataops  ~/.../torch-mlops $ poetry init

... more QnA ...

Would you like to define your main dependencies interactively? (yes/no) [yes] no
Would you like to define your development dependencies interactively? (yes/no) [yes] no
...

Давайте добавим следующую строку в файл pyproject.toml и запустим poetry install для установки необходимых пакетов:

# pyproject.yaml
[tool.poetry.dependencies]
python = "^3.10"
pandas = "^1.5.1"
PyYAML = "^6.0"
scikit-learn = "^1.1.3"

Давайте также добавим Required Creates (Packages) для Rust:

# Cargo.toml
[dependencies]
polars = { version="0.25.1", features = ["lazy", "csv-file", "strings", "temporal", "dtype-duration", "dtype-categorical", "concat_str", "list_eval", "rank", "lazy_regex"]}

Теперь запуск cargo build установит необходимые пакеты в проект. Давайте инициализируем DVC с помощью dvc init и добавим следующий каталог для хранения данных и модели:

datapsycho@dataops  ~/.../mpg-car-pipeline $ mkdir data
datapsycho@dataops  ~/.../mpg-car-pipeline $ mkdir models
datapsycho@dataops  ~/.../mpg-car-pipeline $ mkdir data/raw
datapsycho@dataops  ~/.../mpg-car-pipeline $ mkdir data/porcessed 

Я скачал данные auto-mpg.csv с Kaggle и поместил их в каталог data/raw. На этом мы закончили основную настройку проекта.

Создадим скрипт обработки данных на Rust (Polars):

use polars::{prelude::*, datatypes::DataType::Float64};
use std::path::Path;
use std::fs;


fn read_csv_into_df() -> Result<DataFrame, PolarsError>{
    // TODO: Remove the static file path into the input parameter
    let file_path = Path::new("data/raw/autompg.csv");
    let df = CsvReader::from_path(file_path).unwrap()
      .infer_schema(None)
      .has_header(true)
      .finish();
    df
}

// categorize continous data
fn bucketize(val: &Series) -> Series{
    let result = val.i64().unwrap().into_iter().map(|yr| {
        yr.map(|value| {
            let value = if value < 73 {
                0
            } else if value >= 73 && value < 76 {
                1} else if value >=76 && value < 79 {
                    2} else if value >= 79 {
                        3} else {
                            999
                        };
            value
        })
    });
    result.collect::<Int64Chunked>().into_series()
}

// Apply standardization on a arrow series like numpy series
fn standardize(val : &Series) -> Series {
    let val_mean = val.mean().unwrap();
    let casted_series = val.cast(&Float64).unwrap();
    let val_std = val.std_as_series(1).iter().nth(0).unwrap().try_extract::<f64>().unwrap();
    let result = casted_series.f64().unwrap().into_iter().map(|atom|{
        atom.map(|proton|{(proton - val_mean)/val_std as f64})
    });
    result.collect::<Float64Chunked>().into_series()
}

fn apply_standardization(df: DataFrame, col_list: Vec<&str>) -> DataFrame {
    let mut _df = df.clone();
    for item in col_list.iter(){
        // let std_result = standardize(df.column(*item).unwrap());
        let _df = _df.apply(item, standardize).unwrap();
    }
    _df
}

fn write_to_csv(path: String, mut df: DataFrame) {
    let mut file = std::fs::File::create(path).unwrap();
    CsvWriter::new(&mut file).finish(&mut df).unwrap();
}

fn main() -> PolarsResult<()>{
    fs::create_dir_all("data/processed")?;
    let mut df = read_csv_into_df().unwrap();
    let df = df.apply("model year", bucketize)?;
    let df = df.drop("car name").unwrap();
    let df = df.drop("origin").unwrap();
    let df = df.drop_nulls(None)?;
    let mean_df = df.mean();
    let std_df = df.std(1);
    let df = apply_standardization(
        df, 
        vec!["displacement", "horsepower", "weight", "acceleration", "cylinders"]
    );
    println!("{:?}", df.tail(Some(3)));
   // TODO: remove static write paths into the parameter
    write_to_csv("data/processed/autompg.csv".to_string(), df);
    write_to_csv("data/processed/autompg_mean.csv".to_string(), mean_df);
    write_to_csv("data/processed/autompg_std.csv".to_string(), std_df);
    println!("Pipeline Executed Successfully!");
    Ok(())
}

Здесь я прочитал данные с Polars и обработал их дальше. Многие функции ML, такие как создание категориального сегмента, кодирование One Hot недоступно в среде Polars, поэтому мне пришлось создать свою собственную функцию. Вот основной список обработки:

  • Разделите столбец года на категории
  • Удалить несколько столбцов
  • Удалить нулевые значения
  • Стандартизируйте числовые столбцы

Конечным результатом работы скрипта является обработанный CSV-файл, который будет использоваться на этапе обучения. Запуск cargo build -r создаст двоичный файл, который можно запустить из командной строки.

Давайте создадим файл .py в pysrc/train.py и определим сценарий обучения на Python:

import pandas as pd
import pickle
import sys
import yaml
from sklearn.ensemble import RandomForestRegressor
from pathlib import Path
from collections import namedtuple

DataRepo = namedtuple("DataRepo", "features, targets")
PARAMS = yaml.safe_load(open("params.yaml"))["train"]
MODEL_REPO = Path("models")


if len(sys.argv) != 3:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython train.py features model\n")
    sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]
seed = PARAMS["seed"]
n_est = PARAMS["n_est"]
min_split = PARAMS["min_split"]


def read_data() -> DataRepo:
    feature_cols = [
        "cylinders",
        "displacement",
        "horsepower",
        "weight",
        "acceleration",
        "model year",
    ]

    df = pd.read_csv(Path(input).joinpath("autompg.csv"))
    features = df[feature_cols].to_numpy()
    targets = df["mpg"].to_numpy()
    return DataRepo(features=features, targets=targets)


def train_clf(data: DataRepo) -> RandomForestRegressor:
    clf = RandomForestRegressor(
        n_estimators=n_est, 
        min_samples_split=min_split, 
        n_jobs=2, 
        random_state=seed
    )

    clf.fit(data.features, data.targets)
    return clf



def save_model(model: RandomForestRegressor):
    write_path = MODEL_REPO.joinpath(output)
    with open(write_path, "wb") as fd:
        pickle.dump(model, fd)
        sys.stderr.write(f"Model written successfully at {write_path}")


def main():
    data_repo = read_data()
    sys.stderr.write("X matrix size {}\n".format(data_repo.features.shape))
    sys.stderr.write("Y matrix size {}\n".format(data_repo.targets.shape))
    model = train_clf(data=data_repo)
    MODEL_REPO.mkdir(parents=True, exist_ok=True)
    save_model(model=model)

if __name__ == "__main__":
    main()

Здесь я загрузил обработанные данные, обучил регрессор RandomForest и сохранил модель в каталоге моделей. Я намеренно хотел иметь очень простой конвейер, чтобы его было легко понять.

Как мы можем организовать весь рабочий процесс? А вот и магия DVC ✨. Чтобы иметь конвейер DVC, нам нужно добавить два файла dvc.yaml и params.yaml. dvc.yaml будет иметь основной конвейер, а params.yaml будет иметь любые параметры, которые могут потребоваться изменить позже в целях эксперимента.

# params.yaml
train:
  seed: 20170428
  n_est: 50
  min_split: 0.01

У меня просто есть 3 параметра для шагов обучения, здесь вы можете указать любые другие параметры, которые хотите. Подобно Hydra, ссылки на параметры по имени не разрешены в файле в DVC.

Теперь давайте добавим первый шаг в конвейер:

stages:
  build_bin:
    cmd: cargo build --release
    deps:
      - src/main.rs
      - Cargo.toml

Название первого шага — Build Bin. cmd указывает, что команда DVC должна выполняться для этих шагов, и шаг также зависит от двух файлов. Таким образом, любые дальнейшие изменения в этих файлах будут распознаваться DVC. Чтобы запустить текущий конвейер, мы можем запустить dvc repro команду, которая запустит Cargo под капотом и создаст двоичный файл. В следующий раз, когда мы запустим dvc repro, этот шаг будет проигнорирован, если в файле груза src/main.rs нет изменений.

Добавим второй шаг:

stages:
  # ... previous step
  preprocess:
    cmd: ./target/release/torch-mlops
    deps:
      - data/raw/autompg.csv
      - ./target/release/torch-mlops
    outs:
      - data/processed

Здесь мы используем двоичный файл, созданный на первом этапе, и преобразуем данные (команда cmd), шаг зависит от необработанных данных и двоичного файла (раздел deps), вывод сохраняется в обработанном каталоге (outs). Таким образом, при первом запуске dvc repro этот шаг будет выполнен, а в следующий раз он будет пропущен, если в зависимостях нет изменений.

Добавим последний шаг:

stages:
  # ...
  train:
    cmd: python pysrc/train.py data/processed model.pkl
    deps:
      - data/processed
      - pysrc/train.py
    params:
      - train.min_split
      - train.n_est
      - train.seed
    outs:
      - models

Он имеет все разделы, как и раньше, с одним дополнительным параметром раздела. Таким образом, при первом запуске dvc repro будут выполнены последние шаги, и эти шаги будут пропущены, если в других шагах не будет никаких изменений. Вы можете запустить dvc repro — force, чтобы принудительно запустить весь конвейер. Полный конвейер выглядит следующим образом:

stages:
  build_bin:
    cmd: cargo build --release
    deps:
      - src/main.rs
      - Cargo.toml
  preprocess:
    cmd: ./target/release/torch-mlops
    deps:
      - data/raw/autompg.csv
      - ./target/release/torch-mlops
    outs:
      - data/processed
  train:
    cmd: python pysrc/train.py data/processed model.pkl
    deps:
      - data/processed
      - pysrc/train.py
    params:
      - train.min_split
      - train.n_est
      - train.seed
    outs:
      - models

Как видите, зависимости определены в файле yaml, но не в коде без декоратора @pipeline, @steps, что делает его независимым от фреймворка. Весь процесс отслеживается с помощью хеш-значений, созданных git и DVC для каждой зависимости, которые генерируются и управляются DVC автоматически.

📦 Полную версию проекта можно найти на моем GitHub ml-pipeline-with-dvc.

Это оно! Теперь у нас есть полностью автоматизированный конвейер данных, не зависящий от Framework. Любая часть может быть переписана на любом предпочитаемом языке или платформе. В DVC есть намного больше, чем то, что я показал, например, CICD для ML, Model Registry и т. д. Не стесняйтесь исследовать. Дайте мне знать, что вы думаете, или любые дальнейшие предложения будут оценены!

Здравствуйте!
Если вы уже знаете популярную платформу фреймов данных pandas и хотите быстро изучить PySpark, платформу фреймов больших данных, я вам помогу. Я только что запустил свой первый онлайн-курс под названием Pandas to PySpark DataFrame. Который доступен в Educaive здесь, а также в виде книги в Learnpub здесь. Эти курсы предназначены для быстро обучающихся, таких как я, которые больше заинтересованы в проектном обучении.