Потому что мы не всегда можем вызывать df.collect() и локально запускать pandas

Будучи распределенной средой, Apache Spark запускает код на многих узлах. Однако этот рай при параллельной обработке может быстро превратиться в кошмар в аду зависимостей, особенно для непосвященных.

Люди, настраивающие кластеры Hadoop и управляющие ими, обычно нете же, кто пишет пакетные задания ETL и развертывает модные модели машинного обучения, работающие на передовых пакетах Python. Таким образом, у последней группы могут возникнуть трудности с использованием пакетов и jar-файлов, которые изначально никогда не устанавливались в кластере.



В то время как современные методы развертывания, такие как Dockerised Airflow, позволяют нам легко вернуться к использованию Python, который уже предустановлен в наших кластерах, цель этой статьи — освободить нас от оков устаревших API и устаревших шаблонов программирования.

…если вы не вызываете df.collect(), чтобы сделать все локально в качестве обходного пути, то мы спасем вас от ошибок нехватки памяти (OOM) через 6 месяцев, когда ваши данные больше не помещаются в памяти.

Предпосылки

Верно, у этой статьи тоже есть зависимости.

Если вы хотите продолжить, псевдокластер можно настроить полностью на одном хосте Docker, чтобы мы могли поиграть с ним. Если вы не знакомы с Docker, краткого руководства по нему (и docker-compose) должно быть достаточно, поскольку мы используем только самые распространенные функции.

$ git clone https://github.com/tuckging/tutorials.git
$ cd tutorials/spark-on-hadoop/
$ docker-compose up --build

За исключением ошибок и при наличии достаточного объема оперативной памяти (около 6 ГБ бесплатно) у нас должен быть работающий псевдокластер, доступный в следующих веб-интерфейсах:

С этого момента мы будем запускать код полностью в Jupyter, поэтому мы изменим префикс для команд bash с $ на ! для запуска в ячейках Jupyter.

Урок №1: В Hadoop YARN нет встроенной версии Spark

Что!?

Это верно. На самом деле мы не устанавливаемSpark в кластер Hadoop. Вместо этого банки Spark обычно распределяются по всему кластеру каждый раз, когда мы вызываем spark-submit. Это одна из нескольких причин, по которой запуск нового SparkContext часто занимает до 30 секунд.

На самом деле это новая установка Hadoop, и Spark вообще нигде не установлен. Итак, давайте создадим новый блокнот Jupyter (или откроем предоставленный) и установим последнюю версию PySpark 3.2.1 в контейнер jupyter:

import sys
!{sys.executable} -m pip install pyspark==3.2.1

Все остальные узлы в нашем псевдокластере остаются без Spark, так что же происходит, когда мы пытаемся создать SparkSession с yarn в качестве главного?

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
spark

Мы должны получить версиюv3.2.1 как в выводе ячейки, так и в пользовательском интерфейсе Spark (http://localhost:4040). На этом этапе вы, вероятно, захотите преобразовать master и jupyter в 127.0.0.1 в файле hosts, чтобы избежать повторного ввода master:8088 в localhost:8088. Просто не забудьте очистить записи, когда закончите!

Затем мы можем просто установить другую версию PySpark, например 3.0.0 (pip install pyspark==3.0.0 ), перезапустить ядро ​​Jupyter, и оно также будет нормально работать в кластере, при условии, что версии Java и Hadoop остаются совместимыми. .

Кстати, если кто-то настаивает на том, что в его кластере установлен Spark, он может говорить об автономномкластере Spark, который отличается от Hadoop YARN!

Урок № 2: Для пользовательских функций Python требуются интерпретаторы Python, черт возьми!

Допустим, нам нужно вызвать некоторый код Python в наших удаленных исполнителях Spark. Мы можем сделать это с помощью определяемых пользователем функций Python (UDF) следующим образом:

from pyspark.sql.functions import udf
def get_python_version():
    import sys
    return f'{sys.executable} v{sys.version}'
py_udf = udf(get_python_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)

В зависимости от запущенной версии Spark ваша пользовательская функция может работать успешно, а может и не работать. В Spark 3.0.3 это не удается в нашем псевдокластере с сообщением об ошибке: Cannot run program "python" , но успешно в Spark 3.1.1 и более поздних версиях. Это связано с тем, что с версии 3.1.1 python по умолчанию был обновлен с python2.7 до python3 , что соответствует тому, что есть у нашего псевдокластера.

Прежде чем мы должным образом исправим проблему, давайте сначала рассмотрим, почему она не работает. Если подумать, мы явно уже запускаем какой-то код Python, так как же он может жаловаться, что не может найти python ?

Если мы посмотрим внимательнее, оценка f'{sys.executable} v{sys.version}' распределена для запуска на удаленных исполнителях Spark, а не на драйвере Spark или локальном Jupyter/python. клиент. Перед вызовом .show() эти удаленные узлы запускают только JVM для Spark Executors, без каких-либо s̶li̶t̶h̶e̶r фрагментов кода Python, работающего где-либо. Только при вызове нашей пользовательской функции Python они пытаются найти python и впоследствии выдают ошибку.

Чтобы помочь нашим исполнителям Spark найти python , мы перезапускаем ядро ​​(чтобы очистить существующий SparkSession), а затем просто определяем переменную среды PYSPARK_PYTHON [docs] перед запуском новый SparkSession.

import os
os.environ['PYSPARK_PYTHON'] = 'python3'

В качестве альтернативы, мы также можем обратиться к каждому рабочему узлу в кластере, чтобы создать символическую ссылку python на установленный python3 или даже конкретный python3.8 , но это обременительно без менеджера кластера и даже обескураживает, поскольку это может нарушить работу другого программного обеспечения, которое ожидает, что python укажет на узел. другая версия.

"Что, если это не те питоны, которых мы ищем?"

Будь то для получения передовых функций, таких как сопоставление шаблонов в python3.10, или для удовлетворения устаревшего кода или пакетов, нам иногда нужно запустить другую версию Python, которая не установлена ​​в кластере. Это приводит нас к…

Урок № 3: Принесите свой собственный Python на день кластера!

Согласно Spark docs, conda — единственный официально рекомендуемый способ распространения выбранного нами интерпретатора Python. В целях демонстрации, поскольку Jupyter работает под управлением Python 3.8.10, мы решили распространять очень похожий Python 3.8.13, поскольку основная версия (3.8) должна остаются одинаковыми между клиентом Jupyter и Spark Executors. На практике ваше ядро ​​Jupyter уже должно использовать нужный вам Python.

Используя Miniconda, предустановленную в /opt/conda(спасибо Dockerfile!), следующая ячейка Jupyter создает новую среду conda с Python 3.8.13 и упаковывает ее в архив .tgz, готовый к использованию. распределение.

!/opt/conda/bin/conda install -y conda-pack
!/opt/conda/bin/conda create -y python=3.8.13 --name=conda-env
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

Затем мы перезапускаем ядро, настраиваем spark.yarn.dist.archives на локальный путь упакованного архива .tgz, за которым следует # и окончательное имя извлеченной папки, conda-env . Это говорит Spark на YARN о необходимости распространения локального .tgz на все узлы и извлечения его локально как conda-env . Отсюда мы просто указываем PYSPARK_PYTHON на ./conda-env/bin/python3.8 и вуаля! Каждый узел получает интерпретатор Python!

import os
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives','./conda-env.tgz#conda-env')
    .getOrCreate()
)
spark

Урок № 4: Пакеты PyPI помогают при распространении сред conda

Теперь, когда мы справились с тривиальным случаем оценки sys.version, давайте попробуем предоставить зависимости PyPI, такие как pandas==1.4.2, для исполнителей Spark. Это очень просто:

Шаг 1: установите pandas в среду conda и переупакуйте его.

!/opt/conda/envs/conda-env/bin/pip install pandas==1.4.2
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

Шаг 2. перезапустите ядро, заново создайте SparkSession, а затем попробуйте импортировать pandas.

from pyspark.sql.functions import udf
def get_pandas_version():
    import pandas
    return f'pandas v{pandas.__version__}'
py_udf = udf(get_pandas_version)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)

Шаг 3. Получайте прибыль!

Совет.spark.yarn.dist.archives также принимает пути HDFS, что позволяет среде conda входить в кеш YARN и немного сократить время запуска SparkSession. Сами jar-файлы Spark также могут кэшироваться таким же образом.

В качестве альтернативы, если в вашем кластере уже есть нужный интерпретатор Python и отсутствуют только пакеты PyPI, вы можете распространять только пакеты, используя venv и venv-pack аналогичным образом.

Хак: Интересно, что технически также возможно распространять нашу собственную переносимую среду выполнения Java и указывать JAVA_HOME для ее использования. Однако это недокументировано, поэтому используйте его только в крайнем случае и на свой страх и риск! Помните, что сам Hadoop также работает на доступной версии Java кластера.

Урок №5: Новый источник данных? Для этого есть баночка!

Хотя Spark оптимально работает с совместно расположенными файлами HDFS или Hive, мы часто извлекаем или загружаем данные из внешних источников, таких как Google BigQuery, AWS S3, Kafka, PostgreSQL и т. д. Почти всегда эти внешние источники данных обеспечивают подключение к Spark через Java .jar архивы. Итак, давайте рассмотрим наши варианты на примере Google spark-bigquery-connector.

Как правило, мы загружаем .jar (убедитесь, что версия Scala совместима!) как bq.jar

!curl https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar -o bq.jar

Затем перезапустите ядро и предоставьте ему Spark следующим образом:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.jars','./bq.jar')
    .config('spark.driver.extraClassPath','./bq.jar')
    .getOrCreate()
)
spark

Подобно распространению сред conda, мы сначала распространяем банку, используя spark.yarn.dist.jars (разделители-запятые), а затем заставляем Spark Driver и Executors забрать ее, используя spark.driver.extraClassPath (разделяя двоеточием в Unix, разделяя точкой с запятой в Windows). Мы не указываем extraClassPath для Executors, потому что они уже наследуются от Driver.

Затем мы тестируем коннектор с небольшими выборками общедоступных данных:

df = (spark.read
      .option("gcpAccessToken", '<your token here>')
      .option("parentProject", "<your GCP project here>")
      .format("bigquery")
      .load("bigquery-public-data.samples.shakespeare")
     )
df.groupBy('corpus') \
  .sum('word_count') \
  .orderBy('sum(word_count)', ascending=False) \
  .show(3)

Отлично, это работает!

Однако часто есть более простой способ: перезапустить ядро, просто найти имя артефакта maven и настроить spark.jars.packages следующим образом:

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.jars.packages','com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2')
    .getOrCreate()
)
spark

и Spark автоматически загрузит и будет использовать указанные файлы jar (+ зависимости!) из центрального репозитория Maven.

Урок № 6: Идти одному опасно! Возьмите с собой свой helper.py.

За исключением тривиальных задач, приложения Spark производственного уровня часто становятся сложнее, чем один файл main.py. В нашем примере мы создадим два файла helper.py и utils.py, каждый из которых имеет тривиальную функцию вывода __name__ своего модуля:

with open('./helper.py', 'w') as f:
    f.write(
'''def get_module():
    return __name__
''')
with open('./utils.py', 'w') as f:
    f.write(
'''def get_module():
    return __name__
''')
import helper
import utils
f'{helper.get_module()}, {utils.get_module()}'

Затем мы объединяем два файла .py в архив .zip и передаем его Spark с помощью .addPyFile():

!zip friends.zip utils.py helper.py
spark.sparkContext.addPyFile('./friends.zip')

На этот раз нам не нужно перезапускать ядро, чтобы .addPyFile() вступило в силу. Spark добавит файл .zip ко всем путям поиска Python Executor, поэтому мы можем вызывать их следующим образом:

from pyspark.sql.functions import udf
def get_module_names():
    import utils
    import helper
    return f'{helper.get_module()}, {utils.get_module()}'
py_udf = udf(get_module_names)
spark.range(3).withColumn("newCol", py_udf()).show(truncate=False)

На этом этапе вы, возможно, воссоздали SparkSession безопределения первого PYSPARK_PYTHON='python3', поэтому не забудьте перезапустить ядро и снова определить его, если вы используете Spark 3.0. 3 или ранее.

Бонусный урок № 1: Пробовали ли вы выключить и снова включить его?

Вы могли заметить, что акцент делается на перезапуске ядра каждый раз, когда мы хотим воссоздать SparkSession. Это связано с тем, что при вызове spark.stop() освобождаются ресурсы из кластера, но остается оборванная JVM в качестве дочернего процесса Jupyter/python. Перезапуск ядра гарантирует создание новой JVM, которая может получить полный набор новых переменных среды и/или конфигураций Spark.

Бонусный урок № 2: время никого не ждет; изменение - единственная постоянная

За десять лет разработки Spark с открытым исходным кодом его зависимости со временем изменились, как и сам Spark. Таким образом, на удивление легко неосознанно смешивать несовместимые зависимости, даже при использовании общедоступных версий Java, Python, Scala или Hadoop.

Вот несколько советов:

  • Python 3.8+ требует Spark 3.0.0+
  • В последний раз Java 7 поддерживалась в Spark 2.1.0.
  • Последний раз Scala 2.11.x поддерживался в Spark 2.4.8,
    а Scala 2.12.x — в Spark 2.4.1 и более поздних версиях.

Помните, что любой дополнительный пакет Python или Java, который вы выберете, может иметь дополнительные ограничения версии, так что остерегайтесь этого.

Завершение

Запуск Spark на YARN позволяет нам распространять зависимости вместе с нашими основными программами. Будь то jar-файлы Java, пакеты PyPI или даже сам интерпретатор python, все они следуют одному и тому же шаблону: сначала упакуйте их в архивы, скажите YARN распространить их на все узлы, а затем начните их использовать!

Концепция довольно проста, но дьявол кроется в деталях обеспечения совместимости всех зависимостей друг с другом.