[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660269#comment-14660269 ]

Jason Gustafson commented on KAFKA-2397:
----------------------------------------

[~jkreps] Yeah, TCP is pretty resilient to network weirdness. I was mostly thinking of client-side timeouts, which may end up exposed in configuration. The only thing the client can do if a request times out is disconnect and try again. Perhaps we'd want to keep any timeouts with the coordinator out of configuration if we tried this approach. I was also wondering if there were some tunneling situations which could make the connection unstable.

leave group request
-------------------

Key: KAFKA-2397
URL: https://issues.apache.org/jira/browse/KAFKA-2397
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Onur Karaman
Assignee: Onur Karaman
Priority: Minor
Fix For: 0.8.3

Let's say every consumer in a group has session timeout s. Currently, if a consumer leaves the group, the worst-case time to stabilize the group is 2s (s to detect the consumer failure + s for the rebalance window). If a consumer could instead declare that it is leaving the group, the worst-case time to stabilize the group would be just the s associated with the rebalance window. This is a low-priority optimization!

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
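The timing argument in the ticket can be sketched as a back-of-the-envelope calculation. This is not Kafka code; the method names are illustrative only.

```java
// Sketch of the worst-case group stabilization times discussed in KAFKA-2397.
// Without a leave-group request the coordinator must first detect the failure
// (one session timeout), then run the rebalance window; with an explicit
// leave-group request only the rebalance window remains.
public class GroupStabilization {
    // Worst case when a consumer silently disappears: detection + rebalance.
    static long worstCaseWithoutLeaveMs(long sessionTimeoutMs, long rebalanceWindowMs) {
        return sessionTimeoutMs + rebalanceWindowMs;
    }

    // Worst case when the consumer explicitly announces its departure.
    static long worstCaseWithLeaveMs(long rebalanceWindowMs) {
        return rebalanceWindowMs;
    }

    public static void main(String[] args) {
        long s = 30_000; // session timeout, also the rebalance window in the ticket
        System.out.println(worstCaseWithoutLeaveMs(s, s)); // the "2s" in the ticket
        System.out.println(worstCaseWithLeaveMs(s));       // just "s"
    }
}
```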
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660276#comment-14660276 ]

Mayuresh Gharat commented on KAFKA-2120:
----------------------------------------

[~jasong35] I am not too familiar with the consumer code right now. From your comment above I get the feeling that the consumer and producer have different requirements. I also see that the consumer uses a ConsumerNetworkClient, so we could have a separate timeout config for the consumer that gets passed to NetworkClient. By per-request timeout, do you mean that different types of requests can have different timeouts configured? If so, it can be done. But then we might degrade the user experience: the user would now have to configure different timeouts for different kinds of requests, which means they would need internal knowledge of the different kinds of requests that clients make to brokers. I might be wrong here, since I am not fully familiar with the consumer code base and am still getting started with it. Let me know your thoughts on this.

Add a request timeout to NetworkClient
--------------------------------------

Key: KAFKA-2120
URL: https://issues.apache.org/jira/browse/KAFKA-2120
Project: Kafka
Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat
Fix For: 0.8.3
Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch

Currently NetworkClient does not have a timeout setting for requests, so if no response is received for a request (for example, because the broker is down), the request will never be completed. The request timeout will also be used as an implicit timeout for some methods, such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 was created for this public interface change: https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
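The timeout behavior under discussion can be sketched as follows. This is a hypothetical illustration, not the actual NetworkClient implementation: each in-flight request carries a deadline, and an expired request would cause the client to disconnect and retry, mirroring the "disconnect and try again" behavior described earlier in the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of per-request deadlines for a network client.
public class InFlightRequests {
    static final class InFlight {
        final int correlationId;
        final long deadlineMs;
        InFlight(int correlationId, long deadlineMs) {
            this.correlationId = correlationId;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Deque<InFlight> inFlight = new ArrayDeque<>();

    // Record a request along with the time by which a response must arrive.
    void send(int correlationId, long nowMs, long requestTimeoutMs) {
        inFlight.add(new InFlight(correlationId, nowMs + requestTimeoutMs));
    }

    // Returns true if any in-flight request has passed its deadline; the
    // caller would then close the connection and re-send.
    boolean hasExpired(long nowMs) {
        for (InFlight r : inFlight)
            if (nowMs > r.deadlineMs)
                return true;
        return false;
    }
}
```

A per-request-type variant would simply key the timeout off the request type when computing the deadline, at the cost of the extra configuration burden Mayuresh describes.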
[jira] [Commented] (KAFKA-2397) leave group request
[ https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660279#comment-14660279 ]

Jay Kreps commented on KAFKA-2397:
----------------------------------

[~guozhang] Basically a session would be the concept that ties a connection to the business logic layer. The session would be exposed with the request. I haven't thought through how this would work, but maybe in handleProduce you could do

  session.addOnClose("clear-produce-purgatory", () => removeRequestFromPurgatory(id))

and in the purgatory, when the request is completed, you'd do

  session.removeOnClose("clear-produce-purgatory")

A similar mechanism would work based on the join-group request to rebalance the group on connection close. Like I said, I didn't think this through and don't really advocate it. Like Jason, I think there could be odd side effects.
Re: KAFKA-2364 migrate docs from SVN to git
+1 for the same repo. The closer docs can be to code, the more accurate they are likely to be. The same way we encourage unit tests for a new feature/patch, updating the docs can be the same. If we follow Sqoop's process, for example, how would small fixes/adjustments/additions to the live documentation occur without a new release?

On Thu, Aug 6, 2015 at 3:33 PM, Guozhang Wang wangg...@gmail.com wrote:

  I am +1 on same repo too. I think keeping one git history of code / doc changes may actually be beneficial for this approach as well.

  Guozhang

On Thu, Aug 6, 2015 at 9:16 AM, Gwen Shapira g...@confluent.io wrote:

  I prefer same repo for the one-commit / lower-barrier benefits. Sqoop has the following process, which decouples documentation changes from website changes:

  1. The code github repo contains a doc directory, with the documentation written and maintained in AsciiDoc. There is only one version of the documentation, since it is source-controlled with the code (unlike the current SVN layout, where we have directories per version).
  2. The build process compiles the AsciiDoc to HTML and PDF.
  3. When releasing, we post the documentation of the new release to the website.

  Gwen

On Thu, Aug 6, 2015 at 12:20 AM, Ismael Juma ism...@juma.me.uk wrote:

  Hi,

  For reference, here is the previous discussion on moving the website to Git: http://search-hadoop.com/m/uyzND11JliU1E8QU92

  People were positive to the idea, as Jay said. I would like to see a bit of a discussion around whether the website should be part of the same repo as the code or not. I'll get the ball rolling.

  Pros for same repo:
  * One commit can update the code and website, which means:
  ** Lower barrier for updating docs along with relevant code changes
  ** Easier to require that both are updated at the same time
  * More eyeballs on the website changes
  * Automatically branched with the relevant code

  Pros for separate repo:
  * Potentially simpler for website-only changes (smaller repo, less verification needed)
  * Website changes don't clutter the code Git history
  * No risk of a website change affecting the code

  Your thoughts, please.

  Best,
  Ismael

On Fri, Jul 31, 2015 at 6:15 PM, Aseem Bansal asmbans...@gmail.com wrote:

  Hi

  When discussing KAFKA-2364, migrating docs from svn to git came up. That would make contributing to docs much easier. I have contributed to groovy/grails via github, so I think having a mirror on github could be useful. Also, unless there is some good reason, I think it should be a separate repo. No need to mix docs and code. I can try that out. Thoughts?

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660717#comment-14660717 ]

Jason Gustafson commented on KAFKA-1893:
----------------------------------------

Ditto what [~onurkaraman] said. I think this is pretty natural when the subscribe API is no longer additive.

Allow regex subscriptions in the new consumer
---------------------------------------------

Key: KAFKA-1893
URL: https://issues.apache.org/jira/browse/KAFKA-1893
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Jay Kreps
Assignee: Ashish K Singh
Priority: Critical
Fix For: 0.8.3

The consumer needs to handle subscribing to regular expressions. Presumably this would be done as a new api:

{code}
void subscribe(java.util.regex.Pattern pattern);
{code}

Some questions/thoughts to work out:
- It should not be possible to mix pattern subscription with partition subscription.
- Is it allowable to mix this with normal topic subscriptions? Logically this is okay but a bit complex to implement.
- We need to ensure we regularly update the metadata and recheck our regexes against the metadata to update subscriptions for new topics that are created or old topics that are deleted.
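The mutual-exclusivity rule discussed in the comments could be enforced with a small state machine like the following. This is a hypothetical sketch, not the actual KafkaConsumer implementation; the class and method names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Sketch of mutually exclusive subscription modes: a consumer instance uses
// topic subscription, pattern subscription, or manual partition assignment,
// and mixing them raises an error.
public class SubscriptionState {
    enum Type { NONE, TOPICS, PATTERN, PARTITIONS }

    private Type type = Type.NONE;
    private final Set<String> topics = new HashSet<>();
    private Pattern pattern;

    // Lock in the first subscription mode used; reject any other mode.
    private void ensureType(Type requested) {
        if (type != Type.NONE && type != requested)
            throw new IllegalStateException(
                "Subscription of type " + requested + " cannot be mixed with " + type);
        type = requested;
    }

    public void subscribe(String topic) {
        ensureType(Type.TOPICS);
        topics.add(topic);
    }

    public void subscribe(Pattern p) {
        ensureType(Type.PATTERN);
        pattern = p;
    }

    public void assignPartition(String topic, int partition) {
        ensureType(Type.PARTITIONS);
        topics.add(topic + "-" + partition);
    }
}
```

Under this rule, a second `subscribe` of the same kind is fine, but calling `subscribe(Pattern)` after `subscribe("topic")` throws, matching Onur's proposal to throw an exception if any mixing is attempted.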
[GitHub] kafka pull request: KAFKA-2400; expose heartbeat interval in Kafka...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/116
[jira] [Updated] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-2400:
---------------------------------

Resolution: Fixed
Fix Version/s: 0.8.3
Status: Resolved (was: Patch Available)

Issue resolved by pull request 116: https://github.com/apache/kafka/pull/116

Expose heartbeat frequency in new consumer configuration
--------------------------------------------------------

Key: KAFKA-2400
URL: https://issues.apache.org/jira/browse/KAFKA-2400
Project: Kafka
Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Priority: Minor
Fix For: 0.8.3

The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per configured session timeout, but it would be nice to expose this setting so that the user can control the impact of rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to achieve the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value.
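The relationship between session timeout and heartbeat frequency described above can be sketched as follows. The names here are illustrative, not the actual consumer config keys or internals.

```java
// Sketch of the heartbeat-frequency relationship in KAFKA-2400. Before this
// change the interval was derived from the session timeout (3 heartbeats per
// timeout); afterwards it is a separate, user-configurable setting.
public class HeartbeatSchedule {
    static final int HEARTBEATS_PER_SESSION_TIMEOUT = 3; // old hard-coded value

    // Old behavior: interval derived from the session timeout.
    static long derivedIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION_TIMEOUT;
    }

    // Best-effort check made from inside poll(): send a heartbeat only once
    // the configured interval has elapsed. A slow poll() loop therefore
    // stretches the effective interval, as the ticket notes.
    static boolean shouldHeartbeat(long nowMs, long lastHeartbeatMs, long intervalMs) {
        return nowMs - lastHeartbeatMs >= intervalMs;
    }

    public static void main(String[] args) {
        System.out.println(derivedIntervalMs(30_000)); // 10000: 3 per 30s timeout
        System.out.println(shouldHeartbeat(25_000, 10_000, 10_000)); // true
    }
}
```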
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660810#comment-14660810 ]

ASF GitHub Bot commented on KAFKA-2400:
---------------------------------------

Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/116
[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660687#comment-14660687 ]

Onur Karaman commented on KAFKA-1893:
-------------------------------------

I agree with [~singhashish] that we should keep all forms of subscription mutually exclusive: either use partition subscriptions, topic subscriptions, or wildcard topic subscriptions, and throw an exception if any mixing is attempted. If users need anything more complex, they can use a combination of listTopics and one of the subscribes.
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660896#comment-14660896 ]

Onur Karaman commented on KAFKA-2400:
-------------------------------------

It seems that this change could increase the chances of a consumer hammering the coordinator with heartbeats. Previously, the heartbeat interval was tied to the session timeout, so the coordinator's min and max session timeouts would, in a way, limit the heartbeat interval. With this patch dissociating the heartbeat interval from the session timeout, the coordinator's min and max session timeouts no longer help shield against this. Is this a concern?
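The kind of guard Onur is alluding to could look like the following. This is purely a hypothetical sketch of a validation, not existing broker or client code: once the heartbeat interval is decoupled from the session timeout, something like this would be needed to bound the heartbeat rate.

```java
// Hypothetical validation bounding a decoupled heartbeat interval.
public class HeartbeatConfigCheck {
    static void validate(long heartbeatIntervalMs, long sessionTimeoutMs, long minIntervalMs) {
        // Heartbeats must arrive faster than the session expires, or the
        // member will be considered dead between heartbeats.
        if (heartbeatIntervalMs >= sessionTimeoutMs)
            throw new IllegalArgumentException(
                "heartbeat interval must be lower than the session timeout");
        // A floor on the interval protects the coordinator from being
        // hammered, playing the role the min session timeout used to play.
        if (heartbeatIntervalMs < minIntervalMs)
            throw new IllegalArgumentException(
                "heartbeat interval below the coordinator's accepted minimum");
    }
}
```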
Re: KAFKA-2364 migrate docs from SVN to git
I am +1 on same repo too. I think keeping one git history of code / doc changes may actually be beneficial for this approach as well.

Guozhang
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661246#comment-14661246 ]

Onur Karaman commented on KAFKA-2413:
-------------------------------------

I'll have the patch ready later today.

New consumer's subscribe(Topic...) api fails if called more than once
---------------------------------------------------------------------

Key: KAFKA-2413
URL: https://issues.apache.org/jira/browse/KAFKA-2413
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Ashish K Singh
Assignee: Ashish K Singh

I believe the new consumer is supposed to allow adding to existing topic subscriptions. If it is, then the issue is that on trying to subscribe to a topic when the consumer is already subscribed to another topic, the exception below is thrown.

{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at scala.collection.AbstractMap.default(Map.scala:58)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at scala.collection.AbstractMap.apply(Map.scala:58)
	at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
	at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
	at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
	at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
	at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
	at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
	at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
	at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
	at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
	at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
	at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
	at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
	at java.lang.Thread.run(Thread.java:745)

Unexpected error in join group response: The server experienced an unexpected error when processing the request
org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
	at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
	at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
{code}
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661294#comment-14661294 ]

Ashish K Singh commented on KAFKA-2413:
---------------------------------------

Hi [~onurkaraman], sorry for not explicitly saying this, but I am already working on a patch. Thanks for your help!
[jira] [Commented] (KAFKA-2415) Transient failure in LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-2415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661255#comment-14661255 ]

ASF GitHub Bot commented on KAFKA-2415:
---------------------------------------

GitHub user becketqin opened a pull request: https://github.com/apache/kafka/pull/121

KAFKA-2415: Fix transient failure in LogRecoveryTest

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/becketqin/kafka KAFKA-2415

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/121.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #121

commit 346103c410f0afd875f95770b47a9235e0bf8ed2
Author: Jiangjie Qin j...@jqin-ld1.linkedin.biz
Date: 2015-08-07T03:25:57Z

    KAFKA-2415: Fix transient failure in LogRecoveryTest

Transient failure in LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments
------------------------------------------------------------------------------------

Key: KAFKA-2415
URL: https://issues.apache.org/jira/browse/KAFKA-2415
Project: Kafka
Issue Type: Sub-task
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin

Seen a transient failure in the test with the following error message:

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments FAILED
    java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:313)
        at scala.None$.get(Option.scala:311)
        at kafka.server.LogRecoveryTest$$anonfun$testHWCheckpointWithFailuresMultipleLogSegments$8.apply$mcZ$sp(LogRecoveryTest.scala:215)
        at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
        at kafka.server.LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments(LogRecoveryTest.scala:214)

It looks like the fix is to wait for the new broker to create the replica before checking its HW.
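The "wait for the replica to exist before checking its HW" fix follows the polling pattern of TestUtils.waitUntilTrue seen in the stack trace. A generic Java sketch of that pattern (the helper name and signature here are illustrative, not Kafka's actual Scala utility):

```java
import java.util.function.BooleanSupplier;

// Sketch of a poll-until-true test helper: repeatedly check a condition,
// failing with a message if it does not become true within the timeout.
public class PollingWait {
    static void waitUntilTrue(BooleanSupplier condition, String failureMessage,
                              long timeoutMs, long pauseMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(failureMessage);
            Thread.sleep(pauseMs); // back off between checks
        }
    }
}
```

In the test's case the condition would be "the restarted broker has created the replica", checked before asserting on the replica's high watermark, so the None.get on a not-yet-existing replica cannot occur.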
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661317#comment-14661317 ] ASF GitHub Bot commented on KAFKA-2413: --- GitHub user onurkaraman opened a pull request: https://github.com/apache/kafka/pull/122 KAFKA-2413: fix ConsumerCoordinator updateConsumer You can merge this pull request into a Git repository by running: $ git pull https://github.com/onurkaraman/kafka KAFKA-2413 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/122.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #122 commit 073dc4b716594880de4fb58c8832f02dd3792683 Author: Onur Karaman okara...@linkedin.com Date: 2015-08-07T04:49:53Z fix ConsumerCoordinator updateConsumer New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. 
{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at scala.collection.AbstractMap.default(Map.scala:58)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at scala.collection.AbstractMap.apply(Map.scala:58)
	at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
	at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
	at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
	at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
	at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
	at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
	at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
	at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
	at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
	at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
	at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
	at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
	at java.lang.Thread.run(Thread.java:745)

Unexpected error in join group response: The server experienced an unexpected error when processing the request
org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
	at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
	at
{code}
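The {{key not found: topic}} failure above is an unguarded map lookup: in Scala, {{map(key)}} throws {{NoSuchElementException}} when the key is absent, which is what happens when the RangeAssignor looks up a topic the coordinator never bound. A minimal Java sketch of that failure mode (hypothetical names; the real code is the Scala RangeAssignor, not this):

```java
import java.util.Map;
import java.util.NoSuchElementException;

public class LookupDemo {
    // Mimics Scala's Map.apply: throws when the key is absent instead of
    // returning null, matching the "key not found: topic" message above.
    static int partitionsFor(Map<String, Integer> partitionCounts, String topic) {
        Integer count = partitionCounts.get(topic);
        if (count == null)
            throw new NoSuchElementException("key not found: " + topic);
        return count;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("known-topic", 4);
        System.out.println(partitionsFor(counts, "known-topic"));
        // Looking up a topic that was never bound reproduces the error:
        try {
            partitionsFor(counts, "unbound-topic");
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage());
        }
    }
}
```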
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661319#comment-14661319 ] Onur Karaman commented on KAFKA-2413: - Aand I just saw this. My bad.
[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker
[ https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661253#comment-14661253 ] Jiangjie Qin commented on KAFKA-1997: - Oh... I forgot to do it when integrating the close() call. I just submitted a follow up patch. Refactor Mirror Maker - Key: KAFKA-1997 URL: https://issues.apache.org/jira/browse/KAFKA-1997 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1997.patch, KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, KAFKA-1997_2015-03-18_12:47:32.patch Refactor mirror maker based on KIP-3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661329#comment-14661329 ] Ashish K Singh commented on KAFKA-2413: --- I am not sure that posting patches to issues another person is working on is a good idea. It is discouraging; that is all I can say. There has to be a reason the Assignee field exists in JIRA. I suppose it is a moot point now, as the fix has already been posted. Feel free to assign yourself to the JIRA. I will review the patch you posted, as I have already spent some time finding the issue and trying to fix it. I just hope it does not happen again.
[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker
[ https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661250#comment-14661250 ] ASF GitHub Bot commented on KAFKA-1997: --- GitHub user becketqin opened a pull request: https://github.com/apache/kafka/pull/120 KAFKA-1997: Follow-up patch, hardcode key/value serializer in mirror maker to byte serializer. Hardcode the key/value serializer to ByteArraySerializer according to Jun’s comments. You can merge this pull request into a Git repository by running: $ git pull https://github.com/becketqin/kafka KAFKA-1997 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/120.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #120 commit 7f2e5a6ad9d43a4da8fdbcd5b2aaefa1de4c8963 Author: Jiangjie Qin j...@jqin-ld1.linkedin.biz Date: 2015-08-07T03:03:18Z KAFKA-1997: Follow-up patch, hardcode key/value serializer in mirror maker to byte serializer. Refactor Mirror Maker - Key: KAFKA-1997 URL: https://issues.apache.org/jira/browse/KAFKA-1997 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1997.patch, KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, KAFKA-1997_2015-03-18_12:47:32.patch Refactor mirror maker based on KIP-3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661227#comment-14661227 ] Onur Karaman edited comment on KAFKA-2413 at 8/7/15 2:51 AM: - Woops! I haven't tried it out yet, but I think the fix is:
{code}
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  val topicsToBind = topics -- group.topics
  group.remove(consumer.consumerId)
  val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}
{code}
I forgot to factor in the possibility of the consumer's previous subscriptions overlapping with its new subscriptions.
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661227#comment-14661227 ] Onur Karaman commented on KAFKA-2413: - Woops! I haven't tried it out yet, but I think the fix is:
{code}
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  val topicsToBind = topics -- group.topics
  group.remove(consumer.consumerId)
  val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}
{code}
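The proposed fix is pure set arithmetic: bind the topics that are new to the group, and unbind only the topics that neither the rest of the group nor the new subscription still needs. A minimal Java sketch of the same computation (hypothetical names, not Kafka's code; Scala's {{--}} corresponds to {{removeAll}}):

```java
import java.util.HashSet;
import java.util.Set;

public class BindUnbindDemo {
    // topicsToBind = newSubscription -- groupTopics
    static Set<String> topicsToBind(Set<String> groupTopics, Set<String> newSubscription) {
        Set<String> bind = new HashSet<>(newSubscription);
        bind.removeAll(groupTopics);
        return bind;
    }

    // topicsToUnbind = oldSubscription -- (groupTopicsWithoutThisConsumer ++ newSubscription)
    static Set<String> topicsToUnbind(Set<String> oldSubscription,
                                      Set<String> groupTopicsWithoutThisConsumer,
                                      Set<String> newSubscription) {
        Set<String> unbind = new HashSet<>(oldSubscription);
        unbind.removeAll(groupTopicsWithoutThisConsumer);
        unbind.removeAll(newSubscription);
        return unbind;
    }

    public static void main(String[] args) {
        // Other consumers keep "a" and "b"; this consumer moves from {a, c} to {a, d}.
        Set<String> groupTopics = Set.of("a", "b");
        Set<String> oldSub = Set.of("a", "c");
        Set<String> newSub = Set.of("a", "d");
        System.out.println(topicsToBind(groupTopics, newSub));           // only "d" is new to the group
        System.out.println(topicsToUnbind(oldSub, groupTopics, newSub)); // only "c" is no longer needed
    }
}
```

The overlap bug mentioned in the comment is visible here: subtracting the new subscription as well keeps a still-wanted topic like "a" out of the unbind set.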
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661242#comment-14661242 ] Onur Karaman commented on KAFKA-2413: - Okay I just tested it out. It seems to have fixed the bug.
[jira] [Created] (KAFKA-2415) Transient failure in LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments
Jiangjie Qin created KAFKA-2415: --- Summary: Transient failure in LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments Key: KAFKA-2415 URL: https://issues.apache.org/jira/browse/KAFKA-2415 Project: Kafka Issue Type: Sub-task Reporter: Jiangjie Qin Assignee: Jiangjie Qin See transient failure in the test with the following error message.
{code}
kafka.server.LogRecoveryTest testHWCheckpointWithFailuresMultipleLogSegments FAILED
    java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:313)
        at scala.None$.get(Option.scala:311)
        at kafka.server.LogRecoveryTest$$anonfun$testHWCheckpointWithFailuresMultipleLogSegments$8.apply$mcZ$sp(LogRecoveryTest.scala:215)
        at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:616)
        at kafka.server.LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments(LogRecoveryTest.scala:214)
{code}
It looks like the fix is to wait for the new broker to create the replica before checking its HW. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
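The suggested fix follows the poll-until-true pattern that TestUtils.waitUntilTrue (visible in the trace) provides: re-check a condition until it holds or a deadline passes, rather than asserting immediately and racing the broker. A minimal Java sketch of that pattern (a hypothetical helper, not Kafka's Scala implementation):

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Poll the condition until it becomes true or the timeout elapses.
    // Returning false instead of throwing lets the caller fail with a
    // descriptive test assertion.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) return false;
            Thread.sleep(10);  // back off briefly between checks
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // A condition that becomes true after ~50 ms, as a newly created replica eventually would.
        boolean ok = waitUntilTrue(() -> System.currentTimeMillis() - start > 50, 1000);
        System.out.println(ok);
    }
}
```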
[GitHub] kafka pull request: KAFKA-2403; Add support for commit metadata in...
GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/119 KAFKA-2403; Add support for commit metadata in KafkaConsumer You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-2403 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/119.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #119 commit 1ac0309682e170ca78495ade796b0f8957b04df9 Author: Jason Gustafson ja...@confluent.io Date: 2015-08-05T22:10:02Z KAFKA-2403; Add support for commit metadata in KafkaConsumer --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2403) Expose offset commit metadata in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660911#comment-14660911 ] Jason Gustafson commented on KAFKA-2403: No hurry to get this checked in, but I thought I'd submit an initial patch. Would be nice to get some feedback to see if this is what people had in mind for this API. Expose offset commit metadata in new consumer - Key: KAFKA-2403 URL: https://issues.apache.org/jira/browse/KAFKA-2403 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor The offset commit protocol supports the ability to add user metadata to commits, but this is not yet exposed in the new consumer API. The straightforward way to add it is to create a container for the offset and metadata and adjust the KafkaConsumer API accordingly.
{code}
OffsetMetadata {
  long offset;
  String metadata;
}

KafkaConsumer {
  commit(Map<TopicPartition, OffsetMetadata>)
  OffsetMetadata committed(TopicPartition)
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2403) Expose offset commit metadata in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660909#comment-14660909 ] ASF GitHub Bot commented on KAFKA-2403: --- GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/119 KAFKA-2403; Add support for commit metadata in KafkaConsumer You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-2403 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/119.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #119 commit 1ac0309682e170ca78495ade796b0f8957b04df9 Author: Jason Gustafson ja...@confluent.io Date: 2015-08-05T22:10:02Z KAFKA-2403; Add support for commit metadata in KafkaConsumer Expose offset commit metadata in new consumer - Key: KAFKA-2403 URL: https://issues.apache.org/jira/browse/KAFKA-2403 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor The offset commit protocol supports the ability to add user metadata to commits, but this is not yet exposed in the new consumer API. The straightforward way to add it is to create a container for the offset and metadata and adjust the KafkaConsumer API accordingly.
{code}
OffsetMetadata {
  long offset;
  String metadata;
}

KafkaConsumer {
  commit(Map<TopicPartition, OffsetMetadata>)
  OffsetMetadata committed(TopicPartition)
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2403) Expose offset commit metadata in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-2403: --- Status: Patch Available (was: Open) Expose offset commit metadata in new consumer - Key: KAFKA-2403 URL: https://issues.apache.org/jira/browse/KAFKA-2403 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor The offset commit protocol supports the ability to add user metadata to commits, but this is not yet exposed in the new consumer API. The straightforward way to add it is to create a container for the offset and metadata and adjust the KafkaConsumer API accordingly.
{code}
OffsetMetadata {
  long offset;
  String metadata;
}

KafkaConsumer {
  commit(Map<TopicPartition, OffsetMetadata>)
  OffsetMetadata committed(TopicPartition)
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
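A hedged sketch of what the proposed container could look like in plain Java; the pseudocode in the ticket elides visibility and accessors, and the final API may well differ:

```java
// Illustrative container pairing a committed offset with user metadata,
// fleshing out the OffsetMetadata pseudocode from the ticket (not the final API).
public final class OffsetMetadata {
    private final long offset;
    private final String metadata;

    public OffsetMetadata(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }

    public long offset() { return offset; }

    public String metadata() { return metadata; }
}
```

A commit call would then take a Map<TopicPartition, OffsetMetadata>, and committed(TopicPartition) would return the stored offset together with its metadata string.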
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661094#comment-14661094 ] Ashish K Singh commented on KAFKA-2413: --- [~hachikuji] thoughts? New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. {code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at 
kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274) at
[jira] [Updated] (KAFKA-2351) Brokers are having a problem shutting down correctly
[ https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2351: --- Status: In Progress (was: Patch Available) Brokers are having a problem shutting down correctly Key: KAFKA-2351 URL: https://issues.apache.org/jira/browse/KAFKA-2351 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch, KAFKA-2351_2015-07-23_21:36:52.patch The run() in Acceptor might throw an uncaught exception during shutdown, so it never reaches shutdownComplete; the latch is then never counted down and the broker cannot shut down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
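The failure mode described (an exception escaping run() before shutdownComplete, leaving the latch forever un-counted) is conventionally fixed by counting down in a finally block. A JDK-only sketch with names loosely modeled on the SocketServer Acceptor (illustrative, not the actual patch):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Acceptor implements Runnable {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final boolean failDuringShutdown;

    public Acceptor(boolean failDuringShutdown) {
        this.failDuringShutdown = failDuringShutdown;
    }

    @Override
    public void run() {
        try {
            // The accept loop would live here; a socket error during
            // shutdown can make it throw, which is the bug scenario.
            if (failDuringShutdown)
                throw new RuntimeException("simulated socket error during shutdown");
        } finally {
            // Always reached, even when run() throws, so callers blocked
            // on the latch cannot hang the broker shutdown.
            shutdownLatch.countDown();
        }
    }

    public boolean awaitShutdown(long timeoutMs) throws InterruptedException {
        return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

Even when run() throws, awaitShutdown() returns promptly because the countDown sits in the finally block rather than after the loop.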
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661157#comment-14661157 ] Ashish K Singh commented on KAFKA-2413: --- Thanks guys for quick response! Below is a test case that can repro the issue.
{code}
def testRepetitiveTopicSubscription() {
  val numRecords = 1
  sendRecords(numRecords)
  this.consumers(0).subscribe(topic)
  TestUtils.waitUntilTrue(() => {
    this.consumers(0).poll(50)
    this.consumers(0).subscriptions.size == 2
  }, "Could not find expected number of subscriptions")
  TestUtils.createTopic(this.zkClient, "tblablac", 2, serverCount, this.servers)
  sendRecords(1000, new TopicPartition("tblablac", 0))
  sendRecords(1000, new TopicPartition("tblablac", 1))
  this.consumers(0).subscribe("tblablac")
  TestUtils.waitUntilTrue(() => {
    this.consumers(0).poll(50)
    this.consumers(0).subscriptions.size == 4
  }, "Could not find expected number of subscriptions")
}
{code}
I guess another interesting problem to solve. New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown.
{code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at 
kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at
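For reference, the additive semantics the test above expects from subscribe() can be modeled with a plain set. This sketch only illustrates the intended client-side contract (subscribe unions with the existing subscription), not the server-side assignment bug shown in the trace:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Minimal model of additive topic subscription (illustrative only; the
// real consumer tracks this inside its SubscriptionState internals).
public class SubscriptionModel {
    private final Set<String> topics = new LinkedHashSet<>();

    // subscribe(Topic...) should union with the current subscription,
    // not throw or overwrite it when called a second time.
    public void subscribe(String... newTopics) {
        Collections.addAll(topics, newTopics);
    }

    public Set<String> subscriptions() {
        return Collections.unmodifiableSet(topics);
    }
}
```

Under these semantics, a second subscribe() call simply grows the set, which is the behavior the test's subscription-count assertions rely on.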
[jira] [Updated] (KAFKA-2414) Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O.
[ https://issues.apache.org/jira/browse/KAFKA-2414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bo Wang updated KAFKA-2414: --- Summary: Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O. (was: Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will got WARN Error in I/O.) Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O. Key: KAFKA-2414 URL: https://issues.apache.org/jira/browse/KAFKA-2414 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.2.1 Environment: Linux Reporter: Bo Wang Labels: performance Original Estimate: 1m Remaining Estimate: 1m Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O: java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661177#comment-14661177 ] Ashish K Singh commented on KAFKA-2413: --- [~hachikuji] I was planning to play with it to get more accustomed to the new consumer's intricacies. If you have not already worked out a patch, is it OK if I try to fix it by tonight? New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. {code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at 
kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) 
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) at
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660903#comment-14660903 ] Jason Gustafson commented on KAFKA-2400: [~onurkaraman] The goal of the ticket was specifically to decouple the heartbeat frequency from the session timeout to allow longish session timeouts but still have quick expected rebalance times. I think this is a helpful feature for users who want to limit the impact from rebalances. Since heartbeats are pretty cheap, I don't feel too much concern hammering the server, but perhaps it would help to have a minimum value? I also wouldn't be opposed to having a hard-coded value that was fairly low. Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor Fix For: 0.8.3 The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660921#comment-14660921 ] Guozhang Wang commented on KAFKA-2400: -- Hey [~onurkaraman], yeah that is indeed an issue. However, after thinking about it twice, I feel that for different types of consumer clients these may end up with the same effect: 1. Malicious client: a consumer could always claim its heartbeat frequency is X upon join-group but actually send a heartbeat every 1ms; for this case I think the only shield would be throttling, i.e. protocols between the coordinator and consumer do not really help here. 2. Mis-configured client: a consumer could mistakenly configure its heartbeat frequency too small; the min heartbeat could help in these cases, while throttling might just result in the same effect. 3. Not-care client: they will not override the defaults at all, so all we need to do is make sure the default values are reasonable. A side note is that we'd better be careful throttling heartbeats, since this would possibly increase the false positives of consumer failure if we throttle heartbeats the wrong way. Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor Fix For: 0.8.3 The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. 
In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
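The arithmetic behind the discussion is simple: with the hard-coded 3 heartbeats per session timeout, the heartbeat interval, and hence the expected time to notice a rebalance, scales with the session timeout. A sketch (the numbers are illustrative, not Kafka defaults):

```java
public class HeartbeatMath {
    // With N heartbeats per session timeout, a rebalance signaled through
    // heartbeat responses is noticed within roughly one interval.
    public static long heartbeatIntervalMs(long sessionTimeoutMs, int heartbeatsPerSession) {
        return sessionTimeoutMs / heartbeatsPerSession;
    }
}
```

Under the current ratio, a 30000 ms session timeout implies a 10000 ms heartbeat interval; decoupling the two, say a long session timeout for failure-detection tolerance alongside a short heartbeat interval for quick rebalances, is exactly what a separate config would allow.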
[jira] [Created] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
Ashish K Singh created KAFKA-2413: - Summary: New consumer's subscribe(Topic...) api fails if called more than once Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. {code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at
[jira] [Updated] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1835: --- Resolution: Fixed Status: Resolved (was: Patch Available) [~smiklosovic], I think this is being addressed in KAFKA-2120 by adding a max.block.ms in producer config. So, resolving this jira. Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
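The resolution above points at the max.block.ms setting added under KAFKA-2120 / KIP-19, which caps how long the producer's blocking paths (e.g. the first metadata fetch for a topic) may stall. A sketch of the configuration using plain java.util.Properties so it runs standalone; the broker address is hypothetical:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Producer settings bounding blocking behavior, per the KIP-19 approach.
    public static Properties boundedBlockingConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // Upper bound on how long send()/metadata fetches may block,
        // instead of blocking indefinitely on the first send to a topic.
        props.setProperty("max.block.ms", "5000");
        return props;
    }
}
```

With this bound in place, a send into a critical request path fails fast with a timeout once the cap elapses rather than hanging, which is the explicit-blocking behavior this ticket asked for.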
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661135#comment-14661135 ] Jason Gustafson commented on KAFKA-2413: [~onurkaraman] I was able to reproduce this error on trunk by subscribing to a second topic while in the consumer's poll loop. It looks like the error might be related to how the server manages group topics. New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe the new consumer is supposed to allow adding to existing topic subscriptions. If so, the issue is that when trying to subscribe to a topic while the consumer is already subscribed to another, the exception below is thrown.
{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
    at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
    at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
    at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
    at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
    at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
    at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
    at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
    at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
    at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
    at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)
Unexpected error in join group response: The server experienced an unexpected error when processing the request
org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
    at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
    at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
    at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
    at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
    at
[jira] [Commented] (KAFKA-2414) Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O.
[ https://issues.apache.org/jira/browse/KAFKA-2414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661164#comment-14661164 ] Bo Wang commented on KAFKA-2414: The exception occurs intermittently; how can I fix or avoid this problem? Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O. Key: KAFKA-2414 URL: https://issues.apache.org/jira/browse/KAFKA-2414 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.2.1 Environment: Linux Reporter: Bo Wang Labels: performance Original Estimate: 1m Remaining Estimate: 1m Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O:
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660950#comment-14660950 ] Jason Gustafson commented on KAFKA-2400: [~onurkaraman], [~guozhang] One related problem that I didn't think about is that the current consumer allows multiple pending heartbeats to be transmitted to the coordinator. If some request takes longer than normal on the server (maybe a commit, for example), then the heartbeats might pile up. It seems unlikely to be a major issue as long as the heartbeat interval is reasonable, but it should be pretty easy to fix. Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor Fix For: 0.8.3 The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
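The hard-coded frequency described in the issue, 3 heartbeats per session timeout, amounts to deriving the heartbeat interval from the session timeout. A minimal sketch of that relationship (the constant and method names are illustrative, not the actual consumer internals):

```java
public class HeartbeatInterval {
    // The consumer currently derives the interval from the session timeout:
    // 3 heartbeats per session timeout window.
    static final int HEARTBEATS_PER_SESSION = 3;

    static long intervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION;
    }

    public static void main(String[] args) {
        // e.g. with a 30s session timeout, a heartbeat goes out every 10s
        System.out.println(intervalMs(30000));
    }
}
```

Exposing the interval as its own config decouples it from the session timeout, which is exactly what raises the throttling question discussed in this thread: nothing then stops a user from choosing an interval far smaller than the session timeout.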
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661160#comment-14661160 ] Jason Gustafson commented on KAFKA-2413: I think the issue is in here:
{code}
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  val topicsToBind = topics -- group.topics
  group.remove(consumer.consumerId)
  val topicsToUnbind = consumer.topics -- group.topics
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}
{code}
In particular, it looks like topicsToUnbind is taking the difference in the wrong order. But I'm not sure that just reversing that is correct either, since we'd only want to unbind the topic if no other consumers in the group are subscribed. New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe the new consumer is supposed to allow adding to existing topic subscriptions. If so, the issue is that when trying to subscribe to a topic while the consumer is already subscribed to another, the exception below is thrown.
{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
    at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
    at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
    at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
    at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
    at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
    at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
    at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
    at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
    at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
    at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)
Unexpected error in join group response: The server experienced an unexpected error when processing the request
org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
    at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362)
    at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311)
    at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703)
    at
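The ordering problem in the Scala snippet above can be seen with plain sets. In this Java stand-in, a sole group member shrinks its subscription from {A, B} to {A}: the committed order (old subscription minus the group's topics) yields an empty unbind set, while taking the old subscription minus the new one recovers the dropped topic. Note this reversal is only illustrative; as the comment says, it is not a complete fix on its own:

```java
import java.util.HashSet;
import java.util.Set;

public class UnbindOrderDemo {
    // Set difference a -- b, as in the Scala snippet.
    static Set<String> diff(Set<String> a, Set<String> b) {
        Set<String> r = new HashSet<>(a);
        r.removeAll(b);
        return r;
    }

    public static void main(String[] args) {
        Set<String> groupTopics = Set.of("A", "B");    // union over all consumers
        Set<String> consumerTopics = Set.of("A", "B"); // this consumer's old subscription
        Set<String> newTopics = Set.of("A");           // requested subscription

        // Order used in updateConsumer: consumer.topics -- group.topics
        Set<String> committedOrder = diff(consumerTopics, groupTopics);
        // Old subscription minus new subscription
        Set<String> oldMinusNew = diff(consumerTopics, newTopics);

        System.out.println("committed=" + committedOrder + " oldMinusNew=" + oldMinusNew);
        // committed is empty, so B is never unbound; oldMinusNew yields [B]
    }
}
```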
[jira] [Comment Edited] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661160#comment-14661160 ] Jason Gustafson edited comment on KAFKA-2413 at 8/7/15 1:20 AM: I think the issue is in here (in ConsumerCoordinator):
{code}
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  val topicsToBind = topics -- group.topics
  group.remove(consumer.consumerId)
  val topicsToUnbind = consumer.topics -- group.topics
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}
{code}
In particular, it looks like topicsToUnbind is taking the difference in the wrong order. But I'm not sure that just reversing that is correct either, since we'd only want to unbind the topic if no other consumers in the group are subscribed.
was (Author: hachikuji): I think the issue is in here:
{code}
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
  val topicsToBind = topics -- group.topics
  group.remove(consumer.consumerId)
  val topicsToUnbind = consumer.topics -- group.topics
  group.add(consumer.consumerId, consumer)
  consumer.topics = topics
  coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
}
{code}
In particular, it looks like topicsToUnbind is taking the difference in the wrong order. But I'm not sure that just reversing that is correct either, since we'd only want to unbind the topic if no other consumers in the group are subscribed. New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe the new consumer is supposed to allow adding to existing topic subscriptions. If so, the issue is that when trying to subscribe to a topic while the consumer is already subscribed to another, the exception below is thrown.
{code}
[2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103)
java.util.NoSuchElementException: key not found: topic
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109)
    at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108)
    at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378)
    at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360)
    at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414)
    at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72)
    at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37)
    at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388)
    at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
    at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186)
    at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131)
    at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
    at
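The caveat at the end of the comment above, that a topic should only be unbound when no remaining group member subscribes to it, can be sketched by recomputing membership after the update. The map layout and `safeUnbind` name below are illustrative, not the coordinator's actual API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupUnbindSketch {
    // subscriptions: consumerId -> subscribed topics for every group member.
    // Returns the topics that may safely be unbound after consumerId switches
    // its subscription to newTopics.
    static Set<String> safeUnbind(Map<String, Set<String>> subscriptions,
                                  String consumerId, Set<String> newTopics) {
        Set<String> candidates = new HashSet<>(subscriptions.get(consumerId));
        candidates.removeAll(newTopics);            // topics this consumer is dropping
        subscriptions.put(consumerId, newTopics);   // apply the new subscription
        Set<String> unbind = new HashSet<>();
        for (String topic : candidates) {
            // keep a candidate only if no remaining member still subscribes to it
            boolean stillUsed = subscriptions.values().stream()
                    .anyMatch(topics -> topics.contains(topic));
            if (!stillUsed)
                unbind.add(topic);
        }
        return unbind;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = new HashMap<>();
        subs.put("c1", Set.of("A", "B"));
        subs.put("c2", Set.of("B"));
        // c1 drops B, but c2 still subscribes to it -> nothing to unbind
        System.out.println(safeUnbind(subs, "c1", Set.of("A")));
    }
}
```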
Re: Review Request 36858: Patch for KAFKA-2120
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/#review94447 --- Thanks for the patch. A few comments below. Also, there seem to be style check failures when running unit tests:
:clients:checkstyleMain
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:22:8: Unused import - java.util.Set.
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:23:8: Unused import - org.apache.kafka.common.utils.SystemTime.
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:24:8: Unused import - org.apache.kafka.common.utils.Time.
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:164:100: ';' is preceded with whitespace.
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:223: if child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:223: method call child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:224: method call child at indentation level 18 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:225: if child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:226: if at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:227: if child at indentation level 16 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:228: if rcurly at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:229: if child at indentation level 16 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:229: method call child at indentation level 16 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:230: method call child at indentation level 20 not at correct indentation, 24
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:231: if child at indentation level 16 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:232: if rcurly at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:233: else child at indentation level 16 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:234: else rcurly at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:236: if child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:236: method call child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:237: method call child at indentation level 18 not at correct indentation, 20
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:238: if child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:240: else child at indentation level 14 not at correct indentation, 16
[ant:checkstyle] /Users/junrao/intellij/kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:248: if child at indentation level 14 not at correct indentation, 16
[jira] [Updated] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2120: --- Status: In Progress (was: Patch Available) Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request, for reasons such as the broker being down, the request will never be completed. The request timeout will also be used as an implicit timeout for some methods such as KafkaProducer.flush() and KafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
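The missing behavior described in KAFKA-2120 can be sketched as a per-request deadline check: record the send time of each in-flight request and, on the next poll, fail any request whose age exceeds the timeout. This is an illustrative model, not NetworkClient's actual data structures:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RequestTimeoutSketch {
    static class InFlightRequest {
        final int correlationId;
        final long sendTimeMs;
        InFlightRequest(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    // Return the requests that have exceeded requestTimeoutMs; a real client
    // would complete them exceptionally and disconnect from the broker.
    static List<InFlightRequest> expired(Deque<InFlightRequest> inFlight,
                                         long nowMs, long requestTimeoutMs) {
        List<InFlightRequest> out = new ArrayList<>();
        for (InFlightRequest r : inFlight)
            if (nowMs - r.sendTimeMs > requestTimeoutMs)
                out.add(r);
        inFlight.removeAll(out);
        return out;
    }

    public static void main(String[] args) {
        Deque<InFlightRequest> inFlight = new ArrayDeque<>();
        inFlight.add(new InFlightRequest(1, 0));    // sent at t=0
        inFlight.add(new InFlightRequest(2, 9000)); // sent at t=9s
        // At t=10s with a 5s timeout, only request 1 has expired.
        List<InFlightRequest> timedOut = expired(inFlight, 10000, 5000);
        System.out.println(timedOut.size() + " expired, " + inFlight.size() + " still in flight");
    }
}
```

This also shows why the timeout doubles as the implicit bound for flush() and close(): once every pending request either completes or expires, those methods have nothing left to wait on.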
[jira] [Resolved] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1835. Resolution: Duplicate Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reopened KAFKA-1835: Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2414) Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O.
Bo Wang created KAFKA-2414: -- Summary: Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O. Key: KAFKA-2414 URL: https://issues.apache.org/jira/browse/KAFKA-2414 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.2.1 Environment: Linux Reporter: Bo Wang Running kafka-producer-perf-test.sh with --messages 10000000 --message-size 1000 --new-producer will get WARN Error in I/O:
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660896#comment-14660896 ] Onur Karaman edited comment on KAFKA-2400 at 8/6/15 10:39 PM: -- It seems that this change could increase the chances for a consumer to hammer the coordinator with heartbeats now. Previously, the heartbeat interval was tied to the session timeouts, so the coordinator's min and max session timeouts would in a way limit the heartbeat interval. With this patch dissociating the heartbeat interval from session timeout, the coordinator's min and max session timeouts no longer help shield against this. Is this a concern? Also: my bad for replying to the past few tickets after the check in! I need to start looking at these more proactively. was (Author: onurkaraman): It seems that this change could increase the chances for a consumer to hammer the coordinator with heartbeats now. Previously, the heartbeat interval was tied to the session timeouts, so the coordinator's min and max session timeouts would in a way limit the heartbeat interval. With this patch dissociating the heartbeat interval from session timeout, the coordinator's min and max session timeouts no longer help shield against this. Is this a concern? Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor Fix For: 0.8.3 The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. 
Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2340) Add additional unit tests for new consumer Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-2340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-2340: Assignee: Jason Gustafson Add additional unit tests for new consumer Fetcher -- Key: KAFKA-2340 URL: https://issues.apache.org/jira/browse/KAFKA-2340 Project: Kafka Issue Type: Test Reporter: Jason Gustafson Assignee: Jason Gustafson Fix For: 0.8.3 There are a number of cases in Fetcher which have no corresponding unit tests. To name a few: - list offset with partition leader unknown - list offset disconnect - fetch disconnect Additionally, updateFetchPosition (which was moved from KafkaConsumer) has no tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2340) Add additional unit tests for new consumer Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-2340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660927#comment-14660927 ] ASF GitHub Bot commented on KAFKA-2340: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/112 Add additional unit tests for new consumer Fetcher -- Key: KAFKA-2340 URL: https://issues.apache.org/jira/browse/KAFKA-2340 Project: Kafka Issue Type: Test Reporter: Jason Gustafson There are a number of cases in Fetcher which have no corresponding unit tests. To name a few: - list offset with partition leader unknown - list offset disconnect - fetch disconnect Additionally, updateFetchPosition (which was moved from KafkaConsumer) has no tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2340) Add additional unit tests for new consumer Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-2340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-2340. -- Resolution: Fixed Fix Version/s: 0.8.3 Issue resolved by pull request 112 [https://github.com/apache/kafka/pull/112] Add additional unit tests for new consumer Fetcher -- Key: KAFKA-2340 URL: https://issues.apache.org/jira/browse/KAFKA-2340 Project: Kafka Issue Type: Test Reporter: Jason Gustafson Fix For: 0.8.3 There are a number of cases in Fetcher which have no corresponding unit tests. To name a few: - list offset with partition leader unknown - list offset disconnect - fetch disconnect Additionally, updateFetchPosition (which was moved from KafkaConsumer) has no tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2400) Expose heartbeat frequency in new consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660955#comment-14660955 ] Onur Karaman commented on KAFKA-2400: - [~hachikuji]: Yeah the reason behind the decoupling is valid and seems like a good idea. [~guozhang]: Agreed. It seems like more things can go wrong by throttling heartbeats as opposed to setting a reasonable lower bound in ConsumerConfig. Expose heartbeat frequency in new consumer configuration Key: KAFKA-2400 URL: https://issues.apache.org/jira/browse/KAFKA-2400 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Priority: Minor Fix For: 0.8.3 The consumer coordinator communicates the need to rebalance through responses to heartbeat requests sent from each member of the consumer group. The heartbeat frequency therefore controls how long normal rebalances will take. Currently, the frequency is hard-coded to 3 heartbeats per the configured session timeout, but it would be nice to expose this setting so that the user can control the impact from rebalancing. Since the consumer is currently single-threaded and heartbeats are sent in poll(), we cannot guarantee that the heartbeats will actually be sent at the configured frequency. In practice, the user may have to adjust their fetch size to ensure that poll() is called often enough to get the desired heartbeat frequency. For most users, the consumption rate is probably fast enough for this not to matter, but we should make the documentation clear on this point. In any case, we expect that most users will accept the default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
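The "reasonable lower bound in ConsumerConfig" idea from the comment above could look roughly like the following. The floor value, class, and method names are hypothetical, not actual ConsumerConfig code.

```java
// Sketch of a lower-bound guard on the heartbeat interval, so a
// misconfigured consumer cannot hammer the coordinator. The floor
// value and names here are hypothetical.
public class HeartbeatBoundCheck {
    static final long MIN_HEARTBEAT_INTERVAL_MS = 1_000; // hypothetical floor

    static long validateIntervalMs(long requestedMs, long sessionTimeoutMs) {
        if (requestedMs < MIN_HEARTBEAT_INTERVAL_MS)
            throw new IllegalArgumentException(
                "heartbeat interval " + requestedMs + "ms would hammer the coordinator");
        if (requestedMs >= sessionTimeoutMs)
            throw new IllegalArgumentException(
                "heartbeat interval must be smaller than the session timeout");
        return requestedMs;
    }

    public static void main(String[] args) {
        System.out.println(validateIntervalMs(3_000, 30_000)); // accepted: 3000
    }
}
```

A client-side bound like this avoids the extra moving parts of coordinator-side heartbeat throttling, which matches the preference expressed in the comment.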
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661110#comment-14661110 ] Onur Karaman commented on KAFKA-2413: - Couple questions to start things off: 1. What is the git hash of kafka you're running on the brokers and consumers? 2. How can I reproduce this? What does your consumer code look like? More specifically, can you go into more detail on the called more than once part? New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. {code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at 
kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at 
org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at
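The stack trace above (truncated in the archive) boils down to a map lookup: the range assignor resolves per-topic partition counts from a map keyed by topic name, and a join-group subscription naming a topic absent from that map fails with "key not found". A Java analogue of that failure mode, purely illustrative and not the actual Scala code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative Java analogue of the RangeAssignor failure quoted above:
// looking up a topic the coordinator has no metadata for throws, which
// surfaces to the client as an unexpected join-group error.
public class AssignorLookupSketch {
    static int partitionsFor(Map<String, Integer> partitionsPerTopic, String topic) {
        Integer count = partitionsPerTopic.get(topic);
        if (count == null)
            throw new NoSuchElementException("key not found: " + topic);
        return count;
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("known-topic", 4);
        System.out.println(partitionsFor(metadata, "known-topic")); // 4
    }
}
```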
[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker
[ https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661161#comment-14661161 ] Jun Rao commented on KAFKA-1997: [~becket_qin], has this been fixed yet? Thanks, Refactor Mirror Maker - Key: KAFKA-1997 URL: https://issues.apache.org/jira/browse/KAFKA-1997 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.3 Attachments: KAFKA-1997.patch, KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, KAFKA-1997_2015-03-18_12:47:32.patch Refactor mirror maker based on KIP-3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2413) New consumer's subscribe(Topic...) api fails if called more than once
[ https://issues.apache.org/jira/browse/KAFKA-2413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661167#comment-14661167 ] Jason Gustafson commented on KAFKA-2413: [~singhashish] I can attempt a patch if you want. New consumer's subscribe(Topic...) api fails if called more than once - Key: KAFKA-2413 URL: https://issues.apache.org/jira/browse/KAFKA-2413 Project: Kafka Issue Type: Bug Components: consumer Reporter: Ashish K Singh Assignee: Ashish K Singh I believe new consumer is supposed to allow adding to existing topic subscriptions. If it is then the issue is that on trying to subscribe to a topic when consumer is already subscribed to a topic, below exception is thrown. {code} [2015-08-06 16:06:48,591] ERROR [KafkaApi-2] error when handling request null (kafka.server.KafkaApis:103) java.util.NoSuchElementException: key not found: topic at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:109) at kafka.coordinator.RangeAssignor$$anonfun$4.apply(PartitionAssignor.scala:108) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.coordinator.RangeAssignor.assign(PartitionAssignor.scala:108) at kafka.coordinator.ConsumerCoordinator.reassignPartitions(ConsumerCoordinator.scala:378) at 
kafka.coordinator.ConsumerCoordinator.rebalance(ConsumerCoordinator.scala:360) at kafka.coordinator.ConsumerCoordinator.onCompleteRebalance(ConsumerCoordinator.scala:414) at kafka.coordinator.DelayedRebalance.onComplete(DelayedRebalance.scala:39) at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) at kafka.coordinator.DelayedRebalance$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedRebalance.scala:37) at kafka.coordinator.ConsumerCoordinator.tryCompleteRebalance(ConsumerCoordinator.scala:388) at kafka.coordinator.DelayedRebalance.tryComplete(DelayedRebalance.scala:37) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227) at kafka.coordinator.ConsumerCoordinator.doJoinGroup(ConsumerCoordinator.scala:186) at kafka.coordinator.ConsumerCoordinator.handleJoinGroup(ConsumerCoordinator.scala:131) at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:578) at kafka.server.KafkaApis.handle(KafkaApis.scala:67) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) at java.lang.Thread.run(Thread.java:745) Unexpected error in join group response: The server experienced an unexpected error when processing the request org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:362) at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:311) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:703) at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:677) at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237) at
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659965#comment-14659965 ] Stefan Miklosovic commented on KAFKA-1835: -- Any progress? It is weird there is no action after such a massive information exchange. Are you stuck somewhere? Kafka new producer needs options to make blocking behavior explicit --- Key: KAFKA-1835 URL: https://issues.apache.org/jira/browse/KAFKA-1835 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.2.0, 0.8.3, 0.9.0 Reporter: Paul Pearcy Fix For: 0.8.3 Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch Original Estimate: 504h Remaining Estimate: 504h The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path. Discussing on the mailing list, the most viable option is to have the following settings: pre.initialize.topics=x,y,z pre.initialize.timeout=x This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time. There is the question of what to do when initialization fails. There are a couple of options that I'd like available: - Fail creation of the client - Fail all sends until the meta is available Open to input on how the above option should be expressed. It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. 
For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point. I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
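The pre-initialization proposal above could be sketched like this: at construction time, try to fetch metadata for each configured topic within the timeout, and report the topics that failed so the caller can pick one of the two failure policies. `MetadataFetcher` is a stand-in interface for this sketch, not a real Kafka client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the proposed pre.initialize.topics / pre.initialize.timeout
// behavior: resolve topic metadata up front, bounded by a timeout.
public class PreInitSketch {
    interface MetadataFetcher {
        // Stand-in for a blocking metadata fetch; returns false on timeout.
        boolean fetchMetadata(String topic, long timeoutMs);
    }

    // Returns the topics whose metadata could not be fetched in time; the
    // caller then either fails client creation or fails sends to them.
    static List<String> preInitialize(MetadataFetcher fetcher, List<String> topics, long timeoutMs) {
        List<String> failed = new ArrayList<>();
        for (String topic : topics)
            if (!fetcher.fetchMetadata(topic, timeoutMs))
                failed.add(topic);
        return failed;
    }

    public static void main(String[] args) {
        MetadataFetcher fetcher = (topic, timeoutMs) -> !topic.equals("unreachable");
        System.out.println(preInitialize(fetcher, Arrays.asList("x", "y", "unreachable"), 500));
        // prints [unreachable]
    }
}
```

As the ticket notes, this only moves the blocking to init; a first send to a topic outside the pre-init list can still block if connectivity is lost.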
[jira] [Commented] (KAFKA-2364) Improve documentation for contributing to docs
[ https://issues.apache.org/jira/browse/KAFKA-2364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659917#comment-14659917 ] Aseem Bansal commented on KAFKA-2364: - How do you reply to and get those emails? Improve documentation for contributing to docs -- Key: KAFKA-2364 URL: https://issues.apache.org/jira/browse/KAFKA-2364 Project: Kafka Issue Type: Task Reporter: Aseem Bansal Priority: Minor Labels: doc While reading the documentation for Kafka 0.8 I saw some improvements that can be made. But the docs for contributing are not very good at https://github.com/apache/kafka. It just gives me a URL for svn. But I am not sure what to do. Can the README.MD file be improved for contributing to docs? I have submitted patches to groovy and grails by sending PRs via github but looking at the comments on PRs submitted to kafka it seems PRs via github are not working for kafka. It would be good to make that work also. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660329#comment-14660329 ] Jason Gustafson commented on KAFKA-2120: [~mgharat] Sounds reasonable to me. Thanks! Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request due to reasons such as broker is down, the request will never be completed. Request timeout will also be used as implicit timeout for some methods such as KafkaProducer.flush() and kafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660397#comment-14660397 ] Mayuresh Gharat commented on KAFKA-2120: [~becket_qin] yes we can do that. I will create a separate ticket and work on that. I think we should not include that in this patch if possible since this patch is specifically for KIP-19. Let us wait a bit and think on this a bit and then I will send out the jira links for the new tickets. Thanks, Mayuresh Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request due to reasons such as broker is down, the request will never be completed. Request timeout will also be used as implicit timeout for some methods such as KafkaProducer.flush() and kafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2351) Brokers are having a problem shutting down correctly
[ https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660361#comment-14660361 ] Mayuresh Gharat commented on KAFKA-2351: Thanks [~junrao]. I will see if I can address the issue reported by [~sslavic] in a few days. If I can, I will upload a new patch else will do it in another ticket. Brokers are having a problem shutting down correctly Key: KAFKA-2351 URL: https://issues.apache.org/jira/browse/KAFKA-2351 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch, KAFKA-2351_2015-07-23_21:36:52.patch The run() in Acceptor during shutdown might throw an exception that is not caught and it never reaches shutdownComplete due to which the latch is not counted down and the broker will not be able to shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2351) Brokers are having a problem shutting down correctly
[ https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660361#comment-14660361 ] Mayuresh Gharat edited comment on KAFKA-2351 at 8/6/15 5:11 PM: Thanks [~junrao]. I will see if I can address the issue reported by [~sslavic] quickly. If I can, I will upload a new patch else will do it in another ticket. was (Author: mgharat): Thanks [~junrao]. I will see if I can address the issue reported by [~sslavic] in a few days. If I can, I will upload a new patch else will do it in another ticket. Brokers are having a problem shutting down correctly Key: KAFKA-2351 URL: https://issues.apache.org/jira/browse/KAFKA-2351 Project: Kafka Issue Type: Bug Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch, KAFKA-2351_2015-07-23_21:36:52.patch The run() in Acceptor during shutdown might throw an exception that is not caught and it never reaches shutdownComplete due to which the latch is not counted down and the broker will not be able to shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2143) Replicas get ahead of leader and fail
[ https://issues.apache.org/jira/browse/KAFKA-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660629#comment-14660629 ] Ewen Cheslack-Postava commented on KAFKA-2143: -- [~wushujames] I think one easy way for a broker to be in ISR for one topic and out of ISR for another is a partition -- if topic A has broker 1 as leader and topic B has broker 2 as leader, then broker 3 replicating A and B could be in ISR for A, but unable to replicate data for B if there is a partition between brokers 2 and 3. Replicas get ahead of leader and fail - Key: KAFKA-2143 URL: https://issues.apache.org/jira/browse/KAFKA-2143 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Jiangjie Qin Fix For: 0.8.3 On a cluster of 6 nodes, we recently saw a case where a single under-replicated partition suddenly appeared, replication lag spiked, and network IO spiked. The cluster appeared to recover eventually on its own. Looking at the logs, the thing which failed was partition 7 of the topic {{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. Here are the interesting log lines: On node 3 (the leader): {noformat} [2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 148185816. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. (kafka.server.ReplicaManager) [2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [background_queue,7] offset 3722949957 from follower with correlation id 156007054. Possible cause: Request for offset 3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager) [2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 (kafka.cluster.Partition) {noformat} Note that both replicas suddenly asked for an offset *ahead* of the available offsets. And on nodes 1 and 4 (the replicas) many occurrences of the following: {noformat} [2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log background_queue-7 for deletion. (kafka.log.Log) (edited) {noformat} Based on my reading, this looks like the replicas somehow got *ahead* of the leader, asked for an invalid offset, got confused, and re-replicated the entire topic from scratch to recover (this matches our network graphs, which show 3 sending a bunch of data to 1 and 4). Taking a stab in the dark at the cause, there appears to be a race condition where replicas can receive a new offset before the leader has committed it and is ready to replicate? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
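The errors quoted in the report above are a simple range check on the leader: a fetch offset outside the leader's available range is rejected, which is why followers asking for 3722949957 against a log ending at 3722949955 get the "we only have log segments in the range" error. The sketch below is an illustrative simplification of that check, not the broker's actual logic.

```java
// Illustrative simplification of the leader-side fetch range check
// behind the ReplicaManager errors quoted above.
public class FetchRangeCheck {
    static boolean inRange(long fetchOffset, long logStartOffset, long logEndOffset) {
        return fetchOffset >= logStartOffset && fetchOffset <= logEndOffset;
    }

    public static void main(String[] args) {
        // The follower's requested offset from the log above, against the
        // leader's reported segment range: out of range, ahead of the leader.
        System.out.println(inRange(3722949957L, 3648049863L, 3722949955L)); // false
    }
}
```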
[jira] [Comment Edited] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660397#comment-14660397 ] Mayuresh Gharat edited comment on KAFKA-2120 at 8/6/15 5:24 PM: [~becket_qin] yes we can do that. I will create a separate ticket and work on that. I think we should not include that in this patch if possible since this patch is specifically for KIP-19. Let us wait and think on this a bit and then I will send out the jira links for the new tickets. Thanks, Mayuresh was (Author: mgharat): [~becket_qin] yes we can do that. I will create a separate ticket and work on that. I think we should not include that in this patch if possible since this patch is specifically for KIP-19. Let us wait a bit and think on this a bit and then I will send out the jira links for the new tickets. Thanks, Mayuresh Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request due to reasons such as broker is down, the request will never be completed. Request timeout will also be used as implicit timeout for some methods such as KafkaProducer.flush() and kafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660303#comment-14660303 ] Jason Gustafson commented on KAFKA-2120: [~mgharat] Yeah, that's what I mean. I think the consumer has three different cases where a different timeout should be used: 1. Join group requests: The timeout should be based on the session timeout. 2. Fetch requests: The timeout should be based on the fetch max wait time. 3. All others: This includes heartbeats and metadata requests, and can use the default request.timeout.ms from this patch. So we don't actually need any new configuration, but it seems like we need the ability to set timeouts differently depending on the request type. Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request due to reasons such as broker is down, the request will never be completed. Request timeout will also be used as implicit timeout for some methods such as KafkaProducer.flush() and kafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660375#comment-14660375 ] Jiangjie Qin commented on KAFKA-2120: - [~hachikuji] [~mgharat], the request timeout is for exception-handling purposes, so the request timeout should be the highest timeout of all the timeouts. Can we just sanity-check the configurations the user provides? The rules should be: request timeout > session timeout, and request timeout > fetch.max.wait.timeout, so that the request timeout won't kick in before the other timeouts are reached. The sanity check patch should be relatively simple. We can either do it in this patch or in a separate patch. Add a request timeout to NetworkClient -- Key: KAFKA-2120 URL: https://issues.apache.org/jira/browse/KAFKA-2120 Project: Kafka Issue Type: New Feature Reporter: Jiangjie Qin Assignee: Mayuresh Gharat Fix For: 0.8.3 Attachments: KAFKA-2120.patch, KAFKA-2120_2015-07-27_15:31:19.patch, KAFKA-2120_2015-07-29_15:57:02.patch Currently NetworkClient does not have a timeout setting for requests. So if no response is received for a request due to reasons such as broker is down, the request will never be completed. Request timeout will also be used as implicit timeout for some methods such as KafkaProducer.flush() and kafkaProducer.close(). KIP-19 is created for this public interface change. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -- This message was sent by Atlassian JIRA (v6.3.4#6332)
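The sanity check proposed in the comment above could be as small as the following: the NetworkClient-level request timeout should exceed both the session timeout and the fetch max wait, so it never fires before those timeouts do. The class and method names are illustrative, not actual ConsumerConfig code.

```java
// Sketch of the configuration sanity check proposed in KAFKA-2120:
// request.timeout.ms must dominate the other client-side timeouts.
public class TimeoutSanityCheck {
    static void check(long requestTimeoutMs, long sessionTimeoutMs, long fetchMaxWaitMs) {
        if (requestTimeoutMs <= sessionTimeoutMs)
            throw new IllegalArgumentException(
                "request.timeout.ms must be larger than session.timeout.ms");
        if (requestTimeoutMs <= fetchMaxWaitMs)
            throw new IllegalArgumentException(
                "request.timeout.ms must be larger than fetch.max.wait.ms");
    }

    public static void main(String[] args) {
        check(40_000, 30_000, 500); // passes silently
        System.out.println("config ok");
    }
}
```

The trade-off raised later in the thread still applies: with one dominating timeout, even requests that should return immediately inherit the large value, which slows worst-case failure recovery.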
[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer
[ https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660584#comment-14660584 ] Ashish K Singh commented on KAFKA-1893: --- Should we support topic and regex subscription at same time? There are a few special cases that will need attention if we decide to have regex and topic subscription work at same time. Say one subscribed to test* and then later explicitly unsubscribed from test1. test1 will get subscribed the next time regex check is run. This can be handled by maintaining a blacklist for a particular pattern. The point is we can support that, however it will have complicated logic. Wondering what will we gain from this complicated logic though. One can have a pattern to subscribe to individual topic or a collection of topics or both. Having support for topic and regex subscription does not buy a lot. If nobody suggests otherwise I will go with making topic and regex subscription mutually exclusive. Also, in my current implementation I am only supporting subscribing to a single pattern and not multiple patterns as multiple patterns can be combined into a single pattern. Hope that is fine as well. Allow regex subscriptions in the new consumer - Key: KAFKA-1893 URL: https://issues.apache.org/jira/browse/KAFKA-1893 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Jay Kreps Assignee: Ashish K Singh Priority: Critical Fix For: 0.8.3 The consumer needs to handle subscribing to regular expressions. Presumably this would be done as a new api, {code} void subscribe(java.util.regex.Pattern pattern); {code} Some questions/thoughts to work out: - It should not be possible to mix pattern subscription with partition subscription. - Is it allowable to mix this with normal topic subscriptions? Logically this is okay but a bit complex to implement. 
- We need to ensure we regularly update the metadata and recheck our regexes against the metadata to update subscriptions for new topics that are created or old topics that are deleted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
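The metadata-recheck behaviour described in the last bullet could look roughly like this: on every metadata refresh, recompute the subscribed topic set from the pattern. This is a sketch only, with illustrative names, not the new consumer's actual implementation:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Sketch of the periodic regex-vs-metadata recheck described above.
// Illustrative only: recomputing the subscription from the pattern on each
// metadata refresh picks up newly created matching topics and drops deleted ones.
class RegexSubscriptionSketch {

    static Set<String> matchingTopics(Pattern pattern, Set<String> topicsFromMetadata) {
        Set<String> subscription = new TreeSet<>();
        for (String topic : topicsFromMetadata)
            if (pattern.matcher(topic).matches())
                subscription.add(topic);
        return subscription;
    }

    public static void main(String[] args) {
        Pattern p = Pattern.compile("test.*");
        Set<String> metadata = new TreeSet<>(Set.of("test1", "test2", "other"));
        System.out.println(matchingTopics(p, metadata)); // [test1, test2]
    }
}
```

Note that this recompute-from-scratch approach is also why Ashish's unsubscribe-from-test1 example above needs a blacklist: without one, test1 simply reappears on the next recheck.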
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660314#comment-14660314 ] Mayuresh Gharat commented on KAFKA-2120: Hi [~jasong35], Cool. So what we can do is: 1) Let this patch go through, since it is important for the new producer to be used in production. 2) I will create separate tickets for #1 and #2 that you mentioned, and we can work on those separately from this patch. Does that work? Thanks, Mayuresh
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660405#comment-14660405 ] Jason Gustafson commented on KAFKA-2120: [~becket_qin] That is what I was thinking when I first looked at this patch, and it's still an option. However, since we are recommending session timeouts on the order of 60 seconds, it seems unfortunate to have to apply the same timeout on all requests (even those that should return immediately). Allowing a lower timeout for other requests means that the consumer could recover from worst-case failures much quicker. That being said, since the sanity check is easy, I think it's a good idea to add it to this patch.
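The per-request timeout idea discussed in this thread amounts to attaching a deadline to each in-flight request instead of one timeout per connection. A toy sketch of the bookkeeping (illustrative names; not NetworkClient's actual code):

```java
// Toy sketch of per-request deadlines, as discussed above (illustrative only;
// not NetworkClient code). Each in-flight request carries its own timeout, so a
// JoinGroup request can wait out a long session timeout while a metadata
// request fails fast.
class PerRequestTimeoutSketch {

    static final class InFlightRequest {
        final String type;
        final long deadlineMs; // absolute time at which the client gives up

        InFlightRequest(String type, long sentAtMs, long timeoutMs) {
            this.type = type;
            this.deadlineMs = sentAtMs + timeoutMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs > deadlineMs;
        }
    }

    public static void main(String[] args) {
        long now = 0;
        InFlightRequest join = new InFlightRequest("JoinGroup", now, 60_000);
        InFlightRequest meta = new InFlightRequest("Metadata", now, 5_000);
        // Ten seconds later, only the metadata request has timed out.
        System.out.println(meta.isExpired(10_000) + " " + join.isExpired(10_000)); // true false
    }
}
```

The trade-off Mayuresh raises still applies: exposing one timeout per request type in configuration would force users to understand the client's internal request mix.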
Re: [DISCUSS] Partitioning in Kafka
OK, the general consensus seems to be that more elaborate partitioning functions belong in the scope of Kafka. Could somebody have a look at KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092 then? -- Gianmarco On 30 July 2015 at 05:57, Jiangjie Qin j...@linkedin.com.invalid wrote: Just my two cents. I think it might be OK to put this into Kafka if we agree that this might be a good use case for people who want to use Kafka as a temporary store for stream processing. At the very least I don't see a downside to this. Thanks, Jiangjie (Becket) Qin On Tue, Jul 28, 2015 at 3:41 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Jason, Thanks for starting the discussion and for your very concise (and correct) summary. Ewen, while what you say is true, those kinds of datasets (a large number of keys with skew) are very typical on the Web (think Twitter users, or Web pages, or even just plain text). If you want to compute an aggregate on these datasets (either for reporting purposes, or as part of some analytical task such as machine learning), then the skew will kill your performance and the amount of parallelism you can effectively extract from your dataset. PKG is a solution to that, without the full overhead of going to shuffle grouping to compute partial aggregates. The problem with shuffle grouping is not only the memory, but also the cost of combining the aggregates, which increases with the parallelism level. Also, by keeping partial aggregates in 2 places, you can query those at runtime with constant overhead (similarly to what you would be able to do with hashing) rather than needing to broadcast the query to all partitions (which you need to do with shuffle grouping). -- Gianmarco On 28 July 2015 at 00:54, Gwen Shapira gshap...@cloudera.com wrote: I guess it depends on whether the original producer did any map tasks or simply wrote raw data. 
We usually advocate writing raw data, and since we need to write it anyway, the partitioner doesn't introduce any extra hops. It's definitely useful to look at use-cases, and I need to think a bit more on whether huge-key-space-with-large-skew is the only one. I think that there are use-cases that are not pure-aggregate, where keeping the key list in memory won't help and scaling to a large number of partitions is still required (and therefore skew is a critical problem). However, I may be making stuff up, so I need to double check. Gwen On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Gwen - this is really like two steps of map reduce though, right? The first step does the partial shuffle to two partitions per key, the second step does a partial reduce + final full shuffle, and the final step does the final reduce. This strikes me as similar to partition assignment strategies in the consumer, in that there will probably be a small handful of commonly used strategies that we can just maintain as part of Kafka. A few people will need more obscure strategies, and they can maintain those implementations themselves. For reference, a quick grep of Spark shows 5 partitioners: HashPartitioner and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner for partitioning matrices, and ShuffleRowRDD for their SQL implementation. So I don't think it would be a big deal to include it here, although I'm not really sure how often it's useful -- compared to normal partitioning, or just doing two steps by starting with unpartitioned data, you need to be performing an aggregation, the key set needs to be large enough for memory usage to be a problem (i.e. you don't want each consumer to have to maintain a map with every key in it), and the distribution needs to be sufficiently skewed (i.e. not just 1 or 2 very hot keys). 
The key set constraint, in particular, is the one I'm not convinced by since in practice if you have a skewed distribution, you probably also won't actually see every key in every partition; each worker actually only needs to maintain a subset of the key set (and associated aggregate data) in memory. On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira gshap...@cloudera.com wrote: If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data. Call the first consumer map-combine-log, the topic shuffle-log and the second consumer reduce-log :) I like that a lot. It works well for either embarrassingly parallel cases, or so much data that more parallelism is worth the extra overhead cases. I personally don't care if its in core-Kafka, KIP-28 or a github project elsewhere, but I find it useful and non-esoteric. On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson
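The PKG ("partial key grouping") scheme being discussed routes each key to the less-loaded of two candidate partitions, which is why each key's partial aggregates live in exactly two places. A minimal sketch of the idea, with made-up hash functions and a simple local load counter (illustrative only, not the KAFKA-2092 patch):

```java
import java.util.Arrays;

// Minimal sketch of partial key grouping (PKG): each key has two candidate
// partitions (derived from two hashes), and each record goes to whichever
// candidate has received fewer records so far. Illustrative only; the hash
// choice and load tracking here are assumptions, not the KAFKA-2092 patch.
class PartialKeyGrouping {
    private final long[] load; // records sent to each partition so far

    PartialKeyGrouping(int numPartitions) {
        this.load = new long[numPartitions];
    }

    int partition(String key) {
        // Two candidate partitions from two cheap (made-up) hash functions.
        int h1 = Math.floorMod(key.hashCode(), load.length);
        int h2 = Math.floorMod(key.hashCode() * 31 + 17, load.length);
        int choice = load[h1] <= load[h2] ? h1 : h2;
        load[choice]++;
        return choice;
    }

    public static void main(String[] args) {
        PartialKeyGrouping pkg = new PartialKeyGrouping(4);
        // A very hot key is split across (at most) its two candidates,
        // halving the worst-case skew versus plain hash partitioning.
        for (int i = 0; i < 1000; i++) pkg.partition("hot-key");
        System.out.println(Arrays.toString(pkg.load));
    }
}
```

This also makes Ewen's memory point concrete: only consumers of a key's two candidate partitions ever hold aggregate state for that key, and a downstream step must combine the two partials per key.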
[jira] [Work started] (KAFKA-2411) remove usage of BlockingChannel in the broker
[ https://issues.apache.org/jira/browse/KAFKA-2411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-2411 started by Ismael Juma. -- remove usage of BlockingChannel in the broker - Key: KAFKA-2411 URL: https://issues.apache.org/jira/browse/KAFKA-2411 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jun Rao Assignee: Ismael Juma In KAFKA-1690, we are adding SSL support at the Selector. However, there are still a few places where we use BlockingChannel for inter-broker communication. We need to replace those usages with Selector/NetworkClient to enable inter-broker communication over SSL. Specifically, BlockingChannel is currently used in the following places: 1. ControllerChannelManager: for the controller to propagate metadata to the brokers. 2. KafkaServer: for the broker to send the controlled shutdown request to the controller. 3. AbstractFetcherThread: for the follower to fetch data from the leader (through SimpleConsumer). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2364) Improve documentation for contributing to docs
[ https://issues.apache.org/jira/browse/KAFKA-2364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660041#comment-14660041 ] Ben Stopford commented on KAFKA-2364: - Hi Aseem Have you signed up to the mailing lists? http://kafka.apache.org/contact.html Improve documentation for contributing to docs -- Key: KAFKA-2364 URL: https://issues.apache.org/jira/browse/KAFKA-2364 Project: Kafka Issue Type: Task Reporter: Aseem Bansal Priority: Minor Labels: doc While reading the documentation for Kafka 8 I saw some improvements that can be made, but the docs for contributing are not very good at https://github.com/apache/kafka. It just gives me a URL for svn, and I am not sure what to do. Can the README.MD file be improved for contributing to docs? I have submitted patches to groovy and grails by sending PRs via GitHub, but looking at the comments on PRs submitted to kafka, it seems PRs via GitHub are not working for kafka. It would be good to make that work also. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34641: Patch for KAFKA-2214
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/#review94378 --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (line 43) https://reviews.apache.org/r/34641/#comment148956 Formatting nitpick: Since we're now using the result of the try expression (via reassignmentStatus) we should indent the try/catch block. core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (line 51) https://reviews.apache.org/r/34641/#comment148957 Question: It looks to me as if we're using ReassignmentCompleted in two (different) ways: In verifyAssignment() its semantics are the reassignment operation is fully completed (notably, it is not in progress any longer). In the main() its semantics seem to be reassignment operation was successfully initiated, right? This might also be the reason why main() -- unlike verifyAssignment -- will not return ReassignmentInProgress in any case. Sticking to the current way main() is supposed to work (if I understand correctly), I'd intuitively expect main() to distinguish the following states (names are just examples): (1) ReassignmentInitiated with a shell status of 0, (2) ReassignmentFailed with a shell status of 2. (I thought about the alternative to re-use ReassignmentInProgress instead of ReassignmentCompleted, but ReassignmentInProgress has a shell status code of 1 whereas we'd need a status of 0 for non-failures. So this doesn't work.) I don't have a strong opinion either way, but personally I'd prefer not to conflate these two different semantics of the current ReassignmentCompleted usage. Would that work? And please correct me if my understanding is wrong! core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (line 87) https://reviews.apache.org/r/34641/#comment148955 Formatting nitpick: Missing spaces between if's and conditions. 
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (line 265) https://reviews.apache.org/r/34641/#comment148961 FYI: Exit code 2 is arguably a reserved status code (see http://tldp.org/LDP/abs/html/exitcodes.html#EXITCODESREF): Misuse of shell builtins (according to Bash documentation) To be extra-compliant we could change: ReassignmentFailed (the true failure) -> status 1, ReassignmentInProgress (more like a WARN) -> status 3. Again, I don't have a strong opinion here as it's IMHO pretty common to ignore the advice/link above. In general the patch is going in the right direction. I only added two minor formatting issues and a clarification request. - Michael Noll On Aug. 5, 2015, 3:19 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ --- (Updated Aug. 5, 2015, 3:19 p.m.) Review request for kafka. Bugs: KAFKA-2214 https://issues.apache.org/jira/browse/KAFKA-2214 Repository: kafka Description --- Addressing Ismael Juma's comments Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ea345895a52977c25bff57e95e12b8662331d7fe Diff: https://reviews.apache.org/r/34641/diff/ Testing --- Thanks, Manikumar Reddy O
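The status-to-exit-code mapping Michael suggests could be sketched as below. The status names and code values follow the reviewer's proposal above; they may not match what was ultimately committed:

```java
// Sketch of the exit-code mapping proposed in the review above. The codes are
// the reviewer's suggestion (avoiding the reserved shell status 2); they are
// illustrative and may differ from the final patch.
class ReassignmentExitCodes {
    enum Status { COMPLETED, INITIATED, IN_PROGRESS, FAILED }

    static int exitCode(Status status) {
        switch (status) {
            case COMPLETED:
            case INITIATED:   return 0; // non-failure outcomes
            case IN_PROGRESS: return 3; // WARN-like, avoids reserved code 2
            case FAILED:      return 1; // the true failure
            default: throw new AssertionError("unreachable");
        }
    }

    public static void main(String[] args) {
        System.out.println(exitCode(Status.FAILED)); // 1
    }
}
```

Distinguishing INITIATED from COMPLETED also addresses the reviewer's concern about conflating "successfully started" with "fully finished" in main() versus verifyAssignment().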
Re: KAFKA-2364 migrate docs from SVN to git
Hi, For reference, here is the previous discussion on moving the website to Git: http://search-hadoop.com/m/uyzND11JliU1E8QU92 People were positive to the idea as Jay said. I would like to see a bit of a discussion around whether the website should be part of the same repo as the code or not. I'll get the ball rolling. Pros for same repo: * One commit can update the code and website, which means: ** Lower barrier for updating docs along with relevant code changes ** Easier to require that both are updated at the same time * More eyeballs on the website changes * Automatically branched with the relevant code Pros for separate repo: * Potentially simpler for website-only changes (smaller repo, less verification needed) * Website changes don't clutter the code Git history * No risk of website change affecting the code Your thoughts, please. Best, Ismael On Fri, Jul 31, 2015 at 6:15 PM, Aseem Bansal asmbans...@gmail.com wrote: Hi When discussing on KAFKA-2364 migrating docs from svn to git came up. That would make contributing to docs much easier. I have contributed to groovy/grails via github so I think having mirror on github could be useful. Also I think unless there is some good reason it should be a separate repo. No need to mix docs and code. I can try that out. Thoughts?
[jira] [Commented] (KAFKA-2364) Improve documentation for contributing to docs
[ https://issues.apache.org/jira/browse/KAFKA-2364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14659615#comment-14659615 ] Ismael Juma commented on KAFKA-2364: [~anshbansal], there were a couple of responses: http://search-hadoop.com/m/uyzND1Dux842dm7vg2 I just sent another reply, but it looks like it's not in the archives yet.
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review94412 --- A few more comments. We need to be careful with sensors at the client-id level. Clients can come and go (e.g. console consumer). We probably don't want to hold sensors that are no longer actively used since that takes memory. So, we will need some way of removing inactive sensors. Not sure if we should add this at the metric level or at the quota level. core/src/main/scala/kafka/server/ClientQuotaManager.scala (lines 154 - 201) https://reviews.apache.org/r/33049/#comment148998 Not sure why the lock is needed. metrics.sensor() is synchronized and already does the getOrCreate thing. core/src/main/scala/kafka/server/ClientQuotaManager.scala (lines 187 - 191) https://reviews.apache.org/r/33049/#comment149000 I suggested this earlier but realized now that it may not work. The rate approach works when there is a single client instance per client-id. However, there could be multiple instances in reality. This means the accumulated delay time could be larger than the elapsed time, and the percentage of time delayed could be larger than 1, which is weird. So, we will need some other way to measure the degree of throttling (potentially at both the client-id and global level). - Jun Rao On Aug. 5, 2015, 2:08 a.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated Aug. 5, 2015, 2:08 a.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producers and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApis that can be used to throttle any type of request. 
Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases for both producer and consumer 6. This doesn't include a system test. There is a separate ticket for that 7. Fixed KAFKA-2191 - (Included fix from : https://reviews.apache.org/r/34418/ ) Addressing Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3 clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaManager.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala dbe170f87331f43e2dc30165080d2cb7dfe5fdbf core/src/main/scala/kafka/server/KafkaServer.scala 84d4730ac634f9a5bf12a656e422fea03ad72da8 core/src/main/scala/kafka/server/ReplicaManager.scala 795220e7f63d163be90738b4c1a39687b44c1395 core/src/main/scala/kafka/server/ThrottledResponse.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 04a02e08a54139ee1a298c5354731bae009efef3 core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
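The delay-time computation in item 3 above can be illustrated with a simple rate calculation: if a client moved `observed` units during a window of `windowMs` milliseconds, delaying the response by `observed/quota - window` stretches the measurement window just enough that the effective rate falls back to the quota. A hedged sketch (illustrative names and formula; the patch's real computation lives in the metrics package and may differ):

```java
// Sketch of deriving a throttle delay from a rate quota, as described in the
// review above. Illustrative only: if `observed` units arrived during
// `windowMs`, delay the response so that observed / (windowMs + delayMs)
// drops back to the quota.
class ThrottleDelaySketch {

    static long delayMs(double observed, double windowMs, double quotaPerSec) {
        double quotaPerMs = quotaPerSec / 1000.0;
        double delay = observed / quotaPerMs - windowMs;
        return Math.max(0, Math.round(delay)); // no delay if under quota
    }

    public static void main(String[] args) {
        // 2000 bytes in a 1s window against a 1000 bytes/sec quota:
        // the window must stretch to 2s, i.e. delay the response by 1000 ms.
        System.out.println(delayMs(2000, 1000, 1000)); // 1000
    }
}
```

Jun's multi-instance caveat applies here too: with several client instances sharing a client-id, summing such delays per instance can exceed the elapsed wall-clock time, so the aggregate "fraction of time throttled" needs a different metric.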
Re: KAFKA-2364 migrate docs from SVN to git
I prefer same repo for one-commit / lower-barrier benefits. Sqoop has the following process, which decouples documentation changes from website changes: 1. Code github repo contains a doc directory, with the documentation written and maintained in AsciiDoc. Only one version of the documentation, since it is source controlled with the code. (unlike current SVN where we have directories per version) 2. Build process compiles the AsciiDoc to HTML and PDF 3. When releasing, we post the documentation of the new release to the website Gwen
[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14660241#comment-14660241 ] Jason Gustafson commented on KAFKA-2120: [~mgharat] It occurred to me that the global request timeout doesn't work so well for the consumer. In particular, join group requests can take as long as the session timeout (which we encourage people to set high), but it would be unfortunate if we had to use that as the timeout for all requests. I wonder if it would be a good idea to have per-request timeouts?