Hi All, I am facing a weird situation which is explained below.
Scenario and Problem: I want to add two attributes to JSON object based on the look up table values and insert the JSON to Mongo DB. I have broadcast variable which holds look up table. However, i am not being able to access it inside foreachPartition as you can see in the code. It does not give me any error but simply does not display anything. Also, because of it i cant insert JSON to Mongo DB. I cant find any explanation to this behaviour. Any explanation or work around to make it work is much appreciated. Note: I am using spark streaming and the file comes as micro batch to HDFS. I want to be able to use more cores in spark streaming (i have 16 cores available) so that processing can be done faster. Thanks. Regards, Prajwol Here is my full code: object ProcessMicroBatchStreams { val calculateDistance = udf { (lat: String, lon: String) => GeoHash.getDistance(lat.toDouble, lon.toDouble) } val DB_NAME = "IRT" val COLLECTION_NAME = "sensordata" val records = Array[String]() def main(args: Array[String]): Unit = { if (args.length < 0) { System.err.println("Usage: ProcessMicroBatchStreams <master> <input_directory>") System.exit(1) } val conf = new SparkConf() .setMaster("local[*]") .setAppName(this.getClass.getCanonicalName) .set("spark.hadoop.validateOutputSpecs", "false") /*.set("spark.executor.instances", "3") .set("spark.executor.memory", "18g") .set("spark.executor.cores", "9") .set("spark.task.cpus", "1") .set("spark.driver.memory", "10g")*/ val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(60)) val sqc = new SQLContext(sc) val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2) val broadcastTable = sc.broadcast(gpsLookUpTable) ssc.textFileStream("hdfs://localhost:9000/inputDirectory/") .foreachRDD { rdd => //broadcastTable.value.show() // I can access broadcast value here if (!rdd.partitions.isEmpty) { val partitionedRDD = rdd.repartition(4) partitionedRDD.foreachPartition { partition => println("Inside Partition") broadcastTable.value.show() // I cannot access broadcast value here partition.foreach { row => val items = row.split("\n") items.foreach { item => val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME) val jsonObject = new JSONObject(item) val latitude = jsonObject.getDouble(Constants.LATITUDE) val longitude = jsonObject.getDouble(Constants.LONGITUDE) // The broadcast value is not being shown here // However, there is no error shown // I cannot insert the value into Mongo DB val selectedRow = broadcastTable.value .filter("geoCode LIKE '" + GeoHash.subString(latitude, longitude) + "%'") .withColumn("Distance", calculateDistance(col("Lat"), col("Lon"))) .orderBy("Distance") .select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1) if (selectedRow.length != 0) { jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0)) jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1)) } else { jsonObject.put(Constants.TRACK_KM, "NULL") jsonObject.put(Constants.TRACK_NAME, "NULL") } val record = JSON.parse(jsonObject.toString()).asInstanceOf[DBObject] mongoColl.insert(record) } } } } } sys.addShutdownHook { ssc.stop(true, true) } ssc.start() ssc.awaitTermination() } }