Пространство всех возможных ключей разделено на некоторое количество групп ключей. Количество групп ключей (которое совпадает с максимальным параллелизмом) - это параметр конфигурации, который вы можете установить при настройке кластера Flink; значение по умолчанию - 128.
Каждый ключ принадлежит ровно одной группе ключей. Когда кластер запускается, ключевые группы делятся между диспетчерами задач - и если кластер запускается с контрольной точки или точки сохранения, эти моментальные снимки индексируются по группе ключей, и каждый диспетчер задач загружает состояние ключей в ключе. группы он был назначен.
Во время выполнения задания каждому диспетчеру задач известны функции селектора ключей, используемые для вычисления ключей, и то, как ключи отображаются на группы ключей. TM также знают разделение ключевых групп для диспетчеров задач. Это упрощает маршрутизацию каждого сообщения диспетчеру задач, отвечающему за ключ этого сообщения.
Подробности:
Группа ключей, к которой принадлежит ключ, вычисляется примерно так:
Object key = the result of your KeySelector function;
int keyHash = key.hashCode();
int keyGroupId = MathUtils.murmurHash(keyHash) % maxParallelism;
Индекс экземпляра оператора, к которому должны быть перенаправлены элементы из данной группы ключей с учетом фактического параллелизма и maxParallelism, вычисляется как
keyGroupId * parallelism / maxParallelism
Фактический код находится в org.apache.flink.runtime.state.KeyGroupRangeAssignment, если вы хотите взглянуть.
Одним из основных выводов является то, что группы ключей не пересекаются и охватывают пространство ключей. Другими словами, невозможно получить ключ, не принадлежащий ни к одной из ключевых групп. Каждый ключ принадлежит ровно одной из ключевых групп, а каждая группа ключей принадлежит одному из диспетчеров задач.
person
David Anderson
schedule
05.10.2020