Как улучшить параллелизм при использовании потока отправленных событий Sinatra Server

Я работаю над созданием промежуточного программного обеспечения Rack, которое подписывается на канал Redis и отправляет сообщения клиентам с помощью событий, отправленных сервером. Sinatra предоставляет хороший DSL для этого. Однако у меня есть рабочий пример, проблема, с которой я сталкиваюсь, заключается в том, что производительность существенно снижается, когда я добираюсь до 7 или 8 клиентов. Я также столкнулся с проблемами с «мертвой блокировкой» сервера при попытке повторного использования соединения Redis между запросами.

Я использую Thin для обслуживания приложения (которое использует EventMachine под капотом). Я думал, что Sinatra DSL уже поддерживает параллелизм с EventMachine, но, может быть, это то, что мне нужно реализовать самому? Я не хочу ограничиваться только серверами на базе EventMachine (Thin, Rainbows!), если кто-то захочет использовать многопоточный сервер, такой как Puma. Что мне нужно сделать, чтобы увеличить параллелизм в моем коде?

require 'redis'
require 'sinatra/base'

class SSE < Sinatra::Base

  def send_message(json)
    "id: #{Time.now}\n" +
    "data: #{json}" +
    "\r\n\n"
  end

  get '/channels/:id/subscribe', provides: 'text/event-stream' do
    channel_id = params['id']
    stream(:keep_open) do |connection|
      Redis.new.subscribe("channels:#{channel_id}") do |on|
        on.message do |channel, json|
          connection << send_message(json)
        end
      end
    end
  end

end

person Andrew    schedule 05.08.2013    source источник


Ответы (2)


На ум приходит несколько вещей, поэтому я повторю их в произвольном порядке.

Я использую Thin для обслуживания приложения (которое использует EventMachine под капотом). Я думал, что Sinatra DSL уже поддерживает параллелизм с EventMachine, но, может быть, это то, что мне нужно реализовать самому?

Вы правы, Thin использует EventMachine. Однако особенность EventMachine (или любого другого реактора) заключается в том, что как только вы выполняете синхронную операцию, вы останавливаете весь реактор. Таким образом, чтобы действительно получить ожидаемый параллелизм, вам необходимо продолжать использовать EventMachine во всем приложении.

Проверьте em-hiredis для клиента Redis с поддержкой EventMachine, который поддерживает pub/sub.

Я не хочу ограничиваться только серверами на базе EventMachine (Thin, Rainbows!), если кто-то захочет использовать многопоточный сервер, такой как Puma.

Никогда не пробовал то, что я собираюсь сказать, но я не думаю, что у вас возникнут проблемы с использованием EventMachine на сервере, который этого не делает. Просто не забудьте начать свой собственный EM. Может в config.ru?

Я также столкнулся с проблемами с «мертвой блокировкой» сервера при попытке повторно использовать соединение Redis между запросами.

Я считаю, что причина, по которой вы сталкиваетесь с этим, заключается в том, что каждый вызов «/channels/:id/subscribe» открывает новое соединение с Redis. Вы можете иметь только так много из них открытыми. Рассмотрите возможность рефакторинга Redis.new в общее соединение для вашего приложения. Только открыть его один раз. Одно соединение Redis должно поддерживать несколько публикаций/подписок.

Просто некоторые мысли, и я надеюсь, что они помогут.

person Jason Whitehorn    schedule 05.10.2013

После долгих исследований и экспериментов вот код, который я использую с sinatra + sinatra sse gem:

class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

Соединение Redis блокирует on.message и принимает только команды (p)subscribe/(p)unsubscribe. После того, как вы отмените подписку, соединение Redis больше не блокируется и может быть разблокировано объектом веб-сервера, экземпляр которого был создан первоначальным запросом sse. Он автоматически очищается, когда вы получаете сообщение о Redis, и соединение sse с браузером больше не существует в массиве коллекции.

person henry74    schedule 24.09.2014
comment
Спасибо за хороший пример, единственное, с чем я столкнулся, это то, что только первое соединение сработало, как и ожидалось. После исследования я обнаружил, что Redis.new следует вызывать в каждом запросе, иначе будет обрабатываться только первый. - person Max Prokopov; 12.08.2016