I am trying to build a Spark Streaming application in Java. My application reads a continuous feed from a Hadoop directory using textFileStream() at an interval of 1 minute. I need to perform a Spark aggregation (group by) operation on the incoming DStream. After the aggregation, I join the aggregated DStream with an RDD created from a static dataset read via textFile() from a Hadoop directory.
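For reference, the stream-dataset join pattern from the Spark Streaming programming guide ("Join Operations", stream-dataset joins) that I am basing this on looks roughly like the sketch below. This is only an illustration: the paths, key extraction, and class/method names are placeholders, not my actual code.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    // Minimal sketch of the documented stream-dataset join; paths are placeholders.
    public final class StreamDatasetJoinSketch {

        public static JavaPairDStream<String, Tuple2<String, String>> joinWithStatic(
                JavaSparkContext sc, JavaStreamingContext ssc) {

            // Static dataset read once from HDFS, keyed by the first comma-separated field.
            final JavaPairRDD<String, String> dataset = sc.textFile("hdfs:///data/master")
                    .mapToPair(line -> new Tuple2<>(line.split(",", 2)[0], line));

            // Streaming input from a monitored directory, keyed the same way.
            JavaPairDStream<String, String> stream = ssc.textFileStream("hdfs:///data/stream")
                    .mapToPair(line -> new Tuple2<>(line.split(",", 2)[0], line));

            // Per-batch join: each batch RDD of the stream is joined with the static RDD.
            return stream.transformToPair(rdd -> rdd.join(dataset));
        }
    }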
The problem comes when I enable checkpointing. With an empty checkpoint directory it runs fine. After running 2-3 batches I stop it with Ctrl+C and run it again. On the second run it immediately throws the following Spark exception (SPARK-5063):

    Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions
    can only be invoked by the driver, not inside of other transformations; for example,
    rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and
    count action cannot be performed inside of the rdd1.map transformation.
    For more information, see SPARK-5063

*Following is the block of code of the Spark application:*

    private void compute(JavaSparkContext sc, JavaStreamingContext ssc) {
        JavaRDD<String> distFile = sc.textFile(MasterFile);
        JavaDStream<String> file = ssc.textFileStream(inputDir);

        // Read Master file
        JavaRDD<MasterParseLog> masterLogLines = distFile.flatMap(EXTRACT_MASTER_LOGLINES);
        final JavaPairRDD<String, String> masterRDD = masterLogLines.mapToPair(MASTER_KEY_VALUE_MAPPER);

        // Continuous streaming file
        JavaDStream<ParseLog> logLines = file.flatMap(EXTRACT_CKT_LOGLINES);

        // Calculate the sum of the required field and generate the group-sum RDD
        JavaPairDStream<String, Summary> sumRDD = logLines.mapToPair(CKT_GRP_MAPPER);
        JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKey(CKT_GRP_SUM); // GROUP BY operation
        JavaPairDStream<String, Summary> grpAvgRDD = grpSumRDD.mapToPair(CKT_GRP_AVG);

        // Join the master RDD with the DStream
        // This is the block causing the error (without it the code works fine)
        JavaPairDStream<String, Tuple2<String, String>> joinedStream = grpAvgRDD.transformToPair(
            new Function2<JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {

                private static final long serialVersionUID = 1L;

                public JavaPairRDD<String, Tuple2<String, String>> call(
                        JavaPairRDD<String, String> rdd, Time v2) throws Exception {
                    return masterRDD.value().join(rdd);
                }
            }
        );

        joinedStream.print(10);
    }

    public static void main(String[] args) {

        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                // Create the context with a 60 second batch size
                SparkConf sparkConf = new SparkConf();
                final JavaSparkContext sc = new JavaSparkContext(sparkConf);
                JavaStreamingContext ssc1 = new JavaStreamingContext(sc, Durations.seconds(duration));
                app.compute(sc, ssc1);
                ssc1.checkpoint(checkPointDir);
                return ssc1;
            }
        };

        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkPointDir, contextFactory);

        // Start the streaming server
        ssc.start();
        logger.info("Streaming server started...");

        // Wait for the computations to finish
        ssc.awaitTermination();
        logger.info("Streaming server stopped...");
    }

I know that the block of code which joins the static dataset with the DStream is causing the error, but it is taken from the Spark Streaming page of the Apache Spark website (sub-heading "stream-dataset join" under "Join Operations"). Please help me get it working, even if there is a different way of doing it. I need to enable checkpointing in my streaming application.

*Environment Details:*
CentOS 6.5 : 2-node cluster
Java       : 1.8
Spark      : 1.4.1
Hadoop     : 2.7.1

I have also posted the same question on Stack Overflow; you can have a look here:
http://stackoverflow.com/questions/32378296/spark-checkpoining-error-when-joining-static-dataset-with-dstream

Please help me get it working.

Regards,
Rajneesh