Hey Rachana,

There are actually two jobs in your code: one triggered by `rdd.isEmpty` and one by `rdd.saveAsTextFile`. Since you don't cache or checkpoint the RDD, each job recomputes its lineage, so your map function (and the accum.add(1) inside it) runs twice for each record.
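If you want to keep accum.add(1) in the first map, one way to avoid the recomputation is to persist the RDD inside foreachRDD so both actions share a single computation. This is only a rough sketch (untested, reusing the lines, accum and HDFS path from your code):

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.cache();  // materialize the mapped data once; both actions below reuse it
        if (!rdd.isEmpty()) {
          // isEmpty fills the cache for the partitions it touches; saveAsTextFile reuses them
          rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        rdd.unpersist();  // release the cached blocks once this batch is written
        return null;
      }
    });

One general caveat: for accumulators updated inside transformations (as in both versions here), Spark only guarantees at-least-once application, so a retried task or a recomputed partition can add to the count again; only accumulator updates performed inside actions carry the exactly-once guarantee. For your immediate double-counting problem, though, you can also simply make sure only the save job runs the increment, as in the version below.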
You can move accum.add(1) out of the first map and into a map applied just before saveAsTextFile inside foreachRDD, so that only the save job updates the accumulator, like this:

    JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
          public String call(Tuple2<String, String> tuple2) {
            LOG.info("################# Input json stream data ################# " + tuple2._2);
            return tuple2._2();
          }
        });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
          // the accumulator is now incremented only by the job that saveAsTextFile triggers
          rdd.map(new Function<String, String>() {
            public String call(String str) {
              accum.add(1);
              return str;
            }
          }).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
      }
    });

On Tue, Jan 5, 2016 at 8:37 AM, Rachana Srivastava <rachana.srivast...@markmonitor.com> wrote:

> Thanks a lot for your prompt response. I am pushing one message.
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream<String> lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>       public String call(Tuple2<String, String> tuple2) {
>         LOG.info("################# Input json stream data ################# " + tuple2._2);
>         accum.add(1);
>         return tuple2._2();
>       }
>     });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   public Void call(JavaRDD<String> rdd) throws Exception {
>     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>     }
>     System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
>     return null;
>   }
> });
>
> jssc.start();
>
> If I remove this saveAsTextFile I get the correct count; with this line I am getting double counting.
> Here is the stack trace with the saveAsTextFile statement. Please see the double counting below:
>
> &&&&&&&&&&&&&&&&&&&&&&& BEFORE COUNT OF ACCUMULATOR IS &&&&&&&&&&&&&&& 0
> INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at KafkaURLStreaming.java:90
> INFO : org.apache.spark.scheduler.DAGScheduler - Got job 0 (foreachRDD at KafkaURLStreaming.java:90) with 1 output partitions
> INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 0(foreachRDD at KafkaURLStreaming.java:90)
> INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
> INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0 (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83), which has no missing parents
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(3856) called with curMem=0, maxMem=1893865881
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 3.8 KB, free 1806.1 MB)
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2225) called with curMem=3856, maxMem=1893865881
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
> INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on localhost:51637 (size: 2.2 KB, free: 1806.1 MB)
> INFO : org.apache.spark.SparkContext - Created broadcast 0 from broadcast at DAGScheduler.scala:861
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83)
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 1 tasks
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 2026 bytes)
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 36 -> 37
> INFO : kafka.utils.VerifiableProperties - Verifying properties
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID 0). 972 bytes result sent to driver
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.133 s
> INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 (TID 0) in 116 ms on localhost (1/1)
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 0 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.496657 s
> INFO : org.apache.spark.ContextCleaner - Cleaned accumulator 2
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:51637 in memory (size: 2.2 KB, free: 1806.1 MB)
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.tip.id is deprecated. Instead, use mapreduce.task.id
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.job.id is deprecated. Instead, use mapreduce.job.id
> INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at KafkaURLStreaming.java:90
> INFO : org.apache.spark.scheduler.DAGScheduler - Got job 1 (foreachRDD at KafkaURLStreaming.java:90) with 1 output partitions
> INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 1(foreachRDD at KafkaURLStreaming.java:90)
> INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
> INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90), which has no missing parents
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(97104) called with curMem=0, maxMem=1893865881
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 94.8 KB, free 1806.0 MB)
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(32204) called with curMem=97104, maxMem=1893865881
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 31.4 KB, free 1806.0 MB)
> INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:51637 (size: 31.4 KB, free: 1806.1 MB)
> INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:861
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90)
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 36 -> 37
> INFO : kafka.utils.VerifiableProperties - Verifying properties
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message
> INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_201601050824_0001_m_000000_1' to hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_000000
> INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil - attempt_201601050824_0001_m_000000_1: Committed
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 933 bytes result sent to driver
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.758 s
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.888585 s
> INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 760 ms on localhost (1/1)
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
> &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2
>
> But if I comment out the saveAsTextFile, then I get the correct count of one for each input:
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called with curMem=9937, maxMem=1893865881
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
> INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
> INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:861
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 37 -> 38
> INFO : kafka.utils.VerifiableProperties - Verifying properties
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message without saveAs
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 987 bytes result sent to driver
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.103 s
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.151210 s
> &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1
>
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Tuesday, January 05, 2016 8:21 AM
> To: user@spark.apache.org
> Subject: Re: Double Counting When Using Accumulators with Spark Streaming
>
> > Hi Rachana,
> >
> > don't you have two messages on the kafka broker ?
> >
> > Regards
> > JB
> >
> > On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
> > > I have a very simple two-line program. I am getting input from Kafka,
> > > saving the input in a file, and counting the inputs received. My code
> > > looks like this; when I run it I get two accumulator counts for each input.
> > >
> > > HashMap<String, String> kafkaParams = new HashMap<String, String>();
> > > kafkaParams.put("metadata.broker.list", "localhost:9092");
> > > kafkaParams.put("zookeeper.connect", "localhost:2181");
> > >
> > > JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
> > >     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
> > >     kafkaParams, topicsSet);
> > >
> > > final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
> > >
> > > JavaDStream<String> lines = messages.map(
> > >     new Function<Tuple2<String, String>, String>() {
> > >       public String call(Tuple2<String, String> tuple2) {
> > >         accum.add(1);
> > >         return tuple2._2();
> > >       }
> > >     });
> > >
> > > lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
> > >   public Void call(JavaRDD<String> rdd) throws Exception {
> > >     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
> > >       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
> > >     }
> > >     System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
> > >     return null;
> > >   }
> > > });
> > >
> > > jssc.start();
> > >
> > > If I comment out rdd.saveAsTextFile I get the correct count, but with
> > > rdd.saveAsTextFile I get multiple accumulator counts for each input.
> > >
> > > Thanks,
> > >
> > > Rachana
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org