Пул соединений Spark - правильный ли это подход

У меня есть задание Spark в Structured Streaming, которое использует данные из Kafka и сохраняет их в InfluxDB. Я реализовал механизм пула соединений следующим образом:

object InfluxConnectionPool {
      val queue = new LinkedBlockingQueue[InfluxDB]()

      def initialize(database: String): Unit = {
        while (!isConnectionPoolFull) {
          queue.put(createNewConnection(database))
        }
      }

      private def isConnectionPoolFull: Boolean = {
        val MAX_POOL_SIZE = 1000
        if (queue.size < MAX_POOL_SIZE)
          false
        else
          true
      }

      def getConnectionFromPool: InfluxDB = {
        if (queue.size > 0) {
          val connection = queue.take()
          connection
        } else {
          System.err.println("InfluxDB connection limit reached. ");
          null
        }

      }

      private def createNewConnection(database: String) = {
        val influxDBUrl = "..."
        val influxDB = InfluxDBFactory.connect(...)
        influxDB.enableBatch(10, 100, TimeUnit.MILLISECONDS)
        influxDB.setDatabase(database)
        influxDB.setRetentionPolicy(database + "_rp")
        influxDB
      }

      def returnConnectionToPool(connection: InfluxDB): Unit = {
        queue.put(connection)
      }
    }

В моей искровой работе я делаю следующее

def run(): Unit = {

val spark = SparkSession
  .builder
  .appName("ETL JOB")
  .master("local[4]")
  .getOrCreate()


 ...

 // This is where I create connection pool
InfluxConnectionPool.initialize("dbname")

val sdvWriter = new ForeachWriter[record] {
  var influxDB:InfluxDB = _

  def open(partitionId: Long, version: Long): Boolean = {
    influxDB = InfluxConnectionPool.getConnectionFromPool
    true
  }
  def process(record: record) = {
    // this is where I use the connection object and save the data
    MyService.saveData(influxDB, record.topic, record.value)
    InfluxConnectionPool.returnConnectionToPool(influxDB)
  }
  def close(errorOrNull: Throwable): Unit = {
  }
}

import spark.implicits._
import org.apache.spark.sql.functions._

//Read data from kafka
val kafkaStreamingDF = spark
  .readStream
  ....

val sdvQuery = kafkaStreamingDF
  .writeStream
  .foreach(sdvWriter)
  .start()
  }

Но когда я запускаю задание, я получаю следующее исключение

18/05/07 00:00:43 ERROR StreamExecution: Query [id = 6af3c096-7158-40d9-9523-13a6bffccbb8, runId = 3b620d11-9b93-462b-9929-ccd2b1ae9027] terminated with error
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 8, 192.168.222.5, executor 1): java.lang.NullPointerException
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:332)
        at com.abc.telemetry.app.influxdb.InfluxConnectionPool$.returnConnectionToPool(InfluxConnectionPool.scala:47)
        at com.abc.telemetry.app.ETLappSave$$anon$1.process(ETLappSave.scala:55)
        at com.abc.telemetry.app.ETLappSave$$anon$1.process(ETLappSave.scala:46)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:53)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)

NPE — это когда соединение возвращается в пул соединений в queue.put(connection). Что мне здесь не хватает? Любая помощь приветствуется.

P.S: В обычном подходе DStreams я сделал это с помощью метода foreachPartition. Не знаете, как сделать повторное использование/объединение соединений со структурированной потоковой передачей.


person KarthikJ    schedule 07.05.2018    source источник


Ответы (2)


Аналогичным образом я использую forEachWriter для Redis, где пул упоминается только в процессе. Ваш запрос будет выглядеть примерно так:

def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  def process(record: record) = {
    influxDB = InfluxConnectionPool.getConnectionFromPool
    // this is where I use the connection object and save the data
    MyService.saveData(influxDB, record.topic, record.value)
    InfluxConnectionPool.returnConnectionToPool(influxDB)
  }```
person sam22    schedule 05.02.2019

datasetOfString.writeStream.foreach(new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        // open connection
      }
      def process(record: String) = {
        // write string to connection
      }
      def close(errorOrNull: Throwable): Unit = {
        // close the connection
      }
    })

Из документов ForeachWriter,

Each task will get a fresh serialized-deserialized copy of the provided object

Поэтому все, что вы инициализируете вне ForeachWriter, будет работать только в драйвере.

Вам нужно инициализировать пул соединений и открыть соединение в методе open.

person arunmahadevan    schedule 28.06.2018
comment
Если вы initialize the connection pool and open the connection in the open method., для каждой задачи будет создан новый пул соединений, возможно, не стоит запускать пул соединений в открытом методе. - person Naveen Cotha; 10.11.2018