[ https://issues.apache.org/jira/browse/KAFKA-369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Ye updated KAFKA-369:
--------------------------

    Attachment: kafka_369_v2.diff

In AsyncProducerTest.testFailedSendRetryLogic(), the following error messages may appear. They do not affect the success of the test. The cause is in the handle() function of DefaultEventHandler:

      if (outstandingProduceRequests.size > 0)  {
        // back off and update the topic metadata cache before attempting another send operation
        Thread.sleep(config.producerRetryBackoffMs)
        // get topics of the outstanding produce requests and refresh metadata for those
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))

This code refreshes the cached metadata while the broker is not up, so the connection attempt is refused and the error is logged. This doesn't affect the correctness of the test. Maybe we should try to eliminate the error messages.

[2012-08-03 19:11:44,182] ERROR Connection attempt to 127.0.0.1:52955 failed, next attempt in 100 ms (kafka.producer.SyncProducer:99)
java.net.ConnectException: Connection refused
	at sun.nio.ch.Net.connect(Native Method)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
	at kafka.producer.SyncProducer.connect(SyncProducer.scala:161)
	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:182)
	at kafka.producer.SyncProducer.doSend(SyncProducer.scala:74)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:116)
	at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:86)
	at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:81)
	at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
	at scala.collection.immutable.List.foreach(List.scala:45)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:44)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:81)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:53)
	at kafka.utils.Utils$.swallow(Utils.scala:429)
	at kafka.utils.Logging$class.swallowError(Logging.scala:102)
	at kafka.utils.Utils$.swallowError(Utils.scala:40)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
	at kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:438)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at junit.framework.TestCase.runTest(TestCase.java:164)
	at junit.framework.TestCase.runBare(TestCase.java:130)
	at junit.framework.TestResult$1.protect(TestResult.java:110)
	at junit.framework.TestResult.runProtected(TestResult.java:128)
	at junit.framework.TestResult.run(TestResult.java:113)
	at junit.framework.TestCase.run(TestCase.java:120)
	at junit.framework.TestSuite.runTest(TestSuite.java:228)
	at junit.framework.TestSuite.run(TestSuite.java:223)
	at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:71)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:199)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

> remove ZK dependency on producer
> --------------------------------
>
>                 Key: KAFKA-369
>                 URL: https://issues.apache.org/jira/browse/KAFKA-369
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>         Attachments: kafka_369_v1.diff, kafka_369_v2.diff
>
>   Original Estimate: 252h
>  Remaining Estimate: 252h
>
> Currently, the only place that ZK is actually used is in BrokerPartitionInfo.
> We use ZK to get a list of brokers for making TopicMetadataRequest requests.
> Instead, we can provide a list of brokers in the producer config directly.
> That way, the producer client is no longer dependent on ZK.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira
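
The retry behavior described in the comment (back off, refresh metadata while swallowing any refresh failure, then resend) can be illustrated with a minimal standalone sketch. This is not the code from the patch: RetrySketch, swallow, sendWithRetries, and all of their parameters are hypothetical names chosen for illustration, and the real DefaultEventHandler may be structured differently.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of a back-off-and-refresh retry loop; not Kafka's implementation.
object RetrySketch {

  // Stand-in for an error-swallowing helper: run the action, log any
  // non-fatal failure, and carry on instead of propagating it.
  def swallow(log: String => Unit)(action: => Unit): Unit =
    try action
    catch { case NonFatal(e) => log("swallowed: " + e.getMessage) }

  // `send` returns the requests that failed. Between attempts, sleep for
  // `backoffMs` and try to refresh metadata; a failed refresh is only logged.
  // Returns whatever is still outstanding once retries are exhausted.
  def sendWithRetries[A](
      pending: Seq[A],
      retries: Int,
      backoffMs: Long,
      send: Seq[A] => Seq[A],
      refreshMetadata: Seq[A] => Unit,
      log: String => Unit): Seq[A] = {
    var outstanding = send(pending)
    var remaining = retries
    while (outstanding.nonEmpty && remaining > 0) {
      Thread.sleep(backoffMs)
      swallow(log)(refreshMetadata(outstanding))
      outstanding = send(outstanding)
      remaining -= 1
    }
    outstanding
  }
}
```

With a refreshMetadata function that throws java.net.ConnectException (as when the broker is down), the loop in this sketch logs the failure and still proceeds to the next send attempt, which mirrors why the logged error does not change the test result.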