Как реализовать распределенную блокировку вокруг поллера в Spring Integration с помощью ZooKeeper

Spring Integration имеет поддержку ZooKeeper, как описано в https://docs.spring.io/spring-integration/reference/html/zookeeper.html Однако этот документ настолько расплывчатый.

Он предлагает добавить ниже bean-компонент, но не дает подробностей о том, как запустить / остановить опрос, когда узлу предоставлено лидерство.

@Bean
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/siTest/")
                .setRole("cluster");
}

Есть ли у нас какой-либо пример того, как с помощью zookeeper убедиться, что нижеприведенный опросчик запускается только один раз в кластере в любое время?

@Component
public class EventsPoller {

    public void pullEvents() {
        //pull events should be run by only one node in the cluster at any time
    }
}



Ответы (1)


LeaderInitiator испускает OnGrantedEvent и OnRevokedEvent, когда становится лидером и его лидерство отменяется.

См. https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#endpoint-roles и следующий https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#лидерство-событие-обработка для получения дополнительной информации об обработке этих событий и о том, как это влияет на ваши компоненты в конкретной роли.

Хотя я согласен, что у главы Zookkeper должна быть какая-то ссылка на эту SmartLifecycleRoleController главу. Не стесняйтесь поднимать JIRA по этому вопросу, и ваш вклад приветствуется!

ОБНОВЛЕНИЕ

Вот что я сделал в нашем тесте:

@RunWith(SpringRunner.class)
@DirtiesContext
public class LeaderInitiatorFactoryBeanTests extends ZookeeperTestSupport {

    private static CuratorFramework client;

    @Autowired
    private PollableChannel stringsChannel;

    @BeforeClass
    public static void getClient() throws Exception {
        client = createNewClient();
    }

    @AfterClass
    public static void closeClient() {
        if (client != null) {
            client.close();
        }
    }

    @Test
    public void test() {
        assertNotNull(this.stringsChannel.receive(10_000));
    }


    @Configuration
    @EnableIntegration
    public static class Config {

        @Bean
        public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
            return new LeaderInitiatorFactoryBean()
                    .setClient(client)
                    .setPath("/siTest/")
                    .setRole("foo");
        }

        @Bean
        public CuratorFramework client() {
            return LeaderInitiatorFactoryBeanTests.client;
        }

        @Bean
        @InboundChannelAdapter(channel = "stringsChannel", autoStartup = "false", poller = @Poller(fixedDelay = "100"))
        @Role("foo")
        public Supplier<String> inboundChannelAdapter() {
            return () -> "foo";
        }

        @Bean
        public PollableChannel stringsChannel() {
            return new QueueChannel();
        }

    }

}

А у меня в логах что-то вроде этого:

2018-12-14 10:12:33,542 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Starting [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
2018-12-14 10:12:33,578 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Stopping [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
person Artem Bilan    schedule 13.12.2018
comment
есть ли у вас какой-нибудь пример того, как простым голосующим можно управлять на выборах лидера зоопарка? Я попытался реализовать SmartLifecycle, добавив DefaultCandidate в bean-компонент leaderInitiator. Но все же я не вижу, чтобы методы жизненного цикла вызывались или управлялись zookeeper. Хотите знать, есть ли какой-нибудь пример для простого опросчика? - person suman j; 14.12.2018
comment
Пожалуйста, посмотрите ОБНОВЛЕНИЕ в моем ответе. - person Artem Bilan; 14.12.2018