Hey Rachana,

There are actually two jobs in your code: `rdd.isEmpty` and
`rdd.saveAsTextFile`. Since you don't cache or checkpoint this rdd, your map
function runs again in each job, so with a single message that message is
counted twice.
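
As a rough illustration of why this happens, here is a plain-Java analogy. It
is not Spark code: the class and method names are made up for this sketch, a
`Supplier<Stream<String>>` stands in for the uncached rdd, and an
`AtomicInteger` stands in for the accumulator. Each "action" rebuilds the lazy
pipeline, so the map function runs once per action unless the result is
materialized first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyRecompute {
    // Hypothetical stand-in for the accumulator: counts map-function calls.
    static final AtomicInteger accum = new AtomicInteger();

    // Stand-in for an uncached rdd: every get() rebuilds the pipeline from
    // the source, so the map function runs again for each record it reads.
    static Supplier<Stream<String>> mapped(List<String> source) {
        return () -> source.stream().map(s -> {
            accum.incrementAndGet();
            return s;
        });
    }

    // Two "actions" on the uncached pipeline: an emptiness check, then a save.
    static int countWithoutCache(List<String> source) {
        accum.set(0);
        Supplier<Stream<String>> lines = mapped(source);
        boolean nonEmpty = lines.get().findFirst().isPresent(); // like rdd.isEmpty()
        if (nonEmpty) {
            lines.get().collect(Collectors.toList());           // like rdd.saveAsTextFile()
        }
        return accum.get();
    }

    // Materialize the mapped records once, then run both "actions" on the copy.
    static int countWithCache(List<String> source) {
        accum.set(0);
        List<String> cached = mapped(source).get().collect(Collectors.toList());
        if (!cached.isEmpty()) {
            cached.forEach(s -> { /* write s somewhere */ });
        }
        return accum.get();
    }

    public static void main(String[] args) {
        List<String> oneMessage = Arrays.asList("one test message");
        System.out.println("without cache: " + countWithoutCache(oneMessage)); // prints 2
        System.out.println("with cache:    " + countWithCache(oneMessage));    // prints 1
    }
}
```

In this sketch the emptiness check only reads the first record, so with one
message the counter ends at 2, which matches what you see in your logs.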

You can move `accum.add(1)` out of the first map and into a map applied just
before `rdd.saveAsTextFile`, so that it only runs in the save job, like this:

    JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
          public String call(Tuple2<String, String> tuple2) {
            LOG.info("#################  Input json stream data  ################# " + tuple2._2);
            return tuple2._2();
          }
        });
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
          rdd.map(new Function<String, String>() {
                public String call(String str) {
                  accum.add(1);
                  return str;
                }
          }).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
      }
    });



On Tue, Jan 5, 2016 at 8:37 AM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Thanks a lot for your prompt response.  I am pushing one message.
>
>
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream<String> lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>       public String call(Tuple2<String, String> tuple2) {
>         LOG.info("#################  Input json stream data  ################# " + tuple2._2);
>         accum.add(1);
>         return tuple2._2();
>       }
>     });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   public Void call(JavaRDD<String> rdd) throws Exception {
>     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>     }
>     System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
>     return null;
>   }
> });
>
> jssc.start();
>
>
>
> If I remove this saveAsTextFile call I get the correct count; with this
> line I get double counting.
>
>
>
> *Here is the log output with the saveAsTextFile statement. Please see the
> double counting below:*
>
>
>
> &&&&&&&&&&&&&&&&&&&&&&& BEFORE COUNT OF ACCUMULATOR IS &&&&&&&&&&&&&&& 0
>
> INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at
> KafkaURLStreaming.java:90
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Got job 0 (foreachRDD at
> KafkaURLStreaming.java:90) with 1 output partitions
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage
> 0(foreachRDD at KafkaURLStreaming.java:90)
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage:
> List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0
> (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83), which has no
> missing parents
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(3856) called
> with curMem=0, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0 stored as
> values in memory (estimated size 3.8 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2225) called
> with curMem=3856, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0_piece0
> stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Added
> broadcast_0_piece0 in memory on localhost:51637 (size: 2.2 KB, free: 1806.1
> MB)
>
> INFO : org.apache.spark.SparkContext - Created broadcast 0 from broadcast
> at DAGScheduler.scala:861
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing
> tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
> KafkaURLStreaming.java:83)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0
> with 1 tasks
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
> stage 0.0 (TID 0, localhost, ANY, 2026 bytes)
>
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0
> (TID 0)
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 36 -> 37
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is overridden to 1073741824
>
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden
> to
>
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is
> overridden to localhost:2181
>
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################
> Input json stream data  ################# one test message
>
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0
> (TID 0). 972 bytes result sent to driver
>
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (foreachRDD
> at KafkaURLStreaming.java:90) finished in 0.133 s
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in
> stage 0.0 (TID 0) in 116 ms on localhost (1/1)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0,
> whose tasks have all completed, from pool
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 0 finished:
> foreachRDD at KafkaURLStreaming.java:90, took 0.496657 s
>
> INFO : org.apache.spark.ContextCleaner - Cleaned accumulator 2
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
> broadcast_0_piece0 on localhost:51637 in memory (size: 2.2 KB, free: 1806.1
> MB)
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.tip.id
> is deprecated. Instead, use mapreduce.task.id
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.id
> is deprecated. Instead, use mapreduce.task.attempt.id
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.job.id
> is deprecated. Instead, use mapreduce.job.id
>
> INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at
> KafkaURLStreaming.java:90
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Got job 1 (foreachRDD at
> KafkaURLStreaming.java:90) with 1 output partitions
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage
> 1(foreachRDD at KafkaURLStreaming.java:90)
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage:
> List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 1
> (MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90), which has
> no missing parents
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(97104)
> called with curMem=0, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as
> values in memory (estimated size 94.8 KB, free 1806.0 MB)
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(32204)
> called with curMem=97104, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0
> stored as bytes in memory (estimated size 31.4 KB, free 1806.0 MB)
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Added
> broadcast_1_piece0 in memory on localhost:51637 (size: 31.4 KB, free:
> 1806.1 MB)
>
> INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast
> at DAGScheduler.scala:861
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing
> tasks from ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at
> KafkaURLStreaming.java:90)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0
> with 1 tasks
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
> stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
>
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0
> (TID 1)
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.output.dir is deprecated. Instead, use
> mapreduce.output.fileoutputformat.outputdir
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.output.key.class is deprecated. Instead, use
> mapreduce.job.output.key.class
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.output.value.class is deprecated. Instead, use
> mapreduce.job.output.value.class
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 36 -> 37
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is overridden to 1073741824
>
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden
> to
>
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is
> overridden to localhost:2181
>
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################
> Input json stream data  ################# one test message
>
> INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved
> output of task 'attempt_201601050824_0001_m_000000_1' to
> hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_000000
>
> INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil -
> attempt_201601050824_0001_m_000000_1: Committed
>
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0
> (TID 1). 933 bytes result sent to driver
>
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD
> at KafkaURLStreaming.java:90) finished in 0.758 s
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished:
> foreachRDD at KafkaURLStreaming.java:90, took 0.888585 s
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in
> stage 1.0 (TID 1) in 760 ms on localhost (1/1)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0,
> whose tasks have all completed, from pool
>
>  &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2
>
>
>
> *But if I comment out saveAsTextFile, I get the correct count of one for
> each input.*
>
>
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called
> with curMem=9937, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0
> stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Added
> broadcast_1_piece0 in memory on localhost:58397 (size: 2.2 KB, free: 1806.1
> MB)
>
> INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast
> at DAGScheduler.scala:861
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing
> tasks from ResultStage 1 (MapPartitionsRDD[3] at map at
> KafkaURLStreaming.java:83)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0
> with 1 tasks
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
> stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
>
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0
> (TID 1)
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 37 -> 38
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is overridden to 1073741824
>
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden
> to
>
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is
> overridden to localhost:2181
>
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################
> Input json stream data  ################# one test message without saveAs
>
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0
> (TID 1). 987 bytes result sent to driver
>
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD
> at KafkaURLStreaming.java:90) finished in 0.103 s
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished:
> foreachRDD at KafkaURLStreaming.java:90, took 0.151210 s
>
> &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1
>
>
>
>
>
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Tuesday, January 05, 2016 8:21 AM
> To: user@spark.apache.org
> Subject: Re: Double Counting When Using Accumulators with Spark Streaming
>
>
>
> Hi Rachana,
>
>
>
> Don't you have two messages on the Kafka broker?
>
>
>
> Regards
>
> JB
>
>
>
> On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
>
> > I have a very simple two-line program. I read input from Kafka, save the
> > input to a file, and count the input received. My code looks like this;
> > when I run it, the accumulator is incremented twice for each input.
>
> >
>
> > HashMap<String, String> kafkaParams = new HashMap<String, String>();
> > kafkaParams.put("metadata.broker.list", "localhost:9092");
> > kafkaParams.put("zookeeper.connect", "localhost:2181");
> >
> > JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
> >     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
> >     kafkaParams, topicsSet);
> >
> > final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
> >
> > JavaDStream<String> lines = messages.map(
> >     new Function<Tuple2<String, String>, String>() {
> >       public String call(Tuple2<String, String> tuple2) {
> >         accum.add(1);
> >         return tuple2._2();
> >       }
> >     });
> >
> > lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
> >   public Void call(JavaRDD<String> rdd) throws Exception {
> >     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
> >       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
> >     }
> >     System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
> >     return null;
> >   }
> > });
> >
> > jssc.start();
>
> >
>
> > If I comment out rdd.saveAsTextFile I get the correct count, but with
> > rdd.saveAsTextFile I get multiple accumulator counts for each input.
>
> >
>
> > Thanks,
>
> >
>
> > Rachana
>
> >
>
>
>
> --
>
> Jean-Baptiste Onofré
>
> jbono...@apache.org
>
> http://blog.nanthrax.net
>
> Talend - http://www.talend.com
>
>
>
>
>
>
