Очередь для сообщений из двух источников в Apache Camel в хронологическом порядке

Я хотел бы реализовать следующий сценарий в Apache Camel (входит в состав JBoss Fuse):

У меня есть две системы, обе они производят события, хранящиеся в базе данных отдельно. Теперь мне нужно прочитать события из этих таблиц событий и поместить их как сообщения в очередь (реализовано ActiveMQ). Но что действительно важно, мне нужно сохранить хронологический порядок (время создания) событий в этой очереди, независимо от того, где событие было создано.

Я ищу решение, которое максимально использует компоненты и шаблоны из Camel framework, конечно я могу реализовать механизм чтения вне Camel (чистая Java), но я предпочитаю решение Camel.

Большое спасибо за любую идею!


person Thelvyn    schedule 27.02.2015    source источник


Ответы (1)


Я думаю, вы просто хотите поместить сообщения в очередь seda и использовать resequencer, чтобы объединить их обратно по порядку.

from("--database1--")
    to("seda:resequencer")

from("--database2--")
    to("seda:resequencer")

from("seda:resequencer")
    .resequence(header("date")).batch().timeout(5000L)
    .to("activemq:...")

Вам нужно будет обратить внимание на настройки времени ожидания и на то, что уместно.

(Примечание: я не тестировал этот код, поэтому отметьте его как предложение).

person vikingsteve    schedule 27.02.2015
comment
Кажется, это рабочее решение, можно ли обнаружить (остановить) обработку сообщений, когда одна (или обе) базы данных недоступны? Я имею в виду, когда один из них не работает, могу ли я перестать передавать сообщения от второго. - person Thelvyn; 27.02.2015
comment
Конечно, я думаю, вы могли бы использовать onException, чтобы затем перейти к новому маршруту, где вы, в свою очередь, используете controlBus, чтобы остановить другой маршрут базы данных, или, проще говоря, просто установить логический флаг keepProcessing для компонента, который используется в фильтре на обоих маршрутах. . - person vikingsteve; 27.02.2015
comment
+последний пункт, не уверен, что seda:resequencer необходим, сначала стоит попробовать direct:resequencer. - person vikingsteve; 01.03.2015