Я пытаюсь использовать интеграцию Spring с RabbitMQ, используя каналы интеграции Spring, поддерживаемые RabbitMQ. (Что кажется почти не задокументированным по какой-то причине, это что-то новое?).
Для этого, кажется, я могу использовать AmqpChannelFactoryBean для создания канала. Чтобы настроить преобразование сообщений, я использую файл Jackson2JsonMessageConverter.
Когда я использую GenericMessage с полезной нагрузкой POJO, он отказывается десериализовать его из Java, в основном потому, что не знает тип. Я ожидал, что тип будет автоматически помещен в заголовок, но в заголовке есть только __TypeId__=org.springframework.messaging.support.GenericMessage
.
В Spring boot мой класс конфигурации выглядит так:
@Configuration
public class IntegrationConfiguration {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
MessageConverter messageConverter) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("myActivateOut");
factoryBean.setPubSub(false);
factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
factoryBean.setMessageConverter(messageConverter);
return factoryBean;
}
@Bean
@ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
public MessageHandler mqttOutbound() {
return m -> System.out.println(m);
}
}
Отправка осуществляется так:
private final MessageChannel myActivateOutChannel;
@Autowired
public MySender(MessageChannel myActivateOutChannel) {
this.myActivateOutChannel = myActivateOutChannel;
}
@Override
public void run(ApplicationArguments args) throws Exception {
MyPojo pojo = new MyPojo();
Message<MyPojo> msg = new GenericMessage<>(pojo);
myActivateOutChannel.send(msg);
}
Если я установлю свой собственный classmapper, все будет работать так, как должно. Но мне пришлось бы использовать много MessageConverters, если бы я настраивал такие вещи. Например.
converter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class< ? > clazz, MessageProperties properties) {
}
@Override
public Class< ? > toClass(MessageProperties properties) {
return MyPojo.class;
}
});
Я использую это неправильно? Я пропустил какую-то конфигурацию? Любые другие предложения?
Спасибо!! :)
Примечание. Глядя на вещи подробнее, я предполагаю, что способ «интеграции Spring» будет состоять в том, чтобы добавить преобразователь JSON интеграции Spring с каждой стороны, что означает также добавление двух дополнительных прямых каналов на очередь RabbitMQ? Мне это кажется неправильным, так как тогда у меня тройное количество каналов (6! для входа/выхода), но, может быть, именно так должна использоваться структура? Соедините все простые шаги с прямыми каналами? (Сохраню ли я постоянство, которое предлагают каналы RabbitMQ в этом случае? Или мне нужен какой-то механизм транзакций, если я этого хочу? Или это присуще тому, как работают прямые каналы?)
Я также заметил, что теперь есть как Spring-integration MessageConverter, так и Spring-amqp MessageConverter. Последний - тот, который я использовал. Будет ли другой работать так, как я хочу? Беглый взгляд на код показывает, что он не сохраняет тип объекта в заголовке сообщения?