Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Hi Folks,

I am seeing some strange behavior when using the Spark Kafka connector in
Spark streaming.

I have a Kafka topic with 8 partitions, and a Kafka producer that pumps some
messages into this topic.

On the consumer side I have a Spark Streaming application with 8 executors on
8 worker nodes and 8 ReceiverInputDStreams, all using the same Kafka group id
and connected to the 8 partitions of the topic. The Kafka consumer property
auto.offset.reset is set to "smallest".


Now here is the sequence of steps -

(1) Start the Spark Streaming app.
(2) Start the producer.

At this point I see the messages being pumped by the producer arrive in
Spark Streaming. Then I -

(1) Stop the producer.
(2) Wait for all the messages to be consumed.
(3) Stop the Spark Streaming app.

Now when I restart the Spark Streaming app (note - the producer is still
down and no messages are being pumped into the topic) - I observe the
following -

(1) Spark Streaming starts reading each partition from the very beginning.


This is not what I was expecting. I expected the consumers started by
Spark Streaming to resume from where they left off.

Is my assumption incorrect that the consumers (the Kafka/Spark connector)
start reading the topic from where they last left off...?

Has anyone else seen this behavior? Is there a way to make it resume from
where it left off?

Regards,
- Abraham


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

> [original message quoted in full; trimmed]


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Sure... I do set the group.id for all the consumers to be the same. Here is
the code ---

SparkConf sparkConf = new
    SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
    Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new
    ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
            new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                        throws Exception {
                    return tuple2;
                }
            }));
}


JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
        kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara sean.mcnam...@webtrends.com
wrote:

> [quoted text trimmed]


Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
How long do you let the consumers run for?  Is it less than 60 seconds by 
chance?  auto.commit.interval.ms defaults to 60000 ms (60 seconds).  If so, that 
may explain why you are seeing that behavior.
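To make that concrete, here is a minimal sketch of shortening the commit interval in the consumer properties (this assumes the 0.8-era high-level consumer property names; `withFasterCommits` is a hypothetical helper, not part of any library):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitIntervalExample {

    // Return a copy of the given consumer config with a shorter offset-commit
    // interval, so stopping the app shortly after consuming loses less progress.
    public static Map<String, String> withFasterCommits(Map<String, String> kafkaConf) {
        Map<String, String> conf = new HashMap<String, String>(kafkaConf);
        // Commit consumed offsets to ZooKeeper every 5 s instead of the
        // default 60000 ms.
        conf.put("auto.commit.interval.ms", "5000");
        conf.put("auto.commit.enable", "true"); // the default, shown for clarity
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> base = new HashMap<String, String>();
        base.put("group.id", "consumerGrp");
        Map<String, String> conf = withFasterCommits(base);
        System.out.println(conf.get("auto.commit.interval.ms"));
    }
}
```

With a shorter interval, an app stopped less than a minute after its last message has a much smaller window of unacknowledged progress.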

Cheers,

Sean


On Oct 10, 2014, at 4:47 PM, Abraham Jacob abe.jac...@gmail.com wrote:

> [quoted text trimmed]



RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi Abraham,

You are correct: the "auto.offset.reset" behavior in KafkaReceiver is different 
from Kafka's original semantics. If you set this configuration, KafkaReceiver 
cleans the related ZooKeeper metadata immediately, whereas for Kafka this 
configuration is just a hint that takes effect only when the offset is 
out of range. So with "smallest" you will always read data from the very 
beginning, and with "largest" you will only get data produced after the 
restart.

There is a JIRA and PR tracking this, but it is not yet merged to master; you 
can check it out (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


· Spark's usage of the Kafka consumer parameter auto.offset.reset 
(http://kafka.apache.org/documentation.html#consumerconfigs) 
is different from Kafka's semantics. In Kafka, the behavior of setting 
auto.offset.reset to "smallest" is that the consumer will automatically reset 
the offset to the smallest offset when a) there is no existing offset stored in 
ZooKeeper or b) there is an existing offset but it is out of range. Spark 
however will always remove existing offsets and then start all the way from 
zero again. This means whenever you restart your application with 
auto.offset.reset = "smallest", your application will completely re-process all 
available Kafka data. Doh! See this discussion 
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) 
and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm, interesting... Wondering what happens if I set it to "largest"...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.com wrote:

> [quoted text trimmed]

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Thanks Jerry. So, from what I can understand from the code, if I leave out
auto.offset.reset, it should theoretically read from the last commit
point... Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai saisai.s...@intel.com wrote:

> [quoted text trimmed]
Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
This JIRA and comment sum up the issue:
https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708

Basically, the offset parameter was renamed and had slightly different semantics 
between Kafka 0.7 and 0.8.  It was also useful because earlier versions of the 
Spark Streaming receiver could be overwhelmed after a streaming job had been down 
for a period of time.

I think this PR addresses the issue quite nicely:
https://github.com/apache/spark/pull/1420


Best,

Sean


On Oct 10, 2014, at 6:48 PM, Abraham Jacob abe.jac...@gmail.com wrote:

> [quoted text trimmed]

RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi abe,

You can see the details in KafkaInputDStream.scala; here is the snippet:

// When auto.offset.reset is defined, it is our responsibility to try and 
// whack the consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver checks your kafkaParams: if "auto.offset.reset" is set, it 
cleans the ZK metadata immediately, so you will always read data from the 
beginning (set to "smallest") or the end (set to "largest") immediately, 
because the ZK metadata was deleted beforehand.

If you do not set this parameter, this code path is not triggered, so data 
will be read from the last commit point. And if no commit point is available 
yet, Kafka will move the offset to the end of the partition (Kafka's 
"auto.offset.reset" defaults to "largest").

If you want to keep the same semantics as Kafka, you need to remove the above 
code path manually and recompile Spark.

Thanks
Jerry
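The takeaway above can be sketched as a config change on the consumer side (a minimal sketch; `resumableConf` is a hypothetical helper, and the ZooKeeper address and group id are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

public class ResumeFromCommitExample {

    // Build a kafkaParams map that resumes from the last committed offset.
    // Omitting "auto.offset.reset" means KafkaReceiver never wipes the
    // consumer group's ZooKeeper node, so stored offsets survive restarts.
    public static Map<String, String> resumableConf(String zkConnect, String groupId) {
        Map<String, String> conf = new HashMap<String, String>();
        conf.put("zookeeper.connect", zkConnect);
        conf.put("group.id", groupId);
        // NOTE: no "auto.offset.reset" entry, on purpose.
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = resumableConf("localhost:2181", "consumerGrp");
        System.out.println(conf.containsKey("auto.offset.reset"));
    }
}
```

The trade-off: without the setting, a brand-new consumer group (no committed offset yet) starts from the end of each partition rather than the beginning.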

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 8:49 AM
To: Shao, Saisai
Cc: user@spark.apache.org; Sean McNamara
Subject: Re: Spark Streaming KafkaUtils Issue

> [quoted text trimmed]

Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Abraham Jacob
Ah, I see... much clearer now.

Because auto.offset.reset triggers KafkaReceiver to delete the ZK metadata,
when control passes to the Kafka consumer API it sees that there is no offset
available for the partition. That in turn triggers the "smallest" or "largest"
logic in Kafka, depending on what we set for auto.offset.reset...

Thanks for explaining this clearly! Appreciate your effort.
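The two-step sequence described in this thread can be modeled as a small decision function (purely illustrative pseudologic, not actual Spark or Kafka source):

```java
public class OffsetResetLogic {

    // Illustrative model of what happens on startup, per the thread:
    // 1) Spark's KafkaReceiver deletes the ZK offsets iff auto.offset.reset
    //    is configured.
    // 2) Kafka's consumer then falls back to the auto.offset.reset value
    //    ("largest" by default) only when no valid committed offset exists.
    public static String startingPoint(boolean resetConfigured, String resetValue,
                                       Long committedOffset) {
        if (resetConfigured) {
            committedOffset = null; // Spark wipes the consumer-group ZK node
        }
        if (committedOffset != null) {
            return "offset " + committedOffset; // resume from last commit
        }
        return resetValue.equals("smallest") ? "beginning" : "end";
    }

    public static void main(String[] args) {
        // Setting "smallest" always wins over a stored offset under Spark:
        System.out.println(startingPoint(true, "smallest", 42L));
        // Leaving the parameter unset resumes from the stored offset:
        System.out.println(startingPoint(false, "largest", 42L));
    }
}
```

This also makes the symmetry visible: with the parameter set, "smallest" always replays everything and "largest" always skips to the end, regardless of any previously committed offset.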



On Fri, Oct 10, 2014 at 6:08 PM, Shao, Saisai saisai.s...@intel.com wrote:

> [quoted text trimmed]