Эквивалент набора данных Spark для сбора scala с частичной функцией

Обычные коллекции scala имеют отличный метод collect, который позволяет мне выполнять операцию filter-map за один проход, используя частичную функцию. Есть ли эквивалентная операция на spark Datasets?

Я хотел бы это по двум причинам:

  • синтаксическая простота
  • он сокращает операции в стиле filter-map до одного прохода (хотя в spark я предполагаю, что есть оптимизации, которые определяют эти вещи для вас)

Вот пример, чтобы показать, что я имею в виду. Предположим, у меня есть последовательность параметров, и я хочу извлечь и удвоить только определенные целые числа (те, что в Some):

val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5)) 

Способ 1 — collect

input.collect {
  case Some(value) => value * 2
} 
// List(6, -2, 8, 10)

collect делает это довольно аккуратно синтаксически и выполняет один проход.

Способ 2 — filter-map

input.filter(_.isDefined).map(_.get * 2)

Я могу перенести этот тип шаблона на искру, потому что наборы данных и фреймы данных имеют аналогичные методы.

Но мне это не очень нравится, потому что isDefined и get кажутся мне запахом кода. Существует неявное предположение, что карта получает только Somes. Компилятор не может это проверить. В более крупном примере разработчику будет сложнее обнаружить это предположение, и разработчик может поменять фильтр и сопоставить, например, без получения синтаксической ошибки.

Способ 3 – fold* операций

input.foldRight[List[Int]](Nil) {
  case (nextOpt, acc) => nextOpt match {
    case Some(next) => next*2 :: acc
    case None => acc
  }
}

Я не использовал искру достаточно, чтобы знать, есть ли у fold эквивалент, так что это может быть немного тангенциально.

Как бы то ни было, совпадение шаблона, сгибание шаблона и перестроение списка — все смешалось вместе, и это трудно читать.


Так что в целом я считаю синтаксис collect самым приятным, и я надеюсь, что у искры есть что-то подобное.


person rmin    schedule 25.01.2017    source источник
comment
Метод collect, определенный для RDDs и Datasets, используется для материализации данных в программе-драйвере. Несмотря на отсутствие чего-то похожего на метод Collections API collect, ваша интуиция верна: поскольку обе операции оцениваются лениво, у движка есть возможность оптимизировать операции и связать их так, чтобы они выполнялись с максимальной локальностью.   -  person stefanobaghino    schedule 25.01.2017


Ответы (4)


Метод collect, определенный для RDDs и Datasets, используется для материализации данных в программе-драйвере.

Несмотря на отсутствие чего-то похожего на метод Collections API collect, ваша интуиция верна: поскольку обе операции оцениваются лениво, у движка есть возможность оптимизировать операции и связать их так, чтобы они выполнялись с максимальной локальностью.

Для варианта использования, который вы упомянули, в частности, я бы посоветовал вам принять во внимание flatMap, который работает как с RDDs, так и с Datasets:

// Assumes the usual spark-shell environment
// sc: SparkContext, spark: SparkSession
val collection = Seq(Some(1), None, Some(2), None, Some(3))
val rdd = sc.parallelize(collection)
val dataset = spark.createDataset(rdd)

// Both operations will yield `Array(2, 4, 6)`
rdd.flatMap(_.map(_ * 2)).collect
dataset.flatMap(_.map(_ * 2)).collect

// You can also express the operation in terms of a for-comprehension
(for (option <- rdd; n <- option) yield n * 2).collect
(for (option <- dataset; n <- option) yield n * 2).collect

// The same approach is valid for traditional collections as well
collection.flatMap(_.map(_ * 2))
for (option <- collection; n <- option) yield n * 2

ИЗМЕНИТЬ

Как правильно указано в другом вопросе, RDDs на самом деле имеют метод collect, который преобразует RDD, применяя частичную функцию, как это происходит в обычных коллекциях. Как документация Spark указывает, однако, что "этот метод следует использовать только в том случае, если ожидается, что результирующий массив будет небольшим, поскольку все данные загружаются в память драйвера".

person stefanobaghino    schedule 25.01.2017
comment
Спасибо за ответ @stefanobaghino! Кажется, пока у меня остается только метод 2, который мне не нравится. Несмотря на отсутствие сбора, есть ли более идиоматический и краткий способ решить мой пример для набора данных искры? - person rmin; 25.01.2017
comment
В частности, для случая, указанного в вашем ответе, подойдет flatMap. :-) val rdd = sc.parallelize(Seq(Some(1), None, Some(2), None, Some(3))); rdd.flatMap(_.map(_ * 2)).collect выведет Array(2, 4, 6). Вы также можете использовать for-comprehension. Я добавлю это к своему ответу. - person stefanobaghino; 25.01.2017
comment
Спасибо за обновление вашего ответа! Я забыл про for. - person rmin; 25.01.2017
comment
Я думаю, что сбор с PartialFunction не имеет этой проблемы... предупреждение находится в другом методе (собирать без аргументов). - person Carlos Verdes; 16.07.2017

Ответы здесь неверны, по крайней мере, для текущего Spark.

На самом деле в RDD есть метод сбора, который принимает частичную функцию и применяет к данным фильтр и карту. Это полностью отличается от метода .collect() без параметров. См. исходный код Spark RDD.scala @ строка 955:

/**
 * Return an RDD that contains all matching values by applying `f`.
 */
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

Это не материализует данные из RDD, в отличие от метода .collect() без параметров в RDD.scala @ строка 923:

/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

В документации обратите внимание, как

def collect[U](f: PartialFunction[T, U]): RDD[U]

метод не имеет связанное с ним предупреждение о загрузке данных в память драйвера:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@collect[U](f:PartialFunction[T,U])(implicitevidence$29:scala.reflect.ClassTag[U]):org.apache.spark.rdd.RDD[U]

Spark очень сбивает с толку, когда эти перегруженные методы делают совершенно разные вещи.


редактировать: Моя ошибка! Я неправильно понял вопрос, мы говорим о наборах данных, а не о RDD. Тем не менее, принятый ответ говорит, что

«Однако в документации Spark указано, что «этот метод следует использовать только в том случае, если ожидается, что результирующий массив будет небольшим, поскольку все данные загружаются в память драйвера».

Что неверно! Данные не загружаются в память драйвера при вызове версии частичной функции .collect() - только при вызове версии без параметров. Вызов .collect(partial_function) должен иметь примерно ту же производительность, что и последовательный вызов .filter() и .map(), как показано в исходном коде выше.

person shoffing    schedule 06.04.2017
comment
Спасибо за ответ. Однако вопрос был о наборах данных, а не о rdd. В одном из других ответов упоминается, как вы можете превратить набор данных в rdd, а затем вызвать сбор. - person rmin; 07.04.2017
comment
Ах, простите, моя ошибка! Я отредактирую ответ, некоторым все же может быть полезно увидеть разницу между .collect() и .collect(pf). - person shoffing; 07.04.2017

Просто для полноты:

API RDD имеет такой метод, поэтому всегда можно преобразовать заданный набор данных/фрейм данных в RDD, выполнить операцию collect и преобразовать обратно, например:

val dataset = Seq(Some(1), None, Some(2)).toDS()
val dsResult = dataset.rdd.collect { case Some(i) => i * 2 }.toDS()

Однако это, вероятно, будет работать хуже, чем использование карты и фильтра в наборе данных (по причине, объясненной в ответе @stefanobaghino).

Что касается DataFrames, этот конкретный пример (с использованием Option) несколько вводит в заблуждение, так как преобразование в DataFrame фактически выполняет «выравнивание» параметров в их значения (или null для None), поэтому эквивалентное выражение будет таким:

val dataframe = Seq(Some(1), None, Some(2)).toDF("opt")
dataframe.withColumn("opt", $"opt".multiply(2)).filter(not(isnull($"opt")))

Который, я думаю, меньше страдает от вашего беспокойства о том, что операция карты «предполагает» что-либо о своем вводе.

person Tzach Zohar    schedule 25.01.2017

Я просто хотел расширить ответ стефанобагино, включив пример понимания for с классом case, так как многие варианты использования для этого, вероятно, будут включать классы case.

Также параметры являются монадами, что делает принятый ответ очень простым в этом случае, поскольку for аккуратно выбрасывает значения None, но этот подход не будет распространяться на не-монады, такие как классы case:

case class A(b: Boolean, i: Int, d: Double)

val collection = Seq(A(true, 3), A(false, 10), A(true, -1))
val rdd = ...
val dataset = ...

// Select out and double all the 'i' values where 'b' is true:
for {
  A(b, i, _) <- dataset
  if b
} yield i * 2
person rmin    schedule 25.01.2017