20 часто используемых функций PySpark RDD

Каждая функция демонстрируется на понятном примере

Apache Spark очень популярен в аналитике больших данных. Он использует распределенную систему обработки. PySpark — это интерфейс для Apache Spark в Python. Когда у вас есть огромный набор данных размером в терабайты, обычный код на Python будет очень медленным. Но алгоритм PySpark будет намного быстрее. Потому что он делит набор данных на более мелкие части, распределяет их по отдельным процессорам, выполняет операции в каждом процессоре отдельно, а затем объединяет их вместе, чтобы получить общий результат.

Это общий обзор того, как PySpark работает быстрее. В этой статье основное внимание будет уделено некоторым очень часто используемым функциям в PySpark.

Если вы новичок, вы можете попрактиковаться в использовании блокнота Google-Colab. Вам просто нужно установить, используя эту простую строку:

pip install pyspark

Установка займет всего несколько минут, и ноутбук будет готов для кодов PySpark.

Для начала необходимо создать SparkContext, который является основной точкой входа для функциональности Spark. Он представляет собой подключение к кластеру Spark. Здесь я создаю SparkContext:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

Я начну с самых основных функций и перейду к более удобным для аналитики функциям.

sc.parallelize ()

Здесь я создаю очень простой объект RDD, используя этот SparkContext, используя метод parallelize. Параллельный метод создает параллельную коллекцию, которая позволяет распределять данные.

rdd_small = sc.parallelize([3, 1, 12, 6, 8, 10, 14, 19])

Вы не можете распечатать объект RDD, как обычный список или массив в записной книжке.

.собирать()

Если вы просто наберете rdd_small и запустите в блокноте, вывод будет выглядеть так:

rdd_small

Выход:

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

Итак, это parallelCollectionRDD. Потому что эти данные находятся в распределенной системе. Вы должны собрать их вместе, чтобы иметь возможность использовать их в качестве списка.

rdd_small.collect()

Выход:

[3, 1, 12, 6, 8, 10, 14, 19]

Сбор всего объекта RDD может не иметь смысла все время, когда набор данных слишком велик. Вы можете взять только первый элемент данных или несколько первых элементов данных, чтобы изучить структуру данных, тип или качество данных.

Здесь я делаю более крупный объект RDD:

rdd_set = sc.parallelize([[2, 12, 5, 19, 21],
                          [10, 19, 5, 21, 8],
                          [34, 21, 14, 8, 10],
                          [110, 89, 90, 134, 24],
                          [23, 119, 234, 34, 56]])

.первый()

Получение только первого элемента объекта RDD:

rdd_set.first()

Выход:

[2, 12, 5, 19, 21]

.брать()

Здесь я беру первые три элемента:

rdd_set.take(3)

Выход:

[[2, 12, 5, 19, 21], [10, 19, 5, 21, 8], [34, 21, 14, 8, 10]]

На выходе мы получаем первые три элемента.

.текстовый файл()

На этом этапе я хочу представить текстовый файл для демонстрации нескольких различных функций.

Я скопировал несколько текстов со Википедийной страницы США и сделал текстовый файл с помощью простого блокнота. Файл сохраняется как usa.txt. Вы можете скачать этот текстовый файл по этой ссылке:



Вот как сделать RDD с помощью текстового файла:

lines = sc.textFile("usa.txt")

Давайте снова воспользуемся функцией .take(), чтобы увидеть первые 4 элемента файла:

lines.take(2)

Выход:

["The United States of America (U.S.A. or USA), commonly known as the United States (U.S. or US) or America, is a country primarily located in North America. It consists of 50 states, a federal district, five major unincorporated territories, 326 Indian reservations, and nine minor outlying islands.[h] At nearly 3.8 million square miles (9.8 million square kilometers), it is the world's third- or fourth-largest country by geographic area.[c] The United States shares land borders with Canada to the north and Mexico to the south as well as maritime borders with the Bahamas, Cuba, Russia, and other countries.[i] With a population of more than 331 million people,[j] it is the third most populous country in the world. The national capital is Washington, D.C., and the most populous city and financial center is New York City.",  '']

.flatMap()

Обычной практикой является разделение текстового содержимого для анализа. Вот использование функции flatMap для разделения текстовых данных по пробелам и создания одного большого списка строк:

words = lines.flatMap(lambda x: x.split(' '))
words.take(10)

Выход:

['The',  'United',  'States',  'of',  'America',  '(U.S.A.',  'or',  'USA),',  'commonly',  'known']

Первые 10 элементов теперь выглядят так.

.карта()

Карта полезна, если вы хотите применить некоторое преобразование к каждому элементу RDD или использовать условие. В этом случае каждый элемент означает каждое слово. Здесь я сделаю каждое слово строчным и преобразую каждое слово в кортеж, добавляя 1 к каждому слову.

wordsAsTuples = words.map(lambda x: (x.lower(), 1))
wordsAsTuples.take(4)

Выход:

[('the', 1), ('united', 1), ('states', 1), ('of', 1)]

Вот небольшое пояснение того, что происходит. «x» в лямбда-выражении представляет каждый элемент RDD. Все, что вы делаете с «x», относится к каждому элементу в RDD.

Здесь мы преобразовали «x» как (x, 1). Итак, каждое слово выходит как (слово, 1). Внимательно посмотрите на вывод.

.reduceByKey

Эта функция полезна, когда есть пара ключ-значение, и вы хотите добавить все значения одного и того же ключа. Например, в словахAsTuples выше у нас есть пары ключ-значение, где ключи — это слова, а значения — это единицы. Обычно первый элемент кортежа считается ключом, а второй — значением.

Если мы используем reduceByKey для wordsAsTuples, он добавит единицы, которые мы добавили для того же ключа (это означает те же слова). Если у нас есть 4 «the», он добавит четыре единицы и сделает это («the», 4)

counts = wordsAsTuples.reduceByKey(lambda x, y: x+y)
counts.take(3)

Выход:

[('united', 14), ('of', 20), ('america', 1)]

Итак, в нашем тексте данные «united» появились 14 раз, «of» — 20 раз, а «america» — только один раз.

.вершина()

Возвращает указанные верхние элементы. Я объясню еще кое-что после этого примера:

counts.top(20, lambda x: x[1])

Выход:

[('the', 55),  
('and', 24),  
('of', 20),  
('united', 14),  
('is', 13),  
('in', 13),  
('a', 13),  
('states', 12),  
('it', 9),  
('to', 7),  
('as', 6),  
("world's", 6),  
('by', 6),  
('world', 5),  
('with', 5),  
('american', 5),  
('war', 5),  
('or', 4),  
('north', 4),  
('its', 4)]

Что здесь происходит? В этой команде мы говорим, что нам нужны первые 20 элементов. Затем x[1] указывается как условие в лямбда-выражении. В кортеже, подобном ('the', 55), "the" равно x[0], а 55 равно x[1]. В лямбда-выражении указание x[1] означает, что нам нужны 20 лучших элементов на основе x[1] каждого элемента. Таким образом, он возвращает первые 20 слов на основе вхождений в текстовый файл.

Если вы используете x[0] в качестве условия для лямбда, он вернет 20 лучших в алфавитном порядке, поскольку x[0] является строкой. Пожалуйста, не стесняйтесь попробовать.

.фильтр()

В топ-20 слов выше большинство слов не очень значимы. Такие слова, как «в», «в», «с», «в» не дают никакого понимания текста. При работе с текстовыми данными принято опускать незначащие слова такого типа. Хотя это не всегда хорошая идея.

Если мы сможем исключить некоторые из этих незначительных слов, мы можем увидеть еще несколько значимых слов в списке 20 лучших.

Вот список слов, которые я хочу исключить из текста перед тем, как взять 20 лучших слов:

stop = ['', 'the', 'and', 'of', 'is', 'in', 'a', 'it', 'to', 'as', 'by', 'with', 'or', 'its', 'from', 'at']

Теперь мы отфильтруем эти слова:

words_short = counts.filter(lambda x: x[0] not in stop)

В этом новом RDD words_short нет тех слов, которые мы перечислили в «стоп».

Вот топ-20 слов на данный момент:

words_short.top(20, lambda x: x[1])

Выход:

[('united', 14),
 ('states', 12),
 ("world's", 6),
 ('world', 5),
 ('american', 5),
 ('war', 5),
 ('north', 4),
 ('country', 3),
 ('population', 3),
 ('new', 3),
 ('established', 3),
 ('war,', 3),
 ('million', 3),
 ('military', 3),
 ('international', 3),
 ('largest', 3),
 ('america,', 2),
 ('states,', 2),
 ('square', 2),
 ('other', 2)]

У нас нет ни одного из этих слов в стоп-листе.

.sortByKey()

Мы можем отсортировать весь RDD, используя эту функцию .sortByKey(). Как следует из названия, он сортирует RDD по ключам. В RDD counts ключами являются строки символов. Таким образом, он будет сортироваться по алфавиту.

counts.sortByKey().take(10)

Выход:

[('', 3),  
('(1775–1783),', 1),  
('(9.8', 1),  
('(u.s.', 1),  
('(u.s.a.', 1),  
('12,000', 1),  
('16th', 1),  
('1848,', 1),  
('18th', 1),  
('1969', 1)]

Как видите, сначала появилась пустая строка, а затем числовые строки. Потому что цифровая клавиша стоит перед буквами в алфавитном порядке.

По умолчанию сортировка дает вам результаты в порядке возрастания. Но если вы передадите False в функцию sortByKey, она будет сортироваться в порядке убывания. Здесь мы сортируем по убыванию и берем первые 10 элементов:

counts.sortByKey(False).take(10)

Выход:

[('york', 1),  
('years', 1),  
('world.', 1),  
('world,', 1),  
("world's", 6),  
('world', 5),  
('with', 5),  
('which', 1),  
('when', 1),  
('west.', 1)]

Также можно применить функцию или условие перед сортировкой в ​​функции .sortByKey. Вот РДД:

r1 = [('a', 1), ('B', 2), ('c', 3), ('D', 4), ('e', 5)]

В RDD r1 некоторые клавиши написаны строчными буквами, а некоторые — прописными. Если мы отсортируем это по ключам, то по умолчанию сначала будут прописные буквы, а затем строчные.

r1.sortByKey().collect()

Выход:

[('B', 2), ('D', 4), ('a', 1), ('c', 3), ('e', 5)]

Но если мы хотим избежать части регистра и хотим, чтобы функция сортировала без учета регистра, мы можем добавить условие в функцию .sortByKey. Вот пример:

r1.sortByKey(True, keyfunc=lambda k: k.upper()).collect()

Выход:

[('a', 1), ('B', 2), ('c', 3), ('D', 4), ('e', 5)]

В приведенном выше лямбда-выражении мы просим функцию рассматривать все ключи как прописные, а затем сортировать. Он только рассматривает все ключи как верхний регистр, но не возвращает ключи в верхнем регистре.

.группа по ключу()

Эта функция groupByKey() группирует все значения на основе ключей и объединяет их. Напоминаем, что первый элемент в кортеже — это ключ, а второй — значение по умолчанию. Прежде чем продолжить обсуждение, рассмотрим пример:

numbers_only = wordsAsTuples.groupByKey().map(lambda x: sum(x[1]))
numbers_only.take(10)

Выход:

[14, 20, 1, 1, 1, 6, 2, 13, 3, 1]

В данном случае ключами являются слова. Предположим, что «the» — это ключ, и когда мы используем groupByKey(), он группирует все значения этого ключа «the» и агрегирует их, как указано. Здесь я использовал sum() в качестве агрегатной функции. Таким образом, он суммирует все значения. Мы получили вхождения каждого слова. Но на этот раз мы получили только количество вхождений в виде списка.

.уменьшать()

Используется для уменьшения элементов RDD. Сокращение number_only, которое мы получили из последнего примера:

total_words = numbers_only.reduce(lambda x, y: x+y)
total_words

Выход:

575

У нас получилось 575. Это означает, что всего в текстовом файле 575 слов.

.mapValues ​​()

Его можно использовать для применения некоторых преобразований к значению пары ключ-значение. Он возвращает ключ и преобразованные значения. Вот пример, где ключи — это строки, а значения — целые числа. Я разделю значения на 2:

rdd_1 = sc.parallelize([("a", 3), ("n", 10), ("s", 5), ("l", 12)])
rdd_1.mapValues(lambda x: x/2).collect()

Выход:

[('a', 1.5), ('n', 5.0), ('s', 2.5), ('l', 6.0)]

Здесь «x» в лямбда-выражении представляет значения. Итак, все, что вы делаете с «x», применяется ко всем значениям в RDD.

Еще один пример будет полезен для лучшего понимания. В этом примере используется другой RDD, где ключи — это строки, а значения — это списки целых чисел. Мы будем использовать агрегатную функцию для списков.

rdd_map = sc.parallelize([("a", [1, 2, 3, 4]), ("b", [10, 2, 8, 1])])
rdd_map.mapValues(lambda x: sum(x)).collect()

Выход:

[('a', 10), ('b', 21)]

Посмотрите на вывод здесь. Каждое значение представляет собой сумму целых чисел в списках значений.

.countByValue ()

Возвращает количество вхождений каждого элемента RDD в формате словаря.

sc.parallelize([1, 2, 1, 3, 2, 4, 1, 4, 4]).countByValue()

Выход:

defaultdict(int, {1: 3, 2: 2, 3: 1, 4: 3})

Вывод показывает словарь, где ключи — это отдельные элементы СДР, а значения — это количество вхождений этих различных значений.

.getNumPartitions()

Объекты RDD хранятся в виде кластеров элементов. Другими словами, объект RDD разделен на несколько разделов. Мы не делаем этих разделений. Такова природа RDD. Это происходит по умолчанию.

Кластеры могут выполнять одну задачу одновременно для всех разделов. Как следует из названия, эта функция .getNumPartitions() сообщает вам, сколько существует разделов.

data = sc.parallelize([("p",5),("q",0),("r", 10),("q",3)])
data.getNumPartitions()

Выход:

2

У нас есть 2 раздела в объекте «данные».

Вы можете увидеть, как они разделены, используя функцию .glom():

data.glom().collect()

Выход:

[[('a', 1), ('b', 2)], [('a', 2), ('b', 3)]]

Он показывает два списка элементов. Потому что есть два раздела.

Союз

Вы можете объединить два RDD, используя объединение. Например, здесь я делаю два RDD «rd1» и «rd2». Затем я использую объединение, чтобы соединить их вместе, чтобы создать «rd3».

rd1 = sc.parallelize([2, 4, 7, 9])
rd2 = sc.parallelize([1, 4, 5, 8, 9])
rd3 = rd1.union(rd2)
rd3.collect()

Выход:

[2, 4, 7, 9, 1, 4, 5, 8, 9]

Вновь сформированный СДР «rd3» включает в себя все элементы «rd1» и «rd2».

.отчетливый()

Он возвращает отдельные элементы RDD.

rd4 = sc.parallelize([1, 4, 2, 1, 5, 4])
rd4.distinct().collect()

Выход:

[4, 2, 1, 5]

У нас есть только отдельные элементы «rd4».

.zip()

Когда мы используем zip для двух RDD, они создают кортежи, используя элементы обоих RDD. Пример продемонстрирует это наглядно:

rd11 = sc.parallelize(["a", "b", "c", "d", "e"])
rdda = sc.parallelize([1, 2, 3, 4, 5])
rda_11 = rdda.zip(rd11)
rda_11.collect()

Выход:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]

В операции zip мы сначала упомянули «rdda». Итак, в выводе элементы «rdda» идут первыми.

Присоединяется

Последнее, над чем нужно поработать в этой статье, — это соединения. Название уже говорит вам, что оно объединяет два RDD. Давайте сделаем еще один RDD, такой как «rda_11», а затем присоединимся.

rddb = sc.parallelize([1, 3, 4, 6])
rd22 = sc.parallelize(["apple", "ball", "cal", "dog"])
rdb_22 = rddb.zip(rd22)
rdb_22.collect()

Выход:

[(1, 'apple'), (3, 'ball'), (4, 'cal'), (6, 'dog')]

Теперь у нас есть «rdb_22». Давайте объединим «rdda_11» из предыдущего примера и «rdb_22» вместе:

rda_11.join(rdb_22).collect()

Выход:

[(4, ('d', 'cal')), (1, ('a', 'apple')), (3, ('c', 'ball'))]

По умолчанию операция объединения объединяет два RDD по ключам. Еще раз напоминаем, что первый элемент каждого кортежа считается ключевым.

В базовой операции соединения объединяются только элементы общих ключей в обоих RDD.

Есть и другие виды соединений. Вот пример левого внешнего соединения:

rda_11.leftOuterJoin(rdb_22).collect()

Выход:

[(4, ('d', 'cal')),  
(1, ('a', 'apple')),  
(5, ('e', None)),  
(2, ('b', None)),  
(3, ('c', 'ball'))]

Поскольку это левое внешнее соединение, RDD, упомянутый слева, в данном случае «rda_11» принесет все его элементы. Но порядок элементов может отличаться от «rda_11». СДР, находящийся справа, будет содержать только те элементы, которые являются общими с СДР, расположенным слева.

Существует также правое внешнее соединение, которое делает прямо противоположное:

rda_11.rightOuterJoin(rdb_22).collect()

Выход:

[(4, ('d', 'cal')),  
(1, ('a', 'apple')),  
(6, (None, 'dog')),  
(3, ('c', 'ball'))]

Наконец, существует полное внешнее соединение, которое возвращает каждый элемент из обоих RDD.

rda_11.fullOuterJoin(rdb_22).collect()

Выход:

[(4, ('d', 'cal')),
 (1, ('a', 'apple')),
 (5, ('e', None)),
 (2, ('b', None)),
 (6, (None, 'dog')),
 (3, ('c', 'ball'))]

Как видите, здесь есть все ключи от обоих RDD.

Заключение

Я хотел перечислить наиболее часто используемые и простые операции RDD, которые могут выполнять множество задач. Операций RDD гораздо больше. Возможно, когда-нибудь позже я придумаю еще. Надеюсь, это было полезно.

Пожалуйста, не стесняйтесь подписываться на меня в Твиттере, на странице в Facebook и заглядывать на мой канал на YouTube.

Подробнее Чтение