Масштабируемые панды стали возможны благодаря недавно выпущенной Spark3.2.0

1. Введение

Сообщество Apache Spark 13 октября 2021 г. выпустило spark3.2.0. Они включили API Pandas в Spark как часть своего крупного обновления среди прочего. Pandas - мощный и хорошо известный среди специалистов по обработке данных пакет. Однако у Pandas есть собственное ограничение на обработку больших данных, потому что он обрабатывает данные на одной машине. Чтобы восполнить этот пробел, несколько лет назад компания Databricks выпустила библиотеку Koalas.

Добавление Pandas API в spark3.2.0 позволяет избежать использования сторонней библиотеки. Теперь пользователи Pandas могут сохранить свои Pandas и масштабировать процесс до многоузловых искровых кластеров.

2. Цель

В этой статье конкретно рассказывается, как можно использовать Pandas API на Spark для:

  • Чтение данных как фрейм данных pandas-spark (df)
  • Чтение данных как spark df и преобразование в pandas-spark df
  • Создать pandas-spark df
  • Используйте SQL-запрос напрямую к pandas-spark df
  • Используйте функцию plot для построения pandas-spark df
  • Переход с koalas на pandas API на Spark

3. Данные

Вы можете получить CSV-файл и блокнот Jupyter, использованные в этой статье, на моей странице GitHub здесь. Это небольшой набор данных, однако проиллюстрированные здесь подходы могут быть легко использованы в больших наборах данных.

4. Требуется установка

Прежде чем продолжить, сначала скачайте spark3.2.0 здесь и правильно настройте PySpark. Вам также потребуются библиотеки pyarrow и plotly, которые можно установить через интерфейс ноутбука jupyter, как показано ниже:

  • pyarrow (! conda install -c conda-forge - да, pyarrow)
  • сюжетно (! conda install - да сюжетно)

Потрясающие! если ваш PySpark запущен и работает, давайте перейдем к следующему разделу.

5. Импортируйте библиотеки и запустите сеанс Spark.

Здесь мы начинаем импортировать PySpark и запускаем сеанс, используя блок кодов, как показано ниже.

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark

Информация об искре показывает, что используется версия 3.2.0.

Вы также можете проверить версию python и pyspark, как показано ниже. Я использовал искровую версию 3.2.0 с python 3.8.8.

print('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)

Хорошо! давайте импортируем функцию read_csv для чтения данных CSV как pandas-spark df с помощью pyspark.pandas.

Если мы получаем предупреждение, как показано на рис. 3, мы можем установить для переменной среды PYARROW_IGNORE_TIMEZON значение 1 перед запуском из pyspark.pandas import read_csv.

from pyspark.pandas import read_csv

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
print('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)
from pyspark.pandas import read_csv

# To get rid of error set the environ variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"]="1"
from pyspark.pandas import read_csv

6.1 Чтение данных как pandas-spark df из csv

Мы используем файл example_csv.csv, чтобы проиллюстрировать различные варианты использования pandas spark API. Функция read_csv возвращает df-файл pandas-spark (назовите его psdf).

# Define datapath
# Read as pandas on spark df
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)

Большой! мы только что создали pandas-spark df и теперь можем использовать функции pandas для последующей задачи. Например, psdf.head (2) и psdf.shape можно использовать для получения двух верхних строк и размера данных соответственно. Здесь, в отличие от стандартного python pandas df, вы получаете преимущество распараллеливания.

# get the data type
# get the dimension
# get the data columns' name
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)

Не только это, если вы хотите преобразовать pandas-spark df в spark df, это также можно сделать, просто используя функцию to_spark (). Это вернет фрейм данных Spark (назовите его: sdf), и теперь все функции pyspark можно использовать в этом df. Например, sdf.show (5) и sdf.printSchema () выводят 5 верхних строк и схему данных spark df соответственно.

# Convert from pandas on spark  to spark dataframe
# show top 5 rows from spark df
sdf = psdf.to_spark()
sdf.show(5)

# Print schema
sdf.printSchema()

6.2 Чтение искры df из csv и преобразование в pandas-spark df

Мы также можем преобразовать spark df в pandas-spark df с помощью команды to_pandas_on_spark (). Это принимает вход как искру df и выводит pandas-spark df. Ниже мы читаем данные как spark df (назовем его sdf1). Чтобы подтвердить, что это искровый df, мы можем использовать type (sdf1), который показывает, что это искровой df, т.е. ‘Pyspark.sql.dataframe.DataFrame’.

# Read data using spark 
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)

Принимая во внимание, что после преобразования в pandas-spark df (psdf1) типом будет pandas-spark df, то есть «pyspark.pandas.frame.DataFrame». Мы можем дополнительно подтвердить, что это pandas-spark df, имея возможность использовать функцию pandas, например, .head ().

# Convert to pandas-spark df
psdf1 = sdf1.to_pandas_on_spark()
# Print top 
psdf1.head(2)

# Check type of psdf1
type(psdf1)

6.3 Создание pandas-spark df

В этом разделе вместо создания pandas-spark df из CSV мы можем создать его напрямую, импортировав pyspark.pandas как ps. Ниже мы создали psdf2 как pandas-spark df с помощью ps.DataFrame (). psdf2 имеет 2 функции и 3 строки.

import pandas as pd
import pyspark.pandas as ps
# Create pandas on spark df
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()

Если мы хотим преобразовать pandas-spark df (psdf2) обратно в spark df, то у нас есть легкодоступная функция, как объяснялось ранее, to_spark (), которая выполняет эту работу. Синтаксис обеспечивает гибкость при смене типов фреймов данных. Это может быть полезно в зависимости от функций (от pandas или от Spark), которые вы хотите использовать в своем анализе.

# Again we can convert from pandas-spark df to spark df
sdf2 = psdf2.to_spark()
sdf2.show(2)

7. Запрашивайте pandas на Spark df напрямую с помощью SQL.

Еще одна замечательная тема для обсуждения API pandas-spark - это функция sql. Хорошо, давайте воспользуемся этой функцией в pandas-spark df (psdf2), созданном ранее, чтобы извлечь информацию из. На самом деле нам просто нужно использовать функцию ps.sql () поверх pandas-spark df для выполнения SQL-запроса. Как показано ниже, функция count (*) возвращает всего 3 наблюдения в данных psdf2. Точно так же второй запрос выводит отфильтрованные данные с оценкой более 80.

# Query data using SQL. Input data is pandas on spark df (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")

# Returns pandas on spark df
selected_data = ps.sql("SELECT id, score  FROM {psdf2} WHERE score>80")
selected_data.head()

8. Сюжет на pandas df и pandas на spark df

Большой! что вы зашли так далеко. Теперь давайте кратко коснемся графических возможностей этого нового API Pandas-Spark. В отличие от статического графика по умолчанию в стандартном API-интерфейсе Python pandas, график по умолчанию в API-интерфейсе pandas-spark является интерактивным, поскольку по умолчанию он использует plotly. Ниже мы импортируем данные как pandas df, так и pandas-spark df и строим гистограмму переменной зарплаты для каждого из типов данных.

# Read data as pandas dataframe 
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)

На рисунке ниже показана гистограмма зарплаты от pandas df.

# Read data as pandas on spark df
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
#pyspark.pandas.frame.DataFrame
# plot histogram of pandas df
pddf['salary'].hist(bins=3)

Я показал гистограмму той же переменной из pandas-spark df ниже, которая на самом деле является интерактивным сюжетом.

Примечание. Приведенный ниже график вставлен как изображение, поэтому он статичен. У вас должна быть возможность увеличивать / уменьшать масштаб (сделать его интерактивным), если вы запустите приведенный ниже синтаксис в блокноте jupyter.

# plot histogram of pandas on spark df
import plotly
pdsdf['salary'].hist(bins=3)

9. Переход с Koalas на Pandas API.

Наконец, давайте поговорим о том, какие изменения потребуются при переходе от библиотеки Koalas к API pandas-spark. В таблице ниже показаны некоторые изменения синтаксиса от Koalas до новых pandas в Spark API.

10. Резюме

В статье рассказывается, как мы можем использовать недавно добавленный API pandas в Spark3.2.0 для чтения данных, создания фрейма данных, использования SQL непосредственно в фрейме данных pandas-spark и перехода от существующей библиотеки Koalas к API pandas-spark.

Спасибо за чтение!

Следуйте за мной в LinkedIn, чтобы получать больше информации о навыках в области науки о данных.

Удачного обучения !!!

11. Ссылки

Https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html

Https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

Https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/

Https://www.datamechanics.co/blog-post/apache-spark-3-2-release-main-features-whats-new-for-spark-on-kubernetes

Https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html