I am trying to run the code below: a basic mapToPair followed by a count() to trigger an action. Four executors are launched, but I don't see any relevant logs on them.
It looks like the driver is pulling all the data and runs out of memory; the dataset is big, so it won't fit on one machine. So what is the issue here? Am I using Spark the wrong way in this example?

// read the collection as (ObjectId, BSONObject) pairs via the mongo-hadoop connector
Configuration mongodbConfigInventoryDay = new Configuration();
mongodbConfigInventoryDay.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfigInventoryDay.set("mongo.input.uri", "mongodb://" + props.getProperty("mongo") + ":27017/A.MyColl");

JavaPairRDD<Object, BSONObject> myColl = sc.newAPIHadoopRDD(
        mongodbConfigInventoryDay,
        MongoInputFormat.class,
        Object.class,
        BSONObject.class
);

// map each document to (MyCollId, MyColl) by deserializing the BSON with Jackson
JavaPairRDD<Long, MyColl> myCollRdd = myColl.mapToPair(tuple2 -> {
    ObjectMapper mapper = new ObjectMapper();
    tuple2._2().removeField("_id");
    MyColl day = mapper.readValue(tuple2._2().toMap().toString(), MyColl.class);
    return new Tuple2<>(Long.valueOf((String) tuple2._2().get("MyCollId")), day);
});

// count() to trigger the action
myCollRdd.count();

Logs on the driver:

15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(120664) called with curMem=253374, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 117.8 KB, free 264.8 MB)
15/09/12 21:07:45 INFO MemoryStore: ensureFreeSpace(12812) called with curMem=374038, maxMem=278019440
15/09/12 21:07:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 12.5 KB, free 264.8 MB)
15/09/12 21:07:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.70.7.135:58291 (size: 12.5 KB, free: 265.1 MB)
15/09/12 21:07:45 INFO SparkContext: Created broadcast 1 from newAPIHadoopRDD at SparkRunner.java:192
15/09/12 21:07:45 INFO StandaloneMongoSplitter: Running splitvector to check splits against mongodb://dsc-dbs-0000001.qasql.opentable.com:27017/A.MyColl
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min=null, max= { "_id" : "54e64d626d0bfe0a24ba79b3"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d626d0bfe0a24ba79b3"}, max= { "_id" : "54e64d646d0bfe0a24ba79e1"}
15/09/12 21:07:46 INFO MongoCollectionSplitter: Created split: min={ "_id" : "54e64d646d0bfe0a24ba79e1"}, max= { "_id" : "5581d1c3d52db40bc8558c6b"}
......
......
15/09/12 21:08:22 INFO MongoCollectionSplitter: Created split: min={ "_id" : "55adf840d3b5be0724224807"}, max= { "_id" : "55adf841b4d2970fb07d7288"}

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:224)
    at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:499)
    at com.mongodb.hadoop.input.MongoInputSplit.<init>(MongoInputSplit.java:59)
    at com.mongodb.hadoop.splitter.MongoCollectionSplitter.createSplitFromBounds(MongoCollectionSplitter.java:248)
    at com.mongodb.hadoop.splitter.StandaloneMongoSplitter.calculateSplits(StandaloneMongoSplitter.java:157)
    at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:58)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:442)
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
    at runner.SparkRunner.getInventoryDayRdd(SparkRunner.java:205)
    at runner.SparkRunner.main(SparkRunner.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
-Utkarsh
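P.S. The stack trace dies inside MongoInputFormat.getSplits, which runs on the driver, so one thing I am considering is giving the driver more heap and asking the connector for fewer, larger splits. A rough sketch of what I mean is below; the split-size value (in MB) and the driver memory are just guesses on my part, and I have not verified yet that this fixes the problem:

// ask mongo-hadoop for larger splits, so fewer MongoInputSplit objects are built on the driver
// (mongo.input.split_size is in MB; 64 is an arbitrary guess)
mongodbConfigInventoryDay.set("mongo.input.split_size", "64");

// and give the driver more heap when submitting, e.g.:
// spark-submit --driver-memory 4g --class runner.SparkRunner ...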