Please note: I have asked the same question on Stack Overflow as well: http://stackoverflow.com/questions/41729451/adding-to-spark-streaming-dstream-rdd-the-max-line-of-each-rdd
I am trying to add to each RDD in a JavaDStream the line with the maximum timestamp, with some modification. However, I keep getting a serialization error, despite the fact that I do not have any class that is not serializable. Below is the code sample that throws the error:

```java
JavaDStream<LogMessage> logMessageWithHB = logMessageMatched.transform(
    new Function<JavaRDD<LogMessage>, JavaRDD<LogMessage>>() {
        @Override
        public JavaRDD<LogMessage> call(JavaRDD<LogMessage> logMessageJavaRDD) throws Exception {
            LogMessage max = logMessageJavaRDD.max(LogMessageComparator);
            List<LogMessage> tempList = new ArrayList<LogMessage>();
            max.convertToHBLogMessage();
            tempList.add(max);
            JavaRDD<LogMessage> parallelize = ssc.sparkContext().parallelize(tempList);
            JavaRDD<LogMessage> union = logMessageJavaRDD.union(parallelize);
            return union;
        }
    });
```

The following is the error, but it does not really tell me the root cause:

```
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
	- object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@4b62f99b)
	- field (class: org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6, name: val$ssc, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
	- object (class org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6, org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6@6ced9607)
	- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1, name: transformFunc$1, type: interface org.apache.spark.api.java.function.Function)
	- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1, <function1>)
	- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
	- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
	- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
	- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
	- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
	- object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@1df376a4)
	- field (class: org.apache.spark.streaming.dstream.MappedDStream, name: parent, type: class org.apache.spark.streaming.dstream.DStream)
	- object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@68926b04)
	- field (class: org.apache.spark.streaming.dstream.ShuffledDStream, name: parent, type: class org.apache.spark.streaming.dstream.DStream)
	- object (class org.apache.spark.streaming.dstream.ShuffledDStream, org.apache.spark.streaming.dstream.ShuffledDStream@1aa732d9)
	- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
	- object (class org.apache.spark.streaming.dstream.MapValuedDStream, org.apache.spark.streaming.dstream.MapValuedDStream@39579072)
	- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
```

Can anyone please suggest what I am doing wrong? Please let me know if you need more information.

Thanks,
Nipun
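PS: Reading the trace again, the `val$ssc` field on the anonymous `Function` suggests the closure is capturing the outer `JavaStreamingContext`, which checkpointing then tries to serialize along with the DStream graph. Below is an untested sketch of a variant that derives the context from the RDD inside `call()` instead of referencing `ssc` from the enclosing scope — would this avoid the capture, or am I misreading the trace?

```java
JavaDStream<LogMessage> logMessageWithHB = logMessageMatched.transform(
    new Function<JavaRDD<LogMessage>, JavaRDD<LogMessage>>() {
        @Override
        public JavaRDD<LogMessage> call(JavaRDD<LogMessage> logMessageJavaRDD) throws Exception {
            LogMessage max = logMessageJavaRDD.max(LogMessageComparator);
            max.convertToHBLogMessage();
            List<LogMessage> tempList = new ArrayList<LogMessage>();
            tempList.add(max);
            // Wrap the SparkContext that already backs this RDD, so nothing
            // non-serializable from the enclosing scope is pulled into the closure.
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(logMessageJavaRDD.context());
            JavaRDD<LogMessage> parallelize = jsc.parallelize(tempList);
            return logMessageJavaRDD.union(parallelize);
        }
    });
```

(`JavaRDD.context()` and `JavaSparkContext.fromSparkContext` are the only changes from my original code; I have not compiled this against my project yet.)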