Hi Jun,

No, all 9 brokers are up, and when I look at the files in /opt/kafka-[]-logs there is data for partition 0 of that topic on 3 different brokers.
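(For reference, this is the kind of check I'm doing on each machine; the broker id and segment file names below are invented for illustration:

-bash-3.2$ ls /opt/kafka-1-logs/test1-0/
00000000000000000000.index  00000000000000000000.log

The test1-0 directory, with a non-empty .log segment, shows up on 3 of the 9 brokers.)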
After confirming this was still happening this morning, I bounced all the brokers, and on restart one of them took over as leader for partition 0. No more errors after the restart.

However, I now have a different problem. To see whether creating a new topic with all the brokers live would reproduce the issue, I created a new topic (test2) using the same command line as below. list_topics shows it was created with leaders on all partitions. However, on one of the machines (the one running brokers 1, 2, and 3) I keep getting the following warning:

[2012-11-28 07:56:46,014] WARN [ReplicaFetcherThread-9-0-on-broker-1], error for test2 2 to broker 9 (kafka.server.ReplicaFetcherThread)
kafka.common.UnknownTopicOrPartitionException
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at java.lang.Class.newInstance0(Class.java:355)
at java.lang.Class.newInstance(Class.java:308)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
at kafka.utils.Logging$class.warn(Logging.scala:88)
at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:130)
at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:106)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

[2012-11-28 07:56:46,289] WARN [ReplicaFetcherThread-8-0-on-broker-1], error for test2 1 to broker 8 (kafka.server.ReplicaFetcherThread)
kafka.common.UnknownTopicOrPartitionException
(identical stack trace to the one above)

(There are 3 brokers on that machine, so I can't easily tell whether the errors going to the console come from one broker or all 3.) The second set of brokers (4, 5, and 6) doesn't show these messages.
On the third set of brokers (7, 8, and 9) I get a different message:

[2012-11-28 07:58:34,180] WARN Replica Manager on Broker 8: While recording the follower position, the partition [test2, 1] hasn't been created, skip updating leader HW (kafka.server.ReplicaManager)
[2012-11-28 07:58:34,180] ERROR [KafkaApi-8] error when processing request (test2,1,0,1048576) (kafka.server.KafkaApis)
kafka.common.UnknownTopicOrPartitionException: Topic test2 partition 1 doesn't exist on 8
at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:163)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:359)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:325)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:321)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map2.map(Map.scala:110)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:321)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:289)
at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Unknown Source)

Once I restarted all the brokers again, the warnings stopped and everything looked okay. So I went and created another topic, test3, and there were no problems this time.

Quick question: how do I set up log4j for the brokers so the messages are written to a file per broker instead of just to the console? That might let me shut down only the broker having an issue instead of all the brokers on a machine.
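To show what I mean, here is my untested guess at a per-broker log4j.properties; the appender name and file path are invented, and each broker process would point at its own copy (e.g. via -Dlog4j.configuration) with a distinct File path:

# Untested sketch of a per-broker log4j.properties.
log4j.rootLogger=INFO, brokerFile
log4j.appender.brokerFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.brokerFile.File=/var/log/kafka/broker-1/server.log
log4j.appender.brokerFile.DatePattern='.'yyyy-MM-dd
log4j.appender.brokerFile.layout=org.apache.log4j.PatternLayout
log4j.appender.brokerFile.layout.ConversionPattern=[%d] %p %m (%c)%n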
Thanks,

Chris


On Wed, Nov 28, 2012 at 12:13 AM, Jun Rao <jun...@gmail.com> wrote:

> Is a broker down in your test? If so, you could see
> LeaderNotAvailableException in the producer. The producer is trying to
> refresh the metadata and the leader may not have been elected yet. You
> shouldn't see it often though.
>
> Thanks,
>
> Jun
>
> On Tue, Nov 27, 2012 at 1:11 PM, Chris Curtin <curtin.ch...@gmail.com>wrote:
>
> > Hi,
> >
> > I noticed several errors when writing to a topic with 5 partitions. It
> > looks like the data was written to all 3 brokers, but I get the
> > following errors:
> >
> > 9961 [main] DEBUG kafka.producer.BrokerPartitionInfo - Metadata for topic
> > partition [test1, 0] is errornous:
> > [PartitionMetadata(0,None,WrappedArray(),WrappedArray(),5)]
> > kafka.common.LeaderNotAvailableException
> > at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
> > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> > <snip>
> >
> > 9962 [main] DEBUG kafka.producer.async.DefaultEventHandler - Getting the
> > number of broker partitions registered for topic: test1
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Getting broker
> > partition info for topic test1
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Topic test1
> > partition 0 does not have a leader yet
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Topic test1
> > partition 1 has leader 7
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Topic test1
> > partition 2 has leader 8
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Topic test1
> > partition 3 has leader 9
> > 9963 [main] DEBUG kafka.producer.BrokerPartitionInfo - Topic test1
> > partition 4 has leader 1
> > 9963 [main] DEBUG kafka.producer.async.DefaultEventHandler - Broker
> > partitions registered for topic: test1 are 0,1,2,3,4
> >
> > This happens a lot as I write data to the brokers.
> >
> > The topic was created with:
> >
> > -bash-3.2$ ./kafka-create-topic.sh --topic test1 --partition 5 --replica 3
> > --zookeeper localhost:2181
> >
> > Listing the topics shows an empty leader and replica list for partition 0:
> >
> > [2012-11-27 16:03:35,604] INFO Session establishment complete on server
> > localhost/127.0.0.1:2181, sessionid = 0x23b4218eccd000b, negotiated
> > timeout = 30000 (org.apache.zookeeper.ClientCnxn)
> > [2012-11-27 16:03:35,607] INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)
> > topic: test1
> > PartitionMetadata(0,None,List(),List(),5)
> > PartitionMetadata(1,Some(id:7,creatorId:10.121.31.57-1354023708335,host:10.121.31.57,port:9092),List(id:7,creatorId:10.121.31.57-1354023708335,host:10.121.31.57,port:9092,
> > id:8,creatorId:10.121.31.57-1354023708340,host:10.121.31.57,port:9093,
> > id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094),ArrayBuffer(id:7,creatorId:10.121.31.57-1354023708335,host:10.121.31.57,port:9092,
> > id:8,creatorId:10.121.31.57-1354023708340,host:10.121.31.57,port:9093,
> > id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094),0)
> > PartitionMetadata(2,Some(id:8,creatorId:10.121.31.57-1354023708340,host:10.121.31.57,port:9093),List(id:8,creatorId:10.121.31.57-1354023708340,host:10.121.31.57,port:9093,
> > id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094,
> > id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092),ArrayBuffer(id:8,creatorId:10.121.31.57-1354023708340,host:10.121.31.57,port:9093,
> > id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094,
> > id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092),0)
> > PartitionMetadata(3,Some(id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094),List(id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094,
> > id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092,
> > id:2,creatorId:10.121.31.55-1354023701344,host:10.121.31.55,port:9093),ArrayBuffer(id:9,creatorId:10.121.31.57-1354023944130,host:10.121.31.57,port:9094,
> > id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092,
> > id:2,creatorId:10.121.31.55-1354023701344,host:10.121.31.55,port:9093),0)
> > PartitionMetadata(4,Some(id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092),List(id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092,
> > id:2,creatorId:10.121.31.55-1354023701344,host:10.121.31.55,port:9093,
> > id:3,creatorId:10.121.31.55-1354023701345,host:10.121.31.55,port:9094),ArrayBuffer(id:1,creatorId:10.121.31.55-1354023701351,host:10.121.31.55,port:9092,
> > id:2,creatorId:10.121.31.55-1354023701344,host:10.121.31.55,port:9093,
> > id:3,creatorId:10.121.31.55-1354023701345,host:10.121.31.55,port:9094),0)
> > [2012-11-27 16:03:36,005] INFO Terminate ZkClient event thread.
> > (org.I0Itec.zkclient.ZkEventThread)
> >
> > My partitioner logic does a simple modulo on the number of partitions
> > passed in:
> >
> > return (int) (organizationId % a_numPartitions);
> >
> > Did I miss a step setting up the topics?
> >
> > Thanks,
> >
> > Chris
> >
>
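Following up on my own quoted message: the modulo line above lives in a custom partitioner. Here is a compilable sketch of the idea; the class name is invented, the key is assumed to be a Long organization id, and it presumes the 0.8 producer's kafka.producer.Partitioner trait:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Hypothetical name; only the modulo line is from my real code.
public class OrganizationPartitioner implements Partitioner<Long> {

    public OrganizationPartitioner() {
    }

    // Some 0.8 builds construct the partitioner reflectively with the
    // producer's properties, so this constructor is kept as well.
    public OrganizationPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Long organizationId, int numPartitions) {
        // Assumes organizationId is non-negative: Java's % gives a negative
        // result for a negative left operand, which would be an invalid
        // partition number.
        return (int) (organizationId % numPartitions);
    }
}

It is wired in through the producer's partitioner.class property.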