Когда мы создаем модели машинного обучения, мы не хотим обучать модель на плохих или поврежденных данных. Поэтому важно проверить его на здравомыслие. Примеры включают «total_rows› 0 » или« null count »определенного списка столбцов и т. Д.
При таком подходе мы будем искать простой метод, чтобы добавить его в наши рабочие процессы.
Давайте посмотрим на пример, мы генерируем данные в нашей таблице HIVE и хотим проверить, все ли значения для столбца не равны нулю.
-- It is a presto query SELECT count_if(col_x is null) as null_count, count(1) as total FROM db.some_table WHERE dt = ‘{ds}’
В приведенном выше запросе, если null_count
равно total
, он должен завершиться ошибкой. Итак, самое простое, что мы можем сделать сейчас, - это изменить наш запрос, чтобы он возвращал True
или False
.
Истина - → Тестовый пример пройден (общее количество строк не равно нулевому количеству)
Ложь - → Тестовый пример завершился неудачно (общее количество строк равно нулевому количеству)
давайте изменим приведенный выше запрос:
-- It is a presto query SELECT count_if(segments is null) != count(1) FROM db.some_table WHERE dt = ‘{ds}’
Это вернет Верно / Неверно.
Расширяя этот дизайн, мы могли бы написать функцию, которая может запускать эти тестовые запросы sql.
Давайте определим вариант использования и посмотрим, как мы можем его решить.
Цель: в Airflow перед запуском модели машинного обучения по расписанию, использующей таблицу db.some_data
, выполните несколько базовых проверок работоспособности , если проверки не прошли, пропустить задание модели.
Итак, наш DAG воздушного потока будет выглядеть так.
Шаги рабочего процесса:
- Мы определим тестовые примеры как список объектов словаря.
- Мы передадим функцию sanity_check_func и словарь test_cases в ShortCircuitOperator.
- sanity_check_func будет запускать каждый тестовый пример один за другим как запрос Presto (Мы можем использовать любой движок), когда любой из запросов возвращает False, ShortCircutOperator пропустит последующие задачи.
Примеры тестовых случаев:
# Data Validation Test Cases test_cases = [ { 'name': 'data exists', 'description': 'Check if the most recent partition has data', 'sql': """ SELECT COUNT(1) > 0 FROM db.some_table WHERE dt = '{ds}' """ }, { 'name': 'Validate Col X', 'description': 'At least 1000 SUM in last 3 days ', 'sql': """ SELECT SUM(col_x) >= 1000 FROM db.some_table WHERE dt BETWEEN to_iso8601(date '{ds}' + interval '-3' day) AND '{ds}' """ }, { 'name': 'All null value test', 'description': 'Test if all the values are not null for given cols in recent partition', 'sql': """ SELECT count_if(col_x is null) != count(1), count_if(col_y is null) != count(1), count_if(col_z is null) != count(1) FROM db.some_table WHERE dt BETWEEN to_iso8601(date '{ds}' + interval '-1' day) AND '{ds}' """ } ]
Функция проверки работоспособности
ShortCircuitOperator Пример
Прочие ключевые моменты:
- Мы также можем добавить это в другие рабочие процессы ETL, когда данные будут созданы, выполните базовую проверку работоспособности.
- Мы также можем сохранить результат в mysql или другом БД и построить его с помощью любых инструментов бизнес-аналитики.
- Мы можем вызвать любую другую внешнюю систему, например, datadog, для регистрации события и создания предупреждений.