RE: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Rachana Srivastava
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored 
as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in 
memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at 
DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks 
from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 
1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 
1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 
1)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, 
partition 0 offsets 37 -> 38
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is 
overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is 
overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #  
Input json stream data  # one test message without saveAs
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 
1). 987 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at 
KafkaURLStreaming.java:90) finished in 0.103 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at 
KafkaURLStreaming.java:90, took 0.151210 s

&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1
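The jump from 1 to 2 above lines up with how Spark evaluates uncached RDDs:
rdd.isEmpty() and rdd.saveAsTextFile() are both actions, and each action
recomputes the uncached mapped RDD, so accum.add(1) can fire more than once per
record. As a hedged sketch (not a fix proposed in this thread, and assuming the
same jssc, accum, lines, and HDFS path as the code quoted below), caching and
counting the RDD once inside foreachRDD keeps the accumulator at one increment
per record:

lines.foreachRDD(new Function<JavaRDD, Void>() {
    public Void call(JavaRDD rdd) throws Exception {
        rdd.cache();                   // keep the mapped batch around so the save below reuses it
        long records = rdd.count();    // single full pass: runs map() and accum.add(1) exactly once per record
        if (records > 0) {
            // served from the cached partitions, so map() is not re-run here
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        rdd.unpersist();               // drop the cached blocks once the batch has been written
        return null;
    }
});

Whether caching each micro-batch is worth the memory is a judgement call; the
unpersist() call is only there so cached batches do not pile up.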





-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, January 05, 2016 8:21 AM
To: user@spark.apache.org
Subject: Re: Double Counting When Using Accumulators with Spark Streaming



Hi Rachana,

Don't you have two messages on the Kafka broker?

Regards
JB



On 01/05/2016 05:14 PM, Rachana Srivastava wrote:

> I have a very simple two-line program. I am getting input from Kafka,
> saving the input to a file, and counting the input received. My code
> looks like this; when I run it I am getting two accumulator counts
> for each input.
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages =
>     KafkaUtils.createDirectStream(jssc, String.class, String.class,
>         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>
> final Accumulator accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>         public String call(Tuple2<String, String> tuple2) {
>             accum.add(1);
>             return tuple2._2();
>         }
>     });
>
> lines.foreachRDD(new Function<JavaRDD, Void>() {
>     public Void call(JavaRDD rdd) throws Exception {
>         if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>             rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>         }
>         System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
>         return null;
>     }
> });
>
> jssc.start();
>
> If I comment out rdd.saveAsTextFile I get the correct count, but with
> rdd.saveAsTextFile I am getting multiple accumulator counts for each input.
>
> Thanks,
>
> Rachana



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com







Re: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Jean-Baptiste Onofré

Hi Rachana,

Don't you have two messages on the Kafka broker?

Regards
JB

On 01/05/2016 05:14 PM, Rachana Srivastava wrote:

I have a very simple two-line program. I am getting input from Kafka,
saving the input to a file, and counting the input received. My code
looks like this; when I run it I am getting two accumulator counts
for each input.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

final Accumulator accum = jssc.sparkContext().accumulator(0);

JavaDStream lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            accum.add(1);
            return tuple2._2();
        }
    });

lines.foreachRDD(new Function<JavaRDD, Void>() {
    public Void call(JavaRDD rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
    }
});

jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with
rdd.saveAsTextFile I am getting multiple accumulator counts for each input.

Thanks,

Rachana



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Rachana Srivastava
I have a very simple two-line program. I am getting input from Kafka, saving
the input to a file, and counting the input received. My code looks like this;
when I run it I am getting two accumulator counts for each input.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);
final Accumulator accum = jssc.sparkContext().accumulator(0);
JavaDStream lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            accum.add(1);
            return tuple2._2();
        }
    });
lines.foreachRDD(new Function<JavaRDD, Void>() {
    public Void call(JavaRDD rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
    }
});
jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with
rdd.saveAsTextFile I am getting multiple accumulator counts for each input.

Thanks,

Rachana
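
If only a per-batch record count is needed (rather than a running total), a
smaller hedged sketch sidesteps the issue entirely: count with an action in the
driver instead of incrementing an accumulator inside map(), since the value
returned by rdd.count() stays exact no matter how many other actions recompute
the RDD. This assumes the same lines DStream and output path as above;
batchCount is just an illustrative local name.

lines.foreachRDD(new Function<JavaRDD, Void>() {
    public Void call(JavaRDD rdd) throws Exception {
        long batchCount = rdd.count();   // exact number of records in this micro-batch
        if (batchCount > 0) {
            // saveAsTextFile recomputes the (uncached) RDD, but that no longer skews any counter
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println("RECORDS IN THIS BATCH: " + batchCount);
        return null;
    }
});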


Re: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Shixiong(Ryan) Zhu
> INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.dir
> is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.output.key.class is deprecated. Instead, use
> mapreduce.job.output.key.class
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.output.value.class is deprecated. Instead, use
> mapreduce.job.output.value.class
>
> INFO : org.apache.hadoop.conf.Configuration.deprecation -
> mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 36 -> 37
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is overridden to 1073741824
>
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden
> to
>
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is
> overridden to localhost:2181
>
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #
> Input json stream data  # one test message
>
> INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved
> output of task 'attempt_201601050824_0001_m_00_1' to
> hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_00
>
> INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil -
> attempt_201601050824_0001_m_00_1: Committed
>
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0
> (TID 1). 933 bytes result sent to driver
>
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD
> at KafkaURLStreaming.java:90) finished in 0.758 s
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished:
> foreachRDD at KafkaURLStreaming.java:90, took 0.888585 s
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in
> stage 1.0 (TID 1) in 760 ms on localhost (1/1)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0,
> whose tasks have all completed, from pool
>
>  &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2
>
>
>
> But if I comment out the saveAsTextFile call then I am getting the correct
> count of one for each input.
>
>
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called
> with curMem=9937, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0
> stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Added
> broadcast_1_piece0 in memory on localhost:58397 (size: 2.2 KB, free: 1806.1
> MB)
>
> INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast
> at DAGScheduler.scala:861
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing
> tasks from ResultStage 1 (MapPartitionsRDD[3] at map at
> KafkaURLStreaming.java:83)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0
> with 1 tasks
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
> stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
>
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0
> (TID 1)
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 37 -> 38
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is overridden to 1073741824
>
> INFO : kafka.utils.VerifiableProperties - Property group.id is overridden
> to
>
> INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is
> overridden to localhost:2181
>
> INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #
> Input json stream data  # one test message without saveAs
>
> INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0
> (TID 1). 987 bytes result sent to driver
>
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD
> at KafkaURLStreaming.java:90) finished in 0.103 s
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished:
> foreachRDD at KafkaURLStreaming.java:90, took 0.151210 s
>
> &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1
>
>
>
>
>
> -Original Message-
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Tuesday, January 05, 2016 8:21 AM