Я читал статью Норма и заинтересовался тем, как использовать d6tflow для обертывания рабочих процессов обработки данных в DAG (направленный ациклический граф), управляемый d6tflow.

Я рекомендую вам прочитать статью Норма, чтобы лучше понять мотивацию и предысторию того, почему вам следует использовать DAG.

Ориентированный ациклический граф - это конечный ориентированный граф, не содержащий циклов. Короче говоря, DAG - это граф, в котором вы можете начать с любого узла, следовать направленным стрелкам и никогда не возвращаться к тому месту, где вы начали. Хорошая аналогия - представить ручей с проточной водой. Цепочка зависимостей задач в рабочем процессе обработки данных обычно может быть естественным образом представлена ​​в виде ориентированного ациклического графа.

После некоторых экспериментов по разработке выразительных и простых в использовании примеров / рабочих процессов машинного обучения в блокнотах jupyter я начал чувствовать, что могу обойтись более явной организацией и бухгалтерией в блокнотах. Наличие функций, определенных в разных местах записной книжки, в сочетании с характером состояния ячеек записной книжки jupyter, часто причиняло мне боль при попытке отладки, почему конвейер машинного обучения не давал мне того, что я ожидал.

Я склонен согласиться с мнением, что рабочие процессы в области науки о данных лучше писать как набор задач с зависимостями между ними, а не как набор функций, которые вы должны гарантировать, которые выполняются линейно. Вы вынуждены определять, что должно быть выполнено, когда и какие шаги требуют вывода из предыдущих шагов. Это также полезно, потому что заставляет вас отойти от кода и подумать о рабочем процессе, который вы создаете, в терминах всего конвейера, а не просто результата какой-то ячейки или набора ячеек.

Цель этой статьи - предоставить подробный пример того, как обернуть полный рабочий процесс машинного обучения в задачи d6tflow. Я подумал, что было бы полезно собрать уроки, которые я извлек из разных источников документации и примеров, в один пост в блоге. Я пытаюсь создать руководство, которое поможет людям начать работу по упаковке своих рабочих процессов в DAG-файлы d6tflow. Я пытаюсь прояснить и разъяснить некоторые ключевые моменты, которые могут быть неочевидными при первоначальном использовании d6tflow. В этом посте я рассмотрю чистый пример конвейера scikit-learn и конвертирую его в набор задач d6tflow.

Этот пример взят из Практическое машинное обучение с помощью Scikit-Learn и TensorFlow: концепции, инструменты и методы для создания интеллектуальных систем, Глава 2, Сквозной пример проекта машинного обучения. Сначала я повторно изложу сокращенную версию конечного результата этого проекта машинного обучения, а затем покажу, как обернуть ее в набор задач d6tflow.

Цель состоит в том, чтобы предсказать среднюю стоимость домов в округах Калифорнии, учитывая некоторую информацию об округах. Мы получаем данные о каждом районе и хотим спрогнозировать среднюю цену дома, так что это проблема регрессии.

# general imports
import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np
import d6tflow
# d6tflow uses luigi to intelligently pass parameters upstream and downstream
import luigi
from luigi.util import inherits, requires
# scikit learn components for our workflow
from sklearn.preprocessing import FunctionTransformer, StandardScaler, OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.impute import SimpleImputer
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.externals import joblib
from sklearn.metrics import mean_squared_error

Во-первых, нам нужно загрузить данные (которые вы можете скачать здесь). Объявить категориальные переменные категорией как категорию - хорошая идея по нескольким причинам. Это приводит к более эффективному хранению объектно-типизированных данных и позволяет нам выбирать столбцы на основе типов данных позже в нашем ColumnTransformer, и это лишь некоторые из них.

DATA_DIR = "../Aurelien_ML/data"
FILENAME = "housing.csv"
# good practice to use os.path.join to generate full file paths
data = pd.read_csv(os.path.join(DATA_DIR, FILENAME))
# declare features you know are categorical as category 
data.ocean_proximity = data.ocean_proximity.astype("category")
data.info()

Теперь мы разбиваем наши данные на набор независимых переменных (наши характеристики) и зависимую переменную (наша целевая переменная - в данном случае медианная стоимость дома).

target_variable = "median_house_value"
features = [col for col in data.columns if col != target_variable]
housing = data.loc[:,features]
labels = data.loc[:,target_variable]
print("Features to use: \n{}\n".format(list(housing.columns)))
print("Target variable: {}".format(labels.name))

Аурелион Джерон добавляет несколько дополнительных функций, чтобы показать пример того, как создавать собственные преобразователи в scikit-learn. Я включаю функции, чтобы показать, как они вписываются в наш конвейер.

Есть как минимум два способа сделать это, поскольку наша функциональность достаточно проста. Все, что мы сделали, это взяли некоторые характеристики (комнаты, спальни, население, домохозяйства) и создали новые функции, исходя из их соотношений. Первый способ создать простой преобразователь для добавления этих функций в наш набор данных - создать собственный преобразователь с нуля, который наследуется от BaseEstimator и TransformerMixin. Чтобы создать собственный преобразователь, нам просто нужно реализовать методы fit, transform и fit_transform (duck-typing). Мы наследуем от TransformerMixin, чтобы получить fit_transform бесплатно, и мы наследуем от BaseEstimator, чтобы получить set_params и get_params бесплатно. Это позволит нам выполнять настройку гиперпараметров внутри конвейера, получая доступ к параметрам преобразователя (например, включение или отключение функции).

# here is how to do it from scratch with a custom transformer
class AddNewFeatures(BaseEstimator, TransformerMixin):
    """
    a class to add the features for rooms per household, population per household and berooms per room
    we inherit from BaseEstimator and TransformerMixin to get some stuff for free, such as fit_transform
    """
    def __init__(self, column_names):
        self.rooms_index, self.bedrooms_index, self.population_index, self.household_index = [ \
            list(column_names).index(col) for col in ("total_rooms","total_bedrooms","population",\
                                                         "households")]
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        """
        X - np array containing features
        y - np array containing labels
        """
        rooms_per_household = X[:, self.rooms_index] / X[:, self.household_index]
        populations_per_household = X[:, self.population_index] / X[:, self.household_index]
        bedrooms_per_room = X[:, self.bedrooms_index] / X[:, self.rooms_index]
        return np.c_[X, rooms_per_household, populations_per_household, bedrooms_per_room]
Feature_Adder_Trans = AddNewFeatures(data.columns)

Второй способ - использовать FunctionTrasformer API, который позволяет нам создать класс с учетом функции, которая принимает весь набор данных в качестве входных данных и возвращает преобразованную версию набора данных в качестве выходных данных. Вот как реализовать тот же трансформатор, что и выше:

# here is how to use the FunctionTransformer
# first, define the function that specifies the transformation
def add_extra_features(X, cols=None):
    rooms_index, bedrooms_index, population_index, household_index = [ \
            list(cols).index(col) for col in ("total_rooms","total_bedrooms","population",\
                                                         "households")]
    rooms_per_household = X[:, rooms_index] / X[:, household_index]
    populations_per_household = X[:, population_index] / X[:, household_index]
    bedrooms_per_room = X[:, bedrooms_index] / X[:, rooms_index]
    return np.c_[X, rooms_per_household, populations_per_household, bedrooms_per_room]
# then instantiate a FunctionTransformer with the function specifying the transformation
Feature_Adder_FT = FunctionTransformer(add_extra_features, validate=False, kw_args={"cols":housing.columns})

Теперь мы соберем финальный конвейер. Во-первых, нам нужно преобразовать нашу категориальную переменную в горячую кодировку, которая подходит для использования внутри конвейера машинного обучения. Для этого мы вычислим различные категории, которые будут встречаться в нашем единственном категориальном столбце, и передадим их в OneHotEncoder, чтобы у нас не возникло проблем с попыткой кодирования категорий, которые еще не были просмотрены. Это проблема только потому, что если вы не сообщите заранее, какие категории следует ожидать, это может произойти позже, когда мы будем проводить перекрестную проверку, что наш конвейер попытается вычислить оценку производительности на тестовой свертке. и он сталкивается с категорией, которую еще не видел в соответствующей тренировочной папке. (Удалите параметр категорий, чтобы понять, что я имею в виду. Это происходит потому, что ISLAND категория в ocean_proximity встречается крайне редко, поэтому вполне вероятно, что вы не увидите ее в одной из сгенерированных тренировочных сверток).

categories = list(housing.ocean_proximity.unique())

Поскольку у нас есть числовые и категориальные столбцы, нам нужно создать для них разные конвейеры и правильно объединить их в один. Для нашего числового конвейера преобразования просты: у нас есть импьютер для вменения пропущенных значений (с медианой для этого столбца), средство изменения характеристик (добавляет новые столбцы функций) и средство масштабирования (стандартное средство масштабирования масштабирует каждый столбец, чтобы иметь среднее значение 0 и единицу измерения). дисперсия). Для категориального конвейера мы просто используем горячую кодировку категорий. Я не буду вдаваться в подробности о кодировках one-hot, но немного предыстории может быть полезно.

Одно горячее кодирование означает, что мы заменяем каждый элемент столбца вектором, содержащим единицу в соответствующей позиции (это будет важно позже). Поскольку существует 5 возможных категорий (["NEAR BAY", "‹ 1H OCEAN "," INLAND "," NEAR OCEAN "," ISLAND "]), наш вектор будет 5-мерным вектором. Таким образом, наш единственный столбец (Nx1) заменяется матрицей с 5 столбцами (Nx5).

Затем нам нужно последовательно объединить этот числовой конвейер с категориальным. Мы делаем это, помещая их в ColumnTransformer, который был создан для этой цели. Нам нужно передать в каждом кортеже списки числовых имен функций и категориальных имен функций из исходного DataFrame. Мы делаем это на основе их dtypes в исходном DataFrame (этот шаг зависит от того, как мы ранее преобразовали категориальный столбец в «категорию»).

numerical_pipeline = Pipeline([("imputer", SimpleImputer(strategy="median")),
                               ("featurization", Feature_Adder_FT),
                               ("strd_scaler", StandardScaler())], verbose=True)
categorical_pipeline = Pipeline([("one_hot_encoding", OneHotEncoder(categories=[categories]))],verbose=True)
numerical_features = list(housing.select_dtypes(include=[np.number]).columns)
categorical_features = list(
    housing.select_dtypes(include=["category"]).columns)
full_pipeline = ColumnTransformer([("numerical_pipeline", numerical_pipeline, numerical_features),
                                   ("categorical_pipeline", categorical_pipeline, categorical_features)],
                                 verbose=True)

Мы завершили конвейер предварительной обработки. Единственное, что нужно добавить, - это предиктор в конце (оценщик scikit-learn с predictmethod). Но сначала мы должны подвести итоги того, что у нас есть, чтобы убедиться, что мы понимаем, что происходит. Мы начали с 9 характеристик (не считая целевой переменной) - 8 числовых и 1 категориальный. Поскольку мы добавили наш настраиваемый featurizer (добавив 3 новые функции), это число увеличилось до 12. Затем мы заменили категориальный столбец на 5 горячих столбцов, что привело к окончательному подсчету до 12–1 + 5 = 16. Мы можем использовать это как проверку работоспособности:

processed_data = full_pipeline.fit_transform(housing)
processed_data.shape

Большой! Теперь осталось только прикрепить какой-нибудь предиктор к концу нашего конвейера, в результате чего получится еще один конвейер (это может быть любая модель sklearn, для целей этой статьи это не имеет значения):

full_pipeline_with_predictor = Pipeline(
    [("data_preprocessing", full_pipeline), ("linear_regressor", LinearRegression())])

Теперь мы можем использовать этот финальный конвейер, чтобы делать разные крутые штуки! Мы можем выполнить перекрестную проверку, например, для настройки гиперпараметров. Обратите внимание, что для param_grid синтаксис должен дать имя параметра, который вы хотите изменить, в качестве ключа, а значение - это набор значений, которые вы хотите попробовать. Схема именования ключа состоит в том, чтобы следовать именам трансформаторов, пока вы не вернетесь обратно к трансформатору, параметры которого вы хотите изменить (вы разделяете имена разных трансформаторов двойным подчеркиванием, чтобы имена, которые вы даете своим трансформаторам, в кортежах могут быть одиночные символы подчеркивания). Итак, в качестве примера, чтобы изменить параметр strategy преобразователя imputer, мы следуем названиям различных конвейеров, пока не дойдем до imputer. Обратите внимание, вы также можете использовать это, чтобы делать что-то вроде включения и выключения одного из ваших преобразований. Например, если мы хотим отключить наш настраиваемый featurizer, мы могли бы добавить параметр do_featurization в определение класса и обернуть код преобразования внутри блока if-else, говоря, что нужно вернуть исходные данные if do_featurization==False. Затем вы можете добавить в свой param_grid: {"data_preprocessing__numerical_pipeline__featurization__do_featurization":[True, False]}. И поиск по сетке будет сравнивать оба значения со всеми другими возможными комбинациями изменяемых вами параметров. Итак, если бы мы добавили это измерение в нашу сетку параметров, у нас было бы в общей сложности 2x2 = 4 различных набора гиперпараметров, которые можно было бы попробовать. И для каждого набора параметров нам нужно обучить cv=5 складок. Таким образом, это очень быстро становится вычислительно дорогостоящим, но дает более надежную оценку вашей ошибки обобщения.

param_grid = [
    {"data_preprocessing__numerical_pipeline__imputer__strategy": ["mean", "median"]}]
grid_search = GridSearchCV(full_pipeline_with_predictor, param_grid, cv=5, scoring="neg_mean_squared_error",
                           verbose=2, n_jobs=8)
print(grid_search.fit(housing, labels))

Мы также можем оценить с помощью перекрестной проверки с параметрами, которые мы установили в наших конвейерах, и получить баллы:

cross_val_scores = cross_val_score(full_pipeline_with_predictor, housing, labels, cv=10)

Лучший набор параметров можно получить так:

best_params = grid_search.best_params_

Вы можете обучить окончательный оценщик (если вы оставили refit=True, он также будет обученным оценщиком с наивысшей оценкой - который используется по умолчанию и заставляет ваш GridSearchCV объект повторно обучать вашу модель в конце с лучшим набором найденных параметров):

best_model = grid_search.best_estimator_

И, наконец, вы можете сохранить и повторно загрузить конвейер следующим образом:

model = full_pipeline_with_predictor
joblib.dump(model, "full_pipeline_with_predictor.pkl")
model_loaded = joblib.load("full_pipeline_with_predictor.pkl")

Фух! Это была настоящая установка. Теперь мы переходим к относительно простому делу - обернуть все это в DAG d6tflow.

По сути, вы создаете новый класс для каждой задачи, которую хотите отслеживать. Класс становится включенным d6tflow путем наследования от одного из классов внутри d6tflow.tasks.*. Вы можете создавать восходящие зависимости несколькими способами. Вы либо определяете requires метод внутри своего класса, который возвращает экземпляр класса задачи, от которого должен зависеть текущий класс, либо вы украшаете свой класс @requires. Я предпочитаю последний, потому что он фактически автоматически определяет для вас requires метод и позволяет параметрам luigi, определенным в вышестоящих классах, распространяться вниз по течению.

Задача помечается как завершенная, если для нее существует выходной файл.

Это предотвращает повторное выполнение завершенных задач. Задачи обычно имеют некоторый вывод и сохраняются где-нибудь в файле. Мы устанавливаем корень, в котором сохраняются эти выходные данные, используя:

d6tflow.set_dir("d6tflow_output/")

Это означает сохранение вывода в папке с именем d6tflow_output в текущем рабочем каталоге. Обычно это первая строка, которая у меня есть в проекте d6tflow. По умолчанию этот путь установлен на data/.

А теперь перейдем к нашей первой задаче! Сначала мы пишем задачу для загрузки данных и разделения их на функции и метки:

class TaskGetData(d6tflow.tasks.TaskPqPandas):
    persist = ["training_data", "labels"]
    
    def run(self):
        """
        loading data from external files is done inside run
        so now, if you check os.path.join(d6tflow.settings.dirpath, TaskGetData().output().path),
        you should see a parquet file saved there, and this task is considered complete
        """
        data = pd.read_csv(os.path.join(DATA_DIR, FILENAME))
target_variable = "median_house_value"
        features = [col for col in data.columns if col != "median_house_value"]
housing = data.loc[:, features]
        labels = pd.DataFrame(data.loc[:, target_variable])
        # save as parquet
        self.save({"training_data": housing, "labels": labels})

Здесь нужно отметить несколько моментов. Во-первых, в нем нет ни requires функции, ни @requires декоратора. Вот как мы отмечаем отсутствие зависимостей. Во-вторых, он наследуется от d6tflow.tasksTaskPqPandas. Задача d6tflow, от которой наследуется ваш класс, определяется на основе формата файла, в котором вы хотите сохранить выходные данные. Поскольку мы хотим загрузить наши данные как pandas DataFrame и сохранить данные в parquet, это говорит нам, что это класс, от которого мы наследуем. В-третьих, действие задачи всегда содержится внутри ее run метода, который обычно заканчивается self.save, чтобы сохранить результат выполнения вашей задачи. И, наконец, необходимо объявить, что ваша задача будет сохранять как член уровня класса, то есть прямо под строкой class TaskGetData(): внутри persist. Кроме того, имена внутри persist должны совпадать с ключами внутри self.save. Теперь вы можете предварительно просмотреть свой DAG, запустив:

d6tflow.preview(TaskGetData())
# or
get_data_task = TaskGetData()
d6tflow.preview(get_data_task)

И вы запускаете его с помощью:

d6tflow.run(get_data_task)

Теперь, когда вы заглянете внутрь d6tflow_output/, вы должны увидеть другую папку, TaskGetData, содержащую два файла pickle, один для training_data, а другой для labels. Наличие этих двух файлов означает, что задача завершена. Если вы запустите d6tflow.preview(get_data_task), вместо PENDING будет написано ЗАВЕРШИТЬ. Если вы удалите эти паркетные файлы, задача будет зарегистрирована как незавершенная. Попытайся! (удалите файлы и перезапустите d6tflow.preview)

Теперь перейдем к нашей следующей задаче - предварительной обработке. Подавляющее большинство этого кода является точной копией той части, где мы создали конвейеры. Следует отметить несколько моментов. Во-первых, я решил использовать декоратор @requires, чтобы указать, что этот класс зависит от класса TaskGetData. В качестве альтернативы я мог бы определить метод requires, который возвращает TaskGetData(). Во-вторых, этот класс наследуется от d6tflow.tasks.TaskPickle. Это означает, что мы собираемся сохранить результаты этой задачи в виде файлов рассола. Наследование приводит к переопределению метода save для сохранения результирующих объектов в виде файлов pickle. В-третьих, мы определили persist, чтобы показать, что мы собираемся сохранять преобразованные данные, а также конвейер для обработки файлов (предварительно обработанные данные должны быть сохранены в паркете, я мог бы реорганизовать это, я просто подумал, что это не слишком важно для целей данной статьи). В-четвертых, это важный момент, мы определили do_preprocess и categorical_column_name как параметры luigi. Это позволяет разумно передавать параметры нижестоящим задачам (классам), которые требуют этой задачи, а также значения, которые они возвращают в восходящем направлении. Допустим, у нас есть класс, который зависит от TaskPreprocess, назовите его TaskTrain. Украсив TaskTrain символом @requires(TaskPreprocess), мы сможем записать training_task = TaskTrain(do_preprocess=True), а значение параметра do_preprocess будет передано восходящему потоку в TaskPreprocess. На этом этапе вы можете видеть, что эта функциональность снимает значительную головную боль при передаче параметров в DAG. Если вы правильно отметите зависимости в каждой последующей последующей задаче, вы сможете установить любой параметр в любом месте группы DAG, инициализировав его в самой последующей задаче, без каких-либо дополнительных действий. Luigi - еще одна популярная библиотека для простого управления задачами и параметрами. Рекомендую пойти посмотреть. Также следует отметить, что если вы хотите получить доступ к параметру, вам нужно рассматривать его как переменную-член. Итак, нам нужно сказать self.categorical_column_name. Наконец, у нас есть знакомый save метод, который использует словарь, и ключи соответствуют записям внутри persist, а значения в словаре сохраняются как файлы pickle внутри папки d6tflow_output/TaskPreprocess.

@requires(TaskGetData)
class TaskPreprocess(d6tflow.tasks.TaskPickle):
    persist = ["processed_data", "pipeline"]
    categorical_column_name = luigi.Parameter(default="ocean_proximity")
    do_preprocess = luigi.BoolParameter(default=True)
    def run(self):
        # multiple dependencies, multiple outputs
        X = self.input()["training_data"].load()
        # when is loaded back from parquet, we need to convert this one to categorical again
        X[self.categorical_column_name] = X[self.categorical_column_name].astype("category")
        # get unique values in the categorical column
        categories = list(X[self.categorical_column_name].unique())
        # get column names for categorical and numerical columns separately
        numerical_features = list(
                X.select_dtypes(include=[np.number]).columns)
        categorical_features = list(
            X.select_dtypes(include=["category"]).columns)
        
        # we could split these transformations into different tasks if we wanted
        # really fine-grain control over keeping track of the progression of data
        numerical_pipeline = Pipeline([("imputer", SimpleImputer(strategy="median")),
                                       ("featurization", Feature_Adder_FT),
                                       ("strd_scaler", StandardScaler())], verbose=True)
        categorical_pipeline = Pipeline(
            [("one_hot_encoding", OneHotEncoder(categories=[categories]))], verbose=True)
full_pipeline = ColumnTransformer([("numerical_pipeline", numerical_pipeline, numerical_features),
                                    ("categorical_pipeline", categorical_pipeline, categorical_features)],
                                          verbose=True)
        if self.do_preprocess:
            number_categories = len(categories)
            featurized_feature_names = [
                "rooms_per_household", "population_per_household", "bedrooms_per_room"]
            categorical_feature_names = ["one_hot_{:d}".format(
                i) for i in range(number_categories)]
            # because our transformations add columns to our original matrix, need to manually
            # construct column names
            # our featurizer adds 3 new feature columns to the right side of original feature matrix
            # and the categorical columns are added to the right side because it comes after numerical
            # in the full pipeline
            column_names = list(X.drop(self.categorical_column_name,axis=1).columns) + \
                featurized_feature_names + categorical_feature_names
            preprocessed_data = pd.DataFrame(data=full_pipeline.fit_transform(X), \
                                             columns=column_names)
            
        self.save({"processed_data":preprocessed_data,"pipeline":full_pipeline})

Также следует упомянуть, что конкретные значения передаваемых вами параметров учитываются при проверке того, выполнена ли задача или нет. Если у вас есть параметры, то задача с этими параметрами должна быть завершена. Таким образом, вы можете запустить несколько версий задачи с разными значениями параметров и пометить их все как выполненные.

Одна из первых проблем, с которыми мы столкнулись до сих пор, заключается в том, что, поскольку мы разбиваем исходный конвейер обучения scikit, помещая предиктор в отдельную задачу, нам нужно вручную вычислить правильный порядок имен столбцов, поскольку нам нужно сохраните обработанные данные в файл, чтобы мы могли загрузить их в задачу для обучения нашей модели. Это вызывает некоторый дополнительный код, но не должно быть слишком сложно понять, понимаете ли вы, что делают ваши преобразования. Я сделал это здесь следующим образом: поскольку числовой конвейер предшествует категориальному конвейеру, нам нужно сначала обработать его. Мы берем все исходные числовые имена столбцов (без категориального), а затем добавляем три имени столбца с характеристиками, потому что это то, что добавляется следующим в конвейере. Затем мы добавляем имена для категориальных столбцов в конце, поскольку категориальный конвейер объединяется в конце числовых столбцов.

Теперь создадим задачу для обучения нашей модели. Обратите внимание, что мы используем декоратор @requires с двумя аргументами. Это просто означает, что мы хотим унаследовать параметры luigi от них обоих, и что, когда мы создадим экземпляр TaskTrain, мы сможем предоставить параметры luigi для любой из этих задач через конструктор для TaskTrain. Как обычно, мы наследуем от TaskPickle, потому что собираемся сохранить полученную обученную модель как файл pickle. Кроме того, нам нужно отметить имя в списке persist. Теперь, когда мы передали декоратору @requires два класса, нам нужен способ доступа к их выходным данным. Это делается путем доступа к self.input() в виде списка. Например, чтобы получить результат processed_data для TaskPreprocess, мы используем self.input()[0]["processed_data"].load(). Слева направо этот вызов обращается к списку ввода, выбирает первую зависимость, затем получает вывод этой зависимости (которая является словарем), а затем выбирает ключ processed_data в этом словаре, и мы завершаем его с помощью load() чтобы загрузить файл рассола обратно в память.

@requires(TaskPreprocess, TaskGetData)
class TaskTrain(d6tflow.tasks.TaskPickle):
    """
    you can pass do_preprocess=False and model="svm" to TaskTrain __init__
    """
    persist=["model"]
    model = luigi.Parameter(default='ols')
def run(self):
        # even tho this was a smultiple dependency, single output, self.input()["processed_data"]
        # is still a dictionary, whose key-name is from persist inside TaskPreprocess
        training_data = self.input()[0]["processed_data"].load()
        labels = self.input()[1]["labels"].load()
        if self.model == "ols":
            model = LinearRegression()
        elif self.model == "svm":
            model = SVR()
        else:
            raise ValueError("invalid model selection")
        model.fit(training_data, labels)
        self.save({"model":model})

Теперь, когда наша модель сохранена в файл рассола, мы можем загрузить ее и запустить задачу проверки перекрестной свертки:

@requires(TaskGetData, TaskPreprocess, TaskTrain) # allows me to pass model='svm' to this
class TaskCrossFoldValidation(d6tflow.tasks.TaskCache):
    persist=["cross_val_scores"]
    def requires(self):
        # use self.clone_parent so that when I pass model='svm' to this, it gets passed to TraskTrain
        return {"processed_data":TaskPreprocess(),"model":self.clone_parent()}
def run(self):
        labels = self.input()[0]["labels"].load()
        training_data = self.input()[1]["processed_data"].load()
        model = self.input()[1]["model"].load()
            
        cross_val_scores = cross_val_score(model, training_data, labels, cv=10)
        
        self.save({"cross_val_scores":cross_val_scores})

Наконец, последний тип задач, которые я здесь опишу, - это d6tflow.tasks.TaskAggregator. Эта задача просто порождает множество других объектов задачи. Это сделано для того, чтобы вы могли сохранять свои задачи модульными и иметь какой-то цикл внутри агрегатора, выдающий все задачи, которые вы хотите запустить. Поскольку каждая итерация цикла приводит к созданию отдельного экземпляра задачи с разными параметрами, все задачи будут отдельно помечены как завершенные. Если все поставленные задачи выполнены, задача агрегатора будет помечена как выполненная. Вот пример использования d6tflow.tasks.TaskAggregator для создания отдельных задач для управления вычислением оценки перекрестной проверки для разных моделей. Сама задача агрегатора помечается как завершенная, когда два составляющих ее объекта задачи помечаются как завершенные.

class TaskAggregator(d6tflow.tasks.TaskAggregator):
    def run(self):
        yield TaskPrintCrossValScore(model="svm")
        yield TaskPrintCrossValScore(model="ols")

Используя различные типы задач, которые мы обсуждали, вы просто объединяете их вместе, как обсуждалось ранее, а затем запускаете d6tflow.run(most_downstream_task) для запуска своей группы DAG или d6tflow.preview(most_downstream_task) для предварительного просмотра зависимостей и состояния различных частей.

Что ж, на этом я закончил мое сквозное введение в использование d6tflow для обертывания конвейера sklearn внутри управляемого DAG. Здесь вы можете найти мою записную книжку для этого проекта. Не стесняйтесь, дайте мне знать, если что-то непонятно или у вас есть вопросы в комментариях!