Eric,

Let me try to explain what's going on here; then you can tell me why your
tests fail with this approach.

The broker has a default number of partitions (num.partitions in
server.properties), but it registers this value in zookeeper ONLY
together with a topic. So if a topic doesn't exist on the broker yet, the
number of partitions the broker is configured with is not known in
zookeeper. We could have added this information under the /brokers/ids
path in zookeeper, but for now we don't have it.

Given the above, when the first message for a new topic is produced, the
zookeeper producer doesn't know the right number of partitions on a
particular broker. It could assume either 1 or num.partitions. Since this
is a problem only for the very first message, it really doesn't matter.
Once the first message reaches the broker, the broker registers the new
topic in zookeeper along with the correct number of partitions. All
subsequent produce requests for that topic are then routed using the
correct number of partitions on that broker.
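
To make the sequence concrete, here is a rough sketch of the behavior
described above. It is a toy model, not the actual producer code: the class
and method names are hypothetical, an in-memory map stands in for zookeeper,
and picking a partition as key modulo partition count is an assumption made
only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Kafka's real classes: an in-memory stand-in for
// the topic registry in zookeeper plus one broker's num.partitions setting.
class PartitionBootstrapSketch {
    // topic -> partition count registered by the broker (stand-in for zookeeper)
    private final Map<String, Integer> registeredTopics = new HashMap<>();
    // Broker-side default, i.e. num.partitions in server.properties
    private final int brokerNumPartitions;

    PartitionBootstrapSketch(int brokerNumPartitions) {
        this.brokerNumPartitions = brokerNumPartitions;
    }

    // Producer's view: the registered count, or an assumed 1 for a new topic.
    int partitionsSeenByProducer(String topic) {
        return registeredTopics.getOrDefault(topic, 1);
    }

    // Broker side: the first message for a topic registers it with the
    // broker's configured partition count.
    void brokerReceives(String topic) {
        registeredTopics.putIfAbsent(topic, brokerNumPartitions);
    }

    // Produce one message: choose a partition from the producer's current
    // view (key modulo count is an assumption), then deliver to the broker.
    int send(String topic, int key) {
        int partition = Math.floorMod(key, partitionsSeenByProducer(topic));
        brokerReceives(topic);
        return partition;
    }

    public static void main(String[] args) {
        PartitionBootstrapSketch cluster = new PartitionBootstrapSketch(2);
        System.out.println(cluster.partitionsSeenByProducer("test")); // before first message
        cluster.send("test", 1);
        System.out.println(cluster.partitionsSeenByProducer("test")); // after first message
    }
}
```

With a broker configured for 2 partitions, this prints 1 and then 2: only
the very first send for the topic is routed with the assumed count.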

Having said that, can you explain why your flume plugin tests fail? I'm
wondering if they fail because they were written with expected outputs
that don't match the current code behavior.

Thanks,
Neha

On Fri, Sep 16, 2011 at 9:27 AM, Eric Hauser <ewhau...@gmail.com> wrote:

> I've written a very simple Flume plugin for Kafka -
> https://github.com/ewhauser/flume-kafka-plugin.
>
> I'm trying to enhance it a bit to work with partitioning, but I'm
> having trouble getting it to work correctly in my tests.  It seems
> that when a producer bootstraps, it goes to ZK to get the information
> about a topic, and if the topic is not there then it creates it:
>
> [2011-09-16 12:17:58,704] DEBUG Getting the number of broker
> partitions registered for topic: test (kafka.producer.Producer)
> [2011-09-16 12:17:58,704] DEBUG Getting the number of broker
> partitions registered for topic: test (kafka.producer.Producer)
> [2011-09-16 12:17:58,722] DEBUG Currently, no brokers are registered
> under topic: test (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Currently, no brokers are registered
> under topic: test (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Bootstrapping topic: test with
> available brokers in the cluster with default number of partitions = 1
> (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,722] DEBUG Bootstrapping topic: test with
> available brokers in the cluster with default number of partitions = 1
> (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,732] DEBUG Adding following broker id, partition
> id for NEW topic: test=TreeSet(0-0)
> (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,732] DEBUG Adding following broker id, partition
> id for NEW topic: test=TreeSet(0-0)
> (kafka.producer.ZKBrokerPartitionInfo)
> [2011-09-16 12:17:58,733] DEBUG Broker partitions registered for
> topic: test = List(0-0) (kafka.producer.Producer)
> [2011-09-16 12:17:58,733] DEBUG Broker partitions registered for
> topic: test = List(0-0) (kafka.producer.Producer)
> [2011-09-16 12:17:58,734] DEBUG Sending message to broker
> 10.25.245.96:9092 on partition 0 (kafka.producer.Producer)
> [2011-09-16 12:17:58,734] DEBUG Sending message to broker
> 10.25.245.96:9092 on partition 0 (kafka.producer.Producer)
>
> But the producer does not know about the number of partitions - nor
> should it - so it creates a topic map with 1 partition.  Is there a
> way to force the server to register the topics in ZK prior to a
> consumer hitting it?  I'm starting up the broker using the standard
> methods in most of the other tests:
>
>    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
>    Properties props = TestUtils.createBrokerConfig(0, port);
>    props.setProperty("num.partitions", "2");
>    props.setProperty("topic.partition.count.map", "test:2");
>    KafkaConfig config = new KafkaConfig(props);
>    server = TestUtils.createServer(config);
>
> Thanks.
>
