Yup, that was the problem. Changing the default "mongo.input.split_size" from 8 MB to 100 MB did the trick.
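For anyone hitting the same thing, the change is a one-line setting on the same Hadoop Configuration that is later passed to newAPIHadoopRDD. A sketch under the assumptions of the code quoted below (the localhost URI here is a placeholder; the value is in MB, per the mongo-hadoop configuration reference):

```java
import org.apache.hadoop.conf.Configuration;

public class MongoSplitConfig {
    public static Configuration build() {
        Configuration mongodbConfig = new Configuration();
        mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        // Placeholder URI; in the original code this comes from props.getProperty("mongo").
        mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/A.MyColl");
        // Default is 8 (MB). Raising it to 100 makes each split ~12x larger,
        // so the driver has to materialize far fewer MongoInputSplit objects.
        mongodbConfig.set("mongo.input.split_size", "100");
        return mongodbConfig;
    }
}
```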
Config reference: https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference

Thanks!

On Sat, Sep 12, 2015 at 3:15 PM, Richard Eggert <richard.egg...@gmail.com> wrote:

> Hmm... The count() method invokes this:
>
>     def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
>       runJob(rdd, func, 0 until rdd.partitions.length)
>     }
>
> It appears that you're running out of memory while trying to compute
> (within the driver) the number of partitions that will be in the final
> result. It seems as if Mongo is computing so many splits that you're
> running out of memory.
>
> Looking at your log messages, I see this:
>
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
>
> 0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 = 0x2000000000000002e = 36893488147419103278
>
> The last split reported in the log has max 55adf841b4d2970fb07d7288.
>
> 0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 = 0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695
>
> 241383122307828806444054695 / 36893488147419103278 = 6,542,702 potential
> splits, assuming they are evenly distributed. I'm not sure how big each
> split object is, but it's plausible that the process of creating an array
> of 6.5 million of them is causing you to run out of memory.
>
> I think the reason you don't see anything in the executor logs is that the
> exception is occurring before the work is tasked to the executors.
>
> Rich
>
> On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar <utkarsh2...@gmail.com> wrote:
>
>> I am trying to run this: a basic mapToPair and then count() to trigger an
>> action. 4 executors are launched, but I don't see any relevant logs on
>> those executors.
>>
>> It looks like the driver is pulling all the data and it runs out of
>> memory; the dataset is big, so it won't fit on 1 machine.
>>
>> So what is the issue here?
>> Am I using Spark the wrong way in this example?
>>
>> Configuration mongodbConfigInventoryDay = new Configuration();
>> mongodbConfigInventoryDay.set("mongo.job.input.format",
>>     "com.mongodb.hadoop.MongoInputFormat");
>> mongodbConfigInventoryDay.set("mongo.input.uri",
>>     "mongodb://" + props.getProperty("mongo") + ":27017/A.MyColl");
>> JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
>>     mongodbConfigInventoryDay,
>>     MongoInputFormat.class,
>>     Object.class,
>>     BSONObject.class
>> );
>> JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
>>     ObjectMapper mapper = new ObjectMapper();
>>     tuple2._2().removeField("_id");
>>     MyColl day = mapper.readValue(tuple2._2().toMap().toString(), MyColl.class);
>>     return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")), day);
>> });
>>
>> myCollRdd.count();
>>
>> Logs on the driver:
>>
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with curMem=253374, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 117.8 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with curMem=374038, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 12.5 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
>> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from newAPIHadoopRDD at SparkRunner.java:192
>> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to check splits against mongodb://dsc-dbs-0000001.qasql.opentable.com:27017/A.MyColl
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null, max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
>> ......
>> ......
>> 15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id" : "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>>     at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
>>     at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
>>     at com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
>>     at com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
>>     at com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
>>     at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
>>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
>>     at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
>>     at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
>>     at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
>>     at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
>>     at runner.SparkRunner.main(SparkRunner.java:68)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> --
>> Thanks,
>> -Utkarsh
>
> --
> Rich

--
Thanks,
-Utkarsh
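As a sanity check, Richard's split-count estimate can be reproduced with BigInteger, using the ObjectId bounds copied from the driver log above:

```java
import java.math.BigInteger;

public class SplitEstimate {
    public static void main(String[] args) {
        // Bounds of the first real split and the max of the last logged split.
        BigInteger firstMin = new BigInteger("54e64d626d0bfe0a24ba79b3", 16);
        BigInteger firstMax = new BigInteger("54e64d646d0bfe0a24ba79e1", 16);
        BigInteger lastMax  = new BigInteger("55adf841b4d2970fb07d7288", 16);

        BigInteger splitWidth = firstMax.subtract(firstMin); // 36893488147419103278
        BigInteger idRange    = lastMax.subtract(firstMax);  // 241383122307828806444054695

        // Potential split count, assuming evenly distributed _ids.
        System.out.println(idRange.divide(splitWidth)); // prints 6542702
    }
}
```

At roughly 6.5 million splits, even a small per-split object is enough to exhaust a default-sized driver heap, which matches the OutOfMemoryError thrown inside calculateSplits before any work reaches the executors.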