Я реализовал многопоточность для выполнения некоторых операций в jira после получения сообщения от rabbitmq. Я использую spring amqp (версия 1.6.1)
Как только поток перехватывает исключение, я устанавливаю статус как ошибку и объект вывода, о котором я буду говорить в будущем. При отправке этого выходного объекта в очередь. Я столкнулся с вышеупомянутым исполнением
Код:
Завод подключения:
@Configuration
@PropertySources({ @PropertySource("classpath:application.properties") })
public class RabbitMQConfiguration {
@Autowired
private Environment environment;
@Bean
public ConnectionFactory connectionFactory() {
// TODO make it possible to customize in subclasses.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(environment.getProperty("bip.rabbitmq.url"));
connectionFactory.setUsername(environment.getProperty("bip.rabbitmq.username"));
connectionFactory.setPassword(environment.getProperty("bip.rabbitmq.password"));
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* @return the admin bean that can declare queues etc.
*/
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean(name = "jiraQueueListenerContainerFactory")
public SimpleRabbitListenerContainerFactory jiraQueueListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setReceiveTimeout(10L);
return factory;
}
}
MessageHandler:
@Component
@PropertySources({ @PropertySource("classpath:application.properties") })
public class JiraMessageHandler {
@Autowired
private RabbitTemplate rabbitTemplate;
private static ExecutorService executor = Executors.newFixedThreadPool(Constants.THREAD_SIZE);
private static Logger logger = LogManager.getLogger(JiraMessageHandler.class);
@RabbitListener(containerFactory = "jiraQueueListenerContainerFactory", queues = Constants.QUEUE_NAME)
public void handleMessage(HashMap<String, Object> jiraMessage) {
logger.info(jiraMessage.toString() + System.currentTimeMillis());
logger.info("Jira Message Handler");
BaseServerAdapter jiraProcessingAdapter = new JiraProcessingAdapter();
Future future = executor.submit(jiraProcessingAdapter);
JiraAdapterOutput jiraAdapterOutput = new JiraAdapterOutput();
Future future = executor.submit(jiraProcessingAdapter);
jiraAdapterOutput = (JiraAdapterOutput) future.get();
try {
jiraAdapterOutput = (JiraAdapterOutput) future.get();
if (jiraAdapterOutput.getOutputMap().get("activityStatus") == "SUCCESS") {
logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
+ jiraAdapterOutput.getOutputMap().get("jiraId"));
rabbitTemplate.convertAndSend(Constants.ADAPTER_OUTPUT_QUEUE, senderMap);
}else if (jiraAdapterOutput.getOutputMap().get("activityStatus").equalsIgnoreCase("FAIL")) {
logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
+ jiraAdapterOutput.getOutputMap().get("jiraId"));
sendMessageForProcessingToBIP(senderMap);
}
private boolean sendMessageForProcessingToBIP(HashMap<String, ExchangeDTO> senderMap) {
try {
rabbitTemplate.convertAndSend(Constants.WFM_ERROR_QUEUE, senderMap);
return true;
} catch (Exception e) {
**logger.info("Message sending failed, try again:::::::" + e.getMessage());**
}
return false;
}
Он показывает «Контекст приложения закрыт, и ConnectionFactory больше не может создавать подключения».
Я делаю что-то не так. Я также упомянул здесь: https://jira.spring.io/browse/AMQP-546 < / а>