В предыдущей статье о серии Serverless Spark мы описали, как можно разработать пример конвейера ETL. В этой статье мы расширим это, чтобы увидеть, как конвейер ML может быть разработан и организован с помощью Serverless Spark.

Мы построим простую регрессионную модель для прогнозирования веса пингвинов. Набор данных для этого решения доступен в общедоступных наборах данных BigQuery в разделе bigquery-public-data.ml_datasets.penguins. Мы предскажемвес пингвина на основе вида пингвина, острова проживания, длины и глубины стебля, длины ласт и пола.

Пример набора данных можно посмотреть здесь,

Архитектура решения всего конвейера следующая:

Чтобы начать работу над сеансом Serverless Spark в интерактивном режиме, перейдите к консоли Cloud Dataproc > Создать новый сеанс > Открыть сеанс Jupyter.

В Notebook › New Launcher › Serverless Spark › Здесь вы можете выбрать созданную сессию и начать работу над своим кодом pyspark.

Набор данных: данные о пингвинах доступны в файле bigquery-public-data.ml_datasets.penguins. Чтобы имитировать рабочий конвейер, мы предполагаем, что данные находятся в GCS, поэтому мы экспортируем эти данные из BigQuery в корзину GCS как penguins.csv.

model_building.py : мы будем использовать модель Linearregressor, чтобы построить регрессионную модель для прогнозирования веса пингвинов. Код для регрессионной модели приведен ниже:

from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
import hashlib
from random import randint
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import regexp_replace
from pyspark.sql import functions as f
from pyspark.sql.functions import isnan, when, count, col
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.types import IntegerType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import pandas as pd
from pyspark.sql.functions import isnan, when, count, col
from google.cloud import bigquery
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
#Building a Spark session to read and write to and from BigQuery
spark = SparkSession.builder.appName('pyspark-penguin-ml').config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar').enableHiveSupport().getOrCreate()
#Reading the arguments and storing them in variables
project_name=<< replace your project id>>
dataset_name=<< replace your dataset id>>
bucket_name=<< replace your bucket name>>
user_name=<< replace your username>>
#Reading the penguins source data
p_df = spark.read.option("header",True).csv("gs://"+bucket_name+"/01-datasets/penguins.csv")
p_df.printSchema()
p_df.select("species", "island", "culmen_length_mm", "culmen_depth_mm",
 "flipper_length_mm", "body_mass_g","sex").show(5)
## Convert columns of type string to float
p_df=p_df.withColumn('culmen_length_mm',p_df['culmen_length_mm'].cast("float").alias('culmen_length_mm'))
p_df=p_df.withColumn('culmen_depth_mm',p_df['culmen_depth_mm'].cast("float").alias('culmen_depth_mm'))
p_df=p_df.withColumn('flipper_length_mm',p_df['flipper_length_mm'].cast("float").alias('flipper_length_mm'))
p_df=p_df.withColumn('culmen_length_mm',p_df['culmen_length_mm'].cast("float").alias('culmen_length_mm'))
p_df=p_df.withColumn('culmen_depth_mm',p_df['culmen_depth_mm'].cast("float").alias('culmen_depth_mm'))
p_df=p_df.withColumn('body_mass_g',p_df['body_mass_g'].cast("float").alias('body_mass_g'))
p_df.printSchema()
## Split the data into train and test in the ration 80:20
p_trainDF, p_testDF = p_df.randomSplit([.8, .2], seed=42)
print(f"""There are {p_trainDF.count()} rows in the training set,
and {p_testDF.count()} in the test set""")
## VectorAssembler takes a list of input columns and creates a new DataFrame with an additional column, which we will call features. It combines
##the values of those input columns into a single vector
vecAssembler = VectorAssembler(inputCols=["culmen_length_mm", "culmen_depth_mm",
 "flipper_length_mm"], outputCol="features")
vecTrainDF = vecAssembler.transform(p_trainDF)
vecTrainDF.select("species", "island", "culmen_length_mm", "culmen_depth_mm",
 "flipper_length_mm","sex","features", "body_mass_g").show(10)
## We will identify columns which are categorical and one-hot-code them
categoricalCols = [field for (field, dataType) in p_trainDF.dtypes
 if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
 outputCols=indexOutputCols,
 handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
 outputCols=oheOutputCols)
numericCols = [field for (field, dataType) in p_trainDF.dtypes
 if ((dataType == "float") & (field != "body_mass_g"))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,
 outputCol="features")
##we put all the feature preparation and model building into the pipeline, and
##apply it to our data set
lr = LinearRegression(labelCol="body_mass_g", featuresCol="features")
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(p_trainDF)
predDF = pipelineModel.transform(p_testDF)
predDF.select("features", "body_mass_g", "prediction").show(5)
predDF.show(5)
## Evaluate the Model 
regressionEvaluator = RegressionEvaluator(
 predictionCol="prediction",
 labelCol="body_mass_g",
 metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
pipelineModel.write().overwrite().save('gs://'+bucket_name+'/'+user_name+'_dt_model/dt_model.model')
#Writing the output of the model evaluation to BigQuery
spark.conf.set("parentProject", project_name)
bucket = bucket_name
spark.conf.set("temporaryGcsBucket",bucket)
predDF.write.format('bigquery') .mode("overwrite").option('table', project_name+':'+dataset_name+'.'+user_name+'penguin_model_eval_output') .save()
print('Job Completed Successfully!')

Модель сохраняется в GCS, а результаты прогнозирования модели записываются в BigQuery.

Код «model_building.py» сохраняется в папке GCS.

serverless_spark_ml.py: для организации этого процесса мы будем использовать облачный редактор dag. Хотя это трехэтапный процесс, включающий прием данных из GCS в фреймворк данных pandas > построение и оценка модели > экспорт результатов модели в BigQuery, производственный конвейер машинного обучения может включать дополнительные этапы, особенно связанные с подготовкой данных, CI/CD и обслуживанием модели, поэтому требуется использование инструмента оркестровки, такого как Cloud Composer.

Код дага, как показано ниже,

import os
from airflow.models import Variable
from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (DataprocCreateBatchOperator,DataprocGetBatchOperator)
from datetime import datetime
from airflow.utils.dates import days_ago
import string
import random # define the random module
S = 10  # number of characters in the string.
# call random.choices() string module to find the string in Uppercase + numeric data.
ran = ''.join(random.choices(string.digits, k = S))
project_id = models.Variable.get("project_id")
region = models.Variable.get("region")
subnet=models.Variable.get("subnet")
phs_server=Variable.get("phs")
code_bucket=Variable.get("code_bucket")
bq_dataset=Variable.get("bq_dataset")
umsa=Variable.get("umsa")
name="rmanj"
dag_name= "serverless_spark_ml"
service_account_id= <<enter your service account>>
penguin_model= "gs://"+code_bucket+"/00-scripts/penguin_model.py"
BATCH_ID = "penguin-weight-"+str(ran)
BATCH_CONFIG1 = {
    "pyspark_batch": {
        "main_python_file_uri": penguin_model,
        "args": [
          project_id,
          bq_dataset,
          code_bucket,
          name
        ],
        "jar_file_uris": [
      "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar"
    ]
    },
    "environment_config":{
        "execution_config":{
              "service_account": service_account_id,
            "subnetwork_uri": subnet
            },
        "peripherals_config": {
            "spark_history_server_config": {
                "dataproc_cluster": f"projects/{project_id}/regions/{region}/clusters/{phs_server}"
                }
            },
        },
}
with models.DAG(
    dag_name,
    schedule_interval=None,
    start_date = days_ago(2),
    catchup=False,
) as dag_serverless_batch:
    # [START how_to_cloud_dataproc_create_batch_operator]
    create_serverless_batch1 = DataprocCreateBatchOperator(
        task_id="model_building",
        project_id=project_id,
        region=region,
        batch=BATCH_CONFIG1,
        batch_id=BATCH_ID,
    )
    
    create_serverless_batch1

После того, как вы сохраните его в папке dags, он должен появиться в пользовательском интерфейсе Composer. Затем вы можете запустить конвейер,

опубликовать выполнение, вы можете увидеть результаты прогнозирования модели в BigQuery,

Модель в GCS,

‹‹Обязательно удалите ресурсы, особенно Composer, после запуска конвейера››

Вы можете расширить этот конвейер, включив CI/CD через Cloud Build. Таким образом, вы можете перенести свои рабочие нагрузки Spark в GCP, получить все преимущества автоматического масштабирования, бессерверных возможностей Spark на GCP и при этом сохранить преимущество незакрепленной или незаблокированной реализации.