Hi All,

We are getting an exception after adding one RDD to another RDD.
We first declared an empty RDD "A", then received a new DStream "B" from Kafka. For each RDD in DStream "B", we kept adding it to the existing RDD "A". The error happens when we try to use the updated RDD "A". Could anybody give some hints about what's going wrong here? Thanks!

*Sample Code :*

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("newAPP")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

// Declare an empty RDD for A
@volatile var rddA: RDD[A] = ssc.sparkContext.emptyRDD[A]

// Receive A and B data from Kafka
val kafkaMessages = KafkaUtils.createStream(ssc, "localhost:2181",
  "CONSUMERGROUP", Map("TOPIC" -> 1)).map(_._2)...

val dstreamA: DStream[A] =
  kafkaMessages.map(msg => JSONConverter.fromJSON[A](msg)).flatMap(_.toOption)...
val dstreamB: DStream[B] =
  kafkaMessages.map(msg => JSONConverter.fromJSON[B](msg)).flatMap(_.toOption)...

// Update RDD A from DStream A
dstreamA.foreachRDD(rdd => rddA = rddA ++ rdd)

// Join DStream B with the existing RDD A
val results: DStream[(A, B)] = dstreamB.transform { rdd =>
  val id_A: RDD[(String, A)] = rddA.keyBy(_.id)
  val id_B: RDD[(String, B)] = rdd.keyBy(_.id)
  val rddA_B: RDD[(A, B)] = id_A.join(id_B).map { case (id, (a, b)) => (a, b) }
  rddA_B
}

From my observation, the exception happens when we call rddA.keyBy(_.id).
*Exception Output :*

14/12/23 15:35:23 WARN storage.BlockManager: Block input-0-1419377723400 already exists on this machine; not re-adding it
14/12/23 15:35:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/23 15:35:34 WARN storage.BlockManager: Block input-0-1419377734600 already exists on this machine; not re-adding it
14/12/23 15:35:37 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
org.apache.spark.SparkException: Attempted to use BlockRDD[5] at BlockRDD at ReceiverInputDStream.scala:69 after its blocks have been removed!
        at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
        at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:215)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1297)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
        at scala.collection.immutable.List.foreach(List.scala:318)
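One workaround we are considering (untested, so this is only a sketch) is to materialize the accumulated RDD inside foreachRDD, so that later batches read persisted/checkpointed data instead of recomputing a lineage that reaches back to the receiver's BlockRDDs, which Spark Streaming removes once a batch falls out of its remember window. The batchCount counter and the checkpoint-every-10-batches interval below are arbitrary choices of ours, for illustration only:

// Untested sketch: replace the plain "rddA = rddA ++ rdd" update with a
// version that materializes the union while the input blocks still exist.
var batchCount = 0L

dstreamA.foreachRDD { rdd =>
  val updated = rddA ++ rdd
  updated.persist()                   // cache the new union
  batchCount += 1
  if (batchCount % 10 == 0)           // arbitrary interval, for illustration
    updated.checkpoint()              // truncate the ever-growing lineage
  updated.count()                     // force evaluation now; later reads hit
                                      // the cache instead of the BlockRDDs
  rddA.unpersist(blocking = false)    // release the previous generation
  rddA = updated
}

If anyone can confirm whether this is the right approach, or whether extending the retention with ssc.remember(...) would be safer, please advise.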