Эффективный способ синхронизированной массовой обработки документов в базе данных Cloudant с использованием nodejs

Пакетная обработка - это метод выполнения больших объемов повторяющихся заданий с данными, который требует выполнения многих скрытых действий. Это включает в себя применение обновлений, обработку данных, интеграцию данных из нескольких источников, извлечение данных и многое другое. В этой статье мы обсудим, как написать пакетный процесс для базы данных Cloudant NoSQL с помощью nodejs.

nodejs используют управляемую событиями асинхронную неблокирующую модель ввода-вывода. Таким образом, учитывая сценарий, в котором тысячи или даже миллионы записей должны быть обработаны или обновлены, это может повлиять на производительность базы данных, поскольку он будет почти одновременно запускать все вызовы одновременно. В этой статье мы обсудим способы решения этой проблемы с использованием синхронного механизма nodejs вместе с Cloudant Bulk API.

Чтобы продемонстрировать пакетную обработку на примере, давайте реплицируем базу данных « movies-demo , указанную в examples.cloudant.com .

См. Пример здесь для настройки репликации базы данных.

Сценарий

В этой статье мы будем использовать базу данных «movies-demo» и обновим все документы, которые старше 2000 года, с новым атрибутом Old_movie:true с помощью nodejs.

Шаги

  1. Создайте представление в базе данных Cloudant «фильмы-демо», чтобы отфильтровать документы старше 2000 года.
  2. Напишите сценарий nodejs для добавления нового атрибута Old_movie:true во все эти документы, выбранные представлением, синхронизированным образом.
  3. Импровизируйте скрипт узла с помощью Cloudant Bulk API.

Шаг 1

Создайте представление в базе данных Cloudant NoSQL «movies-demo», чтобы отфильтровать документы для обработки.

Нажмите на «Новый вид» на значке «плюс» в «Все документы» или «Проектные документы».

Давайте присвоим представлению имя «get-old-movies» и имя дизайн-документа «data» и вставим код, как показано ниже.

function (doc) {
  if(doc.Movie_year < 2000){
    emit(doc.Movie_year, doc.Movie_name);
  }
}

Нажмите кнопку «Создать документ, а затем создать индекс».

Шаг 2:

Напишите сценарий nodejs для добавления нового атрибута Old_movie:true во все эти выбранные документы.

Скрипт выполняет следующие три функции:

  • Получите соединение с базой данных Cloudant.
  • Получите данные из Cloudant, представьте «старые фильмы», добавьте новый атрибут в каждый документ и поместите их в массив.
  • Итерируйте массив и обновляйте документы базы данных синхронно.

скрипт nodejs с синхронным обновлением базы данных Cloudant

const cloudant = require('@cloudant/cloudant');
const cloudantURL = 'https://account-name.cloudant.com/';
const userName = 'username';
const passwd = 'password';
const databaseName = 'movies-demo';
  
/*
 * This function returns the database connection.
 */
let getConnection = () => {
    return new Promise(function(resolve, reject) {
        cloudant({
            url : cloudantURL,
            account : userName,
            password  : passwd
        }, function(error, connection){
            if (connection)
                resolve (connection);
            else
                reject (error);
            })
    })
}

/*
 * This function gets the data from the view 'get-old-movies'
 * and  insert a new attribute in each document.
 */
let fetchDataFromDB = (connection) => {
    return new Promise(function(resolve, reject) {
        let objectArray = [];
        let dbName = connection.db.use (databaseName);
        dbName.view('data','get-old-movies', {'include_docs':true}, (error,result) => {
            if(result){
                result.rows.forEach((document) => { 
                    var dbDoc = document.doc;
                    /* 
                     * add the new attribute 'Old_movie:true' 
                     * to each document and push it to the array.
                     */
                    dbDoc.Old_movie = true;
                    objectArray.push(dbDoc);
                });
                resolve (objectArray);
            }else{
                reject (error);
            } 
        })
    })
}

/*
 * This function updates the document in 'movies-demo' database.
 */
let updateData = (documentObject, connection) => {
    return new Promise(function(resolve,reject) {
        let dbName = connection.db.use (databaseName);
        dbName.insert(documentObject, function(err, doc) {
            if(err) {
                reject(err);
            }else{
                resolve('200');
            }
         });
    })
}

/*
 * async main method for executing function in a 
 * synchronous/blocking manner.
 */
async function main(){
    let connection = await getConnection();
    let dataArray = await fetchDataFromDB(dbConnection);
    for(i in dataArray){
        try{
            /* 
             * Sequential execution for the document update.
             */
            let status = await updateData(dataArray[i], connection);
            if(status != '200'){
                console.log(`Error : ${status}`);
                break;
            }
        }catch(err) {
            console.log(err);
        }
    }
    console.log('>>> Finished !!');
}

/*
 * Execution starts from here.
 */
main();

Примечание. Этот сценарий узла решает проблемы, возникающие из-за неблокирующей природы nodejs по умолчанию. Но ждать! если каждый вызов обновления базы данных будет выполняться последовательно, не вызовет ли это значительной задержки? Да, было бы. Фактически, приведенный выше сценарий обновляет 3680 документов за 15 минут, что требует значительных затрат времени !!

Есть ли способ повысить скорость выполнения? Да, мы могли бы использовать API массовых операций Cloudant. Намного более эффективно массовое обновление нескольких документов в одном массовом запросе, чем их отправка в виде отдельного вызова. Теперь давайте перепрограммируем приведенный выше сценарий, чтобы включить массовую операцию, при этом мы нацелены на массовое обновление 250 документов одновременно, но синхронно.

Шаг 3:

Обновленный скрипт nodejs для обновления документов с помощью Cloudant Bulk API

const cloudant = require('@cloudant/cloudant');
const cloudantURL = 'https://account-name.cloudant.com/';
const userName = 'username';
const passwd = 'password';
const databaseName = 'movies-demo';
  
/*
 * This function returns the database connection.
 */
let getConnection = () => {
    return new Promise(function(resolve, reject) {
        cloudant({
            url : cloudantURL,
            account : userName,
            password  : passwd
        }, function(error, connection){
            if (connection)
                resolve (connection);
            else
                reject (error);
            })
    })
}

/*
 * This function gets the data from the view 'get-old-movies' 
 * and insert a new attribute in each document.
 */
let fetchDataFromDB = (connection) => {
    return new Promise(function(resolve, reject) {
        let objectArray = [];
        let dbName = connection.db.use (databaseName);
        dbName.view('data','get-old-movies', {'include_docs':true}, (error,result) => {
            if(result){
                result.rows.forEach((document) => { 
                    var dbDoc = document.doc;
                    /* 
                     * add the new attribute 'Old_movie:true' 
                     * to each document and push it to the array.
                     */
                    dbDoc.Old_movie = true;
                    objectArray.push(dbDoc);
                });
                resolve (objectArray);
            }else{
                reject (error);
            } 
        })
    })
}

/*
 * This function updates the document in 'movies-demo' database.
 */
let updateBulkData = (documentArray, connection) => {
    return new Promise(function(resolve,reject) {
        let dbName = connection.db.use (databaseName);
        dbName.bulk(documentArray, function(err, doc) {
            if(err) {
                reject(err);
            }else{
                resolve('200');
            }
         });
    })
}

/*
 * async main method for executing function in 
 * a synchronous/blocking manner.
 */
async function main(){
    let connection = await getConnection();
    let dataArray = await fetchDataFromDB(dbConnection);
    var i = 0;
    const size = 250;
    while (dataArray[i]) {
        try{
            /*
             * Sequential execution with 250 documents 
             * updates per request.
             */
            let status = await updateBulkData({docs: dataArray.splice(0, size)}, connection);
            if(status != '200'){
                console.log(`Error : ${status}`);
                break;
            }
        }catch(err) {
            console.log(err);
        }
    }
    console.log('>>> Finished !!');
}

/*
 * Execution starts from here.
 */
main();

Вывод:

Когда я протестировал этот модифицированный сценарий с помощью Cloudant bulk API, он смог обновить такое же количество документов 3680 за 10 секунд! по сравнению с более ранним скриптом, который занимал около 15 минут. . Этот модифицированный скрипт nodejs обеспечивает большую эффективность и производительность, когда дело доходит до пакетной обработки.