Кластер Cassandra с плохой производительностью вставки и стабильностью вставки

Мне приходится хранить около 250 числовых значений в секунду для каждого клиента, что составляет около 900 тыс. чисел в час. Вероятно, это не будет запись в течение всего дня (вероятно, от 5 до 10 часов в день), но я разделю свои данные на основе идентификатора клиента и дня чтения. Максимальная длина строки составляет около 22-23 м, что вполне приемлемо. Тем не менее, моя схема выглядит так:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);

Ключевое пространство имеет коэффициент репликации 2, просто для тестирования снитч GossipingPropertyFileSnitch и NetworkTopologyStrategy. Я знаю, что фактор репликации 3 больше соответствует производственному стандарту.

Затем я создал небольшой кластер на серверах компании, три виртуализированных машины с 2 процессорами x 2 ядра, 16 ГБ ОЗУ и большим пространством. Я с ними в гигабитной локальной сети. Кластер рабочий, на базе nodetool.

Вот код, который я использую для проверки моей настройки:

        Cluster cluster = Cluster.builder()
                .addContactPoint("192.168.1.100")
                .addContactPoint("192.168.1.102")
                .build();
        Session session = cluster.connect();
        DateTime time = DateTime.now();
        BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true);

    try {

        ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

        String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement = session.prepare(insertQuery);
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

        //generating the entries
        for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
            time = time.plus(4); //4ms between each entry
            BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
            batch.add(bound);

            //The batch statement must have 65535 statements at most
            if (batch.size() >= 65534) {
                queryQueue.put(batch);
                batch = new BatchStatement();
            }
        }
        queryQueue.put(batch); //the last batch, perhaps shorter than 65535

        //storing the data
        System.out.println("Starting storing");
        while (!queryQueue.isEmpty()) {
            pool.execute(() -> {
                try {

                    long threadId = Thread.currentThread().getId();
                    System.out.println("Started: " + threadId);
                    BatchStatement statement = queryQueue.take();
                    long start2 = System.currentTimeMillis();
                    session.execute(statement);
                    System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
                } catch (Exception ex) {
                    System.out.println(ex.toString());
                }
            });

        }
        pool.shutdown();
        pool.awaitTermination(120,TimeUnit.SECONDS);


    } catch (Exception ex) {
        System.out.println(ex.toString());
    } finally {
        session.close();
        cluster.close();
    }

Я придумал код, читая сообщения здесь и в других блогах и на веб-сайтах. Как я понял, клиенту важно использовать несколько потоков, поэтому я так и сделал. Я также пытался использовать асинхронные операции.

Суть в том, что независимо от того, какой подход я использую, один пакет выполняется за 5-6 секунд, хотя это может занять до 10. То же самое происходит, если я ввожу только один пакет (таким образом, только ~ 65 тыс. столбцов) или если я использую тупое однопоточное приложение. Честно говоря, я ожидал немного большего. Тем более, что я получаю более или менее схожую производительность на своем ноутбуке с локальным экземпляром.

Вторая, может быть, более важная проблема — это исключения, с которыми я сталкиваюсь непредсказуемым образом. Эти двое:

com.datastax.driver.core.exceptions.WriteTimeoutException: время ожидания Cassandra во время запроса на запись при согласованности ONE (требовалась 1 реплика, но только 0 подтвердили запись)

а также

com.datastax.driver.core.exceptions.NoHostAvailableException: все хосты, которые пытались запросить, не удалось (пробовали: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection было закрыто), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Соединение было закрыто), /192.168.1.101:9042 (com.datastax.driver.core .TransportException: [/192.168.1.101:9042] Соединение закрыто))

В общем, я что-то не так делаю? Должен ли я реорганизовать способ загрузки данных или изменить схему. Я попытался уменьшить длину строки (поэтому у меня есть 12-часовые строки), но это не имело большого значения.

============================== Обновление:

Я был груб и забыл вставить пример кода, который я использовал после ответа на вопрос. Это работает достаточно хорошо, однако я продолжаю свои исследования с KairosDB и бинарной передачей с Astyanax. Похоже, что с ними я могу получить гораздо лучшую производительность по сравнению с CQL, хотя у KairosDB могут быть некоторые проблемы, когда он перегружен (но я работаю над этим), а Astyanax немного многословен для использования, на мой вкус. Тем не менее, вот код, может я где-то ошибся.

Номер слота семафора не влияет на производительность при превышении 5000, он почти постоянный.

String insertQuery = "insert into keyspace.measurement     (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement =     session.prepare(insertQuery);
        Semaphore semaphore = new Semaphore(15000);

    System.out.println("Starting " + Thread.currentThread().getId());
    DateTime time = DateTime.parse("2015-01-05T12:00:00");
    //generating the entries
    long start = System.currentTimeMillis();

    for (int i = 0; i < 900000; i++) { 

        BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
        semaphore.acquire();
        ResultSetFuture resultSetFuture = session.executeAsync(statement);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {

                semaphore.release();
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error: " + throwable.toString());
                semaphore.release();
            }
        });
        time = time.plus(4); //4ms between each entry
    }

person Aleksandar Stojadinovic    schedule 12.01.2015    source источник


Ответы (1)


Каковы ваши результаты при использовании нерегистрируемой пакетной обработки? Вы уверены, что вообще хотите использовать пакетные операторы? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e

person Stefan Podkowinski    schedule 12.01.2015
comment
Не кардинально отличается. Я почти уверен, что хочу использовать пакет, потому что я уже работал над подобными вещами в других проектах, и операторы один за другим обычно были медленнее. В любом случае нет смысла делать это быстрее. - person Aleksandar Stojadinovic; 12.01.2015
comment
Спод прав. Пакеты в Cassandra не являются оптимизацией производительности. Зарегистрированные пакеты следует использовать только в том случае, если требуется атомарность и существует потеря производительности для достижения атомарной записи. Даже незарегистрированные пакеты часто медленнее, чем прямые асинхронные запросы, они, по сути, требуют ненужной координации (если только вы не выполняете пакетирование по ключу и не используете токен — возможно, вы здесь). В любом случае я склонен рекомендовать прямую асинхронную запись. Вот еще одна статья, подтверждающая это мнение:lostechies.com/ryansvihla/2014/08/28/ - person phact; 12.01.2015
comment
Что касается ваших тайм-аутов, это произойдет, когда вы начнете перегружать свои узлы c* слишком большим количеством операций записи. Это легко сделать с помощью асинхронных запросов, поскольку ваша программа генерирует записи так быстро, как только может, без остановок. После удаления ваших пакетов (особенно ведения журнала) вы должны увидеть улучшение, но вам, возможно, придется ограничить свои записи или даже увеличить время ожидания, если это позволяет ваше соглашение об уровне обслуживания. - person phact; 12.01.2015
comment
Хорошо, это интересно. Я думал, что пакетная обработка снижает нагрузку на сеть. Есть ли какая-то особая техника, которую я мог бы использовать для дросселирования? Я думал о семафоре. Вероятно, это означает, что я должен вручную настроить количество параллельных асинхронных операций записи. К каким цифрам я должен стремиться и назвать их хорошими? Я думаю, что увеличение тайм-аута в порядке. Вероятно, будет промежуточный шаг между получением сервером данных и их загрузкой в ​​базу данных (может быть, простой файловый кеш или что-то в этом роде). - person Aleksandar Stojadinovic; 13.01.2015
comment
Подводя итог, вы были правы. Предполагаемые сетевые накладные расходы (если таковые имеются?) не являются причиной для использования пакетных операторов. Я вернулся к более простому решению с асинхронными операциями и достиг результатов тестов, около 30 тыс. операций в кластере из 3 узлов. Я использовал семафор для управления перегрузкой, и я мог легко увеличить его до 10000 слотов, но я заметил, что предел производительности намного ниже (но он все еще стабилен). Таймауты не трогал. Также убедитесь, что у вас есть надежное проводное соединение с серверами! До всего этого я был в WiFi, не хотел веселья. Большое спасибо вам обоим. - person Aleksandar Stojadinovic; 14.01.2015
comment
@AleksandarStojadinovic, пожалуйста, покажите пример вашего кода для вставки - у меня такая же проблема - person sedovav; 28.01.2015
comment
@sedovav, я добавил код, попробуйте. Конечно, всем остальным предлагается взглянуть, может я что-то упустил. - person Aleksandar Stojadinovic; 30.01.2015
comment
спасибо @AleksandarStojadinovic! Но мне никогда не удавалось избавиться от этого исключения, используя тот же подход с семафором. Я всегда получаю com.datastax.driver.core.exceptions.NoHostAvailableException: все хосты, которые пытались выполнить запрос, не удалось (пробовали: /172.26.30.96 (com.datastax.driver.core.exceptions.DriverException: тайм-аут во время чтения), / 172.26.30.36 (com.datastax.driver.core.exceptions.DriverException: тайм-аут во время чтения)) после вставки нескольких тысяч строк. - person sedovav; 30.01.2015
comment
@sedovav попробуйте уменьшить значение семафора до 1 и посмотрите, сохраняется ли проблема. Впоследствии вы можете попытаться увеличить его шаг за шагом. - person Stefan Podkowinski; 30.01.2015
comment
@AleksandarStojadinovic Я пробовал это до тех пор, пока не был превышен предел накладных расходов GC. Я думаю, что DriverException: Timeout во время чтения как-то связан с настройками Cassandra.yaml... - person sedovav; 30.01.2015
comment
Если это не удалось с размером семафора = 1, как сказал Стефан, у вас есть другая проблема. Единственный раз, когда у меня был превышен лимит накладных расходов GC, это когда я использовал выполнение синхронизации и действительно сошел с ума от размера вставки. - person Aleksandar Stojadinovic; 30.01.2015