Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac closed pull request #14673: KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged URL: https://github.com/apache/kafka/pull/14673 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381143945 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. 
+ * + * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS + * When the assignment is acknowledged, the member transitions to the + * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet. + * + * - UNRELEASED_PARTITIONS: + * The member's reconciliation cannot progress because newly assigned partitions + * are still owned by other members in the group. They are not released yet. * - * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the new target assignment is different from the next epoch of the member. In this transient - * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updated. If the partitions pending revocation is not empty, the state machine transitions to - * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it - * transitions to STABLE. + * Valid Transitions: + * - UNRELEASED_PARTITIONS -> STABLE + * The member may transition to the STABLE state if the partitions
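For readers following the quoted Javadoc, the transitions it lists can be written down as a small lookup table. This is only a sketch of what the quoted hunk shows, not the code in the PR: the class name `MemberStateSketch`, the map `VISIBLE_TRANSITIONS`, and the method `isListed` are hypothetical, and because the hunk is cut off partway through the UNRELEASED_PARTITIONS section, only the single transition visible there is included.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: encodes only the transitions visible in the quoted Javadoc.
final class MemberStateSketch {
    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    private static final Map<State, Set<State>> VISIBLE_TRANSITIONS = new EnumMap<>(State.class);

    static {
        // STABLE may stay STABLE, or move to either of the two other states.
        VISIBLE_TRANSITIONS.put(State.STABLE, EnumSet.allOf(State.class));
        // After an acknowledgement, the member may land in any of the three states.
        VISIBLE_TRANSITIONS.put(State.UNACKNOWLEDGED_ASSIGNMENT, EnumSet.allOf(State.class));
        // The quoted hunk is truncated here; only the transition to STABLE is visible.
        VISIBLE_TRANSITIONS.put(State.UNRELEASED_PARTITIONS, EnumSet.of(State.STABLE));
    }

    // Returns true if the quoted Javadoc lists a transition from 'from' to 'to'.
    static boolean isListed(State from, State to) {
        return VISIBLE_TRANSITIONS.get(from).contains(to);
    }
}
```

A table like this makes the reviewer's question easier to check: STABLE -> UNRELEASED_PARTITIONS is listed as a direct transition, without passing through UNACKNOWLEDGED_ASSIGNMENT.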
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. Review Comment: small question for my understanding, why are we just waiting for _at least_ one of them to be released?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE Review Comment: Small question for my understanding, is the ack sent by the client once the new target assignment is received/reconciliation starts or once the reconciliation is complete? Or once partial reconciliation is complete without the revoked partitions?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() { } if (!newPartitionsPendingRevocation.isEmpty()) { -// If the partition pending revocation set is not empty, we transition the -// member to revoking and keep the current epoch. The transition to the new -// state is done when the member is updated. +// If there are partitions to be revoked, the member remains in its current +// epoch and requests the revocation of those partitions. It transitions to +// the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is +// acknowledged. return new ConsumerGroupMember.Builder(member) + .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT) .setAssignedPartitions(newAssignedPartitions) -.setPartitionsPendingRevocation(newPartitionsPendingRevocation) -.setPartitionsPendingAssignment(newPartitionsPendingAssignment) -.setTargetMemberEpoch(targetAssignmentEpoch) +.setRevokedPartitions(newPartitionsPendingRevocation) .build(); -} else { -if (!newPartitionsPendingAssignment.isEmpty()) { -// If the partitions pending assignment set is not empty, we check -// if some or all partitions are free to use. If they are, we move -// them to the partitions assigned set. -maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); -} - -// We transition to the target epoch. If the partitions pending assignment -// set is empty, the member transition to stable, otherwise to assigning. -// The transition to the new state is done when the member is updated. +} else if (!newPartitionsPendingAssignment.isEmpty()) { +// If there are partitions to be assigned, the member transitions to the +// target epoch and requests the assignment of those partitions. 
Note that +// the partitions are directly added to the assigned partitions set. The +// member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until +// the new assignment is acknowledged. +newPartitionsPendingAssignment.forEach((topicId, partitions) -> { +newAssignedPartitions +.computeIfAbsent(topicId, __ -> new HashSet<>()) +.addAll(partitions); +}); return new ConsumerGroupMember.Builder(member) + .setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT) +.updateMemberEpoch(targetAssignmentEpoch) .setAssignedPartitions(newAssignedPartitions) -.setPartitionsPendingRevocation(Collections.emptyMap()) -.setPartitionsPendingAssignment(newPartitionsPendingAssignment) -.setPreviousMemberEpoch(member.memberEpoch()) -.setMemberEpoch(targetAssignmentEpoch) -.setTargetMemberEpoch(targetAssignmentEpoch) +.setRevokedPartitions(Collections.emptyMap()) .build(); -} -} - -/** - * Tries to transition from Revoke to Assigning or Stable. This is only - * possible when the member acknowledges that it only owns the partition - * in the assigned partitions. - * - * @return A new ConsumerGroupMember with the new state or the current one - * if the member stays in the current state. - */ -private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { -if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) { -Map> newAssignedPartitions = deepCopy(member.assignedPartitions()); -Map> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment()); - -if (!newPartitionsPendingAssignment.isEmpty()) { -// If the partitions pending assignment set is not empty, we check -// if some or all partitions are free to use. If they are, we move -// them to the assigned set. -maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); -} - -// We transition to the target epoch. 
If the partitions pending assignment -// set is empty, the member transition to stable, otherwise to assigning. -// The transition to the new state is done when the member is updated. +
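The added branch in the hunk above folds the pending partitions straight into the assigned set with `computeIfAbsent(...).addAll(...)`. The following is a minimal standalone sketch of that merge step, under stated assumptions: the class `AssignmentMergeSketch` and the method `merge` are hypothetical, `String` stands in for the topic ID type used in Kafka, and the copy-before-merge is a choice made here so the inputs are left untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the merge step; String keys stand in for topic IDs.
final class AssignmentMergeSketch {
    // Returns a new map holding every assigned partition plus every pending one.
    static Map<String, Set<Integer>> merge(
        Map<String, Set<Integer>> assigned,
        Map<String, Set<Integer>> pending
    ) {
        // Deep-copy the assigned partitions so the caller's map is not mutated.
        Map<String, Set<Integer>> result = new HashMap<>();
        assigned.forEach((topicId, partitions) -> result.put(topicId, new HashSet<>(partitions)));

        // Create the per-topic set on first use, then add the pending partitions.
        pending.forEach((topicId, partitions) ->
            result.computeIfAbsent(topicId, unused -> new HashSet<>()).addAll(partitions));
        return result;
    }
}
```

`computeIfAbsent` avoids a separate null check for topics that appear only in the pending map, which is the same reason the hunk uses it.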
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS Review Comment: I'm confused about this as well. I would think it would go from stable -> unack (when a new assignment is computed) -> unreleased (if there are unreleased partitions). If it's possible to ack the assignment even when there are unreleased partitions, we're transitioning from unack to unreleased, right? So how does it get to stable with unreleased partitions?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE Review Comment: Small question for my understanding, is the ack sent once the assignment is received or once the reconciliation is complete? Or once partial reconciliation is complete without the revoked partitions?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS Review Comment: I'm confused about this as well. I would think it would go from stable -> unack (when a new assignment is computed) -> unreleased (if there are unreleased partitions). If it's possible to ack the assignment even when there are unreleased partitions, we're transitioning from unack to unreleased, right?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. Review Comment: Do we want to transition to unacknowledged assignment every time a new assignment is computed and sent? I'm confused about the "when a new target assignment is installed" prerequisite
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. Review Comment: Small question for my understanding: why are we waiting for just _at least_ one of them to be released?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: Just for my understanding: when we say a new target assignment is installed, do we mean the assignment reconciliation on the client, i.e. the assignment being installed on the client? Also, does "installed" mean that the assignment is already acknowledged?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. Review Comment: Do we want to transition to unacknowledged assignment every time a new assignment is computed and sent? I'm confused about the necessity of the "when a new target assignment is installed" prerequisite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954663 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. 
+ * + * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS + * When the assignment is acknowledged, the member transitions to the + * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet. + * + * - UNRELEASED_PARTITIONS: + * The member's reconciliation cannot progress because newly assigned partitions + * are still owned by other members in the group. They are not released yet. * - * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the new target assignment is different from the next epoch of the member. In this transient - * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updated. If the partitions pending revocation is not empty, the state machine transitions to - * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it - * transitions to STABLE. + * Valid Transitions: + * - UNRELEASED_PARTITIONS -> STABLE + * The member may transition to the STABLE state if the partitions
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954345 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. 
+ * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. 
+ * + * - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS + * When the assignment is acknowledged, the member transitions to the + * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet. + * + * - UNRELEASED_PARTITIONS: + * The member's reconciliation cannot progress because newly assigned partitions + * are still owned by other members in the group. They are not released yet. * - * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the new target assignment is different from the next epoch of the member. In this transient - * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updated. If the partitions pending revocation is not empty, the state machine transitions to - * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it - * transitions to STABLE. + * Valid Transitions: + * - UNRELEASED_PARTITIONS -> STABLE + * The member may transition to the STABLE state if the partitions
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954188 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. + * - STABLE -> UNRELEASED_PARTITIONS Review Comment: For this transition, I'm confused which one is the default first transition. 
I.e., if the client has not yet acknowledged AND we have unreleased partitions, which one do we go to first? And is moving between these two states in both directions only possible if a new assignment arrives in between? I'm struggling to understand how all of these transitions are valid.
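For readers following the question above, it may help to tabulate the transitions that the quoted Javadoc lists. What follows is only a minimal sketch under stated assumptions: `SketchMemberState`, `TransitionTableSketch` and `isListed` are hypothetical names, not Kafka classes, and because the quote is cut off after `UNRELEASED_PARTITIONS -> STABLE`, the table records only the transitions visible in this thread and may be incomplete for that state.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: records the transitions that are
// visible in the Javadoc quoted in this thread.
final class TransitionTableSketch {
    private static final Map<SketchMemberState, Set<SketchMemberState>> LISTED =
        new EnumMap<>(SketchMemberState.class);

    static {
        // STABLE -> STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS
        LISTED.put(SketchMemberState.STABLE, EnumSet.allOf(SketchMemberState.class));
        // UNACKNOWLEDGED_ASSIGNMENT -> STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS
        LISTED.put(SketchMemberState.UNACKNOWLEDGED_ASSIGNMENT, EnumSet.allOf(SketchMemberState.class));
        // Only "-> STABLE" is visible before the quote is truncated.
        LISTED.put(SketchMemberState.UNRELEASED_PARTITIONS, EnumSet.of(SketchMemberState.STABLE));
    }

    // Returns true if the quoted Javadoc lists the transition from -> to.
    static boolean isListed(SketchMemberState from, SketchMemberState to) {
        return LISTED.get(from).contains(to);
    }

    public static void main(String[] args) {
        for (SketchMemberState from : SketchMemberState.values()) {
            System.out.println(from + " -> " + LISTED.get(from));
        }
    }
}

// Hypothetical mirror of the three states named in the quoted Javadoc.
enum SketchMemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }
```

A table like this says nothing about which target state wins when several conditions hold at once, which is exactly the open question in the comment above.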
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380949384 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: I think this is the case where we get a new assignment server side, but nothing changed for the member? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380943451 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. - * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param groupId The group id. + * @param expectedMember The expected member. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, -String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +ConsumerGroupMember expectedMember ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId()); +timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); -ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (!member.assignmentEquals(expectedMember)) { Review Comment: I also wasn't sure about the states being the same. Is it possible that, while the member is not yet stable, it will not be equal to the expected member? Or am I misunderstanding when we transition to stable?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380936875 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -301,11 +322,9 @@ public String toString() { private final int previousMemberEpoch; /** - * The next member epoch. This corresponds to the target - * assignment epoch used to compute the current assigned, - * revoking and assigning partitions. + * The member state. */ -private final int targetMemberEpoch; +private final MemberState state; Review Comment: Is the idea with the new code that we just want to match the desired state (i.e., add the assigned partitions and remove the revoked ones)? And if we change the assignment, we will just bump the epoch and fence older ones?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380929415 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. - * @param memberId The member id. - * @param revocationTimeoutMs The revocation timeout. - * @param expectedMemberEpoch The expected member epoch. + * @param groupId The group id. + * @param expectedMember The expected member. */ -private void scheduleConsumerGroupRevocationTimeout( +private void scheduleConsumerGroupRebalanceTimeout( String groupId, -String memberId, -long revocationTimeoutMs, -int expectedMemberEpoch +ConsumerGroupMember expectedMember ) { -String key = consumerGroupRevocationTimeoutKey(groupId, memberId); -timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId()); +timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> { try { ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); -ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false); -if (member.state() != ConsumerGroupMember.MemberState.REVOKING || -member.memberEpoch() != expectedMemberEpoch) { -log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " + -"state does not match the expected state.", groupId, memberId); +if (!member.assignmentEquals(expectedMember)) { Review Comment: So this works because we removed the pending assignment fields from the equality check?
In other words, this should only be false when the fields that changed indicate that the assignment is no longer valid, not when the assignment merely failed to complete.
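The pattern under discussion, a timer that captures the member it expects and does nothing on expiry if the live member has since moved on, can be sketched as below. This is an illustration under loud assumptions, not the PR's code: `MemberSnapshotSketch`, `RebalanceTimeoutSketch` and `shouldRemoveMember` are hypothetical names, and the fields compared in `assignmentEquals` here (epoch, assigned partitions, partitions pending revocation) are a guess made for the example; the thread does not show which fields the real method compares.

```java
import java.util.Set;

// Hypothetical decision helper, not part of Kafka.
final class RebalanceTimeoutSketch {
    // On timer expiry: act only if the live member still carries the
    // assignment that was outstanding when the timer was scheduled.
    // A mismatch means the member has moved on, so the timeout is ignored.
    static boolean shouldRemoveMember(MemberSnapshotSketch current, MemberSnapshotSketch expected) {
        return current.assignmentEquals(expected);
    }

    public static void main(String[] args) {
        MemberSnapshotSketch expected = new MemberSnapshotSketch(5, Set.of(0, 1), Set.of(2));
        MemberSnapshotSketch unchanged = new MemberSnapshotSketch(5, Set.of(0, 1), Set.of(2));
        MemberSnapshotSketch progressed = new MemberSnapshotSketch(6, Set.of(0, 1), Set.of());
        System.out.println(shouldRemoveMember(unchanged, expected));
        System.out.println(shouldRemoveMember(progressed, expected));
    }
}

// Hypothetical stand-in for a member; the compared fields are an assumption.
final class MemberSnapshotSketch {
    final int memberEpoch;
    final Set<Integer> assignedPartitions;
    final Set<Integer> partitionsPendingRevocation;

    MemberSnapshotSketch(int memberEpoch, Set<Integer> assigned, Set<Integer> pendingRevocation) {
        this.memberEpoch = memberEpoch;
        this.assignedPartitions = assigned;
        this.partitionsPendingRevocation = pendingRevocation;
    }

    // Assumed comparison: equal only if epoch and both partition sets match.
    boolean assignmentEquals(MemberSnapshotSketch other) {
        return memberEpoch == other.memberEpoch
            && assignedPartitions.equals(other.assignedPartitions)
            && partitionsPendingRevocation.equals(other.partitionsPendingRevocation);
    }
}
```

Capturing the whole expected member rather than an epoch and a state keeps the staleness check in one place, at the cost of making its correctness depend on exactly which fields the equality covers, which is the point the comment above raises.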
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923649 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed Review Comment: Small question, irrespective of anything else, whenever there's a new assignment sent to the member we transition to stable, is that right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923297 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout( } /** - * Schedules a revocation timeout for the member. + * Schedules a rebalance timeout for the member. * - * @param groupId The group id. Review Comment: should we have removed this? It is still a param below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member Review Comment: Just for my understanding, when we say new target assignment installed do we mean the assignment reconciliation on the client aka assignment installed on the client? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380913612 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -934,32 +934,25 @@ private CoordinatorResult consumerGr // 3. Reconcile the member's assignment with the target assignment. This is only required if // the member is not stable or if a new target assignment has been installed. boolean assignmentUpdated = false; -if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) { +if (!updatedMember.isReconciledTo(targetAssignmentEpoch)) { ConsumerGroupMember prevMember = updatedMember; updatedMember = new CurrentAssignmentBuilder(updatedMember) .withTargetAssignment(targetAssignmentEpoch, targetAssignment) .withCurrentPartitionEpoch(group::currentPartitionEpoch) .withOwnedTopicPartitions(ownedTopicPartitions) .build(); -// Checking the reference is enough here because a new instance -// is created only when the state has changed. -if (updatedMember != prevMember) { Review Comment: Is this due to the change with assignments or just a cleanup we wanted to do? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
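For readers following the thread, the new guard could be read roughly as below. This is only a sketch, assuming `isReconciledTo` is true when the member is in the STABLE state and already at the target epoch, which is what the Javadoc quoted elsewhere in this review ("True if the member is in the Stable state and at the desired epoch") suggests. The class `MemberSketch` and its fields are hypothetical stand-ins, not the actual `ConsumerGroupMember`, and the enum lists only the states named in the quoted Javadoc.

```java
// Hypothetical stand-in for ConsumerGroupMember; only what the check needs.
final class MemberSketch {
    // States named in the quoted Javadoc; the real enum may differ.
    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    private final State state;
    private final int memberEpoch;

    MemberSketch(State state, int memberEpoch) {
        this.state = state;
        this.memberEpoch = memberEpoch;
    }

    // Assumed semantics: reconciled means STABLE and already at the target epoch.
    boolean isReconciledTo(int targetAssignmentEpoch) {
        return state == State.STABLE && memberEpoch == targetAssignmentEpoch;
    }
}
```

Under that reading, reconciliation is skipped only for a stable member whose epoch already matches the target assignment epoch, which covers both halves of the old condition in a single call.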
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jolshan commented on PR #14673: URL: https://github.com/apache/kafka/pull/14673#issuecomment-1791722750 > When partitions are revoked, the state machine cannot advance until the consumer acknowledges the revocation. However, when partitions are assigned, the state machine does not wait on receiving an acknowledgement at all. This means that the next assignment can be delivered at any time. Just for my understanding, we decided that the assignment should be the same as revocation -- that we must acknowledge before taking on anything new. Is this because the other way (advancing the state machine on revocations) was not possible?
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380879868 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: Review Comment: nit: Could we also include a list of all the valid states in the beginning, might be easier to follow the total states -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
rreddy-22 commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1380878154 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. + * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. Review Comment: nit: could we change all the he's to it maybe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
jeffkbkim commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1379393118 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -537,22 +533,34 @@ public Map> partitionsPendingAssignment() { public String currentAssignmentSummary() { return "CurrentAssignment(memberEpoch=" + memberEpoch + ", previousMemberEpoch=" + previousMemberEpoch + -", targetMemberEpoch=" + targetMemberEpoch + ", state=" + state + ", assignedPartitions=" + assignedPartitions + -", partitionsPendingRevocation=" + partitionsPendingRevocation + -", partitionsPendingAssignment=" + partitionsPendingAssignment + +", revokedPartitions=" + revokedPartitions + ')'; } +/** + * @return True if the assignment of this member is equals to the assignment Review Comment: nit: is equal to ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -511,24 +507,24 @@ public MemberState state() { } /** - * @return The set of assigned partitions. + * @return True of the member is in the Stable state and at the desired epoch. Review Comment: nit: True if ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: + * - STABLE: + * The member is fully reconciled to the desired target assignment. * - * - Next Epoch: - * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. - * The member transitions to this epoch when it has revoked the partitions that it does not own - * or if it does not have to revoke any. 
+ * Valid Transitions: + * - STABLE -> STABLE + * When a new target assignment is installed but the assignment of the member + * has not changed, the member transitions to the next epoch and remains in + * the STABLE state. * - * - Previous Epoch: - * The epoch of the member when the state was last updated. + * - STABLE -> UNACKNOWLEDGED_ASSIGNMENT + * When a new target assignment is installed and a new assignment is computed + * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state. * - * - Assigned Partitions: - * The set of partitions currently assigned to the member. This represents what the member should have. + * If the next assignment contains partitions to be revoked, the member stays + * in his current epoch. Otherwise, he transitions to the target epoch. * - * - Partitions Pending Revocation: - * The set of partitions that the member should revoke before it can transition to the next state. + * - STABLE -> UNRELEASED_PARTITIONS + * When a new target assignment is installed and all the newly assigned partitions + * are not available yet, he transitions to the UNRELEASED_PARTITIONS state + * and waits until at least one of them is available. * - * - Partitions Pending Assignment: - * The set of partitions that the member will eventually receive. The partitions in this set are - * still owned by other members in the group. + * - UNACKNOWLEDGED_ASSIGNMENT: + * The member has received a new assignment from the group coordinator but + * he has not acknowledged it yet. The member is removed from the group if + * he does not acknowledge it within the rebalance timeout. * - * The state machine has three states: - * - REVOKING: - * This state means that the member must revoke partitions before it can transition to the next epoch - * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions - * are committed with the current epoch. 
The member transitions to the next state only when it has - * acknowledged the revocation. + * Valid Transitions: + * - UNACKNOWLEDGED_ASSIGNMENT -> STABLE + * When the assignment is acknowledged, the member transitions to the STABLE + * state if it is fully reconciled. * - * - ASSIGNING: - * This state means that the member waits on partitions which are still owned by other members in the - * group. It remains in this state until they are all freed up. + * - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT + * When the assignment is acknowledged, the member remains in the + * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed. * - * - STABLE: - * This state means that the member has received all its assigned partitions. + * If the next
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1377310585 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -19,16 +19,9 @@ import org.apache.kafka.common.Uuid; Review Comment: All the tests have been fully replaced in this file so we can just ignore the diff. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1377309079 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -34,48 +33,68 @@ * consumer group protocol. Given the current state of a member and a desired or target * assignment state, the state machine takes the necessary steps to converge them. * - * The member state has the following properties: - * - Current Epoch: - * The current epoch of the member. + * State Machine: Review Comment: The core of the change is in this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1377308235 ## group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json: ## @@ -24,20 +24,12 @@ "about": "The current member epoch that is expected from the member in the heartbeat request." }, { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32", "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, -{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32", - "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." }, +{ "name": "State", "versions": "0+", "type": "int8", + "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." }, { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions", "about": "The partitions assigned to (or owned by) this member." }, -{ "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions", - "about": "The partitions that must be revoked by this member." }, -{ "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions", - "about": "The partitions that will be assigned to this member when they are freed up by their current owners." }, -{ "name": "Error", "versions": "0+", "type": "int8", - "about": "The error reported by the assignor." }, -{ "name": "MetadataVersion", "versions": "0+", "type": "int16", - "about": "The version of the metadata bytes." }, -{ "name": "MetadataBytes", "versions": "0+", "type": "bytes", - "about": "The metadata bytes." } Review Comment: I also removed those because we don't use them at the moment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac commented on code in PR #14673: URL: https://github.com/apache/kafka/pull/14673#discussion_r1377307212 ## group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json: ## @@ -24,20 +24,12 @@ "about": "The current member epoch that is expected from the member in the heartbeat request." }, { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32", "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, -{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32", - "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." }, +{ "name": "State", "versions": "0+", "type": "int8", + "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." }, Review Comment: The main change here is that we introduce an explicit state in the record. ## group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json: ## @@ -24,20 +24,12 @@ "about": "The current member epoch that is expected from the member in the heartbeat request." }, { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32", "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, -{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32", - "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." }, +{ "name": "State", "versions": "0+", "type": "int8", + "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." }, { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions", "about": "The partitions assigned to (or owned by) this member." 
}, -{ "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions", - "about": "The partitions that must be revoked by this member." }, -{ "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions", - "about": "The partitions that will be assigned to this member when they are freed up by their current owners." }, Review Comment: This is not required anymore with the new implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
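Since the record now carries the state as an `int8`, one way to picture the mapping is an enum with a byte id and a reverse lookup. This is a minimal sketch: the name `MemberStateSketch` and the numeric ids are hypothetical, and the actual values are defined in `ConsumerGroupMember.MemberState`, which is not shown in this thread.

```java
// Hypothetical enum; the real ids live in ConsumerGroupMember.MemberState.
enum MemberStateSketch {
    STABLE((byte) 0),
    UNACKNOWLEDGED_ASSIGNMENT((byte) 1),
    UNRELEASED_PARTITIONS((byte) 2);

    private final byte value;

    MemberStateSketch(byte value) {
        this.value = value;
    }

    // The byte that would be written to the int8 "State" field of the record.
    byte value() {
        return value;
    }

    // Reverse lookup used when reading a record back; rejects unknown ids.
    static MemberStateSketch fromValue(byte value) {
        for (MemberStateSketch s : values()) {
            if (s.value == value) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown member state: " + value);
    }
}
```

Persisting an explicit id rather than the enum ordinal keeps the stored value stable if constants are later reordered.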
[PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]
dajac opened a new pull request, #14673: URL: https://github.com/apache/kafka/pull/14673 The current implementation of the reconciliation state machine on the server side is asymmetric in how it treats the revocation and the assignment of partitions. When partitions are revoked, the state machine cannot advance until the consumer acknowledges the revocation. However, when partitions are assigned, the state machine does not wait for an acknowledgement at all. This means that the next assignment can be delivered at any time. While working on the client side state machine, this asymmetric handling caused a lot of confusion and made the client side a lot more complex than we imagined. The complexity came from the need to buffer all the new assignments received while one assignment is being reconciled. Our goal is to simplify the consumer with the new protocol so this was going in the opposite direction. This patch changes the server side state machine to always wait for an assignment to be acknowledged before delivering a new one, regardless of whether partitions are revoked, assigned or both. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
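The rule described in this pull request (do not deliver a new assignment until the previous one is acknowledged) can be illustrated with a small sketch. It is not the code from the patch: `AckGateSketch` is a hypothetical class, partitions are reduced to plain integers, and it assumes the member acknowledges an assignment by reporting owned partitions equal to what was last delivered.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration of "acknowledge before the next assignment".
final class AckGateSketch {
    private Set<Integer> delivered = Set.of(); // last assignment sent to the member
    private boolean acknowledged = true;       // nothing is outstanding initially

    // Returns the assignment to send in this heartbeat response, if any.
    Optional<Set<Integer>> onHeartbeat(Set<Integer> owned, Set<Integer> target) {
        // Assumption: owning exactly the delivered set counts as the acknowledgement.
        if (!acknowledged && owned.equals(delivered)) {
            acknowledged = true;
        }
        // A newer target is only delivered once the previous one is acknowledged.
        if (acknowledged && !target.equals(delivered)) {
            delivered = Set.copyOf(target);
            acknowledged = false;
            return Optional.of(delivered);
        }
        return Optional.empty();
    }
}
```

With this gate the client never has to buffer a second assignment while it is still reconciling the first, which is the simplification the description is after. The sketch leaves out epochs, revocation ordering and the rebalance timeout.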