Как сделать сервис Pub/Sub с CometD и Jetty

Мне нужно создать службу на основе Java 8, которая предоставляет канал CometD, на который могут подписаться несколько клиентов. Идея состоит в том, что сервер может отправлять уведомления клиентам при возникновении определенных событий.

Я использую Jetty 9 в качестве контейнера сервлета (необходимого для удовлетворения требований моей группы). Я читал документацию CometD и искал какой-то пример, который я мог бы использовать. Документация обширна, но не помогает (отсутствие контекста), и я не смог найти достойный пример того, что я пытаюсь сделать.

Может ли кто-нибудь привести простой пример создания механизма публикации на Java, который можно использовать с Jetty? В противном случае, может ли кто-нибудь указать мне пример того, как это сделать?

Пожалуйста, порекомендуйте.


person Factor Three    schedule 14.09.2016    source источник


Ответы (1)


Проект CometD имеет выдающаяся задача по возвращению учебных пособий.

На этот конкретный вопрос ответил учебник по ценам на акции на стороне сервера, источник которого вы можете найти здесь, пока мы работаем над тем, чтобы вернуть его в онлайн как часть документации.

Опуская некоторые детали, сервис, который вам нужно написать, похож на сервис цен на акции из учебника: после получения внешнего события сервис должен транслировать событие подписчикам.

@Service
public class StockPriceService implements StockPriceEmitter.Listener
{
    @Inject
    private BayeuxServer bayeuxServer;
    @Session
    private LocalSession sender;

    public void onUpdates(List<StockPriceEmitter.Update> updates)
    {
        for (StockPriceEmitter.Update update : updates)
        {
            // Create the channel name using the stock symbol.
            String channelName = "/stock/" + update.getSymbol().toLowerCase(Locale.ENGLISH);

            // Initialize the channel, making it persistent and lazy.
            bayeuxServer.createChannelIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
            {
                public void configureChannel(ConfigurableServerChannel channel)
                {
                    channel.setPersistent(true);
                    channel.setLazy(true);
                }
            });

            // Convert the Update business object to a CometD-friendly format.
            Map<String, Object> data = new HashMap<>(4);
            data.put("symbol", update.getSymbol());
            data.put("oldValue", update.getOldValue());
            data.put("newValue", update.getNewValue());

            // Publish to all subscribers.
            ServerChannel channel = bayeuxServer.getChannel(channelName);
            channel.publish(sender, data);
        }
    }
}

Класс StockPriceEmitter является источником ваших внешних событий и публикует их в StockPriceEmitter.Listener в виде StockPriceEmitter.Update событий.

То, как внешние события передаются на сервер CometD, — это деталь, которую StockPriceEmitter скрывает; это можно сделать с помощью сообщений JMS, или путем опроса внешней службы REST, или через собственный сетевой протокол, или путем опроса базы данных и т. д.

Важно то, что при поступлении внешних событий вызывается StockPriceService.onUpdates(...), и там вы можете преобразовать события в удобный для CometD формат JSON, а затем опубликовать их на канале CometD.

Публикация на канал CometD, в свою очередь, отправит сообщение всем подписчикам этого канала, как правило, удаленным клиентам, таким как браузеры.

Канал CometD был сделан ленивым, потому что это способ избежать бомбардировки клиентов очень частой частотой обновления (скажем, выше 2-4 обновлений в секунду). Вам нужно будет принять решение о ленивости канала в зависимости от вашего конкретного варианта использования.

person sbordet    schedule 15.09.2016