Java Kafka Word Count Issue
Hi All,

I am trying to run the Kafka word count program. Please find the link for the same below:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

I have set the Spark master to setMaster("local[*]") and I have started a Kafka producer which reads a file. If the file already contains a few words, then after running the Spark Java program I get the proper output. But when I append new words to the same file, the word count starts again from 1. If I need to count both the words already present and the newly appended ones, what exactly do I need to change in the code?

P.S. I am using spark-1.2.0-bin-hadoop2.3.

Thanks and regards
Shweta Jadhav
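For context, the driver setup follows the linked example and looks roughly like this (a minimal sketch; the zkQuorum "localhost:2181", the consumer group name, and the topic name "test" are placeholders, and the 2-second batch interval comes from the linked example):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class LocalKafkaWordCountSetup {
  public static void main(String[] args) {
    // Run the driver locally with as many worker threads as cores.
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaWordCount")
        .setMaster("local[*]");
    // One micro-batch every 2 seconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // Topic -> number of receiver threads (placeholder topic name "test").
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("test", 1);

    // zkQuorum and consumer group are placeholders for this sketch.
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "word-count-group", topicMap);

    // Print a few received (key, message) pairs per batch to verify ingestion.
    messages.print();

    jssc.start();
    jssc.awaitTermination();
  }
}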
Re: Java Kafka Word Count Issue
Hi Sean,

The Kafka producer is working fine; this is related to Spark. How can I configure Spark so that it remembers the count from the beginning? If my log.text file has

spark apache kafka spark

my Spark program gives the correct output:

spark 2
apache 1
kafka 1

But when I append "spark" to log.text, the Spark program outputs "spark 1", whereas it should be "spark 3". How do I handle this in the Spark code?

Thanks and regards
Shweta Jadhav

----- Sean Owen <so...@cloudera.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: Sean Owen <so...@cloudera.com>
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue

This is a question about the Kafka producer, right? Not Spark.
Re: Java Kafka Word Count Issue
Hi,

I added a checkpoint directory and am now using updateStateByKey():

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = ... // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I didn't get what exactly I should assign in

Integer newSum = ... // add the new values with the previous running count to get the new count

Thanks and regards
Shweta Jadhav

----- VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com>
Date: 02/02/2015 04:39PM
Cc: user@spark.apache.org
Subject: Re: Java Kafka Word Count Issue

You can use updateStateByKey() to perform the above operation.
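One way to fill in newSum (a minimal sketch, using the same types and names as the snippet above): start from the previous running count, or 0 if the key has no state yet, and add the counts produced for that key in the current batch.

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        // Previous running count for this key, or 0 if the key is new.
        Integer newSum = state.or(0);
        // Add every per-batch count emitted for this key.
        for (Integer value : values) {
          newSum += value;
        }
        return Optional.of(newSum);
      }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

With a checkpoint directory set, this keeps a running total across batches instead of restarting the count at 1 for newly appended words.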
ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
Hi,

I am running a streaming word count program on a Spark standalone cluster with four machines.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public final class JavaKafkaStreamingWordCount {

  private static final Pattern SPACE = Pattern.compile(" ");
  static transient Configuration conf;

  private JavaKafkaStreamingWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));
    jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            Integer newSum = 0;
            if (state.isPresent()) {
              if (values.size() != 0) {
                newSum = state.get();
                for (int temp : values) {
                  newSum += temp;
                }
              } else {
                newSum = state.get();
              }
            } else {
              if (values.size() != 0) {
                for (int temp : values) {
                  newSum += 1;
                }
              }
            }
            return Optional.of(newSum);
          }
        };

    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

    conf = new Configuration();
    runningCounts.saveAsNewAPIHadoopFiles("hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
        Text.class, Text.class,
        (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class, conf);
    //jssc.sparkContext().hadoopConfiguration();

    jssc.start();
    jssc.awaitTermination();
  }
}

This works fine on a one-node cluster, but it gives the following error when I try to run it on the cluster:

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
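The exception says that org.apache.hadoop.conf.Configuration is being Java-serialized, and Configuration is a Hadoop Writable but not java.io.Serializable. With checkpointing enabled, the conf handed to saveAsNewAPIHadoopFiles appears to end up inside the DStream graph that Spark serializes when writing checkpoints. One possible workaround (a sketch only, not a verified fix for this exact setup): drop the static Configuration field and write each batch yourself from foreachRDD, creating the Configuration inside the function body so it is never captured in the checkpointed graph. The per-batch path suffix below is an assumption; the extra imports needed are org.apache.spark.api.java.JavaPairRDD and org.apache.spark.streaming.Time.

// Replaces the saveAsNewAPIHadoopFiles(...) call above; everything else stays the same.
runningCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) {
    // Fresh, driver-side Configuration for this batch only; it is never
    // serialized as part of the DStream graph or the streaming checkpoint.
    Configuration hadoopConf = new Configuration();
    // One output directory per batch (assumed naming scheme).
    String path = "hdfs://172.17.199.229:8020/spark/wordCountOutput/word-" + time.milliseconds();
    rdd.saveAsNewAPIHadoopFile(path, Text.class, Text.class,
        (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
        hadoopConf);
    return null;
  }
});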
Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError
Hi,

I am trying a transformation that calls a Scala method which returns a MutableList[AvroObject]:

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): scala.collection.mutable.MutableList[AvroObject]

Hence the output of the transformation is RDD[MutableList[AvroObject]], but I want the output as RDD[AvroObject]. I tried applying foreach on RDD[MutableList[AvroObject]] to build an RDD[AvroObject]:

var uA = sparkContext.accumulableCollection[MutableList[AvroObject], universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u => { uA ++= u })
var uRDD = sparkContext.parallelize(uA.value)

It fails on a large dataset with the following error:

java.io.IOException: java.lang.StackOverflowError
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
	at java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
	at java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at java.util.ArrayList.writeObject(ArrayList.java:742)
	at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: a replacement for the accumulator
Option 2: in the Scala method, instead of returning List[AvroObject], can I emit multiple AvroObjects so that I get RDD[AvroObject] directly?

Note: I am using Spark 1.3.0; input data size is 200 GB; the cluster is 3 machines (2 cores, 8 GB each); Spark is running in YARN mode.

Thanks & Regards
Shweta Jadhav
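On Option 1: instead of collecting everything into an accumulator on the driver (which funnels the whole dataset through accumulator updates and a parallelize call, and here overflows the serialization stack), the usual pattern is to flatten the lists with flatMap so the result stays distributed; in Scala that is roughly rdd_list_avroObj.flatMap(identity). On Option 2: calling processRecords inside flatMap instead of map yields RDD[AvroObject] directly, since flatMap unrolls the returned collection. A minimal sketch of the flatMap shape in the Java RDD API, with String standing in for AvroObject (the original code is Scala; this is only to illustrate the idea):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public final class FlattenWithFlatMap {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("FlattenWithFlatMap").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Stand-in for RDD[MutableList[AvroObject]]: an RDD whose records are lists.
    JavaRDD<List<String>> rddOfLists = sc.parallelize(Arrays.asList(
        Arrays.asList("a", "b"), Arrays.asList("c")));

    // flatMap turns each list into individual records, so the data stays
    // distributed and nothing is funnelled through the driver.
    JavaRDD<String> flattened = rddOfLists.flatMap(
        new FlatMapFunction<List<String>, String>() {
          @Override
          public Iterable<String> call(List<String> list) {
            return list;
          }
        });

    System.out.println(flattened.collect());  // [a, b, c]
    sc.stop();
  }
}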