Внедрение больших данных с помощью Scala
Сравнение функций collect_list () и collect_set () в Spark с Scala
Различия между collect_list () и collect_set () в программировании на Scala
Переменные в формате массива могут быть созданы с различными версиями на разных языках программирования. В языке Scala в Spark есть две отличительные функции для создания массивов. Они называются функциями collect_list()
и collect_set()
, которые в основном применяются к столбцам типа массива в сгенерированном DataFrame, как правило, после оконных операций.
В этой статье различия между этими двумя функциями будут объяснены на соответствующих примерах. Основная цель - сравнить и выделить различия между следующими двумя функциями, поскольку они могут использоваться в случаях, вводящих в заблуждение.
В качестве языка программирования выбран Scala для использования со Spark 3.1.1. Вы можете применить аналогичную методологию, используя язык PySpark.
В целях тестирования образец фрейма данных с типизированной структурой может быть сгенерирован следующим образом. Во фрагменте кода строки таблицы создаются путем добавления соответствующего содержимого. После создания строк мы можем добавить эти столбцы в нашу схему данных, отформатировав их с соответствующими типами данных как IntegerType для столбца день и StringType для столбца имя.
import org.apache.spark.sql._ import org.apache.spark.sql.types._ val dataFrame = Seq( Row(1,"Employee_1", "Kafka"), Row(2,"Employee_1", "Kibana"), Row(3,"Employee_1", "Hadoop"), Row(4,"Employee_1", "Hadoop"), Row(4,"Employee_1", "Hadoop"), Row(5,"Employee_2", "Spark"), Row(6,"Employee_2", "Scala"), Row(7,"Employee_2", "SageMaker"), Row(7,"Employee_2", "SageMaker"), Row(8,"Employee_3", "GCP"), Row(9,"Employee_3", "AWS"), Row(10,"Employee_3", "Azure"), Row(11,"Employee_4", null) ) val dataFrameSchema = new StructType().add("day", IntegerType).add("name", StringType).add("toolSet", StringType) val array_dataframe = spark.createDataFrame(spark.sparkContext.parallelize(dataFrame),dataFrameSchema)
После этапа создания фрейма данных мы можем распечатать схему данных и их содержимое.
array_dataframe.printSchema() array_dataframe.show(false)
Мы создали фрейм данных с тремя столбцами: день, имя сотрудника и их наборы инструментов.
Нашим следующим шагом будет создание массивов для этих сотрудников для их соответствующих наборов инструментов с помощью функций collect_list()
и collect_set()
.
collect_list ()
Первая функция создания массива называется collect_list()
. Его можно использовать либо для группировки значений, либо для их агрегирования с помощью операции управления окнами.
В следующем скрипте существует столбец с именами name и toolSet. Если внимательно присмотреться, Сотрудник 1 имеет три инструмента с двумя дубликатами, а Сотрудник 2 имеет три инструмента с одним дубликатом. В этом конкретном случае мы можем сгруппировать по сотрудникам и собрать весь toolSet в массив. Это можно реализовать, сначала сгруппировав по сотрудникам и объединив в toolSet.
Ключевым моментом для списка сбора является то, что функция сохраняет все повторяющиеся значения внутри массива, сохраняя последовательность элементов.
val collect_list_df = array_dataframe.groupBy("name").agg(collect_list("toolSet").as("toolSet")) collect_list_df.printSchema() collect_list_df.show(false)
Если мы хотим удалить отдельные значения, сохранив порядок элементов (день, метка времени, идентификатор и т. Д.), мы можем использовать функцию array_distinct () перед применением collect_list. В следующем примере мы ясно видим, что исходная последовательность элементов сохраняется. Например, Employee_1, мы видим, что исходный порядок элементов Kafka, Kibana, Hadoop сохранялся как Kafka, Kibana, Hadoop после применения операции collect_list()
без потери последовательности.
val collect_list_df = array_dataframe.groupBy("name").agg(array_distinct(collect_list("toolSet")).as("toolSet")) collect_list_df.printSchema() collect_list_df.show(false)
collect_set ()
Вторая функция создания массива называется collect_set()
. Его можно использовать либо для группировки значений, либо для их агрегирования с помощью операции управления окнами.
В следующем скрипте существует столбец с именами name и toolSet. Если внимательно присмотреться, Сотрудник 1 имеет три инструмента с двумя дубликатами, а Сотрудник 2 имеет три инструмента с одним дубликатом. В этом конкретном случае мы можем сгруппировать по сотрудникам и собрать весь toolSet в массив. Это можно реализовать, сначала сгруппировав по сотрудникам и объединив в toolSet.
Ключевым моментом для списка сбора является то, что функция удаляет повторяющиеся значения внутри массива. Однако это не гарантирует, что последовательность элементов в массиве. Посмотрев на Employee_1, мы видим, что первоначальный порядок элементов Kafka, Kibana, Hadoop был изменен на Kibana, Kafka, Hadoop после применения операции collect_set()
.
val collect_set_df = array_dataframe.groupBy("name").agg(collect_set("toolSet").as("toolSet")) collect_set_df.show(false)
Заключение
В Spark мы можем использовать функции collect_list()
и collect_set()
для генерации массивов с разных точек зрения.
Операцияcollect_list()
не отвечает за объединение списка массивов. Он заполняет все элементы в их существующем порядке и не удаляет дубликаты.
С другой стороны, collect_set()
operation действительно устраняет дубликаты; однако он не может сохранить существующий порядок элементов в массиве.
С целью объединения наших элементов в массив мы можем легко использовать встроенную функцию array_distinct () перед функцией collect_list (), чтобы удалить повторяющиеся значения без потери последовательности. элементов в массиве.
Вопросы и комментарии приветствуются!