I've written a very simple Flume plugin for Kafka -
https://github.com/ewhauser/flume-kafka-plugin.

I'm trying to extend it to support partitioning, but I can't get it
to work correctly in my tests.  It seems that when a producer
bootstraps, it looks the topic up in ZK, and if the topic is not
registered there it creates it with the default of 1 partition:

[2011-09-16 12:17:58,704] DEBUG Getting the number of broker
partitions registered for topic: test (kafka.producer.Producer)
[2011-09-16 12:17:58,722] DEBUG Currently, no brokers are registered
under topic: test (kafka.producer.ZKBrokerPartitionInfo)
[2011-09-16 12:17:58,722] DEBUG Bootstrapping topic: test with
available brokers in the cluster with default number of partitions = 1
(kafka.producer.ZKBrokerPartitionInfo)
[2011-09-16 12:17:58,732] DEBUG Adding following broker id, partition
id for NEW topic: test=TreeSet(0-0)
(kafka.producer.ZKBrokerPartitionInfo)
[2011-09-16 12:17:58,733] DEBUG Broker partitions registered for
topic: test = List(0-0) (kafka.producer.Producer)
[2011-09-16 12:17:58,734] DEBUG Sending message to broker
10.25.245.96:9092 on partition 0 (kafka.producer.Producer)
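
To make the behaviour in the log concrete, here is a rough model of what
the producer appears to be doing. This is only my sketch, not Kafka's
actual code: the class BootstrapSketch, the method partitionsFor, and the
in-memory map standing in for ZK are all made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

class BootstrapSketch {
    // Hypothetical model: 'registry' stands in for the topic -> "brokerId-partitionId"
    // entries the producer reads from ZK. It is NOT a real Kafka data structure.
    static SortedSet<String> partitionsFor(Map<String, SortedSet<String>> registry,
                                           String topic,
                                           List<Integer> liveBrokerIds) {
        SortedSet<String> known = registry.get(topic);
        if (known != null && !known.isEmpty()) {
            // Topic already registered: use whatever partitions were recorded.
            return known;
        }
        // Nothing registered yet: assume one partition (id 0) on every live broker,
        // which is what "default number of partitions = 1" in the log suggests.
        SortedSet<String> bootstrapped = new TreeSet<>();
        for (int brokerId : liveBrokerIds) {
            bootstrapped.add(brokerId + "-0");
        }
        registry.put(topic, bootstrapped);
        return bootstrapped;
    }

    public static void main(String[] args) {
        Map<String, SortedSet<String>> registry = new HashMap<>();
        // No entry for "test" yet, one live broker with id 0.
        System.out.println(partitionsFor(registry, "test", Arrays.asList(0))); // prints [0-0]
    }
}
```

In this model the broker's own partition count never enters the picture,
which matches the single 0-0 entry in the log.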

But the producer does not know how many partitions the broker is
configured with (nor should it), so it builds a topic map with
1 partition.  Is there a way to force the server to register the
topics in ZK before a consumer hits it?  I'm starting the broker the
same way most of the other tests do:

    // Start an embedded ZooKeeper for the test
    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
    // Broker id 0, listening on the test port
    Properties props = TestUtils.createBrokerConfig(0, port);
    // Default partition count, plus an explicit override for "test"
    props.setProperty("num.partitions", "2");
    props.setProperty("topic.partition.count.map", "test:2");
    KafkaConfig config = new KafkaConfig(props);
    server = TestUtils.createServer(config);

Thanks.
