I am trying to run the code below: a basic mapToPair and then a count() to
trigger an action.
4 executors are launched but I don't see any relevant logs on those
executors.
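
For context, this is roughly how the SparkContext used in the snippet below
is created (the app name and the executor memory value here are placeholders,
not my exact settings):

        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkRunner")
                .set("spark.executor.memory", "4g");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);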

It looks like the driver is pulling all the data and runs out of memory; the
dataset is big, so it won't fit on one machine.

So what is the issue here? Am I using Spark the wrong way in this example?

        // read the collection through the mongo-hadoop connector
        Configuration mongodbConfigInventoryDay = new Configuration();
        mongodbConfigInventoryDay.set("mongo.job.input.format",
                "com.mongodb.hadoop.MongoInputFormat");
        mongodbConfigInventoryDay.set("mongo.input.uri",
                "mongodb://" + props.getProperty("mongo") + ":27017/A.MyColl");
        JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
                mongodbConfigInventoryDay,
                MongoInputFormat.class,
                Object.class,
                BSONObject.class
        );
        // map each BSON document to a (MyCollId, MyColl) pair
        JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
            ObjectMapper mapper = new ObjectMapper();
            tuple2._2().removeField("_id");
            MyColl day = mapper.readValue(tuple2._2().toMap().toString(),
                    MyColl.class);
            return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")),
                    day);
        });

        myCollRdd.count();


Logs on the driver:
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with
curMem=253374, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 117.8 KB, free 264.8 MB)
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with
curMem=374038, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 12.5 KB, free 264.8 MB)
15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from
newAPIHadoopRDD at SparkRunner.java:192
15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to
check splits against mongodb://
dsc-dbs-0000001.qasql.opentable.com:27017/A.MyColl
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null,
max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
......
......
15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id"
: "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
    at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
    at
com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
    at
com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
    at
com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
    at
com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
    at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at
org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
    at
org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
    at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
    at runner.SparkRunner.main(SparkRunner.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
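
If it helps, the trace suggests the heap fills up on the driver while
MongoInputFormat.getSplits is still building the list of splits, before any
work reaches the executors. The only knobs I can think of are the driver heap
(--driver-memory on spark-submit) and the connector's split size; something
like this is what I'd try next, assuming I'm reading the mongo-hadoop options
right (the 64 MB value is a guess on my part):

        // ask mongo-hadoop for fewer, larger splits (value is in MB, default 8)
        mongodbConfigInventoryDay.set("mongo.input.split_size", "64");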

-- 
Thanks,
-Utkarsh
