На днях я работал с Airflow и обнаружил, что не могу найти в Интернете совет по поводу, казалось бы, простых задач:
- Как обработать файл CSV с помощью Airflow?
- Как сохранить фрейм данных в базе данных с помощью Airflow?
- Как использовать scikit-learn в Airflow для создания простой модели машинного обучения?
После некоторой работы я придумал простой рабочий пример, который подробно описан ниже.
Настройка воздушного потока
Я не собираюсь тратить здесь кучу времени, так как это может быть отдельный пост. Я использую Astro CLI для настройки всех своих проектов Airflow. Это не требует усилий и позволяет мне сосредоточиться на проблемах, которые я пытаюсь решить, вместо того, чтобы тратить часы на настройку Airflow.
Данные
Я получил набор данных от Kaggle с атрибутами супергероев и злодеев Marvel. Файл отслеживает такие атрибуты, как интеллект, скорость, сила и мировоззрение (хорошее или плохое). Моя гипотеза состоит в том, что я мог бы использовать эти атрибуты для предсказания хорошего или плохого выравнивания.
Функции Python
Первое, что я сделал, это написал код Python для обработки CSV-файла и сохранения его в локальной базе данных Postgres.
Я храню все CSV-файлы, которые хочу обработать, в папке include моего проекта Airflow. Это позволяет мне иметь виртуальный «рабочий стол» в моем проекте Airflow, где я могу хранить и циклически просматривать файлы.
После чтения CSV в кадре данных pandas я выполняю базовую обработку данных, удаляя все нулевые значения в кадре данных.
Затем я устанавливаю соединение с базой данных Postgres, предоставленной Airflow, что, я уверен, будет несколько спорным. Идеальной установкой было бы иметь ОТДЕЛЬНУЮ аналитическую базу данных для хранения данных по сравнению с базой данных Postgres, предоставляемой Airflow. Большинство экспертов Airflow также посоветовали бы вам установить соединение с базой данных Postgres с помощью функции Connections в Airflow и использовать оператор Postgres для выполнения команд.
Однако этот пример работает. Всегда будут учитываться масштабы и сложные варианты использования, но я намеренно сделал это простым. Я делаю большую часть своей разработки в ноутбуках Jupyter, поэтому для меня важно иметь простой способ перевести эту работу в DAG Airflow. Описанный здесь подход буквально позволяет мне копировать и вставлять работу из моей записной книжки в операторы Python в моей DAG Airflow без особых опасений.
def upload_csv(): #read csv file to df df = pd.read_csv(AIRFLOW_HOME + '/include/charcters_stats.csv') #drop nulls df = df.dropna() #create engine engine = create_engine('postgresql://postgres:postgres@tatooine_3cc253-postgres-1:5432/postgres') #df to database df.to_sql('characters1', engine, if_exists='replace')
Вторая функция более сложная, так как она выполняет код, используемый для обучения и тестирования базовой логистической регрессии.
Я снова устанавливаю соединение с базой данных Airflow Postgres и использую SQL-запрос для извлечения данных, которые я сохранил на шаге 1. Это важно, поскольку я считаю этот метод передачи данных между задачами намного проще, чем использование xcom или других методов, специфичных для Airflow.
Я создаю X, атрибуты героя и злодея, и Y, очерчивание выравнивания, поэтому я могу создать тренировочную и тестовую разбивку, соответствующую логистической регрессии. Затем я делаю прогнозы для каждого символа и объединяю прогнозируемые значения обратно в исходный набор данных, чтобы рассчитать точность модели. Я сохраняю окончательный фрейм данных в новой таблице с именем characters1scored, поэтому я могу получить доступ к полученным данным в любое время.
def retrieve_predict(): # create engine engine = create_engine('postgresql://postgres:postgres@docker-postgres-1:5432/postgres') # retrieve data df = pd.read_sql("SELECT * FROM characters1",con=engine) # create X and Y X = df[['Intelligence','Strength','Speed','Durability','Power','Combat','Total']] y = df['Alignment'] # create train and test split X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1, stratify=y) # choose classifier classifier = LogisticRegression(solver='lbfgs', random_state=1) # fit classifier classifier.fit(X_train, y_train) # create predictions predictions = classifier.predict(X_test) # create dataframe with predicted v actual dfx = pd.DataFrame({"Prediction": predictions, "Actual": y_test}) # concat with original df result = pd.concat([df, dfx], axis=1, join='inner') # send results to sql result.to_sql('characters1scored', engine, if_exists='replace') # print accuracy score print(accuracy_score(y_test, predictions))
ЗДР
Все это упаковывается в DAG (направленный ациклический граф), который представляет собой способ Airflow создавать зависимости задач и линейную логику между всеми компонентами вашего кода.
Моя DAG довольно проста, так как состоит всего из двух функций, обе из которых выполняются операторами Python. По мере роста этого пайплайна я могу добавлять больше функций, задач и взаимозависимой логики. Я уверен, что здесь можно сделать много улучшений, но я намеренно сделал это простым, чтобы его можно было перепрофилировать для ваших собственных проектов.
from airflow.models import DAG from datetime import datetime from airflow.operators.python import PythonOperator import pandas as pd import os from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score import numpy as np from sqlalchemy import create_engine AIRFLOW_HOME = os.getenv('AIRFLOW_HOME') def upload_csv(): #read csv file to df df = pd.read_csv(AIRFLOW_HOME + '/include/charcters_stats.csv') #drop nulls df = df.dropna() #create engine engine = create_engine('postgresql://postgres:postgres@docker-postgres-1:5432/postgres') #df to database df.to_sql('characters1', engine, if_exists='replace') def retrieve_predict(): # create engine engine = create_engine('postgresql://postgres:postgres@tatooine_3cc253-postgres-1:5432/postgres') # retrieve data df = pd.read_sql("SELECT * FROM characters1",con=engine) # create X and Y X = df[['Intelligence','Strength','Speed','Durability','Power','Combat','Total']] y = df['Alignment'] # create train and test split X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1, stratify=y) # choose classifier classifier = LogisticRegression(solver='lbfgs', random_state=1) # fit classifier classifier.fit(X_train, y_train) # create predictions predictions = classifier.predict(X_test) # create dataframe with predicted v actual dfx = pd.DataFrame({"Prediction": predictions, "Actual": y_test}) # concat with original df result = pd.concat([df, dfx], axis=1, join='inner') # send results to sql result.to_sql('characters1scored', engine, if_exists='replace') # print accuracy score print(accuracy_score(y_test, predictions)) with DAG( dag_id='csv_example', schedule_interval='@daily', start_date=datetime(2022, 3, 1), catchup=False ) as dag: # Upload the file t1 = PythonOperator( task_id='read_hero_villian_stats', python_callable=upload_csv, ) t2 = PythonOperator( task_id='score_hero_villian', python_callable=retrieve_predict, ) t1 >> t2
Результаты
Показатель точности модели составляет 71%, что оставляет желать лучшего. Я думаю, что эксперименты с различными классификаторами и разработка некоторых дополнительных функций могли бы значительно улучшить это число, но это служит цели создания простой основы для будущего.
Я также представлял себе, что наберу нового героя (может быть, Homelander?) с низким интеллектом и максимальными атрибутами до конца пути. Этот воображаемый новый герой (или злодей) был классифицирован как хороший в соответствии с моделью.
new_data = np.array([[50,100,100,100,100,100,550]]) new_prediction = classifier.predict(new_data) print(f"The new character was classified as: {new_prediction}") ###The new character was classified as: ['good'
Заключение
Хотя наша модель имеет большие возможности для улучшения, мы успешно создали конвейер для обработки CSV, сохранения его в базе данных и развертывания простого алгоритма машинного обучения для прогнозирования двоичной классификации. Кроме того, все автоматизировано для ежедневной работы благодаря Airflow! Надеюсь, это послужит полезным шаблоном для тех, кто изо всех сил пытается превратить свои проекты по науке о данных в автоматизированные конвейеры с помощью Airflow.