Мотивация: мне нужно установить threadKey для DelegatingSessionFactory, прежде чем я перенаправлюсь на исходящий шлюз Sftp и потом отключу threadKey.
В зависимости от клиента мне нужно использовать другую учетную запись пользователя Sftp. Учетные записи пользователей - это вопрос конфигурации в моем application.yml, я не хочу писать отдельные маршруты для каждого нового клиента.
public IntegrationFlow aDynamicSftpFlow() {
f -> f
.handle(tenantSessionDefine()) // how can I use a lambda instead?
.handle(Sftp.outboundGateway(delegatingSessionFactory, ...))
.handle(...) // undefine sftp session
}
Для установки threadKey требуется Message<?>
, а не только полезная нагрузка и заголовки. Поэтому я использую bean-компонент, потому что он принимает сообщение:
public class TenantSessionDefine {
private DelegatingSessionFactory delegatingSessionFactory;
public TenantSessionDefine(DelegatingSessionFactory delegatingSessionFactory) {
this.delegatingSessionFactory = delegatingSessionFactory;
}
public Message<?> defineSession(Message<?> message) {
return delegatingSessionFactory.setThreadKey(message, message.getHeaders()
.get("tenantId", String.class));
// used by SessionFactoryLocator
}
}
Я хотел бы написать это как лямбду, например
.handle(message -> delegatingSessionFactory.setThreadKey(message,
message.getPayload().getTenant())
но это не так просто. Лямбда, которую можно использовать с handle()
, которая принимает Message<T>
, завершает поток, потому что это функция void (MessageHandler
функциональный интерфейс). Другая лямбда - это GenericHandler, который не завершает поток, но принимает полезную нагрузку и заголовки, а не сообщение.
Это всего лишь пример, время от времени мне хотелось бы использовать handle()
с сообщением в лямбда-выражении, не прерывая поток. Как я могу это сделать?
Обновить
DelegatingSessionFactory
- не совсем подходящий пример. Поскольку установка и очистка ключа потока должны происходить до и после вызова sftp, совет подходит лучше, чем определение обработчика до и после вызова.