На днях я работал с Airflow и обнаружил, что не могу найти в Интернете совет по поводу, казалось бы, простых задач:

  • Как обработать файл CSV с помощью Airflow?
  • Как сохранить фрейм данных в базе данных с помощью Airflow?
  • Как использовать scikit-learn в Airflow для создания простой модели машинного обучения?

После некоторой работы я придумал простой рабочий пример, который подробно описан ниже.

Настройка воздушного потока

Я не собираюсь тратить здесь кучу времени, так как это может быть отдельный пост. Я использую Astro CLI для настройки всех своих проектов Airflow. Это не требует усилий и позволяет мне сосредоточиться на проблемах, которые я пытаюсь решить, вместо того, чтобы тратить часы на настройку Airflow.

Данные

Я получил набор данных от Kaggle с атрибутами супергероев и злодеев Marvel. Файл отслеживает такие атрибуты, как интеллект, скорость, сила и мировоззрение (хорошее или плохое). Моя гипотеза состоит в том, что я мог бы использовать эти атрибуты для предсказания хорошего или плохого выравнивания.

Функции Python

Первое, что я сделал, это написал код Python для обработки CSV-файла и сохранения его в локальной базе данных Postgres.

Я храню все CSV-файлы, которые хочу обработать, в папке include моего проекта Airflow. Это позволяет мне иметь виртуальный «рабочий стол» в моем проекте Airflow, где я могу хранить и циклически просматривать файлы.

После чтения CSV в кадре данных pandas я выполняю базовую обработку данных, удаляя все нулевые значения в кадре данных.

Затем я устанавливаю соединение с базой данных Postgres, предоставленной Airflow, что, я уверен, будет несколько спорным. Идеальной установкой было бы иметь ОТДЕЛЬНУЮ аналитическую базу данных для хранения данных по сравнению с базой данных Postgres, предоставляемой Airflow. Большинство экспертов Airflow также посоветовали бы вам установить соединение с базой данных Postgres с помощью функции Connections в Airflow и использовать оператор Postgres для выполнения команд.

Однако этот пример работает. Всегда будут учитываться масштабы и сложные варианты использования, но я намеренно сделал это простым. Я делаю большую часть своей разработки в ноутбуках Jupyter, поэтому для меня важно иметь простой способ перевести эту работу в DAG Airflow. Описанный здесь подход буквально позволяет мне копировать и вставлять работу из моей записной книжки в операторы Python в моей DAG Airflow без особых опасений.

def upload_csv():
    #read csv file to df 
    df = pd.read_csv(AIRFLOW_HOME + '/include/charcters_stats.csv')
    #drop nulls
    df = df.dropna()
    #create engine
    engine = create_engine('postgresql://postgres:postgres@tatooine_3cc253-postgres-1:5432/postgres')
    #df to database
    df.to_sql('characters1', engine, if_exists='replace')

Вторая функция более сложная, так как она выполняет код, используемый для обучения и тестирования базовой логистической регрессии.

Я снова устанавливаю соединение с базой данных Airflow Postgres и использую SQL-запрос для извлечения данных, которые я сохранил на шаге 1. Это важно, поскольку я считаю этот метод передачи данных между задачами намного проще, чем использование xcom или других методов, специфичных для Airflow.

Я создаю X, атрибуты героя и злодея, и Y, очерчивание выравнивания, поэтому я могу создать тренировочную и тестовую разбивку, соответствующую логистической регрессии. Затем я делаю прогнозы для каждого символа и объединяю прогнозируемые значения обратно в исходный набор данных, чтобы рассчитать точность модели. Я сохраняю окончательный фрейм данных в новой таблице с именем characters1scored, поэтому я могу получить доступ к полученным данным в любое время.

def retrieve_predict():
    # create engine
    engine = create_engine('postgresql://postgres:postgres@docker-postgres-1:5432/postgres')
    # retrieve data
    df = pd.read_sql("SELECT * FROM characters1",con=engine)
    # create X and Y
    X = df[['Intelligence','Strength','Speed','Durability','Power','Combat','Total']]

    y = df['Alignment']
    # create train and test split
    X_train, X_test, y_train, y_test = train_test_split(X, 
                                                        y, 
                                                        random_state=1, 
                                                        stratify=y)

    # choose classifier
    classifier = LogisticRegression(solver='lbfgs', random_state=1)
    # fit classifier 
    classifier.fit(X_train, y_train)
    # create predictions
    predictions = classifier.predict(X_test)
    # create dataframe with predicted v actual
    dfx = pd.DataFrame({"Prediction": predictions, "Actual": y_test})
    # concat with original df
    result = pd.concat([df, dfx], axis=1, join='inner')
    # send results to sql 
    result.to_sql('characters1scored', engine, if_exists='replace')
    # print accuracy score 
    print(accuracy_score(y_test, predictions))

ЗДР

Все это упаковывается в DAG (направленный ациклический граф), который представляет собой способ Airflow создавать зависимости задач и линейную логику между всеми компонентами вашего кода.

Моя DAG довольно проста, так как состоит всего из двух функций, обе из которых выполняются операторами Python. По мере роста этого пайплайна я могу добавлять больше функций, задач и взаимозависимой логики. Я уверен, что здесь можно сделать много улучшений, но я намеренно сделал это простым, чтобы его можно было перепрофилировать для ваших собственных проектов.

from airflow.models import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import numpy as np 
from sqlalchemy import create_engine


AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

def upload_csv():
    #read csv file to df 
    df = pd.read_csv(AIRFLOW_HOME + '/include/charcters_stats.csv')
    #drop nulls
    df = df.dropna()
    #create engine
    engine = create_engine('postgresql://postgres:postgres@docker-postgres-1:5432/postgres')
    #df to database
    df.to_sql('characters1', engine, if_exists='replace')

def retrieve_predict():
    # create engine
    engine = create_engine('postgresql://postgres:postgres@tatooine_3cc253-postgres-1:5432/postgres')
    # retrieve data
    df = pd.read_sql("SELECT * FROM characters1",con=engine)
    # create X and Y
    X = df[['Intelligence','Strength','Speed','Durability','Power','Combat','Total']]

    y = df['Alignment']
    # create train and test split
    X_train, X_test, y_train, y_test = train_test_split(X, 
                                                        y, 
                                                        random_state=1, 
                                                        stratify=y)

    # choose classifier
    classifier = LogisticRegression(solver='lbfgs', random_state=1)
    # fit classifier 
    classifier.fit(X_train, y_train)
    # create predictions
    predictions = classifier.predict(X_test)
    # create dataframe with predicted v actual
    dfx = pd.DataFrame({"Prediction": predictions, "Actual": y_test})
    # concat with original df
    result = pd.concat([df, dfx], axis=1, join='inner')
    # send results to sql 
    result.to_sql('characters1scored', engine, if_exists='replace')
    # print accuracy score 
    print(accuracy_score(y_test, predictions))


with DAG(
    dag_id='csv_example',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False
) as dag:

    # Upload the file
    t1 = PythonOperator(
        task_id='read_hero_villian_stats',
        python_callable=upload_csv,
    )

    t2 = PythonOperator(
        task_id='score_hero_villian',
        python_callable=retrieve_predict,
    )


    t1 >> t2 

Результаты

Показатель точности модели составляет 71%, что оставляет желать лучшего. Я думаю, что эксперименты с различными классификаторами и разработка некоторых дополнительных функций могли бы значительно улучшить это число, но это служит цели создания простой основы для будущего.

Я также представлял себе, что наберу нового героя (может быть, Homelander?) с низким интеллектом и максимальными атрибутами до конца пути. Этот воображаемый новый герой (или злодей) был классифицирован как хороший в соответствии с моделью.

new_data = np.array([[50,100,100,100,100,100,550]])
new_prediction = classifier.predict(new_data)
print(f"The new character was classified as: {new_prediction}")

###The new character was classified as: ['good'

Заключение

Хотя наша модель имеет большие возможности для улучшения, мы успешно создали конвейер для обработки CSV, сохранения его в базе данных и развертывания простого алгоритма машинного обучения для прогнозирования двоичной классификации. Кроме того, все автоматизировано для ежедневной работы благодаря Airflow! Надеюсь, это послужит полезным шаблоном для тех, кто изо всех сил пытается превратить свои проекты по науке о данных в автоматизированные конвейеры с помощью Airflow.