Транзакции Spring AMQP в многопоточности

Я использую Spring AMQP (реализация RabbitMQ) и пытаюсь распространить одну транзакцию на несколько потоков.

В качестве примера предположим, что есть 3 очереди, имена X, Y, Z, и сначала я получаю сообщение из очереди X, используя поток-1, а затем это сообщение передается в поток-0, а в поток-0 сообщение клонируется и отправляется в очередь Y через поток-2 и в очередь Z через поток-3. Поток 0 ожидает завершения как потока-3, так и потока-4, чтобы зафиксировать или откатить сообщение. Обратите внимание, что здесь я использую 4 потока.

Я хочу в основном обрабатывать эти 3 операции (получение сообщения и его размещение в двух очередях) как одну транзакцию. т.е. если я успешно отправлю сообщение в очередь Y, но не смог отправить его в Z, то сообщение, отправленное в Y, будет откатано, и исходное сообщение также будет откатано в очередь X.

До сих пор мне удавалось передавать информацию о транзакции через threadLocals (в основном TransactionStatus и TransactionSynchronizationManager.resources), и я мог связать эти 3 операции в одну транзакцию.

Но моя проблема заключается в отправке ACK / NACK в исходную очередь X, хотя я фиксирую / откатываю транзакцию, она работает только для очередей Y и Z. Сообщение, полученное от X, всегда находится в состоянии Unacked.

У меня есть попытки channel.basicAck (), RabbitUtils.commitIfNeeded (), подходы, но безуспешно.

Обратите внимание, что я также включил channelTransact. Любая помощь высоко ценится.


person Sajith Dilshan    schedule 23.09.2016    source источник
comment
Во время фиксации поток 1 активен? Может мертвый, это ждать чего-нибудь?   -  person Mr_Thorynque    schedule 23.09.2016
comment
Во время фиксации поток-1 возвращается в пул (все его локальные переменные потока очищаются). Фиксация выполняется потоком-0. Я обновил вопрос.   -  person Sajith Dilshan    schedule 23.09.2016
comment
Возможно, установите txSize в 1 только для вашего теста или отправьте больше сообщений, чем txSize во время теста.   -  person Mr_Thorynque    schedule 23.09.2016
comment
По умолчанию для txSize установлено значение 1   -  person Sajith Dilshan    schedule 23.09.2016


Ответы (1)


Транзакции Spring привязаны к одному потоку. Возможно, вы сможете заставить его работать напрямую, используя клиент RabbitMQ, но вам придется использовать один и тот же канал для всех потоков. Однако в документации RabbitMQ настоятельно не рекомендуется делать это:

Экземпляры каналов не должны совместно использоваться потоками. Приложениям следует предпочесть использование канала для каждого потока вместо совместного использования одного и того же канала в нескольких потоках. В то время как некоторые операции над каналами можно безопасно вызывать одновременно, некоторые нет, и это приведет к неправильному чередованию кадров в проводе. ...

В любом случае, даже если вы выполняете работу в одном потоке, транзакции RabbitMQ довольно слабые и не гарантируют многого; см. семантику брокера.

AMQP гарантирует атомарность только тогда, когда транзакции включают одну очередь, то есть все публикации внутри tx направляются в одну очередь, и все подтверждения относятся к сообщениям, полученным из одной очереди. ...

person Gary Russell    schedule 23.09.2016
comment
Я хотел бы знать, как транзакция привязана к потоку. Немного покопавшись, я нашел двух локальных потоков, которые, похоже, хранят детали транзакции, и это TransactionSynchronizationManager.resources и ConsumerChannelRegistry.consumerChannel. Я попытался переместить эти локальные переменные потока из одного потока в другой, чтобы переключать транзакции между потоками, но, как я уже упоминал в исходном вопросе, первая очередь не подтверждается. Что мне здесь не хватает? - person Sajith Dilshan; 23.09.2016
comment
Я не знаю, что вам не хватает, но, как я уже указал, вам не рекомендуется использовать канал в нескольких потоках. - person Gary Russell; 23.09.2016
comment
Большое спасибо за разъяснения, Гэри. Но в любом случае я буду копать еще немного, чтобы увидеть, возможно ли это технически, даже если этого не рекомендуется. - person Sajith Dilshan; 24.09.2016
comment
Наконец разобрался. Как оказалось, мне нужно вручную выполнить метод ConnectionFactoryUtils.registerDeliveryTag () до извлечения локального потока TransactionSynchronizationManager.resources. - person Sajith Dilshan; 24.09.2016