Try adding a filter to remove (or replace) the null elements before — or inside — the map operation, so that no nulls reach the join.
Thanks Best Regards On Mon, Sep 7, 2015 at 3:34 PM, ZhengHanbin <hanbin_zh...@163.com> wrote: > Hi, > > I am using spark streaming to join every RDD of a DStream to a stand alone > RDD to generate a new DStream as followed: > > *def joinWithBatchEvent(contentFeature: RDD[(String, String)],* > * batchEvent: DStream[((String, String), (Long, > Double, Double))]) = {* > * batchEvent.map(event => {* > * (event._1._2, (event._1._1, event._2._1, event._2._2, event._2._3))* > * }).transform(eventRDD => {* > * eventRDD.leftOuterJoin(contentFeature).map(result =>* > * (result._2._1._1, (result._1, result._2._1._2, result._2._1._3, > result._2._1._4, result._2._2))* > * )* > * })* > *}* > > It works well when it start from a new StreamContext. > But if the StreamContext is restored from checkpoint, there will be an > exception as followed and the Graph can not be setup. > Do you know how to solve this problem? Thanks very much! > > 5/09/07 14:07:18 INFO spark.SparkContext: Starting job: saveAsTextFiles at > CFBModel.scala:49 > 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 12 > (repartition at EventComponent.scala:64) > 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 17 (flatMap > at CFBModel.scala:25) > 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Registering RDD 20 (map at > ContentFeature.scala:100) > 15/09/07 14:07:18 WARN scheduler.DAGScheduler: Creating new stage failed > due to exception - job: 1 > java.lang.IllegalArgumentException: Flat hash tables cannot contain null > elements. 
> at > scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) > at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) > at > scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) > at > scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) > at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) > at scala.collection.mutable.HashSet.contains(HashSet.scala:58) > at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) > at scala.collection.mutable.AbstractSet.apply(Set.scala:45) > at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) > at > org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) > at > org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) > at > org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) > at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > 15/09/07 14:07:18 INFO scheduler.DAGScheduler: Job 1 failed: > saveAsTextFiles at CFBModel.scala:49, took 0.016406 s > 
15/09/07 14:07:18 ERROR scheduler.JobScheduler: Error running job > streaming job 1441605900000 ms.0 > java.lang.IllegalArgumentException: Flat hash tables cannot contain null > elements. > at > scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) > at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) > at > scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) > at > scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) > at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) > at scala.collection.mutable.HashSet.contains(HashSet.scala:58) > at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) > at scala.collection.mutable.AbstractSet.apply(Set.scala:45) > at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) > at > org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) > at > org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) > at > org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) > at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > Exception in thread "main" java.lang.IllegalArgumentException: Flat hash > tables cannot contain null elements. > at > scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:390) > at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) > at > scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) > at > scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) > at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) > at scala.collection.mutable.HashSet.contains(HashSet.scala:58) > at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) > at scala.collection.mutable.AbstractSet.apply(Set.scala:45) > at org.apache.spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:336) > at > org.apache.spark.scheduler.DAGScheduler.getAncestorShuffleDependencies(DAGScheduler.scala:355) > at > org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:317) > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:218) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:301) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:298) > at scala.collection.immutable.List.foreach(List.scala:318) > at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) > at > org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) > at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > > Thanks, > Hanbin Zheng > >