Thanks, Jay, for the good summary.  Regarding point 2, I would think the 
heartbeat would still be desired, to give control over liveness-detection 
parameters and to directly inform clients when they gain or lose a partition 
(especially when gaining one).  There would be no barrier, and the 
rebalancer would be an offline scheduler issuing SwitchPartition commands.  
The evaluation of a SwitchPartition command would wait for the consumer 
losing a partition to commit its offset and finish any local commit work 
before confirming completion to the coordinator, which would then inform the 
new consumer and the ISR brokers about the partition gain.  Broker security 
would be the master record of live assignments.  
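Roughly, the handoff for one partition could look like the following toy sketch (SwitchPartition, Phase, and the owners map are illustrative names I am using here, not proposed APIs):

```java
import java.util.HashMap;
import java.util.Map;

public class SwitchPartitionSketch {
    enum Phase { ISSUED, OLD_OWNER_COMMITTING, CONFIRMED, NEW_OWNER_NOTIFIED }

    static final class SwitchPartition {
        final int partition;
        final String from, to;
        Phase phase = Phase.ISSUED;
        SwitchPartition(int partition, String from, String to) {
            this.partition = partition; this.from = from; this.to = to;
        }
    }

    // The coordinator's master record of live assignments; brokers would
    // mirror it to authorize fetches.
    static final Map<Integer, String> owners = new HashMap<>();

    static void execute(SwitchPartition cmd) {
        cmd.phase = Phase.OLD_OWNER_COMMITTING; // old owner commits offset and local work
        cmd.phase = Phase.CONFIRMED;            // old owner confirms completion to coordinator
        owners.put(cmd.partition, cmd.to);      // coordinator records the new owner...
        cmd.phase = Phase.NEW_OWNER_NOTIFIED;   // ...and informs new consumer and ISR brokers
    }

    public static void main(String[] args) {
        owners.put(131, "consumer-7");
        execute(new SwitchPartition(131, "consumer-7", "consumer-8"));
        System.out.println(owners.get(131)); // consumer-8
    }
}
```

The point being that only the partition being switched is touched; every other assignment in the master record stays live throughout.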

Thanks,
Rob

> On Jul 21, 2014, at 6:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 
> This thread is a bit long, but let me see if I can restate it
> correctly (not sure I fully follow).
> 
> There are two suggestions:
> 1. Allow partial rebalances that move just some partitions. I.e. if a
> consumer fails and has only one partition only one other consumer
> should be affected (the one that picks up the new partition). If there
> are many partitions to be reassigned there will obviously be a
> tradeoff between impacting all consumers and balancing load evenly.
> I.e. if you moved all load to one other consumer that would cause
> little rebalancing interruption but poor load balancing.
> 2. Have the co-ordinator communicate the assignments to the brokers
> rather than to the client directly. This could potentially simplify
> the consumer. Perhaps it would be possible to have the leader track
> liveness using the fetch requests rather than needing an artificial
> heartbeat.
> 
> These are interesting ideas.
> 
> -Jay
> 
> 
> 
>> On Mon, Jul 21, 2014 at 4:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> Hello Rob,
>> 
>> If I get your idea right, it is that if the rebalance only changes
>> the ownership of a few consumers in the group, the coordinator can just
>> sync with them and not interrupt the other consumers.
>> 
>> I think this approach may work. However, it will likely complicate the
>> coordinator's logic once we sketch out all the details, since the
>> rebalance result basically depends on two variables: 1) the partitions of
>> the subscribed topics, and 2) the consumers inside the group. Following
>> this approach, by the time the coordinator decides to trigger a rebalance,
>> it must correctly keep track of which variable change triggered the
>> current rebalance process. On the other hand, the normal rebalance
>> process, even with a global barrier, should usually be very fast, taking a
>> few hundred milliseconds. So I am not sure this is an optimization worth
>> making for now. What do you think?
>> 
>> Guozhang
>> 
>> 
>> On Sat, Jul 19, 2014 at 12:33 PM, Robert Withers
>> <robert.w.with...@gmail.com> wrote:
>> 
>>> Lock is a bad way to say it; a barrier is better.  I don't think what I am
>>> saying is even a barrier, since the rebalance would just need to recompute
>>> a rebalance schedule and submit it.  The only processing delay is to allow
>>> a soft remove to let the client clean up before you turn on the new guy, so
>>> it lags a bit.  Do you think this could work?
>>> 
>>> Thanks,
>>> Rob
>>> 
>>>> On Jul 18, 2014, at 7:22 PM, Robert Withers <robert.w.with...@gmail.com>
>>>> wrote:
>>>> 
>>>> Hi Guozhang,
>>>> 
>>>> Thank you for considering my suggestions.  The security layer sounds
>>> like the right facet to design for these sorts of capabilities.  Have you
>>> considered a chained ocap security model for the broker using hash tokens?
>>> This would provide for per-partition read/write capabilities with QoS
>>> context including leases, revocation, debug level and monitoring.  Overkill
>>> disappears, as no domain-specific info needs to be stored at the brokers,
>>> like consumer/partition assignments.  The read ocap for consumer 7/topic
>>> bingo/partition 131 could be revoked at the brokers, and subsequent
>>> reads would fail the fetch for requests with that ocap token.
>>> You could also dynamically change the log level for a specific
>>> consumer/partition.
>>>> 
>>>> There are advantages we could discuss to having finer-grained control.
>>> Consider that scheduled partition rebalancing could be implemented with no
>>> pauses from the perspective of the consumer threads; it looks like
>>> single-partition lag, as the offset commit occurs before rotation, with no
>>> lag to non-rebalanced partitions: rebalance 1 partition per second so as
>>> to creep load to a newbie consumer.  It would eliminate a global read
>>> lock, and even the internal Kafka consumer would never block on IO
>>> protocol other than the normal fetch request (and the initial join-group
>>> request).
>>>> 
>>>> A global lock acquired through a pull protocol (HeartbeatRequest
>>> followed by a JoinGroupRequest) for all live consumers is a much bigger
>>> lock than a security-based push protocol, as I assume the coordinator will
>>> have open sockets to all brokers in order to reach out as needed.  As
>>> well, each lock would be independent between partitions and would involve
>>> only those brokers in the ISR for a given partition.  It is a much smaller
>>> lock.
>>>> 
>>>> I had some time to consider my suggestion that it be viewed as a
>>> relativistic frame of reference.  Consider the model where each dimension
>>> of the frame of reference for each consumer is each partition, actually a
>>> sub-space with the dimensionality of the replication factor, but with a
>>> single leader election, so consider it 1 dimension.  The total
>>> dimensionality of the consumer's frame of reference is the number of
>>> partitions, but only assigned partitions are open to a given consumer's
>>> viewpoint.  The offset is the partition dimension's coordinate and only
>>> consumers with an open dimension can translate the offset.  A rebalance
>>> opens or closes a dimension for a given consumer and can be viewed as a
>>> rotation.  Could Kafka consumption and rebalance (and ISR leader election)
>>> be reduced to matrix operations?
>>>> 
>>>> Rob
>>>> 
>>>>> On Jul 18, 2014, at 12:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>> 
>>>>> Hi Rob,
>>>>> 
>>>>> Sorry for the late reply.
>>>>> 
>>>>> If I understand your approach correctly, it requires all brokers to
>>>>> remember the partition assignment of each consumer in order to decide
>>>>> whether or not to authorize a fetch request, correct? If we are indeed
>>>>> going to do such authorization for the security project then maybe it
>>>>> is a good way to go, but otherwise it might be overkill just to support
>>>>> finer-grained partition assignment. In addition, instead of requiring a
>>>>> round trip between the coordinator and the consumers for the
>>>>> synchronization barrier, the coordinator now needs to wait for a round
>>>>> trip between itself and the other brokers before it can return the
>>>>> join-group request, right?
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> 
>>>>> On Wed, Jul 16, 2014 at 10:27 AM, Rob Withers
>>>>> <robert.w.with...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Guozhang,
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Currently, the brokers do not know which high-level consumers are
>>> reading
>>>>>> which partitions and it is the rebalance between the consumers and the
>>>>>> coordinator which would authorize a consumer to fetch a particular
>>>>>> partition, I think.  Does this mean that when a rebalance occurs, all
>>>>>> consumers must send a JoinGroupRequest and that the coordinator will
>>> not
>>>>>> respond to any consumers until all consumers have sent the
>>>>>> JoinGroupRequest, to enable the synchronization barrier?  That has the
>>>>>> potential to be a sizable global delay.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On the assumption that there is only one coordinator for a group, why
>>>>>> couldn't the synchronization barrier be per partition and internal to
>>>>>> Kafka, and mostly not involve the consumers, other than a chance for an
>>>>>> offsetCommit by the consumer losing a partition?  If the brokers have
>>>>>> session state and know the new assignment before the consumer is
>>>>>> notified with a HeartbeatResponse, they could fail the fetch request
>>>>>> from that consumer for an invalidly assigned partition.  The consumer
>>>>>> could treat an invalid-partition failure of the fetch request as if it
>>>>>> were a HeartbeatResponse partition removal.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> The only gap seems to be: when does a consumer that is losing a
>>> partition
>>>>>> get a chance to commit its offset?  If there were a
>>>>>> PartitionCommittedNotification message that a consumer could send to
>>> the
>>>>>> coordinator after committing its offsets, then the coordinator could
>>> send
>>>>>> the add partition HeartbeatResponse, after receiving the
>>>>>> PartitionCommittedNotification, to the consumer gaining the partition
>>> and
>>>>>> the offset management is stable.  The advantage is that none of the
>>> other
>>>>>> consumers would be paused on any other partitions.  Only partitions
>>> being
>>>>>> rebalanced would see any consumption pause.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> So, something like:
>>>>>> 
>>>>>> 1.  3 consumers running with 12 partitions balanced, 4 each
>>>>>> 
>>>>>> 2.  New consumer starts and sends JoinGroupRequest to coordinator
>>>>>> 
>>>>>> 3.  Coordinator computes rebalance with 4 consumers: each existing
>>>>>> consumer will lose a partition assigned to the new consumer
>>>>>> 
>>>>>> 4.  Coordinator informs all live brokers of partition reassignments
>>>>>> 
>>>>>> 5.  Brokers receive reassignments, starts failing unauthorized fetch
>>>>>> requests and acks back to the coordinator
>>>>>> 
>>>>>> 6.  Coordinator receives all broker acks and sends HeartbeatResponses
>>> with
>>>>>> partition removals to existing consumers and awaits
>>>>>> PartitionCommittedNotifications from consumers losing partitions.
>>>>>> 
>>>>>> 7.  Existing consumers can continue to fetch messages from correctly
>>>>>> assigned partitions
>>>>>> 
>>>>>> 8.  When an existing consumer fails a fetch for a partition or gets a
>>>>>> HeartbeatResponse with a partition removal, it would commitOffsets for
>>> that
>>>>>> partition and then send a PartitionCommittedNotification to the
>>> coordinator.
>>>>>> 
>>>>>> 9.  As the Coordinator receives the PartitionCommittedNotification,
>>> for a
>>>>>> particular partition from an existing consumer, it sends the
>>> addPartition
>>>>>> to consumer 4, in a HeartbeatResponse and the new consumer can start
>>>>>> fetching that partition.
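To make steps 1-9 concrete, here is a toy sticky-assignment sketch of the 12-partition example from steps 1-3 (the class and method names are mine, not proposed APIs): when a 4th consumer joins, only 3 of the 12 partitions change hands, so only those 3 see a consumption pause.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StickyRebalanceSketch {
    // Move the fewest partitions possible: each overloaded consumer cedes
    // partitions to the newcomer until everyone is at the target size.
    static Map<String, List<Integer>> addConsumer(Map<String, List<Integer>> current,
                                                  String newcomer) {
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        int total = 0;
        for (Map.Entry<String, List<Integer>> e : current.entrySet()) {
            next.put(e.getKey(), new ArrayList<>(e.getValue()));
            total += e.getValue().size();
        }
        List<Integer> gained = new ArrayList<>();
        next.put(newcomer, gained);
        int target = total / next.size(); // 12 partitions / 4 consumers = 3
        for (Map.Entry<String, List<Integer>> e : next.entrySet()) {
            List<Integer> owned = e.getValue();
            // Each moved partition is the only one that pauses (steps 6-9);
            // all other partitions keep fetching throughout.
            while (owned != gained && owned.size() > target && gained.size() < target)
                gained.add(owned.remove(owned.size() - 1));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> before = new LinkedHashMap<>();
        before.put("c1", new ArrayList<>(List.of(0, 1, 2, 3)));
        before.put("c2", new ArrayList<>(List.of(4, 5, 6, 7)));
        before.put("c3", new ArrayList<>(List.of(8, 9, 10, 11)));
        Map<String, List<Integer>> after = addConsumer(before, "c4");
        System.out.println(after.get("c4")); // [3, 7, 11]
    }
}
```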
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> If a consumer drops HeartbeatRequests within a session timeout, the
>>>>>> coordinator would inform the brokers and they would fail fetchRequests
>>> for
>>>>>> those partitions from that consumer.   There is no chance to send
>>>>>> removePartitions since no Heartbeat is occurring, but the addPartitions
>>>>>> could be sent and the offset is what it is.  This seems no different
>>>>>> from how this sort of failure is handled today.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Instead of a global synchronization barrier isn’t it possible to have
>>> an
>>>>>> incremental per-partition synchronization barrier?  The brokers would
>>> have
>>>>>> to be aware of this.  I think of it as relativistic from each
>>> acceleration
>>>>>> frame of reference, which is each consumer: event horizons.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> 
>>>>>> Rob
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -----Original Message-----
>>>>>> From: Guozhang Wang [mailto:wangg...@gmail.com]
>>>>>> Sent: Wednesday, July 16, 2014 9:20 AM
>>>>>> To: users@kafka.apache.org
>>>>>> Subject: Re: New Consumer Design
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Hi Rob,
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Piggy-backing the rebalanced partition info onto the HeartbeatResponse
>>>>>> may cause inconsistent partition assignments across consumers under
>>>>>> consecutive rebalance triggerings, since the coordinator no longer has
>>>>>> a synchronization barrier for recomputing the distribution with a
>>>>>> consistent view.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Jul 14, 2014 at 4:16 PM, Robert Withers
>>>>>> <robert.w.with...@gmail.com>
>>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> On Jul 14, 2014, at 3:20 PM, Baran Nohutçuoğlu <ba...@tinkerhq.com>
>>>>>> 
>>>>>>> wrote:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>>> On Jul 8, 2014, at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>> 
>>>>>> 
>>>>>>>>> Hi All,
>>>>>> 
>>>>>> 
>>>>>>>>> We have written a wiki a few weeks back proposing a single-threaded
>>>>>> 
>>>>>>> ZK-free
>>>>>> 
>>>>>>>>> consumer client design for 0.9:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>>>>>> 
>>>>>> 
>>>>>>>>> We want to share some of the ideas that came up for this design to
>>>>>> 
>>>>>>>>> get
>>>>>> 
>>>>>>> some
>>>>>> 
>>>>>>>>> early feedback. The below discussion assumes you have read the
>>>>>> 
>>>>>>>>> above
>>>>>> 
>>>>>>> wiki.
>>>>>> 
>>>>>> 
>>>>>>>>> *Offset Management*
>>>>>> 
>>>>>> 
>>>>>>>>> To make consumer clients ZK-free we need to move the offset
>>>>>>>>> management utility from ZooKeeper into the Kafka servers, which then
>>>>>>>>> store offsets as a special log. Key-based log compaction will be
>>>>>>>>> used to keep this ever-growing log's size bounded. The broker
>>>>>>>>> responsible for the offset management of a given consumer group will
>>>>>>>>> be the leader of this special offset log partition, where the
>>>>>>>>> partition key will be the consumer group name.
>>>>>> 
>>>>>>>>> On the consumer side, instead of talking to ZK for commit and read
>>>>>> 
>>>>>>> offsets,
>>>>>> 
>>>>>>>>> it talks to the servers with new offset commit and offset fetch
>>>>>> 
>>>>>>>>> request types for offset management. This work has been done and
>>>>>> 
>>>>>>>>> will be
>>>>>> 
>>>>>>> included
>>>>>> 
>>>>>>>>> in the 0.8.2 release. Details of the implementation can be found in
>>>>>> 
>>>>>>>>> KAFKA-1000 and this wiki:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
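As a toy illustration of how key-based compaction bounds the offsets log (the `group/topic/partition` key format below is just for illustration, not the actual on-disk format):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetLogCompactionSketch {
    // Key-based compaction: for each key only the most recently appended
    // value survives, so the log's size is bounded by the number of keys
    // rather than by the number of commits ever made.
    static Map<String, Long> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : log)
            latest.put(entry.getKey(), entry.getValue()); // later entry wins
        return latest;
    }

    public static void main(String[] args) {
        // Key = group/topic/partition (illustrative), value = committed offset.
        List<Map.Entry<String, Long>> log = List.of(
                Map.entry("groupA/bingo/0", 10L),
                Map.entry("groupA/bingo/1", 7L),
                Map.entry("groupA/bingo/0", 42L)); // supersedes offset 10
        System.out.println(compact(log)); // {groupA/bingo/0=42, groupA/bingo/1=7}
    }
}
```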
>>>>>> 
>>>>>> 
>>>>>>>>> *Group Membership Management*
>>>>>> 
>>>>>> 
>>>>>>>>> The next step is to move the membership management and rebalancing
>>>>>> 
>>>>>>> utility
>>>>>> 
>>>>>>>>> out of ZooKeeper as well. To do this we introduced a consumer
>>>>>> 
>>>>>>> coordinator
>>>>>> 
>>>>>>>>> hosted on the Kafka servers which is responsible for keeping track
>>>>>> 
>>>>>>>>> of
>>>>>> 
>>>>>>> group
>>>>>> 
>>>>>>>>> membership and consumption partition changes, and the corresponding
>>>>>> 
>>>>>>>>> rebalancing process.
>>>>>> 
>>>>>> 
>>>>>>>>> *1. Failure Detection*
>>>>>> 
>>>>>> 
>>>>>>>>> More specifically, we will use a heartbeating protocol for consumer
>>>>>> 
>>>>>>>>> and coordinator failure detections. The heartbeating protocol would
>>>>>> 
>>>>>>>>> be
>>>>>> 
>>>>>>> similar
>>>>>> 
>>>>>>>>> to the one ZK used, i.e. the consumer will set a session timeout
>>>>>> 
>>>>>>>>> value
>>>>>> 
>>>>>>> to
>>>>>> 
>>>>>>>>> the coordinator on startup, and keep sending heartbeats to the
>>>>>> 
>>>>>>> coordinator
>>>>>> 
>>>>>>>>> afterwards every session-timeout / heartbeat-frequency. The
>>>>>> 
>>>>>>>>> coordinator will treat a consumer as failed when it has not heard
>>>>>> 
>>>>>>>>> from the consumer after session-timeout, and the consumer will
>>>>>> 
>>>>>>>>> treat its coordinator as failed after it has not heard its
>>>>>> 
>>>>>>>>> heartbeat responses after
>>>>>> 
>>>>>>> session-timeout.
>>>>>> 
>>>>>> 
>>>>>>>>> One difference from the ZK heartbeat protocol is that instead of
>>>>>>>>> fixing the heartbeat-frequency at three, we make this value
>>>>>>>>> configurable in consumer clients. This is because the rebalancing
>>>>>>>>> process's latency (which we will discuss later) is lower bounded by
>>>>>>>>> the heartbeat frequency, so we want this value to be large while not
>>>>>>>>> DDoSing the servers.
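To make the trade-off concrete, the arithmetic can be sketched as follows (the 30-second session timeout is a made-up example value, not a proposed default):

```java
public class HeartbeatMath {
    // The consumer heartbeats every session-timeout / heartbeat-frequency,
    // so the coordinator hears from it heartbeat-frequency times per session
    // window before it would declare the consumer dead.
    static long heartbeatIntervalMs(long sessionTimeoutMs, int heartbeatFrequency) {
        return sessionTimeoutMs / heartbeatFrequency;
    }

    public static void main(String[] args) {
        // ZK-style fixed frequency of three:
        System.out.println(heartbeatIntervalMs(30_000, 3));  // 10000 ms
        // A higher, consumer-chosen frequency lowers the rebalance-latency
        // floor at the cost of more requests hitting the coordinator:
        System.out.println(heartbeatIntervalMs(30_000, 10)); // 3000 ms
    }
}
```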
>>>>>> 
>>>>>> 
>>>>>>>>> *2. Consumer Rebalance*
>>>>>> 
>>>>>> 
>>>>>>>>> A number of events can trigger a rebalance process: 1) consumer
>>>>>>>>> failure, 2) a new consumer joining the group, 3) an existing
>>>>>>>>> consumer changing its consumed topics/partitions, 4) a partition
>>>>>>>>> change for the consumed topics.
>>>>>> 
>>>>>>>>> Once the coordinator decides to trigger a rebalance, it will notify
>>>>>>>>> the consumers in the group to resend their topic/partition
>>>>>>>>> subscription information, and then
>>>>>> 
>>>>>>>>> assign existing partitions to these consumers. On the consumers'
>>>>>>>>> end, they no longer run any rebalance logic in a distributed manner
>>>>>>>>> but follow the partition assignment they receive from the
>>>>>>>>> coordinator. Since the consumers can only be notified of a rebalance
>>>>>>>>> triggering via heartbeat responses, the rebalance latency is lower
>>>>>>>>> bounded by the heartbeat frequency; that is why we allow consumers
>>>>>>>>> to negotiate this value with the coordinator upon registration, and
>>>>>>>>> we would like to hear people's thoughts about this protocol.
>>>>>> 
>>>>>> 
>>>>>>>> One question on this topic, my planned usage pattern is to have a
>>>>>> 
>>>>>>>> high
>>>>>> 
>>>>>>> number of consumers that are ephemeral.  Imagine a consumer coming
>>>>>> 
>>>>>>> online, consuming messages for a short duration, then disappearing
>>>>>> 
>>>>>>> forever.  Is this use case supported?
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> If consumers are coming and going, there will be a high rate of
>>>>>> 
>>>>>>> rebalances and that will really start throttling consumption at some
>>>>>> 
>>>>>>> point.  You may have a use case that hits that detrimentally.  A custom
>>>>>> 
>>>>>>> rebalance algorithm may be able to rebalance a sub-set of the
>>>>>> 
>>>>>>> consumers and not inform others of a need to rebalance them, which
>>>>>> 
>>>>>>> would ameliorate the cost.  Still, the consumers that need to rebalance
>>>>>>> would pay.
>>>>>> 
>>>>>> 
>>>>>>> If the initial JoinGroupRequest was preserved, would it be possible to
>>>>>> 
>>>>>>> send rebalanced partition information in the HeartbeatResponse, up
>>>>>> 
>>>>>>> front to all (or a sub-set) consumers, and avoid the rejoin round
>>>>>> 
>>>>>>> trip?  I mean, push the rebalance and not pull it with a secondary
>>>>>> 
>>>>>>> JoinGroupRequest.  This would reduce cost and lower the throttle
>>>>>> 
>>>>>>> point, but poorly timed fetch requests may fail.  If the consumer were
>>>>>> 
>>>>>>> resilient to fetch failures, might it work?
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>>> In addition, the rebalance logic in the new consumer will be
>>>>>>>>> customizable instead of hard-coded in a per-topic round-robin
>>>>>>>>> manner. Users will be able to write their own assignment class
>>>>>>>>> following a common interface, and consumers, upon registering
>>>>>>>>> themselves, will specify in their registration request the assigner
>>>>>>>>> class name they want to use. If consumers within the same group
>>>>>>>>> specify different algorithms, the registration will be rejected.
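One possible shape for that common interface, as a guess (the actual interface in KAFKA-1328 may look quite different; all names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of the pluggable assignment contract.
interface PartitionAssignor {
    // member id -> assigned partition ids
    Map<String, List<Integer>> assign(List<String> members, List<Integer> partitions);
}

// The behavior the wiki describes as the current default: round-robin.
class RoundRobinAssignor implements PartitionAssignor {
    public Map<String, List<Integer>> assign(List<String> members, List<Integer> partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String m : members) out.put(m, new ArrayList<>());
        for (int i = 0; i < partitions.size(); i++)
            out.get(members.get(i % members.size())).add(partitions.get(i));
        return out;
    }
}

public class AssignorDemo {
    public static void main(String[] args) {
        Map<String, List<Integer>> a =
                new RoundRobinAssignor().assign(List.of("c1", "c2"), List.of(0, 1, 2, 3));
        System.out.println(a); // {c1=[0, 2], c2=[1, 3]}
    }
}
```

A group would then name its assignor class at registration, and the coordinator would instantiate it to compute assignments.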
>>>>>> 
>>>>>>>>> We are not sure if it is the best way of customizing rebalance
>>>>>>>>> logic yet, so any feedback is more than welcome.
>>>>>> 
>>>>>> 
>>>>>>>>> For wildcard consumption, the consumers will now capture new topics
>>>>>>>>> that are available for fetching through the topic metadata request.
>>>>>>>>> That is, the consumers will periodically update their topic metadata
>>>>>>>>> for fetching, and if new topics matching their wildcard regex are
>>>>>>>>> returned in the metadata response, they will notify the coordinator
>>>>>>>>> to let it trigger a new rebalance to assign partitions for the new
>>>>>>>>> topic.
>>>>>> 
>>>>>> 
>>>>>>>>> There are some failure handling cases during the rebalancing
>>>>>> 
>>>>>>>>> process discussed in the consumer rewrite design wiki. We would
>>>>>> 
>>>>>>>>> encourage
>>>>>> 
>>>>>>> people to
>>>>>> 
>>>>>>>>> read it and let us know if there are any other corner cases not
>>>>>> covered.
>>>>>> 
>>>>>> 
>>>>>>>>> Moving forward, we are thinking about making the coordinator handle
>>>>>>>>> both the group management and offset management for the given
>>>>>>>>> groups, i.e. the leader of the offset log partition will naturally
>>>>>>>>> become the coordinator of the corresponding consumer group. This
>>>>>>>>> allows the coordinator to reject offset commit requests if the
>>>>>>>>> sender is not part of the group. To do so, a consumer id and a
>>>>>>>>> generation id are used to control the group member generations.
>>>>>> 
>>>>>>>>> Details of this design are included in the rewrite design wiki page,
>>>>>>>>> and again any feedback is appreciated.
>>>>>> 
>>>>>> 
>>>>>>>>> *3. Non-blocking Network IO*
>>>>>> 
>>>>>> 
>>>>>>>>> With a single-threaded consumer, we will instead use non-blocking
>>>>>>>>> network IO (i.e. java.nio) to do fetching loops, just as the new
>>>>>>>>> producer does for asynchronously sending data. In terms of
>>>>>>>>> implementation, we will let
>>>>>> 
>>>>>>>>> the
>>>>>> 
>>>>>>> new
>>>>>> 
>>>>>>>>> consumer/producer clients be based on a common non-blocking IO
>>>>>> 
>>>>>>>>> thread class. Details of this refactoring can be found in
>>> KAFKA-1316.
>>>>>> 
>>>>>> 
>>>>>>>>> *4. Consumer API*
>>>>>> 
>>>>>> 
>>>>>>>>> As a result of the single-threaded non-blocking network IO
>>>>>>>>> implementation, the new consumer API will no longer be based on a
>>>>>>>>> blocking stream iterator interface but on a poll(timeout) interface.
>>>>>>>>> In addition, the consumer APIs are
>>>>>> 
>>>>>>>>> designed to be flexible so that people can choose to either use the
>>>>>> 
>>>>>>> default
>>>>>> 
>>>>>>>>> offset and group membership management utilities mentioned above or
>>>>>> 
>>>>>>>>> implement their own offset/group management logic. For example, for
>>>>>> 
>>>>>>>>> scenarios that needs fixed partition assignment or that prefer to
>>>>>> 
>>>>>>>>> store offsets locally in a data store, the new consumer APIs would
>>>>>> 
>>>>>>>>> make it
>>>>>> 
>>>>>>> much
>>>>>> 
>>>>>>>>> easier to customize. Details of the API design can be found in
>>>>>> 
>>>>>>> KAFKA-1328.
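For illustration, the difference from a blocking iterator might look like the following toy loop; ToyConsumer is a stand-in of my own, not the proposed KAFKA-1328 API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy stand-in for the proposed consumer; class and method names here are
// illustrative only.
class ToyConsumer {
    private final Deque<String> buffered = new ArrayDeque<>();
    void feed(String record) { buffered.add(record); }

    // Returns whatever is ready, possibly nothing; it never blocks
    // indefinitely, so the single thread stays free to heartbeat, commit
    // offsets, and react to rebalances between polls.
    List<String> poll(long timeoutMs) {
        List<String> batch = new ArrayList<>(buffered);
        buffered.clear();
        return batch;
    }
}

public class PollLoopSketch {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.feed("m1");
        consumer.feed("m2");
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < 3; i++)                // bounded loop for the demo
            processed.addAll(consumer.poll(100));  // empty polls are fine
        System.out.println(processed); // [m1, m2]
    }
}
```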
>>>>>> 
>>>>>>>>> A question to think about is whether there are any other potential
>>>>>>>>> use cases that cannot be easily supported by this API.
>>>>>> 
>>>>>> 
>>>>>>>>> Cheers,
>>>>>> 
>>>>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> 
>>>>>> -- Guozhang
>>>>> 
>>>>> 
>>>>> --
>>>>> -- Guozhang
>> 
>> 
>> 
>> --
>> -- Guozhang
