Я использую Pyspark с дельта-озером, но когда я пытаюсь импортировать дельта-модуль, я получаю ModuleNotFoundError: No module named 'delta'
. Это на машине без подключения к Интернету, поэтому мне пришлось вручную загрузить jar-файл delta-core с Maven и поместите его в папку %SPARK_HOME%/jars
.
Моя программа работает без проблем, и я могу писать и читать из дельты озера, так что я счастлив, что у меня правильный сосуд. Но когда я пытаюсь импортировать дельта-модуль from delta.tables import *
, я получаю сообщение об ошибке.
Для информации мой код:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, FloatType, StructType, StructField
from pyspark.sql.functions import input_file_name
from Constants import Constants
if __name__ == "__main__":
constants = Constants()
spark = SparkSession.builder.master("local[*]")\
.appName("Delta Lake Testing")\
.getOrCreate()
# have to start spark session before importing: https://docs.delta.io/latest/quick-start.html#python
from delta.tables import *
# set logging level to limit output
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.session.timeZone", "UTC")
# push additional python files to the worker nodes
base_path = os.path.abspath(os.path.dirname(__file__))
spark.sparkContext.addPyFile(os.path.join(base_path, 'Constants.py'))
# start pipeline
schema = StructType([StructField("Timestamp", TimestampType(), False),\
StructField("ParamOne", FloatType(), False),\
StructField("ParamTwo", FloatType(), False),\
StructField("ParamThree", FloatType(), False)])
df = spark.readStream\
.option("header", "true")\
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
.schema(schema)\
.csv(constants.input_path)\
.withColumn("input_file_name", input_file_name())
df.writeStream\
.format("delta")\
.outputMode("append")\
.option("checkpointLocation", constants.checkpoint_location)\
.start("/tmp/bronze")
# await on stream
sqm = spark.streams
sqm.awaitAnyTermination()
Здесь используются Spark v2.4.4 и Python v3.6.1, а задание отправлено с использованием spark-submit path/to/job.py