Why not saveAsNewAPIHadoopFile?
import org.apache.hadoop.conf.Configuration

// Define your MongoDB configuration
val config = new Configuration()
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")
//Write everything to mongo
rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any],
classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
config)
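
As for why nothing reaches MongoDB in your version: Iterator.map in Scala is lazy, so the args.map(...) call inside mapPartitions only builds a new iterator. That iterator is discarded without being consumed, so coll.insert never runs; the client is then closed and the untouched args is returned, which is why the HDFS output still looks right. Below is a minimal sketch of the effect that runs without Spark or MongoDB. FakeClient, lazyPartition and eagerPartition are hypothetical names made up for illustration, not part of any library.

```scala
object LazyIteratorDemo {
  // Hypothetical stand-in for MongoClient so the sketch runs without a database.
  final class FakeClient {
    val inserted = scala.collection.mutable.ArrayBuffer.empty[String]
    private var open = true

    def insert(doc: String): Unit = {
      require(open, "client already closed")
      inserted += doc
    }

    def close(): Unit = { open = false }
  }

  // Same shape as the mapPartitions body in the question: the mapped
  // iterator is built, never consumed, and thrown away.
  def lazyPartition(args: Iterator[String], client: FakeClient): Iterator[String] = {
    args.map { arg => client.insert(arg); arg } // lazy: no insert happens here
    client.close()
    args
  }

  // One possible fix: force the writes with toList before closing the client.
  // Assumption: the partition is small enough to buffer in memory.
  def eagerPartition(args: Iterator[String], client: FakeClient): Iterator[String] = {
    val written = args.map { arg => client.insert(arg); arg }.toList
    client.close()
    written.iterator
  }
}
```

With the lazy version the fake client records no inserts; with the eager version it records one per element. If buffering a whole partition is too expensive, rdd.foreachPartition for the MongoDB write followed by a separate saveAsTextFile avoids holding the data, at the cost of computing the RDD twice unless it is cached.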
Thanks
Best Regards
On Fri, Nov 7, 2014 at 2:53 PM, qinwei <[email protected]> wrote:
> Hi, everyone
>
> I have run into a problem writing data to MongoDB inside
> mapPartitions. My code is below:
>
> val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
> // some transformations
> val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
> val newRDD = rdd.mapPartitions(args => {
>   val mongoClient = new MongoClient("host", port)
>   val db = mongoClient.getDB("db")
>   val coll = db.getCollection("collectionA")
>
>   args.map(arg => {
>     coll.insert(new BasicDBObject("pkg", arg))
>     arg
>   })
>
>   mongoClient.close()
>   args
> })
>
> newRDD.saveAsTextFile("hdfs://host:port/path")
>
> The application saved the data to HDFS correctly, but nothing was
> written to MongoDB. Is there something wrong?
> I know that collecting newRDD to the driver and then saving it to
> MongoDB will succeed, but will the subsequent saveAsTextFile read
> from the filesystem once again?
>
> Thanks
>
>
> ------------------------------
> qinwei
>