Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-24 Thread Dibyendu Bhattacharya
Hi,

I have raised a JIRA ( https://issues.apache.org/jira/browse/SPARK-11045)
to track the discussion, but I am also mailing the user group.

This Kafka consumer has been around for a while on spark-packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer ) and I see
that many have started using it. I am now thinking of contributing it back to
the Apache Spark core project so that it can get better support, visibility,
and adoption.

A few points about this consumer:

*Why this is needed :*

This consumer is NOT a replacement for the existing DirectStream API.
DirectStream solves the problems of "exactly once" semantics and "global
ordering" of messages. But to achieve this, DirectStream comes with overhead:
the offsets must be maintained externally, parallelism while processing the
RDD is limited (as the RDD partitioning is the same as the Kafka
partitioning), and there is higher latency while processing the RDD (as
messages are fetched only when the RDD is processed). There are many users
who do not need "exactly once" semantics or "global ordering" of messages, or
who manage ordering in an external store (say HBase), and who want more
parallelism and lower latency in their streaming channel. At this point Spark
does not have a good fallback option in terms of a receiver-based API. The
present receiver-based API uses the Kafka High Level API, which performs
poorly and has serious issues. [For this reason Kafka is coming up with a
new High Level Consumer API in 0.9.]
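
For readers less familiar with the two existing options, here is a rough
sketch of the entry points being compared, using the Spark 1.3+
spark-streaming-kafka API in Scala; the host names, topic, and group id are
placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("kafka-api-comparison"), Seconds(10))

// Receiver-based stream (high-level consumer API): offsets are tracked by the
// consumer group; this is the mode the proposed consumer would improve on.
val receiverStream = KafkaUtils.createStream(
  ssc, "zkhost1:2181", "myGroup", Map("myTopic" -> 1))

// Direct stream (Spark 1.3+): no receiver, friendlier to exactly-once, but RDD
// partitions mirror Kafka partitions and messages are fetched only when each
// batch RDD is processed.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("myTopic"))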

The consumer I implemented uses the Kafka Low Level API, which gives better
performance. It has fault tolerance built in for recovery from all types of
failure. It extends the code from the Storm Kafka spout, which has been
around for some time, has matured over the years, and has all the Kafka
fault-tolerance capabilities built in. This same Kafka consumer for Spark is
currently running in various production scenarios and has already been
adopted by many in the Spark community.

*Why can't we fix the existing Receiver-based API in Spark?*

This is not possible unless we move to the Kafka Low Level API, or wait for
Kafka 0.9, where the High Level Consumer API is being rewritten, and then
build another Kafka Spark consumer for Kafka 0.9 customers.
That approach does not seem good to me. The Kafka Low Level API that I used
in my consumer (and that DirectStream also uses) is not going to be
deprecated in the near future. So if the Kafka consumer for Spark uses the
Low Level API for receiver-based mode, all Kafka customers, whether presently
on 0.8.x or moving to 0.9, will benefit from the same API.

*Concerns around Low Level API Complexity*

Yes, implementing a reliable consumer using the Kafka Low Level consumer API
is complex. But the same has been done for the Storm Kafka spout, which has
been stable for quite some time. This consumer for Spark is battle tested
under various production loads, gives much better performance than the
existing Kafka consumers for Spark, and has a better fault-tolerance approach
than the existing Receiver-based mode.

*Why can't this consumer continue to be in spark-packages?*

It could. But what I see is that many customers who want to fall back to
receiver-based mode, because they do not need "exactly once" semantics or
"global ordering", seem a little tentative about using a spark-packages
library for their critical streaming pipeline. So they are forced to use the
faulty and buggy mode based on the Kafka High Level API. Making this consumer
part of the Spark project would give it much higher adoption and support from
the community.

*Some major features of this consumer:*

This consumer controls the rate limit by maintaining a constant block size,
whereas the default rate limiting in other Spark consumers is done by number
of messages. That is a problem when Kafka has messages of different sizes:
there is no deterministic way to know the actual block sizes and memory
utilization if rate control is done by number of messages.

This consumer has a built-in PID controller which controls the rate of
consumption, again by modifying the block size, and consumes only as many
messages from Kafka as are needed. The default Spark consumer fetches a chunk
of messages and then applies throttling to control the rate, which can lead
to excess I/O while consuming from Kafka.
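
To make the idea concrete, here is a generic PID sketch that adjusts the
fetch size toward a target block size. This is not the library's actual
implementation; the gains and the target value below are made up for
illustration only:

// Generic PID loop over "bytes fetched per block"; gains are illustrative.
class PidRateController(
    kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0,
    targetBlockBytes: Long = 1024 * 1024) {

  private var integral = 0.0
  private var lastError = 0.0

  // Given the size of the last fetched block, suggest the next fetch size.
  def nextFetchSize(lastBlockBytes: Long): Long = {
    val error = targetBlockBytes - lastBlockBytes.toDouble
    integral += error
    val derivative = error - lastError
    lastError = error
    val adjustment = kp * error + ki * integral + kd * derivative
    math.max(0L, (lastBlockBytes + adjustment).toLong) // never request a negative size
  }
}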

You can also refer to the README file for more details:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md
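
For a feel of how it is wired up, here is a rough sketch based on the
snippets quoted later in this thread (ssc.receiverStream(new
KafkaReceiver(_props, i)) plus a union of one receiver per topic partition).
The constructor arguments and property keys below are assumptions, so treat
the README above as the authoritative reference:

val props = new java.util.Properties()
props.put("zookeeper.hosts", "zkhost1")   // property keys here are placeholders
props.put("kafka.topic", "myTopic")

val numPartitions = 3
val perPartitionStreams = (0 until numPartitions).map { i =>
  ssc.receiverStream(new KafkaReceiver(props, i))   // one receiver per Kafka partition
}
val unified = ssc.union(perPartitionStreams)
unified.map(_.toString).print()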

If you are using this consumer, or are going to use it, please vote for this
JIRA.

Regards,
Dibyendu


Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Debasish Das
Hi Dib,

For our use case I want my Spark job1 to read from HDFS/cache and write to
Kafka queues. Similarly, Spark job2 should read from Kafka queues and write
to Kafka queues.

Is writing to Kafka queues from a Spark job supported in your code?

Thanks
Deb
 On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 There was a simple example
 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
 which you can run after changing few lines of configurations.

 Thanks
 Best Regards

 On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
 any issue with Receiver.Store method . It is able to fetch messages form
 Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and
 let you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Dibyendu Bhattacharya
My code handles the Kafka consumer part. But writing to Kafka should not be a
big challenge; you can easily do it in your driver code.
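
As a rough sketch (not part of this package), assuming outStream is a
DStream[String] and using the standard Kafka producer client, one common
pattern is to create the producer inside foreachPartition so it lives on the
executors; the broker list and topic below are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// outStream: DStream[String] produced by your job (assumed to exist).
outStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)   // one producer per task
    records.foreach(msg => producer.send(new ProducerRecord[String, String]("outTopic", msg)))
    producer.close()
  }
}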

dibyendu

On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi Dib,

 For our usecase I want my spark job1 to read from hdfs/cache and write to
 kafka queues. Similarly spark job2 should read from kafka queues and write
 to kafka queues.

 Is writing to kafka queues from spark job supported in your code ?

 Thanks
 Deb
  On Jan 15, 2015 11:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

 There was a simple example
 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
 which you can run after changing few lines of configurations.

 Thanks
 Best Regards

 On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not
 see any issue with Receiver.Store method . It is able to fetch messages
 form Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and
 let you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread mykidong
Hi Dibyendu,

I am using Kafka 0.8.1.1 and Spark 1.2.0.
After updating these versions in your pom, I rebuilt your code.
But I have not received any messages from ssc.receiverStream(new
KafkaReceiver(_props, i)).

I have found that in your code all the messages are retrieved correctly, but
_receiver.store(_dataBuffer.iterator()), which is the Spark Streaming
abstract class's method, does not seem to work correctly.

Have you tried running your Spark Streaming Kafka consumer with Kafka 0.8.1.1
and Spark 1.2.0?

- Kidong.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

No, I have not tried with Spark 1.2 yet. I will try this out and let
you know how it goes.

By the way, was there any change to the Receiver store method in Spark
1.2?



Regards,
Dibyendu



On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
any issue with the Receiver.store method. It is able to fetch messages from
Kafka.

Can you cross-check the other configurations in your setup, like Kafka broker
IP, topic name, ZK host details, consumer id, etc.?

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in Spark
 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Akhil Das
There is a simple example
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
which you can run after changing a few lines of configuration.

Thanks
Best Regards

On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
 any issue with Receiver.Store method . It is able to fetch messages form
 Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly,
 but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming
 abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Dibyendu Bhattacharya
Hi,

Yes, as Jerry mentioned, SPARK-3129 (
https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature,
which solves the driver-failure problem. The way 3129 is designed, it solves
the driver-failure problem agnostic of the source of the stream (Kafka,
Flume, etc.). But with just 3129 you cannot achieve a complete solution for
data loss. You also need a reliable receiver which solves the data-loss issue
on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer),
for which this email thread was started, has solved that problem with the
Kafka Low Level API.

And SPARK-4062, as Jerry mentioned, also recently solved the same problem
using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention that
Kafka 0.8 has some issues, as mentioned in this wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design),
where consumer re-balance sometimes fails, and that is one of the key reasons
Kafka is rewriting the consumer API in Kafka 0.9.

I know a few folks have already faced these re-balancing issues while using
the Kafka High Level API, and if you ask my opinion, we at Pearson are still
using the Low Level Consumer as it seems more robust and performant; we have
been using it for a few months without any issue... and also I may be a
little biased :)

Regards,
Dibyendu



On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com wrote:

 Hi Rod,

 The purpose of introducing  WAL mechanism in Spark Streaming as a general
 solution is to make all the receivers be benefit from this mechanism.

 Though as you said, external sources like Kafka have their own checkpoint
 mechanism, instead of storing data in WAL, we can only store metadata to
 WAL, and recover from the last committed offsets. But this requires
 sophisticated design of Kafka receiver with low-level API involved, also we
 need to take care of rebalance and fault tolerance things by ourselves. So
 right now instead of implementing a whole new receiver, we choose to
 implement a simple one, though the performance is not so good, it's much
 easier to understand and maintain.

 The design purpose and implementation of reliable Kafka receiver can be
 found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
 future, to improve the reliable Kafka receiver like what you mentioned is
 on our scheduler.

 Thanks
 Jerry


 -Original Message-
 From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
 Sent: Wednesday, December 3, 2014 5:44 AM
 To: u...@spark.incubator.apache.org
 Subject: Re: Low Level Kafka Consumer for Spark

 Dibyendu,

 Just to make sure I will not be misunderstood - My concerns are referring
 to the Spark upcoming solution and not yours. I would to gather the
 perspective of someone which implemented recovery with Kafka a different
 way.

 Tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Luis Ángel Vicente Sánchez
My main complaint about the WAL mechanism in the new reliable Kafka receiver
is that you have to enable checkpointing and, for some reason, even if
spark.cleaner.ttl is set to a reasonable value, only the metadata is cleaned
periodically. In my tests, using a folder on my filesystem as the checkpoint
folder, the receivedMetaData folder remains almost constant in size, but the
receivedData folder keeps growing; spark.cleaner.ttl was configured to 300
seconds.
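
For reference, this is roughly the setup being described, assuming the Spark
1.2-era property names; the checkpoint path is a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableKafkaWalTest")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // WAL for the reliable receiver
  .set("spark.cleaner.ttl", "300")                              // expected to clean old data
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/streaming-checkpoint") // received data and metadata folders end up under here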

2014-12-03 10:13 GMT+00:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Hi,

 Yes, as Jerry mentioned, the Spark -3129 (
 https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature
 which solves the Driver failure problem. The way 3129 is designed , it
 solved the driver failure problem agnostic of the source of the stream (
 like Kafka or Flume etc) But with just 3129 you can not achieve complete
 solution for data loss. You need a reliable receiver which should also
 solves the data loss issue on receiver failure.

 The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
 for which this email thread was started has solved that problem with Kafka
 Low Level API.

 And Spark-4062 as Jerry mentioned also recently solved the same problem
 using Kafka High Level API.

 On the Kafka High Level Consumer API approach , I would like to mention
 that Kafka 0.8 has some issue as mentioned in this wiki (
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design)
 where consumer re-balance sometime fails and that is one of the key reason
 Kafka is re-writing consumer API in Kafka 0.9.

 I know there are few folks already have faced this re-balancing issues
 while using Kafka High Level API , and If you ask my opinion, we at Pearson
 are still using the Low Level Consumer as this seems to be more robust and
 performant and we have been using this for few months without any issue
 ..and also I may be little biased :)

 Regards,
 Dibyendu



 On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi Rod,

 The purpose of introducing  WAL mechanism in Spark Streaming as a general
 solution is to make all the receivers be benefit from this mechanism.

 Though as you said, external sources like Kafka have their own checkpoint
 mechanism, instead of storing data in WAL, we can only store metadata to
 WAL, and recover from the last committed offsets. But this requires
 sophisticated design of Kafka receiver with low-level API involved, also we
 need to take care of rebalance and fault tolerance things by ourselves. So
 right now instead of implementing a whole new receiver, we choose to
 implement a simple one, though the performance is not so good, it's much
 easier to understand and maintain.

 The design purpose and implementation of reliable Kafka receiver can be
 found in (https://issues.apache.org/jira/browse/SPARK-4062). And in
 future, to improve the reliable Kafka receiver like what you mentioned is
 on our scheduler.

 Thanks
 Jerry


 -Original Message-
 From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
 Sent: Wednesday, December 3, 2014 5:44 AM
 To: u...@spark.incubator.apache.org
 Subject: Re: Low Level Kafka Consumer for Spark

 Dibyendu,

 Just to make sure I will not be misunderstood - My concerns are referring
 to the Spark upcoming solution and not yours. I would to gather the
 perspective of someone which implemented recovery with Kafka a different
 way.

 Tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Low Level Kafka Consumer for Spark

2014-12-02 Thread RodrigoB
Hi Dibyendu,

What are your thoughts on keeping this solution (or not), considering that
Spark Streaming v1.2 will have built-in recoverability of the received data?
https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the added complexity and performance overhead of this
solution, caused by writing big amounts of data into HDFS on a small batch
interval.
https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1#
http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png

I think the whole solution is well designed and thought out, but I'm afraid
it may not really fit all needs with checkpoint-based technologies like Flume
or Kafka, by hiding the management of the offset away from the user code. If,
instead of saving received data into HDFS, the ReceiverHandler were saving
some metadata (such as the offset, in the case of Kafka) specified by the
custom receiver passed into the StreamingContext, then upon driver restart
that metadata could be used by the custom receiver to recover the point from
which it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Low Level Kafka Consumer for Spark

2014-12-02 Thread Shao, Saisai
Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general 
solution is to let all the receivers benefit from this mechanism.

Though, as you said, external sources like Kafka have their own checkpoint 
mechanism, so instead of storing data in the WAL we could store only metadata in 
the WAL and recover from the last committed offsets. But this requires a 
sophisticated design of the Kafka receiver with the low-level API involved, and we 
would also need to take care of rebalance and fault tolerance ourselves. So right 
now, instead of implementing a whole new receiver, we chose to implement a simple 
one; though the performance is not so good, it's much easier to understand and maintain.

The design purpose and implementation of the reliable Kafka receiver can be found 
in https://issues.apache.org/jira/browse/SPARK-4062. And in the future, improving the 
reliable Kafka receiver along the lines you mentioned is on our schedule.

Thanks
Jerry


-Original Message-
From: RodrigoB [mailto:rodrigo.boav...@aspect.com] 
Sent: Wednesday, December 3, 2014 5:44 AM
To: u...@spark.incubator.apache.org
Subject: Re: Low Level Kafka Consumer for Spark

Dibyendu,

Just to make sure I will not be misunderstood - my concerns refer to the Spark 
upcoming solution and not yours. I would like to gather the perspective of someone 
who implemented recovery with Kafka in a different way.

Tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Alon Pe'er
Hi Dibyendu,

Thanks for your great work!

I'm new to Spark Streaming, so I just want to make sure I understand the
Driver failure issue correctly.

In my use case, I want to make sure that messages coming in from Kafka are
always broken into the same set of RDDs, meaning that if a set of messages
are assigned to one RDD, and the Driver dies before this RDD is processed,
then once the Driver recovers, the same set of messages are assigned to a
single RDD, instead of arbitrarily repartitioning the messages across
different RDDs.

Does your Receiver guarantee this behavior, until the problem is fixed in
Spark 1.2?

Regards,
Alon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Alon,

No, this does not guarantee that the same set of messages will come in the
same RDD. This fix just replays the messages from the last processed offset,
in the same order. Again, this is just an interim fix we needed to solve our
use case. If you do not need this message-replay feature, just do not perform
the ack (acknowledgement) call in the driver code. Then the offsets of
processed messages will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Tim Smith
Hi Dibyendu,

I am a little confused about the need for rate limiting input from
kafka. If the stream coming in from kafka has higher message/second
rate than what a Spark job can process then it should simply build a
backlog in Spark if the RDDs are cached on disk using persist().
Right?

Thanks,

Tim


On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 Hi Alon,

 No this will not be guarantee that same set of messages will come in same
 RDD. This fix just re-play the messages from last processed offset in same
 order. Again this is just a interim fix we needed to solve our use case . If
 you do not need this message re-play feature, just do not perform the ack (
 Acknowledgement) call in the Driver code. Then the processed messages will
 not be written to ZK and hence replay will not happen.

 Regards,
 Dibyendu

 On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
 wrote:

 Hi Dibyendu,

 Thanks for your great work!

 I'm new to Spark Streaming, so I just want to make sure I understand
 Driver
 failure issue correctly.

 In my use case, I want to make sure that messages coming in from Kafka are
 always broken into the same set of RDDs, meaning that if a set of messages
 are assigned to one RDD, and the Driver dies before this RDD is processed,
 then once the Driver recovers, the same set of messages are assigned to a
 single RDD, instead of arbitrarily repartitioning the messages across
 different RDDs.

 Does your Receiver guarantee this behavior, until the problem is fixed in
 Spark 1.2?

 Regards,
 Alon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Tim,

I have not tried persisting the RDD.

There is some discussion of rate limiting in Spark Streaming in this thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a pull request, https://github.com/apache/spark/pull/945/files, to
fix this rate-limiting issue at the BlockGenerator level.

But while testing with heavy load, that fix did not solve my problem. So I
had to build rate limiting into the Kafka consumer. I will make it
configurable soon.

If this is not done, I can see blocks getting dropped, which leads to job
failure.

I have raised this in another thread:

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239.
But I have not got any answer yet on whether this is a bug (blocks getting
dropped and the job failing).



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith secs...@gmail.com wrote:

 Hi Dibyendu,

 I am a little confused about the need for rate limiting input from
 kafka. If the stream coming in from kafka has higher message/second
 rate than what a Spark job can process then it should simply build a
 backlog in Spark if the RDDs are cached on disk using persist().
 Right?

 Thanks,

 Tim


 On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  Hi Alon,
 
  No this will not be guarantee that same set of messages will come in same
  RDD. This fix just re-play the messages from last processed offset in
 same
  order. Again this is just a interim fix we needed to solve our use case
 . If
  you do not need this message re-play feature, just do not perform the
 ack (
  Acknowledgement) call in the Driver code. Then the processed messages
 will
  not be written to ZK and hence replay will not happen.
 
  Regards,
  Dibyendu
 
  On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er alo...@supersonicads.com
  wrote:
 
  Hi Dibyendu,
 
  Thanks for your great work!
 
  I'm new to Spark Streaming, so I just want to make sure I understand
  Driver
  failure issue correctly.
 
  In my use case, I want to make sure that messages coming in from Kafka
 are
  always broken into the same set of RDDs, meaning that if a set of
 messages
  are assigned to one RDD, and the Driver dies before this RDD is
 processed,
  then once the Driver recovers, the same set of messages are assigned to
 a
  single RDD, instead of arbitrarily repartitioning the messages across
  different RDDs.
 
  Does your Receiver guarantee this behavior, until the problem is fixed
 in
  Spark 1.2?
 
  Regards,
  Alon
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-09-10 Thread Dibyendu Bhattacharya
. Repartitioning: I am trying to understand the repartition issue. One
 common mistake I have seen is that developers repartition a stream but do
 not use the repartitioned stream.

 WRONG:
 inputDstream.repartition(100)
 inputDstream.map(...).count().print()

 RIGHT:
 val repartitionedDStream = inputDStream.repartition(100)
 repartitionedDStream.map(...).count().print()

 Not sure if this helps solve the problem that you all are facing. I am
 going to add this to the streaming programming guide to make sure this
 common mistake is avoided.

 TD




 On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Sorry for little delay . As discussed in this thread, I have modified
 the Kafka-Spark-Consumer (
 https://github.com/dibbhatt/kafka-spark-consumer) code to have
 dedicated Receiver for every Topic Partition. You can see the example howto
 create Union of these receivers in consumer.kafka.client.Consumer.java .

 Thanks to Chris for suggesting this change.

 Regards,
 Dibyendu


 On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com
 wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design
 does not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to
 0
 upon incoming state changing events. Let's supposed there is at least
 one
 event since the last data checkpoint. This will lead to inconsistency
 upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with
 the
 wrong Duration value.
 To make things worst, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread
 and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by
 itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution
 lineage
 is checkpointed, but if we have big chunks of data being consumed to
 Receiver node on let's say a second bases then having it persisted to
 HDFS
 every second could be a big challenge for keeping JVM performance -
 maybe
 that could be reason why it's not really implemented...assuming it
 isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev
 team
 will be very appreciated.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2014-09-08 Thread Tim Smith
-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Low Level Kafka Consumer for Spark

2014-09-07 Thread Dibyendu Bhattacharya
-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to 0
 upon incoming state changing events. Let's supposed there is at least one
 event since the last data checkpoint. This will lead to inconsistency
 upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with the
 wrong Duration value.
 To make things worst, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread
 and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by
 itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution
 lineage
 is checkpointed, but if we have big chunks of data being consumed to
 Receiver node on let's say a second bases then having it persisted to
 HDFS
 every second could be a big challenge for keeping JVM performance - maybe
 that could be reason why it's not really implemented...assuming it isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev team
 will be very appreciated.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Low Level Kafka Consumer for Spark

2014-09-03 Thread Dibyendu Bhattacharya
Hi,

Sorry for the little delay. As discussed in this thread, I have modified the
Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
code to have a dedicated receiver for every topic partition. You can see an
example of how to create a union of these receivers
in consumer.kafka.client.Consumer.java.

Thanks to Chris for suggesting this change.

Regards,
Dibyendu


On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Just a comment on the recovery part.

 Is it correct to say that currently Spark Streaming recovery design does
 not
 consider re-computations (upon metadata lineage recovery) that depend on
 blocks of data of the received stream?
 https://issues.apache.org/jira/browse/SPARK-1647

 Just to illustrate a real use case (mine):
 - We have object states which have a Duration field per state which is
 incremented on every batch interval. Also this object state is reset to 0
 upon incoming state changing events. Let's supposed there is at least one
 event since the last data checkpoint. This will lead to inconsistency upon
 driver recovery: The Duration field will get incremented from the data
 checkpoint version until the recovery moment, but the state change event
 will never be re-processed...so in the end we have the old state with the
 wrong Duration value.
 To make things worst, let's imagine we're dumping the Duration increases
 somewhere...which means we're spreading the problem across our system.
 Re-computation awareness is something I've commented on another thread and
 rather treat it separately.

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

 Re-computations do occur, but the only RDD's that are recovered are the
 ones
 from the data checkpoint. This is what we've seen. Is not enough by itself
 to ensure recovery of computed data and this partial recovery leads to
 inconsistency in some cases.

 Roger - I share the same question with you - I'm just not sure if the
 replicated data really gets persisted on every batch. The execution lineage
 is checkpointed, but if we have big chunks of data being consumed to
 Receiver node on let's say a second bases then having it persisted to HDFS
 every second could be a big challenge for keeping JVM performance - maybe
 that could be reason why it's not really implemented...assuming it isn't.

 Dibyendu had a great effort with the offset controlling code but the
 general
 state consistent recovery feels to me like another big issue to address.

 I plan on having a dive into the Streaming code and try to at least
 contribute with some ideas. Some more insight from anyone on the dev team
 will be very appreciated.

 tnks,
 Rod




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Low Level Kafka Consumer for Spark

2014-08-31 Thread RodrigoB
Just a comment on the recovery part. 

Is it correct to say that currently Spark Streaming recovery design does not
consider re-computations (upon metadata lineage recovery) that depend on
blocks of data of the received stream?
https://issues.apache.org/jira/browse/SPARK-1647

Just to illustrate a real use case (mine): 
- We have object states which have a Duration field per state, which is
incremented on every batch interval. This object state is also reset to 0
upon incoming state-changing events. Let's suppose there has been at least
one event since the last data checkpoint. This will lead to inconsistency
upon driver recovery: the Duration field will get incremented from the data
checkpoint version until the recovery moment, but the state change event
will never be re-processed... so in the end we have the old state with the
wrong Duration value.
To make things worse, let's imagine we're dumping the Duration increases
somewhere... which means we're spreading the problem across our system.
Re-computation awareness is something I've commented on in another thread and
would rather treat separately.
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
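
As a hedged sketch of the state I'm describing (the field and event names
below are illustrative, not my actual code), the per-key logic looks roughly
like this with updateStateByKey, which also requires checkpointing:

case class ObjState(duration: Long)

// Increment Duration every batch; reset to 0 when a state-changing event arrives.
def updateState(events: Seq[String], prev: Option[ObjState]): Option[ObjState] = {
  val current = prev.getOrElse(ObjState(0L))
  if (events.nonEmpty) Some(ObjState(0L))           // state-changing event: reset
  else Some(ObjState(current.duration + 1))         // no event this batch: increment
}

// eventsByKey: DStream[(String, String)] keyed by object id (assumed to exist upstream)
// val stateStream = eventsByKey.updateStateByKey(updateState _)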

Re-computations do occur, but the only RDDs that are recovered are the ones
from the data checkpoint. This is what we've seen. It is not enough by itself
to ensure recovery of computed data, and this partial recovery leads to
inconsistency in some cases. 

Roger - I share the same question with you - I'm just not sure if the
replicated data really gets persisted on every batch. The execution lineage
is checkpointed, but if we have big chunks of data being consumed at the
Receiver node on, let's say, a one-second basis, then having it persisted to
HDFS every second could be a big challenge for keeping JVM performance -
maybe that could be the reason why it's not really implemented... assuming it isn't.

Dibyendu had a great effort with the offset controlling code but the general
state consistent recovery feels to me like another big issue to address.

I plan on having a dive into the Streaming code and try to at least
contribute with some ideas. Some more insight from anyone on the dev team
will be very appreciated.

tnks,
Rod 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Sean Owen
I'm no expert. But as I understand it, yes, you create multiple streams to
consume multiple partitions in parallel. If they're all in the same
Kafka consumer group, you'll get exactly one copy of each message, so
yes, if you have 10 consumers and 3 Kafka partitions I believe only 3
will be getting messages.

The parallelism of Spark's processing of the RDDs of those messages is
different. There could be 4 partitions in your RDDs doing the work.
This is the kind of thing you can potentially influence with repartition.
That is, I believe you can get more tasks processing the messages even
if you are only able to consume messages from the queue with 3-way
parallelism, since the queue has 3 partitions.

On Aug 30, 2014 12:56 AM, Tim Smith secs...@gmail.com wrote:

 Ok, so I did this:
 val kInStreams = (1 to 10).map { _ =>
 KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
 val kInMsg = ssc.union(kInStreams)
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

 This has improved parallelism. Earlier I would only get a Stream 0. Now I 
 have Streams [0-9]. Of course, since the kafka topic has only three 
 partitions, only three of those streams are active but I am seeing more 
 blocks being pulled across the three streams total that what one was doing 
 earlier. Also, four nodes are actively processing tasks (vs only two earlier) 
 now which actually has me confused. If Streams are active only on 3 nodes 
 then how/why did a 4th node get work? If a 4th got work why aren't more nodes 
 getting work?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Roger Hoover
 should be
 able
 to access it once more and not having to pool it back again from Kafka
 or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them
 done
 distributed than overloading the batch interval with huge amount of
 Kafka
 messages.

 I do not have yet enough know how on where is the issue and about the
 internal Spark code so I can't really how much difficult will be the
 implementation.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
 as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that
 the
 raw data was being computed again and that's not happening after
 further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should
 be able
 to access it once more and not having to pool it back again from Kafka
 or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them
 done
 distributed than overloading the batch interval with huge amount of
 Kafka
 messages.

 I do not have yet enough know how on where is the issue and about the
 internal Spark code so I can't really how much difficult will be the
 implementation.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread bharatvenkat
Chris,

I did the Dstream.repartition mentioned in the document on parallelism in
receiving, as well as set spark.default.parallelism and it still uses only
2 nodes in my cluster.  I notice there is another email thread on the same
topic:

http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(ssc, zkQuorum,
            "cse-job-play-consumer", kafkaTopicMap);

    JavaPairDStream<String, String> newMessages =
        messages.repartition(partitionSize); // partitionSize=30

    JavaDStream<String> lines = newMessages.map(
        new Function<Tuple2<String, String>, String>() {
    ...

    public String call(Tuple2<String, String> tuple2) {
      return tuple2._2();
    }
  });

    JavaDStream<String> words = lines.flatMap(new
        MetricsComputeFunction()
    );

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
           ...
    }
    );

     wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>,
        Void>() {...});

Thanks,
Bharat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Jonathan Hodges
'this 2-node replication is mainly for failover in case the receiver dies
while data is in flight.  there's still chance for data loss as there's no
write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed; will there be a
durable WAL? Is there a JIRA for tracking this effort?

I am curious whether, without a WAL, you can avoid this data loss with
explicit management of Kafka offsets, e.g. don't commit an offset unless the
data is replicated to multiple nodes, or maybe not until it is processed. The
incoming data will always be durably stored to disk in Kafka, so it can be
replayed in failure scenarios to avoid data loss if the offsets are managed properly.
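
As a rough sketch of that idea (none of these helpers exist in the code
discussed here; all three are hypothetical placeholders), the driver-side
pattern would be to capture offsets per batch, process and replicate the
batch, and only then commit:

import org.apache.spark.rdd.RDD

def highestOffsetsPerPartition(rdd: RDD[(String, String)]): Map[Int, Long] = ??? // hypothetical
def processAndStore(rdd: RDD[(String, String)]): Unit = ???                      // hypothetical
def commitOffsets(offsets: Map[Int, Long]): Unit = ???                           // e.g. write to ZK

// stream: DStream[(String, String)] from the Kafka receiver (assumed to exist)
stream.foreachRDD { rdd =>
  val offsets = highestOffsetsPerPartition(rdd)
  processAndStore(rdd)     // replicate / persist the batch first
  commitOffsets(offsets)   // commit last, so a crash before this point just replays from Kafka
}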




On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming scales
 - as well as how it handles failover and checkpointing, but we can discuss
 that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicates to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this in mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and such
 (versus DStream.transform(rddBatch = rddBatch.reduceByKey())

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if you
 wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris
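
A small illustration of the implicit Chris mentions above (pre-Spark-1.3
import path; lines is assumed to be an existing DStream[String]):

import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // visible on a pair DStream only once the implicits are imported
counts.print()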




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved within Spark's internals, since once the data is replicated we
 should be able to access it once more and not have to pull it back again
 from Kafka or any other stream that is affected by this issue. If, for
 example, there is a big number of batches to be recomputed, I would
 rather have them done in a distributed way than overload the batch
 interval with a huge amount of Kafka messages.

 I do not yet have enough know-how about where the issue is or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?
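A minimal Scala sketch of one way to check this, assuming the kInMsg stream
from the snippets later in the thread (the names are illustrative). Note that
DStream.repartition returns a new DStream rather than modifying the one it is
called on, so the repartitioned result is the one the later operations must use:

val repartitioned = kInMsg.repartition(512)

// prints, per micro-batch, how many partitions the batch RDD actually has
repartitioned.foreachRDD { rdd =>
  println("batch partitions = " + rdd.partitions.length)
}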


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:
 Chris,

 I did the Dstream.repartition mentioned in the document on parallelism in
 receiving, as well as set spark.default.parallelism and it still uses only
 2 nodes in my cluster.  I notice there is another email thread on the same
 topic:

 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

 My code is in Java and here is what I have:

 JavaPairReceiverInputDStream<String, String> messages =
     KafkaUtils.createStream(ssc, zkQuorum,
         "cse-job-play-consumer", kafkaTopicMap);

 JavaPairDStream<String, String> newMessages =
     messages.repartition(partitionSize); // partitionSize=30

 JavaDStream<String> lines = newMessages.map(
     new Function<Tuple2<String, String>, String>() {
   ...
   public String call(Tuple2<String, String> tuple2) {
     return tuple2._2();
   }
 });

 JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
     new PairFunction<String, String, Integer>() {
   ...
 });

 wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

 Thanks,
 Bharat

 Thanks,
 Bharat



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg =
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my dstream into multiple
smaller streams? Should I manually create multiple Dstreams like this?:
val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where
normalizeLog takes a String and Map of regex and returns a string

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still uses
 only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(ssc, zkQuorum,
          "cse-job-play-consumer", kafkaTopicMap);

  JavaPairDStream<String, String> newMessages =
      messages.repartition(partitionSize); // partitionSize=30

  JavaDStream<String> lines = newMessages.map(
      new Function<Tuple2<String, String>, String>() {
    ...
    public String call(Tuple2<String, String> tuple2) {
      return tuple2._2();
    }
  });

  JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
    ...
  });

  wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
 



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I
have Streams [0-9]. Of course, since the kafka topic has only three
partitions, only three of those streams are active, but I am seeing more
blocks being pulled in total across the three streams than what the single
stream was pulling earlier. Also, four nodes are now actively processing tasks
(vs only two earlier), which actually has me confused. If Streams are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?
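A minimal sketch of one way to decouple the two kinds of parallelism discussed
here, assuming the same names as the code above (the partition count of 24 is
only an illustrative value): the receiver streams are unioned and then
repartitioned, so processing tasks can be spread across more nodes than the
ones running receivers.

val kInStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
// union the per-receiver streams, then spread processing wider than the receiver nodes
val kInMsg = ssc.union(kInStreams).repartition(24)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))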






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith secs...@gmail.com wrote:

 I create my DStream very simply as:
 val kInMsg =
   KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
 .
 .
 eventually, before I operate on the DStream, I repartition it:
 kInMsg.repartition(512)

 Are you saying that ^^ repartition doesn't split my dstream into multiple
 smaller streams? Should I manually create multiple Dstreams like this?:
 val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

 Then I apply some custom logic to it as:
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where
 normalizeLog takes a String and Map of regex and returns a string

 In my case, I think I have traced the issue to the receiver executor being
 killed by Yarn:
 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
 node-dn1-4-acme.com: remote Akka client disassociated

 Could this be the root cause?

 http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
 https://issues.apache.org/jira/browse/SPARK-2121





 On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still
 uses only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(ssc, zkQuorum,
          "cse-job-play-consumer", kafkaTopicMap);

  JavaPairDStream<String, String> newMessages =
      messages.repartition(partitionSize); // partitionSize=30

  JavaDStream<String> lines = newMessages.map(
      new Function<Tuple2<String, String>, String>() {
    ...
    public String call(Tuple2<String, String> tuple2) {
      return tuple2._2();
    }
  });

  JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
    ...
  });

  wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
 





Re: Low Level Kafka Consumer for Spark

2014-08-28 Thread Chris Fregly
@bharat-

overall, i've noticed a lot of confusion about how Spark Streaming scales -
as well as how it handles failover and checkpointing, but we can discuss
that separately.

there's actually 2 dimensions to scaling here:  receiving and processing.

*Receiving*
receiving can be scaled out by submitting new DStreams/Receivers to the
cluster as i've done in the Kinesis example.  in fact, i purposely chose to
submit multiple receivers in my Kinesis example because i feel it should be
the norm and not the exception - particularly for partitioned and
checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
only way to scale.

a side note here is that each receiver running in the cluster will
immediately replicate to one other node for fault-tolerance of that specific
receiver.  this is where the confusion lies.  this 2-node replication is
mainly for failover in case the receiver dies while data is in flight.
 there's still chance for data loss as there's no write ahead log on the
hot path, but this is being addressed.

this is mentioned in the docs here:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
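As a side note on the replication mentioned above: the number of copies comes
from the StorageLevel the receiver uses to store incoming blocks, which for the
Kafka receiver can be passed explicitly to createStream. A minimal sketch
(Scala; the ZK quorum, group and topic values are illustrative):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// the *_2 storage levels keep a second copy of each received block on another node
val messages = KafkaUtils.createStream(
  ssc, "zkhost1:2181", "myGroup", Map("myTopic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2)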

*Processing*
once data is received, tasks are scheduled across the Spark cluster just
like any other non-streaming task where you can specify the number of
partitions for reduces, etc.  this is the part of scaling that is sometimes
overlooked - probably because it works just like regular Spark, but it is
worth highlighting.

Here's a blurb in the docs:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

the other thing that's confusing with Spark Streaming is that in Scala, you
need to explicitly

import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

in order to pick up the implicits that allow DStream.reduceByKey and such
(versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

in other words, DStreams appear to be relatively featureless until you
discover this implicit.  otherwise, you need to operate on the underlying
RDD's explicitly which is not ideal.

the Kinesis example referenced earlier in the thread uses the DStream
implicits.
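
For illustration, a minimal Scala sketch of the two styles described above,
assuming a DStream[String] named words (the word-count logic and the partition
count of 64 are placeholders); the numPartitions argument of reduceByKey is
also where processing-side parallelism can be set:

// brings the pair-DStream operations (reduceByKey etc.) into scope
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
// needed on older Spark versions for reduceByKey on the underlying RDDs
import org.apache.spark.SparkContext._

val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 64)

// equivalent without the DStream implicit, operating on each batch RDD directly
val counts2 = words.map(w => (w, 1)).transform(rdd => rdd.reduceByKey(_ + _, 64))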


side note to all of this - i've recently convinced my publisher for my
upcoming book, Spark In Action, to let me jump ahead and write the Spark
Streaming chapter ahead of other more well-understood libraries.  early
release is in a month or so.  sign up  @ http://sparkinaction.com if you
wanna get notified.

shameless plug that i wouldn't otherwise do, but i really think it will
help clear a lot of confusion in this area as i hear these questions asked
a lot in my talks and such.  and i think a clear, crisp story on scaling
and fault-tolerance will help Spark Streaming's adoption.

hope that helps!

-chris




On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather than rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved within Spark's internals, since once the data is replicated we
 should be able to access it once more and not have to pull it back again
 from Kafka or any other stream that is affected by this issue. If, for
 example, there is a big number of batches to be recomputed, I would
 rather have them done in a distributed way than overload the batch
 interval with a huge amount of Kafka messages.

 I do not yet have enough know-how about where the issue is or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html




Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Bharat Venkat
Hi Dibyendu,

That would be great.  One of the biggest drawbacks of KafkaUtils as well as
your implementation is that I am unable to scale out processing.  I am
relatively new to Spark and Spark Streaming - from what I read and what I
observe with my deployment, the RDD created on one receiver
is processed by at most 2 nodes in my cluster (most likely because the default
replication is 2 and Spark schedules processing close to where the data
is).  I tried rdd.replicate() to no avail.

Would Chris's and your proposal to have a union of DStreams for all these
Receivers still allow scaling out the subsequent processing?

Thanks,
Bharat


On Tue, Aug 26, 2014 at 10:59 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:


 Thanks Chris and Bharat for your inputs. I agree, running multiple
 receivers/dstreams is desirable for scalability and fault tolerance, and
 this is easily doable. In the present KafkaReceiver I am creating as many
 threads as there are kafka topic partitions, but I can definitely create
 multiple KafkaReceivers, one for every partition. As Chris mentioned, in this
 case I need to then have a union of DStreams for all these Receivers. I will
 try this out and let you know.

 Dib


 On Wed, Aug 27, 2014 at 9:10 AM, Chris Fregly ch...@fregly.com wrote:

 great work, Dibyendu.  looks like this would be a popular contribution.

 expanding on bharat's question a bit:

 what happens if you submit multiple receivers to the cluster by creating
 and unioning multiple DStreams as in the kinesis example here:


 https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

 for more context, the kinesis implementation above uses the Kinesis
 Client Library (KCL) to automatically assign - and load balance - stream
 shards among all KCL threads from all receivers (potentially coming and
 going as nodes die) on all executors/nodes using DynamoDB as the
 association data store.

 ZooKeeper would be used for your Kafka consumers, of course.  and
 ZooKeeper watches to handle the ephemeral nodes.  and I see you're using
 Curator, which makes things easier.

 as bharat suggested, running multiple receivers/dstreams may be desirable
 from a scalability and fault tolerance standpoint.  is this type of load
 balancing possible among your different Kafka consumers running in
 different ephemeral JVMs?

 and isn't it fun proposing a popular piece of code?  the question
 floodgates have opened!  haha. :)

  -chris



 On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Bharat,

 Thanks for your email. If the Kafka Reader worker process dies, it
 will be replaced by a different machine, and it will start consuming from the
 offset where it left off (for each partition). The same would happen even
 if I had an individual Receiver for every partition.

 Regards,
 Dibyendu


 On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:

 I like this consumer for what it promises - better control over offset
 and
 recovery from failures.  If I understand this right, it still uses
 single
 worker process to read from Kafka (one thread per partition) - is there
 a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine
 hosting the
 Kafka Reader worker process dies and is replaced by a different
 machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html







Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Dibyendu Bhattacharya
I agree. This issue should be fixed in Spark rather than rely on replay of Kafka
messages.

Dib
On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved within Spark's internals, since once the data is replicated we
 should be able to access it once more and not have to pull it back again
 from Kafka or any other stream that is affected by this issue. If, for
 example, there is a big number of batches to be recomputed, I would
 rather have them done in a distributed way than overload the batch
 interval with a huge amount of Kafka messages.

 I do not yet have enough know-how about where the issue is or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi,

As I understand, your problem is similar to this JIRA.

https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka cannot replay the messages as the offsets
are already committed. Also, I think the existing KafkaUtils (the default High
Level Kafka Consumer) has this issue as well.

There is a similar discussion in this thread also...

http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I am thinking, it is possible to tackle this in the consumer code I have
written. If we can store the topic, partition_id and consumed offset in ZK
after every checkpoint, then after Spark recovers from the failover, the
present PartitionManager code can start reading from the last checkpointed
offset (instead of the last committed offset, as it does now). In that case
it can replay the data since the last checkpoint.

I will think over it ..
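
For illustration, a minimal Scala sketch of writing a per-partition offset to
ZK with Curator (which the consumer already uses); the znode layout and the
values here are hypothetical, not the consumer's actual layout:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

val zk = CuratorFrameworkFactory.newClient("zkhost1:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

// hypothetical layout: /consumers/<group>/checkpointed-offsets/<topic>/<partition>
val path = "/consumers/testApp/checkpointed-offsets/rawunstruct/0"
val data = "12345".getBytes("UTF-8") // last checkpointed offset for this partition

if (zk.checkExists().forPath(path) == null)
  zk.create().creatingParentsIfNeeded().forPath(path, data)
else
  zk.setData().forPath(path, data)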

Regards,
Dibyendu



On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB rodrigo.boav...@aspect.com
wrote:

 Hi Dibyendu,

 My colleague has taken a look at the spark kafka consumer github you have
 provided and started experimenting.

 We found that somehow when Spark has a failure after a data checkpoint, the
 expected re-computations corresponding to the metadata checkpoints are not
 recovered, so we lose Kafka messages and RDD computations in Spark.
 The impression is that this code is replacing quite a bit of Spark Kafka
 Streaming code where maybe (not sure) metadata checkpoints are done every
 batch interval.

 Was it on purpose to depend solely on the Kafka commit to recover data and
 recomputations between data checkpoints? If so, how do we make this work?

 tnks
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi Bharat,

Thanks for your email. If the Kafka Reader worker process dies, it will
be replaced by a different machine, and it will start consuming from the
offset where it left off (for each partition). The same would happen even
if I had an individual Receiver for every partition.

Regards,
Dibyendu


On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html




Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Chris Fregly
great work, Dibyendu.  looks like this would be a popular contribution.

expanding on bharat's question a bit:

what happens if you submit multiple receivers to the cluster by creating
and unioning multiple DStreams as in the kinesis example here:

https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

for more context, the kinesis implementation above uses the Kinesis Client
Library (KCL) to automatically assign - and load balance - stream shards
among all KCL threads from all receivers (potentially coming and going as
nodes die) on all executors/nodes using DynamoDB as the association data
store.

ZooKeeper would be used for your Kafka consumers, of course.  and ZooKeeper
watches to handle the ephemeral nodes.  and I see you're using Curator,
which makes things easier.

as bharat suggested, running multiple receivers/dstreams may be desirable
from a scalability and fault tolerance standpoint.  is this type of load
balancing possible among your different Kafka consumers running in
different ephemeral JVMs?
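
For illustration, a minimal Scala sketch of the ZooKeeper-based membership
pattern being hinted at, using Curator's ephemeral nodes and a child-watch
recipe (the znode paths and consumer id are hypothetical, and the rebalancing
logic itself is left out):

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

val zk = CuratorFrameworkFactory.newClient("zkhost1:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

// each consumer JVM registers itself as an ephemeral znode; it disappears if the JVM dies
zk.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
  .forPath("/kafka-consumers/members/consumer-1")

// watch the membership list so partitions can be rebalanced as members come and go
val members = new PathChildrenCache(zk, "/kafka-consumers/members", true)
members.getListenable.addListener(new PathChildrenCacheListener {
  override def childEvent(c: CuratorFramework, e: PathChildrenCacheEvent): Unit =
    println("membership changed: " + e.getType)
})
members.start()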

and isn't it fun proposing a popular piece of code?  the question
floodgates have opened!  haha. :)

-chris



On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Bharat,

 Thanks for your email. If the Kafka Reader worker process dies, it will
 be replaced by a different machine, and it will start consuming from the
 offset where it left off (for each partition). The same would happen even
 if I had an individual Receiver for every partition.

 Regards,
 Dibyendu


 On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html





Re: Low Level Kafka Consumer for Spark

2014-08-25 Thread RodrigoB
Hi Dibyendu,

My colleague has taken a look at the spark kafka consumer github you have
provided and started experimenting.

We found that somehow when Spark has a failure after a data checkpoint, the
expected re-computations corresponding to the metadata checkpoints are not
recovered, so we lose Kafka messages and RDD computations in Spark.
The impression is that this code is replacing quite a bit of Spark Kafka
Streaming code where maybe (not sure) metadata checkpoints are done every
batch interval.

Was it on purpose to depend solely on the Kafka commit to recover data and
recomputations between data checkpoints? If so, how do we make this work?
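
For reference, the standard Spark Streaming pattern for recovering the driver
from a checkpoint directory looks roughly like the sketch below; it is only a
generic outline (the directory, app name and batch interval are illustrative)
and does not by itself answer the Kafka replay question above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-consumer-test")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // directory used for metadata and RDD checkpoints
  // ... define the Kafka input streams and transformations here ...
  ssc
}

// rebuilds the context from the checkpoint after a driver restart, if one exists
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()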

tnks
Rod 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html



Re: Low Level Kafka Consumer for Spark

2014-08-25 Thread bharatvenkat
I like this consumer for what it promises - better control over offset and
recovery from failures.  If I understand this right, it still uses a single
worker process to read from Kafka (one thread per partition) - is there a
way to specify multiple worker processes (on different machines) to read
from Kafka?  Maybe one worker process for each partition?

If there is no such option, what happens when the single machine hosting the
Kafka Reader worker process dies and is replaced by a different machine
(like in cloud)?

Thanks,
Bharat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html



Re: Low Level Kafka Consumer for Spark

2014-08-05 Thread Dibyendu Bhattacharya
Thanks Jonathan,

Yes, till non-ZK based offset management is available in Kafka, I need to
maintain the offset in ZK. And yes, in both cases an explicit commit is
necessary. I modified the Low Level Kafka Spark Consumer a little bit to have
the Receiver spawn threads for every partition of the topic and perform the
'store' operation in multiple threads. It would be good if the
receiver.store methods were made thread safe, which is not the case at present.

Waiting for TD's comment on this Kafka Spark Low Level consumer.


Regards,
Dibyendu



On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Hi Yan,

 That is a good suggestion.  I believe non-Zookeeper offset management will
 be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
 September.


 https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

 That should make this fairly easy to implement, but it will still require
 explicit offset commits to avoid data loss which is different than the
 current KafkaUtils implementation.

 Jonathan





 On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

 Another suggestion that may help is that, you can consider use Kafka to
 store the latest offset instead of Zookeeper. There are at least two
 benefits: 1) lower the workload of ZK 2) support replay from certain
 offset. This is how Samza http://samza.incubator.apache.org/ deals
 with the Kafka offset, the doc is here
 http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
  .
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'll let TD chime in on this one, but I'm guessing this would be a
 welcome addition. It's great to see community effort on adding new
 streams/receivers, adding a Java API for receivers was something we did
 specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using
 Kafka Simple Consumer API. This API will give better control over the Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
 control over the offset management which is not possible in Kafka HighLevel
 consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
 Consumer API (implemented in consumer.kafka packages) to fetch messages
 from Kafka and 'store' it in Spark.

 The logic will detect number of partitions for a topic and spawn that
 many threads (Individual instances of Consumers). Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
 failures,  recovery from offset errors and other fail-over aspects.

 The consumer.kafka.client.Consumer is the sample Consumer which uses
 this Kafka Receivers to generate DStreams from Kafka and apply a Output
 operation for every messages of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages to target Search Cluster and also Near Real
 Time Aggregation using target NoSQL storage.

 Kindly let me know your view. Also if this looks good, can I contribute
 to Spark Streaming project.

 Regards,
 Dibyendu







Re: Low Level Kafka Consumer for Spark

2014-08-04 Thread Jonathan Hodges
Hi Yan,

That is a good suggestion.  I believe non-Zookeeper offset management will
be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
September.

https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

That should make this fairly easy to implement, but it will still require
explicit offset commits to avoid data loss which is different than the
current KafkaUtils implementation.

Jonathan





On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang yanfang...@gmail.com wrote:

 Another suggestion that may help is that you can consider using Kafka to
 store the latest offset instead of Zookeeper. There are at least two
 benefits: 1) lower the workload of ZK 2) support replay from certain
 offset. This is how Samza http://samza.incubator.apache.org/ deals with
 the Kafka offset, the doc is here
 http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html
  .
 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'll let TD chime in on this one, but I'm guessing this would be a
 welcome addition. It's great to see community effort on adding new
 streams/receivers, adding a Java API for receivers was something we did
 specifically to allow this :)

 - Patrick


 On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi,

 I have implemented a Low Level Kafka Consumer for Spark Streaming using
 Kafka Simple Consumer API. This API will give better control over the Kafka
 offset management and recovery from failures. As the present Spark
 KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
 control over the offset management which is not possible in Kafka HighLevel
 consumer.

 This Project is available in below Repo :

 https://github.com/dibbhatt/kafka-spark-consumer


 I have implemented a Custom Receiver
 consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
 Consumer API (implemented in consumer.kafka packages) to fetch messages
 from Kafka and 'store' it in Spark.

 The logic will detect number of partitions for a topic and spawn that
 many threads (Individual instances of Consumers). Kafka Consumer uses
 Zookeeper for storing the latest offset for individual partitions, which
 will help to recover in case of failure. The Kafka Consumer logic is
 tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
 failures,  recovery from offset errors and other fail-over aspects.

 The consumer.kafka.client.Consumer is the sample Consumer which uses
 this Kafka Receivers to generate DStreams from Kafka and apply a Output
 operation for every messages of the RDD.

 We are planning to use this Kafka Spark Consumer to perform Near Real
 Time Indexing of Kafka Messages to target Search Cluster and also Near Real
 Time Aggregation using target NoSQL storage.

 Kindly let me know your view. Also if this looks good, can I contribute
 to Spark Streaming project.

 Regards,
 Dibyendu






Low Level Kafka Consumer for Spark

2014-08-02 Thread Dibyendu Bhattacharya
Hi,

I have implemented a Low Level Kafka Consumer for Spark Streaming using the
Kafka Simple Consumer API. This API gives better control over Kafka
offset management and recovery from failures. As the present Spark
KafkaUtils uses the HighLevel Kafka Consumer API, I wanted to have better
control over the offset management, which is not possible with the Kafka
HighLevel consumer.

This project is available in the repo below:

https://github.com/dibbhatt/kafka-spark-consumer


I have implemented a Custom Receiver, consumer.kafka.client.KafkaReceiver.
The KafkaReceiver uses the low level Kafka Consumer API (implemented in the
consumer.kafka packages) to fetch messages from Kafka and 'store' them in
Spark.

The logic will detect the number of partitions for a topic and spawn that many
threads (individual instances of Consumers). The Kafka Consumer uses Zookeeper
for storing the latest offset for individual partitions, which helps it
recover in case of failure. The Kafka Consumer logic is tolerant to ZK
failures, Kafka partition-leader changes, Kafka broker failures,
recovery from offset errors and other fail-over aspects.
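
For readers unfamiliar with the custom receiver API being referred to, a
bare-bones Scala sketch of the pattern is below. This is not the actual
consumer.kafka.client.KafkaReceiver code; the class name, the fetch helper and
the thread-per-partition body are purely illustrative:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SketchKafkaReceiver(partitions: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // one fetch thread per Kafka partition, each pushing messages into Spark
    (0 until partitions).foreach { p =>
      new Thread("partition-fetcher-" + p) {
        override def run(): Unit = {
          while (!isStopped()) {
            val msg = fetchNextMessage(p) // placeholder for the low-level fetch
            store(msg)                    // hands the record to Spark block storage
          }
        }
      }.start()
    }
  }

  def onStop(): Unit = {} // the fetch threads check isStopped() and exit on their own

  private def fetchNextMessage(partition: Int): String = ??? // hypothetical helper
}

// usage: val stream = ssc.receiverStream(new SketchKafkaReceiver(numPartitions))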

The consumer.kafka.client.Consumer is the sample Consumer which uses these
Kafka Receivers to generate DStreams from Kafka and apply an output
operation to every message of the RDD.

We are planning to use this Kafka Spark Consumer to perform Near Real Time
Indexing of Kafka Messages to target Search Cluster and also Near Real Time
Aggregation using target NoSQL storage.

Kindly let me know your view. Also, if this looks good, can I contribute it to
the Spark Streaming project?

Regards,
Dibyendu