Внедрение больших данных с помощью Scala

Сравнение функций collect_list () и collect_set () в Spark с Scala

Различия между collect_list () и collect_set () в программировании на Scala

Переменные в формате массива могут быть созданы с различными версиями на разных языках программирования. В языке Scala в Spark есть две отличительные функции для создания массивов. Они называются функциями collect_list() и collect_set(), которые в основном применяются к столбцам типа массива в сгенерированном DataFrame, как правило, после оконных операций.

В этой статье различия между этими двумя функциями будут объяснены на соответствующих примерах. Основная цель - сравнить и выделить различия между следующими двумя функциями, поскольку они могут использоваться в случаях, вводящих в заблуждение.

В качестве языка программирования выбран Scala для использования со Spark 3.1.1. Вы можете применить аналогичную методологию, используя язык PySpark.

В целях тестирования образец фрейма данных с типизированной структурой может быть сгенерирован следующим образом. Во фрагменте кода строки таблицы создаются путем добавления соответствующего содержимого. После создания строк мы можем добавить эти столбцы в нашу схему данных, отформатировав их с соответствующими типами данных как IntegerType для столбца день и StringType для столбца имя.

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val dataFrame = Seq(
    Row(1,"Employee_1", "Kafka"), 
    Row(2,"Employee_1", "Kibana"), 
    Row(3,"Employee_1", "Hadoop"), 
    Row(4,"Employee_1", "Hadoop"),  
    Row(4,"Employee_1", "Hadoop"),
    Row(5,"Employee_2", "Spark"), 
    Row(6,"Employee_2", "Scala"),  
    Row(7,"Employee_2", "SageMaker"), 
    Row(7,"Employee_2", "SageMaker"),
    Row(8,"Employee_3", "GCP"), 
    Row(9,"Employee_3", "AWS"),
    Row(10,"Employee_3", "Azure"),
    Row(11,"Employee_4", null)
  )
val dataFrameSchema = new StructType().add("day", IntegerType).add("name", StringType).add("toolSet", StringType)
val array_dataframe = spark.createDataFrame(spark.sparkContext.parallelize(dataFrame),dataFrameSchema)

После этапа создания фрейма данных мы можем распечатать схему данных и их содержимое.

array_dataframe.printSchema()
array_dataframe.show(false)

Мы создали фрейм данных с тремя столбцами: день, имя сотрудника и их наборы инструментов.

Нашим следующим шагом будет создание массивов для этих сотрудников для их соответствующих наборов инструментов с помощью функций collect_list() и collect_set().

collect_list ()

Первая функция создания массива называется collect_list(). Его можно использовать либо для группировки значений, либо для их агрегирования с помощью операции управления окнами.

В следующем скрипте существует столбец с именами name и toolSet. Если внимательно присмотреться, Сотрудник 1 имеет три инструмента с двумя дубликатами, а Сотрудник 2 имеет три инструмента с одним дубликатом. В этом конкретном случае мы можем сгруппировать по сотрудникам и собрать весь toolSet в массив. Это можно реализовать, сначала сгруппировав по сотрудникам и объединив в toolSet.

Ключевым моментом для списка сбора является то, что функция сохраняет все повторяющиеся значения внутри массива, сохраняя последовательность элементов.

val collect_list_df = array_dataframe.groupBy("name").agg(collect_list("toolSet").as("toolSet"))
collect_list_df.printSchema()
collect_list_df.show(false)

Если мы хотим удалить отдельные значения, сохранив порядок элементов (день, метка времени, идентификатор и т. Д.), мы можем использовать функцию array_distinct () перед применением collect_list. В следующем примере мы ясно видим, что исходная последовательность элементов сохраняется. Например, Employee_1, мы видим, что исходный порядок элементов Kafka, Kibana, Hadoop сохранялся как Kafka, Kibana, Hadoop после применения операции collect_list() без потери последовательности.

val collect_list_df = array_dataframe.groupBy("name").agg(array_distinct(collect_list("toolSet")).as("toolSet"))
collect_list_df.printSchema()
collect_list_df.show(false)

collect_set ()

Вторая функция создания массива называется collect_set(). Его можно использовать либо для группировки значений, либо для их агрегирования с помощью операции управления окнами.

В следующем скрипте существует столбец с именами name и toolSet. Если внимательно присмотреться, Сотрудник 1 имеет три инструмента с двумя дубликатами, а Сотрудник 2 имеет три инструмента с одним дубликатом. В этом конкретном случае мы можем сгруппировать по сотрудникам и собрать весь toolSet в массив. Это можно реализовать, сначала сгруппировав по сотрудникам и объединив в toolSet.

Ключевым моментом для списка сбора является то, что функция удаляет повторяющиеся значения внутри массива. Однако это не гарантирует, что последовательность элементов в массиве. Посмотрев на Employee_1, мы видим, что первоначальный порядок элементов Kafka, Kibana, Hadoop был изменен на Kibana, Kafka, Hadoop после применения операции collect_set().

val collect_set_df = array_dataframe.groupBy("name").agg(collect_set("toolSet").as("toolSet"))
collect_set_df.show(false)

Заключение

В Spark мы можем использовать функции collect_list() и collect_set() для генерации массивов с разных точек зрения.

Операцияcollect_list() не отвечает за объединение списка массивов. Он заполняет все элементы в их существующем порядке и не удаляет дубликаты.

С другой стороны, collect_set()operation действительно устраняет дубликаты; однако он не может сохранить существующий порядок элементов в массиве.

С целью объединения наших элементов в массив мы можем легко использовать встроенную функцию array_distinct () перед функцией collect_list (), чтобы удалить повторяющиеся значения без потери последовательности. элементов в массиве.

Вопросы и комментарии приветствуются!

использованная литература

  1. Операция сбора и сбора списка в ScalaDoc в Spark 3
  2. Встроенные функции Spark
  3. Пример из DataBricks для Collect_List