Yup, that was the problem.
Changing the default "mongo.input.split_size" from 8MB to 100MB did the
trick.
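For anyone hitting the same thing, the change is a one-liner on the Hadoop
Configuration (a sketch only; the config object and property lookup follow
the code quoted below, and the value is in MB per the config reference):

```java
// Sketch: raise the split size so mongo-hadoop computes far fewer input
// splits up front. "mongo.input.split_size" is in MB; the default is 8.
Configuration mongodbConfigInventoryDay = new Configuration();
mongodbConfigInventoryDay.set("mongo.job.input.format",
        "com.mongodb.hadoop.MongoInputFormat");
mongodbConfigInventoryDay.set("mongo.input.uri",
        "mongodb://" + props.getProperty("mongo") + ":27017/A.MyColl");
mongodbConfigInventoryDay.set("mongo.input.split_size", "100"); // was 8 MB
```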

Config reference:
https://github.com/mongodb/mongo-hadoop/wiki/Configuration-Reference
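Rich's split-count estimate below checks out; here is a minimal way to
reproduce the arithmetic with java.math.BigInteger, treating the ObjectId
bounds from the driver log as 96-bit hex integers:

```java
import java.math.BigInteger;

public class SplitEstimate {
    public static void main(String[] args) {
        // ObjectId bounds copied from the MongoCollectionSplitter log lines
        BigInteger first  = new BigInteger("54e64d626d0bfe0a24ba79b3", 16);
        BigInteger second = new BigInteger("54e64d646d0bfe0a24ba79e1", 16);
        BigInteger last   = new BigInteger("55adf841b4d2970fb07d7288", 16);

        BigInteger splitWidth = second.subtract(first); // one split's id range
        BigInteger totalRange = last.subtract(second);  // remaining id range

        System.out.println(splitWidth);                    // 36893488147419103278
        System.out.println(totalRange.divide(splitWidth)); // 6542702
    }
}
```

That matches the ~6.5 million potential splits Rich derived, assuming the
ids are evenly distributed over the range.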

Thanks!

On Sat, Sep 12, 2015 at 3:15 PM, Richard Eggert <richard.egg...@gmail.com>
wrote:

> Hmm... The count() method invokes this:
>
> def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
> = {
>    runJob(rdd, func, 0 until rdd.partitions.length)
> }
>
> It appears that you're running out of memory while trying to compute
> (within the driver) the number of partitions that will be in the final
> result. It seems as if Mongo is computing so many splits that you're
> running out of memory.
>
> Looking at your log messages, I see this:
> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
> : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
>
> 0x54e64d646d0bfe0a24ba79e1 - 0x54e64d626d0bfe0a24ba79b3 =
> 0x2000000000000002e = 36893488147419103278
>
> The last split reported in the log has max 55adf841b4d2970fb07d7288.
>
> 0x55adf841b4d2970fb07d7288 - 0x54e64d646d0bfe0a24ba79e1 =
> 0xc7aadd47c699058bc2f8a7 = 241383122307828806444054695
>
> 241383122307828806444054695/36893488147419103278 = 6,542,702 potential
> splits, assuming they are evenly distributed. I'm not sure how big each
> split object is, but it's plausible that the process of creating an array
> of 6.5 million of them is causing you to run out of memory.
>
> I think the reason you don't see anything in the executor logs is that the
> exception is occurring before the work is tasked to the executors.
>
>
> Rich
>
>
>
> On Sat, Sep 12, 2015 at 5:18 PM, Utkarsh Sengar <utkarsh2...@gmail.com>
> wrote:
>
>> I am trying to run this: a basic mapToPair and then a count() to trigger
>> an action.
>> 4 executors are launched but I don't see any relevant logs on those
>> executors.
>>
>> It looks like the driver is pulling all the data and running out of
>> memory; the dataset is big, so it won't fit on one machine.
>>
>> So what is the issue here? Am I using Spark the wrong way in this
>> example?
>>
>>         Configuration mongodbConfigInventoryDay = new Configuration();
>>         mongodbConfigInventoryDay.set("mongo.job.input.format",
>> "com.mongodb.hadoop.MongoInputFormat");
>>         mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" +
>> props.getProperty("mongo") + ":27017/A.MyColl");
>>         JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
>>                 mongodbConfigInventoryDay,
>>                 MongoInputFormat.class,
>>                 Object.class,
>>                 BSONObject.class
>>         );
>>         JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
>>             ObjectMapper mapper = new ObjectMapper();
>>             tuple2._2().removeField("_id");
>>             MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
>> MyColl.class);
>>             return new Tuple2<>(Long.valueOf((String)
>> tuple2._2().get("MyCollId")), day);
>>         });
>>
>>         myCollRdd.count();
>>
>>
>> Logs on the driver:
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
>> curMem=253374, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
>> memory (estimated size 117.8 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
>> curMem=374038, maxMem=278019440
>> 15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
>> bytes in memory (estimated size 12.5 KB, free 264.8 MB)
>> 15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
>> memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
>> 15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
>> newAPIHadoopRDD at SparkRunner.java:192
>> 15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
>> check splits against mongodb://
>> dsc-dbs-0000001.qasql.opentable.com:27017/A.MyColl
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
>> max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" :
>> "54e64d646d0bfe0a24ba79e1"}
>> 15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" :
>> "5581d1c3d52db40bc8558c6b"}
>> ......
>> ......
>> 15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={
>> "_id" : "55adf840d3b5be0724224807"}, max= { "_id" :
>> "55adf841b4d2970fb07d7288"}
>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>>     at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
>>     at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
>>     at
>> com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
>>     at
>> com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
>>     at
>> com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
>>     at
>> com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
>>     at
>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>     at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
>>     at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
>>     at
>> org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
>>     at
>> org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
>>     at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
>>     at runner.SparkRunner.main(SparkRunner.java:68)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>     at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> --
>> Thanks,
>> -Utkarsh
>>
>
>
>
> --
> Rich
>



-- 
Thanks,
-Utkarsh
