Возможно, основная проблема заключается в том, как модуль node-kafka, который я использую, реализовал вещи, но, возможно, нет, поэтому здесь мы идем...
Используя библиотеку node-kafa, я столкнулся с проблемой подписки на события consumer.on('message')
. Библиотека использует стандартный модуль events
, поэтому я думаю, что этот вопрос может быть достаточно общим.
Моя реальная структура кода большая и сложная, поэтому вот псевдопример базовой схемы, чтобы подчеркнуть мою проблему. (Примечание: этот фрагмент кода не тестировался, поэтому здесь могут быть ошибки, но в любом случае синтаксис здесь не обсуждается)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
Что я вижу здесь, так это то, что когда я запускаю свой сервер, есть около 100 000 незавершенных сообщений, которые kafka захочет мне передать, и он делает это через эмиттер событий. Итак, я начинаю получать сообщения. Чтобы получить и зарегистрировать все сообщения, требуется около 15 секунд.
Это то, что я ожидал увидеть на выходе, предполагая, что запрос mysql достаточно быстрый:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
Я ожидал бы этого, потому что мой первый результат mysql должен быть готов очень быстро, и я ожидаю, что результат (ы) по очереди в цикле событий обработает ответ. Что я на самом деле получаю:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
Я получаю каждое сообщение до того, как ответ mysql сможет быть обработан. Итак, мой вопрос, почему? Почему я не могу получить один результат из базы данных, пока не будут завершены все события сообщений?
Еще одно замечание: я установил точку останова на .emit('message')
в node-kafka и на mysql.query()
в своем коде, и я использую их пошагово. Таким образом, оказывается, что все 100 000 эмитов не складываются заранее, прежде чем попасть в мой подписчик событий. Так появилась моя первая гипотеза по проблеме.
Идеи и знания приветствуются :)
node-kafka
с достаточно большим значениемfetchMaxBytes
, чтобы все эти 100 тыс. сообщений передавались в одном запросе? EventEmitter является синхронным, он не использует цикл обработки событий Node, поэтому, если одновременно приходит 100 000 сообщений, все они могут быть отправлены до того, как ваш асинхронный код получит возможность запуститься. - person robertklep   schedule 04.05.2015fetchMaxBytes: 1024*10
. В других переопределениях по умолчанию у них были значения, равные значениям по умолчанию, и они даже отметили это, поэтому я предположил, что это относится и к этому свойству. Ваш вопрос вдохновил меня изучить их код и увидеть, что по умолчанию на самом деле этоfetchMaxBytes: 1024*1024
. Так что да, я фактически принимал ВСЕ сообщения в одном запросе. А я и не знал, что EventEmitter синхронный :) - person Eric Olson   schedule 04.05.2015