В этом посте я расскажу вам об основах PySpark. Я постараюсь сделать его кратким и лаконичным.

Перво-наперво, что такое Spark?

Распределенная система обработки, используемая для рабочих нагрузок больших данных или чего-либо еще;) Она быстрее, чем MapReduce (операции фильтрации, сортировки и суммирования), потому что работает в памяти. Итак, стоит помнить:

MapReduce ~ = дисковые вычисления

Spark ~ = вычисления, ориентированные на память

Это, конечно, большое обобщение, но мне нравится думать об этом вот так. Если у вас есть другое мнение или вы хотите что-то добавить, не стесняйтесь. В конечном итоге, если вы решите работать с этими технологиями, вы соберете гораздо больше информации. Если вы просто хотите иметь какое-то чувство, базовое понимание, я лично считаю это простым ответом, который дает некоторое понимание.

Кстати, если вы относитесь к типу человека, который любит сиять во время вечеринок, вы можете запомнить, что «Spark представляет большие наборы данных как RDD» (устойчивые распределенные наборы данных) «- неизменяемые распределенные коллекции объектов, которые хранятся в исполнителях (или подчиненные узлы). Объекты, составляющие RDD, называются разделами и могут (но не обязательно) вычисляться на разных узлах распределенной системы. Вместо того, чтобы оценивать каждое преобразование сразу же, как указано программой драйвера, Spark лениво оценивает RDD, вычисляя преобразования RDD только тогда, когда необходимо вычислить окончательные данные RDD (часто путем записи в хранилище или сбора агрегата в драйвер). Spark может сохранять RDD загруженным в памяти на узлах-исполнителях на протяжении всего жизненного цикла приложения Spark для более быстрого доступа в повторяющихся вычислениях ». [Карау Х., Уорран Р., High Performance Spark]

или просто помните, что у Spark есть RDD и он лениво оценивает. DataFrames и Datasets построены на основе RDD.

Что в нем содержится?

Встроенные модули для потоковой передачи, SQL, машинного обучения и обработки графиков

Что такое PySpark?

PySpark = Python + Spark (все просто). Если хотите, можете сказать, что python API поддерживает Spark.

Как начать свое путешествие с PySpark

  1. Установить Java
  2. Установить Anaconda
  3. Установить Spark
  4. Установите PySpark и FindSpark (mac) / winutils.exe и FindSpark (windows10)

Следуйте: https://sharing.luminis.eu/blog/how-to-install-pyspark-and-apache-spark-on-macos/ (для Mac) или https://medium.com/@naomi .fridman / install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f (для windows10)

Основы PySpark:

Чтение данных, паркет, json, csv

dfp = spark.read.load('path-to-parquet-files/')
dfjson = spark.read.load('path-to-json-files/', format="json")
dfcsv = spark.read.load('path-to-csv-files/', format="csv", sep=":",        inferSchema="true", header="true")

круто, вы также можете запускать SQL непосредственно для этих файлов.

dfp = spark.sql("SELECT * FROM parquet.`path-to-parquet-files/`") 

это дает вам возможность сделать быстрый предварительный выбор столбцов или другие преобразования. Кроме того, вы можете выполнять фильтрацию и выбор, не вызывая spark.sql просто так

dfp = spark.read.load('path-to-parquet-files/').filter("date between 20200101 and 20200215").select("columnA").distinct()

Мы видим, что мы применили некоторую фильтрацию, выбрав один единственный столбец, и оставили только отдельные строки, что было быстро и легко.

Можем ли мы читать из ведер s3? Простой ответ: да, мы можем!

Итак, мы знаем, как читать данные, чтобы увидеть данные, которые мы можем просто сделать

dfp.show()
# or 
dfp.show(truncate=True)  

Следующее количество строк может быть отображено с помощью

dfp.count() 

мы можем увидеть схему, используя

dfp.schema 

Думаю, мы готовы к некоторым присоединениям.

Объедините данные вместе, не стесняйтесь. Функция соединения PySpark аналогична соединению SQL, где две или более таблицы или фреймы данных могут быть объединены в зависимости от условий.

Мы можем присоединиться с помощью sql или встроить функции.

  1. Встроенные функции:
df = df1.join(df2, "column_name")

ты сделал это! просто как тот. Но что именно мы сделали, это соединение по умолчанию = внутреннее соединение. Что, если мы захотим заняться чем-нибудь еще?

df = df1.join(df2, "column_name", how="outer")

хорошо, теперь я вижу, что мы можем добавить параметр, определяющий тип соединения, это хорошо. У нас могут быть внутренние (по умолчанию), внешние, left, right, left_semi, left_anti,…

Кроме того, вы можете выполнить некоторые условия, например:

df  = df1.join(df2, df1.coulmn1 == df2.coulmn1, how='inner')

поэтому вместо имени столбца мы просто добавили условие. Для нескольких условий просто поместите их в [] и разделите запятыми (для условий или разделите |). Примеры ниже:

df_and  = df1.join(df2, [df1.val1 < df2.val2, df1.val2 < df2.val3], how='inner')
df_or  = df1.join(df2, [(df1.val1 < df2.val2) | (df1.val2 > df2.val3)], how='inner')

По моему опыту, вы в основном будете использовать inner, left_anti, full_outer, а иногда и что-то еще, так что не забывайте об этом (вы всегда можете погуглить, это примерно 5 секунд)

и вы можете сравнить это с возможными значениями параметра «как» во встроенной функции соединения:

how - str, по умолчанию inner. Должен быть одним из: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, _23 _, _ 24_, leftanti и left_anti.

Надеюсь, это не сбивает с толку, но если у вас все еще есть проблемы, вы всегда можете протестировать это на некоторых примерах фреймов данных и посмотреть, как это пойдет.

Хорошо, мы почти закончили, но, как вы помните, я сказал, что вы также можете использовать sql. Для этого вам нужно создать временные представления, и на картинке выше у вас есть вся необходимая информация. Итак, давайте посмотрим на простой пример:

df1.createOrReplaceTempView("df1s")
df2.createOrReplaceTempView("df2s")
df = spark.sql("select a.* from df1s a join df2s b on a.column1 = b.column1")

Union = объединить два фрейма данных вместе (что-то вроде добавления в конец другого). Если вы знакомы с union в sql, вы знаете, что есть union и union all. В PySpark у нас есть только union (unionAll устарел, начиная с Spark «2.0.0»). Union in Spark не удаляет дубликаты из результирующего фрейма данных (вы можете использовать разные). Для объединения фреймы данных должны иметь идентичную схему.

df1n2 = df1.union(df2)

Теперь вы эксперт или почти эксперт по смешиванию данных. Конечно, требуется время, чтобы освоить все эти ETL, но это не ракетостроение (если вы работаете в NASA и используете PySpark, просто отбросьте этот комментарий).

Наверное, теперь вы думаете, а что, если я хочу сделать какие-то дополнительные преобразования?

UDF. Допустим, вы хотите каким-то образом преобразовать один из столбцов. Вы хотите сделать это быстро и параллельно. UDF здесь, чтобы помочь вам. Итак, позвольте мне показать вам простой пример.

Прежде всего нам нужно импортировать

from pyspark.sql.functions import udf

Давайте определим функцию, которую мы будем применять к каждой строке:

def test_function(row_value):
    ‘’’
    Test function to show how you can apply your own function 
    using UDFs
    ‘’’
    try:
       Output = float(row_value)/100.0
    except:
       Output = 0.0
    return Output

Итак, мы определили тестовую функцию, которая просто делит значение в каждой строке на 100,0. Теперь мы можем применить эту функцию с помощью UDF. Определим udf:

udf_F = udf(lambda x: test_function(x))

и теперь мы можем запустить это в каком-либо столбце (в примере имя столбца - «значения») нашего фрейма данных (df1)

df1new = df1.withColumn('Vdivided', udf_F(df1['values']))

и теперь у нас есть df1new, в котором есть дополнительный столбец с именем «Vdivided».

Это было просто. Можем ли мы сделать то же самое, но использовать информацию из нескольких столбцов? Конечно, давайте сделаем это с помощью двух столбцов, и тогда, вероятно, вы сможете расширить это до любого количества столбцов.

def test_function2(row_value1, row_value2):
    ‘’’
    Test function to show how you can apply your own function 
    using UDFs, taking into account info from two coulumns
‘’’
    try:
       Output = (float(row_value)/100.0) * row_value2
    except:
       Output = 0.0
    return Output

а также

udf_F2 = udf(lambda x, y: test_function2(x,y))
df1new_2 = df1.withColumn("newV", udf_F2(df1['values'], df1['other_values']) )

хорошо, круто, это тоже было легко. Кстати, в некоторых случаях после некоторых преобразований вы заметили, что ваш столбец должен быть, например, коллекциями целых чисел, но у вас есть числа с плавающей запятой (все остальное в порядке), тогда помните, что вы всегда можете использовать cast

df1 = df1.withColumn("Values", df1["Values"].cast(IntegerType()) )

Если вы предпочитаете, чтобы sql произвел какое-то преобразование, вы можете сделать это, просто помните, что вам нужны представления

df1.createOrReplaceTempView("df1s")
df = spark.sql("select ...

Нулевые значения в данных. Не бойтесь, мы можем что-то с этим поделать. Прежде всего, мы просто просмотрели UDF, чтобы вы могли найти нули и уже что-то с ними делать, но что, если вы хотите сделать что-то простое, например, отбросить их или заполнить их чем-то еще.

df1 = df.na.fill(0)
df2= df.na.drop()

df1 - значения NULL теперь равны 0, df2 - значения NULL отбрасываются (drop - возвращает новый DataFrame, который отбрасывает строки, содержащие любые значения NULL или NaN.)

Написание файлов. Возможно, сейчас самое время написать где-нибудь свои преобразованные фреймы данных, чтобы вы могли использовать их позже. Как я могу записывать данные в PySpark?

df.write.parquet("s3n://yours3bucket/or/any/other/location/,mode="overwrite")

Хорошо, это паркет, можно написать в csv? Конечно, можно. Из Spark 2.0+ вы можете использовать напрямую

df.write.csv("/location/", mode="overwrite")

Если вам нужен один файл, вы можете переразбить

df.repartition(1).write.csv("/location/single.csv", sep='|')

В те дни машинное обучение привлекало всех. На этом этапе публикации вы должны уметь создать довольно приличный ETL, так что можем ли мы делать ML поверх этого. Это сложно? Давайте посмотрим. Перво-наперво, давайте сделаем некоторую нормализацию, так как некоторые алгоритмы необходимы (некоторые могут работать в любом случае, я дам вам угадать, какие из них).

Нормализация. Как? А вот и PySpark! Сохраняемся - https://spark.apache.org/docs/latest/ml-features.html - ›нормализатор

columns_to_scale = [“column1”, “column2”, “column3”]

поэтому мы хотим использовать 3 столбца в качестве наших функций и масштабировать их, скажем, с помощью Min Max Scaler. Для этого нам нужно, чтобы наши объекты были представлены в виде вектора. Для этого мы используем векторный ассемблер, а затем применяем MinMaxScaler. Мы используем конвейер для применения этого масштабирования (проверьте документацию для других реализованных нормализаторов).

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
columns_to_scale_copy = [“column1”, “column2”, “column3”]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col2 + "_vec", outputCol=col2 + "_sca") for col2 in columns_to_scale_copy]
pipeline = Pipeline(stages = assemblers + scalers)

Мы видим, что конвейерный подход дает нам простой способ добавить дополнительный элемент при необходимости. Теперь нам просто нужно применить это к нашему фреймворку данных.

Но подождите, мы знаем, что нужно получить параметры нормализации в обучающем наборе, а затем применить ту же нормализацию к другим наборам (перекрестная проверка и тестирование). Итак, как мы можем разделить наш фрейм данных?

train, test = df.randomSplit([0.8, 0.2], seed=12345)

Итак, это случайное разделение для обучения и тестирования (80% данных для обучения, 20% данных для тестирования), вот и все. Вероятно, вам нужно будет сделать некоторые проверки и решить, можно ли использовать случайное разбиение или, может быть, вам нужна стратифицированная выборка, но на этом этапе давайте предположим, что случайное разбиение в порядке.

Итак, давайте применим нормализацию для тестирования и обучения.

scalerModel = pipeline.fit(train)
train_scaled = scalerModel.transform(train)
test_scaled = scalerModel.transform(test)

У нас есть данные, готовые для ML / DL, ура!

ML / DL. ML с PySpark - это просто, просто проверьте эту страницу и посмотрите все реализованные модели - https://spark.apache.org/docs/latest/ml-classification-regression.html

Давайте, например, проведем регрессию с использованием случайного леса, чтобы вы могли увидеть, что это всего лишь несколько строк кода.

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(featuresCol=”features”)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train)
predictions = model.transform(test)
predictions2 = model.transform(train)
evaluator = RegressionEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”rmse”)
evaluator2 = RegressionEvaluator(labelCol=”label”, predictionCol=”prediction”, metricName=”rmse”)
rmse = evaluator.evaluate(predictions)
rmse2 = evaluator2.evaluate(predictions2)
print(“Root Mean Squared Error (RMSE) on test data = %g” % rmse)
print(“Root Mean Squared Error (RMSE) on train data = %g” % rmse2)
rfModel = model.stages[0]
print(rfModel)

Это было быстро. Вы также можете заметить, что мы снова использовали конвейер. Таким образом, вы можете легко объединить все элементы в один конвейер, если хотите.

Время для DL. По моему опыту, в 85% случаев вам не понадобится распределенное обучение. Вы просто настроите один экземпляр ec2-p2, например p2.16xlarge, и запустите модель глубокого обучения параллельно на 16 графических процессорах, и этого должно быть достаточно. В некоторых случаях, вероятно, вам понадобится p3s, но это довольно редко. Можете ли вы сделать распределенное глубокое обучение в pyspark? Керас простой, можно керас?

Да, вы можете проводить распределенное обучение в PySpark, и да, вы можете использовать Keras, но, вероятно, вам нужны дополнительные библиотеки, такие как Elephas (https://github.com/maxpumperla/elephas), dist-keras (https: // github .com / cerndb / dist-keras ), Horovod ( https://github.com/horovod/horovod ). У каждого из них есть свои плюсы и минусы, но это отдельная статья.

Реализация Elephas может быть очень быстрой, поэтому давайте просто посмотрим на пример кода.

from elephas.spark_model import SparkModel
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous') 
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

Вы инициализируете SparkModel, передавая скомпилированную модель Keras, частоту обновления и режим распараллеливания. После этого вы можете просто fit модель на своем RDD. Elephas fit имеет те же параметры, что и модель Keras, поэтому вы можете передавать epochs, batch_size и т. Д., Как вы привыкли из Keras . (с сайта http://maxpumperla.com/elephas/). Некоторые вещи становятся немного сложнее, если вы хотите использовать обратные вызовы. Это работает на rdd, если вы хотите тренироваться на фрейме данных, вы также можете найти образец кода по ссылке выше. По моему личному опыту, вероятно, вы будете обучать keras, tensorflow, pytorch на одном экземпляре (возможно, с несколькими графическими процессорами), а не на кластерах / EMR. В любом случае, вперед. Теперь у вас есть модель, и вы хотите делать прогнозы, и здесь вы можете легко и красиво использовать параллельный подход с PySpark.

Запуск модели в PySpark. Здесь мы можем просто использовать UDF Pandas.

schema = StructType([StructField(‘id’, StringType(), True), StructField(‘score’, DoubleType(),True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_keras(pd):
    pd[‘score’] = model.predict(pd.iloc[:,0:10])
    return pd[[‘id’, ‘score’]]
infer_data_final.createOrReplaceTempView("infer_data_final_v")
partitionedDF = spark.sql("""select *, cast(rand()*100 as int) as partition_id from infer_data_final_v""")
results_df=partitionedDF.groupby('partition_id').apply(apply_keras)

Хорошо, давайте пошагово. Мы делаем функцию apply_keras с декоратором pandas_udf, который имеет схему и тип. В функции мы запускаем прогнозирование модели, вызывая model.predict, в этом примере наши функции находятся в первых 10 столбцах. Модель прогнозирует на основе функций, а функция возвращает идентификатор и оценку (мы предполагаем, что идентификатор столбца существует). Из фрейма данных, к которому мы хотим применить нашу модель, мы создаем представление (infer_data_final → infer_data_final_v), а затем разделяем все, в зависимости от количества рабочих узлов вы можете изменить количество разделов. Затем мы выполняем прогнозы модели для этих разделов и получаем результат. Все просто и должно быть довольно быстро.

Но ждать! Я думал, что у нас есть функции в виде вектора в одном столбце, мы можем разделить это или что?

infer_data = infer_data.select(“features”).rdd.map(lambda x: x[0].toArray().tolist()).toDF()

Я дам вам понять, как применять прогнозы модели keras без разделения, чтобы вы тоже могли повеселиться, удачи!

Думаю, к этому моменту я рассмотрел значительную часть основ PySpark. По крайней мере, часть, связанная с базовыми командами, SQL, машинным обучением.

Думаю, пора остановиться. Возможно, в следующий раз я расскажу о потоковой передаче и обработке графиков, или, может быть, потрачу больше времени на распределенное обучение, или, возможно, расширю часть ETL и добавлю некоторую информацию о том, как создавать красивые графики. Что вы думаете?

и, пожалуйста, поделитесь своими комментариями и любыми классными базовыми вещами, которые, по вашему мнению, должны быть включены, или, может быть, вы знаете несколько интересных трюков (я их возьму :))

Спасибо за чтение!