ОШИБКА com.websudos.phantom - Пакет слишком велик

Я получаю следующую ошибку:

22:24:34.419 [run-main-0] DEBUG com.websudos.phantom - Executing query: com.datastax.driver.core.BatchStatement@3f4f5b68
22:24:34.426 [pool-15-thread-3] ERROR com.websudos.phantom - Batch too large
[error] (run-main-0) com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large

Повторно запускайте код и каждый раз получайте эту ошибку в следующем месте:

cqlsh> select count(*) from superchain.blocks  limit 1000000;

 count
-------
 51728

(1 rows)

Warnings :
Aggregation query used without partition key

Заранее благодарим за любые идеи.

+++ ОБНОВЛЕНИЯ +++

Итак, код нарушения

//This file is Database.scala
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
  def insertBlock(block: Block) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.block.insertNewRecord(block))
      .future()
  }

  def insertTransaction(tx: Transaction) = {
  //should note here that have also tried Batch.unlogged to same effect
    Batch.logged
      .add(ChainDatabase.tx.insertNewTransaction(tx))
      .future()
  }

  object block extends BlockTable with keyspace.Connector

  object tx extends TransactionTable with keyspace.Connector


}

object ChainDatabase extends Database(Config.keySpaceDefinition)

Ниже показаны функции вставки для транзакции и аналогичный код для блока.

Пытался следовать

https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e#.7zdd0qopv

&&

https://github.com/outworkers/phantom/wiki/Batch-statements

Но я все еще пытаюсь найти реализацию, которая бы не приводила к Batch too large ошибкам.

//This file is Transaction.scala
abstract class TransactionTable extends TransactionColumnFamily with RootConnector {

  override val tableName = "transactions"

  def insertNew(tx: Transaction): Future[ResultSet] = insertNewTransaction(tx).future()

  def insertNewTransaction(tx: Transaction) = {
    insert
      .value(_.txid, tx.txid)
      .value(_.version, tx.version)
      .value(_.locktime, tx.locktime)
      .value(_.vout, tx.vout)
      .value(_.vin, tx.vin)
  }

}

person dan-mi-sun    schedule 05.04.2016    source источник


Ответы (3)


Вы получаете ошибку не из-за размера таблицы, а из-за количества запросов в пакете. Вы можете запустить не более 100 запросов одновременно в любом заданном пакете.

В то же время вы почти на 99% используете здесь менее оптимальный подход, поскольку на самом деле вам никогда не нужно столько запросов в одном пакете. Как предполагает Тьяго, пакеты предназначены для обеспечения атомарности, а не для оптимизации производительности.

Если вы хотите просто выполнять параллельные запросы, просто используйте Future.sequence, который будет использовать подход типа группового объединения для параллелизации операций.

Ошибка от Кассандры, а не от фантома. Неважно, какой подход вы используете на клиенте, размер пакета ограничен.

// Assuming you have a list of queries:
val execution = Future.sequence(queries map (_.future())

Надеюсь это поможет!

Обновить

Допустим, у вас есть список транзакций.

val list: List[Transaction] = ..
// all you need is
Future.sequence(list.map(tr => database.transactionTable.insertNew(tr))

Это создаст будущее, которое завершится, когда все базовые фьючерсы будут завершены, что фактически даст вам тип возврата: Future[List[ResultSet]] из исходного List[Future[ResultSet]].

person flavian    schedule 06.04.2016
comment
@ thiago-pereira & flavian: обновили вопрос еще раз в надежде получить конкретный пример. Если вы хотите просто выполнять параллельные запросы, просто используйте Future.sequence, который будет использовать подход типа объединенного пула вилки для параллелизации операций. - person dan-mi-sun; 11.04.2016

Возможно, вы неправильно поняли назначение партий в Кассандре.

На самом деле они для атомарности, чтобы не выполнять несколько запросов «быстрее».

Хорошее объяснение можно найти здесь:

https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/

person Thiago Pereira    schedule 06.04.2016

Как уже говорили другие, ваше первое сообщение об ошибке исходит от очень большого оператора BATCH. Операторы BATCH не предназначены для пакетной вставки, как вы могли бы подумать в традиционной реляционной базе данных. Операторы BATCH полезны только при АТОМИЧЕСКОЙ вставке данных в несколько денормализованных таблиц или при использовании UNLOGGED BATCH для вставки данных под одним и тем же ключом раздела.

Пакетные операторы НЕ должны использоваться в качестве метод оптимизации, так как они не рассчитаны на скорость и фактически ухудшат вашу производительность.

В конце концов, это сообщение об ОШИБКЕ, поскольку драйвер клиента Cassandra пытается защитить кластер от очень большого оператора BATCH, который может (и будет) отключать узлы в вашем кластере.

Во-вторых, вы указываете, что запуск SELECT count(*) FROM table; выдает предупреждение:

Aggregation query used without partition key.

Использование count(*) без указания ключа раздела является антипаттерном. По тем же причинам, что и выше, это может негативно повлиять на стабильность вашего кластера.

Наконец, я подозреваю, что где-то в вашей библиотеке Cassandra DSL (не знакомой с Phantom-DSL) он выполняет BATCH там, где вы этого не ожидаете, или вы можете сознательно использовать BATCH, не полностью понимая его правильное использование. Я знаю, что в spring-data они используют BATCH, когда вы вставляете список элементов (что является ужасным антипаттерном), что может привести к аналогичной ошибке.

person fromanator    schedule 06.04.2016