В предыдущей статье о серии Serverless Spark мы описали, как можно разработать пример конвейера ETL. В этой статье мы расширим это, чтобы увидеть, как конвейер ML может быть разработан и организован с помощью Serverless Spark.
Мы построим простую регрессионную модель для прогнозирования веса пингвинов. Набор данных для этого решения доступен в общедоступных наборах данных BigQuery в разделе bigquery-public-data.ml_datasets.penguins. Мы предскажемвес пингвина на основе вида пингвина, острова проживания, длины и глубины стебля, длины ласт и пола.
Пример набора данных можно посмотреть здесь,
Архитектура решения всего конвейера следующая:
Чтобы начать работу над сеансом Serverless Spark в интерактивном режиме, перейдите к консоли Cloud Dataproc > Создать новый сеанс > Открыть сеанс Jupyter.
В Notebook › New Launcher › Serverless Spark › Здесь вы можете выбрать созданную сессию и начать работу над своим кодом pyspark.
Набор данных: данные о пингвинах доступны в файле bigquery-public-data.ml_datasets.penguins. Чтобы имитировать рабочий конвейер, мы предполагаем, что данные находятся в GCS, поэтому мы экспортируем эти данные из BigQuery в корзину GCS как penguins.csv.
model_building.py : мы будем использовать модель Linearregressor, чтобы построить регрессионную модель для прогнозирования веса пингвинов. Код для регрессионной модели приведен ниже:
from pyspark.sql import SparkSession import pyspark from pyspark.sql.functions import * from datetime import datetime from pyspark.sql.functions import monotonically_increasing_id, row_number from pyspark.sql import Window import hashlib from random import randint from pyspark.ml.feature import VectorAssembler, StringIndexer from pyspark.sql.functions import regexp_replace from pyspark.sql import functions as f from pyspark.sql.functions import isnan, when, count, col import seaborn as sns import matplotlib.pyplot as plt from pyspark.sql.types import IntegerType from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.ml.feature import VectorAssembler import pandas as pd from pyspark.sql.functions import isnan, when, count, col from google.cloud import bigquery from pyspark.ml import PipelineModel from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.feature import RFormula from pyspark.ml.regression import LinearRegression from pyspark.ml.evaluation import RegressionEvaluator #Building a Spark session to read and write to and from BigQuery spark = SparkSession.builder.appName('pyspark-penguin-ml').config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar').enableHiveSupport().getOrCreate() #Reading the arguments and storing them in variables project_name=<< replace your project id>> dataset_name=<< replace your dataset id>> bucket_name=<< replace your bucket name>> user_name=<< replace your username>> #Reading the penguins source data p_df = spark.read.option("header",True).csv("gs://"+bucket_name+"/01-datasets/penguins.csv") p_df.printSchema() p_df.select("species", "island", "culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g","sex").show(5) ## Convert columns of type string to float p_df=p_df.withColumn('culmen_length_mm',p_df['culmen_length_mm'].cast("float").alias('culmen_length_mm')) p_df=p_df.withColumn('culmen_depth_mm',p_df['culmen_depth_mm'].cast("float").alias('culmen_depth_mm')) p_df=p_df.withColumn('flipper_length_mm',p_df['flipper_length_mm'].cast("float").alias('flipper_length_mm')) p_df=p_df.withColumn('culmen_length_mm',p_df['culmen_length_mm'].cast("float").alias('culmen_length_mm')) p_df=p_df.withColumn('culmen_depth_mm',p_df['culmen_depth_mm'].cast("float").alias('culmen_depth_mm')) p_df=p_df.withColumn('body_mass_g',p_df['body_mass_g'].cast("float").alias('body_mass_g')) p_df.printSchema() ## Split the data into train and test in the ration 80:20 p_trainDF, p_testDF = p_df.randomSplit([.8, .2], seed=42) print(f"""There are {p_trainDF.count()} rows in the training set, and {p_testDF.count()} in the test set""") ## VectorAssembler takes a list of input columns and creates a new DataFrame with an additional column, which we will call features. It combines ##the values of those input columns into a single vector vecAssembler = VectorAssembler(inputCols=["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"], outputCol="features") vecTrainDF = vecAssembler.transform(p_trainDF) vecTrainDF.select("species", "island", "culmen_length_mm", "culmen_depth_mm", "flipper_length_mm","sex","features", "body_mass_g").show(10) ## We will identify columns which are categorical and one-hot-code them categoricalCols = [field for (field, dataType) in p_trainDF.dtypes if dataType == "string"] indexOutputCols = [x + "Index" for x in categoricalCols] oheOutputCols = [x + "OHE" for x in categoricalCols] stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip") oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols) numericCols = [field for (field, dataType) in p_trainDF.dtypes if ((dataType == "float") & (field != "body_mass_g"))] assemblerInputs = oheOutputCols + numericCols vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") ##we put all the feature preparation and model building into the pipeline, and ##apply it to our data set lr = LinearRegression(labelCol="body_mass_g", featuresCol="features") pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr]) pipelineModel = pipeline.fit(p_trainDF) predDF = pipelineModel.transform(p_testDF) predDF.select("features", "body_mass_g", "prediction").show(5) predDF.show(5) ## Evaluate the Model regressionEvaluator = RegressionEvaluator( predictionCol="prediction", labelCol="body_mass_g", metricName="rmse") rmse = regressionEvaluator.evaluate(predDF) print(f"RMSE is {rmse:.1f}") r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) print(f"R2 is {r2}") pipelineModel.write().overwrite().save('gs://'+bucket_name+'/'+user_name+'_dt_model/dt_model.model') #Writing the output of the model evaluation to BigQuery spark.conf.set("parentProject", project_name) bucket = bucket_name spark.conf.set("temporaryGcsBucket",bucket) predDF.write.format('bigquery') .mode("overwrite").option('table', project_name+':'+dataset_name+'.'+user_name+'penguin_model_eval_output') .save() print('Job Completed Successfully!')
Модель сохраняется в GCS, а результаты прогнозирования модели записываются в BigQuery.
Код «model_building.py» сохраняется в папке GCS.
serverless_spark_ml.py: для организации этого процесса мы будем использовать облачный редактор dag. Хотя это трехэтапный процесс, включающий прием данных из GCS в фреймворк данных pandas > построение и оценка модели > экспорт результатов модели в BigQuery, производственный конвейер машинного обучения может включать дополнительные этапы, особенно связанные с подготовкой данных, CI/CD и обслуживанием модели, поэтому требуется использование инструмента оркестровки, такого как Cloud Composer.
Код дага, как показано ниже,
import os from airflow.models import Variable from datetime import datetime from airflow import models from airflow.providers.google.cloud.operators.dataproc import (DataprocCreateBatchOperator,DataprocGetBatchOperator) from datetime import datetime from airflow.utils.dates import days_ago import string import random # define the random module S = 10 # number of characters in the string. # call random.choices() string module to find the string in Uppercase + numeric data. ran = ''.join(random.choices(string.digits, k = S)) project_id = models.Variable.get("project_id") region = models.Variable.get("region") subnet=models.Variable.get("subnet") phs_server=Variable.get("phs") code_bucket=Variable.get("code_bucket") bq_dataset=Variable.get("bq_dataset") umsa=Variable.get("umsa") name="rmanj" dag_name= "serverless_spark_ml" service_account_id= <<enter your service account>> penguin_model= "gs://"+code_bucket+"/00-scripts/penguin_model.py" BATCH_ID = "penguin-weight-"+str(ran) BATCH_CONFIG1 = { "pyspark_batch": { "main_python_file_uri": penguin_model, "args": [ project_id, bq_dataset, code_bucket, name ], "jar_file_uris": [ "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.22.2.jar" ] }, "environment_config":{ "execution_config":{ "service_account": service_account_id, "subnetwork_uri": subnet }, "peripherals_config": { "spark_history_server_config": { "dataproc_cluster": f"projects/{project_id}/regions/{region}/clusters/{phs_server}" } }, }, } with models.DAG( dag_name, schedule_interval=None, start_date = days_ago(2), catchup=False, ) as dag_serverless_batch: # [START how_to_cloud_dataproc_create_batch_operator] create_serverless_batch1 = DataprocCreateBatchOperator( task_id="model_building", project_id=project_id, region=region, batch=BATCH_CONFIG1, batch_id=BATCH_ID, ) create_serverless_batch1
После того, как вы сохраните его в папке dags, он должен появиться в пользовательском интерфейсе Composer. Затем вы можете запустить конвейер,
опубликовать выполнение, вы можете увидеть результаты прогнозирования модели в BigQuery,
Модель в GCS,
‹‹Обязательно удалите ресурсы, особенно Composer, после запуска конвейера››
Вы можете расширить этот конвейер, включив CI/CD через Cloud Build. Таким образом, вы можете перенести свои рабочие нагрузки Spark в GCP, получить все преимущества автоматического масштабирования, бессерверных возможностей Spark на GCP и при этом сохранить преимущество незакрепленной или незаблокированной реализации.