Альтернативный оркестратор данных для Airflow и Prefect

На многих своих предыдущих должностях я был универсальным поставщиком, поэтому я знаю, насколько проблематичными могут быть неверные данные. Я видел, как это влияет на весь бизнес — отчеты показывают разные результаты, изменения стратегии, и клиенты начинают сомневаться в данных и продукте. Эти изменения подчеркивают важность чистых данных.

Как перейти от необработанных данных к чистым данным? Мы создаем конвейеры данных. Многие из них. Затем мы планируем их. Это требует тонны оркестровки, и найти правильный инструмент для этого жизненно важно.

Недавно я немного рассказал о Префекте и моих усилиях по изучению этого феноменального инструмента.

Хотя Prefect определенно меняет правила игры по сравнению с инструментами оркестровки, такими как Airflow и Azure Data Factory, у него все еще есть некоторые недостатки; во-первых, его нелегко интегрировать с машинным обучением и инструментами искры.

Сегодня я представлю инструмент оркестрации данных под названием Flyte, который может все это, плюс еще кое-что.

Вот код и CSV, который я использовал.

Что такое Флайт?

Давайте представим Flyte. Вы когда-нибудь слышали о компании под названием Lyft? Ну, конечно, у вас есть! На самом деле Flyte был создан в Lyft в сотрудничестве со Spotify, Freenome и многими другими.

Он может обслуживать Python, Java и Scala. И он был создан поверх Kubernetes, поэтому он включает в себя все эти преимущества, включая воспроизводимость, переносимость, масштабируемость и надежность. Все находится в вашей инфраструктуре, поэтому вы можете видеть все, что делает Flyte. С таким же мышлением Flyte проверяет и проверяет все выполняемые действия.

Я всегда любил библиотеки с открытым исходным кодом. Конечно, для начала может потребоваться некоторое время, но конечный результат — это объединение всего сообщества для создания отличного продукта. Flyte не является исключением, так как это полностью открытый исходный код с лицензией Apache 2.0 в рамках Linux Foundation с межотраслевым надзорным комитетом.

Ключевые идеи

Наименьшая единица — задача. Это полностью независимые исполнительные блоки и первоклассные сущности Flyte, а значит, основные строительные блоки пользовательского кода. У каждой задачи также есть две характеристики: они отказоустойчивы и имеют механизм кэширования/запоминания.

Комбинация задач составляет то, что называется рабочим процессом. Эти рабочие процессы определены в protobuf, который меньше и быстрее обычного формата JSON. Пользователи могут определять рабочие процессы как набор узлов, и эти узлы в рабочем процессе могут создавать выходные данные, которые последующие узлы могут использовать в качестве входных данных. Эти узлы и их зависимости определяют структуру рабочего процесса. Таким образом, узел будет одного из трех типов в зависимости от его назначения. Узел задачи — это экземпляр задачи, узел рабочего процесса содержит весь подчиненный рабочий процесс, и, наконец, узел ответвления изменит выходные данные потока.

Каждый план запуска связан с определенным рабочим процессом, который имеет уникальный набор входных параметров. Это позволяет использовать динамические и статические потоки. Как только план запуска создан, мы можем легко поделиться им и выполнить его.

Одна проблема с планированием этих планов запуска заключается в том, что их нельзя редактировать после того, как расписание определено для плана запуска. При каждом изменении расписания создается новая версия плана запуска.

Время для некоторых примеров!

Установка локальной песочницы Flyte

Прежде чем мы сможем установить демонстрационный проект Flyte, flytesnacks, нам нужно установить предварительные условия:

  1. Docker — выберите нужную версию Docker и следуйте их инструкциям здесь.
  2. git — выберите нужную версию Docker и следуйте их инструкциям здесь.
  3. флайтектл
brew install flyteorg/homebrew-tap/flytectl

or

curl -sL https://ctl.flyte.org/install | bash

А затем проверьте правильность установки flytectl:

flytectl version

Настроить проект

Во-первых, нам нужно загрузить flytesnacks и flytekit, что мы можем сделать с помощью этого кода:

git clone https://github.com/flyteorg/flytesnacks
cd flytesnacks/cookbook
pip install -r core/requirements.txt

Чтобы убедиться, что в вашей виртуальной среде все работает, запустите hello_world.py локально:

python core/flyte_basics/hello_world.py

Ожидаемый результат:

Running my_wf() hello world

Теперь мы готовы!

Демонстрационный проект

Во-первых, нам нужно настроить демонстрационный кластер Flyte.

flytectl demo start

Затем мы можем протестировать рабочий процесс локально.

pyflyte run core/flyte_basics/hello_world.py:my_wf

И протолкнуть его в кластер.

pyflyte run --remote core/flyte_basics/hello_world.py:my_wf

Вы должны увидеть что-то вроде этого:

Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f82bd12acd76d4505a04 to see execution in the console.

У нас есть консоль. Теперь займемся тренировочным забегом.

pyflyte run --remote core/flyte_basics/basic_workflow.py:my_wf --a 5 --b hello

И вы можете увидеть успешный запуск проекта в консоли, как показано.

Ты видишь то же, что и я? Он разделен на разработку, постановку и производство. Вы когда-нибудь пробовали это с воздушным потоком? Это верно. Не получилось, хахаха.

Вы можете изменить домен с разработки на постановку следующим образом:

pyflyte run -d staging --remote core/flyte_basics/basic_workflow.py:my_wf --a 5 --b hello

Разве это не здорово?

Трубопровод

Вы можете увидеть этот код и набор данных здесь.

Недавно я вытащил некоторые данные из Министерства труда США, в частности, из этих таблиц. Я объединил эти таблицы тех лет и отфильтровал эти файлы по компьютерным и математическим занятиям, чтобы минимизировать размер файла.

Затем я создал конвейер для очистки этой таблицы и, таким образом, настроил рабочий процесс с регулируемым параметром набора данных.

pyflyte run --remote datapipeline.py file_wf --dataset https://raw.githubusercontent.com/sdf94/flyte/master/salary.csv

который запускает следующий рабочий процесс для анализа файла данных из GitHub.

@workflow
def file_wf(
   dataset: str
    = DATASET_REMOTE
) -> pd.DataFrame:
   df = download_file(dataset=dataset)
   df = filter_columns(df=df)
   df = clean_data(df=df)
   df = filter_states(df=df)
   df = apply_types(df=df)
   return df

Каждая из строк соответствует задаче в рабочем процессе.

Самая первая задача — загрузить файл для чтения.

@task
def download_file(
       dataset: str
) -> pd.DataFrame:
    urllib.request.urlretrieve(dataset, DATASET_LOCAL)
    df = pd.read_csv(DATASET_LOCAL)
    return df

Теперь вам не нужно загружать его «локально» или в модуле, поскольку я выполнял эти задания. Вместо этого вы можете использовать FlyteFile или FlyteDirectory.

Вторая задача: я хотел просмотреть только несколько столбцов, которые можно стандартизировать, добавив список столбцов.

@task
def filter_columns(
      df: pd.DataFrame) -> pd.DataFrame: 
    return df[['area_title','occ_title', 'tot_emp',  'jobs_1000', 'a_mean', 'a_pct10', 'a_pct25',
       'a_median', 'a_pct75', 'a_pct90', 'year','o_group']]

Третья задача: мне нужно было очистить их, удалив нулевые значения, удалив дубликаты и удалив строки с символами * и #.

@task
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    df = df.dropna()
    df = df.drop_duplicates()
    df = df[(df != '**').all(1)]
    df = df[(df != '*').all(1)]
    df = df[(df != '#').all(1)]
    return df

Четвертая задача: я отфильтровал штаты только до Вашингтона, Орегона, Калифорнии, Пенсильвании, Техаса, Джорджии, Флориды и Мичигана.

@task
def filter_states(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df['area_title'].str.contains("WA|OR|CA|PA|TX|GA|FL|MI|")]
    df = df[df['area_title'].str.contains("-")]
    return df

Пятая задача: я хотел внедрить типы данных.

@task
def apply_types(df: pd.DataFrame) -> pd.DataFrame:
    return df.astype({"area_title": 'object',
                              "occ_title": 'object', 
                              "tot_emp":'float',
                              "jobs_1000":float,
                              "a_mean":float, 
                              "a_pct10":float,
                              "a_pct25":float,
                              "a_median":float,
                              "a_pct75":float,
                              "a_pct90":float,
                              "year":'object',
                              'o_group':str})

А вот как выглядит пользовательский интерфейс после неудачи и успеха:

Он даже покажет вам, какая задача на самом деле не удалась или удалась в этом случае.

Последние мысли

Я показал, как запускать Flyte локально, а также в демонстрационном кластере Kubernetes. Я выполнял эти задачи, используя разные домены, «постановку» и «разработку», и динамически, где я мог изменить путь к файлу на лету; обе эти функции уникальны для Flyte.

Flyte может предложить гораздо больше, и я продолжаю экспериментировать с этим замечательным оркестратором, в том числе с его способностью обрабатывать алгоритмы машинного обучения и запускать задания.

Надеюсь, вам понравилась эта статья, и дайте мне знать, что вы думаете!