Как использовать функцию асинхронной / пакетной записи с драйвером Datastax Java

Я планирую использовать драйвер Datastax Java для записи в Cassandra. Меня в основном интересовали функции Batch Writes и Asycnhronous драйвера Java Datastax, но я не могу получить никаких руководств, которые могли бы объяснить мне, как включить эти функции в мой приведенный ниже код, который использует драйвер Datastax Java ..

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

В приведенном ниже коде я создаю подключение к узлам Cassandra с помощью драйвера Datastax Java.

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

Может ли кто-нибудь помочь мне в том, как добавить пакетную запись или асинхронные функции в мой приведенный выше код .. Спасибо за помощь ..

Я использую Cassandra 1.2.9


person arsenal    schedule 05.10.2013    source источник


Ответы (2)


Для asynch это так же просто, как использовать функцию executeAsync:

...
DatastaxConnection.getSession().executeAsync(query);

Для пакета вам нужно построить запрос (я использую строки, потому что компилятор действительно хорошо знает, как оптимизировать конкатенацию строк):

String cql =  "BEGIN BATCH "
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

Кстати, я думаю, что ваша привязка оператора может выглядеть примерно так, если вы меняете атрибуты на список карт, где каждая карта представляет собой обновление / вставку внутри пакета:

BoundStatement query = prepStatement.bind(userId,
                                          attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                          userId_2,
                                          attributesList.get(1).values().toArray(new Object[attributes.size()])); 
person Lyuben Todorov    schedule 06.10.2013
comment
Есть ли способ сделать это с помощью именованных параметров? - person Highstead; 30.03.2014
comment
@Highstead Какой язык программирования? Выше указано java, поэтому (вроде нет) - person Lyuben Todorov; 30.03.2014
comment
Я был сосредоточен на python, но предполагал, что если есть способ сделать это в одном, то можно будет сделать это и в другом. Старый драйвер cql поддерживает его, но считается устаревшим. Итак, я искал замену функциональности. - person Highstead; 31.03.2014
comment
@Highstead Python = yes для именованных параметров, пример здесь, используя более новый драйвер Python DataStax. - person Lyuben Todorov; 31.03.2014
comment
Это сделано на стороне сервера или на стороне клиента? Я склонен угадывать клиентскую сторону с синтаксисом% (p_name) s. - person Highstead; 31.03.2014
comment
Это клиентский драйвер, так что это клиентская сторона. Вы делаете это в своем собственном коде, который затем загружает новые данные на ваш сервер Cassandra. - person Lyuben Todorov; 31.03.2014
comment
Беспокойство в основном вызывает инъекция, или это то, за что я несу кассандру? - person Highstead; 01.04.2014
comment
Будьте конкретны, вы имеете в виду инъекцию кода или SQL-инъекцию? Для последнего вы хотите использовать подготовленный заявления. Вы не можете внедрить код через cql3. - person Lyuben Todorov; 01.04.2014
comment
Вы используете строковые запросы, но есть ли способ использовать QueryBuilder и подготовить / связать несколько операторов, а затем выполнить их в пакетном режиме? Насколько я понимаю, пока это невозможно ... - person VHristov; 29.04.2014
comment
@LyubenTodorov У меня также есть аналогичный вопрос здесь. Если возможно, вы можете мне помочь? - person john; 09.10.2014

В примере, приведенном в ответе Любена, установка определенных атрибутов пакета, таких как Type.COUNTER (если вам нужно обновить счетчики) с использованием строк, не сработает. Вместо этого вы можете организовать свои подготовленные операторы в пакетном порядке следующим образом:

final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
final PreparedStatement prepared = session.prepare(insertQuery);

final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
batch.add(prepared.bind(userId1, "something"));
batch.add(prepared.bind(userId2, "another"));
batch.add(prepared.bind(userId3, "thing"));

session.executeAsync(batch);
person cfeduke    schedule 11.11.2014
comment
Мне это нравится больше, чем принятый ответ. Здесь содержимое пакета может быть динамическим (по сравнению с фиксированным CQL и количеством аргументов в принятом ответе) - person 0cd; 26.07.2016
comment
Я считаю, что это плохой код (по состоянию на 2019 год). BatchStatement неизменен. Вам нужно batch = batch.add (... - person Tony Schwartz; 11.10.2019