Spring-Integration: внешняя маршрутизация

Я хотел бы разрешить вызывающим абонентам передавать внешний уведомление о маршруте, например по публикации:

POST http://localhost:8080/transform?routing-slip=capitalize&routing-slip=lowercase
Content-Type: text/plain

camelCase

Должна быть возможность использовать данный массив маршрутных листов в качестве внешнего маршрутного листа из pojo:

@Bean
public IntegrationFlow transformerChain(RoutingSlipRouteStrategy routeStrategy) {
    return IntegrationFlows.from(
        Http.inboundGateway("/transform")
            .headerExpression("routingSlipParam", 
                    "#requestParams['routing-slip']")
            .requestPayloadType(String.class))
        .enrichHeaders(spec -> spec.header(
            IntegrationMessageHeaderAccessor.ROUTING_SLIP,
              new RoutingSlipHeaderValueMessageProcessor(
                 "@routePojo.get(request, reply)")
              )
        )
        .logAndReply();
}

pojo может получить доступ к заголовку routingSlipParam, и вы могли бы подумать, что он может затем хранить слип как внутреннее состояние, или, по крайней мере, это то, что TestRoutingSlipRoutePojo заставил меня поверить, поэтому я построил это ( с небольшим сомнением, учитывая, что экземпляр pojo всего один):

public class ExternalRoutingSlipRoutePojo {

    private List<String> routingSlip;
    private int i = 0;

    public String get(Message<?> requestMessage, Object reply) {
        if (routingSlip == null) {
            routingSlip = (LinkedList)requestMessage.getHeaders()
                .get("routingSlipParam");
        }
        try {
            return this.routingSlip.get(i++);
        } catch (Exception e) {
            return null;
        }
    }

}

Получается, что это срабатывает ровно один раз, что в конце концов неудивительно — индекс увеличивается при каждом входящем сообщении, а маршрутный лист никогда не обновляется.

Поэтому я подумал, конечно, что мне нужно хранить внутренний статус для всех входящих сообщений, и придумал эту RouteStrategy:

public class ExternalRoutingSlipRouteStrategy implements RoutingSlipRouteStrategy {

  private Map<UUID, LinkedList<String>> routingSlips = new WeakHashMap<>();
  private static final LinkedList EMPTY_ROUTINGSLIP = new LinkedList<>();

  @Override
  public Object getNextPath(Message<?> requestMessage,Object reply) {
    MessageHeaders headers = requestMessage.getHeaders();

    UUID id = headers.getId();
    if (!routingSlips.containsKey(id)) {
        @SuppressWarnings("unchecked")
        List<String> routingSlipParam = 
            headers.get("routingSlipParam", List.class);
        if (routingSlipParam != null) {
            routingSlips.put(id, 
                new LinkedList<>(routingSlipParam));
        }
    }
    LinkedList<String> routingSlip = routingSlips.getOrDefault(id,
        EMPTY_ROUTINGSLIP);
    String nextPath = routingSlip.poll();
    if (nextPath == null) {
        routingSlips.remove(id);
    }
    return nextPath;
  }
}

Это также не работает, потому что стратегия вызывается не только для входящего сообщения, но и для всех новых сообщений, созданных динамической маршрутизацией, которые, конечно, имеют разные идентификаторы.

Но он вызывается только дважды для исходного сообщения, поэтому маршрутный лист никогда не исчерпывается, и приложение работает в бесконечном цикле.

Как я могу заставить spring-integration использовать внешний маршрутный лист?

ОБНОВЛЕНИЕ:

Как предложил Гэри Рассел, ни индекс внешней маршрутной квитанции, ни сам внешний маршрутный листок не должны храниться в bean-компоненте Spring, вместо этого можно использовать заголовки сообщений, чтобы поддерживать их отдельно для каждого запроса:

Http.inboundGateway("/transform")
    .headerExpression("routingSlipParam",
            "#requestParams['routing-slip']")
    .requestPayloadType(String.class))
.enrichHeaders(spec -> spec
    .headerFunction("counter",h -> new AtomicInteger())
    .header(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
        new RoutingSlipHeaderValueMessageProcessor(externalRouteStrategy)
    )
)

externalRouteStrategy является экземпляром следующего класса:

public class ExternalRoutingSlipRouteStrategy implements 
        RoutingSlipRouteStrategy {

    @Override
    public Object getNextPath(Message<?> requestMessage, Object reply) {

        List<String> routingSlip = (List<String>) 
            requestMessage.getHeaders().get("routingSlipParam");

        int routingSlipIndex = requestMessage.getHeaders()
            .get("counter", AtomicInteger.class)
            .getAndIncrement();

        String routingSlipEntry;
        if (routingSlip != null 
            && routingSlipIndex < routingSlip.size()) {

            routingSlipEntry = routingSlip.get(routingSlipIndex);
        } else {
            routingSlipEntry = null;
        }
        return routingSlipEntry;
    }
}

Для справки, я опубликовал пример на Github.


person dschulten    schedule 18.02.2019    source источник


Ответы (1)


Вернитесь к своей первой версии и сохраните i в заголовке сообщения (AtomicInteger) в расширении заголовков.

.headerExpression("counter", "new java.util.concurrent.atomic.AtomicInteger()")

потом

int i = requestMessage.getHeaders().get("counter", AtomicInteger.class).getAndIncrement();
person Gary Russell    schedule 18.02.2019
comment
Я адаптировал пример по вашему предложению, кроме того, я больше не храню маршрутный лист в pojo, а использую его прямо из заголовка. Смотрите обновленный вопрос. - person dschulten; 23.02.2019