If I understand correctly, the brokers stay informed about one another through 
ZooKeeper and therefore any broker can give info about any other broker?

This is an interesting approach. What would happen if your broker list changed 
dramatically over time? 
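
To make the question concrete, here is a minimal sketch of the lazy refresh
described in the quoted message below: a (topic, partition) -> leader cache
that is only refreshed after a failed send. Everything in it is an assumption
for illustration. Cluster, Producer, send, refresh and the broker ids are
hypothetical names, not the Kafka 0.8 API, and the cluster is a toy in-memory
model. In this sketch the refresh only ever asks brokers from the configured
broker list, which is the case the question above is about.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LeaderCacheSketch {

    /** Toy in-memory cluster standing in for real brokers (hypothetical). */
    static class Cluster {
        final Set<String> liveBrokers = new HashSet<>();
        // "topic-partition" -> id of the broker holding the leader replica
        final Map<String, String> leaders = new HashMap<>();

        /** Stands in for a produce request: only the live leader accepts writes. */
        boolean produce(String broker, String topicPartition) {
            return liveBrokers.contains(broker)
                    && broker.equals(leaders.get(topicPartition));
        }

        /** Stands in for getMetadata: any live broker answers for any partition. */
        String metadata(String askedBroker, String topicPartition) {
            if (!liveBrokers.contains(askedBroker)) {
                throw new IllegalStateException("broker down: " + askedBroker);
            }
            return leaders.get(topicPartition);
        }
    }

    /** Producer that keeps a leader cache and refreshes it only after a failure. */
    static class Producer {
        final Cluster cluster;
        final List<String> brokerList; // plays the role of broker.list
        final Map<String, String> leaderCache = new HashMap<>();
        int metadataRequests = 0;

        Producer(Cluster cluster, List<String> brokerList) {
            this.cluster = cluster;
            this.brokerList = brokerList;
        }

        /** Returns true if the send succeeded within maxAttempts attempts. */
        boolean send(String topicPartition, int maxAttempts) {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                String leader = leaderCache.get(topicPartition);
                if (leader != null && cluster.produce(leader, topicPartition)) {
                    return true;
                }
                // Empty or stale cache (the NotLeaderForPartition case):
                // refresh the cache, then let the loop retry.
                refresh(topicPartition);
            }
            return false;
        }

        /** Asks the first reachable broker from brokerList for the current leader. */
        void refresh(String topicPartition) {
            for (String broker : brokerList) {
                if (cluster.liveBrokers.contains(broker)) {
                    metadataRequests++;
                    String leader = cluster.metadata(broker, topicPartition);
                    if (leader == null) {
                        leaderCache.remove(topicPartition);
                    } else {
                        leaderCache.put(topicPartition, leader);
                    }
                    return;
                }
            }
            throw new IllegalStateException("no broker in broker.list is reachable");
        }
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        cluster.liveBrokers.addAll(Arrays.asList("b1", "b2", "b3"));
        cluster.leaders.put("events-0", "b1");

        Producer producer = new Producer(cluster, Arrays.asList("b1", "b2"));
        System.out.println(producer.send("events-0", 2)); // empty cache, refresh, retry

        // The leader moves from b1 to b3, a broker that is not in brokerList.
        cluster.liveBrokers.remove("b1");
        cluster.leaders.put("events-0", "b3");
        System.out.println(producer.send("events-0", 2)); // stale cache, refresh via b2
        System.out.println(producer.leaderCache.get("events-0"));
    }
}
```

In this toy model a new leader outside the configured list is still found, as
long as one listed broker is alive to answer the metadata request. If every
listed broker is gone, refresh has nowhere to go and throws.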

On Nov 20, 2012, at 1:02 PM, Neha Narkhede wrote:

> This is being discussed in another thread -
> http://markmail.org/message/mypnt7sgkqt55jb2?q=Jason+async+producer
> 
> Basically, you want zookeeper on the producer to do just one thing -
> notify the change in the liveness of brokers in Kafka
> cluster. In 0.8, brokers are not the entity to worry about, what we
> care about are replicas for the partitions that the producer
> is sending data to, in particular just the leader replica (since only
> the leader can accept writes for a partition)
> 
> The producer keeps a cache of (topic, partition) -> leader-replica.
> Now, if that cache is either empty or stale (due to changes
> on the Kafka cluster), the next produce request will get an ACK with
> an error code NotLeaderForPartition. That's when it
> fires the getMetadata request that refreshes its cache. Assuming
> you've configured your producer to retry (producer.num.retries)
> more than once, it will succeed sending data the next time.
> 
> In other words, instead of zookeeper 'notifying' us of the changes on
> the Kafka cluster, we let the producer lazily update its
> cache by invoking a special API on any of the Kafka brokers. That way,
> we have far fewer connections to zk, zk upgrades are easier, as are
> upgrades to the producer, and we also achieve the goal of replica
> discovery.
> 
> Thanks,
> Neha
> 
> On Tue, Nov 20, 2012 at 9:58 AM, Bae, Jae Hyeon <metac...@gmail.com> wrote:
>> In the case that producer does not require zk.connect, how can the
>> producer recognize the new brokers or brokers which went down?
>> 
>> On Tue, Nov 20, 2012 at 8:31 AM, Jun Rao <jun...@gmail.com> wrote:
>>> David,
>>> 
>>> The change in 0.8 is that instead of requiring zk.connect, we require
>>> broker.list. In both cases, you typically provide a list of hosts and
>>> ports. Functionality-wise, they achieve the same thing, i.e., the producer is
>>> able to send the data to the right broker. Are you saying that zk.connect
>>> is more convenient? One benefit of using broker.list is that one can
>>> provide a vip as the only host. This makes it easy to add/remove brokers
>>> since no producer side config needs to be changed. Changing hosts in
>>> zk.connect, on the other hand, requires config changes in the client.
>>> Another reason for removing zkclient in the producer is that if the client
>>> GCs, it can cause churns in the producer and extra load on the zk server.
>>> Since our producer can be embedded in any client, it's hard for us to
>>> control the GC rate. So, removing zkclient in the producer releases the
>>> potential pressure from client GC.
>>> 
>>> We still rely on ZK for failure detection and leader election on the broker
>>> and the consumer though.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> On Tue, Nov 20, 2012 at 7:54 AM, David Arthur <mum...@gmail.com> wrote:
>>> 
>>>> 
>>>> On Nov 20, 2012, at 12:23 AM, Jun Rao wrote:
>>>> 
>>>>> Jason,
>>>>> 
>>>>> In 0.8, producer doesn't use zkclient at all. You just need to set
>>>>> broker.list.
>>>> 
>>>> This seems like a regression in functionality. For me, one of the benefits
>>>> of Kafka is only needing to know about ZooKeeper.
>>>> 
>>>>> A number of things have changed in 0.8. First, the number of
>>>>> partitions of a topic is global in a cluster and they don't really change
>>>>> as new brokers are added. Second, a partition is assigned to multiple
>>>>> brokers for replication and one of the replicas is the leader, which
>>>>> serves writes. When a producer starts up, it first uses the getMetadata
>>>>> api to figure out the replica assignment for the relevant
>>>>> topic/partition. It then issues producer requests directly to the broker
>>>>> where the leader resides. If the leader broker goes down, the producer
>>>>> gets an exception and it will re-issue the getMetadata request to obtain
>>>>> the information about the new leader.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> On Mon, Nov 19, 2012 at 1:29 PM, Jason Rosenberg <j...@squareup.com>
>>>>> wrote:
>>>>> 
>>>>>> Well, they do use zk though, to get the initial list of kafka nodes, and
>>>>>> while zk is available, presumably they do use it to keep up with the
>>>>>> dynamically changing set of kafka brokers, no?  You are just saying that
>>>>>> if zk goes away, 0.8 producers can keep on producing, as long as the
>>>>>> kafka cluster remains stable?
>>>>>> 
>>>>>> Jason
>>>>>> 
>>>>>> On Mon, Nov 19, 2012 at 12:20 PM, Neha Narkhede <neha.narkh...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> In 0.8, producers don't use zk. When producers encounter an error
>>>>>>> while sending data, they use a special getMetadata request to refresh
>>>>>>> the kafka cluster info from a randomly selected Kafka broker, and
>>>>>>> retry sending the data.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>> 
>>>>>>> On Mon, Nov 19, 2012 at 12:10 PM, Jason Rosenberg <j...@squareup.com>
>>>>>>> wrote:
>>>>>>>> Are you saying that in 0.8, producers don't use zkclient?  Or don't
>>>>>>>> need it?  How can a producer dynamically respond to a change in the
>>>>>>>> kafka cluster without zk?
>>>>>>>> 
>>>>>>>> On Mon, Nov 19, 2012 at 8:07 AM, Jun Rao <jun...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Jae,
>>>>>>>>> 
>>>>>>>>> In 0.8, producers don't need the ZK client anymore. Instead, they use
>>>>>>>>> a new getMetadata api to get topic/partition/leader information from
>>>>>>>>> the broker. Consumers still need the ZK client. We plan to redesign
>>>>>>>>> the consumer post 0.8 and can keep this in mind.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jun
>>>>>>>>> 
>>>>>>>>> On Sun, Nov 18, 2012 at 10:35 PM, Bae, Jae Hyeon <metac...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> I want to suggest that kafka create only one instance of ZkClient
>>>>>>>>>> globally, because ZkClient is thread safe and this would make it easy
>>>>>>>>>> for many users to customize the kafka source code for Zookeeper.
>>>>>>>>>> 
>>>>>>>>>> In our company's cloud environment, it is not recommended to create
>>>>>>>>>> ZkClient from the zkConnect string directly because the zookeeper
>>>>>>>>>> cluster can be dynamically changing. So, I have to create ZkClient
>>>>>>>>>> using our company's own platform library. Because of this requirement,
>>>>>>>>>> I can't use the kafka jar file directly. I can modify and build the
>>>>>>>>>> kafka source code, but I have to repeat this work whenever I update
>>>>>>>>>> the kafka version, which is pretty annoying.
>>>>>>>>>> 
>>>>>>>>>> So, my suggestion is to let me pass ZkClient in from outside of
>>>>>>>>>> Producer, Consumer, and Broker, as in the following example.
>>>>>>>>>> 
>>>>>>>>>> Producer<String, String> producer =
>>>>>>>>>> ProducerBuilder.withZkClient(zkClient).<String, String>build(producerConfig);
>>>>>>>>>> 
>>>>>>>>>> ConsumerConnector connector =
>>>>>>>>>> Consumer.withZkClient(zkClient).createJavaConsumerConnector(new
>>>>>>>>>> ConsumerConfig(consumerProps));
>>>>>>>>>> 
>>>>>>>>>> KafkaServer is a little more complicated but I believe without much
>>>>>>>>>> effort we can refactor KafkaServer to be customized with ZkClient.
>>>>>>>>>> 
>>>>>>>>>> I would really appreciate it if this suggestion is accepted and merged
>>>>>>>>>> into 0.8. If you want me to contribute this suggestion, please let me
>>>>>>>>>> know your opinion. If you are positive about this idea, I will
>>>>>>>>>> contribute very happily.
>>>>>>>>>> 
>>>>>>>>>> Thank you
>>>>>>>>>> Best, Jae
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
