[ 
https://issues.apache.org/jira/browse/KAFKA-369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Ye updated KAFKA-369:
--------------------------

    Attachment: kafka_369_v2.diff


In AsyncProducerTest.testFailedSendRetryLogic(), we may see the following error 
messages; they do not affect the success of the test. They come from this code in 
the handle() function of DefaultEventHandler:

        if (outstandingProduceRequests.size > 0)  {
          // back off and update the topic metadata cache before attempting another send operation
          Thread.sleep(config.producerRetryBackoffMs)
          // get topics of the outstanding produce requests and refresh metadata for those
          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))

This call tries to refresh the cached topic metadata, but the broker is not up at that point, so the connection attempt fails and the failure is logged.
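
To illustrate why the test still passes, here is a minimal, self-contained sketch of the pattern (not the actual DefaultEventHandler code). The names RetrySketch, swallow, sendWithRetry, send, refreshMetadata and log are all hypothetical; swallow only stands in for what Utils.swallowError appears to do in the stack trace, namely report the failure and carry on.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  // Hypothetical stand-in for Utils.swallowError: run the action,
  // report any non-fatal failure through `log`, and never rethrow.
  def swallow(log: String => Unit)(action: => Unit): Unit =
    try action catch { case NonFatal(e) => log(s"swallowed: ${e.getMessage}") }

  // Hypothetical retry loop. `send` returns the requests that still failed.
  // A failing metadata refresh is only logged, so the retry goes ahead anyway.
  def sendWithRetry[A](requests: Seq[A],
                       maxRetries: Int,
                       backoffMs: Long,
                       send: Seq[A] => Seq[A],
                       refreshMetadata: Seq[A] => Unit,
                       log: String => Unit): Seq[A] = {
    var outstanding = send(requests)
    var remaining = maxRetries
    while (outstanding.nonEmpty && remaining > 0) {
      Thread.sleep(backoffMs)                    // back off before the next attempt
      swallow(log)(refreshMetadata(outstanding)) // may fail while the broker is down
      outstanding = send(outstanding)            // retry only what is still outstanding
      remaining -= 1
    }
    outstanding
  }
}
```

In this sketch a refresh that throws ConnectException produces one log entry per retry and nothing else, which matches what we see in the test output below.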

This doesn't affect the correctness of the test, but we may want to eliminate 
the error messages.





[2012-08-03 19:11:44,182] ERROR Connection attempt to 127.0.0.1:52955 failed, next attempt in 100 ms (kafka.producer.SyncProducer:99)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:161)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:182)
        at kafka.producer.SyncProducer.doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:116)
        at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:86)
        at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:81)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:44)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:81)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:53)
        at kafka.utils.Utils$.swallow(Utils.scala:429)
        at kafka.utils.Logging$class.swallowError(Logging.scala:102)
        at kafka.utils.Utils$.swallowError(Utils.scala:40)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
        at kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:438)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at junit.framework.TestCase.runTest(TestCase.java:164)
        at junit.framework.TestCase.runBare(TestCase.java:130)
        at junit.framework.TestResult$1.protect(TestResult.java:110)
        at junit.framework.TestResult.runProtected(TestResult.java:128)
        at junit.framework.TestResult.run(TestResult.java:113)
        at junit.framework.TestCase.run(TestCase.java:120)
        at junit.framework.TestSuite.runTest(TestSuite.java:228)
        at junit.framework.TestSuite.run(TestSuite.java:223)
        at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:71)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:199)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
                
> remove ZK dependency on producer
> --------------------------------
>
>                 Key: KAFKA-369
>                 URL: https://issues.apache.org/jira/browse/KAFKA-369
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Yang Ye
>         Attachments: kafka_369_v1.diff, kafka_369_v2.diff
>
>   Original Estimate: 252h
>  Remaining Estimate: 252h
>
> Currently, the only place that ZK is actually used is in BrokerPartitionInfo. 
> We use ZK to get a list of brokers for making TopicMetadataRequest requests. 
> Instead, we can provide a list of brokers in the producer config directly. 
> That way, the producer client is no longer dependent on ZK.
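
As a rough sketch of what "a list of brokers in the producer config" could look like on the client side, the snippet below parses a comma-separated host:port string. The string format, the BrokerListSketch object and the Broker case class are assumptions made for illustration only; none of them is taken from the attached patches.

```scala
object BrokerListSketch {
  // Hypothetical value type for one configured broker endpoint.
  final case class Broker(host: String, port: Int)

  // Parses a string such as "host1:9092,host2:9092" (assumed format).
  // Blank entries are skipped; a malformed entry fails fast.
  def parse(brokerList: String): Seq[Broker] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0 && idx < entry.length - 1, s"expected host:port, got '$entry'")
      Broker(entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }
}
```

With something like this, the producer could pick any configured broker to send its TopicMetadataRequest to, without looking anything up in ZK.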

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
