Развертывание прямой привязки с Kafka в качестве шины сообщений

Я пытаюсь использовать прямую привязку к следующему потоку.

stream create --definition "time | log" --name ticktock
stream deploy ticktock --properties module.*.count=0

Развертывание завершается с ошибкой с этим исключением как на узле администратора, так и на узле контейнера:

java.lang.IllegalArgumentException: Module count cannot be zero
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus$KafkaPropertiesAccessor.getNumberOfKafkaPartitionsForProducer(KafkaMessageBus.java:799)
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.bindProducer(KafkaMessageBus.java:500)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageProducer(AbstractMessageBusBinderPlugin.java:287)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:143)
    at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
    at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
    at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
    at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
    at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
    at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503)
    at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

У меня есть кластер Spring-XD (1.2.0) с одним администратором и двумя контейнерными узлами, использующими Kafka в качестве шины сообщений.

Я делаю что-то неправильно? Или проблема с прямой привязкой и шиной сообщений Kafka?


person chris_fitz    schedule 23.07.2015    source источник


Ответы (1)


Согласно документации, XD KafkaMessageBus в настоящее время не поддерживает прямую привязку...

ПРИМЕЧАНИЕ. Шина сообщений Kafka не поддерживает count=0 для развертывания модулей и, следовательно, не поддерживает прямую привязку модулей. Эта функция будет доступна в будущем выпуске. Между тем, если для развертывания Kafka необходима прямая связь между модулями, вместо этого следует использовать составные модули.

person Gary Russell    schedule 23.07.2015