Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup -
I have a Kafka producer that is streaming words (lines of text). On the other side, I have a Spark Streaming application that uses the high-level Kafka/Spark connector to read these messages from the Kafka topic. The code is straightforward - I am using the CDH 5.1.3 distribution and submitting the job to a YARN cluster:

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "30000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver-based input stream per Kafka partition; the mapToPair is just an identity mapping.
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
                private static final long serialVersionUID = -1936810126415608167L;
                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

// Union the per-partition streams into a single DStream.
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();

jssc.start();
jssc.awaitTermination();
return 0;

If I remove the line

JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());

the application works just fine. The moment I introduce the reduceByKey, I start getting the following error and Spark Streaming shuts down:

14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    .....
    .....
14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
java.util.NoSuchElementException: key not found: Stage 2
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
    .....
    .....

My assumption as to why it is failing is the following: the producer application is not streaming data continuously, so there are periods where no data is being produced. On the other side, Spark Streaming generates a new batch every second, and during those idle periods the batches consist of RDDs with no data in them. I am wondering whether that is what causes the reduceByKey transformation to throw an exception.

Here are some general questions:

(1) What happens when there is no data in the stream, in terms of the DStream and the underlying RDDs?
(2) Since a DStream is just a wrapper around the individual RDDs for a particular slice of time, I am assuming those RDDs are associated with an empty dataset. Is this correct?
(3) What is a generally accepted way to weed out the RDDs that have no data in them? (The workaround I was considering is sketched in the P.S. below.)

Regards,
- Jacob
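P.S. Regarding (3), here is roughly the workaround I was considering, in case it makes the question more concrete: replace wordCount.print() with a foreachRDD that checks whether the batch RDD actually contains any records before doing anything with it. This is only a sketch against the Spark 1.0 Java API (Function<..., Void> for foreachRDD); the take(1) check and the println output are just placeholders, and I am not sure this is the recommended approach, which is why I am asking.

import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

// ... wordCount is the JavaPairDStream<String, Integer> built above ...

wordCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    private static final long serialVersionUID = 1L;
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        // take(1) comes back empty for a batch that carried no records,
        // so batches produced while the producer is idle are skipped.
        List<Tuple2<String, Integer>> first = rdd.take(1);
        if (!first.isEmpty()) {
            for (Tuple2<String, Integer> wc : rdd.collect()) {
                System.out.println(wc._1() + " : " + wc._2());
            }
        }
        return null;
    }
});

Is something along these lines what people normally do, or is there a cleaner way to ignore the empty batches?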