I have a very simple two-line program. I read input from Kafka, save the
input to a file, and count the records received. My code looks like this;
when I run it, the accumulator counts each input twice.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                accum.add(1);
                return tuple2._2();
            }
        });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS "
                + accum.value());
        return null;
    }
});

jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with
rdd.saveAsTextFile present the accumulator is incremented multiple times
for each input.
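For what it's worth, this is the documented behavior when an accumulator is updated inside a transformation: each action replays the un-cached lineage, so the map body (and its accum.add(1)) runs once per action. Here rdd.isEmpty() is one action and rdd.saveAsTextFile(...) is a second, which would explain the doubled count. Below is a minimal plain-Java sketch of that mechanism (no Spark dependency; the Supplier stands in for the lazy map, the AtomicInteger for the accumulator, and the class/field names are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyCountDemo {
    // Stand-in for the Spark accumulator.
    static final AtomicInteger accum = new AtomicInteger(0);

    // Stand-in for the map() transformation: lazy, and its body
    // re-runs every time the pipeline is evaluated.
    static Supplier<String> mapped(final String input) {
        return () -> {
            accum.incrementAndGet(); // side effect inside the transformation
            return input;
        };
    }

    public static void main(String[] args) {
        Supplier<String> lines = mapped("record");

        // Two "actions" over the same un-cached pipeline, like
        // rdd.isEmpty() followed by rdd.saveAsTextFile(...):
        lines.get();
        lines.get();
        System.out.println(accum.get()); // prints 2: the map body ran once per action

        // Materializing the result once (the analogue of rdd.cache()
        // before the actions) means the body runs only once.
        accum.set(0);
        String cached = lines.get();     // evaluate once and keep the value
        String reused = cached;          // a later "action" reuses it, no recompute
        System.out.println(accum.get()); // prints 1
    }
}
```

If that diagnosis applies to your job, the usual suggestions are to cache the RDD before the first action, or to count with rdd.count() inside foreachRDD instead of incrementing inside map().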

Thanks,

Rachana
