Производительность фильтра Spark DataSet

Я экспериментировал с разными способами фильтрации набора типизированных данных. Оказывается, производительность может быть совсем другой.

Набор данных был создан на основе строк данных размером 1,6 ГБ с 33 столбцами и 4226047 строками. DataSet создается путем загрузки данных CSV и сопоставляется с классом дела.

val df = spark.read.csv(csvFile).as[FireIncident]

Фильтр с UnitId = 'B02' должен вернуть 47980 строк. Я протестировал три способа, как показано ниже: 1) Использовать типизированный столбец (~ 500 мс на локальном хосте)

df.where($"UnitID" === "B02").count()

2) Используйте временную таблицу и sql-запрос (~ то же, что и вариант 1)

df.createOrReplaceTempView("FireIncidentsSF")
spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()

3) Используйте строго типизированное поле класса (14 987 мс, т.е. в 30 раз медленнее)

df.filter(_.UnitID.orNull == "B02").count()

Я снова протестировал его с помощью API Python, для того же набора данных время составляет 17 046 мс, что сопоставимо с производительностью варианта 3 API Scala.

df.filter(df['UnitID'] == 'B02').count()

Может ли кто-нибудь пролить свет на то, как 3) и API Python выполняются иначе, чем первые два варианта?


person YPL    schedule 20.12.2016    source источник


Ответы (2)


Это из-за шага 3 здесь.

В первых двух Spark не нуждается в десериализации всего объекта Java / Scala - он просто смотрит на один столбец и движется дальше.

В-третьих, поскольку вы используете лямбда-функцию, Spark не может сказать, что вам нужно только одно поле, поэтому он извлекает из памяти все 33 поля для каждой строки, чтобы вы могли проверить одно поле.

Не знаю, почему четвертый такой медленный. Похоже, что он будет работать так же, как и первый.

person Corey Woodfield    schedule 08.06.2017
comment
Очень содержательный ответ. Что бы произошло, если бы вы написали Dataset<Row> на java: datasetRdd.filter(r -> r.<String>getAs("event_type_id").equals("LOG"))? - person Dusan Vasiljevic; 24.01.2018
comment
@DusanVasiljevic то же самое, если вы используете лямбду. Вы можете сохранить тип, но вам придется выполнять с ним нетипизированные операции, чтобы избежать загрузки в память. - person Joan; 13.08.2020

При запуске python происходит то, что сначала ваш код загружается в JVM, интерпретируется, а затем, наконец, компилируется в байт-код. При использовании Scala API Scala изначально запускается на JVM, поэтому вы вырезаете весь загружаемый код python в часть JVM.

person TheM00s3    schedule 20.12.2016
comment
Python API и фильтр Scala API со строго типизированным полем класса дают сопоставимые результаты по производительности. Вы знаете, почему вариант 3) в 30 раз медленнее, чем вариант 1) или 2)? - person YPL; 21.12.2016