Hi, I am using Spark Streaming to join every RDD of a DStream to a standalone RDD to generate a new DStream, as follows:
def joinWithBatchEvent(contentFeature: RDD[(String, String)], batchEvent: DStream[((String, String), (Long, Double, Double))]) = { batchEvent.map(event => { (event._1._2, (event._1._1, event._2._1, event._2._2, event._2._3)) }).transform(eventRDD => { eventRDD.leftOuterJoin(contentFeature).map(result => (result._2._1._1, (result._1, result._2._1._2, result._2._1._3, result._2._1._4, result._2._2)) ) }) } It works well when it starts from a new StreamingContext. But if the StreamingContext is restored from a checkpoint, there will be an exception as follows and the graph cannot be set up. Do you know how to solve this problem? Thanks very much! 15/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at CFBModel.scala:49 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12 (repartition at EventComponent.scala:64) 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap at CFBModel.scala:25) 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at ContentFeature.scala:100) 15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 1 java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements. 
at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) at scala.collection.mutable.HashSet.contains(HashSet.scala:58) at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) at scala.collection.mutable.AbstractSet.apply(Set.scala:45) at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed: saveAsTextFiles at CFBModel.scala:49, took 0.016406 s 15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job streaming job 
1441605900000 ms.0 java.lang.IllegalArgumentException: Flat hash tables cannot contain null elements. at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) at scala.collection.mutable.HashSet.contains(HashSet.scala:58) at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) at scala.collection.mutable.AbstractSet.apply(Set.scala:45) at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Exception in thread "main" java.lang.IllegalArgumentException: Flat hash tables cannot 
contain null elements. at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) at scala.collection.mutable.HashSet.contains(HashSet.scala:58) at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) at scala.collection.mutable.AbstractSet.apply(Set.scala:45) at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) at org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) at org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Thanks, Hanbin Zheng