Когда мы создаем модели машинного обучения, мы не хотим обучать модель на плохих или поврежденных данных. Поэтому важно проверить его на здравомыслие. Примеры включают «total_rows› 0 » или« null count »определенного списка столбцов и т. Д.

При таком подходе мы будем искать простой метод, чтобы добавить его в наши рабочие процессы.

Давайте посмотрим на пример, мы генерируем данные в нашей таблице HIVE и хотим проверить, все ли значения для столбца не равны нулю.

-- It is a presto query
SELECT
   count_if(col_x is null) as null_count,
   count(1) as total
FROM
   db.some_table
WHERE
   dt = ‘{ds}’

В приведенном выше запросе, если null_count равно total, он должен завершиться ошибкой. Итак, самое простое, что мы можем сделать сейчас, - это изменить наш запрос, чтобы он возвращал True или False.

Истина - → Тестовый пример пройден (общее количество строк не равно нулевому количеству)

Ложь - → Тестовый пример завершился неудачно (общее количество строк равно нулевому количеству)

давайте изменим приведенный выше запрос:

-- It is a presto query
SELECT
   count_if(segments is null) != count(1)
FROM
   db.some_table
WHERE
   dt = ‘{ds}’

Это вернет Верно / Неверно.

Расширяя этот дизайн, мы могли бы написать функцию, которая может запускать эти тестовые запросы sql.

Давайте определим вариант использования и посмотрим, как мы можем его решить.

Цель: в Airflow перед запуском модели машинного обучения по расписанию, использующей таблицу db.some_data, выполните несколько базовых проверок работоспособности , если проверки не прошли, пропустить задание модели.

Итак, наш DAG воздушного потока будет выглядеть так.

Шаги рабочего процесса:

  1. Мы определим тестовые примеры как список объектов словаря.
  2. Мы передадим функцию sanity_check_func и словарь test_cases в ShortCircuitOperator.
  3. sanity_check_func будет запускать каждый тестовый пример один за другим как запрос Presto (Мы можем использовать любой движок), когда любой из запросов возвращает False, ShortCircutOperator пропустит последующие задачи.

Примеры тестовых случаев:

# Data Validation Test Cases
test_cases = [
{
  'name': 'data exists',
  'description': 'Check if the most recent partition has data',
  'sql': """ SELECT
                COUNT(1) > 0
             FROM
                 db.some_table
             WHERE
                 dt = '{ds}'
         """
}, 
{
  'name': 'Validate Col X',
  'description': 'At least 1000 SUM in last 3 days ',
  'sql': """ 
          SELECT
              SUM(col_x) >= 1000
          FROM
               db.some_table
          WHERE
              dt BETWEEN to_iso8601(date '{ds}' + interval '-3'
    day) AND '{ds}'
  """
}, 
{
  'name': 'All null value test',
  'description': 'Test if all the values are not null for given cols in recent partition',
  'sql': """
       SELECT
          count_if(col_x is null) != count(1),
          count_if(col_y is null) != count(1),
          count_if(col_z is null) != count(1)
       FROM
          db.some_table
       WHERE
          dt BETWEEN to_iso8601(date '{ds}' + interval '-1'
    day) AND '{ds}'
  """
}
]

Функция проверки работоспособности

ShortCircuitOperator Пример

Прочие ключевые моменты:

  1. Мы также можем добавить это в другие рабочие процессы ETL, когда данные будут созданы, выполните базовую проверку работоспособности.
  2. Мы также можем сохранить результат в mysql или другом БД и построить его с помощью любых инструментов бизнес-аналитики.
  3. Мы можем вызвать любую другую внешнюю систему, например, datadog, для регистрации события и создания предупреждений.