Ошибка `` Модуль не найден '' при импорте модуля Pyspark Delta Lake

Я использую Pyspark с дельта-озером, но когда я пытаюсь импортировать дельта-модуль, я получаю ModuleNotFoundError: No module named 'delta'. Это на машине без подключения к Интернету, поэтому мне пришлось вручную загрузить jar-файл delta-core с Maven и поместите его в папку %SPARK_HOME%/jars.

Моя программа работает без проблем, и я могу писать и читать из дельты озера, так что я счастлив, что у меня правильный сосуд. Но когда я пытаюсь импортировать дельта-модуль from delta.tables import *, я получаю сообщение об ошибке.

Для информации мой код:

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, FloatType, StructType, StructField
from pyspark.sql.functions import input_file_name
from Constants import Constants

if __name__ == "__main__":
    constants = Constants()
    spark = SparkSession.builder.master("local[*]")\
                                .appName("Delta Lake Testing")\
                                .getOrCreate()

    # have to start spark session before importing: https://docs.delta.io/latest/quick-start.html#python
    from delta.tables import *

    # set logging level to limit output
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    # push additional python files to the worker nodes
    base_path = os.path.abspath(os.path.dirname(__file__))
    spark.sparkContext.addPyFile(os.path.join(base_path, 'Constants.py'))

    # start pipeline
    schema = StructType([StructField("Timestamp", TimestampType(), False),\
                        StructField("ParamOne", FloatType(), False),\
                        StructField("ParamTwo", FloatType(), False),\
                        StructField("ParamThree", FloatType(), False)])

    df = spark.readStream\
               .option("header", "true")\
               .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
               .schema(schema)\
               .csv(constants.input_path)\
               .withColumn("input_file_name", input_file_name())

     df.writeStream\
       .format("delta")\
       .outputMode("append")\
       .option("checkpointLocation", constants.checkpoint_location)\
       .start("/tmp/bronze")

    # await on stream
    sqm = spark.streams
    sqm.awaitAnyTermination()

Здесь используются Spark v2.4.4 и Python v3.6.1, а задание отправлено с использованием spark-submit path/to/job.py


person Poc275    schedule 11.06.2020    source источник
comment
Это может вам помочь stackoverflow.com/questions/59170595/   -  person TheDarkW3b    schedule 11.06.2020
comment
Это сработало, спасибо @ TheDarkW3b. Не могу поверить, что пропустил этот вопрос во время поиска! Отправьте его как ответ, если хотите, и я отмечу его как выполненное.   -  person Poc275    schedule 12.06.2020