Как подключить Azure Databricks к Cosmos DB с помощью API MongoDB?

Я создал одну учетную запись azure CosmosDB с помощью MongoDB API. Мне нужно подключить CosmosDB (API MongoDB) к кластеру Azure Databricks, чтобы читать и записывать данные из космоса.

Как подключить кластер Azure Databricks к учетной записи CosmosDB?


person Chetan SP    schedule 28.11.2018    source источник


Ответы (3)


Вот фрагмент кода pyspark, который я использую для подключения к базе данных CosmosDB с помощью API MongoDB из Azure Databricks (бета-версия 5.2 ML (включает Apache Spark 2.4.0, Scala 2.11) и коннектора MongoDB: org.mongodb.spark: mongo-spark- connector_2.11: 2.4.0):

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource") \
  .option("uri", CONNECTION_STRING) \
  .load()

С CONNECTION_STRING, который выглядит так: "mongodb: // USERNAME: [email protected]: 10255 / DATABASE_NAME.COLLECTION_NAME? Ssl = true & replicaSet = globaldb"

Я пробовал множество других вариантов (добавление имен базы данных и коллекций в качестве опции или конфигурации SparkSession) безуспешно. Скажите, работает ли у вас ...

person 5md    schedule 22.01.2019
comment
Вы также можете указать базу данных и коллекцию с помощью параметров в последней версии Spark. - person hui chen; 19.06.2020

После добавления пакета org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 у меня это сработало:

import json

query = {
  '$limit': 100,
}

query_config = {
  'uri': 'myConnectionString'
  'database': 'myDatabase',
  'collection': 'myCollection',
  'pipeline': json.dumps(query),
}

df = spark.read.format("com.mongodb.spark.sql") \
  .options(**query_config) \
  .load()

Однако я получаю эту ошибку с некоторыми коллекциями:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 10.139.64.6, executor 0): com.mongodb.MongoInternalException: The reply message length 10168676 is less than the maximum message length 4194304
person Menzies    schedule 17.02.2019
comment
Также было прервано задание из-за исключения сбоя этапа - person tatigo; 22.05.2019

Отвечая так же, как и на свой вопрос.

Используя MAVEN в качестве источника, я установил нужную библиотеку в свой кластер, используя путь

org.mongodb.spark: mongo-spark-connector_2.11: 2.4.0

Spark 2.4

Пример кода, который я использовал, выглядит следующим образом (для тех, кто хочет попробовать):

# Read Configuration
readConfig = {
    "URI": "<URI>",
    "Database": "<database>",
    "Collection": "<collection>",
  "ReadingBatchSize" : "<batchSize>"
  }


pipelineAccounts = "{'$sort' : {'account_contact': 1}}"

# Connect via azure-cosmosdb-spark to create Spark DataFrame 
accountsTest = (spark.read.
                 format("com.mongodb.spark.sql").
                 options(**readConfig).
                 option("pipeline", pipelineAccounts).
                 load())

accountsTest.select("account_id").show()
person Gilmar Neves    schedule 01.07.2020