[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster

2015-07-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620659#comment-14620659
 ] 

Flavio Junqueira commented on KAFKA-2300:
-

My assessment so far has focused on the original problem reported in this 
issue, namely that the controller gets stuck because of some spurious state. 
From the description and the comments, 
ControllerBrokerRequestBatch.updateMetadataRequestMap isn't empty at the time a 
broker tries to re-join, which causes the processing of 
KafkaController.onBrokerStartup to fail, because the call to 
ControllerBrokerRequestBatch.newBatch (which verifies that three data 
structures are empty, including updateMetadataRequestMap) throws an exception.

From the description, the update metadata requests have -2 for the leader, 
which is LeaderDuringDelete. Such requests are put in the update metadata map 
via a call from onTopicDeletion to controller.sendUpdateMetadataRequest. 
Interestingly, sendUpdateMetadataRequest adds to the update metadata map by 
calling brokerRequestBatch.addUpdateMetadataRequestForBrokers and right 
afterwards invokes brokerRequestBatch.sendRequestsToBrokers. The latter is 
supposed to clear the update metadata map, which makes me think that an 
exception could have interrupted the flow, leaving updateMetadataRequestMap 
uncleared. 
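
To make the suspected failure mode concrete, here is a minimal, self-contained 
sketch (hypothetical names and shapes, not the actual controller code) of how 
an exception between the add and send steps can leave the shared batch 
non-empty, so that the next newBatch() call fails exactly as in the reported 
log:

{code}
import scala.collection.mutable

class RequestBatch {
  val updateMetadataRequestMap = mutable.Map[Int, String]()

  // mirrors the emptiness check in ControllerBrokerRequestBatch.newBatch
  def newBatch(): Unit =
    if (updateMetadataRequestMap.nonEmpty)
      throw new IllegalStateException(
        s"batch is not empty, changes $updateMetadataRequestMap might be lost")

  def addUpdateMetadataRequestForBrokers(broker: Int, request: String): Unit =
    updateMetadataRequestMap(broker) = request

  def sendRequestsToBrokers(fail: Boolean): Unit = {
    if (fail) throw new RuntimeException("simulated failure during send")
    updateMetadataRequestMap.clear() // skipped entirely when the send throws
  }
}

object StaleBatchDemo extends App {
  val batch = new RequestBatch
  batch.addUpdateMetadataRequestForBrokers(2, "Leader:-2 for [prod-sver-end,1]")
  try batch.sendRequestsToBrokers(fail = true)
  catch { case _: RuntimeException => } // flow interrupted, map not cleared
  batch.newBatch() // throws IllegalStateException, as in onBrokerStartup
}
{code}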

I wanted to ask if anyone has anything to add to or correct in the analysis so 
far, and also whether [~kharriger] would be able to post the whole log so that 
I can have a look. Ultimately, it'd be great to avoid having the controller 
unable to make progress because of some bad state.

 Error in controller log when broker tries to rejoin cluster
 ---

 Key: KAFKA-2300
 URL: https://issues.apache.org/jira/browse/KAFKA-2300
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Johnny Brown
Assignee: Flavio Junqueira

 Hello Kafka folks,
 We are having an issue where a broker attempts to join the cluster after 
 being restarted, but is never added to the ISR for its assigned partitions. 
 This is a three-node cluster, and the controller is broker 2.
 When broker 1 starts, we see the following message in broker 2's 
 controller.log.
 {{
 [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error 
 while handling broker changes 
 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
 java.lang.IllegalStateException: Controller to broker state change requests 
 batch is not empty while creating a new one. Some UpdateMetadata state 
 changes Map(2 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  1 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  3 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)))
  might be lost 
   at 
 kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202)
   at 
 kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974)
   at 
 kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at kafka.utils.Utils$.inLock(Utils.scala:535)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 }}
 {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of 
 it persists in the controller's memory, causing an exception which interrupts 
 the state change triggered by the broker startup.
 Has anyone seen something like this? Any idea what's 

Re: Build failed in Jenkins: KafkaPreCommit #142

2015-07-09 Thread Ewen Cheslack-Postava
Of course you're right -- my bad, I scanned the log too quickly and misread
the cause of the error.

On Wed, Jul 8, 2015 at 10:30 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Agreed that we should change it to exclude build/** - however, the build 142
 failure does not seem to be rat-related, is it? E.g., compare the console
 output with an earlier build (138, I think). Sorry I can't verify/debug right
 now, but I can look tomorrow.

 On Wednesday, July 8, 2015, Ewen Cheslack-Postava e...@confluent.io wrote:

  Joel, this looks like it's failing for basically the same file set as the
  last fix. We probably want to just ignore all of build/, not just
  build/rat/rat-report.xml.
 
  I would say rat may end up being too much of a hassle, but it already
  caught invalid license headers in another case...
 
  -Ewen
 
  On Wed, Jul 8, 2015 at 10:39 AM, Apache Jenkins Server 
  jenk...@builds.apache.org wrote:
 
   See https://builds.apache.org/job/KafkaPreCommit/142/changes
  
   Changes:
  
   [wangguoz] KAFKA-2308: make MemoryRecords idempotent; reviewed by
  Guozhang
   Wang
  
   [cshapi] KAFKA-2316: Drop java 1.6 support; patched by Sriharsha
   Chintalapani reviewed by Ismael Juma and Gwen Shapira
  
   --
   Started by an SCM change
   Building remotely on H11 (Ubuntu ubuntu) in workspace 
   https://builds.apache.org/job/KafkaPreCommit/ws/
   Cloning the remote Git repository
   Cloning repository https://git-wip-us.apache.org/repos/asf/kafka.git
 git init https://builds.apache.org/job/KafkaPreCommit/ws/ #
   timeout=10
   Fetching upstream changes from
   https://git-wip-us.apache.org/repos/asf/kafka.git
 git --version # timeout=10
 git fetch --tags --progress
   https://git-wip-us.apache.org/repos/asf/kafka.git
   +refs/heads/*:refs/remotes/origin/*
 git config remote.origin.url
   https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
 git config remote.origin.fetch +refs/heads/*:refs/remotes/origin/* #
   timeout=10
 git config remote.origin.url
   https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
   Fetching upstream changes from
   https://git-wip-us.apache.org/repos/asf/kafka.git
 git fetch --tags --progress
   https://git-wip-us.apache.org/repos/asf/kafka.git
   +refs/heads/*:refs/remotes/origin/*
 git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
   Checking out Revision 7df39e0394a6fd2f26e8f12768ebf7fecd56e3da
   (refs/remotes/origin/trunk)
 git config core.sparsecheckout # timeout=10
 git checkout -f 7df39e0394a6fd2f26e8f12768ebf7fecd56e3da
 git rev-list 4204f4a06bf23160ceec4aa54331db62681bff82 # timeout=10
   Setting
  
 
 GRADLE_2_1_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.1
   [KafkaPreCommit] $ /bin/bash -xe /tmp/hudson301654436593382.sh
   +
  
 
 /home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.1/bin/gradle
   To honour the JVM settings for this build a new JVM will be forked.
  Please
   consider using the daemon:
   http://gradle.org/docs/2.1/userguide/gradle_daemon.html.
   Download
  
 
 https://repo1.maven.org/maven2/org/ajoberstar/grgit/0.2.3/grgit-0.2.3.pom
   Download
  
 
 https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit/3.3.0.201403021825-r/org.eclipse.jgit-3.3.0.201403021825-r.pom
   Download
  
 
 https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit-parent/3.3.0.201403021825-r/org.eclipse.jgit-parent-3.3.0.201403021825-r.pom
   Download
  
 
 https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit.ui/3.3.0.201403021825-r/org.eclipse.jgit.ui-3.3.0.201403021825-r.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.jsch/0.0.7/jsch.agentproxy.jsch-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy/0.0.7/jsch.agentproxy-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.pageant/0.0.7/jsch.agentproxy.pageant-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.sshagent/0.0.7/jsch.agentproxy.sshagent-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.usocket-jna/0.0.7/jsch.agentproxy.usocket-jna-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.usocket-nc/0.0.7/jsch.agentproxy.usocket-nc-0.0.7.pom
   Download
   https://repo1.maven.org/maven2/com/jcraft/jsch/0.1.46/jsch-0.1.46.pom
   Download
  
 
 https://repo1.maven.org/maven2/com/jcraft/jsch.agentproxy.core/0.0.7/jsch.agentproxy.core-0.0.7.pom
   Download
  
 
 https://repo1.maven.org/maven2/org/ajoberstar/grgit/0.2.3/grgit-0.2.3.jar
   Download
  
 
 https://repo1.maven.org/maven2/org/eclipse/jgit/org.eclipse.jgit/3.3.0.201403021825-r/org.eclipse.jgit-3.3.0.201403021825-r.jar
  

[jira] [Created] (KAFKA-2326) KafkaProducer - the this reference escapes during construction

2015-07-09 Thread Steve Tian (JIRA)
Steve Tian created KAFKA-2326:
-

 Summary: KafkaProducer - the this reference escapes during 
construction
 Key: KAFKA-2326
 URL: https://issues.apache.org/jira/browse/KAFKA-2326
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 0.8.2.1, 0.8.2.0
Reporter: Steve Tian
Assignee: Jun Rao


The {{this}} reference of KafkaProducer escapes during construction, because 
KafkaProducer invokes Thread.start() in its constructor.
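
As a minimal illustration of the hazard (hypothetical field names, not 
KafkaProducer's actual internals): a thread started inside a constructor can 
observe the object before the remaining fields are initialized.

{code}
class Producer {
  @volatile private var bufferSize = 0 // placeholder for fields set up below

  private val sender = new Thread(new Runnable {
    // may run before the constructor finishes and observe bufferSize == 0
    def run(): Unit = println(s"sender sees bufferSize=$bufferSize")
  })
  sender.start() // `this` escapes here: the thread sees a half-built object

  bufferSize = 64 * 1024 // assigned only after the thread already started
}
{code}

The usual remedy is to finish construction first and start the thread 
afterwards, e.g. from a separate start() method invoked by a factory.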



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Jason Gustafson


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  Browsed through the patch, overall looks very promising.
  
  I am not very clear on a few detailed changes though:
  
  1. The request future adapter / handler modifications.
  2. The retry backoff implementation seems incorrect.
  
  Could you explain a little bit on these two aspects?

Thanks for the feedback. Comments below.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java,
   line 27
  https://reviews.apache.org/r/36333/diff/1/?file=1002919#file1002919line27
 
  You may want to add the committed offset map in the callback since 
  otherwise it is unclear which commit it is referring to when triggered.

This is a good idea.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   lines 88-93
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88
 
  This is not introduced in the patch, but I am not sure if this is the 
  right way to respect the backoff time. For example, if the destination broker 
  is down for a short period of time, poll(retryBackoffMs) will immediately 
  return, and hence this function will busy-loop triggering poll() and flooding 
  the network with metadata requests, right?
  
  What we want in this case is for the consumer to wait for 
  retryBackoffMs before retrying the next metadata request.

This code was lifted from KafkaConsumer, but I think you are right. I'll fix it 
for the next patch.
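
For reference, one way to honor the backoff (a sketch only, with hypothetical 
stand-ins for the metadata machinery) is to track the last attempt time and 
wait out the remainder of retryBackoffMs before issuing the next request:

{code}
object BackoffDemo {
  // hypothetical stand-ins for the consumer's metadata machinery
  def metadataUpToDate(): Boolean = false
  def requestMetadata(): Unit = println("metadata request sent")

  def awaitMetadataUpdate(retryBackoffMs: Long, maxWaitMs: Long): Unit = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    var lastAttemptMs = 0L
    while (System.currentTimeMillis() < deadline && !metadataUpToDate()) {
      val now = System.currentTimeMillis()
      if (now - lastAttemptMs >= retryBackoffMs) {
        requestMetadata() // at most one request per backoff interval
        lastAttemptMs = now
      } else {
        // wait out the rest of the backoff instead of busy-looping
        Thread.sleep(retryBackoffMs - (now - lastAttemptMs))
      }
    }
  }
}
{code}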


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 120
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line120
 
  We can remove this line since it is checked inside ensureAssignment() 
  already.

It's a little subtle, but the call to ensureCoordinatorKnown in 
ensureAssignment is inside the loop, so it will only be called when an 
assignment is needed. I considered putting it outside the loop, but inside is 
where it needs to be, and it looks odd to have it in both places.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 218
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line218
 
  There is a potential risk of not aligning the scheduling of the heartbeat 
  with the discovery of the coordinator. For example, let's say:
  
  1. at t0 we call initHeartbeatTask with interval 100;
  2. at t1 the consumer has already found the coordinator, but it will not 
  send the first HB until t100;
  3. at t100 the consumer may find it has already been kicked out of the 
  group by the coordinator, reschedule at t200, and re-join the group.
  4. at t101 the consumer has re-joined the group, but will not send the 
  HB until t200, and so on ..

Perhaps the way to deal with this is to give the heartbeat task suspend() and 
resume() methods so that we can align it properly according to coordinator 
events.
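
A rough sketch of that idea (hypothetical API, not the actual client code): 
let coordinator events reset the heartbeat deadline, so the first beat fires 
one interval after discovery or re-join rather than on the original schedule.

{code}
class HeartbeatTask(intervalMs: Long) {
  @volatile private var nextBeatMs = Long.MaxValue // suspended until resumed

  def suspend(): Unit = nextBeatMs = Long.MaxValue

  // call on coordinator discovery or successful re-join: the next beat
  // fires one interval from *now*, not from the original t0 schedule
  def resume(nowMs: Long): Unit = nextBeatMs = nowMs + intervalMs

  def maybeBeat(nowMs: Long)(sendHeartbeat: () => Unit): Unit =
    if (nowMs >= nextBeatMs) {
      sendHeartbeat()
      nextBeatMs = nowMs + intervalMs
    }
}
{code}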


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 296
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line296
 
  Why do we change the behavior to directly throw an exception here?

I removed the retry action from RequestFuture in favor of a check for 
RetriableException. In this case, both errors are retriable, so the result is 
the same.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java,
   lines 1-43
  https://reviews.apache.org/r/36333/diff/1/?file=1002931#file1002931line1
 
  It's not clear why you want to convert the future type in this adapter; can 
  you elaborate a bit?

The idea of the future adapter was to provide a way to convert a future of one 
type to another. The use case was converting RequestFuture<ClientResponse> 
to instances for the respective request (e.g. RequestFuture<JoinGroupResponse>). 
If you look at the error handling in the consumer, they are all doing basically 
the same thing. If the error is NOT_COORDINATOR_FOR_CONSUMER, for example, we 
just mark the coordinator dead. I hoped that this api would allow us to handle 
some coordinator errors generically while leaving the respective request 
handler to deal only with its specific errors. Unfortunately, the only generic 
check that this enabled was the check for a disconnect. To locate server error 
codes, you have to traverse the respective response structures, which cannot be 
done generically. At the moment, I'm not sure it brings enough to the table to 
justify its existence, but since it's a fairly common pattern with future 
handling and one that might be helpful in the future, I opted to
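
For readers unfamiliar with the pattern, this is its general shape (a sketch 
using scala.concurrent futures rather than the client's own RequestFuture 
type): convert a raw-response future into a typed one while funneling generic 
failures, such as disconnects, through a single place.

{code}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// adapt a future of one type into another; parse converts the raw response
def adapt[Raw, T](raw: Future[Raw])(parse: Raw => T): Future[T] = {
  val adapted = Promise[T]()
  raw.onComplete {
    case Success(r) => adapted.success(parse(r)) // request-specific handling
    case Failure(e) => adapted.failure(e)        // generic path, e.g. disconnects
  }
  adapted.future
}
{code}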

Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance

2015-07-09 Thread Onur Karaman


 On July 9, 2015, 4:58 p.m., Ewen Cheslack-Postava wrote:
  Just FYI, this was also noticed in the patch for KAFKA-2123 (which is 
  moving quite a bit of code around). The solution was a bit different (the 
  condition is different and keeps the existing generation ID increment 
  location).

Thanks for the heads up, Ewen. I started looking at that patch yesterday but 
didn't reach the changes to core (they're at the end of the rb). It looks like 
Jason roughly followed option 1 from my comment in 
https://issues.apache.org/jira/browse/KAFKA-1740?focusedCommentId=14620038&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14620038
 . I prefer the approach I provided because the generation id increment logic 
is easier to understand and the logged state transitions with their generation 
ids are more readable.


- Onur


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36346/#review91118
---


On July 9, 2015, 9:42 a.m., Onur Karaman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36346/
 ---
 
 (Updated July 9, 2015, 9:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1740
 https://issues.apache.org/jira/browse/KAFKA-1740
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fix heartbeat to allow offset commits during PreparingRebalance
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 476973b2c551db5be3f1c54f94990f0dd15ff65e 
 
 Diff: https://reviews.apache.org/r/36346/diff/
 
 
 Testing
 ---
 
 Manual testing only so far. The OffsetManager merge didn't add anything to 
 ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing 
 OffsetCommitRequest and OffsetFetchRequest handling tests to this rb.
 
 
 Thanks,
 
 Onur Karaman
 




[jira] [Updated] (KAFKA-2298) Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient

2015-07-09 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2298:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

 Client Selector can drop connections on InvalidReceiveException without 
 notifying NetworkClient
 ---

 Key: KAFKA-2298
 URL: https://issues.apache.org/jira/browse/KAFKA-2298
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2298.patch, KAFKA-2298_2015-06-23_18:47:54.patch, 
 KAFKA-2298_2015-06-24_13:00:39.patch


 I ran into the problem described in KAFKA-2266 when testing quotas. I was told 
 the bug was fixed in KAFKA-2266 after I figured out the problem.
 But the patch provided in KAFKA-2266 probably doesn't solve all related 
 problems. From reading the code, there is still one edge case where the client 
 selector can close a connection in poll() without notifying NetworkClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Geoffrey Anderson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geoffrey Anderson updated KAFKA-2327:
-
Description: 
To reproduce locally, in server.properties, define advertised.host and 
port, but not advertised.port 

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:
[2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at 
kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)


Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac

the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
method, and I believe the fix is (starting at line 727)

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
getInt(KafkaConfig.AdvertisedPortProp))



-



} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
advertisedHostName + ":" + advertisedPort)
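
The reason the second form works (shown here as a self-contained sketch with 
hypothetical stubs for the config accessors): advertisedHostName and 
advertisedPort fall back to host.name and port when the advertised variants 
are unset, so the endpoint string can no longer contain a literal null.

{code}
object AdvertisedFallbackDemo extends App {
  val hostName = "localhost"
  val port = 9092
  // simulate server.properties: advertised.host.name set, advertised.port not
  val advertisedHostNameProp: Option[String] = Some("localhost")
  val advertisedPortProp: Option[Int] = None

  val advertisedHostName = advertisedHostNameProp.getOrElse(hostName)
  val advertisedPort = advertisedPortProp.getOrElse(port)

  // prints PLAINTEXT://localhost:9092 instead of PLAINTEXT://localhost:null
  println(s"PLAINTEXT://$advertisedHostName:$advertisedPort")
}
{code}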







  was:
To reproduce locally, in server.properties, define advertised.host and 
port, but not advertised.port 

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:
[2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at 
kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)


Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac

the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
method, and I believe the fix is (starting at line 727)

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
getInt(KafkaConfig.AdvertisedPortProp))

-

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
advertisedHostName + ":" + advertisedPort)








 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: 

[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621085#comment-14621085
 ] 

ASF GitHub Bot commented on KAFKA-2327:
---

GitHub user granders opened a pull request:

https://github.com/apache/kafka/pull/73

KAFKA-2327

Added unit tests as well. These fail without the fix, but pass with the fix.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka KAFKA-2327

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/73.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #73


commit 23b3340b91800ff6568ac8f07f9188659358ecc3
Author: Geoff Anderson ge...@confluent.io
Date:   2015-07-09T19:01:27Z

Fixes KAFKA-2327




 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor

 To reproduce locally, in server.properties, define advertised.host and 
 port, but not advertised.port 
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 -
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2327

2015-07-09 Thread granders
GitHub user granders opened a pull request:

https://github.com/apache/kafka/pull/73

KAFKA-2327

Added unit tests as well. These fail without the fix, but pass with the fix.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka KAFKA-2327

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/73.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #73


commit 23b3340b91800ff6568ac8f07f9188659358ecc3
Author: Geoff Anderson ge...@confluent.io
Date:   2015-07-09T19:01:27Z

Fixes KAFKA-2327




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621045#comment-14621045
 ] 

Gwen Shapira commented on KAFKA-2327:
-

Oops. My bug :)

Do you have a patch, or shall I fix this?

 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor

 To reproduce locally, in server.properties, define advertised.host and 
 port, but not advertised.port 
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 -
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Geoffrey Anderson (JIRA)
Geoffrey Anderson created KAFKA-2327:


 Summary: broker doesn't start if config defines advertised.host 
but not advertised.port
 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor


To reproduce locally, in server.properties, define advertised.host and 
port, but not advertised.port 

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:
[2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at 
kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)


Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac

the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
method, and I believe the fix is (starting at line 727)

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
getInt(KafkaConfig.AdvertisedPortProp))

-

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
advertisedHostName + ":" + advertisedPort)









--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Geoffrey Anderson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geoffrey Anderson updated KAFKA-2327:
-
Description: 
To reproduce locally, in server.properties, define advertised.host and 
port, but not advertised.port 

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:
[2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at 
kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)


Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac

the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
method, and I believe the fix is (starting at line 727)

{code}
...
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
getInt(KafkaConfig.AdvertisedPortProp))
...
{code}

-

{code}
} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
advertisedHostName + ":" + advertisedPort)
{code}






  was:
To reproduce locally, in server.properties, define advertised.host and 
port, but not advertised.port 

port=9092
advertised.host.name=localhost

Then start zookeeper and try to start kafka. The result is an error like so:
[2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
broker endpoint
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
at 
kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)


Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac

the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
method, and I believe the fix is (starting at line 727)

} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
getInt(KafkaConfig.AdvertisedPortProp))



-



} else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
getInt(KafkaConfig.AdvertisedPortProp) != null) {
  CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
advertisedHostName + ":" + advertisedPort)








 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Jason Gustafson


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 121
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121
 
  Is this the behavior we want? Both timeout and 
  delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
  especially, it seems like we're tying the failure of requests to unrelated 
  events?
  
  Previously, I thought request failures may happen quickly, but then 
  there was a Utils.sleep backoff. I am not seeing how that is handled now? 
  If the connection isn't established, won't poll() not send out requests, 
  run client.poll(), possibly return very quickly before the connection is 
  established/fails, then fail the unsent requests, then just return to the 
  caller? And that caller might be one of those while() loops, so it may just 
  continue to retry, busy looping while not actually accomplishing anything.
  
  If we're going to introduce queuing of send requests here, it seems 
  like a timeout (rather than fast fail + backoff) might be a more natural 
  solution. So rather than clearUnsentRequests, it might be 
  clearExpiredRequests.

This is not clear from the comments, but the main reason for the unsent request 
queue in this class is to get around NetworkClient's single-send behavior. 
Currently, NetworkClient will only accept one send at a time for a given node. 
If you have multiple requests, you have to sprinkle in a bunch of poll() calls 
and use ready() to make sure that they can all get buffered. This is annoying 
because you cannot attach a callback to a request until you are actually able 
to send it. This means you always have to send requests in a poll loop, which 
implies you can't really have asynchronous requests. The unsent request queue 
provides a way to buffer these requests before transporting to the network 
layer which allows us to give a future to the caller immediately. 

Now, when the user calls poll, we try to send all of the unsent requests 
immediately (by doing the ready()/poll() dance until nothing else can be sent). 
Note that none of this depends on the timeout passed to poll: we are just 
calling poll(0) until nothing can be sent. After that, then we call 
poll(timeout). It may be the case that there were some requests that failed to 
be sent. This could be because the client is still connecting to the node or 
that its send buffer is full. To make the behavior of this class easy to 
understand, I just fail these requests which means that the unsent request 
queue never accumulates beyond a poll call. So every request sent either gets 
transported or fails.
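
The queue-then-flush behavior can be summarized in a short sketch (hypothetical 
types, not the actual ConsumerNetworkClient): callers enqueue sends and get a 
result callback immediately; each poll() tries to hand every buffered request 
to the network layer and fails whatever could not be sent, so the queue never 
outlives a single poll call.

{code}
import scala.collection.mutable

class UnsentRequestQueue[Req] {
  private val unsent = mutable.Queue[(Req, Either[Throwable, Unit] => Unit)]()

  // enqueue a request; the callback plays the role of the returned future
  def send(req: Req)(callback: Either[Throwable, Unit] => Unit): Unit =
    unsent.enqueue((req, callback))

  // trySend stands in for the ready()/poll() dance against NetworkClient
  def poll(trySend: Req => Boolean): Unit =
    for ((req, cb) <- unsent.dequeueAll(_ => true)) {
      if (trySend(req)) cb(Right(()))  // handed off to the network layer
      else cb(Left(new RuntimeException("node not ready or buffer full")))
    }
}
{code}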

I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It 
should probably be invoked whenever we retry a request to the coordinator. One 
potential issue is that the configuration retry.backoff.ms (as it's currently 
documented) is only intended for fetch requests, but we have been using it for 
all requests. Perhaps there should be a separate configuration? For example 
coordinator.retry.backoff.ms? Or perhaps a hard-coded value is really what we 
want.


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 91
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
 
  Is the retryBackoffMs here because that's how long you want to wait for 
  a response before we jump back out of the poll, which fails the request and 
  triggers another NetworkClient.poll() call, which in turn sends the 
  MetadataRequest?
  
  Just trying to figure out the flow because a) the flow is unclear and 
  b) this is the only use of metadata in this class. Would this make sense to 
  push into NetworkClient, which is the one responsible for making the 
  request anyway? I'm not sure it's a good idea since nothing besides this 
  class uses NetworkClient anymore, but worth thinking about.

Guozhang mentioned this code as well, which was copied from KafkaConsumer. I 
think the use of retryBackoffMs is incorrect. It probably could be poll(-1) 
since we are not actually resending any requests.

To me, the weird thing has always been that metadata updates are handled 
directly in NetworkClient and not in the higher layers. I can see the reason 
for it, but it makes it difficult to attach code to metadata updates (which is 
exactly what we need to implement regex subscriptions). I would personally be 
in favor of pulling it out of NetworkClient, but that has implications for 
KafkaProducer as well. Moving this method into NetworkClient is probably a 
better idea at the moment and may give us a fighting chance to get the logic 
right.


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 119
  

[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Geoffrey Anderson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621091#comment-14621091
 ] 

Geoffrey Anderson commented on KAFKA-2327:
--

Oh quick reply! Yup I just opened a PR for the patch :)

 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor

 To reproduce locally, in server.properties, define advertised.host and 
 port, but not advertised.port 
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.init(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 -
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34492: Patch for KAFKA-2210

2015-07-09 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34492/
---

(Updated July 10, 2015, 1 a.m.)


Review request for kafka.


Bugs: KAFKA-2210
https://issues.apache.org/jira/browse/KAFKA-2210


Repository: kafka


Description (updated)
---

Addressing review comments from Jun.


Adding CREATE check for offset topic only if the topic does not exist already.


Addressing some more comments.


Removing acl.json file


Diffs (updated)
-

  core/src/main/scala/kafka/api/OffsetRequest.scala 
f418868046f7c99aefdccd9956541a0cb72b1500 
  core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
c75c68589681b2c9d6eba2b440ce5e58cddf6370 
  core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION 
  core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION 
  core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 
18f5b5b895af1469876b2223841fd90a2dd255e0 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
c1f0ccad4900d74e41936fae4c6c2235fe9314fe 
  core/src/main/scala/kafka/server/KafkaServer.scala 
18917bc4464b9403b16d85d20c3fd4c24893d1d3 
  core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
98a5b042a710d3c1064b0379db1d152efc9eabee 

Diff: https://reviews.apache.org/r/34492/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.

2015-07-09 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-2210:

Attachment: KAFKA-2210_2015-07-09_18:00:34.patch

 KafkaAuthorizer: Add all public entities, config changes and changes to 
 KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
 --

 Key: KAFKA-2210
 URL: https://issues.apache.org/jira/browse/KAFKA-2210
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, 
 KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch


 This is the first subtask for Kafka-1688. As part of this jira we intend to 
 agree on all the public entities, configs and changes to existing kafka 
 classes to allow a pluggable authorizer implementation.
 Please see KIP-11 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
  for detailed design. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621561#comment-14621561
 ] 

Guozhang Wang commented on KAFKA-1740:
--

Committed to trunk, thanks.

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, 
 KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, 
 KAFKA-1740_2015-07-09_14:41:31.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.

2015-07-09 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-2210:

Status: Patch Available  (was: In Progress)

 KafkaAuthorizer: Add all public entities, config changes and changes to 
 KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
 --

 Key: KAFKA-2210
 URL: https://issues.apache.org/jira/browse/KAFKA-2210
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, 
 KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch


 This is the first subtask for Kafka-1688. As part of this jira we intend to 
 agree on all the public entities, configs and changes to existing kafka 
 classes to allow a pluggable authorizer implementation.
 Please see KIP-11 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
  for detailed design. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34492: Patch for KAFKA-2210

2015-07-09 Thread Parth Brahmbhatt


 On June 9, 2015, 7:13 a.m., Dapeng Sun wrote:
  core/src/main/scala/kafka/common/ErrorMapping.scala, line 57
  https://reviews.apache.org/r/34492/diff/4/?file=979556#file979556line57
 
  Why not use 23 here?

I have changed it so it now uses 23.


 On June 9, 2015, 7:13 a.m., Dapeng Sun wrote:
  core/src/main/scala/kafka/security/auth/Authorizer.scala, line 32
  https://reviews.apache.org/r/34492/diff/4/?file=979558#file979558line32
 
  Resource-related operations are authorized, but the authorizer itself seems 
  not to be authorized. Could a normal user operate on the ACLs? We may need to 
  add something (e.g. Session) to addACL, removeACL, etc.

Given that add/edit/remove acls will be invoked from the CLI and these are not 
server-side APIs (how I wish I had started with that), that authorization will 
be done in a different way. It's not really part of this CR, but essentially we 
will have to rely on the acl storage layer's authorization for this (zookeeper 
in the case of the default authorizer implementation).


 On June 9, 2015, 7:13 a.m., Dapeng Sun wrote:
  core/src/main/scala/kafka/security/auth/Authorizer.scala, line 60
  https://reviews.apache.org/r/34492/diff/4/?file=979558#file979558line60
 
  I didn't find any code invoking removeAcls; it seems Acl entities are 
  not cleaned up when a resource (e.g. topic) is deleted. Is that really so, or 
  did I miss it?

It is part of the PR that implements CLI.


 On June 9, 2015, 7:13 a.m., Dapeng Sun wrote:
  core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 23
  https://reviews.apache.org/r/34492/diff/4/?file=979559#file979559line23
 
  I think it is better to use constants or an enum to stand for UserType

We are using constants by using val. An enum is avoided so that a custom 
authorizer implementation can add any principalType it wishes without having to 
change code in the Kafka codebase.


- Parth


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34492/#review86541
---


On July 10, 2015, 1 a.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34492/
 ---
 
 (Updated July 10, 2015, 1 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2210
 https://issues.apache.org/jira/browse/KAFKA-2210
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressing review comments from Jun.
 
 
 Adding CREATE check for offset topic only if the topic does not exist already.
 
 
 Addressing some more comments.
 
 
 Removing acl.json file
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/OffsetRequest.scala 
 f418868046f7c99aefdccd9956541a0cb72b1500 
   core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 c75c68589681b2c9d6eba2b440ce5e58cddf6370 
   core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 18f5b5b895af1469876b2223841fd90a2dd255e0 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 c1f0ccad4900d74e41936fae4c6c2235fe9314fe 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 18917bc4464b9403b16d85d20c3fd4c24893d1d3 
   core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 98a5b042a710d3c1064b0379db1d152efc9eabee 
 
 Diff: https://reviews.apache.org/r/34492/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Parth Brahmbhatt
 




Re: Review Request 34492: Patch for KAFKA-2210

2015-07-09 Thread Parth Brahmbhatt


 On June 11, 2015, 7:19 a.m., Dapeng Sun wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, line 160
  https://reviews.apache.org/r/34492/diff/4/?file=979565#file979565line160
 
  Why add a new config file path? Could the authorization-related config 
  options be merged into KafkaConfig?

Not until https://issues.apache.org/jira/browse/KAFKA-2249 is fixed.


- Parth


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34492/#review87532
---


On July 10, 2015, 1 a.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34492/
 ---
 
 (Updated July 10, 2015, 1 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2210
 https://issues.apache.org/jira/browse/KAFKA-2210
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressing review comments from Jun.
 
 
 Adding CREATE check for offset topic only if the topic does not exist already.
 
 
 Addressing some more comments.
 
 
 Removing acl.json file
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/OffsetRequest.scala 
 f418868046f7c99aefdccd9956541a0cb72b1500 
   core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 c75c68589681b2c9d6eba2b440ce5e58cddf6370 
   core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Operation.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION 
   core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION 
   core/src/main/scala/kafka/security/auth/ResourceType.scala PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 18f5b5b895af1469876b2223841fd90a2dd255e0 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 c1f0ccad4900d74e41936fae4c6c2235fe9314fe 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 18917bc4464b9403b16d85d20c3fd4c24893d1d3 
   core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
 98a5b042a710d3c1064b0379db1d152efc9eabee 
 
 Diff: https://reviews.apache.org/r/34492/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Parth Brahmbhatt
 




Re: Review Request 34492: Patch for KAFKA-2210

2015-07-09 Thread Parth Brahmbhatt


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/main/scala/kafka/security/auth/Acl.scala, lines 57-62
  https://reviews.apache.org/r/34492/diff/1/?file=965651#file965651line57
 
  I tried exactly that, but it turns out our current Json parser does not 
  work when a json string has other special characters; it somehow gets into 
  some double parsing and fails. It has been long since I wrote this code so I 
  don't exactly remember why it was failing, but I did try it and with the 
  current JsonUtil it does not work.
 
 Jun Rao wrote:
 Could you explain a bit which part doesn't work? The following simple 
 test works for me.
 
  scala> val a = "[{\"a\": \"aa\"}]"
  a: String = [{"a": "aa"}]
 
  scala> JSON.parseFull(a)
  res4: Option[Any] = Some(List(Map(a -> aa)))

So I tried this again and found the issue. It has nothing to do with double 
parsing (I was probably mixing in some other project here); our current 
JSON.parseFull does not support custom objects. It only supports primitive 
types, lists, maps and iterables. It throws IllegalArgumentException if I try 
to pass the Acl object as-is, because of the following lines:

case other: AnyRef => throw new IllegalArgumentException("Unknown arguement of 
type " + other.getClass + ": " + other)
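
To make the limitation concrete, a small sketch (the Acl fields here are 
hypothetical): custom objects must be flattened into maps of primitives before 
the parser/encoder can deal with them.

{code}
import scala.util.parsing.json.JSON

// Hypothetical Acl shape; the real class lives in kafka.security.auth.
case class AclSketch(principal: String, host: String, operation: String)

object JsonLimitationSketch extends App {
  // JSON.parseFull only ever yields primitives, lists and maps:
  println(JSON.parseFull("""[{"principal": "User:alice"}]"""))
  // Some(List(Map(principal -> User:alice)))

  // So a custom object has to be flattened into a Map of primitives before
  // encoding; passing the Acl itself hits the AnyRef case quoted above.
  def aclToMap(acl: AclSketch): Map[String, String] =
    Map("principal" -> acl.principal, "host" -> acl.host, "operation" -> acl.operation)

  println(aclToMap(AclSketch("User:alice", "host1", "Read")))
}
{code}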


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/main/scala/kafka/security/auth/Authorizer.scala, line 36
  https://reviews.apache.org/r/34492/diff/1/?file=965652#file965652line36
 
  In the KIP discussion it was proposed to add a config 
  authorizer.config.path which will contain the path to a property file on all 
  broker hosts. This is how the plugin-specific property file gets passed on. 
  Do we want to instead use configurable?
 
 Jun Rao wrote:
 Sorry, but I missed this in the KIP review. I think it's probably better 
 NOT to have another config.path inside a configuration file. We already have 
 other pluggable logic such as the MetricsReporter and will be adding other 
 pluggable logic such as PrincipalExtractor in KAFKA-1690. Introducing a 
 separate config path for each piece of pluggable logic may not be ideal. Also, 
 currently, we allow people to instantiate KafkaServerStartable directly so 
 that people can obtain the properties from any configuration system and pass 
 them to Kafka, instead of assuming that the properties are always specified 
 in a file. So, it's probably better to specify the properties needed by any 
 pluggable logic in the same property file, then pass them to the pluggable 
 logic through the configure() api. We have KAFKA-2249 filed to allow 
 KafkaConfig to do this. Perhaps, we can fix KAFKA-2249 first.

Can we file another jira so this one is not blocked on KAFKA-2249? Or do you 
feel that, since this is a config change, we are better off just waiting for 
KAFKA-2249?
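
As a rough illustration of the configure()-based wiring Jun describes (the 
trait and key names here are assumptions, not the actual interfaces): the 
broker hands its own properties to each pluggable component, so no per-plugin 
config file path is needed.

{code}
import java.util.{Map => JMap}

// Hypothetical Configurable-style contract; not the actual patch.
trait ConfigurableSketch {
  def configure(configs: JMap[String, _]): Unit
}

class SampleAuthorizer extends ConfigurableSketch {
  private var superUsers: Set[String] = Set.empty

  // Plugin-specific keys live in the same broker property file and are
  // handed over by the server, so no separate config path is needed.
  override def configure(configs: JMap[String, _]): Unit = {
    superUsers = Option(configs.get("sample.authorizer.super.users"))
      .map(_.toString.split(",").map(_.trim).toSet)
      .getOrElse(Set.empty)
  }
}
{code}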


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 22
  https://reviews.apache.org/r/34492/diff/1/?file=965653#file965653line22
 
  I haven't added Group support yet, but they will be of the form 
  Group:group-name. Why did you get the impression that groups will not 
  have ":"?
 
 Jun Rao wrote:
 Oh, I was just saying that if the group name itself can contain ":", 
 parsing will be more difficult if ":" is the separator.

Fixed: users and groups can have ":" in them. Also added a unit test to verify 
the same.
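
A sketch of the parsing this implies (a hypothetical helper, not the patch 
itself): split only on the first separator so the name may itself contain ":".

{code}
object PrincipalParsingSketch {
  // Split only on the first ":" so the name itself may contain ":".
  def fromString(str: String): (String, String) = {
    val parts = str.split(":", 2) // limit = 2 keeps the remainder intact
    require(parts.length == 2 && parts(0).nonEmpty,
      s"expected <type>:<name>, got: $str")
    (parts(0), parts(1))
  }
  // fromString("Group:dev:ops") == ("Group", "dev:ops")
}
{code}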


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala, line 41
  https://reviews.apache.org/r/34492/diff/1/?file=965653#file965653line41
 
  Yes we can, and as mentioned in the design doc, when no authentication is 
  configured it will be set as "User:DrWho".
 
 Jun Rao wrote:
 So, I guess authentication will always authenticate at the user level, and 
 it's up to the Authorization model to implement the user-to-group mapping?

The design itself does not assume that, so the authentication layer could 
authenticate with some other principalType. My current implementation of the 
authorizer makes this assumption (I think) but that is easy to fix, and it's 
part of another review request.


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/main/scala/kafka/security/auth/Operation.java, line 22
  https://reviews.apache.org/r/34492/diff/1/?file=965654#file965654line22
 
  I grepped through the kafka codebase to understand how enums were used in 
  other parts, and all places used java enums. I assumed that was the 
  convention. If that is not the case I can change all enum classes in core 
  to use http://www.scala-lang.org/api/current/index.html#scala.Enumeration
 
 Jun Rao wrote:
 Under core/, we don't have java files except when defining the java api. 
 We implement enum using case object in scala (see BrokerStates as an example).

Done.
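
For readers unfamiliar with the pattern, a case-object style enum along the 
lines of BrokerStates (the operation names mirror KIP-11, but the code is 
illustrative, not the patch):

{code}
sealed trait OperationSketch { def name: String }
case object Read extends OperationSketch { val name = "Read" }
case object Write extends OperationSketch { val name = "Write" }
case object Create extends OperationSketch { val name = "Create" }

object OperationSketch {
  private val values = Seq(Read, Write, Create)
  // Lookup by name, analogous to Enum.valueOf in java.
  def fromString(s: String): OperationSketch =
    values.find(_.name.equalsIgnoreCase(s)).getOrElse(
      throw new IllegalArgumentException("Unknown operation: " + s))
}
{code}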


 On June 3, 2015, 11:36 p.m., Parth Brahmbhatt wrote:
  core/src/test/scala/unit/kafka/security/auth/AclTest.scala, line 36
  

[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.

2015-07-09 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621546#comment-14621546
 ] 

Parth Brahmbhatt commented on KAFKA-2210:
-

Updated reviewboard https://reviews.apache.org/r/34492/diff/
 against branch origin/trunk

 KafkaAuthorizer: Add all public entities, config changes and changes to 
 KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
 --

 Key: KAFKA-2210
 URL: https://issues.apache.org/jira/browse/KAFKA-2210
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
 Attachments: KAFKA-2210.patch, KAFKA-2210_2015-06-03_16:36:11.patch, 
 KAFKA-2210_2015-06-04_16:07:39.patch, KAFKA-2210_2015-07-09_18:00:34.patch


 This is the first subtask for Kafka-1688. As Part of this jira we intend to 
 agree on all the public entities, configs and changes to existing kafka 
 classes to allow pluggable authorizer implementation.
 Please see KIP-11 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
  for detailed design. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #538

2015-07-09 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/538/changes

Changes:

[wangguoz] KAFKA-1740 follow-up: add state checking in handling heartbeat 
request; reviewed by Onur Karaman, Ewen Cheslack-Postava and Guozhang Wang

[jjkoshy] Trivial commit - explicitly exclude build/** from rat check

--
[...truncated 1434 lines...]
kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures PASSED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED

kafka.api.RequestResponseSerializationTest > testSerializationAndDeserialization PASSED

kafka.api.ProducerBounceTest > testBrokerFailure PASSED

kafka.api.ApiUtilsTest > testShortStringASCII PASSED

kafka.api.ApiUtilsTest > testShortStringNonASCII PASSED

kafka.api.test.ProducerCompressionTest > testCompression[0] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[3] PASSED

kafka.cluster.BrokerEndPointTest > testFromJSON PASSED

kafka.cluster.BrokerEndPointTest > testFromOldJSON PASSED

kafka.cluster.BrokerEndPointTest > testBrokerEndpointFromURI PASSED

kafka.cluster.BrokerEndPointTest > testEndpointFromURI PASSED

kafka.cluster.BrokerEndPointTest > testHashAndEquals PASSED

kafka.cluster.BrokerEndPointTest > testSerDe PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistentWithCompression PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled PASSED

kafka.integration.UncleanLeaderElectionTest > testCleanLeaderElectionDisabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.FetcherTest > testFetcher PASSED

kafka.integration.RollingBounceTest > testRollingBounce PASSED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh PASSED

kafka.integration.TopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.TopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.TopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.TopicMetadataTest > testTopicMetadataRequest PASSED

kafka.integration.TopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testDoublyLinkedList PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest >

[jira] [Updated] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2322:
---
Status: Patch Available  (was: Open)

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2322:
---
Attachment: KAFKA-2322.patch

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36345: Patch for KAFKA-2322

2015-07-09 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36345/
---

(Updated July 9, 2015, 9:05 a.m.)


Review request for kafka.


Bugs: KAFKA-2322
https://issues.apache.org/jira/browse/KAFKA-2322


Repository: kafka


Description
---

kafka-2322; Use multi-catch to reduce redundancy


kafka-2322; Use try with resources instead of try/finally

It's more concise and handles the exception from `close`
better.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
9ebda5eae5936a6b0897e74cfb231803c9d6a2da 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
0387f2602c93a62cd333f1b3c569ca6b66b5b779 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
daff34db5bf2144e9dc274b23dc56b88f4efafdc 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
7aa076084c894bb8f47b9df2c086475b06f47060 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
36e7ffa2a0a0b9bfaa41c22feb1be8ae476ab321 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
0baf16e55046a2f49f6431e01d52c323c95eddf0 
  clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
13f4d5958052afcc8ad66eacbcae50b6fd149398 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
60594a7dce90130911a626ea80cf80d815aeb46e 
  clients/src/main/java/org/apache/kafka/common/MetricName.java 
04b4a09badd5157a426812b78d491248a4d38fba 
  clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
bae528d31516679bed88ee61b408f209f185a8cc 
  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
4170bcc7def5b50d8aa20e8e84089c35b705b527 
  clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 
6b9590c418aedd2727544c5dd23c017b4b72467a 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
5f6caf957e3bd3789e575236b00b3996cd7731c2 
  clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
ca823fd4639523018311b814fde69b6177e73b97 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java 
78c93e88fa0b886b8a618e80dfd86ff53f753507 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java 
b341b7daaa10204906d78b812fb05fd27bc69373 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
aaf60c98c2c0f4513a8d65ee0db67953a529d598 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
dab1a94dd29563688b6ecf4eeb0e180b06049d3f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
3a14ac0fb350a9d60b5cba82d55d4656cb5be8d7 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
df073a0e76cc5cc731861b9604d0e19a928970e0 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ffe0760b40c4c669ffceedd231a2129e0eb9b24 
  

How low-level management of threads is justified in Mirror Maker?

2015-07-09 Thread Kostya Golikov
I've skimmed through the source code of Mirror Maker and I see that it relies
on raw threads
(https://github.com/apache/kafka/blob/7df39e0394a6fd2f26e8f12768ebf7fecd56e3da/core/src/main/scala/kafka/tools/MirrorMaker.scala#L285-L287),
as opposed to a Runnable + thread pool combination. While the former approach
is just as functional as the latter, I am quite curious why this decision was
made. Is there anything specific that prevents a move to Runnables?
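
For reference, the alternative the question has in mind would look roughly 
like this (mirrorLoop is a stand-in for MirrorMaker's per-stream 
consume/produce loop, not actual MirrorMaker code):

{code}
import java.util.concurrent.{Executors, TimeUnit}

object MirrorWithPool extends App {
  val numStreams = 4
  val pool = Executors.newFixedThreadPool(numStreams)

  // Stand-in for MirrorMaker's per-stream consume/produce loop.
  def mirrorLoop(id: Int): Runnable = new Runnable {
    override def run(): Unit = println(s"mirror stream $id running")
  }

  (0 until numStreams).foreach(i => pool.submit(mirrorLoop(i)))
  pool.shutdown()
  pool.awaitTermination(10, TimeUnit.SECONDS)
}
{code}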


[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620038#comment-14620038
 ] 

Onur Karaman commented on KAFKA-1740:
-

There's a discrepancy between https://reviews.apache.org/r/35231/ and what's 
checked in.

We used to bump the generationId immediately after transitioning to 
PreparingRebalance. Commit 3f8480ccfb011eb43da774737597c597f703e11b moved the 
generationId bump to be immediately after the transition to Rebalancing.

I think the intent was to support the following rebalance pattern: consumers 
stop consuming, commit offsets, and then do the join group to get the new 
partitions they own (which KafkaConsumer does not seem to currently follow). 
Since the group is already PreparingRebalance by the time the consumer finds 
out about the rebalance, we need to allow offset commits during the Preparing 
Rebalance state. I think you can do this in one of two ways:
1. postpone the generationId bump to the Rebalancing state and have 
handleHeartbeat additionally check for the group being in the Stable state.
2. revert back to having the generationId bump happen in the PreparingRebalance 
state and have handleCommitOffsets allow offset commits either during Stable 
state when the request's generationId == group.generationId or when 
PreparingRebalance and request's generationId == group.generationId - 1.

I prefer option 2 because it makes more sense (to me at least) to log a 
generationId increase as soon as the rebalancing process begins.
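
A sketch of option 2's commit check (the state and method names are loose 
stand-ins for the coordinator code, not the actual patch):

{code}
sealed trait GroupStateSketch
case object Stable extends GroupStateSketch
case object PreparingRebalance extends GroupStateSketch
case object Rebalancing extends GroupStateSketch

object CommitCheckSketch {
  // Option 2: accept commits from the current generation while Stable, or
  // from the previous generation while the rebalance is being prepared.
  def commitAllowed(reqGen: Int, groupGen: Int, state: GroupStateSketch): Boolean =
    state match {
      case Stable             => reqGen == groupGen
      case PreparingRebalance => reqGen == groupGen - 1
      case _                  => false
    }
}
{code}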

This commit chose option 1 but didn't fix handleHeartbeat. It currently allows 
consumers to heartbeat even during the PreparingRebalance group state, which is 
incorrect. Heartbeats should only be accepted during the Stable state. The 
problem here is that a consumer gets notified of the rebalance from a 
HeartbeatResponse when generation ids don't match, yet the generationId bump 
only happens after the Rebalancing state. By the time we reach the Rebalancing 
state, PreparingRebalance state has finished and the consumer is already 
considered failed.

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring offset manager and merge it into coordinator, 
 including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 36345: Patch for KAFKA-2322

2015-07-09 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36345/
---

Review request for kafka.


Bugs: KAFKA-2322
https://issues.apache.org/jira/browse/KAFKA-2322


Repository: kafka


Description
---

kafka-2322; Use multi-catch to reduce redundancy


kafka-2322; Use try with resources instead of try/finally

It's more concise and handles the exception from `close`
better.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
9ebda5eae5936a6b0897e74cfb231803c9d6a2da 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
0387f2602c93a62cd333f1b3c569ca6b66b5b779 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
daff34db5bf2144e9dc274b23dc56b88f4efafdc 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
7aa076084c894bb8f47b9df2c086475b06f47060 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
36e7ffa2a0a0b9bfaa41c22feb1be8ae476ab321 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
aa264202f2724907924985a5ecbe74afc4c6c04b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
0baf16e55046a2f49f6431e01d52c323c95eddf0 
  clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
13f4d5958052afcc8ad66eacbcae50b6fd149398 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
60594a7dce90130911a626ea80cf80d815aeb46e 
  clients/src/main/java/org/apache/kafka/common/MetricName.java 
04b4a09badd5157a426812b78d491248a4d38fba 
  clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
bae528d31516679bed88ee61b408f209f185a8cc 
  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
4170bcc7def5b50d8aa20e8e84089c35b705b527 
  clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java 
6b9590c418aedd2727544c5dd23c017b4b72467a 
  clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
5f6caf957e3bd3789e575236b00b3996cd7731c2 
  clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
ca823fd4639523018311b814fde69b6177e73b97 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java 
78c93e88fa0b886b8a618e80dfd86ff53f753507 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java 
b341b7daaa10204906d78b812fb05fd27bc69373 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
aaf60c98c2c0f4513a8d65ee0db67953a529d598 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
dab1a94dd29563688b6ecf4eeb0e180b06049d3f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
3a14ac0fb350a9d60b5cba82d55d4656cb5be8d7 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
df073a0e76cc5cc731861b9604d0e19a928970e0 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ffe0760b40c4c669ffceedd231a2129e0eb9b24 
  

[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620153#comment-14620153
 ] 

Ismael Juma commented on KAFKA-2322:


Created reviewboard https://reviews.apache.org/r/36345/diff/
 against branch upstream/trunk

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620158#comment-14620158
 ] 

Ismael Juma commented on KAFKA-2322:


These changes are mechanical but affect a lot of files, so it would be good to 
merge relatively quickly to avoid merge conflicts.

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance

2015-07-09 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36346/
---

Review request for kafka.


Bugs: KAFKA-1740
https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
---

fix heartbeat to allow offset commits during PreparingRebalance


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
476973b2c551db5be3f1c54f94990f0dd15ff65e 

Diff: https://reviews.apache.org/r/36346/diff/


Testing
---


Thanks,

Onur Karaman



[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620190#comment-14620190
 ] 

Onur Karaman commented on KAFKA-1740:
-

Created reviewboard https://reviews.apache.org/r/36346/diff/
 against branch origin/trunk

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring offset manager and merge it into coordinator, 
 including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance

2015-07-09 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36346/
---

(Updated July 9, 2015, 9:42 a.m.)


Review request for kafka.


Bugs: KAFKA-1740
https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
---

fix heartbeat to allow offset commits during PreparingRebalance


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
476973b2c551db5be3f1c54f94990f0dd15ff65e 

Diff: https://reviews.apache.org/r/36346/diff/


Testing (updated)
---

Manual testing only so far. The OffsetManager merge didn't add anything to 
ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing 
OffsetCommitRequest and OffsetFetchRequest handling tests to this rb.


Thanks,

Onur Karaman



[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-1740:

Attachment: KAFKA-1740.patch

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring offset manager and merge it into coordinator, 
 including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620469#comment-14620469
 ] 

Ismael Juma commented on KAFKA-2322:


[~sriharsha], sure. :)

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2322) Use Java 7 features to improve code quality

2015-07-09 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620465#comment-14620465
 ] 

Sriharsha Chintalapani commented on KAFKA-2322:
---

[~ijuma] can this be merged after KAFKA-1690? We are getting closer but need a 
few more days to send a new patch.
cc [~jjkoshy] [~junrao]

 Use Java 7 features to improve code quality
 ---

 Key: KAFKA-2322
 URL: https://issues.apache.org/jira/browse/KAFKA-2322
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Attachments: KAFKA-2322.patch


 Once KAFKA-2316 is merged, we should take advantage of Java 7 features that 
 improve code quality (readability, safety, etc.).
 Examples:
 * Diamond operator
 * Try with resources
 * Multi-catch
 * String in switch (maybe)
 * Suppressed exceptions (maybe)
 This issue is for simple and mechanical improvements. More complex changes  
 should be considered in separate issues (using nio.2, new concurrency 
 classes, etc.).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621217#comment-14621217
 ] 

Gwen Shapira commented on KAFKA-2187:
-

I think that per Apache, we need to acknowledge Spark somewhere since we 
essentially lifted their code. LICENSE file, maybe?

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621228#comment-14621228
 ] 

Ismael Juma commented on KAFKA-2187:


It is mentioned in the script itself (near the top). Happy to add it in LICENSE 
too if that's the right place.

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621239#comment-14621239
 ] 

Gwen Shapira commented on KAFKA-2187:
-

Sorry, I missed that. I think it's good and I don't see any legal verbiage 
about having it in LICENSE. 
(http://www.apache.org/dev/licensing-howto.html)

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621243#comment-14621243
 ] 

Gwen Shapira commented on KAFKA-2187:
-

One more thing:
If I start a merge and cancel (say, by choosing 'n' when asked if I want to 
proceed), I'm left on a detached branch. Any chance the script can put me back 
in the original branch? or in trunk?

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Guozhang Wang


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.

Yeah makes sense.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  405efdc7a59438731cbc3630876bda0042a3adb3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  

[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621257#comment-14621257
 ] 

Ismael Juma commented on KAFKA-2187:


Thanks for checking regarding the license.

Good question. I didn't add new functionality in the initial version of the 
script under the assumption that it's been in use for a while and is robust. I 
can check tomorrow if there's a good reason for the current behaviour under 
that scenario.

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621260#comment-14621260
 ] 

Ismael Juma commented on KAFKA-2187:


In the meantime `git checkout -` should take you to the previous branch.

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Guozhang Wang


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 91
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
 
  Is the retryBackoffMs here because that's how long you want to wait for 
  a response before we jump back out of the poll, which fails the request and 
  triggers another NetworkClient.poll() call, which in turn sends the 
  MetadataRequest?
  
  Just trying to figure out the flow because a) the flow is unclear and 
  b) this is the only use of metadata in this class. Would this make sense to 
  push into NetworkClient, which is the one responsible for making the 
  request anyway? I'm not sure it's a good idea since nothing besides this 
  class uses NetworkClient anymore, but worth thinking about.
 
 Jason Gustafson wrote:
 Guozhang mentioned this code as well, which was copied from 
 KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably 
 could be poll(-1) since we are not actually resending any requests.
 
 To me, the weird thing has always been that metadata updates are handled 
 directly in NetworkClient and not in the higher layers. I can see the reason 
 for it, but it makes it difficult to attach code to metadata updates (which 
 is exactly what we need to implement regex subscriptions). I would personally 
 be in favor of pulling it out of NetworkClient, but that has implications for 
 KafkaProducer as well. Moving this method into NetworkClient is probably a 
 better idea at the moment and may give us a fighting chance to get the logic 
 right.

When implementing the new producer, people felt it was better to let 
NetworkClient handle metadata responses separately from other responses, so 
that the high-level module (Sender for the producer, and whatever the module 
for the consumer was at that time) does not need to handle them. I think this 
motivation is still valid even today, but we did not think through regex 
subscriptions at that time.


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 121
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121
 
  Is this the behavior we want? Both timeout and 
  delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
  especially, it seems like we're tying the failure of requests to unrelated 
  events?
  
  Previously, I thought request failures may happen quickly, but then 
  there was a Utils.sleep backoff. I am not seeing how that is handled now? 
  If the connection isn't established, won't poll() not send out requests, 
  run client.poll(), possibly return very quickly before the connection is 
  established/fails, then fail the unsent requests, then just return to the 
  caller? And that caller might be one of those while() loops, so it may just 
  continue to retry, busy looping while not actually accomplishing anything.
  
  If we're going to introduce queuing of send requests here, it seems 
  like a timeout (rather than fast fail + backoff) might be a more natural 
  solution. So rather than clearUnsentRequests, it might be 
  clearExpiredRequests.
 
 Jason Gustafson wrote:
 This is not clear from the comments, but the main reason for the unsent 
 request queue in this class is to get around NetworkClient's single-send 
 behavior. Currently, NetworkClient will only accept one send at a time for a 
 given node. If you have multiple requests, you have to sprinkle in a bunch of 
 poll() calls and use ready() to make sure that they can all get buffered. 
 This is annoying because you cannot attach a callback to a request until you 
 are actually able to send it. This means you always have to send requests in 
 a poll loop, which implies you can't really have asynchronous requests. The 
 unsent request queue provides a way to buffer these requests before 
 transporting to the network layer which allows us to give a future to the 
 caller immediately. 
 
 Now, when the user calls poll, we try to send all of the unsent requests 
 immediately (by doing the ready()/poll() dance until nothing else can be 
 sent). Note that none of this depends on the timeout passed to poll: we are 
 just calling poll(0) until nothing can be sent. After that, then we call 
 poll(timeout). It may be the case that there were some requests that failed 
 to be sent. This could be because the client is still connecting to the node 
 or that its send buffer is full. To make the behavior of this class easy to 
 understand, I just fail these requests which means that the unsent request 
 queue never accumulates beyond a poll call. So every request sent either gets 
 transported or fails.
 
 I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. 
 It should probably be invoked 
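
A rough sketch of the buffering pattern described above (hypothetical shapes; 
not the actual ConsumerNetworkClient code):

{code}
import scala.collection.mutable

// Buffer sends per node so a future/callback can be attached immediately,
// working around NetworkClient's one-in-flight-send-per-node restriction.
class UnsentRequestsSketch[Node, Req] {
  private val unsent = mutable.Map.empty[Node, mutable.Queue[Req]]

  def put(node: Node, request: Req): Unit =
    unsent.getOrElseUpdate(node, mutable.Queue.empty[Req]) += request

  // Called from poll(): hand back what the node can accept now and fail
  // the rest, so the queue never accumulates across poll calls.
  def drain(node: Node, nodeReady: Boolean)(onFail: Req => Unit): Seq[Req] = {
    val queued = unsent.remove(node).map(_.toSeq).getOrElse(Seq.empty)
    if (nodeReady) queued else { queued.foreach(onFail); Seq.empty }
  }
}
{code}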

Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---



core/src/main/scala/kafka/server/AbstractFetcherThread.scala (line 71)
https://reviews.apache.org/r/34965/#comment16

As you probably noticed, synchronization in the 
AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. 
Since the simple consumer is force-closed without holding the SimpleConsumer’s 
lock, consider the following sequence:
- You call forceClose
- In the meantime (before isClosed is set to true) an ongoing call to 
sendRequest recreates the connection
- The fetcher thread will subsequently exit (since the ShutdownableThread’s 
isRunning flag is false)
- So even though the SimpleConsumer is _closed_ at that point, the 
connection will remain

Can you verify or is it a non-issue?



core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
https://reviews.apache.org/r/34965/#comment18

You could get around the above by retaining this call to 
simpleConsumer.close (although it would be mostly redundant). However this is 
still not ideal, since it is a caveat that the user of the (public) forceClose 
API needs to be aware of.
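
To illustrate the race, a bare-bones skeleton (not the real SimpleConsumer): 
if the close path skips the lock, a concurrent sendRequest can reconnect after 
the teardown, and that connection leaks; re-running close under the lock 
afterwards would clean it up.

{code}
import java.util.concurrent.locks.ReentrantLock

class ConsumerCloseSketch {
  private val lock = new ReentrantLock()
  @volatile private var isClosed = false
  @volatile private var connected = false

  def sendRequest(): Unit = {
    lock.lock()
    try { if (!isClosed && !connected) connected = true /* reconnect */ }
    finally lock.unlock()
  }

  // forceClose tears down the connection WITHOUT the lock (so it can
  // interrupt a blocking read); a sendRequest interleaved between the two
  // statements below can reconnect, and that connection then leaks.
  def forceClose(): Unit = { connected = false; isClosed = true }

  // A subsequent close under the lock (mostly redundant, as noted above)
  // would catch the leaked connection.
  def close(): Unit = {
    lock.lock()
    try { isClosed = true; connected = false } finally lock.unlock()
  }
}
{code}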


- Joel Koshy


On June 3, 2015, 10:30 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34965/
 ---
 
 (Updated June 3, 2015, 10:30 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2241
 https://issues.apache.org/jira/browse/KAFKA-2241
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 31a2639477bf66f9a05d2b9b07794572d7ec393b 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
 
 Diff: https://reviews.apache.org/r/34965/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621174#comment-14621174
 ] 

Onur Karaman commented on KAFKA-1740:
-

Sounds good. Just a heads up, KAFKA-2123 checks if we're PreparingRebalance. A 
more accurate check would be whether we're not Stable.

So I would suggest the following for handleHeartbeat:
{code}
} else if (generationId != group.generationId || !group.is(Stable)) {
  responseCallback(Errors.ILLEGAL_GENERATION.code)
}
{code}

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621176#comment-14621176
 ] 

Edward Ribeiro commented on KAFKA-2327:
---

Left some minor review comments, hope you don't mind. :)

 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor

 To reproduce locally, in server.properties, define advertised.host.name and 
 port, but not advertised.port:
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac;
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 ->
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}
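 For context (a hypothetical simplification, not the actual KafkaConfig code): 
 the fix works because the derived vals fall back to the plain host.name/port 
 settings when the advertised ones are unset, so the endpoint string can no 
 longer contain "null":
 {code}
 // sketch: advertised values default to host.name / port when unset
 val advertisedHostName: String =
   Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName)
 val advertisedPort: Int =
   Option(getInt(KafkaConfig.AdvertisedPortProp)).map(_.intValue).getOrElse(port)
 {code}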



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1740:
---
Attachment: KAFKA-1740.patch

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, 
 KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621193#comment-14621193
 ] 

Jason Gustafson commented on KAFKA-1740:


Created reviewboard https://reviews.apache.org/r/36368/diff/
 against branch upstream/trunk

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, 
 KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36368: Patch for KAFKA-1740

2015-07-09 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36368/#review91184
---

Ship it!


Ship It!

- Onur Karaman


On July 9, 2015, 8:23 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36368/
 ---
 
 (Updated July 9, 2015, 8:23 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1740
 https://issues.apache.org/jira/browse/KAFKA-1740
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1740; heartbeat should return illegal generation if rebalancing
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 476973b2c551db5be3f1c54f94990f0dd15ff65e 
 
 Diff: https://reviews.apache.org/r/36368/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 36368: Patch for KAFKA-1740

2015-07-09 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36368/#review91187
---


Could you add some unit test for this fix?


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 358)
https://reviews.apache.org/r/36368/#comment144470

I feel it would be helpful to add the new members list in this info entry as 
well. I can do that upon checkin if people are OK with it?


- Guozhang Wang


On July 9, 2015, 8:23 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36368/
 ---
 
 (Updated July 9, 2015, 8:23 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1740
 https://issues.apache.org/jira/browse/KAFKA-1740
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1740; heartbeat should return illegal generation if rebalancing
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 476973b2c551db5be3f1c54f94990f0dd15ff65e 
 
 Diff: https://reviews.apache.org/r/36368/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621144#comment-14621144
 ] 

Guozhang Wang commented on KAFKA-1740:
--

Thanks for the findings [~onurkaraman]! And sorry I overlooked this issue when 
making the commit.

As for the solution options, I actually still prefer option 1 because it makes 
more sense to me to only log the generation id increase once we have finished 
forming a new group, so that we can log the new group members along with the 
new generation id (so I guess it is purely personal taste :)

Anyways I think this is an urgent bug fix, so if you are OK with option 1 I can 
extract it from [~hachikuji]'s patch and commit it to unblock people?

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2303) Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures

2015-07-09 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2303:
-
Fix Version/s: 0.8.3

 Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction 
 failures
 

 Key: KAFKA-2303
 URL: https://issues.apache.org/jira/browse/KAFKA-2303
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 0.8.2.1
Reporter: Alexander Demidko
Assignee: Jay Kreps
 Fix For: 0.8.3


 We have rolled out the patch for KAFKA-2235 to our kafka cluster, and 
 recently instead of 
 {code}
 kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
 java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
 entry to a full offset map. 
 {code}
 we started to see 
 {code}
 kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
 java.lang.IllegalArgumentException: requirement failed: 131390902 messages in 
 segment topic-name-cgstate-8/79840768.log but offset map can 
 fit only 80530612. You can increase log.cleaner.dedupe.buffer.size or 
 decrease log.cleaner.threads
 {code}
 So, we had to roll it back to avoid disk depletion, although I'm not sure if 
 it needs to be rolled back in trunk. This patch applies stricter checks 
 than were in place before: even if there is only one unique key for a 
 segment, cleanup will fail if the segment is too big. 
 Does it make sense to eliminate the limit on the offset map slot count, for 
 example by using an offset map backed by a memory-mapped file?
 The limit of 80530612 slots comes from memory / bytesPerEntry, where memory 
 is Int.MaxValue (we use only one cleaner thread) and bytesPerEntry is 8 + 
 digest hash size. Might be wrong, but it seems that if the overall number of 
 unique keys per partition exceeds the ~80M slots in an OffsetMap, compaction 
 will always fail and the cleaner thread will die. 
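 As a sanity check on that number (a sketch assuming MD5 digests and the 
 default 0.9 dedupe load factor; the exact figure depends on configuration):
 {code}
 val memory = Int.MaxValue.toLong          // dedupe buffer for one cleaner thread
 val bytesPerEntry = 8 + 16                // 8-byte offset + 16-byte MD5 digest
 val physicalSlots = memory / bytesPerEntry        // ~89,478,485
 val usableSlots = (physicalSlots * 0.9).toLong    // ~80,530,636, close to the 80530612 above
 {code}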



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 36368: Patch for KAFKA-1740

2015-07-09 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36368/
---

Review request for kafka.


Bugs: KAFKA-1740
https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description
---

KAFKA-1740; heartbeat should return illegal generation if rebalancing


Diffs
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
476973b2c551db5be3f1c54f94990f0dd15ff65e 

Diff: https://reviews.apache.org/r/36368/diff/


Testing
---


Thanks,

Jason Gustafson



[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14620788#comment-14620788
 ] 

Onur Karaman commented on KAFKA-1740:
-

Here are two scenarios from 3f8480ccfb011eb43da774737597c597f703e11b which 
cause issues:

Let the following be a sample consumer:
{code}
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe("topic1", "topic2");
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s",
        record.topic(), record.partition(), record.offset(), record.key(),
        record.value());
  }
}
{code}
1. Start the consumer. Then expand the partitions for topic1. The coordinator 
logs will show the state going from Stable -> PreparingRebalance -> Dead -> 
Stable (because the consumer is running in a loop and will rejoin). This state 
will now be steady.

2. Start consumer c1. Wait for the group to be Stable. Start consumer c2. The 
coordinator logs will show the state forever looping from Stable -> 
PreparingRebalance -> Rebalancing -> Stable -> PreparingRebalance -> 
Rebalancing -> Stable. Basically c1 joining causes c2 to time out and c2 
joining causes c1 to time out.

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740_2015-06-29_18:21:42.patch, 
 KAFKA-1740_2015-06-29_18:44:54.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36346: fix heartbeat to allow offset commits during PreparingRebalance

2015-07-09 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36346/#review91118
---


Just FYI, this was also noticed in the patch for KAFKA-2123 (which is moving 
quite a bit of code around). The solution was a bit different (the condition is 
different and keeps the existing generation ID increment location).

- Ewen Cheslack-Postava


On July 9, 2015, 9:42 a.m., Onur Karaman wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36346/
 ---
 
 (Updated July 9, 2015, 9:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1740
 https://issues.apache.org/jira/browse/KAFKA-1740
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fix heartbeat to allow offset commits during PreparingRebalance
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 476973b2c551db5be3f1c54f94990f0dd15ff65e 
 
 Diff: https://reviews.apache.org/r/36346/diff/
 
 
 Testing
 ---
 
 Manual testing only so far. The OffsetManager merge didn't add anything to 
 ConsumerCoordinatorResponseTest.scala. I'll try to add all the missing 
 OffsetCommitRequest and OffsetFetchRequest handling tests to this rb.
 
 
 Thanks,
 
 Onur Karaman
 




[jira] [Updated] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)

2015-07-09 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2241:

Attachment: KAFKA-2241_2015-07-09_15:35:49.patch

 AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 -

 Key: KAFKA-2241
 URL: https://issues.apache.org/jira/browse/KAFKA-2241
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, 
 KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java


 This is likely a bug in Java. It affects Kafka, and here is the patch to 
 fix it.
 Here is a description of the bug. By the description of SocketChannel in the 
 Java 7 documentation, if another thread interrupts the current thread while a 
 read operation is in progress, the channel should be closed and a 
 ClosedByInterruptException thrown. However, we find that interrupting the 
 thread does not unblock the channel immediately. Instead, it waits for a 
 response or socket timeout before throwing an exception.
 This causes problems in the following scenario. Suppose console_consumer_1 is 
 reading from a topic and, due to quota delay or some other reason, blocks on 
 channel.read(buffer). At this moment, another console_consumer_2 joins and 
 triggers a rebalance at console_consumer_1. But consumer_1 will block waiting 
 on channel.read before it can release partition ownership, causing consumer_2 
 to fail after a number of failed attempts to obtain partition ownership.
 In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut 
 down due to this bug.
 The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, 
 you can use the attached server.java and client.java -- start the server 
 before the client and see if the client unblocks after interruption.
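 For reference, a minimal version of the same check (a sketch assuming a local 
 server that accepts connections but never writes, such as the attached 
 server.java; the port and timing values are illustrative):
 {code}
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.nio.channels.SocketChannel

 object InterruptedReadCheck {
   def main(args: Array[String]): Unit = {
     val reader = new Thread(() => {
       val channel = SocketChannel.open(new InetSocketAddress("localhost", 9999))
       try channel.read(ByteBuffer.allocate(1024)) // blocks: the server sends nothing
       catch { case e: Exception => println(s"read unblocked with: $e") }
     })
     reader.start()
     Thread.sleep(1000)  // let the read block
     reader.interrupt()  // per the javadoc this should close the channel promptly
     reader.join(5000)
     println(s"reader still alive after interrupt: ${reader.isAlive}")
   }
 }
 {code}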



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)

2015-07-09 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621396#comment-14621396
 ] 

Dong Lin commented on KAFKA-2241:
-

Updated reviewboard https://reviews.apache.org/r/34965/diff/
 against branch origin/trunk

 AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 -

 Key: KAFKA-2241
 URL: https://issues.apache.org/jira/browse/KAFKA-2241
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, 
 KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java


 This is likely a bug in Java. It affects Kafka, and here is the patch to 
 fix it.
 Here is a description of the bug. By the description of SocketChannel in the 
 Java 7 documentation, if another thread interrupts the current thread while a 
 read operation is in progress, the channel should be closed and a 
 ClosedByInterruptException thrown. However, we find that interrupting the 
 thread does not unblock the channel immediately. Instead, it waits for a 
 response or socket timeout before throwing an exception.
 This causes problems in the following scenario. Suppose console_consumer_1 is 
 reading from a topic and, due to quota delay or some other reason, blocks on 
 channel.read(buffer). At this moment, another console_consumer_2 joins and 
 triggers a rebalance at console_consumer_1. But consumer_1 will block waiting 
 on channel.read before it can release partition ownership, causing consumer_2 
 to fail after a number of failed attempts to obtain partition ownership.
 In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut 
 down due to this bug.
 The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, 
 you can use the attached server.java and client.java -- start the server 
 before the client and see if the client unblocks after interruption.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
---

(Updated July 9, 2015, 10:35 p.m.)


Review request for kafka.


Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241


Repository: kafka


Description
---

KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
ReadableByteChannel.read(buffer)


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
c16f7edd322709060e54c77eb505c44cbd77a4ec 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
83fc47417dd7fe4edf030217fa7fd69d99b170b0 

Diff: https://reviews.apache.org/r/34965/diff/


Testing
---


Thanks,

Dong Lin



[jira] [Updated] (KAFKA-2241) AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)

2015-07-09 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-2241:

Status: Patch Available  (was: In Progress)

 AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 -

 Key: KAFKA-2241
 URL: https://issues.apache.org/jira/browse/KAFKA-2241
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin
  Labels: quotas
 Attachments: KAFKA-2241.patch, KAFKA-2241_2015-06-03_15:30:35.patch, 
 KAFKA-2241_2015-07-09_15:35:49.patch, client.java, server.java


 This is likely a bug in Java. It affects Kafka, and here is the patch to 
 fix it.
 Here is a description of the bug. By the description of SocketChannel in the 
 Java 7 documentation, if another thread interrupts the current thread while a 
 read operation is in progress, the channel should be closed and a 
 ClosedByInterruptException thrown. However, we find that interrupting the 
 thread does not unblock the channel immediately. Instead, it waits for a 
 response or socket timeout before throwing an exception.
 This causes problems in the following scenario. Suppose console_consumer_1 is 
 reading from a topic and, due to quota delay or some other reason, blocks on 
 channel.read(buffer). At this moment, another console_consumer_2 joins and 
 triggers a rebalance at console_consumer_1. But consumer_1 will block waiting 
 on channel.read before it can release partition ownership, causing consumer_2 
 to fail after a number of failed attempts to obtain partition ownership.
 In other words, AbstractFetcherThread.shutdown() is not guaranteed to shut 
 down due to this bug.
 The problem is confirmed with Java 1.7 and Java 1.6. To check it yourself, 
 you can use the attached server.java and client.java -- start the server 
 before the client and see if the client unblocks after interruption.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin


 On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 71
  https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line71
 
  As you probably noticed, synchronization in the 
  AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. 
  Since the simple consumer is force-closed without the SimpleConsumer’s lock, 
  consider the following sequence:
  - You call forceClose
  - In the meantime (before isClosed is set to true) an ongoing call to 
  sendRequest recreates the connection
  - The fetcher thread will subsequently exit (since the 
  ShutdownableThread’s isRunning flag is false)
  - So even though the SimpleConsumer is _closed_ at that point, the 
  connection will remain
  
  Can you verify, or is it a non-issue?

Thanks for the catch! Yes this is an issue. After looking through the code 
carefully I think we have to keep the simpleConsumer.close() to avoid this 
problem.

I have also changed the function name and added comments to document the use of 
the new API.


 On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
  https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76
 
  You could get around the above by retaining this call to 
  simpleConsumer.close (although it would be mostly redundant). However this 
  is still not ideal, since it is a caveat that the user of the (public) 
  forceClose API needs to be aware of.

I agree. I have updated the code and comments to hopefully avoid any confusion 
for the user.


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---


On July 9, 2015, 10:35 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34965/
 ---
 
 (Updated July 9, 2015, 10:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2241
 https://issues.apache.org/jira/browse/KAFKA-2241
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 c16f7edd322709060e54c77eb505c44cbd77a4ec 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
 
 Diff: https://reviews.apache.org/r/34965/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




Jenkins build is back to normal : KafkaPreCommit #143

2015-07-09 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/143/changes



Re: Review Request 36368: Patch for KAFKA-1740

2015-07-09 Thread Jason Gustafson


 On July 9, 2015, 8:40 p.m., Guozhang Wang wrote:
  Could you add some unit test for this fix?

Done. The test case depends on timing like the other response tests, but it 
should test what we want most of the time and succeed even if there is an 
unexpected delay in execution.


 On July 9, 2015, 8:40 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 358
  https://reviews.apache.org/r/36368/diff/1/?file=1003888#file1003888line358
 
  I feel it would be helpful to add the new members list in this info entry 
  as well. I can do that upon checkin if people are OK with it?

Sounds reasonable to me.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36368/#review91187
---


On July 9, 2015, 9:41 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36368/
 ---
 
 (Updated July 9, 2015, 9:41 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1740
 https://issues.apache.org/jira/browse/KAFKA-1740
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1740; add unit test for heartbeat during rebalance
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 476973b2c551db5be3f1c54f94990f0dd15ff65e 
   
 core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
  3cd726d291d0b1f9dd30d499c204e40961eb2c41 
 
 Diff: https://reviews.apache.org/r/36368/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




[GitHub] kafka pull request: KAFKA-2327; broker doesn't start if config def...

2015-07-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/73


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621352#comment-14621352
 ] 

ASF GitHub Bot commented on KAFKA-2327:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/73


 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor

 To reproduce locally, in server.properties, define advertised.host.name and 
 port, but not advertised.port:
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac;
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 ->
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2327) broker doesn't start if config defines advertised.host but not advertised.port

2015-07-09 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-2327.
-
   Resolution: Fixed
Fix Version/s: 0.8.3

Merged PR.

 broker doesn't start if config defines advertised.host but not advertised.port
 --

 Key: KAFKA-2327
 URL: https://issues.apache.org/jira/browse/KAFKA-2327
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Geoffrey Anderson
Assignee: Geoffrey Anderson
Priority: Minor
 Fix For: 0.8.3


 To reproduce locally, in server.properties, define advertised.host.name and 
 port, but not advertised.port:
 port=9092
 advertised.host.name=localhost
 Then start zookeeper and try to start kafka. The result is an error like so:
 [2015-07-09 11:29:20,760] FATAL  (kafka.Kafka$)
 kafka.common.KafkaException: Unable to parse PLAINTEXT://localhost:null to a 
 broker endpoint
   at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 kafka.utils.CoreUtils$$anonfun$listenerListToEndPoints$1.apply(CoreUtils.scala:309)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:309)
   at 
 kafka.server.KafkaConfig.getAdvertisedListeners(KafkaConfig.scala:728)
   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:668)
   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:541)
   at kafka.Kafka$.main(Kafka.scala:58)
   at kafka.Kafka.main(Kafka.scala)
 Looks like this was changed in 5c9040745466945a04ea0315de583ccdab0614ac;
 the cause seems to be in KafkaConfig.scala in the getAdvertisedListeners 
 method, and I believe the fix is (starting at line 727)
 {code}
 ...
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 getString(KafkaConfig.AdvertisedHostNameProp) + ":" + 
 getInt(KafkaConfig.AdvertisedPortProp))
 ...
 {code}
 ->
 {code}
 } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || 
 getInt(KafkaConfig.AdvertisedPortProp) != null) {
   CoreUtils.listenerListToEndPoints("PLAINTEXT://" +
 advertisedHostName + ":" + advertisedPort)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621387#comment-14621387
 ] 

Ismael Juma commented on KAFKA-2187:


OK, thank you.

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : Kafka-trunk #537

2015-07-09 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/537/changes



[jira] [Commented] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621329#comment-14621329
 ] 

Jason Gustafson commented on KAFKA-1740:


Updated reviewboard https://reviews.apache.org/r/36368/diff/
 against branch upstream/trunk

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, 
 KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, 
 KAFKA-1740_2015-07-09_14:41:31.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36368: Patch for KAFKA-1740

2015-07-09 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36368/
---

(Updated July 9, 2015, 9:41 p.m.)


Review request for kafka.


Bugs: KAFKA-1740
https://issues.apache.org/jira/browse/KAFKA-1740


Repository: kafka


Description (updated)
---

KAFKA-1740; add unit test for heartbeat during rebalance


Diffs (updated)
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
476973b2c551db5be3f1c54f94990f0dd15ff65e 
  
core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
 3cd726d291d0b1f9dd30d499c204e40961eb2c41 

Diff: https://reviews.apache.org/r/36368/diff/


Testing
---


Thanks,

Jason Gustafson



[jira] [Updated] (KAFKA-1740) Merge Offset manager into Coordinator

2015-07-09 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-1740:
---
Attachment: KAFKA-1740_2015-07-09_14:41:31.patch

 Merge Offset manager into Coordinator
 -

 Key: KAFKA-1740
 URL: https://issues.apache.org/jira/browse/KAFKA-1740
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Critical
 Fix For: 0.8.3

 Attachments: KAFKA-1740.patch, KAFKA-1740.patch, 
 KAFKA-1740_2015-06-29_18:21:42.patch, KAFKA-1740_2015-06-29_18:44:54.patch, 
 KAFKA-1740_2015-07-09_14:41:31.patch


 This JIRA involves refactoring the offset manager and merging it into the 
 coordinator, including adding the logic for consumer-id / generation-id checking.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster

2015-07-09 Thread Bob Cotton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621336#comment-14621336
 ] 

Bob Cotton commented on KAFKA-2300:
---

Which logs would you like? server, controller, state-change?

 Error in controller log when broker tries to rejoin cluster
 ---

 Key: KAFKA-2300
 URL: https://issues.apache.org/jira/browse/KAFKA-2300
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Johnny Brown
Assignee: Flavio Junqueira

 Hello Kafka folks,
 We are having an issue where a broker attempts to join the cluster after 
 being restarted, but is never added to the ISR for its assigned partitions. 
 This is a three-node cluster, and the controller is broker 2.
 When broker 1 starts, we see the following message in broker 2's 
 controller.log.
 {{
 [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error 
 while handling broker changes 
 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
 java.lang.IllegalStateException: Controller to broker state change requests 
 batch is not empty while creating a new one. Some UpdateMetadata state 
 changes Map(2 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  1 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  3 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)))
  might be lost 
   at 
 kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202)
   at 
 kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974)
   at 
 kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at kafka.utils.Utils$.inLock(Utils.scala:535)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 }}
 {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of 
 it persists in the controller's memory, causing an exception which interrupts 
 the state change triggered by the broker startup.
 Has anyone seen something like this? Any idea what's happening here? Any 
 information would be greatly appreciated.
 Thanks,
 Johnny



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2300) Error in controller log when broker tries to rejoin cluster

2015-07-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621344#comment-14621344
 ] 

Flavio Junqueira commented on KAFKA-2300:
-

controller logs, please.

 Error in controller log when broker tries to rejoin cluster
 ---

 Key: KAFKA-2300
 URL: https://issues.apache.org/jira/browse/KAFKA-2300
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Johnny Brown
Assignee: Flavio Junqueira

 Hello Kafka folks,
 We are having an issue where a broker attempts to join the cluster after 
 being restarted, but is never added to the ISR for its assigned partitions. 
 This is a three-node cluster, and the controller is broker 2.
 When broker 1 starts, we see the following message in broker 2's 
 controller.log.
 {{
 [2015-06-23 13:57:16,535] ERROR [BrokerChangeListener on Controller 2]: Error 
 while handling broker changes 
 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
 java.lang.IllegalStateException: Controller to broker state change requests 
 batch is not empty while creating a new one. Some UpdateMetadata state 
 changes Map(2 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  1 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)),
  3 -> Map([prod-sver-end,1] -> 
 (LeaderAndIsrInfo:(Leader:-2,ISR:1,LeaderEpoch:0,ControllerEpoch:165),ReplicationFactor:1),AllReplicas:1)))
  might be lost 
   at 
 kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:202)
   at 
 kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:974)
   at 
 kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:399)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:371)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
   at kafka.utils.Utils$.inLock(Utils.scala:535)
   at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 }}
 {{prod-sver-end}} is a topic we previously deleted. It seems some remnant of 
 it persists in the controller's memory, causing an exception which interrupts 
 the state change triggered by the broker startup.
 Has anyone seen something like this? Any idea what's happening here? Any 
 information would be greatly appreciated.
 Thanks,
 Johnny



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2187:

   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script

2015-07-09 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14621384#comment-14621384
 ] 

Gwen Shapira commented on KAFKA-2187:
-

I accidentally committed this script as part of PR 73 (I did a git add, 
intending to commit this separately, and then ended up committing PR 73 from 
the same branch).

I think the script is in good condition, so I'm not taking it out. Just file a 
new JIRA to fix the branch thing when you have a chance.

 Introduce merge-kafka-pr.py script
 --

 Key: KAFKA-2187
 URL: https://issues.apache.org/jira/browse/KAFKA-2187
 Project: Kafka
  Issue Type: New Feature
Reporter: Ismael Juma
Assignee: Ismael Juma
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch, 
 KAFKA-2187_2015-06-02_20:05:50.patch


 This script will be used to merge GitHub pull requests and it will pull from 
 the Apache Git repo to the current branch, squash and merge the PR, push the 
 commit to trunk, close the PR (via commit message) and close the relevant 
 JIRA issue (via JIRA API).
 Spark has a script that does most (if not all) of this and that will be used 
 as the starting point:
 https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Joel Koshy


 On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
  https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76
 
  You could get around the above by retaining this call to 
  simpleConsumer.close (although it would be mostly redundant). However this 
  is still not ideal, since it is a caveat that the user of the (public) 
  forceClose API needs to be aware of.
 
 Dong Lin wrote:
 I agree. I have updated the code and comments to hopefully avoid any 
 confusion for the user.

Would it work to just modify what you had before in `forceClose` to:
```
disconnect();
close();
```


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---


On July 9, 2015, 10:35 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34965/
 ---
 
 (Updated July 9, 2015, 10:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2241
 https://issues.apache.org/jira/browse/KAFKA-2241
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 c16f7edd322709060e54c77eb505c44cbd77a4ec 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
 
 Diff: https://reviews.apache.org/r/34965/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin
 




Re: Review Request 34965: Patch for KAFKA-2241

2015-07-09 Thread Dong Lin


 On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
  https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76
 
  You could get around the above by retaining this call to 
  simpleConsumer.close (although it would be mostly redundant). However this 
  is still not ideal, since it is a caveat that the user of the (public) 
  forceClose API needs to be aware of.
 
 Dong Lin wrote:
 I agree. I have updated the code and comments to hopefully avoid any 
 confusion for the user.
 
 Joel Koshy wrote:
 Would it work to just modify what you had before in `forceClose` to:
 ```
 disconnect();
 close();
 ```

I think that won't work. The event sequence you described will still cause 
problems.

The following sequence of events may happen:

- forceClose() as well as close() is executed by thread 1
- thread 2 calls sendRequest(); blockingChannel.send(request) will throw a 
ClosedChannelException, which triggers reconnect().

It is possible to make this work by changing the way sendRequest() handles 
ClosedChannelException. But I find the API in the second patch better.

Which solution do you prefer?


- Dong


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
---


On July 9, 2015, 10:35 p.m., Dong Lin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34965/
 ---
 
 (Updated July 9, 2015, 10:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2241
 https://issues.apache.org/jira/browse/KAFKA-2241
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2241; AbstractFetcherThread.shutdown() should not block on 
 ReadableByteChannel.read(buffer)
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 c16f7edd322709060e54c77eb505c44cbd77a4ec 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
 
 Diff: https://reviews.apache.org/r/34965/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Dong Lin