Как я могу эффективно отправлять данные в параллельном режиме на конечную точку REST с помощью Spark?

У меня есть огромный файл, хранящийся в HDFS, mydata.txt, где каждая строка содержит данные, которые необходимо отправить в конечную точку REST. Мне интересно, как я могу эффективно группировать/разделять данные (строки в файле), а затем отправлять их в конечную точку REST, используя OkHttp. Я хочу сгруппировать/разделить данные, потому что не хочу создавать слишком много HTTP-клиентов, а также распределять рабочую нагрузку.

Например, в настоящее время у меня есть что-то вроде следующего.

val sc = new SparkContext(new SparkConf())
val client = new OkHttpClient
val input = "hdfs://myserver/path/to/mydata.txt"

sc.textFile(input)
 .foreach(line => {
  val request = new Request.Builder()
   .url("http://anotherserver/api/data")
   .post(RequestBody.create(MediaType.parse("application/json"), line))
   .build()
  client.newCall(request).execute()
 })

Насколько я понимаю, foreach - это Action, поэтому он вызывается в программе драйвера, поэтому client не нужно сериализовать и его можно использовать для всех данных (строк). Конечно, это решение не распараллелено.

Я тоже думал о разбиении, но думаю, что foreachPartition это тоже Action.

sc.textFile(input)
 .map(line => (Random.nextInt(10), line))
 .partitionBy(new HashPartitioner(10))
 .foreachPartition(iter => {
  while(iter.hasNext) {
   val item = iter.next()
   val line = item._2
   //submit to REST endpoint
  }
 })

Любые мысли о том, как распараллелить работу по отправке данных в конечную точку REST с помощью Spark?

EDIT Оказывается, OkHttpClient не сериализуем и даже не может использоваться внутри цикла foreach.


person Jane Wayne    schedule 26.07.2016    source источник


Ответы (1)


Типичный подход к решению этих типов проблем выглядит следующим образом:

  1. Убедитесь, что библиотека REST, которую вы хотите использовать, доступна для всех исполнителей. Это избавляет от необходимости беспокоиться о сериализации.

  2. Выберите уровень параллелизма по количеству ядер.

  3. Переразметьте свои данные так, чтобы #partitions >= k * #executors. При доступе к внешним службам с переменной пропускной способностью я использую большое значение k, например 5–10, чтобы уменьшить вероятность того, что пакет «медленных» входных данных замедлит всю работу.

  4. map() данные и настроить клиент внутри тела отображаемой функции, что устраняет проблемы с сериализацией. Возвращает пару входных данных и успех/неудача, а также любую диагностическую информацию.

  5. Отфильтруйте сбои и решите, что с ними делать, например, повторно обработайте их (вы даже можете вести подсчет повторных попыток).

Если настройка HTTP-клиента требует больших затрат, используйте mapPartitions() вместо map(), так как это позволит вам настроить клиент один раз и обрабатывать с его помощью множество входных данных.

Базовая версия:

def restCall(url: String): MyResultOrError = ...
val numCoresPerExecutor = ...
val numCores = numCoresPerExecutor * (sc.getExecutorStorageStatus.length - 1)
val result = rdd
  .repartition(5 * numCores)
  .map(url => (url, restCall(url)))
person Sim    schedule 27.07.2016
comment
Подход mapPartitions сработал для меня. Однако ничего не произошло, пока я не вызвал action как collect в самом конце. - person Jane Wayne; 27.07.2016
comment
Конечно, именно так работает Spark. Преобразования просто создают исполняемую DAG до тех пор, пока не будет вызвано действие. - person Sim; 28.07.2016