Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread Guozhang Wang
Re versions: fair enough, I think it's okay to keep it as strings. Just to
clarify, the concern I had is that if we do want to augment it in the
future, it will be harder to change a `string` field into a `struct` :P

Re onAssignment: actually even with the current proposal, since we are
giving the final encoded metadata upon the first `onAssignment` call for
the generation already, Streams would infer all the standby tasks that need
to be migrated in / out, so for Streams' purposes, we'd need to handle such
decoupling anyways, and it's actually the opposite: if the first
`onAssignment` does not include those yet-to-be-assigned partitions, then
Streams would not know whether a migrating-out standby would really be
promoted to active later or should be completely removed, until the next
out which onAssignment is the final call" etc. If we have all the
partitions on the `onAssignment`, then we can infer such actions and decide
whether we should close a standby task immediately or just recycle it and
wait for the active task to be assigned eventually.

On the other hand, if we call onAssignment incrementally similar to
onPartitionsAssigned, in which we include both assigned and
yet-to-be-assigned partitions, then for people who implement both
interfaces, they can just ignore the `onPartitionsAssigned` since they have
all knowledge they need from onAssignment already. And that's what I'm
trying to avoid by making the functionality of the two more separated.


Guozhang




On Mon, Sep 26, 2022 at 11:30 AM David Jacot 
wrote:

> Regarding the version, I would rather add this later when we have a
> clear use case for it. It seems to me that we are speculating here. I
> understand your point but I am not fully convinced by the solution at
> the moment. Would you agree with this?
>
> Regarding the onAssignment, I was thinking about the case where a task
> is promoted from standby to active. In this case, having onAssignment
> with the combined partitions could make it difficult, no? I am
> thinking that the assignor will have to remove the standby based on
> the metadata but it won't know what to do after that. If the partition
> is assigned directly, it can convert it to an active task. On the
> other hand, if the partition is not available yet, it would have to
> keep it as a standby until the partition is really assigned. It seems
to me that the assignor won't have the information to make this
decision, no? In this case, decoupling the "why" from the "when" seems
> to make things harder. I am not so familiar with Streams though so my
> intuition could be wrong here.
>
> David
>
> On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang  wrote:
> >
> > Regarding the version, what I was thinking is that in the HB request, for
> > the "serverAssignor" field, instead of just having it as a single string
> > field, maybe we could consider also making it a structure that includes:
> > name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion
> > mean the versions of the server assignor that the client works best with.
> > That being said, I agree with you that such information may also be
> > inferred elsewhere, e.g. by looking into the "rackId" field to see if it
> > contains a hyphen or not. All I was wondering is whether such version
> > information would be useful for the server assignors to determine their
> > actual assignment logic. I do not feel very strongly about this one though
> > --- even if we do not add it now, we can potentially add it later; it's
> > just that changing a single string field to a structure would be hard for
> > compatibility and we'd then probably have to add top-level fields.
> >
> > Regarding the `onAssignment` logic, again my train of thought is that, if
> > users want to know exactly when a partition is assigned / revoked, they
> > would be leveraging the rebalance callbacks, as that's what people
> > should rely on to determine "when" partitions are assigned. The
> > `onAssignment` should be used for getting "why" such a partition
> > assignment decision is made, and hence returning `combined partitions`
> > would be okay. Streams, e.g., implements both the rebalance callbacks and
> > the assignors, and it gets the "when" from the former (and creates/closes
> > active tasks accordingly) and the "why" from the latter (and updates its
> > global info bookkeeping as well as standby maintenance accordingly). Most
> > users would just be interested in the rebalance callback and not implement
> > their own assignor at all, as they only care about "when" and trust the
> > server assignors to take good care of the "why". So if we did build these
> > two types of APIs from scratch, I'd indeed feel that providing only the
> > metadata, not the partitions, for `onAssignment` may be less confusing and
> > would push users to separate the usage of these two more clearly, but since
> > we 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread David Jacot
Regarding the version, I would rather add this later when we have a
clear use case for it. It seems to me that we are speculating here. I
understand your point but I am not fully convinced by the solution at
the moment. Would you agree with this?

Regarding the onAssignment, I was thinking about the case where a task
is promoted from standby to active. In this case, having onAssignment
with the combined partitions could make it difficult, no? I am
thinking that the assignor will have to remove the standby based on
the metadata but it won't know what to do after that. If the partition
is assigned directly, it can convert it to an active task. On the
other hand, if the partition is not available yet, it would have to
keep it as a standby until the partition is really assigned. It seems
to me that the assignor won't have the information to make this
decision, no? In this case, decoupling the "why" from the "when" seems
to make things harder. I am not so familiar with Streams though so my
intuition could be wrong here.

David

On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang  wrote:
>
> Regarding the version, what I was thinking is that in the HB request, for
> the "serverAssignor" field, instead of just having it as a single string
> field, maybe we could consider also making it a structure that includes:
> name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion
> mean the versions of the server assignor that the client works best with.
> That being said, I agree with you that such information may also be
> inferred elsewhere, e.g. by looking into the "rackId" field to see if it
> contains a hyphen or not. All I was wondering is whether such version
> information would be useful for the server assignors to determine their
> actual assignment logic. I do not feel very strongly about this one though
> --- even if we do not add it now, we can potentially add it later; it's
> just that changing a single string field to a structure would be hard for
> compatibility and we'd then probably have to add top-level fields.
>
> Regarding the `onAssignment` logic, again my train of thought is that, if
> users want to know exactly when a partition is assigned / revoked, they
> would be leveraging the rebalance callbacks, as that's what people
> should rely on to determine "when" partitions are assigned. The
> `onAssignment` should be used for getting "why" such a partition
> assignment decision is made, and hence returning `combined partitions`
> would be okay. Streams, e.g., implements both the rebalance callbacks and
> the assignors, and it gets the "when" from the former (and creates/closes
> active tasks accordingly) and the "why" from the latter (and updates its
> global info bookkeeping as well as standby maintenance accordingly). Most
> users would just be interested in the rebalance callback and not implement
> their own assignor at all, as they only care about "when" and trust the
> server assignors to take good care of the "why". So if we did build these
> two types of APIs from scratch, I'd indeed feel that providing only the
> metadata, not the partitions, for `onAssignment` may be less confusing and
> would push users to separate the usage of these two more clearly, but since
> we already introduced partitions in `onAssignment` for compatibility, I'm
> less keen on removing them.
>
>
> Guozhang
>
> On Mon, Sep 26, 2022 at 6:55 AM David Jacot 
> wrote:
>
> > Hi Guozhang,
> >
> > Regarding the version, my understanding is that the version would be
> > either the client software version or the request version, is this
> > correct? If so, we could indeed pass this information down to the
> > assignor via the interface. One way would be to pass a "server
> > context" to the assignor and that context would include that
> > information (and perhaps more). Is this what you are looking for?
> >
> > Regarding the onAssignment, I think that I understand your point. I
> > suppose that the assignor could also be clever and keep track of the
> > last metadata to decide whether it has to do something or not. One
> > question that is still not clear to me is whether the assignor needs
> > to know all the assigned partitions upfront regardless of whether they
> > are already revoked or not. Do you think that we need this as well?
> >
> > From an API perspective, we could have something like
> > onAssignment(Metadata(version, reason, metadata, assigned partitions,
> > pending partitions)), where the assigned partitions are the partitions
> > ready to be used and the pending partitions are the ones assigned to
> > the member but not yet revoked. I find it a bit weird that this method
> > would be called only once because the assignor would not know when the
> > pending partitions change. That does not look like a clean API. An
> > alternative would be to use onAssignment(Metadata(version, reason,
> > metadata, combined partitions)) but this seems error prone because it
> > is not clear whether a 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread Guozhang Wang
Regarding the version, what I was thinking is that in the HB request, for
the "serverAssignor" field, instead of just having it as a single string
field, maybe we could consider also making it a structure that includes:
name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion
mean the versions of the server assignor that the client works best with.
That being said, I agree with you that such information may also be
inferred elsewhere, e.g. by looking into the "rackId" field to see if it
contains a hyphen or not. All I was wondering is whether such version
information would be useful for the server assignors to determine their
actual assignment logic. I do not feel very strongly about this one though
--- even if we do not add it now, we can potentially add it later; it's
just that changing a single string field to a structure would be hard for
compatibility and we'd then probably have to add top-level fields.
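For concreteness, the structure floated above could look roughly like the following, in the JSON schema style Kafka uses for its RPC message definitions. All field names, types, and versions here are illustrative assumptions, not part of the KIP:

```json
{ "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+",
  "about": "The server-side assignor the member wants to use, or null.",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "The name of the server-side assignor." },
    { "name": "MinimumVersion", "type": "int16", "versions": "0+",
      "about": "The minimum assignor version the member can work with." },
    { "name": "MaximumVersion", "type": "int16", "versions": "0+",
      "about": "The maximum assignor version the member works best with." }
  ]
}
```

The compatibility concern is visible here: once the field ships as a plain `string`, this shape can only be approximated later by adding separate top-level version fields next to it.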

Regarding the `onAssignment` logic, again my train of thought is that, if
users want to know exactly when a partition is assigned / revoked, they
would be leveraging the rebalance callbacks, as that's what people
should rely on to determine "when" partitions are assigned. The
`onAssignment` should be used for getting "why" such a partition assignment
decision is made, and hence returning `combined partitions` would be okay.
Streams, e.g., implements both the rebalance callbacks and the assignors,
and it gets the "when" from the former (and creates/closes active tasks
accordingly) and the "why" from the latter (and updates its global info
bookkeeping as well as standby maintenance accordingly). Most users would
just be interested in the rebalance callback and not implement their own
assignor at all, as they only care about "when" and trust the server
assignors to take good care of the "why". So if we did build these two
types of APIs from scratch, I'd indeed feel that providing only the
metadata, not the partitions, for `onAssignment` may be less confusing and
would push users to separate the usage of these two more clearly, but since
we already introduced partitions in `onAssignment` for compatibility, I'm
less keen on removing them.


Guozhang

On Mon, Sep 26, 2022 at 6:55 AM David Jacot 
wrote:

> Hi Guozhang,
>
> Regarding the version, my understanding is that the version would be
> either the client software version or the request version, is this
> correct? If so, we could indeed pass this information down to the
> assignor via the interface. One way would be to pass a "server
> context" to the assignor and that context would include that
> information (and perhaps more). Is this what you are looking for?
>
> Regarding the onAssignment, I think that I understand your point. I
> suppose that the assignor could also be clever and keep track of the
> last metadata to decide whether it has to do something or not. One
> question that is still not clear to me is whether the assignor needs
> to know all the assigned partitions upfront regardless of whether they
> are already revoked or not. Do you think that we need this as well?
>
> From an API perspective, we could have something like
> onAssignment(Metadata(version, reason, metadata, assigned partitions,
> pending partitions)), where the assigned partitions are the partitions
> ready to be used and the pending partitions are the ones assigned to
> the member but not yet revoked. I find it a bit weird that this method
> would be called only once because the assignor would not know when the
> pending partitions change. That does not look like a clean API. An
> alternative would be to use onAssignment(Metadata(version, reason,
> metadata, combined partitions)) but this seems error prone because it
> is not clear whether a partition is usable or not. Or do you think
> that we should not provide the partitions but only the metadata?
>
> Best,
> David
>
> On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang  wrote:
> >
> > Hello David,
> >
> > On Fri, Sep 23, 2022 at 2:00 AM David Jacot  >
> > wrote:
> >
> > > Hey,
> > >
> > > > Just to clarify I was asking about the `version` of the assignor
> (i.e. up
> > > to what version that the client would support), and I do agree we
> would not
> > > need metadata. What I have in mind is that, for some specific built-in
> > > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > > newer version we would have a hierarchical rack ID string format, like
> > > "tier1-tier2" etc, but if some client has not upgraded, its rack ID
> > > would still be in the old format. In this case, the broker then needs to
> choose
> > > the old versioned assignor. I'm probably making something up here for
> rack
> > > aware assignors, but I'm wondering if in general such an
> "auto-downgrade"
> > > behavior would be needed still for broker-side assignor, and if yes
> would
> > > "version" still be useful.
> > >
> > > Got it. That's an interesting thought. I think that the issue is that
> > > the 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-26 Thread David Jacot
Hi Guozhang,

Regarding the version, my understanding is that the version would be
either the client software version or the request version, is this
correct? If so, we could indeed pass this information down to the
assignor via the interface. One way would be to pass a "server
context" to the assignor and that context would include that
information (and perhaps more). Is this what you are looking for?

Regarding the onAssignment, I think that I understand your point. I
suppose that the assignor could also be clever and keep track of the
last metadata to decide whether it has to do something or not. One
question that is still not clear to me is whether the assignor needs
to know all the assigned partitions upfront regardless of whether they
are already revoked or not. Do you think that we need this as well?

From an API perspective, we could have something like
onAssignment(Metadata(version, reason, metadata, assigned partitions,
pending partitions)), where the assigned partitions are the partitions
ready to be used and the pending partitions are the ones assigned to
the member but not yet revoked. I find it a bit weird that this method
would be called only once because the assignor would not know when the
pending partitions change. That does not look like a clean API. An
alternative would be to use onAssignment(Metadata(version, reason,
metadata, combined partitions)) but this seems error prone because it
is not clear whether a partition is usable or not. Or do you think
that we should not provide the partitions but only the metadata?
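To make the first shape under discussion concrete, here is a minimal sketch of what such a `Metadata` could look like. Everything here is hypothetical: the names mirror the thread's wording, not the KIP's final API, and `TopicPartition` is a simplified stand-in for the real Kafka class so the sketch is self-contained:

```java
import java.nio.ByteBuffer;
import java.util.Set;

// Hypothetical sketch of the assignment handed to a client-side assignor,
// split into partitions usable now vs. partitions still pending revocation
// by other members. Not the KIP's final API.
public class AssignmentSketch {
    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    public record TopicPartition(String topic, int partition) {}

    public record Metadata(
        int version,                            // assignor metadata version
        String reason,                          // why this assignment was computed
        ByteBuffer metadata,                    // opaque assignor-specific bytes
        Set<TopicPartition> assignedPartitions, // usable immediately
        Set<TopicPartition> pendingPartitions   // owned, but awaiting revocation elsewhere
    ) {}

    // The callback shape discussed: invoked with the full target assignment.
    public interface Assignor {
        void onAssignment(Metadata metadata);
    }
}
```

The awkwardness raised in the thread shows up directly: if this is called only once per epoch, nothing tells the assignor when entries move from `pendingPartitions` to `assignedPartitions`.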

Best,
David

On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang  wrote:
>
> Hello David,
>
> On Fri, Sep 23, 2022 at 2:00 AM David Jacot 
> wrote:
>
> > Hey,
> >
> > > Just to clarify I was asking about the `version` of the assignor (i.e. up
> > to what version that the client would support), and I do agree we would not
> > need metadata. What I have in mind is that, for some specific built-in
> > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > newer version we would have a hierarchical rack ID string format, like
> > "tier1-tier2" etc, but if some client has not upgraded, its rack ID
> > would still be in the old format. In this case, the broker then needs to choose
> > the old versioned assignor. I'm probably making something up here for rack
> > aware assignors, but I'm wondering if in general such an "auto-downgrade"
> > behavior would be needed still for broker-side assignor, and if yes would
> > "version" still be useful.
> >
> > Got it. That's an interesting thought. I think that the issue is that
> > the client will never tell you which version of the server-side
> > assignor should be used. Do you think that the coordinator would
> > downgrade the version if the assignment fails with a higher version? I
> > tend to believe that this should be handled within the assignor
> > itself. In the example that you mentioned, the assignor would have to
> > handle all the cases. I am not really convinced that we need this at
> > the moment.
> >
> The version from the client side would not be indicating to the broker
> which version to use, but rather which version the client would "work best
> with". Such a "version" field would not be settable by the users, since it
> would be hard-coded and bumped along with the Kafka code version.
> Back to the rack-aware assignor example: if an older versioned client does
> not have a hierarchical rack ID but the assignment returned to it assumes
> a hierarchical rack structure, it may not reflect the best workload
> balance among those new and old versioned clients. That means, when
> receiving the members' subscriptions on the server side, if the versions
> from all these members are different, the broker's assignor may need to
> consider using the lower version logic to do the assignment. So yes, the
> assignor would indeed have to handle all such cases, but it needs to do so
> such that if there are clients who would not work with certain new logic,
> it would handle such cases automatically by e.g. still using an older
> versioned logic.
>
>
>
> > > Okay, my understanding is that the calling ordering of these callbacks
> > would be like the following:
> >
> > Yes, your examples look right.
> >
> > > I'm wondering if we would still call onAssignment just once, that encodes
> > all the assignment for this rebalance, including all the partitions that
> > should be assigned to the member but not yet assigned since they have not
> > been revoked by others. In that case the call ordering would be:
> >
> > Interesting. Is there a case for Streams where having the full
> > assignment is beneficial? For instance, I can think of the following
> > case. When a standby task is promoted to an active task, the metadata
> > would not contain the standby task anymore and the assignment may not
> > have the partition yet. In this case, Streams would stop the standby
> > tasks but not have the active task 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-23 Thread Guozhang Wang
Hello David,

On Fri, Sep 23, 2022 at 2:00 AM David Jacot 
wrote:

> Hey,
>
> > Just to clarify I was asking about the `version` of the assignor (i.e. up
> to what version that the client would support), and I do agree we would not
> need metadata. What I have in mind is that, for some specific built-in
> broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> newer version we would have a hierarchical rack ID string format, like
> "tier1-tier2" etc, but if some client has not upgraded, its rack ID
> would still be in the old format. In this case, the broker then needs to choose
> the old versioned assignor. I'm probably making something up here for rack
> aware assignors, but I'm wondering if in general such an "auto-downgrade"
> behavior would be needed still for broker-side assignor, and if yes would
> "version" still be useful.
>
> Got it. That's an interesting thought. I think that the issue is that
> the client will never tell you which version of the server-side
> assignor should be used. Do you think that the coordinator would
> downgrade the version if the assignment fails with a higher version? I
> tend to believe that this should be handled within the assignor
> itself. In the example that you mentioned, the assignor would have to
> handle all the cases. I am not really convinced that we need this at
> the moment.
>
The version from the client side would not be indicating to the broker
which version to use, but rather which version the client would "work best
with". Such a "version" field would not be settable by the users, since it
would be hard-coded and bumped along with the Kafka code version.
Back to the rack-aware assignor example: if an older versioned client does
not have a hierarchical rack ID but the assignment returned to it assumes
a hierarchical rack structure, it may not reflect the best workload
balance among those new and old versioned clients. That means, when
receiving the members' subscriptions on the server side, if the versions
from all these members are different, the broker's assignor may need to
consider using the lower version logic to do the assignment. So yes, the
assignor would indeed have to handle all such cases, but it needs to do so
such that if there are clients who would not work with certain new logic,
it would handle such cases automatically by e.g. still using an older
versioned logic.
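The "fall back to the lowest commonly supported logic" idea above could be sketched as follows. This is purely illustrative: KIP-848 defines no such interface, and the names and version-range semantics are assumptions made for this example:

```java
import java.util.List;

// Illustrative sketch: a server-side assignor picking the highest assignor
// version that every member in the group can work with, given each member's
// advertised (minimumVersion, maximumVersion) range. Names are made up for
// this example and are not part of KIP-848.
public class AssignorVersionResolver {
    public record VersionRange(short minVersion, short maxVersion) {}

    /**
     * Returns the highest version supported by all members, or -1 if the
     * members' ranges do not overlap (in which case the assignor would have
     * to fail the assignment or fall back to some default behavior).
     */
    public static short effectiveVersion(List<VersionRange> members) {
        short lo = 0, hi = Short.MAX_VALUE;
        for (VersionRange r : members) {
            lo = (short) Math.max(lo, r.minVersion()); // tightest lower bound
            hi = (short) Math.min(hi, r.maxVersion()); // tightest upper bound
        }
        return lo <= hi ? hi : -1;
    }
}
```

With a mixed group of, say, a (0, 2) member and a (1, 3) member, the assignor would run its version-2 logic: the newest logic that the un-upgraded member still works with.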



> > Okay, my understanding is that the calling ordering of these callbacks
> would be like the following:
>
> Yes, your examples look right.
>
> > I'm wondering if we would still call onAssignment just once, that encodes
> all the assignment for this rebalance, including all the partitions that
> should be assigned to the member but not yet assigned since they have not
> been revoked by others. In that case the call ordering would be:
>
> Interesting. Is there a case for Streams where having the full
> assignment is beneficial? For instance, I can think of the following
> case. When a standby task is promoted to an active task, the metadata
> would not contain the standby task anymore and the assignment may not
> have the partition yet. In this case, Streams would stop the standby
> tasks but not have the active task yet if my understanding of Streams
> is correct. So knowing the full assignment could be helpful here.
>
> If we want to do this, we could structure the assignment given to the
> member as follows: version, error, metadata, assigned partitions,
> pending partitions, where the pending partitions would be the ones
> assigned to this member but not yet available. What do you think?
>
> Regarding onAssignment being called only once, I am not sure I fully
> grasp the benefit yet. Does the assignor really care about this? In
> the end, the epoch does not really matter for the assignor because it
> has to converge its state to the desired state anyway.
>
Here's my rationale (maybe rephrased a bit :P): the implementers of
rebalance listeners and assignors are two groups of people, and most users
fall into the former group, while only very few people fall into the latter
group. Rebalance listener implementers just want to know when a partition
is actually revoked from or assigned to the consumer and react to it; for
this purpose, `onPartitionsRevoked` and `onPartitionsAssigned` would be
triggered interleavingly upon `poll` calls across rebalances. The usual
logic for such rebalance listeners is metrics reporting, committing
offsets (if they do not use Kafka for that), etc. They would not care which
calls are from which rebalances --- in the past with eager rebalancing, it
may be that each rebalance was associated with exactly one
`onPartitionsRevoked` first and then one `onPartitionsAssigned`, but that
would no longer be the case now.

The implementers of the assignor, though, would care about "how the
assignment was made"; that includes from which rebalance a certain
revoke/assign decision was made, and based on what metadata such an assignment is

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-23 Thread David Jacot
Hey,

> Just to clarify I was asking about the `version` of the assignor (i.e. up
to what version that the client would support), and I do agree we would not
need metadata. What I have in mind is that, for some specific built-in
broker-assignors, e.g. rack-aware assignors, if it's possible that in a
newer version we would have a hierarchical rack ID string format, like
"tier1-tier2" etc, but if some client has not upgraded, its rack ID
would still be in the old format. In this case, the broker then needs to choose
the old versioned assignor. I'm probably making something up here for rack
aware assignors, but I'm wondering if in general such an "auto-downgrade"
behavior would be needed still for broker-side assignor, and if yes would
"version" still be useful.

Got it. That's an interesting thought. I think that the issue is that
the client will never tell you which version of the server-side
assignor should be used. Do you think that the coordinator would
downgrade the version if the assignment fails with a higher version? I
tend to believe that this should be handled within the assignor
itself. In the example that you mentioned, the assignor would have to
handle all the cases. I am not really convinced that we need this at
the moment.

> Okay, my understanding is that the calling ordering of these callbacks
would be like the following:

Yes, your examples look right.

> I'm wondering if we would still call onAssignment just once, that encodes
all the assignment for this rebalance, including all the partitions that
should be assigned to the member but not yet assigned since they have not
been revoked by others. In that case the call ordering would be:

Interesting. Is there a case for Streams where having the full
assignment is beneficial? For instance, I can think of the following
case. When a standby task is promoted to an active task, the metadata
would not contain the standby task anymore and the assignment may not
have the partition yet. In this case, Streams would stop the standby
tasks but not have the active task yet if my understanding of Streams
is correct. So knowing the full assignment could be helpful here.

If we want to do this, we could structure the assignment given to the
member as follows: version, error, metadata, assigned partitions,
pending partitions, where the pending partitions would be the ones
assigned to this member but not yet available. What do you think?

Regarding onAssignment being called only once, I am not sure I fully
grasp the benefit yet. Does the assignor really care about this? In
the end, the epoch does not really matter for the assignor because it
has to converge its state to the desired state anyway.

Best,
David

On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang  wrote:
>
> Hi David, thanks for all the detailed explanations. I think they all make
> sense. Just want to have a couple follow-ups here:
>
> > I don't really see the benefits here because server side assignors
> don't have metadata at all. They only assign topic-partitions. They
> are not supposed to generate metadata nor to receive metadata from the
> members.
>
> Just to clarify I was asking about the `version` of the assignor (i.e. up
> to what version that the client would support), and I do agree we would not
> need metadata. What I have in mind is that, for some specific built-in
> broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> newer version we would have a hierarchical rack ID string format, like
> "tier1-tier2" etc, but if some client has not upgraded, its rack ID
> would still be in the old format. In this case, the broker then needs to choose
> the old versioned assignor. I'm probably making something up here for rack
> aware assignors, but I'm wondering if in general such an "auto-downgrade"
> behavior would be needed still for broker-side assignor, and if yes would
> "version" still be useful.
>
> > Yeah, that's right. Within a rebalance, `onAssignment` is called once
> when the member transitions to a new epoch. This one contains the full
> metadata provided by the client side assignor. Then, `onAssignment`
> can be called max N times where N is the number of partitions pending
> revocation by other members. Let me try to clarify this in the KIP.
>
> Okay, my understanding is that the calling ordering of these callbacks
> would be like the following:
>
> 
> onPartitionsRevoked();   // just once, since we do not really need
> to revoke incrementally.
>
> onAssignment();// the first call, with epoch incremented
> onPartitionsAssigned();   // paired with the onAssignment
>
> onAssignment();  // the first onAssignment would bump up the
> epoch, and the metadata reflected.
> onPartitionsAssigned();   // each time we get an additional assignment, we
> call onAssignment and then paired with an onPartitionsAssigned
> ...
> onAssignment();
> onPartitionsAssigned();   // on each of the onAssignment calls, the encoded
> metadata 
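The interleaving quoted above can be simulated to make the pairing explicit. This is a toy driver, not consumer code: the real client invokes `ConsumerRebalanceListener` and `ConsumerPartitionAssignor#onAssignment` callbacks itself, and the ordering shown is the behavior being discussed in the thread, not a settled contract:

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of the callback interleaving discussed in the thread:
// one up-front revocation, then onAssignment/onPartitionsAssigned pairs as
// partitions pending revocation by other members become available.
public class CallbackOrder {
    public static List<String> simulateRebalance(int incrementalAssignments) {
        List<String> calls = new ArrayList<>();
        calls.add("onPartitionsRevoked"); // just once; no incremental revocation
        for (int i = 0; i < incrementalAssignments; i++) {
            calls.add("onAssignment");         // metadata (epoch bumped on the first call)
            calls.add("onPartitionsAssigned"); // paired delivery of newly usable partitions
        }
        return calls;
    }
}
```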

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-13 Thread David Jacot
47. group.consumer.session.timeout.ms: how does the client communicate
> its session timeout config to the broker?
>
> 48. There still seems to be some typos in the examples. For example, the
> first reference of the following seems incorrect.
> B - epoch=2, partitions=[foo-2]
> C - epoch=3, partitions=[foo-1]
> C - epoch=22, partitions=[foo-2, foo-5]
>
> Thanks,
>
> Jun
>
> On Mon, Sep 12, 2022 at 11:42 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
> hgerald...@bloomberg.net> wrote:
>
> > Yes, I agree that leadership could be modeled after partition
> > assignment(s).  However, I can think of some expanded versions of the
> > 'leader election' use case that exist today in Schema Registry.
> >
> > The advantage of creating a different 'type' that isn't necessarily tied
> > to topics/partitions (and is used only for resource management) would be
> > that we can scale the number of resources it handles (akin to a connect
> > cluster increasing the number of connectors/tasks) without having to change
> > topics/partitions, as these partitions will never have any data (and can't
> > be shrunk); they will be used just for leadership.
> >
> > This is in the spirit of KIP-795. We can table this discussion for after
> > the Connect discussion, as there will be more clarity on what extending the
> > new protocol will look like.
> >
> > From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00To:
> > dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer
> > Rebalance Protocol
> >
> > Hi Hector,
> >
> > We definitely need to share internals with Connect APIs. That model
> > would not scale otherwise.
> >
> > Regarding the schema registry, I wonder if we could just use the new
> > protocol. At the end of the day, the schema registry wants to elect a
> > single writer for a partition and the owner of the partition can be
> > considered as the leader. I haven't really tried this out but that
> > seems doable. What do you think?
> >
> > Best,
> > David
> >
> > On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A)
> >  wrote:
> > >
> > > So it seems there's a consensus on having dedicated APIs for Connect,
> > which
> > means having a data model (group, member, assignment) and APIs (heartbeat
> > request/response, assignment prepare and install) tailored specifically to
> > connect. I wonder if adding support for other coordinator group types
> > (e.g.
> > leadership, in the case of schema registry) will require similar assets
> > (new
> > model classes to express members and resources, custom heartbeats and
> > assignment prepare/install APIs).
> > >
> > > I think that, as new use cases are considered, the core primitives of
> > the new
> > protocol will be generalized, so new types don't have to implement the
> > whole
> > stack (e.g. state machines), but only functions like detecting group
> > metadata
> > changes, or computing assignments of the resources handled by each type
> > (Topic/Partitions in the case of consumer, Connector/Task in the case of
> > Connect, Leadership in the case of Schema Registry, and so on).
> > >
> > >
> > > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:
> > dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer
> > Rebalance
> > Protocol
> > >
> > > Thank you Guozhang/David for the feedback. Looks like there's agreement
> > on
> > > using separate APIs for Connect. I would revisit the doc and see what
> > > changes are to be made.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
> > > wrote:
> > >
> > > > Hi Sagar,
> > > >
> > > > Thanks for the feedback and the document. That's really helpful. I
> > > > will take a look at it.
> > > >
> > > > Overall, it seems to me that both Connect and the Consumer could share
> > > > the same underlying "engine". The main difference is that the Consumer
> > > > assigns topic-partitions to members whereas Connect assigns tasks to
> > > > workers. I see two ways to move forward:
> > > > 1) We extend the new proposed APIs to support different resource types
> > > > (e.g. partitions, tasks, etc.); or
> > > > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> > > > similar to the new 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-12 Thread Jun Rao
Hi, David,

Thanks for the reply. A few more comments.

40. A member updates its assignor's reason to trigger a rebalance: What API
in the consumer does the power user use to change the assignor reason?

41. partition.assignment.strategy defaults class
org.apache.kafka.clients.consumer.RangeAssignor,class
org.apache.kafka.clients.consumer.CooperativeStickyAssignor. Does that
still make sense since the assignor name on the broker is not the full
class name? Should it default to a strategy determined by the broker?

42. "This means that the assignment metadata may already reference
partitions which are not assigned to the member yet." Not sure I
follow this one.

43. ConsumerGroupHeartbeat:
43.1 Do we need an ErrorCode to indicate that the group state is being
loaded (during coordinator failover)? If so, do we need the same ErrorCode
for other requests?
43.2 COMPUTE_ASSIGNMENT : It's a bit weird to represent this as an error
code since there is no error. Could we represent that in the payload of
ConsumerGroupHeartbeat response?

44. ConsumerGroupPrepareAssignmentResponse:
44.1 Why does it have AssignorName since the client decides the assignor?
44.2 TopicPartitions: The target topic-partitions of the member. Should
that be the topic partition currently owned by the member?

45. List group: Should the ACL be Describe (Cluster) instead of Describe
Group?

46. AlterIncrementalConfigs: what group configs could be changed?

47. group.consumer.session.timeout.ms: how does the client communicate
its session timeout config to the broker?

48. There still seems to be some typos in the examples. For example, the
first reference of the following seems incorrect.
B - epoch=2, partitions=[foo-2]
C - epoch=3, partitions=[foo-1]
C - epoch=22, partitions=[foo-2, foo-5]

Thanks,

Jun
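Jun's point in 43.2 (signal "compute an assignment" through the response payload rather than an error code) could look roughly like the sketch below. The record shape and field names are hypothetical, not the KIP-848 schema:

```java
// Hypothetical sketch of point 43.2: carry "run the client-side assignor"
// as a response field instead of the COMPUTE_ASSIGNMENT error code, so
// ErrorCode stays reserved for real errors. Field names are illustrative.
public class HeartbeatResponseSketch {
    record ConsumerGroupHeartbeatResponse(
            short errorCode,                 // real errors only
            boolean shouldComputeAssignment, // replaces the COMPUTE_ASSIGNMENT "error"
            int memberEpoch) {}

    public static void main(String[] args) {
        ConsumerGroupHeartbeatResponse r =
                new ConsumerGroupHeartbeatResponse((short) 0, true, 5);
        // No error, yet the member knows it was selected to compute the assignment.
        System.out.println(r.errorCode() + " " + r.shouldComputeAssignment());
    }
}
```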

On Mon, Sep 12, 2022 at 11:42 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Yes, I agree that leadership could be modeled after partition
> assignment(s).  However, I can think of some expanded versions of the
> 'leader election' use case that exist today in Schema Registry.
>
> The advantage of creating a different 'type' that isn't necessarily tied
> to topics/partitions (and is used only for resource management) would be
> that we can scale the number of resources it handles (akin to a connect
> cluster increasing the number of connectors/tasks) without having to change
> topics/partitions, as these partitions will never have any data (and can't
> be shrunk), they will be used just for leadership.
>
> This is in the spirit of KIP-795. We can table this discussion for after
> the Connect discussion, as there will be more clarity on what extending the
> new protocol will look like.
>
> From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00To:
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer
> Rebalance Protocol
>
> Hi Hector,
>
> We definitely need to share internals with Connect APIs. That model
> would not scale otherwise.
>
> Regarding the schema registry, I wonder if we could just use the new
> protocol. At the end of the day, the schema registry wants to elect a
> single writer for a partition and the owner of the partition can be
> considered as the leader. I haven't really tried this out but that
> seems doable. What do you think?
>
> Best,
> David
>
> On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A)
>  wrote:
> >
> > So it seems there's a consensus on having dedicated APIs for Connect,
> which
> means having a data model (group, member, assignment) and APIs (heartbeat
> request/response, assignment prepare and install) tailored specifically to
> connect. I wonder if adding support for other coordinator group types
> (e.g.
> leadership, in the case of schema registry) will require similar assets
> (new
> model classes to express members and resources, custom heartbeats and
> assignment prepare/install APIs).
> >
> > I think that, as new use cases are considered, the core primitives of
> the new
> protocol will be generalized, so new types don't have to implement the
> whole
> stack (e.g. state machines), but only functions like detecting group
> metadata
> changes, or computing assignments of the resources handled by each type
> (Topic/Partitions in the case of consumer, Connector/Task in the case of
> Connect, Leadership in the case of Schema Registry, and so on).
> >
> >
> > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:
> dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer
> Rebalance
> Protocol
> >
> > Thank you Guozhang/David for the feedback. Looks like there's agreement
> on
> > using separate APIs for Connect. I would revisit 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-12 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Yes, I agree that leadership could be modeled after partition assignment(s).  
However, I can think of some expanded versions of the 'leader election' use 
case that exist today in Schema Registry. 

The advantage of creating a different 'type' that isn't necessarily tied to 
topics/partitions (and is used only for resource management) would be that we 
can scale the number of resources it handles (akin to a connect cluster 
increasing the number of connectors/tasks) without having to change 
topics/partitions, as these partitions will never have any data (and can't be 
shrunk), they will be used just for leadership.

This is in the spirit of KIP-795. We can table this discussion for after the 
Connect discussion, as there will be more clarity on what extending the new 
protocol will look like.

From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol

Hi Hector,

We definitely need to share internals with Connect APIs. That model
would not scale otherwise.

Regarding the schema registry, I wonder if we could just use the new
protocol. At the end of the day, the schema registry wants to elect a
single writer for a partition and the owner of the partition can be
considered as the leader. I haven't really tried this out but that
seems doable. What do you think?

Best,
David

On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A)
 wrote:
>
> So it seems there's a consensus on having dedicated APIs for Connect, which 
means having a data model (group, member, assignment) and APIs (heartbeat 
request/response, assignment prepare and install) tailored specifically to 
connect. I wonder if adding support for other coordinator group types (e.g. 
leadership, in the case of schema registry) will require similar assets (new 
model classes to express members and resources, custom heartbeats and 
assignment prepare/install APIs).
>
> I think that, as new use cases are considered, the core primitives of the new 
protocol will be generalized, so new types don't have to implement the whole 
stack (e.g. state machines), but only functions like detecting group metadata 
changes, or computing assignments of the resources handled by each type 
(Topic/Partitions in the case of consumer, Connector/Task in the case of 
Connect, Leadership in the case of Schema Registry, and so on).
>
>
> From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:  
dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol
>
> Thank you Guozhang/David for the feedback. Looks like there's agreement on
> using separate APIs for Connect. I would revisit the doc and see what
> changes are to be made.
>
> Thanks!
> Sagar.
>
> On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for the feedback and the document. That's really helpful. I
> > will take a look at it.
> >
> > Overall, it seems to me that both Connect and the Consumer could share
> > the same underlying "engine". The main difference is that the Consumer
> > assigns topic-partitions to members whereas Connect assigns tasks to
> > workers. I see two ways to move forward:
> > 1) We extend the new proposed APIs to support different resource types
> > (e.g. partitions, tasks, etc.); or
> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> > similar to the new ones but different on the content/resources and
> > they would rely on the same engine on the coordinator side.
> >
> > I personally lean towards 2) because I am not a fan of overcharging
> > APIs to serve different purposes. That being said, I am not opposed to
> > 1) if we can find an elegant way to do it.
> >
> > I think that we can continue to discuss it here for now in order to
> > ensure that this KIP is compatible with what we will do for Connect in
> > the future.
> >
> > Best,
> > David
> >
> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> > >
> > > Hi all,
> > >
> > > I am back from vacation. I will go through and address your comments
> > > in the coming days. Thanks for your feedback.
> > >
> > > Cheers,
> > > David
> > >
> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> > wrote:
> > > >
> > > > Hey All!
> > > >
> > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> > making it
> > > > down the stack!
> > > >
> > > > I had a few questions:
> > > >
> > > > 1. The 'Rejected Alternatives' section describes how member epoch
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-12 Thread David Jacot
Hi Guozhang,

1. I have added a reference to the relevant chapter instead of
repeating the whole thing. Does that work for you?

2. The "Rebalance Triggers" section you are referring to is about when
a rebalance should be triggered for the non-upgraded members using the
old protocol. The section mentions that a rebalance must be triggered
when a new assignment is installed. This implies that the group epoch
was updated either by a native member or a non-upgraded member. For
the latter, the JoinGroup request would be the trigger. I have added a
reference to the relevant chapter in the "JoinGroup Handling" section
as well. Does that make sense?

Thanks,
David

On Fri, Sep 9, 2022 at 10:35 PM Guozhang Wang  wrote:
>
> Hello David,
>
> Alright I think that's sufficient. Just to make that clear in the doc,
> could we update:
>
> 1) the heartbeat request handling section, stating when coordinator will
> trigger rebalance based on the HB's member metadata / reason?
> 2) the "Rebalance Triggers" section to include what we described in "Group
> Epoch - Trigger a rebalance" section as well?
>
>
> Guozhang
>
> On Fri, Sep 9, 2022 at 1:28 AM David Jacot 
> wrote:
>
> > Hi Guozhang,
> >
> > I thought that the assignor will always be consulted when the next
> > heartbeat request is constructed. In other words,
> > `PartitionAssignor#metadata` will be called for every heartbeat. This
> > gives the opportunity for the assignor to enforce a rebalance by
> > setting the reason to a non-zero value or by changing the bytes. Do
> > you think that this is not sufficient? Are you concerned by the delay?
> >
> > Best,
> > David
> >
> > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
> > >
> > > Hello David,
> > >
> > > One of Jun's comments makes me think:
> > >
> > > ```
> > > In this case, a new assignment is triggered by the client side
> > > assignor. When constructing the HB, the consumer will always consult
> > > the client side assignor and propagate the information to the group
> > > coordinator. In other words, we don't expect users to call
> > > Consumer#enforceRebalance anymore.
> > > ```
> > >
> > > As I looked at the current PartitionAssignor's interface, we actually do
> > > not have a way yet to instruct how to construct the next HB request, e.g.
> > > when the assignor wants to enforce a new rebalance with a new assignment,
> > > we'd need some customizable APIs inside the PartitionAssignor to indicate
> > > this in the next HB, telling the broker about it. WDYT about adding such an API on the
> > > PartitionAssignor?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > I have updated the KIP to include your feedback. I have also tried to
> > > > clarify the parts which were not cleared.
> > > >
> > > > Best,
> > > > David
> > > >
> > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot 
> > wrote:
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for your feedback. Let me start by answering your questions
> > > > > inline and I will update the KIP next week.
> > > > >
> > > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to
> > be
> > > > fewer
> > > > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > > > comments below.
> > > > >
> > > > > I would also add that the KIP removes the global sync barrier in the
> > > > > protocol which is essential to improve group stability and
> > > > > scalability, and the KIP also simplifies the client by moving most of
> > > > > the logic to the server side.
> > > > >
> > > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > > > changing
> > > > > > of the partition assignor in the consumers?
> > > > >
> > > > > Definitely. The group coordinator will use the assignor used by a
> > > > > majority of the members. This allows the group to move from one
> > > > > assignor to another by a roll. This is explained in the Assignor
> > > > > Selection chapter.
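The majority-based selection described in this answer could be sketched as follows; this is illustrative only, not the group coordinator's actual implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of server-side assignor selection as described above: the
// coordinator picks the assignor supported by the most members, so a group
// can migrate from one assignor to another via a rolling restart.
public class AssignorSelection {
    static String selectAssignor(List<String> memberAssignors) {
        Map<String, Integer> votes = new HashMap<>();
        for (String assignor : memberAssignors) {
            votes.merge(assignor, 1, Integer::sum);  // one vote per member
        }
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow();
    }

    public static void main(String[] args) {
        // Two members still on "range", three already rolled to "uniform".
        System.out.println(selectAssignor(
                List.of("range", "range", "uniform", "uniform", "uniform")));
    }
}
```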
> > > > >
> > > > > > 30.2 For each field, could you explain whether it's required in
> > every
> > > > > > request or the scenarios when it needs to be filled? For example,
> > it's
> > > > not
> > > > > > clear to me when TopicPartitions needs to be filled.
> > > > >
> > > > > The client is expected to set those fields in case of a connection
> > > > > issue (e.g. timeout) or when the fields have changed since the last
> > > > > HB. The server populates those fields as long as the member is not
> > > > > fully reconciled - the member should acknowledge that it has the
> > > > > expected epoch and assignment. I will clarify this in the KIP.
> > > > >
> > > > > > 31. In the current consumer protocol, the rack affinity between the
> > > > client
> > > > > > and the broker is only considered during fetching, but not during
> > > > assigning
> > > > > > partitions to consumers. Sometimes, once the assignment is made,
> > there
> > > > 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-12 Thread David Jacot
Hi Hector,

We definitely need to share internals with Connect APIs. That model
would not scale otherwise.

Regarding the schema registry, I wonder if we could just use the new
protocol. At the end of the day, the schema registry wants to elect a
single writer for a partition and the owner of the partition can be
considered as the leader. I haven't really tried this out but that
seems doable. What do you think?

Best,
David

On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A)
 wrote:
>
> So it seems there's a consensus on having dedicated APIs for Connect, which 
> means having a data model (group, member, assignment) and APIs (heartbeat 
> request/response, assignment prepare and install) tailored specifically to 
> connect. I wonder if adding support for other coordinator group types (e.g. 
> leadership, in the case of schema registry) will require similar assets (new 
> model classes to express members and resources, custom heartbeats and 
> assignment prepare/install APIs).
>
> I think that, as new use cases are considered, the core primitives of the new 
> protocol will be generalized, so new types don't have to implement the whole 
> stack (e.g. state machines), but only functions like detecting group metadata 
> changes, or computing assignments of the resources handled by each type 
> (Topic/Partitions in the case of consumer, Connector/Task in the case of 
> Connect, Leadership in the case of Schema Registry, and so on).
>
>
> From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:  
> dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
> Protocol
>
> Thank you Guozhang/David for the feedback. Looks like there's agreement on
> using separate APIs for Connect. I would revisit the doc and see what
> changes are to be made.
>
> Thanks!
> Sagar.
>
> On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for the feedback and the document. That's really helpful. I
> > will take a look at it.
> >
> > Overall, it seems to me that both Connect and the Consumer could share
> > the same underlying "engine". The main difference is that the Consumer
> > assigns topic-partitions to members whereas Connect assigns tasks to
> > workers. I see two ways to move forward:
> > 1) We extend the new proposed APIs to support different resource types
> > (e.g. partitions, tasks, etc.); or
> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> > similar to the new ones but different on the content/resources and
> > they would rely on the same engine on the coordinator side.
> >
> > I personally lean towards 2) because I am not a fan of overcharging
> > APIs to serve different purposes. That being said, I am not opposed to
> > 1) if we can find an elegant way to do it.
> >
> > I think that we can continue to discuss it here for now in order to
> > ensure that this KIP is compatible with what we will do for Connect in
> > the future.
> >
> > Best,
> > David
> >
> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> > >
> > > Hi all,
> > >
> > > I am back from vacation. I will go through and address your comments
> > > in the coming days. Thanks for your feedback.
> > >
> > > Cheers,
> > > David
> > >
> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> > wrote:
> > > >
> > > > Hey All!
> > > >
> > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> > making it
> > > > down the stack!
> > > >
> > > > I had a few questions:
> > > >
> > > > 1. The 'Rejected Alternatives' section describes how member epoch
> > should
> > > > advance in step with the group epoch and assignment epoch values. I
> > think
> > > > that this is a good idea for the reasons described in the KIP. When the
> > > > protocol is incrementally assigning partitions to a worker, what member
> > > > epoch does each incremental assignment use? Are member epochs re-used,
> > and
> > > > a single member epoch can correspond to multiple different
> > (monotonically
> > > > larger) assignments?
> > > >
> > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > > > not, should custom client-side assignor implementations interact with
> > the
> > > > Reason field, and how is its common meaning agreed upon? If so, what
> > is the
> > > > benefit of a distin

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Guozhang Wang
Hello David,

Alright I think that's sufficient. Just to make that clear in the doc,
could we update:

1) the heartbeat request handling section, stating when coordinator will
trigger rebalance based on the HB's member metadata / reason?
2) the "Rebalance Triggers" section to include what we described in "Group
Epoch - Trigger a rebalance" section as well?


Guozhang

On Fri, Sep 9, 2022 at 1:28 AM David Jacot 
wrote:

> Hi Guozhang,
>
> I thought that the assignor will always be consulted when the next
> heartbeat request is constructed. In other words,
> `PartitionAssignor#metadata` will be called for every heartbeat. This
> gives the opportunity for the assignor to enforce a rebalance by
> setting the reason to a non-zero value or by changing the bytes. Do
> you think that this is not sufficient? Are you concerned by the delay?
>
> Best,
> David
>
> On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
> >
> > Hello David,
> >
> > One of Jun's comments makes me think:
> >
> > ```
> > In this case, a new assignment is triggered by the client side
> > assignor. When constructing the HB, the consumer will always consult
> > the client side assignor and propagate the information to the group
> > coordinator. In other words, we don't expect users to call
> > Consumer#enforceRebalance anymore.
> > ```
> >
> > As I looked at the current PartitionAssignor's interface, we actually do
> > not have a way yet to instruct how to construct the next HB request, e.g.
> > when the assignor wants to enforce a new rebalance with a new assignment,
> > we'd need some customizable APIs inside the PartitionAssignor to indicate
> > this in the next HB, telling the broker about it. WDYT about adding such an API on the
> > PartitionAssignor?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I have updated the KIP to include your feedback. I have also tried to
> > > clarify the parts which were not clear.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot 
> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for your feedback. Let me start by answering your questions
> > > > inline and I will update the KIP next week.
> > > >
> > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to
> be
> > > fewer
> > > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > > comments below.
> > > >
> > > > I would also add that the KIP removes the global sync barrier in the
> > > > protocol which is essential to improve group stability and
> > > > scalability, and the KIP also simplifies the client by moving most of
> > > > the logic to the server side.
> > > >
> > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > > changing
> > > > > of the partition assignor in the consumers?
> > > >
> > > > Definitely. The group coordinator will use the assignor used by a
> > > > majority of the members. This allows the group to move from one
> > > > assignor to another by a roll. This is explained in the Assignor
> > > > Selection chapter.
> > > >
> > > > > 30.2 For each field, could you explain whether it's required in
> every
> > > > > request or the scenarios when it needs to be filled? For example,
> it's
> > > not
> > > > > clear to me when TopicPartitions needs to be filled.
> > > >
> > > > The client is expected to set those fields in case of a connection
> > > > issue (e.g. timeout) or when the fields have changed since the last
> > > > HB. The server populates those fields as long as the member is not
> > > > fully reconciled - the member should acknowledge that it has the
> > > > expected epoch and assignment. I will clarify this in the KIP.
> > > >
> > > > > 31. In the current consumer protocol, the rack affinity between the
> > > client
> > > > > and the broker is only considered during fetching, but not during
> > > assigning
> > > > > partitions to consumers. Sometimes, once the assignment is made,
> there
> > > is
> > > > > no opportunity for read affinity because no replicas of assigned
> > > partitions
> > > > > are close to the member. I am wondering if we should use this
> > > opportunity
> > > > > to address this by including rack in GroupMember.
> > > >
> > > > That's an interesting idea. I don't see any issue with adding the
> rack
> > > > to the members. I will do so.
> > > >
> > > > > 32. On the metric side, often, it's useful to know how busy a group
> > > > > coordinator is. By moving to the event loop model, it seems that we
> could
> > > add
> > > > > a metric that tracks the fraction of the time the event loop is
> doing
> > > the
> > > > > actual work.
> > > >
> > > > That's a great idea. I will add it. Thanks.
> > > >
> > > > > 33. Could we add a section on coordinator failover handling? For
> > > example,
> > > > > does it need to trigger the check if any group with the wildcard
> > > > > subscription now has a new 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
So it seems there's a consensus on having dedicated APIs for Connect, which 
means having a data model (group, member, assignment) and APIs (heartbeat 
request/response, assignment prepare and install) tailored specifically to 
connect. I wonder if adding support for other coordinator group types (e.g. 
leadership, in the case of schema registry) will require similar assets (new 
model classes to express members and resources, custom heartbeats and 
assignment prepare/install APIs).

I think that, as new use cases are considered, the core primitives of the new 
protocol will be generalized, so new types don't have to implement the whole 
stack (e.g. state machines), but only functions like detecting group metadata 
changes, or computing assignments of the resources handled by each type 
(Topic/Partitions in the case of consumer, Connector/Task in the case of 
Connect, Leadership in the case of Schema Registry, and so on).
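The generalization Hector describes — a shared engine, with each group type supplying only its resource model and assignment function — might look like the sketch below. Every name here is hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the generalization described above: a group "type"
// plugs in its resource kind and assignment function, while the shared
// coordinator engine would own epochs and state machines.
interface GroupType<R> {                       // R = partition, task, leadership, ...
    Map<String, List<R>> assign(Set<String> members, List<R> resources);
}

// Schema-registry-style type: each named resource gets exactly one owner.
class LeadershipType implements GroupType<String> {
    public Map<String, List<String>> assign(Set<String> members, List<String> resources) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);              // deterministic round-robin
        Map<String, List<String>> out = new HashMap<>();
        int i = 0;
        for (String resource : resources) {
            out.computeIfAbsent(sorted.get(i++ % sorted.size()),
                                k -> new ArrayList<>()).add(resource);
        }
        return out;
    }
}

public class GroupTypeDemo {
    public static void main(String[] args) {
        System.out.println(new LeadershipType().assign(
                new TreeSet<>(Set.of("member-1", "member-2")),
                List.of("schemas-writer")));
    }
}
```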


From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol

Thank you Guozhang/David for the feedback. Looks like there's agreement on
using separate APIs for Connect. I would revisit the doc and see what
changes are to be made.

Thanks!
Sagar.

On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
wrote:

> Hi Sagar,
>
> Thanks for the feedback and the document. That's really helpful. I
> will take a look at it.
>
> Overall, it seems to me that both Connect and the Consumer could share
> the same underlying "engine". The main difference is that the Consumer
> assigns topic-partitions to members whereas Connect assigns tasks to
> workers. I see two ways to move forward:
> 1) We extend the new proposed APIs to support different resource types
> (e.g. partitions, tasks, etc.); or
> 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> similar to the new ones but different on the content/resources and
> they would rely on the same engine on the coordinator side.
>
> I personally lean towards 2) because I am not a fan of overcharging
> APIs to serve different purposes. That being said, I am not opposed to
> 1) if we can find an elegant way to do it.
>
> I think that we can continue to discuss it here for now in order to
> ensure that this KIP is compatible with what we will do for Connect in
> the future.
>
> Best,
> David
>
> On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> >
> > Hi all,
> >
> > I am back from vacation. I will go through and address your comments
> > in the coming days. Thanks for your feedback.
> >
> > Cheers,
> > David
> >
> > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> wrote:
> > >
> > > Hey All!
> > >
> > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> making it
> > > down the stack!
> > >
> > > I had a few questions:
> > >
> > > 1. The 'Rejected Alternatives' section describes how member epoch
> should
> > > advance in step with the group epoch and assignment epoch values. I
> think
> > > that this is a good idea for the reasons described in the KIP. When the
> > > protocol is incrementally assigning partitions to a worker, what member
> > > epoch does each incremental assignment use? Are member epochs re-used,
> and
> > > a single member epoch can correspond to multiple different
> (monotonically
> > > larger) assignments?
> > >
> > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > > not, should custom client-side assignor implementations interact with
> the
> > > Reason field, and how is its common meaning agreed upon? If so, what
> is the
> > > benefit of a distinct Reason field over including such functionality
> in the
> > > opaque metadata?
> > >
> > > 3. The following is included in the KIP: "Thanks to this, the input of
> the
> > > client side assignor is entirely driven by the group coordinator. The
> > > consumer is no longer responsible for maintaining any state besides its
> > > assigned partitions." Does this mean that the client-side assignor MAY
> > > incorporate additional non-Metadata state (such as partition
> throughput,
> > > cpu/memory metrics, config topics, etc), or that additional
> non-Metadata
> > > state SHOULD NOT be used?
> > >
> > > 4. I see that there are separate classes
> > > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > overlap significantly. Is it possible for these tw

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-09 Thread David Jacot
Hi Guozhang,

I thought that the assignor will always be consulted when the next
heartbeat request is constructed. In other words,
`PartitionAssignor#metadata` will be called for every heartbeat. This
gives the opportunity for the assignor to enforce a rebalance by
setting the reason to a non-zero value or by changing the bytes. Do
you think that this is not sufficient? Are you concerned by the delay?
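To make that rule concrete, here is a minimal, hypothetical sketch: a rebalance is requested when the assignor reports a non-zero reason or when its opaque metadata bytes changed since the last heartbeat. The class and method names are illustrative only, not the KIP-848 API.

```java
import java.util.Arrays;

class HeartbeatTrigger {

    // Stand-in for what a client-side assignor could return when consulted
    // while the next heartbeat is constructed: a reason code plus opaque
    // metadata bytes.
    static final class AssignorState {
        final int reason;      // non-zero => ask the coordinator to rebalance
        final byte[] metadata; // opaque assignor payload

        AssignorState(int reason, byte[] metadata) {
            this.reason = reason;
            this.metadata = metadata;
        }
    }

    // The coordinator would trigger a new assignment when the reason is
    // non-zero or the metadata bytes differ from the previous heartbeat.
    static boolean triggersRebalance(AssignorState previous, AssignorState current) {
        return current.reason != 0 || !Arrays.equals(previous.metadata, current.metadata);
    }
}
```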

Best,
David

On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang  wrote:
>
> Hello David,
>
> One of Jun's comments got me thinking:
>
> ```
> In this case, a new assignment is triggered by the client side
> assignor. When constructing the HB, the consumer will always consult
> the client side assignor and propagate the information to the group
> coordinator. In other words, we don't expect users to call
> Consumer#enforceRebalance anymore.
> ```
>
> As I looked at the current PartitionAssignor interface, we do not yet
> have a way to influence how the next HB request is constructed, e.g.
> when the assignor wants to enforce a new rebalance with a new assignment,
> we'd need some customizable API inside the PartitionAssignor to indicate
> this in the next HB so the broker is told about it. WDYT about adding
> such an API on the PartitionAssignor?
>
>
> Guozhang
>
>
> On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
> wrote:
>
> > Hi Jun,
> >
> > I have updated the KIP to include your feedback. I have also tried to
> > clarify the parts which were not clear.
> >
> > Best,
> > David
> >
> > On Fri, Sep 2, 2022 at 4:18 PM David Jacot  wrote:
> > >
> > > Hi Jun,
> > >
> > > Thanks for your feedback. Let me start by answering your questions
> > > inline and I will update the KIP next week.
> > >
> > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be
> > fewer
> > > > RPCs during rebalance and more efficient support of wildcard. A few
> > > > comments below.
> > >
> > > I would also add that the KIP removes the global sync barrier in the
> > > protocol which is essential to improve group stability and
> > > scalability, and the KIP also simplifies the client by moving most of
> > > the logic to the server side.
> > >
> > > > 30. ConsumerGroupHeartbeatRequest
> > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> > changes
> > > > of the partition assignor in the consumers?
> > >
> > > Definitely. The group coordinator will use the assignor used by a
> > > majority of the members. This allows the group to move from one
> > > assignor to another by a roll. This is explained in the Assignor
> > > Selection chapter.
> > >
> > > > 30.2 For each field, could you explain whether it's required in every
> > > > request or the scenarios when it needs to be filled? For example, it's
> > not
> > > > clear to me when TopicPartitions needs to be filled.
> > >
> > > The client is expected to set those fields in case of a connection
> > > issue (e.g. timeout) or when the fields have changed since the last
> > > HB. The server populates those fields as long as the member is not
> > > fully reconciled - the member should acknowledge that it has the
> > > expected epoch and assignment. I will clarify this in the KIP.
> > >
> > > > 31. In the current consumer protocol, the rack affinity between the
> > client
> > > > and the broker is only considered during fetching, but not during
> > assigning
> > > > partitions to consumers. Sometimes, once the assignment is made, there
> > is
> > > > no opportunity for read affinity because no replicas of assigned
> > partitions
> > > > are close to the member. I am wondering if we should use this
> > opportunity
> > > > to address this by including rack in GroupMember.
> > >
> > > That's an interesting idea. I don't see any issue with adding the rack
> > > to the members. I will do so.
> > >
> > > > 32. On the metric side, often, it's useful to know how busy a group
> > > > coordinator is. By moving the event loop model, it seems that we could
> > add
> > > > a metric that tracks the fraction of the time the event loop is doing
> > the
> > > > actual work.
> > >
> > > That's a great idea. I will add it. Thanks.
> > >
> > > > 33. Could we add a section on coordinator failover handling? For
> > example,
> > > > does it need to trigger the check if any group with the wildcard
> > > > subscription now has a new matching topic?
> > >
> > > Sure. When the new group coordinator takes over, it has to:
> > > * Setup the session timeouts.
> > > * Trigger a new assignment if a client side assignor is used. We don't
> > > store the information about the member selected to run the assignment
> > > so we have to start a new one.
> > > * Update the topics metadata, verify the wildcard subscriptions, and
> > > trigger a rebalance if needed.
> > >
> > > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> > > > ConsumerGroupMemberMetadataValue: Could we document what the epoch
> > field
> > > > reflects? For example, does the epoch in ConsumerGroupMetadataValue
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-08 Thread Guozhang Wang
Hello David,

One of Jun's comments got me thinking:

```
In this case, a new assignment is triggered by the client side
assignor. When constructing the HB, the consumer will always consult
the client side assignor and propagate the information to the group
coordinator. In other words, we don't expect users to call
Consumer#enforceRebalance anymore.
```

As I looked at the current PartitionAssignor interface, we do not yet
have a way to influence how the next HB request is constructed, e.g.
when the assignor wants to enforce a new rebalance with a new assignment,
we'd need some customizable API inside the PartitionAssignor to indicate
this in the next HB so the broker is told about it. WDYT about adding
such an API on the PartitionAssignor?


Guozhang


On Tue, Sep 6, 2022 at 6:09 AM David Jacot 
wrote:

> Hi Jun,
>
> I have updated the KIP to include your feedback. I have also tried to
> clarify the parts which were not clear.
>
> Best,
> David
>
> On Fri, Sep 2, 2022 at 4:18 PM David Jacot  wrote:
> >
> > Hi Jun,
> >
> > Thanks for your feedback. Let me start by answering your questions
> > inline and I will update the KIP next week.
> >
> > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be
> fewer
> > > RPCs during rebalance and more efficient support of wildcard. A few
> > > comments below.
> >
> > I would also add that the KIP removes the global sync barrier in the
> > protocol which is essential to improve group stability and
> > scalability, and the KIP also simplifies the client by moving most of
> > the logic to the server side.
> >
> > > 30. ConsumerGroupHeartbeatRequest
> > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling
> changes
> > > of the partition assignor in the consumers?
> >
> > Definitely. The group coordinator will use the assignor used by a
> > majority of the members. This allows the group to move from one
> > assignor to another by a roll. This is explained in the Assignor
> > Selection chapter.
> >
> > > 30.2 For each field, could you explain whether it's required in every
> > > request or the scenarios when it needs to be filled? For example, it's
> not
> > > clear to me when TopicPartitions needs to be filled.
> >
> > The client is expected to set those fields in case of a connection
> > issue (e.g. timeout) or when the fields have changed since the last
> > HB. The server populates those fields as long as the member is not
> > fully reconciled - the member should acknowledge that it has the
> > expected epoch and assignment. I will clarify this in the KIP.
> >
> > > 31. In the current consumer protocol, the rack affinity between the
> client
> > > and the broker is only considered during fetching, but not during
> assigning
> > > partitions to consumers. Sometimes, once the assignment is made, there
> is
> > > no opportunity for read affinity because no replicas of assigned
> partitions
> > > are close to the member. I am wondering if we should use this
> opportunity
> > > to address this by including rack in GroupMember.
> >
> > That's an interesting idea. I don't see any issue with adding the rack
> > to the members. I will do so.
> >
> > > 32. On the metric side, often, it's useful to know how busy a group
> > > coordinator is. By moving the event loop model, it seems that we could
> add
> > > a metric that tracks the fraction of the time the event loop is doing
> the
> > > actual work.
> >
> > That's a great idea. I will add it. Thanks.
> >
> > > 33. Could we add a section on coordinator failover handling? For
> example,
> > > does it need to trigger the check if any group with the wildcard
> > > subscription now has a new matching topic?
> >
> > Sure. When the new group coordinator takes over, it has to:
> > * Setup the session timeouts.
> > * Trigger a new assignment if a client side assignor is used. We don't
> > store the information about the member selected to run the assignment
> > so we have to start a new one.
> > * Update the topics metadata, verify the wildcard subscriptions, and
> > trigger a rebalance if needed.
> >
> > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> > > ConsumerGroupMemberMetadataValue: Could we document what the epoch
> field
> > > reflects? For example, does the epoch in ConsumerGroupMetadataValue
> reflect
> > > the latest group epoch? What about the one in
> > > ConsumerGroupPartitionMetadataValue and
> ConsumerGroupMemberMetadataValue?
> >
> > Sure. I will clarify that but it is always the latest group epoch.
> > When the group state is updated, the group epoch is bumped so we use
> > that one for all the change records related to the update.
> >
> > > 35. "the group coordinator will ensure that the following invariants
> are
> > > met: ... All members exists." It's possible for a member not to get any
> > > assigned partitions, right?
> >
> > That's right. Here I meant that the members provided by the assignor
> > in the assignment must exist in the group. The assignor can 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-06 Thread David Jacot
Hi Jun,

I have updated the KIP to include your feedback. I have also tried to
clarify the parts which were not clear.

Best,
David

On Fri, Sep 2, 2022 at 4:18 PM David Jacot  wrote:
>
> Hi Jun,
>
> Thanks for your feedback. Let me start by answering your questions
> inline and I will update the KIP next week.
>
> > Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer
> > RPCs during rebalance and more efficient support of wildcard. A few
> > comments below.
>
> I would also add that the KIP removes the global sync barrier in the
> protocol which is essential to improve group stability and
> scalability, and the KIP also simplifies the client by moving most of
> the logic to the server side.
>
> > 30. ConsumerGroupHeartbeatRequest
> > 30.1 ServerAssignor is a singleton. Do we plan to support rolling changes
> > of the partition assignor in the consumers?
>
> Definitely. The group coordinator will use the assignor used by a
> majority of the members. This allows the group to move from one
> assignor to another by a roll. This is explained in the Assignor
> Selection chapter.
>
> > 30.2 For each field, could you explain whether it's required in every
> > request or the scenarios when it needs to be filled? For example, it's not
> > clear to me when TopicPartitions needs to be filled.
>
> The client is expected to set those fields in case of a connection
> issue (e.g. timeout) or when the fields have changed since the last
> HB. The server populates those fields as long as the member is not
> fully reconciled - the member should acknowledge that it has the
> expected epoch and assignment. I will clarify this in the KIP.
>
> > 31. In the current consumer protocol, the rack affinity between the client
> > and the broker is only considered during fetching, but not during assigning
> > partitions to consumers. Sometimes, once the assignment is made, there is
> > no opportunity for read affinity because no replicas of assigned partitions
> > are close to the member. I am wondering if we should use this opportunity
> > to address this by including rack in GroupMember.
>
> That's an interesting idea. I don't see any issue with adding the rack
> to the members. I will do so.
>
> > 32. On the metric side, often, it's useful to know how busy a group
> > coordinator is. By moving the event loop model, it seems that we could add
> > a metric that tracks the fraction of the time the event loop is doing the
> > actual work.
>
> That's a great idea. I will add it. Thanks.
>
> > 33. Could we add a section on coordinator failover handling? For example,
> > does it need to trigger the check if any group with the wildcard
> > subscription now has a new matching topic?
>
> Sure. When the new group coordinator takes over, it has to:
> * Setup the session timeouts.
> * Trigger a new assignment if a client side assignor is used. We don't
> store the information about the member selected to run the assignment
> so we have to start a new one.
> * Update the topics metadata, verify the wildcard subscriptions, and
> trigger a rebalance if needed.
>
> > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> > ConsumerGroupMemberMetadataValue: Could we document what the epoch field
> > reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect
> > the latest group epoch? What about the one in
> > ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue?
>
> Sure. I will clarify that but it is always the latest group epoch.
> When the group state is updated, the group epoch is bumped so we use
> that one for all the change records related to the update.
>
> > 35. "the group coordinator will ensure that the following invariants are
> > met: ... All members exists." It's possible for a member not to get any
> > assigned partitions, right?
>
> That's right. Here I meant that the members provided by the assignor
> in the assignment must exist in the group. The assignor can not make
> up new member ids.
>
> > 36. "He can rejoins the group with a member epoch equals to 0": When would
> > a consumer rejoin and what member id would be used?
>
> A member is expected to abandon all its partitions and rejoin when it
> receives the FENCED_MEMBER_EPOCH error. In this case, the group
> coordinator will have removed the member from the group. The member
> can rejoin the group with the same member id but with 0 as epoch. Let
> me see if I can clarify this in the KIP.
>
> > 37. "Instead, power users will have the ability to trigger a reassignment
> > by either providing a non-zero reason or by updating the assignor
> > metadata." Hmm, this seems to be conflicting with the deprecation of
> > Consumer#enforceRebalance.
>
> In this case, a new assignment is triggered by the client side
> assignor. When constructing the HB, the consumer will always consult
> the client side assignor and propagate the information to the group
> coordinator. In other words, we don't expect users to call

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-02 Thread David Jacot
Hi Jun,

Thanks for your feedback. Let me start by answering your questions
inline and I will update the KIP next week.

> Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer
> RPCs during rebalance and more efficient support of wildcard. A few
> comments below.

I would also add that the KIP removes the global sync barrier in the
protocol which is essential to improve group stability and
scalability, and the KIP also simplifies the client by moving most of
the logic to the server side.

> 30. ConsumerGroupHeartbeatRequest
> 30.1 ServerAssignor is a singleton. Do we plan to support rolling changes
> of the partition assignor in the consumers?

Definitely. The group coordinator will use the assignor used by a
majority of the members. This allows the group to move from one
assignor to another by a roll. This is explained in the Assignor
Selection chapter.
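A rough sketch of that selection rule, assuming the coordinator simply counts the assignor named by each member; the class and method names are illustrative, and the tie-breaking here (alphabetically first name wins) is an assumption this sketch makes, not something the KIP specifies.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AssignorSelection {

    // Pick the server-side assignor used by the largest number of members.
    // During a rolling bounce the old and new assignors coexist, and the
    // group switches once the new one is used by a majority of members.
    static String selectAssignor(List<String> memberAssignors) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String assignor : memberAssignors) {
            counts.merge(assignor, 1, Integer::sum);
        }
        return counts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalArgumentException("empty group"));
    }
}
```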

> 30.2 For each field, could you explain whether it's required in every
> request or the scenarios when it needs to be filled? For example, it's not
> clear to me when TopicPartitions needs to be filled.

The client is expected to set those fields in case of a connection
issue (e.g. timeout) or when the fields have changed since the last
HB. The server populates those fields as long as the member is not
fully reconciled - the member should acknowledge that it has the
expected epoch and assignment. I will clarify this in the KIP.

> 31. In the current consumer protocol, the rack affinity between the client
> and the broker is only considered during fetching, but not during assigning
> partitions to consumers. Sometimes, once the assignment is made, there is
> no opportunity for read affinity because no replicas of assigned partitions
> are close to the member. I am wondering if we should use this opportunity
> to address this by including rack in GroupMember.

That's an interesting idea. I don't see any issue with adding the rack
to the members. I will do so.

> 32. On the metric side, often, it's useful to know how busy a group
> coordinator is. By moving the event loop model, it seems that we could add
> a metric that tracks the fraction of the time the event loop is doing the
> actual work.

That's a great idea. I will add it. Thanks.

> 33. Could we add a section on coordinator failover handling? For example,
> does it need to trigger the check if any group with the wildcard
> subscription now has a new matching topic?

Sure. When the new group coordinator takes over, it has to:
* Setup the session timeouts.
* Trigger a new assignment if a client side assignor is used. We don't
store the information about the member selected to run the assignment
so we have to start a new one.
* Update the topics metadata, verify the wildcard subscriptions, and
trigger a rebalance if needed.

> 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
> ConsumerGroupMemberMetadataValue: Could we document what the epoch field
> reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect
> the latest group epoch? What about the one in
> ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue?

Sure. I will clarify that but it is always the latest group epoch.
When the group state is updated, the group epoch is bumped so we use
that one for all the change records related to the update.

> 35. "the group coordinator will ensure that the following invariants are
> met: ... All members exists." It's possible for a member not to get any
> assigned partitions, right?

That's right. Here I meant that the members provided by the assignor
in the assignment must exist in the group. The assignor can not make
up new member ids.

> 36. "He can rejoins the group with a member epoch equals to 0": When would
> a consumer rejoin and what member id would be used?

A member is expected to abandon all its partitions and rejoin when it
receives the FENCED_MEMBER_EPOCH error. In this case, the group
coordinator will have removed the member from the group. The member
can rejoin the group with the same member id but with 0 as epoch. Let
me see if I can clarify this in the KIP.
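A small sketch of the client-side handling described here, with illustrative names (this is not the actual consumer code): on FENCED_MEMBER_EPOCH the member drops its partitions, keeps its member id, and resets its epoch to 0 before rejoining.

```java
import java.util.HashSet;
import java.util.Set;

class GroupMemberState {
    final String memberId;
    int memberEpoch;
    final Set<String> assignedPartitions = new HashSet<>();

    GroupMemberState(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }

    // FENCED_MEMBER_EPOCH means the coordinator already removed this member:
    // abandon all assigned partitions and rejoin with the same member id
    // but with member epoch 0.
    void onFencedMemberEpoch() {
        assignedPartitions.clear();
        memberEpoch = 0;
    }
}
```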

> 37. "Instead, power users will have the ability to trigger a reassignment
> by either providing a non-zero reason or by updating the assignor
> metadata." Hmm, this seems to be conflicting with the deprecation of
> Consumer#enforceRebalance.

In this case, a new assignment is triggered by the client side
assignor. When constructing the HB, the consumer will always consult
the client side assignor and propagate the information to the group
coordinator. In other words, we don't expect users to call
Consumer#enforceRebalance anymore.

> 38. The reassignment examples are nice. But the section seems to have
> multiple typos.
> 38.1 When the group transitions to epoch 2, B immediately gets into
> "epoch=1, partitions=[foo-2]", which seems incorrect.
> 38.2 When the group transitions to epoch 3, C seems to get into epoch=3,
> partitions=[foo-1] too early.
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-09-01 Thread Jun Rao
Hi, David,

Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer
RPCs during rebalance and more efficient support of wildcard. A few
comments below.

30. ConsumerGroupHeartbeatRequest
30.1 ServerAssignor is a singleton. Do we plan to support rolling changes
of the partition assignor in the consumers?
30.2 For each field, could you explain whether it's required in every
request or the scenarios when it needs to be filled? For example, it's not
clear to me when TopicPartitions needs to be filled.

31. In the current consumer protocol, the rack affinity between the client
and the broker is only considered during fetching, but not during assigning
partitions to consumers. Sometimes, once the assignment is made, there is
no opportunity for read affinity because no replicas of assigned partitions
are close to the member. I am wondering if we should use this opportunity
to address this by including rack in GroupMember.

32. On the metric side, often, it's useful to know how busy a group
coordinator is. By moving the event loop model, it seems that we could add
a metric that tracks the fraction of the time the event loop is doing the
actual work.

33. Could we add a section on coordinator failover handling? For example,
does it need to trigger the check if any group with the wildcard
subscription now has a new matching topic?

34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue,
ConsumerGroupMemberMetadataValue: Could we document what the epoch field
reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect
the latest group epoch? What about the one in
ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue?

35. "the group coordinator will ensure that the following invariants are
met: ... All members exists." It's possible for a member not to get any
assigned partitions, right?

36. "He can rejoins the group with a member epoch equals to 0": When would
a consumer rejoin and what member id would be used?

37. "Instead, power users will have the ability to trigger a reassignment
by either providing a non-zero reason or by updating the assignor
metadata." Hmm, this seems to be conflicting with the deprecation of
Consumer#enforceRebalance.

38. The reassignment examples are nice. But the section seems to have
multiple typos.
38.1 When the group transitions to epoch 2, B immediately gets into
"epoch=1, partitions=[foo-2]", which seems incorrect.
38.2 When the group transitions to epoch 3, C seems to get into epoch=3,
partitions=[foo-1] too early.
38.3 After A transitions to epoch 3, C still has A - epoch=2,
partitions=[foo-0].

39. Rolling upgrade of consumers: Do we support the upgrade from any old
version to new one?

Thanks,

Jun

On Mon, Aug 29, 2022 at 9:20 AM David Jacot 
wrote:

> Hi all,
>
> The KIP states that we will re-implement the coordinator in Java. I
> discussed this offline with a few folks and folks are concerned that
> we could introduce many regressions in the old protocol if we do so.
> Therefore, I am going to remove this statement from the KIP. It is an
> implementation detail after all so it does not have to be decided at
> this stage. We will likely start by trying to refactor the current
> implementation as a first step.
>
> Cheers,
> David
>
> On Mon, Aug 29, 2022 at 3:52 PM David Jacot  wrote:
> >
> > Hi Luke,
> >
> > > 1.1. I think the state machines are: "Empty, assigning, reconciling,
> stable,
> > > dead" mentioned in Consumer Group States section, right?
> >
> > This sentence does not refer to those group states but rather to a
> > state machine replication (SMR). This refers to the entire state of
> > the group coordinator, which is replicated via the log layer. I will
> > clarify this in the KIP.
> >
> > > 1.2. What do you mean "each state machine is modelled as an event
> loop"?
> >
> > The idea is to follow a model similar to the new quorum controller. We
> > will have N threads to process events. Each __consumer_offsets
> > partition is assigned to a unique thread and all the events (e.g.
> > requests, callbacks, etc.) are processed by this thread. This simplifies
> > concurrency and will enable us to do simulation testing for the group
> > coordinator.
> >
> > > 1.3. Why do we need a state machine per *__consumer_offsets*
> partitions?
> > > Not a state machine "per consumer group" owned by a group coordinator?
> For
> > > example, if one group coordinator owns 2 consumer groups, and both
> exist in
> > > *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
> >
> > See 1.1. The confusion comes from there, I think.
> >
> > > 1.4. I know the "*group.coordinator.threads" *is the number of threads
> used
> > > to run the state machines. But I'm wondering if the purpose of the
> threads
> > > is only to keep the state of each consumer group (or
> *__consumer_offsets*
> > > partitions?), and no heavy computation, why should we need
> multi-threads
> > > here?
> >
> > See 1.2. The idea is to have an ability 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-29 Thread David Jacot
Hi all,

The KIP states that we will re-implement the coordinator in Java. I
discussed this offline with a few folks and folks are concerned that
we could introduce many regressions in the old protocol if we do so.
Therefore, I am going to remove this statement from the KIP. It is an
implementation detail after all so it does not have to be decided at
this stage. We will likely start by trying to refactor the current
implementation as a first step.

Cheers,
David

On Mon, Aug 29, 2022 at 3:52 PM David Jacot  wrote:
>
> Hi Luke,
>
> > 1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
> > dead" mentioned in Consumer Group States section, right?
>
> This sentence does not refer to those group states but rather to a
> state machine replication (SMR). This refers to the entire state of
> group coordinator which is replicated via the log layer. I will
> clarify this in the KIP.
>
> > 1.2. What do you mean "each state machine is modelled as an event loop"?
>
> The idea is to follow a model similar to the new quorum controller. We
> will have N threads to process events. Each __consumer_offsets
> partition is assigned to a unique thread and all the events (e.g.
> requests, callbacks, etc.) are processed by this thread. This simplifies
> concurrency and will enable us to do simulation testing for the group
> coordinator.
>
> > 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> > Not a state machine "per consumer group" owned by a group coordinator? For
> > example, if one group coordinator owns 2 consumer groups, and both exist in
> > *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
>
> See 1.1. The confusion comes from there, I think.
>
> > 1.4. I know the "*group.coordinator.threads" *is the number of threads used
> > to run the state machines. But I'm wondering if the purpose of the threads
> > is only to keep the state of each consumer group (or *__consumer_offsets*
> > partitions?), and no heavy computation, why should we need multi-threads
> > here?
>
> See 1.2. The idea is to have an ability to shard the processing as the
> computation could be heavy.
>
> > 2.1. The consumer session timeout, why does the default session timeout not
> > fall between min (45s) and max (60s)? I thought the min/max session
> > timeout is to define lower/upper bound of it, no?
> >
> > group.consumer.session.timeout.ms int 30s The timeout to detect client
> > failures when using the consumer group protocol.
> > group.consumer.min.session.timeout.ms int 45s The minimum session timeout.
> > group.consumer.max.session.timeout.ms int 60s The maximum session timeout.
>
> This is indeed a mistake. The default session timeout should be 45s
> (the current default).
>
> > 2.2. The default server side assignors are [range, uniform], which means
> > we'll default to "range" assignor. I'd like to know why not uniform one? I
> > thought usually users will choose uniform assignor (former sticky assignor)
> > for a more even distribution. Any other reason we choose range assignor
> > as default?
> > group.consumer.assignors List range, uniform The server side assignors.
>
> The order on the server side has no influence because the client must
> choose the assignor that it wants to use. There is no default in the
> current proposal. If the assignor is not specified by the client, the
> request is rejected. The default client value for
> `group.remote.assignor` is `uniform` though.
>
> Thanks for your very good comments, Luke. I hope that my answers help
> to clarify things. I will update the KIP as well based on your
> feedback.
>
> Cheers,
> David
>
> On Mon, Aug 22, 2022 at 9:29 AM Luke Chen  wrote:
> >
> > Hi David,
> >
> > Thanks for the update.
> >
> > Some more questions:
> > 1. In Group Coordinator section, you mentioned:
> > > The new group coordinator will have a state machine per
> > *__consumer_offsets* partitions, where each state machine is modelled as an
> > event loop. Those state machines will be executed in
> > *group.coordinator.threads* threads.
> >
> > 1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
> > dead" mentioned in Consumer Group States section, right?
> > 1.2. What do you mean "each state machine is modelled as an event loop"?
> > 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> > Not a state machine "per consumer group" owned by a group coordinator? For
> > example, if one group coordinator owns 2 consumer groups, and both exist in
> > *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
> > 1.4. I know the "*group.coordinator.threads" *is the number of threads used
> > to run the state machines. But I'm wondering if the purpose of the threads
> > is only to keep the state of each consumer group (or *__consumer_offsets*
> > partitions?), and no heavy computation, why should we need multi-threads
> > here?
> >
> > 2. For the default value in the new configs:
> > 2.1. The consumer 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-29 Thread David Jacot
Hi Luke,

> 1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
> dead" mentioned in Consumer Group States section, right?

This sentence does not refer to those group states but rather to a
state machine replication (SMR). This refers to the entire state of
the group coordinator, which is replicated via the log layer. I will
clarify this in the KIP.

> 1.2. What do you mean "each state machine is modelled as an event loop"?

The idea is to follow a model similar to the new quorum controller. We
will have N threads to process events. Each __consumer_offsets
partition is assigned to a unique thread and all the events (e.g.
requests, callbacks, etc.) are processed by this thread. This simplifies
concurrency and will enable us to do simulation testing for the group
coordinator.
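A minimal sketch of that threading model (the names are illustrative, not the broker implementation): each __consumer_offsets partition maps deterministically to one single-threaded executor, so all events for the groups in that partition are processed sequentially without locks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class GroupCoordinatorEventLoops {
    private final ExecutorService[] loops;

    GroupCoordinatorEventLoops(int numThreads) {
        loops = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // One single-threaded event loop per shard.
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Deterministic mapping: a given __consumer_offsets partition is always
    // handled by the same thread, which serializes all of its events.
    static int loopFor(int consumerOffsetsPartition, int numThreads) {
        return consumerOffsetsPartition % numThreads;
    }

    void submit(int consumerOffsetsPartition, Runnable event) {
        loops[loopFor(consumerOffsetsPartition, loops.length)].submit(event);
    }

    void shutdown() {
        for (ExecutorService loop : loops) {
            loop.shutdown();
        }
    }
}
```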

> 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> Not a state machine "per consumer group" owned by a group coordinator? For
> example, if one group coordinator owns 2 consumer groups, and both exist in
> *__consumer_offsets-0*, will we have 1 state machine for it, or 2?

See 1.1. The confusion comes from there, I think.

> 1.4. I know the "*group.coordinator.threads" *is the number of threads used
> to run the state machines. But I'm wondering if the purpose of the threads
> is only to keep the state of each consumer group (or *__consumer_offsets*
> partitions?), and no heavy computation, why should we need multi-threads
> here?

See 1.2. The idea is to have the ability to shard the processing, as the
computation could be heavy.

> 2.1. The consumer session timeout, why does the default session timeout not
> fall between the min (45s) and max (60s)? I thought the min/max session
> timeouts are meant to define its lower/upper bounds, no?
>
> group.consumer.session.timeout.ms int 30s The timeout to detect client
> failures when using the consumer group protocol.
> group.consumer.min.session.timeout.ms int 45s The minimum session timeout.
> group.consumer.max.session.timeout.ms int 60s The maximum session timeout.

This is indeed a mistake. The default session timeout should be 45s
(the current default).
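
For illustration, the relationship between the three configs amounts to
keeping the effective timeout within [min, max]. A broker-side check might
look like the sketch below; note that clamping (rather than rejecting an
out-of-range value) is an assumption made here for illustration only.

```java
// Sketch: keep a member's requested session timeout within the broker's
// configured bounds. Config names follow the KIP; the clamping behavior
// is an illustrative assumption, not the KIP's specified behavior.
public class SessionTimeoutBounds {
    // group.consumer.min.session.timeout.ms <= result <= group.consumer.max.session.timeout.ms
    static int effectiveSessionTimeoutMs(int requestedMs, int minMs, int maxMs) {
        return Math.max(minMs, Math.min(requestedMs, maxMs));
    }
}
```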

> 2.2. The default server side assignors are [range, uniform], which means
> we'll default to the "range" assignor. I'd like to know why not the uniform
> one? I thought users usually choose the uniform assignor (formerly the
> sticky assignor) for a more even distribution. Any other reason we choose
> the range assignor as default?
> group.consumer.assignors List range, uniform The server side assignors.

The order on the server side has no influence because the client must
choose the assignor that it wants to use. There is no default in the
current proposal. If the assignor is not specified by the client, the
request is rejected. The default client value for
`group.remote.assignor` is `uniform` though.
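
The selection rule described above (the server list carries no default
semantics; an unsupported or missing client choice is rejected) can be
sketched as follows. The method and exception type are illustrative, not
the actual broker code.

```java
import java.util.List;

// Sketch of the server-side selection rule: the order of
// group.consumer.assignors carries no default semantics; the client's
// group.remote.assignor must name one of them, otherwise the request
// is rejected. The exception type here is illustrative.
public class AssignorSelection {
    static String selectAssignor(String clientRequested, List<String> serverAssignors) {
        if (clientRequested == null || !serverAssignors.contains(clientRequested)) {
            throw new IllegalArgumentException("Unsupported assignor: " + clientRequested);
        }
        return clientRequested;
    }
}
```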

Thanks for your very good comments, Luke. I hope that my answers help
to clarify things. I will update the KIP as well based on your
feedback.

Cheers,
David

On Mon, Aug 22, 2022 at 9:29 AM Luke Chen  wrote:
>
> Hi David,
>
> Thanks for the update.
>
> Some more questions:
> 1. In Group Coordinator section, you mentioned:
> > The new group coordinator will have a state machine per
> *__consumer_offsets* partitions, where each state machine is modelled as an
> event loop. Those state machines will be executed in
> *group.coordinator.threads* threads.
>
> 1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
> dead" mentioned in Consumer Group States section, right?
> 1.2. What do you mean "each state machine is modelled as an event loop"?
> 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> Not a state machine "per consumer group" owned by a group coordinator? For
> example, if one group coordinator owns 2 consumer groups, and both exist in
> *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
> 1.4. I know "group.coordinator.threads" is the number of threads used
> to run the state machines. But I'm wondering if the purpose of the threads
> is only to keep the state of each consumer group (or *__consumer_offsets*
> partitions?), and no heavy computation, why should we need multi-threads
> here?
>
> 2. For the default value in the new configs:
> 2.1. The consumer session timeout, why does the default session timeout not
> fall between the min (45s) and max (60s)? I thought the min/max session
> timeouts are meant to define its lower/upper bounds, no?
>
> group.consumer.session.timeout.ms int 30s The timeout to detect client
> failures when using the consumer group protocol.
> group.consumer.min.session.timeout.ms int 45s The minimum session timeout.
> group.consumer.max.session.timeout.ms int 60s The maximum session timeout.
>
>
>
> 2.2. The default server side assignors are [range, uniform], which means
> we'll default to the "range" assignor. I'd like to know why not the uniform
> one? I thought users usually choose the uniform assignor (formerly the
> sticky assignor) for 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-22 Thread Sagar
Hi All,

As per the suggestions from David/Guozhang above, I updated the page for
Connect to have its own set of APIs rather than extending the ones from the
consumer. Please review again.

@Luke,

Thank you. I am actually looking for review comments/thoughts/suggestions,
as the Connect changes are still in the draft stage, and since this is a
substantial change, I am sure there are cases I have missed. Also, David
earlier suggested continuing the discussion on this very thread to ensure
Connect is compatible with whatever is done in KIP-848. I am ok either way.

@Guozhang,

Regarding the migration plan and the link to KIP-415 that you had shared, I
have taken a look at it. From what I understood, that migration/downgrade
path covers switching the assignment protocol from eager to incremental or
vice versa. In this case, the migration is to the new rebalance protocol,
which would also depend on whether the group coordinator supports the new
rebalance protocol or not. I have tried to incorporate this aspect in the
draft page.

Thanks!
Sagar.




On Mon, Aug 22, 2022 at 1:00 PM Luke Chen  wrote:

> Hi David,
>
> Thanks for the update.
>
> Some more questions:
> 1. In Group Coordinator section, you mentioned:
> > The new group coordinator will have a state machine per
> *__consumer_offsets* partitions, where each state machine is modelled as an
> event loop. Those state machines will be executed in
> *group.coordinator.threads* threads.
>
> 1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
> dead" mentioned in Consumer Group States section, right?
> 1.2. What do you mean "each state machine is modelled as an event loop"?
> 1.3. Why do we need a state machine per *__consumer_offsets* partitions?
> Not a state machine "per consumer group" owned by a group coordinator? For
> example, if one group coordinator owns 2 consumer groups, and both exist in
> *__consumer_offsets-0*, will we have 1 state machine for it, or 2?
> 1.4. I know "group.coordinator.threads" is the number of threads used
> to run the state machines. But I'm wondering if the purpose of the threads
> is only to keep the state of each consumer group (or *__consumer_offsets*
> partitions?), and no heavy computation, why should we need multi-threads
> here?
>
> 2. For the default value in the new configs:
> 2.1. The consumer session timeout, why does the default session timeout not
> fall between the min (45s) and max (60s)? I thought the min/max session
> timeouts are meant to define its lower/upper bounds, no?
>
> group.consumer.session.timeout.ms int 30s The timeout to detect client
> failures when using the consumer group protocol.
> group.consumer.min.session.timeout.ms int 45s The minimum session timeout.
> group.consumer.max.session.timeout.ms int 60s The maximum session timeout.
>
>
>
> 2.2. The default server side assignors are [range, uniform], which means
> we'll default to the "range" assignor. I'd like to know why not the uniform
> one? I thought users usually choose the uniform assignor (formerly the
> sticky assignor) for a more even distribution. Any other reason we choose
> the range assignor as default?
> group.consumer.assignors List range, uniform The server side assignors.
>
>
>
>
>
>
> Thank you.
> Luke
>
>
>
>
>
>
> On Mon, Aug 22, 2022 at 2:10 PM Luke Chen  wrote:
>
> > Hi Sagar,
> >
> > I have some thoughts about Kafka Connect integrating with KIP-848, but I
> > think we should have a separate discussion thread for the Kafka Connect
> > KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1],
> > and let this discussion thread focus on consumer rebalance protocol,
> WDYT?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> >
> > Thank you.
> > Luke
> >
> >
> >
> > On Fri, Aug 12, 2022 at 9:31 PM Sagar  wrote:
> >
> >> Thank you Guozhang/David for the feedback. Looks like there's agreement
> on
> >> using separate APIs for Connect. I would revisit the doc and see what
> >> changes are to be made.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Tue, Aug 9, 2022 at 7:11 PM David Jacot  >
> >> wrote:
> >>
> >> > Hi Sagar,
> >> >
> >> > Thanks for the feedback and the document. That's really helpful. I
> >> > will take a look at it.
> >> >
> >> > Overall, it seems to me that both Connect and the Consumer could share
> >> > the same underlying "engine". The main difference is that the Consumer
> >> > assigns topic-partitions to members whereas Connect assigns tasks to
> >> > workers. I see two ways to move forward:
> >> > 1) We extend the new proposed APIs to support different resource types
> >> > (e.g. partitions, tasks, etc.); or
> >> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> >> > similar to the new ones but different on the content/resources and
> >> > they would rely on the same engine on the coordinator side.
> >> >
> >> > I personally lean towards 2) because I am not a fan of 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-22 Thread Luke Chen
Hi David,

Thanks for the update.

Some more questions:
1. In Group Coordinator section, you mentioned:
> The new group coordinator will have a state machine per
*__consumer_offsets* partitions, where each state machine is modelled as an
event loop. Those state machines will be executed in
*group.coordinator.threads* threads.

1.1. I think the state machines are: "Empty, assigning, reconciling, stable,
dead" mentioned in Consumer Group States section, right?
1.2. What do you mean "each state machine is modelled as an event loop"?
1.3. Why do we need a state machine per *__consumer_offsets* partitions?
Not a state machine "per consumer group" owned by a group coordinator? For
example, if one group coordinator owns 2 consumer groups, and both exist in
*__consumer_offsets-0*, will we have 1 state machine for it, or 2?
1.4. I know "group.coordinator.threads" is the number of threads used
to run the state machines. But I'm wondering if the purpose of the threads
is only to keep the state of each consumer group (or *__consumer_offsets*
partitions?), and no heavy computation, why should we need multi-threads
here?

2. For the default value in the new configs:
2.1. The consumer session timeout, why does the default session timeout not
fall between the min (45s) and max (60s)? I thought the min/max session
timeouts are meant to define its lower/upper bounds, no?

group.consumer.session.timeout.ms int 30s The timeout to detect client
failures when using the consumer group protocol.
group.consumer.min.session.timeout.ms int 45s The minimum session timeout.
group.consumer.max.session.timeout.ms int 60s The maximum session timeout.



2.2. The default server side assignors are [range, uniform], which means
we'll default to the "range" assignor. I'd like to know why not the uniform
one? I thought users usually choose the uniform assignor (formerly the
sticky assignor) for a more even distribution. Any other reason we choose
the range assignor as default?
group.consumer.assignors List range, uniform The server side assignors.






Thank you.
Luke






On Mon, Aug 22, 2022 at 2:10 PM Luke Chen  wrote:

> Hi Sagar,
>
> I have some thoughts about Kafka Connect integrating with KIP-848, but I
> think we should have a separate discussion thread for the Kafka Connect
> KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1],
> and let this discussion thread focus on consumer rebalance protocol, WDYT?
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
>
> Thank you.
> Luke
>
>
>
> On Fri, Aug 12, 2022 at 9:31 PM Sagar  wrote:
>
>> Thank you Guozhang/David for the feedback. Looks like there's agreement on
>> using separate APIs for Connect. I would revisit the doc and see what
>> changes are to be made.
>>
>> Thanks!
>> Sagar.
>>
>> On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
>> wrote:
>>
>> > Hi Sagar,
>> >
>> > Thanks for the feedback and the document. That's really helpful. I
>> > will take a look at it.
>> >
>> > Overall, it seems to me that both Connect and the Consumer could share
>> > the same underlying "engine". The main difference is that the Consumer
>> > assigns topic-partitions to members whereas Connect assigns tasks to
>> > workers. I see two ways to move forward:
>> > 1) We extend the new proposed APIs to support different resource types
>> > (e.g. partitions, tasks, etc.); or
>> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
>> > similar to the new ones but different on the content/resources and
>> > they would rely on the same engine on the coordinator side.
>> >
>> > I personally lean towards 2) because I am not a fan of overcharging
>> > APIs to serve different purposes. That being said, I am not opposed to
>> > 1) if we can find an elegant way to do it.
>> >
>> > I think that we can continue to discuss it here for now in order to
>> > ensure that this KIP is compatible with what we will do for Connect in
>> > the future.
>> >
>> > Best,
>> > David
>> >
>> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
>> > >
>> > > Hi all,
>> > >
>> > > I am back from vacation. I will go through and address your comments
>> > > in the coming days. Thanks for your feedback.
>> > >
>> > > Cheers,
>> > > David
>> > >
>> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > >
>> > wrote:
>> > > >
>> > > > Hey All!
>> > > >
>> > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
>> > making it
>> > > > down the stack!
>> > > >
>> > > > I had a few questions:
>> > > >
>> > > > 1. The 'Rejected Alternatives' section describes how member epoch
>> > should
>> > > > advance in step with the group epoch and assignment epoch values. I
>> > think
>> > > > that this is a good idea for the reasons described in the KIP. When
>> the
>> > > > protocol is incrementally assigning partitions to a worker, what
>> member
>> > > > epoch does each incremental assignment use? Are member epochs
>> re-used,
>> > and
>> > > > 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-22 Thread Luke Chen
Hi Sagar,

I have some thoughts about Kafka Connect integrating with KIP-848, but I
think we should have a separate discussion thread for the Kafka Connect
KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1],
and let this discussion thread focus on consumer rebalance protocol, WDYT?

[1]
https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol

Thank you.
Luke



On Fri, Aug 12, 2022 at 9:31 PM Sagar  wrote:

> Thank you Guozhang/David for the feedback. Looks like there's agreement on
> using separate APIs for Connect. I would revisit the doc and see what
> changes are to be made.
>
> Thanks!
> Sagar.
>
> On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for the feedback and the document. That's really helpful. I
> > will take a look at it.
> >
> > Overall, it seems to me that both Connect and the Consumer could share
> > the same underlying "engine". The main difference is that the Consumer
> > assigns topic-partitions to members whereas Connect assigns tasks to
> > workers. I see two ways to move forward:
> > 1) We extend the new proposed APIs to support different resource types
> > (e.g. partitions, tasks, etc.); or
> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> > similar to the new ones but different on the content/resources and
> > they would rely on the same engine on the coordinator side.
> >
> > I personally lean towards 2) because I am not a fan of overcharging
> > APIs to serve different purposes. That being said, I am not opposed to
> > 1) if we can find an elegant way to do it.
> >
> > I think that we can continue to discuss it here for now in order to
> > ensure that this KIP is compatible with what we will do for Connect in
> > the future.
> >
> > Best,
> > David
> >
> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> > >
> > > Hi all,
> > >
> > > I am back from vacation. I will go through and address your comments
> > > in the coming days. Thanks for your feedback.
> > >
> > > Cheers,
> > > David
> > >
> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> > wrote:
> > > >
> > > > Hey All!
> > > >
> > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> > making it
> > > > down the stack!
> > > >
> > > > I had a few questions:
> > > >
> > > > 1. The 'Rejected Alternatives' section describes how member epoch
> > should
> > > > advance in step with the group epoch and assignment epoch values. I
> > think
> > > > that this is a good idea for the reasons described in the KIP. When
> the
> > > > protocol is incrementally assigning partitions to a worker, what
> member
> > > > epoch does each incremental assignment use? Are member epochs
> re-used,
> > and
> > > > a single member epoch can correspond to multiple different
> > (monotonically
> > > > larger) assignments?
> > > >
> > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator?
> If
> > > > not, should custom client-side assignor implementations interact with
> > the
> > > > Reason field, and how is its common meaning agreed upon? If so, what
> > is the
> > > > benefit of a distinct Reason field over including such functionality
> > in the
> > > > opaque metadata?
> > > >
> > > > 3. The following is included in the KIP: "Thanks to this, the input
> of
> > the
> > > > client side assignor is entirely driven by the group coordinator. The
> > > > consumer is no longer responsible for maintaining any state besides
> its
> > > > assigned partitions." Does this mean that the client-side assignor
> MAY
> > > > incorporate additional non-Metadata state (such as partition
> > throughput,
> > > > cpu/memory metrics, config topics, etc), or that additional
> > non-Metadata
> > > > state SHOULD NOT be used?
> > > >
> > > > 4. I see that there are separate classes
> > > > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > > overlap significantly. Is it possible for these two implementations
> to
> > be
> > > > unified? This would serve to promote feature parity of server-side
> and
> > > > client-side assignors, and would also facilitate operational
> > flexibility in
> > > > certain situations. For example, if a server-side assignor has some
> > poor
> > > > behavior and needs a patch, deploying the patched assignor to the
> > client
> > > > and switching one consumer group to a client-side assignor may be
> > faster
> > > > and less risky than patching all of the brokers. With the currently
> > > > proposed distinct APIs, a non-trivial reimplementation would have to
> be
> > > > assembled, and if the two APIs have diverged significantly, then it
> is
> > > > possible that a reimplementation would not be possible.
> > > >
> > > > --
> > > > Greg Harris
> > > > gharris1...@gmail.com
> > > > github.com/gharris1727
> > > >
> > > > On Wed, Aug 3, 2022 at 8:39 AM Sagar 
> > 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-12 Thread Sagar
Thank you Guozhang/David for the feedback. Looks like there's agreement on
using separate APIs for Connect. I would revisit the doc and see what
changes are to be made.

Thanks!
Sagar.

On Tue, Aug 9, 2022 at 7:11 PM David Jacot 
wrote:

> Hi Sagar,
>
> Thanks for the feedback and the document. That's really helpful. I
> will take a look at it.
>
> Overall, it seems to me that both Connect and the Consumer could share
> the same underlying "engine". The main difference is that the Consumer
> assigns topic-partitions to members whereas Connect assigns tasks to
> workers. I see two ways to move forward:
> 1) We extend the new proposed APIs to support different resource types
> (e.g. partitions, tasks, etc.); or
> 2) We use new dedicated APIs for Connect. The dedicated APIs would be
> similar to the new ones but different on the content/resources and
> they would rely on the same engine on the coordinator side.
>
> I personally lean towards 2) because I am not a fan of overcharging
> APIs to serve different purposes. That being said, I am not opposed to
> 1) if we can find an elegant way to do it.
>
> I think that we can continue to discuss it here for now in order to
> ensure that this KIP is compatible with what we will do for Connect in
> the future.
>
> Best,
> David
>
> On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
> >
> > Hi all,
> >
> > I am back from vacation. I will go through and address your comments
> > in the coming days. Thanks for your feedback.
> >
> > Cheers,
> > David
> >
> > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris 
> wrote:
> > >
> > > Hey All!
> > >
> > > Thanks for the KIP, it's wonderful to see cooperative rebalancing
> making it
> > > down the stack!
> > >
> > > I had a few questions:
> > >
> > > 1. The 'Rejected Alternatives' section describes how member epoch
> should
> > > advance in step with the group epoch and assignment epoch values. I
> think
> > > that this is a good idea for the reasons described in the KIP. When the
> > > protocol is incrementally assigning partitions to a worker, what member
> > > epoch does each incremental assignment use? Are member epochs re-used,
> and
> > > a single member epoch can correspond to multiple different
> (monotonically
> > > larger) assignments?
> > >
> > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > > not, should custom client-side assignor implementations interact with
> the
> > > Reason field, and how is its common meaning agreed upon? If so, what
> is the
> > > benefit of a distinct Reason field over including such functionality
> in the
> > > opaque metadata?
> > >
> > > 3. The following is included in the KIP: "Thanks to this, the input of
> the
> > > client side assignor is entirely driven by the group coordinator. The
> > > consumer is no longer responsible for maintaining any state besides its
> > > assigned partitions." Does this mean that the client-side assignor MAY
> > > incorporate additional non-Metadata state (such as partition
> throughput,
> > > cpu/memory metrics, config topics, etc), or that additional
> non-Metadata
> > > state SHOULD NOT be used?
> > >
> > > 4. I see that there are separate classes
> > > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > overlap significantly. Is it possible for these two implementations to
> be
> > > unified? This would serve to promote feature parity of server-side and
> > > client-side assignors, and would also facilitate operational
> flexibility in
> > > certain situations. For example, if a server-side assignor has some
> poor
> > > behavior and needs a patch, deploying the patched assignor to the
> client
> > > and switching one consumer group to a client-side assignor may be
> faster
> > > and less risky than patching all of the brokers. With the currently
> > > proposed distinct APIs, a non-trivial reimplementation would have to be
> > > assembled, and if the two APIs have diverged significantly, then it is
> > > possible that a reimplementation would not be possible.
> > >
> > > --
> > > Greg Harris
> > > gharris1...@gmail.com
> > > github.com/gharris1727
> > >
> > > On Wed, Aug 3, 2022 at 8:39 AM Sagar 
> wrote:
> > >
> > > > Hi Guozhang/David,
> > > >
> > > > I created a confluence page to discuss how Connect would need to
> change
> > > > based on the new rebalance protocol. Here's the page:
> > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> > > >
> > > > It's also pretty longish and I have tried to keep a format similar to
> > > > KIP-848. Let me know what you think. Also, do you think this should
> be
> > > > moved to a separate discussion thread or is this one fine?
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Tue, Jul 26, 2022 at 7:37 AM Sagar 
> wrote:
> > > >
> > > > > Hello Guozhang,
> > > > >
> > > > > Thank you so much for the 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-12 Thread David Jacot
Hi all,

Thanks again for all the very good feedback so far. I have addressed,
I think, all the outstanding points. Here are the main updates:

* I have introduced STALE_MEMBER_EPOCH to differentiate it from the
fatal FENCED_MEMBER_EPOCH error.

* I have introduced topic ids in the OffsetCommit/OffsetFetch APIs.
This allows the Consumer to use Topic IDs everywhere and thus gives it
stronger guarantees. I have also added a few words about topic
recreation in this context.

* After discussing with Jason and Guozhang offline, I have decided to
follow Ismael's advice to store dynamic group configuration in the
controller. This is simpler with the IncrementalAlterConfigs API and
decouples the life cycle of the configs from the life cycle of the
groups.

* I have introduced a new Pattern class to model the new regular
expression syntax (Google RE2/J) and differentiate it from
java.util.regex.Pattern. Simple regular expressions should work in
both cases but more advanced ones won't, so it seems preferable, from
a user's point of view, to be explicit and differentiate them.
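
For a sense of where the two syntaxes diverge: a plain subscription
pattern behaves the same under both engines, while constructs such as
backreferences compile under java.util.regex but are rejected by
RE2-style engines. Since the new Pattern class is a KIP addition,
java.util.regex stands in for it in this sketch.

```java
import java.util.regex.Pattern;

public class RegexSyntaxes {
    public static void main(String[] args) {
        // A simple subscription pattern behaves identically under
        // java.util.regex and Google RE2/J.
        Pattern simple = Pattern.compile("metrics-.*");
        System.out.println(simple.matcher("metrics-cpu").matches()); // true
        System.out.println(simple.matcher("logs-cpu").matches());    // false

        // A backreference compiles under java.util.regex but would be
        // rejected by an RE2-style engine -- hence the distinct Pattern type.
        Pattern javaOnly = Pattern.compile("(foo)\\1");
        System.out.println(javaOnly.matcher("foofoo").matches());    // true
    }
}
```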

Please take another look and let me know what you think.

Best,
David

On Wed, Aug 10, 2022 at 2:36 AM Guozhang Wang  wrote:
>
> Hello Greg,
>
> Thanks for reviewing the KIP! I hope David has addressed your questions
> above, and I'd like to just emphasize one thing regarding your question 1):
> one principle of the reconciliation is that, for a single client, before it
> has completely revoked all the partitions that it should revoke, the
> coordinator would not assign new partitions to it even if those partitions
> have already been revoked by the old host and hence are available. And when
> it has revoked all its partitions, the coordinator would start giving it
> new partitions while at the same time bumping its epoch. Of course in
> practice, a client should try to always revoke all its should-be-revoked
> partitions together in a single shot which is more efficient, but from the
> point of view of the coordinator it's always "first revoke all you should
> revoke, and then I will bump your epoch up and start giving you more
> partitions". Though this may not be the most efficient approach, since the
> coordinator may be holding back available partitions before the client has
> completed revoking, it gives us a good barrier (hence the epoch
> bumping) to determine if the client can continue getting new partitions
> even after soft failures or network partitions.
>
>
> Guozhang
>
> On Wed, Aug 3, 2022 at 1:05 PM Gregory Harris  wrote:
>
> > Hey All!
> >
> > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it
> > down the stack!
> >
> > I had a few questions:
> >
> > 1. The 'Rejected Alternatives' section describes how member epoch should
> > advance in step with the group epoch and assignment epoch values. I think
> > that this is a good idea for the reasons described in the KIP. When the
> > protocol is incrementally assigning partitions to a worker, what member
> > epoch does each incremental assignment use? Are member epochs re-used, and
> > a single member epoch can correspond to multiple different (monotonically
> > larger) assignments?
> >
> > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> > not, should custom client-side assignor implementations interact with the
> > Reason field, and how is its common meaning agreed upon? If so, what is the
> > benefit of a distinct Reason field over including such functionality in the
> > opaque metadata?
> >
> > 3. The following is included in the KIP: "Thanks to this, the input of the
> > client side assignor is entirely driven by the group coordinator. The
> > consumer is no longer responsible for maintaining any state besides its
> > assigned partitions." Does this mean that the client-side assignor MAY
> > incorporate additional non-Metadata state (such as partition throughput,
> > cpu/memory metrics, config topics, etc), or that additional non-Metadata
> > state SHOULD NOT be used?
> >
> > 4. I see that there are separate classes
> > for org.apache.kafka.server.group.consumer.PartitionAssignor
> > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > overlap significantly. Is it possible for these two implementations to be
> > unified? This would serve to promote feature parity of server-side and
> > client-side assignors, and would also facilitate operational flexibility in
> > certain situations. For example, if a server-side assignor has some poor
> > behavior and needs a patch, deploying the patched assignor to the client
> > and switching one consumer group to a client-side assignor may be faster
> > and less risky than patching all of the brokers. With the currently
> > proposed distinct APIs, a non-trivial reimplementation would have to be
> > assembled, and if the two APIs have diverged significantly, then it is
> > possible that a reimplementation would not be possible.
> >
> > --
> > Greg Harris
> > 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-09 Thread Guozhang Wang
Hello Greg,

Thanks for reviewing the KIP! I hope David has addressed your questions
above, and I'd like to just emphasize one thing regarding your question 1):
one principle of the reconciliation is that, for a single client, before it
has completely revoked all the partitions that it should revoke, the
coordinator would not assign new partitions to it even if those partitions
have already been revoked by the old host and hence are available. And when
it has revoked all its partitions, the coordinator would start giving it
new partitions while at the same time bumping its epoch. Of course in
practice, a client should try to always revoke all its should-be-revoked
partitions together in a single shot which is more efficient, but from the
point of view of the coordinator it's always "first revoke all you should
revoke, and then I will bump your epoch up and start giving you more
partitions". Though this may not be the most efficient approach, since the
coordinator may be holding back available partitions before the client has
completed revoking, it gives us a good barrier (hence the epoch
bumping) to determine if the client can continue getting new partitions
even after soft failures or network partitions.
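
The "revoke everything first, then bump the epoch" barrier described above
can be sketched as coordinator-side logic. The class and field names are
illustrative, not the actual implementation:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative reconciliation barrier: a member receives no new partitions
// until its pending revocations are empty; only then is its epoch bumped
// and the newly assigned partitions released to it.
class MemberReconciliation {
    int memberEpoch;
    final Set<String> owned = new HashSet<>();
    final Set<String> pendingRevocation = new HashSet<>();
    final Set<String> pendingAssignment = new HashSet<>();

    // Called when the member reports it has given up a partition.
    void ackRevocation(String partition) {
        if (pendingRevocation.remove(partition)) owned.remove(partition);
        maybeAdvance();
    }

    private void maybeAdvance() {
        // The barrier: assign nothing new while any revocation is
        // outstanding, even if the new partitions are already free.
        if (pendingRevocation.isEmpty() && !pendingAssignment.isEmpty()) {
            owned.addAll(pendingAssignment);
            pendingAssignment.clear();
            memberEpoch++; // the epoch bump marks the transition
        }
    }
}
```

Until ackRevocation empties pendingRevocation, the member's epoch stays
put and pendingAssignment is withheld, which is exactly the barrier that
survives soft failures and network partitions.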


Guozhang

On Wed, Aug 3, 2022 at 1:05 PM Gregory Harris  wrote:

> Hey All!
>
> Thanks for the KIP, it's wonderful to see cooperative rebalancing making it
> down the stack!
>
> I had a few questions:
>
> 1. The 'Rejected Alternatives' section describes how member epoch should
> advance in step with the group epoch and assignment epoch values. I think
> that this is a good idea for the reasons described in the KIP. When the
> protocol is incrementally assigning partitions to a worker, what member
> epoch does each incremental assignment use? Are member epochs re-used, and
> a single member epoch can correspond to multiple different (monotonically
> larger) assignments?
>
> 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> not, should custom client-side assignor implementations interact with the
> Reason field, and how is its common meaning agreed upon? If so, what is the
> benefit of a distinct Reason field over including such functionality in the
> opaque metadata?
>
> 3. The following is included in the KIP: "Thanks to this, the input of the
> client side assignor is entirely driven by the group coordinator. The
> consumer is no longer responsible for maintaining any state besides its
> assigned partitions." Does this mean that the client-side assignor MAY
> incorporate additional non-Metadata state (such as partition throughput,
> cpu/memory metrics, config topics, etc), or that additional non-Metadata
> state SHOULD NOT be used?
>
> 4. I see that there are separate classes
> for org.apache.kafka.server.group.consumer.PartitionAssignor
> and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> overlap significantly. Is it possible for these two implementations to be
> unified? This would serve to promote feature parity of server-side and
> client-side assignors, and would also facilitate operational flexibility in
> certain situations. For example, if a server-side assignor has some poor
> behavior and needs a patch, deploying the patched assignor to the client
> and switching one consumer group to a client-side assignor may be faster
> and less risky than patching all of the brokers. With the currently
> proposed distinct APIs, a non-trivial reimplementation would have to be
> assembled, and if the two APIs have diverged significantly, then it is
> possible that a reimplementation would not be possible.
>
> --
> Greg Harris
> gharris1...@gmail.com
> github.com/gharris1727
>
> On Wed, Aug 3, 2022 at 8:39 AM Sagar  wrote:
>
> > Hi Guozhang/David,
> >
> > I created a confluence page to discuss how Connect would need to change
> > based on the new rebalance protocol. Here's the page:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> >
> > It's also pretty longish and I have tried to keep a format similar to
> > KIP-848. Let me know what you think. Also, do you think this should be
> > moved to a separate discussion thread or is this one fine?
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 26, 2022 at 7:37 AM Sagar  wrote:
> >
> > > Hello Guozhang,
> > >
> > > Thank you so much for the doc on Kafka Streams. Sure, I would do the
> > > analysis and come up with such a document.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang 
> > wrote:
> > >
> > >> Hello Sagar,
> > >>
> > >> It would be great if you could come back with some analysis on how to
> > >> implement the Connect side integration with the new protocol; so far
> > >> besides leveraging on the new "protocol type" we did not yet think
> > through
> > >> the Connect side implementations. For Streams here's a draft of
> > >> integration
> > >> plan:
> > >>
> > >>
> >
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-09 Thread Guozhang Wang
Thanks Sagar, I took a quick read through your doc, and am also leaning
towards having a separate API for Connect. Would also love to hear @Randall
Hauch's and @Konstantin's opinions about that.

One thing to note is that Kafka Connect's compatibility story, especially
around downgrading, is more flexible than the consumer's (see
https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect),
so we may need to think a bit differently about compatibility.


Guozhang

On Wed, Aug 3, 2022 at 5:39 AM Sagar  wrote:

> Hi Guozhang/David,
>
> I created a confluence page to discuss how Connect would need to change
> based on the new rebalance protocol. Here's the page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
>
> It's also pretty longish and I have tried to keep a format similar to
> KIP-848. Let me know what you think. Also, do you think this should be
> moved to a separate discussion thread or is this one fine?
>
> Thanks!
> Sagar.
>
> On Tue, Jul 26, 2022 at 7:37 AM Sagar  wrote:
>
> > Hello Guozhang,
> >
> > Thank you so much for the doc on Kafka Streams. Sure, I would do the
> > analysis and come up with such a document.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang 
> wrote:
> >
> >> Hello Sagar,
> >>
> >> It would be great if you could come back with some analysis on how to
> >> implement the Connect side integration with the new protocol; so far
> >> besides leveraging on the new "protocol type" we did not yet think
> through
> >> the Connect side implementations. For Streams here's a draft of
> >> integration
> >> plan:
> >>
> >>
> https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn
> >> just FYI for your analysis on Connect.
> >>
> >> On Tue, Jul 19, 2022 at 10:48 PM Sagar 
> wrote:
> >>
> >> > Hi David,
> >> >
> >> > Thank you for your response. The reason I thought connect can also fit
> >> into
> >> > this new scheme is that even today the connect uses a
> WorkerCoordinator
> >> > extending from AbstractCoordinator to empower rebalances of
> >> > tasks/connectors. The WorkerCoordinator sets the protocolType() to
> >> connect
> >> > and uses the metadata() method by plumbing into
> >> JoinGroupRequestProtocol.
> >> >
> >> > I think the changes to support connect would be similar at a high
> level
> >> to
> >> > the changes in streams mainly because of the Client side assignors
> being
> >> > used in both. At an implementation level, we might need to make a lot
> of
> >> > changes to get onto this new assignment protocol like enhancing the
> >> > JoinGroup request/response and SyncGroup and using
> >> ConsumerGroupHeartbeat
> >> > API etc again on similar lines to streams (or there might be
> >> deviations). I
> >> > would try to perform a detailed analysis of the same and we can have
> a
> >> > separate discussion thread for that as that would derail this
> discussion
> >> > thread. Let me know if that sounds good to you.
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> >
> >> >
> >> > On Fri, Jul 15, 2022 at 5:47 PM David Jacot
>  >> >
> >> > wrote:
> >> >
> >> > > Hi Sagar,
> >> > >
> >> > > Thanks for your comments.
> >> > >
> >> > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
> >> > >
> >> > > 2) The idea is to transition C from his current assignment to his
> >> > > target assignment when he can move to epoch 3. When that happens,
> the
> >> > > member assignment is updated and persisted with all its assigned
> >> > > partitions even if they are not all revoked yet. In other words, the
> >> > > member assignment becomes the target assignment. This is basically
> an
> >> > > optimization to avoid having to write all the changes to the log.
> The
> >> > > examples are based on the persisted state so I understand the
> >> > > confusion. Let me see if I can improve this in the description.
> >> > >
> >> > > 3) Regarding Connect, it could reuse the protocol with a client side
> >> > > assignor if it fits in the protocol. The assignment is about
> >> > > topicid-partitions + metadata, could Connect fit into this?
> >> > >
> >> > > Best,
> >> > > David
> >> > >
> >> > > On Fri, Jul 15, 2022 at 1:55 PM Sagar 
> >> wrote:
> >> > > >
> >> > > > Hi David,
> >> > > >
> >> > > > Thanks for the KIP. I just had minor observations:
> >> > > >
> >> > > > 1) In the Assignment Error section in Client Side mode Assignment
> >> > > process,
> >> > > > you mentioned => `In this case, the client side assignor can
> return
> >> an
> >> > > > error to the group coordinator`. In this case are you referring to
> >> the
> >> > > > Assignor returning an AssignmentError that's listed down towards
> the
> >> > end?
> >> > > > If yes, do you think it would make sense to mention this
> explicitly
> >> > here?
> >> > > >
> >> > > > 2) In the Case Studies section, I have a slight 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-09 Thread David Jacot
Hi Gregory,

Thanks for your feedback. Please find my answers below.

> 1. The 'Rejected Alternatives' section describes how member epoch should
> advance in step with the group epoch and assignment epoch values. I think
> that this is a good idea for the reasons described in the KIP. When the
> protocol is incrementally assigning partitions to a worker, what member
> epoch does each incremental assignment use? Are member epochs re-used, and
> a single member epoch can correspond to multiple different (monotonically
> larger) assignments?

Those are good questions. The member epoch is aligned with the
group/assignment epochs. A member transitions to its new target epoch once
it has revoked the partitions that it no longer owns. It has to revoke
partitions with its current epoch. When the revocation is complete, it
immediately gets its new epoch along with the assigned partitions which
are already free to be assigned. When the remaining partitions are freed
up by the other members, the coordinator assigns them to the member while
keeping the same member epoch. In other words, a member can revoke
partitions or get new partitions while staying on the same epoch.
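
A short illustrative heartbeat trace (made-up values, not from the KIP) of
the rule just described — revoke at the old epoch, jump to the target epoch,
then keep receiving partitions at that same epoch as other members free them:

```python
events = [
    # (member_epoch, assigned_partitions) seen in successive heartbeats
    (1, {0, 1, 2}),     # current assignment at epoch 1; target is {1, 2, 3, 4}
    (1, {1, 2}),        # revoking partition 0: still at epoch 1
    (2, {1, 2, 3}),     # revocation acked -> target epoch; 3 was already free
    (2, {1, 2, 3, 4}),  # 4 freed by another member: same epoch, larger set
]

def epochs_monotonic(evts):
    """Member epochs never go backwards across heartbeats."""
    return all(e1 <= e2 for (e1, _), (e2, _) in zip(evts, evts[1:]))

def grows_within_epoch(evts, epoch):
    """Within one epoch, each assignment is a superset of the previous one."""
    sets = [s for e, s in evts if e == epoch]
    return all(a <= b for a, b in zip(sets, sets[1:]))  # subset check
```

So a single member epoch (here, epoch 2) can indeed correspond to multiple
monotonically larger assignments, while the old epoch only ever shrinks
during revocation.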

> 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> not, should custom client-side assignor implementations interact with the
> Reason field, and how is its common meaning agreed upon? If so, what is the
> benefit of a distinct Reason field over including such functionality in the
> opaque metadata?

It is opaque to a certain extent. Any non-zero value is considered an
error, but the non-zero values themselves are not specified in the
protocol; their meaning is left to the client-side assignor
implementation. There are two benefits here: 1) The reason is used by the
coordinator for validation: if non-zero, the assignment should not change;
if zero, the assignment must be valid. 2) It allows the operator to see
whether the assignment was successful without having to deserialize the
metadata.
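
A minimal sketch (hypothetical names, not the real coordinator API) of how
an opaque non-zero Reason lets the coordinator validate an assignor result
without ever deserializing the assignor's metadata:

```python
def handle_install(group, reason, new_assignment):
    """Validate an assignor result using only the numeric Reason field."""
    if reason != 0:
        # The assignor reported an error. The specific value is opaque to
        # the coordinator (its meaning belongs to the client-side assignor),
        # but "reason != 0" alone is enough to reject the new assignment
        # and to surface a failure to the operator.
        return group["assignment"]
    group["assignment"] = dict(new_assignment)
    return group["assignment"]
```

Putting the same signal inside the opaque metadata would force the
coordinator to deserialize assignor-specific bytes just to decide whether
the assignment is usable.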

> 3. The following is included in the KIP: "Thanks to this, the input of the
> client side assignor is entirely driven by the group coordinator. The
> consumer is no longer responsible for maintaining any state besides its
> assigned partitions." Does this mean that the client-side assignor MAY
> incorporate additional non-Metadata state (such as partition throughput,
> cpu/memory metrics, config topics, etc), or that additional non-Metadata
> state SHOULD NOT be used?

Here, I was referring to the canonical case. Today the KafkaConsumer
tracks metadata (e.g. partitions) and feeds the assignor with it. With
the new protocol, we want the assignor to get all its information from
the coordinator in order to avoid metadata divergences. You can
definitely do whatever you want in a custom assignor. You could use
metadata propagated via the protocol itself in the metadata field or
use external metadata sources as well.

> 4. I see that there are separate classes
> for org.apache.kafka.server.group.consumer.PartitionAssignor
> and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> overlap significantly. Is it possible for these two implementations to be
> unified? This would serve to promote feature parity of server-side and
> client-side assignors, and would also facilitate operational flexibility in
> certain situations. For example, if a server-side assignor has some poor
> behavior and needs a patch, deploying the patched assignor to the client
> and switching one consumer group to a client-side assignor may be faster
> and less risky than patching all of the brokers. With the currently
> proposed distinct APIs, a non-trivial reimplementation would have to be
> assembled, and if the two APIs have diverged significantly, then it is
> possible that a reimplementation would not be possible.

That's a good question. I went with two different interfaces because the
server-side assignor has no notion of custom metadata and is thus simpler.
I thought it would be misleading to reuse the client-side interface in
that case.

Best,
David

On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris  wrote:
>
> Hey All!
>
> Thanks for the KIP, it's wonderful to see cooperative rebalancing making it
> down the stack!
>
> I had a few questions:
>
> 1. The 'Rejected Alternatives' section describes how member epoch should
> advance in step with the group epoch and assignment epoch values. I think
> that this is a good idea for the reasons described in the KIP. When the
> protocol is incrementally assigning partitions to a worker, what member
> epoch does each incremental assignment use? Are member epochs re-used, and
> a single member epoch can correspond to multiple different (monotonically
> larger) assignments?
>
> 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
> not, should custom client-side assignor implementations interact with the
> Reason field, and how is its common meaning agreed upon? If so, what is the
> benefit of a distinct 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-09 Thread David Jacot
Hi Luke,

Thanks for your comments. Please find my answers below.

> 1. Will the member reconciling jump to the latest target assignment? For
> example, member epochs are A:1, B:2, but the group is now at target
> assignment epoch 4. Will the members jump to the target assignment of
> epoch 4 directly, or should they first sync to epoch 2, then 3, and 4? I
> tried to find out if there's any problem with jumping to 4, but I can't
> find one. So I think we'll go to epoch 4 directly, right?

That's right. The reconciliation always uses the latest target
assignment so members might skip epochs. The only constraint to jump
to a new epoch is that the member should have revoked the required
partitions. In your example, if A and B do not have to revoke any
partitions, they would be moved to epoch 4 immediately by the
coordinator. Otherwise, the coordinator will ask them to revoke
partitions first and then move them to epoch 4 when the revocation is
acknowledged.
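
The epoch-skipping rule can be sketched in a few lines of Python
(hypothetical names, assuming a single latest target assignment as
described above):

```python
def next_member_epoch(member_epoch, owned, target_partitions, target_epoch):
    """Jump straight to the latest target epoch once nothing is left to revoke."""
    if set(owned) - set(target_partitions):
        return member_epoch   # revocation pending: stay on the current epoch
    return target_epoch       # may skip epochs, e.g. 1 -> 4 directly
```

With nothing to revoke, both A (epoch 1) and B (epoch 2) land on epoch 4
without ever passing through epochs 2 and 3.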

> 2. For the ConsumerGroupPrepareAssignment and
> ConsumerGroupInstallAssignment API, when member epoch doesn't match, it'll
> retry (and hope) in next heartbeat response, the member epoch will be
> bumped. But in ConsumerGroupHeartbeat, Upon receiving the
> FENCED_MEMBER_EPOCH error, the consumer abandon all its partitions and
> rejoins with the same member id and the epoch 0, unless the partitions
> owned by the members are a subset of the target partitions. I'm wondering
> why can't we return the latest member epoch to the member in
> ConsumerGroupHeartbeat response no matter if the owned partitions are
> subset of target partitions or not. After all, the member is still one of
> the group member, the assignment for the member is still valid, right? This
> way, the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment
> API can piggyback it to retry in next heartbeat response. WDYT?

As explained by Guozhang, the error semantics differ by API. For the
*PrepareAssignment and *InstallAssignment APIs, the error means that the
member epoch is stale and that the client should retry once it gets a
newer epoch, whereas for the ConsumerGroupHeartbeat API it means that the
client has been fenced by the coordinator and should therefore abandon all
its partitions and rejoin. I agree with Guozhang that it would be clearer
to use different errors; it is misleading otherwise.

Regarding your second point, I think that we can make the API idempotent
here. There are three cases: 1) If a client sends a ConsumerGroupHeartbeat
request with an older member epoch but with partitions which are a subset
of the expected target partitions, we can move it to the current member
epoch. 2) If the member epoch is smaller and the partitions differ, it is
very likely that the member owns partitions which are now owned by other
members, so it seems to me that we should treat this as a fatal error. We
could be a bit smarter and check whether the partitions are really owned
by other members, and proceed if they are not. The issue in this case is
that the member won't be able to revoke the partitions and commit the
offsets with a stale member epoch before the coordinator moves it to the
target epoch. 3) If the member epoch is larger, we should fence the member
because this should never happen.
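
The three cases can be summarized in a small classifier (a sketch with
assumed names, not the actual broker code; it ignores the smarter
ownership check mentioned as a possible refinement):

```python
def classify_heartbeat(member_epoch, current_epoch, owned, target):
    """Decide how the coordinator reacts to a heartbeat's member epoch."""
    if member_epoch > current_epoch:
        return "FENCE"        # case 3: an epoch from the future, never valid
    if member_epoch < current_epoch:
        if set(owned) <= set(target):
            return "ADVANCE"  # case 1: subset of target -> idempotent catch-up
        return "FENCE"        # case 2: likely holds others' partitions now
    return "OK"               # epochs match: normal processing
```

Only case 1 lets the member catch up without abandoning its partitions.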

> 3. When receiving COMPUTE_ASSIGNMENT error in ConsumerGroupHeartbeat API,
> the consumer starts the assignment process. But what if somehow the member
> didn't send out ConsumerGroupPrepareAssignment request, what would we do? I
> think we'll wait for rebalance timeout and kick the member out of the
> group, right? And then, what will the group coordinator do? I think it'll
> select another member to be the leader and return COMPUTE_ASSIGNMENT error
> in that member's heartbeat response, right? Maybe we should add that into
> KIP.

When the member is selected to perform the assignment, the timer starts
ticking: the coordinator gives the member the rebalance timeout to perform
it. If the member does not complete the full process by then, the
coordinator will pick another member to run the assignment. I believe that
we should not fence the member for this, though; my thinking is that
fencing would only be done via the ConsumerGroupHeartbeat API and the
session timeout. I will clarify this in the KIP.

> Some typos:
> 4. In "Assignment Process" section:
> If the selected assignor does exist, the group coordinator will reject the
> heartbeat with an UNSUPPORTED_ASSIGNOR error.
> -> I think it should be "does not exist"
>
> 5. In "JoinGroup Handling" section:
> If the member has revoked all its partitions or the required partitions,
> the member can transition to its next epoch. The current assignment become
> the current assignment.
> -> I think it should be "The target assignment becomes the current
> assignment", right?
>
> 6. In "ConsumerGroupDescribe API" section:
> When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-09 Thread David Jacot
Hi Sagar,

Thanks for the feedback and the document. That's really helpful. I
will take a look at it.

Overall, it seems to me that both Connect and the Consumer could share
the same underlying "engine". The main difference is that the Consumer
assigns topic-partitions to members whereas Connect assigns tasks to
workers. I see two ways to move forward:
1) We extend the new proposed APIs to support different resource types
(e.g. partitions, tasks, etc.); or
2) We use new dedicated APIs for Connect. The dedicated APIs would be
similar to the new ones but different on the content/resources and
they would rely on the same engine on the coordinator side.

I personally lean towards 2) because I am not a fan of overcharging
APIs to serve different purposes. That being said, I am not opposed to
1) if we can find an elegant way to do it.

I think that we can continue to discuss it here for now in order to
ensure that this KIP is compatible with what we will do for Connect in
the future.

Best,
David

On Mon, Aug 8, 2022 at 2:41 PM David Jacot  wrote:
>
> Hi all,
>
> I am back from vacation. I will go through and address your comments
> in the coming days. Thanks for your feedback.
>
> Cheers,
> David

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-08 Thread David Jacot
Hi all,

I am back from vacation. I will go through and address your comments
in the coming days. Thanks for your feedback.

Cheers,
David


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-03 Thread Gregory Harris
Hey All!

Thanks for the KIP, it's wonderful to see cooperative rebalancing making it
down the stack!

I had a few questions:

1. The 'Rejected Alternatives' section describes how member epoch should
advance in step with the group epoch and assignment epoch values. I think
that this is a good idea for the reasons described in the KIP. When the
protocol is incrementally assigning partitions to a worker, what member
epoch does each incremental assignment use? Are member epochs re-used, and
a single member epoch can correspond to multiple different (monotonically
larger) assignments?

2. Is the Assignor's 'Reason' field opaque to the group coordinator? If
not, should custom client-side assignor implementations interact with the
Reason field, and how is its common meaning agreed upon? If so, what is the
benefit of a distinct Reason field over including such functionality in the
opaque metadata?

3. The following is included in the KIP: "Thanks to this, the input of the
client side assignor is entirely driven by the group coordinator. The
consumer is no longer responsible for maintaining any state besides its
assigned partitions." Does this mean that the client-side assignor MAY
incorporate additional non-Metadata state (such as partition throughput,
cpu/memory metrics, config topics, etc), or that additional non-Metadata
state SHOULD NOT be used?

4. I see that there are separate classes
for org.apache.kafka.server.group.consumer.PartitionAssignor
and org.apache.kafka.clients.consumer.PartitionAssignor that seem to
overlap significantly. Is it possible for these two implementations to be
unified? This would serve to promote feature parity of server-side and
client-side assignors, and would also facilitate operational flexibility in
certain situations. For example, if a server-side assignor has some poor
behavior and needs a patch, deploying the patched assignor to the client
and switching one consumer group to a client-side assignor may be faster
and less risky than patching all of the brokers. With the currently
proposed distinct APIs, a non-trivial reimplementation would have to be
assembled, and if the two APIs have diverged significantly, then it is
possible that a reimplementation would not be possible.

--
Greg Harris
gharris1...@gmail.com
github.com/gharris1727


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-08-03 Thread Sagar
Hi Guozhang/David,

I created a confluence page to discuss how Connect would need to change
based on the new rebalance protocol. Here's the page:
https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol

It's also pretty longish and I have tried to keep a format similar to
KIP-848. Let me know what you think. Also, do you think this should be
moved to a separate discussion thread or is this one fine?

Thanks!
Sagar.

On Tue, Jul 26, 2022 at 7:37 AM Sagar  wrote:

> Hello Guozhang,
>
> Thank you so much for the doc on Kafka Streams. Sure, I would do the
> analysis and come up with such a document.
>
> Thanks!
> Sagar.
>
> On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang  wrote:
>
>> Hello Sagar,
>>
>> It would be great if you could come back with some analysis on how to
>> implement the Connect side integration with the new protocol; so far
>> besides leveraging on the new "protocol type" we did not yet think through
>> the Connect side implementations. For Streams here's a draft of
>> integration
>> plan:
>>
>> https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn
>> just FYI for your analysis on Connect.
>>
>> On Tue, Jul 19, 2022 at 10:48 PM Sagar  wrote:
>>
>> > Hi David,
>> >
>> > Thank you for your response. The reason I thought connect can also fit
>> into
>> > this new scheme is that even today the connect uses a WorkerCoordinator
>> > extending from AbstractCoordinator to empower rebalances of
>> > tasks/connectors. The WorkerCoordinator sets the protocolType() to
>> connect
>> > and uses the metadata() method by plumbing into
>> JoinGroupRequestProtocol.
>> >
>> > I think the changes to support connect would be similar at a high level
>> to
>> > the changes in streams mainly because of the Client side assignors being
>> > used in both. At an implementation level, we might need to make a lot of
>> > changes to get onto this new assignment protocol like enhancing the
>> > JoinGroup request/response and SyncGroup and using
>> ConsumerGroupHeartbeat
>> > API etc again on similar lines to streams (or there might be
>> deviations). I
>> > would try to perform a detailed analysis of the same  and we can have a
>> > separate discussion thread for that as that would derail this discussion
>> > thread. Let me know if that sounds good to you.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> >
>> > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > >
>> > wrote:
>> >
>> > > Hi Sagar,
>> > >
>> > > Thanks for your comments.
>> > >
>> > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
>> > >
>> > > 2) The idea is to transition C from his current assignment to his
>> > > target assignment when he can move to epoch 3. When that happens, the
>> > > member assignment is updated and persisted with all its assigned
>> > > partitions even if they are not all revoked yet. In other words, the
>> > > member assignment becomes the target assignment. This is basically an
>> > > optimization to avoid having to write all the changes to the log. The
>> > > examples are based on the persisted state so I understand the
>> > > confusion. Let me see if I can improve this in the description.
>> > >
>> > > 3) Regarding Connect, it could reuse the protocol with a client side
>> > > assignor if it fits in the protocol. The assignment is about
>> > > topicid-partitions + metadata, could Connect fit into this?
>> > >
>> > > Best,
>> > > David
>> > >
>> > > On Fri, Jul 15, 2022 at 1:55 PM Sagar 
>> wrote:
>> > > >
>> > > > Hi David,
>> > > >
>> > > > Thanks for the KIP. I just had minor observations:
>> > > >
>> > > > 1) In the Assignment Error section in Client Side mode Assignment
>> > > process,
>> > > > you mentioned => `In this case, the client side assignor can return
>> an
>> > > > error to the group coordinator`. In this case are you referring to
>> the
>> > > > Assignor returning an AssignmentError that's listed down towards the
>> > end?
>> > > > If yes, do you think it would make sense to mention this explicitly
>> > here?
>> > > >
>> > > > 2) In the Case Studies section, I have a slight confusion, not sure
>> if
>> > > > others have the same. Consider this step:
>> > > >
>> > > > When B heartbeats, the group coordinator transitions him to epoch 3
>> > > because
>> > > > B has no partitions to revoke. It persists the change and reply.
>> > > >
>> > > >- Group (epoch=3)
>> > > >   - A
>> > > >   - B
>> > > >   - C
>> > > >- Target Assignment (epoch=3)
>> > > >   - A - partitions=[foo-0]
>> > > >   - B - partitions=[foo-2]
>> > > >   - C - partitions=[foo-1]
>> > > >- Member Assignment
>> > > >   - A - epoch=2, partitions=[foo-0, foo-1]
>> > > >   - B - epoch=3, partitions=[foo-2]
>> > > >   - C - epoch=3, partitions=[foo-1]
>> > > >
>> > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1
>> yet.
>> > > >
>> > > > Here,it's mentioned 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-25 Thread Sagar
Hello Guozhang,

Thank you so much for the doc on Kafka Streams. Sure, I would do the
analysis and come up with such a document.

Thanks!
Sagar.

On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang  wrote:

> Hello Sagar,
>
> It would be great if you could come back with some analysis on how to
> implement the Connect side integration with the new protocol; so far
> besides leveraging on the new "protocol type" we did not yet think through
> the Connect side implementations. For Streams here's a draft of integration
> plan:
>
> https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn
> just FYI for your analysis on Connect.
>
> On Tue, Jul 19, 2022 at 10:48 PM Sagar  wrote:
>
> > Hi David,
> >
> > Thank you for your response. The reason I thought connect can also fit
> into
> > this new scheme is that even today the connect uses a WorkerCoordinator
> > extending from AbstractCoordinator to empower rebalances of
> > tasks/connectors. The WorkerCoordinator sets the protocolType() to
> connect
> > and uses the metadata() method by plumbing into JoinGroupRequestProtocol.
> >
> > I think the changes to support connect would be similar at a high level
> to
> > the changes in streams mainly because of the Client side assignors being
> > used in both. At an implementation level, we might need to make a lot of
> > changes to get onto this new assignment protocol like enhancing the
> > JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat
> > API etc again on similar lines to streams (or there might be
> deviations). I
> > would try to perform a detailed analysis of the same  and we can have a
> > separate discussion thread for that as that would derail this discussion
> > thread. Let me know if that sounds good to you.
> >
> > Thanks!
> > Sagar.
> >
> >
> >
> > On Fri, Jul 15, 2022 at 5:47 PM David Jacot  >
> > wrote:
> >
> > > Hi Sagar,
> > >
> > > Thanks for your comments.
> > >
> > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
> > >
> > > 2) The idea is to transition C from his current assignment to his
> > > target assignment when he can move to epoch 3. When that happens, the
> > > member assignment is updated and persisted with all its assigned
> > > partitions even if they are not all revoked yet. In other words, the
> > > member assignment becomes the target assignment. This is basically an
> > > optimization to avoid having to write all the changes to the log. The
> > > examples are based on the persisted state so I understand the
> > > confusion. Let me see if I can improve this in the description.
> > >
> > > 3) Regarding Connect, it could reuse the protocol with a client side
> > > assignor if it fits in the protocol. The assignment is about
> > > topicid-partitions + metadata, could Connect fit into this?
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Jul 15, 2022 at 1:55 PM Sagar 
> wrote:
> > > >
> > > > Hi David,
> > > >
> > > > Thanks for the KIP. I just had minor observations:
> > > >
> > > > 1) In the Assignment Error section in Client Side mode Assignment
> > > process,
> > > > you mentioned => `In this case, the client side assignor can return
> an
> > > > error to the group coordinator`. In this case are you referring to
> the
> > > > Assignor returning an AssignmentError that's listed down towards the
> > end?
> > > > If yes, do you think it would make sense to mention this explicitly
> > here?
> > > >
> > > > 2) In the Case Studies section, I have a slight confusion, not sure
> if
> > > > others have the same. Consider this step:
> > > >
> > > > When B heartbeats, the group coordinator transitions him to epoch 3
> > > because
> > > > B has no partitions to revoke. It persists the change and reply.
> > > >
> > > >- Group (epoch=3)
> > > >   - A
> > > >   - B
> > > >   - C
> > > >- Target Assignment (epoch=3)
> > > >   - A - partitions=[foo-0]
> > > >   - B - partitions=[foo-2]
> > > >   - C - partitions=[foo-1]
> > > >- Member Assignment
> > > >   - A - epoch=2, partitions=[foo-0, foo-1]
> > > >   - B - epoch=3, partitions=[foo-2]
> > > >   - C - epoch=3, partitions=[foo-1]
> > > >
> > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1
> yet.
> > > >
> > > > Here,it's mentioned that member C can't get the foo-1 partition yet,
> > but
> > > > based on the description above, it seems it already has it. Do you
> > think
> > > it
> > > > would be better to remove it and populate it only when it actually
> gets
> > > it?
> > > > I see this in a lot of other places, so have I understood it
> > incorrectly
> > > ?
> > > >
> > > >
> > > > Regarding connect , it might be out of scope of this discussion, but
> > from
> > > > what I understood it would probably be running in client side
> assignor
> > > mode
> > > > even on the new rebalance protocol as it has its own Custom
> > > Assignors(Eager
> > > > and IncrementalCooperative).
> > > >
> > > 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-25 Thread Guozhang Wang
Hello Luke,

Thanks for the detailed feedback! I tried to incorporate / answer some of
them inline below.

On Thu, Jul 21, 2022 at 6:39 AM Luke Chen  wrote:

> Hi David,
>
> So excited to see the consumer protocol will be renewed!
> Thanks for David, Guozhang, and Jason for creating this great proposal!
>
> Some comments about the protocol:
> 1. Will the member reconciling jump to the latest target assignment? For
> example, member epoch are A:1, B:2, but now the group is in target
> assignment epoch 4. Will the members jump to the target assignment result
> in epoch 4 directly, or they should first sync to epoch 2, then 3, and 4? I
> tried to find out if there's any problem if we jump to 4, but I can't. So,
> I think we'll go to epoch 4 directly, right?
>
> The short answer is "yes" :) But the slightly longer answer is that such
reconciliation logic, which may skip some epochs, is controlled by the
brokers, not the clients. The clients should simply be dumb and take in
the indicated (delta) assignment as long as its epoch is no smaller than
their current one. The brokers, having full knowledge of where everyone is
at the moment and of the latest target assignment, would decide which delta
assignment to send, step by step, and which assignments can be skipped.
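The "dumb client" rule above can be sketched in a few lines of Python; the names here (`Member`, `on_heartbeat_response`) are purely illustrative and not part of any actual client API:

```python
# Illustrative sketch: the client applies whatever (delta) assignment the
# broker sends, as long as its epoch is not older than the member's current
# epoch. The broker decides which epochs to step through or skip; the
# client never reasons about intermediate epochs.

class Member:
    def __init__(self):
        self.epoch = 0
        self.partitions = set()

    def on_heartbeat_response(self, assignment_epoch, partitions):
        if assignment_epoch < self.epoch:
            return False  # stale response from an older epoch: ignore it
        self.epoch = assignment_epoch
        self.partitions = set(partitions)
        return True

m = Member()
m.on_heartbeat_response(2, ["foo-0"])
# The broker may jump straight from epoch 2 to epoch 4, skipping epoch 3.
m.on_heartbeat_response(4, ["foo-0", "foo-1"])
```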


> 2. For the ConsumerGroupPrepareAssignment and
> ConsumerGroupInstallAssignment API, when member epoch doesn't match, it'll
> retry (and hope) in next heartbeat response, the member epoch will be
> bumped. But in ConsumerGroupHeartbeat, Upon receiving the
> FENCED_MEMBER_EPOCH error, the consumer abandon all its partitions and
> rejoins with the same member id and the epoch 0, unless the partitions
> owned by the members are a subset of the target partitions. I'm wondering
> why can't we return the latest member epoch to the member in
> ConsumerGroupHeartbeat response no matter if the owned partitions are
> subset of target partitions or not. After all, the member is still one of
> the group member, the assignment for the member is still valid, right? This
> way, the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment
> API can piggyback it to retry in next heartbeat response. WDYT?
>
> The semantics of these two classes of requests are different: for
ConsumerGroupPrepareAssignment / ConsumerGroupInstallAssignment, it's not
really indicating that the consumer is actually "fenced", but only that
"your information is probably stale, so just stay put and wait for the
renewed info". Note that these two requests are only for clients to try
collecting information or making a change to the group assignment, so
even failing such a request does not mean the member is no longer part of
the group. BTW, that reminds me that for these two requests maybe we should
use a different error code than FENCED_MEMBER_EPOCH, since it's a bit
confusing; maybe "STALE_MEMBER_EPOCH"?

For ConsumerGroupHeartbeat, the fencing purpose is only to make sure that
any partitions assigned to others are not retained at the "fenced" member.
So I think we can make the assignment response in ConsumerGroupHeartbeat
idempotent, and only return the fatal error code if the consumer's "owned
partitions" overlap with any other member's current assignment; otherwise
we just proceed with the new assignment.
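The overlap-based fencing check suggested here could look roughly like the following; `handle_heartbeat` and its arguments are hypothetical names for illustration, not the actual broker code:

```python
# Sketch of an idempotent fencing rule: a heartbeating member is only
# fenced when the partitions it claims to own overlap another member's
# current assignment; otherwise the new assignment is simply returned.

def handle_heartbeat(member_id, owned, current_assignments, new_assignment):
    """current_assignments: member_id -> set of currently owned partitions."""
    others = {p for mid, parts in current_assignments.items()
              if mid != member_id for p in parts}
    if set(owned) & others:
        return ("FENCED_MEMBER_EPOCH", None)
    return ("NONE", new_assignment)
```

For example, a member C still claiming foo-1 while A currently owns foo-1 would be fenced, while a member whose owned partitions conflict with nobody would just receive its new assignment.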



> 3. When receiving COMPUTE_ASSIGNMENT error in ConsumerGroupHeartbeat API,
> the consumer starts the assignment process. But what if somehow the member
> didn't send out ConsumerGroupPrepareAssignment request, what would we do? I
> think we'll wait for rebalance timeout and kick the member out of the
> group, right? And then, what will the group coordinator do? I think it'll
> select another member to be the leader and return COMPUTE_ASSIGNMENT error
> in that member's heartbeat response, right? Maybe we should add that into
> KIP.
>
The broker has a timer for this process, and if it does not receive the
assignment in time (i.e., until the ConsumerGroupInstallAssignment is
received; note that some assignors may not always need to refresh their
information via ConsumerGroupPrepareAssignment when computing the new
assignment), then the broker would pick another member and ask it to
COMPUTE_ASSIGNMENT.

But we may not try to kick the member out of the group based on that
timer, only based on the heartbeat timer. I.e., say there's one member
that keeps heartbeating but never wants to compute the new assignment
even when asked; it would still remain in the group.

We can clarify this a bit more in the doc.
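The two independent timers described above can be sketched as follows; all names and the group-state layout are assumptions for illustration, not the actual coordinator implementation:

```python
# Sketch: the computation (rebalance) timer only moves the
# COMPUTE_ASSIGNMENT duty to another member; only the heartbeat (session)
# timer can actually evict a member from the group.

def tick(now, group):
    # Computation timer expired without an installed assignment:
    # pick another member to compute, but do NOT evict the old leader.
    if (not group["assignment_installed"]
            and now - group["compute_requested_at"] > group["rebalance_timeout"]):
        others = [m for m in group["members"] if m != group["assignment_leader"]]
        if others:
            group["assignment_leader"] = others[0]
        group["compute_requested_at"] = now
    # Session timer: evict only members that stopped heartbeating.
    group["members"] = {m: hb for m, hb in group["members"].items()
                        if now - hb <= group["session_timeout"]}
    return group

group = {"members": {"A": 10, "B": 10},   # member -> last heartbeat time
         "assignment_leader": "A", "compute_requested_at": 0,
         "rebalance_timeout": 5, "assignment_installed": False,
         "session_timeout": 30}
group = tick(now=15, group=group)
```

In this scenario A missed the computation deadline, so the duty moves to B, yet A stays in the group because it kept heartbeating.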

>
> Some typos:
> 4. In "Assignment Process" section:
> If the selected assignor does exist, the group coordinator will reject the
> heartbeat with an UNSUPPORTED_ASSIGNOR error.
> -> I think it should be "does not exist"
>
> Ack!


> 5. In "JoinGroup Handling" section:
> If the member has revoked all its partitions or the required partitions,
> the member can transition to its next epoch. The current assignment become
> the current 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-25 Thread Guozhang Wang
Hello Sagar,

It would be great if you could come back with some analysis of how to
implement the Connect-side integration with the new protocol; so far,
besides leveraging the new "protocol type", we have not yet thought through
the Connect-side implementation. For Streams, here's a draft integration
plan:
https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn
just FYI for your analysis on Connect.

On Tue, Jul 19, 2022 at 10:48 PM Sagar  wrote:

> Hi David,
>
> Thank you for your response. The reason I thought connect can also fit into
> this new scheme is that even today the connect uses a WorkerCoordinator
> extending from AbstractCoordinator to empower rebalances of
> tasks/connectors. The WorkerCoordinator sets the protocolType() to connect
> and uses the metadata() method by plumbing into JoinGroupRequestProtocol.
>
> I think the changes to support connect would be similar at a high level to
> the changes in streams mainly because of the Client side assignors being
> used in both. At an implementation level, we might need to make a lot of
> changes to get onto this new assignment protocol like enhancing the
> JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat
> API etc again on similar lines to streams (or there might be deviations). I
> would try to perform a detailed analysis of the same  and we can have a
> separate discussion thread for that as that would derail this discussion
> thread. Let me know if that sounds good to you.
>
> Thanks!
> Sagar.
>
>
>
> On Fri, Jul 15, 2022 at 5:47 PM David Jacot 
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for your comments.
> >
> > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
> >
> > 2) The idea is to transition C from his current assignment to his
> > target assignment when he can move to epoch 3. When that happens, the
> > member assignment is updated and persisted with all its assigned
> > partitions even if they are not all revoked yet. In other words, the
> > member assignment becomes the target assignment. This is basically an
> > optimization to avoid having to write all the changes to the log. The
> > examples are based on the persisted state so I understand the
> > confusion. Let me see if I can improve this in the description.
> >
> > 3) Regarding Connect, it could reuse the protocol with a client side
> > assignor if it fits in the protocol. The assignment is about
> > topicid-partitions + metadata, could Connect fit into this?
> >
> > Best,
> > David
> >
> > On Fri, Jul 15, 2022 at 1:55 PM Sagar  wrote:
> > >
> > > Hi David,
> > >
> > > Thanks for the KIP. I just had minor observations:
> > >
> > > 1) In the Assignment Error section in Client Side mode Assignment
> > process,
> > > you mentioned => `In this case, the client side assignor can return an
> > > error to the group coordinator`. In this case are you referring to the
> > > Assignor returning an AssignmentError that's listed down towards the
> end?
> > > If yes, do you think it would make sense to mention this explicitly
> here?
> > >
> > > 2) In the Case Studies section, I have a slight confusion, not sure if
> > > others have the same. Consider this step:
> > >
> > > When B heartbeats, the group coordinator transitions him to epoch 3
> > because
> > > B has no partitions to revoke. It persists the change and reply.
> > >
> > >- Group (epoch=3)
> > >   - A
> > >   - B
> > >   - C
> > >- Target Assignment (epoch=3)
> > >   - A - partitions=[foo-0]
> > >   - B - partitions=[foo-2]
> > >   - C - partitions=[foo-1]
> > >- Member Assignment
> > >   - A - epoch=2, partitions=[foo-0, foo-1]
> > >   - B - epoch=3, partitions=[foo-2]
> > >   - C - epoch=3, partitions=[foo-1]
> > >
> > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.
> > >
> > > Here,it's mentioned that member C can't get the foo-1 partition yet,
> but
> > > based on the description above, it seems it already has it. Do you
> think
> > it
> > > would be better to remove it and populate it only when it actually gets
> > it?
> > > I see this in a lot of other places, so have I understood it
> incorrectly
> > ?
> > >
> > >
> > > Regarding connect , it might be out of scope of this discussion, but
> from
> > > what I understood it would probably be running in client side assignor
> > mode
> > > even on the new rebalance protocol as it has its own Custom
> > Assignors(Eager
> > > and IncrementalCooperative).
> > >
> > > Thanks!
> > >
> > > Sagar.
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jul 15, 2022 at 5:00 PM David Jacot
>  > >
> > > wrote:
> > >
> > > > Thanks Hector! Our goal is to move forward with specialized API
> > > > instead of relying on one generic API. For Connect, we can apply the
> > > > exact same pattern and reuse/share the core implementation on the
> > > > server side. For the schema registry, I think that we should consider
> > > > having a 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-21 Thread Luke Chen
Hi David,

So excited to see the consumer protocol being renewed!
Thanks to David, Guozhang, and Jason for creating this great proposal!

Some comments about the protocol:
1. Will a reconciling member jump to the latest target assignment? For
example, the member epochs are A:1, B:2, but the group is now at target
assignment epoch 4. Will the members jump directly to the target
assignment of epoch 4, or should they first sync to epoch 2, then 3, then
4? I tried to find a problem with jumping straight to 4 but could not, so
I think we'll go to epoch 4 directly, right?

2. For the ConsumerGroupPrepareAssignment and
ConsumerGroupInstallAssignment APIs, when the member epoch doesn't match,
the member will retry and hope that the epoch is bumped in the next
heartbeat response. But in ConsumerGroupHeartbeat, upon receiving the
FENCED_MEMBER_EPOCH error, the consumer abandons all its partitions and
rejoins with the same member id and epoch 0, unless the partitions owned
by the member are a subset of the target partitions. I'm wondering why we
can't return the latest member epoch to the member in the
ConsumerGroupHeartbeat response, regardless of whether the owned
partitions are a subset of the target partitions. After all, the member is
still one of the group members, and its assignment is still valid, right?
That way, the ConsumerGroupPrepareAssignment and
ConsumerGroupInstallAssignment APIs can piggyback on it to retry in the
next heartbeat response. WDYT?

3. When receiving the COMPUTE_ASSIGNMENT error in the
ConsumerGroupHeartbeat API, the consumer starts the assignment process.
But what if the member somehow never sends out the
ConsumerGroupPrepareAssignment request; what would we do? I think we'll
wait for the rebalance timeout and kick the member out of the group,
right? And then, what will the group coordinator do? I think it'll select
another member to be the leader and return the COMPUTE_ASSIGNMENT error in
that member's heartbeat response, right? Maybe we should add that to the
KIP.

Some typos:
4. In "Assignment Process" section:
If the selected assignor does exist, the group coordinator will reject the
heartbeat with an UNSUPPORTED_ASSIGNOR error.
-> I think it should be "does not exist"

5. In "JoinGroup Handling" section:
If the member has revoked all its partitions or the required partitions,
the member can transition to its next epoch. The current assignment become
the current assignment.
-> I think it should be "The target assignment becomes the current
assignment", right?

6. In "ConsumerGroupDescribe API" section:
When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest
request:
-> It should be "handle a ConsumerGroupDescribe request"

Again, thanks for the great proposal!
It considers multiple edge cases.
Thank you.

Luke

On Wed, Jul 20, 2022 at 1:48 PM Sagar  wrote:

> Hi David,
>
> Thank you for your response. The reason I thought connect can also fit into
> this new scheme is that even today the connect uses a WorkerCoordinator
> extending from AbstractCoordinator to empower rebalances of
> tasks/connectors. The WorkerCoordinator sets the protocolType() to connect
> and uses the metadata() method by plumbing into JoinGroupRequestProtocol.
>
> I think the changes to support connect would be similar at a high level to
> the changes in streams mainly because of the Client side assignors being
> used in both. At an implementation level, we might need to make a lot of
> changes to get onto this new assignment protocol like enhancing the
> JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat
> API etc again on similar lines to streams (or there might be deviations). I
> would try to perform a detailed analysis of the same  and we can have a
> separate discussion thread for that as that would derail this discussion
> thread. Let me know if that sounds good to you.
>
> Thanks!
> Sagar.
>
>
>
> On Fri, Jul 15, 2022 at 5:47 PM David Jacot 
> wrote:
>
> > Hi Sagar,
> >
> > Thanks for your comments.
> >
> > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
> >
> > 2) The idea is to transition C from his current assignment to his
> > target assignment when he can move to epoch 3. When that happens, the
> > member assignment is updated and persisted with all its assigned
> > partitions even if they are not all revoked yet. In other words, the
> > member assignment becomes the target assignment. This is basically an
> > optimization to avoid having to write all the changes to the log. The
> > examples are based on the persisted state so I understand the
> > confusion. Let me see if I can improve this in the description.
> >
> > 3) Regarding Connect, it could reuse the protocol with a client side
> > assignor if it fits in the protocol. The assignment is about
> > topicid-partitions + metadata, could Connect fit into this?
> >
> > Best,
> > David
> >
> > On Fri, Jul 15, 2022 at 1:55 PM Sagar  wrote:
> > >
> > > Hi David,
> > >
> > > Thanks for the KIP. I just had minor 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-19 Thread Sagar
Hi David,

Thank you for your response. The reason I thought Connect can also fit into
this new scheme is that even today Connect uses a WorkerCoordinator,
extending from AbstractCoordinator, to drive rebalances of
tasks/connectors. The WorkerCoordinator sets the protocolType() to connect
and uses the metadata() method by plumbing into JoinGroupRequestProtocol.
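To make the pattern concrete, here is a simplified, language-neutral illustration (in Python, not Connect's actual Java code) of how a WorkerCoordinator-style client embeds a protocol type plus opaque per-member metadata into the generic JoinGroup request; the function and field names are assumptions for illustration:

```python
import json

# Sketch: the generic group protocol carries a protocol_type plus a list of
# supported protocols, each with opaque metadata bytes that only the
# members (not the broker) know how to interpret.

def build_join_group_request(group_id, protocol_type, assignor_name, metadata):
    """Mimics how a coordinator plumbs its serialized metadata() into a
    JoinGroup protocol entry."""
    return {
        "group_id": group_id,
        "protocol_type": protocol_type,        # e.g. "connect" vs "consumer"
        "protocols": [{
            "name": assignor_name,             # which assignor this member supports
            "metadata": json.dumps(metadata),  # opaque to the broker
        }],
    }

req = build_join_group_request(
    "connect-cluster", "connect", "sessioned",
    {"url": "http://worker:8083", "config_offset": 5})
```

The broker only matches on `protocol_type` and the protocol name; the metadata round-trips untouched to the elected leader, which is what makes the same machinery reusable across Consumer, Connect, and Streams.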

I think the changes to support Connect would be similar at a high level to
the changes in Streams, mainly because client-side assignors are used in
both. At an implementation level, we might need to make a lot of changes
to get onto this new assignment protocol, like enhancing the JoinGroup
request/response and SyncGroup and using the ConsumerGroupHeartbeat API,
again on similar lines to Streams (or there might be deviations). I will
try to perform a detailed analysis, and we can have a separate discussion
thread for it, as it would otherwise derail this one. Let me know if that
sounds good to you.

Thanks!
Sagar.



On Fri, Jul 15, 2022 at 5:47 PM David Jacot 
wrote:

> Hi Sagar,
>
> Thanks for your comments.
>
> 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
>
> 2) The idea is to transition C from his current assignment to his
> target assignment when he can move to epoch 3. When that happens, the
> member assignment is updated and persisted with all its assigned
> partitions even if they are not all revoked yet. In other words, the
> member assignment becomes the target assignment. This is basically an
> optimization to avoid having to write all the changes to the log. The
> examples are based on the persisted state so I understand the
> confusion. Let me see if I can improve this in the description.
>
> 3) Regarding Connect, it could reuse the protocol with a client side
> assignor if it fits in the protocol. The assignment is about
> topicid-partitions + metadata, could Connect fit into this?
>
> Best,
> David
>
> On Fri, Jul 15, 2022 at 1:55 PM Sagar  wrote:
> >
> > Hi David,
> >
> > Thanks for the KIP. I just had minor observations:
> >
> > 1) In the Assignment Error section in Client Side mode Assignment
> process,
> > you mentioned => `In this case, the client side assignor can return an
> > error to the group coordinator`. In this case are you referring to the
> > Assignor returning an AssignmentError that's listed down towards the end?
> > If yes, do you think it would make sense to mention this explicitly here?
> >
> > 2) In the Case Studies section, I have a slight confusion, not sure if
> > others have the same. Consider this step:
> >
> > When B heartbeats, the group coordinator transitions him to epoch 3
> because
> > B has no partitions to revoke. It persists the change and reply.
> >
> >- Group (epoch=3)
> >   - A
> >   - B
> >   - C
> >- Target Assignment (epoch=3)
> >   - A - partitions=[foo-0]
> >   - B - partitions=[foo-2]
> >   - C - partitions=[foo-1]
> >- Member Assignment
> >   - A - epoch=2, partitions=[foo-0, foo-1]
> >   - B - epoch=3, partitions=[foo-2]
> >   - C - epoch=3, partitions=[foo-1]
> >
> > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.
> >
> > Here,it's mentioned that member C can't get the foo-1 partition yet, but
> > based on the description above, it seems it already has it. Do you think
> it
> > would be better to remove it and populate it only when it actually gets
> it?
> > I see this in a lot of other places, so have I understood it incorrectly
> ?
> >
> >
> > Regarding connect , it might be out of scope of this discussion, but from
> > what I understood it would probably be running in client side assignor
> mode
> > even on the new rebalance protocol as it has its own Custom
> Assignors(Eager
> > and IncrementalCooperative).
> >
> > Thanks!
> >
> > Sagar.
> >
> >
> >
> >
> >
> >
> > On Fri, Jul 15, 2022 at 5:00 PM David Jacot  >
> > wrote:
> >
> > > Thanks Hector! Our goal is to move forward with specialized API
> > > instead of relying on one generic API. For Connect, we can apply the
> > > exact same pattern and reuse/share the core implementation on the
> > > server side. For the schema registry, I think that we should consider
> > > having a tailored API to do simple membership/leader election.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma 
> wrote:
> > > >
> > > > Three quick comments:
> > > >
> > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we
> > > should
> > > > document the differences in more detail before deciding one way or
> > > another.
> > > > That said, if people pass java.util.regex.Pattern, they expect their
> > > > semantics to be honored. If we are doing something different, then we
> > > > should consider adding an overload with our own Pattern class (I
> don't
> > > > think we'd want to expose re2j's at this point).
> > > > 2. Regarding topic ids, any major new protocol should integrate fully
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread David Jacot
I'll be away from July 18th to August 8th with limited access to my
emails so I will address new comments and questions when I come back.

Cheers,
David

On Fri, Jul 15, 2022 at 2:16 PM David Jacot  wrote:
>
> Hi Sagar,
>
> Thanks for your comments.
>
> 1) Yes. That refers to `Assignment#error`. Sure, I can mention it.
>
> 2) The idea is to transition C from his current assignment to his
> target assignment when he can move to epoch 3. When that happens, the
> member assignment is updated and persisted with all its assigned
> partitions even if they are not all revoked yet. In other words, the
> member assignment becomes the target assignment. This is basically an
> optimization to avoid having to write all the changes to the log. The
> examples are based on the persisted state so I understand the
> confusion. Let me see if I can improve this in the description.
>
> 3) Regarding Connect, it could reuse the protocol with a client side
> assignor if it fits in the protocol. The assignment is about
> topicid-partitions + metadata; could Connect fit into this?
>
> Best,
> David
>
> On Fri, Jul 15, 2022 at 1:55 PM Sagar  wrote:
> >
> > Hi David,
> >
> > Thanks for the KIP. I just had minor observations:
> >
> > 1) In the Assignment Error section in Client Side mode Assignment process,
> > you mentioned => `In this case, the client side assignor can return an
> > error to the group coordinator`. In this case are you referring to the
> > Assignor returning an AssignmentError that's listed down towards the end?
> > If yes, do you think it would make sense to mention this explicitly here?
> >
> > 2) In the Case Studies section, I have a slight confusion, not sure if
> > others have the same. Consider this step:
> >
> > When B heartbeats, the group coordinator transitions it to epoch 3 because
> > B has no partitions to revoke. It persists the change and replies.
> >
> >- Group (epoch=3)
> >   - A
> >   - B
> >   - C
> >- Target Assignment (epoch=3)
> >   - A - partitions=[foo-0]
> >   - B - partitions=[foo-2]
> >   - C - partitions=[foo-1]
> >- Member Assignment
> >   - A - epoch=2, partitions=[foo-0, foo-1]
> >   - B - epoch=3, partitions=[foo-2]
> >   - C - epoch=3, partitions=[foo-1]
> >
> > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.
> >
> > Here, it's mentioned that member C can't get the foo-1 partition yet, but
> > based on the description above, it seems it already has it. Do you think it
> > would be better to remove it and populate it only when it actually gets it?
> > I see this in a lot of other places, so have I understood it incorrectly?
> >
> >
> > Regarding Connect, it might be out of scope of this discussion, but from
> > what I understood it would probably be running in client-side assignor mode
> > even on the new rebalance protocol, as it has its own custom assignors (Eager
> > and IncrementalCooperative).
> >
> > Thanks!
> >
> > Sagar.
> >
> >
> >
> >
> >
> >
> > On Fri, Jul 15, 2022 at 5:00 PM David Jacot 
> > wrote:
> >
> > > Thanks Hector! Our goal is to move forward with specialized API
> > > instead of relying on one generic API. For Connect, we can apply the
> > > exact same pattern and reuse/share the core implementation on the
> > > server side. For the schema registry, I think that we should consider
> > > having a tailored API to do simple membership/leader election.
> > >
> > > Best,
> > > David
> > >
> > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma  wrote:
> > > >
> > > > Three quick comments:
> > > >
> > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we
> > > should
> > > > document the differences in more detail before deciding one way or
> > > another.
> > > > That said, if people pass java.util.regex.Pattern, they expect their
> > > > semantics to be honored. If we are doing something different, then we
> > > > should consider adding an overload with our own Pattern class (I don't
> > > > think we'd want to expose re2j's at this point).
> > > > 2. Regarding topic ids, any major new protocol should integrate fully
> > > with
> > > > it and should handle the topic recreation case correctly. That's the 
> > > > main
> > > > part we need to handle. I agree with David that we'd want to add topic
> > > ids
> > > > to the relevant protocols that don't have it yet and that we can 
> > > > probably
> > > > focus on the internals versus adding new APIs to the Java Consumer
> > > (unless
> > > > we find that adding new APIs is required for reasonable semantics).
> > > > 3. I am still not sure about the coordinator storing the configs. It's
> > > > powerful for configs to be centralized in the metadata log for various
> > > > reasons (auditability, visibility, consistency, etc.). Similarly, I am
> > > not
> > > > sure about automatically deleting configs in a way that they cannot be
> > > > recovered. A good property for modern systems is to minimize the number
> > > of
> > > > unrecoverable 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread David Jacot
Hi Sagar,

Thanks for your comments.

1) Yes. That refers to `Assignment#error`. Sure, I can mention it.

2) The idea is to transition C from its current assignment to its
target assignment when it can move to epoch 3. When that happens, the
member assignment is updated and persisted with all its assigned
partitions even if they are not all revoked yet. In other words, the
member assignment becomes the target assignment. This is basically an
optimization to avoid having to write all the changes to the log. The
examples are based on the persisted state so I understand the
confusion. Let me see if I can improve this in the description.
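To make the optimization above concrete, here is a hedged sketch (a toy Python model, not Kafka code; all names are my own): the coordinator writes a member's full target assignment to the log once when the member moves to the new epoch, and tracks still-unrevoked partitions only in memory, so intermediate revocation steps cost no extra log writes.

```python
# Toy model of the "member assignment becomes the target assignment"
# optimization. Names and structure are illustrative assumptions.

class MemberState:
    def __init__(self, member_id, current, target):
        self.member_id = member_id
        self.owned = set(current)        # partitions the member holds right now
        self.persisted = set(current)    # what is written to the log
        self.target = set(target)

    def advance_epoch(self):
        # When the member can move to the new epoch, the persisted
        # assignment becomes the full target in a single log write,
        # even if some target partitions are still owned elsewhere.
        self.persisted = set(self.target)
        return self.persisted

    def pending(self):
        # Partitions assigned on paper but not yet transferable.
        return self.target - self.owned


# Member C from the case study: target is foo-1, still owned by A.
c = MemberState("C", current=[], target=["foo-1"])
c.advance_epoch()
print(sorted(c.persisted))   # persisted state already shows foo-1
print(sorted(c.pending()))   # but foo-1 is still pending revocation
```

This illustrates why the persisted examples in the KIP show C with foo-1 before C can actually fetch from it.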

3) Regarding Connect, it could reuse the protocol with a client side
assignor if it fits in the protocol. The assignment is about
topicid-partitions + metadata, could Connect fit into this?

Best,
David

On Fri, Jul 15, 2022 at 1:55 PM Sagar  wrote:
>
> Hi David,
>
> Thanks for the KIP. I just had minor observations:
>
> 1) In the Assignment Error section in Client Side mode Assignment process,
> you mentioned => `In this case, the client side assignor can return an
> error to the group coordinator`. In this case are you referring to the
> Assignor returning an AssignmentError that's listed down towards the end?
> If yes, do you think it would make sense to mention this explicitly here?
>
> 2) In the Case Studies section, I have a slight confusion, not sure if
> others have the same. Consider this step:
>
> When B heartbeats, the group coordinator transitions it to epoch 3 because
> B has no partitions to revoke. It persists the change and replies.
>
>- Group (epoch=3)
>   - A
>   - B
>   - C
>- Target Assignment (epoch=3)
>   - A - partitions=[foo-0]
>   - B - partitions=[foo-2]
>   - C - partitions=[foo-1]
>- Member Assignment
>   - A - epoch=2, partitions=[foo-0, foo-1]
>   - B - epoch=3, partitions=[foo-2]
>   - C - epoch=3, partitions=[foo-1]
>
> When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.
>
> Here, it's mentioned that member C can't get the foo-1 partition yet, but
> based on the description above, it seems it already has it. Do you think it
> would be better to remove it and populate it only when it actually gets it?
> I see this in a lot of other places, so have I understood it incorrectly?
>
>
> Regarding Connect, it might be out of scope of this discussion, but from
> what I understood it would probably be running in client-side assignor mode
> even on the new rebalance protocol, as it has its own custom assignors (Eager
> and IncrementalCooperative).
>
> Thanks!
>
> Sagar.
>
>
>
>
>
>
> On Fri, Jul 15, 2022 at 5:00 PM David Jacot 
> wrote:
>
> > Thanks Hector! Our goal is to move forward with specialized API
> > instead of relying on one generic API. For Connect, we can apply the
> > exact same pattern and reuse/share the core implementation on the
> > server side. For the schema registry, I think that we should consider
> > having a tailored API to do simple membership/leader election.
> >
> > Best,
> > David
> >
> > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma  wrote:
> > >
> > > Three quick comments:
> > >
> > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we
> > should
> > > document the differences in more detail before deciding one way or
> > another.
> > > That said, if people pass java.util.regex.Pattern, they expect their
> > > semantics to be honored. If we are doing something different, then we
> > > should consider adding an overload with our own Pattern class (I don't
> > > think we'd want to expose re2j's at this point).
> > > 2. Regarding topic ids, any major new protocol should integrate fully
> > with
> > > it and should handle the topic recreation case correctly. That's the main
> > > part we need to handle. I agree with David that we'd want to add topic
> > ids
> > > to the relevant protocols that don't have it yet and that we can probably
> > > focus on the internals versus adding new APIs to the Java Consumer
> > (unless
> > > we find that adding new APIs is required for reasonable semantics).
> > > 3. I am still not sure about the coordinator storing the configs. It's
> > > powerful for configs to be centralized in the metadata log for various
> > > reasons (auditability, visibility, consistency, etc.). Similarly, I am
> > not
> > > sure about automatically deleting configs in a way that they cannot be
> > > recovered. A good property for modern systems is to minimize the number
> > of
> > > unrecoverable data loss scenarios.
> > >
> > > Ismael
> > >
> > > On Wed, Jul 13, 2022 at 3:47 PM David Jacot  > >
> > > wrote:
> > >
> > > > Thanks Guozhang. My answers are below:
> > > >
> > > > > 1) the migration path, especially the last step when clients flip the
> > > > flag
> > > > > to enable the new protocol, in which we would have a window where
> > both
> > > > new
> > > > > protocols / rpcs and old protocols / rpcs are used by members of the
> > same
> 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread David Jacot
Thanks, Ismael.

> 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should
> document the differences in more detail before deciding one way or another.
> That said, if people pass java.util.regex.Pattern, they expect their
> semantics to be honored. If we are doing something different, then we
> should consider adding an overload with our own Pattern class (I don't
> think we'd want to expose re2j's at this point).

Yeah. I do agree with you. I looked a bit more into re2j and the
differences are subtle. Simple regexes will work with both but special
constructs or character classes won't work. I like the idea of having
our own Pattern. That would make things really clear and avoid
compatibility issues. I do agree that we should not expose re2j's
Pattern in our public API.
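As a hedged illustration of where the two engine families diverge: backtracking engines (java.util.regex, or Python's `re` used here as a stand-in) accept backreferences and lookaround, which RE2-family engines such as re2j reject by design, while simple patterns behave identically in both.

```python
import re

# Simple pattern: fine in both engine families.
assert re.fullmatch(r"foo-[0-9]+", "foo-42")

# Backreference: matches a repeated word.
# RE2/re2j has no backreferences, so this pattern would be rejected there.
assert re.search(r"\b(\w+) \1\b", "the the end")

# Lookahead: "foo" only when followed by "-1".
# RE2/re2j has no lookaround, so this would be rejected as well.
assert re.search(r"foo(?=-1)", "foo-1")
```

So a client-side/server-side mismatch only surfaces for these less common constructs, which is why a dedicated Pattern class (or explicit documentation) is attractive.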

> 2. Regarding topic ids, any major new protocol should integrate fully with
> it and should handle the topic recreation case correctly. That's the main
> part we need to handle. I agree with David that we'd want to add topic ids
> to the relevant protocols that don't have it yet and that we can probably
> focus on the internals versus adding new APIs to the Java Consumer (unless
> we find that adding new APIs is required for reasonable semantics).

I wonder what we should do with the admin client though. We have a
couple of apis related to offsets there. Do we want to allow users to
fetch, get or delete offsets based on topic ids? On the server side,
we will have to store the offsets based on the topic id. When a topic
is recreated, we will likely have offsets stored with both the old
topic id and the new topic id. From an admin perspective, we can't
really differentiate them without letting the user use the topic id
and providing the topic ids to them as well. One way would be to
disambiguate them based on the group's current assignment. I need to
think a little more about this.
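The recreation problem above can be sketched as follows (a hedged toy model; keying offsets by topic id and filtering by the group's current assignment are assumptions for illustration, not the KIP's decision):

```python
from uuid import uuid4

# If the coordinator keys committed offsets by (topic id, partition),
# recreating a topic leaves stale entries under the old id.
old_id, new_id = uuid4(), uuid4()

committed = {
    (old_id, 0): 150,   # offset committed before "foo" was recreated
    (new_id, 0): 10,    # offset committed after recreation
}

# One possible disambiguation: only expose offsets whose topic id
# appears in the group's current assignment.
current_assignment_ids = {new_id}

visible = {k: v for k, v in committed.items()
           if k[0] in current_assignment_ids}
# Only the post-recreation offset survives the filter; the stale
# entry under old_id is hidden (though still stored).
print(len(visible))
```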

> 3. I am still not sure about the coordinator storing the configs. It's
> powerful for configs to be centralized in the metadata log for various
> reasons (auditability, visibility, consistency, etc.). Similarly, I am not
> sure about automatically deleting configs in a way that they cannot be
> recovered. A good property for modern systems is to minimize the number of
> unrecoverable data loss scenarios.

I see your point. One concern that I have regarding storing them in
the controller is that they will never be deleted unless the user does
it. As users treat groups as throwaway resources, that will never
happen. If we draw a parallel with topics, we do delete their configs
when they are deleted. It is also worth pointing out that offsets are
deleted when the group is expired and that is even worse than deleting
configs in my opinion. One way would be to store configs in the
controller and to let the coordinator delete them when a group is
deleted. I suppose that the problem is the same because the configs
are also automatically deleted. Things would be different if groups
were a first class resource in the cluster.

Best,
David

On Fri, Jul 15, 2022 at 1:30 PM David Jacot  wrote:
>
> Thanks Hector! Our goal is to move forward with specialized API
> instead of relying on one generic API. For Connect, we can apply the
> exact same pattern and reuse/share the core implementation on the
> server side. For the schema registry, I think that we should consider
> having a tailored API to do simple membership/leader election.
>
> Best,
> David
>
> On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma  wrote:
> >
> > Three quick comments:
> >
> > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should
> > document the differences in more detail before deciding one way or another.
> > That said, if people pass java.util.regex.Pattern, they expect their
> > semantics to be honored. If we are doing something different, then we
> > should consider adding an overload with our own Pattern class (I don't
> > think we'd want to expose re2j's at this point).
> > 2. Regarding topic ids, any major new protocol should integrate fully with
> > it and should handle the topic recreation case correctly. That's the main
> > part we need to handle. I agree with David that we'd want to add topic ids
> > to the relevant protocols that don't have it yet and that we can probably
> > focus on the internals versus adding new APIs to the Java Consumer (unless
> > we find that adding new APIs is required for reasonable semantics).
> > 3. I am still not sure about the coordinator storing the configs. It's
> > powerful for configs to be centralized in the metadata log for various
> > reasons (auditability, visibility, consistency, etc.). Similarly, I am not
> > sure about automatically deleting configs in a way that they cannot be
> > recovered. A good property for modern systems is to minimize the number of
> > unrecoverable data loss scenarios.
> >
> > Ismael
> >
> > On Wed, Jul 13, 2022 at 3:47 PM David Jacot 
> > wrote:
> >
> > > Thanks Guozhang. My answers 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread Sagar
Hi David,

Thanks for the KIP. I just had minor observations:

1) In the Assignment Error section in Client Side mode Assignment process,
you mentioned => `In this case, the client side assignor can return an
error to the group coordinator`. In this case are you referring to the
Assignor returning an AssignmentError that's listed down towards the end?
If yes, do you think it would make sense to mention this explicitly here?

2) In the Case Studies section, I have a slight confusion, not sure if
others have the same. Consider this step:

When B heartbeats, the group coordinator transitions it to epoch 3 because
B has no partitions to revoke. It persists the change and replies.

   - Group (epoch=3)
  - A
  - B
  - C
   - Target Assignment (epoch=3)
  - A - partitions=[foo-0]
  - B - partitions=[foo-2]
  - C - partitions=[foo-1]
   - Member Assignment
  - A - epoch=2, partitions=[foo-0, foo-1]
  - B - epoch=3, partitions=[foo-2]
  - C - epoch=3, partitions=[foo-1]

When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.

Here, it's mentioned that member C can't get the foo-1 partition yet, but
based on the description above, it seems it already has it. Do you think it
would be better to remove it and populate it only when it actually gets it?
I see this in a lot of other places, so have I understood it incorrectly?
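One hedged reading of the case study above (my own sketch of the rule, not text from the KIP): a member is bumped to the new epoch only once it owns no partition that must be revoked, which is why B and C advance while A stays at epoch 2.

```python
def can_advance(owned, target):
    """True if none of the member's owned partitions must be revoked,
    i.e. everything it owns is also in its target assignment."""
    return set(owned) <= set(target)

# A still owns foo-1, which belongs to C in the target: A stays at epoch 2.
assert not can_advance(owned=["foo-0", "foo-1"], target=["foo-0"])
# B owns exactly its target: B advances to epoch 3.
assert can_advance(owned=["foo-2"], target=["foo-2"])
# C owns nothing yet: C advances to epoch 3, with foo-1 still pending.
assert can_advance(owned=[], target=["foo-1"])
```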


Regarding Connect, it might be out of scope of this discussion, but from
what I understood it would probably be running in client-side assignor mode
even on the new rebalance protocol, as it has its own custom assignors (Eager
and IncrementalCooperative).

Thanks!

Sagar.






On Fri, Jul 15, 2022 at 5:00 PM David Jacot 
wrote:

> Thanks Hector! Our goal is to move forward with specialized API
> instead of relying on one generic API. For Connect, we can apply the
> exact same pattern and reuse/share the core implementation on the
> server side. For the schema registry, I think that we should consider
> having a tailored API to do simple membership/leader election.
>
> Best,
> David
>
> On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma  wrote:
> >
> > Three quick comments:
> >
> > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we
> should
> > document the differences in more detail before deciding one way or
> another.
> > That said, if people pass java.util.regex.Pattern, they expect their
> > semantics to be honored. If we are doing something different, then we
> > should consider adding an overload with our own Pattern class (I don't
> > think we'd want to expose re2j's at this point).
> > 2. Regarding topic ids, any major new protocol should integrate fully
> with
> > it and should handle the topic recreation case correctly. That's the main
> > part we need to handle. I agree with David that we'd want to add topic
> ids
> > to the relevant protocols that don't have it yet and that we can probably
> > focus on the internals versus adding new APIs to the Java Consumer
> (unless
> > we find that adding new APIs is required for reasonable semantics).
> > 3. I am still not sure about the coordinator storing the configs. It's
> > powerful for configs to be centralized in the metadata log for various
> > reasons (auditability, visibility, consistency, etc.). Similarly, I am
> not
> > sure about automatically deleting configs in a way that they cannot be
> > recovered. A good property for modern systems is to minimize the number
> of
> > unrecoverable data loss scenarios.
> >
> > Ismael
> >
> > On Wed, Jul 13, 2022 at 3:47 PM David Jacot  >
> > wrote:
> >
> > > Thanks Guozhang. My answers are below:
> > >
> > > > 1) the migration path, especially the last step when clients flip the
> > > flag
> > > > to enable the new protocol, in which we would have a window where
> both
> > > new
> > > > protocols / rpcs and old protocols / rpcs are used by members of the
> same
> > > > group. How the coordinator could "mimic" the old behavior while
> using the
> > > > new protocol is something we need to present about.
> > >
> > > Noted. I just published a new version of KIP which includes more
> > > details about this. See the "Supporting Online Consumer Group Upgrade"
> > > and the "Compatibility, Deprecation, and Migration Plan". I think that
> > > I have to think through a few cases now but the overall idea and
> > > mechanism should be understandable.
> > >
> > > > 2) the usage of topic ids. So far as KIP-516 the topic ids are only
> used
> > > as
> > > > part of RPCs and admin client, but they are not exposed via any
> public
> > > APIs
> > > > to consumers yet. I think the question is, first should we let the
> > > consumer
> > > > client to be maintaining the names -> ids mapping itself to fully
> > > leverage
> > > > on all the augmented existing RPCs and the new RPCs with the topic
> ids;
> > > and
> > > > secondly, should we ever consider exposing the topic ids in the
> consumer
> > > > public APIs as well (both subscribe/assign, as well as 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread David Jacot
Thanks Hector! Our goal is to move forward with specialized API
instead of relying on one generic API. For Connect, we can apply the
exact same pattern and reuse/share the core implementation on the
server side. For the schema registry, I think that we should consider
having a tailored API to do simple membership/leader election.

Best,
David

On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma  wrote:
>
> Three quick comments:
>
> 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should
> document the differences in more detail before deciding one way or another.
> That said, if people pass java.util.regex.Pattern, they expect their
> semantics to be honored. If we are doing something different, then we
> should consider adding an overload with our own Pattern class (I don't
> think we'd want to expose re2j's at this point).
> 2. Regarding topic ids, any major new protocol should integrate fully with
> it and should handle the topic recreation case correctly. That's the main
> part we need to handle. I agree with David that we'd want to add topic ids
> to the relevant protocols that don't have it yet and that we can probably
> focus on the internals versus adding new APIs to the Java Consumer (unless
> we find that adding new APIs is required for reasonable semantics).
> 3. I am still not sure about the coordinator storing the configs. It's
> powerful for configs to be centralized in the metadata log for various
> reasons (auditability, visibility, consistency, etc.). Similarly, I am not
> sure about automatically deleting configs in a way that they cannot be
> recovered. A good property for modern systems is to minimize the number of
> unrecoverable data loss scenarios.
>
> Ismael
>
> On Wed, Jul 13, 2022 at 3:47 PM David Jacot 
> wrote:
>
> > Thanks Guozhang. My answers are below:
> >
> > > 1) the migration path, especially the last step when clients flip the
> > flag
> > > to enable the new protocol, in which we would have a window where both
> > new
> > > protocols / rpcs and old protocols / rpcs are used by members of the same
> > > group. How the coordinator could "mimic" the old behavior while using the
> > > new protocol is something we need to present about.
> >
> > Noted. I just published a new version of KIP which includes more
> > details about this. See the "Supporting Online Consumer Group Upgrade"
> > and the "Compatibility, Deprecation, and Migration Plan". I think that
> > I have to think through a few cases now but the overall idea and
> > mechanism should be understandable.
> >
> > > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used
> > as
> > > part of RPCs and admin client, but they are not exposed via any public
> > APIs
> > > to consumers yet. I think the question is, first should we let the
> > consumer
> > > client to be maintaining the names -> ids mapping itself to fully
> > leverage
> > > on all the augmented existing RPCs and the new RPCs with the topic ids;
> > and
> > > secondly, should we ever consider exposing the topic ids in the consumer
> > > public APIs as well (both subscribe/assign, as well as in the rebalance
> > > listener for cases like topic deletion-and-recreation).
> >
> > a) Assuming that we would include converting all the offsets related
> > RPCs to using topic ids in this KIP, the consumer would be able to
> > fully operate with topic ids. That being said, it still has to provide
> > the topic names in various APIs so having a mapping in the consumer
> > seems inevitable to me.
> > b) I don't have a strong opinion on this. Here I wonder if this goes
> > beyond the scope of this KIP. I would rather focus on the internals
> > here and we can consider this separately if we see value in doing it.
> >
> > Coming back to Ismael's point about using topic ids in the
> > ConsumerGroupHeartbeatRequest, I think that there is one advantage in
> > favour of it. The consumer will have the opportunity to validate that
> > the topics exist before passing them into the group rebalance
> > protocol. Obviously, the coordinator will also notice it but it does
> > not really have a way to reject an invalid topic in the response.
> >
> > > I'm agreeing with David on all other minor questions except for the
> > > `subscribe(Pattern)` question: personally I think it's not necessary to
> > > deprecate the subscribe API with Pattern, but instead we still use
> > Pattern
> > > while just documenting that our subscription may be rejected by the
> > server.
> > > Since the incompatible case is a very rare scenario I felt using an
> > > overloaded `String` based subscription may be more vulnerable to various
> > > invalid regexes.
> >
> > That could work. I have to look at the differences between the two
> > engines to better understand the potential issues. My understanding is
> > that would work for all the basic regular expressions. The differences
> > between the two are mainly about the various character classes. I
> > wonder what other people 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-15 Thread Ismael Juma
Three quick comments:

1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should
document the differences in more detail before deciding one way or another.
That said, if people pass java.util.regex.Pattern, they expect their
semantics to be honored. If we are doing something different, then we
should consider adding an overload with our own Pattern class (I don't
think we'd want to expose re2j's at this point).
2. Regarding topic ids, any major new protocol should integrate fully with
it and should handle the topic recreation case correctly. That's the main
part we need to handle. I agree with David that we'd want to add topic ids
to the relevant protocols that don't have it yet and that we can probably
focus on the internals versus adding new APIs to the Java Consumer (unless
we find that adding new APIs is required for reasonable semantics).
3. I am still not sure about the coordinator storing the configs. It's
powerful for configs to be centralized in the metadata log for various
reasons (auditability, visibility, consistency, etc.). Similarly, I am not
sure about automatically deleting configs in a way that they cannot be
recovered. A good property for modern systems is to minimize the number of
unrecoverable data loss scenarios.

Ismael

On Wed, Jul 13, 2022 at 3:47 PM David Jacot 
wrote:

> Thanks Guozhang. My answers are below:
>
> > 1) the migration path, especially the last step when clients flip the
> flag
> > to enable the new protocol, in which we would have a window where both
> new
> > protocols / rpcs and old protocols / rpcs are used by members of the same
> > group. How the coordinator could "mimic" the old behavior while using the
> > new protocol is something we need to present about.
>
> Noted. I just published a new version of KIP which includes more
> details about this. See the "Supporting Online Consumer Group Upgrade"
> and the "Compatibility, Deprecation, and Migration Plan". I think that
> I have to think through a few cases now but the overall idea and
> mechanism should be understandable.
>
> > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used
> as
> > part of RPCs and admin client, but they are not exposed via any public
> APIs
> > to consumers yet. I think the question is, first should we let the
> consumer
> > client to be maintaining the names -> ids mapping itself to fully
> leverage
> > on all the augmented existing RPCs and the new RPCs with the topic ids;
> and
> > secondly, should we ever consider exposing the topic ids in the consumer
> > public APIs as well (both subscribe/assign, as well as in the rebalance
> > listener for cases like topic deletion-and-recreation).
>
> a) Assuming that we would include converting all the offsets related
> RPCs to using topic ids in this KIP, the consumer would be able to
> fully operate with topic ids. That being said, it still has to provide
> the topic names in various APIs so having a mapping in the consumer
> seems inevitable to me.
> b) I don't have a strong opinion on this. Here I wonder if this goes
> beyond the scope of this KIP. I would rather focus on the internals
> here and we can consider this separately if we see value in doing it.
>
> Coming back to Ismael's point about using topic ids in the
> ConsumerGroupHeartbeatRequest, I think that there is one advantage in
> favour of it. The consumer will have the opportunity to validate that
> the topics exist before passing them into the group rebalance
> protocol. Obviously, the coordinator will also notice it but it does
> not really have a way to reject an invalid topic in the response.
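The quoted point — the consumer maintaining a names → ids mapping and validating topics before putting them in the heartbeat — could be sketched like this (a toy model under my own assumptions, not the consumer's actual internals):

```python
from uuid import uuid4

# Toy model: the consumer caches name -> topic id from metadata, so it
# can reject unknown topics before they ever reach the heartbeat request.
metadata_cache = {"foo": uuid4(), "bar": uuid4()}

def topics_for_heartbeat(subscribed):
    """Split subscribed topic names into resolvable ids and unknown names."""
    ids, unknown = [], []
    for name in subscribed:
        topic_id = metadata_cache.get(name)
        if topic_id is not None:
            ids.append(topic_id)
        else:
            unknown.append(name)
    return ids, unknown

ids, unknown = topics_for_heartbeat(["foo", "does-not-exist"])
print(len(ids), unknown)  # 1 ['does-not-exist']
```

Under this sketch, the client can surface "does-not-exist" to the application directly, instead of the coordinator silently ignoring a topic it cannot reject in the response.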
>
> > I'm agreeing with David on all other minor questions except for the
> > `subscribe(Pattern)` question: personally I think it's not necessary to
> > deprecate the subscribe API with Pattern, but instead we still use
> Pattern
> > while just documenting that our subscription may be rejected by the
> server.
> > Since the incompatible case is a very rare scenario I felt using an
> > overloaded `String` based subscription may be more vulnerable to various
> > invalid regexes.
>
> That could work. I have to look at the differences between the two
> engines to better understand the potential issues. My understanding is
> that would work for all the basic regular expressions. The differences
> between the two are mainly about the various character classes. I
> wonder what other people think about this.
>
> Best,
> David
>
> On Tue, Jul 12, 2022 at 11:28 PM Guozhang Wang  wrote:
> >
> > Thanks David! I think on the high level there are two meta points we need
> > to concretize a bit more:
> >
> > 1) the migration path, especially the last step when clients flip the
> flag
> > to enable the new protocol, in which we would have a window where both
> new
> > protocols / rpcs and old protocols / rpcs are used by members of the same
> > group. How the coordinator could "mimic" the old behavior while using the
> > new protocol is something we need to 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-14 Thread Guozhang Wang
Thanks Hector! Yes, making the templated group "type" with extensible
handling logic is part of the motivation of this rebalance protocol.


Guozhang

On Thu, Jul 14, 2022 at 10:35 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <
hgerald...@bloomberg.net> wrote:

> Kudos David, Guozhang, and Jason for putting together such a great
> proposal.
>
> I don't want to hijack the discussion, just wanted to mention that it
> would be great if the final design is made extensible enough, so other use
> cases (like Kafka Connect, Schema Registry, etc.) can be added later on.
>
> I can see how the concept of different group "types" in the group
> coordinator can be leveraged to support such cases. On KIP-795, I wanted to
> add public APIs for the AbstractCoordinator with the intent of formalizing
> the use of the Group Membership Protocol for resource management use cases.
> I'll probably close this KIP and wait to see what comes out of this
> redesign of the protocol.
>
> Thanks
>
> -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator
>
> From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00To:
> dev@kafka.apache.org
> Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance
> Protocol
>
> Hi all,
>
> I would like to start a discussion thread on KIP-848: The Next
> Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> to make the rebalance protocol (for consumers) more reliable, more
> scalable, easier to implement for clients, and easier to debug for
> operators.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Please take a look and let me know what you think.
>
> Best,
> David
>
> PS: I will be away from July 18th to August 8th. That gives you a bit
> of time to read and digest this long KIP.
>
>
>

-- 
-- Guozhang


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-14 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Kudos David, Guozhang, and Jason for putting together such a great proposal.

I don't want to hijack the discussion, just wanted to mention that it would be 
great if the final design is made extensible enough, so other use cases (like 
Kafka Connect, Schema Registry, etc.) can be added later on.

I can see how the concept of different group "types" in the group coordinator 
can be leveraged to support such cases. On KIP-795, I wanted to add public APIs 
for the AbstractCoordinator with the intent of formalizing the use of the Group 
Membership Protocol for resource management use cases. I'll probably close this 
KIP and wait to see what comes out of this redesign of the protocol.

Thanks

- 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator

From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00 To:
dev@kafka.apache.org
Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance 
Protocol

Hi all,

I would like to start a discussion thread on KIP-848: The Next
Generation of the Consumer Rebalance Protocol. With this KIP, we aim
to make the rebalance protocol (for consumers) more reliable, more
scalable, easier to implement for clients, and easier to debug for
operators.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Please take a look and let me know what you think.

Best,
David

PS: I will be away from July 18th to August 8th. That gives you a bit
of time to read and digest this long KIP.




Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-13 Thread David Jacot
Thanks Guozhang. My answers are below:

> 1) the migration path, especially the last step when clients flip the flag
> to enable the new protocol, during which we would have a window where both
> new and old protocols / rpcs are used by members of the same group. How the
> coordinator could "mimic" the old behavior while using the new protocol is
> something we need to present in more detail.

Noted. I just published a new version of the KIP which includes more
details about this. See the "Supporting Online Consumer Group Upgrade"
and "Compatibility, Deprecation, and Migration Plan" sections. I still
have to think through a few cases, but the overall idea and mechanism
should be understandable.

> 2) the usage of topic ids. As of KIP-516, topic ids are only used as
> part of RPCs and the admin client, but they are not exposed via any public
> APIs to consumers yet. I think the question is, first, should we let the
> consumer client maintain the names -> ids mapping itself to fully leverage
> all the augmented existing RPCs and the new RPCs with topic ids; and
> secondly, should we ever consider exposing topic ids in the consumer
> public APIs as well (both subscribe/assign, as well as in the rebalance
> listener for cases like topic deletion-and-recreation).

a) Assuming that we include converting all the offsets-related RPCs to
topic ids in this KIP, the consumer would be able to fully operate with
topic ids. That being said, it still has to accept topic names in various
APIs, so keeping a name -> id mapping in the consumer seems inevitable to
me.
b) I don't have a strong opinion on this. Here I wonder if this goes
beyond the scope of this KIP. I would rather focus on the internals
here and we can consider this separately if we see value in doing it.

Coming back to Ismael's point about using topic ids in the
ConsumerGroupHeartbeatRequest, I think that there is one advantage in
favour of it. The consumer would have the opportunity to validate that
the topics exist before passing them into the group rebalance
protocol. Obviously, the coordinator will also notice an invalid topic,
but it does not really have a way to reject it in the response.
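To illustrate the mapping mentioned in a), the consumer could keep a small
bidirectional cache so that user-facing APIs keep using names while the RPCs
use ids. This is only an illustrative Python sketch, not the Java client's
actual internals; all names below are made up. Note how a recreated topic
keeps its name but gets a fresh id, which is exactly the
deletion-and-recreation case discussed for the rebalance listener:

```python
import uuid

class TopicIdCache:
    """Hypothetical client-side topic name <-> id mapping."""

    def __init__(self):
        self._name_to_id = {}
        self._id_to_name = {}

    def update(self, name, topic_id):
        # Refreshed from metadata responses. A recreated topic keeps its
        # name but gets a brand new id, so the stale id must be dropped.
        old_id = self._name_to_id.get(name)
        if old_id is not None:
            self._id_to_name.pop(old_id, None)
        self._name_to_id[name] = topic_id
        self._id_to_name[topic_id] = name

    def id_for(self, name):
        return self._name_to_id.get(name)

    def name_for(self, topic_id):
        return self._id_to_name.get(topic_id)
```

With such a cache, a name-based API call can be translated to an id-based
RPC, and an id in a response can be translated back for the user.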

> I agree with David on all other minor questions except for the
> `subscribe(Pattern)` question: personally I think it's not necessary to
> deprecate the Pattern-based subscribe API; instead we can keep using
> Pattern while documenting that the subscription may be rejected by the
> server. Since the incompatible case is a very rare scenario, I felt an
> overloaded `String`-based subscription may be more vulnerable to various
> invalid regexes.

That could work. I have to look at the differences between the two
engines to better understand the potential issues. My understanding is
that it would work for all basic regular expressions; the differences
between the two engines are mainly in the various character classes. I
wonder what other people think about this.
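For reference, a client-side "dry run" along these lines could flag the
constructs that `java.util.regex` accepts but an RE2-style engine (which
lacks backreferences and lookarounds) would reject. This is a rough Python
sketch under the assumption that the server ends up with an RE2-like
engine; the probe list is illustrative, not exhaustive:

```python
import re

# Constructs java.util.regex accepts but RE2-style engines reject.
_UNSUPPORTED = [
    (re.compile(r"\\[1-9]"), "backreference"),
    (re.compile(r"\(\?=|\(\?!|\(\?<[=!]"), "lookaround"),
    (re.compile(r"\(\?>"), "atomic group"),
]

def dry_run(pattern):
    """Return the constructs in `pattern` likely to be rejected server-side."""
    try:
        re.compile(pattern)
    except re.error as exc:
        return ["invalid regex: %s" % exc]
    return [name for probe, name in _UNSUPPORTED if probe.search(pattern)]
```

A tool like this could run in pre-prod, or be folded into the
consumer-group tooling, before a consumer is switched to the new protocol.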

Best,
David

On Tue, Jul 12, 2022 at 11:28 PM Guozhang Wang  wrote:
>
> Thanks David! I think on the high level there are two meta points we need
> to concretize a bit more:
>
> 1) the migration path, especially the last step when clients flip the flag
> to enable the new protocol, during which we would have a window where both
> new and old protocols / rpcs are used by members of the same group. How the
> coordinator could "mimic" the old behavior while using the new protocol is
> something we need to present in more detail.
> 2) the usage of topic ids. As of KIP-516, topic ids are only used as
> part of RPCs and the admin client, but they are not exposed via any public
> APIs to consumers yet. I think the question is, first, should we let the
> consumer client maintain the names -> ids mapping itself to fully leverage
> all the augmented existing RPCs and the new RPCs with topic ids; and
> secondly, should we ever consider exposing topic ids in the consumer
> public APIs as well (both subscribe/assign, as well as in the rebalance
> listener for cases like topic deletion-and-recreation).
>
> I agree with David on all other minor questions except for the
> `subscribe(Pattern)` question: personally I think it's not necessary to
> deprecate the Pattern-based subscribe API; instead we can keep using
> Pattern while documenting that the subscription may be rejected by the
> server. Since the incompatible case is a very rare scenario, I felt an
> overloaded `String`-based subscription may be more vulnerable to various
> invalid regexes.
>
>
> Guozhang
>
> On Tue, Jul 12, 2022 at 5:23 AM David Jacot 
> wrote:
>
> > Hi Ismael,
> >
> > Thanks for your feedback. Let me answer your questions inline.
> >
> > > 1. I think it's premature to talk about target versions for deprecation
> > and
> > > removal of the existing group protocol. Unlike KRaft, this affects a core
> > > client protocol and hence deprecation/removal will be heavily 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-12 Thread Guozhang Wang
Thanks David! I think on the high level there are two meta points we need
to concretize a bit more:

1) the migration path, especially the last step when clients flip the flag
to enable the new protocol, during which we would have a window where both
new and old protocols / rpcs are used by members of the same group. How the
coordinator could "mimic" the old behavior while using the new protocol is
something we need to present in more detail.
2) the usage of topic ids. As of KIP-516, topic ids are only used as
part of RPCs and the admin client, but they are not exposed via any public
APIs to consumers yet. I think the question is, first, should we let the
consumer client maintain the names -> ids mapping itself to fully leverage
all the augmented existing RPCs and the new RPCs with topic ids; and
secondly, should we ever consider exposing topic ids in the consumer
public APIs as well (both subscribe/assign, as well as in the rebalance
listener for cases like topic deletion-and-recreation).

I agree with David on all other minor questions except for the
`subscribe(Pattern)` question: personally I think it's not necessary to
deprecate the Pattern-based subscribe API; instead we can keep using
Pattern while documenting that the subscription may be rejected by the
server. Since the incompatible case is a very rare scenario, I felt an
overloaded `String`-based subscription may be more vulnerable to various
invalid regexes.


Guozhang

On Tue, Jul 12, 2022 at 5:23 AM David Jacot 
wrote:

> Hi Ismael,
>
> Thanks for your feedback. Let me answer your questions inline.
>
> > 1. I think it's premature to talk about target versions for deprecation
> and
> > removal of the existing group protocol. Unlike KRaft, this affects a core
> > client protocol and hence deprecation/removal will be heavily dependent
> on
> > how quickly applications migrate to the new protocol.
>
> That makes sense. I will remove it.
>
> > 2. The KIP says we intend to release this in 4.x, but it wasn't made
> clear
> > why. If we added that as a way to estimate when we'd deprecate and remove
> > the group protocol, I also suggest removing this part.
>
> Let me explain my reasoning. As explained, I plan to rewrite the group
> coordinator in Java while we implement the new protocol. This means
> that the internals will be slightly different (e.g. threading model).
> Therefore, I wanted to tighten the switch from the old group
> coordinator to the new group coordinator to a major release. The
> alternative would be to use a flag to do the switch instead of relying
> on the software upgrade.
>
> > 3. We need to flesh out the details of the migration story. It sounds
> like
> > we're saying we will support online migrations. Is that correct? We
> should
> > explain this in detail. It could also be done as a separate KIP, if it's
> > easier.
>
> Yes, we will support online migrations for the group. That means that
> a group using the old protocol will be able to switch to the new
> protocol.
>
> Let me briefly explain how that will work though. It is basically a
> four step process:
>
> 1. The cluster must be upgraded or rolled to a software supporting the
> new group coordinator. Both the old and the new coordinator will
> support the old protocol and rely on the same persisted metadata so
> they can work together. This point is an offline migration. We cannot
> do this one live because it would require shutting down the current
> coordinator and starting up the new one and that would cause
> unavailability.
> 2. The cluster's metadata version/IBP must be upgraded to X in order
> to enable the new protocol. This cannot be done before 1) is
> terminated because the old coordinator doesn't support the new
> protocol.
> 3. The consumers must be upgraded to a version supporting the online
> migration (i.e. one with KIP-792). If the consumer is already on such a
> version, nothing needs to be done at this point.
> 4. The consumers must be rolled with the feature flag turned on. The
> consumer group is automatically converted when the first consumer
> using the new protocol joins the group. While the members using the
> old protocol are being upgraded, the old protocol is proxied into the
> new one.
>
> Let me clarify all of this in the KIP.
>
> > 4. I am happy that we are pushing the pattern subscriptions to the
> server,
> > but it seems like there could be some tricky compatibility issues. Will
> we
> > have a mechanism for users to detect that they need to update their regex
> > before switching to the new protocol?
>
> I think that I am a bit more optimistic than you on this point. I
> believe that the majority of the cases are simple regexes which should
> work with the new engine. The coordinator will verify the regex anyway
> and reject the consumer if the regex is not valid. Coming back to the
> migration path, in the worst case, the first upgraded consumer joining
> the group will be rejected. This should be used as the last 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-12 Thread David Jacot
Hi Ismael,

Thanks for your feedback. Let me answer your questions inline.

> 1. I think it's premature to talk about target versions for deprecation and
> removal of the existing group protocol. Unlike KRaft, this affects a core
> client protocol and hence deprecation/removal will be heavily dependent on
> how quickly applications migrate to the new protocol.

That makes sense. I will remove it.

> 2. The KIP says we intend to release this in 4.x, but it wasn't made clear
> why. If we added that as a way to estimate when we'd deprecate and remove
> the group protocol, I also suggest removing this part.

Let me explain my reasoning. As explained, I plan to rewrite the group
coordinator in Java while we implement the new protocol. This means
that the internals will be slightly different (e.g. threading model).
Therefore, I wanted to tighten the switch from the old group
coordinator to the new group coordinator to a major release. The
alternative would be to use a flag to do the switch instead of relying
on the software upgrade.

> 3. We need to flesh out the details of the migration story. It sounds like
> we're saying we will support online migrations. Is that correct? We should
> explain this in detail. It could also be done as a separate KIP, if it's
> easier.

Yes, we will support online migrations for the group. That means that
a group using the old protocol will be able to switch to the new
protocol.

Let me briefly explain how that will work though. It is basically a
four step process:

1. The cluster must be upgraded or rolled to a software supporting the
new group coordinator. Both the old and the new coordinator will
support the old protocol and rely on the same persisted metadata so
they can work together. This point is an offline migration. We cannot
do this one live because it would require shutting down the current
coordinator and starting up the new one and that would cause
unavailability.
2. The cluster's metadata version/IBP must be upgraded to X in order
to enable the new protocol. This cannot be done before 1) is
terminated because the old coordinator doesn't support the new
protocol.
3. The consumers must be upgraded to a version supporting the online
migration (i.e. one with KIP-792). If the consumer is already on such a
version, nothing needs to be done at this point.
4. The consumers must be rolled with the feature flag turned on. The
consumer group is automatically converted when the first consumer
using the new protocol joins the group. While the members using the
old protocol are being upgraded, the old protocol is proxied into the
new one.

Let me clarify all of this in the KIP.
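A toy model of step 4 may make the conversion rule above clearer: the group
flips to the new protocol as soon as the first new-protocol member joins,
and old-protocol members are proxied until they are rolled. This is only a
sketch; the protocol names and the API below are placeholders, not the
KIP's actual types:

```python
class Group:
    """Toy model of a consumer group during the online migration."""

    def __init__(self):
        self.protocol = "classic"  # placeholder name for the old protocol
        self.members = {}          # member_id -> "native" | "proxied"

    def join(self, member_id, new_protocol):
        if new_protocol and self.protocol == "classic":
            # The group is converted automatically when the first member
            # using the new protocol joins.
            self.protocol = "consumer"
        self.members[member_id] = "native" if new_protocol else "proxied"

    def fully_migrated(self):
        return self.protocol == "consumer" and all(
            m == "native" for m in self.members.values()
        )
```

Rolling the remaining members with the flag on re-registers them as native
members, at which point the proxying ends.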

> 4. I am happy that we are pushing the pattern subscriptions to the server,
> but it seems like there could be some tricky compatibility issues. Will we
> have a mechanism for users to detect that they need to update their regex
> before switching to the new protocol?

I think that I am a bit more optimistic than you on this point. I
believe that the majority of the cases are simple regexes which should
work with the new engine. The coordinator will verify the regex anyway
and reject the consumer if the regex is not valid. Coming back to the
migration path, in the worst case, the first upgraded consumer joining
the group will be rejected. This should be used as the last defence, I
would say.

One way for customers to validate their regexes before upgrading their
production environment would be to test them with another group, for
instance in a pre-prod environment. Another way would be to extend the
consumer-group tool to provide a regex validation mechanism. Would this
be enough in your opinion?

> 5. Related to the last question, will the Java client allow the users to
> stick with the current regex engine for compatibility reasons? For example,
> it may be handy to keep using client based regex at first to keep
> migrations simple and then migrate to server based regexes as a second step.

I understand your point but I am concerned that this would allow users
to actually stay in this mode. That would go against our goal of
simplifying the client because we would have to continue monitoring
the metadata on the client side. I would rather not do this.

> 6. When we say that the group coordinator will be responsible for storing
> the configurations and that the configurations will be deleted when the
> group is deleted. Will a transition to DEAD trigger deletion of
> configurations?

That's right. The configurations will be deleted when the group is
deleted. They go together.
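A sketch of that coupling (illustrative Python, not the KIP's actual API):
overrides can exist in the coordinator before the group does, but they are
removed together with the group, after which lookups fall back to the
cluster-level default:

```python
class GroupCoordinator:
    """Toy coordinator holding per-group config overrides."""

    def __init__(self):
        self.group_state = {}    # group_id -> state
        self.group_configs = {}  # group_id -> {config key -> override}

    def set_config(self, group_id, key, value):
        # Overrides may be created before the group itself exists.
        self.group_configs.setdefault(group_id, {})[key] = value

    def delete_group(self, group_id):
        self.group_state[group_id] = "DEAD"
        # The configs go together with the group.
        self.group_configs.pop(group_id, None)

    def effective(self, group_id, key, cluster_default):
        return self.group_configs.get(group_id, {}).get(key, cluster_default)
```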

> 7. Will the choice to store the configs in the group coordinator make it
> harder to list all cluster configs and their values?

I don't think so. The group configurations are overrides of cluster
configs. If you want to know all the overrides though, you would have
to ask all the group coordinators. You cannot rely on the metadata log
for instance.

> 8. How would someone configure a group before starting the consumers? Have
> we considered 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-12 Thread David Jacot
Hi Justine,

Thanks for your comments. Please find my answers below.

- Yes, the new protocol relies on topic IDs, with the exception of the
topic names used in the ConsumerGroupHeartbeatRequest. I am not sure
if using topic names is the right call here; I need to think about it
a little more. Obviously, the KIP does not change the fetch/commit
offsets RPCs to use topic IDs. This may be something that we should
include though, as it would give better overall guarantees in the
producer.
- You're right. I think that I should not have mentioned this flag at
all. I will remove it. We can use an internal configuration while
developing the feature.
- Both cluster types will be supported. The change is orthogonal. The
only requirement is that the cluster uses topic IDs.

Best,
David

On Mon, Jul 11, 2022 at 9:53 PM Guozhang Wang  wrote:
>
> Hi Ismael,
>
> Thanks for the feedback. Here are some replies inlined below:
>
> On Sat, Jul 9, 2022 at 2:53 AM Ismael Juma  wrote:
>
> > Thanks for the KIP. This has the potential to be a great improvement. A few
> > initial questions/comments:
> >
> > 1. I think it's premature to talk about target versions for deprecation and
> > removal of the existing group protocol. Unlike KRaft, this affects a core
> > client protocol and hence deprecation/removal will be heavily dependent on
> > how quickly applications migrate to the new protocol.
> >
>
> Yeah I agree with you. I think we can remove the proposed timeline in the
> `Compatibility, Deprecation, and Migration Plan` and instead just state
> that we will decide in the future about when we would deprecate old
> protocol and behaviors.
>
>
> > 2. The KIP says we intend to release this in 4.x, but it wasn't made clear
> > why. If we added that as a way to estimate when we'd deprecate and remove
> > the group protocol, I also suggest removing this part.
> >
>
> I think that's not specifically related to the deprecation/removal timeline
> plan, but it's more for client upgrades. I.e. the broker-side
> implementation may be done first, and then the client side, and we would
> only mark it as "released" by the time client implementations are done. At
> that time, to enable the feature the clients need to first swap-in the
> bytecode with a rolling bounce and then set the flag with a second rolling
> bounce, and hence we feel it's better to be released in a major version.
>
>
> > 3. We need to flesh out the details of the migration story. It sounds like
> > we're saying we will support online migrations. Is that correct? We should
> > explain this in detail. It could also be done as a separate KIP, if it's
> > easier.
> >
>
> Yes I think that's the part we can be more concrete about for sure (and
> this is related to your question 2) above). We will work on making it more
> explicit in parallel as we solicit more feedback.
>
>
> > 4. I am happy that we are pushing the pattern subscriptions to the server,
> > but it seems like there could be some tricky compatibility issues. Will we
> > have a mechanism for users to detect that they need to update their regex
> > before switching to the new protocol?
> >
>
> Yes I think we need some tooling for non-java client users to sort of
> "dry-run" the client before switching to the new protocol. I do not have a
> specific idea on top of my head though, maybe others like @Matt Howlett can
> chime-in here?
>
>
> > 5. Related to the last question, will the Java client allow the users to
> > stick with the current regex engine for compatibility reasons? For example,
> > it may be handy to keep using client based regex at first to keep
> > migrations simple and then migrate to server based regexes as a second
> > step.
> >
>
> Honestly I have not thought about that for java clients, and we can discuss
> that. What kind of compatibility issues do you have in mind?
>
>
> > 6. When we say that the group coordinator will be responsible for storing
> > the configurations and that the configurations will be deleted when the
> > group is deleted. Will a transition to DEAD trigger deletion of
> > configurations?
> >
>
> Yes, since the DEAD state is an ending state (we would only transition to
> that state when the group is EMPTY and all of its metadata is gone), once
> a group has transitioned to DEAD it can never be revived.
>
> > 7. Will the choice to store the configs in the group coordinator make it
> > harder to list all cluster configs and their values?
> >
>
> That's a good question, and our thoughts are that the so-called "group
> configurations" are overrides of the cluster-level configurations,
> customized per group, so when an admin lists cluster configs it's okay to
> list just the cluster-level "defaults", not showing any per-group
> customizations.
>
> > 8. How would someone configure a group before starting the consumers? Have
> > we considered allowing the explicit creation of groups? Alternatively, the
> > configs could be decoupled from the group lifecycle.
> >
>
> The configs can 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-11 Thread Guozhang Wang
Hi Ismael,

Thanks for the feedback. Here are some replies inlined below:

On Sat, Jul 9, 2022 at 2:53 AM Ismael Juma  wrote:

> Thanks for the KIP. This has the potential to be a great improvement. A few
> initial questions/comments:
>
> 1. I think it's premature to talk about target versions for deprecation and
> removal of the existing group protocol. Unlike KRaft, this affects a core
> client protocol and hence deprecation/removal will be heavily dependent on
> how quickly applications migrate to the new protocol.
>

Yeah I agree with you. I think we can remove the proposed timeline in the
`Compatibility, Deprecation, and Migration Plan` and instead just state
that we will decide in the future about when we would deprecate old
protocol and behaviors.


> 2. The KIP says we intend to release this in 4.x, but it wasn't made clear
> why. If we added that as a way to estimate when we'd deprecate and remove
> the group protocol, I also suggest removing this part.
>

I think that's not specifically related to the deprecation/removal timeline
plan, but it's more for client upgrades. I.e. the broker-side
implementation may be done first, and then the client side, and we would
only mark it as "released" by the time clients implementations are done. At
that time, to enable the feature the clients need to first swap-in the
bytecode with a rolling bounce and then set the flag with a second rolling
bounce, and hence we feel it's better to be released in a major version.


> 3. We need to flesh out the details of the migration story. It sounds like
> we're saying we will support online migrations. Is that correct? We should
> explain this in detail. It could also be done as a separate KIP, if it's
> easier.
>

Yes I think that's the part we can be more concrete about for sure (and
this is related to your question 2) above). We will work on making it more
explicit in parallel as we solicit more feedback.


> 4. I am happy that we are pushing the pattern subscriptions to the server,
> but it seems like there could be some tricky compatibility issues. Will we
> have a mechanism for users to detect that they need to update their regex
> before switching to the new protocol?
>

Yes I think we need some tooling for non-java client users to sort of
"dry-run" the client before switching to the new protocol. I do not have a
specific idea on top of my head though, maybe others like @Matt Howlett can
chime-in here?


> 5. Related to the last question, will the Java client allow the users to
> stick with the current regex engine for compatibility reasons? For example,
> it may be handy to keep using client based regex at first to keep
> migrations simple and then migrate to server based regexes as a second
> step.
>

Honestly I have not thought about that for java clients, and we can discuss
that. What kind of compatibility issues do you have in mind?


> 6. When we say that the group coordinator will be responsible for storing
> the configurations and that the configurations will be deleted when the
> group is deleted. Will a transition to DEAD trigger deletion of
> configurations?
>

Yes, since the DEAD state is an ending state (we would only transition to
that state when the group is EMPTY and all of its metadata is gone), once
a group has transitioned to DEAD it can never be revived.
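A minimal sketch of that constraint, with a deliberately reduced state set
(the real group state machine has more states and transitions than shown
here):

```python
# Toy transition table: DEAD is terminal and reachable only from EMPTY.
ALLOWED = {
    "STABLE": {"EMPTY"},
    "EMPTY": {"STABLE", "DEAD"},
    "DEAD": set(),  # an ending state: a DEAD group is never revived
}

def transition(state, target):
    if target not in ALLOWED[state]:
        raise ValueError("illegal transition %s -> %s" % (state, target))
    return target
```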


> 7. Will the choice to store the configs in the group coordinator make it
> harder to list all cluster configs and their values?
>

That's a good question, and our thoughts are that the so-called "group
configurations" are overrides of the cluster-level configurations
customized per group so when an admin list cluster configs it's okay to
list just the cluster-level "defaults", not showing any per-group
customizations.


> 8. How would someone configure a group before starting the consumers? Have
> we considered allowing the explicit creation of groups? Alternatively, the
> configs could be decoupled from the group lifecycle.
>

The configs can be created before the group itself as an independent entity
--- of course, this requires the corresponding request to be routed to the
right coordinator based on the group id --- and the only thing that differs
is that, when the group itself is gone, we also check whether there are any
configuration entities related to that group and delete them as well.

Admittedly this introduces an asymmetry in the creation / deletion
lifecycles of the config entities, and we would like to hear everyone's
thoughts on whether we should aim for symmetry, i.e. totally decouple group
configs and not delete them when the group is gone, but instead always
require explicit deletion operations.


> 9. Will the Consumer.subscribe method for the Java client still take a
> `java.util.regex.Pattern` or do we have to introduce an overload?
>

I think we do not need to introduce an overload, but I'm all ears if there
are compatibility issues that we may have overlooked.


> 10. I agree with Justine that we should be 

Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-11 Thread Guozhang Wang
Hi Justine,

Thanks for sharing your feedback! Here are some thoughts inlined below:

On Fri, Jul 8, 2022 at 11:33 AM Justine Olshan 
wrote:

> Hi David,
> Thanks for sharing this KIP! Really exciting to hear how we are changing
> the protocol! The motivation section really made me realize how useful this
> change will be.
>
> I've done a first pass of the KIP, and may have more questions, but thought
> I'd start with a few I thought of already.
>
>- I saw some usages of topic IDs in the new
>protocols/records/interfaces, but wasn't sure if they were used
> everywhere.
>Are you planning on relying on topic IDs for the new protocol?
>

Yes, we do plan to use topic IDs in the newly introduced RPC protocols
(e.g. the new heartbeat req/resp), while keeping the status quo for the
existing protocols (e.g. the offset fetch / commit req/resp) that are
already encoded with topic names.


>- I saw the section about using a feature flag first before integrating
>the feature with ibp/metadata version. I understand the logic for
> testing
>with the flag, but it also seems like a bit of work to deprecate and
> switch
>to the ibp/metadata version approach. What was the reasoning behind
>switching the enablement mechanism?
>

The main rationale is on the client side: such a behavioral change
cannot be captured by the apikey version alone, since the clients need
to behave differently with the new protocol even before they talk to the
group coordinator.


>- Generally, are there implications for KRaft here? (IBP/metadata
>version is something that I think of) And if so, will both cluster
> types be
>supported?
>

Besides IBP / metadata, the only thing I can think of is the group state
machine snapshotting we may want to consider in the future with KRaft's
extended snapshot capabilities (i.e. KIP-630).


>
> Thanks again to everyone who worked on this KIP!
> Justine
>
> On Wed, Jul 6, 2022 at 1:45 AM David Jacot 
> wrote:
>
> > Hi all,
> >
> > I would like to start a discussion thread on KIP-848: The Next
> > Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> > to make the rebalance protocol (for consumers) more reliable, more
> > scalable, easier to implement for clients, and easier to debug for
> > operators.
> >
> > The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
> >
> > Please take a look and let me know what you think.
> >
> > Best,
> > David
> >
> > PS: I will be away from July 18th to August 8th. That gives you a bit
> > of time to read and digest this long KIP.
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-09 Thread Ismael Juma
Thanks for the KIP. This has the potential to be a great improvement. A few
initial questions/comments:

1. I think it's premature to talk about target versions for deprecation and
removal of the existing group protocol. Unlike KRaft, this affects a core
client protocol and hence deprecation/removal will be heavily dependent on
how quickly applications migrate to the new protocol.
2. The KIP says we intend to release this in 4.x, but it wasn't made clear
why. If we added that as a way to estimate when we'd deprecate and remove
the group protocol, I also suggest removing this part.
3. We need to flesh out the details of the migration story. It sounds like
we're saying we will support online migrations. Is that correct? We should
explain this in detail. It could also be done as a separate KIP, if it's
easier.
4. I am happy that we are pushing the pattern subscriptions to the server,
but it seems like there could be some tricky compatibility issues. Will we
have a mechanism for users to detect that they need to update their regex
before switching to the new protocol?
5. Related to the last question, will the Java client allow the users to
stick with the current regex engine for compatibility reasons? For example,
it may be handy to keep using client based regex at first to keep
migrations simple and then migrate to server based regexes as a second step.
6. When we say that the group coordinator will be responsible for storing
the configurations and that the configurations will be deleted when the
group is deleted. Will a transition to DEAD trigger deletion of
configurations?
7. Will the choice to store the configs in the group coordinator make it
harder to list all cluster configs and their values?
8. How would someone configure a group before starting the consumers? Have
we considered allowing the explicit creation of groups? Alternatively, the
configs could be decoupled from the group lifecycle.
9. Will the Consumer.subscribe method for the Java client still take a
`java.util.regex.Pattern` or do we have to introduce an overload?
10. I agree with Justine that we should be clearer about the reason to
switch to IBP/metadata.version from the feature flag. Maybe we mean that we
can switch the default for the feature flag to true based on the
metadata.version once we want to make it the default.
11. Some of the protocol APIs don't mention the required ACLs, it would be
good to add that for consistency.
12. It is a bit odd that ConsumerGroupHeartbeat requires "Read Group" even
though it seems to do more than reading.
13. How is topic recreation handled by the consumer with the new group
protocol? It would be good to have a section on this.
14. The KIP mentions we will write the new coordinator in Java. Even though
this is an implementation detail, do we plan to have a new gradle module
for it?
15. Do we have a scalability goal when it comes to how many members the new
group protocol can support?
16. Did we consider having SubscribedTopicIds instead of
SubscribedTopicNames in ConsumerGroupHeartbeatRequest? Is the idea that,
since we have to resolve the regex on the server, we can do the same for
the topic names? The difference is that sending a regex is compact, whereas
sending the full list of topic names is less efficient. Furthermore,
deletion and recreation are easier to handle if we have topic ids.
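A small sketch of why topic ids make delete-and-recreate easier to detect
(an assumed reading of the question above): a recreated topic keeps its name
but receives a fresh id, so id-keyed state can tell the two incarnations
apart where name-keyed state cannot.

```java
import java.util.UUID;

public class TopicIdentity {
    // Simplified stand-in for topic metadata; not a Kafka type.
    record TopicMeta(String name, UUID id) {}

    public static void main(String[] args) {
        TopicMeta before = new TopicMeta("orders", UUID.randomUUID());
        // The topic is deleted and recreated under the same name:
        TopicMeta after = new TopicMeta("orders", UUID.randomUUID());

        System.out.println(before.name().equals(after.name())); // names collide
        System.out.println(before.id().equals(after.id()));     // ids differ
    }
}
```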

Thanks,
Ismael

On Wed, Jul 6, 2022 at 10:45 AM David Jacot 
wrote:

> Hi all,
>
> I would like to start a discussion thread on KIP-848: The Next
> Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> to make the rebalance protocol (for consumers) more reliable, more
> scalable, easier to implement for clients, and easier to debug for
> operators.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Please take a look and let me know what you think.
>
> Best,
> David
>
> PS: I will be away from July 18th to August 8th. That gives you a bit
> of time to read and digest this long KIP.
>


Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-08 Thread Justine Olshan
Hi David,
Thanks for sharing this KIP! Really exciting to hear how we are changing
the protocol! The motivation section really made me realize how useful this
change will be.

I've done a first pass of the KIP, and may have more questions, but thought
I'd start with a few I thought of already.

   - I saw some usages of topic IDs in the new
   protocols/records/interfaces, but wasn't sure if they were used everywhere.
   Are you planning on relying on topic IDs for the new protocol?
   - I saw the section about using a feature flag first before integrating
   the feature with ibp/metadata version. I understand the logic for testing
   with the flag, but it also seems like a bit of work to deprecate and switch
   to the ibp/metadata version approach. What was the reasoning behind
   switching the enablement mechanism?
   - Generally, are there implications for KRaft here? (IBP/metadata
   version is something that I think of) And if so, will both cluster types be
   supported?

Thanks again to everyone who worked on this KIP!
Justine

On Wed, Jul 6, 2022 at 1:45 AM David Jacot 
wrote:

> Hi all,
>
> I would like to start a discussion thread on KIP-848: The Next
> Generation of the Consumer Rebalance Protocol. With this KIP, we aim
> to make the rebalance protocol (for consumers) more reliable, more
> scalable, easier to implement for clients, and easier to debug for
> operators.
>
> The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.
>
> Please take a look and let me know what you think.
>
> Best,
> David
>
> PS: I will be away from July 18th to August 8th. That gives you a bit
> of time to read and digest this long KIP.
>


[DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol

2022-07-06 Thread David Jacot
Hi all,

I would like to start a discussion thread on KIP-848: The Next
Generation of the Consumer Rebalance Protocol. With this KIP, we aim
to make the rebalance protocol (for consumers) more reliable, more
scalable, easier to implement for clients, and easier to debug for
operators.

The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D.

Please take a look and let me know what you think.

Best,
David

PS: I will be away from July 18th to August 8th. That gives you a bit
of time to read and digest this long KIP.