Обнаружение и восстановление после удаления учетных данных в Spring AMQP

У нас есть настройка Spring Cloud Config с использованием серверных частей базы данных Vault (MySQL и RabbitMQ), что дает нам возможность вводить сгенерированные учетные данные в такие свойства, как:

  • spring.rabbitmq.username
  • spring.rabbitmq.password

Когда наше приложение запускается, у нас есть новый набор учетных данных Rabbit, и у нас есть возможность запросить новый набор по запросу.

Поскольку наши учетные данные Rabbit управляются Vault извне, срок их действия может истечь/удалить в любое время в течение жизни приложения (это также сценарий проверки устойчивости).

Мой вопрос в том, как мы можем (эффективно, надежно):

  • Обнаружение истечения срока действия сгенерированных учетных данных
  • Обновите наш существующий Spring AMQP CachingConnectionFactory новыми учетными данными.

Мы работаем, исходя из того, что это должно обрабатываться исключительно на стороне клиента в целях обеспечения устойчивости, даже если сервер желает или может отправлять уведомления об истечении срока действия.

С чем мы боремся, так это с тем, как определить истечение срока действия учетных данных, чтобы мы могли затем перенастроить файл CachingConnectionFactory.

Возможности включают в себя:

  1. Что у нас есть сейчас: ChannelListener, который создает список всех вновь созданных Channel и пытается создать/удалить анонимный Queue каждые x секунд, прослушивая любые ShutdownSignalException через ShutdownListener, которые могут иметь код состояния 403. Кажется, что это работает, но немного сложно, и мы видели проблемы параллелизма, делающие что-то нетривиальное в обработчике выключения.
  2. Как-нибудь подключитесь к CachingConnectionFactory. Мы пытались работать с клоном класса, но, помимо сложности этого, мы просто получили RESOURCE_LOCKED ошибок, создающих очереди.
  3. Что-то более простое и легкое, например. просто опрашивайте брокера каждые x секунд, чтобы убедиться, что текущие учетные данные все еще существуют.

Частично проблема заключается в том, что ACCESS_REFUSED — то, что вы получаете, когда CachingConnectionFactory пытается работать с удаленными учетными данными, — обычно рассматривается как фатальная ошибка неправильной настройки, а не как часть какого-либо реального рабочего процесса, или это может быть восстановлено.

Есть ли здесь изящное решение?


Использование: Spring Boot 1.5.10-RELEASE, Spring Cloud Dalston SR4


Обновление:

На стороне RabbitTemplate не генерируется никаких исключений — с RetryTemplate или без него — даже когда CachingConnectionFactory правильно определяет ACCESS_REFUSED для обмена, на который я отправляю.

Конфигурация:

spring
  rabbitmq:
    host: rabbitmq.service.consul
    port: 5672
    virtualHost: /
    template:
      retry:
        enabled: true

Код:

@Autowired private RabbitTemplate rt;  // From RabbitAutoConfiguration

@Bean
public DirectExchange emailExchange() {
    return new DirectExchange("email");
}

public void sendEmail() {
    this.rt.send("email", "email.send", "test payload");
}

Приложение запускается, объявляя обмен email. Пользовательский интерфейс RabbitMQ показывает моего (сгенерированного) пользователя и подключение к бирже, что нормально при запуске. Затем я моделирую истечение срока действия учетных данных, вручную удаляя этого пользователя с помощью пользовательского интерфейса, прежде чем запускать локальный тест для вызова электронной почты sendEmail() выше.

В результате вызова RabbitTemplate не создаются исключения или не регистрируются ошибки, но регистрируется следующая (ожидаемая) ошибка:

[Соединение AMQP 127.0.0.1:5672] ОШИБКА o.s.a.r.c.CachingConnectionFactory — отключение канала: ошибка канала; метод протокола: #method(reply-code=403, answer-text=ACCESS_REFUSED - доступ к обмену 'электронной почтой' в vhost '/' запрещен для пользователя 'cert-configserver-75c3ae60-da76-3058-e7df-a7b90ef72171', class- идентификатор = 60, идентификатор метода = 40)

Если не считать проверки учетных данных перед всеми вызовами RabbitTemplate.send(), я хотел бы знать, есть ли способ перехватить ошибку ACCESS_REFUSED во время отправки, чтобы я мог обновить учетные данные, как я это делаю для слушателей, и дать RetryTemplate возможность повторить попытку.


person Andrew Regan    schedule 07.03.2018    source источник


Ответы (2)


Для такого сценария контейнер прослушивателя выдает ListenerContainerConsumerFailedEvent. Вы можете прослушать это, проверить его reason и исключение и решить stop() контейнер и сделать что-то еще, что вам нужно. Затем start() снова использовать брокера с новыми учетными данными.

На стороне RabbitTemplate нужно просто try...catch вызвать и проанализировать исключение по той же причине.

Это не то, что я пробовал до сих пор, но это мое лучшее ощущение, как справиться с состоянием ACCESS_REFUSED. Вы действительно ничего не можете сделать по этому вопросу с точки зрения CachingConnectionFactory.

ОБНОВЛЕНИЕ

Мое приложение выглядит так:

spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1ms
logging.level.org.springframework.retry=DEBUG

@SpringBootApplication
public class So49155945Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(So49155945Application.class, args);
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        try {
            rabbitTemplate.convertAndSend("foo", "foo");
        }
        catch (AmqpException e) {
            System.err.println("Error during sending: " + e.getCause().getCause().getMessage());
        }
    }

}

И это то, что у меня есть в консоли, когда я запускаю это приложение для этого несуществующего пользователя:

Error during sending: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

ОБНОВЛЕНИЕ 2

Что я нашел также, как мы можем сделать этот реквизит:

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.template.mandatory=true

Затем добавьте rabbitTemplate.setConfirmCallback(), и наше отклоненное сообщение для асинхронной отправки будет отклонено. Однако это по-прежнему асинхронный обратный вызов, аналогичный упомянутому ChannelListener. С точки зрения Spring AMQP действительно нечего делать. Все является асинхронной природой протокола AMQP и может действительно нуждаться в каком-то «быстром отказе» из клиентской библиотеки Rabbit.

Пожалуйста, задайте такой вопрос в группе rabbitmq-users Google. Это место, где тусуются инженеры RabbitMQ.

ОБНОВЛЕНИЕ 3

В качестве решения для таких событий на Брокере может использоваться Плагин обмена событиями. Конкретные события user.deleted или user.password.changed генерируются Брокером.

person Artem Bilan    schedule 07.03.2018
comment
На стороне RabbitTemplate внутри RetryTemplate фактически не создается никаких исключений. Я вижу, как CachingConnectionFactory регистрирует ACCESS_REFUSED, но отправка просто продолжается без повторной попытки. - person Andrew Regan; 12.03.2018
comment
Точно такой же, судя по всему. - person Andrew Regan; 12.03.2018
comment
Любой простой код для воспроизведения? Что я должен сделать с точки зрения RabbitTemplate, чтобы наблюдать подобное поведение и, возможно, увидеть, что ACCESS_REFUSED регистрирует этот вопрос? - person Artem Bilan; 12.03.2018
comment
Я добавил ОБНОВЛЕНИЕ к своему ответу. К сожалению, это работает, как я за исключением. А также я вижу все эти 3 повторные попытки - person Artem Bilan; 12.03.2018
comment
Разница в том, что в вашем примере RT connectionFactory.createConnection() должен создать новое подключение, и сразу же происходит сбой с AuthenticationFailureException. В моем случае CachingConnectionFactory извлекает кэшированный файл и не проверяет его, поэтому мы получаем основной сбой, но не исключение. - person Andrew Regan; 13.03.2018
comment
И поскольку я реагирую, а не упреждающе обновляю ConnFactory, мой код не знает до отправки RabbitTemplate, что ConnFactory нуждается в этом обновлении. Поэтому я не знаю заранее и не могу перехватить вышеуказанное исключение на уровне RT. Похоже, мне нужно заменить/переопределить либо RT, либо CachingConnFactory... - person Andrew Regan; 13.03.2018
comment
Теперь я понимаю, что вы имеете в виду. Спасибо. Расследование и обсуждение с Гэри Расселом. Что я могу предложить, так это CachingConnectionFactory.addChannelListener(ChannelListener) и справиться с этим onShutDown(), что происходит в этом случае. Однако я согласен, что для блокирующей операции send() это все равно не годится: мы не знаем, что произошло, и можем потерять данные. - person Artem Bilan; 13.03.2018
comment
Пожалуйста, смотрите UPDATE 2 в моем ответе. - person Artem Bilan; 13.03.2018
comment
Re: ListenerContainerConsumerFailedEvent, наверняка это будет выброшено только тогда, когда контейнер должен обработать событие, но у него возникнет проблема? Я застрял, потому что пользователь контейнера удален, но я не могу обнаружить его, пока он не будет использован, а не активно, например. через какой-то пинг или сердцебиение в соединении или каналах. - person Andrew Regan; 19.03.2018
comment
Вы можете использовать com.rabbitmq.http.client.Client.getUser(String username) для проверки связи, есть пользователь или нет: github.com/rabbitmq/hop - person Artem Bilan; 19.03.2018
comment
Я надеялся, что мы сможем пропинговать пользователя, и Хоп это делает — спасибо. Единственная проблема заключается в том, что нам придется вводить долгоживущие учетные данные администратора в отдельные службы для выполнения проверок на стороне клиента, что, вероятно, будет запрещено. В качестве альтернативы сервер Cloud Config может использовать Hop для запланированных проверок и отправки удаленных обновлений. Однако оба они далеки от исходного решения! - person Andrew Regan; 19.03.2018
comment
М-м-м. Вы знаете, похоже, что это может вам помочь: rabbitmq.com/event-exchange.html - person Artem Bilan; 19.03.2018
comment
Ага, это похоже на идеальное решение. Я попробую! - person Andrew Regan; 19.03.2018
comment
Ага! Я только что открыл его для себя сегодня утром! :-) - person Artem Bilan; 19.03.2018
comment
Спасибо за вашу помощь в этом - Event Exchange работал очень хорошо и сэкономил много кода! - person Andrew Regan; 20.03.2018
comment
Я бы не добился этого без вашей помощи, но я опубликовал новый отдельный ответ, который суммирует все, что мне нужно было сделать, что сильно отличается от того, с чего мы начали. TBH Я действительно не знаю, почему я не мог решить это полностью в коде, но, надеюсь, подход плагина поможет другим. - person Andrew Regan; 20.03.2018

После долгих экспериментов и отладки я принял предложение Артема Билана и принял Плагин обмена событиями RabbitMQ.

Так что теперь, вместо того, чтобы пытаться отслеживать события ShutdownSignalException и ListenerContainerConsumerFailedEvent в коде Spring и Rabbit, между SimpleMessageListenerContainer, с одной стороны, и RabbitTemplate, с другой, я просто подписываюсь на обмен и позволяю своему новому @RabbitListener уведомлять меня о проблемах с учетными данными. Это без каких-либо других движущихся частей или объявлений bean-компонентов, без каких-либо проблем с синхронизацией или заблокированных потоков, и, как правило, следует за потоком автоконфигурации, а не борется с ним.

Все, что мне сейчас нужно, это:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.cloud.endpoint.RefreshEndpoint;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import static org.springframework.amqp.core.ExchangeTypes.TOPIC;

@Component
public class ReuathenticationListener {

    private static Logger log = LoggerFactory.getLogger(ReuathenticationListener.class);

    @Autowired private RabbitProperties rabbitProperties;
    @Autowired private RefreshEndpoint refreshEndpoint;
    @Autowired private CachingConnectionFactory connectionFactory;

    @RabbitListener(
        id = "credential_expiry_listener",
        bindings = @QueueBinding(value = @Queue(value="credentials.expiry", autoDelete="true", durable="false"),
            exchange = @Exchange(value="amq.rabbitmq.event", type=TOPIC, internal="true", durable="true"),
            key = "user.#")
    )
    public void expiryHandler(final MessageHeaders headers) {
        final String key = (String) headers.get("amqp_receivedRoutingKey");
        // See: https://www.rabbitmq.com/event-exchange.html
        if (!key.equals("user.deleted") &&
            !key.equals("user.authentication.failure")) {
            return;
        }

        final String failedName = (String) headers.get("name");
        final String prevUsername = rabbitProperties.getUsername();

        if (!failedName.equals(prevUsername)) {
            log.debug("Ignore expiry of unrelated user: " + failedName);
            return;
        }

        log.info("Refreshing Rabbit credentials...");
        refreshEndpoint.refresh();
        log.info("Refreshed username: '" + prevUsername + "' => '" + rabbitProperties.getUsername() + "'");

        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.resetConnection();

        log.info("CachingConnectionFactory reset, reconnection should now begin.");
    }
}
person Andrew Regan    schedule 20.03.2018
comment
Я поднял JIRA по этому вопросу: jira.spring.io/browse/AMQP-804 - person Artem Bilan; 21.03.2018