С легкостью обрабатывайте изменения данных из вашей базы данных в озеро данных с течением времени с помощью Apache Hudi в Amazon EMR
В предыдущей статье ниже мы обсудили, как легко собрать данные CDC с помощью Amazon Database Migration Service (DMS).
В следующей статье будет продемонстрировано, как обрабатывать данные CDC, чтобы в озере данных можно было получить представление базы данных почти в реальном времени. Для выполнения этой операции мы воспользуемся объединенной мощью Apache Hudi и Amazon EMR. Apache Hudi - это среда управления данными с открытым исходным кодом, используемая для упрощения инкрементной обработки данных почти в реальном времени.
Мы начнем процесс с создания нового кластера EMR.
$ aws emr create-cluster --auto-scaling-role EMR_AutoScaling_DefaultRole --applications Name=Spark Name=Hive --ebs-root-volume-size 10 --ec2-attributes '{"KeyName":"roopikadf","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-097e5d6e","EmrManagedSlaveSecurityGroup":"sg-088d03d676ac73013","EmrManagedMasterSecurityGroup":"sg-062368f478fb07c11"}' --service-role EMR_DefaultRole --release-label emr-6.0.0 --name 'Training' --instance-groups '[{"InstanceCount":3,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1 --bootstrap-actions Path=s3://aws-analytics-course/job/energy/emr.sh,Name=InstallPythonLibs
После создания кластера EMR войдите в систему на главном узле с помощью SSH и выполните следующие команды. Эти команды скопируют файлы JAR Apache Hudi на S3.
$ aws s3 cp /usr/lib/hudi/hudi-spark-bundle.jar s3://aws-analytics-course/hudi/jar/ upload: ../../usr/lib/hudi/hudi-spark-bundle.jar to s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar $ aws s3 cp /usr/lib/spark/external/lib/spark-avro.jar s3://aws-analytics-course/hudi/jar/ upload: ../../usr/lib/spark/external/lib/spark-avro.jar to s3://aws-analytics-course/hudi/jar/spark-avro.jar $ aws s3 ls s3://aws-analytics-course/hudi/jar/ 2020-10-21 17:00:41 23214176 hudi-spark-bundle.jar 2020-10-21 17:00:56 101212 spark-avro.jar
Теперь создайте новую записную книжку EMR и загрузите записную книжку, доступную в следующем месте. Загрузите hudi / hudi.ipynb
$ git clone https://github.com/mkukreja1/blogs.git
Создайте сеанс Spark, используя файлы Hudi JAR, загруженные в S3 на предыдущем шаге.
from pyspark.sql import SparkSession import pyspark from pyspark.sql.types import StructType, StructField, IntegerType, StringType, array, ArrayType, DateType, DecimalType from pyspark.sql.functions import * from pyspark.sql.functions import concat, lit, col spark = pyspark.sql.SparkSession.builder.appName("Product_Price_Tracking") \ .config("spark.jars", "s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar,s3://aws-analytics-course/hudi/jar/spark-avro.jar") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .config("spark.sql.hive.convertMetastoreParquet", "false") \ .getOrCreate()
Давайте прочитаем файлы CDC. Начнем с чтения файла полной загрузки.
TABLE_NAME = "coal_prod" S3_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/LOAD00000001.csv" S3_HUDI_DATA = "s3://aws-analytics-course/hudi/data/coal_prod" coal_prod_schema = StructType([StructField("Mode", StringType()), StructField("Entity", StringType()), StructField("Code", StringType()), StructField("Year", IntegerType()), StructField("Production", DecimalType(10,2)), StructField("Consumption", DecimalType(10,2)) ]) df_coal_prod = spark.read.csv(S3_RAW_DATA, header=False, schema=coal_prod_schema) df_coal_prod.show(5) +----+-----------+----+----+----------+-----------+ |Mode| Entity|Code|Year|Production|Consumption| +----+-----------+----+----+----------+-----------+ | I|Afghanistan| AFG|1949| 0.04| 0.00| | I|Afghanistan| AFG|1950| 0.11| 0.00| | I|Afghanistan| AFG|1951| 0.12| 0.00| | I|Afghanistan| AFG|1952| 0.14| 0.00| | I|Afghanistan| AFG|1953| 0.13| 0.00| +----+-----------+----+----+----------+-----------+ only showing top 5 rows
Apache Hudi требует наличия первичного ключа для индивидуальной идентификации каждой записи. Обычно для этой цели лучше всего подходит последовательно сгенерированный первичный ключ. Однако в нашей таблице его нет. Чтобы решить эту проблему, давайте сгенерируем PK, используя сочетание столбцов Entity и Year. Столбец ключ ниже будет использоваться в качестве первичного ключа.
df_coal_prod=df_coal_prod.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key")) df_coal_prod_f=df_coal_prod.drop(df_coal_prod.Mode) df_coal_prod_f.show(5) +-----------+----+----+----------+-----------+---------------+ | Entity|Code|Year|Production|Consumption| key| +-----------+----+----+----------+-----------+---------------+ |Afghanistan| AFG|1949| 0.04| 0.00|Afghanistan1949| |Afghanistan| AFG|1950| 0.11| 0.00|Afghanistan1950| |Afghanistan| AFG|1951| 0.12| 0.00|Afghanistan1951| |Afghanistan| AFG|1952| 0.14| 0.00|Afghanistan1952| |Afghanistan| AFG|1953| 0.13| 0.00|Afghanistan1953| +-----------+----+----+----------+-----------+---------------+ only showing top 5 rows
Теперь мы готовы сохранить данные в формате Hudi. Поскольку мы сохраняем эту таблицу впервые, мы будем использовать операцию «bulk_insert» и режим = перезапись. Также обратите внимание, что мы используем столбец «ключ» в качестве ключа записи.
df_coal_prod_f.write.format("org.apache.hudi") \ .option("hoodie.table.name", TABLE_NAME) \ .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \ .option("hoodie.datasource.write.operation", "bulk_insert") \ .option("hoodie.datasource.write.recordkey.field","key") \ .option("hoodie.datasource.write.precombine.field", "key") \ .mode("overwrite") \ .save(S3_HUDI_DATA)
Теперь мы можем прочитать только что созданную таблицу Hudi.
df_final = spark.read.format("org.apache.hudi")\ .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet") df_final.registerTempTable("coal_prod") spark.sql("select count(*) from coal_prod").show(5) spark.sql("select * from coal_prod where key='India2013'").show(5) +--------+ |count(1)| +--------+ | 6282| +--------+ +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|Entity|Code|Year|Production|Consumption| key| +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+ | 20201021215857|20201021215857_54...| India2013| default|8fae00ae-34e7-45e...| India| IND|2013| 2841.01| 0.00|India2013| +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
Обратите внимание, что у нас есть 6282 строк из полной загрузки и данные для ключа India2013. Этот ключ будет обновлен при следующей операции, поэтому важно записывать историю. Теперь мы будем читать инкрементные данные.
Дополнительные данные поступили с 4 строками: 2 строки были вставлены, одна строка была обновлена и одна строка - удалена. Сначала мы обработаем вставленные и обновленные строки. обратите внимание на фильтр для («Mode IN (‘ U ’,‘ I ’)») ниже.
S3_INCR_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/20200808-*.csv" df_coal_prod_incr = spark.read.csv(S3_INCR_RAW_DATA, header=False, schema=coal_prod_schema) df_coal_prod_incr_u_i=df_coal_prod_incr.filter("Mode IN ('U', 'I')") df_coal_prod_incr_u_i=df_coal_prod_incr_u_i.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key")) df_coal_prod_incr_u_i.show(5) df_coal_prod_incr_u_i_f=df_coal_prod_incr_u_i.drop(df_coal_prod_incr_u_i.Mode) df_coal_prod_incr_u_i_f.show() +----+------+----+----+----------+-----------+---------+ |Mode|Entity|Code|Year|Production|Consumption| key| +----+------+----+----+----------+-----------+---------+ | I| India| IND|2015| 4056.33| 0.00|India2015| | I| India| IND|2016| 4890.45| 0.00|India2016| | U| India| IND|2013| 2845.66| 145.66|India2013| +----+------+----+----+----------+-----------+---------+ +------+----+----+----------+-----------+---------+ |Entity|Code|Year|Production|Consumption| key| +------+----+----+----------+-----------+---------+ | India| IND|2015| 4056.33| 0.00|India2015| | India| IND|2016| 4890.45| 0.00|India2016| | India| IND|2013| 2845.66| 145.66|India2013| +------+----+----+----------+-----------+---------+
Теперь мы готовы выполнить операцию Hudi Upsert для дополнительных данных. Поскольку эта таблица уже существует, на этот раз мы воспользуемся опцией добавить.
df_coal_prod_incr_u_i_f.write.format("org.apache.hudi") \ .option("hoodie.table.name", TABLE_NAME) \ .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \ .option("hoodie.datasource.write.operation", "upsert") \ .option("hoodie.upsert.shuffle.parallelism", 20) \ .option("hoodie.datasource.write.recordkey.field","key") \ .option("hoodie.datasource.write.precombine.field", "key") \ .mode("append") \ .save(S3_HUDI_DATA)
Проверьте исходные данные. Обратите внимание, что были добавлены 2 новые строки, поэтому количество таблиц увеличилось с 6282 до 6284. Также обратите внимание, что строка для ключа India2013 теперь обновлена для столбцов «Производство и потребление».
df_final = spark.read.format("org.apache.hudi")\ .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet") df_final.registerTempTable("coal_prod") spark.sql("select count(*) from coal_prod").show(5) spark.sql("select * from coal_prod where key='India2013'").show(5) +--------+ |count(1)| +--------+ | 6284| +--------+ +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|Entity|Code|Year|Production|Consumption| key| +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+ | 20201021220359|20201021220359_0_...| India2013| default|8fae00ae-34e7-45e...| India| IND|2013| 2845.66| 145.66|India2013| +-------------------+--------------------+------------------+----------------------+--------------------+------+----+----+----------+-----------+---------+
Теперь мы хотели бы разобраться с одной строкой Удалено.
df_coal_prod_incr_d=df_coal_prod_incr.filter("Mode IN ('D')") df_coal_prod_incr_d=df_coal_prod_incr_d.select("*", concat(col("Entity"),lit(""),col("Year")).alias("key")) df_coal_prod_incr_d_f=df_coal_prod_incr_d.drop(df_coal_prod_incr_u_i.Mode) df_coal_prod_incr_d_f.show() +------+----+----+----------+-----------+---------+ |Entity|Code|Year|Production|Consumption| key| +------+----+----+----------+-----------+---------+ | India| IND|2010| 2710.54| 0.00|India2010| +------+----+----+----------+-----------+---------+
Мы можем сделать это с помощью операции Hudi Upsert, но нам потребуется дополнительная опция для удаления hoodie.datasource.write.payload.class = org.apache.hudi.EmptyHoodieRecordPayload
df_coal_prod_incr_d_f.write.format("org.apache.hudi") \ .option("hoodie.table.name", TABLE_NAME) \ .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") \ .option("hoodie.datasource.write.operation", "upsert") \ .option("hoodie.upsert.shuffle.parallelism", 20) \ .option("hoodie.datasource.write.recordkey.field","key") \ .option("hoodie.datasource.write.precombine.field", "key") \ .option("hoodie.datasource.write.payload.class", "org.apache.hudi.EmptyHoodieRecordPayload") \ .mode("append") \ .save(S3_HUDI_DATA)
Теперь мы можем проверить результаты. Поскольку одна строка была удалена, счетчик уменьшился с 6284 до 6283. Кроме того, запрос для удаленной строки вернулся пустым. Все заработало как надо.
df_final = spark.read.format("org.apache.hudi")\ .load("s3://aws-analytics-course/hudi/data/coal_prod/default/*.parquet") df_final.registerTempTable("coal_prod") spark.sql("select count(*) from coal_prod").show(5) spark.sql("select * from coal_prod where key='India2010'").show(5) +--------+ |count(1)| +--------+ | 6283| +--------+ +-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|Entity|Code|Year|Production|Consumption|key| +-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+ +-------------------+--------------------+------------------+----------------------+-----------------+------+----+----+----------+-----------+---+
Весь код, использованный в этой статье, можно найти по ссылке ниже:
Надеюсь, эта статья была полезной. CDC с использованием Amazon Database Migration Service рассматривается в рамках курса AWS Big Data Analytics, предлагаемого Datafence Cloud Academy. Курс преподаю я онлайн по выходным.