Масштабируемые панды стали возможны благодаря недавно выпущенной Spark3.2.0
1. Введение
Сообщество Apache Spark 13 октября 2021 г. выпустило spark3.2.0. Они включили API Pandas в Spark как часть своего крупного обновления среди прочего. Pandas - мощный и хорошо известный среди специалистов по обработке данных пакет. Однако у Pandas есть собственное ограничение на обработку больших данных, потому что он обрабатывает данные на одной машине. Чтобы восполнить этот пробел, несколько лет назад компания Databricks выпустила библиотеку Koalas.
Добавление Pandas API в spark3.2.0 позволяет избежать использования сторонней библиотеки. Теперь пользователи Pandas могут сохранить свои Pandas и масштабировать процесс до многоузловых искровых кластеров.
2. Цель
В этой статье конкретно рассказывается, как можно использовать Pandas API на Spark для:
- Чтение данных как фрейм данных pandas-spark (df)
- Чтение данных как spark df и преобразование в pandas-spark df
- Создать pandas-spark df
- Используйте SQL-запрос напрямую к pandas-spark df
- Используйте функцию plot для построения pandas-spark df
- Переход с koalas на pandas API на Spark
3. Данные
Вы можете получить CSV-файл и блокнот Jupyter, использованные в этой статье, на моей странице GitHub здесь. Это небольшой набор данных, однако проиллюстрированные здесь подходы могут быть легко использованы в больших наборах данных.
4. Требуется установка
Прежде чем продолжить, сначала скачайте spark3.2.0 здесь и правильно настройте PySpark. Вам также потребуются библиотеки pyarrow и plotly, которые можно установить через интерфейс ноутбука jupyter, как показано ниже:
- pyarrow (! conda install -c conda-forge - да, pyarrow)
- сюжетно (! conda install - да сюжетно)
Потрясающие! если ваш PySpark запущен и работает, давайте перейдем к следующему разделу.
5. Импортируйте библиотеки и запустите сеанс Spark.
Здесь мы начинаем импортировать PySpark и запускаем сеанс, используя блок кодов, как показано ниже.
import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.appName('spark3.2show').getOrCreate() print('Spark info :') spark
Информация об искре показывает, что используется версия 3.2.0.
Вы также можете проверить версию python и pyspark, как показано ниже. Я использовал искровую версию 3.2.0 с python 3.8.8.
print('Version of python: ') !python -V print('Version of pyspark :', pyspark.__version__)
Хорошо! давайте импортируем функцию read_csv для чтения данных CSV как pandas-spark df с помощью pyspark.pandas.
Если мы получаем предупреждение, как показано на рис. 3, мы можем установить для переменной среды PYARROW_IGNORE_TIMEZON значение 1 перед запуском из pyspark.pandas import read_csv.
from pyspark.pandas import read_csv
import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.appName('spark3.2show').getOrCreate() print('Spark info :') spark print('Version of python: ') !python -V print('Version of pyspark :', pyspark.__version__) from pyspark.pandas import read_csv # To get rid of error set the environ variable as below os.environ["PYARROW_IGNORE_TIMEZONE"]="1" from pyspark.pandas import read_csv
6.1 Чтение данных как pandas-spark df из csv
Мы используем файл example_csv.csv, чтобы проиллюстрировать различные варианты использования pandas spark API. Функция read_csv возвращает df-файл pandas-spark (назовите его psdf).
# Define datapath # Read as pandas on spark df datapath = '/Users/...../' psdf = read_csv(datapath+'example_csv.csv') psdf.head(2)
Большой! мы только что создали pandas-spark df и теперь можем использовать функции pandas для последующей задачи. Например, psdf.head (2) и psdf.shape можно использовать для получения двух верхних строк и размера данных соответственно. Здесь, в отличие от стандартного python pandas df, вы получаете преимущество распараллеливания.
# get the data type # get the dimension # get the data columns' name print('Data type :', type(psdf)) print('Data shape :', psdf.shape) print('Data columns : \n ', psdf.columns)
Не только это, если вы хотите преобразовать pandas-spark df в spark df, это также можно сделать, просто используя функцию to_spark (). Это вернет фрейм данных Spark (назовите его: sdf), и теперь все функции pyspark можно использовать в этом df. Например, sdf.show (5) и sdf.printSchema () выводят 5 верхних строк и схему данных spark df соответственно.
# Convert from pandas on spark to spark dataframe # show top 5 rows from spark df sdf = psdf.to_spark() sdf.show(5)
# Print schema sdf.printSchema()
6.2 Чтение искры df из csv и преобразование в pandas-spark df
Мы также можем преобразовать spark df в pandas-spark df с помощью команды to_pandas_on_spark (). Это принимает вход как искру df и выводит pandas-spark df. Ниже мы читаем данные как spark df (назовем его sdf1). Чтобы подтвердить, что это искровый df, мы можем использовать type (sdf1), который показывает, что это искровой df, т.е. ‘Pyspark.sql.dataframe.DataFrame’.
# Read data using spark sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True) type(sdf1)
Принимая во внимание, что после преобразования в pandas-spark df (psdf1) типом будет pandas-spark df, то есть «pyspark.pandas.frame.DataFrame». Мы можем дополнительно подтвердить, что это pandas-spark df, имея возможность использовать функцию pandas, например, .head ().
# Convert to pandas-spark df psdf1 = sdf1.to_pandas_on_spark() # Print top psdf1.head(2)
# Check type of psdf1 type(psdf1)
6.3 Создание pandas-spark df
В этом разделе вместо создания pandas-spark df из CSV мы можем создать его напрямую, импортировав pyspark.pandas как ps. Ниже мы создали psdf2 как pandas-spark df с помощью ps.DataFrame (). psdf2 имеет 2 функции и 3 строки.
import pandas as pd import pyspark.pandas as ps # Create pandas on spark df psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]}) psdf2.head()
Если мы хотим преобразовать pandas-spark df (psdf2) обратно в spark df, то у нас есть легкодоступная функция, как объяснялось ранее, to_spark (), которая выполняет эту работу. Синтаксис обеспечивает гибкость при смене типов фреймов данных. Это может быть полезно в зависимости от функций (от pandas или от Spark), которые вы хотите использовать в своем анализе.
# Again we can convert from pandas-spark df to spark df sdf2 = psdf2.to_spark() sdf2.show(2)
7. Запрашивайте pandas на Spark df напрямую с помощью SQL.
Еще одна замечательная тема для обсуждения API pandas-spark - это функция sql. Хорошо, давайте воспользуемся этой функцией в pandas-spark df (psdf2), созданном ранее, чтобы извлечь информацию из. На самом деле нам просто нужно использовать функцию ps.sql () поверх pandas-spark df для выполнения SQL-запроса. Как показано ниже, функция count (*) возвращает всего 3 наблюдения в данных psdf2. Точно так же второй запрос выводит отфильтрованные данные с оценкой более 80.
# Query data using SQL. Input data is pandas on spark df (psdf) ps.sql("SELECT count(*) as num FROM {psdf2}")
# Returns pandas on spark df selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80") selected_data.head()
8. Сюжет на pandas df и pandas на spark df
Большой! что вы зашли так далеко. Теперь давайте кратко коснемся графических возможностей этого нового API Pandas-Spark. В отличие от статического графика по умолчанию в стандартном API-интерфейсе Python pandas, график по умолчанию в API-интерфейсе pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Ниже мы импортируем данные как pandas df, так и pandas-spark df и строим гистограмму переменной зарплаты для каждого из типов данных.
# Read data as pandas dataframe pddf = pd.read_csv(datapath+'example_csv.csv') type(pddf) #pandas.core.frame.DataFrame pddf.head(2)
На рисунке ниже показана гистограмма зарплаты от pandas df.
# Read data as pandas on spark df pdsdf = read_csv(datapath+'example_csv.csv') type(pdsdf) #pyspark.pandas.frame.DataFrame # plot histogram of pandas df pddf['salary'].hist(bins=3)
Я показал гистограмму той же переменной из pandas-spark df ниже, которая на самом деле является интерактивным сюжетом.
Примечание. Приведенный ниже график вставлен как изображение, поэтому он статичен. У вас должна быть возможность увеличивать / уменьшать масштаб (сделать его интерактивным), если вы запустите приведенный ниже синтаксис в блокноте jupyter.
# plot histogram of pandas on spark df import plotly pdsdf['salary'].hist(bins=3)
9. Переход с Koalas на Pandas API.
Наконец, давайте поговорим о том, какие изменения потребуются при переходе от библиотеки Koalas к API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса от Koalas до новых pandas в Spark API.
10. Резюме
В статье рассказывается, как мы можем использовать недавно добавленный API pandas в Spark3.2.0 для чтения данных, создания фрейма данных, использования SQL непосредственно в фрейме данных pandas-spark и перехода от существующей библиотеки Koalas к API pandas-spark.
Спасибо за чтение!
Следуйте за мной в LinkedIn, чтобы получать больше информации о навыках в области науки о данных.
Удачного обучения !!!
11. Ссылки
Https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html
Https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
Https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/
Https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html