Java Kafka Word Count Issue

2015-02-02 Thread Jadhav Shweta

Hi All,

I am trying to run the Kafka word count example program; please find the link below:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

I have set the Spark master via setMaster("local[*]"),

and I have started a Kafka producer which reads the file.

If my file already contains a few words,
then after running the Spark Java program I get the proper output.

But when I append new words to the same file, the word count starts again from 1.

What changes do I need to make in the code so that the count covers both the
already-present words and the newly appended ones?

P.S. I am using Spark spark-1.2.0-bin-hadoop2.3

Thanks and regards 
Shweta Jadhav




Re: Java Kafka Word Count Issue

2015-02-02 Thread Jadhav Shweta

Hi Sean,

Kafka Producer is working fine.
This is related to Spark.

How can I configure Spark so that it remembers the counts from the beginning?

If my log.text file has

spark
apache
kafka
spark

My Spark program gives correct output as 

spark 2
apache 1
kafka 1

but when I append spark to my log.text file,

the Spark program gives the output

spark 1

when it should be spark 3.

So how do I handle this in the Spark code?

Thanks and regards 
Shweta Jadhav



-Sean Owen so...@cloudera.com wrote: -
To: Jadhav Shweta jadhav.shw...@tcs.com
From: Sean Owen so...@cloudera.com
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue

This is a question about the Kafka producer right? Not Spark




Re: Java Kafka Word Count Issue

2015-02-02 Thread Jadhav Shweta

Hi,

I added a checkpoint directory and am now using updateStateByKey():

import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            Integer newSum = ...  // add the new values with the previous running count to get the new count
            return Optional.of(newSum);
        }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I didn't understand what exactly I should assign to "Integer newSum = ..." (i.e., how to
add the new values to the previous running count to get the new count).
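
A minimal sketch of what that assignment could look like, assuming the intent is simply to add
this batch's values for the key to whatever count was stored previously (the body below is my
own fill-in, not the official example):

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            // Previous running count for this key, or 0 if the key is new.
            int newSum = state.or(0);
            // Add every count contributed by the current batch.
            for (Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        }
    };

With that in place, pairs.updateStateByKey(updateFunction) should keep accumulating counts
across batches as long as a checkpoint directory is set.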


Thanks and regards
Shweta Jadhav



-VISHNU SUBRAMANIAN johnfedrickena...@gmail.com wrote: -
To: Jadhav Shweta jadhav.shw...@tcs.com
From: VISHNU SUBRAMANIAN johnfedrickena...@gmail.com
Date: 02/02/2015 04:39PM
Cc: user@spark.apache.org user@spark.apache.org
Subject: Re: Java Kafka Word Count Issue

You can use updateStateByKey() to perform the above operation.





ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration

2015-02-17 Thread Jadhav Shweta
Hi,

I am running a streaming word count program on a Spark standalone cluster with four machines.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamingWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    static transient Configuration conf;

    private JavaKafkaStreamingWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));

        jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        Integer newSum = 0;
                        if (state.isPresent()) {
                            if (values.size() != 0) {
                                newSum = state.get();
                                for (int temp : values) {
                                    newSum += temp;
                                }
                            } else {
                                newSum = state.get();
                            }
                        } else {
                            if (values.size() != 0) {
                                for (int temp : values) {
                                    newSum += 1;
                                }
                            }
                        }
                        return Optional.of(newSum);
                    }
                };
        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
        conf = new Configuration();

        runningCounts.saveAsNewAPIHadoopFiles("hdfs://172.17.199.229:8020/spark/wordCountOutput/word",
                "stream", Text.class, Text.class,
                (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class, conf);
        // jssc.sparkContext().hadoopConfiguration();
        jssc.start();
        jssc.awaitTermination();
    }
}

This works fine on a single-node cluster, but it gives the following error when I try to run
the same program on the multi-node cluster.

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: 
org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at 
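
The exception above says that org.apache.hadoop.conf.Configuration is not java.io.Serializable,
so the failure shows up as soon as anything still holding a reference to the static conf field
gets serialized (apparently while the DStream graph is being checkpointed). A minimal, untested
sketch of one thing to try is to drop that field and pass the driver's own Hadoop configuration
straight into the output call, i.e. the jssc.sparkContext().hadoopConfiguration() call that is
already commented out above:

// Sketch only: fetch the Configuration from the driver context at the point of use
// instead of keeping it in a static field that can end up in serialized state.
runningCounts.saveAsNewAPIHadoopFiles(
        "hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
        Text.class, Text.class,
        (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
        jssc.sparkContext().hadoopConfiguration());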

Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError

2015-07-24 Thread Jadhav Shweta
Hi,

I am trying one transformation by calling a Scala method;
this Scala method returns MutableList[AvroObject]:

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): scala.collection.mutable.MutableList[AvroObject]

Hence, the output of the transformation is RDD[MutableList[AvroObject]],

but I want the output as RDD[AvroObject].

I tried applying foreach to turn RDD[MutableList[AvroObject]] into RDD[AvroObject]:

var uA = sparkContext.accumulableCollection[MutableList[AvroObject], universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u => {
  uA ++= u
})
var uRDD = sparkContext.parallelize(uA.value)

It is failing on a large dataset with the following error:

java.io.IOException: java.lang.StackOverflowError
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
at 
org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
at 
java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at java.util.ArrayList.writeObject(ArrayList.java:742)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: a replacement for the accumulator (one possible sketch is below).
Option 2: instead of returning List[AvroObject] from the Scala method, can I emit multiple
AvroObjects so that I get RDD[AvroObject] directly?
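
For Option 1, a minimal sketch, assuming nothing beyond the RDD[MutableList[AvroObject]] shape
above, is to flatten the lists with flatMap and keep the data distributed instead of funnelling
it through a driver-side accumulator and parallelize:

import org.apache.spark.rdd.RDD
import scala.collection.mutable.MutableList

// Sketch only: each MutableList is a TraversableOnce, so flatMap(identity)
// flattens RDD[MutableList[AvroObject]] into RDD[AvroObject] without
// collecting anything on the driver.
def flatten(lists: RDD[MutableList[AvroObject]]): RDD[AvroObject] =
  lists.flatMap(identity)

Compared with accumulableCollection followed by parallelize, this avoids shipping the whole
result back to the driver inside task results, which is where the serialization above appears
to be overflowing the stack.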

Note:
I am using Spark 1.3.0
Input data size: 200 GB
Cluster: 3 machines (2 cores, 8 GB)
Spark running in YARN mode

Thanks & Regards
Shweta Jadhav





Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError

2015-07-15 Thread Jadhav Shweta
Hi,

I am trying one transformation by calling a Scala method;
this Scala method returns MutableList[AvroObject]:

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): scala.collection.mutable.MutableList[AvroObject]

Hence, the output of the transformation is RDD[MutableList[AvroObject]],

but I want the output as RDD[AvroObject].

I tried applying foreach to turn RDD[MutableList[AvroObject]] into RDD[AvroObject]:

var uA = sparkContext.accumulableCollection[MutableList[AvroObject], universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u => {
  uA ++= u
})
var uRDD = sparkContext.parallelize(uA.value)

It is failing on a large dataset with the following error:

java.io.IOException: java.lang.StackOverflowError
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
at 
org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
at 
java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at java.util.ArrayList.writeObject(ArrayList.java:742)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: a replacement for the accumulator.
Option 2: instead of returning List[AvroObject] from the Scala method, can I emit multiple
AvroObjects so that I get RDD[AvroObject] directly? (One possible sketch for this is below.)
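
For Option 2, a hedged sketch, assuming the upstream RDD holds (String, Iterable[(String, GenericRecord)])
pairs as the processRecords signature suggests (the groupedRecords name below is hypothetical), is to
call processRecords inside flatMap so that each returned list is flattened into individual AvroObjects
within the same transformation:

import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD

// Hypothetical sketch: apply processRecords inside flatMap, so each per-key
// MutableList is flattened on the executors and the result is already RDD[AvroObject].
def toAvroObjects(groupedRecords: RDD[(String, Iterable[(String, GenericRecord)])]): RDD[AvroObject] =
  groupedRecords.flatMap { case (id, recs) => processRecords(id, recs) }

This produces RDD[AvroObject] directly, without materializing an intermediate
RDD[MutableList[AvroObject]] or an accumulator.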

Note:
I am using Spark 1.3.0
Input data size: 200 GB
Cluster: 3 machines (2 cores, 8 GB)
Spark running in YARN mode

Thanks & Regards
Shweta Jadhav
Tata Consultancy Services Limited
Cell:- +91-9867515614
Mailto: jadhav.shw...@tcs.com
Website: http://www.tcs.com

Experience certainty.   IT Services
Business Solutions
Consulting
