Как получить такие метрики, как размер вывода и записи, написанные из пользовательского интерфейса Spark?

Как мне собрать эти метрики на консоли (Spark Shell или Spark submit job) сразу после выполнения задачи или задания.

Мы используем Spark для загрузки данных из Mysql в Cassandra, и он довольно большой (например: ~ 200 ГБ и 600 млн строк). Когда задача выполнена, мы хотим проверить, сколько строк было обработано искрой? Мы можем получить число из пользовательского интерфейса Spark, но как мы можем получить это число («Записанные выходные записи») из оболочки Spark или в задании отправки искры.

Пример команды для загрузки из Mysql в Cassandra.

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))

Я хочу получить все метрики пользовательского интерфейса Spark для вышеуказанной задачи, в основном размер вывода и количество записанных записей.

Пожалуйста помоги.

Спасибо за ваше время!


comment
Вы имеете в виду, что вы можете найти метрики в пользовательском интерфейсе искры, но я не нашел его с аналогичным кодом (читайте исходный код jdbc), где метрики отображаются в пользовательском интерфейсе?   -  person Tom    schedule 04.09.2018
comment
Он отображается в пользовательском интерфейсе приложения Spark, обычно под заданиями и под этапами. Вы можете видеть статистику, информацию об исполнителе и информацию об отдельных задачах, например, сколько данных читает каждая задача, сколько произвольно записывает каждая задача и т. Д.   -  person Ajay Guyyala    schedule 05.09.2018
comment
Спасибо @ ajay-guyyala. Мне не повезло разглядеть UI. Я выясню, что происходит.   -  person Tom    schedule 06.09.2018
comment
Вот несколько примеров изображений, которые я нашел. Он может не отображать показатели для всех заданий / этапов. Также это зависит от того, какую версию Spark мы используем. В то время, когда я размещал этот пост, мы использовали Spark 1.5.x или 1.6.x. google.com/   -  person Ajay Guyyala    schedule 07.09.2018
comment
@AjayGuyyala Не могли бы вы получить эти данные из пользовательского интерфейса искры? У меня также есть те же требования, когда я должен получить некоторые полезные данные из пользовательского интерфейса искры в мою java-код.   -  person Akash Patel    schedule 10.10.2020


Ответы (1)


Нашел ответ. Вы можете получить статистику с помощью SparkListener.

Если ваша работа не имеет показателей ввода или вывода, вы можете получить исключения None.get, которые можно безопасно игнорировать, указав if stmt.

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})

Пожалуйста, найдите приведенный ниже пример.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

var outputWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))

println("outputWritten",outputWritten)

Результат:

scala> println("outputWritten",outputWritten)
(outputWritten,16383)
person Ajay Guyyala    schedule 27.04.2016