[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-11-05 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992208#comment-14992208
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~hachikuji] [~guozhang] I noticed that currently, if we pause a partition, we
only pause data fetching, not offset commits. Should we also pause offset
commits for paused partitions?

The motivation can be explained with the following example:
In mirror maker, if a message send fails for a reason such as the message size
being too large, today we need to stop the entire mirror maker to ensure no data
loss. With pause, we could instead pause consumption from that partition without
shutting down the whole pipeline. The problem is that the failed message has
already been delivered to the application, so when we call producer.flush() and
then commit, that message's offset will be committed. Later, if a rebalance
occurs and another consumer takes over this partition, the failed message will
be lost.

The workaround today is to:
1. Disable auto commit.
2. Maintain an external offset map and commit offsets explicitly from that map.
3. Implement a consumer rebalance listener that cleans up the external map when
a rebalance occurs.

This is very involved, and we probably don't want to force users to keep an
external map to solve this issue, because this
consumer-process-write_to_downstream pattern is probably very common.

I suggest we do the following:
1. Stop committing offsets for a paused partition.
2. When pause() is called and auto commit is turned on, commit the offset for
the partition being paused.

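This way, if a user sees an error and wants to pause a partition without
losing messages, they can simply do:
{code}
// Commit the failed message's offset first, so whoever owns the partition next
// starts from (and re-processes) the failed message; then stop fetching it.
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(failedOffset)));
consumer.pause(partition);
{code}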

When a rebalance occurs, another consumer will see the failed message again and
will pause this partition again.

The only difference is that users who do not use auto commit need to commit the
offset for a partition before pausing it. But users who disable auto commit
probably care about data loss anyway, and this is much simpler than asking them
to maintain an external offset map.
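To make the proposed rule concrete, here is a toy, in-memory sketch of it. Everything in it is hypothetical: `ToyCommitOnPause`, its methods, and the string partition names are illustrative stand-ins, not the KafkaConsumer API, and `autoCommit` models the *proposed* behavior of skipping paused partitions rather than anything the consumer currently does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model of "commit, then pause; auto commit skips paused partitions". */
class ToyCommitOnPause {
    private final Map<String, Long> committed = new HashMap<>(); // partition -> committed offset
    private final Set<String> paused = new HashSet<>();

    /** Explicit commit, analogous in spirit to commitSync(Map). */
    void commitSync(Map<String, Long> offsets) {
        committed.putAll(offsets);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    /** Proposed rule: periodic (auto) commits do not advance paused partitions. */
    void autoCommit(Map<String, Long> positions) {
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (!paused.contains(e.getKey())) {
                committed.put(e.getKey(), e.getValue());
            }
        }
    }

    /** Offset a consumer taking over the partition after a rebalance would start from. */
    long restartOffset(String partition) {
        return committed.getOrDefault(partition, 0L);
    }
}
```

In this model, committing offset 3 for the failed message and then pausing keeps the committed offset at 3 even though later auto commits see position 5, so the next owner re-reads the failed message.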

Thoughts?

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply 
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.
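As a rough illustration of the expected behavior listed above, the following toy model tracks assigned positions and a paused set in memory. It is a sketch under stated assumptions, not KafkaConsumer: the class name, `onRebalance`, and `fetchablePartitions` are hypothetical, partitions are plain strings such as `topicA-0`, and a rebalance is simulated by handing in the new assignment together with its committed offsets.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the pause/resume semantics; performs no I/O. */
class ToyPauseConsumer {
    // assigned partition -> next offset to fetch
    private final Map<String, Long> positions = new HashMap<>();
    // subset of assigned partitions that are currently paused
    private final Set<String> paused = new HashSet<>();

    /** Simulated rebalance: positions restart from committed offsets, pause state is dropped. */
    void onRebalance(Map<String, Long> assignedWithCommittedOffsets) {
        positions.clear();
        positions.putAll(assignedWithCommittedOffsets);
        paused.clear();
    }

    void pause(String... partitions) {
        for (String tp : partitions) {
            requireAssigned(tp);
            paused.add(tp);
        }
    }

    void resume(String... partitions) {
        for (String tp : partitions) {
            requireAssigned(tp);
            paused.remove(tp);
        }
    }

    /** seek() is still allowed while the partition is paused. */
    void seek(String tp, long offset) {
        requireAssigned(tp);
        positions.put(tp, offset);
    }

    /** position() is still allowed while the partition is paused. */
    long position(String tp) {
        requireAssigned(tp);
        return positions.get(tp);
    }

    /** Partitions for which a poll() would initiate new fetches. */
    Set<String> fetchablePartitions() {
        Set<String> result = new TreeSet<>(positions.keySet());
        result.removeAll(paused);
        return result;
    }

    private void requireAssigned(String tp) {
        if (!positions.containsKey(tp))
            throw new IllegalStateException("Partition " + tp + " is not currently assigned");
    }
}
```

Clearing the paused set in `onRebalance` mirrors the last bullet: an application that still wants a partition paused has to pause it again after each rebalance.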



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-11-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992357#comment-14992357
 ] 

Guozhang Wang commented on KAFKA-2350:
--

Could this be resolved by calling

{code}
consumer.seek(partition, record.offset() - 1);
consumer.pause(partition);
{code}

so that we only commit up to the previous message? Note that the previous
message does not need to exist: say we fetch a compacted topic with message
offsets (0, 5, 10) and get an error on the message at offset 10; seeking to
offset 9 and restarting from there will fetch message 10 again.
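Why a non-existent offset works can be shown with a simplified model of a fetch: it returns records starting from the first offset that is greater than or equal to the requested position. The helper below is hypothetical and only models that lookup over a sorted array of offsets; it does not talk to a broker.

```java
/** Hypothetical helper modelling where a fetch from a given position would start. */
class CompactedFetchDemo {
    /**
     * Returns the smallest existing offset >= position, or -1 if there is none.
     * `offsets` must be sorted ascending, as offsets in a partition log are.
     */
    static long firstFetchedOffset(long[] offsets, long position) {
        for (long offset : offsets) {
            if (offset >= position) {
                return offset;
            }
        }
        return -1L;
    }
}
```

With the compacted offsets (0, 5, 10) from the example, a position of 9 (or anything from 6 to 10) leads back to message 10.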



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-11-05 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992452#comment-14992452
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Ah, right. That would work. It is still a bit weird that we commit the same
offset repeatedly, but it is probably OK.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-11-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992466#comment-14992466
 ] 

Guozhang Wang commented on KAFKA-2350:
--

I think this can be fixed by filtering out partitions whose consumed position is
the same as their committed offset, since we also keep the committed position in
the subscription state.
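A minimal sketch of that filtering step, assuming positions and committed offsets are both available as maps keyed by partition. `CommitFilter.offsetsToCommit` is a hypothetical helper, not part of the consumer; partitions are plain strings for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: skip partitions whose position equals the last committed offset. */
class CommitFilter {
    static Map<String, Long> offsetsToCommit(Map<String, Long> positions, Map<String, Long> committed) {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            Long last = committed.get(e.getKey());
            // Commit only if nothing was committed yet or the position has moved.
            if (last == null || !last.equals(e.getValue())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

A paused partition that was rewound with seek() and already committed at that position drops out of every subsequent commit, which removes the repeated commits mentioned above.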



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648329#comment-14648329
 ] 

ASF GitHub Bot commented on KAFKA-2350:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/100


 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 0.8.3


 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
 assigned and simply 
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645343#comment-14645343
 ] 

ASF GitHub Bot commented on KAFKA-2350:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/100

KAFKA-2350; KafkaConsumer pause/resume API



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-2350

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #100


commit a82b60a48f47a24b55d5b07ddb1a22fbcf52
Author: Jason Gustafson ja...@confluent.io
Date:   2015-07-29T00:58:10Z

KAFKA-2350; KafkaConsumer pause/resume API




 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
 a rebalance will be triggered and your partitions will be reassigned to 
 another consumer. The desired behavior is instead that you keep the partition 
 assigned and simply 
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(TopicPartition... partitions);
 void resume(TopicPartition... partitions);
 {code}
 Here is the expected behavior of pause/resume:
 * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
 any new fetches for that partition.
 * After the partition is resumed, fetches will begin again. 
 * While a partition is paused, seek() and position() can still be used to 
 advance or query the current position.
 * Rebalance does not preserve pause/resume state.





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643249#comment-14643249
 ] 

Guozhang Wang commented on KAFKA-2350:
--

[~becket_qin], I was not thinking about the implementation from a developer's
point of view regarding the call trace, but rather from a user's point of view.
That is, with two different names the distinction between topic and partition
subscriptions is clearer. For another example, say you write an application
with an embedded consumer client, and you very likely call its functions from
many places in your app code / classes. When you see an exception thrown from
unsubscribe(partition), you may need to look at other places and check whether
1) you used subscribe(topic) but the partition was not assigned to you by Kafka,
or 2) you used subscribe(partition) on some other partitions but never
subscribed to this one. Similarly, if you see an exception thrown from your
subscribe(partition) call, you need to check whether 1) you called poll() in
between, so that the partition is no longer assigned, or 2) you called
subscribe(topic) before and that partition is not one of the assigned
partitions. That is, you as the developer need to check which consumer calls
have been made before (the trace) in order to troubleshoot, even when the code
was not written by you. With pause / resume, it is clear that the consumer is
using topic subscriptions, and that if those functions throw an exception, the
partition is no longer assigned to you because of a rebalance, etc.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643442#comment-14643442
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] I think we're on the same page about supporting only automatic or
manual assignment and not trying to mix them. I think my confusion is that
subscribe(partition) in your proposal is used both a) to subscribe to a
partition when manual assignment is used, and b) to unpause a partition when
automatic assignment is used. This leads to the weird ordering problems we have
been talking about. By the way, I added the line about seek() and position()
since it seems like something that should intuitively be supported by pause
semantics. It's debatable whether it's really needed, but I think it would be a
bit of a surprise to users if we didn't allow it.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643349#comment-14643349
 ] 

Jason Gustafson commented on KAFKA-2350:


There's one interesting implementation note that [~yasuhiro.matsuda] and
[~guozhang] brought up. When a partition is unpaused, there may be an active
fetch parked on the broker. In the current implementation, the consumer will
not initiate any new fetches until that fetch has completed. This means that
the consumer will not be able to immediately process messages from the unpaused
partition even if it has some available. There are a couple of ways this could
be handled: we could issue the new fetch from a different socket on the client,
or we could implement a way to cancel or override the active fetch on the
broker. Since both of these would make this patch significantly more complex, I
think we should just note this limitation in the documentation and address it
later if it becomes a larger problem.





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643418#comment-14643418
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~hachikuji], I agree with [~guozhang] that it is much clearer if we only
support either auto partition assignment or manual partition assignment, but
not a mixed mode. I assumed pause/unpause would only be used with auto
partition assignment, so we can check the assigned partition set. Under manual
partition assignment, for seek(), I assume that you will start consuming after
the seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}

[~guozhang], I see your point. I am convinced that for people who are using
auto partition assignment, pause/unpause is more intuitive than partition-level
sub/unsub. I am not really opposed to having them. What I was worried about is
that we are adding methods that are intuitive for one particular use case, but
that potentially open the door to adding APIs with only subtle differences. If
we take a closer look at our API, there are potentially other cases where we
could argue for a new API, e.g. a user might want auto commit turned on for only
some of the partitions they subscribe to, or might want to find the list of
offsets of a partition within a time range. These are all different use cases,
but they can likely be solved with lower-level API calls instead of a dedicated
intuitive API for each of them.

I feel the dilemma we are facing now is that in the new consumer we are trying
to address both the high-level consumer and the low-level consumer use cases.
pause/unpause looks to me like a medium-to-low-level use case. Since
higher-level requirements can vary a lot and differ subtly from one another,
the question to answer is: should we expose a high-level interface for each
high-level use case, or should we just ask users to use a lower-level API as
long as we support the functionality?

My understanding is that for high-level consumer use cases, we hopefully don't
need users to care too much about the underlying mechanism. For people who want
to deal with lower-level concepts such as offsets, partition assignment, or
temporary suspension of consumption, letting them use a lower-level API makes
more sense to me than writing a high-level API for each use case.

In terms of the example you mentioned, can we solve them by throwing 
appropriate exceptions?

{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this consumer
subscribe(topic1-partition1) // throw IllegalStateException("Cannot subscribe to topic1-partition1 because topic1 is managed by consumer coordinator")
unsubscribe(topic1-partition1) // throw IllegalStateException("topic1-partition1 is managed by consumer coordinator, and topic1-partition1 is not assigned to this consumer.")
{code}

{code}
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException("Cannot subscribe to topic1 because the assignment of topic1 is manually managed")
unsubscribe(topic1-partition1) // throw IllegalStateException("Cannot unsubscribe topic1-partition1 because it is not subscribed")
{code}
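The exception rules proposed in the two examples above can be modeled in a few lines. This is a hypothetical toy (`ToySubscriptions` and `coordinatorAssigns` are illustrative names, and partitions are plain strings), not a KafkaConsumer API. It only encodes the proposed checks: partition-level subscribes are rejected while the coordinator manages the topic, topic subscribes are rejected under manual assignment, and unsubscribing a partition the consumer does not own fails.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the proposed IllegalStateException rules. */
class ToySubscriptions {
    private enum Mode { NONE, AUTO, MANUAL }

    private Mode mode = Mode.NONE;
    // AUTO: partitions assigned by the coordinator; MANUAL: partitions the user subscribed to.
    private final Set<String> partitions = new HashSet<>();

    void subscribeTopic(String topic) {
        if (mode == Mode.MANUAL)
            throw new IllegalStateException("Cannot subscribe to " + topic
                    + " because partition assignment is manually managed");
        mode = Mode.AUTO;
    }

    /** Simulates the coordinator assigning a partition after subscribeTopic(). */
    void coordinatorAssigns(String partition) {
        if (mode != Mode.AUTO)
            throw new IllegalStateException("No topic subscription");
        partitions.add(partition);
    }

    void subscribePartition(String partition) {
        if (mode == Mode.AUTO)
            throw new IllegalStateException("Cannot subscribe to " + partition
                    + " because its topic is managed by the consumer coordinator");
        mode = Mode.MANUAL;
        partitions.add(partition);
    }

    void unsubscribePartition(String partition) {
        if (!partitions.contains(partition))
            throw new IllegalStateException(partition + " is not "
                    + (mode == Mode.AUTO ? "assigned to" : "subscribed by") + " this consumer");
        partitions.remove(partition);
    }
}
```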

[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking
at the problem from a different angle: the user either wants to consume from a
topic or not at a certain point, whether temporarily or permanently. So the only
states I see are CONSUME/NOT_CONSUME, and the PAUSE state would be pretty much
the same as NOT_CONSUME. I might have missed some use cases, like the one
[~hachikuji] mentioned (seek() on a paused partition), but that can be solved by
calling subscribe() first.

[~gwenshap], good point about heartbeats. We actually got some feedback from
users at LinkedIn and found that putting the responsibility for sending
heartbeats on the user might be a problem in the first place... We may have
pause/unpause as a workaround, but the underlying issue is that we may be asking
too much of users by making them maintain the heartbeat...


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-26 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642229#comment-14642229
 ] 

Neha Narkhede commented on KAFKA-2350:
--

A few thoughts:
bq. 1. Add pause/unpause
bq. 2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe 
to a topic but suppress a partition
bq. 3. Either of the above but making the use of group management explicit 
using an enable.group.management flag

I'm in favor of 1. for several reasons:
1. It keeps the API semantics clean. subscribe/unsubscribe indicates intent to 
consume data, while pause/resume indicates a *temporary* preference for the 
purposes of flow control.
2. It avoids all the different permutations of subscribe/unsubscribe that we
would need to worry about, each of which would have to make sense and be
explained clearly to the user. This discussion is confusing enough that I'm
convinced that will not be easy.
3. pause/resume moves the consumer to a different state in its state diagram. 
Overloading the same API to represent two different states is unintuitive.

Also +1 on:
1. Renaming unpause to resume.
2. Not maintaining the pause/resume preference across consumer rebalances.

There may be complications in implementing the above preferences that I have
overlooked, but I feel we should design the APIs for the right behavior and then
work out any implementation issues that come up as a result.


 Add KafkaConsumer pause capability
 --

 Key: KAFKA-2350
 URL: https://issues.apache.org/jira/browse/KAFKA-2350
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson

 There are some use cases in stream processing where it is helpful to be able 
 to pause consumption of a topic. For example, when joining two topics, you 
 may need to delay processing of one topic while you wait for the consumer of 
 the other topic to catch up. The new consumer currently doesn't provide a 
 nice way to do this. If you skip poll() or if you unsubscribe, then a 
 rebalance will be triggered and your partitions will be reassigned.
 One way to achieve this would be to add two new methods to KafkaConsumer:
 {code}
 void pause(String... topics);
 void unpause(String... topics);
 {code}
 When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
 new fetches for that topic. After it is unpaused, fetches will begin again.





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-24 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14640958#comment-14640958
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

Throwing exceptions makes sense.
In addition, I think a consumer should not keep pause/unpause state across
rebalances. Forgetting the state makes the consumer/application logic cleaner.




[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639657#comment-14639657
 ] 

Guozhang Wang commented on KAFKA-2350:
--

Personally I would prefer not mixing topic subscription (i.e. using Kafka for 
assignment) with partition subscription (i.e. not using Kafka for assignment), 
since I feel its flexibility benefits are outweighed by the complexity of the API 
semantics. This is mainly because (again, this is totally personal) I think there 
will be very few use cases that need both during the life cycle of a single consumer: 
with the new consumer implementation, you can always have two consumers, one 
for topic subscription and one for partition subscription, since the 
construction cost is quite low.

Assuming that we are not going to support this mixing, what I was mainly 
concerned about (and probably what Jay intended to describe in case 3 
above) is that we would effectively be deciding whether to throw an 
exception when subscribe(topic) and subscribe(partition) are used at the same 
time based on the call trace, for example:

{code}
subscribe(topicA);
unsubscribe(topicA-partition1);
subscribe(topicA-partition1);  // this will be OK
subscribe(topicC-partition1);  // this will not be OK, i.e. the logic depends
                               // on which subscribe() / unsubscribe() calls you made before
{code}

Plus, what [~yasuhiro.matsuda] mentioned is also a valid point, though 
rebalances only happen during a poll() call. It is still weird to get:

{code}
subscribe(topicA);
// then say assignment() returns topicA-partition1
poll();
unsubscribe(topicA-partition1); // may throw an exception here, so you need to
                                // get the assignment each time before you try to
                                // unsubscribe / subscribe, since each poll() may
                                // change the assignment
{code}
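The call-trace dependence described above can be made concrete with a small model. It assumes a hypothetical `MixedModeChecker` class (not the real consumer) and partitions written as strings like "topicA-partition1". The same `subscribePartition` call succeeds or fails depending on what was called earlier.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the "no mixing" rule (not the real KafkaConsumer).
// Partitions are strings of the form "<topic>-<name>", e.g. "topicA-partition1".
class MixedModeChecker {
    private final Set<String> subscribedTopics = new HashSet<>();
    private final Set<String> suppressed = new HashSet<>();
    private final Set<String> manualPartitions = new HashSet<>();

    private static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    void subscribeTopic(String topic) {
        subscribedTopics.add(topic);
    }

    // With the topic subscribed, unsubscribe(partition) can only mean "suppress".
    void unsubscribePartition(String partition) {
        if (subscribedTopics.contains(topicOf(partition)))
            suppressed.add(partition);
        else
            manualPartitions.remove(partition);
    }

    // Legal if it lifts an earlier suppression; otherwise illegal while any
    // topic subscription is active, because that would mix the two modes.
    void subscribePartition(String partition) {
        if (suppressed.remove(partition))
            return;
        if (!subscribedTopics.isEmpty())
            throw new IllegalStateException(
                "cannot mix topic and partition subscription: " + partition);
        manualPartitions.add(partition);
    }
}
```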



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639460#comment-14639460
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

Suppose we are using auto assignment.
{code}
subscribe(topic)  // A
unsubscribe(partition) // B
subscribe(partition) // C
{code}
If rebalancing happens and the co-ordinator assigns the partition to a 
different instance, is the client still subscribed to the partition?
Rebalance can happen 1) between A and B, 2) between B and C, or 3) after C.





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639665#comment-14639665
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Hmm, good point. When rebalances come into play, things can be a little bit 
tricky. That said, this could also be tricky with pause/unpause.

It might be useful to track the contents of the two topic partition collections as 
well:
[1] subscribedPartitions
[2] assignedPartitions

The tricky part here is whether or not a rebalance should override the suppression. 
During a rebalance, the consumer needs to compare the newly assigned 
partitions with [2] as it was before the rebalance, to see whether each partition 
is already in [2].
* When a partition remains assigned to a consumer (the partition is in [2] before 
the rebalance), the rebalance should keep the current state of that partition 
unchanged.
** If the partition is in [2] but not in [1] (under suppression), the rebalance 
should keep it as is: the partition should not be added to [1], since that would 
override the suppression.
** If the partition is in both [1] and [2], the rebalance should also keep it 
as is.
* When a partition is not in [2] before the rebalance, add the partition to both 
[1] and [2].
* When a partition has been assigned to some other consumer, the rebalance will 
remove that partition from both [1] and [2]. Any later operation on that partition 
will receive an exception.

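In code, this would look something like:
{code}
// Drop partitions that were moved to another consumer from both [1] and [2].
// Removing through the iterator avoids a ConcurrentModificationException.
Iterator<TopicPartition> iter = assignedPartitions.iterator();
while (iter.hasNext()) {
    TopicPartition tp = iter.next();
    if (!newlyAssignedPartitions.contains(tp)) {
        subscribedPartitions.remove(tp);
        iter.remove();
    }
}
// Add partitions that are new to this consumer. Partitions that stay assigned
// keep their current state, so an existing suppression is not overridden.
for (TopicPartition tp : newlyAssignedPartitions) {
    if (!assignedPartitions.contains(tp)) {
        subscribedPartitions.add(tp);
        assignedPartitions.add(tp);
    }
}
{code}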

Assuming that, here would be the behavior:

* Rebalance occurred between A and B (partition is in both [1] and [2]).
** After the rebalance, if the partition has been assigned to some other consumer 
(partition is removed from both [1] and [2]), unsubscribe in B will either have 
no effect or get an exception. In that case, the user might need to catch the 
exception and call assignedPartitions() again to see which partitions they want 
to unsubscribe.
** After the rebalance, if the partition is still assigned to this consumer 
(partition remains in both [1] and [2]), unsubscribe in B will work (the partition 
will be removed from [1] but stay in [2]).

* Rebalance occurred between B and C (partition is not in [1] but is in [2]).
** After the rebalance, if the partition has been assigned to some other 
consumer (partition is in neither [1] nor [2]), subscribe in C will receive an 
exception because the partition is not assigned to this consumer.
** After the rebalance, if the partition remains assigned to this consumer, this 
relies on the trick mentioned above: the rebalance will not override the suppression 
from B, so C will subscribe to the partition successfully (the partition will be in 
both [1] and [2]).

* Rebalance occurred after C.
** After the rebalance, if the partition has been assigned to some other 
consumer, the partition will be removed from both [1] and [2]. The consumer will no 
longer consume from the partition.
** After the rebalance, if the partition remains assigned to this consumer, the 
partition will stay in [1] and [2]. The consumer will keep consuming from the 
partition.
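A runnable sketch of the two collections and the rebalance rule above, walking through the A/B/C timeline. It assumes a hypothetical `SuppressionModel` class with partitions as plain strings. It also assumes that operating on a partition no longer in [2] throws IllegalStateException. The comment above leaves open whether that should be a no-op instead.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of [1] subscribedPartitions and [2] assignedPartitions.
class SuppressionModel {
    final Set<String> subscribed = new LinkedHashSet<>(); // [1]
    final Set<String> assigned = new LinkedHashSet<>();   // [2]

    void rebalance(Set<String> newlyAssigned) {
        // Partitions moved to another consumer leave both [1] and [2].
        Iterator<String> it = assigned.iterator();
        while (it.hasNext()) {
            String tp = it.next();
            if (!newlyAssigned.contains(tp)) {
                subscribed.remove(tp);
                it.remove();
            }
        }
        // Only partitions new to this consumer are added to [1]; a partition
        // that stays assigned keeps its state, so a suppression survives.
        for (String tp : newlyAssigned) {
            if (assigned.add(tp))
                subscribed.add(tp);
        }
    }

    // Suppress: remove from [1] but keep in [2].
    void unsubscribe(String tp) {
        if (!assigned.contains(tp))
            throw new IllegalStateException("not assigned: " + tp);
        subscribed.remove(tp);
    }

    // Lift a suppression; only legal while the partition is still in [2].
    void subscribe(String tp) {
        if (!assigned.contains(tp))
            throw new IllegalStateException("not assigned: " + tp);
        subscribed.add(tp);
    }
}
```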





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639796#comment-14639796
 ] 

Jason Gustafson commented on KAFKA-2350:


I agree that even if we use pause/unpause, we have to solve some of the same 
issues. For me, I just feel that mixing the semantics of pause with 
subscription is not intuitive. Unsubscribe(partition) means I am no longer 
interested in that partition. Pause(partition) means I am still interested, but 
I would like to hold off messages for now. These are different concepts, and 
I'm not convinced that they are even functionally equivalent. For example, I 
expect that I should be able to still call seek(partition) and 
position(partition) even if that partition has been paused. Would I still be 
able to do that if I had unsubscribed from the partition? In the case of 
automatic assignment, perhaps we would say that you can still seek on an 
unsubscribed (i.e. paused) partition as long as it's still assigned. But can 
you still call seek on an unsubscribed partition when you're not using 
automatic assignment? To be consistent, we'd have to say yes, but that just 
seems weird. Or if we say no, then we have no way to seek on a paused (umm.. 
unsubscribed?) partition when not using automatic assignment.
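The distinction drawn above can be modelled by treating pause as a flag layered on top of the assignment, rather than as a change to it. This is a sketch under that assumption. `PausableAssignment` and its methods are hypothetical stand-ins, not the KafkaConsumer API. Because a paused partition stays assigned, seek() and position() keep working on it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: pause is a flag on top of the assignment, so a paused
// partition keeps its fetch position and can still be seeked.
class PausableAssignment {
    private final Map<String, Long> positions = new HashMap<>(); // assigned partitions
    private final Set<String> paused = new HashSet<>();

    void assign(String tp, long startOffset) { positions.put(tp, startOffset); }

    private void requireAssigned(String tp) {
        if (!positions.containsKey(tp))
            throw new IllegalStateException("not assigned: " + tp);
    }

    void pause(String tp) { requireAssigned(tp); paused.add(tp); }
    void unpause(String tp) { requireAssigned(tp); paused.remove(tp); }

    // seek() and position() only require the partition to be assigned;
    // whether it is paused does not matter.
    void seek(String tp, long offset) { requireAssigned(tp); positions.put(tp, offset); }
    long position(String tp) { requireAssigned(tp); return positions.get(tp); }

    boolean isFetchable(String tp) {
        return positions.containsKey(tp) && !paused.contains(tp);
    }
}
```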



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639746#comment-14639746
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~guozhang], my understanding is that pause/unpause will also need to track the 
call trace, right? Otherwise, if users are not using the consumer coordinator, what 
would pause/unpause do for them? Would it be exactly the same as 
subscribe/unsubscribe at the partition level? Also, we have to track the sub/unsub 
call trace anyway; otherwise, how can we enforce mutual exclusion of the auto/manual 
partition assignment modes?

Regarding the rebalance issue, do you mean we can avoid the issue with 
pause/unpause?
I see the same issue. Consider the following:
{code}
subscribe(topicA);
// then say assignment() returns topicA-partition1
poll(); // say the partition has been assigned to another consumer
pause(topicA-partition1); // Should we throw exception?
unpause(topicA-partition1); // Should we throw exception here?
{code}



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639424#comment-14639424
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] It would help if you would sketch out how 
subscribe/unsubscribe(topic) and subscribe/unsubscribe(partition) interact with 
the two collections you are trying to maintain. Also is it the union of these 
two sets that will be fetched and returned to the user? 



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639307#comment-14639307
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Ah, I see your concern. Originally I was thinking that we were still going to 
make self-managed partition control and consumer coordinator partition assignment 
mutually exclusive. So if subscribe(partition) has been called, 
subscribe(topic) would throw an exception.

Now I think allowing mixed mode might be an interesting feature to support. Let 
me take a shot at addressing the use cases you raised.

We maintain the following two data structures:
{code}
Map<Topic, TopicPartition> assignedPartitions  // A
Set<TopicPartition> subscribedPartitions       // B
{code}
The only rule we have is that you cannot do both MANUAL and AUTO partition 
subscription on the same topic. 
The validity of sub/unsub for a topic is as follows:
* In A, in B: the partition is assigned by the coordinator; sub/unsub works at 
both the topic and partition level.
* In A, not in B: the topic is subscribed manually; sub/unsub at the topic level 
will get an exception, sub/unsub at the partition level works.
* Not in A, in B: the topic is assigned by the coordinator, but suppressed by the 
user; sub/unsub at the topic level works, sub/unsub at the partition level only 
works when the partition is in B.
* Not in A, not in B: a brand-new topic; sub/unsub at both the topic and partition 
level works.

We can discuss whether it is legitimate to unsubscribe from a partition that is 
not assigned/subscribed to, but that is an orthogonal issue.

So, for the cases you mentioned:
* case 1
{code}
subscribe(topic)
unsubscribe(partition)
{code}
means give me a set of partitions and suppress one of them - just like what you 
said.

* case 2
{code}
subscribe(partition)
subscribe(topic)
{code}
means give me this partition, plus whatever the coordinator gives me.
However, if the topic the user is trying to subscribe to happens to be the same 
topic as the partition already subscribed to in the first line, that is an 
exception: you cannot do manual and automatic partition subscription at 
the same time. (In this case, the topic would be in subscribedPartitions but 
not in assignedPartitions.)

* case 3
{code}
unsubscribe(partition)
subscribe(topic)
{code}
is a somewhat weird sequence in the first place. How could you suppress a 
partition when you don't even know whether the partition will be assigned to you?
Anyway, here is the behavior: unsubscribing from the partition will go through, 
but after calling subscribe(topic), the assigned topic partition will not be 
suppressed.
However, if the user is currently subscribed manually to a partition from that 
topic (in subscribedPartitions, not in assignedPartitions), subscribe(topic) will 
throw an exception, as in case 2.

* case 4
{code}
subscribe(partition)
subscribe(topic)
unsubscribe(partition)
{code}
The first two subscribe calls are described in case 2. The partition will be 
suppressed after the unsubscribe call.

* case 5
{code}
subscribe(partition)
unsubscribe(partition)
subscribe(topic)
{code}
If the first two lines are called for the same partition, it is equivalent to 
just calling subscribe(topic), so the partition won't be suppressed. If the first 
two lines are called for different partitions, it is again a little bit 
weird for the user to suppress a partition without even knowing whether it 
will be assigned. Either way, the partition won't be suppressed.
 




[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639080#comment-14639080
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~becket_qin] I think there are three proposals:
1. Add pause/unpause
2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe to a 
topic but suppress a partition
3. Either of the above but making the use of group management explicit using an 
enable.group.management flag

I'm in favor of (2) if it is possible to work out the corner cases and the 
implementation doesn't cause us to go crazy. I think you are saying that having 
unsubscribe mean suppress is actually somewhat sensible. Otherwise, I'm in favor 
of (1). I'm not in favor of (3) because of the two different modes.

Let me point out a few of the weird bits of (2), though. Since we now allow 
mingling of subscribe(topic) and subscribe(partition) we have to work out all 
the combinations. The case where you do
  subscribe(topic)
  unsubscribe(partition)
is really clear: it means give me a set of partitions but suppress this 
particular partition. Likewise I think 
  subscribe(partition)
  subscribe(topic)
also makes sense. You are saying give me this particular partition plus 
whatever else the coordinator assigns me. But what about
  unsubscribe(partition)
  subscribe(topic)
do you still get the same suppression effect? But now this is a bit weird:
  subscribe(partition)
  subscribe(topic)
  unsubscribe(partition)
Does the unsubscribe call suppress the partition or not? The first two calls 
normally mean subscribe me to whatever the co-ordinator gives me plus a given 
partition. The last two calls normally mean subscribe me to whatever the 
co-ordinator gives me except this given partition. But is the result of 
combining these two the same as 
   subscribe(partition)
   unsubscribe(partition)
   subscribe(topic)
In other words I think all this implies that there are now three states for a 
partition: SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED? This is what I think we'd 
have to work out to make your proposal feasible.
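One way to see the corner case above is to model the three states explicitly. This sketch uses a hypothetical `ThreeStateModel`. It picks one possible interpretation, which is an assumption and not a decision made in this thread: unsubscribe(partition) means SUPPRESSED only if the partition's topic is already subscribed when the call is made. Under that rule, the two call orders Jay lists end in different states.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical three-state model for proposal (2); partitions are strings of
// the form "<topic>-<n>". One possible interpretation, not a specification.
enum PartitionState { SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED }

class ThreeStateModel {
    private final Set<String> topics = new HashSet<>();
    private final Map<String, PartitionState> states = new HashMap<>();

    private static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    void subscribeTopic(String topic) { topics.add(topic); }

    void subscribePartition(String tp) { states.put(tp, PartitionState.SUBSCRIBED); }

    // Assumed rule: suppress only if the topic is subscribed at call time.
    void unsubscribePartition(String tp) {
        states.put(tp, topics.contains(topicOf(tp))
                ? PartitionState.SUPPRESSED : PartitionState.NOT_SUBSCRIBED);
    }

    PartitionState state(String tp) {
        return states.getOrDefault(tp, PartitionState.NOT_SUBSCRIBED);
    }

    // Fetched if explicitly subscribed, or if assigned through a subscribed
    // topic and not suppressed.
    boolean isFetched(String tp, boolean assignedByCoordinator) {
        PartitionState s = state(tp);
        if (s == PartitionState.SUBSCRIBED) return true;
        return s != PartitionState.SUPPRESSED
                && assignedByCoordinator && topics.contains(topicOf(tp));
    }
}
```

Under this rule, subscribe(partition), subscribe(topic), unsubscribe(partition) leaves the partition SUPPRESSED. Moving subscribe(topic) to the end leaves it NOT_SUBSCRIBED, so it is still fetched if the coordinator assigns it.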




[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14639085#comment-14639085
 ] 

Gwen Shapira commented on KAFKA-2350:
-

[~becket_qin] - when I imagined how the API would work, I assumed that when the 
coordinator is not used, pause/resume would do nothing.
The whole point was to keep sending heartbeats without consuming, to avoid a 
rebalance, so this is rather meaningless without a coordinator, heartbeats and 
rebalances. Of course, if someone calls it accidentally, nothing bad will 
happen.
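The point about continuing to heartbeat while paused can be shown with a toy simulation. It assumes a hypothetical `ToyConsumer` in which each poll() stands in for a heartbeat. This is a simplification, not how the real client is implemented. The application keeps calling poll(), so group membership stays alive, while paused partitions contribute no fetches.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation (not the Kafka client): every poll() counts as a heartbeat,
// and fetches are issued only for partitions that are not paused.
class ToyConsumer {
    final Set<String> assigned;
    final Set<String> paused = new HashSet<>();
    int heartbeats = 0;

    ToyConsumer(Set<String> assigned) { this.assigned = assigned; }

    void pause(String tp) { paused.add(tp); }
    void unpause(String tp) { paused.remove(tp); }

    // Returns the partitions fetched from during this poll.
    List<String> poll() {
        heartbeats++; // membership stays alive as long as poll() keeps being called
        List<String> fetchedFrom = new ArrayList<>();
        for (String tp : assigned)
            if (!paused.contains(tp))
                fetchedFrom.add(tp);
        return fetchedFrom;
    }
}
```

In the join example from the issue description, the lagging side stays unpaused while the side that is ahead is paused. Both keep their partitions because poll() is never skipped.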





[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637126#comment-14637126
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~becket_qin] Cool, we're on the same page; that was how I interpreted what you 
said. There is definitely a sense in which this is more elegant, but there is a 
little complexity, since you need to keep a list of suppressed partitions and 
populate it when unsubscribe(partition) is called if that topic is 
subscribed to.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637219#comment-14637219
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

I think overloading subscribe/unsubscribe is very confusing. 
Subscribe/unsubscribe and pause/unpause are two very different behaviors, and 
overloading the same method names does not really simplify the API. I want 
pause/unpause to be pure flow control. It shouldn't be mixed up with 
subscription.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637317#comment-14637317
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] Yeah, I agree that would help this case. The nice thing about that 
proposal is that it would make group management explicit, which could be nice. I 
wonder if it might add more things that can go wrong in the common case, 
though: right now, the common case is just subscribing to a topic and 
letting group management figure out the assignment, and it is kind of hard 
to mess that up. All the cases where you either subscribe to individual 
partitions or pause partitions are kind of niche uses, so maybe it is less 
important to optimize for those cases?



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637338#comment-14637338
 ] 

Gwen Shapira commented on KAFKA-2350:
-

Agree with [~yasuhiro.matsuda]. 

The notion of subscribed vs. assigned is a bit challenging to grasp as is 
([~guozhang] demonstrated that nicely); adding flow-control mechanics to it 
will be messy and difficult for users to get right. 
I think we all hope that in most cases users will subscribe to and unsubscribe from 
entire topics and let the coordinator handle the details (thereby reducing 
user errors, mailing list cries for help, etc.). Adding APIs that encourage the 
opposite is a step in the wrong direction. 



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637179#comment-14637179
 ] 

Guozhang Wang commented on KAFKA-2350:
--

There is already a function for retrieving the subscribed topic partitions 
today:

{code}
public Set<TopicPartition> subscriptions() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    } finally {
        release();
    }
}
{code}

so, for example, calling consumer.unsubscribe(partition) removes the partition 
and hence changes the returned values.

I actually think [~becket_qin]'s approach will not cause much confusion 
regarding the APIs. More explicitly, assuming we add another function, 
assignment(), that returns the assigned partitions, the semantics of the 
other APIs will be:

{code}
// Will not throw; updates the assignment as well as the subscription in the next poll.
consumer.subscribe(topic);

// Throws if the topic is not subscribed; otherwise updates the assignment and the
// subscription in the next poll.
consumer.unsubscribe(topic);

// Returns the assigned partitions.
consumer.assignment();

// Returns the subscribed partitions, which are the same as the assigned partitions
// most of the time.
consumer.subscriptions();

// Throws if partition1 is not in assignment(), saying it is not assigned to you.
consumer.subscribe(partition1);

// Throws if partition2 is not in subscriptions(), saying you have not subscribed to it.
consumer.unsubscribe(partition2);
{code}

What concerns me more about this approach is the client implementation. 
Since it allows a client to both use and not use Kafka partition assignment 
during its life cycle, this could make the client state more complicated to 
manage. For example:

{code}
// Using Kafka for assignment; say we are assigned topic1-partition1 and topic1-partition2.
consumer.subscribe(topic1);
consumer.poll();
// Subscribe to another partition explicitly, without the Kafka coordinator being aware of it.
consumer.subscribe(topic2-partition1);
// Now the subscription is topic1-partition2 and topic2-partition1: the first comes from
// Kafka assignment and the second from explicit subscription.
consumer.unsubscribe(topic1-partition1);
{code}


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637187#comment-14637187
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] [~jkreps] Currently automatic assignment is inferred from 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it would help to make that 
an explicit configuration parameter instead. That might free us a little to 
use subscribe/unsubscribe in the way [~becket_qin] is proposing, since we 
wouldn't have to guess whether the user intends to actually subscribe to a 
partition or just to pause it. 


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637753#comment-14637753
 ] 

Jason Gustafson commented on KAFKA-2350:


[~jkreps] This is a little unrelated to this jira, but one nice consequence of 
moving group management into configuration is that it opens the door to using 
subscribe(topic) as a shortcut to subscribing to all of a topic's partitions 
for users who do not want to use group management (which does have a little 
overhead). Currently the user would have to get all the partitions for that 
topic and then subscribe to them individually. Not that bad, but a tad 
annoying, especially if you have to poll for changes in the number of 
partitions. 
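A rough sketch of the manual workaround described above: expand a topic into its partitions and subscribe to each one. The `Tp` record is a minimal stand-in for TopicPartition, and the `metadata` map (topic to partition count) is an assumed input standing in for a metadata lookup. None of these names are Kafka APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of subscribing to every partition of a topic by hand,
// without group management. Names are illustrative, not a Kafka API.
class ManualTopicSubscriptionSketch {
    // Minimal stand-in for a topic-partition pair.
    record Tp(String topic, int partition) {}

    private final Set<Tp> subscribed = new LinkedHashSet<>();

    // metadata: topic -> current partition count (assumed to come from a metadata lookup).
    // Returns only the partitions newly added by this call.
    List<Tp> subscribeAllPartitions(String topic, Map<String, Integer> metadata) {
        List<Tp> added = new ArrayList<>();
        int count = metadata.getOrDefault(topic, 0);
        for (int p = 0; p < count; p++) {
            Tp tp = new Tp(topic, p);
            if (subscribed.add(tp)) {
                added.add(tp);
            }
        }
        return added;
    }

    Set<Tp> subscriptions() {
        return subscribed;
    }
}
```

Calling it again after the partition count grows picks up only the new partitions, which is the periodic polling for partition changes that the comment finds annoying.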

Anyway, I tend to agree with [~yasuhiro.matsuda] and [~gwenshap] that trying to 
mix partition and topic subscriptions in order to do pausing seems problematic. 
The explicit pause/unpause methods might not be as elegant, but I think they 
are easier for users to understand and are much easier for us to implement.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637792#comment-14637792
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~guozhang] In the last case you mentioned, is topic2-partition1 one of the 
partitions assigned by the consumer coordinator? If it is not, the subscription 
will fail because the partition is not in the assignedTopicPartitions set.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638063#comment-14638063
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Yes, that is what I meant.

Isn't it the case that subscribe(partition) always checks whether 
subscribe(topic) has been called?

If a user wants to subscribe/unsubscribe to an illegal partition, I think it 
is easy to understand if we just throw an exception saying "Consumer is using 
Kafka-based partition assignment and topic partition tp is not an assigned 
partition."


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638070#comment-14638070
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

I think an explicit configuration makes sense. The different subscribe/unsubscribe 
methods confuse people from time to time.

So if enable.consumer.coordinator==false, users are on their own and all the 
subscribe/unsubscribe methods will work.

But what is not clear to me is whether, if enable.consumer.coordinator==true, 
users will be able to call subscribe/unsubscribe(partitions).


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638139#comment-14638139
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] yeah, I vote for pause/resume (or unpause).

I think the challenge with making group management explicit is that it will 
then conflict with the API usage. I.e. the user will call 
client.subscribe(mytopic), something terrible and bad will happen, they won't 
know why, and we will say "uh uh uh, you forgot to set the magic 
enable.consumer.coordinator=true flag". This was kind of what I liked about the 
implicit approach: the natural usage of just subscribing to a partition or 
topic does what you would expect. I would expect making this a flag to turn 
into a source of confusion, because in the early stages of usage most people 
won't know what a consumer coordinator is.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638182#comment-14638182
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using 
pause/unpause, does that mean that when users are not using the consumer 
coordinator, they are equivalent to subscribe(partitions)/unsubscribe(partitions)?

I still don't understand why this is an overloading of the current 
subscribe/unsubscribe API. The way I see the KafkaConsumer API, we have 
different methods to set different fields of a fetch request (offsets, topics, 
partitions), and then poll() uses those settings. 

To me the definitions of all the subscribe/unsubscribe methods stay unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve consumer coordinator to do partition assignment
** Consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve consumer coordinator
** Consumer rebalance will not be triggered.

The only change is that, instead of naively rejecting a PARTITION sub/unsub when 
the consumer coordinator is involved, we let users decide whether to change the 
settings for their next poll() to exclude some topic partitions that have been 
assigned to this consumer.

Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) 
for pausing and unpausing consumption is a redefinition of behavior. It looks to 
me like pause/unpause does exactly the same thing as partition-level 
subscribe/unsubscribe, and we are adding them simply because we think users will 
use them for a different use case. If so, does that mean we need to add yet 
another pair of interfaces if people subscribe/unsubscribe partitions for some 
other use case? Then we will end up with a bunch of interfaces doing very 
similar or even exactly the same things, with different names based on the 
use case. 

If the reason we don't like sub/unsub is that their names sound 
purpose-oriented and suggest a particular use case, we can rename them to 
something like addTopicPartition()/addTopic() (I know I am a terrible name 
picker, but hopefully you get what I mean).


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637890#comment-14637890
 ] 

Guozhang Wang commented on KAFKA-2350:
--

[~becket_qin] OK, I understand now. subscribe(partition) will check whether 
subscribe(topic) has been called before, hence whether assignment() is non-null. 
I think this behavior is a bit confusing, since it depends on the call history 
of subscribe.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-21 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636098#comment-14636098
 ] 

Jay Kreps commented on KAFKA-2350:
--

+1 on resume instead of unpause, though it doesn't match subscribe/unsubscribe.

The original motivation for this was to be able to subscribe at the topic level 
but still say that, while you're subscribed to a given partition, you can't take 
more data for that partition at this particular moment. Generalizing that to 
allow pausing a whole topic makes sense too.

[~becket_qin] I think your idea of having unsubscribe(partition) have the same 
effect as pause(partition) when you are subscribed at the topic level would be 
intuitive, but the logic of how that would work might be a bit complex. If 
someone is smart enough to work out the details, that could be more elegant than 
a new API. The challenge is that partition-level subscribe/unsubscribe is 
currently an error if you are subscribed at the topic level, and those details 
also control whether group management etc. is used.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-21 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636105#comment-14636105
 ] 

Jason Gustafson commented on KAFKA-2350:


Hey [~becket_qin], thanks for the suggestion. I think my only concern is that 
this would make the API more confusing. It would give two meanings to 
subscribe(partition) which depend on whether automatic assignment is used. I 
agree with you about minimizing the complexity of the consumer API, but I'd 
probably rather have the explicit methods if we think the use case is valid.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-21 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636319#comment-14636319
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~jkreps] [~hachikuji], I actually was not proposing to reuse
{code}
void subscribe(String topic)
void unsubscribe(String topic)
{code}
So I was thinking we follow the current convention which is:
1. If you are subscribing/unsubscribing to a partition explicitly, you are on 
your own
2. If you are subscribing/unsubscribing to a topic, we use consumer coordinator 
for partition assignment.

I assume the only use case we are trying to address is when the user is using 
the consumer coordinator and wants to temporarily stop consuming from a topic 
without triggering a consumer rebalance.

If so, to unsubscribe from a topic we can do something like the following:
{code}
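// ...
// assignedTopicPartitions() is the proposed accessor mentioned below; here it is
// assumed to return a map from each topic to that topic's assigned partitions.
for (TopicPartition tp : consumer.assignedTopicPartitions().get(topic)) {
    consumer.unsubscribe(tp);
}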
{code}
To resubscribe, we can do the same but call subscribe(tp) instead.

This approach might require exposing an assignedTopicPartitions() interface, 
but I can see that being useful in quite a few use cases.


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-21 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636074#comment-14636074
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

I am thinking that currently we keep two collections of topic partitions in 
KafkaConsumer, one for user subscription, the other for coordinator assignment. 
Can we do something to the existing code to let subscribe/unsubscribe support 
pause/unpause as well?

Maybe we can have one subscription set and one assigned partition validation 
set.
{code}
void subscribe(String topic)
void unsubscribe(String topic)
{code}
will affect both assigned partition set and subscription set. If Kafka based 
partition assignment is not used, assigned partition set will be null.

{code}
void subscribe(TopicPartition... partitions)
void unsubscribe(TopicPartition... partitions)
{code}
will only change the subscription set. Calling them won't trigger a rebalance, 
but the partitions subscribed to have to be in the assigned partition set if that set is not null.

This way, users can simply use
{code}
void subscribe(TopicPartition... partitions)
void unsubscribe(TopicPartition... partitions)
{code}
to do the pause and unpause.

Some other benefits might be:
1. We don't add two more methods to the already somewhat complicated API.
2. We get validation for manual subscription.
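A minimal model of this two-set proposal, assuming partitions are plain strings such as "t1-0" and that `onAssignment` is a hypothetical hook invoked when the coordinator hands out partitions. Partition-level subscribe/unsubscribe only edit the subscription set, so nothing here triggers a rebalance. Subscribe is validated against the assigned set only when that set is non-null, that is, only when Kafka-based assignment is in use.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the "subscription set + assigned-partition validation set" idea.
class TwoSetSubscriptionSketch {
    // Partitions the next fetch will cover.
    private final Set<String> subscription = new HashSet<>();
    // null when Kafka-based (coordinator) assignment is not in use.
    private Set<String> assigned = null;

    // Hypothetical hook: the coordinator assigns partitions after a topic-level subscribe.
    void onAssignment(Set<String> partitions) {
        assigned = new HashSet<>(partitions);
        subscription.clear();
        subscription.addAll(partitions);
    }

    // Partition-level subscribe ("unpause"): validate all partitions first, then add them.
    void subscribe(String... partitions) {
        for (String tp : partitions) {
            if (assigned != null && !assigned.contains(tp)) {
                throw new IllegalStateException(
                    "Consumer is using Kafka-based partition assignment and "
                        + tp + " is not an assigned partition");
            }
        }
        Collections.addAll(subscription, partitions);
    }

    // Partition-level unsubscribe ("pause"): the partition stays in the assigned set.
    void unsubscribe(String... partitions) {
        for (String tp : partitions) {
            subscription.remove(tp);
        }
    }

    Set<String> fetchTargets() {
        return Collections.unmodifiableSet(subscription);
    }
}
```

Without a coordinator assignment (`assigned == null`), the same methods behave as plain manual subscription, which is the "you are on your own" mode discussed in this thread.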


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634124#comment-14634124
 ] 

Jason Gustafson commented on KAFKA-2350:


Sure, I just meant that if you fail to call poll() periodically (in order to 
pause consumption), then no heartbeats can be sent, which will cause the 
coordinator to rebalance. This only applies if you are using assignment from 
the coordinator.
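A toy model of the point above, assuming a single-threaded consumer whose only heartbeat is a side effect of poll(). The class, the explicit millisecond clock, and `sessionTimeoutMs` are illustrative assumptions, not the consumer's actual configuration or implementation.

```java
// Hypothetical model: heartbeats are sent only from poll(), so a long gap
// between polls looks like a dead consumer to the coordinator.
class PollDrivenLivenessSketch {
    private final long sessionTimeoutMs;
    private long lastHeartbeatMs;

    PollDrivenLivenessSketch(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastHeartbeatMs = nowMs;
    }

    // In this model, calling poll() is the only way a heartbeat gets sent.
    void poll(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // What the coordinator would conclude at time nowMs.
    boolean coordinatorWouldRebalance(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }
}
```

This is why pausing through the API, rather than by not calling poll(), keeps the partitions: the application keeps polling (and so heartbeating) while fetching nothing for the paused topics.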


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634114#comment-14634114
 ] 

Gwen Shapira commented on KAFKA-2350:
-

Cool feature :)

Can you clarify "If you skip poll() ... then a rebalance will be triggered"?

When does a delay count as skipping? Are we obligated to do the next poll() 
immediately after the previous one ended?
I expect to use the consumer to do something like: 
poll until I get N messages, write those messages elsewhere, poll again. 
If writing the messages elsewhere takes longer than expected (a busy-DB kind of 
scenario), will the consumer lose its partitions?

(sorry if I missed important discussion elsewhere, feel free to refer me to 
another JIRA or thread)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634126#comment-14634126
 ] 

Gwen Shapira commented on KAFKA-2350:
-

oh, for some reason I expected heartbeats to be handled in a separate consumer 
thread.
Not sure why though, so never mind :)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634169#comment-14634169
 ] 

Jason Gustafson commented on KAFKA-2350:


There will probably be a large number of users with the same expectation 
(especially since that's how the old consumer works). We'll have to make sure 
the documentation is pretty clear on this point.



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634229#comment-14634229
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

Can we have TopicPartition rather than String for finer control?
{code}
void pause(TopicPartition... partitions);
void unpause(TopicPartition... partitions);
{code}
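A sketch of partition-level pause tracking, assuming pause state is kept as a set of (topic, partition) pairs. The TopicPartition class here is a minimal local stand-in for Kafka's class of the same name, and PartitionPauseState is hypothetical. With this granularity, pausing a whole topic amounts to pausing each of its partitions, while a single partition can also be held back on its own.

{code:java}
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Minimal stand-in for Kafka's TopicPartition: a topic name plus a partition number.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical partition-level pause tracking, following the suggested signatures.
class PartitionPauseState {
    private final Set<TopicPartition> paused = new HashSet<>();

    void pause(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) paused.add(tp);
    }

    void unpause(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) paused.remove(tp);
    }

    // Only unpaused partitions would receive new fetches.
    boolean isFetchable(TopicPartition tp) {
        return !paused.contains(tp);
    }
}
{code}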



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634296#comment-14634296
 ] 

Edward Ribeiro commented on KAFKA-2350:
---

Nit: wouldn't the inverse of ``pause`` be ``resume``?



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634294#comment-14634294
 ] 

Jason Gustafson commented on KAFKA-2350:


I don't see why not. In that case, I assume you would expect the paused state 
to be preserved through a rebalance?



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-20 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634322#comment-14634322
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

To me, preserving the pause state is not a requirement. The application code 
should track which partitions it has paused. I feel that a consumer should reset 
its pause state after a rebalance.
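A sketch of the reset-on-rebalance approach, assuming the consumer clears all pause state on every rebalance and the application re-applies its own record afterwards, keeping only partitions it still owns. ResettingPauseState, AppPauseTracker and the "topic-partition" string identifiers are hypothetical; in a real consumer the re-apply step would presumably run from a rebalance callback once the new assignment is known.

{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer-side pause state that is forgotten on every rebalance.
class ResettingPauseState {
    private final Set<String> paused = new HashSet<>();

    void pause(String partition) { paused.add(partition); }
    void unpause(String partition) { paused.remove(partition); }
    boolean isPaused(String partition) { return paused.contains(partition); }

    // On rebalance the consumer drops all pause state.
    void onRebalance() { paused.clear(); }
}

// Hypothetical application-side bookkeeping: the app remembers what it wants paused.
class AppPauseTracker {
    private final Set<String> wantPaused = new HashSet<>();

    void requestPause(ResettingPauseState consumer, String partition) {
        wantPaused.add(partition);
        consumer.pause(partition);
    }

    void requestResume(ResettingPauseState consumer, String partition) {
        wantPaused.remove(partition);
        consumer.unpause(partition);
    }

    // After a rebalance: forget partitions no longer assigned, re-pause the rest.
    void afterRebalance(ResettingPauseState consumer, Collection<String> newlyAssigned) {
        wantPaused.retainAll(newlyAssigned);
        for (String p : wantPaused) consumer.pause(p);
    }
}
{code}

The alternative raised above, where the consumer itself preserves pause state through a rebalance, would move this bookkeeping into the consumer; the reset approach keeps the consumer simpler at the cost of a little application code.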
