ОБНОВЛЕНИЕ: Grakn Labs больше не использует и не поддерживает Redisq.

Это сообщение в блоге посвящено решению, которое мы создали здесь, в Grakn Labs, для асинхронного выполнения задач. Мы подумали, что было бы неплохо выпустить его как общую библиотеку Java, построенную на Redis.

Задача выбора очереди задач

Мы обсудим варианты дизайна, лежащие в основе движка, в другом посте, но, мотивированные необходимостью упростить наше распространение, мы решили использовать Redis в качестве очереди задач.

Redis часто называют базой данных в памяти, но он предлагает хорошие гарантии устойчивости и отличные примитивы для использования в качестве основы для распределенной очереди сообщений. Под очередью сообщений мы подразумеваем то, что поддерживает примитив push, используемый для хранения сообщения издателем, и команду subscribe, которая определяет, как обрабатываются сообщения. Что-то похожее на SQS, Kafka или RabbitMQ. Нам также требовался способ уведомления издателя о том, что сообщение было использовано.

Мы посмотрели, есть ли что-то готовое для Java, и, что удивительно, нашли только библиотеку под названием Jesque. В Джеске нам не нравились некоторые вещи. К сожалению, его дизайн должен был быть совместим с Resque, и это сильно ограничивало то, что он мог предложить. Кроме того, было излишне сложно создавать потребителей, и в нем отсутствовали некоторые необходимые нам функции (уведомление издателя, метрики, обработка мертвых задач).

Итак, мы сделали новую библиотеку. Пойдите и посмотрите, это называется Redisq! Вот несколько вещей, которые он предоставляет:

  1. Он инструментирован с использованием моей любимой библиотеки Metrics
  2. У него есть тайм-ауты для задач, застрявших в очереди на полет, чтобы их можно было повторно обработать или зарегистрировать и отбросить после тайм-аута.
  3. Он регистрирует состояние задачи
  4. Потребители реализованы как потребители Java, могут быть определены как лямбда-выражения и выполняются асинхронно. Нет необходимости создавать экземпляры потоков
  5. Издатель может подписаться на определенное сообщение (мы называем их документами) или заблокировать его завершение.

Как это работает?

Когда создается экземпляр Redisq, он создает список Redis, используемый в качестве очереди. Каждый push работает следующим образом: он помещает идентификатор в очередь и устанавливает ключ с содержимым документа. Он также создает документ state, который отслеживает, когда документ был отправлен, и обрабатывается ли он или уже готов. Наконец, он создает канал для подписок на изменения состояния.

Когда подписчик запускается, один поток блокирует новый контент в очереди.
Когда в очереди появляется что-то новое, задача отправляется потребителю и выполняется асинхронно в заданном пуле потоков. Идентификатор атомарно перемещается в другую очередь на лету.

Другой поток обрабатывает очередь в полете и ищет документы, которые находились там слишком долго.

Использование Redisq

Пользоваться им очень просто.

На первом этапе, как показано в следующем фрагменте кода, вам необходимо создать сериализуемый объект, чтобы Redisq знал, как его хранить. Мы используем для этого Джексона. Нам также нужен метод, который можно использовать для извлечения идентификатора getIdAsString. Все, что попадает в очередь, требует идентификатора.

public class MyTask implements Document {
    @JsonProperty
    private String id;
    @JsonProperty
    private String content;
    // Needed by Jackson
    public MyTask() {}
    

    public MyTask(String id, String content) {
        this.id = id;
        this.content = content;
    }
    public String getContent() {
        return content;
    }
    @Override
    @JsonIgnore
    public String getIdAsString() {
        return id;
    }
}

Чтобы создать потребителя, нам нужно выбрать имя для очереди, определить класс документа и определить логику. В этом примере мы просто распечатываем контент по стандарту.

Pool<Jedis> jedisPool = new JedisPool();

Queue<MyTask> redisq = new RedisqBuilder<MyTask>()
                .setJedisPool(jedisPool)
                .setName("my_queue")
                .setConsumer((d) -> 
                   System.out.println("I'm consuming " 
                                      + d.getContent()))
                .setDocumentClass(MyTask.class)
                .createRedisq();
redisq.startConsumer();

Обратите внимание, что при запуске потребителя мы также запускаем поток процессора в полете, который периодически проверяет, не застряло ли что-нибудь в очереди.

Мы можем установить время блокировки выполнения определенной задачи с помощью RedisqBuilder :: setLockTime (Duration lockTime) и время, по истечении которого задача отбрасывается и блокируется RedisqBuilder :: setDiscardTime (Duration discardTime) . Другими словами, если потребитель умирает, а документ не удаляется из очереди на полет, процессор принимает его по истечении lockTime. Если документ находится там менее discardTime, он возвращается в основную очередь, в противном случае он отбрасывается.

Вот как вы помещаете что-то в очередь:

redisq.push(new MyTask("documentid", "content"));

Не забудьте закрыть очередь:

redisq.close();

Закрытие очереди завершает поток в полете, поток потребителя и закрывает пул потоков, который выполняет потребителей.

Писатель также может дождаться завершения задачи. Это достигается с помощью замечательной команды Redis SUBSCRIBE.

redisq.pushAndWait(new MyTask("id", "waitforme"), 5, TimeUnit.SECONDS);

Вы также можете подтолкнуть и обработать будущее с помощью Queue :: getFutureForDocumentStateWait! Не забудьте подписаться перед отправкой, чтобы не пропустить обновление состояния.

Заключение

Использование очередей в архитектуре микросервисов для доставки потока операций, которые необходимо выполнить, является очень распространенным решением. Мы создали библиотеку, чтобы максимально использовать технологию Redis, уже используемую в Grakn для хранения метаданных.

Redisq имеет очень минимальный, простой в использовании интерфейс и предоставляет некоторые дополнительные функции по сравнению с другими аналогичными решениями.

Загляните в репозиторий GitHub по адресу github.com/pluraliseseverythings/redisq, чтобы узнать, как включить Redisq в свой проект и приступить к работе! Если у вас есть какие-либо вопросы или комментарии, свяжитесь с нами, ниже, через наш канал сообщества Slack или через наш дискуссионный форум.

Если вам понравилась эта статья, нажмите кнопку хлопка ниже, чтобы ее могли найти и другие. Узнайте больше на https://grakn.ai.

* Схема создана с помощью https://sketchboard.me/

Изображение предоставлено: Queue от Newtown grafitti находится под лицензией CC 2.0