Eric,

Let me try to explain what's going on here; then you can tell me why your tests fail with this approach.
The broker has a value for the default number of partitions (num.partitions in server.properties), but it registers this information in zookeeper ONLY along with the topic. So if the topic doesn't exist on the broker yet, the number of partitions the broker is configured with is not known in zookeeper. We could have added this information under the /brokers/ids path in zookeeper, but for now we don't have it.

Given the above, when the first message for a new topic is produced, the zookeeper-based producer doesn't know the right number of partitions on a particular broker. It could assume either 1 or num.partitions (your log below shows it bootstrapping with 1). Since this is only a problem for the very first message, it doesn't really matter. Once the first message reaches the broker, the broker registers the new topic in zookeeper along with the correct number of partitions, so all following produce requests for that topic are routed using the correct number of partitions on that broker.

Having said that, can you explain why your flume plugin tests fail? I'm wondering if they fail because they were written with expected outputs that don't match the current behavior of the code.

Thanks,
Neha

On Fri, Sep 16, 2011 at 9:27 AM, Eric Hauser <ewhau...@gmail.com> wrote:
> I've written a very simple Flume plugin for Kafka -
> https://github.com/ewhauser/flume-kafka-plugin.
>
> I'm trying to enhance it a bit to work with partitioning, but I'm
> having trouble getting it to work correctly in my tests.
> It seems that when a producer bootstraps, it goes to ZK to get the
> information about a topic, and if the topic is not there then it
> creates it:
>
> [2011-09-16 12:17:58,704] DEBUG Getting the number of broker partitions registered for topic: test (kafka.producer.Producer)
> [2011-09-16 12:17:58,704] DEBUG Getting the number of broker partitions registered for topic: test (kafka.producer.Producer)
> [2011-09-16 12:17:58,722] DEBUG Currently, no brokers are registered under topic: test (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Currently, no brokers are registered under topic: test (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Bootstrapping topic: test with available brokers in the cluster with default number of partitions = 1 (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Bootstrapping topic: test with available brokers in the cluster with default number of partitions = 1 (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,732] DEBUG Adding following broker id, partition id for NEW topic: test=TreeSet(0-0) (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,732] DEBUG Adding following broker id, partition id for NEW topic: test=TreeSet(0-0) (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,733] DEBUG Broker partitions registered for topic: test = List(0-0) (kafka.producer.Producer)
> [2011-09-16 12:17:58,733] DEBUG Broker partitions registered for topic: test = List(0-0) (kafka.producer.Producer)
> [2011-09-16 12:17:58,734] DEBUG Sending message to broker 10.25.245.96:9092 on partition 0 (kafka.producer.Producer)
> [2011-09-16 12:17:58,734] DEBUG Sending message to broker 10.25.245.96:9092 on partition 0 (kafka.producer.Producer)
>
> But the producer does not know about the number of partitions - nor
> should it - so it creates a topic map with 1 partition.
> Is there a way to force the server to register the topics in ZK prior
> to a consumer hitting it? I'm starting up the broker using the standard
> methods in most of the other tests:
>
> zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
> Properties props = TestUtils.createBrokerConfig(0, port);
> props.setProperty("num.partitions", "2");
> props.setProperty("topic.partition.count.map", "test:2");
> KafkaConfig config = new KafkaConfig(props);
> server = TestUtils.createServer(config);
>
> Thanks.
>
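To make the first-message behavior in the reply concrete, here is a minimal Java sketch. It is not Kafka code: the class PartitionBootstrapSketch, the in-memory map standing in for the zookeeper topic registration, the single broker, and the key-hash partition choice are all hypothetical simplifications chosen only to illustrate the sequence (producer bootstraps with 1 partition, broker then registers the topic with num.partitions).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a single broker and a zookeeper-based producer.
 * Nothing here is Kafka's API; the map below only stands in for zookeeper.
 */
public class PartitionBootstrapSketch {
    // Stand-in for zookeeper: topic -> partition count registered by the broker.
    private final Map<String, Integer> zkTopicRegistry = new HashMap<>();
    // The broker's num.partitions; zookeeper only learns it when the topic is registered.
    private final int brokerNumPartitions;

    public PartitionBootstrapSketch(int brokerNumPartitions) {
        this.brokerNumPartitions = brokerNumPartitions;
    }

    /** Producer side: unregistered topics are bootstrapped with 1 partition. */
    public int partitionsSeenByProducer(String topic) {
        return zkTopicRegistry.getOrDefault(topic, 1);
    }

    /** Producer side: assumed key-hash routing over the partitions it knows about. */
    public int choosePartition(String topic, String key) {
        return Math.floorMod(key.hashCode(), partitionsSeenByProducer(topic));
    }

    /** Broker side: the first message for a topic registers it with the real count. */
    public void brokerReceives(String topic) {
        zkTopicRegistry.putIfAbsent(topic, brokerNumPartitions);
    }

    /** Routes one message, delivers it to the broker, and returns the chosen partition. */
    public int send(String topic, String key) {
        int partition = choosePartition(topic, key);
        brokerReceives(topic);
        return partition;
    }

    public static void main(String[] args) {
        // Broker configured like the test above: num.partitions = 2.
        PartitionBootstrapSketch cluster = new PartitionBootstrapSketch(2);
        List<Integer> routed = new ArrayList<>();
        // Send the same key twice to the new topic "test".
        routed.add(cluster.send("test", "c"));
        routed.add(cluster.send("test", "c"));
        System.out.println("partitions used: " + routed); // prints "partitions used: [0, 1]"
    }
}
```

In this model the first message for key "c" lands on partition 0 because only the bootstrap default of 1 partition is known; after the broker has registered the topic with 2 partitions, the same key is routed to partition 1. A test that expects every message for a new topic to be spread over num.partitions from the very first send would therefore see one "unexpected" routing.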