Hi, I would appreciate any help or pointers in the right direction.

My current test scenario is the following.

I want to process a MongoDB collection, anonymising some of its fields, and
store the result in another collection.

The collection is around 900 GB in size, with 2.5 million documents.

The code is as follows.



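import scala.collection.JavaConversions._ // foreach on java.util.ArrayList

import com.mongodb.BasicDBObject
import org.apache.spark.sql.SQLContext
import org.bson.BSONObject

// SparkRunner and Name are defined elsewhere (not shown).
object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc, "output").load(
    MongoHadoopImplementationReader(conf, sc, "input").rdd,
    (dbObject: BSONObject) => {
      dbObject.put("add_field", "John Macclane")
      // Anonymise a field of the embedded document and put it back.
      val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
      embedded.put("business_name", Name.first_name)
      dbObject.put("embedded", embedded)
      // Anonymise the text of every note, if the list is present.
      val notesWrapper =
        Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
      notesWrapper match {
        case Some(notes) =>
          notes.foreach((note: BasicDBObject) => {
            note.put("text", Name.name)
          })
        case None =>
      }
      dbObject
    }
  )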
}...

And




import com.mongodb.hadoop.MongoInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bson.BSONObject

case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config,
                                           sc: SparkContext,
                                           collection: String) {
  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")

  // Reads the input collection as (key, document) pairs via mongo-hadoop.
  def rdd: RDD[(Object, BSONObject)] = {
    val rdd = sc.newAPIHadoopRDD(
      mongoConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])
    rdd
  }

}


And 


import com.mongodb.hadoop.MongoOutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.bson.BSONObject

case class MongoDBLoader(conf: com.typesafe.config.Config,
                         sc: SparkContext,
                         collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)],
           transformer: (BSONObject) => BSONObject) = {

    // Drop the original key and apply the transformation to each document.
    val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object, BSONObject)) => {
      (null, transformer(tuple._2))
    })

    // The path is ignored; the target is taken from mongo.output.uri.
    mongoRDD.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      mongoConfig)
  }
}


This code runs slowly: it takes 9.5 hours on a 3-machine cluster to process
the whole collection. On a 30-machine cluster I stopped it after 6 hours, as
it was only about half done.
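
As a rough sanity check on these numbers, here is a back-of-the-envelope
sketch in plain Scala of the throughput the 3-machine run implies. It assumes
1 GB = 1024 MB and a uniform load over the whole run; the object and method
names are only illustrative.

```scala
object ThroughputEstimate {
  // Figures taken from the description above.
  val collectionGb = 900.0
  val documents = 2.5e6
  val hours = 9.5

  // Average read rate in MB/s over the whole run (assumes 1 GB = 1024 MB).
  def mbPerSecond(gb: Double, hours: Double): Double =
    gb * 1024 / (hours * 3600)

  // Average number of documents processed per second.
  def docsPerSecond(docs: Double, hours: Double): Double =
    docs / (hours * 3600)

  // Average document size in KB.
  def avgDocKb(gb: Double, docs: Double): Double =
    gb * 1024 * 1024 / docs

  def main(args: Array[String]): Unit = {
    println(f"read throughput: ${mbPerSecond(collectionGb, hours)}%.1f MB/s")
    println(f"documents/s:     ${docsPerSecond(documents, hours)}%.1f")
    println(f"avg doc size:    ${avgDocKb(collectionGb, documents)}%.1f KB")
  }
}
```

That works out to roughly 27 MB/s read and about 73 documents per second,
with an average document size near 377 KB. The same volume is also written
back, so the single MongoDB instance handles both directions. If that is
close to what that instance can sustain, it might explain why adding Spark
nodes makes no difference, but I have not verified this.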

The machines are EC2 m3.large instances. MongoDB lives on another EC2
instance inside the same VPC and the same subnet.

I looked into the configuration options, but it seems that in most cases the
defaults are the way to go (number of cores, memory, etc.).

It looks like I have a bottleneck somewhere, but I am not sure where. I am
wondering whether Mongo is unable to handle the parallelism.

How are the RDDs stored in memory? When I run it, I see around 32000
partitions and tasks created. Processing then seems to slow down as it
advances (this may be because the Mongo documents are bigger in the second
half of our DB).

I can also see that each split is stored in HDFS by Spark and then read and
bulk-inserted into Mongo. However, there is a lot of HDFS space available
(about 30 GB per machine) and only a tiny fraction of it is used. Wouldn't it
be better to fill more of it and only insert into Mongo when more data is
available?
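
To illustrate what I mean by inserting only when more data is available, here
is a minimal sketch in plain Scala. It is not how mongo-hadoop writes its
output; `writeInBatches` and the `bulkInsert` callback are hypothetical names
standing in for whatever performs the actual bulk insert.

```scala
object BatchedWriter {
  // Consumes an iterator of documents and hands them to `bulkInsert`
  // in groups of at most `batchSize`. Returns the number of batches sent.
  // `bulkInsert` is a hypothetical stand-in for a real bulk-insert call.
  def writeInBatches[A](docs: Iterator[A], batchSize: Int)
                       (bulkInsert: Seq[A] => Unit): Int = {
    var batches = 0
    docs.grouped(batchSize).foreach { batch =>
      bulkInsert(batch)
      batches += 1
    }
    batches
  }

  def main(args: Array[String]): Unit = {
    // Ten dummy documents written in batches of four: sizes 4, 4 and 2.
    val sent = writeInBatches(Iterator.range(0, 10), 4) { batch =>
      println(s"inserting ${batch.size} documents")
    }
    println(s"batches sent: $sent")
  }
}
```

Something like this could be called once per partition, so that a larger
batch size means fewer round trips to Mongo for each partition.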

I also tried to increase the split size, but then it complains that there are
not enough resources on the worker. However, I don't think the splits are big
enough to actually fill the 6 GB of memory on each node, since what gets
stored on HDFS is a lot less than that.
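
To put numbers on that, here is a small sketch in plain Scala comparing the
average partition size with the memory of a node. It uses the 900 GB and
32000 partitions mentioned above and assumes 1 GB = 1024 MB; the names are
illustrative, and deserialised BSON objects will take more memory than their
stored size, so this is only a lower bound.

```scala
object PartitionSizeEstimate {
  // Average amount of stored data per partition, in MB.
  def avgPartitionMb(collectionGb: Double, partitions: Int): Double =
    collectionGb * 1024 / partitions

  // Share of a node's memory that one partition of that size would occupy.
  def fractionOfMemory(partitionMb: Double, memoryGb: Double): Double =
    partitionMb / (memoryGb * 1024)

  def main(args: Array[String]): Unit = {
    val partitionMb = avgPartitionMb(900.0, 32000)
    val share = fractionOfMemory(partitionMb, 6.0)
    println(f"average partition: $partitionMb%.1f MB")
    println(f"share of 6 GB:     ${share * 100}%.2f %%")
  }
}
```

The average partition is about 28.8 MB, which is under half a percent of
6 GB, so the stored size of a split alone does not seem to explain the
resource complaint.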

Is there anything obvious (or not :)) that I am not doing correctly? Is this
the correct way to transform a collection from Mongo to Mongo? Is there
another way?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mongo-hadoop-with-Spark-is-slow-for-me-and-adding-nodes-doesn-t-seem-to-make-any-noticeable-differene-tp24754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
