Краткое введение в PySpark

Учебник по PySpark для науки о данных

PySpark - отличный язык для выполнения масштабного исследовательского анализа данных, построения конвейеров машинного обучения и создания ETL для платформы данных. Если вы уже знакомы с Python и такими библиотеками, как Pandas, тогда PySpark - отличный язык для изучения, чтобы создавать более масштабируемые анализы и конвейеры. Цель этого поста - показать, как приступить к работе с PySpark и выполнять общие задачи.



Данные игры NHL
Информация об игре, команде, игроке и играх, включая координаты x, y www.kaggle.com



Мы будем использовать Databricks для среды Spark и набор данных NHL от Kaggle в качестве источника данных для анализа. В этом посте показано, как читать и записывать данные в фреймы данных Spark, создавать преобразования и агрегаты этих фреймов, визуализировать результаты и выполнять линейную регрессию. Я также покажу, как масштабируемо смешивать обычный код Python с PySpark с помощью пользовательских функций Pandas. Чтобы упростить задачу, мы сосредоточимся на пакетной обработке и избежим некоторых сложностей, которые возникают при конвейерах потоковой передачи данных.

Полный блокнот для этого поста доступен на github.

Среда

Существует несколько различных способов начать работу со Spark:

  • Самостоятельное размещение: вы можете настроить кластер самостоятельно, используя «голые» машины или виртуальные машины. Apache Ambari - полезный проект для этого варианта, но я не рекомендую его для быстрого начала работы.
  • Поставщики облачных услуг. Большинство поставщиков облачных услуг предлагают кластеры Spark: AWS имеет EMR, а GCP - DataProc. Раньше я вел блог о DataProc, и вы можете получить интерактивную среду быстрее, чем самостоятельный хостинг.
  • Решения для поставщиков. Компании, включая Databricks и Cloudera, предоставляют решения Spark, что упрощает запуск и запуск Spark.

Решение для использования зависит от безопасности, стоимости и существующей инфраструктуры. Если вы пытаетесь приступить к работе со средой для обучения, я бы посоветовал использовать Databricks Community Edition.

В этой среде легко начать работу с кластером Spark и средой ноутбука. Для этого руководства я создал кластер со средой выполнения Spark 2.4 и Python 3. Для запуска кода, приведенного в этом сообщении, вам понадобится как минимум Spark версии 2.3 для функций Pandas UDFs.

Фреймы данных Spark

Ключевым типом данных, используемым в PySpark, является фрейм данных Spark. Этот объект можно рассматривать как таблицу, распределенную по кластеру, и ее функциональность аналогична фреймам данных в R и Pandas. Если вы хотите выполнять распределенные вычисления с использованием PySpark, вам нужно будет выполнять операции с фреймами данных Spark, а не с другими типами данных Python.

Также можно использовать фреймы данных Pandas при использовании Spark, вызвав toPandas () для фрейма данных Spark, который возвращает объект pandas. Однако этой функции обычно следует избегать, за исключением работы с небольшими фреймами данных, поскольку она втягивает весь объект в память на одном узле.

Одно из ключевых различий между фреймами данных Pandas и Spark - нетерпеливое и ленивое выполнение. В PySpark операции откладываются до тех пор, пока результат не понадобится в конвейере. Например, вы можете указать операции для загрузки набора данных из S3 и применения ряда преобразований к фрейму данных, но эти операции не будут применены немедленно. Вместо этого записывается граф преобразований, и как только данные действительно необходимы, например, при записи результатов обратно в S3, преобразования применяются как единая конвейерная операция. Этот подход используется, чтобы избежать втягивания полного кадра данных в память и обеспечивает более эффективную обработку в кластере машин. С фреймами данных Pandas все втягивается в память, и каждая операция Pandas применяется немедленно.

В общем, рекомендуется по возможности избегать активных операций в Spark, поскольку это ограничивает объем вашего конвейера, который может быть эффективно распределен.

Чтение данных

Один из первых шагов, которые нужно изучить при работе со Spark, - это загрузка набора данных в фрейм данных. После загрузки данных в фрейм данных вы можете применять преобразования, выполнять анализ и моделирование, создавать визуализации и сохранять результаты. В Python вы можете загружать файлы прямо из локальной файловой системы с помощью Pandas:

import pandas as pd
pd.read_csv("dataset.csv")

В PySpark загрузка файла CSV немного сложнее. В распределенной среде нет локального хранилища, поэтому для указания пути к файлу необходимо использовать распределенную файловую систему, такую ​​как HDFS, хранилище файлов Databricks (DBFS) или S3.

Обычно при использовании PySpark я работаю с данными в S3. Многие базы данных обеспечивают функцию выгрузки в S3, а также можно использовать консоль AWS для перемещения файлов с локального компьютера на S3. В этой публикации я буду использовать файловую систему Databricks (DBFS), которая предоставляет пути в виде / FileStore. Первый шаг - загрузить CSV-файл, который вы хотите обработать.

Следующим шагом является чтение файла CSV в фрейм данных Spark, как показано ниже. Этот фрагмент кода указывает путь к CSV-файлу и передает ряд аргументов функции read для обработки файла. Последний шаг отображает подмножество загруженного фрейма данных, подобное df.head() в Pandas.

file_location = "/FileStore/tables/game_skater_stats.csv"
df = spark.read.format("csv").option("inferSchema", 
           True).option("header", True).load(file_location)
display(df)

Я предпочитаю использовать паркетный формат при работе со Spark, потому что это формат файла, который включает метаданные о типах данных столбца, предлагает сжатие файлов и является форматом файла, который разработан для хорошей работы со Spark. AVRO - еще один формат, который хорошо работает со Spark. В приведенном ниже фрагменте показано, как взять фрейм данных из предыдущего фрагмента и сохранить его как файл паркета в DBFS, а затем перезагрузить фрейм данных из сохраненного файла паркета.

df.write.save('/FileStore/parquet/game_skater_stats',  
               format='parquet')
df = spark.read.load("/FileStore/parquet/game_skater_stats")
display(df)

Результат этого шага тот же, но последовательность выполнения существенно отличается. При чтении файлов CSV в фреймы данных Spark выполняет операцию в режиме ожидания, что означает, что все данные загружаются в память до того, как начнется выполнение следующего шага, в то время как при чтении файлов в формате паркета используется ленивый подход. Как правило, при работе со Spark нужно избегать нетерпеливых операций, и если мне нужно обработать большие файлы CSV, я сначала преобразую набор данных в формат паркета, прежде чем выполнять остальную часть конвейера.

Часто вам нужно обработать большое количество файлов, например, сотни паркетных файлов, расположенных по определенному пути или каталогу в DBFS. В Spark вы можете включить подстановочный знак в путь для обработки коллекции файлов. Например, вы можете загрузить пакет файлов паркета из S3 следующим образом:

df = spark.read .load("s3a://my_bucket/game_skater_stats/*.parquet")

Этот подход полезен, если у вас есть отдельный файл паркета в день или если в вашем конвейере есть предыдущий шаг, который выводит сотни файлов паркета.

Если вы хотите читать данные из базы данных, такой как Redshift, рекомендуется сначала выгрузить данные в S3, прежде чем обрабатывать их с помощью Spark. В Redshift команду выгрузить можно использовать для экспорта данных в S3 для обработки:

unload ('select * from data_to_process') 
to 's3://my_bucket/game_data'  
iam_role 'arn:aws:iam::123:role/RedshiftExport';

Существуют также библиотеки для баз данных, такие как искровое красное смещение, которые упрощают выполнение этого процесса.

Запись данных

Как и при чтении данных с помощью Spark, при использовании PySpark не рекомендуется записывать данные в локальное хранилище. Вместо этого вам следует использовать распределенную файловую систему, такую ​​как S3 или HDFS. Если вы собираетесь обрабатывать результаты с помощью Spark, то паркет - хороший формат для сохранения фреймов данных. В приведенном ниже фрагменте показано, как сохранить фрейм данных в DBFS и S3 как паркет.

# DBFS (Parquet)
df.write.save('/FileStore/parquet/game_stats',format='parquet')
# S3 (Parquet)
df.write.parquet("s3a://my_bucket/game_stats", mode="overwrite")

При сохранении фрейма данных в формате паркета он часто разбивается на несколько файлов, как показано на изображении ниже.

Если вам нужны результаты в файле CSV, потребуется немного другой шаг вывода. Одно из основных отличий этого подхода заключается в том, что все данные будут отправлены в один узел перед выводом в CSV. Этот подход рекомендуется, когда вам нужно сохранить небольшой фрейм данных и обработать его в системе за пределами Spark. В приведенном ниже фрагменте показано, как сохранить фрейм данных в виде отдельного файла CSV в DBFS и S3.

# DBFS (CSV)
df.write.save('/FileStore/parquet/game_stats.csv', format='csv')
# S3 (CSV)
df.coalesce(1).write.format("com.databricks.spark.csv")
   .option("header", "true").save("s3a://my_bucket/game_sstats.csv")

Другой распространенный выход для сценариев Spark - это база данных NoSQL, такая как Cassandra, DynamoDB или Couchbase. Это выходит за рамки данной статьи, но один подход, который я видел в прошлом, - это запись фрейма данных в S3, а затем запуск процесса загрузки, который сообщает системе NoSQL загрузить данные по указанному пути на S3.

Я также пропустил запись в источник потокового вывода, например Kafka или Kinesis. Эти системы удобнее использовать при использовании Spark Streaming.

Преобразование данных

С фреймами данных Spark можно выполнять множество различных типов операций, во многом аналогичные широкому спектру операций, которые можно применять к фреймам данных Pandas. Один из способов выполнения операций с фреймами данных Spark - через Spark SQL, который позволяет запрашивать фреймы данных, как если бы они были таблицами. В приведенном ниже фрагменте показано, как найти лучших игроков в наборе данных.

df.createOrReplaceTempView("stats")
display(spark.sql("""
  select player_id, sum(1) as games, sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
"""))

Результатом является список идентификаторов игроков, количество появлений в играх и общее количество голов, забитых в этих играх. Если мы хотим показать имена игроков, нам нужно будет загрузить дополнительный файл, сделать его доступным как временное представление, а затем присоединиться к нему с помощью Spark SQL.

В приведенном выше фрагменте я использовал команду display для вывода образца набора данных, но также можно назначить результаты другому фрейму данных, который можно использовать на более поздних этапах конвейера. В приведенном ниже коде показано, как выполнить эти шаги, где первые результаты запроса назначаются новому фрейму данных, который затем назначается временному представлению и объединяется с коллекцией имен игроков.

top_players = spark.sql("""
  select player_id, sum(1) as games, sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
""")
top_players.createOrReplaceTempView("top_players")
names.createOrReplaceTempView("names")
display(spark.sql("""
  select p.player_id, goals, _c1 as First, _c2 as Last
  from top_players p
  join names n
    on p.player_id = n._c0
  order by 2 desc  
"""))

Результат этого процесса показан ниже, определяя Алекса Овечкина как самого результативного игрока в НХЛ на основе набора данных Kaggle.

Существуют операции фрейма данных Spark для общих задач, таких как добавление новых столбцов, удаление столбцов, выполнение объединений и вычисление совокупной и аналитической статистики, но при начале работы эти операции может быть проще выполнить с помощью Spark SQL. Кроме того, легче переносить код с Python на PySpark, если вы уже используете библиотеки, такие как PandaSQL или framequery, для управления фреймами данных Pandas с помощью SQL.

Как и большинство операций с фреймами данных Spark, операции Spark SQL выполняются в режиме отложенного выполнения, что означает, что шаги SQL не будут оцениваться до тех пор, пока не потребуется результат. Spark SQL предоставляет отличный способ покопаться в PySpark без предварительного изучения новой библиотеки для фреймов данных.

Если вы используете Databricks, вы также можете создавать визуализации прямо в записной книжке, без явного использования библиотек визуализации. Например, мы можем построить среднее количество голов за игру, используя приведенный ниже код Spark SQL.

display(spark.sql("""
  select cast(substring(game_id, 1, 4) || '-' 
    || substring(game_id, 5, 2) || '-01' as Date) as month
    , sum(goals)/count(distinct game_id) as goals_per_goal
  from stats
  group by 1
  order by 1
"""))

Первоначальный вывод, отображаемый в записной книжке Databricks, представляет собой таблицу результатов, но мы можем использовать функцию построения графика для преобразования вывода в различные визуализации, такие как гистограмма, показанная ниже. Этот подход не поддерживает все визуализации, которые могут понадобиться специалисту по данным, но он значительно упрощает выполнение исследовательского анализа данных в Spark. При необходимости мы можем использовать функцию toPandas () для создания фрейма данных Pandas на узле драйвера, что означает, что для визуализации результатов можно использовать любую библиотеку построения графиков Python. Однако этот подход следует использовать только для небольших фреймов данных, поскольку все данные с готовностью загружаются в память на узле драйвера.

Я также посмотрел на среднее количество голов за бросок для игроков, забивших не менее 5 голов.

display(spark.sql("""
  select cast(goals/shots * 50 as int)/50.0 as Goals_per_shot
      ,sum(1) as Players 
  from (
    select player_id, sum(shots) as shots, sum(goals) as goals
    from stats
    group by 1
    having goals >= 5
  )  
  group by 1
  order by 1
"""))

Результаты этого преобразования показаны на диаграмме ниже. Большинство игроков, забивших хотя бы 5 голов, совершают броски от 4% до 12% времени.

MLlib

Один из распространенных вариантов использования Python для специалистов по обработке данных - построение прогнозных моделей. Хотя scikit-learn отлично подходит для работы с пандами, он не масштабируется до больших наборов данных в распределенной среде (хотя есть способы распараллелить его с помощью Spark). При построении прогнозных моделей с помощью PySpark и массивных наборов данных предпочтительной библиотекой является MLlib, поскольку она изначально работает с фреймами данных Spark. Не каждый алгоритм в scikit-learn доступен в MLlib, но существует множество вариантов, охватывающих многие варианты использования.

Чтобы использовать один из контролируемых алгоритмов в MLib, вам необходимо настроить фрейм данных с вектором функций и меткой в ​​виде скаляра. После подготовки вы можете использовать функцию fit для обучения модели. В приведенном ниже фрагменте показано, как объединить несколько столбцов в фрейме данных в один вектор функций с помощью VectorAssembler. Мы используем полученный фрейм данных для вызова функции fit, а затем генерируем сводную статистику для модели.

# MLlib imports
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# Create a vector representation for features
assembler = VectorAssembler(inputCols=['shots', 'hits', 'assists', 
    'penaltyMinutes','timeOnIce','takeaways'],outputCol="features")
train_df = assembler.transform(df)
# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='goals')
lr_model = lr.fit(train_df)
# Output statistics 
trainingSummary = lr_model.summary
print("Coefficients: " + str(lr_model.coefficients))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2: %f" % trainingSummary.r2)

Модель предсказывает, сколько голов забьет игрок, на основе количества бросков, времени в игре и других факторов. Однако производительность этой модели оставляет желать лучшего, она дает среднеквадратичную ошибку (RMSE) 0,375 и значение R-квадрата 0,125. Коэффициент с наибольшим значением был столбцом выстрелы, но он не дал достаточного сигнала для того, чтобы модель была точной.

При построении конвейера машинного обучения с помощью PySpark необходимо учитывать ряд дополнительных шагов, включая обучение и тестирование наборов данных, настройку гиперпараметров и хранение моделей. Приведенный выше фрагмент - это просто отправная точка для начала работы с MLlib.

Пользовательские файлы Pandas

Одна из функций Spark, которую я использовал совсем недавно, - это пользовательские функции Pandas (UDF), которые позволяют выполнять распределенные вычисления с фреймами данных Pandas в среде Spark. Общий способ работы этих UDF заключается в том, что вы сначала разделяете фрейм данных Spark с помощью оператора groupby, и каждый раздел отправляется на рабочий узел и транслируется в фрейм данных Pandas, который передается в UDF. Затем UDF возвращает преобразованный фрейм данных Pandas, который объединяется со всеми другими разделами, а затем преобразуется обратно в фрейм данных Spark. Конечный результат действительно полезен: вы можете использовать библиотеки Python, которым требуются Pandas, но теперь их можно масштабировать до массивных наборов данных, если у вас есть хороший способ разбить фрейм данных. Пользовательские функции Pandas были представлены в Spark 2.3, и я расскажу о том, как мы используем эту функцию в Zynga, во время Spark Summit 2019.

Аппроксимация кривой - это обычная задача, которую я выполняю как специалист по данным. В приведенном ниже фрагменте кода показано, как выполнить подгонку кривой для описания взаимосвязи между количеством выстрелов и попаданий, которые игрок записывает в ходе игры. Во фрагменте показано, как мы можем выполнить эту задачу для одного игрока, вызвав toPandas () для набора данных, отфильтрованного для одного игрока. Результатом этого шага являются два параметра (коэффициенты линейной регрессии), которые пытаются описать взаимосвязь между этими переменными.

# Sample data for a player 
sample_pd = spark.sql("""
  select * from stats
  where player_id = 8471214
""").toPandas()
# Import python libraries 
from scipy.optimize import leastsq
import numpy as np
# Define a function to fit
def fit(params, x, y):
    return (y - (params[0] + x * params[1] ))
# Fit the curve and show the results 
result = leastsq(fit, [1, 0], 
                 args=(sample_pd.shots, sample_pd.hits))
print(result)

Если мы хотим рассчитать эту кривую для каждого игрока и иметь большой набор данных, тогда вызов toPandas () завершится ошибкой из-за исключения нехватки памяти. Мы можем масштабировать эту операцию для всего набора данных, вызвав groupby () для player_id, а затем применив Pandas UDF, показанный ниже. Функция принимает в качестве входных данных фрейм данных Pandas, который описывает статистику игрового процесса для одного игрока, и возвращает итоговый фрейм данных, который включает player_id и подогнанные коэффициенты. Затем каждый из сводных фреймов данных Pandas объединяется в фрейм данных Spark, который отображается в конце фрагмента кода. Еще одна часть настройки для использования Pandas UDF - это определение схемы для результирующего фрейма данных, где схема описывает формат фрейма данных Spark, сгенерированного на этапе применения.

# Load necessary libraries
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd
# Create the schema for the resulting data frame
schema = StructType([StructField('ID', LongType(), True),
                     StructField('p0', DoubleType(), True),
                     StructField('p1', DoubleType(), True)])
# Define the UDF, input and outputs are Pandas DFs
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(sample_pd):
    # return empty params in not enough data
    if (len(sample_pd.shots) <= 1):
        return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                                   'p0': [ 0 ], 'p1': [ 0 ]})
     
    # Perform curve fitting     
    result = leastsq(fit, [1, 0], args=(sample_pd.shots, 
                                  sample_pd.hits))
    # Return the parameters as a Pandas DF 
    return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                       'p0': [result[0][0]], 'p1': [result[0][1]]})
# perform the UDF and show the results 
player_df = df.groupby('player_id').apply(analyze_player)
display(player_df)

Результат этого процесса показан ниже. Теперь у нас есть фрейм данных, который суммирует аппроксимацию кривой для каждого игрока, и мы можем выполнять эту операцию с огромным набором данных. При работе с огромными наборами данных важно выбрать или сгенерировать ключ раздела, чтобы добиться хорошего компромисса между количеством и размером разделов данных.

Лучшие практики

Я рассмотрел некоторые общие задачи использования PySpark, но также хотел дать несколько советов, как упростить переход от Python к PySpark. Вот некоторые из лучших практик, которые я собрал на основе моего опыта переноса нескольких проектов между этими средами:

  • Избегайте словарей, используйте фреймы данных: использование типов данных Python, таких как словари, означает, что код может быть невыполнимым в распределенном режиме. Вместо использования ключей для индексации значений в словаре рассмотрите возможность добавления еще одного столбца в фрейм данных, который можно использовать в качестве фильтра.
  • Используйте toPandas экономно. Вызов toPandas () приведет к загрузке всех данных в память на узле драйвера и предотвратит выполнение операций в распределенный режим. Эту функцию можно использовать, когда данные уже были агрегированы, и вы хотите использовать знакомые инструменты построения графиков Python, но ее не следует использовать для больших фреймов данных.
  • Избегайте циклов for. По возможности рекомендуется переписать логику цикла for, используя шаблон groupby-apply для поддержки параллельного выполнения кода. Я заметил, что использование этого шаблона в Python также привело к очистке кода, который легче перевести в PySpark.
  • Попробуйте свести к минимуму активные операции: чтобы ваш конвейер был максимально масштабируемым, рекомендуется избегать активных операций, которые загружают полные фреймы данных в память. Я заметил, что чтение в CSV - это активная операция, и моя работа заключается в том, чтобы сохранить фрейм данных как паркет, а затем перезагрузить его из паркета для создания более масштабируемых конвейеров.
  • Используйте framequery / pandasql, чтобы упростить перенос. Если вы работаете с чужим кодом Python, может быть сложно расшифровать, чего достигают некоторые операции Pandas. Если вы планируете переносить свой код с Python на PySpark, то использование библиотеки SQL для Pandas может упростить этот перевод.

Я обнаружил, что время, потраченное на написание кода в PySpark, также улучшилось благодаря навыкам программирования на Python.

Заключение

PySpark - отличный язык для изучения специалистами по данным, поскольку он обеспечивает масштабируемый анализ и конвейеры машинного обучения. Если вы уже знакомы с Python и Pandas, то большая часть ваших знаний может быть применена к Spark. Я показал, как выполнять некоторые общие операции с PySpark, чтобы ускорить процесс обучения. Я также продемонстрировал некоторые недавние функции Spark с пользовательскими функциями Pandas, которые позволяют выполнять код Python в распределенном режиме. Существуют отличные среды, которые позволяют легко приступить к работе с кластером Spark, поэтому сейчас отличное время для изучения PySpark!

Бен Вебер - ведущий специалист по анализу данных в Zynga. Мы нанимаем"!