Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Re versions: fair enough, I think it's okay to keep it as strings. Just to clarify, the concern I had is that if we do want to augment it in the future, it will be harder to change a `string` field into a `struct` :P

Re onAssignment: actually, even with the current proposal, since we are giving the final encoded metadata upon the first `onAssignment` call for the generation already, Streams would infer all the standby tasks that need to be migrated in / out, so for Streams' purposes we'd need to handle such decoupling anyway. And it's actually the opposite: if the first `onAssignment` does not include those yet-to-be-assigned partitions, then Streams would not know whether a migrating-out standby will be promoted to active later or should be completely removed, until the next `onAssignment` is called. That's where I mentioned "we'd need to figure out which onAssignment is the final call" etc. If we have all the partitions in the `onAssignment`, then we can infer such actions and decide whether we should close a standby task immediately or just recycle it and wait for the active task to be assigned eventually.

On the other hand, if we call onAssignment incrementally, similar to onPartitionsAssigned, and include both assigned and yet-to-be-assigned partitions, then people who implement both interfaces can just ignore `onPartitionsAssigned`, since they have all the knowledge they need from onAssignment already. And that's what I'm trying to avoid by keeping the functionality of the two more separated.

Guozhang

On Mon, Sep 26, 2022 at 11:30 AM David Jacot wrote:

> Regarding the version, I would rather add this later when we have a clear use case for it. It seems to me that we are speculating here. I understand your point but I am not fully convinced by the solution at the moment. Would you agree with this?
>
> Regarding the onAssignment, I was thinking about the case where a task is promoted from standby to active.
> In this case, having onAssignment with the combined partitions could make it difficult, no? I am thinking that the assignor will have to remove the standby based on the metadata but it won't know what to do after that. If the partition is assigned directly, it can convert it to an active task. On the other hand, if the partition is not available yet, it would have to keep it as a standby until the partition is really assigned. It seems to me that the assignor won't have the information to make this decision, no? In this case, decoupling the "why" from the "when" seems to make things harder. I am not so familiar with Streams though, so my intuition could be wrong here.
>
> David
>
> On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang wrote:
>
> > Regarding the version, what I was thinking is that in the HB request, for the "serverAssignor" field, instead of just having it as a single string field, maybe we could consider making it a structure that includes: name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion mean the versions of the server assignor that the client works best with. That being said, I agree with you that such information may also be inferred elsewhere, e.g. by looking into the "rackId" field and seeing whether it contains a hyphen or not. All I was wondering is whether such version information would be useful for the server assignors to determine their actual assignment logic. I do not feel very strongly about this one though --- even if we do not add it now, we can potentially add it later; it's just that changing a single string field into a structure would be hard for compatibility, and we'd then probably have to add top-level fields.
> > Regarding the `onAssignment` logic, again my train of thought is that if users want to know exactly when a partition is assigned / revoked, they would leverage the rebalance callbacks, as that's what people should rely on to determine "when" partitions are assigned. `onAssignment` should be used for getting "why" such a partition assignment decision was made, and hence returning the combined partitions would be okay. Streams, e.g., implements both the rebalance callbacks and the assignors, and it gets the "when" from the former (and creates/closes active tasks accordingly) and the "why" from the latter (and updates its global info bookkeeping as well as standby maintenance accordingly). Most users would just be interested in the rebalance callback and not implement their own assignor at all: if they do not care about "why", they trust the server assignors to take good care of that, and care only about "when". So if we were building these two types of APIs from scratch, I'd indeed feel that providing only the metadata, and not the partitions, in `onAssignment` may be less confusing and push users to separate the usage of the two more clearly, but since we
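The standby-task inference described above could be sketched roughly as follows. This is a minimal sketch with invented names (`StandbyDecision`, the task strings); it is not Streams' actual code. The point is that, given the standby set declared in the assignor metadata plus the full set of target partitions (assigned and yet-to-be-assigned), a task that left the standby set can be classified immediately:

```java
import java.util.Set;

public class StandbyDecision {
    enum Action { CLOSE, RECYCLE_TO_ACTIVE, KEEP_STANDBY }

    // newStandbyTasks: the standby set from the assignor metadata for this generation.
    // allTargetPartitions: assigned + yet-to-be-assigned partitions for this member.
    static Action decide(String task, Set<String> newStandbyTasks, Set<String> allTargetPartitions) {
        if (newStandbyTasks.contains(task))
            return Action.KEEP_STANDBY;          // still a standby
        if (allTargetPartitions.contains(task))
            return Action.RECYCLE_TO_ACTIVE;     // promoted; keep state and wait for the partition
        return Action.CLOSE;                     // migrated out entirely
    }

    public static void main(String[] args) {
        Set<String> standbys = Set.of("task-1");
        Set<String> target = Set.of("task-0", "task-1");
        System.out.println(decide("task-0", standbys, target)); // promoted: RECYCLE_TO_ACTIVE
        System.out.println(decide("task-2", standbys, target)); // migrated out: CLOSE
    }
}
```

Without the yet-to-be-assigned partitions in the first `onAssignment`, the second argument of `decide` is incomplete, and the CLOSE vs RECYCLE_TO_ACTIVE distinction cannot be made until a later call.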
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Regarding the version, I would rather add this later when we have a clear use case for it. It seems to me that we are speculating here. I understand your point but I am not fully convinced by the solution at the moment. Would you agree with this?

Regarding the onAssignment, I was thinking about the case where a task is promoted from standby to active. In this case, having onAssignment with the combined partitions could make it difficult, no? I am thinking that the assignor will have to remove the standby based on the metadata but it won't know what to do after that. If the partition is assigned directly, it can convert it to an active task. On the other hand, if the partition is not available yet, it would have to keep it as a standby until the partition is really assigned. It seems to me that the assignor won't have the information to make this decision, no? In this case, decoupling the "why" from the "when" seems to make things harder. I am not so familiar with Streams though, so my intuition could be wrong here.

David

On Mon, Sep 26, 2022 at 7:26 PM Guozhang Wang wrote:

> Regarding the version, what I was thinking is that in the HB request, for the "serverAssignor" field, instead of just having it as a single string field, maybe we could consider making it a structure that includes: name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion mean the versions of the server assignor that the client works best with. That being said, I agree with you that such information may also be inferred elsewhere, e.g. by looking into the "rackId" field and seeing whether it contains a hyphen or not. All I was wondering is whether such version information would be useful for the server assignors to determine their actual assignment logic.
> I do not feel very strongly about this one though --- even if we do not add it now, we can potentially add it later; it's just that changing a single string field into a structure would be hard for compatibility, and we'd then probably have to add top-level fields.
>
> Regarding the `onAssignment` logic, again my train of thought is that if users want to know exactly when a partition is assigned / revoked, they would leverage the rebalance callbacks, as that's what people should rely on to determine "when" partitions are assigned. `onAssignment` should be used for getting "why" such a partition assignment decision was made, and hence returning the combined partitions would be okay. Streams, e.g., implements both the rebalance callbacks and the assignors, and it gets the "when" from the former (and creates/closes active tasks accordingly) and the "why" from the latter (and updates its global info bookkeeping as well as standby maintenance accordingly). Most users would just be interested in the rebalance callback and not implement their own assignor at all: if they do not care about "why", they trust the server assignors to take good care of that, and care only about "when". So if we were building these two types of APIs from scratch, I'd indeed feel that providing only the metadata, and not the partitions, in `onAssignment` may be less confusing and push users to separate the usage of the two more clearly, but since we already introduced partitions in `onAssignment` for compatibility I'm less keen on removing them.
>
> Guozhang
>
> On Mon, Sep 26, 2022 at 6:55 AM David Jacot wrote:
>
> > Hi Guozhang,
> >
> > Regarding the version, my understanding is that the version would be either the client software version or the request version, is this correct? If so, we could indeed pass this information down to the assignor via the interface.
> > One way would be to pass a "server context" to the assignor, and that context would include that information (and perhaps more). Is this what you are looking for?
> >
> > Regarding the onAssignment, I think that I understand your point. I suppose that the assignor could also be clever and keep track of the last metadata to decide whether it has to do something or not. One question that is still not clear to me is whether the assignor needs to know all the assigned partitions upfront, regardless of whether they are already revoked or not. Do you think that we need this as well?
> >
> > From an API perspective, we could have something like onAssignment(Metadata(version, reason, metadata, assigned partitions, pending partitions)), where the assigned partitions are the partitions ready to be used and the pending partitions are the ones assigned to the member but not yet revoked. I find it a bit weird that this method would be called only once, because the assignor would not know when the pending partitions change. That does not look like a clean API. An alternative would be to use onAssignment(Metadata(version, reason, metadata, combined partitions)), but this seems error prone because it is not clear whether a
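For illustration, the assigned/pending shape sketched in this sub-thread might look like the record below. The names are hypothetical (this is not the KIP-848 API); the "combined partitions" alternative is then just the union of the two lists:

```java
import java.util.List;
import java.util.stream.Stream;

public record Assignment(int version, String reason, byte[] metadata,
                         List<Integer> assignedPartitions,   // usable right now
                         List<Integer> pendingPartitions) {  // owned, but still awaiting revocation elsewhere

    // The "combined partitions" alternative collapses the two lists into one,
    // losing the usable-vs-pending distinction.
    public List<Integer> combined() {
        return Stream.concat(assignedPartitions.stream(), pendingPartitions.stream()).toList();
    }

    public static void main(String[] args) {
        Assignment a = new Assignment(1, "", new byte[0], List.of(0, 1), List.of(2));
        System.out.println(a.combined()); // [0, 1, 2]
    }
}
```

The record makes the trade-off concrete: with separate lists the caller can tell partition 2 is not yet usable; with `combined()` that information is gone.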
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Regarding the version, what I was thinking is that in the HB request, for the "serverAssignor" field, instead of just having it as a single string field, maybe we could consider making it a structure that includes: name, minimumVersion, maximumVersion, where minimumVersion/maximumVersion mean the versions of the server assignor that the client works best with. That being said, I agree with you that such information may also be inferred elsewhere, e.g. by looking into the "rackId" field and seeing whether it contains a hyphen or not. All I was wondering is whether such version information would be useful for the server assignors to determine their actual assignment logic. I do not feel very strongly about this one though --- even if we do not add it now, we can potentially add it later; it's just that changing a single string field into a structure would be hard for compatibility, and we'd then probably have to add top-level fields.

Regarding the `onAssignment` logic, again my train of thought is that if users want to know exactly when a partition is assigned / revoked, they would leverage the rebalance callbacks, as that's what people should rely on to determine "when" partitions are assigned. `onAssignment` should be used for getting "why" such a partition assignment decision was made, and hence returning the combined partitions would be okay. Streams, e.g., implements both the rebalance callbacks and the assignors, and it gets the "when" from the former (and creates/closes active tasks accordingly) and the "why" from the latter (and updates its global info bookkeeping as well as standby maintenance accordingly). Most users would just be interested in the rebalance callback and not implement their own assignor at all: if they do not care about "why", they trust the server assignors to take good care of that, and care only about "when".
So if we were building these two types of APIs from scratch, I'd indeed feel that providing only the metadata, and not the partitions, in `onAssignment` may be less confusing and push users to separate the usage of the two more clearly, but since we already introduced partitions in `onAssignment` for compatibility I'm less keen on removing them.

Guozhang

On Mon, Sep 26, 2022 at 6:55 AM David Jacot wrote:

> Hi Guozhang,
>
> Regarding the version, my understanding is that the version would be either the client software version or the request version, is this correct? If so, we could indeed pass this information down to the assignor via the interface. One way would be to pass a "server context" to the assignor, and that context would include that information (and perhaps more). Is this what you are looking for?
>
> Regarding the onAssignment, I think that I understand your point. I suppose that the assignor could also be clever and keep track of the last metadata to decide whether it has to do something or not. One question that is still not clear to me is whether the assignor needs to know all the assigned partitions upfront, regardless of whether they are already revoked or not. Do you think that we need this as well?
>
> From an API perspective, we could have something like onAssignment(Metadata(version, reason, metadata, assigned partitions, pending partitions)), where the assigned partitions are the partitions ready to be used and the pending partitions are the ones assigned to the member but not yet revoked. I find it a bit weird that this method would be called only once, because the assignor would not know when the pending partitions change. That does not look like a clean API. An alternative would be to use onAssignment(Metadata(version, reason, metadata, combined partitions)), but this seems error prone because it is not clear whether a partition is usable or not.
> Or do you think that we should not provide the partitions but only the metadata?
>
> Best,
> David
>
> On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang wrote:
>
> > Hello David,
> >
> > On Fri, Sep 23, 2022 at 2:00 AM David Jacot wrote:
> >
> > > Hey,
> > >
> > > > Just to clarify, I was asking about the `version` of the assignor (i.e. up to what version the client would support), and I do agree we would not need metadata. What I have in mind is that, for some specific built-in broker assignors, e.g. rack-aware assignors, it's possible that in a newer version we would have a hierarchical rack ID string format, like "tier1-tier2", while some clients that have not upgraded would still have rack IDs in the old format. In this case, the broker then needs to choose the old versioned assignor. I'm probably making something up here for rack-aware assignors, but I'm wondering if in general such "auto-downgrade" behavior would still be needed for broker-side assignors, and if so whether "version" would still be useful.
> > >
> > > Got it. That's an interesting thought. I think that the issue is that the
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Guozhang,

Regarding the version, my understanding is that the version would be either the client software version or the request version, is this correct? If so, we could indeed pass this information down to the assignor via the interface. One way would be to pass a "server context" to the assignor, and that context would include that information (and perhaps more). Is this what you are looking for?

Regarding the onAssignment, I think that I understand your point. I suppose that the assignor could also be clever and keep track of the last metadata to decide whether it has to do something or not. One question that is still not clear to me is whether the assignor needs to know all the assigned partitions upfront, regardless of whether they are already revoked or not. Do you think that we need this as well?

From an API perspective, we could have something like onAssignment(Metadata(version, reason, metadata, assigned partitions, pending partitions)), where the assigned partitions are the partitions ready to be used and the pending partitions are the ones assigned to the member but not yet revoked. I find it a bit weird that this method would be called only once, because the assignor would not know when the pending partitions change. That does not look like a clean API. An alternative would be to use onAssignment(Metadata(version, reason, metadata, combined partitions)), but this seems error prone because it is not clear whether a partition is usable or not. Or do you think that we should not provide the partitions but only the metadata?

Best,
David

On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang wrote:

> Hello David,
>
> On Fri, Sep 23, 2022 at 2:00 AM David Jacot wrote:
>
> > Hey,
> >
> > > Just to clarify, I was asking about the `version` of the assignor (i.e. up to what version the client would support), and I do agree we would not need metadata. What I have in mind is that, for some specific built-in broker assignors, e.g.
> > > rack-aware assignors, it's possible that in a newer version we would have a hierarchical rack ID string format, like "tier1-tier2", while some clients that have not upgraded would still have rack IDs in the old format. In this case, the broker then needs to choose the old versioned assignor. I'm probably making something up here for rack-aware assignors, but I'm wondering if in general such "auto-downgrade" behavior would still be needed for broker-side assignors, and if so whether "version" would still be useful.
> >
> > Got it. That's an interesting thought. I think that the issue is that the client will never tell you which version of the server-side assignor should be used. Do you think that the coordinator would downgrade the version if the assignment fails with a higher version? I tend to believe that this should be handled within the assignor itself. In the example that you mentioned, the assignor would have to handle all the cases. I am not really convinced that we need this at the moment.
>
> The version from the client side would not indicate to the broker which version to use, but rather which version the client would "work best with". Such a "version" field would not be settable by users; it would be bumped automatically when the Kafka code version is bumped. Back to the rack-aware assignor example: if an older versioned client does not have a hierarchical rack ID, but the assignment returned to it assumes a hierarchical rack structure, the result may not reflect the best workload balance among the new and old versioned clients. That means, when receiving the members' subscriptions on the server side, if the versions from these members differ, the broker's assignor may need to consider using the lower version logic to do the assignment.
> So yes, the assignor would indeed have to handle all such cases, but it needs to do so in a way that, if there are clients that would not work with certain new logic, it handles those cases automatically, e.g. by still using the older versioned logic.
>
> > > Okay, my understanding is that the calling ordering of these callbacks would be like the following:
>
> > Yes, your examples look right.
>
> > > I'm wondering if we would still call onAssignment just once, encoding the whole assignment for this rebalance, including all the partitions that should be assigned to the member but are not yet assigned since they have not been revoked by others. In that case the call ordering would be:
>
> > Interesting. Is there a case for Streams where having the full assignment is beneficial? For instance, I can think of the following case. When a standby task is promoted to an active task, the metadata would not contain the standby task anymore and the assignment may not have the partition yet. In this case, Streams would stop the standby tasks but not have the active task
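The "auto-downgrade" idea in this exchange could be sketched as a toy version-negotiation routine. This assumes the (name, minimumVersion, maximumVersion) structure proposed earlier in the thread; none of it is in the KIP as written, and all names are illustrative:

```java
import java.util.List;

public class VersionNegotiation {
    // Hypothetical per-member supported range for a named server assignor.
    record Range(short minimumVersion, short maximumVersion) {}

    // Effective version for the group: the highest version that every member
    // supports, or -1 if the ranges do not intersect.
    static short effectiveVersion(List<Range> members) {
        short lo = Short.MIN_VALUE, hi = Short.MAX_VALUE;
        for (Range r : members) {
            lo = (short) Math.max(lo, r.minimumVersion);
            hi = (short) Math.min(hi, r.maximumVersion);
        }
        return lo <= hi ? hi : -1;
    }

    public static void main(String[] args) {
        // A member still on [0..1] forces the assignor down from 3 to 1.
        System.out.println(effectiveVersion(List.of(new Range((short) 0, (short) 3),
                                                    new Range((short) 0, (short) 1)))); // 1
    }
}
```

This is the mechanical part; the interesting part, as discussed above, is whether the selection lives in the coordinator or inside each assignor's own logic.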
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello David,

On Fri, Sep 23, 2022 at 2:00 AM David Jacot wrote:

> Hey,
>
> > Just to clarify, I was asking about the `version` of the assignor (i.e. up to what version the client would support), and I do agree we would not need metadata. What I have in mind is that, for some specific built-in broker assignors, e.g. rack-aware assignors, it's possible that in a newer version we would have a hierarchical rack ID string format, like "tier1-tier2", while some clients that have not upgraded would still have rack IDs in the old format. In this case, the broker then needs to choose the old versioned assignor. I'm probably making something up here for rack-aware assignors, but I'm wondering if in general such "auto-downgrade" behavior would still be needed for broker-side assignors, and if so whether "version" would still be useful.
>
> Got it. That's an interesting thought. I think that the issue is that the client will never tell you which version of the server-side assignor should be used. Do you think that the coordinator would downgrade the version if the assignment fails with a higher version? I tend to believe that this should be handled within the assignor itself. In the example that you mentioned, the assignor would have to handle all the cases. I am not really convinced that we need this at the moment.

The version from the client side would not indicate to the broker which version to use, but rather which version the client would "work best with". Such a "version" field would not be settable by users; it would be bumped automatically when the Kafka code version is bumped. Back to the rack-aware assignor example: if an older versioned client does not have a hierarchical rack ID, but the assignment returned to it assumes a hierarchical rack structure, the result may not reflect the best workload balance among the new and old versioned clients.
That means, when receiving the members' subscriptions on the server side, if the versions from these members differ, the broker's assignor may need to consider using the lower version logic to do the assignment. So yes, the assignor would indeed have to handle all such cases, but it needs to do so in a way that, if there are clients that would not work with certain new logic, it handles those cases automatically, e.g. by still using the older versioned logic.

> > Okay, my understanding is that the calling ordering of these callbacks would be like the following:
>
> Yes, your examples look right.
>
> > I'm wondering if we would still call onAssignment just once, encoding the whole assignment for this rebalance, including all the partitions that should be assigned to the member but are not yet assigned since they have not been revoked by others. In that case the call ordering would be:
>
> Interesting. Is there a case for Streams where having the full assignment is beneficial? For instance, I can think of the following case. When a standby task is promoted to an active task, the metadata would not contain the standby task anymore and the assignment may not have the partition yet. In this case, Streams would stop the standby tasks but not have the active task yet, if my understanding of Streams is correct. So knowing the full assignment could be helpful here.
>
> If we want to do this, we could structure the assignment given to the member as follows: version, error, metadata, assigned partitions, pending partitions, where the pending partitions would be the ones assigned to this member but not yet available. What do you think?
>
> Regarding onAssignment being called only once, I am not sure I fully grasp the benefit yet. Does the assignor really care about this? In the end, the epoch does not really matter for the assignor because it has to converge its state to the desired state anyway.
Here's my rationale (maybe rephrased a bit :P): the implementers of the rebalance listener and of the assignor are two groups of people, and most users fall into the former group, while only very few fall into the latter. Rebalance listener implementers just want to know when a partition is actually revoked from or assigned to the consumer and react to it; for this purpose, `onPartitionsRevoked` and `onPartitionsAssigned` would be triggered in an interleaved manner upon `poll` calls across rebalances. The usual logic for such rebalance listeners is metrics reporting, committing offsets (if they do not use Kafka for that), etc. They would not care which calls are from which rebalances --- in the past, with eager rebalancing, each rebalance may have been associated with exactly one `onPartitionsRevoked` followed by one `onPartitionsAssigned`, but that would no longer be the case now. The implementers of the assignor, though, would care about "how the assignment was made": that includes from which rebalance a certain revoke/assign decision was made, and based on what metadata such an assignment is
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hey,

> Just to clarify, I was asking about the `version` of the assignor (i.e. up to what version the client would support), and I do agree we would not need metadata. What I have in mind is that, for some specific built-in broker assignors, e.g. rack-aware assignors, it's possible that in a newer version we would have a hierarchical rack ID string format, like "tier1-tier2", while some clients that have not upgraded would still have rack IDs in the old format. In this case, the broker then needs to choose the old versioned assignor. I'm probably making something up here for rack-aware assignors, but I'm wondering if in general such "auto-downgrade" behavior would still be needed for broker-side assignors, and if so whether "version" would still be useful.

Got it. That's an interesting thought. I think that the issue is that the client will never tell you which version of the server-side assignor should be used. Do you think that the coordinator would downgrade the version if the assignment fails with a higher version? I tend to believe that this should be handled within the assignor itself. In the example that you mentioned, the assignor would have to handle all the cases. I am not really convinced that we need this at the moment.

> Okay, my understanding is that the calling ordering of these callbacks would be like the following:

Yes, your examples look right.

> I'm wondering if we would still call onAssignment just once, encoding the whole assignment for this rebalance, including all the partitions that should be assigned to the member but are not yet assigned since they have not been revoked by others. In that case the call ordering would be:

Interesting. Is there a case for Streams where having the full assignment is beneficial? For instance, I can think of the following case. When a standby task is promoted to an active task, the metadata would not contain the standby task anymore and the assignment may not have the partition yet.
In this case, Streams would stop the standby tasks but not have the active task yet, if my understanding of Streams is correct. So knowing the full assignment could be helpful here.

If we want to do this, we could structure the assignment given to the member as follows: version, error, metadata, assigned partitions, pending partitions, where the pending partitions would be the ones assigned to this member but not yet available. What do you think?

Regarding onAssignment being called only once, I am not sure I fully grasp the benefit yet. Does the assignor really care about this? In the end, the epoch does not really matter for the assignor because it has to converge its state to the desired state anyway.

Best,
David

On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang wrote:

> Hi David, thanks for all the detailed explanations. I think they all make sense. Just a couple of follow-ups here:
>
> > I don't really see the benefits here because server side assignors don't have metadata at all. They only assign topic-partitions. They are not supposed to generate metadata nor to receive metadata from the members.
>
> Just to clarify, I was asking about the `version` of the assignor (i.e. up to what version the client would support), and I do agree we would not need metadata. What I have in mind is that, for some specific built-in broker assignors, e.g. rack-aware assignors, it's possible that in a newer version we would have a hierarchical rack ID string format, like "tier1-tier2", while some clients that have not upgraded would still have rack IDs in the old format. In this case, the broker then needs to choose the old versioned assignor. I'm probably making something up here for rack-aware assignors, but I'm wondering if in general such "auto-downgrade" behavior would still be needed for broker-side assignors, and if so whether "version" would still be useful.
>
> > Yeah, that's right.
> > Within a rebalance, `onAssignment` is called once when the member transitions to a new epoch. This one contains the full metadata provided by the client side assignor. Then, `onAssignment` can be called at most N times, where N is the number of partitions pending revocation by other members. Let me try to clarify this in the KIP.
>
> Okay, my understanding is that the calling ordering of these callbacks would be like the following:
>
> onPartitionsRevoked();   // just once, since we do not really need to revoke incrementally
>
> onAssignment();          // the first call; it bumps up the epoch, with the metadata reflected
> onPartitionsAssigned();  // paired with the onAssignment
>
> onAssignment();          // each time we get an additional assignment, we call onAssignment...
> onPartitionsAssigned();  // ...paired with an onPartitionsAssigned
> ...
> onAssignment();
> onPartitionsAssigned();  // on each of the onAssignment calls, the encoded metadata
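The ordering sketched above can be exercised with a toy driver. The callback names mirror the consumer API, but the driver loop and `sequence` helper are invented purely for illustration of the incremental pattern:

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackOrder {
    // One rebalance: a single revocation phase, then one onAssignment /
    // onPartitionsAssigned pair each time more partitions become available.
    static List<String> sequence(List<List<Integer>> increments) {
        List<String> calls = new ArrayList<>();
        calls.add("onPartitionsRevoked");
        for (List<Integer> newlyAvailable : increments) {
            calls.add("onAssignment");
            calls.add("onPartitionsAssigned" + newlyAvailable);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Partition 0 becomes available first, then 1 and 2 after their
        // previous owners revoke them.
        sequence(List.of(List.of(0), List.of(1, 2))).forEach(System.out::println);
    }
}
```

Under the "call onAssignment just once" alternative, only the first `onAssignment` entry would appear, carrying the full target set up front.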
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
47. group.consumer.session.timeout.ms: how does the client communicate its session timeout config to the broker?

48. There still seem to be some typos in the examples. For example, the first reference in the following seems incorrect:
B - epoch=2, partitions=[foo-2]
C - epoch=3, partitions=[foo-1]
C - epoch=22, partitions=[foo-2, foo-5]

Thanks,

Jun

On Mon, Sep 12, 2022 at 11:42 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) <hgerald...@bloomberg.net> wrote:

> Yes, I agree that leadership could be modeled after partition assignment(s). However, I can think of some expanded versions of the 'leader election' use case that exists today in Schema Registry.
>
> The advantage of creating a different 'type' that isn't necessarily tied to topics/partitions (and is used only for resource management) would be that we can scale the number of resources it handles (akin to a Connect cluster increasing the number of connectors/tasks) without having to change topics/partitions, as these partitions will never have any data (and can't be shrunk); they would be used just for leadership.
>
> This is in the spirit of KIP-795. We can table this discussion until after the Connect discussion, as there will be more clarity on what extending the new protocol will look like.
>
> From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
>
> Hi Hector,
>
> We definitely need to share internals with Connect APIs. That model would not scale otherwise.
>
> Regarding the schema registry, I wonder if we could just use the new protocol. At the end of the day, the schema registry wants to elect a single writer for a partition, and the owner of the partition can be considered the leader. I haven't really tried this out but it seems doable. What do you think?
> > > > Best, > > David > > > > On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) > > wrote: > > > > > > So it seems there's a consensus on having dedicated APIs for Connect, > > which > > means having data model (group, member, assignment) and APIs (heartbeat > > request/response, assignment prepare and install) tailored specifically to > > connect. I wonder if adding support for other coordinator group types > > (e.g. > > leadership, in the case of schema registry) will require similar assets > > (new > > model classes to express members and resources, custom heartbeats and > > assignment prepare/install APIs). > > > > > > I think that, as new use cases are considered, the core primitives of > > the new > > protocol will be generalized, so new types don't have to implement the > > whole > > stack (e.g. state machines), but only functions like detecting group > > metadata > > changes, or computing assignments of the resources handled by each type > > (Topic/Partitions in the case of consumer, Connector/Task in the case of > > Connect, Leadership in the case of Schema Registry, and so on). > > > > > > > > > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To: > > dev@kafka.apache.org > > > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer > > Rebalance > > Protocol > > > > > > Thank you Guozhang/David for the feedback. Looks like there's agreement > > on > > > using separate APIs for Connect. I would revisit the doc and see what > > > changes are to be made. > > > > > > Thanks! > > > Sagar. > > > > > > On Tue, Aug 9, 2022 at 7:11 PM David Jacot > > > wrote: > > > > > > > Hi Sagar, > > > > > > > > Thanks for the feedback and the document. That's really helpful. I > > > > will take a look at it. > > > > > > > > Overall, it seems to me that both Connect and the Consumer could share > > > > the same underlying "engine". 
The main difference is that the Consumer > > > > assigns topic-partitions to members whereas Connect assigns tasks to > > > > workers. I see two ways to move forward: > > > > 1) We extend the new proposed APIs to support different resource types > > > > (e.g. partitions, tasks, etc.); or > > > > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > > > > similar to the new
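[Editor's sketch] The two directions David describes can be made concrete with a small illustration. All names below are invented for this sketch and are not part of the KIP: option 1 parameterizes one group API over the resource type, while option 2 would give each use case its own dedicated RPCs on top of the shared coordinator "engine".

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option 1, with invented names: one group API
// parameterized by the resource type, so the consumer case assigns
// topic-partitions and the Connect case assigns task ids, while both
// delegate to the same coordinator engine underneath.
interface GenericGroupApi<R> {
    Map<String, List<R>> targetAssignment(String groupId);
}

public class ApiShapeSketch {
    // Consumer flavor: resources are topic-partition strings.
    public static GenericGroupApi<String> consumerApi() {
        return groupId -> Map.of("member-1", List.of("foo-0", "foo-1"));
    }

    // Connect flavor: resources are connector-task ids.
    public static GenericGroupApi<String> connectApi() {
        return groupId -> Map.of("worker-1", List.of("sink-0"));
    }

    public static void main(String[] args) {
        // Option 2 would instead give each flavor its own dedicated RPCs,
        // keeping this generic surface out of the public protocol.
        System.out.println(consumerApi().targetAssignment("grp").get("member-1"));
        System.out.println(connectApi().targetAssignment("grp").get("worker-1"));
    }
}
```

Option 2 trades this generality for narrower, purpose-built request schemas, which is why David leans towards it.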
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi, David, Thanks for the reply. A few more comments. 40. A member updates its assignors' reason to trigger a rebalance: What API in the consumer does the power user use to change the assignor reason? 41. partition.assignment.strategy defaults to class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor. Does that still make sense since the assignor name on the broker is not the full class name? Should it default to a strategy determined by the broker? 42. "This means that the assignment metadata may already reference partitions which are not assigned to the member yet." Not sure I follow this one. 43. ConsumerGroupHeartbeat: 43.1 Do we need an ErrorCode to indicate that the group state is being loaded (during coordinator failover)? If so, do we need the same ErrorCode for other requests? 43.2 COMPUTE_ASSIGNMENT: It's a bit weird to represent this as an error code since there is no error. Could we represent that in the payload of the ConsumerGroupHeartbeat response? 44. ConsumerGroupPrepareAssignmentResponse: 44.1 Why does it have AssignorName since the client decides the assignor? 44.2 TopicPartitions: The target topic-partitions of the member. Should that be the topic partitions currently owned by the member? 45. List group: Should the ACL be Describe (Cluster) instead of Describe Group? 46. AlterIncrementalConfigs: what group configs could be changed? 47. group.consumer.session.timeout.ms: how does the client communicate its session timeout config to the broker? 48. There still seem to be some typos in the examples. For example, the first reference of the following seems incorrect. B - epoch=2, partitions=[foo-2] C - epoch=3, partitions=[foo-1] C - epoch=22, partitions=[foo-2, foo-5] Thanks, Jun On Mon, Sep 12, 2022 at 11:42 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) < hgerald...@bloomberg.net> wrote: > Yes, I agree that leadership could be modeled after partition > assignment(s). 
However, I can think of some expanded versions of the > 'leader election' use case that exist today in Schema Registry. > > The advantage of creating a different 'type' that isn't necessarily tied > to topics/partitions (and is used only for resource management) would be > that we can scale the number of resources it handles (akin to a connect > cluster increasing the number of connectors/tasks) without having to change > topics/partitions, as these partitions will never have any data (and can't > be shrunk), they will be used just for leadership. > > This is in the spirit of KIP-795. We can table this discussion for after > the Connect discussion, as there will be more clarity on how extending the > new protocol will look like. > > From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00To: > dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer > Rebalance Protocol > > Hi Hector, > > We definitely need to share internals with Connect APIs. That model > would not scale otherwise. > > Regarding the schema registry, I wonder if we could just use the new > protocol. At the end of the day, the schema registry wants to elect a > single writer for a partition and the owner of the partition can be > considered as the leader. I haven't really tried this out but that > seems doable. What do you think? > > Best, > David > > On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) > wrote: > > > > So it seems there's a consensus on having dedicated APIs for Connect, > which > means having data model (group, member, assignment) and APIs (heartbeat > request/response, assignment prepare and install) tailored specifically to > connect. I wonder if adding support for other coordinator group types > (e.g. > leadership, in the case of schema registry) will require similar assets > (new > model classes to express members and resources, custom heartbeats and > assignment prepare/install APIs). 
> > > > I think that, as new use cases are considered, the core primitives of > the new > protocol will be generalized, so new types don't have to implement the > whole > stack (e.g. state machines), but only functions like detecting group > metadata > changes, or computing assignments of the resources handled by each type > (Topic/Partitions in the case of consumer, Connector/Task in the case of > Connect, Leadership in the case of Schema Registry, and so on). > > > > > > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To: > dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer > Rebalance > Protocol > > > > Thank you Guozhang/David for the feedback. Looks like there's agreement > on > > using separate APIs for Connect. I would revisit
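[Editor's sketch] Jun's point 43.2 above, that COMPUTE_ASSIGNMENT is not really an error, could be addressed by moving the signal into the response payload. A hypothetical sketch with invented field names, not the KIP's actual schema:

```java
// Hypothetical alternative to a COMPUTE_ASSIGNMENT error code:
// carry the "please run the client-side assignor" signal as a normal
// response field, keeping ErrorCode for real errors only.
public class HeartbeatResponseSketch {
    public final short errorCode;            // 0 == NONE
    public final boolean computeAssignment;  // true => member should run its assignor

    public HeartbeatResponseSketch(short errorCode, boolean computeAssignment) {
        this.errorCode = errorCode;
        this.computeAssignment = computeAssignment;
    }

    public boolean isError() {
        return errorCode != 0;
    }

    public static void main(String[] args) {
        // A successful heartbeat can still ask the member to compute an assignment.
        HeartbeatResponseSketch r = new HeartbeatResponseSketch((short) 0, true);
        System.out.println(r.isError() + " " + r.computeAssignment);
    }
}
```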
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Yes, I agree that leadership could be modeled after partition assignment(s). However, I can think of some expanded versions of the 'leader election' use case that exist today in Schema Registry. The advantage of creating a different 'type' that isn't necessarily tied to topics/partitions (and is used only for resource management) would be that we can scale the number of resources it handles (akin to a connect cluster increasing the number of connectors/tasks) without having to change topics/partitions, as these partitions will never have any data (and can't be shrunk), they will be used just for leadership. This is in the spirit of KIP-795. We can table this discussion for after the Connect discussion, as there will be more clarity on what extending the new protocol will look like. From: dev@kafka.apache.org At: 09/12/22 07:58:32 UTC-4:00To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol Hi Hector, We definitely need to share internals with Connect APIs. That model would not scale otherwise. Regarding the schema registry, I wonder if we could just use the new protocol. At the end of the day, the schema registry wants to elect a single writer for a partition and the owner of the partition can be considered as the leader. I haven't really tried this out but that seems doable. What do you think? Best, David On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote: > > So it seems there's a consensus on having dedicated APIs for Connect, which means having data model (group, member, assignment) and APIs (heartbeat request/response, assignment prepare and install) tailored specifically to connect. I wonder if adding support for other coordinator group types (e.g. leadership, in the case of schema registry) will require similar assets (new model classes to express members and resources, custom heartbeats and assignment prepare/install APIs). 
> > I think that, as new use cases are considered, the core primitives of the new protocol will be generalized, so new types don't have to implement the whole stack (e.g. state machines), but only functions like detecting group metadata changes, or computing assignments of the resources handled by each type (Topic/Partitions in the case of consumer, Connector/Task in the case of Connect, Leadership in the case of Schema Registry, and so on). > > > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To: dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol > > Thank you Guozhang/David for the feedback. Looks like there's agreement on > using separate APIs for Connect. I would revisit the doc and see what > changes are to be made. > > Thanks! > Sagar. > > On Tue, Aug 9, 2022 at 7:11 PM David Jacot > wrote: > > > Hi Sagar, > > > > Thanks for the feedback and the document. That's really helpful. I > > will take a look at it. > > > > Overall, it seems to me that both Connect and the Consumer could share > > the same underlying "engine". The main difference is that the Consumer > > assigns topic-partitions to members whereas Connect assigns tasks to > > workers. I see two ways to move forward: > > 1) We extend the new proposed APIs to support different resource types > > (e.g. partitions, tasks, etc.); or > > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > > similar to the new ones but different on the content/resources and > > they would rely on the same engine on the coordinator side. > > > > I personally lean towards 2) because I am not a fan of overcharging > > APIs to serve different purposes. That being said, I am not opposed to > > 1) if we can find an elegant way to do it. > > > > I think that we can continue to discuss it here for now in order to > > ensure that this KIP is compatible with what we will do for Connect in > > the future. 
> > > > Best, > > David > > > > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > > > > > Hi all, > > > > > > I am back from vacation. I will go through and address your comments > > > in the coming days. Thanks for your feedback. > > > > > > Cheers, > > > David > > > > > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > > wrote: > > > > > > > > Hey All! > > > > > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing > > making it > > > > down the stack! > > > > > > > > I had a few questions: > > > > > > > > 1. The 'Rejected Alternatives' section describes how member epoch >
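[Editor's sketch] David's idea of running the Schema Registry's single-writer election on the new protocol reduces to: give the group one partition and treat whichever member the coordinator assigns it to as the leader. A minimal sketch, with a hypothetical helper that is not part of any Kafka API:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: leadership as ownership of a single partition.
// The member the coordinator assigned partition 0 to is the leader.
public class LeaderElectionSketch {
    public static Optional<String> leader(Map<String, List<Integer>> assignment) {
        return assignment.entrySet().stream()
                .filter(e -> e.getValue().contains(0))
                .map(Map.Entry::getKey)
                .findFirst();
    }

    public static void main(String[] args) {
        // The coordinator assigned the group's only partition to member "sr-2".
        Map<String, List<Integer>> assignment =
                Map.of("sr-1", List.of(), "sr-2", List.of(0));
        System.out.println(leader(assignment).orElse("none")); // sr-2 is the single writer
    }
}
```

Hector's counterpoint is that these partitions would carry no data and cannot be shrunk, which motivates his separate "leadership" group type.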
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Guozhang, 1. I have added a reference to the relevant chapter instead of repeating the whole thing. Does that work for you? 2. The "Rebalance Triggers" section you are referring to is about when a rebalance should be triggered for the non-upgraded members using the old protocol. The section mentions that a rebalance must be triggered when a new assignment is installed. This implies that the group epoch was updated either by a native member or a non-upgraded member. For the latter, the JoinGroup request would be the trigger. I have added a reference to the relevant chapter in the "JoinGroup Handling" section as well. Does that make sense? Thanks, David On Fri, Sep 9, 2022 at 10:35 PM Guozhang Wang wrote: > > Hello David, > > Alright I think that's sufficient. Just to make that clear in the doc, > could we update: > > 1) the heartbeat request handling section, stating when coordinator will > trigger rebalance based on the HB's member metadata / reason? > 2) the "Rebalance Triggers" section to include what we described in "Group > Epoch - Trigger a rebalance" section as well? > > > Guozhang > > On Fri, Sep 9, 2022 at 1:28 AM David Jacot > wrote: > > > Hi Guozhang, > > > > I thought that the assignor will always be consulted when the next > > heartbeat request is constructed. In other words, > > `PartitionAssignor#metadata` will be called for every heartbeat. This > > gives the opportunity for the assignor to enforce a rebalance by > > setting the reason to a non-zero value or by changing the bytes. Do > > you think that this is not sufficient? Are you concerned by the delay? > > > > Best, > > David > > > > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang wrote: > > > > > > Hello David, > > > > > > One of Jun's comments make me thinking: > > > > > > ``` > > > In this case, a new assignment is triggered by the client side > > > assignor. 
When constructing the HB, the consumer will always consult > > > the client side assignor and propagate the information to the group > > > coordinator. In other words, we don't expect users to call > > > Consumer#enforceRebalance anymore. > > > ``` > > > > > > As I looked at the current PartitionAssignor's interface, we actually do > > > not have a way yet to instruct how to construct the next HB request, e.g. > > > when the assignor wants to enforce a new rebalance with a new assignment, > > > we'd need some customizable APIs inside the PartitionAssignor to indicate > > > the next HB telling broker about so. WDYT about adding such an API on the > > > PartitionAssignor? > > > > > > > > > Guozhang > > > > > > > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot > > > wrote: > > > > > > > Hi Jun, > > > > > > > > I have updated the KIP to include your feedback. I have also tried to > > > > clarify the parts which were not cleared. > > > > > > > > Best, > > > > David > > > > > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot > > wrote: > > > > > > > > > > Hi Jun, > > > > > > > > > > Thanks for your feedback. Let me start by answering your questions > > > > > inline and I will update the KIP next week. > > > > > > > > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to > > be > > > > fewer > > > > > > RPCs during rebalance and more efficient support of wildcard. A few > > > > > > comments below. > > > > > > > > > > I would also add that the KIP removes the global sync barrier in the > > > > > protocol which is essential to improve group stability and > > > > > scalability, and the KIP also simplifies the client by moving most of > > > > > the logic to the server side. > > > > > > > > > > > 30. ConsumerGroupHeartbeatRequest > > > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling > > > > changing > > > > > > of the partition assignor in the consumers? > > > > > > > > > > Definitely. 
The group coordinator will use the assignor used by a > > > > > majority of the members. This allows the group to move from one > > > > > assignor to another by a roll. This is explained in the Assignor > > > > > Selection chapter. > > > > > > > > > > > 30.2 For each field, could you explain whether it's required in > > every > > > > > > request or the scenarios when it needs to be filled? For example, > > it's > > > > not > > > > > > clear to me when TopicPartitions needs to be filled. > > > > > > > > > > The client is expected to set those fields in case of a connection > > > > > issue (e.g. timeout) or when the fields have changed since the last > > > > > HB. The server populates those fields as long as the member is not > > > > > fully reconciled - the member should acknowledge that it has the > > > > > expected epoch and assignment. I will clarify this in the KIP. > > > > > > > > > > > 31. In the current consumer protocol, the rack affinity between the > > > > client > > > > > > and the broker is only considered during fetching, but not during > > > > assigning > > > > > > partitions to consumers. Sometimes, once the assignment is made, > > there > > > >
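[Editor's sketch] David's description of how an assignor forces a rebalance through the regular heartbeat, a non-zero reason or changed metadata bytes, can be sketched as a simple check (hypothetical helper; the real logic lives in the consumer and the group coordinator):

```java
import java.util.Arrays;

// Hypothetical sketch: the consumer consults the assignor before every
// heartbeat; a non-zero reason or a change in the metadata bytes is
// what ultimately bumps the group epoch and triggers a rebalance.
public class RebalanceTriggerSketch {
    public static boolean shouldTriggerRebalance(int reason, byte[] lastMetadata, byte[] newMetadata) {
        return reason != 0 || !Arrays.equals(lastMetadata, newMetadata);
    }

    public static void main(String[] args) {
        byte[] last = {1, 2, 3};
        byte[] same = {1, 2, 3};
        byte[] changed = {9, 2, 3};
        System.out.println(shouldTriggerRebalance(0, last, same));    // no reason, no change
        System.out.println(shouldTriggerRebalance(1, last, same));    // explicit reason
        System.out.println(shouldTriggerRebalance(0, last, changed)); // metadata changed
    }
}
```

This is why David argues `Consumer#enforceRebalance` is no longer needed: the assignor itself gets a chance to set these fields on every heartbeat.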
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Hector, We definitely need to share internals with Connect APIs. That model would not scale otherwise. Regarding the schema registry, I wonder if we could just use the new protocol. At the end of the day, the schema registry wants to elect a single writer for a partition and the owner of the partition can be considered as the leader. I haven't really tried this out but that seems doable. What do you think? Best, David On Fri, Sep 9, 2022 at 8:45 PM Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote: > > So it seems there's a consensus on having dedicated APIs for Connect, which > means having data model (group, member, assignment) and APIs (heartbeat > request/response, assignment prepare and install) tailored specifically to > connect. I wonder if adding support for other coordinator group types (e.g. > leadership, in the case of schema registry) will require similar assets (new > model classes to express members and resources, custom heartbeats and > assignment prepare/install APIs). > > I think that, as new use cases are considered, the core primitives of the new > protocol will be generalized, so new types don't have to implement the whole > stack (e.g. state machines), but only functions like detecting group metadata > changes, or computing assignments of the resources handled by each type > (Topic/Partitions in the case of consumer, Connector/Task in the case of > Connect, Leadership in the case of Schema Registry, and so on). > > > From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To: > dev@kafka.apache.org > Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance > Protocol > > Thank you Guozhang/David for the feedback. Looks like there's agreement on > using separate APIs for Connect. I would revisit the doc and see what > changes are to be made. > > Thanks! > Sagar. > > On Tue, Aug 9, 2022 at 7:11 PM David Jacot > wrote: > > > Hi Sagar, > > > > Thanks for the feedback and the document. That's really helpful. 
I > > will take a look at it. > > > > Overall, it seems to me that both Connect and the Consumer could share > > the same underlying "engine". The main difference is that the Consumer > > assigns topic-partitions to members whereas Connect assigns tasks to > > workers. I see two ways to move forward: > > 1) We extend the new proposed APIs to support different resource types > > (e.g. partitions, tasks, etc.); or > > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > > similar to the new ones but different on the content/resources and > > they would rely on the same engine on the coordinator side. > > > > I personally lean towards 2) because I am not a fan of overcharging > > APIs to serve different purposes. That being said, I am not opposed to > > 1) if we can find an elegant way to do it. > > > > I think that we can continue to discuss it here for now in order to > > ensure that this KIP is compatible with what we will do for Connect in > > the future. > > > > Best, > > David > > > > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > > > > > Hi all, > > > > > > I am back from vacation. I will go through and address your comments > > > in the coming days. Thanks for your feedback. > > > > > > Cheers, > > > David > > > > > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > > wrote: > > > > > > > > Hey All! > > > > > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing > > making it > > > > down the stack! > > > > > > > > I had a few questions: > > > > > > > > 1. The 'Rejected Alternatives' section describes how member epoch > > should > > > > advance in step with the group epoch and assignment epoch values. I > > think > > > > that this is a good idea for the reasons described in the KIP. When the > > > > protocol is incrementally assigning partitions to a worker, what member > > > > epoch does each incremental assignment use? 
Are member epochs re-used, > > and > > > > a single member epoch can correspond to multiple different > > (monotonically > > > > larger) assignments? > > > > > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > > > > not, should custom client-side assignor implementations interact with > > the > > > > Reason field, and how is its common meaning agreed upon? If so, what > > is the > > > > benefit of a distin
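[Editor's sketch] Gregory's epoch question touches the reconciliation loop described elsewhere in the thread: a member is converged once it acknowledges the target epoch and owns exactly its target partitions. A hypothetical sketch of that convergence check, with illustrative values:

```java
import java.util.Set;

// Hypothetical sketch of member reconciliation: the coordinator keeps
// sending the target epoch and partitions until the member reports
// both back, at which point the member is fully reconciled.
public class ReconciliationSketch {
    public static boolean reconciled(int memberEpoch, Set<String> owned,
                                     int targetEpoch, Set<String> target) {
        return memberEpoch == targetEpoch && owned.equals(target);
    }

    public static void main(String[] args) {
        // Member still on epoch 2 and missing foo-5: not reconciled yet.
        System.out.println(reconciled(2, Set.of("foo-2"), 3, Set.of("foo-2", "foo-5")));
        // Member caught up to epoch 3 with the full target set: reconciled.
        System.out.println(reconciled(3, Set.of("foo-2", "foo-5"), 3, Set.of("foo-2", "foo-5")));
    }
}
```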
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello David, Alright I think that's sufficient. Just to make that clear in the doc, could we update: 1) the heartbeat request handling section, stating when coordinator will trigger rebalance based on the HB's member metadata / reason? 2) the "Rebalance Triggers" section to include what we described in "Group Epoch - Trigger a rebalance" section as well? Guozhang On Fri, Sep 9, 2022 at 1:28 AM David Jacot wrote: > Hi Guozhang, > > I thought that the assignor will always be consulted when the next > heartbeat request is constructed. In other words, > `PartitionAssignor#metadata` will be called for every heartbeat. This > gives the opportunity for the assignor to enforce a rebalance by > setting the reason to a non-zero value or by changing the bytes. Do > you think that this is not sufficient? Are you concerned by the delay? > > Best, > David > > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang wrote: > > > > Hello David, > > > > One of Jun's comments make me thinking: > > > > ``` > > In this case, a new assignment is triggered by the client side > > assignor. When constructing the HB, the consumer will always consult > > the client side assignor and propagate the information to the group > > coordinator. In other words, we don't expect users to call > > Consumer#enforceRebalance anymore. > > ``` > > > > As I looked at the current PartitionAssignor's interface, we actually do > > not have a way yet to instruct how to construct the next HB request, e.g. > > when the assignor wants to enforce a new rebalance with a new assignment, > > we'd need some customizable APIs inside the PartitionAssignor to indicate > > the next HB telling broker about so. WDYT about adding such an API on the > > PartitionAssignor? > > > > > > Guozhang > > > > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot > > wrote: > > > > > Hi Jun, > > > > > > I have updated the KIP to include your feedback. I have also tried to > > > clarify the parts which were not cleared. 
> > > > > > Best, > > > David > > > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot > wrote: > > > > > > > > Hi Jun, > > > > > > > > Thanks for your feedback. Let me start by answering your questions > > > > inline and I will update the KIP next week. > > > > > > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to > be > > > fewer > > > > > RPCs during rebalance and more efficient support of wildcard. A few > > > > > comments below. > > > > > > > > I would also add that the KIP removes the global sync barrier in the > > > > protocol which is essential to improve group stability and > > > > scalability, and the KIP also simplifies the client by moving most of > > > > the logic to the server side. > > > > > > > > > 30. ConsumerGroupHeartbeatRequest > > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling > > > changing > > > > > of the partition assignor in the consumers? > > > > > > > > Definitely. The group coordinator will use the assignor used by a > > > > majority of the members. This allows the group to move from one > > > > assignor to another by a roll. This is explained in the Assignor > > > > Selection chapter. > > > > > > > > > 30.2 For each field, could you explain whether it's required in > every > > > > > request or the scenarios when it needs to be filled? For example, > it's > > > not > > > > > clear to me when TopicPartitions needs to be filled. > > > > > > > > The client is expected to set those fields in case of a connection > > > > issue (e.g. timeout) or when the fields have changed since the last > > > > HB. The server populates those fields as long as the member is not > > > > fully reconciled - the member should acknowledge that it has the > > > > expected epoch and assignment. I will clarify this in the KIP. > > > > > > > > > 31. 
In the current consumer protocol, the rack affinity between the > > > client > > > > > and the broker is only considered during fetching, but not during > > > assigning > > > > > partitions to consumers. Sometimes, once the assignment is made, > there > > > is > > > > > no opportunity for read affinity because no replicas of assigned > > > partitions > > > > > are close to the member. I am wondering if we should use this > > > opportunity > > > > > to address this by including rack in GroupMember. > > > > > > > > That's an interesting idea. I don't see any issue with adding the > rack > > > > to the members. I will do so. > > > > > > > > > 32. On the metric side, often, it's useful to know how busy a group > > > > > coordinator is. By moving the event loop model, it seems that we > could > > > add > > > > > a metric that tracks the fraction of the time the event loop is > doing > > > the > > > > > actual work. > > > > > > > > That's a great idea. I will add it. Thanks. > > > > > > > > > 33. Could we add a section on coordinator failover handling? For > > > example, > > > > > does it need to trigger the check if any group with the wildcard > > > > > subscription now has a new
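[Editor's sketch] Jun's rack-affinity suggestion (point 31) amounts to preferring, for each partition, a member whose rack hosts one of the partition's replicas. A greedy hypothetical sketch of that preference, not the KIP's actual assignor:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of rack-aware assignment: for one partition,
// prefer a member located in a rack that hosts one of its replicas,
// falling back to any member when no rack match exists.
public class RackAwareSketch {
    public static String pickMember(Set<String> replicaRacks,
                                    Map<String, String> memberRacks,
                                    List<String> members) {
        for (String m : members) {
            if (replicaRacks.contains(memberRacks.get(m))) return m; // read affinity
        }
        return members.get(0); // no rack match: fall back
    }

    public static void main(String[] args) {
        Map<String, String> memberRacks = Map.of("m1", "rack-a", "m2", "rack-b");
        // foo-0's replicas live in rack-b, so m2 is preferred.
        System.out.println(pickMember(Set.of("rack-b"), memberRacks, List.of("m1", "m2")));
    }
}
```

Adding the rack to GroupMember, as David agrees to do, is what makes this information available to server-side assignors in the first place.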
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
So it seems there's a consensus on having dedicated APIs for Connect, which means having data model (group, member, assignment) and APIs (heartbeat request/response, assignment prepare and install) tailored specifically to connect. I wonder if adding support for other coordinator group types (e.g. leadership, in the case of schema registry) will require similar assets (new model classes to express members and resources, custom heartbeats and assignment prepare/install APIs). I think that, as new use cases are considered, the core primitives of the new protocol will be generalized, so new types don't have to implement the whole stack (e.g. state machines), but only functions like detecting group metadata changes, or computing assignments of the resources handled by each type (Topic/Partitions in the case of consumer, Connector/Task in the case of Connect, Leadership in the case of Schema Registry, and so on). From: dev@kafka.apache.org At: 08/12/22 09:31:36 UTC-4:00To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol Thank you Guozhang/David for the feedback. Looks like there's agreement on using separate APIs for Connect. I would revisit the doc and see what changes are to be made. Thanks! Sagar. On Tue, Aug 9, 2022 at 7:11 PM David Jacot wrote: > Hi Sagar, > > Thanks for the feedback and the document. That's really helpful. I > will take a look at it. > > Overall, it seems to me that both Connect and the Consumer could share > the same underlying "engine". The main difference is that the Consumer > assigns topic-partitions to members whereas Connect assigns tasks to > workers. I see two ways to move forward: > 1) We extend the new proposed APIs to support different resource types > (e.g. partitions, tasks, etc.); or > 2) We use new dedicated APIs for Connect. 
The dedicated APIs would be > similar to the new ones but different on the content/resources and > they would rely on the same engine on the coordinator side. > > I personally lean towards 2) because I am not a fan of overcharging > APIs to serve different purposes. That being said, I am not opposed to > 1) if we can find an elegant way to do it. > > I think that we can continue to discuss it here for now in order to > ensure that this KIP is compatible with what we will do for Connect in > the future. > > Best, > David > > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > > > Hi all, > > > > I am back from vacation. I will go through and address your comments > > in the coming days. Thanks for your feedback. > > > > Cheers, > > David > > > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > wrote: > > > > > > Hey All! > > > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing > making it > > > down the stack! > > > > > > I had a few questions: > > > > > > 1. The 'Rejected Alternatives' section describes how member epoch > should > > > advance in step with the group epoch and assignment epoch values. I > think > > > that this is a good idea for the reasons described in the KIP. When the > > > protocol is incrementally assigning partitions to a worker, what member > > > epoch does each incremental assignment use? Are member epochs re-used, > and > > > a single member epoch can correspond to multiple different > (monotonically > > > larger) assignments? > > > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > > > not, should custom client-side assignor implementations interact with > the > > > Reason field, and how is its common meaning agreed upon? If so, what > is the > > > benefit of a distinct Reason field over including such functionality > in the > > > opaque metadata? > > > > > > 3. 
The following is included in the KIP: "Thanks to this, the input of > the > > > client side assignor is entirely driven by the group coordinator. The > > > consumer is no longer responsible for maintaining any state besides its > > > assigned partitions." Does this mean that the client-side assignor MAY > > > incorporate additional non-Metadata state (such as partition > throughput, > > > cpu/memory metrics, config topics, etc), or that additional > non-Metadata > > > state SHOULD NOT be used? > > > > > > 4. I see that there are separate classes > > > for org.apache.kafka.server.group.consumer.PartitionAssignor > > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > > > overlap significantly. Is it possible for these tw
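[Editor's sketch] Hector's generalization, where each group type supplies only type-specific functions to a shared coordinator engine, can be sketched as an interface. All names here are invented for illustration:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Hector's generalization: a group "type" plugs
// only type-specific logic (metadata-change detection, assignment of
// its own resource kind) into a shared coordinator engine that owns
// the state machines, epochs, and heartbeats.
interface GroupType<R> {
    boolean metadataChanged(Map<String, String> oldMeta, Map<String, String> newMeta);
    Map<String, List<R>> computeAssignment(List<String> members, List<R> resources);
}

public class GroupTypeSketch {
    // A toy Connect-like type: tasks dealt out round-robin to workers.
    public static final GroupType<String> CONNECT = new GroupType<>() {
        public boolean metadataChanged(Map<String, String> o, Map<String, String> n) {
            return !o.equals(n); // e.g. connector configs changed
        }
        public Map<String, List<String>> computeAssignment(List<String> workers, List<String> tasks) {
            Map<String, List<String>> out = new java.util.LinkedHashMap<>();
            for (String w : workers) out.put(w, new java.util.ArrayList<>());
            for (int i = 0; i < tasks.size(); i++)
                out.get(workers.get(i % workers.size())).add(tasks.get(i));
            return out;
        }
    };

    public static void main(String[] args) {
        System.out.println(CONNECT.computeAssignment(
                List.of("w1", "w2"), List.of("conn-0", "conn-1", "conn-2")));
    }
}
```

A consumer type would assign topic-partitions and a leadership type a single token, without either reimplementing the engine's state machines.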
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Guozhang, I thought that the assignor will always be consulted when the next heartbeat request is constructed. In other words, `PartitionAssignor#metadata` will be called for every heartbeat. This gives the opportunity for the assignor to enforce a rebalance by setting the reason to a non-zero value or by changing the bytes. Do you think that this is not sufficient? Are you concerned by the delay? Best, David On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang wrote: > > Hello David, > > One of Jun's comments make me thinking: > > ``` > In this case, a new assignment is triggered by the client side > assignor. When constructing the HB, the consumer will always consult > the client side assignor and propagate the information to the group > coordinator. In other words, we don't expect users to call > Consumer#enforceRebalance anymore. > ``` > > As I looked at the current PartitionAssignor's interface, we actually do > not have a way yet to instruct how to construct the next HB request, e.g. > when the assignor wants to enforce a new rebalance with a new assignment, > we'd need some customizable APIs inside the PartitionAssignor to indicate > the next HB telling broker about so. WDYT about adding such an API on the > PartitionAssignor? > > > Guozhang > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot > wrote: > > > Hi Jun, > > > > I have updated the KIP to include your feedback. I have also tried to > > clarify the parts which were not cleared. > > > > Best, > > David > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot wrote: > > > > > > Hi Jun, > > > > > > Thanks for your feedback. Let me start by answering your questions > > > inline and I will update the KIP next week. > > > > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be > > fewer > > > > RPCs during rebalance and more efficient support of wildcard. A few > > > > comments below. 
> > > > > > I would also add that the KIP removes the global sync barrier in the > > > protocol which is essential to improve group stability and > > > scalability, and the KIP also simplifies the client by moving most of > > > the logic to the server side. > > > > > > > 30. ConsumerGroupHeartbeatRequest > > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling > > changing > > > > of the partition assignor in the consumers? > > > > > > Definitely. The group coordinator will use the assignor used by a > > > majority of the members. This allows the group to move from one > > > assignor to another by a roll. This is explained in the Assignor > > > Selection chapter. > > > > > > > 30.2 For each field, could you explain whether it's required in every > > > > request or the scenarios when it needs to be filled? For example, it's > > not > > > > clear to me when TopicPartitions needs to be filled. > > > > > > The client is expected to set those fields in case of a connection > > > issue (e.g. timeout) or when the fields have changed since the last > > > HB. The server populates those fields as long as the member is not > > > fully reconciled - the member should acknowledge that it has the > > > expected epoch and assignment. I will clarify this in the KIP. > > > > > > > 31. In the current consumer protocol, the rack affinity between the > > client > > > > and the broker is only considered during fetching, but not during > > assigning > > > > partitions to consumers. Sometimes, once the assignment is made, there > > is > > > > no opportunity for read affinity because no replicas of assigned > > partitions > > > > are close to the member. I am wondering if we should use this > > opportunity > > > > to address this by including rack in GroupMember. > > > > > > That's an interesting idea. I don't see any issue with adding the rack > > > to the members. I will do so. > > > > > > > 32. 
On the metric side, often, it's useful to know how busy a group > > > > coordinator is. By moving the event loop model, it seems that we could > > add > > > > a metric that tracks the fraction of the time the event loop is doing > > the > > > > actual work. > > > > > > That's a great idea. I will add it. Thanks. > > > > > > > 33. Could we add a section on coordinator failover handling? For > > example, > > > > does it need to trigger the check if any group with the wildcard > > > > subscription now has a new matching topic? > > > > > > Sure. When the new group coordinator takes over, it has to: > > > * Setup the session timeouts. > > > * Trigger a new assignment if a client side assignor is used. We don't > > > store the information about the member selected to run the assignment > > > so we have to start a new one. > > > * Update the topics metadata, verify the wildcard subscriptions, and > > > trigger a rebalance if needed. > > > > > > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue, > > > > ConsumerGroupMemberMetadataValue: Could we document what the epoch > > field > > > > reflects? For example, does the epoch in ConsumerGroupMetadataValue >
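[Editor's sketch] David's point in this message — that `PartitionAssignor#metadata` is consulted before every heartbeat, and that a non-zero reason or changed metadata bytes enforces a rebalance — can be sketched as follows. The class and method names here are hypothetical illustrations, not the KIP's actual API:

```java
import java.util.Arrays;

// Illustrative sketch (not the KIP's actual API): before sending each
// heartbeat, the consumer consults the client-side assignor; a non-zero
// reason, or any change in the serialized assignor metadata since the
// last heartbeat, signals the group coordinator to trigger a rebalance.
public class HeartbeatTrigger {

    // reason == 0 means "no explicit reason"; any other value, or a change
    // in the metadata bytes, asks the coordinator for a new assignment.
    static boolean shouldTriggerRebalance(int reason, byte[] lastMetadata, byte[] nextMetadata) {
        return reason != 0 || !Arrays.equals(lastMetadata, nextMetadata);
    }

    public static void main(String[] args) {
        byte[] prev = {1, 2};
        System.out.println(shouldTriggerRebalance(0, prev, new byte[] {1, 2})); // false: nothing changed
        System.out.println(shouldTriggerRebalance(0, prev, new byte[] {1, 3})); // true: metadata changed
        System.out.println(shouldTriggerRebalance(7, prev, new byte[] {1, 2})); // true: non-zero reason
    }
}
```

Because the check runs on every heartbeat, the assignor can enforce a rebalance without any `Consumer#enforceRebalance` call, subject only to heartbeat latency.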
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello David, One of Jun's comments got me thinking: ``` In this case, a new assignment is triggered by the client side assignor. When constructing the HB, the consumer will always consult the client side assignor and propagate the information to the group coordinator. In other words, we don't expect users to call Consumer#enforceRebalance anymore. ``` As I looked at the current PartitionAssignor's interface, we actually do not have a way yet to instruct how to construct the next HB request, e.g. when the assignor wants to enforce a new rebalance with a new assignment, we'd need some customizable APIs inside the PartitionAssignor to indicate, in the next HB, that the broker should trigger one. WDYT about adding such an API on the PartitionAssignor? Guozhang On Tue, Sep 6, 2022 at 6:09 AM David Jacot wrote: > Hi Jun, > > I have updated the KIP to include your feedback. I have also tried to > clarify the parts which were not cleared. > > Best, > David > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot wrote: > > > > Hi Jun, > > > > Thanks for your feedback. Let me start by answering your questions > > inline and I will update the KIP next week. > > > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be > fewer > > > RPCs during rebalance and more efficient support of wildcard. A few > > > comments below. > > > > I would also add that the KIP removes the global sync barrier in the > > protocol which is essential to improve group stability and > > scalability, and the KIP also simplifies the client by moving most of > > the logic to the server side. > > > > > 30. ConsumerGroupHeartbeatRequest > > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling > changing > > > of the partition assignor in the consumers? > > > > Definitely. The group coordinator will use the assignor used by a > > majority of the members. This allows the group to move from one > > assignor to another by a roll. This is explained in the Assignor > > Selection chapter. 
> > > > > 30.2 For each field, could you explain whether it's required in every > > > request or the scenarios when it needs to be filled? For example, it's > not > > > clear to me when TopicPartitions needs to be filled. > > > > The client is expected to set those fields in case of a connection > > issue (e.g. timeout) or when the fields have changed since the last > > HB. The server populates those fields as long as the member is not > > fully reconciled - the member should acknowledge that it has the > > expected epoch and assignment. I will clarify this in the KIP. > > > > > 31. In the current consumer protocol, the rack affinity between the > client > > > and the broker is only considered during fetching, but not during > assigning > > > partitions to consumers. Sometimes, once the assignment is made, there > is > > > no opportunity for read affinity because no replicas of assigned > partitions > > > are close to the member. I am wondering if we should use this > opportunity > > > to address this by including rack in GroupMember. > > > > That's an interesting idea. I don't see any issue with adding the rack > > to the members. I will do so. > > > > > 32. On the metric side, often, it's useful to know how busy a group > > > coordinator is. By moving the event loop model, it seems that we could > add > > > a metric that tracks the fraction of the time the event loop is doing > the > > > actual work. > > > > That's a great idea. I will add it. Thanks. > > > > > 33. Could we add a section on coordinator failover handling? For > example, > > > does it need to trigger the check if any group with the wildcard > > > subscription now has a new matching topic? > > > > Sure. When the new group coordinator takes over, it has to: > > * Setup the session timeouts. > > * Trigger a new assignment if a client side assignor is used. We don't > > store the information about the member selected to run the assignment > > so we have to start a new one. 
> > * Update the topics metadata, verify the wildcard subscriptions, and > > trigger a rebalance if needed. > > > > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue, > > > ConsumerGroupMemberMetadataValue: Could we document what the epoch > field > > > reflects? For example, does the epoch in ConsumerGroupMetadataValue > reflect > > > the latest group epoch? What about the one in > > > ConsumerGroupPartitionMetadataValue and > ConsumerGroupMemberMetadataValue? > > > > Sure. I will clarify that but it is always the latest group epoch. > > When the group state is updated, the group epoch is bumped so we use > > that one for all the change records related to the update. > > > > > 35. "the group coordinator will ensure that the following invariants > are > > > met: ... All members exists." It's possible for a member not to get any > > > assigned partitions, right? > > > > That's right. Here I meant that the members provided by the assignor > > in the assignment must exist in the group. The assignor can
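[Editor's sketch] The invariant discussed in this exchange — members referenced by the assignor's output must already exist in the group, while a member receiving no partitions is fine — could be checked roughly like this. The class and method names are illustrative, not the KIP's code:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the coordinator-side invariant: every member id in
// the computed assignment must already exist in the group; the assignor
// cannot make up new member ids. The reverse is allowed - a known member
// may simply receive an empty set of partitions.
public class AssignmentValidator {

    static void validate(Set<String> groupMemberIds, Map<String, List<Integer>> targetAssignment) {
        for (String memberId : targetAssignment.keySet()) {
            if (!groupMemberIds.contains(memberId)) {
                throw new IllegalStateException("Assignor returned unknown member id: " + memberId);
            }
        }
    }

    public static void main(String[] args) {
        // A member with an empty partition list is still a valid assignment.
        Map<String, List<Integer>> assignment = Map.of("A", List.of(0, 1), "B", List.of());
        validate(Set.of("A", "B"), assignment);
        System.out.println("assignment valid");
    }
}
```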
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Jun, I have updated the KIP to include your feedback. I have also tried to clarify the parts which were not clear. Best, David On Fri, Sep 2, 2022 at 4:18 PM David Jacot wrote: > > Hi Jun, > > Thanks for your feedback. Let me start by answering your questions > inline and I will update the KIP next week. > > > Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer > > RPCs during rebalance and more efficient support of wildcard. A few > > comments below. > > I would also add that the KIP removes the global sync barrier in the > protocol which is essential to improve group stability and > scalability, and the KIP also simplifies the client by moving most of > the logic to the server side. > > > 30. ConsumerGroupHeartbeatRequest > > 30.1 ServerAssignor is a singleton. Do we plan to support rolling changing > > of the partition assignor in the consumers? > > Definitely. The group coordinator will use the assignor used by a > majority of the members. This allows the group to move from one > assignor to another by a roll. This is explained in the Assignor > Selection chapter. > > > 30.2 For each field, could you explain whether it's required in every > > request or the scenarios when it needs to be filled? For example, it's not > > clear to me when TopicPartitions needs to be filled. > > The client is expected to set those fields in case of a connection > issue (e.g. timeout) or when the fields have changed since the last > HB. The server populates those fields as long as the member is not > fully reconciled - the member should acknowledge that it has the > expected epoch and assignment. I will clarify this in the KIP. > > > 31. In the current consumer protocol, the rack affinity between the client > > and the broker is only considered during fetching, but not during assigning > > partitions to consumers. 
Sometimes, once the assignment is made, there is > > no opportunity for read affinity because no replicas of assigned partitions > > are close to the member. I am wondering if we should use this opportunity > > to address this by including rack in GroupMember. > > That's an interesting idea. I don't see any issue with adding the rack > to the members. I will do so. > > > 32. On the metric side, often, it's useful to know how busy a group > > coordinator is. By moving the event loop model, it seems that we could add > > a metric that tracks the fraction of the time the event loop is doing the > > actual work. > > That's a great idea. I will add it. Thanks. > > > 33. Could we add a section on coordinator failover handling? For example, > > does it need to trigger the check if any group with the wildcard > > subscription now has a new matching topic? > > Sure. When the new group coordinator takes over, it has to: > * Setup the session timeouts. > * Trigger a new assignment if a client side assignor is used. We don't > store the information about the member selected to run the assignment > so we have to start a new one. > * Update the topics metadata, verify the wildcard subscriptions, and > trigger a rebalance if needed. > > > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue, > > ConsumerGroupMemberMetadataValue: Could we document what the epoch field > > reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect > > the latest group epoch? What about the one in > > ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue? > > Sure. I will clarify that but it is always the latest group epoch. > When the group state is updated, the group epoch is bumped so we use > that one for all the change records related to the update. > > > 35. "the group coordinator will ensure that the following invariants are > > met: ... All members exists." It's possible for a member not to get any > > assigned partitions, right? 
> > That's right. Here I meant that the members provided by the assignor > in the assignment must exist in the group. The assignor can not make > up new member ids. > > > 36. "He can rejoins the group with a member epoch equals to 0": When would > > a consumer rejoin and what member id would be used? > > A member is expected to abandon all its partitions and rejoins when it > receives the FENCED_MEMBER_EPOCH error. In this case, the group > coordinator will have removed the member from the group. The member > can rejoin the group with the same member id but with 0 as epoch. Let > me see if I can clarify this in the KIP. > > > 37. "Instead, power users will have the ability to trigger a reassignment > > by either providing a non-zero reason or by updating the assignor > > metadata." Hmm, this seems to be conflicting with the deprecation of > > Consumer#enforeRebalance. > > In this case, a new assignment is triggered by the client side > assignor. When constructing the HB, the consumer will always consult > the client side assignor and propagate the information to the group > coordinator. In other words, we don't expect users to call
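[Editor's sketch] The fencing flow described in this message — abandon all partitions, then rejoin with the same member id but epoch 0 — might look like this on the client side. The error-code constant and method name are hypothetical illustrations, not Kafka's actual values:

```java
// Hypothetical sketch of the client reaction to FENCED_MEMBER_EPOCH: the
// member abandons its partitions and rejoins with the same member id, but
// with member epoch 0, so the coordinator treats it as a fresh join.
public class FencedMemberHandler {

    static final short NONE = 0;
    static final short FENCED_MEMBER_EPOCH = 110; // illustrative error code, not the real value

    // Returns the member epoch to put in the next heartbeat request.
    static int nextMemberEpoch(short lastErrorCode, int currentEpoch) {
        if (lastErrorCode == FENCED_MEMBER_EPOCH) {
            // Revoke all owned partitions here (omitted), then rejoin from scratch.
            return 0;
        }
        return currentEpoch;
    }

    public static void main(String[] args) {
        System.out.println(nextMemberEpoch(NONE, 5));                // 5: keep heartbeating normally
        System.out.println(nextMemberEpoch(FENCED_MEMBER_EPOCH, 5)); // 0: rejoin with epoch 0
    }
}
```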
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Jun, Thanks for your feedback. Let me start by answering your questions inline and I will update the KIP next week. > Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer > RPCs during rebalance and more efficient support of wildcard. A few > comments below. I would also add that the KIP removes the global sync barrier in the protocol which is essential to improve group stability and scalability, and the KIP also simplifies the client by moving most of the logic to the server side. > 30. ConsumerGroupHeartbeatRequest > 30.1 ServerAssignor is a singleton. Do we plan to support rolling changing > of the partition assignor in the consumers? Definitely. The group coordinator will use the assignor used by a majority of the members. This allows the group to move from one assignor to another by a roll. This is explained in the Assignor Selection chapter. > 30.2 For each field, could you explain whether it's required in every > request or the scenarios when it needs to be filled? For example, it's not > clear to me when TopicPartitions needs to be filled. The client is expected to set those fields in case of a connection issue (e.g. timeout) or when the fields have changed since the last HB. The server populates those fields as long as the member is not fully reconciled - the member should acknowledge that it has the expected epoch and assignment. I will clarify this in the KIP. > 31. In the current consumer protocol, the rack affinity between the client > and the broker is only considered during fetching, but not during assigning > partitions to consumers. Sometimes, once the assignment is made, there is > no opportunity for read affinity because no replicas of assigned partitions > are close to the member. I am wondering if we should use this opportunity > to address this by including rack in GroupMember. That's an interesting idea. I don't see any issue with adding the rack to the members. I will do so. > 32. 
On the metric side, often, it's useful to know how busy a group > coordinator is. By moving the event loop model, it seems that we could add > a metric that tracks the fraction of the time the event loop is doing the > actual work. That's a great idea. I will add it. Thanks. > 33. Could we add a section on coordinator failover handling? For example, > does it need to trigger the check if any group with the wildcard > subscription now has a new matching topic? Sure. When the new group coordinator takes over, it has to: * Set up the session timeouts. * Trigger a new assignment if a client side assignor is used. We don't store the information about the member selected to run the assignment so we have to start a new one. * Update the topics metadata, verify the wildcard subscriptions, and trigger a rebalance if needed. > 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue, > ConsumerGroupMemberMetadataValue: Could we document what the epoch field > reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect > the latest group epoch? What about the one in > ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue? Sure. I will clarify that but it is always the latest group epoch. When the group state is updated, the group epoch is bumped so we use that one for all the change records related to the update. > 35. "the group coordinator will ensure that the following invariants are > met: ... All members exists." It's possible for a member not to get any > assigned partitions, right? That's right. Here I meant that the members provided by the assignor in the assignment must exist in the group. The assignor cannot make up new member ids. > 36. "He can rejoins the group with a member epoch equals to 0": When would > a consumer rejoin and what member id would be used? A member is expected to abandon all its partitions and rejoin when it receives the FENCED_MEMBER_EPOCH error. 
In this case, the group coordinator will have removed the member from the group. The member can rejoin the group with the same member id but with 0 as epoch. Let me see if I can clarify this in the KIP. > 37. "Instead, power users will have the ability to trigger a reassignment > by either providing a non-zero reason or by updating the assignor > metadata." Hmm, this seems to be conflicting with the deprecation of > Consumer#enforeRebalance. In this case, a new assignment is triggered by the client side assignor. When constructing the HB, the consumer will always consult the client side assignor and propagate the information to the group coordinator. In other words, we don't expect users to call Consumer#enforceRebalance anymore. > 38. The reassignment examples are nice. But the section seems to have > multiple typos. > 38.1 When the group transitions to epoch 2, B immediately gets into > "epoch=1, partitions=[foo-2]", which seems incorrect. > 38.2 When the group transitions to epoch 3, C seems to get into epoch=3, > partitions=[foo-1] too early. >
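[Editor's sketch] David's answer to 30.1 in this message — the coordinator uses the assignor chosen by a majority of members, which lets a group switch assignors via a rolling restart — could be implemented roughly as follows. This is an illustrative sketch, not the KIP's code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of server-side assignor selection: pick the assignor
// named by the largest number of members, so the group migrates from one
// assignor to another as a rolling restart flips the majority.
public class AssignorSelection {

    static String selectAssignor(List<String> assignorPerMember) {
        Map<String, Integer> votes = new HashMap<>();
        for (String assignor : assignorPerMember) {
            votes.merge(assignor, 1, Integer::sum);
        }
        return votes.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalArgumentException("group has no members"));
    }

    public static void main(String[] args) {
        // Mid-roll: two members already moved to "uniform", one is still on "range".
        System.out.println(selectAssignor(List.of("range", "uniform", "uniform"))); // uniform
    }
}
```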
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi, David, Thanks for the KIP. Overall, the main benefits of the KIP seem to be fewer RPCs during rebalance and more efficient support of wildcard. A few comments below. 30. ConsumerGroupHeartbeatRequest 30.1 ServerAssignor is a singleton. Do we plan to support rolling changing of the partition assignor in the consumers? 30.2 For each field, could you explain whether it's required in every request or the scenarios when it needs to be filled? For example, it's not clear to me when TopicPartitions needs to be filled. 31. In the current consumer protocol, the rack affinity between the client and the broker is only considered during fetching, but not during assigning partitions to consumers. Sometimes, once the assignment is made, there is no opportunity for read affinity because no replicas of assigned partitions are close to the member. I am wondering if we should use this opportunity to address this by including rack in GroupMember. 32. On the metric side, often, it's useful to know how busy a group coordinator is. By moving to the event loop model, it seems that we could add a metric that tracks the fraction of the time the event loop is doing the actual work. 33. Could we add a section on coordinator failover handling? For example, does it need to trigger the check if any group with the wildcard subscription now has a new matching topic? 34. ConsumerGroupMetadataValue, ConsumerGroupPartitionMetadataValue, ConsumerGroupMemberMetadataValue: Could we document what the epoch field reflects? For example, does the epoch in ConsumerGroupMetadataValue reflect the latest group epoch? What about the one in ConsumerGroupPartitionMetadataValue and ConsumerGroupMemberMetadataValue? 35. "the group coordinator will ensure that the following invariants are met: ... All members exists." It's possible for a member not to get any assigned partitions, right? 36. "He can rejoins the group with a member epoch equals to 0": When would a consumer rejoin and what member id would be used? 37. 
"Instead, power users will have the ability to trigger a reassignment by either providing a non-zero reason or by updating the assignor metadata." Hmm, this seems to be conflicting with the deprecation of Consumer#enforceRebalance. 38. The reassignment examples are nice. But the section seems to have multiple typos. 38.1 When the group transitions to epoch 2, B immediately gets into "epoch=1, partitions=[foo-2]", which seems incorrect. 38.2 When the group transitions to epoch 3, C seems to get into epoch=3, partitions=[foo-1] too early. 38.3 After A transitions to epoch 3, C still has A - epoch=2, partitions=[foo-0]. 39. Rolling upgrade of consumers: Do we support the upgrade from any old version to the new one? Thanks, Jun On Mon, Aug 29, 2022 at 9:20 AM David Jacot wrote: > Hi all, > > The KIP states that we will re-implement the coordinator in Java. I > discussed this offline with a few folks and folks are concerned that > we could introduce many regressions in the old protocol if we do so. > Therefore, I am going to remove this statement from the KIP. It is an > implementation detail after all so it does not have to be decided at > this stage. We will likely start by trying to refactor the current > implementation as a first step. > > Cheers, > David > > On Mon, Aug 29, 2022 at 3:52 PM David Jacot wrote: > > > > Hi Luke, > > > > > 1.1. I think the state machine are: "Empty, assigning, reconciling, > stable, > > > dead" mentioned in Consumer Group States section, right? > > > > This sentence does not refer to those group states but rather to a > > state machine replication (SMR). This refers to the entire state of > > group coordinator which is replicated via the log layer. I will > > clarify this in the KIP. > > > > > 1.2. What do you mean "each state machine is modelled as an event > loop"? > > > > The idea is to follow a model similar to the new quorum controller. We > > will have N threads to process events. 
Each __consumer_offsets > > partition is assigned to a unique thread and all the events (e.g. > > requests, callbacks, etc.) are processed by this thread. This simplify > > concurrency and will enable us to do simulation testing for the group > > coordinator. > > > > > 1.3. Why do we need a state machine per *__consumer_offsets* > partitions? > > > Not a state machine "per consumer group" owned by a group coordinator? > For > > > example, if one group coordinator owns 2 consumer groups, and both > exist in > > > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? > > > > See 1.1. The confusion comes from there, I think. > > > > > 1.4. I know the "*group.coordinator.threads" *is the number of threads > used > > > to run the state machines. But I'm wondering if the purpose of the > threads > > > is only to keep the state of each consumer group (or > *__consumer_offsets* > > > partitions?), and no heavy computation, why should we need > multi-threads > > > here? > > > > See 1.2. The idea is to have an ability
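[Editor's sketch] The threading model quoted in this message — each __consumer_offsets partition handled by exactly one event-loop thread — boils down to a stable partition-to-thread mapping. A minimal sketch (the real coordinator would also maintain a per-thread event queue, omitted here):

```java
// Minimal sketch of the event-loop sharding described above: each
// __consumer_offsets partition maps deterministically to one of N threads,
// so all events for a given partition (and therefore for a given group)
// are processed serially by the same thread, with no cross-thread locking.
public class CoordinatorSharding {

    static int threadFor(int consumerOffsetsPartition, int numThreads) {
        return consumerOffsetsPartition % numThreads;
    }

    public static void main(String[] args) {
        int numThreads = 4; // e.g. group.coordinator.threads = 4
        System.out.println(threadFor(0, numThreads)); // 0
        System.out.println(threadFor(5, numThreads)); // 1
        // The mapping is stable: partition 5 always lands on the same thread,
        // which is what makes deterministic simulation testing possible.
        System.out.println(threadFor(5, numThreads) == threadFor(5, numThreads)); // true
    }
}
```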
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi all, The KIP states that we will re-implement the coordinator in Java. I discussed this offline with a few folks and folks are concerned that we could introduce many regressions in the old protocol if we do so. Therefore, I am going to remove this statement from the KIP. It is an implementation detail after all so it does not have to be decided at this stage. We will likely start by trying to refactor the current implementation as a first step. Cheers, David On Mon, Aug 29, 2022 at 3:52 PM David Jacot wrote: > > Hi Luke, > > > 1.1. I think the state machine are: "Empty, assigning, reconciling, stable, > > dead" mentioned in Consumer Group States section, right? > > This sentence does not refer to those group states but rather to a > state machine replication (SMR). This refers to the entire state of > group coordinator which is replicated via the log layer. I will > clarify this in the KIP. > > > 1.2. What do you mean "each state machine is modelled as an event loop"? > > The idea is to follow a model similar to the new quorum controller. We > will have N threads to process events. Each __consumer_offsets > partition is assigned to a unique thread and all the events (e.g. > requests, callbacks, etc.) are processed by this thread. This simplify > concurrency and will enable us to do simulation testing for the group > coordinator. > > > 1.3. Why do we need a state machine per *__consumer_offsets* partitions? > > Not a state machine "per consumer group" owned by a group coordinator? For > > example, if one group coordinator owns 2 consumer groups, and both exist in > > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? > > See 1.1. The confusion comes from there, I think. > > > 1.4. I know the "*group.coordinator.threads" *is the number of threads used > > to run the state machines. 
But I'm wondering if the purpose of the threads > > is only to keep the state of each consumer group (or *__consumer_offsets* > > partitions?), and no heavy computation, why should we need multi-threads > > here? > > See 1.2. The idea is to have an ability to shard the processing as the > computation could be heavy. > > > 2.1. The consumer session timeout, why does the default session timeout not > > locate between min (45s) and max(60s)? I thought the min/max session > > timeout is to define lower/upper bound of it, no? > > > > group.consumer.session.timeout.ms int 30s The timeout to detect client > > failures when using the consumer group protocol. > > group.consumer.min.session.timeout.ms int 45s The minimum session timeout. > > group.consumer.max.session.timeout.ms int 60s The maximum session timeout. > > This is indeed a mistake. The default session timeout should be 45s > (the current default). > > > 2.2. The default server side assignor are [range, uniform], which means > > we'll default to "range" assignor. I'd like to know why not uniform one? I > > thought usually users will choose uniform assignor (former sticky assinor) > > for better evenly distribution. Any other reason we choose range assignor > > as default? > > group.consumer.assignors List range, uniform The server side assignors. > > The order on the server side has no influence because the client must > chose the selector that he wants to use. There is no default in the > current proposal. If the assignor is not specified by the client, the > request is rejected. The default client value for > `group.remote.assignor` is `uniform` though. > > Thanks for your very good comments, Luke. I hope that my answers help > to clarify things. I will update the KIP as well based on your > feedback. > > Cheers, > David > > On Mon, Aug 22, 2022 at 9:29 AM Luke Chen wrote: > > > > Hi David, > > > > Thanks for the update. > > > > Some more questions: > > 1. 
In Group Coordinator section, you mentioned: > > > The new group coordinator will have a state machine per > > *__consumer_offsets* partitions, where each state machine is modelled as an > > event loop. Those state machines will be executed in > > *group.coordinator.threads* threads. > > > > 1.1. I think the state machine are: "Empty, assigning, reconciling, stable, > > dead" mentioned in Consumer Group States section, right? > > 1.2. What do you mean "each state machine is modelled as an event loop"? > > 1.3. Why do we need a state machine per *__consumer_offsets* partitions? > > Not a state machine "per consumer group" owned by a group coordinator? For > > example, if one group coordinator owns 2 consumer groups, and both exist in > > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? > > 1.4. I know the "*group.coordinator.threads" *is the number of threads used > > to run the state machines. But I'm wondering if the purpose of the threads > > is only to keep the state of each consumer group (or *__consumer_offsets* > > partitions?), and no heavy computation, why should we need multi-threads > > here? > > > > 2. For the default value in the new configs: > > 2.1. The consumer
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Luke, > 1.1. I think the state machine are: "Empty, assigning, reconciling, stable, > dead" mentioned in Consumer Group States section, right? This sentence does not refer to those group states but rather to a state machine replication (SMR). This refers to the entire state of the group coordinator which is replicated via the log layer. I will clarify this in the KIP. > 1.2. What do you mean "each state machine is modelled as an event loop"? The idea is to follow a model similar to the new quorum controller. We will have N threads to process events. Each __consumer_offsets partition is assigned to a unique thread and all the events (e.g. requests, callbacks, etc.) are processed by this thread. This simplifies concurrency and will enable us to do simulation testing for the group coordinator. > 1.3. Why do we need a state machine per *__consumer_offsets* partitions? > Not a state machine "per consumer group" owned by a group coordinator? For > example, if one group coordinator owns 2 consumer groups, and both exist in > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? See 1.1. The confusion comes from there, I think. > 1.4. I know the "*group.coordinator.threads" *is the number of threads used > to run the state machines. But I'm wondering if the purpose of the threads > is only to keep the state of each consumer group (or *__consumer_offsets* > partitions?), and no heavy computation, why should we need multi-threads > here? See 1.2. The idea is to have the ability to shard the processing as the computation could be heavy. > 2.1. The consumer session timeout, why does the default session timeout not > locate between min (45s) and max(60s)? I thought the min/max session > timeout is to define lower/upper bound of it, no? > > group.consumer.session.timeout.ms int 30s The timeout to detect client > failures when using the consumer group protocol. > group.consumer.min.session.timeout.ms int 45s The minimum session timeout. 
> group.consumer.max.session.timeout.ms int 60s The maximum session timeout. This is indeed a mistake. The default session timeout should be 45s (the current default). > 2.2. The default server side assignor are [range, uniform], which means > we'll default to "range" assignor. I'd like to know why not uniform one? I > thought usually users will choose uniform assignor (former sticky assinor) > for better evenly distribution. Any other reason we choose range assignor > as default? > group.consumer.assignors List range, uniform The server side assignors. The order on the server side has no influence because the client must choose the assignor that it wants to use. There is no default in the current proposal. If the assignor is not specified by the client, the request is rejected. The default client value for `group.remote.assignor` is `uniform` though. Thanks for your very good comments, Luke. I hope that my answers help to clarify things. I will update the KIP as well based on your feedback. Cheers, David On Mon, Aug 22, 2022 at 9:29 AM Luke Chen wrote: > > Hi David, > > Thanks for the update. > > Some more questions: > 1. In Group Coordinator section, you mentioned: > > The new group coordinator will have a state machine per > *__consumer_offsets* partitions, where each state machine is modelled as an > event loop. Those state machines will be executed in > *group.coordinator.threads* threads. > > 1.1. I think the state machine are: "Empty, assigning, reconciling, stable, > dead" mentioned in Consumer Group States section, right? > 1.2. What do you mean "each state machine is modelled as an event loop"? > 1.3. Why do we need a state machine per *__consumer_offsets* partitions? > Not a state machine "per consumer group" owned by a group coordinator? For > example, if one group coordinator owns 2 consumer groups, and both exist in > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? > 1.4. 
I know the "*group.coordinator.threads" *is the number of threads used > to run the state machines. But I'm wondering if the purpose of the threads > is only to keep the state of each consumer group (or *__consumer_offsets* > partitions?), and no heavy computation, why should we need multi-threads > here? > > 2. For the default value in the new configs: > 2.1. The consumer session timeout, why does the default session timeout not > locate between min (45s) and max(60s)? I thought the min/max session > timeout is to define lower/upper bound of it, no? > > group.consumer.session.timeout.ms int 30s The timeout to detect client > failures when using the consumer group protocol. > group.consumer.min.session.timeout.ms int 45s The minimum session timeout. > group.consumer.max.session.timeout.ms int 60s The maximum session timeout. > > > > 2.2. The default server side assignor are [range, uniform], which means > we'll default to "range" assignor. I'd like to know why not uniform one? I > thought usually users will choose uniform assignor (former sticky assinor) > for
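[Editor's note] The sharded event-loop model David describes (one state machine per __consumer_offsets partition, each processed by exactly one of group.coordinator.threads threads) can be sketched roughly as follows. This is an illustrative Python sketch with hypothetical names, not Kafka's actual implementation:

```python
from queue import Queue
from threading import Thread

class CoordinatorShards:
    """Illustrative: every event for a given __consumer_offsets partition is
    processed by a single dedicated thread, so each state machine runs as a
    single-threaded event loop and needs no locking internally."""

    def __init__(self, num_threads):
        self.queues = [Queue() for _ in range(num_threads)]
        self.threads = [Thread(target=self._event_loop, args=(q,), daemon=True)
                        for q in self.queues]
        for t in self.threads:
            t.start()

    def thread_for(self, offsets_partition):
        # Each __consumer_offsets partition maps to exactly one thread.
        return offsets_partition % len(self.queues)

    def submit(self, offsets_partition, event):
        # Requests, callbacks, etc. are enqueued to that partition's thread.
        self.queues[self.thread_for(offsets_partition)].put(event)

    def _event_loop(self, queue):
        while True:
            event = queue.get()
            event()  # processed single-threaded for this shard
            queue.task_done()
```

Because every group lives in one __consumer_offsets partition, two groups in __consumer_offsets-0 share the same shard and thread, matching David's answer to 1.3.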
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi All, As per the suggestions from David/Guozhang above, I updated the page for Connect to have its own set of APIs rather than extend the ones from the consumer. Please review again. @Luke, Thank you. I am actually looking for review comments/thoughts/suggestions as the Connect changes are still at the draft stage, but since this is a wholesale change, I am sure there might be many cases missed by me. Also, on this discussion thread David suggested continuing the discussion here to ensure Connect is compatible with whatever is done in KIP-848. I am ok either way. @Guozhang, Regarding the migration plan and the link to KIP-415 that you had shared, I had taken a look at it. From what I understood, that migration/downgrade path is about switching the assignment protocol from eager to incremental or vice versa. In this case, the migration is to the new rebalance protocol, which would also depend on the group coordinator and whether it supports the new rebalance protocol or not. I have tried to incorporate this aspect in the draft page. Thanks! Sagar. On Mon, Aug 22, 2022 at 1:00 PM Luke Chen wrote: > Hi David, > > Thanks for the update. > > Some more questions: > 1. In Group Coordinator section, you mentioned: > > The new group coordinator will have a state machine per > *__consumer_offsets* partitions, where each state machine is modelled as an > event loop. Those state machines will be executed in > *group.coordinator.threads* threads. > > 1.1. I think the state machine are: "Empty, assigning, reconciling, stable, > dead" mentioned in Consumer Group States section, right? > 1.2. What do you mean "each state machine is modelled as an event loop"? > 1.3. Why do we need a state machine per *__consumer_offsets* partitions? > Not a state machine "per consumer group" owned by a group coordinator? For > example, if one group coordinator owns 2 consumer groups, and both exist in > *__consumer_offsets-0*, will we have 1 state machine for it, or 2? > 1.4.
I know the "*group.coordinator.threads" *is the number of threads used > to run the state machines. But I'm wondering if the purpose of the threads > is only to keep the state of each consumer group (or *__consumer_offsets* > partitions?), and no heavy computation, why should we need multi-threads > here? > > 2. For the default value in the new configs: > 2.1. The consumer session timeout, why does the default session timeout not > locate between min (45s) and max(60s)? I thought the min/max session > timeout is to define lower/upper bound of it, no? > > group.consumer.session.timeout.ms int 30s The timeout to detect client > failures when using the consumer group protocol. > group.consumer.min.session.timeout.ms int 45s The minimum session timeout. > group.consumer.max.session.timeout.ms int 60s The maximum session timeout. > > > > 2.2. The default server side assignor are [range, uniform], which means > we'll default to "range" assignor. I'd like to know why not uniform one? I > thought usually users will choose uniform assignor (former sticky assinor) > for better evenly distribution. Any other reason we choose range assignor > as default? > group.consumer.assignors List range, uniform The server side assignors. > > > > > > > Thank you. > Luke > > > > > > > On Mon, Aug 22, 2022 at 2:10 PM Luke Chen wrote: > > > Hi Sagar, > > > > I have some thoughts about Kafka Connect integrating with KIP-848, but I > > think we should have a separate discussion thread for the Kafka Connect > > KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1], > > and let this discussion thread focus on consumer rebalance protocol, > WDYT? > > > > [1] > > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > > > Thank you. > > Luke > > > > > > > > On Fri, Aug 12, 2022 at 9:31 PM Sagar wrote: > > > >> Thank you Guozhang/David for the feedback. 
Looks like there's agreement > on > >> using separate APIs for Connect. I would revisit the doc and see what > >> changes are to be made. > >> > >> Thanks! > >> Sagar. > >> > >> On Tue, Aug 9, 2022 at 7:11 PM David Jacot > > >> wrote: > >> > >> > Hi Sagar, > >> > > >> > Thanks for the feedback and the document. That's really helpful. I > >> > will take a look at it. > >> > > >> > Overall, it seems to me that both Connect and the Consumer could share > >> > the same underlying "engine". The main difference is that the Consumer > >> > assigns topic-partitions to members whereas Connect assigns tasks to > >> > workers. I see two ways to move forward: > >> > 1) We extend the new proposed APIs to support different resource types > >> > (e.g. partitions, tasks, etc.); or > >> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > >> > similar to the new ones but different on the content/resources and > >> > they would rely on the same engine on the coordinator side. > >> > > >> > I personally lean towards 2) because I am not a fan of
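[Editor's note] Both of David's options assume a shared coordinator "engine" whose epoch and assignment bookkeeping is resource-agnostic. A loose Python sketch of that idea, using hypothetical names only, showing how the same engine could back partition-based (consumer) or task-based (Connect) resources:

```python
from dataclasses import dataclass, field

@dataclass
class GroupEngine:
    """Illustrative shared 'engine': epoch bookkeeping is resource-agnostic;
    only the type of the assigned resources differs between protocols."""
    epoch: int = 0
    assignments: dict = field(default_factory=dict)  # member -> set of resources

    def assign(self, target):
        # A new target assignment bumps the assignment epoch.
        self.epoch += 1
        self.assignments = {member: set(res) for member, res in target.items()}
        return self.epoch

# The consumer protocol assigns topic-partitions to members...
consumer_engine = GroupEngine()
consumer_engine.assign({"member-1": [("topic-id-A", 0), ("topic-id-A", 1)]})

# ...while Connect would assign connectors/tasks to workers through the same
# engine, but behind its own dedicated APIs (David's option 2).
connect_engine = GroupEngine()
connect_engine.assign({"worker-1": ["connector-0", "task-0-1"]})
```

Under option 1 the resource type would instead be a field of the shared APIs; under option 2 only the wire APIs differ while the engine stays common.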
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi David, Thanks for the update. Some more questions: 1. In Group Coordinator section, you mentioned: > The new group coordinator will have a state machine per *__consumer_offsets* partitions, where each state machine is modelled as an event loop. Those state machines will be executed in *group.coordinator.threads* threads. 1.1. I think the state machines are: "Empty, assigning, reconciling, stable, dead" mentioned in Consumer Group States section, right? 1.2. What do you mean "each state machine is modelled as an event loop"? 1.3. Why do we need a state machine per *__consumer_offsets* partitions? Not a state machine "per consumer group" owned by a group coordinator? For example, if one group coordinator owns 2 consumer groups, and both exist in *__consumer_offsets-0*, will we have 1 state machine for it, or 2? 1.4. I know the "*group.coordinator.threads" *is the number of threads used to run the state machines. But I'm wondering, if the purpose of the threads is only to keep the state of each consumer group (or *__consumer_offsets* partitions?), with no heavy computation, why would we need multiple threads here? 2. For the default value in the new configs: 2.1. The consumer session timeout, why does the default session timeout not fall between min (45s) and max (60s)? I thought the min/max session timeout is to define the lower/upper bounds of it, no? group.consumer.session.timeout.ms int 30s The timeout to detect client failures when using the consumer group protocol. group.consumer.min.session.timeout.ms int 45s The minimum session timeout. group.consumer.max.session.timeout.ms int 60s The maximum session timeout. 2.2. The default server side assignors are [range, uniform], which means we'll default to "range" assignor. I'd like to know why not the uniform one? I thought usually users will choose the uniform assignor (former sticky assignor) for a more even distribution. Any other reason we choose the range assignor as default?
group.consumer.assignors List range, uniform The server side assignors. Thank you. Luke On Mon, Aug 22, 2022 at 2:10 PM Luke Chen wrote: > Hi Sagar, > > I have some thoughts about Kafka Connect integrating with KIP-848, but I > think we should have a separate discussion thread for the Kafka Connect > KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1], > and let this discussion thread focus on consumer rebalance protocol, WDYT? > > [1] > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > Thank you. > Luke > > > > On Fri, Aug 12, 2022 at 9:31 PM Sagar wrote: > >> Thank you Guozhang/David for the feedback. Looks like there's agreement on >> using separate APIs for Connect. I would revisit the doc and see what >> changes are to be made. >> >> Thanks! >> Sagar. >> >> On Tue, Aug 9, 2022 at 7:11 PM David Jacot >> wrote: >> >> > Hi Sagar, >> > >> > Thanks for the feedback and the document. That's really helpful. I >> > will take a look at it. >> > >> > Overall, it seems to me that both Connect and the Consumer could share >> > the same underlying "engine". The main difference is that the Consumer >> > assigns topic-partitions to members whereas Connect assigns tasks to >> > workers. I see two ways to move forward: >> > 1) We extend the new proposed APIs to support different resource types >> > (e.g. partitions, tasks, etc.); or >> > 2) We use new dedicated APIs for Connect. The dedicated APIs would be >> > similar to the new ones but different on the content/resources and >> > they would rely on the same engine on the coordinator side. >> > >> > I personally lean towards 2) because I am not a fan of overcharging >> > APIs to serve different purposes. That being said, I am not opposed to >> > 1) if we can find an elegant way to do it. 
>> > >> > I think that we can continue to discuss it here for now in order to >> > ensure that this KIP is compatible with what we will do for Connect in >> > the future. >> > >> > Best, >> > David >> > >> > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: >> > > >> > > Hi all, >> > > >> > > I am back from vacation. I will go through and address your comments >> > > in the coming days. Thanks for your feedback. >> > > >> > > Cheers, >> > > David >> > > >> > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > > >> > wrote: >> > > > >> > > > Hey All! >> > > > >> > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing >> > making it >> > > > down the stack! >> > > > >> > > > I had a few questions: >> > > > >> > > > 1. The 'Rejected Alternatives' section describes how member epoch >> > should >> > > > advance in step with the group epoch and assignment epoch values. I >> > think >> > > > that this is a good idea for the reasons described in the KIP. When >> the >> > > > protocol is incrementally assigning partitions to a worker, what >> member >> > > > epoch does each incremental assignment use? Are member epochs >> re-used, >> > and >> > > >
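[Editor's note] Luke's question 2.1 above is about the relationship between the min, default, and max session timeouts. A minimal sketch of how a broker-side min/max pair bounds a requested timeout, illustrative only (a real broker may reject an out-of-range value rather than clamp it; clamping is just the simplest way to show the bounds):

```python
def effective_session_timeout(requested_ms,
                              min_ms=45_000,   # group.consumer.min.session.timeout.ms
                              max_ms=60_000):  # group.consumer.max.session.timeout.ms
    """Illustrative: bound a requested session timeout by the broker's
    configured min/max pair."""
    return max(min_ms, min(requested_ms, max_ms))

# A 30s default would always be pushed up to the 45s minimum, which is why a
# default outside [min, max] is inconsistent; David confirms above that the
# default should be 45s.
```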
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Sagar, I have some thoughts about Kafka Connect integrating with KIP-848, but I think we should have a separate discussion thread for the Kafka Connect KIP: Integrating Kafka Connect With New Consumer Rebalance Protocol [1], and let this discussion thread focus on consumer rebalance protocol, WDYT? [1] https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol Thank you. Luke On Fri, Aug 12, 2022 at 9:31 PM Sagar wrote: > Thank you Guozhang/David for the feedback. Looks like there's agreement on > using separate APIs for Connect. I would revisit the doc and see what > changes are to be made. > > Thanks! > Sagar. > > On Tue, Aug 9, 2022 at 7:11 PM David Jacot > wrote: > > > Hi Sagar, > > > > Thanks for the feedback and the document. That's really helpful. I > > will take a look at it. > > > > Overall, it seems to me that both Connect and the Consumer could share > > the same underlying "engine". The main difference is that the Consumer > > assigns topic-partitions to members whereas Connect assigns tasks to > > workers. I see two ways to move forward: > > 1) We extend the new proposed APIs to support different resource types > > (e.g. partitions, tasks, etc.); or > > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > > similar to the new ones but different on the content/resources and > > they would rely on the same engine on the coordinator side. > > > > I personally lean towards 2) because I am not a fan of overcharging > > APIs to serve different purposes. That being said, I am not opposed to > > 1) if we can find an elegant way to do it. > > > > I think that we can continue to discuss it here for now in order to > > ensure that this KIP is compatible with what we will do for Connect in > > the future. > > > > Best, > > David > > > > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > > > > > Hi all, > > > > > > I am back from vacation. 
I will go through and address your comments > > > in the coming days. Thanks for your feedback. > > > > > > Cheers, > > > David > > > > > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > > wrote: > > > > > > > > Hey All! > > > > > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing > > making it > > > > down the stack! > > > > > > > > I had a few questions: > > > > > > > > 1. The 'Rejected Alternatives' section describes how member epoch > > should > > > > advance in step with the group epoch and assignment epoch values. I > > think > > > > that this is a good idea for the reasons described in the KIP. When > the > > > > protocol is incrementally assigning partitions to a worker, what > member > > > > epoch does each incremental assignment use? Are member epochs > re-used, > > and > > > > a single member epoch can correspond to multiple different > > (monotonically > > > > larger) assignments? > > > > > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? > If > > > > not, should custom client-side assignor implementations interact with > > the > > > > Reason field, and how is its common meaning agreed upon? If so, what > > is the > > > > benefit of a distinct Reason field over including such functionality > > in the > > > > opaque metadata? > > > > > > > > 3. The following is included in the KIP: "Thanks to this, the input > of > > the > > > > client side assignor is entirely driven by the group coordinator. The > > > > consumer is no longer responsible for maintaining any state besides > its > > > > assigned partitions." Does this mean that the client-side assignor > MAY > > > > incorporate additional non-Metadata state (such as partition > > throughput, > > > > cpu/memory metrics, config topics, etc), or that additional > > non-Metadata > > > > state SHOULD NOT be used? > > > > > > > > 4. 
I see that there are separate classes > > > > for org.apache.kafka.server.group.consumer.PartitionAssignor > > > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > > > > overlap significantly. Is it possible for these two implementations > to > > be > > > > unified? This would serve to promote feature parity of server-side > and > > > > client-side assignors, and would also facilitate operational > > flexibility in > > > > certain situations. For example, if a server-side assignor has some > > poor > > > > behavior and needs a patch, deploying the patched assignor to the > > client > > > > and switching one consumer group to a client-side assignor may be > > faster > > > > and less risky than patching all of the brokers. With the currently > > > > proposed distinct APIs, a non-trivial reimplementation would have to > be > > > > assembled, and if the two APIs have diverged significantly, then it > is > > > > possible that a reimplementation would not be possible. > > > > > > > > -- > > > > Greg Harris > > > > gharris1...@gmail.com > > > > github.com/gharris1727 > > > > > > > > On Wed, Aug 3, 2022 at 8:39 AM Sagar > >
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thank you Guozhang/David for the feedback. Looks like there's agreement on using separate APIs for Connect. I would revisit the doc and see what changes are to be made. Thanks! Sagar. On Tue, Aug 9, 2022 at 7:11 PM David Jacot wrote: > Hi Sagar, > > Thanks for the feedback and the document. That's really helpful. I > will take a look at it. > > Overall, it seems to me that both Connect and the Consumer could share > the same underlying "engine". The main difference is that the Consumer > assigns topic-partitions to members whereas Connect assigns tasks to > workers. I see two ways to move forward: > 1) We extend the new proposed APIs to support different resource types > (e.g. partitions, tasks, etc.); or > 2) We use new dedicated APIs for Connect. The dedicated APIs would be > similar to the new ones but different on the content/resources and > they would rely on the same engine on the coordinator side. > > I personally lean towards 2) because I am not a fan of overcharging > APIs to serve different purposes. That being said, I am not opposed to > 1) if we can find an elegant way to do it. > > I think that we can continue to discuss it here for now in order to > ensure that this KIP is compatible with what we will do for Connect in > the future. > > Best, > David > > On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > > > Hi all, > > > > I am back from vacation. I will go through and address your comments > > in the coming days. Thanks for your feedback. > > > > Cheers, > > David > > > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris > wrote: > > > > > > Hey All! > > > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing > making it > > > down the stack! > > > > > > I had a few questions: > > > > > > 1. The 'Rejected Alternatives' section describes how member epoch > should > > > advance in step with the group epoch and assignment epoch values. I > think > > > that this is a good idea for the reasons described in the KIP. 
When the > > > protocol is incrementally assigning partitions to a worker, what member > > > epoch does each incremental assignment use? Are member epochs re-used, > and > > > a single member epoch can correspond to multiple different > (monotonically > > > larger) assignments? > > > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > > > not, should custom client-side assignor implementations interact with > the > > > Reason field, and how is its common meaning agreed upon? If so, what > is the > > > benefit of a distinct Reason field over including such functionality > in the > > > opaque metadata? > > > > > > 3. The following is included in the KIP: "Thanks to this, the input of > the > > > client side assignor is entirely driven by the group coordinator. The > > > consumer is no longer responsible for maintaining any state besides its > > > assigned partitions." Does this mean that the client-side assignor MAY > > > incorporate additional non-Metadata state (such as partition > throughput, > > > cpu/memory metrics, config topics, etc), or that additional > non-Metadata > > > state SHOULD NOT be used? > > > > > > 4. I see that there are separate classes > > > for org.apache.kafka.server.group.consumer.PartitionAssignor > > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > > > overlap significantly. Is it possible for these two implementations to > be > > > unified? This would serve to promote feature parity of server-side and > > > client-side assignors, and would also facilitate operational > flexibility in > > > certain situations. For example, if a server-side assignor has some > poor > > > behavior and needs a patch, deploying the patched assignor to the > client > > > and switching one consumer group to a client-side assignor may be > faster > > > and less risky than patching all of the brokers. 
With the currently > > > proposed distinct APIs, a non-trivial reimplementation would have to be > > > assembled, and if the two APIs have diverged significantly, then it is > > > possible that a reimplementation would not be possible. > > > > > > -- > > > Greg Harris > > > gharris1...@gmail.com > > > github.com/gharris1727 > > > > > > On Wed, Aug 3, 2022 at 8:39 AM Sagar > wrote: > > > > > > > Hi Guozhang/David, > > > > > > > > I created a confluence page to discuss how Connect would need to > change > > > > based on the new rebalance protocol. Here's the page: > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > > > > > > > It's also pretty longish and I have tried to keep a format similar to > > > > KIP-848. Let me know what you think. Also, do you think this should > be > > > > moved to a separate discussion thread or is this one fine? > > > > > > > > Thanks! > > > > Sagar. > > > > > > > > On Tue, Jul 26, 2022 at 7:37 AM Sagar > wrote: > > > > > > > > > Hello Guozhang, > > > > > > > > > > Thank you so much for the
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi all, Thanks again for all the very good feedback so far. I have addressed, I think, all the outstanding points. Here are the main updates: * I have introduced STALE_MEMBER_EPOCH to differentiate it from the fatal FENCED_MEMBER_EPOCH error. * I have introduced topic ids in the OffsetCommit/OffsetFetch APIs. This allows the Consumer to use Topic IDs everywhere and thus gives it stronger guarantees. I have also added a few words about topic recreation in this context. * After discussing with Jason and Guozhang offline, I have decided to follow Ismael's advice to store dynamic group configuration in the controller. This is simpler with the IncrementalAlterConfigs API and decouples the life cycle of the configs from the life cycle of the groups. * I have introduced a new Pattern class to model the new regular expressions syntax (Google RE2/J) and differentiate them from the java.util.regex.Pattern ones. Simple regular expressions should work in both cases but more advanced ones won't, so it seems preferable, from a user point of view, to be clear and to differentiate them. Please take another look and let me know what you think. Best, David On Wed, Aug 10, 2022 at 2:36 AM Guozhang Wang wrote: > > Hello Greg, > > Thanks for reviewing the KIP! I hope David has addressed your questions > above, and I'd like to just emphasize one thing regarding your question 1): > one principle of the reconciliation is that, for a single client, before it > has completely revoked all the partitions that it should revoke, the > coordinator would not assign new partitions to it even if those partitions > have already been revoked by the old host and hence are available. And when > it has revoked all its partitions, the coordinator would start giving it > new partitions while at the same time bumping its epoch.
Of course in > practice, a client should try to always revoke all its should-be-revoked > partitions together in a single shot which is more efficient, but from the > point view of the coordinator it's always "first revoke all you should > revoke, and then I will bump your epoch up and start giving you more > partitions". Though this may not be most efficient since coordinators maybe > holding on some available-partitions-to-give to the client before the > client has complete revoking, it gives us a good barrier (hence the epoch > bumping) to determine if the client can continue getting new partitions > even after soft failures or network partitions. > > > Guozhang > > On Wed, Aug 3, 2022 at 1:05 PM Gregory Harris wrote: > > > Hey All! > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it > > down the stack! > > > > I had a few questions: > > > > 1. The 'Rejected Alternatives' section describes how member epoch should > > advance in step with the group epoch and assignment epoch values. I think > > that this is a good idea for the reasons described in the KIP. When the > > protocol is incrementally assigning partitions to a worker, what member > > epoch does each incremental assignment use? Are member epochs re-used, and > > a single member epoch can correspond to multiple different (monotonically > > larger) assignments? > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > > not, should custom client-side assignor implementations interact with the > > Reason field, and how is its common meaning agreed upon? If so, what is the > > benefit of a distinct Reason field over including such functionality in the > > opaque metadata? > > > > 3. The following is included in the KIP: "Thanks to this, the input of the > > client side assignor is entirely driven by the group coordinator. The > > consumer is no longer responsible for maintaining any state besides its > > assigned partitions." 
Does this mean that the client-side assignor MAY > > incorporate additional non-Metadata state (such as partition throughput, > > cpu/memory metrics, config topics, etc), or that additional non-Metadata > > state SHOULD NOT be used? > > > > 4. I see that there are separate classes > > for org.apache.kafka.server.group.consumer.PartitionAssignor > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > > overlap significantly. Is it possible for these two implementations to be > > unified? This would serve to promote feature parity of server-side and > > client-side assignors, and would also facilitate operational flexibility in > > certain situations. For example, if a server-side assignor has some poor > > behavior and needs a patch, deploying the patched assignor to the client > > and switching one consumer group to a client-side assignor may be faster > > and less risky than patching all of the brokers. With the currently > > proposed distinct APIs, a non-trivial reimplementation would have to be > > assembled, and if the two APIs have diverged significantly, then it is > > possible that a reimplementation would not be possible. > > > > -- > > Greg Harris > >
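[Editor's note] David's second update above introduces topic IDs into the OffsetCommit/OffsetFetch APIs and mentions topic recreation. The benefit can be sketched as follows, an illustrative Python sketch (not Kafka's actual storage layout): keying committed offsets by (topic_id, partition) rather than (topic_name, partition) means a topic that is deleted and recreated under the same name gets a fresh UUID, so offsets committed against the old incarnation are no longer visible.

```python
import uuid

committed = {}  # illustrative stand-in for the committed-offsets store

def commit_offset(topic_id, partition, offset):
    # Keyed by topic ID, not topic name.
    committed[(topic_id, partition)] = offset

def fetch_offset(topic_id, partition):
    # None signals "no committed offset" for this topic incarnation.
    return committed.get((topic_id, partition))

old_incarnation = uuid.uuid4()
commit_offset(old_incarnation, 0, 42)

# The topic is deleted and recreated under the same name: new topic ID.
new_incarnation = uuid.uuid4()
# fetch_offset(new_incarnation, 0) now returns None: stale offsets from the
# old incarnation cannot leak into the recreated topic.
```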
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello Greg, Thanks for reviewing the KIP! I hope David has addressed your questions above, and I'd like to just emphasize one thing regarding your question 1): one principle of the reconciliation is that, for a single client, before it has completely revoked all the partitions that it should revoke, the coordinator would not assign new partitions to it even if those partitions have already been revoked by the old host and hence are available. And when it has revoked all its partitions, the coordinator would start giving it new partitions while at the same time bumping its epoch. Of course in practice, a client should try to always revoke all its should-be-revoked partitions together in a single shot, which is more efficient, but from the point of view of the coordinator it's always "first revoke all you should revoke, and then I will bump your epoch up and start giving you more partitions". Though this may not be the most efficient since coordinators may be holding on to some available-partitions-to-give to the client before the client has completed revoking, it gives us a good barrier (hence the epoch bumping) to determine if the client can continue getting new partitions even after soft failures or network partitions. Guozhang On Wed, Aug 3, 2022 at 1:05 PM Gregory Harris wrote: > Hey All! > > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it > down the stack! > > I had a few questions: > > 1. The 'Rejected Alternatives' section describes how member epoch should > advance in step with the group epoch and assignment epoch values. I think > that this is a good idea for the reasons described in the KIP. When the > protocol is incrementally assigning partitions to a worker, what member > epoch does each incremental assignment use? Are member epochs re-used, and > a single member epoch can correspond to multiple different (monotonically > larger) assignments? > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator?
If > not, should custom client-side assignor implementations interact with the > Reason field, and how is its common meaning agreed upon? If so, what is the > benefit of a distinct Reason field over including such functionality in the > opaque metadata? > > 3. The following is included in the KIP: "Thanks to this, the input of the > client side assignor is entirely driven by the group coordinator. The > consumer is no longer responsible for maintaining any state besides its > assigned partitions." Does this mean that the client-side assignor MAY > incorporate additional non-Metadata state (such as partition throughput, > cpu/memory metrics, config topics, etc), or that additional non-Metadata > state SHOULD NOT be used? > > 4. I see that there are separate classes > for org.apache.kafka.server.group.consumer.PartitionAssignor > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > overlap significantly. Is it possible for these two implementations to be > unified? This would serve to promote feature parity of server-side and > client-side assignors, and would also facilitate operational flexibility in > certain situations. For example, if a server-side assignor has some poor > behavior and needs a patch, deploying the patched assignor to the client > and switching one consumer group to a client-side assignor may be faster > and less risky than patching all of the brokers. With the currently > proposed distinct APIs, a non-trivial reimplementation would have to be > assembled, and if the two APIs have diverged significantly, then it is > possible that a reimplementation would not be possible. > > -- > Greg Harris > gharris1...@gmail.com > github.com/gharris1727 > > On Wed, Aug 3, 2022 at 8:39 AM Sagar wrote: > > > Hi Guozhang/David, > > > > I created a confluence page to discuss how Connect would need to change > > based on the new rebalance protocol. 
Here's the page: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > > > It's also pretty longish and I have tried to keep a format similar to > > KIP-848. Let me know what you think. Also, do you think this should be > > moved to a separate discussion thread or is this one fine? > > > > Thanks! > > Sagar. > > > > On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > > > > > Hello Guozhang, > > > > > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > > > analysis and come up with such a document. > > > > > > Thanks! > > > Sagar. > > > > > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang > > wrote: > > > > > >> Hello Sagar, > > >> > > >> It would be great if you could come back with some analysis on how to > > >> implement the Connect side integration with the new protocol; so far > > >> besides leveraging on the new "protocol type" we did not yet think > > through > > >> the Connect side implementations. For Streams here's a draft of > > >> integration > > >> plan: > > >> > > >> > > >
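[Editor's note] Guozhang's reconciliation principle above ("first revoke all you should revoke, then I will bump your epoch and start giving you more partitions") can be sketched per member. This is an illustrative Python sketch with hypothetical names, not the coordinator's real code:

```python
class MemberReconciliation:
    """Illustrative: the coordinator gives a member no new partitions until
    everything it must revoke is gone; only then does it bump the member's
    epoch and hand out the full target assignment."""

    def __init__(self, epoch, current, target):
        self.epoch = epoch
        self.current = set(current)
        self.target = set(target)

    def next_assignment(self):
        to_revoke = self.current - self.target
        if to_revoke:
            # Still revoking: keep the epoch, return only retained partitions.
            return self.epoch, self.current & self.target
        # All revocations acknowledged: bump the epoch, give the full target.
        self.epoch += 1
        self.current = set(self.target)
        return self.epoch, self.target

    def ack_revoked(self, revoked):
        # The member confirms it has stopped owning these partitions.
        self.current -= set(revoked)
```

The epoch bump acts as the barrier Guozhang mentions: a member on an old epoch cannot receive new partitions, even after soft failures or network partitions.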
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks Sagar, I took a quick read of your doc, and am also leaning towards having a separate API for Connect. Would also love to hear @Randall Hauch @Konstantin 's opinion about that. One thing to note is that Kafka Connect's compatibility story, especially for downgrading, is more flexible than the consumer's (you can find that in https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect) so maybe we need to think a bit differently about compatibility. Guozhang On Wed, Aug 3, 2022 at 5:39 AM Sagar wrote: > Hi Guozhang/David, > > I created a confluence page to discuss how Connect would need to change > based on the new rebalance protocol. Here's the page: > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > It's also pretty longish and I have tried to keep a format similar to > KIP-848. Let me know what you think. Also, do you think this should be > moved to a separate discussion thread or is this one fine? > > Thanks! > Sagar. > > On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > > > Hello Guozhang, > > > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > > analysis and come up with such a document. > > > > Thanks! > > Sagar. > > > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang > wrote: > > > >> Hello Sagar, > >> > >> It would be great if you could come back with some analysis on how to > >> implement the Connect side integration with the new protocol; so far > >> besides leveraging on the new "protocol type" we did not yet think > through > >> the Connect side implementations. For Streams here's a draft of > >> integration > >> plan: > >> > >> > https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn > >> just FYI for your analysis on Connect. > >> > >> On Tue, Jul 19, 2022 at 10:48 PM Sagar > wrote: > >> > >> > Hi David, > >> > > >> > Thank you for your response.
The reason I thought connect can also fit > >> into > >> > this new scheme is that even today the connect uses a > WorkerCoordinator > >> > extending from AbstractCoordinator to empower rebalances of > >> > tasks/connectors. The WorkerCoordinator sets the protocolType() to > >> connect > >> > and uses the metadata() method by plumbing into > >> JoinGroupRequestProtocol. > >> > > >> > I think the changes to support connect would be similar at a high > level > >> to > >> > the changes in streams mainly because of the Client side assignors > being > >> > used in both. At an implementation level, we might need to make a lot > of > >> > changes to get onto this new assignment protocol like enhancing the > >> > JoinGroup request/response and SyncGroup and using > >> ConsumerGroupHeartbeat > >> > API etc again on similar lines to streams (or there might be > >> deviations). I > >> > would try to perform a detailed analysis of the same and we can have > a > >> > separate discussion thread for that as that would derail this > discussion > >> > thread. Let me know if that sounds good to you. > >> > > >> > Thanks! > >> > Sagar. > >> > > >> > > >> > > >> > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > >> > > >> > wrote: > >> > > >> > > Hi Sagar, > >> > > > >> > > Thanks for your comments. > >> > > > >> > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. > >> > > > >> > > 2) The idea is to transition C from his current assignment to his > >> > > target assignment when he can move to epoch 3. When that happens, > the > >> > > member assignment is updated and persisted with all its assigned > >> > > partitions even if they are not all revoked yet. In other words, the > >> > > member assignment becomes the target assignment. This is basically > an > >> > > optimization to avoid having to write all the changes to the log. > The > >> > > examples are based on the persisted state so I understand the > >> > > confusion. 
Let me see if I can improve this in the description. > >> > > > >> > > 3) Regarding Connect, it could reuse the protocol with a client side > >> > > assignor if it fits in the protocol. The assignment is about > >> > > topicid-partitions + metadata, could Connect fit into this? > >> > > > >> > > Best, > >> > > David > >> > > > >> > > On Fri, Jul 15, 2022 at 1:55 PM Sagar > >> wrote: > >> > > > > >> > > > Hi David, > >> > > > > >> > > > Thanks for the KIP. I just had minor observations: > >> > > > > >> > > > 1) In the Assignment Error section in Client Side mode Assignment > >> > > process, > >> > > > you mentioned => `In this case, the client side assignor can > return > >> an > >> > > > error to the group coordinator`. In this case are you referring to > >> the > >> > > > Assignor returning an AssignmentError that's listed down towards > the > >> > end? > >> > > > If yes, do you think it would make sense to mention this > explicitly > >> > here? > >> > > > > >> > > > 2) In the Case Studies section, I have a slight
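David's point 2 above (persisting the full target assignment as the member assignment even before all partitions are revoked) can be illustrated with a small sketch. This is a toy model of the described optimization, not the actual coordinator code; all names are made up:

```python
# Toy model of the persistence optimization described above: once a member
# may advance to the target epoch, the coordinator persists the *entire*
# target assignment for it, even if some partitions are still held by other
# members, to avoid writing every incremental change to the log.

def advance_member(member, target_epoch, target_partitions, still_owned_elsewhere):
    """Persist the target assignment for a member advancing to target_epoch.

    still_owned_elsewhere: target partitions not yet revoked by other members;
    they are persisted anyway, but not usable until freed.
    """
    member["epoch"] = target_epoch
    member["partitions"] = list(target_partitions)  # persisted state
    member["usable"] = [p for p in target_partitions
                        if p not in still_owned_elsewhere]
    return member

# Mirrors the case study: C moves to epoch 3 with foo-1 persisted,
# but cannot actually use foo-1 until A revokes it.
c = advance_member({"epoch": 2, "partitions": []}, 3, ["foo-1"], {"foo-1"})
```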
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Gregory, Thanks for your feedback. Please find my answers below. > 1. The 'Rejected Alternatives' section describes how member epoch should > advance in step with the group epoch and assignment epoch values. I think > that this is a good idea for the reasons described in the KIP. When the > protocol is incrementally assigning partitions to a worker, what member > epoch does each incremental assignment use? Are member epochs re-used, and > a single member epoch can correspond to multiple different (monotonically > larger) assignments? Those are good questions. The member epoch is aligned to the group/assignment. A member transitions to its new target epoch when it has revoked the partitions that it no longer owns. It has to revoke partitions with its current epoch. When the revocation is completed, it immediately gets its new epoch with the assigned partitions that are already free to be assigned. When the remaining partitions are freed up by the other members, the coordinator assigns them to the member while keeping the same member epoch. In other words, a member can revoke partitions or can get new partitions while having the same epoch. > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > not, should custom client-side assignor implementations interact with the > Reason field, and how is its common meaning agreed upon? If so, what is the > benefit of a distinct Reason field over including such functionality in the > opaque metadata? It is opaque to a certain extent. Any non-zero value is considered an error, but the specific non-zero values are not specified in the protocol. This is left to the client-side assignor implementation. There are two benefits here: 1) The reason is used by the coordinator to do the validation. If non-zero, the assignment should not change. If zero, the assignment must be valid. 2) It allows the operator to see if the assignment was successful without having to deserialize the metadata. > 3. 
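The member-epoch rules in answer 1 above (revoke at the current epoch, bump to the target epoch once revocation is acknowledged, then receive remaining partitions later at that same epoch) can be modeled as a small sketch. This is illustrative only, under the assumptions stated in the thread, not the actual protocol code:

```python
# Toy model of the member-epoch rules described above.

class Member:
    def __init__(self, epoch, owned):
        self.epoch = epoch
        self.owned = set(owned)

    def ack_revocation(self, target_epoch, free_partitions):
        # Revocation happens at the *current* epoch; once acknowledged, the
        # coordinator immediately moves the member to the target epoch with
        # whatever target partitions are already free.
        self.epoch = target_epoch
        self.owned = set(free_partitions)

    def receive_freed(self, partitions):
        # Remaining target partitions arrive later, freed by other members,
        # without a further epoch bump.
        self.owned |= set(partitions)

m = Member(epoch=1, owned={"foo-0", "foo-1"})
m.ack_revocation(target_epoch=2, free_partitions={"foo-0"})
m.receive_freed({"foo-2"})  # same epoch, more partitions
```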
The following is included in the KIP: "Thanks to this, the input of the > client side assignor is entirely driven by the group coordinator. The > consumer is no longer responsible for maintaining any state besides its > assigned partitions." Does this mean that the client-side assignor MAY > incorporate additional non-Metadata state (such as partition throughput, > cpu/memory metrics, config topics, etc), or that additional non-Metadata > state SHOULD NOT be used? Here, I was referring to the canonical case. Today the KafkaConsumer tracks metadata (e.g. partitions) and feeds the assignor with it. With the new protocol, we want the assignor to get all its information from the coordinator in order to avoid metadata divergences. You can definitely do whatever you want in a custom assignor. You could use metadata propagated via the protocol itself in the metadata field or use external metadata sources as well. > 4. I see that there are separate classes > for org.apache.kafka.server.group.consumer.PartitionAssignor > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > overlap significantly. Is it possible for these two implementations to be > unified? This would serve to promote feature parity of server-side and > client-side assignors, and would also facilitate operational flexibility in > certain situations. For example, if a server-side assignor has some poor > behavior and needs a patch, deploying the patched assignor to the client > and switching one consumer group to a client-side assignor may be faster > and less risky than patching all of the brokers. With the currently > proposed distinct APIs, a non-trivial reimplementation would have to be > assembled, and if the two APIs have diverged significantly, then it is > possible that a reimplementation would not be possible. That's a good question. I went with two different interfaces because the server side assignor does not have a notion of custom metadata and is thus simpler. 
I thought that it is misleading to reuse the client-side interface in this case due to this. Best, David On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris wrote: > > Hey All! > > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it > down the stack! > > I had a few questions: > > 1. The 'Rejected Alternatives' section describes how member epoch should > advance in step with the group epoch and assignment epoch values. I think > that this is a good idea for the reasons described in the KIP. When the > protocol is incrementally assigning partitions to a worker, what member > epoch does each incremental assignment use? Are member epochs re-used, and > a single member epoch can correspond to multiple different (monotonically > larger) assignments? > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > not, should custom client-side assignor implementations interact with the > Reason field, and how is its common meaning agreed upon? If so, what is the > benefit of a distinct
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Luke, Thanks for your comments. Please find my answers below. > 1. Will the member reconciling jump to the latest target assignment? For > example, member epoch are A:1, B:2, but now the group is in target > assignment epoch 4. Will the members jump to the target assignment result > in epoch 4 directly, or they should first sync to epoch 2, then 3, and 4? I > tried to find out if there's any problem if we jump to 4, but I can't. So, > I think we'll go to epoch 4 directly, right? That's right. The reconciliation always uses the latest target assignment so members might skip epochs. The only constraint to jump to a new epoch is that the member should have revoked the required partitions. In your example, if A and B do not have to revoke any partitions, they would be moved to epoch 4 immediately by the coordinator. Otherwise, the coordinator will ask them to revoke partitions first and then move them to epoch 4 when the revocation is acknowledged. > 2. For the ConsumerGroupPrepareAssignment and > ConsumerGroupInstallAssignment API, when member epoch doesn't match, it'll > retry (and hope) in next heartbeat response, the member epoch will be > bumped. But in ConsumerGroupHeartbeat, Upon receiving the > FENCED_MEMBER_EPOCH error, the consumer abandon all its partitions and > rejoins with the same member id and the epoch 0, unless the partitions > owned by the members are a subset of the target partitions. I'm wondering > why can't we return the latest member epoch to the member in > ConsumerGroupHeartbeat response no matter if the owned partitions are > subset of target partitions or not. After all, the member is still one of > the group member, the assignment for the member is still valid, right? This > way, the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment > API can piggyback it to retry in next heartbeat response. WDYT? As explained by Guozhang, the error semantic is different based on the API. 
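The epoch-skipping behavior David confirms in answer 1 above (reconciliation always targets the latest epoch; the only gate is revocation) can be sketched as a single decision function. Names and return values are invented for illustration:

```python
# Sketch of the reconciliation rule described above: the coordinator always
# reconciles a member against the *latest* target epoch, so members may skip
# intermediate epochs entirely when they have nothing to revoke.

def next_step(member_epoch, owned, target_epoch, target):
    to_revoke = set(owned) - set(target)
    if to_revoke:
        # Must revoke first; the epoch stays put until revocation is acked.
        return ("revoke", member_epoch, to_revoke)
    # Nothing to revoke: jump straight to the latest target epoch,
    # even if that skips epochs (e.g. 1 -> 4).
    return ("advance", target_epoch, set(target))
```

In Luke's example, members A (epoch 1) and B (epoch 2) with no partitions to revoke would both be advanced directly to epoch 4.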
For the *PrepareAssignment and the *InstallAssignment, it means that the member epoch is stale and that the client should retry when it gets a newer epoch whereas for the ConsumerGroupHeartbeat, it means that the client has been fenced by the coordinator and thus should abandon all its partitions and rejoin. I do agree with Guozhang that it would be better to use different errors in order to be clear. It is misleading otherwise. Regarding your second point, I think that we can make the API idempotent here. There are three cases: 1) If a client sends a ConsumerGroupHeartbeat request with an older member epoch but with partitions which are a subset of the expected target partitions, we can move it to the current member epoch. 2) If the member epoch is smaller and the partitions differ, it is very likely that the member owns partitions which are owned by other members now so it seems to me that we should treat it as a fatal error in this case. We could be a bit smarter and check whether the partitions are really owned by other members and if they are not we could proceed. The issue in this case is that the member won't be able to revoke the partitions and to commit the offsets with a stale member epoch before the coordinator moves it to the target epoch. 3) If the member epoch is larger, we should fence the member because this should never happen. > 3. When receiving COMPUTE_ASSIGNMENT error in ConsumerGroupHeartbeat API, > the consumer starts the assignment process. But what if somehow the member > didn't send out ConsumerGroupPrepareAssignment request, what would we do? I > think we'll wait for rebalance timeout and kick the member out of the > group, right? And then, what will the group coordinator do? I think it'll > select another member to be the leader and return COMPUTE_ASSIGNMENT error > in that member's heartbeat response, right? Maybe we should add that into > KIP. 
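David's three cases above for a heartbeat carrying a stale member epoch can be condensed into a decision table. This is a hedged sketch of the proposed behavior, not coordinator code; the return strings are made up:

```python
# Illustrative decision table for the three stale-epoch cases above.

def handle_stale_epoch(member_epoch, coordinator_epoch, owned, target):
    if member_epoch > coordinator_epoch:
        # Case 3: a member ahead of the coordinator should never happen.
        return "FENCE"
    if member_epoch < coordinator_epoch:
        if set(owned) <= set(target):
            # Case 1: owned partitions are a subset of the target, so it is
            # safe to move the member to the current epoch (idempotent).
            return "ADVANCE"
        # Case 2: the extra partitions are likely owned by other members
        # now, so treat it as fatal (a smarter check could verify this).
        return "FENCE"
    return "OK"
```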
When the member is selected to perform the assignment, the timer starts ticking so the coordinator gives the rebalance timeout to the member to perform it. If it does not complete the full process by that time, the coordinator will pick another member to run the assignment. I believe that we should not fence the member based on this though. I was thinking that fencing would be only done via the ConsumerGroupHeartbeat API and the session timeout. I will clarify this in the KIP. > Some typos: > 4. In "Assignment Process" section: > If the selected assignor does exist, the group coordinator will reject the > heartbeat with an UNSUPPORTED_ASSIGNOR error. > -> I think it should be "does not exist" > > 5. In "JoinGroup Handling" section: > If the member has revoked all its partitions or the required partitions, > the member can transition to its next epoch. The current assignment become > the current assignment. > -> I think it should be "The target assignment becomes the current > assignment", right? > > 6. In "ConsumerGroupDescribe API" section: > When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest
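The timeout behavior described above (the chosen member gets the rebalance timeout to run the assignment; on expiry the coordinator picks another member rather than fencing) can be sketched as follows. This is a hypothetical model, assuming the simple selection policy described in the thread:

```python
# Sketch of the leader-selection behavior described above: if the member
# chosen to run the client-side assignment does not finish within the
# rebalance timeout, the coordinator picks another member. The slow member
# is NOT fenced; fencing is driven only by the session timeout.

def pick_assignor(members, current, elapsed_ms, rebalance_timeout_ms):
    if current is not None and elapsed_ms <= rebalance_timeout_ms:
        return current  # still within its time budget
    # Timed out (or no leader yet): hand the assignment to another member.
    candidates = [m for m in members if m != current]
    return candidates[0] if candidates else current
```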
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Sagar, Thanks for the feedback and the document. That's really helpful. I will take a look at it. Overall, it seems to me that both Connect and the Consumer could share the same underlying "engine". The main difference is that the Consumer assigns topic-partitions to members whereas Connect assigns tasks to workers. I see two ways to move forward: 1) We extend the new proposed APIs to support different resource types (e.g. partitions, tasks, etc.); or 2) We use new dedicated APIs for Connect. The dedicated APIs would be similar to the new ones but different on the content/resources and they would rely on the same engine on the coordinator side. I personally lean towards 2) because I am not a fan of overcharging APIs to serve different purposes. That being said, I am not opposed to 1) if we can find an elegant way to do it. I think that we can continue to discuss it here for now in order to ensure that this KIP is compatible with what we will do for Connect in the future. Best, David On Mon, Aug 8, 2022 at 2:41 PM David Jacot wrote: > > Hi all, > > I am back from vacation. I will go through and address your comments > in the coming days. Thanks for your feedback. > > Cheers, > David > > On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris wrote: > > > > Hey All! > > > > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it > > down the stack! > > > > I had a few questions: > > > > 1. The 'Rejected Alternatives' section describes how member epoch should > > advance in step with the group epoch and assignment epoch values. I think > > that this is a good idea for the reasons described in the KIP. When the > > protocol is incrementally assigning partitions to a worker, what member > > epoch does each incremental assignment use? Are member epochs re-used, and > > a single member epoch can correspond to multiple different (monotonically > > larger) assignments? > > > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? 
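Option 2 above (dedicated Connect APIs backed by the same coordinator "engine") can be illustrated with a speculative sketch. Every name here is hypothetical; the point is only that the epoch/assignment core can be resource-agnostic while Consumer groups assign topic-partitions and Connect groups assign tasks:

```python
# Speculative sketch of a shared, resource-agnostic coordinator "engine".
# Class and method names are invented for illustration.

class GroupEngine:
    """Core bookkeeping shared by both group types: epochs and targets."""

    def __init__(self):
        self.group_epoch = 0
        self.targets = {}  # member id -> set of resources (partitions or tasks)

    def install(self, assignment):
        # Installing a new target assignment bumps the group epoch.
        self.group_epoch += 1
        self.targets = {member: set(res) for member, res in assignment.items()}
        return self.group_epoch

# A consumer group assigns topic-partitions...
consumer_group = GroupEngine()
consumer_group.install({"member-1": {"foo-0", "foo-1"}})

# ...while a Connect group would assign connector tasks via the same engine,
# exposed through its own dedicated APIs.
connect_group = GroupEngine()
connect_group.install({"worker-1": {"connector-a-task-0"}})
```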
If > > not, should custom client-side assignor implementations interact with the > > Reason field, and how is its common meaning agreed upon? If so, what is the > > benefit of a distinct Reason field over including such functionality in the > > opaque metadata? > > > > 3. The following is included in the KIP: "Thanks to this, the input of the > > client side assignor is entirely driven by the group coordinator. The > > consumer is no longer responsible for maintaining any state besides its > > assigned partitions." Does this mean that the client-side assignor MAY > > incorporate additional non-Metadata state (such as partition throughput, > > cpu/memory metrics, config topics, etc), or that additional non-Metadata > > state SHOULD NOT be used? > > > > 4. I see that there are separate classes > > for org.apache.kafka.server.group.consumer.PartitionAssignor > > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > > overlap significantly. Is it possible for these two implementations to be > > unified? This would serve to promote feature parity of server-side and > > client-side assignors, and would also facilitate operational flexibility in > > certain situations. For example, if a server-side assignor has some poor > > behavior and needs a patch, deploying the patched assignor to the client > > and switching one consumer group to a client-side assignor may be faster > > and less risky than patching all of the brokers. With the currently > > proposed distinct APIs, a non-trivial reimplementation would have to be > > assembled, and if the two APIs have diverged significantly, then it is > > possible that a reimplementation would not be possible. > > > > -- > > Greg Harris > > gharris1...@gmail.com > > github.com/gharris1727 > > > > On Wed, Aug 3, 2022 at 8:39 AM Sagar wrote: > > > > > Hi Guozhang/David, > > > > > > I created a confluence page to discuss how Connect would need to change > > > based on the new rebalance protocol. 
Here's the page: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > > > > > It's also pretty longish and I have tried to keep a format similar to > > > KIP-848. Let me know what you think. Also, do you think this should be > > > moved to a separate discussion thread or is this one fine? > > > > > > Thanks! > > > Sagar. > > > > > > On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > > > > > > > Hello Guozhang, > > > > > > > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > > > > analysis and come up with such a document. > > > > > > > > Thanks! > > > > Sagar. > > > > > > > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang > > > wrote: > > > > > > > >> Hello Sagar, > > > >> > > > >> It would be great if you could come back with some analysis on how to > > > >> implement the Connect side integration with the new protocol; so far > > > >> besides leveraging on the new "protocol type" we did not yet think > > >
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi all, I am back from vacation. I will go through and address your comments in the coming days. Thanks for your feedback. Cheers, David On Wed, Aug 3, 2022 at 10:05 PM Gregory Harris wrote: > > Hey All! > > Thanks for the KIP, it's wonderful to see cooperative rebalancing making it > down the stack! > > I had a few questions: > > 1. The 'Rejected Alternatives' section describes how member epoch should > advance in step with the group epoch and assignment epoch values. I think > that this is a good idea for the reasons described in the KIP. When the > protocol is incrementally assigning partitions to a worker, what member > epoch does each incremental assignment use? Are member epochs re-used, and > a single member epoch can correspond to multiple different (monotonically > larger) assignments? > > 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If > not, should custom client-side assignor implementations interact with the > Reason field, and how is its common meaning agreed upon? If so, what is the > benefit of a distinct Reason field over including such functionality in the > opaque metadata? > > 3. The following is included in the KIP: "Thanks to this, the input of the > client side assignor is entirely driven by the group coordinator. The > consumer is no longer responsible for maintaining any state besides its > assigned partitions." Does this mean that the client-side assignor MAY > incorporate additional non-Metadata state (such as partition throughput, > cpu/memory metrics, config topics, etc), or that additional non-Metadata > state SHOULD NOT be used? > > 4. I see that there are separate classes > for org.apache.kafka.server.group.consumer.PartitionAssignor > and org.apache.kafka.clients.consumer.PartitionAssignor that seem to > overlap significantly. Is it possible for these two implementations to be > unified? 
This would serve to promote feature parity of server-side and > client-side assignors, and would also facilitate operational flexibility in > certain situations. For example, if a server-side assignor has some poor > behavior and needs a patch, deploying the patched assignor to the client > and switching one consumer group to a client-side assignor may be faster > and less risky than patching all of the brokers. With the currently > proposed distinct APIs, a non-trivial reimplementation would have to be > assembled, and if the two APIs have diverged significantly, then it is > possible that a reimplementation would not be possible. > > -- > Greg Harris > gharris1...@gmail.com > github.com/gharris1727 > > On Wed, Aug 3, 2022 at 8:39 AM Sagar wrote: > > > Hi Guozhang/David, > > > > I created a confluence page to discuss how Connect would need to change > > based on the new rebalance protocol. Here's the page: > > > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > > > It's also pretty longish and I have tried to keep a format similar to > > KIP-848. Let me know what you think. Also, do you think this should be > > moved to a separate discussion thread or is this one fine? > > > > Thanks! > > Sagar. > > > > On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > > > > > Hello Guozhang, > > > > > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > > > analysis and come up with such a document. > > > > > > Thanks! > > > Sagar. > > > > > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang > > wrote: > > > > > >> Hello Sagar, > > >> > > >> It would be great if you could come back with some analysis on how to > > >> implement the Connect side integration with the new protocol; so far > > >> besides leveraging on the new "protocol type" we did not yet think > > through > > >> the Connect side implementations. 
For Streams here's a draft of > > >> integration > > >> plan: > > >> > > >> > > https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn > > >> just FYI for your analysis on Connect. > > >> > > >> On Tue, Jul 19, 2022 at 10:48 PM Sagar > > wrote: > > >> > > >> > Hi David, > > >> > > > >> > Thank you for your response. The reason I thought connect can also fit > > >> into > > >> > this new scheme is that even today the connect uses a > > WorkerCoordinator > > >> > extending from AbstractCoordinator to empower rebalances of > > >> > tasks/connectors. The WorkerCoordinator sets the protocolType() to > > >> connect > > >> > and uses the metadata() method by plumbing into > > >> JoinGroupRequestProtocol. > > >> > > > >> > I think the changes to support connect would be similar at a high > > level > > >> to > > >> > the changes in streams mainly because of the Client side assignors > > being > > >> > used in both. At an implementation level, we might need to make a lot > > of > > >> > changes to get onto this new assignment protocol like enhancing the > > >> > JoinGroup request/response and SyncGroup and using > > >>
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hey All! Thanks for the KIP, it's wonderful to see cooperative rebalancing making it down the stack! I had a few questions: 1. The 'Rejected Alternatives' section describes how member epoch should advance in step with the group epoch and assignment epoch values. I think that this is a good idea for the reasons described in the KIP. When the protocol is incrementally assigning partitions to a worker, what member epoch does each incremental assignment use? Are member epochs re-used, and a single member epoch can correspond to multiple different (monotonically larger) assignments? 2. Is the Assignor's 'Reason' field opaque to the group coordinator? If not, should custom client-side assignor implementations interact with the Reason field, and how is its common meaning agreed upon? If so, what is the benefit of a distinct Reason field over including such functionality in the opaque metadata? 3. The following is included in the KIP: "Thanks to this, the input of the client side assignor is entirely driven by the group coordinator. The consumer is no longer responsible for maintaining any state besides its assigned partitions." Does this mean that the client-side assignor MAY incorporate additional non-Metadata state (such as partition throughput, cpu/memory metrics, config topics, etc), or that additional non-Metadata state SHOULD NOT be used? 4. I see that there are separate classes for org.apache.kafka.server.group.consumer.PartitionAssignor and org.apache.kafka.clients.consumer.PartitionAssignor that seem to overlap significantly. Is it possible for these two implementations to be unified? This would serve to promote feature parity of server-side and client-side assignors, and would also facilitate operational flexibility in certain situations. 
For example, if a server-side assignor has some poor behavior and needs a patch, deploying the patched assignor to the client and switching one consumer group to a client-side assignor may be faster and less risky than patching all of the brokers. With the currently proposed distinct APIs, a non-trivial reimplementation would have to be assembled, and if the two APIs have diverged significantly, then it is possible that a reimplementation would not be possible. -- Greg Harris gharris1...@gmail.com github.com/gharris1727 On Wed, Aug 3, 2022 at 8:39 AM Sagar wrote: > Hi Guozhang/David, > > I created a confluence page to discuss how Connect would need to change > based on the new rebalance protocol. Here's the page: > > https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol > > It's also pretty longish and I have tried to keep a format similar to > KIP-848. Let me know what you think. Also, do you think this should be > moved to a separate discussion thread or is this one fine? > > Thanks! > Sagar. > > On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > > > Hello Guozhang, > > > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > > analysis and come up with such a document. > > > > Thanks! > > Sagar. > > > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang > wrote: > > > >> Hello Sagar, > >> > >> It would be great if you could come back with some analysis on how to > >> implement the Connect side integration with the new protocol; so far > >> besides leveraging on the new "protocol type" we did not yet think > through > >> the Connect side implementations. For Streams here's a draft of > >> integration > >> plan: > >> > >> > https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn > >> just FYI for your analysis on Connect. 
> >> > >> On Tue, Jul 19, 2022 at 10:48 PM Sagar > wrote: > >> > >> > Hi David, > >> > > >> > Thank you for your response. The reason I thought connect can also fit > >> into > >> > this new scheme is that even today the connect uses a > WorkerCoordinator > >> > extending from AbstractCoordinator to empower rebalances of > >> > tasks/connectors. The WorkerCoordinator sets the protocolType() to > >> connect > >> > and uses the metadata() method by plumbing into > >> JoinGroupRequestProtocol. > >> > > >> > I think the changes to support connect would be similar at a high > level > >> to > >> > the changes in streams mainly because of the Client side assignors > being > >> > used in both. At an implementation level, we might need to make a lot > of > >> > changes to get onto this new assignment protocol like enhancing the > >> > JoinGroup request/response and SyncGroup and using > >> ConsumerGroupHeartbeat > >> > API etc again on similar lines to streams (or there might be > >> deviations). I > >> > would try to perform a detailed analysis of the same and we can have > a > >> > separate discussion thread for that as that would derail this > discussion > >> > thread. Let me know if that sounds good to you. > >> > > >> > Thanks! > >> > Sagar. > >> > > >> > > >> > > >> > On Fri, Jul 15, 2022 at 5:47 PM David
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Guozhang/David, I created a confluence page to discuss how Connect would need to change based on the new rebalance protocol. Here's the page: https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol It's also pretty longish and I have tried to keep a format similar to KIP-848. Let me know what you think. Also, do you think this should be moved to a separate discussion thread or is this one fine? Thanks! Sagar. On Tue, Jul 26, 2022 at 7:37 AM Sagar wrote: > Hello Guozhang, > > Thank you so much for the doc on Kafka Streams. Sure, I would do the > analysis and come up with such a document. > > Thanks! > Sagar. > > On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang wrote: > >> Hello Sagar, >> >> It would be great if you could come back with some analysis on how to >> implement the Connect side integration with the new protocol; so far >> besides leveraging on the new "protocol type" we did not yet think through >> the Connect side implementations. For Streams here's a draft of >> integration >> plan: >> >> https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn >> just FYI for your analysis on Connect. >> >> On Tue, Jul 19, 2022 at 10:48 PM Sagar wrote: >> >> > Hi David, >> > >> > Thank you for your response. The reason I thought connect can also fit >> into >> > this new scheme is that even today the connect uses a WorkerCoordinator >> > extending from AbstractCoordinator to empower rebalances of >> > tasks/connectors. The WorkerCoordinator sets the protocolType() to >> connect >> > and uses the metadata() method by plumbing into >> JoinGroupRequestProtocol. >> > >> > I think the changes to support connect would be similar at a high level >> to >> > the changes in streams mainly because of the Client side assignors being >> > used in both. 
At an implementation level, we might need to make a lot of >> > changes to get onto this new assignment protocol like enhancing the >> > JoinGroup request/response and SyncGroup and using >> ConsumerGroupHeartbeat >> > API etc again on similar lines to streams (or there might be >> deviations). I >> > would try to perform a detailed analysis of the same and we can have a >> > separate discussion thread for that as that would derail this discussion >> > thread. Let me know if that sounds good to you. >> > >> > Thanks! >> > Sagar. >> > >> > >> > >> > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > > >> > wrote: >> > >> > > Hi Sagar, >> > > >> > > Thanks for your comments. >> > > >> > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. >> > > >> > > 2) The idea is to transition C from his current assignment to his >> > > target assignment when he can move to epoch 3. When that happens, the >> > > member assignment is updated and persisted with all its assigned >> > > partitions even if they are not all revoked yet. In other words, the >> > > member assignment becomes the target assignment. This is basically an >> > > optimization to avoid having to write all the changes to the log. The >> > > examples are based on the persisted state so I understand the >> > > confusion. Let me see if I can improve this in the description. >> > > >> > > 3) Regarding Connect, it could reuse the protocol with a client side >> > > assignor if it fits in the protocol. The assignment is about >> > > topicid-partitions + metadata, could Connect fit into this? >> > > >> > > Best, >> > > David >> > > >> > > On Fri, Jul 15, 2022 at 1:55 PM Sagar >> wrote: >> > > > >> > > > Hi David, >> > > > >> > > > Thanks for the KIP. I just had minor observations: >> > > > >> > > > 1) In the Assignment Error section in Client Side mode Assignment >> > > process, >> > > > you mentioned => `In this case, the client side assignor can return >> an >> > > > error to the group coordinator`. 
In this case are you referring to >> the >> > > > Assignor returning an AssignmentError that's listed down towards the >> > end? >> > > > If yes, do you think it would make sense to mention this explicitly >> > here? >> > > > >> > > > 2) In the Case Studies section, I have a slight confusion, not sure >> if >> > > > others have the same. Consider this step: >> > > > >> > > > When B heartbeats, the group coordinator transitions him to epoch 3 >> > > because >> > > > B has no partitions to revoke. It persists the change and reply. >> > > > >> > > >- Group (epoch=3) >> > > > - A >> > > > - B >> > > > - C >> > > >- Target Assignment (epoch=3) >> > > > - A - partitions=[foo-0] >> > > > - B - partitions=[foo-2] >> > > > - C - partitions=[foo-1] >> > > >- Member Assignment >> > > > - A - epoch=2, partitions=[foo-0, foo-1] >> > > > - B - epoch=3, partitions=[foo-2] >> > > > - C - epoch=3, partitions=[foo-1] >> > > > >> > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 >> yet. >> > > > >> > > > Here,it's mentioned
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello Guozhang, Thank you so much for the doc on Kafka Streams. Sure, I would do the analysis and come up with such a document. Thanks! Sagar. On Tue, Jul 26, 2022 at 4:47 AM Guozhang Wang wrote: > Hello Sagar, > > It would be great if you could come back with some analysis on how to > implement the Connect side integration with the new protocol; so far > besides leveraging on the new "protocol type" we did not yet think through > the Connect side implementations. For Streams here's a draft of integration > plan: > > https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn > just FYI for your analysis on Connect. > > On Tue, Jul 19, 2022 at 10:48 PM Sagar wrote: > > > Hi David, > > > > Thank you for your response. The reason I thought connect can also fit > into > > this new scheme is that even today the connect uses a WorkerCoordinator > > extending from AbstractCoordinator to empower rebalances of > > tasks/connectors. The WorkerCoordinator sets the protocolType() to > connect > > and uses the metadata() method by plumbing into JoinGroupRequestProtocol. > > > > I think the changes to support connect would be similar at a high level > to > > the changes in streams mainly because of the Client side assignors being > > used in both. At an implementation level, we might need to make a lot of > > changes to get onto this new assignment protocol like enhancing the > > JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat > > API etc again on similar lines to streams (or there might be > deviations). I > > would try to perform a detailed analysis of the same and we can have a > > separate discussion thread for that as that would derail this discussion > > thread. Let me know if that sounds good to you. > > > > Thanks! > > Sagar. > > > > > > > > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > > > wrote: > > > > > Hi Sagar, > > > > > > Thanks for your comments. > > > > > > 1) Yes. 
That refers to `Assignment#error`. Sure, I can mention it. > > > > > > 2) The idea is to transition C from his current assignment to his > > > target assignment when he can move to epoch 3. When that happens, the > > > member assignment is updated and persisted with all its assigned > > > partitions even if they are not all revoked yet. In other words, the > > > member assignment becomes the target assignment. This is basically an > > > optimization to avoid having to write all the changes to the log. The > > > examples are based on the persisted state so I understand the > > > confusion. Let me see if I can improve this in the description. > > > > > > 3) Regarding Connect, it could reuse the protocol with a client side > > > assignor if it fits in the protocol. The assignment is about > > > topicid-partitions + metadata, could Connect fit into this? > > > > > > Best, > > > David > > > > > > On Fri, Jul 15, 2022 at 1:55 PM Sagar > wrote: > > > > > > > > Hi David, > > > > > > > > Thanks for the KIP. I just had minor observations: > > > > > > > > 1) In the Assignment Error section in Client Side mode Assignment > > > process, > > > > you mentioned => `In this case, the client side assignor can return > an > > > > error to the group coordinator`. In this case are you referring to > the > > > > Assignor returning an AssignmentError that's listed down towards the > > end? > > > > If yes, do you think it would make sense to mention this explicitly > > here? > > > > > > > > 2) In the Case Studies section, I have a slight confusion, not sure > if > > > > others have the same. Consider this step: > > > > > > > > When B heartbeats, the group coordinator transitions him to epoch 3 > > > because > > > > B has no partitions to revoke. It persists the change and reply. 
> > > > > > > >- Group (epoch=3) > > > > - A > > > > - B > > > > - C > > > >- Target Assignment (epoch=3) > > > > - A - partitions=[foo-0] > > > > - B - partitions=[foo-2] > > > > - C - partitions=[foo-1] > > > >- Member Assignment > > > > - A - epoch=2, partitions=[foo-0, foo-1] > > > > - B - epoch=3, partitions=[foo-2] > > > > - C - epoch=3, partitions=[foo-1] > > > > > > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 > yet. > > > > > > > > Here,it's mentioned that member C can't get the foo-1 partition yet, > > but > > > > based on the description above, it seems it already has it. Do you > > think > > > it > > > > would be better to remove it and populate it only when it actually > gets > > > it? > > > > I see this in a lot of other places, so have I understood it > > incorrectly > > > ? > > > > > > > > > > > > Regarding connect , it might be out of scope of this discussion, but > > from > > > > what I understood it would probably be running in client side > assignor > > > mode > > > > even on the new rebalance protocol as it has its own Custom > > > Assignors(Eager > > > > and IncrementalCooperative). > > > > > > >
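To make the case study above concrete, here is a small illustrative model (hypothetical names like `Member` and `heartbeat`; this is not coordinator code from the KIP) of the reconciliation rule being discussed: a member only moves to the group epoch once it has nothing left to revoke, and a target partition is withheld in memory while another member still owns it, even though the persisted member assignment may already show it.

```python
# Illustrative sketch of the epoch-3 case study: A owns [foo-0, foo-1] at
# epoch 2, B and C are at epoch 3, and C's target foo-1 cannot be delivered
# until A revokes it. All names here are made up for illustration.

class Member:
    def __init__(self, name, epoch, partitions):
        self.name = name
        self.epoch = epoch
        self.partitions = set(partitions)

def owned_by_others(members, me):
    """Partitions currently held by any member other than `me`."""
    return set().union(*(m.partitions for n, m in members.items() if n != me))

def heartbeat(member, group_epoch, target, others):
    target_set = set(target[member.name])
    to_revoke = member.partitions - target_set
    if to_revoke:
        # Must revoke first; the member stays at its current epoch.
        member.partitions -= to_revoke
        return
    member.epoch = group_epoch
    # Grant only the target partitions that nobody else still owns.
    member.partitions = {p for p in target_set if p not in others}

target = {"A": {"foo-0"}, "B": {"foo-2"}, "C": {"foo-1"}}
members = {
    "A": Member("A", 2, {"foo-0", "foo-1"}),  # still owns foo-1 at epoch 2
    "B": Member("B", 3, {"foo-2"}),
    "C": Member("C", 3, set()),
}

# C heartbeats: it transitions to epoch 3, but foo-1 is still owned by A.
heartbeat(members["C"], 3, target, owned_by_others(members, "C"))
# A heartbeats twice: first it revokes foo-1, then it moves to epoch 3.
heartbeat(members["A"], 3, target, owned_by_others(members, "A"))
heartbeat(members["A"], 3, target, owned_by_others(members, "A"))
# C heartbeats again: foo-1 is now free, so it is finally delivered.
heartbeat(members["C"], 3, target, owned_by_others(members, "C"))
```

This separates the persisted target (which, per David's explanation, may already list foo-1 under C) from what C actually holds at each heartbeat, which is the distinction Sagar's question is about.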
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello Luke, Thanks for the detailed feedback! I tried to incorporate / answer some of them inline below. On Thu, Jul 21, 2022 at 6:39 AM Luke Chen wrote: > Hi David, > > So excited to see the consumer protocol will be renewed! > Thanks for David, Guozhang, and Jason for creating this great proposal! > > Some comments about the protocol: > 1. Will the member reconciling jump to the latest target assignment? For > example, member epoch are A:1, B:2, but now the group is in target > assignment epoch 4. Will the members jump to the target assignment result > in epoch 4 directly, or they should first sync to epoch 2, then 3, and 4? I > tried to find out if there's any problem if we jump to 4, but I can't. So, > I think we'll go to epoch 4 directly, right? > > The short answer is "yes" :) But the slightly longer answer is that such reconciliation logic, which may skip some epochs, is controlled by the brokers, not the clients. The clients should simply be dumb and take in the indicated (delta) assignment as long as its epoch is no smaller than their current one. The brokers, having full knowledge of where everyone is at the moment, and the latest target assignment, would decide which delta assignment to send, step by step, and which assignment could be skipped. > 2. For the ConsumerGroupPrepareAssignment and > ConsumerGroupInstallAssignment API, when member epoch doesn't match, it'll > retry (and hope) in next heartbeat response, the member epoch will be > bumped. But in ConsumerGroupHeartbeat, Upon receiving the > FENCED_MEMBER_EPOCH error, the consumer abandon all its partitions and > rejoins with the same member id and the epoch 0, unless the partitions > owned by the members are a subset of the target partitions. I'm wondering > why can't we return the latest member epoch to the member in > ConsumerGroupHeartbeat response no matter if the owned partitions are > subset of target partitions or not.
After all, the member is still one of > the group member, the assignment for the member is still valid, right? This > way, the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment > API can piggyback it to retry in next heartbeat response. WDYT? > > The semantics of these two classes of requests are different: for ConsumerGroupPrepareAssignment / ConsumerGroupInstallAssignment it's not really indicating that the consumer is actually "fenced", but only indicating "your information is probably stale, so just stay put and wait for the renewed info". Note that these two requests are only for clients to try collecting information or making a change to the group assignment, so even failing that request does not mean the member is no longer part of the group. BTW, that reminds me that for these two requests maybe we should use a different error code than FENCED_MEMBER_EPOCH since it's a bit confusing, maybe "STALE_MEMBER_EPOCH"? For ConsumerGroupHeartbeat, the fencing purpose is only to make sure that any partitions that are assigned to others should not be retained at the "fenced member". So I think we can make the assignment response in ConsumerGroupHeartbeat idempotent, and only return the fatal error code if the consumer's "owned partitions" have any overlap with other members' current assignments, and otherwise we just proceed with the new assignment. > 3. When receiving COMPUTE_ASSIGNMENT error in ConsumerGroupHeartbeat API, > the consumer starts the assignment process. But what if somehow the member > didn't send out ConsumerGroupPrepareAssignment request, what would we do? I > think we'll wait for rebalance timeout and kick the member out of the > group, right? And then, what will the group coordinator do? I think it'll > select another member to be the leader and return COMPUTE_ASSIGNMENT error > in that member's heartbeat response, right? Maybe we should add that into > KIP. > The broker has a timer for this process, and if it did not receive the assignment (i.e.
until the ConsumerGroupInstallAssignment is received; note that some assignors may not always need to refresh their information via ConsumerGroupPrepareAssignment when computing the new assignment) in time, then the broker would pick another member and ask it to COMPUTE_ASSIGNMENT. But we would not kick the member out of the group based on that timer, only based on the heartbeat timer. I.e. if there's a member that keeps heartbeating but never wants to compute the new assignment even when asked, it would still remain in the group. We can clarify this a bit more in the doc. > > Some typos: > 4. In "Assignment Process" section: > If the selected assignor does exist, the group coordinator will reject the > heartbeat with an UNSUPPORTED_ASSIGNOR error. > -> I think it should be "does not exist" > > Ack! > 5. In "JoinGroup Handling" section: > If the member has revoked all its partitions or the required partitions, > the member can transition to its next epoch. The current assignment become > the current
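The broker-side timer described above can be sketched roughly as follows. This is a hypothetical class, not code from the KIP: if the member asked to COMPUTE_ASSIGNMENT never installs an assignment before the deadline, the coordinator simply picks another candidate; expiry of this timer alone never fences the member, since fencing is driven only by the heartbeat timer.

```python
import time

# Hypothetical sketch of a coordinator-side timer for client-side assignment.
class AssignmentTimer:
    def __init__(self, candidates, timeout_s, clock=time.monotonic):
        self.candidates = list(candidates)  # members able to run the assignor
        self.timeout_s = timeout_s
        self.clock = clock
        self.selected = None
        self.deadline = None
        self.installed = False

    def start(self):
        """Select the next candidate assignor and arm the timer."""
        self.selected = self.candidates.pop(0)
        self.deadline = self.clock() + self.timeout_s
        self.installed = False
        return self.selected  # this member receives COMPUTE_ASSIGNMENT

    def on_install(self, member):
        """ConsumerGroupInstallAssignment received from the selected member."""
        if member == self.selected:
            self.installed = True

    def tick(self):
        """Re-select if the deadline passed without an install.

        Note: this never removes the slow member from the group; only the
        heartbeat/session timer can do that.
        """
        if not self.installed and self.clock() >= self.deadline and self.candidates:
            return self.start()
        return None

# Deterministic walk-through with a fake clock.
now = [0.0]
timer = AssignmentTimer(["A", "B"], timeout_s=5.0, clock=lambda: now[0])
first = timer.start()   # "A" is asked to compute the assignment
now[0] = 6.0            # ...but never installs it before the deadline
second = timer.tick()   # so the coordinator asks "B" instead
```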
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hello Sagar, It would be great if you could come back with some analysis on how to implement the Connect side integration with the new protocol; so far besides leveraging on the new "protocol type" we did not yet think through the Connect side implementations. For Streams here's a draft of integration plan: https://docs.google.com/document/d/17PNz2sGoIvGyIzz8vLyJTJTU2rqnD_D9uHJnH9XARjU/edit#heading=h.pdgirmi57dvn just FYI for your analysis on Connect. On Tue, Jul 19, 2022 at 10:48 PM Sagar wrote: > Hi David, > > Thank you for your response. The reason I thought connect can also fit into > this new scheme is that even today the connect uses a WorkerCoordinator > extending from AbstractCoordinator to empower rebalances of > tasks/connectors. The WorkerCoordinator sets the protocolType() to connect > and uses the metadata() method by plumbing into JoinGroupRequestProtocol. > > I think the changes to support connect would be similar at a high level to > the changes in streams mainly because of the Client side assignors being > used in both. At an implementation level, we might need to make a lot of > changes to get onto this new assignment protocol like enhancing the > JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat > API etc again on similar lines to streams (or there might be deviations). I > would try to perform a detailed analysis of the same and we can have a > separate discussion thread for that as that would derail this discussion > thread. Let me know if that sounds good to you. > > Thanks! > Sagar. > > > > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > wrote: > > > Hi Sagar, > > > > Thanks for your comments. > > > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. > > > > 2) The idea is to transition C from his current assignment to his > > target assignment when he can move to epoch 3. 
When that happens, the > > member assignment is updated and persisted with all its assigned > > partitions even if they are not all revoked yet. In other words, the > > member assignment becomes the target assignment. This is basically an > > optimization to avoid having to write all the changes to the log. The > > examples are based on the persisted state so I understand the > > confusion. Let me see if I can improve this in the description. > > > > 3) Regarding Connect, it could reuse the protocol with a client side > > assignor if it fits in the protocol. The assignment is about > > topicid-partitions + metadata, could Connect fit into this? > > > > Best, > > David > > > > On Fri, Jul 15, 2022 at 1:55 PM Sagar wrote: > > > > > > Hi David, > > > > > > Thanks for the KIP. I just had minor observations: > > > > > > 1) In the Assignment Error section in Client Side mode Assignment > > process, > > > you mentioned => `In this case, the client side assignor can return an > > > error to the group coordinator`. In this case are you referring to the > > > Assignor returning an AssignmentError that's listed down towards the > end? > > > If yes, do you think it would make sense to mention this explicitly > here? > > > > > > 2) In the Case Studies section, I have a slight confusion, not sure if > > > others have the same. Consider this step: > > > > > > When B heartbeats, the group coordinator transitions him to epoch 3 > > because > > > B has no partitions to revoke. It persists the change and reply. > > > > > >- Group (epoch=3) > > > - A > > > - B > > > - C > > >- Target Assignment (epoch=3) > > > - A - partitions=[foo-0] > > > - B - partitions=[foo-2] > > > - C - partitions=[foo-1] > > >- Member Assignment > > > - A - epoch=2, partitions=[foo-0, foo-1] > > > - B - epoch=3, partitions=[foo-2] > > > - C - epoch=3, partitions=[foo-1] > > > > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. 
> > > > > > Here,it's mentioned that member C can't get the foo-1 partition yet, > but > > > based on the description above, it seems it already has it. Do you > think > > it > > > would be better to remove it and populate it only when it actually gets > > it? > > > I see this in a lot of other places, so have I understood it > incorrectly > > ? > > > > > > > > > Regarding connect , it might be out of scope of this discussion, but > from > > > what I understood it would probably be running in client side assignor > > mode > > > even on the new rebalance protocol as it has its own Custom > > Assignors(Eager > > > and IncrementalCooperative). > > > > > > Thanks! > > > > > > Sagar. > > > > > > > > > > > > > > > > > > > > > On Fri, Jul 15, 2022 at 5:00 PM David Jacot > > > > > > wrote: > > > > > > > Thanks Hector! Our goal is to move forward with specialized API > > > > instead of relying on one generic API. For Connect, we can apply the > > > > exact same pattern and reuse/share the core implementation on the > > > > server side. For the schema registry, I think that we should consider > > > > having a
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi David, So excited to see the consumer protocol will be renewed! Thanks to David, Guozhang, and Jason for creating this great proposal! Some comments about the protocol: 1. Will the member reconciling jump to the latest target assignment? For example, member epochs are A:1 and B:2, but now the group is in target assignment epoch 4. Will the members jump to the target assignment result in epoch 4 directly, or should they first sync to epoch 2, then 3, and 4? I tried to find out if there's any problem if we jump to 4, but I couldn't find any. So, I think we'll go to epoch 4 directly, right? 2. For the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment API, when the member epoch doesn't match, it'll retry and hope that in the next heartbeat response the member epoch will be bumped. But in ConsumerGroupHeartbeat, upon receiving the FENCED_MEMBER_EPOCH error, the consumer abandons all its partitions and rejoins with the same member id and the epoch 0, unless the partitions owned by the member are a subset of the target partitions. I'm wondering why we can't return the latest member epoch to the member in the ConsumerGroupHeartbeat response no matter whether the owned partitions are a subset of the target partitions or not. After all, the member is still one of the group members, and the assignment for the member is still valid, right? This way, the ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment APIs can piggyback on it to retry in the next heartbeat response. WDYT? 3. When receiving the COMPUTE_ASSIGNMENT error in the ConsumerGroupHeartbeat API, the consumer starts the assignment process. But what if somehow the member didn't send out a ConsumerGroupPrepareAssignment request, what would we do? I think we'll wait for the rebalance timeout and kick the member out of the group, right? And then, what will the group coordinator do? I think it'll select another member to be the leader and return the COMPUTE_ASSIGNMENT error in that member's heartbeat response, right? Maybe we should add that to the KIP.
Some typos: 4. In "Assignment Process" section: If the selected assignor does exist, the group coordinator will reject the heartbeat with an UNSUPPORTED_ASSIGNOR error. -> I think it should be "does not exist" 5. In "JoinGroup Handling" section: If the member has revoked all its partitions or the required partitions, the member can transition to its next epoch. The current assignment become the current assignment. -> I think it should be "The target assignment becomes the current assignment", right? 6. In "ConsumerGroupDescribe API" section: When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest request: -> It should be "handle a ConsumerGroupDescribe request" Again, thanks for the great proposal! It has considered multiple edge cases. Thank you. Luke On Wed, Jul 20, 2022 at 1:48 PM Sagar wrote: > Hi David, > > Thank you for your response. The reason I thought connect can also fit into > this new scheme is that even today the connect uses a WorkerCoordinator > extending from AbstractCoordinator to empower rebalances of > tasks/connectors. The WorkerCoordinator sets the protocolType() to connect > and uses the metadata() method by plumbing into JoinGroupRequestProtocol. > > I think the changes to support connect would be similar at a high level to > the changes in streams mainly because of the Client side assignors being > used in both. At an implementation level, we might need to make a lot of > changes to get onto this new assignment protocol like enhancing the > JoinGroup request/response and SyncGroup and using ConsumerGroupHeartbeat > API etc again on similar lines to streams (or there might be deviations). I > would try to perform a detailed analysis of the same and we can have a > separate discussion thread for that as that would derail this discussion > thread. Let me know if that sounds good to you. > > Thanks! > Sagar. > > > > On Fri, Jul 15, 2022 at 5:47 PM David Jacot > wrote: > > > Hi Sagar, > > > > Thanks for your comments. 
> > > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. > > > > 2) The idea is to transition C from his current assignment to his > > target assignment when he can move to epoch 3. When that happens, the > > member assignment is updated and persisted with all its assigned > > partitions even if they are not all revoked yet. In other words, the > > member assignment becomes the target assignment. This is basically an > > optimization to avoid having to write all the changes to the log. The > > examples are based on the persisted state so I understand the > > confusion. Let me see if I can improve this in the description. > > > > 3) Regarding Connect, it could reuse the protocol with a client side > > assignor if it fits in the protocol. The assignment is about > > topicid-partitions + metadata, could Connect fit into this? > > > > Best, > > David > > > > On Fri, Jul 15, 2022 at 1:55 PM Sagar wrote: > > > > > > Hi David, > > > > > > Thanks for the KIP. I just had minor
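Luke's question 1 above (jumping straight to the latest target epoch) comes down to a very simple client-side accept rule, per Guozhang's later answer: the member is "dumb" and adopts any assignment whose epoch is no smaller than its own. A minimal sketch, with illustrative names not taken from the KIP:

```python
# Client-side rule: adopt an assignment iff its epoch is >= the member's
# current epoch, so the member can jump from epoch 1 straight to epoch 4
# without stepping through 2 and 3.

def maybe_adopt(current_epoch, current_partitions, assignment_epoch, assignment):
    """Return the (epoch, partitions) the member holds after a heartbeat;
    assignments with an older epoch are ignored as stale."""
    if assignment_epoch < current_epoch:
        return current_epoch, set(current_partitions)
    return assignment_epoch, set(assignment)

# A member at epoch 1 receives the epoch-4 target directly.
jumped = maybe_adopt(1, {"foo-0"}, 4, {"foo-1", "foo-2"})
# A stale epoch-0 assignment arriving out of order is ignored.
stale = maybe_adopt(4, {"foo-1"}, 0, {"foo-9"})
```

The broker decides which delta to send and which epochs to skip; the client only applies this check.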
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi David, Thank you for your response. The reason I thought Connect can also fit into this new scheme is that even today Connect uses a WorkerCoordinator extending from AbstractCoordinator to empower rebalances of tasks/connectors. The WorkerCoordinator sets the protocolType() to connect and uses the metadata() method by plumbing into JoinGroupRequestProtocol. I think the changes to support Connect would be similar at a high level to the changes in Streams, mainly because of the client-side assignors being used in both. At an implementation level, we might need to make a lot of changes to get onto this new assignment protocol, like enhancing the JoinGroup request/response and SyncGroup and using the ConsumerGroupHeartbeat API etc., again along similar lines to Streams (or there might be deviations). I will try to perform a detailed analysis of this, and we can have a separate discussion thread for it, as it would derail this one. Let me know if that sounds good to you. Thanks! Sagar. On Fri, Jul 15, 2022 at 5:47 PM David Jacot wrote: > Hi Sagar, > > Thanks for your comments. > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. > > 2) The idea is to transition C from his current assignment to his > target assignment when he can move to epoch 3. When that happens, the > member assignment is updated and persisted with all its assigned > partitions even if they are not all revoked yet. In other words, the > member assignment becomes the target assignment. This is basically an > optimization to avoid having to write all the changes to the log. The > examples are based on the persisted state so I understand the > confusion. Let me see if I can improve this in the description. > > 3) Regarding Connect, it could reuse the protocol with a client side > assignor if it fits in the protocol. The assignment is about > topicid-partitions + metadata, could Connect fit into this?
> > Best, > David > > On Fri, Jul 15, 2022 at 1:55 PM Sagar wrote: > > > > Hi David, > > > > Thanks for the KIP. I just had minor observations: > > > > 1) In the Assignment Error section in Client Side mode Assignment > process, > > you mentioned => `In this case, the client side assignor can return an > > error to the group coordinator`. In this case are you referring to the > > Assignor returning an AssignmentError that's listed down towards the end? > > If yes, do you think it would make sense to mention this explicitly here? > > > > 2) In the Case Studies section, I have a slight confusion, not sure if > > others have the same. Consider this step: > > > > When B heartbeats, the group coordinator transitions him to epoch 3 > because > > B has no partitions to revoke. It persists the change and reply. > > > >- Group (epoch=3) > > - A > > - B > > - C > >- Target Assignment (epoch=3) > > - A - partitions=[foo-0] > > - B - partitions=[foo-2] > > - C - partitions=[foo-1] > >- Member Assignment > > - A - epoch=2, partitions=[foo-0, foo-1] > > - B - epoch=3, partitions=[foo-2] > > - C - epoch=3, partitions=[foo-1] > > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. > > > > Here,it's mentioned that member C can't get the foo-1 partition yet, but > > based on the description above, it seems it already has it. Do you think > it > > would be better to remove it and populate it only when it actually gets > it? > > I see this in a lot of other places, so have I understood it incorrectly > ? > > > > > > Regarding connect , it might be out of scope of this discussion, but from > > what I understood it would probably be running in client side assignor > mode > > even on the new rebalance protocol as it has its own Custom > Assignors(Eager > > and IncrementalCooperative). > > > > Thanks! > > > > Sagar. > > > > > > > > > > > > > > On Fri, Jul 15, 2022 at 5:00 PM David Jacot > > > wrote: > > > > > Thanks Hector! 
Our goal is to move forward with specialized API > > > instead of relying on one generic API. For Connect, we can apply the > > > exact same pattern and reuse/share the core implementation on the > > > server side. For the schema registry, I think that we should consider > > > having a tailored API to do simple membership/leader election. > > > > > > Best, > > > David > > > > > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma > wrote: > > > > > > > > Three quick comments: > > > > > > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we > > > should > > > > document the differences in more detail before deciding one way or > > > another. > > > > That said, if people pass java.util.regex.Pattern, they expect their > > > > semantics to be honored. If we are doing something different, then we > > > > should consider adding an overload with our own Pattern class (I > don't > > > > think we'd want to expose re2j's at this point). > > > > 2. Regarding topic ids, any major new protocol should integrate fully >
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
I'll be away from July 18th to August 8th with limited access to my emails so I will address new comments and questions when I come back. Cheers, David On Fri, Jul 15, 2022 at 2:16 PM David Jacot wrote: > > Hi Sagar, > > Thanks for your comments. > > 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. > > 2) The idea is to transition C from his current assignment to his > target assignment when he can move to epoch 3. When that happens, the > member assignment is updated and persisted with all its assigned > partitions even if they are not all revoked yet. In other words, the > member assignment becomes the target assignment. This is basically an > optimization to avoid having to write all the changes to the log. The > examples are based on the persisted state so I understand the > confusion. Let me see if I can improve this in the description. > > 3) Regarding Connect, it could reuse the protocol with a client side > assignor if it fits in the protocol. The assignment is about > topicid-partitions + metadata, could Connect fit into this? > > Best, > David > > On Fri, Jul 15, 2022 at 1:55 PM Sagar wrote: > > > > Hi David, > > > > Thanks for the KIP. I just had minor observations: > > > > 1) In the Assignment Error section in Client Side mode Assignment process, > > you mentioned => `In this case, the client side assignor can return an > > error to the group coordinator`. In this case are you referring to the > > Assignor returning an AssignmentError that's listed down towards the end? > > If yes, do you think it would make sense to mention this explicitly here? > > > > 2) In the Case Studies section, I have a slight confusion, not sure if > > others have the same. Consider this step: > > > > When B heartbeats, the group coordinator transitions him to epoch 3 because > > B has no partitions to revoke. It persists the change and reply. 
> > > >- Group (epoch=3) > > - A > > - B > > - C > >- Target Assignment (epoch=3) > > - A - partitions=[foo-0] > > - B - partitions=[foo-2] > > - C - partitions=[foo-1] > >- Member Assignment > > - A - epoch=2, partitions=[foo-0, foo-1] > > - B - epoch=3, partitions=[foo-2] > > - C - epoch=3, partitions=[foo-1] > > > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. > > > > Here,it's mentioned that member C can't get the foo-1 partition yet, but > > based on the description above, it seems it already has it. Do you think it > > would be better to remove it and populate it only when it actually gets it? > > I see this in a lot of other places, so have I understood it incorrectly ? > > > > > > Regarding connect , it might be out of scope of this discussion, but from > > what I understood it would probably be running in client side assignor mode > > even on the new rebalance protocol as it has its own Custom Assignors(Eager > > and IncrementalCooperative). > > > > Thanks! > > > > Sagar. > > > > > > > > > > > > > > On Fri, Jul 15, 2022 at 5:00 PM David Jacot > > wrote: > > > > > Thanks Hector! Our goal is to move forward with specialized API > > > instead of relying on one generic API. For Connect, we can apply the > > > exact same pattern and reuse/share the core implementation on the > > > server side. For the schema registry, I think that we should consider > > > having a tailored API to do simple membership/leader election. > > > > > > Best, > > > David > > > > > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma wrote: > > > > > > > > Three quick comments: > > > > > > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we > > > should > > > > document the differences in more detail before deciding one way or > > > another. > > > > That said, if people pass java.util.regex.Pattern, they expect their > > > > semantics to be honored. 
If we are doing something different, then we > > > > should consider adding an overload with our own Pattern class (I don't > > > > think we'd want to expose re2j's at this point). > > > > 2. Regarding topic ids, any major new protocol should integrate fully > > > with > > > > it and should handle the topic recreation case correctly. That's the > > > > main > > > > part we need to handle. I agree with David that we'd want to add topic > > > ids > > > > to the relevant protocols that don't have it yet and that we can > > > > probably > > > > focus on the internals versus adding new APIs to the Java Consumer > > > (unless > > > > we find that adding new APIs is required for reasonable semantics). > > > > 3. I am still not sure about the coordinator storing the configs. It's > > > > powerful for configs to be centralized in the metadata log for various > > > > reasons (auditability, visibility, consistency, etc.). Similarly, I am > > > not > > > > sure about automatically deleting configs in a way that they cannot be > > > > recovered. A good property for modern systems is to minimize the number > > > of > > > > unrecoverable
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Sagar, Thanks for your comments. 1) Yes. That refers to `Assignment#error`. Sure, I can mention it. 2) The idea is to transition C from his current assignment to his target assignment when he can move to epoch 3. When that happens, the member assignment is updated and persisted with all its assigned partitions even if they are not all revoked yet. In other words, the member assignment becomes the target assignment. This is basically an optimization to avoid having to write all the changes to the log. The examples are based on the persisted state so I understand the confusion. Let me see if I can improve this in the description. 3) Regarding Connect, it could reuse the protocol with a client side assignor if it fits in the protocol. The assignment is about topicid-partitions + metadata, could Connect fit into this? Best, David On Fri, Jul 15, 2022 at 1:55 PM Sagar wrote: > > Hi David, > > Thanks for the KIP. I just had minor observations: > > 1) In the Assignment Error section in Client Side mode Assignment process, > you mentioned => `In this case, the client side assignor can return an > error to the group coordinator`. In this case are you referring to the > Assignor returning an AssignmentError that's listed down towards the end? > If yes, do you think it would make sense to mention this explicitly here? > > 2) In the Case Studies section, I have a slight confusion, not sure if > others have the same. Consider this step: > > When B heartbeats, the group coordinator transitions him to epoch 3 because > B has no partitions to revoke. It persists the change and reply. > >- Group (epoch=3) > - A > - B > - C >- Target Assignment (epoch=3) > - A - partitions=[foo-0] > - B - partitions=[foo-2] > - C - partitions=[foo-1] >- Member Assignment > - A - epoch=2, partitions=[foo-0, foo-1] > - B - epoch=3, partitions=[foo-2] > - C - epoch=3, partitions=[foo-1] > > When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. 
> > Here,it's mentioned that member C can't get the foo-1 partition yet, but > based on the description above, it seems it already has it. Do you think it > would be better to remove it and populate it only when it actually gets it? > I see this in a lot of other places, so have I understood it incorrectly ? > > > Regarding connect , it might be out of scope of this discussion, but from > what I understood it would probably be running in client side assignor mode > even on the new rebalance protocol as it has its own Custom Assignors(Eager > and IncrementalCooperative). > > Thanks! > > Sagar. > > > > > > > On Fri, Jul 15, 2022 at 5:00 PM David Jacot > wrote: > > > Thanks Hector! Our goal is to move forward with specialized API > > instead of relying on one generic API. For Connect, we can apply the > > exact same pattern and reuse/share the core implementation on the > > server side. For the schema registry, I think that we should consider > > having a tailored API to do simple membership/leader election. > > > > Best, > > David > > > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma wrote: > > > > > > Three quick comments: > > > > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we > > should > > > document the differences in more detail before deciding one way or > > another. > > > That said, if people pass java.util.regex.Pattern, they expect their > > > semantics to be honored. If we are doing something different, then we > > > should consider adding an overload with our own Pattern class (I don't > > > think we'd want to expose re2j's at this point). > > > 2. Regarding topic ids, any major new protocol should integrate fully > > with > > > it and should handle the topic recreation case correctly. That's the main > > > part we need to handle. 
I agree with David that we'd want to add topic > > ids > > > to the relevant protocols that don't have it yet and that we can probably > > > focus on the internals versus adding new APIs to the Java Consumer > > (unless > > > we find that adding new APIs is required for reasonable semantics). > > > 3. I am still not sure about the coordinator storing the configs. It's > > > powerful for configs to be centralized in the metadata log for various > > > reasons (auditability, visibility, consistency, etc.). Similarly, I am > > not > > > sure about automatically deleting configs in a way that they cannot be > > > recovered. A good property for modern systems is to minimize the number > > of > > > unrecoverable data loss scenarios. > > > > > > Ismael > > > > > > On Wed, Jul 13, 2022 at 3:47 PM David Jacot > > > > > wrote: > > > > > > > Thanks Guozhang. My answers are below: > > > > > > > > > 1) the migration path, especially the last step when clients flip the > > > > flag > > > > > to enable the new protocol, in which we would have a window where > > both > > > > new > > > > > protocols / rpcs and old protocols / rpcs are used by members of the > > same >
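David's answer above — the persisted member assignment becomes the full target assignment as soon as the member advances to the target epoch, while a partition is only handed out once every member still on an older epoch has revoked it — can be sketched as follows. This is a minimal illustration with assumed names, not the KIP's actual schema or code:

```python
# Sketch (assumed names) of the reconciliation rule described above: C's
# persisted assignment already contains foo-1 at epoch 3, but foo-1 is not
# delivered while A, still at epoch 2, has not revoked it.
from dataclasses import dataclass, field

GROUP_EPOCH = 3
TARGET = {"A": {"foo-0"}, "B": {"foo-2"}, "C": {"foo-1"}}

@dataclass
class Member:
    epoch: int
    assigned: set = field(default_factory=set)  # persisted member assignment

members = {
    "A": Member(epoch=2, assigned={"foo-0", "foo-1"}),  # foo-1 not revoked yet
    "B": Member(epoch=3, assigned={"foo-2"}),
    "C": Member(epoch=3, assigned={"foo-1"}),  # persisted but not delivered
}

def deliverable(member_id):
    """Target partitions the member may actually own right now."""
    pending = set()
    for other_id, other in members.items():
        if other_id != member_id and other.epoch < GROUP_EPOCH:
            pending |= other.assigned & TARGET[member_id]
    return TARGET[member_id] - pending

assert deliverable("C") == set()                    # foo-1 withheld while A owns it
members["A"] = Member(epoch=3, assigned={"foo-0"})  # A revokes foo-1, moves to epoch 3
assert deliverable("C") == {"foo-1"}                # now C can take it
```

This is exactly Sagar's point of confusion: the example tables show the *persisted* state, not the partitions the member currently holds.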
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks, Ismael. > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should > document the differences in more detail before deciding one way or another. > That said, if people pass java.util.regex.Pattern, they expect their > semantics to be honored. If we are doing something different, then we > should consider adding an overload with our own Pattern class (I don't > think we'd want to expose re2j's at this point). Yeah. I do agree with you. I looked a bit more into re2j and the differences are subtle. Simple regexes will work with both but special constructs or character classes won't work. I like the idea of having our own Pattern. That would make things really clear and avoid compatibility issues. I do agree that we should not expose re2j's Pattern in our public API. > 2. Regarding topic ids, any major new protocol should integrate fully with > it and should handle the topic recreation case correctly. That's the main > part we need to handle. I agree with David that we'd want to add topic ids > to the relevant protocols that don't have it yet and that we can probably > focus on the internals versus adding new APIs to the Java Consumer (unless > we find that adding new APIs is required for reasonable semantics). I wonder what we should do with the admin client though. We have a couple of apis related to offsets there. Do we want to allow users to fetch, get or delete offsets based on topic ids? On the server side, we will have to store the offsets based on the topic id. When a topic is recreated, we will likely have offsets stored with both the old topic id and the new topic id. From an admin perspective, we can't really differentiate them without letting the user use the topic id and providing the topic ids to him as well. One way would be to disambiguate them based on the group current assignment. I need to think a little more about this. > 3. I am still not sure about the coordinator storing the configs. 
It's > powerful for configs to be centralized in the metadata log for various > reasons (auditability, visibility, consistency, etc.). Similarly, I am not > sure about automatically deleting configs in a way that they cannot be > recovered. A good property for modern systems is to minimize the number of > unrecoverable data loss scenarios. I see your point. One concern that I have regarding storing them in the controller is that they will never be deleted unless the user does it. As users treat groups as throwaway resources, that will never happen. If we draw a parallel with topics, we do delete their configs when they are deleted. It is also worth pointing out that offsets are deleted when the group is expired, and that is even worse than deleting configs in my opinion. One way would be to store configs in the controller and to let the coordinator delete them when a group is deleted. I suppose that the problem is the same because the configs are also automatically deleted. Things would be different if groups were a first-class resource in the cluster. Best, David On Fri, Jul 15, 2022 at 1:30 PM David Jacot wrote: > > Thanks Hector! Our goal is to move forward with specialized API > instead of relying on one generic API. For Connect, we can apply the > exact same pattern and reuse/share the core implementation on the > server side. For the schema registry, I think that we should consider > having a tailored API to do simple membership/leader election. > > Best, > David > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma wrote: > > > > Three quick comments: > > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should > > document the differences in more detail before deciding one way or another. > > That said, if people pass java.util.regex.Pattern, they expect their > > semantics to be honored. 
If we are doing something different, then we > > should consider adding an overload with our own Pattern class (I don't > > think we'd want to expose re2j's at this point). > > 2. Regarding topic ids, any major new protocol should integrate fully with > > it and should handle the topic recreation case correctly. That's the main > > part we need to handle. I agree with David that we'd want to add topic ids > > to the relevant protocols that don't have it yet and that we can probably > > focus on the internals versus adding new APIs to the Java Consumer (unless > > we find that adding new APIs is required for reasonable semantics). > > 3. I am still not sure about the coordinator storing the configs. It's > > powerful for configs to be centralized in the metadata log for various > > reasons (auditability, visibility, consistency, etc.). Similarly, I am not > > sure about automatically deleting configs in a way that they cannot be > > recovered. A good property for modern systems is to minimize the number of > > unrecoverable data loss scenarios. > > > > Ismael > > > > On Wed, Jul 13, 2022 at 3:47 PM David Jacot > > wrote: > > > > > Thanks Guozhang. My answers
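The engine gap David describes above — simple regexes behave identically, while backtracking-only constructs do not — can be illustrated with a hedged example. Python's `re` is used here only because it, like `java.util.regex`, is a backtracking engine; the comparison with RE2-family engines (such as `com.google.re2j`) is stated in comments rather than executed:

```python
import re

# Backreferences compile in backtracking engines such as java.util.regex
# (and Python's re, used here for illustration). RE2-family engines like
# com.google.re2j reject them at compile time, which is the kind of subtle
# incompatibility discussed above.
backref = re.compile(r"(topic-\w+)\.\1")  # \1 is a backreference
assert backref.match("topic-a.topic-a")

# Plain alternation and character classes are portable to both families.
simple = re.compile(r"foo\..*|bar-[0-9]+")
assert simple.match("bar-42")
assert not simple.match("baz")
```

A server-side engine would therefore accept the second pattern from any client but would have to reject the first, which is why documenting the differences (or introducing a Kafka-owned Pattern class) matters.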
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi David, Thanks for the KIP. I just had minor observations: 1) In the Assignment Error section in Client Side mode Assignment process, you mentioned => `In this case, the client side assignor can return an error to the group coordinator`. In this case are you referring to the Assignor returning an AssignmentError that's listed down towards the end? If yes, do you think it would make sense to mention this explicitly here? 2) In the Case Studies section, I have a slight confusion, not sure if others have the same. Consider this step: When B heartbeats, the group coordinator transitions him to epoch 3 because B has no partitions to revoke. It persists the change and replies.

- Group (epoch=3)
  - A
  - B
  - C
- Target Assignment (epoch=3)
  - A - partitions=[foo-0]
  - B - partitions=[foo-2]
  - C - partitions=[foo-1]
- Member Assignment
  - A - epoch=2, partitions=[foo-0, foo-1]
  - B - epoch=3, partitions=[foo-2]
  - C - epoch=3, partitions=[foo-1]

When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. Here, it's mentioned that member C can't get the foo-1 partition yet, but based on the description above, it seems it already has it. Do you think it would be better to remove it and populate it only when it actually gets it? I see this in a lot of other places, so have I understood it incorrectly? Regarding Connect, it might be out of scope of this discussion, but from what I understood it would probably be running in client-side assignor mode even on the new rebalance protocol as it has its own custom assignors (Eager and IncrementalCooperative). Thanks! Sagar. On Fri, Jul 15, 2022 at 5:00 PM David Jacot wrote: > Thanks Hector! Our goal is to move forward with specialized API > instead of relying on one generic API. For Connect, we can apply the > exact same pattern and reuse/share the core implementation on the > server side. For the schema registry, I think that we should consider > having a tailored API to do simple membership/leader election. 
> > Best, > David > > On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma wrote: > > > > Three quick comments: > > > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we > should > > document the differences in more detail before deciding one way or > another. > > That said, if people pass java.util.regex.Pattern, they expect their > > semantics to be honored. If we are doing something different, then we > > should consider adding an overload with our own Pattern class (I don't > > think we'd want to expose re2j's at this point). > > 2. Regarding topic ids, any major new protocol should integrate fully > with > > it and should handle the topic recreation case correctly. That's the main > > part we need to handle. I agree with David that we'd want to add topic > ids > > to the relevant protocols that don't have it yet and that we can probably > > focus on the internals versus adding new APIs to the Java Consumer > (unless > > we find that adding new APIs is required for reasonable semantics). > > 3. I am still not sure about the coordinator storing the configs. It's > > powerful for configs to be centralized in the metadata log for various > > reasons (auditability, visibility, consistency, etc.). Similarly, I am > not > > sure about automatically deleting configs in a way that they cannot be > > recovered. A good property for modern systems is to minimize the number > of > > unrecoverable data loss scenarios. > > > > Ismael > > > > On Wed, Jul 13, 2022 at 3:47 PM David Jacot > > > wrote: > > > > > Thanks Guozhang. My answers are below: > > > > > > > 1) the migration path, especially the last step when clients flip the > > > flag > > > > to enable the new protocol, in which we would have a window where > both > > > new > > > > protocols / rpcs and old protocols / rpcs are used by members of the > same > > > > group. How the coordinator could "mimic" the old behavior while > using the > > > > new protocol is something we need to present about. 
> > > > > > Noted. I just published a new version of KIP which includes more > > > details about this. See the "Supporting Online Consumer Group Upgrade" > > > and the "Compatibility, Deprecation, and Migration Plan". I think that > > > I have to think through a few cases now but the overall idea and > > > mechanism should be understandable. > > > > > > > 2) the usage of topic ids. So far as KIP-516 the topic ids are only > used > > > as > > > > part of RPCs and admin client, but they are not exposed via any > public > > > APIs > > > > to consumers yet. I think the question is, first should we let the > > > consumer > > > > client to be maintaining the names -> ids mapping itself to fully > > > leverage > > > > on all the augmented existing RPCs and the new RPCs with the topic > ids; > > > and > > > > secondly, should we ever consider exposing the topic ids in the > consumer > > > > public APIs as well (both subscribe/assign, as well as
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks Hector! Our goal is to move forward with specialized API instead of relying on one generic API. For Connect, we can apply the exact same pattern and reuse/share the core implementation on the server side. For the schema registry, I think that we should consider having a tailored API to do simple membership/leader election. Best, David On Fri, Jul 15, 2022 at 10:22 AM Ismael Juma wrote: > > Three quick comments: > > 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should > document the differences in more detail before deciding one way or another. > That said, if people pass java.util.regex.Pattern, they expect their > semantics to be honored. If we are doing something different, then we > should consider adding an overload with our own Pattern class (I don't > think we'd want to expose re2j's at this point). > 2. Regarding topic ids, any major new protocol should integrate fully with > it and should handle the topic recreation case correctly. That's the main > part we need to handle. I agree with David that we'd want to add topic ids > to the relevant protocols that don't have it yet and that we can probably > focus on the internals versus adding new APIs to the Java Consumer (unless > we find that adding new APIs is required for reasonable semantics). > 3. I am still not sure about the coordinator storing the configs. It's > powerful for configs to be centralized in the metadata log for various > reasons (auditability, visibility, consistency, etc.). Similarly, I am not > sure about automatically deleting configs in a way that they cannot be > recovered. A good property for modern systems is to minimize the number of > unrecoverable data loss scenarios. > > Ismael > > On Wed, Jul 13, 2022 at 3:47 PM David Jacot > wrote: > > > Thanks Guozhang. 
My answers are below: > > > > > 1) the migration path, especially the last step when clients flip the > > flag > > > to enable the new protocol, in which we would have a window where both > > new > > > protocols / rpcs and old protocols / rpcs are used by members of the same > > > group. How the coordinator could "mimic" the old behavior while using the > > > new protocol is something we need to present about. > > > > Noted. I just published a new version of KIP which includes more > > details about this. See the "Supporting Online Consumer Group Upgrade" > > and the "Compatibility, Deprecation, and Migration Plan". I think that > > I have to think through a few cases now but the overall idea and > > mechanism should be understandable. > > > > > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used > > as > > > part of RPCs and admin client, but they are not exposed via any public > > APIs > > > to consumers yet. I think the question is, first should we let the > > consumer > > > client to be maintaining the names -> ids mapping itself to fully > > leverage > > > on all the augmented existing RPCs and the new RPCs with the topic ids; > > and > > > secondly, should we ever consider exposing the topic ids in the consumer > > > public APIs as well (both subscribe/assign, as well as in the rebalance > > > listener for cases like topic deletion-and-recreation). > > > > a) Assuming that we would include converting all the offsets related > > RPCs to using topic ids in this KIP, the consumer would be able to > > fully operate with topic ids. That being said, it still has to provide > > the topics names in various APIs so having a mapping in the consumer > > seems inevitable to me. > > b) I don't have a strong opinion on this. Here I wonder if this goes > > beyond the scope of this KIP. I would rather focus on the internals > > here and we can consider this separately if we see value in doing it. 
> > > > Coming back to Ismael's point about using topic ids in the > > ConsumerGroupHeartbeatRequest, I think that there is one advantage in > > favour of it. The consumer will have the opportunity to validate that > > the topics exists before passing them into the group rebalance > > protocol. Obviously, the coordinator will also notice it but it does > > not really have a way to reject an invalid topic in the response. > > > > > I'm agreeing with David on all other minor questions except for the > > > `subscribe(Pattern)` question: personally I think it's not necessary to > > > deprecate the subscribe API with Pattern, but instead we still use > > Pattern > > > while just documenting that our subscription may be rejected by the > > server. > > > Since the incompatible case is a very rare scenario I felt using an > > > overloaded `String` based subscription may be more vulnerable to various > > > invalid regexes. > > > > That could work. I have to look at the differences between the two > > engines to better understand the potential issues. My understanding is > > that would work for all the basic regular expressions. The differences > > between the two are mainly about the various character classes. I > > wonder what other people
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Three quick comments: 1. Regarding java.util.regex.Pattern vs com.google.re2j.Pattern, we should document the differences in more detail before deciding one way or another. That said, if people pass java.util.regex.Pattern, they expect their semantics to be honored. If we are doing something different, then we should consider adding an overload with our own Pattern class (I don't think we'd want to expose re2j's at this point). 2. Regarding topic ids, any major new protocol should integrate fully with it and should handle the topic recreation case correctly. That's the main part we need to handle. I agree with David that we'd want to add topic ids to the relevant protocols that don't have it yet and that we can probably focus on the internals versus adding new APIs to the Java Consumer (unless we find that adding new APIs is required for reasonable semantics). 3. I am still not sure about the coordinator storing the configs. It's powerful for configs to be centralized in the metadata log for various reasons (auditability, visibility, consistency, etc.). Similarly, I am not sure about automatically deleting configs in a way that they cannot be recovered. A good property for modern systems is to minimize the number of unrecoverable data loss scenarios. Ismael On Wed, Jul 13, 2022 at 3:47 PM David Jacot wrote: > Thanks Guozhang. My answers are below: > > > 1) the migration path, especially the last step when clients flip the > flag > > to enable the new protocol, in which we would have a window where both > new > > protocols / rpcs and old protocols / rpcs are used by members of the same > > group. How the coordinator could "mimic" the old behavior while using the > > new protocol is something we need to present about. > > Noted. I just published a new version of KIP which includes more > details about this. See the "Supporting Online Consumer Group Upgrade" > and the "Compatibility, Deprecation, and Migration Plan". 
I think that > I have to think through a few cases now but the overall idea and > mechanism should be understandable. > > > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used > as > > part of RPCs and admin client, but they are not exposed via any public > APIs > > to consumers yet. I think the question is, first should we let the > consumer > > client to be maintaining the names -> ids mapping itself to fully > leverage > > on all the augmented existing RPCs and the new RPCs with the topic ids; > and > > secondly, should we ever consider exposing the topic ids in the consumer > > public APIs as well (both subscribe/assign, as well as in the rebalance > > listener for cases like topic deletion-and-recreation). > > a) Assuming that we would include converting all the offsets related > RPCs to using topic ids in this KIP, the consumer would be able to > fully operate with topic ids. That being said, it still has to provide > the topics names in various APIs so having a mapping in the consumer > seems inevitable to me. > b) I don't have a strong opinion on this. Here I wonder if this goes > beyond the scope of this KIP. I would rather focus on the internals > here and we can consider this separately if we see value in doing it. > > Coming back to Ismael's point about using topic ids in the > ConsumerGroupHeartbeatRequest, I think that there is one advantage in > favour of it. The consumer will have the opportunity to validate that > the topics exists before passing them into the group rebalance > protocol. Obviously, the coordinator will also notice it but it does > not really have a way to reject an invalid topic in the response. > > > I'm agreeing with David on all other minor questions except for the > > `subscribe(Pattern)` question: personally I think it's not necessary to > > deprecate the subscribe API with Pattern, but instead we still use > Pattern > > while just documenting that our subscription may be rejected by the > server. 
> > Since the incompatible case is a very rare scenario I felt using an > > overloaded `String` based subscription may be more vulnerable to various > > invalid regexes. > > That could work. I have to look at the differences between the two > engines to better understand the potential issues. My understanding is > that would work for all the basic regular expressions. The differences > between the two are mainly about the various character classes. I > wonder what other people think about this. > > Best, > David > > On Tue, Jul 12, 2022 at 11:28 PM Guozhang Wang wrote: > > > > Thanks David! I think on the high level there are two meta points we need > > to concretize a bit more: > > > > 1) the migration path, especially the last step when clients flip the > flag > > to enable the new protocol, in which we would have a window where both > new > > protocols / rpcs and old protocols / rpcs are used by members of the same > > group. How the coordinator could "mimic" the old behavior while using the > > new protocol is something we need to
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks Hector! Yes, a templated group "type" with extensible handling logic is part of the motivation for this rebalance protocol. Guozhang On Thu, Jul 14, 2022 at 10:35 AM Hector Geraldino (BLOOMBERG/ 919 3RD A) < hgerald...@bloomberg.net> wrote: > Kudos David, Guozhang, and Jason for putting together such a great > proposal. > > I don't want to hijack the discussion, just wanted to mention that it > would be great if the final design is made extensible enough, so other use > cases (like Kafka Connect, Schema Registry, etc.) can be added later on. > > I can see how the concept of different group "types" in the group > coordinator can be leveraged to support such cases. On KIP-795, I wanted to > add public APIs for the AbstractCoordinator with the intent of formalizing > the use of the Group Membership Protocol for resource management use cases. > I'll probably close this KIP and wait to see what comes out of this > redesign of the protocol. > > Thanks > > - > https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator > > From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00To: > dev@kafka.apache.org > Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance > Protocol > > Hi all, > > I would like to start a discussion thread on KIP-848: The Next > Generation of the Consumer Rebalance Protocol. With this KIP, we aim > to make the rebalance protocol (for consumers) more reliable, more > scalable, easier to implement for clients, and easier to debug for > operators. > > The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. > > Please take a look and let me know what you think. > > Best, > David > > PS: I will be away from July 18th to August 8th. That gives you a bit > of time to read and digest this long KIP. > > > -- Guozhang
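The extensible group "type" idea Guozhang confirms above can be sketched as a simple dispatch table: the coordinator routes each request to a per-type handler, so protocols beyond the consumer group (Connect, leader election, etc.) could plug in later. All names here are illustrative assumptions, not the KIP's actual APIs:

```python
# Hypothetical sketch of per-group-type handling in a coordinator.
handlers = {}

def group_type(name):
    """Register a handler for a given group 'type'."""
    def register(fn):
        handlers[name] = fn
        return fn
    return register

@group_type("consumer")
def consumer_heartbeat(request):
    # Toy logic: advance the member to the current group epoch.
    return {"member_epoch": request["group_epoch"]}

def heartbeat(gtype, request):
    if gtype not in handlers:
        return {"error": "UNSUPPORTED_GROUP_TYPE"}
    return handlers[gtype](request)

assert heartbeat("consumer", {"group_epoch": 3}) == {"member_epoch": 3}
assert heartbeat("connect", {})["error"] == "UNSUPPORTED_GROUP_TYPE"
```

Adding a new use case would then mean registering another handler rather than changing the core protocol, which is the extensibility Hector asked about.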
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Kudos David, Guozhang, and Jason for putting together such a great proposal. I don't want to hijack the discussion, just wanted to mention that it would be great if the final design is made extensible enough, so other use cases (like Kafka Connect, Schema Registry, etc.) can be added later on. I can see how the concept of different group "types" in the group coordinator can be leveraged to support such cases. On KIP-795, I wanted to add public APIs for the AbstractCoordinator with the intent of formalizing the use of the Group Membership Protocol for resource management use cases. I'll probably close this KIP and wait to see what comes out of this redesign of the protocol. Thanks - https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator From: dev@kafka.apache.org At: 07/06/22 04:44:59 UTC-4:00To: dev@kafka.apache.org Subject: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol Hi all, I would like to start a discussion thread on KIP-848: The Next Generation of the Consumer Rebalance Protocol. With this KIP, we aim to make the rebalance protocol (for consumers) more reliable, more scalable, easier to implement for clients, and easier to debug for operators. The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. Please take a look and let me know what you think. Best, David PS: I will be away from July 18th to August 8th. That gives you a bit of time to read and digest this long KIP.
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks Guozhang. My answers are below: > 1) the migration path, especially the last step when clients flip the flag > to enable the new protocol, in which we would have a window where both new > protocols / rpcs and old protocols / rpcs are used by members of the same > group. How the coordinator could "mimic" the old behavior while using the > new protocol is something we need to present about. Noted. I just published a new version of the KIP which includes more details about this. See the "Supporting Online Consumer Group Upgrade" and the "Compatibility, Deprecation, and Migration Plan". I think that I have to think through a few cases now but the overall idea and mechanism should be understandable. > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used as > part of RPCs and admin client, but they are not exposed via any public APIs > to consumers yet. I think the question is, first should we let the consumer > client to be maintaining the names -> ids mapping itself to fully leverage > on all the augmented existing RPCs and the new RPCs with the topic ids; and > secondly, should we ever consider exposing the topic ids in the consumer > public APIs as well (both subscribe/assign, as well as in the rebalance > listener for cases like topic deletion-and-recreation). a) Assuming that we would include converting all the offsets-related RPCs to using topic ids in this KIP, the consumer would be able to fully operate with topic ids. That being said, it still has to provide the topic names in various APIs so having a mapping in the consumer seems inevitable to me. b) I don't have a strong opinion on this. Here I wonder if this goes beyond the scope of this KIP. I would rather focus on the internals here and we can consider this separately if we see value in doing it. Coming back to Ismael's point about using topic ids in the ConsumerGroupHeartbeatRequest, I think that there is one advantage in favour of it. 
The consumer will have the opportunity to validate that the topics exist before passing them into the group rebalance protocol. Obviously, the coordinator will also notice it but it does not really have a way to reject an invalid topic in the response. > I'm agreeing with David on all other minor questions except for the > `subscribe(Pattern)` question: personally I think it's not necessary to > deprecate the subscribe API with Pattern, but instead we still use Pattern > while just documenting that our subscription may be rejected by the server. > Since the incompatible case is a very rare scenario I felt using an > overloaded `String` based subscription may be more vulnerable to various > invalid regexes. That could work. I have to look at the differences between the two engines to better understand the potential issues. My understanding is that would work for all the basic regular expressions. The differences between the two are mainly about the various character classes. I wonder what other people think about this. Best, David On Tue, Jul 12, 2022 at 11:28 PM Guozhang Wang wrote: > > Thanks David! I think on the high level there are two meta points we need > to concretize a bit more: > > 1) the migration path, especially the last step when clients flip the flag > to enable the new protocol, in which we would have a window where both new > protocols / rpcs and old protocols / rpcs are used by members of the same > group. How the coordinator could "mimic" the old behavior while using the > new protocol is something we need to present about. > 2) the usage of topic ids. So far as KIP-516 the topic ids are only used as > part of RPCs and admin client, but they are not exposed via any public APIs > to consumers yet. 
I think the question is, first should we let the consumer > client to be maintaining the names -> ids mapping itself to fully leverage > on all the augmented existing RPCs and the new RPCs with the topic ids; and > secondly, should we ever consider exposing the topic ids in the consumer > public APIs as well (both subscribe/assign, as well as in the rebalance > listener for cases like topic deletion-and-recreation). > > I'm agreeing with David on all other minor questions except for the > `subscribe(Pattern)` question: personally I think it's not necessary to > deprecate the subscribe API with Pattern, but instead we still use Pattern > while just documenting that our subscription may be rejected by the server. > Since the incompatible case is a very rare scenario I felt using an > overloaded `String` based subscription may be more vulnerable to various > invalid regexes. > > > Guozhang > > On Tue, Jul 12, 2022 at 5:23 AM David Jacot > wrote: > > > Hi Ismael, > > > > Thanks for your feedback. Let me answer your questions inline. > > > > > 1. I think it's premature to talk about target versions for deprecation > > and > > > removal of the existing group protocol. Unlike KRaft, this affects a core > > > client protocol and hence deprecation/removal will be heavily
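The topic-recreation concern behind the topic-id discussion above — offsets committed under a deleted topic must not be silently reused for a recreated topic of the same name — can be sketched by keying committed offsets on topic id instead of name. All names here are illustrative, not Kafka's actual storage layout:

```python
import uuid

# Hypothetical sketch: offsets keyed by (group, topic_id, partition), so a
# delete-and-recreate of "foo" yields a fresh id and the old offsets stay
# under the old id instead of applying to the new topic.
topics = {}   # topic name -> topic id
offsets = {}  # (group, topic_id, partition) -> committed offset

def create_topic(name):
    topics[name] = uuid.uuid4()  # recreation yields a fresh id

def commit(group, name, partition, offset):
    offsets[(group, topics[name], partition)] = offset

def fetch(group, name, partition):
    return offsets.get((group, topics[name], partition))

create_topic("foo")
commit("g1", "foo", 0, 42)
old_id = topics["foo"]
create_topic("foo")                       # delete + recreate
assert fetch("g1", "foo", 0) is None      # new id: no offsets yet
assert offsets[("g1", old_id, 0)] == 42   # stale offsets remain under old id
```

This also shows the admin-API question David raises: after recreation, both the old-id and new-id offsets exist server-side, and a name-only API cannot distinguish them.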
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks David! I think on the high level there are two meta points we need to concretize a bit more: 1) the migration path, especially the last step when clients flip the flag to enable the new protocol, in which we would have a window where both new protocols / rpcs and old protocols / rpcs are used by members of the same group. How the coordinator could "mimic" the old behavior while using the new protocol is something we need to present about. 2) the usage of topic ids. So far as KIP-516 the topic ids are only used as part of RPCs and admin client, but they are not exposed via any public APIs to consumers yet. I think the question is, first should we let the consumer client to be maintaining the names -> ids mapping itself to fully leverage on all the augmented existing RPCs and the new RPCs with the topic ids; and secondly, should we ever consider exposing the topic ids in the consumer public APIs as well (both subscribe/assign, as well as in the rebalance listener for cases like topic deletion-and-recreation). I'm agreeing with David on all other minor questions except for the `subscribe(Pattern)` question: personally I think it's not necessary to deprecate the subscribe API with Pattern, but instead we still use Pattern while just documenting that our subscription may be rejected by the server. Since the incompatible case is a very rare scenario I felt using an overloaded `String` based subscription may be more vulnerable to various invalid regexes. Guozhang On Tue, Jul 12, 2022 at 5:23 AM David Jacot wrote: > Hi Ismael, > > Thanks for your feedback. Let me answer your questions inline. > > > 1. I think it's premature to talk about target versions for deprecation > and > > removal of the existing group protocol. Unlike KRaft, this affects a core > > client protocol and hence deprecation/removal will be heavily dependent > on > > how quickly applications migrate to the new protocol. > > That makes sense. I will remove it. > > > 2. 
The KIP says we intend to release this in 4.x, but it wasn't made > clear > > why. If we added that as a way to estimate when we'd deprecate and remove > > the group protocol, I also suggest removing this part. > > Let me explain my reasoning. As explained, I plan to rewrite the group > coordinator in Java while we implement the new protocol. This means > that the internals will be slightly different (e.g. threading model). > Therefore, I wanted to tighten the switch from the old group > coordinator to the new group coordinator to a major release. The > alternative would be to use a flag to do the switch instead of relying > on the software upgrade. > > > 3. We need to flesh out the details of the migration story. It sounds > like > > we're saying we will support online migrations. Is that correct? We > should > > explain this in detail. It could also be done as a separate KIP, if it's > > easier. > > Yes, we will support online migrations for the group. That means that > a group using the old protocol will be able to switch to the new > protocol. > > Let me briefly explain how that will work though. It is basically a > four step process: > > 1. The cluster must be upgraded or rolled to a software supporting the > new group coordinator. Both the old and the new coordinator will > support the old protocol and rely on the same persisted metadata so > they can work together. This point is an offline migration. We cannot > do this one live because it would require shutting down the current > coordinator and starting up the new one and that would cause > unavailabilities. > 2. The cluster's metadata version/IBP must be upgraded to X in order > to enable the new protocol. This cannot be done before 1) is > terminated because the old coordinator doesn't support the new > protocol. > 3. The consumers must be upgraded to a version supporting the online > migration (must have KIP-792). If the consumer is already there. > Nothing must be done at this point. > 4. 
The consumers must be rolled with the feature flag turned on. The > consumer group is automatically converted when the first consumer > using the new protocol joins the group. While the members using the > old protocol are being upgraded, the old protocol is proxied into the > new one. > > Let me clarify all of this in the KIP. > > > 4. I am happy that we are pushing the pattern subscriptions to the > server, > > but it seems like there could be some tricky compatibility issues. Will > we > > have a mechanism for users to detect that they need to update their regex > > before switching to the new protocol? > > I think that I am a bit more optimistic than you on this point. I > believe that the majority of the cases are simple regexes which should > work with the new engine. The coordinator will verify the regex anyway > and reject the consumer if the regex is not valid. Coming back to the > migration path, in the worst case, the first upgraded consumer joining > the group will be rejected. This should be used as the last
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Ismael, Thanks for your feedback. Let me answer your questions inline. > 1. I think it's premature to talk about target versions for deprecation and > removal of the existing group protocol. Unlike KRaft, this affects a core > client protocol and hence deprecation/removal will be heavily dependent on > how quickly applications migrate to the new protocol. That makes sense. I will remove it. > 2. The KIP says we intend to release this in 4.x, but it wasn't made clear > why. If we added that as a way to estimate when we'd deprecate and remove > the group protocol, I also suggest removing this part. Let me explain my reasoning. As explained, I plan to rewrite the group coordinator in Java while we implement the new protocol. This means that the internals will be slightly different (e.g. threading model). Therefore, I wanted to tighten the switch from the old group coordinator to the new group coordinator to a major release. The alternative would be to use a flag to do the switch instead of relying on the software upgrade. > 3. We need to flesh out the details of the migration story. It sounds like > we're saying we will support online migrations. Is that correct? We should > explain this in detail. It could also be done as a separate KIP, if it's > easier. Yes, we will support online migrations for the group. That means that a group using the old protocol will be able to switch to the new protocol. Let me briefly explain how that will work though. It is basically a four step process: 1. The cluster must be upgraded or rolled to a software supporting the new group coordinator. Both the old and the new coordinator will support the old protocol and rely on the same persisted metadata so they can work together. This point is an offline migration. We cannot do this one live because it would require shutting down the current coordinator and starting up the new one and that would cause unavailabilities. 2. 
The cluster's metadata version/IBP must be upgraded to X in order to enable the new protocol. This cannot be done before 1) is completed because the old coordinator doesn't support the new protocol. 3. The consumers must be upgraded to a version supporting the online migration (must have KIP-792). If the consumers are already on such a version, nothing needs to be done at this point. 4. The consumers must be rolled with the feature flag turned on. The consumer group is automatically converted when the first consumer using the new protocol joins the group. While the members using the old protocol are being upgraded, the old protocol is proxied into the new one. Let me clarify all of this in the KIP. > 4. I am happy that we are pushing the pattern subscriptions to the server, > but it seems like there could be some tricky compatibility issues. Will we > have a mechanism for users to detect that they need to update their regex > before switching to the new protocol? I think that I am a bit more optimistic than you on this point. I believe that the majority of the cases are simple regexes which should work with the new engine. The coordinator will verify the regex anyway and reject the consumer if the regex is not valid. Coming back to the migration path, in the worst case, the first upgraded consumer joining the group will be rejected. This should be used as the last defence, I would say. One way for customers to validate their regexes before upgrading their prod would be to test them with another group. For instance, that could be done in a pre-prod environment. Another way would be to extend the consumer-group tool to provide a regex validation mechanism. Would this be enough in your opinion? > 5. Related to the last question, will the Java client allow the users to > stick with the current regex engine for compatibility reasons?
For example, > it may be handy to keep using client based regex at first to keep > migrations simple and then migrate to server based regexes as a second step. I understand your point but I am concerned that this would allow users to actually stay in this mode. That would go against our goal of simplifying the client because we would have to continue monitoring the metadata on the client side. I would rather not do this. > 6. When we say that the group coordinator will be responsible for storing > the configurations and that the configurations will be deleted when the > group is deleted. Will a transition to DEAD trigger deletion of > configurations? That's right. The configurations will be deleted when the group is deleted. They go together. > 7. Will the choice to store the configs in the group coordinator make it > harder to list all cluster configs and their values? I don't think so. The group configurations are overrides of cluster configs. If you want to know all the overrides though, you would have to ask all the group coordinators. You cannot rely on the metadata log for instance. > 8. How would someone configure a group before starting the consumers? Have > we considered
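David suggests above that users could validate their subscription regexes before upgrading, either in a pre-prod environment or via an extended consumer-group tool. As a rough sketch of the kind of pre-flight check such a tool could perform (the class and method names here are hypothetical, and note that the server-side engine proposed in the KIP may accept a slightly different dialect than `java.util.regex`, so a local compile is a necessary but not sufficient check):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical pre-flight check: compile each subscription regex locally
// before enabling the new protocol. A pattern that fails to compile here
// would certainly be rejected by the coordinator; one that compiles may
// still need review if the server-side engine uses a different dialect.
public class RegexPreflight {
    public static List<String> findInvalid(List<String> regexes) {
        List<String> invalid = new ArrayList<>();
        for (String r : regexes) {
            try {
                Pattern.compile(r);
            } catch (PatternSyntaxException e) {
                invalid.add(r);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        List<String> subs = List.of("metrics\\..*", "orders-[0-9]+", "bad-[regex");
        System.out.println(findInvalid(subs)); // prints [bad-[regex]
    }
}
```

Running this against a group's subscriptions would surface syntactically broken patterns before the first upgraded consumer is rejected by the coordinator.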
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Justine, Thanks for your comments. Please find my answers below. - Yes, the new protocol relies on topic IDs with the exception of the topic names used in the ConsumerGroupHeartbeatRequest. I am not sure if using topic names is the right call here. I need to think about it a little more. Obviously, the KIP does not change the fetch/commit offsets RPCs to use topic IDs. This may be something that we should include though as it would give a better overall guarantee in the producer. - You're right. I think that I should not have mentioned this flag at all. I will remove it. We can use an internal configuration while developing the feature. - Both cluster types will be supported. The change is orthogonal. The only requirement is that the cluster uses topic IDs. Best, David On Mon, Jul 11, 2022 at 9:53 PM Guozhang Wang wrote: > > Hi Ismael, > > Thanks for the feedback. Here are some replies inlined below: > > On Sat, Jul 9, 2022 at 2:53 AM Ismael Juma wrote: > > > Thanks for the KIP. This has the potential to be a great improvement. A few > > initial questions/comments: > > > > 1. I think it's premature to talk about target versions for deprecation and > > removal of the existing group protocol. Unlike KRaft, this affects a core > > client protocol and hence deprecation/removal will be heavily dependent on > > how quickly applications migrate to the new protocol. > > > > Yeah I agree with you. I think we can remove the proposed timeline in the > `Compatibility, Deprecation, and Migration Plan` and instead just state > that we will decide in the future about when we would deprecate old > protocol and behaviors. > > > > 2. The KIP says we intend to release this in 4.x, but it wasn't made clear > > why. If we added that as a way to estimate when we'd deprecate and remove > > the group protocol, I also suggest removing this part. > > > > I think that's not specifically related to the deprecation/removal timeline > plan, but it's more for client upgrades. I.e.
the broker-side > implementation may be done first, and then the client side, and we would > only mark it as "released" by the time clients implementations are done. At > that time, to enable the feature the clients need to first swap-in the > bytecode with a rolling bounce and then set the flag with a second rolling > bounce, and hence we feel it's better to be released in a major version. > > > > 3. We need to flesh out the details of the migration story. It sounds like > > we're saying we will support online migrations. Is that correct? We should > > explain this in detail. It could also be done as a separate KIP, if it's > > easier. > > > > Yes I think that's the part we can be more concrete about for sure (and > this is related to your question 2) above). We will work on making it more > explicit in parallel as we solicit more feedback. > > > > 4. I am happy that we are pushing the pattern subscriptions to the server, > > but it seems like there could be some tricky compatibility issues. Will we > > have a mechanism for users to detect that they need to update their regex > > before switching to the new protocol? > > > > Yes I think we need some tooling for non-java client users to sort of > "dry-run" the client before switching to the new protocol. I do not have a > specific idea on top of my head though, maybe others like @Matt Howlett can > chime-in here? > > > > 5. Related to the last question, will the Java client allow the users to > > stick with the current regex engine for compatibility reasons? For example, > > it may be handy to keep using client based regex at first to keep > > migrations simple and then migrate to server based regexes as a second > > step. > > > > Honestly I have not thought about that for java clients, and we can discuss > that. What kind of compatibility issues do you have in mind? > > > > 6. 
When we say that the group coordinator will be responsible for storing > > the configurations and that the configurations will be deleted when the > > group is deleted. Will a transition to DEAD trigger deletion of > > configurations? > > > > Yes, since the DEAD state is an ending state (we would only transit to that > state when the group is EMPTY and also all of its metadata are gone), once > it's transited to DEAD this group would never be revived. > > > > 7. Will the choice to store the configs in the group coordinator make it > > harder to list all cluster configs and their values? > > > > That's a good question, and our thoughts are that the so-called "group > configurations" are overrides of the cluster-level configurations > customized per group so when an admin list cluster configs it's okay to > list just the cluster-level "defaults", not showing any per-group > customizations. > > > > 8. How would someone configure a group before starting the consumers? Have > > we considered allowing the explicit creation of groups? Alternatively, the > > configs could be decoupled from the group lifecycle. > > > > The configs can
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Ismael, Thanks for the feedback. Here are some replies inlined below: On Sat, Jul 9, 2022 at 2:53 AM Ismael Juma wrote: > Thanks for the KIP. This has the potential to be a great improvement. A few > initial questions/comments: > > 1. I think it's premature to talk about target versions for deprecation and > removal of the existing group protocol. Unlike KRaft, this affects a core > client protocol and hence deprecation/removal will be heavily dependent on > how quickly applications migrate to the new protocol. > Yeah I agree with you. I think we can remove the proposed timeline in the `Compatibility, Deprecation, and Migration Plan` and instead just state that we will decide in the future about when we would deprecate the old protocol and behaviors. > 2. The KIP says we intend to release this in 4.x, but it wasn't made clear > why. If we added that as a way to estimate when we'd deprecate and remove > the group protocol, I also suggest removing this part. > I think that's not specifically related to the deprecation/removal timeline plan, but it's more for client upgrades. I.e. the broker-side implementation may be done first, and then the client side, and we would only mark it as "released" by the time the client implementations are done. At that time, to enable the feature the clients need to first swap in the bytecode with a rolling bounce and then set the flag with a second rolling bounce, and hence we feel it's better to be released in a major version. > 3. We need to flesh out the details of the migration story. It sounds like > we're saying we will support online migrations. Is that correct? We should > explain this in detail. It could also be done as a separate KIP, if it's > easier. > Yes I think that's the part we can be more concrete about for sure (and this is related to your question 2) above). We will work on making it more explicit in parallel as we solicit more feedback. > 4.
I am happy that we are pushing the pattern subscriptions to the server, > but it seems like there could be some tricky compatibility issues. Will we > have a mechanism for users to detect that they need to update their regex > before switching to the new protocol? > Yes I think we need some tooling for non-java client users to sort of "dry-run" the client before switching to the new protocol. I do not have a specific idea off the top of my head though, maybe others like @Matt Howlett can chime in here? > 5. Related to the last question, will the Java client allow the users to > stick with the current regex engine for compatibility reasons? For example, > it may be handy to keep using client based regex at first to keep > migrations simple and then migrate to server based regexes as a second > step. > Honestly I have not thought about that for java clients, and we can discuss that. What kind of compatibility issues do you have in mind? > 6. When we say that the group coordinator will be responsible for storing > the configurations and that the configurations will be deleted when the > group is deleted. Will a transition to DEAD trigger deletion of > configurations? > Yes, since the DEAD state is an ending state (we would only transition to that state when the group is EMPTY and also all of its metadata are gone), once it has transitioned to DEAD this group would never be revived. > 7. Will the choice to store the configs in the group coordinator make it > harder to list all cluster configs and their values? > That's a good question, and our thoughts are that the so-called "group configurations" are overrides of the cluster-level configurations customized per group, so when an admin lists cluster configs it's okay to list just the cluster-level "defaults", not showing any per-group customizations. > 8. How would someone configure a group before starting the consumers? Have > we considered allowing the explicit creation of groups?
Alternatively, the > configs could be decoupled from the group lifecycle. > The configs can be created before the group itself as an independent entity --- of course, this requires the corresponding request to be routed to the right coordinator based on the group id --- the only thing that differs is, when the group itself is gone we also check if there are any configuration entities related to that group and delete them as well. Admittedly this indeed introduces an asymmetry on the creation / deletion lifecycles of the config entities, and we would like to hear everyone's thoughts on whether we should aim for symmetry, i.e. totally decouple group configs and hence not delete them at all when the group is gone, but always require explicit deletion operations by themselves. > 9. Will the Consumer.subscribe method for the Java client still take a > `java.util.regex.Pattern` or do we have to introduce an overload? > I think we do not need to introduce an overload, but I'm all ears if there may be some compatibility issues that we may overlook. > 10. I agree with Justine that we should be
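The asymmetric lifecycle Guozhang describes above, where group configs can be created before the group exists but are cascaded away when the group is deleted, can be illustrated with a toy model (all class and method names here are invented for illustration, not the KIP's actual coordinator internals):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the lifecycle described above: group config overrides can be
// created for a group id that has no members yet, but deleting the group
// cascades to its configs. Names here are invented for illustration.
public class GroupConfigStore {
    private final Map<String, Map<String, String>> configsByGroup = new HashMap<>();

    // Configs may be set before the group itself exists.
    public void setConfig(String groupId, String key, String value) {
        configsByGroup.computeIfAbsent(groupId, id -> new HashMap<>()).put(key, value);
    }

    public String getConfig(String groupId, String key) {
        return configsByGroup.getOrDefault(groupId, Map.of()).get(key);
    }

    // When the group transitions to DEAD and is deleted,
    // its config overrides go with it.
    public void onGroupDeleted(String groupId) {
        configsByGroup.remove(groupId);
    }
}
```

The alternative floated in the thread, fully decoupling configs from the group lifecycle, would simply drop the cascade in `onGroupDeleted` and require an explicit config-deletion operation instead.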
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi Justine, Thanks for sharing your feedback! Here are some thoughts inlined below: On Fri, Jul 8, 2022 at 11:33 AM Justine Olshan wrote: > Hi David, > Thanks for sharing this KIP! Really exciting to hear how we are changing > the protocol! The motivation section really made me realize how useful this > change will be. > > I've done a first pass of the KIP, and may have more questions, but thought > I'd start with a few I thought of already. > >- I saw some usages of topic IDs in the new >protocols/records/interfaces, but wasn't sure if they were used > everywhere. >Are you planning on relying on topic IDs for the new protocol? > Yes we do plan to use topic IDs in the newly introduced RPC protocols (e.g. the new heartbeat req/resp), while keeping the status quo for the existing protocols (e.g. the offset fetch / commit req/resp) that are already encoded with topic names. >- I saw the section about using a feature flag first before integrating >the feature with ibp/metadata version. I understand the logic for > testing >with the flag, but it also seems like a bit of work to deprecate and > switch >to the ibp/metadata version approach. What was the reasoning behind >switching the enablement mechanism? > The main rationale is on the clients side, since such a behavioral change cannot be captured only by the apikey-version itself as the clients need to behave differently with the new protocol even before they talk to the group coordinator. >- Generally, are there implications for KRaft here? (IBP/metadata >version is something that I think of) And if so, will both cluster > types be >supported? > Besides IBP / metadata, the only thing I could think of is the group state machine snapshotting we may want to consider in the future with KRaft's extended snapshot capabilities (i.e. KIP-630). > > Thanks again to everyone who worked on this KIP!
> Justine > > On Wed, Jul 6, 2022 at 1:45 AM David Jacot > wrote: > > > Hi all, > > > > I would like to start a discussion thread on KIP-848: The Next > > Generation of the Consumer Rebalance Protocol. With this KIP, we aim > > to make the rebalance protocol (for consumers) more reliable, more > > scalable, easier to implement for clients, and easier to debug for > > operators. > > > > The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. > > > > Please take a look and let me know what you think. > > > > Best, > > David > > > > PS: I will be away from July 18th to August 8th. That gives you a bit > > of time to read and digest this long KIP. > > > -- Guozhang
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Thanks for the KIP. This has the potential to be a great improvement. A few initial questions/comments:

1. I think it's premature to talk about target versions for deprecation and removal of the existing group protocol. Unlike KRaft, this affects a core client protocol and hence deprecation/removal will be heavily dependent on how quickly applications migrate to the new protocol.
2. The KIP says we intend to release this in 4.x, but it wasn't made clear why. If we added that as a way to estimate when we'd deprecate and remove the group protocol, I also suggest removing this part.
3. We need to flesh out the details of the migration story. It sounds like we're saying we will support online migrations. Is that correct? We should explain this in detail. It could also be done as a separate KIP, if it's easier.
4. I am happy that we are pushing the pattern subscriptions to the server, but it seems like there could be some tricky compatibility issues. Will we have a mechanism for users to detect that they need to update their regex before switching to the new protocol?
5. Related to the last question, will the Java client allow the users to stick with the current regex engine for compatibility reasons? For example, it may be handy to keep using client-based regexes at first to keep migrations simple and then migrate to server-based regexes as a second step.
6. When we say that the group coordinator will be responsible for storing the configurations and that the configurations will be deleted when the group is deleted: will a transition to DEAD trigger deletion of configurations?
7. Will the choice to store the configs in the group coordinator make it harder to list all cluster configs and their values?
8. How would someone configure a group before starting the consumers? Have we considered allowing the explicit creation of groups? Alternatively, the configs could be decoupled from the group lifecycle.
9. Will the Consumer.subscribe method for the Java client still take a `java.util.regex.Pattern` or do we have to introduce an overload?
10. I agree with Justine that we should be clearer about the reason to switch to IBP/metadata.version from the feature flag. Maybe we mean that we can switch the default for the feature flag to true based on the metadata.version once we want to make it the default.
11. Some of the protocol APIs don't mention the required ACLs; it would be good to add that for consistency.
12. It is a bit odd that ConsumerGroupHeartbeat requires "Read Group" even though it seems to do more than reading.
13. How is topic recreation handled by the consumer with the new group protocol? It would be good to have a section on this.
14. The KIP mentions we will write the new coordinator in Java. Even though this is an implementation detail, do we plan to have a new gradle module for it?
15. Do we have a scalability goal when it comes to how many members the new group protocol can support?
16. Did we consider having SubscribedTopicIds instead of SubscribedTopicNames in ConsumerGroupHeartbeatRequest? Is the idea that since we have to resolve the regex on the server, we can do the same for the topic name? The difference is that sending the regex is more efficient whereas sending the topic names is less efficient. Furthermore, delete and recreation is easier to handle if we have topic ids.

Thanks, Ismael

On Wed, Jul 6, 2022 at 10:45 AM David Jacot wrote: > Hi all, > > I would like to start a discussion thread on KIP-848: The Next > Generation of the Consumer Rebalance Protocol. With this KIP, we aim > to make the rebalance protocol (for consumers) more reliable, more > scalable, easier to implement for clients, and easier to debug for > operators. > > The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. > > Please take a look and let me know what you think. > > Best, > David > > PS: I will be away from July 18th to August 8th. That gives you a bit > of time to read and digest this long KIP. >
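Questions 13 and 16 above both touch on how topic delete-and-recreate interacts with the new protocol. Topic IDs make recreation detectable on the client side: the subscribed name stays the same while the id observed in metadata changes. A minimal sketch of that check (the class is hypothetical, and `java.util.UUID` stands in for Kafka's own `Uuid` type):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of how a client could detect topic recreation with topic IDs:
// the subscribed name is unchanged, but the id seen in a metadata update
// differs from the one previously recorded. Names here are illustrative.
public class TopicIdTracker {
    private final Map<String, UUID> knownIds = new HashMap<>();

    // Records the latest id for the topic and returns true if this update
    // reveals a recreated topic, i.e. a known name now maps to a new id.
    public boolean isRecreated(String topicName, UUID currentId) {
        UUID previous = knownIds.put(topicName, currentId);
        return previous != null && !previous.equals(currentId);
    }
}
```

With name-based fields such as SubscribedTopicNames, this distinction is invisible to the receiver, which is the compatibility concern behind question 16.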
Re: [DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi David, Thanks for sharing this KIP! Really exciting to hear how we are changing the protocol! The motivation section really made me realize how useful this change will be.

I've done a first pass of the KIP, and may have more questions, but thought I'd start with a few I thought of already.

- I saw some usages of topic IDs in the new protocols/records/interfaces, but wasn't sure if they were used everywhere. Are you planning on relying on topic IDs for the new protocol?
- I saw the section about using a feature flag first before integrating the feature with ibp/metadata version. I understand the logic for testing with the flag, but it also seems like a bit of work to deprecate and switch to the ibp/metadata version approach. What was the reasoning behind switching the enablement mechanism?
- Generally, are there implications for KRaft here? (IBP/metadata version is something that I think of) And if so, will both cluster types be supported?

Thanks again to everyone who worked on this KIP! Justine

On Wed, Jul 6, 2022 at 1:45 AM David Jacot wrote: > Hi all, > > I would like to start a discussion thread on KIP-848: The Next > Generation of the Consumer Rebalance Protocol. With this KIP, we aim > to make the rebalance protocol (for consumers) more reliable, more > scalable, easier to implement for clients, and easier to debug for > operators. > > The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. > > Please take a look and let me know what you think. > > Best, > David > > PS: I will be away from July 18th to August 8th. That gives you a bit > of time to read and digest this long KIP. >
[DISCUSS] KIP-848: The Next Generation of the Consumer Rebalance Protocol
Hi all, I would like to start a discussion thread on KIP-848: The Next Generation of the Consumer Rebalance Protocol. With this KIP, we aim to make the rebalance protocol (for consumers) more reliable, more scalable, easier to implement for clients, and easier to debug for operators. The KIP is here: https://cwiki.apache.org/confluence/x/HhD1D. Please take a look and let me know what you think. Best, David PS: I will be away from July 18th to August 8th. That gives you a bit of time to read and digest this long KIP.