[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14997148#comment-14997148 ] ASF GitHub Bot commented on KAFKA-2674: --- GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/467 KAFKA-2674: clarify onPartitionsRevoked behavior You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-2674 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/467.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #467 commit 7922adfe74278f7f003d4053c7b6e06f618ab1a6 Author: Jason GustafsonDate: 2015-11-09T19:02:44Z KAFKA-2674: clarify onPartitionsRevoked behavior > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14997155#comment-14997155 ] Jason Gustafson commented on KAFKA-2674: [~guozhang] [~becket_qin] I added a commit to clarify the behavior. I think the documentation was already fairly clear, so I just added a comment to emphasize that onPartitionsRevoked() is not called before close(). I also reordered the methods to try to suggest the order they are actually invoked. I don't think this is a blocker for 0.9.0. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14997165#comment-14997165 ] ASF GitHub Bot commented on KAFKA-2674: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/467 > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > Fix For: 0.9.0.0 > > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14986470#comment-14986470 ] Jason Gustafson commented on KAFKA-2674: [~guozhang] [~becket_qin] Since none of the alternatives seem clearly better, maybe we should just keep the current names. I can add a patch to try and clarify the behavior. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14986541#comment-14986541 ] Jiangjie Qin commented on KAFKA-2674: - Yes, I agree documenting it clearly is probably sufficient. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14980737#comment-14980737 ] Jiangjie Qin commented on KAFKA-2674: - [~hachikuji] I am wondering do we want to avoid use *assign* here? Because we have used it for explicit partition management. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14979819#comment-14979819 ] Jason Gustafson commented on KAFKA-2674: [~guozhang] In the context of rebalancing, I think the meaning of onPrepare/onComplete should be fairly clear to the user, and the benefit is that these names suggest the semantics that we actually implement. Maybe oldAssignment is a bad argument name, but we could use currentAssignment as Becket suggests, or we could also use the revoked/assigned names, as below: {code} interface RebalanceListener { void onPrepare(List revokedPartitions); void onComplete(List assignedPartitions); } {code} Haha, I've been working on this code a little too much, so it's hard for me to see whether this would be more intuitive to users. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14979433#comment-14979433 ] Jay Kreps commented on KAFKA-2674: -- [~hachikuji] I don't have a ton to add. I think I added that class, but it was mostly a placeholder not something with a well-thought-out rationale--I agree that the way it calls revoke prior to assign is a bit odd. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14977457#comment-14977457 ] Jiangjie Qin commented on KAFKA-2674: - I don't have a strong opinion on this. I agree with [~guozhang] that the argument name is a little bit weird. If we change the function name perhaps we can use currentAssignment instead of oldAssignment? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14975020#comment-14975020 ] Guozhang Wang commented on KAFKA-2674: -- Personally I prefer the old function names since the new proposed names seem not very related to the old partitions as its parameter and hence its effort in resolving confusion seems overwhelmed by the new confusions it introduced. I would suggest we only document clearly that the callback will not be triggered upon consumer closure. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14974770#comment-14974770 ] Jason Gustafson commented on KAFKA-2674: [~becket_qin] [~guozhang] What would you guys think of the following change to ConsumerRebalanceListener? Basically the objective is to make the calling order clear. {code} interface RebalanceListener { /* Invoked prior to rebalance, offsets committed here */ void onPrepare(List oldAssignment); /* Invoked after rebalance, set initial offset here */ void onComplete(List newAssignment) } {code} Might also be nice to get feedback from [~jkreps]. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Jason Gustafson > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14967638#comment-14967638 ] Guozhang Wang commented on KAFKA-2674: -- This is a good point, especially if the user DOES NOT want to do some logic upon shutting down that is included in the callback. One edge case though is that upon closing the consumer which tries to close the coordinator, it will likely try to finish all in-flight requests by calling "maybeAutoCommitOffsetsSync", hence users' behavior before calling close() may not be on the final state of the consumer. Do we have any ideas about resolving this? [~becket_qin] [~hachikuji] > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14968350#comment-14968350 ] Jiangjie Qin commented on KAFKA-2674: - [~guozhang] That is a valid concern, I haven't think it through before. The question is what states could change and whether the changes matter. From consumer's point of view, there are only two states that matter: 1)Partition Assignment 2)Consumer Offsets. In most cases, user would not care about partition assignment. In cases where user care about the partition assignment, the assignment after close would be empty. So it seems users do not really lose anything from here. If user are using auto commit, that means user do not really care about the offset commit timing as long as the correct offset is committed. When consumer closes, the correct offsets to commit are the consumed offsets, and consumed offsets will not change unless user call poll(). If user don't want committed offset to change before and after consumer closes, they can either call commitOffsetSync() before closing consumer by themselves or disable auto commit, depending on the use cases. On the other hand, if users are not using auto commit, we will not commit the offsets, so the state won't change. So I think we are OK here. Do you have some specific use case that breaks? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14968420#comment-14968420 ] Jason Gustafson commented on KAFKA-2674: I agree with [~becket_qin] that this is probably not a problem. One thing that may be unintuitive, and perhaps the reason we're having this discussion, is that the callback names suggest a different invocation order. From a user's perspective, I would expect that each invocation of onPartitionsAssigned() gets a corresponding invocation of onPartitionsRevoked(), not the other way around (doesn't assignment come before revocation after all?). In that case, it's natural to expect that onPartitionsRevoked() is invoked on close(). However, the actual contract is exactly the opposite: onPartitionsRevoked() is always called prior to rebalance, and onPartitionsAssigned() is called after. So I wonder if we should make this clearer by renaming the methods. For example: {code} void beforeRebalance(List oldAssignment); void afterRebalance(List newAssignment); {code} In this case, the invocation order is clear even without reading the documentation. It also seems clear then that close() would not call beforeRebalance(). Any thoughts? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14968467#comment-14968467 ] Jiangjie Qin commented on KAFKA-2674: - [~hachikuji] I agree the name is a little misleading. People tends to ignore the class name which is ConsumerRebalanceListener. The reason we had onPartitionsRevoked and onPartitionsAssigned was because in old consumer during rebalance there are multiple steps (commit offsets, release partition ownership, assign partitions, claim partition ownership, etc). We wanted to make it clear when those methods are called. However some of the steps might not applicable to new consumer anymore. One thing is when we say beforeRebalance(), it sounds like before committing offsets, while actually it is not. For example, if user call committed() in rebalance listener, they might get staled result. Personally I think this is fine as long as we document the behavior clearly. Another way to think about this is that in the future is it possible we add a beforeCommittingOffset() that get called before committing offset. If there is such possibility I would prefer to keep the current names but document them clearly. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965532#comment-14965532 ] Jason Gustafson commented on KAFKA-2674: [~onurkaraman] Maybe this issue can be addressed when we update the client-side code for the LeaveGroup request? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965782#comment-14965782 ] Jiangjie Qin commented on KAFKA-2674: - [~hachikuji] I am reviewing KAFKA-2464 and also noticed this. I am actually wondering if rebalance listener should be invoked when consumer shuts down. It seems to me that when consumer shuts down, it should simply commit offsets then send a leave group request. It is not actually participating the rebalance any more. Only the rest of the members in the group are doing rebalance. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965803#comment-14965803 ] Jason Gustafson commented on KAFKA-2674: [~becket_qin] I think the only problem with that is that some users might be doing their own offset management, in which case they might not even be using Kafka to store offsets. Currently, we only commit on close if auto-commit is enabled. I guess we could also depend on the user to manually do commits prior to close, but it seems like they'd probably already have their commit logic in the revoke callback. What do you think? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965865#comment-14965865 ] Jiangjie Qin commented on KAFKA-2674: - I think it would be clearer if rebalance callback is only called when rebalance occurred, but not on consumer closure. If auto commit is turned off, and users are committing offset on there own, the following code looks clean to me when user close a consumer. {code} if (!failure) consumer.commitOffsetSync(); consumer.close(); {code} And this is what we do in mirror maker. I prefer this because it does not complex and we are not re-purposing rebalance listener for something else. I can imagine some logic that only makes sense for actual rebalance, say holding a lock and release it when onPartitionAssigned() is called. Thoughts? > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965913#comment-14965913 ] Guozhang Wang commented on KAFKA-2674: -- Today we are already calling ```commitOffsetSync``` upon consumer closing in the coordinator if auto commit is turned on. In addition, we also call ```commitOffsetSync``` before we call ```onPartitionsRevoked``` during the rebalance if auto commit is turned on. So I think the question is really whether we should also call ```onPartitionsRevoked``` upon closing after we call ```commitOffsetSync``` as well. I prefer adding the ```onPartitionsRevoked``` call since it may be used not only for manual offset management. BTW there is a discrepancy between the old and new consumer in MirrorMaker, that with the old consumer we use a rebalance listener that returns the global assignment in ```onPartitionsAssigned``` whereas in the new consumer it only returns its own assignment. We need to think about how it can be resolved. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14965935#comment-14965935 ] Jason Gustafson commented on KAFKA-2674: Since LeaveGroup will cause a group rebalance, it doesn't seem inconsistent to call the revocation callback prior to closing, but I don't have a strong preference either way as long as the documentation is clear. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance (TestConsumer.java:42) > 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89) > {noformat} > Workaround is to call onPartitionsRevoked() explicitly and manually just > before calling consumer.close() but it seems dirty and error prone for me. It > can be simply forgotten be someone without such experience. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2674) ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14966003#comment-14966003 ] Jiangjie Qin commented on KAFKA-2674: - [~guozhang] The original reason we add consumer rebalance listener was because rebalance can be triggered without user awareness. But when rebalance occurs, user might want to do something. This is different from consumer closure case. When users close the consumer, they know what they are doing, and likely they will do some cleanup and necessary state checkpoint before closing the consumer. I am not sure how valuable it is to put some pre-closure tasks to onPartitionRevoked(). To me it is a little confusing. Also user might need to add some check to see if the rebalance is caused by close() or it is an actual rebalance.(for example the grabbing a lock as I mentioned before). Good point about the Mirror Maker. The reason we have global assignment for mirror maker during rebalance listener is because we want to allow mirror maker to do some administrative logic when rebalance is triggered. (e.g. when rebalance is triggered because of a new topic is created in source cluster, we want to create the topic in target cluster with the same number of partitions). In order to let the rebalance listener to perform such action we need a global knowledge of topic so we know which topics changed. This global topic information is part of global assignment (global topic info + owners). With the new client-side assignment patch, at least the leader has global assignment knowledge, so I think for that use case we should be fine. Although letting the leader to do everything is not ideal if the administrative work is heavy, but it is still doable. > ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer > close > --- > > Key: KAFKA-2674 > URL: https://issues.apache.org/jira/browse/KAFKA-2674 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Michal Turek >Assignee: Neha Narkhede > > Hi, I'm investigating and testing behavior of new consumer from the planned > release 0.9 and found an inconsistency in calling of rebalance callbacks. > I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called > during consumer close and application shutdown. It's JavaDoc contract says: > - "This method will be called before a rebalance operation starts and after > the consumer stops fetching data." > - "It is recommended that offsets should be committed in this callback to > either Kafka or a custom offset store to prevent duplicate data." > I believe calling consumer.close() is a start of rebalance operation and even > the local consumer that is actually closing should be notified to be able to > process any rebalance logic including offsets commit (e.g. if auto-commit is > disabled). > There are commented logs of current and expected behaviors. > {noformat} > // Application start > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT > (AppInfoParser.java:82) > 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser > [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c > (AppInfoParser.java:83) > // Consumer started (the first one in group), rebalance callbacks are called > including empty onPartitionsRevoked() > 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [] > (TestConsumer.java:95) > 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:100) > // Another consumer joined the group, rebalancing > 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator > [TestConsumer-worker-0]: Attempt to heart beat failed since the group is > rebalancing, try to re-join group. (Coordinator.java:714) > 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, > testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] > (TestConsumer.java:95) > 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer > [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, > testB-4, testA-3] (TestConsumer.java:100) > // Consumer started closing, there SHOULD be onPartitionsRevoked() callback > to commit offsets like during standard rebalance, but it is missing > 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: > Closing instance