The results are no different -

import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;

public class ReduceWords implements Serializable, Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer first, Integer second) {
        return first + second;
    }
}

Same exception --

14/10/14 20:04:47 ERROR JobScheduler: Error running job streaming job 1413331487000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        ........
        ........

14/10/14 20:04:47 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
java.util.NoSuchElementException: key not found: Stage 2
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        ......
        ......

On Tue, Oct 14, 2014 at 4:56 PM, Michael Campbell <michael.campb...@gmail.com> wrote:

> Do you get any different results if you have ReduceWords actually
> implement java.io.Serializable?
>
> On Tue, Oct 14, 2014 at 7:35 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
>> Yeah... it totally should be... There is nothing fancy in there -
>>
>> import org.apache.spark.api.java.function.Function2;
>>
>> public class ReduceWords implements Function2<Integer, Integer, Integer> {
>>
>>     private static final long serialVersionUID = -6076139388549335886L;
>>
>>     public Integer call(Integer first, Integer second) {
>>         return first + second;
>>     }
>> }
>>
>> On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch <java...@gmail.com> wrote:
>>
>>> Is ReduceWords serializable?
>>>
>>> 2014-10-14 16:11 GMT-07:00 Abraham Jacob <abe.jac...@gmail.com>:
>>>
>>>> Hi All,
>>>>
>>>> I am trying to understand what is going on in my simple WordCount Spark
>>>> Streaming application. Here is the setup -
>>>>
>>>> I have a Kafka producer that is streaming words (lines of text). On the
>>>> flip side, I have a Spark Streaming application that uses the high-level
>>>> Kafka/Spark connector to read in these messages from the Kafka topic.
>>>> The code is straightforward. I am using the CDH 5.1.3 distribution and
>>>> submitting the job to a YARN cluster -
>>>>
>>>> SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
>>>> sparkConf.set("spark.shuffle.manager", "SORT");
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>>>>
>>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>>> kafkaConf.put("zookeeper.connect", zookeeper);
>>>> kafkaConf.put("group.id", consumerGrp);
>>>> kafkaConf.put("auto.offset.reset", "largest");
>>>> kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
>>>> kafkaConf.put("rebalance.max.retries", "20");
>>>> kafkaConf.put("rebalance.backoff.ms", "30000");
>>>>
>>>> Map<String, Integer> topicMap = new HashMap<String, Integer>();
>>>> topicMap.put(topic, 1);
>>>>
>>>> List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
>>>> for (int i = 0; i < numPartitions; i++) {
>>>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
>>>>             DefaultDecoder.class, PayloadDeSerializer.class,
>>>>             kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
>>>>         .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
>>>>
>>>>             private static final long serialVersionUID = -1936810126415608167L;
>>>>
>>>>             public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
>>>>                 return tuple2;
>>>>             }
>>>>         }));
>>>> }
>>>>
>>>> JavaPairDStream<byte[], String> unifiedStream;
>>>> if (kafkaStreams.size() > 1) {
>>>>     unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
>>>> } else {
>>>>     unifiedStream = kafkaStreams.get(0);
>>>> }
>>>>
>>>> JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
>>>> JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
>>>> wordMap = wordMap.filter(new wordFilter());
>>>> JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
>>>> wordCount.print();
>>>>
>>>> jssc.start();
>>>> jssc.awaitTermination();
>>>> return 0;
>>>>
>>>> If I remove the line "JavaPairDStream<String, Integer> wordCount =
>>>> wordMap.reduceByKey(new ReduceWords());", the application works just fine.
>>>> The moment I introduce the "reduceByKey", I start getting the following
>>>> error and Spark Streaming shuts down -
>>>>
>>>> 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>         .....
>>>>         .....
>>>>
>>>> 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
>>>> java.util.NoSuchElementException: key not found: Stage 2
>>>>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>         at scala.collection.AbstractMap.default(Map.scala:58)
>>>>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
>>>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
>>>>         .....
>>>>         .....
>>>>
>>>> *My assumption as to why it is failing is the following:*
>>>>
>>>> The producer application is not continuously streaming data; there are
>>>> periods where no data is being produced. On the flip side, Spark Streaming
>>>> is generating a DStream every second, and those DStreams are composed of
>>>> RDDs with no data associated with them. Hence, I am wondering whether this
>>>> is what causes the "reduceByKey" transformation to throw an exception.
>>>>
>>>> Here are some general questions -
>>>> (1) What happens when there is no data in the stream, in terms of the
>>>> DStream and the underlying RDDs?
>>>> (2) Since a DStream is just a wrapper around the individual RDDs for a
>>>> particular slice of time, I am assuming that these RDDs are associated
>>>> with an empty dataset. Is this correct?
>>>> (3) What is a generally acceptable solution to weed out the RDDs that do
>>>> not have any data associated with them?
>>>>
>>>> Regards,
>>>> - Jacob
>>>
>>
>>
>> --
>> ~
>
>

--
~
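
One detail worth noting about the traces quoted above: the class reported as not serializable is KafkaStreamingWordCount, i.e. the enclosing driver class, not ReduceWords. In Java, an anonymous inner class such as the PairFunction defined inline in the createStream loop keeps an implicit reference to its enclosing instance, so when Spark serializes the closure for the shuffle introduced by reduceByKey it can end up trying to serialize the whole driver class. The sketch below only illustrates that mechanism; it is not a fix confirmed anywhere in this thread, and the nested class name IdentityPair is made up for the example.

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class KafkaStreamingWordCount {

    // A static nested class (unlike an anonymous inner class) holds no hidden
    // reference to the enclosing KafkaStreamingWordCount instance, so only the
    // function object itself has to be serialized with the closure.
    public static class IdentityPair implements PairFunction<Tuple2<byte[], String>, byte[], String> {

        private static final long serialVersionUID = 1L;

        public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2; // pass the (key, value) pair through unchanged
        }
    }

    // ... rest of the driver unchanged, but using .mapToPair(new IdentityPair())
    // instead of the inline anonymous PairFunction ...
}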
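
Regarding question (3) in the original post: with the Spark 1.x Java API used in this thread, one common pattern is to inspect each micro-batch inside foreachRDD and skip batches whose RDD holds no records. The sketch below assumes the JavaPairDStream<String, Integer> named wordCount from the code above; EmptyBatchGuard and attach are hypothetical names invented for the example.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public final class EmptyBatchGuard {

    // Attach a per-batch action that ignores empty micro-batches. rdd.take(1)
    // fetches at most one element, so the emptiness check itself is cheap.
    public static void attach(JavaPairDStream<String, Integer> wordCount) {
        wordCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {

            private static final long serialVersionUID = 1L;

            public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                if (!rdd.take(1).isEmpty()) {
                    // only batches that actually contain word counts reach this
                    // point; do the real per-batch work (save, publish, ...) here
                }
                return null;
            }
        });
    }
}

That said, an empty batch by itself would not normally produce a NotSerializableException: the failure above happens while serializing the closure, which is done when the job is submitted regardless of how much data the batch contains.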