Spring Integration DSL: лямбда для возврата сообщения ‹T› в методе дескриптора, например с DelegatingSessionFactory?

Мотивация: мне нужно установить threadKey для DelegatingSessionFactory, прежде чем я перенаправлюсь на исходящий шлюз Sftp и потом отключу threadKey.

В зависимости от клиента мне нужно использовать другую учетную запись пользователя Sftp. Учетные записи пользователей - это вопрос конфигурации в моем application.yml, я не хочу писать отдельные маршруты для каждого нового клиента.

public IntegrationFlow aDynamicSftpFlow() {
    f -> f
        .handle(tenantSessionDefine()) // how can I use a lambda instead?
        .handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
        .handle(...) // undefine sftp session
}

Для установки threadKey требуется Message<?>, а не только полезная нагрузка и заголовки. Поэтому я использую bean-компонент, потому что он принимает сообщение:

public class TenantSessionDefine {


    private DelegatingSessionFactory delegatingSessionFactory;

    public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
        this.delegatingSessionFactory = delegatingSessionFactory;
    }

    public Message<?> defineSession(Message<?> message) {
        return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
            .get("tenantId", String.class)); 
// used by SessionFactoryLocator
    }
}

Я хотел бы написать это как лямбду, например

.handle(message -> delegatingSessionFactory.setThreadKey(message, 
    message.getPayload().getTenant())

но это не так просто. Лямбда, которую можно использовать с handle(), которая принимает Message<T>, завершает поток, потому что это функция void (MessageHandler функциональный интерфейс). Другая лямбда - это GenericHandler, который не завершает поток, но принимает полезную нагрузку и заголовки, а не сообщение.

Это всего лишь пример, время от времени мне хотелось бы использовать handle() с сообщением в лямбда-выражении, не прерывая поток. Как я могу это сделать?

Обновить

DelegatingSessionFactory - не совсем подходящий пример. Поскольку установка и очистка ключа потока должны происходить до и после вызова sftp, совет подходит лучше, чем определение обработчика до и после вызова.


person dschulten    schedule 02.03.2019    source источник


Ответы (1)


Понятно. В javadoc для handle() говорится

Используйте handle(Class, GenericHandler), если вам нужно получить доступ ко всему сообщению.

Параметр Class должен быть Message.class:

.handle(Message.class, 
    (message, headers) -> sftpSessionFactory
        .setThreadKey(message, headers.get("tenantId")))
person dschulten    schedule 02.03.2019
comment
Кроме того, существует пара setThreadKey / clearThreadKey, которая не требует сообщения. Кроме того, установка ключа потока для DelegatingSessionFactory, вероятно, лучше делать в AbstractRequestHandlerAdvice, см. Обновление вопроса. - person dschulten; 02.03.2019