Поток, работающий в промежуточном программном обеспечении, использует старую версию переменной экземпляра родителя.

Я использовал руководство Heroku для реализации веб-сокетов.

Он корректно работает с Thin, но не работает с Unicorn и Puma.

Также реализовано эхо-сообщение, которое отвечает на сообщение клиента. Он корректно работает на каждом сервере, поэтому проблем с реализацией вебсокетов не возникает.

Настройка Redis также правильная (он перехватывает все сообщения и выполняет код внутри блока subscribe).

Как это работает сейчас:

При запуске сервера инициализируется пустой массив @clients. Затем запускается новый поток, который слушает Redis и предназначен для отправки этого сообщения соответствующему пользователю из массива @clients.

При загрузке страницы создается новое соединение через веб-сокет, оно сохраняется в массиве @clients.

Если мы получаем сообщение от браузера, мы отправляем его обратно всем клиентам, подключенным к одному и тому же пользователю (эта часть работает правильно как на Thin, так и на Puma).

Если мы получаем сообщение от Redis, мы также ищем все соединения пользователя, хранящиеся в массиве @clients. Здесь происходит странная вещь:

  • При работе с Thin он находит подключения в массиве @clients и отправляет им сообщение.

  • При работе с Puma/Unicorn массив @clients всегда пуст, даже если мы попробуем его в таком порядке (без перезагрузки страницы или чего-то еще):

    1. Send message from browser -> @clients.length is 1, message is delivered
    2. Отправить сообщение через Redis -> @clients.length равно 0, сообщение потеряно
    3. Отправить сообщение из браузера -> @clients.length по-прежнему равно 1, сообщение доставлено

Может кто-нибудь разъяснить мне, что мне не хватает?

Связанная конфигурация сервера Puma:

workers 1
threads_count = 1
threads threads_count, threads_count

Связанный промежуточный код:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end

person Felix Borzik    schedule 10.06.2015    source источник
comment
если вы регистрируете идентификатор процесса, когда ваш поток Redis получает событие, и когда вы изменяете @clients, вы получаете то же значение?   -  person Frederick Cheung    schedule 10.06.2015
comment
@FrederickCheung только что проверил, они разные. Метод Initialize и поток прослушивателя Redis имеют одинаковый PID, но он отличается (ниже), чем тот, в котором изменен @clients. Кстати, все клиенты хранятся в одном процессе (все они принадлежат одному и тому же PID и массиву @clients)   -  person Felix Borzik    schedule 11.06.2015


Ответы (2)


Unicorn (и, по-видимому, puma) запускают главный процесс, а затем разветвляют один или несколько рабочих процессов. fork копирует (или, по крайней мере, создает иллюзию копирования — фактическое копирование обычно происходит только тогда, когда вы пишете на страницы) весь ваш процесс, но в новом процессе существует только поток, который вызвал fork.

Очевидно, что ваше приложение инициализируется перед разветвлением — обычно это делается для того, чтобы рабочие могли быстро начать работу и извлечь выгоду из экономии памяти при копировании при записи. Как следствие, ваш поток проверки Redis работает только в главном процессе, тогда как @clients модифицируется в дочернем процессе.

Вы, вероятно, можете обойти это, либо отложив создание вашего потока Redis, либо отключив предварительную загрузку приложения, однако вы должны знать, что ваша установка не позволит вам масштабироваться за пределы одного рабочего процесса (что с puma и дружественной к потокам JVM, такой как jruby, будет меньше ограничений)

person Frederick Cheung    schedule 11.06.2015
comment
Спасибо за объяснение. Что касается отключения предварительной загрузки приложений (хотя мне не нравится эта идея, но пока все в порядке): насколько я понимаю, единственная проблема, которую это вызовет, связана с этим блоком on.message. Поэтому, если я опубликую сообщение, полученное там, в Redis, оно будет получено всеми рабочими процессами, и это позволит мне масштабироваться до более чем одного рабочего процесса. Есть ли что-то еще, что мне не хватает? - person Felix Borzik; 11.06.2015
comment
Я не совсем понимаю, что вы делаете, но каждый раз, когда вы полагаетесь на совместное использование некоторой переменной, которая хранится только в памяти, вы будете ограничены в своей способности добавлять дополнительные процессы. - person Frederick Cheung; 11.06.2015

На всякий случай, если кто-то столкнется с той же проблемой, вот два решения, которые я придумал:

<сильный>1. Отключить предварительную загрузку приложения (это было первое решение, которое я придумал)

Просто удалите preload_app! из файла puma.rb. Поэтому все потоки будут иметь свою собственную переменную @clients. И они будут доступны другим методам промежуточного программного обеспечения (например, call и т. д.).

Недостаток: вы потеряете все преимущества предварительной загрузки приложения. Это нормально, если у вас есть только 1 или 2 воркера с парой потоков, но если вам нужно их много, то лучше иметь предварительную загрузку приложения. Итак, я продолжил свои исследования, и вот еще одно решение:

<сильный>2. Вынести инициализацию потока из метода initialize (это то, что я сейчас использую)

Например, я переместил его в метод call, вот так выглядит код класса промежуточного ПО:

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{event.data}"} )
      end
    end
  end
  # other code from method
end

Оба решения решают одну и ту же проблему: поток прослушивания Redis будет инициализирован для каждого рабочего потока/потока Puma, а не для основного процесса (который на самом деле не обслуживает запросы).

person Felix Borzik    schedule 13.09.2015