Приостановка потока cassandra для асинхронных операций

Я хочу приостановить мой поток cassandra для некоторых асинхронных операций перед обработкой следующей строки.

Каждая строка принимается в читаемом прослушивателе событий. Я пробовал использовать stream.pause, но на самом деле он не приостанавливает поток. Я также пробовал то же самое в прослушивателе событий «данные», и это тоже не работает. Буду чрезвычайно признателен за идеи и, возможно, разрешение. Вот мой код. использование async в readable и «ожидание» с использованием await на самом деле не предотвращает появление следующей строки до завершения асинхронной функции.

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', function () {
        let row = this.read();
        asyncFunctionNeedTowaitForthisBeforeNextRow()
    })
}

// Следующее ниже не работает

function start() {
let stream = client.stream('SELECT * FROM table');
stream
    .on('end', function () {
        console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
        console.error(err);
    })
    .on('readable', async function () {
        let row = this.read();
        stream.pause();
        await asyncFunctionNeedTowaitForthisBeforeNextRow();
        stream.resume();
    })
 }

person Naman Gupta    schedule 17.09.2019    source источник


Ответы (2)


Причина, по которой stream.pause() не работает, заключается в том, что событие readable запускается несколько раз, поэтому та же функция async вызывается снова. То же самое и для события data.

Я бы рекомендовал использовать настраиваемый поток с возможностью записи для правильной обработки всего этого асинхронного материала.

Доступный для записи поток будет выглядеть примерно так:

const {Writable} = require('stream');

const myWritable = new Writable({
  async write(chunk, encoding, callback) {
    let row = chunk.toString();
    await asyncFunctionNeedTowaitForthisBeforeNextRow();
    callback(); // Write completed successfully
  }
})

Затем настройте свой код, чтобы использовать эту запись:

function start() {
  let stream = client.stream('SELECT * FROM table');
  stream.pipe(myWritable);
  stream
    .on('end', function () {
      console.log(`Ended at ${Date.now()}`);
    })
    .on('error', function (err) {
      console.error(err);
    })
}
person Avraham    schedule 18.09.2019

Обратите внимание, что даже если вы объявляете свой обработчик для события 'readable' как асинхронную функцию, вызывающая сторона не ожидает завершения возвращенного обещания, потому что Stream ожидает нормального выполнения для обработчика событий.

Решение могло быть таким:

stream
  .on('end', () => {})
  .on('error', () => {})
  .on('data', row => {
    stream.pause();
    doSomethingAsync(row).then(() => stream.resume());
  });

Обратите внимание, что в идеале вы должны использовать параллелизм при выполнении чего-либо асинхронного, поэтому было бы лучше каждый раз читать пару строк, а затем останавливаться.

person jorgebg    schedule 18.09.2019