Я использую Spring AMQP (реализация RabbitMQ) и пытаюсь распространить одну транзакцию на несколько потоков.
В качестве примера предположим, что есть 3 очереди, имена X, Y, Z, и сначала я получаю сообщение из очереди X, используя поток-1, а затем это сообщение передается в поток-0, а в поток-0 сообщение клонируется и отправляется в очередь Y через поток-2 и в очередь Z через поток-3. Поток 0 ожидает завершения как потока-3, так и потока-4, чтобы зафиксировать или откатить сообщение. Обратите внимание, что здесь я использую 4 потока.
Я хочу в основном обрабатывать эти 3 операции (получение сообщения и его размещение в двух очередях) как одну транзакцию. т.е. если я успешно отправлю сообщение в очередь Y, но не смог отправить его в Z, то сообщение, отправленное в Y, будет откатано, и исходное сообщение также будет откатано в очередь X.
До сих пор мне удавалось передавать информацию о транзакции через threadLocals (в основном TransactionStatus и TransactionSynchronizationManager.resources), и я мог связать эти 3 операции в одну транзакцию.
Но моя проблема заключается в отправке ACK / NACK в исходную очередь X, хотя я фиксирую / откатываю транзакцию, она работает только для очередей Y и Z. Сообщение, полученное от X, всегда находится в состоянии Unacked.
У меня есть попытки channel.basicAck (), RabbitUtils.commitIfNeeded (), подходы, но безуспешно.
Обратите внимание, что я также включил channelTransact. Любая помощь высоко ценится.