Java Kafka Word Count Issue
Hi All,

I am trying to run the Kafka word count program. Please find below the link for the same:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

I have set the Spark master with setMaster("local[*]"), and I have started a Kafka producer which reads a file. If the file already has a few words, then after running the Spark Java program I get the proper output. But when I append new words to the same file, the word count starts again from 1. If I need the word count to cover both the words already present and the newly appended words, what exactly do I need to change in the code?

P.S. I am using Spark spark-1.2.0-bin-hadoop2.3.

Thanks and regards
Shweta Jadhav

=-=-=
Notice: The information contained in this e-mail message and/or attachments to it may contain confidential or privileged information. If you are not the intended recipient, any dissemination, use, review, distribution, printing or copying of the information contained in this e-mail message and/or attachments to it are strictly prohibited. If you have received this communication in error, please notify us by reply e-mail or telephone and immediately and permanently delete the message and any attachments. Thank you
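[Editor's note: the linked example counts words with reduceByKey within each micro-batch, so every batch is counted from scratch. A pure-Java sketch of that per-batch behavior, without Spark, illustrates why the appended word shows up as 1 again:]

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchWordCount {

    // Mimics what the example's reduceByKey does within one micro-batch:
    // counts are computed from that batch's records only, with no memory
    // of earlier batches.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) continue;
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Batch 1: the file's initial contents.
        System.out.println(countWords(Arrays.asList("spark apache kafka spark")));
        // Batch 2: only the appended line arrives, so "spark" counts as 1 again.
        System.out.println(countWords(Arrays.asList("spark")));
    }
}
```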
Re: Java Kafka Word Count Issue
Hi Sean,

The Kafka producer is working fine; this is related to Spark. How can I configure Spark so that it remembers the count from the beginning? If my log.text file has

spark apache kafka spark

my Spark program gives the correct output:

spark 2
apache 1
kafka 1

But when I append "spark" to my log.text file, the Spark program outputs "spark 1", which should be "spark 3". So how do I handle this in the Spark code?

Thanks and regards
Shweta Jadhav

-----Sean Owen so...@cloudera.com wrote: -----
To: Jadhav Shweta jadhav.shw...@tcs.com
From: Sean Owen so...@cloudera.com
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue

This is a question about the Kafka producer, right? Not Spark.
Re: Java Kafka Word Count Issue
First, I would check your code to see how you are pushing records into the topic: is it reading the whole file each time and resending all of it? Then see whether you are using the same consumer group on the Spark side. Otherwise you are not reading from the same offset when restarting Spark, but instead from the default defined in Kafka by auto.offset.reset, which you may be setting to 'smallest'. This is why I think this is likely an issue with how you use Kafka.
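[Editor's note: the consumer-side settings Sean mentions can be passed to Spark through the KafkaUtils.createStream overload that accepts a map of Kafka consumer properties (Spark 1.2). A minimal sketch of building such a map, assuming a hypothetical ZooKeeper quorum and group name; "group.id" and "auto.offset.reset" are standard Kafka 0.8 high-level consumer properties:]

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerParams {

    // Builds the consumer properties one would pass to
    // KafkaUtils.createStream. "group.id" fixes the consumer group so a
    // restarted consumer resumes from its committed offsets;
    // "auto.offset.reset" controls where a consumer with NO committed
    // offset starts: "smallest" replays the topic from the beginning,
    // "largest" reads only records that arrive afterwards.
    public static Map<String, String> build(String zkQuorum, String groupId) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum);
        params.put("group.id", groupId);
        params.put("auto.offset.reset", "largest");
        return params;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:2181", "wordcount-group"));
    }
}
```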
Re: Java Kafka Word Count Issue
You can use updateStateByKey() to perform the above operation.
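[Editor's note: what updateStateByKey adds over the plain example is per-key state carried across micro-batches. A pure-Java sketch of that effect, without Spark — the running-totals map plays the role of the per-key state:]

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {

    // Mimics updateStateByKey's effect across micro-batches: each batch's
    // words are folded into the accumulated totals instead of being
    // counted from scratch.
    public static Map<String, Integer> update(Map<String, Integer> state, List<String> batch) {
        Map<String, Integer> next = new HashMap<>(state);
        for (String word : batch) {
            next.merge(word, 1, Integer::sum);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        // Batch 1: the file's initial words.
        state = update(state, Arrays.asList("spark", "apache", "kafka", "spark"));
        // Batch 2: the appended word; the running total becomes 3, not 1.
        state = update(state, Arrays.asList("spark"));
        System.out.println(state);
    }
}
```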
Re: Java Kafka Word Count Issue
Hi,

I added a checkpoint directory and am now using updateStateByKey():

import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = ... // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I didn't get what exactly I should assign in

Integer newSum = ... // add the new values with the previous running count to get the new count

Thanks and regards
Shweta Jadhav
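[Editor's note: the elided line sums this batch's values for the key and adds the previous running count, using 0 when the key has no state yet. In the Spark 1.2 Java API the state is a Guava Optional, so the fallback is written state.or(0); since Spark and Guava are not on the classpath here, this sketch expresses the same arithmetic with java.util.Optional, where orElse(0) plays that role:]

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class UpdateFunctionSketch {

    // The body the elided line needs: start from the previous running
    // count (0 if this key has never been seen) and add every value
    // emitted for the key in the current batch.
    public static Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        int newSum = state.orElse(0); // state.or(0) with Guava's Optional
        for (Integer v : values) {
            newSum += v;
        }
        return Optional.of(newSum);
    }

    public static void main(String[] args) {
        // Key "spark" had a running count of 2 and appears once more:
        // the new running count is 3.
        System.out.println(call(Arrays.asList(1), Optional.of(2)).get());
    }
}
```

Note that updateStateByKey also requires the checkpoint directory already mentioned above (set via the streaming context), since the per-key state is persisted there between batches.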