Я тестировал программу сканирования/удаления веб-страниц на Apache Spark локально на своем компьютере.
программа использует несколько преобразований RDD, которые берут изменчивую функцию, которая время от времени дает сбой. (Цель функции — преобразовать URL-ссылки в веб-страницы, иногда вызываемый ею безголовый браузер просто отключается или перегружается — этого я не могу избежать)
Я слышал, что Apache Spark имеет мощную функцию аварийного переключения и повторной попытки, любое неудачное преобразование или потерянные данные могут быть пересчитаны с нуля из любого ресурса, который они могут найти (звучит как волшебство, верно?). код.
Это моя конфигурация искры:
val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough
К сожалению, работа не удалась после того, как большинство этапов и отдельных задач были выполнены успешно. Последний журнал в консоли показывает:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...
Похоже, Спарк просто трусливо сдался после неудачной попытки. Как правильно его настроить, чтобы сделать его более живучим?
(мою программу можно скачать с https://github.com/tribbloid/spookystuff, извините за скудный и неорганизованный код/документация, я просто запускаю его на несколько дней)
ДОБАВЬТЕ: если вы хотите попробовать это сами, следующий код может продемонстрировать эту проблему:
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
val x = java.lang.Math.random()
if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
x
}.reduce(_ + _)
sc.stop()
println("finished")
}
Следует отметить, что одно и то же IllegalStateException повторялось 32 раза в этом сообщении: data">Apache Spark выдает java.lang.IllegalStateException: непрочитанные данные блока