You didn't say which class isn't serializable or where the exception occurs, but is it the same as this issue? https://issues.apache.org/jira/browse/SPARK-4196
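In case it helps narrow it down, here is a minimal plain-JDK sketch (no Spark involved) of how an anonymous class that captures a non-serializable object fails serialization, and how the exception message names the offending class. Everything in it (FakeContext, Task, ContextHolder, offendingClass) is a hypothetical stand-in, not Spark API. My guess, and it is only a guess without the stack trace, is that the function passed to foreachRDD in the quoted code is in a similar position: it references javahiveContext and, being declared inside an instance method of the anonymous factory, also holds a reference to that factory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustration only: none of these names come from Spark. */
final class ClosureSerializationDemo {

    /** Hypothetical stand-in for a non-serializable handle such as a context object. */
    static final class FakeContext {
        String describe() {
            return "context";
        }
    }

    /** Hypothetical stand-in for a serializable callback interface. */
    interface Task extends Serializable {
        String run(String input);
    }

    /** Captures ctx, so serializing the task must also serialize the context. */
    static Task capturingTask() {
        final FakeContext ctx = new FakeContext();
        return new Task() {
            @Override
            public String run(String input) {
                return ctx.describe() + ":" + input;
            }
        };
    }

    /** Keeps the context in a static field, which Java serialization does not write. */
    static final class ContextHolder {
        private static FakeContext instance;

        static synchronized FakeContext get() {
            if (instance == null) {
                instance = new FakeContext();
            }
            return instance;
        }
    }

    /** Looks the context up at call time instead of capturing it. */
    static Task nonCapturingTask() {
        return new Task() {
            @Override
            public String run(String input) {
                return ContextHolder.get().describe() + ":" + input;
            }
        };
    }

    /** Returns null if obj serializes, otherwise the class name carried by the exception. */
    static String offendingClass(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing task: " + offendingClass(capturingTask()));
        System.out.println("non-capturing task: " + offendingClass(nonCapturingTask()));
    }
}
```

Running the JVM with -Dsun.io.serialization.extendedDebugInfo=true should also add the chain of fields leading to the offending object to the exception message, which usually makes it clear what is being captured. Whether a static holder like the one above is the right fix for your job is a separate question; the sketch only shows the mechanism.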
On Thu, Nov 6, 2014 at 5:42 AM, Vasu C <[email protected]> wrote:
> Dear All,
>
> I am getting java.io.NotSerializableException for below code. if
> jssc.checkpoint(HDFS_CHECKPOINT_DIR); is blocked there is not exception
> Please help
>
> JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
>     @Override
>     public JavaStreamingContext create() {
>         SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
>
>         final JavaStreamingContext jssc = new JavaStreamingContext(
>                 sparkConf, new Duration(300));
>
>         final JavaHiveContext javahiveContext = new JavaHiveContext(
>                 jssc.sc());
>
>         javahiveContext.createParquetFile(Bean.class,
>                 IMPALA_TABLE_LOC, true, new Configuration())
>                 .registerTempTable(TEMP_TABLE_NAME);
>
>         // TODO create checkpoint directory for fault tolerance
>         final JavaDStream<String> textFileStream = jssc
>                 .textFileStream(HDFS_FILE_LOC);
>
>         textFileStream
>                 .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>
>                     @Override
>                     public Void call(JavaRDD<String> rdd, Time time)
>                             throws Exception {
>                         if (rdd != null) {
>                             if (rdd.count() > 0) {
>                                 JavaSchemaRDD schRdd = javahiveContext
>                                         .jsonRDD(rdd);
>                                 schRdd.insertInto(TEMP_TABLE_NAME);
>                             }
>                         }
>                         return null;
>                     }
>                 });
>         jssc.checkpoint(HDFS_CHECKPOINT_DIR);
>         return jssc;
>     }
> };
>
> // Get JavaStreamingContext from checkpoint data or create a new one
> JavaStreamingContext context = JavaStreamingContext.getOrCreate(
>         HDFS_CHECKPOINT_DIR, contextFactory);
>
> context.start(); // Start the computation
> context.awaitTermination();
>
> Regards,
> Vasu
