Всем привет!!! В этом блоге мы узнаем о переменных в PySpark. Обратитесь к моему блогу о концепции Spark, чтобы получить представление о концепции.Сначала давайте узнаем о том,

Что такое переменные?

⯮ Переменная — это значение, которое может изменяться в зависимости от условий или информации, переданной программе.

⯮ Как правило, программа состоит из инструкций, сообщающих компьютеру, что делать, и данных, которые программа использует во время работы.

Что такое переменные в PySpark?

➥ Apache Spark использует общие переменные для параллельной обработки. Ну, общие переменные бывают двух типов,

◙ Трансляция

◙ Аккумулятор

➥ В этом блоге мы изучим концепцию трансляции и накопления с использованием PySpark.

Итак, давайте запустим PySpark Broadcast and Accumulator.

Трансляция и аккумулятор PySpark

➥ При определении параллельной обработки, когда драйвер отправляет задачу исполнителю в кластере, копия общей переменной отправляется на каждый узел кластера, поэтому мы можем использовать ее для выполнения задач.

Давайте подробно рассмотрим PySpark Broadcast и Accumulator…

Широковещательные переменные — PySpark

⯮ Широковещательные переменные — это переменные, доступные во всех исполнителях, выполняющих приложение Spark.

⯮ Эти переменные уже кэшированы и готовы к использованию задачами, выполняемыми как часть приложения.

⯮ Переменные трансляции отправляются исполнителям только один раз и доступны для всех задач, выполняемых в исполнителях.

Когда использовать широковещательные переменные?

➨ Допустим, вы работаете с набором данных о сотрудниках. В наборе данных сотрудников у вас есть столбец для представления состояния. Штат обозначается двумя буквами, например, NY для Нью-Йорка.

➨ Теперь вы хотите, чтобы вывод выводил имя сотрудника и штат, но вам нужно полное имя штата, а не двухбуквенное обозначение.

➨ Существует традиционный способ решения этой проблемы. Это должно поддерживать небольшой набор данных с сопоставлением двух букв состояния с полным именем и присоединять этот набор данных к набору данных сотрудников, присоединяясь к двухбуквенному ключу состояния. Это, безусловно, даст результат, который вы ищете.

➨ Но в описанном выше подходе есть несколько ненужных сложностей.

  1. Присоединение вызывает перетасовку, и для большого набора данных это будет дорого.
  2. Много данных передается по сети
  3. Перемешивание увеличивает время выполнения задания.

➨Вместо объединения сформируйте карту (пара ключ-значение) с буквой состояния 2 и полным именем штата и транслируйте карту. Spark сериализует данные и сделает данные Map доступными для всех исполнителей. Задачи могут выполнять простой поиск двух букв и отображать полное имя вместо соединения, чтобы получить результат.

Когда НЕ использовать широковещательные переменные

Используйте широковещательные переменные для небольших данных стиля поиска, а не для больших наборов данных. Размер данных, которые вы транслируете, должен быть в МБ, а не в ГБ.

Как Spark обрабатывает широковещательные переменные?

◙ Когда Spark обнаружит использование широковещательной переменной в вашем коде, Spark сериализует данные и отправит их всем исполнителям, участвующим в вашем приложении.

◙ Широковещательные переменные кэшируются на стороне исполнителя, и все задачи в приложении будут иметь доступ к данным в широковещательной переменной.

◙ Предположим, у вас есть 10 выполнений, и ваши приложения выполняют в общей сложности 100 задач. Широковещательная переменная будет отправлена ​​10 исполнителям, а не 100 раз.

◙ Это десятикратное уменьшение объема данных, которые были бы переданы, если бы мы не использовали широковещательную переменную.

Широковещательные переменные используются для сохранения копии данных на всех узлах. Эта переменная кэшируется на всех машинах и не отправляется на машины с задачами.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

⯮ В приведенном выше коде показано использование переменной Broadcast. У него есть атрибут с именем value. Он хранит данные и используется для возврата широковещательного значения.

⯮ Чтобы использовать широковещательную переменную, вот пример, показывающий широковещательную переменную, у которой есть атрибут с именем value, этот атрибут хранит данные, а затем он используется для возврата широковещательного значения, например:

from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)

Команда:

$SPARK_HOME/bin/spark-submit Broadcast.py

Вывод :

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Что нужно помнить при использовании переменных Broadcast:

⯮ После того, как мы передали значение узлам, мы не должны вносить изменения в его значение, чтобы убедиться, что на каждом узле есть одна и та же копия данных.

⯮ Измененное значение может быть позже отправлено другому узлу, что приведет к неожиданным результатам.

Аккумуляторы — Pyspark

Что такое аккумуляторы?

Аккумуляторы похожи на глобальные переменные в приложении Spark. В реальном мире аккумуляторы используются как счетчики и отслеживают что-то на уровне приложения. Аккумуляторы служат той же цели, что и счетчики в MapReduce.

Как создать переменную аккумулятора в PySpark?

Используя accumulator() из класса SparkContext, мы можем создать аккумулятор в программировании PySpark. Пользователи также могут создавать аккумуляторы для пользовательских типов, используя класс AccumulatorParam PySpark.

Несколько замечаний.

  • sparkContext.accumulator() используется для определения переменных-аккумуляторов.
  • Функция add() используется для добавления/обновления значения в аккумуляторе.
  • value в переменной-аккумуляторе используется для извлечения значения из аккумулятора.

➥ Мы можем создавать аккумуляторы в PySpark для примитивных типов int и float. Пользователи также могут создавать аккумуляторы для пользовательских типов, используя класс AccumulatorParam PySpark.

➥ Переменная трансляции называется значением и хранит пользовательские данные. Переменная также возвращает значение широковещательной рассылки.

accumulator_ = sc.accumulator(0)
rdd = sc.parallelize([5, 6, 7, 8])
def f(x):
глобальный аккумулятор_
accum += x
rdd.foreach(f)
accum.value

Команда:

$SPARK_HOME/bin/spark-submit accumulator.py

Выход :

Значение: 26

Преимущества и использование искрового аккумулятора

Доступ к памяти очень прямой.

♠ Значения мусора меньше всего собираются при обработке служебных данных.

♠ Формат памяти — компактный столбчатый.

♠ Оптимизация катализатора запросов.

♠ Генерация кода — это весь этап.

♠ Преимущества тайлового компиляции по наборам данных перед фреймами данных.

Давайте встретимся со всеми вами в следующем блоге… Есть вопросы? Пожалуйста, прикрепите меня в разделе комментариев, я свяжусь с вами :)

Ресурсы:





https://data-flair.training/blogs/pyspark-broadcast-and-accumulator/





Источник изображения: