Hi All,

I am trying to understand what is going on in my simple WordCount Spark
Streaming application. Here is the setup -

I have a Kafka producer that is streaming words (lines of text). On the
other side, I have a Spark Streaming application that uses the high-level
Kafka/Spark connector to read these messages from the Kafka topic. I am
using the CDH 5.1.3 distribution and submitting the job to a YARN cluster.
The code is straightforward -


SparkConf sparkConf = new SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Kafka consumer configuration
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "30000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver-based input stream per Kafka partition
List<JavaPairDStream<byte[], String>> kafkaStreams =
        new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                        throws Exception {
                    return tuple2;   // identity mapping, just to get a JavaPairDStream
                }
            }));
}

// Union the per-partition streams into a single DStream
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
            kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());

wordCount.print();
jssc.start();
jssc.awaitTermination();
return 0;
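
(For completeness, the helper functions are trivial. Paraphrasing rather than
pasting the exact classes, they look roughly like this:)

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Splits the String payload of each Kafka message into words.
class SplitLines implements FlatMapFunction<Tuple2<byte[], String>, String> {
    public Iterable<String> call(Tuple2<byte[], String> message) {
        return Arrays.asList(message._2().split(" "));
    }
}

// Maps each word to a (word, 1) pair.
class MapWords implements PairFunction<String, String, Integer> {
    public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
    }
}

// Drops empty words.
class wordFilter implements Function<Tuple2<String, Integer>, Boolean> {
    public Boolean call(Tuple2<String, Integer> pair) {
        return !pair._1().isEmpty();
    }
}

// Sums the two counts for a word.
class ReduceWords implements Function2<Integer, Integer, Integer> {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
}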

If I remove the line "JavaPairDStream<String, Integer> wordCount =
wordMap.reduceByKey(new ReduceWords());", the application works just fine.
The moment I introduce the reduceByKey, I start getting the following
error and Spark Streaming shuts down -

14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        .....
.....

14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
java.util.NoSuchElementException: key not found: Stage 2
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
.....
.....
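
For what it is worth, my (unconfirmed) reading of the "Task not serializable:
java.io.NotSerializableException: KafkaStreamingWordCount" part is that one of
the functions being shipped to the executors is an anonymous or non-static
inner class, which implicitly references the enclosing KafkaStreamingWordCount
instance, so Spark ends up trying to serialize the whole outer object. Just to
illustrate what I mean (this is not my actual class layout):

import org.apache.spark.api.java.function.Function2;

// Illustration only: a non-static inner class keeps a hidden reference to its
// enclosing instance, so serializing the function drags the outer class along.
public class KafkaStreamingWordCount {          // does not implement Serializable

    // Captures the outer "this" -> NotSerializableException when shipped to executors.
    class InnerReduce implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) { return a + b; }
    }

    // A static nested (or top-level) class carries no such reference.
    static class StaticReduce implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) { return a + b; }
    }
}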


My assumption as to why it is failing is the following -

The producer application is not continuously streaming data; there are
periods where no data is being produced. On the other side, Spark Streaming
is generating a DStream every second, and those DStreams are made up of
RDDs with no data in them. So I am wondering whether this is what causes
the "reduceByKey" transformation to throw an exception.
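
One way I thought of checking this assumption is to log the per-batch record
count; a small diagnostic sketch (I have not wired this into the failing job
yet):

// Diagnostic sketch: print how many records each 1-second batch contains,
// to see whether the failures line up with empty batches.
wordMap.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        System.out.println("Records in this batch: " + rdd.count());
        return null;
    }
});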

Here are some general questions -
(1) What happens when there is no data in the stream, in terms of the
DStream and the underlying RDDs?
(2) Since a DStream is just a wrapper around the individual RDDs for a
particular slice of time, I am assuming those RDDs are associated with an
empty dataset. Is this correct?
(3) What is a generally acceptable way to weed out the RDDs that do not
have any data associated with them? (One rough idea is sketched below.)
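
The only thing I could come up with for (3) is to guard the per-batch output
myself, along these lines (just a sketch; I do not know if this is the
recommended approach):

// Sketch for question (3): skip batches whose RDD contains no records.
// take(1) is used for the emptiness check so it stays cheap.
wordCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        if (rdd.take(1).isEmpty()) {
            return null;                      // nothing arrived in this batch
        }
        for (Tuple2<String, Integer> wordAndCount : rdd.collect()) {
            System.out.println(wordAndCount._1() + " : " + wordAndCount._2());
        }
        return null;
    }
});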


Regards,
- Jacob
