[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620659#comment-14620659 ] Flavio Junqueira commented on KAFKA-2300: - My assessment so far has focused on the original problem reported in this issue, namely that the controller is stuck because of some spurious state. From the description and the comments, ControllerBrokerRequestBatch.updateMetadataRequestMap isn't empty at the time a broker tries to re-join, which causes the processing of KafkaController.onBrokerStartup to fail because a call to ControllerBrokerRequestBatch.newBatch (which verifies that three data structures, including updateMetadataRequestMap, are empty) throws an exception. From the description, the update metadata requests have -2 for the leader, which is LeaderDuringDelete. Such requests are put in the update metadata map via a call from onTopicDeletion to controller.sendUpdateMetadataRequest. Interestingly, the call to sendUpdateMetadataRequest adds to the update metadata map by calling brokerRequestBatch.addUpdateMetadataRequestForBrokers and right after invokes brokerRequestBatch.sendRequestsToBrokers. The latter is supposed to clear the update metadata map, which makes me think that there could have been an exception that interrupted the flow, causing the updateMetadataRequestMap to not be cleared. I wanted to ask if anyone has anything to add or correct in the analysis so far, and also whether [~kharriger] would be able to post the whole log so that I can have a look. Ultimately, it'd be great to avoid having the controller unable to make progress because of some bad state. Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Hello Kafka folks, We are having an issue where a broker attempts to join the cluster after being restarted, but is never added to the ISR for its assigned partitions. This is a three-node cluster, and the controller is broker 2. When broker 1 starts, we see the following message in broker 2's controller.log. {{ [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one.
Some UpdateMetadata state changes Map(2 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 1 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 3 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) }} {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of it persists in the controller's memory, causing an exception which interrupts the state change triggered by the broker startup. Has anyone seen something like this? Any idea what's
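To make the failure mode concrete: newBatch is an invariant check, and sendRequestsToBrokers is the cleanup that maintains the invariant. Below is a rough Java sketch of that pattern — the method names mirror the Scala code in the stack trace, but the bodies are illustrative assumptions, not the actual controller implementation. If sending throws between the add and the clear, the stale entries survive and the next newBatch call fails exactly as in the log above; clearing in a finally block is one way to make the flow exception-safe.
{code}
// Illustrative sketch only; not the actual kafka.controller implementation.
import java.util.HashMap;
import java.util.Map;

class BrokerRequestBatchSketch {
    private final Map<Integer, String> updateMetadataRequestMap = new HashMap<>();

    // Mirrors the invariant check in ControllerBrokerRequestBatch.newBatch:
    // stale state left over from a previous batch makes it throw.
    void newBatch() {
        if (!updateMetadataRequestMap.isEmpty())
            throw new IllegalStateException(
                "Controller to broker state change requests batch is not empty " +
                "while creating a new one. Some UpdateMetadata state changes " +
                updateMetadataRequestMap + " might be lost");
    }

    void addUpdateMetadataRequestForBrokers(int brokerId, String request) {
        updateMetadataRequestMap.put(brokerId, request);
    }

    // If transmit() throws and the map is not cleared, the controller is left
    // with exactly the spurious state described in this issue. The finally
    // block sketches one defensive fix.
    void sendRequestsToBrokers() {
        try {
            transmit();
        } finally {
            updateMetadataRequestMap.clear();
        }
    }

    private void transmit() { /* may throw, e.g. on a channel error */ }
}
{code}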
Re: Build failed in Jenkins: KafkaPreCommit #142
Of course you're right -- my bad, I scanned the log too quickly and misread the cause of the error. On Wed, Jul 8, 2015 at 10:30 PM, Joel Koshy jjkosh...@gmail.com wrote: Agreed that we should change it to exclude build/** - however, the build 142 failure does not seem to be rat-related, is it? E.g., compare the console output with an earlier build (138 I think). Sorry I can't verify/debug right now, but can look tomorrow. On Wednesday, July 8, 2015, Ewen Cheslack-Postava e...@confluent.io wrote: Joel, this looks like it's failing for basically the same file set as the last fix. We probably want to just ignore all of build/, not just build/rat/rat-report.xml. I'd say rat may end up being too much of a hassle, but it has already caught invalid license headers in another case... -Ewen On Wed, Jul 8, 2015 at 10:39 AM, Apache Jenkins Server jenk...@builds.apache.org wrote: See https://builds.apache.org/job/KafkaPreCommit/142/changes Changes: [wangguoz] KAFKA-2308: make MemoryRecords idempotent; reviewed by Guozhang Wang [cshapi] KAFKA-2316: Drop java 1.6 support; patched by Sriharsha Chintalapani reviewed by Ismael Juma and Gwen Shapira -- Started by an SCM change Building remotely on H11 (Ubuntu ubuntu) in workspace https://builds.apache.org/job/KafkaPreCommit/ws/ Cloning the remote Git repository Cloning repository https://git-wip-us.apache.org/repos/asf/kafka.git git init https://builds.apache.org/job/KafkaPreCommit/ws/ # timeout=10 Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git git --version # timeout=10 git fetch --tags --progress https://git-wip-us.apache.org/repos/asf/kafka.git +refs/heads/*:refs/remotes/origin/* git config remote.origin.url https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10 git config remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10 git config remote.origin.url https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10 Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git git fetch --tags --progress https://git-wip-us.apache.org/repos/asf/kafka.git +refs/heads/*:refs/remotes/origin/* git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10 git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10 Checking out Revision 7df39e0394a6fd2f26e8f12768ebf7fecd56e3da (refs/remotes/origin/trunk) git config core.sparsecheckout # timeout=10 git checkout -f 7df39e0394a6fd2f26e8f12768ebf7fecd56e3da git rev-list 4204f4a06bf23160ceec4aa54331db62681bff82 # timeout=10 Setting GRADLE_2_1_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.1 [KafkaPreCommit] $ /bin/bash -xe /tmp/hudson301654436593382.sh + /home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.1/bin/gradle To honour the JVM settings for this build a new JVM will be forked. Please consider using the daemon: http://gradle.org/docs/2.1/userguide/gradle_daemon.html.
Download https://repo1.maven.org/maven2/org/ajoberstar/grgit/0.2.3/grgit-0.2.3.pom Download https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit/3.3.0.201403021825-r/org.eclipse.jgit-3.3.0.201403021825-r.pom Download https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit-parent/3.3.0.201403021825-r/org.eclipse.jgit-parent-3.3.0.201403021825-r.pom Download https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit.ui/3.3.0.201403021825-r/org.eclipse.jgit.ui-3.3.0.201403021825-r.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.jsch/0.0.7/jsch.agentproxy.jsch-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy/0.0.7/jsch.agentproxy-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.pageant/0.0.7/jsch.agentproxy.pageant-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.sshagent/0.0.7/jsch.agentproxy.sshagent-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.usocket-jna/0.0.7/jsch.agentproxy.usocket-jna-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.usocket-nc/0.0.7/jsch.agentproxy.usocket-nc-0.0.7.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch/0.1.46/jsch-0.1.46.pom Download https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.core/0.0.7/jsch.agentproxy.core-0.0.7.pom Download https://repo1.maven.org/maven2/org/ajoberstar/grgit/0.2.3/grgit-0.2.3.jar Download https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit/3.3.0.201403021825-r/org.eclipse.jgit-3.3.0.201403021825-r.jar
[jira] [Created] (KAFKA-2326) KafkaProducer - the this reference escapes during construction
Steve Tian created KAFKA-2326: - Summary: KafkaProducer - the this reference escapes during construction Key: KAFKA-2326 URL: https://issues.apache.org/jira/browse/KAFKA-2326 Project: Kafka Issue Type: Bug Components: clients, producer Affects Versions: 0.8.2.1, 0.8.2.0 Reporter: Steve Tian Assignee: Jun Rao The this reference of KafkaProducer escapes during construction, because the constructor invokes Thread.start() before it completes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
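For readers unfamiliar with the hazard: starting a thread from inside a constructor publishes the object before construction finishes, so the new thread can observe partially initialized fields. A minimal Java sketch of the anti-pattern and one common remedy follows; the class and field names are invented for illustration and are not KafkaProducer's actual internals.
{code}
public class EscapeDemo {
    private String config; // may still be null when the I/O thread reads it

    public EscapeDemo(String config) {
        // BUG: the thread captures 'this' and starts before the constructor
        // completes, so it may print null instead of the configured value.
        new Thread(() -> System.out.println(this.config)).start();
        this.config = config;
    }
}

// One remedy: construct fully, then start the thread from a factory method.
class SafeDemo {
    private final String config;
    private final Thread ioThread;

    private SafeDemo(String config) {
        this.config = config;
        this.ioThread = new Thread(() -> System.out.println(this.config));
        // no start() here: 'this' must not escape the constructor
    }

    static SafeDemo create(String config) {
        SafeDemo demo = new SafeDemo(config);
        demo.ioThread.start(); // construction is complete before the thread runs
        return demo;
    }
}
{code}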
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: Browsed through the patch, overall looks very promising. I am not very clear on a few detailed changes though: 1. The request future adapter / handler modifications. 2. The retry backoff implementation seems incorrect. Could you explain these two aspects a little? Thanks for the feedback. Comments below. On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java, line 27 https://reviews.apache.org/r/36333/diff/1/?file=1002919#file1002919line27 You may want to add the committed offset map in the callback, since otherwise it is unclear which commit it is referring to when triggered. This is a good idea. On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 88-93 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88 This is not introduced in the patch, but I am not sure if this is the right way to respect the backoff time. For example, if the destination broker is down for a short period of time, poll(retryBackoffMs) will return immediately, and hence this function will busy-loop triggering poll() and flooding the network with metadata requests, right? What we want in this case is for the consumer to wait for retryBackoffMs before sending the next metadata request. This code was lifted from KafkaConsumer, but I think you are right. I'll fix it for the next patch. On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 120 https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line120 We can remove this line since it is checked inside ensureAssignment() already. It's a little subtle, but the call to ensureCoordinatorKnown in ensureAssignment is inside the loop, so it will only be called when an assignment is needed. I considered putting it outside the loop, but inside is where it needs to be, and it looks odd to have it in both places. On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 218 https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line218 There is a potential risk of not aligning the scheduling of heartbeats with the discovery of the coordinator. For example, let's say: 1. at t0 we call initHeartbeatTask with interval 100; 2. at t1 the consumer already finds the coordinator, but it will not send the first HB until t100; 3. at t100 the consumer may find itself already kicked out of the group by the coordinator, and reschedule at t200 and re-join the group; 4. at t101 the consumer has re-joined the group, but will not send the HB until t200, and so on... Perhaps the way to deal with this is to give the heartbeat task suspend() and resume() methods so that we can align it properly according to coordinator events. On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 296 https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line296 Why do we change the behavior to directly throw an exception here? I removed the retry action from RequestFuture in favor of a check for RetriableException. In this case, both errors are retriable, so the result is the same.
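On the backoff point above: the fix amounts to tracking the time of the last metadata attempt and refusing to resend until retryBackoffMs has elapsed, rather than relying on the poll timeout alone. A hedged Java sketch of that bookkeeping follows (hypothetical names; not the actual consumer code):
{code}
// Illustrative backoff bookkeeping: resend only after retryBackoffMs elapses.
class RetryBackoff {
    private final long retryBackoffMs;
    private long lastAttemptMs = Long.MIN_VALUE / 2; // allow an immediate first try

    RetryBackoff(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    boolean mayAttempt(long nowMs) {
        return nowMs - lastAttemptMs >= retryBackoffMs;
    }

    void recordAttempt(long nowMs) {
        lastAttemptMs = nowMs;
    }

    // Size the poll timeout so the caller wakes exactly when a retry becomes
    // legal, instead of spinning when poll(retryBackoffMs) returns early.
    long timeToNextAttemptMs(long nowMs) {
        return Math.max(0L, retryBackoffMs - (nowMs - lastAttemptMs));
    }
}
{code}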
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java, lines 1-43 https://reviews.apache.org/r/36333/diff/1/?file=1002931#file1002931line1 Not clear why you want to convert the future type in this adapter, can you elaborate a bit? The idea of the future adapter was to provide a way to convert a future of one type into another. The use case was converting RequestFuture<ClientResponse> into futures typed for the respective response (e.g. RequestFuture<JoinGroupResponse>). If you look at the error handling in the consumer, the handlers are all doing basically the same thing. If the error is NOT_COORDINATOR_FOR_CONSUMER, for example, we just mark the coordinator dead. I had hoped that this API would allow us to handle some coordinator errors generically while leaving the respective request handler to deal only with its specific errors. Unfortunately, the only generic check this enabled was the check for a disconnect. To locate server error codes, you have to traverse the respective response structures, which cannot be done generically. At the moment, I'm not sure it brings enough to the table to justify its existence, but since it's a fairly common pattern with future handling and one that might be helpful in the future, I opted to
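As a rough illustration of the adapter idea described above — a generic hook that handles cross-cutting failures once while each request supplies only its type-specific success translation — the shape might look like the following, with hypothetical names (the real RequestFuture plumbing differs):
{code}
// Illustrative only. A completed future of type S is adapted into a future
// of type T; shared failure handling lives in one place.
abstract class FutureAdapterSketch<S, T> {
    abstract void onSuccess(S value, FutureSketch<T> result);

    void onFailure(RuntimeException e, FutureSketch<T> result) {
        result.fail(e); // e.g. translate disconnects uniformly
    }
}

class FutureSketch<T> {
    private T value;
    private RuntimeException error;
    private boolean done;

    void complete(T value) { this.value = value; this.done = true; }
    void fail(RuntimeException e) { this.error = e; this.done = true; }

    // For brevity this sketch only adapts already-completed futures;
    // a real implementation would register listeners instead.
    <R> FutureSketch<R> compose(FutureAdapterSketch<T, R> adapter) {
        FutureSketch<R> adapted = new FutureSketch<>();
        if (done) {
            if (error == null) adapter.onSuccess(value, adapted);
            else adapter.onFailure(error, adapted);
        }
        return adapted;
    }
}
{code}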
Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance
On July 9, 2015, 4:58 p.m., Ewen Cheslack-Postava wrote: Just FYI, this was also noticed in the patch for KAFKA-2123 (which is moving quite a bit of code around). The solution was a bit different (the condition is different and keeps the existing generation ID increment location). Thanks for the heads up Ewen. I started looking at that patch yesterday but didn't reach the changes to core (it's at the end of the rb). It looks like Jason roughly followed option 1 from my comment in https://issues.apache.org/jira/browse/KAFKA-1740?focusedCommentId=14620038&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14620038 . I prefer the approach I provided because the generation id increment logic is easier to understand and the logged state transitions with their generation ids are more readable. - Onur --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/#review91118 --- On July 9, 2015, 9:42 a.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/ --- (Updated July 9, 2015, 9:42 a.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- fix heartbeat to allow offset commits during PreparingRebalance Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36346/diff/ Testing --- Manual testing only so far. The OffsetManager merge didn't add anything to ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing OffsetCommitRequest and OffsetFetchRequest handling tests to this rb. Thanks, Onur Karaman
[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-2298: -- Resolution: Fixed Status: Resolved (was: Patch Available) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient --- Key: KAFKA-2298 URL: https://issues.apache.org/jira/browse/KAFKA-2298 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2298.patch, KAFKA-2298_2015-06-23_18:47:54.patch, KAFKA-2298_2015-06-24_13:00:39.patch I ran into the problem described in KAFKA-2266 when testing quotas. I was told the bug was fixed in KAFKA-2266 after I figured out the problem. But the patch provided in KAFKA-2266 probably doesn't solve all related problems. From reading the code, there is still one edge case where the client selector can close a connection in poll() without notifying NetworkClient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
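The edge case reduces to a bookkeeping rule: any connection the selector closes inside poll() must also be reported to the layer above, or NetworkClient never fails that node's in-flight requests. A hedged Java sketch of the rule (hypothetical names; the exception type below stands in for Kafka's InvalidReceiveException):
{code}
import java.util.ArrayList;
import java.util.List;

// Illustrative only: every close performed inside poll() is recorded in
// 'disconnected' so the caller learns about it from the poll result.
class SelectorSketch {
    private final List<String> disconnected = new ArrayList<>();

    void poll(String nodeId) {
        try {
            receive(nodeId);
        } catch (IllegalStateException invalidReceive) {
            close(nodeId);
            // The bug class described above: omitting this line silently
            // drops the connection without notifying NetworkClient.
            disconnected.add(nodeId);
        }
    }

    List<String> disconnected() { return disconnected; }

    private void receive(String nodeId) { /* may throw on an invalid receive */ }
    private void close(String nodeId) { /* release the channel */ }
}
{code}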
[jira] [Updated] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Geoffrey Anderson updated KAFKA-2327: - Description: To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727): } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) -> } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort was: To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka.
The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727): } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) -> } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL:
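The localhost:null in the FATAL line is the tell: concatenating a missing boxed port turns the literal text null into the endpoint string, which then cannot be parsed. A small Java illustration of the mechanism and of the defaulting the proposed fix applies (the real code is Scala in KafkaConfig.getAdvertisedListeners; the names below are illustrative):
{code}
public class EndpointDemo {
    public static void main(String[] args) {
        String advertisedHost = "localhost";
        Integer advertisedPort = null; // advertised.port was never set

        // Reproduces the bug: a null Integer concatenates as the text "null".
        System.out.println("PLAINTEXT://" + advertisedHost + ":" + advertisedPort);
        // prints PLAINTEXT://localhost:null, which fails endpoint parsing

        // The proposed fix falls back to the plain 'port' property, so a
        // parseable endpoint is always produced.
        int port = 9092;
        int effectivePort = advertisedPort != null ? advertisedPort : port;
        System.out.println("PLAINTEXT://" + advertisedHost + ":" + effectivePort);
        // prints PLAINTEXT://localhost:9092
    }
}
{code}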
[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621085#comment-14621085 ] ASF GitHub Bot commented on KAFKA-2327: --- GitHub user granders opened a pull request: https://github.com/apache/kafka/pull/73 KAFKA-2327 Added unit tests as well. These fail without the fix, but pass with the fix. You can merge this pull request into a Git repository by running: $ git pull https://github.com/confluentinc/kafka KAFKA-2327 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/73.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #73 commit 23b3340b91800ff6568ac8f07f9188659358ecc3 Author: Geoff Anderson ge...@confluent.io Date: 2015-07-09T19:01:27Z Fixes KAFKA-2327 broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727) {code} ... } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) ... {code} -> {code} } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request: KAFKA-2327
GitHub user granders opened a pull request: https://github.com/apache/kafka/pull/73 KAFKA-2327 Added unit tests as well. These fail without the fix, but pass with the fix. You can merge this pull request into a Git repository by running: $ git pull https://github.com/confluentinc/kafka KAFKA-2327 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/73.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #73 commit 23b3340b91800ff6568ac8f07f9188659358ecc3 Author: Geoff Anderson ge...@confluent.io Date: 2015-07-09T19:01:27Z Fixes KAFKA-2327 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621045#comment-14621045 ] Gwen Shapira commented on KAFKA-2327: - Oops. My bug :) Do you have a patch, or shall I fix this? broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727) {code} ... } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) ... {code} -> {code} } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
Geoffrey Anderson created KAFKA-2327: Summary: broker doesn't start if config defines advertised.host but not advertised.port Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727): } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) -> } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Geoffrey Anderson updated KAFKA-2327: - Description: To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727) {code} ... } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) ... {code} -> {code} } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort {code} was: To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka.
The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727): } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) -> } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL:
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121 Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events? Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy-looping while not actually accomplishing anything. If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests. This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node. If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered. This is annoying because you cannot attach a callback to a request until you are actually able to send it. This means you always have to send requests in a poll loop, which implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before transporting them to the network layer, which allows us to give a future to the caller immediately. Now, when the user calls poll, we try to send all of the unsent requests immediately (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. After that, we call poll(timeout). Some requests may still have failed to be sent. This could be because the client is still connecting to the node or because its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests, which means that the unsent request queue never accumulates beyond a poll call. So every request sent either gets transported or fails. I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked whenever we retry a request to the coordinator. One potential issue is that the configuration retry.backoff.ms (as it's currently documented) is only intended for fetch requests, but we have been using it for all requests. Perhaps there should be a separate configuration? For example coordinator.retry.backoff.ms? Or perhaps a hard-coded value is really what we want.
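Put concretely, the buffering described above behaves like a small queue drained on every poll: callers enqueue at any time and immediately get a handle back; each poll first transmits everything it can, then waits, then fails whatever could not be sent so nothing accumulates across calls. A hedged Java sketch of that lifecycle (hypothetical types; the real ConsumerNetworkClient differs):
{code}
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Illustrative only: the unsent-request queue lifecycle described above.
class UnsentQueueSketch {
    interface Request { void fail(String reason); }

    private final Queue<Request> unsent = new ArrayDeque<>();

    // Accept a request at any time; the caller gets its handle immediately.
    void send(Request request) {
        unsent.add(request);
    }

    void poll(long timeoutMs) {
        trySendAll();             // the ready()/poll(0) dance until nothing sends
        waitForEvents(timeoutMs); // then block up to the caller's timeout
        // Fail anything still unsent (node not ready, buffer full, ...) so the
        // queue never carries requests over to the next poll call.
        while (!unsent.isEmpty())
            unsent.poll().fail("request could not be sent in this poll");
    }

    // Attempt transmission for ready nodes, removing requests that were sent.
    private void trySendAll() {
        for (Iterator<Request> it = unsent.iterator(); it.hasNext(); ) {
            Request r = it.next();
            if (nodeReadyFor(r)) it.remove(); // "transmitted"
        }
    }

    private boolean nodeReadyFor(Request r) { return true; } // placeholder
    private void waitForEvents(long timeoutMs) { /* network wait */ }
}
{code}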
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91 Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest? Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about. Guozhang mentioned this code as well, which was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests. To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right. On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 119
[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621091#comment-14621091 ] Geoffrey Anderson commented on KAFKA-2327: -- Oh quick reply! Yup I just opened a PR for the patch :) broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port: port=9092 advertised.host.name=localhost Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala, in the getAdvertisedListeners method, and I believe the fix is (starting at line 727) {code} ... } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp)) ... {code} -> {code} } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 10, 2015, 1 a.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description (updated) --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Diffs (updated) - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala c1f0ccad4900d74e41936fae4c6c2235fe9314fe core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Attachment: KAFKA-2210_2015-07-09_18:00:34.patch KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch This is the first subtask for KAFKA-1688. As part of this JIRA we intend to agree on all the public entities, configs, and changes to existing Kafka classes needed to allow a pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for the detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621561#comment-14621561 ] Guozhang Wang commented on KAFKA-1740: -- Committed to trunk, thanks. Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, KAFKA-1740_2015-07-09_14:41:31.patch This JIRA involves refactoring the offset manager and merging it into the coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Status: Patch Available (was: In Progress) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch This is the first subtask for KAFKA-1688. As part of this JIRA we intend to agree on all the public entities, configs, and changes to existing Kafka classes needed to allow a pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for the detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34492: Patch for KAFKA-2210
On June 9, 2015, 7:13 a.m., Dapeng Sun wrote: core/src/main/scala/kafka/common/ErrorMapping.scala, line 57 https://reviews.apache.org/r/34492/diff/4/?file=979556#file979556line57 Why not use 23 here? I have changed it so it now uses 23. On June 9, 2015, 7:13 a.m., Dapeng Sun wrote: core/src/main/scala/kafka/security/auth/Authorizer.scala, line 32 https://reviews.apache.org/r/34492/diff/4/?file=979558#file979558line32 Resource-related operations are authorized, but the authorizer itself seems not to be authorized. Could a normal user operate on the ACLs? We may need to add something (e.g. Session) to addACL, removeACL, etc. Given that add/edit/remove acls will be invoked from the CLI and these are not server-side APIs (how I wish I had started with that), authorization will be done in a different way. It's not really part of this CR, but essentially we will have to rely on the ACL storage layer's authorization for this (zookeeper in the case of the default authorizer implementation). On June 9, 2015, 7:13 a.m., Dapeng Sun wrote: core/src/main/scala/kafka/security/auth/Authorizer.scala, line 60 https://reviews.apache.org/r/34492/diff/4/?file=979558#file979558line60 I didn't find any code that invokes removeAcls; it seems Acl entities are not cleaned up when a resource (e.g. topic) is deleted. Is that really so, or did I miss it? It is part of the PR that implements the CLI. On June 9, 2015, 7:13 a.m., Dapeng Sun wrote: core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 23 https://reviews.apache.org/r/34492/diff/4/?file=979559#file979559line23 I think it is better to use constants or an enum to stand for UserType. We are using constants by using val. An enum is avoided so a custom authorizer implementation can add any principalType it wishes without having to change code in the Kafka codebase. - Parth --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review86541 --- On July 10, 2015, 1 a.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 10, 2015, 1 a.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments.
Removing acl.json file Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala c1f0ccad4900d74e41936fae4c6c2235fe9314fe core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
Re: Review Request 34492: Patch for KAFKA-2210
On June 11, 2015, 7:19 a.m., Dapeng Sun wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, line 160 https://reviews.apache.org/r/34492/diff/4/?file=979565#file979565line160 Why add a new config file path? Could authorization-related config options be merged into KafkaConfig? Not until https://issues.apache.org/jira/browse/KAFKA-2249 is fixed. - Parth --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/#review87532 --- On July 10, 2015, 1 a.m., Parth Brahmbhatt wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- (Updated July 10, 2015, 1 a.m.) Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- Addressing review comments from Jun. Adding CREATE check for offset topic only if the topic does not exist already. Addressing some more comments. Removing acl.json file Diffs - core/src/main/scala/kafka/api/OffsetRequest.scala f418868046f7c99aefdccd9956541a0cb72b1500 core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 18f5b5b895af1469876b2223841fd90a2dd255e0 core/src/main/scala/kafka/server/KafkaConfig.scala c1f0ccad4900d74e41936fae4c6c2235fe9314fe core/src/main/scala/kafka/server/KafkaServer.scala 18917bc4464b9403b16d85d20c3fd4c24893d1d3 core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 98a5b042a710d3c1064b0379db1d152efc9eabee Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
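The configure() approach Jun describes in the thread above — pluggable logic reading its settings from the broker's single property map instead of a second file — might look roughly like this sketch; the interface and key names are hypothetical, not Kafka's actual pluggable-configuration API:
{code}
import java.util.Map;

// Illustrative sketch of the configure() pattern discussed above.
interface ConfigurablePlugin {
    void configure(Map<String, ?> configs);
}

class ExampleAuthorizer implements ConfigurablePlugin {
    private String aclStorePath;

    @Override
    public void configure(Map<String, ?> configs) {
        // "authorizer.acl.store.path" is an invented key for illustration;
        // the point is that it lives in the same properties as everything else.
        Object value = configs.get("authorizer.acl.store.path");
        this.aclStorePath = value != null ? value.toString() : "/kafka-acl";
    }
}
{code}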
Re: Review Request 34492: Patch for KAFKA-2210
On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/main/scala/kafka/security/auth/Acl.scala, lines 57-62 https://reviews.apache.org/r/34492/diff/1/?file=965651#file965651line57 I tried exactly that, but it turns out our current JSON parser does not work when a JSON string has other special characters; it somehow gets into some double parsing and fails. It has been long since I wrote this code, so I don't exactly remember why it was failing, but I did try it, and with the current JsonUtil it does not work. Jun Rao wrote: Could you explain a bit which part doesn't work? The following simple test works for me. scala> val a = "[{\"a\": \"aa\"}]" a: String = [{"a": "aa"}] scala> JSON.parseFull(a) res4: Option[Any] = Some(List(Map(a -> aa))) So I tried this again and found out the issue. Nothing to do with double parsing (I was probably mixing in some other project here), but our current JSON.parseFull does not support custom objects; it only supports primitive types, lists, maps, and iterables. It throws IllegalArgumentException if I try to pass the Acl object as-is, because of the following lines: case other: AnyRef => throw new IllegalArgumentException("Unknown arguement of type " + other.getClass + ": " + other) On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/main/scala/kafka/security/auth/Authorizer.scala, line 36 https://reviews.apache.org/r/34492/diff/1/?file=965652#file965652line36 In the KIP discussion it was proposed to add a config authorizer.config.path which will contain the path to a property file on all broker hosts. This is how the plugin-specific property file gets passed on. Do we want to instead use configurable? Jun Rao wrote: Sorry, but I missed this in the KIP review. I think it's probably better NOT to have another config.path inside a configuration file. We already have other pluggable logic such as the MetricsReporter and will be adding other pluggable logic such as PrincipalExtractor in KAFKA-1690. Introducing a separate config path for each pluggable logic may not be ideal. Also, currently, we allow people to instantiate KafkaServerStartable directly so that people can obtain the properties from any configuration system and pass them to Kafka, instead of assuming that the properties are always specified in a file. So, it's probably better to specify the properties needed by any pluggable logic in the same property file, then pass them to the pluggable logic through the configure() api. We have KAFKA-2249 filed to allow KafkaConfig to do this. Perhaps, we can fix KAFKA-2249 first. Can we file another JIRA so this one is not blocked on KAFKA-2249? Or do you feel that, as this is a config change, we are better off just waiting for KAFKA-2249? On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 22 https://reviews.apache.org/r/34492/diff/1/?file=965653#file965653line22 I haven't added Group support yet, but they will be of the form Group:group-name. Why did you get the impression that groups will not have ':'? Jun Rao wrote: Oh, I was just saying that if the group name itself can contain ':', parsing will be more difficult if ':' is the separator. Fixed; users and groups can have ':' in them. Also added a unit test to verify the same. On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 41 https://reviews.apache.org/r/34492/diff/1/?file=965653#file965653line41 Yes we can, and as mentioned in the design doc, when no authentication is configured it will be set as User:DrWho.
Jun Rao wrote: So, I guess authentication will always authenticate at the user level and it's up to the authorization model to implement the user-to-group mapping?

The design itself does not assume that, so the authentication layer could authenticate with some other principalType. My current implementation of the authorizer makes this assumption (I think), but that is easy to fix and it's part of another review request.

On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/main/scala/kafka/security/auth/Operation.java, line 22 https://reviews.apache.org/r/34492/diff/1/?file=965654#file965654line22 I grepped through the kafka code base to understand how enums were used in other parts, and all places used Java enums. I assumed that was the convention. If that is not the case I can change all enum classes in core to use http://www.scala-lang.org/api/current/index.html#scala.Enumeration

Jun Rao wrote: Under core/, we don't have java files except when defining the java api. We implement enums using case objects in scala (see BrokerStates as an example).

Done.

On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote: core/src/test/scala/unit/kafka/security/auth/AclTest.scala, line 36
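A minimal Java sketch of the configure() route Jun describes, where plugin settings travel in the broker's own property map rather than a separate authorizer.config.path file. The class and property name below are hypothetical and not part of KIP-11; only the Configurable interface is real:

{code}
import java.util.Map;
import org.apache.kafka.common.Configurable;

// Hypothetical authorizer: plugin-specific settings arrive through the same
// broker property file, delivered via configure(), instead of a separate
// authorizer.config.path file.
public class SampleAuthorizer implements Configurable {
    private String superUsers;

    @Override
    public void configure(Map<String, ?> configs) {
        // "authorizer.super.users" is an illustrative property name.
        Object value = configs.get("authorizer.super.users");
        this.superUsers = (value == null) ? "" : value.toString();
    }
}
{code}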
[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621546#comment-14621546 ] Parth Brahmbhatt commented on KAFKA-2210: - Updated reviewboard https://reviews.apache.org/r/34492/diff/ against branch origin/trunk KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch This is the first subtask for KAFKA-1688. As part of this JIRA we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Build failed in Jenkins: Kafka-trunk #538
See https://builds.apache.org/job/Kafka-trunk/538/changes Changes: [wangguoz] KAFKA-1740 follow-up: add state checking in handling heartbeat request; reviewed by Onur Karaman, Ewen Cheslack-Postavam and Guozhang Wang [jjkoshy] Trivial commit - explicitly exclude build/** from rat check -- [...truncated 1434 lines...] kafka.api.ConsumerBounceTest testConsumptionWithBrokerFailures PASSED kafka.api.ConsumerBounceTest testSeekAndCommitWithBrokerFailures PASSED kafka.api.RequestResponseSerializationTest testSerializationAndDeserialization PASSED kafka.api.ProducerBounceTest testBrokerFailure PASSED kafka.api.ApiUtilsTest testShortStringASCII PASSED kafka.api.ApiUtilsTest testShortStringNonASCII PASSED kafka.api.test.ProducerCompressionTest testCompression[0] PASSED kafka.api.test.ProducerCompressionTest testCompression[1] PASSED kafka.api.test.ProducerCompressionTest testCompression[2] PASSED kafka.api.test.ProducerCompressionTest testCompression[3] PASSED kafka.cluster.BrokerEndPointTest testFromJSON PASSED kafka.cluster.BrokerEndPointTest testFromOldJSON PASSED kafka.cluster.BrokerEndPointTest testBrokerEndpointFromURI PASSED kafka.cluster.BrokerEndPointTest testEndpointFromURI PASSED kafka.cluster.BrokerEndPointTest testHashAndEquals PASSED kafka.cluster.BrokerEndPointTest testSerDe PASSED kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic PASSED kafka.javaapi.message.ByteBufferMessageSetTest testWrittenEqualsRead PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistent PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytes PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEquals PASSED kafka.javaapi.message.ByteBufferMessageSetTest testEqualsWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testSizeInBytesWithCompression PASSED kafka.javaapi.message.ByteBufferMessageSetTest testIteratorIsConsistentWithCompression PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled PASSED kafka.integration.UncleanLeaderElectionTest testCleanLeaderElectionDisabledByTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionInvalidTopicOverride PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionDisabled PASSED kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabledByTopicOverride PASSED kafka.integration.FetcherTest testFetcher PASSED kafka.integration.RollingBounceTest testRollingBounce PASSED kafka.integration.MinIsrConfigTest testDefaultKafkaConfig PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetchWithCompression PASSED kafka.integration.PrimitiveApiTest testMultiProduce PASSED kafka.integration.PrimitiveApiTest testProduceAndMultiFetch PASSED kafka.integration.PrimitiveApiTest testConsumerEmptyTopic PASSED kafka.integration.PrimitiveApiTest testEmptyFetchRequest PASSED kafka.integration.PrimitiveApiTest testPipelinedProduceRequests PASSED kafka.integration.PrimitiveApiTest testFetchRequestCanProperlySerialize PASSED kafka.integration.PrimitiveApiTest testDefaultEncoderProducerAndFetch PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow PASSED kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooLow PASSED kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooHigh PASSED kafka.integration.TopicMetadataTest testBasicTopicMetadata PASSED 
kafka.integration.TopicMetadataTest testAutoCreateTopic PASSED kafka.integration.TopicMetadataTest testIsrAfterBrokerShutDownAndJoinsBack PASSED kafka.integration.TopicMetadataTest testTopicMetadataRequest PASSED kafka.integration.TopicMetadataTest testGetAllTopicMetadata PASSED kafka.metrics.KafkaTimerTest testKafkaTimer PASSED kafka.utils.UtilsTest testCsvList PASSED kafka.utils.UtilsTest testCsvMap PASSED kafka.utils.UtilsTest testReadBytes PASSED kafka.utils.UtilsTest testReadInt PASSED kafka.utils.UtilsTest testInLock PASSED kafka.utils.UtilsTest testCircularIterator PASSED kafka.utils.UtilsTest testReplaceSuffix PASSED kafka.utils.UtilsTest testSwallow PASSED kafka.utils.UtilsTest testAbs PASSED kafka.utils.UtilsTest testDoublyLinkedList PASSED kafka.utils.SchedulerTest testReentrantTaskInMockScheduler PASSED kafka.utils.SchedulerTest testNonPeriodicTask PASSED kafka.utils.SchedulerTest testRestart PASSED kafka.utils.SchedulerTest testPeriodicTask PASSED kafka.utils.SchedulerTest testMockSchedulerPeriodicTask PASSED kafka.utils.SchedulerTest testMockSchedulerNonPeriodicTask PASSED kafka.utils.ByteBoundedBlockingQueueTest testByteBoundedBlockingQueue PASSED kafka.utils.CommandLineUtilsTest testParseSingleArg PASSED kafka.utils.CommandLineUtilsTest
[jira] [Updated] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2322: --- Status: Patch Available (was: Open) Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2322: --- Attachment: KAFKA-2322.patch Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36345: Patch for KAFKA-2322
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36345/ --- (Updated July 9, 2015, 9:05 a.m.) Review request for kafka. Bugs: KAFKA-2322 https://issues.apache.org/jira/browse/KAFKA-2322 Repository: kafka Description --- kafka-2322; Use multi-catch to reduce redundancy kafka-2322; Use try with resources instead of try/finally It's more concise and handles the exception from `close` better. Diffs - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 9ebda5eae5936a6b0897e74cfb231803c9d6a2da clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/Metadata.java 0387f2602c93a62cd333f1b3c569ca6b66b5b779 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java daff34db5bf2144e9dc274b23dc56b88f4efafdc clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 36e7ffa2a0a0b9bfaa41c22feb1be8ae476ab321 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 13f4d5958052afcc8ad66eacbcae50b6fd149398 clients/src/main/java/org/apache/kafka/common/Cluster.java 60594a7dce90130911a626ea80cf80d815aeb46e clients/src/main/java/org/apache/kafka/common/MetricName.java 04b4a09badd5157a426812b78d491248a4d38fba clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 4170bcc7def5b50d8aa20e8e84089c35b705b527 clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 6b9590c418aedd2727544c5dd23c017b4b72467a clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
5f6caf957e3bd3789e575236b00b3996cd7731c2 clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java 78c93e88fa0b886b8a618e80dfd86ff53f753507 clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b341b7daaa10204906d78b812fb05fd27bc69373 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java dab1a94dd29563688b6ecf4eeb0e180b06049d3f clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 3a14ac0fb350a9d60b5cba82d55d4656cb5be8d7 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java df073a0e76cc5cc731861b9604d0e19a928970e0 clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 1ffe0760b40c4c669ffceedd231a2129e0eb9b24
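To make the transformations named in the patch description concrete, a small standalone Java sketch (illustrative only, not taken from the patch itself) showing try-with-resources, multi-catch, and the diamond operator:

{code}
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class Java7Examples {
    // Try with resources: the stream is closed automatically, and an
    // exception thrown by close() is suppressed rather than masking one
    // thrown from the body.
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    // Multi-catch: one handler replaces three duplicated catch blocks.
    static Object instantiate(String className) {
        try {
            return Class.forName(className).newInstance();
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            throw new RuntimeException("Could not create " + className, e);
        }
    }

    // Diamond operator: type arguments on the right-hand side are inferred.
    static Map<String, List<Integer>> emptyIndex() {
        return new HashMap<>();
    }
}
{code}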
How is low-level management of threads justified in Mirror Maker?
I've skimmed through the source code of Mirror Maker and I see that it relies on raw threads https://github.com/apache/kafka/blob/7df39e0394a6fd2f26e8f12768ebf7fecd56e3da/core/src/main/scala/kafka/tools/MirrorMaker.scala#L285-L287, as opposed to a Runnable + thread pool combination. While the former approach is just as functional as the latter, I am quite curious why this decision was made. Is there anything specific that prevents a move to runnables?
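For reference, a minimal Java sketch (names illustrative; MirrorMaker itself is Scala and creates Thread objects directly) of the Runnable-plus-thread-pool combination the question asks about:

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MirrorPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        int numStreams = 4; // analogous to MirrorMaker's number of mirroring threads
        ExecutorService pool = Executors.newFixedThreadPool(numStreams);
        for (int i = 0; i < numStreams; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                public void run() {
                    // the consume-then-produce loop would live here
                    System.out.println("mirror stream " + id + " started");
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
{code}

One practical reason projects sometimes keep raw threads is to name them and join them individually during shutdown, though an ExecutorService can achieve the same with a custom ThreadFactory.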
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620038#comment-14620038 ] Onur Karaman commented on KAFKA-1740: - There's a discrepancy between https://reviews.apache.org/r/35231/ and what's checked in. We used to bump the generationId immediately after transitioning to PreparingRebalance. Commit 3f8480ccfb011eb43da774737597c597f703e11b moved the generationId bump to be immediately after the transition to Rebalancing. I think the intent was to support the following rebalance pattern: consumers stop consuming, commit offsets, and then do the join group to get the new partitions they own (which KafkaConsumer does not seem to currently follow). Since the group is already in PreparingRebalance by the time the consumer finds out about the rebalance, we need to allow offset commits during the PreparingRebalance state. I think you can do this in one of two ways:

1. Postpone the generationId bump to the Rebalancing state and have handleHeartbeat additionally check for the group being in the Stable state.
2. Revert back to having the generationId bump happen in the PreparingRebalance state and have handleCommitOffsets allow offset commits either during the Stable state when the request's generationId == group.generationId, or during PreparingRebalance when the request's generationId == group.generationId - 1.

I prefer option 2 because it makes more sense (to me at least) to log a generationId increase as soon as the rebalancing process begins. This commit chose option 1 but didn't fix handleHeartbeat. It currently allows consumers to heartbeat even during the PreparingRebalance group state, which is incorrect. Heartbeats should only be accepted during the Stable state. The problem here is that a consumer gets notified of the rebalance from a HeartbeatResponse when generation ids don't match, yet the generationId bump only happens after the Rebalancing state. By the time we reach the Rebalancing state, the PreparingRebalance state has finished and the consumer is already considered failed.

Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
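To make option 2 above concrete, a hedged Java sketch of the commit-generation check it implies (the real coordinator is Scala; all names here are illustrative). Under option 2, a consumer that learned its generation before the rebalance began holds group.generationId - 1, so its commit should still be accepted during PreparingRebalance:

{code}
public class CommitCheckSketch {
    enum GroupState { STABLE, PREPARING_REBALANCE, REBALANCING, DEAD }

    // Option 2: accept commits in Stable at the current generation, or in
    // PreparingRebalance at the previous generation.
    static boolean allowOffsetCommit(GroupState state, int groupGenerationId, int requestGenerationId) {
        return (state == GroupState.STABLE && requestGenerationId == groupGenerationId)
            || (state == GroupState.PREPARING_REBALANCE && requestGenerationId == groupGenerationId - 1);
    }

    public static void main(String[] args) {
        System.out.println(allowOffsetCommit(GroupState.PREPARING_REBALANCE, 5, 4)); // true
        System.out.println(allowOffsetCommit(GroupState.PREPARING_REBALANCE, 5, 5)); // false
    }
}
{code}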
Review Request 36345: Patch for KAFKA-2322
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36345/ --- Review request for kafka. Bugs: KAFKA-2322 https://issues.apache.org/jira/browse/KAFKA-2322 Repository: kafka Description --- kafka-2322; Use multi-catch to reduce redundancy kafka-2322; Use try with resources instead of try/finally It's more concise and handles the exception from `close` better. Diffs - clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 9ebda5eae5936a6b0897e74cfb231803c9d6a2da clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 clients/src/main/java/org/apache/kafka/clients/Metadata.java 0387f2602c93a62cd333f1b3c569ca6b66b5b779 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 48fe7961e2215372d8033ece4af739ea06c6457b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java daff34db5bf2144e9dc274b23dc56b88f4efafdc clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 36e7ffa2a0a0b9bfaa41c22feb1be8ae476ab321 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 13f4d5958052afcc8ad66eacbcae50b6fd149398 clients/src/main/java/org/apache/kafka/common/Cluster.java 60594a7dce90130911a626ea80cf80d815aeb46e clients/src/main/java/org/apache/kafka/common/MetricName.java 04b4a09badd5157a426812b78d491248a4d38fba clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java bae528d31516679bed88ee61b408f209f185a8cc clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 4170bcc7def5b50d8aa20e8e84089c35b705b527 clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 6b9590c418aedd2727544c5dd23c017b4b72467a clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 5f6caf957e3bd3789e575236b00b3996cd7731c2 
clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java 78c93e88fa0b886b8a618e80dfd86ff53f753507 clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b341b7daaa10204906d78b812fb05fd27bc69373 clients/src/main/java/org/apache/kafka/common/network/Selector.java aaf60c98c2c0f4513a8d65ee0db67953a529d598 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java dab1a94dd29563688b6ecf4eeb0e180b06049d3f clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 3a14ac0fb350a9d60b5cba82d55d4656cb5be8d7 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java df073a0e76cc5cc731861b9604d0e19a928970e0 clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 1ffe0760b40c4c669ffceedd231a2129e0eb9b24
[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620153#comment-14620153 ] Ismael Juma commented on KAFKA-2322: Created reviewboard https://reviews.apache.org/r/36345/diff/ against branch upstream/trunk Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620158#comment-14620158 ] Ismael Juma commented on KAFKA-2322: These changes are mechanical but affect a lot of files, so it would be good to merge relatively quickly to avoid merge conflicts. Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/ --- Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- fix heartbeat to allow offset commits during PreparingRebalance Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36346/diff/ Testing --- Thanks, Onur Karaman
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620190#comment-14620190 ] Onur Karaman commented on KAFKA-1740: - Created reviewboard https://reviews.apache.org/r/36346/diff/ against branch origin/trunk Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/ --- (Updated July 9, 2015, 9:42 a.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- fix heartbeat to allow offset commits during PreparingRebalance Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36346/diff/ Testing (updated) --- Manual testing only so far. The OffsetManager merge didn't add anything to ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing OffsetCommitRequest and OffsetFetchRequest handling tests to this rb. Thanks, Onur Karaman
[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1740: Attachment: KAFKA-1740.patch Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620469#comment-14620469 ] Ismael Juma commented on KAFKA-2322: [~sriharsha], sure. :) Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality
[ https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620465#comment-14620465 ] Sriharsha Chintalapani commented on KAFKA-2322: --- [~ijuma] can this be merged after KAFKA-1690? We are getting closer but need a few more days to send a new patch. cc [~jjkoshy] [~junrao] Use Java 7 features to improve code quality --- Key: KAFKA-2322 URL: https://issues.apache.org/jira/browse/KAFKA-2322 Project: Kafka Issue Type: Improvement Reporter: Ismael Juma Assignee: Ismael Juma Attachments: KAFKA-2322.patch Once KAFKA-2316 is merged, we should take advantage of Java 7 features that improve code quality (readability, safety, etc.). Examples: * Diamond operator * Try with resources * Multi-catch * String in switch (maybe) * Suppressed exceptions (maybe) This issue is for simple and mechanical improvements. More complex changes should be considered in separate issues (using nio.2, new concurrency classes, etc.). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621217#comment-14621217 ] Gwen Shapira commented on KAFKA-2187: - I think that per Apache, we need to acknowledge Spark somewhere since we essentially lifted their code. LICENSE file, maybe? Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621228#comment-14621228 ] Ismael Juma commented on KAFKA-2187: It is mentioned in the script itself (near the top). Happy to add it in LICENSE too if that's the right place. Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621239#comment-14621239 ] Gwen Shapira commented on KAFKA-2187: - Sorry, I missed that. I think it's good and I don't see any legal verbiage about having it in LICENSE. (http://www.apache.org/dev/licensing-howto.html) Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621243#comment-14621243 ] Gwen Shapira commented on KAFKA-2187: - One more thing: If I start a merge and cancel (say, by choosing 'n' when asked if I want to proceed), I'm left on a detached branch. Any chance the script can put me back in the original branch? or in trunk? Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775 https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775 I think KAFKA-1894 is already fixed in this patch + KAFKA-2168? Jason Gustafson wrote: I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so. Yeah makes sense. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 --- On July 8, 2015, 9:19 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
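For readers following the wakeup() discussion above, a hedged sketch of the escape hatch it refers to. Class names below (WakeupException in particular) reflect the consumer API as it eventually shipped, not necessarily trunk at the time of this thread, so treat them as assumptions:

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("topic1"));

        // wakeup() is safe to call from another thread and interrupts a
        // poll() that would otherwise block past its timeout (e.g. while a
        // join group is in flight), which is the concern raised above.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() { consumer.wakeup(); }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // process records ...
            }
        } catch (WakeupException e) {
            // expected during shutdown
        } finally {
            consumer.close();
        }
    }
}
{code}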
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621257#comment-14621257 ] Ismael Juma commented on KAFKA-2187: Thanks for checking regarding the license. Good question. I didn't add new functionality in the initial version of the script under the assumption that it's been in use for a while and is robust. I can check tomorrow if there's a good reason for the current behaviour under that scenario. Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621260#comment-14621260 ] Ismael Juma commented on KAFKA-2187: In the meantime `git checkout -` should take you to the previous branch. Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91 Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest? Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would it make sense to push this into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but it's worth thinking about.

Jason Gustafson wrote: Guozhang mentioned this code as well, which was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests. To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right.

When implementing the new producer, people felt it was better to let NetworkClient handle metadata responses specifically, apart from other responses, so that the high-level module (Sender for the producer, and whatever the module for the consumer was at that time) does not need to handle them. I think this motivation is still valid even today, but we did not think through regex subscriptions at that time.

On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121 Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events? Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything. If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests.

Jason Gustafson wrote: This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node. If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered.
This is annoying because you cannot attach a callback to a request until you are actually able to send it. This means you always have to send requests in a poll loop, which implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before transporting to the network layer which allows us to give a future to the caller immediately. Now, when the user calls poll, we try to send all of the unsent requests immediately (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. After that, then we call poll(timeout). It may be the case that there were some requests that failed to be sent. This could be because the client is still connecting to the node or that its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests which means that the unsent request queue never accumulates beyond a poll call. So every request sent either gets transported or fails. I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked
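A toy Java sketch of the buffering idea Jason describes (all names hypothetical; the real ConsumerNetworkClient is more involved): callers enqueue a request and get a future attached immediately, and poll() drains the queue, failing whatever still cannot be sent so nothing accumulates across poll calls:

{code}
import java.util.ArrayDeque;
import java.util.Queue;

public class UnsentRequestQueueSketch {
    interface RequestFuture { void complete(String response); void fail(RuntimeException e); }

    interface Transport { boolean ready(String node); void send(String node, String request, RequestFuture f); }

    static class Unsent {
        final String node; final String request; final RequestFuture future;
        Unsent(String node, String request, RequestFuture future) {
            this.node = node; this.request = request; this.future = future;
        }
    }

    private final Queue<Unsent> unsent = new ArrayDeque<Unsent>();

    // The caller gets a future attached before the request ever reaches the
    // network layer, which is what makes asynchronous sends possible.
    public void send(String node, String request, RequestFuture future) {
        unsent.add(new Unsent(node, request, future));
    }

    // On poll: try each buffered request once; fail the rest (node still
    // connecting, send buffer full, ...) so the queue never outlives a poll.
    public void poll(Transport transport) {
        Unsent u;
        while ((u = unsent.poll()) != null) {
            if (transport.ready(u.node)) transport.send(u.node, u.request, u.future);
            else u.future.fail(new RuntimeException("send failed: node not ready"));
        }
    }
}
{code}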
Re: Review Request 34965: Patch for KAFKA-2241
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/#review91159 --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala (line 71) https://reviews.apache.org/r/34965/#comment16 As you probably noticed, synchronization in the AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. Since the simple consumer is force-closed without the SimpleConsumer's lock, consider the following sequence:

- You call forceClose
- In the meantime (before isClosed is set to true) an ongoing call to sendRequest recreates the connection
- The fetcher thread will subsequently exit (since the ShutdownableThread's isRunning flag is false)
- So even though the SimpleConsumer is _closed_ at that point, the connection will remain open

Can you verify, or is it a non-issue?

core/src/main/scala/kafka/server/AbstractFetcherThread.scala https://reviews.apache.org/r/34965/#comment18 You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of. - Joel Koshy

On June 3, 2015, 10:30 p.m., Dong Lin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/ --- (Updated June 3, 2015, 10:30 p.m.) Review request for kafka. Bugs: KAFKA-2241 https://issues.apache.org/jira/browse/KAFKA-2241 Repository: kafka Description --- KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 Diff: https://reviews.apache.org/r/34965/diff/ Testing --- Thanks, Dong Lin
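A hedged Java sketch of the race Joel describes (all names illustrative, not the actual SimpleConsumer code): if close() does not synchronize with the reconnect path, an in-flight sendRequest() can recreate the connection after the closed flag was checked. Taking the same lock in both paths closes the window, at the cost of close() potentially blocking behind a send, which is exactly what forceClose tries to avoid:

{code}
public class ForceCloseSketch {
    private final Object lock = new Object();
    private boolean isClosed = false;
    private boolean connected = false;

    public void sendRequest() {
        synchronized (lock) {
            if (isClosed) throw new IllegalStateException("consumer closed");
            if (!connected) connected = true; // stands in for reconnect()
            // ... write the request on the connection ...
        }
    }

    // Setting isClosed and disconnecting under the lock guarantees no
    // concurrent sendRequest() can resurrect the connection afterwards.
    public void close() {
        synchronized (lock) {
            isClosed = true;
            connected = false; // stands in for disconnect()
        }
    }
}
{code}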
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621174#comment-14621174 ] Onur Karaman commented on KAFKA-1740: - Sounds good. Just a heads up, KAFKA-2123 checks if we're PreparingRebalance. A more accurate check would be if we're not Stable. So I would suggest the following for handleHeartbeat:

{code}
} else if (generationId != group.generationId || !group.is(Stable)) {
  responseCallback(Errors.ILLEGAL_GENERATION.code)
}
{code}

Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621176#comment-14621176 ] Edward Ribeiro commented on KAFKA-2327: --- Left some minor review comments, hope you don't mind. :) broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port:

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:

[2015-07-09 11:29:20,760] FATAL (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)

Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala in the getAdvertisedListeners method, and I believe the fix is (starting at line 727):

{code}
...
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
...
{code}

->

{code}
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-1740: --- Attachment: KAFKA-1740.patch Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621193#comment-14621193 ] Jason Gustafson commented on KAFKA-1740: Created reviewboard https://reviews.apache.org/r/36368/diff/ against branch upstream/trunk Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36368: Patch for KAFKA-1740
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/#review91184 --- Ship it! Ship It! - Onur Karaman On July 9, 2015, 8:23 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/ --- (Updated July 9, 2015, 8:23 p.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- KAFKA-1740; heartbeat should return illegal generation if rebalancing Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36368/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 36368: Patch for KAFKA-1740
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/#review91187 --- Could you add some unit tests for this fix? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 358) https://reviews.apache.org/r/36368/#comment144470 I feel it is helpful to add the new members list in this info entry as well. I can do that upon checkin if people are OK with it? - Guozhang Wang On July 9, 2015, 8:23 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/ --- (Updated July 9, 2015, 8:23 p.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- KAFKA-1740; heartbeat should return illegal generation if rebalancing Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36368/diff/ Testing --- Thanks, Jason Gustafson
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621144#comment-14621144 ] Guozhang Wang commented on KAFKA-1740: -- Thanks for the findings [~onurkaraman]! And sorry I overlooked this issue when making the commit. As for the solution options, I actually still prefer option 1 because it makes more sense to me to only log the generation id increase when we have finished forming a new group, so that we can log the new group members along with the new generation id (so I guess it is purely personal taste :). Anyway, I think this is an urgent bug fix, so if you are OK with option 1 I can extract it from [~hachikuji]'s patch and commit it to unblock people? Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring offset manager and merge it into coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2303) Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures
[ https://issues.apache.org/jira/browse/KAFKA-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2303: - Fix Version/s: 0.8.3 Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures Key: KAFKA-2303 URL: https://issues.apache.org/jira/browse/KAFKA-2303 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 0.8.2.1 Reporter: Alexander Demidko Assignee: Jay Kreps Fix For: 0.8.3 We have rolled out the patch for KAFKA-2235 to our Kafka cluster, and recently instead of {code} kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: Attempt to add a new entry to a full offset map. {code} we started to see {code} kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: 131390902 messages in segment topic-name-cgstate-8/79840768.log but offset map can fit only 80530612. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads {code} So we had to roll it back to avoid disk depletion, although I'm not sure whether it needs to be rolled back in trunk. This patch applies stricter checks than were in place before: even if there is only one unique key for a segment, cleanup will fail if the segment is too big. Does it make sense to eliminate the limit on the offset map slot count, for example by using an offset map backed by a memory-mapped file? The limit of 80530612 slots comes from memory / bytesPerEntry, where memory is Int.MaxValue (we use only one cleaner thread) and bytesPerEntry is 8 + digest hash size. I might be wrong, but it seems that if the overall number of unique keys per partition is more than the ~80M slots in the OffsetMap, compaction will always fail and the cleaner thread will die. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
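For what it's worth, the slot arithmetic above can be sanity-checked with a quick sketch. This is a hedged back-of-envelope calculation, assuming 16-byte MD5 digests (so bytesPerEntry = 8 + 16 = 24) and the cleaner's default 0.9 load factor; the exact constants in that deployment may differ slightly.
{code}
object OffsetMapCapacity {
  def main(args: Array[String]): Unit = {
    val memory = Int.MaxValue.toLong // dedupe buffer available to a single cleaner thread
    val bytesPerEntry = 8 + 16       // 8-byte offset plus an assumed 16-byte MD5 digest
    val loadFactor = 0.9             // assumed default log.cleaner.io.buffer.load.factor

    val rawSlots = memory / bytesPerEntry            // ~89.5M raw slots
    val usableSlots = (rawSlots * loadFactor).toLong // ~80.5M usable slots
    println(s"raw slots = $rawSlots, usable slots = $usableSlots")
    // usableSlots lands near the 80530612 reported in the error, so any segment
    // with more unique keys than that will always trip the new check.
  }
}
{code}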
Review Request 36368: Patch for KAFKA-1740
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/ --- Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- KAFKA-1740; heartbeat should return illegal generation if rebalancing Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36368/diff/ Testing --- Thanks, Jason Gustafson
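For readers following along, here is a hedged sketch of the behavior that one-line description implies; the types and names below are stand-ins, not the actual ConsumerCoordinator code. The idea is that while a group is rebalancing, heartbeats should be answered with the IllegalGeneration error so the member rejoins, rather than being treated as an ordinary liveness ping.

```
// Stand-in state machine and error codes; the real ones live in kafka.coordinator
// and kafka.common respectively.
sealed trait GroupState
case object Stable extends GroupState
case object PreparingRebalance extends GroupState
case object Rebalancing extends GroupState

object HeartbeatSketch {
  val NoError: Short = 0
  val IllegalGenerationCode: Short = 22 // Kafka's IllegalGeneration error code

  def heartbeatError(state: GroupState, groupGen: Int, memberGen: Int): Short = state match {
    case PreparingRebalance | Rebalancing => IllegalGenerationCode // force a rejoin
    case _ if memberGen != groupGen       => IllegalGenerationCode // stale member
    case _                                => NoError
  }
}
```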
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620788#comment-14620788 ] Onur Karaman commented on KAFKA-1740: - Here are two scenarios from 3f8480ccfb011eb43da774737597c597f703e11b which cause issues. Let the following be a sample consumer:
{code}
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe("topic1", "topic2");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
{code}
1. Start the consumer. Then expand the partitions for topic1. The coordinator logs will show the state going from Stable -> PreparingRebalance -> Dead -> Stable (because the consumer is running in a loop and will rejoin). This state will now be steady. 2. Start consumer c1. Wait for the group to be Stable. Start consumer c2. The coordinator logs will show the state loop forever through Stable -> PreparingRebalance -> Rebalancing -> Stable -> PreparingRebalance -> Rebalancing -> Stable. Basically, c1 joining causes c2 to time out, and c2 joining causes c1 to time out. Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch This JIRA involves refactoring the offset manager and merging it into the coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/#review91118 --- Just FYI, this was also noticed in the patch for KAFKA-2123 (which is moving quite a bit of code around). The solution was a bit different (the condition is different and keeps the existing generation ID increment location). - Ewen Cheslack-Postava On July 9, 2015, 9:42 a.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36346/ --- (Updated July 9, 2015, 9:42 a.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- fix heartbeat to allow offset commits during PreparingRebalance Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e Diff: https://reviews.apache.org/r/36346/diff/ Testing --- Manual testing only so far. The OffsetManager merge didn't add anything to ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing OffsetCommitRequest and OffsetFetchRequest handling tests to this rb. Thanks, Onur Karaman
[jira] [Updated] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
[ https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-2241: Attachment: KAFKA-2241_2015-07-09_15:35:49.patch AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) - Key: KAFKA-2241 URL: https://issues.apache.org/jira/browse/KAFKA-2241 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java This is likely a bug in Java, but it affects Kafka, and here is the patch to fix it. Description of the bug: by the description of SocketChannel in the Java 7 documentation, if another thread interrupts the current thread while the read operation is in progress, it should close the channel and throw ClosedByInterruptException. However, we find that interrupting the thread will not unblock the channel immediately. Instead, it waits for a response or a socket timeout before throwing an exception. This causes a problem in the following scenario. Suppose console_consumer_1 is reading from a topic and, due to quota delay or some other reason, it blocks on channel.read(buffer). At this moment, another console_consumer_2 joins and triggers a rebalance at console_consumer_1. But consumer_1 will block waiting on the channel.read before it can release partition ownership, causing consumer_2 to fail after a number of failed attempts to obtain partition ownership. In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut down due to this bug. The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, you can use the attached server.java and client.java -- start the server before the client and see if the client unblocks after interruption. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
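The attached server.java and client.java are not reproduced in this thread, but a minimal sketch of the experiment they describe might look like the following. This is hedged: it assumes a local server on port 9999 that accepts the connection and never writes, so read() blocks; per the SocketChannel javadoc the interrupt should close the channel and raise ClosedByInterruptException promptly, which is exactly what the report says fails to happen.
{code}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

object InterruptReadTest {
  def main(args: Array[String]): Unit = {
    val reader = new Thread(() => {
      val channel = SocketChannel.open(new InetSocketAddress("localhost", 9999))
      try {
        channel.read(ByteBuffer.allocate(1024)) // javadoc: interrupt => ClosedByInterruptException
      } catch {
        case t: Throwable => println(s"read unblocked with: $t")
      }
    })
    reader.start()
    Thread.sleep(1000)   // let the read block
    reader.interrupt()   // per the javadoc this should unblock the read immediately
    reader.join(5000)
    println(s"reader still alive after interrupt: ${reader.isAlive}")
  }
}
{code}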
[jira] [Commented] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
[ https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621396#comment-14621396 ] Dong Lin commented on KAFKA-2241: - Updated reviewboard https://reviews.apache.org/r/34965/diff/ against branch origin/trunk AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) - Key: KAFKA-2241 URL: https://issues.apache.org/jira/browse/KAFKA-2241 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java This is likely a bug in Java, but it affects Kafka, and here is the patch to fix it. Description of the bug: by the description of SocketChannel in the Java 7 documentation, if another thread interrupts the current thread while the read operation is in progress, it should close the channel and throw ClosedByInterruptException. However, we find that interrupting the thread will not unblock the channel immediately. Instead, it waits for a response or a socket timeout before throwing an exception. This causes a problem in the following scenario. Suppose console_consumer_1 is reading from a topic and, due to quota delay or some other reason, it blocks on channel.read(buffer). At this moment, another console_consumer_2 joins and triggers a rebalance at console_consumer_1. But consumer_1 will block waiting on the channel.read before it can release partition ownership, causing consumer_2 to fail after a number of failed attempts to obtain partition ownership. In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut down due to this bug. The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, you can use the attached server.java and client.java -- start the server before the client and see if the client unblocks after interruption. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34965: Patch for KAFKA-2241
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/ --- (Updated July 9, 2015, 10:35 p.m.) Review request for kafka. Bugs: KAFKA-2241 https://issues.apache.org/jira/browse/KAFKA-2241 Repository: kafka Description --- KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) Diffs (updated) - core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 Diff: https://reviews.apache.org/r/34965/diff/ Testing --- Thanks, Dong Lin
[jira] [Updated] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
[ https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-2241: Status: Patch Available (was: In Progress) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) - Key: KAFKA-2241 URL: https://issues.apache.org/jira/browse/KAFKA-2241 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Labels: quotas Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java This is likely a bug in Java, but it affects Kafka, and here is the patch to fix it. Description of the bug: by the description of SocketChannel in the Java 7 documentation, if another thread interrupts the current thread while the read operation is in progress, it should close the channel and throw ClosedByInterruptException. However, we find that interrupting the thread will not unblock the channel immediately. Instead, it waits for a response or a socket timeout before throwing an exception. This causes a problem in the following scenario. Suppose console_consumer_1 is reading from a topic and, due to quota delay or some other reason, it blocks on channel.read(buffer). At this moment, another console_consumer_2 joins and triggers a rebalance at console_consumer_1. But consumer_1 will block waiting on the channel.read before it can release partition ownership, causing consumer_2 to fail after a number of failed attempts to obtain partition ownership. In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut down due to this bug. The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, you can use the attached server.java and client.java -- start the server before the client and see if the client unblocks after interruption. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34965: Patch for KAFKA-2241
On July 9, 2015, 7:19 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 71 https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line71 As you probably noticed, synchronization in the AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. Since the simple consumer is force-closed without the SimpleConsumer’s lock, consider the following sequence:
- You call forceClose
- In the meantime (before isClosed is set to true) an ongoing call to sendRequest recreates the connection
- The fetcher thread will subsequently exit (since the ShutdownableThread’s isRunning flag is false)
- So even though the SimpleConsumer is _closed_ at that point, the connection will remain
Can you verify or is it a non-issue? Thanks for the catch! Yes, this is an issue. After looking through the code carefully I think we have to keep the simpleConsumer.close() to avoid this problem. I have also changed the function name and added comments to document the use of the new API. On July 9, 2015, 7:19 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76 https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76 You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of. I agree. I have updated the code and comments to hopefully avoid any confusion to the user. - Dong --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/#review91159 --- On July 9, 2015, 10:35 p.m., Dong Lin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/ --- (Updated July 9, 2015, 10:35 p.m.) Review request for kafka. Bugs: KAFKA-2241 https://issues.apache.org/jira/browse/KAFKA-2241 Repository: kafka Description --- KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 Diff: https://reviews.apache.org/r/34965/diff/ Testing --- Thanks, Dong Lin
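To make the interleaving Joel describes easier to see, here is a hedged toy model; the class and field names are illustrative only, not the real SimpleConsumer. If the closed flag is flipped outside the lock that sendRequest uses when reconnecting, a reconnect can slip in between the disconnect and the flag flip, leaving a live connection on a consumer that believes it is closed. The closeSafe variant below only shows why the flag and the disconnect need to be atomic with respect to sendRequest; per the discussion above, the actual patch resolves this differently, by keeping the later simpleConsumer.close() call.

```
// Toy model of the race; names are illustrative only.
class RacySimpleConsumer {
  private val lock = new Object
  @volatile private var isClosed = false
  private var connected = false

  def sendRequest(): Unit = lock.synchronized {
    if (!isClosed && !connected) connected = true // reconnect-on-demand
  }

  // Unsafe: a concurrent sendRequest can reconnect between the two steps below.
  def forceCloseUnsafe(): Unit = {
    lock.synchronized { connected = false } // disconnect
    // <-- another thread's sendRequest can run here and set connected = true
    isClosed = true                         // too late: the connection leaked
  }

  // Illustrates the atomicity that closes the window (not the actual fix).
  def closeSafe(): Unit = lock.synchronized {
    isClosed = true
    connected = false
  }
}
```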
Jenkins build is back to normal : KafkaPreCommit #143
See https://builds.apache.org/job/KafkaPreCommit/143/changes
Re: Review Request 36368: Patch for KAFKA-1740
On July 9, 2015, 8:40 p.m., Guozhang Wang wrote: Could you add some unit tests for this fix? Done. The test case depends on timing like the other response tests, but it should test what we want most of the time and succeed even if there is an unexpected delay in execution. On July 9, 2015, 8:40 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 358 https://reviews.apache.org/r/36368/diff/1/?file=1003888#file1003888line358 I feel it is helpful to add the new members list in this info entry as well. I can do that upon checkin if people are OK with it? Sounds reasonable to me. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/#review91187 --- On July 9, 2015, 9:41 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/ --- (Updated July 9, 2015, 9:41 p.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- KAFKA-1740; add unit test for heartbeat during rebalance Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala 3cd726d291d0b1f9dd30d499c204e40961eb2c41 Diff: https://reviews.apache.org/r/36368/diff/ Testing --- Thanks, Jason Gustafson
[GitHub] kafka pull request: KAFKA-2327; broker doesn't start if config def...
Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/73 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621352#comment-14621352 ] ASF GitHub Bot commented on KAFKA-2327: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/73 broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port:
{code}
port=9092
advertised.host.name=localhost
{code}
Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala in the getAdvertisedListeners method, and I believe the fix is (starting at line 727):
{code}
...
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
...
{code}
which should change to:
{code}
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
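A quick sketch of the fallback the proposed fix leans on (hedged: advertisedHostName and advertisedPort are the KafkaConfig values that default to host.name and port when the advertised.* properties are unset; the helper below is illustrative, not the actual KafkaConfig code):
{code}
object AdvertisedEndpointSketch {
  // Illustrative only: shows why substituting the fallback values avoids "localhost:null".
  def advertisedEndpoint(advHost: Option[String], advPort: Option[Int],
                         host: String, port: Int): String =
    s"PLAINTEXT://${advHost.getOrElse(host)}:${advPort.getOrElse(port)}"

  def main(args: Array[String]): Unit =
    // advertised.host.name set, advertised.port unset: fall back to port=9092,
    // printing PLAINTEXT://localhost:9092 rather than PLAINTEXT://localhost:null
    println(advertisedEndpoint(Some("localhost"), None, "0.0.0.0", 9092))
}
{code}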
[jira] [Resolved] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port
[ https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira resolved KAFKA-2327. - Resolution: Fixed Fix Version/s: 0.8.3 Merged PR. broker doesn't start if config defines advertised.host but not advertised.port -- Key: KAFKA-2327 URL: https://issues.apache.org/jira/browse/KAFKA-2327 Project: Kafka Issue Type: Bug Affects Versions: 0.8.3 Reporter: Geoffrey Anderson Assignee: Geoffrey Anderson Priority: Minor Fix For: 0.8.3 To reproduce locally, in server.properties, define advertised.host and port, but not advertised.port:
{code}
port=9092
advertised.host.name=localhost
{code}
Then start zookeeper and try to start kafka. The result is an error like so: [2015-07-09 11:29:20,760] FATAL (kafka.Kafka$) kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a broker endpoint at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309) at kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668) at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541) at kafka.Kafka$.main(Kafka.scala:58) at kafka.Kafka.main(Kafka.scala) Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac. The cause seems to be in KafkaConfig.scala in the getAdvertisedListeners method, and I believe the fix is (starting at line 727):
{code}
...
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + getString(KafkaConfig.AdvertisedHostNameProp) + ":" + getInt(KafkaConfig.AdvertisedPortProp))
...
{code}
which should change to:
{code}
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621387#comment-14621387 ] Ismael Juma commented on KAFKA-2187: OK, thank you. Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Jenkins build is back to normal : Kafka-trunk #537
See https://builds.apache.org/job/Kafka-trunk/537/changes
[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621329#comment-14621329 ] Jason Gustafson commented on KAFKA-1740: Updated reviewboard https://reviews.apache.org/r/36368/diff/ against branch upstream/trunk Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, KAFKA-1740_2015-07-09_14:41:31.patch This JIRA involves refactoring the offset manager and merging it into the coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 36368: Patch for KAFKA-1740
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36368/ --- (Updated July 9, 2015, 9:41 p.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description (updated) --- KAFKA-1740; add unit test for heartbeat during rebalance Diffs (updated) - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala 3cd726d291d0b1f9dd30d499c204e40961eb2c41 Diff: https://reviews.apache.org/r/36368/diff/ Testing --- Thanks, Jason Gustafson
[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-1740: --- Attachment: KAFKA-1740_2015-07-09_14:41:31.patch Merge Offset manager into Coordinator - Key: KAFKA-1740 URL: https://issues.apache.org/jira/browse/KAFKA-1740 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, KAFKA-1740_2015-07-09_14:41:31.patch This JIRA involves refactoring the offset manager and merging it into the coordinator, including adding the logic for consumer-id / generation-id checking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621336#comment-14621336 ] Bob Cotton commented on KAFKA-2300: --- Which logs would you like? server, controller, state-change? Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Hello Kafka folks, We are having an issue where a broker attempts to join the cluster after being restarted, but is never added to the ISR for its assigned partitions. This is a three-node cluster, and the controller is broker 2. When broker 1 starts, we see the following message in broker 2's controller.log. {{ [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some UpdateMetadata state changes Map(2 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 1 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 3 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) }} {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of it persists in the controller's memory, causing an exception which interrupts the state change triggered by the broker startup. Has anyone seen something like this? Any idea what's happening here? Any information would be greatly appreciated. Thanks, Johnny -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster
[ https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621344#comment-14621344 ] Flavio Junqueira commented on KAFKA-2300: - Controller logs, please. Error in controller log when broker tries to rejoin cluster --- Key: KAFKA-2300 URL: https://issues.apache.org/jira/browse/KAFKA-2300 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Johnny Brown Assignee: Flavio Junqueira Hello Kafka folks, We are having an issue where a broker attempts to join the cluster after being restarted, but is never added to the ISR for its assigned partitions. This is a three-node cluster, and the controller is broker 2. When broker 1 starts, we see the following message in broker 2's controller.log. {{ [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some UpdateMetadata state changes Map(2 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 1 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)), 3 -> Map([prod-sver-end,1] -> (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202) at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974) at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) }} {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of it persists in the controller's memory, causing an exception which interrupts the state change triggered by the broker startup. Has anyone seen something like this? Any idea what's happening here? Any information would be greatly appreciated. Thanks, Johnny -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2187: Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621384#comment-14621384 ] Gwen Shapira commented on KAFKA-2187: - I accidentally committed this script as part of PR 73 (I did a git add, intending to commit it separately, and then ended up committing PR 73 from the same branch). I think the script is in good condition, so I'm not taking it out. Just file a new JIRA to fix the branch thing when you have a chance. Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, KAFKA-2187_2015-06-02_20:05:50.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34965: Patch for KAFKA-2241
On July 9, 2015, 7:19 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76 https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76 You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of. Dong Lin wrote: I agree. I have updated the code and comments to hopefully avoid any confusion to the user. Would it work to just modify what you had before in `forceClose` to: ``` disconnect(); close(); ``` - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/#review91159 --- On July 9, 2015, 10:35 p.m., Dong Lin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/ --- (Updated July 9, 2015, 10:35 p.m.) Review request for kafka. Bugs: KAFKA-2241 https://issues.apache.org/jira/browse/KAFKA-2241 Repository: kafka Description --- KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 Diff: https://reviews.apache.org/r/34965/diff/ Testing --- Thanks, Dong Lin
Re: Review Request 34965: Patch for KAFKA-2241
On July 9, 2015, 7:19 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76 https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76 You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of. Dong Lin wrote: I agree. I have updated the code and comments to hopefully avoid any confusion to the user. Joel Koshy wrote: Would it work to just modify what you had before in `forceClose` to: ``` disconnect(); close(); ``` I think that won't work. The event sequence you described will still cause a problem. The following sequence of events may happen:
- forceClose() as well as close() is executed by thread 1
- thread 2 calls sendRequest(); blockingChannel.send(request) will throw ClosedChannelException, which triggers reconnect()
It is possible to make this work by changing the way sendRequest() handles ClosedChannelException. But I find the API in the second patch better. Which solution do you prefer? - Dong --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/#review91159 --- On July 9, 2015, 10:35 p.m., Dong Lin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34965/ --- (Updated July 9, 2015, 10:35 p.m.) Review request for kafka. Bugs: KAFKA-2241 https://issues.apache.org/jira/browse/KAFKA-2241 Repository: kafka Description --- KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 Diff: https://reviews.apache.org/r/34965/diff/ Testing --- Thanks, Dong Lin