Коллекция обновлений в MongoDb через Apache Spark с использованием коннектора Mongo-Hadoop.

Я хотел бы обновить определенную коллекцию в MongoDb через Spark в Java. Я использую Коннектор MongoDB для Hadoop для извлечения и сохранения информации из Apache Spark для MongoDb в Java.

После прочтения отличного сообщения Sampo Niskanen о получении и сохранении коллекций в MongoDb через Spark, Я застрял с обновлением коллекций.

MongoOutputFormat.java включает конструктор, принимающий String[] updateKeys, который, как я предполагаю, относится к возможному списку ключей для сравнения существующих коллекций и выполнения обновления. Однако, используя метод Spark saveAsNewApiHadoopFile() с параметром MongoOutputFormat.class, мне интересно, как использовать этот конструктор обновлений.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

До этого MongoUpdateWritable.java использовался для обновления коллекций. Из примеров, которые я видел в Hadoop, это обычно устанавливается на mongo.job.output.value, может быть, так в Spark:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);

Однако мне все еще интересно, как указать ключи обновления в MongoUpdateWritable.java.

По общему признанию, в качестве хакерского способа я установил «_id» объекта в качестве значения ключа моего документа, чтобы при выполнении сохранения коллекция перезаписывала документы, имеющие то же значение ключа, что и _id.

JavaPairRDD<BSONObject,?> analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = (BSONObject) s._1;

    //for all keys, set _id to key:value_
    String id = "";
    for (String key : o.keySet()){
        id += key + ":" + (String) o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Я хотел бы выполнить обновление коллекции mongodb через Spark, используя MongoOutputFormat или MongoUpdateWritable или Configuration, в идеале используя метод saveAsNewAPIHadoopFile(). Является ли это возможным? Если нет, есть ли другой способ, который не требует специальной установки _id для значений ключа, которые я хочу обновить?


person dyltini    schedule 23.10.2014    source источник


Ответы (1)


Я пробовал несколько комбинаций config.set("mongo.job.output.value","....") и несколько комбинаций

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

и ни один из них не работал.

Я заставил его работать, используя класс MongoUpdateWritable в качестве вывода моего метода карты:

items.map(row => {
      val mongo_id = new ObjectId(row("id").toString)
      val query = new BasicBSONObject()
      query.append("_id", mongo_id)
      val update = new BasicBSONObject()

      update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))
      val muw = new MongoUpdateWritable(query,update,false,true)
      (null, muw)
    })
     .saveAsNewAPIHadoopFile(
       "file:///bogus",
       classOf[Any],
       classOf[Any],
       classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
       mongo_config
     )

Необработанный запрос, выполняемый в монго, выглядит примерно так:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms
person Francesco Laurita    schedule 09.11.2014