Создание потока данных

В посте №6 мы рассмотрели, как может выглядеть конвейерный контроль поверх облачной операционной среды, такой как Kubernetes, на уровне процессно-ориентированных контейнеров. Но, вероятно, наиболее важной особенностью конвейера является обработка некоторых входных данных, приводящая к предсказуемым и контролируемым выходам. В этом посте мы спрашиваем, как мы получаем данные в конвейер и из него? Говорим ли мы о контейнерном коде для выполнения или о вводе-выводе, который должен вводиться из источника, необходимо учитывать два аспекта: 1) Каково взаимодействие с пользователем? и 2) Что на самом деле происходит за кулисами? На протяжении нескольких этапов мы хотим, чтобы то, что получилось, было передано на следующий этап, но обычно только в том случае, если оно соответствует определенным критериям качества.

Какой бы конвейер вы ни использовали, артефакты процесса создаются, а затем передаются поэтапно. Если входные данные представляют собой бесконечно малые изменения, такие как отдельные события в потоке, выход может быть таким же простым, как одно обновление базы данных. Если нужно скомпилировать новую версию программного обеспечения, результатом может быть новый пакет, готовый к употреблению.

В поисках подходящего момента

На практике, вероятно, будет пакет агрегированных изменений, обрабатываемых вместе, для экономии на масштабе. Конвейер никогда не выводит что-либо быстрее, чем поступающие данные, и обычно медленнее, потому что, хотя наш инстинкт состоит в том, чтобы все было как можно быстрее, некоторые результаты могут быть вычислены только по истечении определенного времени. Например, возьмем случай трубопроводов непрерывной доставки:

  • Нет необходимости фиксировать каждое отдельное изменение в системе контроля версий.
  • Нет необходимости перестраивать каждое зафиксированное изменение.
  • Нет необходимости развертывать каждую реконструкцию.

В каждом случае мы ждем, пока накопится достаточно связанных изменений и переупаковываются в новый тип «единицы», прежде чем приступить к обработке, например, заполнению ящиков, контейнеров или даже блоков в цепочке блоков (см. Рисунок ниже). Что бы мы ни выбрали в подходящее время для перехода к следующему этапу, это политическое решение. Эффективность всегда будет в пользу пакетной обработки. Даже для захвата необработанных данных, когда мы представляем захват каждого события из некоторого источника, часто требуется какая-то фильтрация или выборка, которая добавляет эффективную стадию агрегации.

Это как раз история пакетирования в компьютерных сетях, применяемая только к данным высокого уровня, а не к данным низкого уровня. Это должно сказать нам, что нет принципиальной разницы между потоком и пакетным пакетным процессом. Что имеет значение, так это то, как вы анализируете данные на каждом конце.

Размеры партий важны по нескольким причинам. Если грубый размер данных слишком мал, мы растрачиваем ресурсы и судим нестабильность; если он слишком велик, важные изменения могут быть пропущены или внесены слишком поздно.

Это особенно относится к машинному обучению, где слишком много обучения деталям может привести к тому, что сеть будет распознавать исключительно то, чему она была обучена (с небольшой способностью к обобщениям), в то время как недостаточное обучение может привести к пропуску важных случаев.

Наука о данных - это не точная наука!

Обработка на краю

Размер пакетов данных - одна из проблем. Другая, даже более важная проблема - это в первую очередь передача данных в конвейер. Сбор данных - это нормально для некоторых приложений, но будущее конвейерной обработки данных - это обработка на границе сети, где данные-кандидаты могут быть выбраны немедленно, чтобы избежать ненужной и дорогостоящей транспортировки. Это означает обработку в самом источнике данных. Это особенно актуально для Интернета вещей и в интеллектуальных средах, где данные обычно поступают от датчиков и локальных процессов.

Объем данных, которые можно собирать в этих сценариях, слишком велик для некритичной передачи по сети в облачный центр обработки данных. Однако с некоторой обработкой краев, фильтрацией и пакетированием он может быть правдоподобным из самого источника. Чем больше данных происходит перед транспортировкой, тем эффективнее может быть обработка. Это накладывает некоторые требования на управление инфраструктурой. Управление ресурсами за пределами единого центра обработки данных - гораздо большая головная боль, включающая в себя тонкие проблемы синхронизации и сложности координации, к которым мы вернемся позже.

Проблемы масштабирования входных данных означают, что на каждом этапе необходимо принимать решения о принятии данных:

  • Принимает ли получатель данные, предлагаемые удаленным источником (в нисходящем направлении).
  • Принимает ли источник запрос данных от получателя (восходящий поток).
  • Являются ли данные из источника подходящими или приемлемыми (семантическая проверка).

Решение принять данные в конвейер может быть связано с эффективностью, но также с целостностью и даже самозащитой (например, все службы должны защищать от DDOS-атак и злонамеренных действий). Безопасность - это серьезная проблема, даже для приложения, которое традиционно не считается риском. Есть разные способы справиться с этим, используя обработку на основе push и pull. Оптимальный выбор зависит от временных рамок источника данных и топологии конвейера. Эти решения иногда принимаются как должное, но это то, что мы не можем себе позволить в распределенной системе. По масштабу все сложнее.

Языковые барьеры

Kubernetes может обеспечить несколько базовых средств защиты от злонамеренной отправки между внутренней и внешней сетями кластера контейнеров, но если сервисы развернуты на обширных территориях, даже в разных облаках, то это нужно обрабатывать явным образом.

Более очевидный критерий для принятия данных - это то, что мы должны понимать, какое сообщение передают события данных. Этапы конвейера передают данные в различных форматах. Отправленные и полученные данные типов должны быть охвачены стандартным lingua franca, чтобы их можно было понять (что-то вроде буфера протокола или стандартного формата данных, возможно, с символом GraphQL -подобный выбор предлагаемых типов). Также предполагается, что каждому этапу конвейера предоставляется доступ к предлагаемым данным. Помните, что в распределенном кластере нет автоматического права на чтение данных, созданных отдельным процессом.

Подводя итог, мы можем думать о конвейерах как о серии ребер, на которых происходит обработка, чтобы принимать входные данные и преобразовывать их. Где на самом деле начинается труба? Это вопросы, которые будут много значить в будущем ...

Техническое задание

За кулисами протоколы и программное обеспечение позаботятся о реализации передачи сообщений. Между тем, архитекторам рабочих процессов необходимо выразить свое намерение соединить компоненты в конвейер в соответствии с какой-то политикой. Есть разные подходы к этому. Возможно, наиболее чистым является то, что мы сделали бы в случае математической обработки, когда затронутые величины представляют собой простые массивы чисел. Обозначения понятны каждому программисту:

output = stage_function(input),

… Можно использовать последовательно на каждом из этапов. Имеет ли эта нотация смысл или нет в других случаях (или даже реализуема), например в сборках с непрерывной доставкой еще предстоит обсудить.

Другой подход - использовать простые спецификации данных, такие как YAML или JSON. На самом деле это не языки, это форматы данных. Просить пользователей «перепрыгивать», чтобы упростить вход для разработчиков, не удобен для пользователя, но, тем не менее, стал популярной нормой.

Pipeline:
   Name : my_pipe
   Input: /path/file1
   Output: /path/file2

Давайте теперь рассмотрим несколько примеров такого взаимодействия с пользователем в приложениях для обработки данных.

Google TensorFlow

Очень популярная библиотека машинного обучения Google с интерфейсом Python, возможно, является самой простой последовательностью обработки данных для понимания с точки зрения программирования. Отчасти это связано с тем, что представления данных четко определены с самого начала, тогда как в других случаях это не так. В программе TensorFlow есть вызовы, которые выглядят примерно так (пример TensorFlow):

import tensorflow as tf
tf.enable_eager_execution()
tfe = tf.contrib.eager
x = tfe.Variable(np.random.randn())
# . . .
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
grad = tfe.implicit_gradients(mean_square_fn)
for step in range(num_steps):
 optimizer.apply_gradients(grad(linear_regression, train_X, train_Y))

В этом подходе мы передаем данные по ссылке на переменную. Ссылки указывают на огромные тензорные массивы, хранящиеся неопределенным образом. Но для всех намерений и целей это выглядит как простое вычисление на одной машине. Мы видим выбор политики (стремление к выполнению) наряду с побочными преобразованиями и очевидным назначением данных. Код довольно чистый и минимальный, с хорошо скрытыми деталями. Это возможно только потому, что исходная проблема достаточно хорошо определена.

Apache Airflow

Для сравнения: Airflow - это инструмент для реализации конвейеров, управляемых задачами. Он использует самоуверенное описание интерфейса Python (исключительно) для построения ориентированного ациклического графа (DAG), который, в свою очередь, представляет процесс. В этом примере используются только некоторые фиктивные команды оболочки:

dag = DAG(‘tutorial’, default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
 task_id=’print_date’,
 bash_command=’date’,
 dag=dag)
t2 = BashOperator(
 task_id=’sleep’,
 bash_command=’sleep 5',
 retries=3,
 dag=dag)
templated_command = “””
 {% for i in range(5) %}
 echo “{{ ds }}”
 echo “{{ macros.ds_add(ds, 7)}}”
 echo “{{ params.my_param }}”
 {% endfor %}
“””
t3 = BashOperator(
 task_id=’templated’,
 bash_command=templated_command,
 params={‘my_param’: ‘Parameter I passed in’},
 dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)

Явной передачи данных нет, только детали выполнения. На самом деле обработка данных полностью внеполосна, что делает ее загадочной: есть только частично упорядоченный набор заданий, которые нужно выполнить. Код здесь непрозрачен и путает команды оболочки с упаковкой конвейера и контекстными переменными. Отношения приоритета группы доступности базы данных должны быть добавлены явно. Это мало чем отличается от примера CFEngine в сообщении №4, который не был специально разработан для упрощения конвейеров, поэтому он стал неуклюжим. Это будет проблемой с дизайном, если он явно не предназначен для конвейерной обработки, но Airflow есть, поэтому мы все равно можем добиться большего.

Поскольку порядок Python уже является буквальным, мы можем задаться вопросом, зачем нужен явный порядок зависимостей. Дело в том, что язык является неявно последовательным по дизайну, поэтому он будет пытаться сериализовать все без необходимости: путем определения зависимостей исполнитель может иметь возможность распараллелить некоторые части конвейера. Так что мы должны положительно отнестись к этому требованию!

Ko

Запуск поверх Kubernetes - цель многих. Kubernetes может упростить ряд проблем, но без автоматизации это головная боль с множеством движущихся частей. В Aljabr мы разработали Ko как наш собственный язык сценариев, написанный на Go и адаптированный к задаче написания сценариев Kubernetes. Мы можем сравнить предыдущие примеры с конвейером Ko. Рассмотрим пример:

pipe(d1, d2) {
 g1: G1(d1: d1, d2: d2)
g2: G2(g1)
g3: G3(g2)
 return: Sum(left: g1, right: g3)
}

В приведенном выше примере определяется объект, подобный функции, с двумя входными параметрами. Строки выполняются параллельно, если последующие строки не зависят друг от друга, и в этом случае Ко организует сериализацию. Распознавание типов обрабатывается логическим выводом, чтобы свести к минимуму бремя формализма. Рекурсия новых функциональных объектов разрешена, и возвращаемое значение явно декларируется. Результат прост и интуитивно понятен для программиста, хотя может не подходить для других специалистов по науке о данных в отрасли.

Ко является скелетом по своей форме, выражая только отношения голых костей за чистую монету. Существуют параметры для передачи данных предыдущего этапа в «функции», которые обрабатывают их и возвращают результаты, а типы данных могут использоваться для сокрытия неявных деталей взаимосвязей данных. Здесь нет явных деталей поведения, относящихся к облачным экземплярам или службам. Это означает, что они должны быть установлены внеполосно как часть политики, поэтому простота частично иллюзорна, хотя и желательна. Мы рассмотрим это более подробно в следующих статьях.

Куда пойти отсюда?

В каждом из приведенных выше примеров ясность выражения процесса, очевидно, окрашена собственным синтаксисом языков, используемых для встраивания описаний. В этом есть свои преимущества и недостатки. Нетрудно заметить, что существует множество конкретных вопросов, связанных с проектированием трубопроводов: как их соединить, как определить, как сделать их безопасными. Но есть и культурные проблемы. В следующем посте мы сделаем несколько предложений!