[ 
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-633:
-----------------------------

    Attachment: KAFKA-633-v1.patch

This is a timing issue that affects low-volume topics (and in this case an 
empty topic). The issue is that the leader that is being shut down receives a 
leaderAndIsrRequest informing it is no longer the leader and thus starts up a 
follower which starts issuing fetch requests to the new leader. We then shrink 
the ISR and send a StopReplicaRequest to the shutting down broker. However, the 
new leader upon receiving the fetch request expands the ISR again.

The shutdown itself is working correctly in that the leader has been 
successfully moved. This patch fixes the assertion by checking the controller's 
cached ISR instead of ZooKeeper. The patch also fixes the annoying 
zookeeper-related messages if the test fails - the problem was that the brokers 
were not getting shut down and were trying to talk to the torn down zookeeper.

I think it would be better to fix the corner case in a separate non-blocker 
jira. One possible approach would be to use the callback feature in the 
ControllerBrokerRequestBatch and wait until the StopReplicaRequest has been 
processed by the shutting down broker before shrinking the ISR; and there are 
probably other ways as well.

                
> AdminTest.testShutdownBroker fails
> ----------------------------------
>
>                 Key: KAFKA-633
>                 URL: https://issues.apache.org/jira/browse/KAFKA-633
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>            Priority: Blocker
>         Attachments: KAFKA-633-v1.patch
>
>
> 0m[ [31merror [0m]  [0mTest Failed: testShutdownBroker(kafka.admin.AdminTest) 
> [0m
> junit.framework.AssertionFailedError: expected:<2> but was:<3>
>       at junit.framework.Assert.fail(Assert.java:47)
>       at junit.framework.Assert.failNotEquals(Assert.java:277)
>       at junit.framework.Assert.assertEquals(Assert.java:64)
>       at junit.framework.Assert.assertEquals(Assert.java:195)
>       at junit.framework.Assert.assertEquals(Assert.java:201)
>       at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at junit.framework.TestCase.runTest(TestCase.java:164)
>       at junit.framework.TestCase.runBare(TestCase.java:130)
>       at junit.framework.TestResult$1.protect(TestResult.java:110)
>       at junit.framework.TestResult.runProtected(TestResult.java:128)
>       at junit.framework.TestResult.run(TestResult.java:113)
>       at junit.framework.TestCase.run(TestCase.java:120)
>       at junit.framework.TestSuite.runTest(TestSuite.java:228)
>       at junit.framework.TestSuite.run(TestSuite.java:223)
>       at junit.framework.TestSuite.runTest(TestSuite.java:228)
>       at junit.framework.TestSuite.run(TestSuite.java:223)
>       at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>       at 
> org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
>       at sbt.TestRunner.run(TestFramework.scala:53)
>       at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>       at sbt.TestRunner.run(TestFramework.scala:76)
>       at 
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>       at 
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>       at 
> sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>       at sbt.NamedTestTask.run(TestFramework.scala:92)
>       at 
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>       at 
> sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>       at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>       at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>       at sbt.impl.RunTask.runTask(RunTask.scala:85)
>       at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>       at 
> sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>       at 
> sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>       at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>       at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>       at sbt.Control$.trapUnit(Control.scala:19)
>       at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to