Сбой задания искры Apache сразу же без повторной попытки, настройка maxFailures не работает

Я тестировал программу сканирования/удаления веб-страниц на Apache Spark локально на своем компьютере.

программа использует несколько преобразований RDD, которые берут изменчивую функцию, которая время от времени дает сбой. (Цель функции — преобразовать URL-ссылки в веб-страницы, иногда вызываемый ею безголовый браузер просто отключается или перегружается — этого я не могу избежать)

Я слышал, что Apache Spark имеет мощную функцию аварийного переключения и повторной попытки, любое неудачное преобразование или потерянные данные могут быть пересчитаны с нуля из любого ресурса, который они могут найти (звучит как волшебство, верно?). код.

Это моя конфигурация искры:

val conf = new SparkConf().setAppName("MoreLinkedIn")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","40") //definitely enough

К сожалению, работа не удалась после того, как большинство этапов и отдельных задач были выполнены успешно. Последний журнал в консоли показывает:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:7 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.openqa.selenium.TimeoutException: Timed out after 50 seconds waiting for...

Похоже, Спарк просто трусливо сдался после неудачной попытки. Как правильно его настроить, чтобы сделать его более живучим?

(мою программу можно скачать с https://github.com/tribbloid/spookystuff, извините за скудный и неорганизованный код/документация, я просто запускаю его на несколько дней)

ДОБАВЬТЕ: если вы хотите попробовать это сами, следующий код может продемонстрировать эту проблему:

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local[*]")
conf.setSparkHome(System.getenv("SPARK_HOME"))
conf.setJars(SparkContext.jarOfClass(this.getClass).toList)
conf.set("spark.task.maxFailures","400000")
val sc = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 8
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
  val x = java.lang.Math.random()
  if (x > 0.9) throw new IllegalStateException("the map has a chance of 10% to fail")
  x
}.reduce(_ + _)
sc.stop()
println("finished")
}

Следует отметить, что одно и то же IllegalStateException повторялось 32 раза в этом сообщении: data">Apache Spark выдает java.lang.IllegalStateException: непрочитанные данные блока


person tribbloid    schedule 09.06.2014    source источник
comment
Пожалуйста, дайте несколько контекстных строк вокруг Exception. У меня есть подозрение, что генерируемые пользователем исключения не приводят к повторным попыткам, поскольку в 90% случаев такие исключения возникают из-за ошибки в пользовательском коде, но я не совсем уверен.   -  person samthebest    schedule 09.06.2014
comment
Я надеюсь, что вы правы, поскольку иногда пользователь действительно хочет, чтобы задание не выполнялось быстро, не занимая слишком много времени. Однако я не знаю, как указать строки «контекста». Не могли бы Вы уточнить?   -  person tribbloid    schedule 09.06.2014
comment
Мое недавнее редактирование добавило минимальное искровое приложение, которое показывает проблему, если вы можете запустить его, не видя: Исключение в потоке main org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0.0:7 не удалась 1 раз, большинство недавний сбой: сбой исключения в TID 7 на локальном хосте: java.lang.IllegalStateException: вероятность сбоя карты составляет 10%. Тогда ваша искра работает правильно   -  person tribbloid    schedule 09.06.2014


Ответы (3)


Я знаю, что это очень старый вопрос, но у меня была точно такая же проблема, и я столкнулся с этим вопросом при поиске решения.

Существует 3 основных формата URL для отправки приложения Spark в локальном режиме:

  • local - один поток (без параллелизма), без повторов
  • local[K] (или local[*]) — использует K (или количество ядер) рабочих потоков и задает task.maxFailures значение 1 (см. здесь)

  • local[K, F] (или local[*, F]) — задает task.maxFailures=F, и это то, что нам нужно.

Дополнительные сведения см. в документации Spark.

person botchniaque    schedule 08.12.2017

Позвольте мне направить наиболее авторитетный ответ:

Если это полезная функция для локального режима, мы должны открыть JIRA, чтобы задокументировать настройку или улучшить ее (я бы предпочел добавить свойство spark.local.retries вместо специального формата URL). Мы изначально отключили его для всего, кроме модульных тестов, потому что в 90% случаев исключение в локальном режиме означает проблему в приложении, и мы бы предпочли, чтобы пользователь сразу отлаживал ее, а не повторял задачу несколько раз и заставлял их беспокоиться. о том, почему они получают так много ошибок.

Матей

person tribbloid    schedule 10.06.2014
comment
Пожалуйста, не могли бы вы добавить ссылку на то, где вы нашли этот ответ. - person samthebest; 10.06.2014
comment
Извините, я не видел ваш комментарий до сих пор, я прикрепляю ссылку прямо здесь: apache-spark-user-list.1001560.n3.nabble.com/ - person tribbloid; 15.06.2014

Это работает для меня -

sparkConfig
.set("spark.task.maxFailures", "2")
.set("spark.master", "local[2, 2]")

Мне пришлось установить оба, чтобы увидеть, как моя задача не удалась (при генерации исключения), а затем повторить попытку в локальной тестовой среде.

person Ravindra    schedule 28.01.2019