Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2024-02-13 Thread via GitHub


dajac closed pull request #14673: KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged
URL: https://github.com/apache/kafka/pull/14673


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381143945


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why are we only waiting for _at least_ one of them to be released?
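
The rule in this javadoc line ("waits until at least one of them is available") can be read as: the member leaves UNRELEASED_PARTITIONS as soon as there is something new to hand it. A minimal sketch of that reading follows; it is not the PR's code, `isReleased` is a hypothetical predicate meaning "no other member still owns this partition", and topic ids are plain strings instead of `Uuid`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

public class UnreleasedPartitionsSketch {

    enum MemberState { UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Outcome of one reconciliation attempt: the next state and the partitions
    // that can be handed to the member right now.
    static final class Step {
        final MemberState state;
        final Map<String, Set<Integer>> releasedNow;

        Step(MemberState state, Map<String, Set<Integer>> releasedNow) {
            this.state = state;
            this.releasedNow = releasedNow;
        }
    }

    // Hypothetical helper: collects the pending partitions that are already released.
    static Step reconcile(Map<String, Set<Integer>> pendingAssignment,
                          BiPredicate<String, Integer> isReleased) {
        Map<String, Set<Integer>> releasedNow = new HashMap<>();
        pendingAssignment.forEach((topic, partitions) -> {
            for (int partition : partitions) {
                if (isReleased.test(topic, partition)) {
                    releasedNow.computeIfAbsent(topic, t -> new HashSet<>()).add(partition);
                }
            }
        });
        // No partition released yet: there is nothing new to send, so keep waiting.
        // At least one released: send what is available and wait for the ack.
        MemberState next = releasedNow.isEmpty()
            ? MemberState.UNRELEASED_PARTITIONS
            : MemberState.UNACKNOWLEDGED_ASSIGNMENT;
        return new Step(next, releasedNow);
    }
}
```

In the sketch, released partitions go out immediately instead of waiting for the remaining ones; whether that is the motivation behind the javadoc wording is exactly what the question above asks.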


Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381125394


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE

Review Comment:
   Small question for my understanding: is the ack sent by the client once the new target assignment is received (i.e. when reconciliation starts), or once reconciliation is complete? Or once a partial reconciliation (without the revoked partitions) is complete?
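
This PR's diff shows the old revocation path gating on `matchesAssignedPartitions(ownedTopicPartitions)`, which suggests that acknowledgement is inferred from the partitions the member reports owning in its heartbeat rather than from a separate ack message. That is an assumption, since the helper's body is not quoted in this thread. A minimal sketch of such a check, using a hypothetical `ownedMatchesAssigned` and plain string topic ids:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AckCheckSketch {

    // Hypothetical stand-in modeled on the removed matchesAssignedPartitions call:
    // the assignment counts as acknowledged when the partitions the member reports
    // owning are exactly the partitions the coordinator assigned to it.
    static boolean ownedMatchesAssigned(Map<String, Set<Integer>> assigned,
                                        Map<String, Set<Integer>> owned) {
        return withoutEmpty(assigned).equals(withoutEmpty(owned));
    }

    // Topics with no partitions carry no information, so drop them before comparing.
    private static Map<String, Set<Integer>> withoutEmpty(Map<String, Set<Integer>> source) {
        Map<String, Set<Integer>> copy = new HashMap<>();
        source.forEach((topic, partitions) -> {
            if (!partitions.isEmpty()) {
                copy.put(topic, partitions);
            }
        });
        return copy;
    }
}
```

With this check, a member that still owns a partition pending revocation reports a superset of its assignment, so the check only passes once the revocation is done.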


Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381127886


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -251,144 +260,52 @@ private ConsumerGroupMember transitionToNewTargetAssignmentState() {
 }
 
 if (!newPartitionsPendingRevocation.isEmpty()) {
-// If the partition pending revocation set is not empty, we transition the
-// member to revoking and keep the current epoch. The transition to the new
-// state is done when the member is updated.
+// If there are partitions to be revoked, the member remains in its current
+// epoch and requests the revocation of those partitions. It transitions to
+// the UNACKNOWLEDGED_ASSIGNMENT state to wait until the new assignment is
+// acknowledged.
 return new ConsumerGroupMember.Builder(member)
+.setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
 .setAssignedPartitions(newAssignedPartitions)
-.setPartitionsPendingRevocation(newPartitionsPendingRevocation)
-.setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-.setTargetMemberEpoch(targetAssignmentEpoch)
+.setRevokedPartitions(newPartitionsPendingRevocation)
 .build();
-} else {
-if (!newPartitionsPendingAssignment.isEmpty()) {
-// If the partitions pending assignment set is not empty, we check
-// if some or all partitions are free to use. If they are, we move
-// them to the partitions assigned set.
-maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-}
-
-// We transition to the target epoch. If the partitions pending assignment
-// set is empty, the member transition to stable, otherwise to assigning.
-// The transition to the new state is done when the member is updated.
+} else if (!newPartitionsPendingAssignment.isEmpty()) {
+// If there are partitions to be assigned, the member transitions to the
+// target epoch and requests the assignment of those partitions. Note that
+// the partitions are directly added to the assigned partitions set. The
+// member transitions to the UNACKNOWLEDGED_ASSIGNMENT state to wait until
+// the new assignment is acknowledged.
+newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+newAssignedPartitions
+.computeIfAbsent(topicId, __ -> new HashSet<>())
+.addAll(partitions);
+});
 return new ConsumerGroupMember.Builder(member)
+.setState(ConsumerGroupMember.MemberState.UNACKNOWLEDGED_ASSIGNMENT)
+.updateMemberEpoch(targetAssignmentEpoch)
 .setAssignedPartitions(newAssignedPartitions)
-.setPartitionsPendingRevocation(Collections.emptyMap())
-.setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-.setPreviousMemberEpoch(member.memberEpoch())
-.setMemberEpoch(targetAssignmentEpoch)
-.setTargetMemberEpoch(targetAssignmentEpoch)
+.setRevokedPartitions(Collections.emptyMap())
 .build();
-}
-}
-
-/**
- * Tries to transition from Revoke to Assigning or Stable. This is only
- * possible when the member acknowledges that it only owns the partition
- * in the assigned partitions.
- *
- * @return A new ConsumerGroupMember with the new state or the current one
- * if the member stays in the current state.
- */
-private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() {
-if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) {
-Map<Uuid, Set<Integer>> newAssignedPartitions = deepCopy(member.assignedPartitions());
-Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment());
-
-if (!newPartitionsPendingAssignment.isEmpty()) {
-// If the partitions pending assignment set is not empty, we check
-// if some or all partitions are free to use. If they are, we move
-// them to the assigned set.
-maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment);
-}
-
-// We transition to the target epoch. If the partitions pending assignment
-// set is empty, the member transition to stable, otherwise to assigning.
-// The transition to the new state is done when the member is updated.
+  
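
The two new branches in this hunk can be mirrored in a self-contained form to see how the epoch and the revoked set move. A minimal sketch: `Member`, `transition` and the string topic ids are hypothetical simplifications of `ConsumerGroupMember` and its builder, and the final `else` branch is an assumption because the quoted code is cut off before it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class NewTargetAssignmentSketch {

    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT }

    // Simplified, hypothetical member snapshot; topic ids are plain strings here.
    static final class Member {
        final MemberState state;
        final int epoch;
        final Map<String, Set<Integer>> assigned;
        final Map<String, Set<Integer>> revoked;

        Member(MemberState state, int epoch,
               Map<String, Set<Integer>> assigned, Map<String, Set<Integer>> revoked) {
            this.state = state;
            this.epoch = epoch;
            this.assigned = assigned;
            this.revoked = revoked;
        }
    }

    static Member transition(Member member, int targetEpoch,
                             Map<String, Set<Integer>> newAssigned,
                             Map<String, Set<Integer>> pendingRevocation,
                             Map<String, Set<Integer>> pendingAssignment) {
        if (!pendingRevocation.isEmpty()) {
            // Partitions to revoke: keep the current epoch and wait for the ack.
            return new Member(MemberState.UNACKNOWLEDGED_ASSIGNMENT, member.epoch,
                newAssigned, pendingRevocation);
        } else if (!pendingAssignment.isEmpty()) {
            // Nothing to revoke: add the pending partitions straight into the
            // assigned set, move to the target epoch and wait for the ack.
            Map<String, Set<Integer>> merged = deepCopy(newAssigned);
            pendingAssignment.forEach((topic, partitions) ->
                merged.computeIfAbsent(topic, t -> new HashSet<>()).addAll(partitions));
            return new Member(MemberState.UNACKNOWLEDGED_ASSIGNMENT, targetEpoch,
                merged, Map.of());
        } else {
            // Assumption: not visible in the quoted hunk; modeled on the
            // STABLE -> STABLE description in the javadoc quoted in this thread.
            return new Member(MemberState.STABLE, targetEpoch, newAssigned, Map.of());
        }
    }

    // Copies both the map and its sets so the caller's input is never mutated.
    private static Map<String, Set<Integer>> deepCopy(Map<String, Set<Integer>> source) {
        Map<String, Set<Integer>> copy = new HashMap<>();
        source.forEach((topic, partitions) -> copy.put(topic, new HashSet<>(partitions)));
        return copy;
    }
}
```

Unlike the diff, which adds the pending partitions to `newAssignedPartitions` in place, the sketch merges into a copy so its inputs stay unchanged; with immutable inputs such as those built by `Map.of(...)`, an in-place `addAll` would throw `UnsupportedOperationException`.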

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381123599


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS

Review Comment:
   I'm confused about this as well. I would expect it to go from stable -> unack (when a new assignment is computed) -> unreleased (if there are unreleased partitions). If it's possible to ack the assignment even when there are unreleased partitions, we're transitioning from unack to unreleased, right? So how does it get to stable with unreleased partitions?
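
To check the path described in this comment against the javadoc, the "Valid Transitions" it lists can be written down as a table. A minimal sketch; `MemberStateTransitions` and `isValidPath` are hypothetical names, and the UNRELEASED_PARTITIONS entry only contains the one transition visible before the quoted javadoc is cut off.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MemberStateTransitions {

    // Hypothetical stand-in for the states named in the proposed javadoc.
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    // Transitions copied from the "Valid Transitions" lists in the javadoc.
    private static final Map<MemberState, Set<MemberState>> VALID =
        new EnumMap<>(MemberState.class);

    static {
        VALID.put(MemberState.STABLE, EnumSet.of(
            MemberState.STABLE,
            MemberState.UNACKNOWLEDGED_ASSIGNMENT,
            MemberState.UNRELEASED_PARTITIONS));
        VALID.put(MemberState.UNACKNOWLEDGED_ASSIGNMENT, EnumSet.of(
            MemberState.STABLE,
            MemberState.UNACKNOWLEDGED_ASSIGNMENT,
            MemberState.UNRELEASED_PARTITIONS));
        // Possibly incomplete: the quoted javadoc is truncated after this entry.
        VALID.put(MemberState.UNRELEASED_PARTITIONS, EnumSet.of(MemberState.STABLE));
    }

    static boolean isValid(MemberState from, MemberState to) {
        return VALID.get(from).contains(to);
    }

    // Checks that every consecutive pair in the path is a listed transition.
    static boolean isValidPath(List<MemberState> path) {
        for (int i = 1; i < path.size(); i++) {
            if (!isValid(path.get(i - 1), path.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

Read this way, both the path proposed above (STABLE -> UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS) and the direct STABLE -> UNRELEASED_PARTITIONS edge are listed as valid; the table alone does not say which one the coordinator takes in a given case.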


Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired 
or target
  * assignment state, the state machine takes the necessary steps to converge 
them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the 
member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is 
computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ * If the next assignment contains partitions to be revoked, the member 
stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned 
partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why are we just waiting for _at least_ one of them to be released?
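One plausible reading of "at least one", sketched here under stated assumptions: waiting for every pending partition would let the slowest previous owner block the member completely, whereas unblocking on the first released partition lets the coordinator deliver the free ones right away and keep the rest pending. The class and method names are hypothetical, partitions are modeled as plain integers of a single topic, and this is not the PR's implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only: these names do not come from the Kafka code base.
// Partitions are integers of a single topic; ownerByPartition maps a partition to the
// member that still owns it (an absent key means the partition has been released).
final class UnreleasedPartitionsSketch {

    private UnreleasedPartitionsSketch() { }

    // Returns the pending partitions that no other member owns anymore.
    static Set<Integer> releasedPartitions(Set<Integer> pending, Map<Integer, String> ownerByPartition) {
        Set<Integer> released = new HashSet<>();
        for (Integer partition : pending) {
            if (!ownerByPartition.containsKey(partition)) {
                released.add(partition);
            }
        }
        return released;
    }

    // The member can leave UNRELEASED_PARTITIONS as soon as one partition is free:
    // the free partitions are delivered now and the others stay pending.
    static boolean canProgress(Set<Integer> pending, Map<Integer, String> ownerByPartition) {
        return !releasedPartitions(pending, ownerByPartition).isEmpty();
    }
}
```

With pending partitions {1, 2, 3} where 1 and 2 are still owned by other members, only partition 3 is released, so the member can already make progress.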






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   Do we want to transition to unacknowledged assignment every time a new assignment is computed and sent? I'm confused about the "when a new target assignment is installed" prerequisite.
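To make the three STABLE transitions in the quoted javadoc concrete, here is a minimal sketch that derives the next state and epoch from the member's current partitions and its slice of a newly installed target. It follows the javadoc wording only; the names are hypothetical, partitions are integers of one topic, and the epoch chosen for UNRELEASED_PARTITIONS is an assumption because the javadoc does not state it.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the STABLE transitions in the javadoc; not Kafka code.
// ownerByPartition lists partitions that are still owned by other members.
final class StableTransitionSketch {
    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    final State state;
    final int epoch;
    final Set<Integer> assigned;  // partitions the member is asked to own now
    final Set<Integer> revoking;  // partitions the member must release first

    private StableTransitionSketch(State state, int epoch, Set<Integer> assigned, Set<Integer> revoking) {
        this.state = state;
        this.epoch = epoch;
        this.assigned = assigned;
        this.revoking = revoking;
    }

    static StableTransitionSketch onNewTarget(
        int currentEpoch, Set<Integer> current,
        int targetEpoch, Set<Integer> target,
        Map<Integer, String> ownerByPartition
    ) {
        if (current.equals(target)) {
            // STABLE -> STABLE: only the epoch moves forward.
            return new StableTransitionSketch(State.STABLE, targetEpoch, current, Set.of());
        }
        Set<Integer> revoking = new HashSet<>(current);
        revoking.removeAll(target);
        if (!revoking.isEmpty()) {
            // STABLE -> UNACKNOWLEDGED_ASSIGNMENT with revocations: keep the current epoch
            // and withhold new partitions until the revocation is acknowledged.
            Set<Integer> kept = new HashSet<>(current);
            kept.retainAll(target);
            return new StableTransitionSketch(State.UNACKNOWLEDGED_ASSIGNMENT, currentEpoch, kept, revoking);
        }
        Set<Integer> released = new HashSet<>(target);
        released.removeAll(current);
        released.removeIf(ownerByPartition::containsKey);
        if (released.isEmpty()) {
            // STABLE -> UNRELEASED_PARTITIONS: none of the new partitions is free yet.
            // Assumption: the epoch still advances because nothing has to be revoked.
            return new StableTransitionSketch(State.UNRELEASED_PARTITIONS, targetEpoch, current, Set.of());
        }
        // STABLE -> UNACKNOWLEDGED_ASSIGNMENT without revocations: move to the target epoch.
        Set<Integer> next = new HashSet<>(current);
        next.addAll(released);
        return new StableTransitionSketch(State.UNACKNOWLEDGED_ASSIGNMENT, targetEpoch, next, Set.of());
    }
}
```

In this reading, "installed" is the trigger (a new target exists on the coordinator), and UNACKNOWLEDGED_ASSIGNMENT is entered only when that target actually yields something new to send to the member.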






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381121312


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.

Review Comment:
   Small question for my understanding: why are we waiting for just _at least_ one of them to be released?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   Just for my understanding, when we say "new target assignment installed", do we mean the assignment reconciliation on the client, aka the assignment installed on the client?
   Also, does "installed" mean that the assignment is already acknowledged?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1381118918


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   Do we want to transition to unacknowledged assignment every time a new assignment is computed and sent? I'm confused about the necessity of the "when a new target assignment is installed" prerequisite.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954663


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions 

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954345


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
+ *
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNRELEASED_PARTITIONS
+ * When the assignment is acknowledged, the member transitions to the
+ * UNRELEASED_PARTITIONS if newly assigned partitions are not available yet.
+ *
+ * - UNRELEASED_PARTITIONS:
+ *   The member's reconciliation cannot progress because newly assigned partitions
+ *   are still owned by other members in the group. They are not released yet.
  *
- * The reconciliation process is started or re-started whenever a new target assignment is installed;
- * the epoch of the new target assignment is different from the next epoch of the member. In this transient
- * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment
- * are updated. If the partitions pending revocation is not empty, the state machine transitions to
- * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it
- * transitions to STABLE.
+ *   Valid Transitions:
+ *   - UNRELEASED_PARTITIONS -> STABLE
+ * The member may transition to the STABLE state if the partitions 

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380954188


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS

Review Comment:
   For this transition, I'm confused about which one is the default first transition. I.e., if the client has not yet acknowledged AND we have unreleased partitions, which one do we go to first? And is going between these two both ways only possible if a new assignment occurs in the middle? I'm struggling to understand how all transitions are valid.
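A minimal sketch of one ordering that is consistent with the quoted javadoc: every UNACKNOWLEDGED_ASSIGNMENT exit is described as happening "when the assignment is acknowledged", so in this reading the acknowledgement check comes first and unreleased partitions are only considered afterwards. Names are hypothetical, partitions are integers of one topic, only the resulting state is modeled (not the updated assignment), and this is not the PR's code.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the transitions out of UNACKNOWLEDGED_ASSIGNMENT; not Kafka code.
final class AckTransitionSketch {
    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    private AckTransitionSketch() { }

    // ownedByClient: partitions the member reports owning in its heartbeat.
    // sent: the assignment the coordinator last delivered to the member.
    // target: the member's slice of the current target assignment.
    // ownerByPartition: partitions still owned by other members.
    static State next(State state, Set<Integer> ownedByClient, Set<Integer> sent,
                      Set<Integer> target, Map<Integer, String> ownerByPartition) {
        if (state == State.UNACKNOWLEDGED_ASSIGNMENT && !ownedByClient.equals(sent)) {
            // Not acknowledged yet: nothing else is evaluated. The member keeps waiting
            // and would be removed if the rebalance timeout expires.
            return State.UNACKNOWLEDGED_ASSIGNMENT;
        }
        if (sent.equals(target)) {
            return State.STABLE; // fully reconciled
        }
        Set<Integer> revoking = new HashSet<>(sent);
        revoking.removeAll(target);
        Set<Integer> released = new HashSet<>(target);
        released.removeAll(sent);
        released.removeIf(ownerByPartition::containsKey);
        if (!revoking.isEmpty() || !released.isEmpty()) {
            // A new assignment can be computed and delivered, so it must be acknowledged again.
            return State.UNACKNOWLEDGED_ASSIGNMENT;
        }
        // Acknowledged, but every newly assigned partition is still owned elsewhere.
        return State.UNRELEASED_PARTITIONS;
    }
}
```

Under this assumption, a member that has not acknowledged and also has unreleased partitions stays in UNACKNOWLEDGED_ASSIGNMENT; it can only move to UNRELEASED_PARTITIONS after acknowledging.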







Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380949384


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   I think this is the case where we get a new assignment server side, but nothing changed for the member?
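Following the reading above (a target assignment is "installed" on the coordinator, independently of any client), here is a small sketch where installing a target bumps the target epoch and a member whose slice did not change only needs an epoch bump, matching STABLE -> STABLE. The class, its methods, and the strictly increasing epoch check are hypothetical and only illustrate the idea.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical: models "installing" a target assignment on the coordinator side only.
final class TargetAssignmentSketch {
    private int targetEpoch = 0;
    private final Map<String, Set<Integer>> targetByMember = new HashMap<>();

    // Installing a new target happens on the coordinator; no client is involved yet.
    void install(int newEpoch, Map<String, Set<Integer>> newTarget) {
        if (newEpoch <= targetEpoch) {
            throw new IllegalArgumentException("target epoch must increase");
        }
        targetEpoch = newEpoch;
        targetByMember.clear();
        targetByMember.putAll(newTarget);
    }

    int targetEpoch() {
        return targetEpoch;
    }

    // A STABLE member whose slice did not change only moves to the new epoch
    // (STABLE -> STABLE); any other member needs to be reconciled.
    boolean onlyEpochBump(String memberId, Set<Integer> currentAssignment) {
        return currentAssignment.equals(targetByMember.getOrDefault(memberId, Set.of()));
    }
}
```

For example, after installing epoch 2 where only member "b" gains a partition, member "a" is in the STABLE -> STABLE case while "b" is not.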






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380943451


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.
- * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param groupId   The group id.
+ * @param expectedMember   The expected member.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
-String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+ConsumerGroupMember expectedMember
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId());
+timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
-"state does not match the expected state.", groupId, memberId);
+if (!member.assignmentEquals(expectedMember)) {

Review Comment:
   I also wasn't sure about the states being the same. Is it possible that, when we are unstable, we will not be equal to the expected member? Or am I not understanding correctly when we transition to stable?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380936875


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -301,11 +322,9 @@ public String toString() {
 private final int previousMemberEpoch;
 
 /**
- * The next member epoch. This corresponds to the target
- * assignment epoch used to compute the current assigned,
- * revoking and assigning partitions.
+ * The member state.
  */
-private final int targetMemberEpoch;
+private final MemberState state;

Review Comment:
   Is the idea with the new code that we just want to match the desired state (i.e., add the assigned and remove the revoked)?
   
   If we change the assignment, do we just bump the epoch and fence older ones?
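A minimal sketch of the "bump the epoch and fence older ones" idea from the question above, with an explicit state field next to the epochs as in the diff. The fencing rule is an assumption loosely modeled on the coordinator's heartbeat epoch checks: a newer epoch is rejected, an older one is fenced unless it is exactly the previous epoch and only claims partitions the member still has. It only models the case where a new assignment bumps the epoch; all names are hypothetical.

```java
import java.util.Set;

// Hypothetical sketch of a member carrying an explicit state next to its epochs.
final class MemberSketch {
    enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

    final int memberEpoch;
    final int previousMemberEpoch;
    final MemberState state;
    final Set<Integer> assigned;

    MemberSketch(int memberEpoch, int previousMemberEpoch, MemberState state, Set<Integer> assigned) {
        this.memberEpoch = memberEpoch;
        this.previousMemberEpoch = previousMemberEpoch;
        this.state = state;
        this.assigned = assigned;
    }

    // Delivering a new assignment bumps the epoch and remembers the previous one.
    MemberSketch withNewAssignment(int newEpoch, Set<Integer> newAssigned) {
        return new MemberSketch(newEpoch, memberEpoch, MemberState.UNACKNOWLEDGED_ASSIGNMENT, newAssigned);
    }

    // Assumed rule: requests from an older epoch are fenced, except a retry from the
    // previous epoch that only claims partitions the member still has (the response
    // carrying the bumped epoch may have been lost).
    boolean isFenced(int requestEpoch, Set<Integer> ownedByClient) {
        if (requestEpoch > memberEpoch) {
            return true;
        }
        if (requestEpoch == memberEpoch) {
            return false;
        }
        return requestEpoch != previousMemberEpoch || !assigned.containsAll(ownedByClient);
    }
}
```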






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380929415


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.
- * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param groupId   The group id.
+ * @param expectedMember   The expected member.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
-String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+ConsumerGroupMember expectedMember
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, expectedMember.memberId());
+timer.schedule(key, expectedMember.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, true, () -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember member = group.getOrMaybeCreateMember(expectedMember.memberId(), false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
-"state does not match the expected state.", groupId, memberId);
+if (!member.assignmentEquals(expectedMember)) {

Review Comment:
   So this works because we removed the pending assignment fields from the equality check? In other words, this should only be false when the fields that changed indicate the assignment is no longer valid, and not when the assignment just failed to complete.
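The guard in the quoted timer callback can be sketched as follows: the timer captures a snapshot of the member when it is scheduled, and when it fires the member is only removed if its assignment still equals that snapshot. Which fields `assignmentEquals` compares in the PR is not shown here; this sketch assumes epoch, assigned partitions, and partitions pending revocation, and treats a heartbeat timestamp as a non-assignment field. All names are hypothetical.

```java
import java.util.Set;

// Hypothetical sketch of the rebalance-timeout guard; field choices are assumptions.
final class TimeoutGuardSketch {
    static final class Member {
        final int memberEpoch;
        final Set<Integer> assigned;
        final Set<Integer> revoking;
        final long lastHeartbeatMs; // not part of the assignment

        Member(int memberEpoch, Set<Integer> assigned, Set<Integer> revoking, long lastHeartbeatMs) {
            this.memberEpoch = memberEpoch;
            this.assigned = assigned;
            this.revoking = revoking;
            this.lastHeartbeatMs = lastHeartbeatMs;
        }

        // Compares only the fields that describe the delivered assignment, so a heartbeat
        // that refreshes unrelated fields does not make the pending timeout look stale.
        boolean assignmentEquals(Member other) {
            return memberEpoch == other.memberEpoch
                && assigned.equals(other.assigned)
                && revoking.equals(other.revoking);
        }
    }

    private TimeoutGuardSketch() { }

    // Called when the timeout scheduled with `expected` fires.
    // Returns true if the member should be removed from the group.
    static boolean shouldRemoveOnTimeout(Member current, Member expected) {
        if (current == null) {
            return false; // the member already left the group
        }
        // A different assignment means a newer one superseded the snapshot, so this
        // timeout is stale. Assumption: acknowledgement itself cancels the timer elsewhere.
        return current.assignmentEquals(expected);
    }
}
```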






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923649


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed

Review Comment:
   Small question: irrespective of anything else, whenever there's a new assignment sent to the member we transition to stable, is that right?



Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380923297


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1094,58 +1087,54 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
- * @param groupId   The group id.

Review Comment:
   should we have removed this? It is still a param below.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380922729


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member

Review Comment:
   Just for my understanding, when we say "new target assignment installed", do we mean the assignment reconciliation on the client, aka the assignment installed on the client?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380913612


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -934,32 +934,25 @@ private CoordinatorResult consumerGr
 // 3. Reconcile the member's assignment with the target assignment. This is only required if
 // the member is not stable or if a new target assignment has been installed.
 boolean assignmentUpdated = false;
-if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) {
+if (!updatedMember.isReconciledTo(targetAssignmentEpoch)) {
 ConsumerGroupMember prevMember = updatedMember;
 updatedMember = new CurrentAssignmentBuilder(updatedMember)
 .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
 .withCurrentPartitionEpoch(group::currentPartitionEpoch)
 .withOwnedTopicPartitions(ownedTopicPartitions)
 .build();
 
-// Checking the reference is enough here because a new instance
-// is created only when the state has changed.
-if (updatedMember != prevMember) {

Review Comment:
   Is this due to the change with assignments or just a cleanup we wanted to do?
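   The new `isReconciledTo` guard reads as the negation of the old condition. A minimal sketch, assuming a hypothetical `MemberView` that carries only a member epoch and a state, and assuming that a STABLE member's epoch equals the epoch of the target assignment it reconciled to (the PR drops `targetMemberEpoch`, so this is an interpretation, not the PR's code):

```java
// Hypothetical states; only the ones named in this PR's Javadoc are listed.
enum MemberState { STABLE, UNACKNOWLEDGED_ASSIGNMENT, UNRELEASED_PARTITIONS }

final class MemberView {
    final int memberEpoch;
    final MemberState state;

    MemberView(int memberEpoch, MemberState state) {
        this.memberEpoch = memberEpoch;
        this.state = state;
    }

    // True if the member is in the STABLE state and already at the target epoch,
    // i.e. there is nothing left to reconcile on this heartbeat.
    boolean isReconciledTo(int targetAssignmentEpoch) {
        return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
    }

    public static void main(String[] args) {
        int targetAssignmentEpoch = 10;
        MemberView[] members = {
            new MemberView(10, MemberState.STABLE),                    // reconciled
            new MemberView(9, MemberState.STABLE),                     // new target installed
            new MemberView(10, MemberState.UNACKNOWLEDGED_ASSIGNMENT)  // waiting for an ack
        };
        for (MemberView m : members) {
            // Mirrors: if (!updatedMember.isReconciledTo(targetAssignmentEpoch)) { ...reconcile... }
            System.out.println(m.state + "@" + m.memberEpoch + " needs reconciliation: "
                + !m.isReconciledTo(targetAssignmentEpoch));
        }
    }
}
```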






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


jolshan commented on PR #14673:
URL: https://github.com/apache/kafka/pull/14673#issuecomment-1791722750

   > When partitions are revoked, the state machine cannot advance until the consumer acknowledges the revocation. However, when partitions are assigned, the state machine does not wait on receiving an acknowledgement at all. This means that the next assignment can be delivered anytime.
   
   Just for my understanding, we decided that the assignment should be the same as revocation -- that we must acknowledge before taking on anything new. Is this because the other way (advancing the state machine on revocations) was not possible?





Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380879868


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:

Review Comment:
   nit: Could we also include a list of all the valid states at the beginning? It might make the full set of states easier to follow.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-02 Thread via GitHub


rreddy-22 commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1380878154


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.

Review Comment:
   nit: could we change all the he's to it maybe?






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-11-01 Thread via GitHub


jeffkbkim commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1379393118


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -537,22 +533,34 @@ public Map> partitionsPendingAssignment() {
 public String currentAssignmentSummary() {
 return "CurrentAssignment(memberEpoch=" + memberEpoch +
 ", previousMemberEpoch=" + previousMemberEpoch +
-", targetMemberEpoch=" + targetMemberEpoch +
 ", state=" + state +
 ", assignedPartitions=" + assignedPartitions +
-", partitionsPendingRevocation=" + partitionsPendingRevocation +
-", partitionsPendingAssignment=" + partitionsPendingAssignment +
+", revokedPartitions=" + revokedPartitions +
 ')';
 }
 
+/**
+ * @return True if the assignment of this member is equals to the assignment

Review Comment:
   nit: is equal to



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -511,24 +507,24 @@ public MemberState state() {
 }
 
 /**
- * @return The set of assigned partitions.
+ * @return True of the member is in the Stable state and at the desired epoch.

Review Comment:
   nit: True if



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:
+ * - STABLE:
+ *   The member is fully reconciled to the desired target assignment.
  *
- * - Next Epoch:
- *   The desired epoch of the member. It corresponds to the epoch of the target/desired assignment.
- *   The member transitions to this epoch when it has revoked the partitions that it does not own
- *   or if it does not have to revoke any.
+ *   Valid Transitions:
+ *   - STABLE -> STABLE
+ * When a new target assignment is installed but the assignment of the member
+ * has not changed, the member transitions to the next epoch and remains in
+ * the STABLE state.
  *
- * - Previous Epoch:
- *   The epoch of the member when the state was last updated.
+ *   - STABLE -> UNACKNOWLEDGED_ASSIGNMENT
+ * When a new target assignment is installed and a new assignment is computed
+ * for the member, he transitions to the UNACKNOWLEDGED_ASSIGNMENT state.
  *
- * - Assigned Partitions:
- *   The set of partitions currently assigned to the member. This represents what the member should have.
+ * If the next assignment contains partitions to be revoked, the member stays
+ * in his current epoch. Otherwise, he transitions to the target epoch.
  *
- * - Partitions Pending Revocation:
- *   The set of partitions that the member should revoke before it can transition to the next state.
+ *   - STABLE -> UNRELEASED_PARTITIONS
+ * When a new target assignment is installed and all the newly assigned partitions
+ * are not available yet, he transitions to the UNRELEASED_PARTITIONS state
+ * and waits until at least one of them is available.
  *
- * - Partitions Pending Assignment:
- *   The set of partitions that the member will eventually receive. The partitions in this set are
- *   still owned by other members in the group.
+ * - UNACKNOWLEDGED_ASSIGNMENT:
+ *   The member has received a new assignment from the group coordinator but
+ *   he has not acknowledged it yet. The member is removed from the group if
+ *   he does not acknowledge it within the rebalance timeout.
  *
- * The state machine has three states:
- * - REVOKING:
- *   This state means that the member must revoke partitions before it can transition to the next epoch
- *   and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions
- *   are committed with the current epoch. The member transitions to the next state only when it has
- *   acknowledged the revocation.
+ *   Valid Transitions:
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> STABLE
+ * When the assignment is acknowledged, the member transitions to the STABLE
+ * state if it is fully reconciled.
  *
- * - ASSIGNING:
- *   This state means that the member waits on partitions which are still owned by other members in the
- *   group. It remains in this state until they are all freed up.
- *   group. It remains in this state until they are all freed up.
+ *   - UNACKNOWLEDGED_ASSIGNMENT -> UNACKNOWLEDGED_ASSIGNMENT
+ * When the assignment is acknowledged, the member remains in the
+ * UNACKNOWLEDGED_ASSIGNMENT state if a new assignment is computed.
  *
- * - STABLE:
- *   This state means that the member has received all its assigned partitions.
+ * If the next 

Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1377310585


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -19,16 +19,9 @@
 import org.apache.kafka.common.Uuid;

Review Comment:
   All the tests in this file have been fully replaced, so we can just ignore the diff.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1377309079


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -34,48 +33,68 @@
  * consumer group protocol. Given the current state of a member and a desired or target
  * assignment state, the state machine takes the necessary steps to converge them.
  *
- * The member state has the following properties:
- * - Current Epoch:
- *   The current epoch of the member.
+ * State Machine:

Review Comment:
   The core of the change is in this file.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1377308235


##
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##
@@ -24,20 +24,12 @@
   "about": "The current member epoch that is expected from the member in the heartbeat request." },
 { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
   "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
-{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
-  "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." },
+{ "name": "State", "versions": "0+", "type": "int8",
+  "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." },
 { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
   "about": "The partitions assigned to (or owned by) this member." },
-{ "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions",
-  "about": "The partitions that must be revoked by this member." },
-{ "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions",
-  "about": "The partitions that will be assigned to this member when they are freed up by their current owners." },
-{ "name": "Error", "versions": "0+", "type": "int8",
-  "about": "The error reported by the assignor." },
-{ "name": "MetadataVersion", "versions": "0+", "type": "int16",
-  "about": "The version of the metadata bytes." },
-{ "name": "MetadataBytes", "versions": "0+", "type": "bytes",
-  "about": "The metadata bytes." }

Review Comment:
   I also removed those because we don't use them at the moment.






Re: [PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14673:
URL: https://github.com/apache/kafka/pull/14673#discussion_r1377307212


##
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##
@@ -24,20 +24,12 @@
   "about": "The current member epoch that is expected from the member in the heartbeat request." },
 { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
   "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
-{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
-  "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." },
+{ "name": "State", "versions": "0+", "type": "int8",
+  "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." },

Review Comment:
   The main change here is that we introduce an explicit state in the record.
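   Persisting the state as an `int8` implies a stable mapping between each state and a byte. A minimal sketch of such a mapping, assuming the three states named in this PR's Javadoc; the byte values and the `value()`/`fromValue()` helpers are assumptions, not taken from the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mapping between member states and the int8 "State" field of the
// ConsumerGroupCurrentMemberAssignmentValue record. The byte values are assumptions.
enum MemberState {
    STABLE((byte) 0),
    UNACKNOWLEDGED_ASSIGNMENT((byte) 1),
    UNRELEASED_PARTITIONS((byte) 2);

    // Reverse index used when a record is read back (e.g. on coordinator load).
    private static final Map<Byte, MemberState> BY_VALUE = new HashMap<>();
    static {
        for (MemberState s : values()) {
            BY_VALUE.put(s.value, s);
        }
    }

    private final byte value;

    MemberState(byte value) {
        this.value = value;
    }

    // The byte written into the record.
    byte value() {
        return value;
    }

    // Decodes the int8 read from the record; rejects values this sketch does not know.
    static MemberState fromValue(byte value) {
        MemberState state = BY_VALUE.get(value);
        if (state == null) {
            throw new IllegalArgumentException("Unknown member state: " + value);
        }
        return state;
    }

    public static void main(String[] args) {
        byte stored = UNACKNOWLEDGED_ASSIGNMENT.value();
        System.out.println(stored);            // 1
        System.out.println(fromValue(stored)); // UNACKNOWLEDGED_ASSIGNMENT
    }
}
```

   Assigning explicit byte values, rather than relying on `ordinal()`, keeps stored records readable if the enum is later reordered.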



##
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##
@@ -24,20 +24,12 @@
   "about": "The current member epoch that is expected from the member in the heartbeat request." },
 { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
   "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
-{ "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
-  "about": "The target epoch corresponding to the assignment used to compute the AssignedPartitions, the PartitionsPendingRevocation and the PartitionsPendingAssignment fields." },
+{ "name": "State", "versions": "0+", "type": "int8",
+  "about": "The member state. See ConsumerGroupMember.MemberState for the possible values." },
 { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
   "about": "The partitions assigned to (or owned by) this member." },
-{ "name": "PartitionsPendingRevocation", "versions": "0+", "type": "[]TopicPartitions",
-  "about": "The partitions that must be revoked by this member." },
-{ "name": "PartitionsPendingAssignment", "versions": "0+", "type": "[]TopicPartitions",
-  "about": "The partitions that will be assigned to this member when they are freed up by their current owners." },

Review Comment:
   This is not required anymore with the new implementation.






[PR] KAFKA-15763; Group Coordinator should not deliver new assignment before previous one is acknowledged [kafka]

2023-10-31 Thread via GitHub


dajac opened a new pull request, #14673:
URL: https://github.com/apache/kafka/pull/14673

   The current implementation of the reconciliation state machine on the server side is asymmetric when it comes to how it treats the revocation or the assignment of partitions. When partitions are revoked, the state machine cannot advance until the consumer acknowledges the revocation. However, when partitions are assigned, the state machine does not wait on receiving an acknowledgement at all. This means that the next assignment can be delivered anytime.
   
   While we were working on the client-side state machine, this asymmetric handling caused a lot of confusion and made the client side much more complex than we had imagined. The complexity came from the need to buffer all the new assignments received while one assignment is being reconciled. Our goal is to simplify the consumer with the new protocol, so this was going in the opposite direction.
   
   This patch changes the server-side state machine to always wait for an assignment to be acknowledged before delivering a new one, regardless of whether partitions are revoked, assigned, or both.
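   The rule that a new assignment is only delivered once the previous one has been acknowledged can be sketched as a tiny state machine. This is a minimal illustration, not the PR's `CurrentAssignmentBuilder`: `AckGatedMember` and `onHeartbeat` are hypothetical, partitions are plain strings, epochs and partitions still owned by other members are ignored, and an acknowledgement is assumed to be the member reporting exactly the delivered partitions as owned.

```java
import java.util.Set;

// Hypothetical, heavily simplified coordinator-side view of one member; not Kafka's
// CurrentAssignmentBuilder. Epochs, revocation ordering and partitions still owned by
// other members are deliberately left out.
final class AckGatedMember {
    enum State { STABLE, UNACKNOWLEDGED_ASSIGNMENT }

    private State state = State.STABLE;
    private Set<String> delivered = Set.of();

    State state() {
        return state;
    }

    // Handles one heartbeat. `target` is the coordinator's latest target assignment and
    // `owned` is what the member reports owning. Returns the assignment to send back,
    // or null when nothing new is delivered on this heartbeat.
    Set<String> onHeartbeat(Set<String> target, Set<String> owned) {
        if (state == State.UNACKNOWLEDGED_ASSIGNMENT) {
            if (!owned.equals(delivered)) {
                // The previous assignment is not acknowledged yet: hold back any newer target.
                return null;
            }
            // Assumed acknowledgement: the member owns exactly what was delivered.
            state = State.STABLE;
        }
        if (target.equals(delivered)) {
            return null; // Already reconciled; nothing new to deliver.
        }
        delivered = Set.copyOf(target);
        state = State.UNACKNOWLEDGED_ASSIGNMENT;
        return delivered;
    }

    public static void main(String[] args) {
        AckGatedMember member = new AckGatedMember();
        System.out.println(member.onHeartbeat(Set.of("foo-0"), Set.of()));        // [foo-0]
        // A newer target shows up before the member acknowledged [foo-0]: it is held back.
        System.out.println(member.onHeartbeat(Set.of("foo-1"), Set.of()));        // null
        // The member now reports owning foo-0, so the newer target can be delivered.
        System.out.println(member.onHeartbeat(Set.of("foo-1"), Set.of("foo-0"))); // [foo-1]
        System.out.println(member.state());                                       // UNACKNOWLEDGED_ASSIGNMENT
        System.out.println(member.onHeartbeat(Set.of("foo-1"), Set.of("foo-1"))); // null
        System.out.println(member.state());                                       // STABLE
    }
}
```

   Holding back the newer target means the member only ever has one assignment in flight, so the client has nothing to buffer while it reconciles.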
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

