I have a very simple program: it reads input from Kafka, saves the input to a file, and counts the records received. My code looks like this, but when I run it I get two accumulator increments for each input.
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                accum.add(1);
                return tuple2._2();
            }
        });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
    }
});

jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with rdd.saveAsTextFile each input is counted multiple times in the accumulator.

Thanks, Rachana
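My suspicion (I am not sure this is right) is that rdd.isEmpty() and rdd.saveAsTextFile are both actions, so the uncached RDD is recomputed for each one, and the map function, including accum.add(1), runs once per action. The plain-Java toy model below reproduces that double count without Spark; LazyData and its methods are my own hypothetical stand-ins, not Spark APIs:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DoubleCountDemo {
    // Toy stand-in for an uncached RDD: every "action" re-runs the map
    // function over the source data, the way Spark re-runs transformations
    // when the result is not cached/persisted.
    static class LazyData {
        private final List<String> source;
        private final Function<String, String> mapFn;

        LazyData(List<String> source, Function<String, String> mapFn) {
            this.source = source;
            this.mapFn = mapFn;
        }

        private List<String> compute() {
            return source.stream().map(mapFn).collect(Collectors.toList());
        }

        boolean isEmpty() { return compute().isEmpty(); }   // "action" #1
        List<String> save() { return compute(); }           // "action" #2
    }

    public static void main(String[] args) {
        AtomicInteger accum = new AtomicInteger(0);  // plays the accumulator
        LazyData lines = new LazyData(Arrays.asList("a", "b"),
                s -> { accum.incrementAndGet(); return s; });

        if (!lines.isEmpty()) {  // first pass over the data
            lines.save();        // second pass: the map function runs again
        }
        System.out.println(accum.get());  // 4, not 2: each record counted twice
    }
}
```

If that really is the cause, then caching the RDD (rdd.cache() before the isEmpty() check) should stop the recomputation; the Spark documentation also warns that accumulator updates made inside transformations may be applied more than once if a stage is re-executed. Is that what is happening here?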