Счастливая наука о данных :)
RDD против DataFrame
Resilience Distributed Dataset
, RDD
— это неизменяемый распределенный набор данных, разделенный на узлы кластера. RDD выпущен с Spark1.0
. Это фундаментальная структура данных Spark. RDD обеспечивает безопасность типов во время компиляции. Недостатком RDD является то, что он потребляет большой объем памяти и связан с накладными расходами на сборку мусора, поскольку он хранит в памяти объекты JVM.
DataFrame
— это неизменяемая распределенная коллекция данных. DataFrame выпущен с Spark 1.3
. В отличие от RDD, DataFrame организует данные в именованные столбцы. Это дает нам представление данных в виде столбцов с column name
и types info
, как в таблице реляционной базы данных. Spark улучшил производительность DataFrame, используя Custom Memory Management
и Optimized Execution Plans
. НедостатокDataFrame заключается в том, что он не обеспечивает безопасность во время компиляции и может вызывать ошибки во время выполнения. API DataFrame не выглядит программным и предоставляет больше SQL
полезных функций.
Несмотря на то, что DataFrame
/DataSet
передают абстракции, за кулисами все вычисления происходят с RDD. Мы можем перейти от DataFrame
к RDD
с помощью его метода rdd
. Также мы можем преобразовать RDD
в DataFrame
(если RDD имеет табличный формат) с помощью метода toDF
.
DataFrame со Scala
В моем предыдущем посте я рассказал про искру, искровую архитектуру и примеры с RDD и Scala
. В этом посте я расскажу о различных типах аналитики, которые можно выполнять с помощью Spark DataFrame. Все исходные коды, относящиеся к этому посту, доступны в gitlab. Пожалуйста, клонируйте репозиторий и продолжайте пост.
SBT-зависимость
Я использую IntelliJ Idea
в качестве своей IDE для работы с искровыми приложениями Scala. Сначала мне нужно создать sbt project
и добавить файл зависимостей build.sbt
со искровыми зависимостями. Для работы с DataFrame нам нужна зависимость spark-sql
. Ниже приведен файл зависимостей build.sbt
.
искровая сессия
Сначала нам нужно создать SparkSession
. Ниже приведен способ создания SparkSession с помощью построителя сеансов.
Загрузить кадр данных
DataFrame можно загрузить из .CSV
файлов. Для этого сначала нам нужно определить схему (поля и типы) для файла .CSV
. Затем мы можем загрузить данные .CSV
в DataFrame. В следующем примере я создаю DataFrame из файла uber.csv
, который находится в каталоге ресурсов.
Создать кадр данных
DataFrame можно создать с помощью Scala Seq
. Ниже приведен способ сделать это. Он использует функцию toDf
, доступную из import spark.implicits._
.
схема печати()
Мы можем просмотреть схему DataFrame с помощью функции printSchema()
. Он покажет поля в DataFrame и соответствующие типы данных.
показывать()
Для просмотра данных в DataFrame мы можем использовать функцию show()
. Мы не можем передать ни одной строки, которую нужно показать.
описывать()
Функция describe()
может использоваться для просмотра сводки DataFrame. Он показывает count
, mean
, min
, max
, stddev
подобные матрицы данных на DataFrame.
Выбрать()
Выбор конкретных полей можно выполнить с помощью функции select()
. В следующем примере выберите поля Device
, Event
и Time
из DataFrame.
фильтр()
Фильтрацию можно выполнить с помощью функции filter()
. Мы можем передать ему несколько логик фильтрации. Ниже приведен способ сделать это.
Сортировать()
Функция sort()
может использоваться для сортировки записей в DataFrame на основе поля. Следующие примеры сортируют DataFrame на основе поля Event
.
отчетливый()
Функция distinct()
может использоваться для получения разных значений одного или нескольких столбцов в DataFrame. Ниже приведены некоторые примеры.
группа по()
Функция groupBy
может использоваться для группировки похожих полей. count()
, sum()
, avg()
, min()
, max()
подобная функция может быть объединена с groupBy()
для добавления нового столбца в DataFrame.
окно()
Функцию window()
можно комбинировать с groupBy()
для группировки записей с определенным временным окном. В следующем примере записи в DataFrame группируются на основе поля Device
с временным интервалом 5 minutes
.
тип массива
DataFrame может содержать arrayType
полей. Функция array_contains
может использоваться для фильтрации элементов из arrayType
полей. Ниже приведен пример, который создает DataFrame с arrayType
полями и фильтрует с array_contains
.
тип структуры
DataFrame может содержать structType
вложенных объектов. В следующем примере показаны операции, которые мы можем выполнять с объектом structType
и списками.
Ссылка
- https://www.linkedin.com/pulse/apache-spark-rdd-vs-dataframe-dataset-chandan-prakash
- https://medium.com/rahasak/hacking-with-apache-spark-f6b0cabf0703
- https://hortonworks.com/tutorial/dataframe-and-dataset-examples-in-spark-repl/
- https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-1/
- http://allaboutscala.com/big-data/spark/