События Node.js EventEmitter не используют цикл событий

Возможно, основная проблема заключается в том, как модуль node-kafka, который я использую, реализовал вещи, но, возможно, нет, поэтому здесь мы идем...

Используя библиотеку node-kafa, я столкнулся с проблемой подписки на события consumer.on('message'). Библиотека использует стандартный модуль events, поэтому я думаю, что этот вопрос может быть достаточно общим.

Моя реальная структура кода большая и сложная, поэтому вот псевдопример базовой схемы, чтобы подчеркнуть мою проблему. (Примечание: этот фрагмент кода не тестировался, поэтому здесь могут быть ошибки, но в любом случае синтаксис здесь не обсуждается)

var messageCount = 0;
var queryCount = 0;

// Getting messages via some event Emitter
consumer.on('message', function(message) {
    message++;
    console.log('Message #' + message);

    // Making a database call for each message
    mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
        queryCount++;
        console.log('Query   #' + queryCount);
    });
})

Что я вижу здесь, так это то, что когда я запускаю свой сервер, есть около 100 000 незавершенных сообщений, которые kafka захочет мне передать, и он делает это через эмиттер событий. Итак, я начинаю получать сообщения. Чтобы получить и зарегистрировать все сообщения, требуется около 15 секунд.

Это то, что я ожидал увидеть на выходе, предполагая, что запрос mysql достаточно быстрый:

Message #1
Message #2
Message #3
...
Message #500
Query   #1
Message #501
Message #502
Query   #2
... and so on in some intermingled fashion

Я ожидал бы этого, потому что мой первый результат mysql должен быть готов очень быстро, и я ожидаю, что результат (ы) по очереди в цикле событий обработает ответ. Что я на самом деле получаю:

Message #1
Message #2
...
Message #100000
Query   #1
Query   #2
...
Query   #100000

Я получаю каждое сообщение до того, как ответ mysql сможет быть обработан. Итак, мой вопрос, почему? Почему я не могу получить один результат из базы данных, пока не будут завершены все события сообщений?

Еще одно замечание: я установил точку останова на .emit('message') в node-kafka и на mysql.query() в своем коде, и я использую их пошагово. Таким образом, оказывается, что все 100 000 эмитов не складываются заранее, прежде чем попасть в мой подписчик событий. Так появилась моя первая гипотеза по проблеме.

Идеи и знания приветствуются :)


person Eric Olson    schedule 04.05.2015    source источник
comment
Что произойдет, если вы увеличите количество сохраненных сообщений до гораздо большего числа? Возможно ли, что ваш MySQL просто такой медленный?   -  person Avery    schedule 04.05.2015
comment
@Avery Я задавался этим вопросом, но когда я повторяю это только с одним сообщением для обработки, я даже не могу ощутить задержку ответа mysql. Все это тоже работает локально. И фактический запрос mysql чрезвычайно прост (всего SELECT для ~ 8 полей из одной строки таблицы, и в этой таблице сейчас всего около 60 строк)   -  person Eric Olson    schedule 04.05.2015
comment
Если этот пример действительно представляет ваш код, то я тоже заблудился. Можете ли вы на самом деле получить этот результат с этим примером? У меня нет экземпляра MySQL для тестирования.   -  person Avery    schedule 04.05.2015
comment
Настроили ли вы node-kafka с достаточно большим значением fetchMaxBytes, чтобы все эти 100 тыс. сообщений передавались в одном запросе? EventEmitter является синхронным, он не использует цикл обработки событий Node, поэтому, если одновременно приходит 100 000 сообщений, все они могут быть отправлены до того, как ваш асинхронный код получит возможность запуститься.   -  person robertklep    schedule 04.05.2015
comment
@Avery Я просто вставил приведенный выше пример в свой реальный код и запустил его. Я получаю те же результаты. Примерно через 20 секунд журналы сообщений завершаются, и сразу же начинаются журналы запросов.   -  person Eric Olson    schedule 04.05.2015
comment
@robertklep Спасибо! Таким образом, в примерах kafka-node они показывают пример переопределений по умолчанию с fetchMaxBytes: 1024*10. В других переопределениях по умолчанию у них были значения, равные значениям по умолчанию, и они даже отметили это, поэтому я предположил, что это относится и к этому свойству. Ваш вопрос вдохновил меня изучить их код и увидеть, что по умолчанию на самом деле это fetchMaxBytes: 1024*1024. Так что да, я фактически принимал ВСЕ сообщения в одном запросе. А я и не знал, что EventEmitter синхронный :)   -  person Eric Olson    schedule 04.05.2015
comment
@EricOlson Из-за документации я действительно предположил, что 10 КБ также было значением по умолчанию.   -  person robertklep    schedule 04.05.2015
comment
@robertklep Я изменил переопределение на 1024 * 10, как в документации, и получил ожидаемые результаты :)   -  person Eric Olson    schedule 04.05.2015
comment
@robertklep Просто подталкиваю вас опубликовать свой ответ как ответ :)   -  person JMM    schedule 07.05.2015
comment
@JMM сделано. Также добавлен возможный обходной путь для большого количества поступающих событий.   -  person robertklep    schedule 07.05.2015
comment
Потрясающий! @EricOlson, отметьте как правильное.   -  person JMM    schedule 07.05.2015


Ответы (1)


Драйвер node-kafka использует довольно свободный размер буфера (1M), что означает, что он получит столько сообщений от Kafka, сколько поместится в буфере. Если сервер загружен, и в зависимости от размера сообщения это может означать (десятки) тысяч сообщений, поступающих с одним запросом.

Поскольку EventEmitter является синхронным (он не использует цикл событий Node), это означает, что драйвер будет передавать (десятки) тысяч событий своим слушателям, и, поскольку он синхронный, он не уступит циклу событий Node до тех пор, пока все сообщения доставлены.

Я не думаю, что вы можете обойти поток доставки событий, но я не думаю, что конкретно доставка событий является проблематичной. Более вероятная проблема заключается в запуске асинхронной операции (в данном случае запроса MySQL) для каждого события, что может привести к переполнению базы данных запросами.

Возможным обходным решением может быть использование очереди вместо выполнения запросов непосредственно из обработчиков событий. Например, с помощью async.queue вы можете ограничить количество одновременных (асинхронных) задач. «Рабочая» часть очереди будет выполнять запрос MySQL, а в обработчиках событий вы просто поместите сообщение в очередь.

person robertklep    schedule 07.05.2015
comment
Спасибо @robertklep. Я попробую async.queue. Я передаю свою собственную очередь, чтобы был только один запрос mysql и кэширование результатов для ожидающих запросов, но я подозреваю, что хорошо поддерживаемый/протестированный модуль будет лучше :) - person Eric Olson; 07.05.2015