Распределение данных по ключу в Apache Flink, логическом или физическом операторе?

Согласно документации Apache Flink, преобразование KeyBy логически разбивает поток на непересекающиеся разделы. Все записи с одинаковым ключом относятся к одному разделу.

Является ли KeyBy 100% логическим преобразованием? Разве это не включает физическое разделение данных для распределения по узлам кластера? Если да, то как можно гарантировать, что все записи с одним и тем же ключом относятся к одному и тому же разделу?

Например, предположим, что мы получаем распределенный поток данных из кластера Apache Kafka из n узлов. Кластер Apache Flink, на котором выполняется наша потоковая работа, состоит из m узлов. Когда преобразование keyBy применяется к входящему потоку данных, как оно гарантирует логическое разделение данных? Или это связано с физическим разделением данных по узлам кластера?

Кажется, я запутался между логическим и физическим разделением данных.


person shaikh    schedule 05.10.2020    source источник


Ответы (1)


Пространство всех возможных ключей разделено на некоторое количество групп ключей. Количество групп ключей (которое совпадает с максимальным параллелизмом) - это параметр конфигурации, который вы можете установить при настройке кластера Flink; значение по умолчанию - 128.

Каждый ключ принадлежит ровно одной группе ключей. Когда кластер запускается, ключевые группы делятся между диспетчерами задач - и если кластер запускается с контрольной точки или точки сохранения, эти моментальные снимки индексируются по группе ключей, и каждый диспетчер задач загружает состояние ключей в ключе. группы он был назначен.

Во время выполнения задания каждому диспетчеру задач известны функции селектора ключей, используемые для вычисления ключей, и то, как ключи отображаются на группы ключей. TM также знают разделение ключевых групп для диспетчеров задач. Это упрощает маршрутизацию каждого сообщения диспетчеру задач, отвечающему за ключ этого сообщения.

Подробности:

Группа ключей, к которой принадлежит ключ, вычисляется примерно так:

Object key = the result of your KeySelector function;
int keyHash = key.hashCode();
int keyGroupId = MathUtils.murmurHash(keyHash) % maxParallelism;

Индекс экземпляра оператора, к которому должны быть перенаправлены элементы из данной группы ключей с учетом фактического параллелизма и maxParallelism, вычисляется как

keyGroupId * parallelism / maxParallelism

Фактический код находится в org.apache.flink.runtime.state.KeyGroupRangeAssignment, если вы хотите взглянуть.

Одним из основных выводов является то, что группы ключей не пересекаются и охватывают пространство ключей. Другими словами, невозможно получить ключ, не принадлежащий ни к одной из ключевых групп. Каждый ключ принадлежит ровно одной из ключевых групп, а каждая группа ключей принадлежит одному из диспетчеров задач.

person David Anderson    schedule 05.10.2020
comment
При маршрутизации сообщений к TM некоторые TM могут находиться на других физических вычислительных узлах. В этом случае сообщения необходимо отправить на другой узел. Верно? Разве это не физическое перемещение данных или, другими словами, физическое разделение данных? - person shaikh; 05.10.2020
comment
Да все верно. Логическое разделение keyBy может включать физическое разделение, но не может включать его. - person David Anderson; 05.10.2020
comment
Предназначено для того, чтобы сказать «... может или не может ...». - person David Anderson; 05.10.2020
comment
@DavidAnderson Я немного запутался. Когда кластер запущен, как диспетчеры задач узнают, какие ключи есть, чтобы им были назначены группы ключей? Кроме того, когда в потоке появляется новый ключ, который не был частью какой-либо группы ключей, создается ли новая группа ключей и назначается одной из TM? - person Harshith Bolar; 02.06.2021
comment
@HarshithBolar Я расширил свой ответ более подробной информацией. Надеюсь, это проясняет путаницу. - person David Anderson; 02.06.2021