Получение данных в реальном времени из канала 311 Сан-Франциско

За последние несколько недель мы обсудили несколько важных тем в мире инженерии данных и автоматизации.

Мы заложили основу для понимания словаря и основных понятий, которые использует инженер данных. Пришло время приступить к созданию вашего первого набора пакетных заданий.

Для этого мы будем использовать библиотеку Apache Airflow, которая поможет автоматизировать нашу работу.

Кроме того, для нашего источника данных в реальном времени мы будем использовать набор данных sfgov 311. Вы можете получить эту информацию в любое время и получить самые свежие данные о 311 отчетах. Обычно это включает в себя такие вещи, как вандализм, нарушение правил парковки и т. Д.

Для этого конвейера мы сначала извлечем данные в необработанный CSV, а затем загрузим их в базу данных MySQL.

В этой статье мы опишем код, необходимый для этого, а также укажем на некоторые причины различных шагов, которые мы предпринимаем.

Создание функции извлечения JSON

Прежде чем начать, вам необходимо настроить среду Airflow, чтобы иметь возможность выполнять каждый из шагов, описанных в этой статье. Если вы еще этого не сделали, мы считаем эту статью одной из наших любимых.

При создании конвейеров данных, особенно тех, которые более ориентированы на пакетную обработку, полезно извлекать данные на уровень необработанных данных. Это позволяет сделать резервную копию необработанных данных.

Когда есть ошибка в данных, наличие необработанного файла может помочь вам выяснить, где ошибка данных - в источнике или в вашем общем процессе.

В нашем случае мы будем извлекать данные из набора данных JSON, который находится в сети. Мы можем сделать это с помощью функции pandas с именем read_json. Это может читать файл или URL.

Мы сделаем это, создав функцию, которую вы можете вызвать, которая будет извлекать данные на основе JSON из URL-адреса или файла.

У нас также может быть временная метка для имени файла, чтобы мы знали, когда мы извлекли данные. Это можно использовать позже при попытке отследить любые изменения в данных, которые могут быть скорее снимком, чем обновлением данных.

Это можно сделать с помощью объекта DateTime , как показано ниже:

Это имя файла будет использоваться позже с функцией извлечения, которую мы создаем ниже:

Эту задачу еще предстоит реализовать. Однако мы покажем вам это в конце, как только у нас будет настроена функция загрузки.

Загрузка данных в MySQL с помощью воздушного потока

После того, как у вас есть отрывок, вашим следующим шагом будет загрузка ваших данных в какой-то необработанный слой в вашем хранилище данных.

Главное на этом этапе - не манипулировать вашими данными. Причина в том, что если в ваших данных есть какая-то проблема с данными или проблема из источника данных, то их легче отследить.

Вы можете сделать это, проверяя качество данных на каждом этапе. В необработанных проверках вы обычно проверяете, имеют ли типы данных смысл.

Например, все ли поля даты датированы? Все ли состояния действительны? Вы не поверите, но у нас здесь были проблемы. «МЫ» не является сокращением штата.

Это проверки работоспособности, чтобы убедиться, что данные, которые вы извлекаете, верны.

Теоретически в вашем приложении должны быть дважды проверены вводимые пользователем данные. Однако мы никогда не доверяем прикладному уровню.

Помимо всего этого, вы можете использовать приведенный ниже код. Вы заметите, что сначала мы подключаемся к базе данных с помощью MySQL, а затем загружаем CSV построчно:

Если бы вы создавали более надежную систему, вы, вероятно, создали бы некую форму класса менеджера баз данных, который просто принимал импортированную вами строку подключения.

Однако, поскольку мы создаем это только для демонстрации, мы объединили этот код в одной функции.

После настройки всех этих функций вы можете официально настроить DAG. Как я уже упоминал в предыдущих статьях, группа доступности базы данных действует как блок-схема. Он указывает, какие задачи запускаются первыми и какие задачи зависят от других задач.

В нашем случае у нас есть задача, которая извлекает данные, и другая задача, которая загружает указанные данные в таблицу MySQL. Эти две основные задачи помогут запустить ваш конвейер, и он будет выглядеть так, как показано ниже.

Настройка вашего трубопровода воздушного потока

Теперь, имея все эти функции, мы можем настроить конвейер.

Настройка реального конвейера в Airflow требует, чтобы вы установили набор аргументов по умолчанию. Это позволяет вам установить владельца, дату начала, частоту повторных попыток конвейера и несколько других параметров:

В дополнение к параметрам вам нужно будет настроить конкретных операторов. В этом случае у нас есть две функции: jsonToCSV и csvToSql. Они будут использоваться в PythonOperator. Это позволяет вам создавать то, что мы называем задачами.

Чтобы убедиться, что задачи работают в разумном порядке, вам необходимо определить зависимости.

Вы можете определить зависимость, используя оператор битового сдвига. Для тех, кто не знаком с оператором битового сдвига, он выглядит как >> или <<.

В этом случае вы должны определить его как opr_json_to_csv >> opr_csv_to_sql.

Это гарантирует, что opr_json_to_csv запустится до opr_csv_to_sql.

По правде говоря, у вас будет дублироваться загрузка данных таким образом.

Чтобы иметь дело с дублирующимися данными, вы можете загрузить необработанный слой, а затем проверить, не загружаете ли вы дублирующиеся данные в более поздний промежуточный слой. Так что пока мы не будем об этом беспокоиться.

На этом вы, по сути, завершили свой первый конвейер.

Итак, где вы разместите свой конвейер?

Чтобы этот конвейер работал, вам необходимо сохранить его в папке airflow/dags, которую вы настроили. Если вы все еще не настроили его, воспользуйтесь нашим любимым руководством по настройке Airflow.

Как только этот конвейер будет сохранен - ​​и пока у вас есть Airflow, работающий в фоновом режиме, ваш DAG будет автоматически выбран Airflow.

Вы можете проверить это, перейдя на свой localhost: 8080, где по умолчанию работает панель управления Airflow.

Оттуда должен появиться ваш DAG.

Как только он появится, вы можете начать изучать свою DAG, чтобы убедиться, что все различные задачи настроены.

Это будет выглядеть как на картинке ниже:

Теперь ваш конвейер готов к работе.

Завершение первого конвейера данных

Поздравляем с созданием и автоматизацией вашего первого конвейера данных Airflow! Теперь вы можете взять эту структуру и использовать ее в других ваших ETL и конвейерах данных.

Это, конечно, только первый уровень платформы данных или хранилища данных. Отсюда вам все равно нужно будет создать производственный уровень, слой метрик и какой-то слой визуализации данных или уровень науки о данных.

Тогда вы действительно сможете начать оказывать влияние на свои данные.

Подпишитесь на нашу рассылку новостей

Прежде чем прокручивать дальше, почему бы не подписаться на информационный бюллетень нашей команды, чтобы быть в курсе событий в области науки о данных, инженерии данных и технологий! Подробнее здесь.