Достигайте гармонии в сложных кластерах с помощью конечных автоматов

В наши дни распределенные системы широко используются в рабочих процессах программного обеспечения. Это системы, в которых серверы общаются с клиентом и друг с другом (обычно через протоколы сплетен и консенсуса). Распределенные системы помогают решить серьезную проблему, связанную с распределением общей задачи по нескольким серверам для повышения производительности и доступности.

Также необходимо обеспечить согласованность и отказоустойчивость в таких системах. В противном случае любая задача вернет выходные данные, являющиеся результатом моментального снимка данных в два разных момента времени, или неполный результат из-за сбоя какого-либо узла во время обработки.

Написать распределенную систему непросто

Если бы это было так же просто, как реализация вызовов API между серверами. Запуск и поддержка распределенной системы сама по себе непростая задача.
Как вы понимаете, написать новую гораздо сложнее.

Возьмите CassandraDB в качестве примера. Основная функция базы данных — обрабатывать запрос CQL, собирать данные с нескольких узлов и возвращать агрегированный ответ.

Однако в фоновом режиме вы должны правильно взаимодействовать между всеми узлами, используя Gossip, достигать консенсуса руководства через Paxos и обеспечивать его отказоустойчивость, правильно распределяя SSTables. Все это необходимо делать, поддерживая добавление и удаление существующих узлов, обеспечивая при этом ту же производительность запросов.

Что, если бы вы позволили распределенной системе отделить свою логику наблюдения от своей бизнес-логики? Таким образом, разработчики распределенной БД, подобной Cassandra, могут сосредоточиться только на части выполнения запросов, в то время как отказоустойчивость, репликация и назначение токенов позаботятся об этой новой сущности.

Войдите в Apache Helix

Apache Helix позволяет разработчикам представить сложную распределенную систему в виде простого конечного автомата. Затем все операции управления представляются в виде переходов в этом конечном автомате.

Это здорово, потому что общее мнение состоит в том, что операции кластера настолько отличаются от каждой системы к другой, что создание универсальной структуры невозможно. Используя Helix, разработчики могут сосредоточиться на основных функциях своих систем и позволить Helix позаботиться об управлении кластером.

Подождите, распределенная система — это конечный автомат?

Если вы не знаете о машинах состояний, то это модель, которая может представлять систему в фиксированном количестве состояний. Он также моделирует допустимые переходы из одного состояния в другое.

Возьмем пример популярной среды обработки событий в реальном времени Apache Kafka. Кластер Kafka состоит из нескольких тем. Каждая тема состоит из нескольких разделов.

Для тематического раздела Apache Kafka необходимо обеспечить следующее:

  • Всегда есть один лидер. Лидер — единственный, кто может принимать записи.
  • Существует N реплик раздела, где пользователь может выбрать N.
  • Реплики должны быть равномерно распределены между M серверными узлами для отказоустойчивой установки.
  • Только реплики, синхронизированные с лидером, могут быть повышены до лидера в случае сбоя.

Helix может позаботиться обо всем этом с помощью следующего конечного автомата:

Это позволяет системам, подобным Kafka, сосредоточиться только на чтении/записи данных из журналов разделов и других административных операциях, таких как добавление/удаление тем, управление смещениями и так далее.

Возьмем другой пример Апач Пино.

Pinot имеет несколько таблиц, каждая из которых состоит из небольших файлов сегментов. Аналогией может быть одна таблица Cassandra, состоящая из небольших файлов SSTable.

Для определенного сегмента Pinot имеет следующие ограничения:

  • Должно быть N реплик сегмента, где пользователь может настроить N
  • Для каждой таблицы может быть только один потребляющий сегмент. Все реплики этого сегмента также должны находиться в состоянии потребления.
  • Остальная часть сегмента должна быть в онлайн-состоянии, то есть запросы готовы к обслуживанию.
  • В случае добавления M новых узлов новые сегменты также должны быть равномерно распределены по этим узлам.
  • Сегмент и реплики следует удалять, если пользователи достигают или вручную удаляют срок хранения.

Это можно представить с помощью следующего конечного автомата:

Итак, где Helix входит в картину?

Пользователь загрузит эти конечные автоматы (в форме конфигурации) в Helix. Затем Helix отслеживает все узлы в кластере с помощью встроенного мониторинга.

Он также отслеживает состояние каждого раздела на каждом узле. Затем Helix создает карту идеального состояния каждого раздела. Если фактическое и идеальное состояние раздела не совпадают, Helix поставит в очередь переходы между состояниями.

Однако привлекательность Helix обусловлена ​​следующим:

  • Он может выполнять несколько переходов между состояниями параллельно.
  • Позаботьтесь о координации между несколькими серверами. Например, если сервер B назначен ведущим вместо сервера A, удалите сервер B из лидерства перед добавлением сервера A.
  • Он может определять порядок переходов между состояниями, даже если они выполняются параллельно, чтобы не нарушать ограничения. Например, обновление нового сервера без данных до лидера ИЛИ назначение некоторого раздела реплики узлу, у которого также есть лидер.
  • Определите приоритет переходов (который можно изменить с помощью конфигурации). Например, если лидер выходит из строя одновременно с добавлением в кластер новых узлов, переход из состояния реплики в состояние лидера должен иметь более высокий приоритет, чем переход из состояния простоя в состояние реплики.
  • Измените идеальное состояние на основе мониторинга кластера. Например, куда переместить разделы, находящиеся на отказавшем узле, если узел выйдет из строя?

Вот как конечный автомат для Kafka можно реализовать с помощью Helix:

    StateModelDefinition.Builder builder = new StateModelDefinition.Builder("your-state-model");
    builder.initialState("IDLE");

    builder.addState("IDLE");
    builder.addState("REPLICA");
    builder.addState("LEADER");
    // Set the initial state when the node starts

    // Add transitions between the states.
    builder.addTransition("IDLE", "REPLICA");
    builder.addTransition("REPLICA", "LEADER");
    builder.addTransition("REPLICA", "IDLE");
    builder.addTransition("LEADER", "REPLICA");
    
    // Set constraint that there can only be 1 leader at a time
    builder.dynamicUpperBound("LEADER", "1");

    // Set transition priority
    // the first inserted gets the top most priority.
    List<String> stateTransitionPriorityList = new ArrayList<String>();

    stateTransitionPriorityList.add("REPLICA-LEADER");
    stateTransitionPriorityList.add("IDLE-REPLICA");
    stateTransitionPriorityList.add("REPLICA-IDLE");
    stateTransitionPriorityList.add("LEADER-REPLICA");

    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
        stateTransitionPriorityList);
    

Назначение состояния каждому разделу

Внутри Helix использует модифицированную версию алгоритма RUSH для определения конечного состояния, а затем использует жадный подход для определения действительных транзакций и их планирования. Под жадным подходом я подразумеваю, что транзакции добавляются в список ожидающих транзакций до тех пор, пока мы не получим две транзакции, которые нельзя выполнять параллельно.

Helix также позволяет пользователям подключать любую другую стратегию размещения, например, ConsistentHashing, в зависимости от варианта использования.

Как Helix не зависит от базовой системной логики?

Он должен знать, как справляться с перемещением данных между серверами, как читать данные в каждом разделе и так далее, верно?

Ну, я никогда не говорил, что Helix выполняет переход состояния. Helix выдает запрос с переходом, который необходимо выполнить, на сервер, который должен его выполнить. В настоящее время это делается с использованием Apache Zookeeper в качестве очереди сообщений.

// Helix leader, running on a controller node
state.setPartitionState(partitionName, serverName, "ONLINE");

// Helix Listener, running inside each server
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
  String partitionName = message.getPartitionName();
  // Logic to add partition
  // Create partition files
  // add partition to queryable list
}

На каждом сервере есть слушатель, который отслеживает новые сообщения в очереди. У слушателя есть бизнес-логика того, что делать, когда его просят перейти из состояния A в состояние B. Например, если ему нужно скопировать данные из S3 для начальной загрузки или начать принимать записи, поскольку он был повышен до лидера.

Серверу не нужно заботиться о том, что делают другие серверы, например, кто лидер, есть ли на каком-то другом сервере такие же данные и так далее. Он просто выполняет то, что его просят, а затем обновляет фактическое состояние раздела. Если он не может выполнить задачу, он возвращается к ошибке, и Helix может предпринять соответствующие последующие действия.

Заключение

Хотя Apache Helix может помочь вам в удивительной степени упростить управление кластером, в нем по-прежнему отсутствуют некоторые функции:

  • Helix не может учитывать разнородные серверы при расчете конечного состояния. Например, вы сможете запланировать в два раза больше разделов на узле, который в 2 раза больше, чем остальные узлы.
  • Helix не может правильно учитывать репликацию в кластере, распределенном по нескольким зонам доступности. В таких случаях необходимо убедиться, что реплики присутствуют в разных зонах доступности.

Вы можете использовать следующие ссылки, чтобы узнать больше о Helix: