Streaming Receiverless Kafka API + Offset Management

2015-11-16 Thread Nick Evans
I really like the Streaming receiverless API for Kafka streaming jobs, but
I'm finding the manual offset management adds a fair bit of complexity. I'm
sure that others feel the same way, so I'm proposing that we add the
ability to have consumer offsets managed via an easy-to-use API. This would
be done similarly to how it is done in the receiver API.

I haven't written any code yet, but I've looked at the current version of
the codebase and have an idea of how it could be done.

To keep the size of the pull requests small, I propose that the following
distinct features be added in order:

   1. If a group ID is set in the Kafka params, and also if fromOffsets is
   not passed in to createDirectStream, then attempt to resume from the
   remembered offsets for that group ID.
   2. Add a method on KafkaRDDs that commits the offsets for that KafkaRDD
   to ZooKeeper.
   3. Update the Python API with any necessary changes.
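
As a rough illustration of items 1 and 2, here is a plain-Python model of
the resume-then-commit flow. It is only a sketch and does not touch Spark
or Kafka: resolve_start_offsets, commit_offsets and the in-memory
_committed store are hypothetical names invented for this example. The one
real identifier is "group.id", the Kafka consumer config key.

```python
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]      # (topic, partition)
Offsets = Dict[TopicPartition, int]   # next offset to read, per partition

# Hypothetical stand-in for wherever offsets are remembered per group ID.
_committed: Dict[str, Offsets] = {}


def resolve_start_offsets(kafka_params: Dict[str, str],
                          from_offsets: Optional[Offsets] = None,
                          default: Optional[Offsets] = None) -> Offsets:
    """Item 1: explicit fromOffsets win; otherwise resume from the offsets
    remembered for the group ID; otherwise use the caller's default."""
    if from_offsets is not None:
        return dict(from_offsets)
    group_id = kafka_params.get("group.id")
    if group_id is not None and group_id in _committed:
        return dict(_committed[group_id])
    return dict(default or {})


def commit_offsets(group_id: str, until_offsets: Offsets) -> None:
    """Item 2: remember the end offsets of a processed batch for the group."""
    _committed.setdefault(group_id, {}).update(until_offsets)


params = {"group.id": "my-group"}
# First run: nothing remembered yet, so the default applies.
start = resolve_start_offsets(params, default={("events", 0): 0})
# After processing a batch, record how far we got.
commit_offsets("my-group", {("events", 0): 42})
# Next run: no fromOffsets passed, so we resume from the remembered offsets.
resumed = resolve_start_offsets(params)
```

Because an explicit from_offsets always takes precedence, callers that
already pass offsets would see no change in behaviour under this model.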

My goal is to not break the existing API while adding the new functionality.

One thing I'm unsure about in the first item is whether it's better to set
the group ID through the Kafka params, as described above, or to define a
new overload of createDirectStream that takes the group ID in place of the
fromOffsets param. I think the latter is a cleaner interface, but I'm not
sure whether adding a new param is a good idea.

I'd be very grateful for any feedback on this general approach. I'm going
to open a JIRA in the next couple of days and begin working on the first
item, but I think comments from the community would be very helpful in
building a good API here.


Re: Streaming Receiverless Kafka API + Offset Management

2015-11-16 Thread Cody Koeninger
There are already private methods in the code for interacting with Kafka's
offset management API.

There's a JIRA for making those methods public, but TD has been reluctant
to merge it:

https://issues.apache.org/jira/browse/SPARK-10963

I think adding any ZK-specific behavior to Spark is a bad idea, since ZK
may no longer be the preferred storage location for Kafka offsets within
the next year.




Re: Streaming Receiverless Kafka API + Offset Management

2015-11-16 Thread Saisai Shao
Kafka now has built-in support for managing this metadata itself, in
addition to ZK; it is easy to use and easy to switch to from the current ZK
implementation. I think the real question here is whether offsets should be
managed at the Spark Streaming level or left to the user.

If you want to manage offsets at the user level, with Spark offering a
convenient API, I think Cody's patch (
https://issues.apache.org/jira/browse/SPARK-10963) could satisfy your needs.

If you want Spark Streaming to manage offsets for you (transparently to the
user), I submitted a PR for that before, but the community was inclined to
leave this to the user level.
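
The two options differ mainly in who records progress after a batch. The
toy model below, in plain Python, is only a sketch of that difference:
run_stream, its auto_commit flag and the dict-based stores are hypothetical
and are not part of Spark Streaming.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Batch = Tuple[int, List[str]]  # (end offset of the batch, records)


def run_stream(batches: Iterable[Batch],
               process: Callable[[List[str], int], None],
               committed: Dict[str, int],
               auto_commit: bool) -> None:
    """Feed each batch to `process`. The framework records progress itself
    only when auto_commit is set; otherwise that is left to `process`."""
    for end_offset, records in batches:
        process(records, end_offset)
        if auto_commit:
            committed["offset"] = end_offset


batches = [(2, ["a", "b"]), (3, ["c"])]

# User level: the callback stores its results, then records the offset.
user_store: Dict[str, int] = {}

def user_process(records: List[str], end_offset: int) -> None:
    user_store["offset"] = end_offset

run_stream(batches, user_process, committed={}, auto_commit=False)

# Framework level: the callback never sees offset bookkeeping.
framework_store: Dict[str, int] = {}
run_stream(batches, lambda records, end_offset: None,
           committed=framework_store, auto_commit=True)
```

In the user-level variant the callback can tie the commit to its own output
step, which is the flexibility the community preferred to keep.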


Re: Streaming Receiverless Kafka API + Offset Management

2015-11-16 Thread Nick Evans
The only dependency on ZooKeeper I see is here:
https://github.com/apache/spark/blob/1c5475f1401d2233f4c61f213d1e2c2ee9673067/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L244-L247

If that's the only code that depends on ZooKeeper, we could probably try to
implement an abstract offset manager that could be switched out in favour
of the new offset management system, yes? I know kafka.consumer.Consumer
currently depends on ZooKeeper, but I'm guessing this library will
eventually be updated to use the new method.
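
To make the idea concrete, here is a minimal plain-Python sketch of such an
abstraction. Everything in it is hypothetical: OffsetManager, the two
in-memory backends and checkpoint are names made up for illustration, and
the path layout only loosely imitates a hierarchical store such as
ZooKeeper. Real backends would talk to ZooKeeper or to Kafka's own offset
storage behind the same two methods.

```python
from abc import ABC, abstractmethod
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]      # (topic, partition)
Offsets = Dict[TopicPartition, int]


class OffsetManager(ABC):
    """Hypothetical interface that calling code would depend on."""

    @abstractmethod
    def fetch(self, group_id: str) -> Offsets:
        """Return the offsets remembered for a group (empty if none)."""

    @abstractmethod
    def commit(self, group_id: str, offsets: Offsets) -> None:
        """Remember the given offsets for a group."""


class DictOffsetManager(OffsetManager):
    """Stand-in backend keeping one dict of offsets per group."""

    def __init__(self) -> None:
        self._store: Dict[str, Offsets] = {}

    def fetch(self, group_id: str) -> Offsets:
        return dict(self._store.get(group_id, {}))

    def commit(self, group_id: str, offsets: Offsets) -> None:
        self._store.setdefault(group_id, {}).update(offsets)


class PathOffsetManager(OffsetManager):
    """Stand-in backend storing one value per path-like key."""

    def __init__(self) -> None:
        self._nodes: Dict[str, int] = {}

    def fetch(self, group_id: str) -> Offsets:
        prefix = f"/{group_id}/"
        result: Offsets = {}
        for path, offset in self._nodes.items():
            if path.startswith(prefix):
                topic, partition = path[len(prefix):].rsplit("/", 1)
                result[(topic, int(partition))] = offset
        return result

    def commit(self, group_id: str, offsets: Offsets) -> None:
        for (topic, partition), offset in offsets.items():
            self._nodes[f"/{group_id}/{topic}/{partition}"] = offset


def checkpoint(manager: OffsetManager, group_id: str, end: Offsets) -> Offsets:
    """Caller code sees only the interface, so backends are interchangeable."""
    manager.commit(group_id, end)
    return manager.fetch(group_id)


for backend in (DictOffsetManager(), PathOffsetManager()):
    print(type(backend).__name__, checkpoint(backend, "g", {("events", 0): 10}))
```

Swapping the storage location then means passing a different OffsetManager
instance, with no change to the code that calls checkpoint.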


-- 
*Nick Evans* 