[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992208#comment-14992208 ] Jiangjie Qin commented on KAFKA-2350: -
[~hachikuji] [~guozhang] I noticed that currently, if we pause a partition, we only pause data fetch, not offset commit. Should we also pause offset commit for paused partitions?
The motivation can be explained with the following example: in mirror maker, if a message send fails for a reason such as the message size being too large, today we need to stop the entire mirror maker to ensure no data loss. With pause, we could simply pause consumption from that partition without shutting down the whole pipeline. The problem is that the failed message has already been delivered, so when we call producer.flush() and then commit, that message will be committed. Later, if a rebalance occurs and another consumer takes over this partition, the failed message will be lost.
The workaround today is to:
1. Disable auto commit.
2. Maintain an external offset map and commit offsets explicitly with that map.
3. Implement a consumer rebalance listener that cleans up the external map when a rebalance occurs.
This is very involved, and we probably don't want to force users to keep an external map to solve this issue, because consume-process-write_to_downstream could be a very common pattern.
I suggest we do the following:
1. Stop committing offsets for a paused partition.
2. When pause() is called, if auto commit is turned on, commit the offset for the partition being paused.
This way, if a user sees an error and wants to pause a partition without losing messages, they can simply do
{code}
commitSync(Map(partition->offset_of_failed_message));
pause(partition)
{code}
When a rebalance occurs, another consumer will see the failed message again and will again pause this partition. The only difference here is that users need to commit the offset for a paused partition first if they are not using auto commit. But users who disable auto commit probably do care about data loss, so this is much simpler than asking them to maintain an external offset map. Thoughts?
> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Fix For: 0.9.0.0
>
> There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip calls to poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned to another consumer. The desired behavior is instead that you keep the partition assigned and simply
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate any new fetches for that partition.
> * After the partition is resumed, fetches will begin again.
> * While a partition is paused, seek() and position() can still be used to advance or query the current position.
> * Rebalance does not preserve pause/resume state.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
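A minimal, Kafka-free sketch of the pause/resume behavior listed above, combined with Jiangjie's commit-then-pause proposal. Everything here is a hypothetical model: partitions are plain strings such as `mm-0`, and `PauseModel` with its `fetchable()`, `commit()` and `rebalance()` methods is not the real `KafkaConsumer` API. It also assumes that after a rebalance the partitions restart from their last committed offsets.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, Kafka-free model of the pause/resume rules plus the proposed
// "commit the failed offset, then pause" pattern. Not the real KafkaConsumer API.
class PauseModel {
    private final Map<String, Long> positions = new HashMap<>(); // next offset to fetch
    private final Map<String, Long> committed = new HashMap<>(); // last committed offset
    private final Set<String> paused = new HashSet<>();

    void assign(String partition, long startOffset) {
        positions.put(partition, startOffset);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    void resume(String partition) {
        paused.remove(partition);
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }

    // Partitions a poll() would initiate fetches for: assigned and not paused.
    Set<String> fetchable() {
        Set<String> result = new TreeSet<>(positions.keySet());
        result.removeAll(paused);
        return result;
    }

    // seek() and position() keep working while a partition is paused.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.get(partition);
    }

    void commit(String partition, long offset) {
        committed.put(partition, offset);
    }

    // Pause state is not preserved across a rebalance.
    // Assumption: each partition restarts from its committed offset if it has one.
    void rebalance() {
        paused.clear();
        positions.replaceAll((partition, pos) -> committed.getOrDefault(partition, pos));
    }
}
```

With the failed message at offset 7 committed before `pause()`, the model's rebalance drops the pause and restarts `mm-0` at offset 7, so the next owner sees the failed message again, which is the outcome the proposal relies on.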
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992357#comment-14992357 ] Guozhang Wang commented on KAFKA-2350: --
Could this be resolved by calling
{code}
consumer.seek(record.offset - 1)
consumer.pause(...)
{code}
That way we will only commit up to the previous message. Note that the previous message does not need to exist: say we fetch a compacted topic with message offsets (0, 5, 10) and get an error on the message with offset 10; seeking to and restarting at offset 9 will let us fetch message 10 again.
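A small model of why the seek works even when offset 9 does not exist. It assumes Kafka's usual convention that a position (and a committed offset) names the next record to read, so fetching from position p returns the first record whose offset is at least p. `SeekModel.nextRecord` is a hypothetical helper, not a Kafka API.

```java
import java.util.List;

// Hypothetical model: a position p means "the next record returned is the first one
// whose offset is >= p". Offsets in the list are assumed to be in ascending order.
class SeekModel {
    static Long nextRecord(List<Long> offsetsInLog, long position) {
        for (long offset : offsetsInLog) {
            if (offset >= position) {
                return offset;
            }
        }
        return null; // no record at or after this position yet
    }
}
```

Under that convention, seeking to the failed record's own offset (10 here) also re-fetches it, while seeking to `offset - 1` additionally re-delivers the previous record whenever one exists at that offset, as in a non-compacted log.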
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992452#comment-14992452 ] Jiangjie Qin commented on KAFKA-2350: -
Ah, right. That would work. It is still a bit weird that we commit the same offset repeatedly, but it is probably OK.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992466#comment-14992466 ] Guozhang Wang commented on KAFKA-2350: --
I think this can be fixed by filtering out the partitions whose consumed position is the same as the committed position, since we have the committed position in the subscription state as well.
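A sketch of the filtering described here, assuming the consumer tracks both the current position and the last committed offset for each partition. `CommitFilter.offsetsToCommit` is a hypothetical helper over plain maps with string partition names; it is not code from the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: before an (auto) commit, skip partitions whose current position
// already equals their last committed offset, e.g. a paused partition that was
// seeked back to the failed message and committed there.
class CommitFilter {
    static Map<String, Long> offsetsToCommit(Map<String, Long> positions, Map<String, Long> committed) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            Long last = committed.get(e.getKey());
            // Commit only if nothing was committed yet or the position has moved.
            if (last == null || !last.equals(e.getValue())) {
                toCommit.put(e.getKey(), e.getValue());
            }
        }
        return toCommit;
    }
}
```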
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648329#comment-14648329 ] ASF GitHub Bot commented on KAFKA-2350: ---
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/100
Add KafkaConsumer pause capability
Key: KAFKA-2350
URL: https://issues.apache.org/jira/browse/KAFKA-2350
Project: Kafka
Issue Type: Improvement
Components: consumer
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Fix For: 0.8.3
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645343#comment-14645343 ] ASF GitHub Bot commented on KAFKA-2350: ---
GitHub user hachikuji opened a pull request:
https://github.com/apache/kafka/pull/100
KAFKA-2350; KafkaConsumer pause/resume API
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/hachikuji/kafka KAFKA-2350
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/100.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #100
commit a82b60a48f47a24b55d5b07ddb1a22fbcf52
Author: Jason Gustafson ja...@confluent.io
Date: 2015-07-29T00:58:10Z
KAFKA-2350; KafkaConsumer pause/resume API
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643249#comment-14643249 ] Guozhang Wang commented on KAFKA-2350: --
[~becket_qin], I was not considering the implementation from a developer's point of view regarding the call trace, but rather from a user's point of view. That is, with two different names the distinction between topic and partition subscriptions is clearer. For another example, say you write an application with an embedded consumer client and, very likely, call its functions from many places in your app code / classes. When you see an exception thrown from unsubscribe(partition), you may need to look at other places and check whether 1) you used subscribe(topic), but the partition is not assigned to you by Kafka, or 2) you used subscribe(partition) on some other partitions, but never subscribed to this one. Similarly, if you see an exception thrown from your subscribe(partition) call, you need to check whether 1) you called poll() in between, so that the partition is no longer assigned, or 2) you called subscribe(topic) before and the partition is not one of the assigned partitions. I.e., as the developer you need to check which consumer function calls were made before (the trace) in order to troubleshoot, even when the code was not written by you. With pause / resume, it is clear that the consumer is using topic subscriptions, and if those functions throw an exception, the partition is no longer assigned to you because of a rebalance, etc.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643442#comment-14643442 ] Jason Gustafson commented on KAFKA-2350:
[~becket_qin] I think we're on the same page as far as supporting only automatic or manual assignment and not trying to mix them. I think my confusion is that subscribe(partition) in your proposal is used both a) to subscribe to a partition when manual assignment is used, and b) to unpause a partition when automatic assignment is used. This leads to the weird ordering problems we have been talking about. By the way, I added the line about seek() and position() since it seems like something that should intuitively be supported by pause semantics. It's debatable whether it's really needed, but I think it would be a bit of a surprise to users if we didn't allow it.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643349#comment-14643349 ] Jason Gustafson commented on KAFKA-2350:
There's one interesting implementation note that [~yasuhiro.matsuda] and [~guozhang] brought up. When a partition is unpaused, there may be an active fetch that is parked on the broker. In the current implementation, the consumer will not initiate any new fetches until that fetch has completed. This means the consumer will not be able to immediately process messages from the unpaused partition even if the broker has some available. There are a couple of ways this could be handled: we could issue the new fetch from a different socket on the client, or we could implement a way to cancel or override the active fetch on the broker. Since both of these make this patch significantly more complex, I think we should just note this limitation in the documentation and address it later if it becomes a larger problem.
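A back-of-the-envelope model of the limitation above. It assumes a single fetch in flight to the broker, and that the broker answers it either when enough data is available or when its maximum wait expires, whichever comes first (in the Java consumer these are tuned by the `fetch.min.bytes` and `fetch.max.wait.ms` settings). The class and parameter names are illustrative, not from the patch, and response-processing time is ignored.

```java
// Back-of-the-envelope model of the parked-fetch limitation. Times are in milliseconds.
// Assumptions: one fetch is in flight, and the broker answers it when enough data is
// available (dataReadyAt) or when its maximum wait expires, whichever happens first.
class ParkedFetchDelay {
    // How long a partition resumed at resumedAt waits before a new fetch can include it.
    static long extraDelay(long fetchSentAt, long maxWaitMs, long dataReadyAt, long resumedAt) {
        long fetchReturnsAt = Math.min(fetchSentAt + maxWaitMs, dataReadyAt);
        return Math.max(0L, fetchReturnsAt - resumedAt);
    }
}
```

For example, with a 500 ms maximum wait and no other data arriving, resuming 100 ms after the fetch was sent leaves the resumed partition waiting up to 400 ms for the parked fetch to return.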
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14643418#comment-14643418 ] Jiangjie Qin commented on KAFKA-2350: -
[~hachikuji], I agree with [~guozhang] that it is much clearer if we support either auto partition assignment or manual partition assignment, but not a mixed mode. I assumed pause/unpause would only be used with auto partition assignment, so we can check against the assigned partition set. Under manual partition assignment, for seek(), I assume you will start consuming after the seek? If so, it might be the same as:
{code}
subscribe(tp)
seek(tp, offset)
poll()
{code}
[~guozhang], I see your point. I am convinced that for people who use auto partition assignment, pause/unpause is more intuitive than partition-level subscribe/unsubscribe, and I am not really opposed to having them. What I was worried about is that we are adding methods that are intuitive for a particular use case, but potentially opening the door to APIs that have only subtle differences. If we take a closer look at our API, there are other cases where we could argue for a new API, e.g. a user might want auto commit turned on for some but not all of the partitions they subscribed to, or might want to find the list of offsets of a partition within a time range. These are all different use cases, but they can likely be solved with lower-level API calls instead of a dedicated, intuitive API for each of them. I feel the dilemma we are facing now is that the new consumer tries to address both the high-level consumer and the low-level consumer use cases. pause/unpause looks to me like a medium-to-low-level use case. As higher-level requirements can vary a lot and differ subtly from one another, the question to answer is: should we expose a high-level interface for each high-level use case, or should we just ask users to use a lower-level API as long as it supports the functionality? My understanding is that for high-level consumer use cases, we hopefully don't need users to care much about the underlying mechanism. For people who want to deal with lower-level concepts such as offsets, partition assignment and temporary consumption suspension, letting them use a lower-level API makes more sense to me than writing one high-level API for each use case.
In terms of the examples you mentioned, can we solve them by throwing appropriate exceptions?
{code}
// Auto partition assignment
subscribe(topic1) // assuming only topic1-partition0 is assigned to this consumer.
subscribe(topic1-partition1) // throw IllegalStateException(Cannot subscribe to topic1-partition1 because topic1 is managed by consumer coordinator)
unsubscribe(topic1-partition1) // throw IllegalStateException(topic1-partition1 is managed by consumer coordinator, and topic1-partition1 is not assigned to this consumer.)
{code}
{code}
// Manual partition assignment
subscribe(topic1-partition0)
subscribe(topic1) // throw IllegalStateException(Cannot subscribe to topic1 because the assignment of topic1 is manually managed)
unsubscribe(topic1-partition1) // throw IllegalStateException(Cannot unsubscribe topic1-partition1 because it is not subscribed)
{code}
[~nehanarkhede], what you said makes a lot of sense. I guess I'm just looking at the problem from a different angle: at a certain point, a user either wants to consume from a topic or not, whether temporarily or permanently. So the only states I see are CONSUME/NOT_CONSUME, and the PAUSE state would be pretty much the same as NOT_CONSUME. I might have missed some use cases like the one [~hachikuji] mentioned, seek() on a paused partition, but that can be solved by calling subscribe() first.
[~gwenshap], good point about heartbeat. We actually got some feedback from users at LinkedIn and found that putting the responsibility of sending heartbeats on the user might be a problem in the first place... We may have pause/unpause as a workaround, but the underlying issue may be that we are asking too much of users by making them maintain the heartbeat...
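A hypothetical model of the exception rules in the two code snippets above, to make the proposed behavior concrete. `subscribePartition`/`unsubscribePartition` stand in for the proposal's `subscribe(partition)`/`unsubscribe(partition)`, which are not a shipped `KafkaConsumer` API. Two readings are assumptions rather than statements from the comment: for a coordinator-managed topic these calls act as resume/pause on currently assigned partitions, and a reassignment clears that suppression (matching "Rebalance does not preserve pause/resume state").

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the proposal: coordinator-managed (topic) and manual (partition)
// assignment are never mixed for the same topic, and misuse throws IllegalStateException.
class ProposedConsumer {
    private final Set<String> managedTopics = new HashSet<>();
    private final Set<String> manualPartitions = new HashSet<>();
    private final Set<String> assigned = new HashSet<>();   // coordinator's current assignment
    private final Set<String> suppressed = new HashSet<>(); // assigned but "unsubscribed"

    // Partition names look like "topic1-partition0"; the topic is everything before the last '-'.
    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    // Assumption: a (re)assignment from the coordinator does not carry suppression over.
    void onAssignment(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        suppressed.clear();
    }

    void subscribeTopic(String topic) {
        for (String tp : manualPartitions) {
            if (topicOf(tp).equals(topic)) {
                throw new IllegalStateException("Cannot subscribe to " + topic
                        + " because the assignment of " + topic + " is manually managed");
            }
        }
        managedTopics.add(topic);
    }

    void subscribePartition(String tp) {
        if (managedTopics.contains(topicOf(tp))) {
            if (!assigned.contains(tp)) {
                throw new IllegalStateException("Cannot subscribe to " + tp + " because "
                        + topicOf(tp) + " is managed by consumer coordinator");
            }
            suppressed.remove(tp); // assumed reading: acts like resume()
            return;
        }
        manualPartitions.add(tp);
    }

    void unsubscribePartition(String tp) {
        if (managedTopics.contains(topicOf(tp))) {
            if (!assigned.contains(tp)) {
                throw new IllegalStateException(tp + " is managed by consumer coordinator, and "
                        + tp + " is not assigned to this consumer.");
            }
            suppressed.add(tp); // assumed reading: acts like pause()
            return;
        }
        if (!manualPartitions.remove(tp)) {
            throw new IllegalStateException("Cannot unsubscribe " + tp + " because it is not subscribed");
        }
    }

    boolean isSuppressed(String tp) {
        return suppressed.contains(tp);
    }
}
```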
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14642229#comment-14642229 ] Neha Narkhede commented on KAFKA-2350: --
Few thoughts:
bq. 1. Add pause/unpause
bq. 2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe to a topic but suppress a partition
bq. 3. Either of the above but making the use of group management explicit using an enable.group.management flag
I'm in favor of 1. for several reasons:
1. It keeps the API semantics clean. subscribe/unsubscribe indicates intent to consume data, while pause/resume indicates a *temporary* preference for the purposes of flow control.
2. It avoids all the different permutations of subscribe/unsubscribe that we would need to worry about, each of which would have to make sense and be explained clearly to the user. This discussion is confusing enough that I'm convinced it will not be easy.
3. pause/resume moves the consumer to a different state in its state diagram. Overloading the same API to represent two different states is unintuitive.
Also +1 on:
1. Renaming unpause to resume.
2. Not maintaining the pause/resume preference across consumer rebalances.
There may be complications in the implementation of the above preferences that I have overlooked, but I feel we should design APIs for the right behavior and figure out the implementation issues that come up as a result.
Add KafkaConsumer pause capability
There are some use cases in stream processing where it is helpful to be able to pause consumption of a topic. For example, when joining two topics, you may need to delay processing of one topic while you wait for the consumer of the other topic to catch up. The new consumer currently doesn't provide a nice way to do this. If you skip poll() or if you unsubscribe, then a rebalance will be triggered and your partitions will be reassigned. One way to achieve this would be to add two new methods to KafkaConsumer:
{code}
void pause(String... topics);
void unpause(String... topics);
{code}
When a topic is paused, a call to KafkaConsumer.poll will not initiate any new fetches for that topic. After it is unpaused, fetches will begin again.
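A sketch of pause/resume used for flow control in the join use case from the description. The policy is an illustrative assumption, since the issue does not say how "caught up" is measured: the topic whose last consumed record timestamp leads the other's by more than `maxLeadMs` is paused, and it is resumed once the gap closes. `JoinFlowControl` and the topic names `orders`/`payments` are hypothetical; a caller would pass `toPause(...)` to `pause()` and `toResume(...)` to `resume()` (or `unpause()` in the original proposal) on each loop iteration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical flow-control policy for joining two topics with pause/resume.
class JoinFlowControl {
    // The topic that should be paused, if any: the one whose last consumed timestamp
    // leads the other's by more than maxLeadMs.
    static Set<String> desiredPaused(String left, long leftTs, String right, long rightTs, long maxLeadMs) {
        if (leftTs - rightTs > maxLeadMs) {
            return Set.of(left);
        }
        if (rightTs - leftTs > maxLeadMs) {
            return Set.of(right);
        }
        return Set.of();
    }

    // Topics to pass to pause(): desired but not currently paused.
    static Set<String> toPause(Set<String> current, Set<String> desired) {
        Set<String> result = new HashSet<>(desired);
        result.removeAll(current);
        return result;
    }

    // Topics to pass to resume(): currently paused but no longer desired.
    static Set<String> toResume(Set<String> current, Set<String> desired) {
        Set<String> result = new HashSet<>(current);
        result.removeAll(desired);
        return result;
    }
}
```

Keeping the decision separate from the consumer calls means the policy can be tested without a broker, and only the state changes reach `pause()`/`resume()`.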
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14640958#comment-14640958 ] Yasuhiro Matsuda commented on KAFKA-2350: -
Throwing exceptions makes sense. In addition, I think a consumer should not keep pause/unpause states across rebalance. Forgetting the states makes the consumer/application logic cleaner.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639657#comment-14639657 ] Guozhang Wang commented on KAFKA-2350: -- Personally, I would prefer not to mix topic subscription (i.e. using Kafka for assignment) with partition subscription (i.e. not using Kafka for assignment). I feel the flexibility benefits are outweighed by the complexity of the API semantics, mainly because (again, this is purely my personal view) I think very few use cases will need both during the life cycle of a single consumer: with the new consumer implementation, you can always create two consumers, one for topic subscription and one for partition subscription, since the construction cost is quite low. Assuming we are not going to support this mixing, my main concern (probably also what Jay meant to describe in case 3 above) is that we would effectively be making whether an exception is thrown for subscribe(topic) and subscribe(partition) depend on the call trace, for example:
{code}
subscribe(topicA);
unsubscribe(topicA-partition1);
subscribe(topicA-partition1); // this will be OK
subscribe(topicC-partition1); // this will NOT be OK, i.e. the outcome depends on which subscribe()/unsubscribe() calls were made before
{code}
Plus, what [~yasuhiro.matsuda] mentioned is also a valid point, although a rebalance will only happen during a poll() call. It is still weird to get:
{code}
subscribe(topicA);              // then say assignment() returns topicA-partition1
poll();
unsubscribe(topicA-partition1); // may throw an exception here, so you need to check the assignment each time before you try to unsubscribe/subscribe, since each poll() may change the assignment
{code}
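To illustrate the "no mixing" position, here is a hypothetical guard in which the first subscribe call fixes the mode and a call of the other kind is rejected until every subscription is dropped. This is a sketch under the assumption that mixing is simply disallowed; `SubscriptionGuard` and its methods are illustrative only, not Kafka code.

```java
import java.util.*;

// Hypothetical guard: the first subscribe call fixes the mode; mixing is rejected
// until all subscriptions have been removed again.
class SubscriptionGuard {
    enum Mode { NONE, TOPICS, PARTITIONS }

    private Mode mode = Mode.NONE;
    private final Set<String> topics = new HashSet<>();
    private final Set<String> partitions = new HashSet<>();

    void subscribeTopic(String topic) { require(Mode.TOPICS); topics.add(topic); }

    void subscribePartition(String tp) { require(Mode.PARTITIONS); partitions.add(tp); }

    void unsubscribeTopic(String topic) { topics.remove(topic); resetIfEmpty(); }

    void unsubscribePartition(String tp) { partitions.remove(tp); resetIfEmpty(); }

    Mode mode() { return mode; }

    private void require(Mode wanted) {
        if (mode != Mode.NONE && mode != wanted) {
            throw new IllegalStateException(
                "cannot mix topic and partition subscriptions (current mode: " + mode + ")");
        }
        mode = wanted;
    }

    private void resetIfEmpty() {
        if (topics.isEmpty() && partitions.isEmpty()) mode = Mode.NONE;
    }
}

class SubscriptionGuardDemo {
    public static void main(String[] args) {
        SubscriptionGuard g = new SubscriptionGuard();
        g.subscribeTopic("topicA");
        try {
            g.subscribePartition("topicC-1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this rule, whether a call throws still depends on earlier calls, but only through a single mode flag, which is easier to explain than per-partition history.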
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639460#comment-14639460 ] Yasuhiro Matsuda commented on KAFKA-2350: - Suppose we are using automatic assignment.
{code}
subscribe(topic)        // A
unsubscribe(partition)  // B
subscribe(partition)    // C
{code}
If a rebalance happens and the coordinator assigns the partition to a different instance, is the client still subscribed to the partition? The rebalance can happen 1) between A and B, 2) between B and C, or 3) after C.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639665#comment-14639665 ] Jiangjie Qin commented on KAFKA-2350: - Hmm, good point. When rebalances come into play, things can get a little tricky. That said, this would be equally tricky with pause/unpause. It might be useful to track the contents of two topic partition collections:
[1] subscribedPartitions
[2] assignedPartitions
The tricky part is whether or not a rebalance should override the suppression. During a rebalance, the consumer needs to compare the newly assigned partitions with [2] (as it was before the rebalance) to see whether each partition was already in [2].
* When a partition remains assigned to a consumer (the partition was in [2] before the rebalance), the rebalance should keep the current state of that partition unchanged.
** If the partition is in [2] but not in [1] (under suppression), the rebalance should keep it as is; the partition should not be added to [1], since that would override the suppression.
** If the partition is in both [1] and [2], the rebalance should also keep it as is.
* When a partition was not in [2] before the rebalance, add it to both [1] and [2].
* When a partition has been assigned to some other consumer, the rebalance removes it from both [1] and [2]. Later operations on it will receive an exception.
In code, that would be something like:
{code}
Iterator<TopicPartition> it = assignedPartitions.iterator();
while (it.hasNext()) {
    TopicPartition tp = it.next();
    if (!newlyAssignedPartitions.contains(tp)) {
        // partition moved to another consumer: drop it from both [1] and [2]
        subscribedPartitions.remove(tp);
        it.remove();
    }
}
for (TopicPartition tp : newlyAssignedPartitions) {
    if (!assignedPartitions.contains(tp)) {
        // partition is new to this consumer: it starts out subscribed
        subscribedPartitions.add(tp);
        assignedPartitions.add(tp);
    }
}
{code}
Assuming that, here would be the behavior:
* Rebalance occurred between A and B (the partition is in both [1] and [2]).
** After the rebalance, if the partition has been assigned to some other consumer (it is removed from both [1] and [2]), unsubscribe in B will either have no effect or get an exception. In that case, the user might need to catch the exception and call assignedPartitions() again to see which partitions they want to unsubscribe from.
** After the rebalance, if the partition is still assigned to this consumer (it remains in both [1] and [2]), unsubscribe in B will work (the partition will be removed from [1] but stay in [2]).
* Rebalance occurred between B and C (the partition is not in [1] but is in [2]).
** After the rebalance, if the partition has been assigned to some other consumer (it is in neither [1] nor [2]), subscribe in C will receive an exception because the partition is not assigned to this consumer.
** After the rebalance, if the partition remains assigned to this consumer, this relies on the trick mentioned above: the rebalance will not override the suppression from B, so C will subscribe to the partition successfully (the partition will be in both [1] and [2]).
* Rebalance occurred after C.
** After the rebalance, if the partition has been assigned to some other consumer, it will be removed from both [1] and [2], and the consumer will no longer consume from it.
** After the rebalance, if the partition remains assigned to this consumer, it stays in [1] and [2], and the consumer keeps consuming from it.
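A runnable version of the reconciliation rule above, with the A/B/C sequence replayed against it. Sets of `"topic-N"` strings stand in for `TopicPartition`, and `RebalanceSketch` is a hypothetical name; where the comment allows either "no effect or an exception" for operations on a partition that has moved away, this sketch assumes the exception.

```java
import java.util.*;

// Hypothetical sketch; "topic-N" strings stand in for TopicPartition.
class RebalanceSketch {
    final Set<String> subscribed = new HashSet<>(); // [1] subscribedPartitions
    final Set<String> assigned = new HashSet<>();   // [2] assignedPartitions

    void onRebalance(Set<String> newlyAssigned) {
        // Partitions that moved to another consumer leave both [1] and [2].
        Iterator<String> it = assigned.iterator();
        while (it.hasNext()) {
            String tp = it.next();
            if (!newlyAssigned.contains(tp)) {
                subscribed.remove(tp);
                it.remove();
            }
        }
        // New partitions start out subscribed; retained ones keep their state,
        // so a suppression made before the rebalance survives it.
        for (String tp : newlyAssigned) {
            if (assigned.add(tp)) subscribed.add(tp);
        }
    }

    void unsubscribe(String tp) { requireAssigned(tp); subscribed.remove(tp); } // suppress

    void subscribe(String tp) { requireAssigned(tp); subscribed.add(tp); }

    private void requireAssigned(String tp) {
        if (!assigned.contains(tp)) {
            throw new IllegalStateException(tp + " is not assigned to this consumer");
        }
    }
}

class RebalanceSketchDemo {
    public static void main(String[] args) {
        RebalanceSketch c = new RebalanceSketch();
        Set<String> both = new HashSet<>(Arrays.asList("t-0", "t-1"));
        c.onRebalance(both);   // A: subscribe(topic); coordinator assigns t-0 and t-1
        c.unsubscribe("t-0");  // B: suppress t-0
        c.onRebalance(both);   // rebalance between B and C keeps the assignment
        System.out.println(c.subscribed.contains("t-0")); // false: suppression survived
        c.subscribe("t-0");    // C
        System.out.println(c.subscribed.contains("t-0")); // true
    }
}
```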
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639796#comment-14639796 ] Jason Gustafson commented on KAFKA-2350: I agree that even if we use pause/unpause, we have to solve some of the same issues. For me, mixing the semantics of pause with subscription just does not feel intuitive. unsubscribe(partition) means I am no longer interested in that partition. pause(partition) means I am still interested, but I would like to hold off on messages for now. These are different concepts, and I'm not convinced they are even functionally equivalent. For example, I expect that I should still be able to call seek(partition) and position(partition) even if that partition has been paused. Would I still be able to do that if I had unsubscribed from the partition? In the case of automatic assignment, perhaps we would say that you can still seek on an unsubscribed (i.e. paused) partition as long as it's still assigned. But can you still call seek on an unsubscribed partition when you're not using automatic assignment? To be consistent, we'd have to say yes, but that just seems weird. Or if we say no, then we have no way to seek on a paused (umm.. unsubscribed?) partition when not using automatic assignment.
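A hypothetical model of the distinction Jason draws: pausing keeps the partition and its position, so `seek` and `position` still work, while unsubscribing drops it entirely. `PositionSketch` and its methods are illustrative assumptions, not the consumer's real internals.

```java
import java.util.*;

// Hypothetical model, not the consumer's real internals. Partitions are "topic-N" strings.
class PositionSketch {
    private final Map<String, Long> positions = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void assign(String tp, long offset) { positions.put(tp, offset); }

    // Unsubscribing forgets the partition, including its position.
    void unsubscribe(String tp) {
        positions.remove(tp);
        paused.remove(tp);
    }

    // Pausing only stops fetching; the partition and its position are kept.
    void pause(String tp) { requireTracked(tp); paused.add(tp); }

    void unpause(String tp) { requireTracked(tp); paused.remove(tp); }

    void seek(String tp, long offset) { requireTracked(tp); positions.put(tp, offset); }

    long position(String tp) { requireTracked(tp); return positions.get(tp); }

    // Simulates fetching n records from every tracked partition that is not paused.
    void fetch(int n) {
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            if (!paused.contains(e.getKey())) e.setValue(e.getValue() + n);
        }
    }

    private void requireTracked(String tp) {
        if (!positions.containsKey(tp)) throw new IllegalStateException("no position for " + tp);
    }
}

class PositionSketchDemo {
    public static void main(String[] args) {
        PositionSketch p = new PositionSketch();
        p.assign("a-0", 10L);
        p.assign("b-0", 0L);
        p.pause("a-0");
        p.fetch(5);
        System.out.println(p.position("a-0") + " " + p.position("b-0")); // 10 5
        p.seek("a-0", 3L); // seeking a paused partition is still allowed
        System.out.println(p.position("a-0")); // 3
    }
}
```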
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639746#comment-14639746 ] Jiangjie Qin commented on KAFKA-2350: - [~guozhang], my understanding is that pause/unpause will also need to track the call trace, right? Otherwise, if users are not using the consumer coordinator, what would pause/unpause do for them? Would it be exactly the same as subscribe/unsubscribe at the partition level? Also, we have to track the sub/unsub call trace anyway; otherwise, how can we enforce mutual exclusion of the automatic and manual partition assignment modes? Regarding the rebalance issue, do you mean we can avoid it with pause/unpause? I see the same issue there. Consider the following:
{code}
subscribe(topicA);          // then say assignment() returns topicA-partition1
poll();                     // say the partition has been assigned to another consumer
pause(topicA-partition1);   // should we throw an exception?
unpause(topicA-partition1); // should we throw an exception here?
{code}
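The two possible answers to the question in the snippet above (fail fast, or silently ignore a pause/unpause for a partition that a rebalance has moved away) can be sketched side by side. `PausePolicySketch` and `OnUnassigned` are hypothetical names chosen for illustration, and dropping pause state for revoked partitions is an assumption of the sketch.

```java
import java.util.*;

// Hypothetical sketch: what pause()/unpause() could do for a partition that a
// rebalance has moved to another consumer. Partitions are "topic-N" strings.
class PausePolicySketch {
    enum OnUnassigned { THROW, IGNORE }

    private final OnUnassigned policy;
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    PausePolicySketch(OnUnassigned policy) { this.policy = policy; }

    void onRebalance(Collection<String> newAssignment) {
        assigned.clear();
        assigned.addAll(newAssignment);
        paused.retainAll(assigned); // pause state of revoked partitions is dropped
    }

    // Returns true if the call took effect.
    boolean pause(String tp) {
        if (!checkAssigned(tp)) return false;
        paused.add(tp);
        return true;
    }

    boolean unpause(String tp) {
        if (!checkAssigned(tp)) return false;
        paused.remove(tp);
        return true;
    }

    boolean isPaused(String tp) { return paused.contains(tp); }

    private boolean checkAssigned(String tp) {
        if (assigned.contains(tp)) return true;
        if (policy == OnUnassigned.THROW) {
            throw new IllegalStateException(tp + " is not assigned to this consumer");
        }
        return false; // IGNORE: silently do nothing
    }
}

class PausePolicyDemo {
    public static void main(String[] args) {
        PausePolicySketch lenient = new PausePolicySketch(PausePolicySketch.OnUnassigned.IGNORE);
        lenient.onRebalance(Arrays.asList("topicA-2")); // topicA-1 moved elsewhere
        System.out.println(lenient.pause("topicA-1"));  // false
        PausePolicySketch strict = new PausePolicySketch(PausePolicySketch.OnUnassigned.THROW);
        strict.onRebalance(Arrays.asList("topicA-2"));
        try {
            strict.pause("topicA-1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // topicA-1 is not assigned to this consumer
        }
    }
}
```

Failing fast surfaces the stale assignment immediately; ignoring the call keeps the loop simple but can hide a bug in which partition the application thinks it owns.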
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639424#comment-14639424 ] Jason Gustafson commented on KAFKA-2350: [~becket_qin] It would help if you could sketch out how subscribe/unsubscribe(topic) and subscribe/unsubscribe(partition) interact with the two collections you are trying to maintain. Also, is it the union of these two sets that will be fetched and returned to the user?
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639307#comment-14639307 ] Jiangjie Qin commented on KAFKA-2350: - Ah, I see your concern. Originally I was thinking that we would still keep self-managed partition control and coordinator-based partition assignment mutually exclusive, so if subscribe(partition) has been called, subscribe(topic) will throw an exception. Now I think allowing a mixed mode might be an interesting feature to support. Let me take a shot at addressing the use cases you raised. We maintain the following two data structures:
{code}
Map<Topic, TopicPartition> assignedPartitions  // A
Set<TopicPartition> subscribedPartitions       // B
{code}
The only rule we have is that you cannot do both MANUAL and AUTO partition subscription on the same topic. The validity of sub/unsub for a topic is as follows:
* In A, in B: the partition is assigned by the coordinator; sub/unsub works at both topic and partition level.
* In A, not in B: the topic is subscribed manually; sub/unsub at topic level will get an exception, sub/unsub at partition level works.
* Not in A, in B: the topic is assigned by the coordinator but suppressed by the user; sub/unsub at topic level works, sub/unsub at partition level only works when the partition is in B.
* Not in A, not in B: a brand-new topic; sub/unsub works at both topic and partition level.
We can discuss whether it is legitimate to unsubscribe from a partition that is not assigned/subscribed, but that is an orthogonal issue. So for the cases you mentioned:
* Case 1
{code}
subscribe(topic)
unsubscribe(partition)
{code}
means "give me a set of partitions and suppress one of them", just like you said.
* Case 2
{code}
subscribe(partition)
subscribe(topic)
{code}
means "give me this partition, plus whatever the coordinator gives me". However, if the topic the user is trying to subscribe to happens to be the topic of the partition already subscribed to in the first line, that is an exception: you cannot do manual and automatic partition subscription at the same time. (In this case, the topic would be in subscribedPartitions but not in assignedPartitions.)
* Case 3
{code}
unsubscribe(partition)
subscribe(topic)
{code}
is a somewhat weird sequence in the first place. How could you suppress a partition when you don't even know whether it will be assigned to you? Anyway, here is the behavior: unsubscribing from the partition will go through, but after calling subscribe(topic), the assigned topic partition will not be suppressed. However, if the user is currently subscribing manually to a partition from that topic (in subscribedPartitions, not in assignedPartitions), subscribe(topic) will throw the same exception as in case 2.
* Case 4
{code}
subscribe(partition)
subscribe(topic)
unsubscribe(partition)
{code}
The first two subscribe calls are as described in case 2. The partition will be suppressed after the unsubscribe call.
* Case 5
{code}
subscribe(partition)
unsubscribe(partition)
subscribe(topic)
{code}
If the first two lines are called for the same partition, it is equivalent to just calling subscribe(topic), so the partition won't be suppressed. If the first two lines are called for different partitions, it is again a little weird for the user to suppress a partition without even knowing whether it will be assigned, but the partition won't be suppressed.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639080#comment-14639080 ] Jay Kreps commented on KAFKA-2350: -- [~becket_qin] I think there are three proposals:
1. Add pause/unpause.
2. Allow subscribe(topic) followed by unsubscribe(partition) to subscribe to a topic but suppress a partition.
3. Either of the above, but making the use of group management explicit with an enable.group.management flag.
I'm in favor of (2) if it is possible to work out the corner cases and the implementation doesn't cause us to go crazy. I think you are saying that having unsubscribe mean suppress is actually somewhat sensible. Otherwise, I'm in favor of (1). I'm not in favor of (3) because of the two different modes. Let me point out a few of the weird bits of (2), though. Since we would now allow mingling of subscribe(topic) and subscribe(partition), we have to work out all the combinations. The case where you do subscribe(topic); unsubscribe(partition) is really clear: it means give me a set of partitions but suppress this particular partition. Likewise, I think subscribe(partition); subscribe(topic) also makes sense: you are saying give me this particular partition plus whatever else the coordinator assigns me. But what about unsubscribe(partition); subscribe(topic)? Do you still get the same suppression effect? And now this is a bit weird: subscribe(partition); subscribe(topic); unsubscribe(partition). Does the unsubscribe call suppress the partition or not? The first two calls normally mean subscribe me to whatever the coordinator gives me plus a given partition. The last two calls normally mean subscribe me to whatever the coordinator gives me except this given partition. But is the result of combining these two the same as subscribe(partition); unsubscribe(partition); subscribe(topic)? In other words, I think all this implies that there are now three states for a partition: SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED? This is what I think we'd have to work out to make your proposal feasible.
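One hypothetical way to encode the three states Jay lists, assuming the rule from Jay's own note elsewhere in this thread that `unsubscribe(partition)` records a suppression only "if that topic is subscribed to". Under that assumption, the two call orders Jay compares end in different states. The sketch ignores the same-topic manual/automatic exception discussed elsewhere in the thread, and all names are illustrative.

```java
import java.util.*;

// Hypothetical three-state model; partition ids look like "topic-N".
class ThreeStateSketch {
    enum State { SUBSCRIBED, NOT_SUBSCRIBED, SUPPRESSED }

    private final Set<String> topics = new HashSet<>();
    private final Map<String, State> partitions = new HashMap<>();

    void subscribeTopic(String topic) { topics.add(topic); }

    void subscribePartition(String tp) { partitions.put(tp, State.SUBSCRIBED); }

    // Records SUPPRESSED only when the partition's topic is already subscribed.
    void unsubscribePartition(String tp) {
        partitions.put(tp, topics.contains(topicOf(tp)) ? State.SUPPRESSED : State.NOT_SUBSCRIBED);
    }

    State state(String tp) { return partitions.getOrDefault(tp, State.NOT_SUBSCRIBED); }

    // Would poll() fetch this partition, assuming the coordinator assigned it to us?
    boolean consumes(String tp) {
        State s = state(tp);
        if (s == State.SUBSCRIBED) return true;
        if (s == State.SUPPRESSED) return false;
        return topics.contains(topicOf(tp));
    }

    private static String topicOf(String tp) { return tp.substring(0, tp.lastIndexOf('-')); }
}

class ThreeStateDemo {
    public static void main(String[] args) {
        ThreeStateSketch a = new ThreeStateSketch(); // subscribe(p); subscribe(t); unsubscribe(p)
        a.subscribePartition("t-0");
        a.subscribeTopic("t");
        a.unsubscribePartition("t-0");
        ThreeStateSketch b = new ThreeStateSketch(); // subscribe(p); unsubscribe(p); subscribe(t)
        b.subscribePartition("t-0");
        b.unsubscribePartition("t-0");
        b.subscribeTopic("t");
        System.out.println(a.state("t-0") + " " + b.state("t-0")); // SUPPRESSED NOT_SUBSCRIBED
    }
}
```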
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639085#comment-14639085 ] Gwen Shapira commented on KAFKA-2350: - [~becket_qin] - when I imagined how the API would work, I assumed that when the coordinator is not used, pause/resume would do nothing. The whole point was to keep sending heartbeats without consuming, to avoid a rebalance, so this is rather meaningless without a coordinator, heartbeats, and rebalances. Of course, if someone calls it accidentally, nothing bad will happen.
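A toy model of Gwen's point, assuming (as in the design discussed here) that heartbeats are sent only from `poll()` and that the coordinator evicts a member whose last heartbeat is older than the session timeout. A paused consumer that keeps calling `poll()` fetches nothing but stays in the group; one that stops polling is evicted. `HeartbeatSketch` and the timings are hypothetical.

```java
import java.util.*;

// Toy model, not Kafka's implementation: heartbeats go out only from poll(), and the
// coordinator evicts a member whose last heartbeat is older than the session timeout.
class HeartbeatSketch {
    private final List<String> assigned;
    private final long sessionTimeoutMs;
    private final Set<String> paused = new HashSet<>();
    private long lastHeartbeatMs = 0;

    HeartbeatSketch(List<String> assigned, long sessionTimeoutMs) {
        this.assigned = assigned;
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void pause(String tp) { paused.add(tp); }

    // Sends a heartbeat, then returns the partitions a fetch would be issued for.
    List<String> poll(long nowMs) {
        lastHeartbeatMs = nowMs;
        List<String> fetched = new ArrayList<>();
        for (String tp : assigned) {
            if (!paused.contains(tp)) fetched.add(tp);
        }
        return fetched;
    }

    boolean evictedAt(long nowMs) { return nowMs - lastHeartbeatMs > sessionTimeoutMs; }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        HeartbeatSketch c = new HeartbeatSketch(Arrays.asList("a-0"), 30_000);
        c.pause("a-0");
        for (long t = 0; t <= 120_000; t += 10_000) {
            c.poll(t); // fetches nothing, but keeps the membership alive
        }
        System.out.println(c.evictedAt(125_000)); // false: last heartbeat at 120000
        System.out.println(c.evictedAt(150_001)); // true: polling stopped for too long
    }
}
```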
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637126#comment-14637126 ] Jay Kreps commented on KAFKA-2350: -- [~becket_qin] Cool, we're on the same page; that was how I interpreted what you said. There is definitely a sense in which this is more elegant, but there is a little complexity, since you need to keep a list of suppressed partitions and populate it when unsubscribe(partition) is called if that topic is subscribed to.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637219#comment-14637219 ] Yasuhiro Matsuda commented on KAFKA-2350: - I think overloading subscribe/unsubscribe is very confusing. Subscribe/unsubscribe and pause/unpause are two very different behaviors, and overloading the same method names does not really simplify the API. I want pause/unpause to be pure flow control. It shouldn't be mixed up with subscription.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637317#comment-14637317 ] Jay Kreps commented on KAFKA-2350: -- [~hachikuji] Yeah, I agree that would help this case. The nice thing about that proposal is that it would make group management explicit, which could be nice. I wonder if it might add more things that can go wrong in the common case, though. Right now the common case is just subscribing to a topic and letting group management figure out the assignment, and it is kind of hard to mess that up. All the cases where you either subscribe to individual partitions or pause partitions are fairly niche uses, so maybe it is less important to optimize for them?
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637338#comment-14637338 ] Gwen Shapira commented on KAFKA-2350: - Agree with [~yasuhiro.matsuda]. The notion of subscribed vs. assigned is a bit challenging to grasp as it is ([~guozhang] demonstrated that nicely); adding flow-control mechanics to it will be messy and difficult for users to get right. I think we all hope that in most cases users will subscribe and unsubscribe to entire topics and let the coordinator handle the details (thereby reducing user errors, mailing-list cries for help, etc.). Adding APIs that encourage the opposite is a step in the wrong direction.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637179#comment-14637179 ] Guozhang Wang commented on KAFKA-2350: -- There is already a function for retrieving the subscribed topic partitions today:
{code}
public Set<TopicPartition> subscriptions() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    } finally {
        release();
    }
}
{code}
which will, for example, remove the partition and hence change the returned value if consumer.unsubscribe(partition) is called. I actually think [~becket_qin]'s approach will not cause much confusion regarding the APIs. More explicitly, assuming we add another function assignment() that returns the assigned partitions, the semantics of the other APIs would be:
{code}
consumer.subscribe(topic);        // will not throw any exception, but will update the assignment as well as the subscription in the next poll
consumer.unsubscribe(topic);      // will throw an exception if the topic is not subscribed; otherwise will update the assignment and the subscription in the next poll
consumer.assignment();            // returns the assigned partitions
consumer.subscriptions();         // returns the subscribed partitions, which are the same as the assigned partitions most of the time
consumer.subscribe(partition1);   // will throw an exception if the partition is not in assignment(), saying it is not assigned to you
consumer.unsubscribe(partition2); // will throw an exception if the partition is not in subscriptions(), saying you have not subscribed to it
{code}
What I am more concerned about with this approach is the client implementation. Since it allows a client to both use and not use Kafka partition assignment during its life cycle, it could make the client state more complicated to manage. For example:
{code}
consumer.subscribe(topic1);              // using Kafka for assignment; say we are assigned topic1-partition1 and topic1-partition2
consumer.poll();
consumer.subscribe(topic2-partition1);   // subscribe to another partition explicitly, without the Kafka coordinator being aware of it
consumer.unsubscribe(topic1-partition1); // now the subscription is topic1-partition2 and topic2-partition1, where the first comes from Kafka assignment and the second from explicit subscription
{code}
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637187#comment-14637187 ] Jason Gustafson commented on KAFKA-2350: [~becket_qin] [~jkreps] Currently, automatic assignment is inferred from which subscribe methods are invoked (e.g. if you subscribe to a topic, we assume you want automatic assignment). I wonder if it might help to make that an explicit configuration parameter instead? That might free us a little to use subscribe/unsubscribe in the way [~becket_qin] is proposing, since we wouldn't have to guess whether the user intends to actually subscribe to a partition or just to pause it.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637753#comment-14637753 ] Jason Gustafson commented on KAFKA-2350: [~jkreps] This is a little unrelated to this JIRA, but one nice consequence of moving group management into configuration is that it opens the door to using subscribe(topic) as a shortcut for subscribing to all of a topic's partitions, for users who do not want to use group management (which does have a little overhead). Currently the user has to get all the partitions for that topic and then subscribe to them individually. Not that bad, but a tad annoying, especially if you have to poll for changes in the number of partitions. Anyway, I tend to agree with [~yasuhiro.matsuda] and [~gwenshap] that trying to mix partition and topic subscriptions in order to do pausing seems problematic. The explicit pause/unpause methods might not be as elegant, but I think they are easier for users to understand and much easier for us to implement.
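A rough sketch of the manual workaround described above, under stated assumptions: the partition count is passed in directly (a real client would read it from topic metadata), and because Kafka partitions are numbered 0..N-1 and a topic's partition count can only grow, newly created partitions are those numbered from the previously known count upward. ManualTopicSubscription and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for a consumer that does not use group management:
// expand a topic into its partitions, then detect partitions added later.
class ManualTopicSubscription {
    record TopicPartition(String topic, int partition) {}

    // Every partition of the topic, i.e. what subscribe(topic) could stand for without group management.
    static List<TopicPartition> allPartitions(String topic, int partitionCount) {
        List<TopicPartition> result = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            result.add(new TopicPartition(topic, p));
        }
        return result;
    }

    // Partitions created since the last metadata check; each still has to be subscribed individually.
    static List<TopicPartition> newPartitions(String topic, int knownCount, int currentCount) {
        List<TopicPartition> result = new ArrayList<>();
        for (int p = knownCount; p < currentCount; p++) {
            result.add(new TopicPartition(topic, p));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("initial: " + allPartitions("orders", 3));
        // Later, metadata reports 5 partitions instead of 3.
        System.out.println("to add:  " + newPartitions("orders", 3, 5));
    }
}
```

This periodic re-check is the "tad annoying" part that a configuration-driven subscribe(topic) would hide from the user.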
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637792#comment-14637792 ] Jiangjie Qin commented on KAFKA-2350: [~guozhang] In the last case you mentioned, is topic2-partition1 one of the partitions assigned by the consumer coordinator? If it is not, the subscription will fail because the partition is not in the assignedTopicPartitions set.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638063#comment-14638063 ] Jiangjie Qin commented on KAFKA-2350: Yes, that is what I meant. Isn't it the case that subscribe(partition) always checks whether subscribe(topic) has been called? If a user tries to subscribe/unsubscribe to an illegal partition, I think it is easy to understand if we just throw an exception saying "Consumer is using Kafka-based partition assignment and topic partition tp is not an assigned partition."
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638070#comment-14638070 ] Jiangjie Qin commented on KAFKA-2350: I think an explicit configuration makes sense. The different subscribe/unsubscribe methods confuse people from time to time. So if enable.consumer.coordinator==false, users are on their own and all the subscribe/unsubscribe methods will work. But what is not clear to me is this: if enable.consumer.coordinator==true, will users be able to call subscribe/unsubscribe(partitions)?
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638139#comment-14638139 ] Jay Kreps commented on KAFKA-2350: [~hachikuji] yeah, I vote for pause/resume (or unpause). I think the challenge with making group management explicit is that it will then conflict with the API usage. I.e. the user will call client.subscribe(mytopic), something terrible will happen, they won't know why, and we will say "uh uh uh, you forgot to set the magic enable.consumer.coordinator=true flag." This is kind of what I liked about the implicit approach: the natural usage of just subscribing to a partition or topic does what you would expect. I would anticipate this flag turning into a source of confusion, because in the early stages of usage most people won't know what a consumer coordinator is.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638182#comment-14638182 ] Jiangjie Qin commented on KAFKA-2350: [~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using pause/unpause, does that mean that, when users are not using the consumer coordinator, they are equivalent to subscribe(partitions)/unsubscribe(partitions)? I still don't understand why this is an overloading of the current subscribe/unsubscribe API. The way I see the KafkaConsumer API working is that we have different methods to set different fields of a fetch request (offsets, topics, partitions), and then we do a poll() using those settings. To me, the definitions of all the subscribe/unsubscribe methods stay unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve the consumer coordinator to do partition assignment
** A consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve the consumer coordinator
** A consumer rebalance will not be triggered
The only change is that, instead of naively rejecting a PARTITION sub/unsub when the consumer coordinator is involved, we allow users to decide whether to change the settings for their next poll() to exclude some topic partitions that have been assigned to this consumer. Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) to pause and unpause consumption is a redefinition of behavior. It looks to me that pause/unpause does exactly the same thing as partition-level subscribe/unsubscribe, and we are adding them simply because we think users will use them for a different use case. If so, does that mean we need to add yet another pair of interfaces if people subscribe/unsubscribe partitions for some other use case? Then we will end up with a bunch of interfaces doing very similar, or even exactly the same, things but with different names based on the use case.
If the reason we don't like to use sub/unsub is that their names sound purpose-oriented and suggest a particular use case, we can rename them to something like addTopicPartition()/addTopic() (I know I am a terrible name picker, but hopefully you get what I wanted to say).
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14637890#comment-14637890 ] Guozhang Wang commented on KAFKA-2350: [~becket_qin] OK, I understand now. subscribe(partition) will check whether subscribe(topic) has been called before, and hence whether assignment() is not null. I think the behavior is a bit confusing, since it depends on the call history of subscribe.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636098#comment-14636098 ] Jay Kreps commented on KAFKA-2350: +1 on resume instead of unpause, though it doesn't match subscribe/unsubscribe. The original motivation for this was to be able to subscribe at the topic level but still say that, while you remain subscribed to a given partition, you can't take more data for that partition at this particular moment. Generalizing that to allow pausing a whole topic makes sense too. [~becket_qin] I think your idea of having unsubscribe(partition) take the same effect as pause(partition) when you are subscribed at the topic level would be intuitive, but the logic of how that would work might be a bit complex. If someone is smart enough to work out the details, that could be more elegant than a new API. The challenge is that partition-level subscribe/unsubscribe is currently an error if you are subscribed at the topic level, and the details of that also control whether group management etc. is used.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636105#comment-14636105 ] Jason Gustafson commented on KAFKA-2350: Hey [~becket_qin], thanks for the suggestion. I think my only concern is that this would make the API more confusing. It would give two meanings to subscribe(partition) which depend on whether automatic assignment is used. I agree with you about minimizing the complexity of the consumer API, but I'd probably rather have the explicit methods if we think the use case is valid.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636319#comment-14636319 ] Jiangjie Qin commented on KAFKA-2350: [~jkreps] [~hachikuji], I actually was not proposing to reuse
{code}
void subscribe(String topic)
void unsubscribe(String topic)
{code}
I was thinking we follow the current convention, which is:
1. If you are subscribing/unsubscribing to a partition explicitly, you are on your own.
2. If you are subscribing/unsubscribing to a topic, we use the consumer coordinator for partition assignment.
I assume the only use case we are trying to address is when a user is using the consumer coordinator and wants to temporarily stop consuming from a topic without triggering a consumer rebalance. If so, to unsubscribe from a topic we can do something like the following:
{code}
...
for (TopicPartition tp : consumer.assignedTopicPartitions().get(topic)) {
    consumer.unsubscribe(tp);
}
{code}
To resubscribe, we can do the same but call subscribe(tp) instead. This approach might require exposing an assignedTopicPartitions() interface, but I can see that being useful in quite a few use cases.
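The loop above can be fleshed out as a small self-contained model. It is a sketch under assumptions: assignedTopicPartitions() is the accessor the comment suggests exposing, modelled here as the assigned partitions grouped by topic, and the class only tracks which partitions the next poll() would fetch. TopicPauseByPartition, pauseTopic() and resumeTopic() are hypothetical names, not existing KafkaConsumer API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: "pausing" a topic by unsubscribing from each of its assigned partitions.
class TopicPauseByPartition {
    record TopicPartition(String topic, int partition) {}

    private final Set<TopicPartition> assigned = new HashSet<>();   // handed out by the coordinator
    private final Set<TopicPartition> subscribed = new HashSet<>(); // what the next poll() fetches

    TopicPauseByPartition(Collection<TopicPartition> coordinatorAssignment) {
        assigned.addAll(coordinatorAssignment);
        subscribed.addAll(coordinatorAssignment);
    }

    // The accessor the proposal would need: assigned partitions grouped by topic.
    Map<String, List<TopicPartition>> assignedTopicPartitions() {
        Map<String, List<TopicPartition>> byTopic = new HashMap<>();
        for (TopicPartition tp : assigned) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp);
        }
        return byTopic;
    }

    void subscribe(TopicPartition tp) { subscribed.add(tp); }
    void unsubscribe(TopicPartition tp) { subscribed.remove(tp); }

    // Temporarily stop consuming a topic; the assignment is untouched, so no rebalance is needed.
    void pauseTopic(String topic) {
        for (TopicPartition tp : assignedTopicPartitions().getOrDefault(topic, List.of())) {
            unsubscribe(tp);
        }
    }

    // The same loop, calling subscribe(tp) instead.
    void resumeTopic(String topic) {
        for (TopicPartition tp : assignedTopicPartitions().getOrDefault(topic, List.of())) {
            subscribe(tp);
        }
    }

    Set<TopicPartition> fetchSet() { return Collections.unmodifiableSet(subscribed); }

    public static void main(String[] args) {
        TopicPauseByPartition consumer = new TopicPauseByPartition(List.of(
                new TopicPartition("left", 0), new TopicPartition("left", 1), new TopicPartition("right", 0)));
        consumer.pauseTopic("left");
        System.out.println("fetching from " + consumer.fetchSet());
        consumer.resumeTopic("left");
        System.out.println("fetching from " + consumer.fetchSet());
    }
}
```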
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636074#comment-14636074 ] Jiangjie Qin commented on KAFKA-2350: I am thinking that currently we keep two collections of topic partitions in KafkaConsumer, one for the user subscription and the other for the coordinator assignment. Can we change the existing code so that subscribe/unsubscribe support pause/unpause as well? Maybe we can have one subscription set and one assigned-partition validation set.
{code}
void subscribe(String topic)
void unsubscribe(String topic)
{code}
will affect both the assigned partition set and the subscription set. If Kafka-based partition assignment is not used, the assigned partition set will be null.
{code}
void subscribe(TopicPartition... partitions)
void unsubscribe(TopicPartition... partitions)
{code}
will only change the subscription set. Calling them won't trigger a rebalance, but the partitions subscribed to have to be in the assigned partition set if that set is not null. In this way, users can simply use
{code}
void subscribe(TopicPartition... partitions)
void unsubscribe(TopicPartition... partitions)
{code}
to pause and unpause. Some other benefits might be:
1. We don't add two more interfaces to the already somewhat complicated API.
2. We get validation for manual subscription.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634124#comment-14634124 ] Jason Gustafson commented on KAFKA-2350: Sure, I just meant that if you fail to call poll() periodically (in order to pause consumption), then no heartbeats can be sent, which will cause the coordinator to rebalance. This only applies if you are using assignment from the coordinator.
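The timing consequence can be modelled with simple arithmetic. This sketch assumes, as the comment says, that heartbeats are only sent from within poll(), and that the coordinator rebalances once more than a session timeout passes without a heartbeat. PollDrivenHeartbeat and the 30-second timeout are illustrative assumptions, not the client's actual implementation or configuration.

```java
// Hypothetical model: heartbeats piggyback on poll(), so a consumer that stops
// polling looks dead to the coordinator once the session timeout has passed.
class PollDrivenHeartbeat {
    private final long sessionTimeoutMs;
    private long lastHeartbeatMs;

    PollDrivenHeartbeat(long sessionTimeoutMs, long startMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastHeartbeatMs = startMs;
    }

    // Every poll() also sends a heartbeat, whether or not any partition is paused.
    void poll(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    // The coordinator rebalances when no heartbeat arrived within the session timeout.
    boolean rebalanceTriggered(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Blocking for 45 s on a slow downstream write without calling poll():
        PollDrivenHeartbeat blocked = new PollDrivenHeartbeat(30_000, 0);
        System.out.println("no poll for 45s, rebalance? " + blocked.rebalanceTriggered(45_000));

        // Pausing the partitions but still calling poll() every 10 s keeps the membership alive:
        PollDrivenHeartbeat paused = new PollDrivenHeartbeat(30_000, 0);
        for (long t = 10_000; t <= 40_000; t += 10_000) {
            paused.poll(t);
        }
        System.out.println("poll while paused, rebalance? " + paused.rebalanceTriggered(45_000));
    }
}
```

Under this model, pause() is what lets an application stop fetching while still calling poll() often enough to keep heartbeating.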
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634114#comment-14634114 ] Gwen Shapira commented on KAFKA-2350: Cool feature :) Can you clarify "If you skip poll() ... then a rebalance will be triggered"? When does a delay count as skipping? Are we obligated to do the next poll() immediately after the previous one has ended? I expect to use the consumer to do something like: poll until I get N messages, write those messages elsewhere, poll again. If the "write messages elsewhere" step takes longer than expected (a "DB is busy" kind of scenario), will the consumer lose the partitions? (Sorry if I missed an important discussion elsewhere; feel free to refer me to another JIRA or thread.)
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634126#comment-14634126 ] Gwen Shapira commented on KAFKA-2350: oh, for some reason I expected heartbeats to be handled in a separate consumer thread. Not sure why though, so never mind :)
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634169#comment-14634169 ] Jason Gustafson commented on KAFKA-2350: There will probably be a large number of users with the same expectation (especially since that's how the old consumer works). We'll have to make sure the documentation is pretty clear on this point.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634229#comment-14634229 ] Yasuhiro Matsuda commented on KAFKA-2350: Can we have TopicPartition rather than String for finer control?
{code}
void pause(TopicPartition... partitions);
void unpause(TopicPartition... partitions);
{code}
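A minimal sketch of the partition-level variant, assuming pause state is simply a set that poll() subtracts from the assignment when deciding which partitions to fetch. It uses resume() for the inverse operation, the name favored elsewhere in this thread. PartitionPauseSketch, fetchablePartitions(), and rejecting pauses of unassigned partitions are all assumptions made for illustration, not KafkaConsumer behavior.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of pause(TopicPartition...)/resume(TopicPartition...):
// poll() keeps running, but only fetches assigned partitions that are not paused.
class PartitionPauseSketch {
    record TopicPartition(String topic, int partition) {}

    private final Set<TopicPartition> assigned = new LinkedHashSet<>();
    private final Set<TopicPartition> paused = new HashSet<>();

    PartitionPauseSketch(Collection<TopicPartition> assignment) {
        assigned.addAll(assignment);
    }

    // Stop fetching the given partitions. Validates all arguments first so a bad call changes nothing.
    void pause(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) {
            if (!assigned.contains(tp)) {
                throw new IllegalStateException(tp + " is not assigned to this consumer");
            }
        }
        paused.addAll(Arrays.asList(partitions));
    }

    // Inverse of pause(); resuming a partition that is not paused is a no-op.
    void resume(TopicPartition... partitions) {
        paused.removeAll(Arrays.asList(partitions));
    }

    // Partitions the next poll() would build fetch requests for.
    Set<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> result = new LinkedHashSet<>(assigned);
        result.removeAll(paused);
        return result;
    }

    public static void main(String[] args) {
        TopicPartition left0 = new TopicPartition("left", 0);
        TopicPartition right0 = new TopicPartition("right", 0);
        PartitionPauseSketch consumer = new PartitionPauseSketch(List.of(left0, right0));
        consumer.pause(right0); // e.g. let the "left" side of a join catch up
        System.out.println("fetching " + consumer.fetchablePartitions());
        consumer.resume(right0);
        System.out.println("fetching " + consumer.fetchablePartitions());
    }
}
```

Keeping the paused set separate from the assignment means the topic-level form from the original description could be layered on top by expanding a topic name into its assigned partitions.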
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634296#comment-14634296 ] Edward Ribeiro commented on KAFKA-2350: Nit: wouldn't the inverse of ``pause`` be ``resume``?
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634294#comment-14634294 ]

Jason Gustafson commented on KAFKA-2350:

I don't see why not. In that case, I assume you would expect the paused state to be preserved through a rebalance?
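One possible reading of "preserved through a rebalance" is sketched below as a hypothetical model, not anything the consumer is known to implement: after a reassignment, the pause flag survives for partitions this consumer still owns, is dropped for revoked partitions, and newly assigned partitions start unpaused. The class `PreservingPauseState` and its `onReassignment` hook are assumptions for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of pause state that is preserved across a rebalance.
final class PreservingPauseState {
    // Stand-in for Kafka's TopicPartition.
    record TopicPartition(String topic, int partition) {}

    private final Set<TopicPartition> assigned = new HashSet<>();
    private final Set<TopicPartition> paused = new HashSet<>();

    void pause(TopicPartition tp) {
        if (!assigned.contains(tp)) throw new IllegalStateException("Not assigned: " + tp);
        paused.add(tp);
    }

    void unpause(TopicPartition tp) {
        paused.remove(tp);
    }

    boolean isPaused(TopicPartition tp) {
        return paused.contains(tp);
    }

    // Invoked when the group hands this consumer a new set of partitions.
    void onReassignment(Set<TopicPartition> newAssignment) {
        assigned.clear();
        assigned.addAll(newAssignment);
        // Keep flags only for partitions still owned; revoked partitions lose theirs,
        // and newly added partitions are not in `paused`, so they start unpaused.
        paused.retainAll(assigned);
    }

    public static void main(String[] args) {
        PreservingPauseState state = new PreservingPauseState();
        TopicPartition p0 = new TopicPartition("orders", 0);
        TopicPartition p1 = new TopicPartition("orders", 1);
        TopicPartition p2 = new TopicPartition("orders", 2);

        state.onReassignment(Set.of(p0, p1));
        state.pause(p0);
        state.pause(p1);

        // Rebalance: p1 moves to another consumer, p2 arrives.
        state.onReassignment(Set.of(p0, p2));
        System.out.println("p0 paused: " + state.isPaused(p0)); // still owned, flag kept
        System.out.println("p1 paused: " + state.isPaused(p1)); // revoked, flag dropped
        System.out.println("p2 paused: " + state.isPaused(p2)); // newly assigned, starts unpaused
    }
}
```

In this model the consumer owns the bookkeeping; the alternative is to reset the flags and leave it to the application.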
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634322#comment-14634322 ]

Yasuhiro Matsuda commented on KAFKA-2350:

To me, preserving the paused state is not a requirement. The application code should track which partitions are paused. I feel that a consumer should reset the paused state after a rebalance.
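Under this approach the consumer forgets pause flags on every rebalance, while the application keeps its own record of which partitions it wants paused and re-applies it after each new assignment. The sketch below is a hypothetical model of that split: `ConsumerModel`, `AssignmentCallback` and `App` are illustrative names, and the callback merely plays the role a rebalance listener's "partitions assigned" callback (such as `ConsumerRebalanceListener.onPartitionsAssigned`) would play with a real consumer.

```java
import java.util.HashSet;
import java.util.Set;

final class ResettingPauseState {
    // Stand-in for Kafka's TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Hypothetical hook, playing the role of a rebalance listener's "partitions assigned" callback.
    interface AssignmentCallback {
        void onAssigned(Set<TopicPartition> partitions);
    }

    // Consumer side: pause flags are wiped by every rebalance.
    static final class ConsumerModel {
        private final Set<TopicPartition> assigned = new HashSet<>();
        private final Set<TopicPartition> paused = new HashSet<>();
        private final AssignmentCallback callback;

        ConsumerModel(AssignmentCallback callback) {
            this.callback = callback;
        }

        void pause(TopicPartition tp) {
            if (!assigned.contains(tp)) throw new IllegalStateException("Not assigned: " + tp);
            paused.add(tp);
        }

        boolean isPaused(TopicPartition tp) {
            return paused.contains(tp);
        }

        void rebalance(Set<TopicPartition> newAssignment) {
            assigned.clear();
            assigned.addAll(newAssignment);
            paused.clear();                            // reset, not preserved
            callback.onAssigned(Set.copyOf(assigned)); // let the application restore its intent
        }
    }

    // Application side: the source of truth for which partitions should be paused.
    static final class App {
        final Set<TopicPartition> wantPaused = new HashSet<>();
        ConsumerModel consumer;

        void reapply(Set<TopicPartition> assignedNow) {
            wantPaused.retainAll(assignedNow); // forget partitions now owned by another consumer
            for (TopicPartition tp : wantPaused) consumer.pause(tp);
        }

        void pause(TopicPartition tp) {
            wantPaused.add(tp);
            consumer.pause(tp);
        }
    }

    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("orders", 0);
        TopicPartition p1 = new TopicPartition("orders", 1);

        App app = new App();
        app.consumer = new ConsumerModel(app::reapply);
        app.consumer.rebalance(Set.of(p0, p1));
        app.pause(p0);
        app.pause(p1);

        // Rebalance: this instance keeps p1 but loses p0. The consumer drops all pause flags,
        // then the callback re-pauses p1 from the application's own record.
        app.consumer.rebalance(Set.of(p1));
        System.out.println("p1 paused: " + app.consumer.isPaused(p1));
        System.out.println("app still tracks p0: " + app.wantPaused.contains(p0));
    }
}
```

Compared with preserving flags inside the consumer, this keeps the consumer simpler, at the cost of requiring the application to hook into reassignment to restore its intent.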