Мне приходится хранить около 250 числовых значений в секунду для каждого клиента, что составляет около 900 тыс. чисел в час. Вероятно, это не будет запись в течение всего дня (вероятно, от 5 до 10 часов в день), но я разделю свои данные на основе идентификатора клиента и дня чтения. Максимальная длина строки составляет около 22-23 м, что вполне приемлемо. Тем не менее, моя схема выглядит так:
CREATE TABLE measurement (
clientid text,
date text,
event_time timestamp,
value int,
PRIMARY KEY ((clientid,date), event_time)
);
Ключевое пространство имеет коэффициент репликации 2, просто для тестирования снитч GossipingPropertyFileSnitch
и NetworkTopologyStrategy
. Я знаю, что фактор репликации 3 больше соответствует производственному стандарту.
Затем я создал небольшой кластер на серверах компании, три виртуализированных машины с 2 процессорами x 2 ядра, 16 ГБ ОЗУ и большим пространством. Я с ними в гигабитной локальной сети. Кластер рабочий, на базе nodetool.
Вот код, который я использую для проверки моей настройки:
Cluster cluster = Cluster.builder()
.addContactPoint("192.168.1.100")
.addContactPoint("192.168.1.102")
.build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true);
try {
ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts
String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also
//generating the entries
for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
time = time.plus(4); //4ms between each entry
BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
batch.add(bound);
//The batch statement must have 65535 statements at most
if (batch.size() >= 65534) {
queryQueue.put(batch);
batch = new BatchStatement();
}
}
queryQueue.put(batch); //the last batch, perhaps shorter than 65535
//storing the data
System.out.println("Starting storing");
while (!queryQueue.isEmpty()) {
pool.execute(() -> {
try {
long threadId = Thread.currentThread().getId();
System.out.println("Started: " + threadId);
BatchStatement statement = queryQueue.take();
long start2 = System.currentTimeMillis();
session.execute(statement);
System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
} catch (Exception ex) {
System.out.println(ex.toString());
}
});
}
pool.shutdown();
pool.awaitTermination(120,TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println(ex.toString());
} finally {
session.close();
cluster.close();
}
Я придумал код, читая сообщения здесь и в других блогах и на веб-сайтах. Как я понял, клиенту важно использовать несколько потоков, поэтому я так и сделал. Я также пытался использовать асинхронные операции.
Суть в том, что независимо от того, какой подход я использую, один пакет выполняется за 5-6 секунд, хотя это может занять до 10. То же самое происходит, если я ввожу только один пакет (таким образом, только ~ 65 тыс. столбцов) или если я использую тупое однопоточное приложение. Честно говоря, я ожидал немного большего. Тем более, что я получаю более или менее схожую производительность на своем ноутбуке с локальным экземпляром.
Вторая, может быть, более важная проблема — это исключения, с которыми я сталкиваюсь непредсказуемым образом. Эти двое:
com.datastax.driver.core.exceptions.WriteTimeoutException: время ожидания Cassandra во время запроса на запись при согласованности ONE (требовалась 1 реплика, но только 0 подтвердили запись)
а также
com.datastax.driver.core.exceptions.NoHostAvailableException: все хосты, которые пытались запросить, не удалось (пробовали: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection было закрыто), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Соединение было закрыто), /192.168.1.101:9042 (com.datastax.driver.core .TransportException: [/192.168.1.101:9042] Соединение закрыто))
В общем, я что-то не так делаю? Должен ли я реорганизовать способ загрузки данных или изменить схему. Я попытался уменьшить длину строки (поэтому у меня есть 12-часовые строки), но это не имело большого значения.
============================== Обновление:
Я был груб и забыл вставить пример кода, который я использовал после ответа на вопрос. Это работает достаточно хорошо, однако я продолжаю свои исследования с KairosDB и бинарной передачей с Astyanax. Похоже, что с ними я могу получить гораздо лучшую производительность по сравнению с CQL, хотя у KairosDB могут быть некоторые проблемы, когда он перегружен (но я работаю над этим), а Astyanax немного многословен для использования, на мой вкус. Тем не менее, вот код, может я где-то ошибся.
Номер слота семафора не влияет на производительность при превышении 5000, он почти постоянный.
String insertQuery = "insert into keyspace.measurement (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
Semaphore semaphore = new Semaphore(15000);
System.out.println("Starting " + Thread.currentThread().getId());
DateTime time = DateTime.parse("2015-01-05T12:00:00");
//generating the entries
long start = System.currentTimeMillis();
for (int i = 0; i < 900000; i++) {
BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
semaphore.acquire();
ResultSetFuture resultSetFuture = session.executeAsync(statement);
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
semaphore.release();
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("Error: " + throwable.toString());
semaphore.release();
}
});
time = time.plus(4); //4ms between each entry
}