squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2867839878
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -804,13 +804,20 @@ public static CoordinatorRecord
newShareGroupStatePartitionMetadataRecord(
}
private static
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
toTopicPartitions(
- Map<Uuid, Set<Integer>> topicPartitions
- ) {
- List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics
= new ArrayList<>(topicPartitions.size());
- topicPartitions.forEach((topicId, partitions) ->
+ Map<Uuid, Map<Integer, Integer>> topicPartitionsWithEpochs
+ ) {
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics
= new ArrayList<>(topicPartitionsWithEpochs.size());
+ topicPartitionsWithEpochs.forEach((topicId, partitionEpochMap) -> {
+ List<Integer> partitionList = new
ArrayList<>(partitionEpochMap.keySet());
+ Collections.sort(partitionList);
Review Comment:
I would disregard the copilot comment. The previous implementation was
non-deterministic too.
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,12 +193,21 @@ public Builder setState(MemberState state) {
return this;
}
- public Builder setAssignedPartitions(Map<Uuid, Set<Integer>>
assignedPartitions) {
+ public Builder setAssignedPartitions(Map<Uuid, Set<Integer>>
assignedPartitions, int assignmentEpoch) {
+ this.assignedPartitions = assignedPartitions.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().stream().collect(Collectors.toMap(p ->
p, p -> assignmentEpoch))
+ ));
Review Comment:
This operation is equivalent to `AssignmentTestUtil.toEpochsAssignment`.
Since we're adding a util method for this, we can remove
`setAssignedPartitions(assignedPartitions, assignmentEpoch)` and have the
caller use the util method. The util method will need moving to `Utils.java`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -336,16 +376,16 @@ public Optional<String> serverAssignorName() {
}
/**
- * @return The set of assigned partitions.
+ * @return The epoch-annotated assigned partitions map.
*/
- public Map<Uuid, Set<Integer>> assignedPartitions() {
+ public Map<Uuid, Map<Integer, Integer>> assignedPartitions() {
return assignedPartitions;
}
/**
- * @return The set of partitions awaiting revocation from the member.
+ * @return The epoch-annotated pending revocation partitions map.
Review Comment:
```suggestion
* @return The partitions awaiting revocation from this member and their
assignment epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -417,9 +487,9 @@ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(topicPartitionsFromMap(assignedPartitions,
image)))
+
.setTopicPartitions(topicPartitionsFromEpochMap(assignedPartitions, image)))
Review Comment:
Here we create a new version of `topicPartitionsFromMap()` but in
`GroupMetadataManager.prepareAssignment` we clone the assignment without
epochs. We're inconsistent in our approaches.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +502,21 @@ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(
.setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
- private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromMap(
+ private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromEpochMap(
+ Map<Uuid, Map<Integer, Integer>> partitions,
+ CoordinatorMetadataImage image
+ ) {
+ List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitions = new ArrayList<>();
+ partitions.forEach((topicId, partitionEpochMap) -> {
+ image.topicMetadata(topicId).ifPresent(topicMetadata ->
topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName(topicMetadata.name())
+ .setPartitions(new ArrayList<>(partitionEpochMap.keySet()))));
+ });
+ return topicPartitions;
+ }
+
+ private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromPartitionSet(
Review Comment:
```suggestion
private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromAssignmentWithoutEpochs(
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -294,6 +324,10 @@ public void testAsConsumerGroupDescribeMember(boolean
withClassicMemberMetadata)
.build();
ConsumerGroupDescribeResponseData.Member actual =
member.asConsumerGroupDescribeMember(targetAssignment, new
KRaftCoordinatorMetadataImage(metadataImage));
+ // Sort partitions for comparison
+ actual.assignment().topicPartitions().forEach(tp ->
Collections.sort(tp.partitions()));
+ actual.targetAssignment().topicPartitions().forEach(tp ->
Collections.sort(tp.partitions()));
+
Review Comment:
Is this change necessary?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,14 +302,21 @@ public ConsumerGroupMember build() {
private final String serverAssignorName;
/**
- * The partitions being revoked by this member.
+ * The classic member metadata if the consumer uses the classic protocol.
*/
- private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+ private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata;
/**
- * The classic member metadata if the consumer uses the classic protocol.
+ * The epoch at which each partition was assigned to this member.
+ * Map: topicId -> partitionId -> assignmentEpoch
Review Comment:
```suggestion
* The partitions assigned to this member and their assignment epochs.
* A map of topic ids to partitions to assignment epochs.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>>
mkAssignment(Map.Entry<Uuid, Set<Integer>>
return Collections.unmodifiableMap(assignment);
}
+ /**
+ * Converts a regular assignment to an epochs-based assignment using the
given epoch.
+ */
+ public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(
+ Map<Uuid, Set<Integer>> assignment,
+ int epoch
+ ) {
+ Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();
+ for (Map.Entry<Uuid, Set<Integer>> entry : assignment.entrySet()) {
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partition : entry.getValue()) {
+ partitionEpochs.put(partition, epoch);
+ }
+ result.put(entry.getKey(),
Collections.unmodifiableMap(partitionEpochs));
+ }
+ return Collections.unmodifiableMap(result);
+ }
+
+ /**
+ * Converts an epochs-based assignment to a regular assignment (without
epochs).
+ */
+ public static Map<Uuid, Set<Integer>> toPartitionMap(
Review Comment:
```suggestion
/**
* Discards the assignment epochs from an assignment with epochs.
*/
public static Map<Uuid, Set<Integer>> toAssignmentWithoutEpochs(
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3874,8 +3891,8 @@ fooTopicName, computeTopicHash(fooTopicName, new
MetadataImageBuilder()
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of("foo", "bar"))
.setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .setAssignedPartitions(Map.of(
+ fooTopicId, new TreeMap<>(Map.of(0, 10, 1, 10, 2, 10, 3, 11,
4, 11, 5, 11))))
Review Comment:
This is pretty ugly and inconsistent with the other tests.
Can we write this as
```
mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(fooTopicId, 10, 0, 1, 2, 3),
mkTopicAssignmentWithEpochs(fooTopicId, 11, 4, 5))
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,14 +302,21 @@ public ConsumerGroupMember build() {
private final String serverAssignorName;
/**
- * The partitions being revoked by this member.
+ * The classic member metadata if the consumer uses the classic protocol.
*/
- private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+ private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata;
/**
- * The classic member metadata if the consumer uses the classic protocol.
+ * The epoch at which each partition was assigned to this member.
+ * Map: topicId -> partitionId -> assignmentEpoch
*/
- private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata;
+ private final Map<Uuid, Map<Integer, Integer>> assignedPartitions;
+
+ /**
+ * The epoch at which each partition pending revocation was assigned.
+ * Map: topicId -> partitionId -> assignmentEpoch
Review Comment:
```suggestion
* The partitions awaiting revocation from this member and their
assignment epochs.
* A map of topic ids to partitions to assignment epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -7602,7 +7606,11 @@ private byte[] prepareAssignment(ConsumerGroupMember
member) {
try {
return ConsumerProtocol.serializeAssignment(
toConsumerProtocolAssignment(
- member.assignedPartitions(),
+ member.assignedPartitions().entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> Set.copyOf(e.getValue().keySet())
+ )),
Review Comment:
This is the same operation as `AssignmentTestUtil.toPartitionMap`. Can we
reuse the method?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,19 +302,21 @@ public ConsumerGroupMember build() {
private final String serverAssignorName;
/**
- * The partitions assigned to this member.
+ * The classic member metadata if the consumer uses the classic protocol.
*/
- private final Map<Uuid, Set<Integer>> assignedPartitions;
+ private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata;
Review Comment:
nit: Could we avoid reordering fields here? It makes it harder to review.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -360,28 +361,29 @@ private ConsumerGroupMember updateCurrentAssignment(
*
* @param memberEpoch The epoch of the member to use. This
may be different
* from the epoch in {@link
CurrentAssignmentBuilder#member}.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember computeNextAssignment(
int memberEpoch,
- Map<Uuid, Set<Integer>> memberAssignedPartitions
+ Map<Uuid, Map<Integer, Integer>> memberAssignedPartitionsWithEpochs
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
boolean hasUnreleasedPartitions = false;
- Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new
HashMap<>();
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs = new
HashMap<>();
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs = new HashMap<>();
Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new
HashMap<>();
Set<Uuid> allTopicIds = new
HashSet<>(targetAssignment.partitions().keySet());
- allTopicIds.addAll(memberAssignedPartitions.keySet());
+ allTopicIds.addAll(memberAssignedPartitionsWithEpochs.keySet());
for (Uuid topicId : allTopicIds) {
Set<Integer> target = targetAssignment.partitions()
.getOrDefault(topicId, Set.of());
- Set<Integer> currentAssignedPartitions = memberAssignedPartitions
- .getOrDefault(topicId, Set.of());
+ Map<Integer, Integer> currentAssignedPartitionsWithEpochs =
memberAssignedPartitionsWithEpochs
+ .getOrDefault(topicId, Map.of());
+ Set<Integer> currentAssignedPartitions =
currentAssignedPartitionsWithEpochs.keySet();
Review Comment:
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -336,16 +376,16 @@ public Optional<String> serverAssignorName() {
}
/**
- * @return The set of assigned partitions.
+ * @return The epoch-annotated assigned partitions map.
Review Comment:
```suggestion
* @return The partitions assigned to this member and their assignment
epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
Review Comment:
```
Map<Integer, Integer> assignedPartitions = new
HashMap<>(currentAssignedPartitionsWithEpochs);
assignedPartitions.keySet().retainAll(target);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
/**
* Removes the partition epochs based on the provided assignment.
*
- * @param assignment The assignment.
+ * @param assignmentWithEpochs The assignment with epochs.
* @param expectedEpoch The expected epoch.
* package-private for testing.
*/
void removePartitionEpochs(
- Map<Uuid, Set<Integer>> assignment,
+ Map<Uuid, Map<Integer, Integer>> assignmentWithEpochs,
int expectedEpoch
) {
- assignment.forEach((topicId, assignedPartitions) -> {
+ assignmentWithEpochs.forEach((topicId, partitionEpochs) -> {
Review Comment:
We are inconsistent in the naming of the `Map<partition, assignment epoch>`.
Here we call them `partitionEpochs`, but elsewhere we call them
`partitionEpochMap`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -356,7 +396,37 @@ public static boolean hasAssignedPartitionsChanged(
ConsumerGroupMember member1,
ConsumerGroupMember member2
) {
- return
!member1.assignedPartitions().equals(member2.assignedPartitions());
+ return
!member1.assignedPartitions.equals(member2.assignedPartitions());
+ }
+
+ /**
+ * Gets the assignment epoch for a specific partition.
Review Comment:
```suggestion
* Gets the assignment epoch for an assigned partition.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
/**
* Removes the partition epochs based on the provided assignment.
*
- * @param assignment The assignment.
+ * @param assignmentWithEpochs The assignment with epochs.
* @param expectedEpoch The expected epoch.
* package-private for testing.
Review Comment:
The javadoc is now ambiguous. It's not clear whether we use the assignment
epochs or member epoch.
```suggestion
* Removes the partition epochs based on the provided assignment and
member epoch.
*
* @param assignment The assignment with epochs. The assignment
epochs are ignored.
* @param expectedEpoch The expected member epoch.
* package-private for testing.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -93,6 +93,36 @@ public static String assignmentToString(
return builder.toString();
}
+ /**
+ * @return The provided assignment with epochs as a String.
+ *
+ * Example:
+ * [topicid1-0@5, topicid1-1@5, topicid2-0@3, topicid2-1@3]
+ */
+ public static String assignmentEpochToString(
Review Comment:
```suggestion
public static String assignmentWithEpochsToString(
```
##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java:
##########
@@ -79,12 +78,12 @@ public static ConsumerGroupHeartbeatResponse parse(Readable
readable, short vers
}
public static ConsumerGroupHeartbeatResponseData.Assignment
createAssignment(
- Map<Uuid, Set<Integer>> assignment
+ Map<Uuid, Map<Integer, Integer>> assignmentWithEpochs
Review Comment:
Could we not rename these everywhere unless absolutely necessary? I don't
think it helps much with clarity and makes the review harder.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1082,21 +1082,21 @@ void removePartitionEpochs(
/**
* Adds the partitions epoch based on the provided assignment.
*
- * @param assignment The assignment.
+ * @param assignmentWithEpochs The assignment with epochs.
* @param epoch The new epoch.
* @throws IllegalStateException if updating a partition with a smaller or
equal epoch.
* package-private for testing.
Review Comment:
```suggestion
* Adds the partitions epoch based on the provided assignment and member
epoch.
*
* @param assignment The assignment with epochs. The assignment
epochs are ignored.
* @param epoch The new member epoch.
* @throws IllegalStateException if updating a partition with a smaller
or equal member epoch.
* package-private for testing.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1045,18 +1045,18 @@ private void maybeRemovePartitionEpoch(
/**
* Removes the partition epochs based on the provided assignment.
*
- * @param assignment The assignment.
+ * @param assignmentWithEpochs The assignment with epochs.
* @param expectedEpoch The expected epoch.
Review Comment:
The renaming has broken the alignment in the javadocs, here and elsewhere.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>>
mkAssignment(Map.Entry<Uuid, Set<Integer>>
return Collections.unmodifiableMap(assignment);
}
+ /**
+ * Converts a regular assignment to an epochs-based assignment using the
given epoch.
+ */
+ public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(
+ Map<Uuid, Set<Integer>> assignment,
+ int epoch
+ ) {
+ Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();
Review Comment:
Is it necessary to use a `LinkedHashMap`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
Review Comment:
```
Map<Integer, Integer> partitionsPendingRevocation = new
HashSet<>(currentAssignedPartitionsWithEpochs);
partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +502,21 @@ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(
.setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
- private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromMap(
+ private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromEpochMap(
Review Comment:
```suggestion
private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromAssignmentWithEpochs(
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -81,8 +84,8 @@ public void testNewMember() {
assertEquals(Set.of("bar", "foo"), member.subscribedTopicNames());
assertEquals("regex", member.subscribedTopicRegex());
assertEquals("range", member.serverAssignorName().get());
- assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)),
member.assignedPartitions());
- assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)),
member.partitionsPendingRevocation());
+
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId1, 1, 2,
3)), 10), member.assignedPartitions());
+
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId2, 4, 5,
6)), 9), member.partitionsPendingRevocation());
Review Comment:
Could we add asserts for `ConsumerGroupMember.assignmentEpoch` and
`pendingRevocationEpoch`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
}
/**
- * Creates a map of topic id and partition set from a list of consumer
group TopicPartitions.
+ * Creates a map of topic id and partition-epoch map from a list of
consumer group TopicPartitions.
*
- * @param topicPartitionsList The list of TopicPartitions.
- * @return a map of topic id and partition set.
+ * @param topicPartitions The list of TopicPartitions.
+ * @param defaultEpoch The default epoch to use when the epoch information
is not available for a partition.
+ * @return a map of topic id and partition-epoch map.
*/
- public static Map<Uuid, Set<Integer>> assignmentFromTopicPartitions(
- List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitionsList
+ public static Map<Uuid, Map<Integer, Integer>>
assignmentFromTopicPartitions(
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions,
+ int defaultEpoch
) {
- return topicPartitionsList.stream().collect(Collectors.toMap(
- ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
- topicPartitions -> Collections.unmodifiableSet(new
HashSet<>(topicPartitions.partitions()))));
+ // For legacy static member, the defaultEpoch could be
-2(LEAVE_GROUP_STATIC_MEMBER_EPOCH).
Review Comment:
```suggestion
// For legacy static member, the defaultEpoch could be -2
(LEAVE_GROUP_STATIC_MEMBER_EPOCH).
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java:
##########
@@ -243,8 +246,35 @@ public void
testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
assertEquals(10, member.memberEpoch());
assertEquals(9, member.previousMemberEpoch());
- assertEquals(mkAssignment(mkTopicAssignment(topicId1, 0, 1, 2)),
member.assignedPartitions());
- assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)),
member.partitionsPendingRevocation());
+
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId1, 0, 1,
2)), 10), member.assignedPartitions());
+
assertEquals(toEpochsAssignment(mkAssignment(mkTopicAssignment(topicId2, 3, 4,
5)), 10), member.partitionsPendingRevocation());
+ }
+
+ @Test
+ public void
testUpdateWithConsumerGroupCurrentMemberAssignmentValueWithNegativeEpoch() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupCurrentMemberAssignmentValue record = new
ConsumerGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) // -2
+ .setPreviousMemberEpoch(5)
+ .setAssignedPartitions(List.of(new
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(0, 1, 2))))
+ .setPartitionsPendingRevocation(List.of(new
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(3, 4, 5))));
+
+ ConsumerGroupMember member = new
ConsumerGroupMember.Builder("member-id")
+ .updateWith(record)
+ .build();
+
+ assertEquals(-2, member.memberEpoch());
+ assertEquals(5, member.previousMemberEpoch());
+
+ // Partition epoch should be 0, not -2.
Review Comment:
```suggestion
// Assignment epochs should be 0, not -2.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
// Don't consider a partition unreleased if it is owned by the
current member
// because it is pending revocation. This is safe to do since
only a single member
// can own a partition at a time.
- !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Map.of()).containsKey(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions, preserve existing
epochs
if (!assignedPartitions.isEmpty()) {
- newAssignedPartitions.put(topicId, assignedPartitions);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : assignedPartitions) {
+ partitionEpochs.put(partitionId,
currentAssignedPartitionsWithEpochs.get(partitionId));
+ }
+ newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
}
+ // Build epochs map for partitions pending revocation, preserve
existing epochs
if (!partitionsPendingRevocation.isEmpty()) {
- newPartitionsPendingRevocation.put(topicId,
partitionsPendingRevocation);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : partitionsPendingRevocation) {
+ partitionEpochs.put(partitionId,
currentAssignedPartitionsWithEpochs.get(partitionId));
+ }
+ newPartitionsPendingRevocationWithEpochs.put(topicId,
partitionEpochs);
}
if (!partitionsPendingAssignment.isEmpty()) {
newPartitionsPendingAssignment.put(topicId,
partitionsPendingAssignment);
}
}
- if (!newPartitionsPendingRevocation.isEmpty() &&
ownsRevokedPartitions(newPartitionsPendingRevocation)) {
+ if (!newPartitionsPendingRevocationWithEpochs.isEmpty() &&
ownsRevokedPartitions(newPartitionsPendingRevocationWithEpochs)) {
// If there are partitions to be revoked, the member remains in
its current
// epoch and requests the revocation of those partitions. It
transitions to
// the UNREVOKED_PARTITIONS state to wait until the client
acknowledges the
// revocation of the partitions.
return new ConsumerGroupMember.Builder(member)
.setState(MemberState.UNREVOKED_PARTITIONS)
.updateMemberEpoch(memberEpoch)
- .setAssignedPartitions(newAssignedPartitions)
- .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
+ .setAssignedPartitions(newAssignedPartitionsWithEpochs)
+
.setPartitionsPendingRevocation(newPartitionsPendingRevocationWithEpochs)
.build();
} else if (!newPartitionsPendingAssignment.isEmpty()) {
// If there are partitions to be assigned, the member transitions
to the
// target epoch and requests the assignment of those partitions.
Note that
// the partitions are directly added to the assigned partitions
set. The
// member transitions to the STABLE state or to the
UNRELEASED_PARTITIONS
// state depending on whether there are unreleased partitions or
not.
- newPartitionsPendingAssignment.forEach((topicId, partitions) ->
newAssignedPartitions
- .computeIfAbsent(topicId, __ -> new HashSet<>())
- .addAll(partitions));
+ // Add newly assigned partitions, preserving original epoch if
partition was pending revocation
+ newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
+ Map<Integer, Integer> topicEpochs =
newAssignedPartitionsWithEpochs
+ .computeIfAbsent(topicId, __ -> new HashMap<>());
+ Map<Integer, Integer> pendingRevocationEpochs =
member.partitionsPendingRevocation()
+ .getOrDefault(topicId, Map.of());
+ for (Integer partitionId : partitions) {
+ topicEpochs.put(partitionId,
pendingRevocationEpochs.getOrDefault(partitionId, targetAssignmentEpoch));
+ }
+ });
Review Comment:
We cannot reach `computeNextAssignment` unless the member has revoked all of
its partitions pending revocation. I don't think it's reasonable for it to
perform an offset commit with an older epoch after revocation?
```suggestion
newPartitionsPendingAssignment.forEach((topicId, partitions) -> {
Map<Integer, Integer> topicEpochs =
newAssignedPartitionsWithEpochs
.computeIfAbsent(topicId, __ -> new HashMap<>());
for (Integer partitionId : partitions) {
topicEpochs.put(partitionId, targetAssignmentEpoch);
}
});
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
// Don't consider a partition unreleased if it is owned by the
current member
// because it is pending revocation. This is safe to do since
only a single member
// can own a partition at a time.
- !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Map.of()).containsKey(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions, preserve existing
epochs
if (!assignedPartitions.isEmpty()) {
- newAssignedPartitions.put(topicId, assignedPartitions);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : assignedPartitions) {
+ partitionEpochs.put(partitionId,
currentAssignedPartitionsWithEpochs.get(partitionId));
+ }
+ newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
}
Review Comment:
```suggestion
if (!assignedPartitions.isEmpty()) {
newAssignedPartitions.put(topicId, assignedPartitions);
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -404,47 +406,64 @@ private ConsumerGroupMember computeNextAssignment(
// Don't consider a partition unreleased if it is owned by the
current member
// because it is pending revocation. This is safe to do since
only a single member
// can own a partition at a time.
- !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Map.of()).containsKey(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions, preserve existing
epochs
if (!assignedPartitions.isEmpty()) {
- newAssignedPartitions.put(topicId, assignedPartitions);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : assignedPartitions) {
+ partitionEpochs.put(partitionId,
currentAssignedPartitionsWithEpochs.get(partitionId));
+ }
+ newAssignedPartitionsWithEpochs.put(topicId, partitionEpochs);
}
+ // Build epochs map for partitions pending revocation, preserve
existing epochs
if (!partitionsPendingRevocation.isEmpty()) {
- newPartitionsPendingRevocation.put(topicId,
partitionsPendingRevocation);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : partitionsPendingRevocation) {
+ partitionEpochs.put(partitionId,
currentAssignedPartitionsWithEpochs.get(partitionId));
+ }
+ newPartitionsPendingRevocationWithEpochs.put(topicId,
partitionEpochs);
}
Review Comment:
```suggestion
if (!partitionsPendingRevocation.isEmpty()) {
newPartitionsPendingRevocation.put(topicId,
partitionsPendingRevocation);
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,55 +291,56 @@ private boolean ownsRevokedPartitions(
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
* @param memberEpoch The epoch of the member to use.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
Review Comment:
"epochs of the member" reads too much like "member epoch"
```suggestion
* @param memberAssignedPartitionsWithEpochs The assigned partitions of
the member to use and their assignment epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -360,28 +361,29 @@ private ConsumerGroupMember updateCurrentAssignment(
*
* @param memberEpoch The epoch of the member to use. This
may be different
* from the epoch in {@link
CurrentAssignmentBuilder#member}.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitionsWithEpochs The assigned partitions with
epochs of the member to use.
Review Comment:
```suggestion
* @param memberAssignedPartitionsWithEpochs The assigned partitions of
the member to use and their assignment epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
}
/**
- * Creates a map of topic id and partition set from a list of consumer
group TopicPartitions.
+ * Creates a map of topic id and partition-epoch map from a list of
consumer group TopicPartitions.
*
- * @param topicPartitionsList The list of TopicPartitions.
- * @return a map of topic id and partition set.
+ * @param topicPartitions The list of TopicPartitions.
+ * @param defaultEpoch The default epoch to use when the epoch information
is not available for a partition.
+ * @return a map of topic id and partition-epoch map.
Review Comment:
```suggestion
* @return a map of topic id and partitions with assignment epochs.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -201,17 +231,31 @@ public static
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> toTopicPar
}
/**
- * Creates a map of topic id and partition set from a list of consumer
group TopicPartitions.
+ * Creates a map of topic id and partition-epoch map from a list of
consumer group TopicPartitions.
Review Comment:
```suggestion
* Creates a map of topic id and partition with assignment epochs from a
list of consumer group TopicPartitions.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,21 +1414,21 @@ private void removeGroup(
* it owns any other partitions.
*
* @param ownedTopicPartitions The partitions provided by the consumer in
the request.
- * @param target The partitions that the member should have.
+ * @param target The partitions with epochs that the member
should have.
Review Comment:
```suggestion
* @param target The partitions that the member should
have with assignment epochs.
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3561,14 +3573,18 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
result.response()
);
+ // member2: partition 3 (fooTopicId) and 2 (barTopicId) were retained
from epoch 10,
+ // partition 2 (fooTopicId) is newly assigned at epoch 11
+ Map<Uuid, Map<Integer, Integer>> member2ExpectedAssignment = Map.of(
+ fooTopicId, new TreeMap<>(Map.of(2, 11, 3, 10)),
Review Comment:
Why do we need `TreeMap`s now? They weren't there before.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -199,10 +200,10 @@ public void testUpdatingMemberUpdatesPartitionEpoch() {
member = new ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 1, 2, 3)))
- .setPartitionsPendingRevocation(mkAssignment(
- mkTopicAssignment(barTopicId, 4, 5, 6)))
+ .setAssignedPartitions(toEpochsAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1, 2, 3)), 10))
+ .setPartitionsPendingRevocation(toEpochsAssignment(mkAssignment(
+ mkTopicAssignment(barTopicId, 4, 5, 6)), 10))
Review Comment:
For these partition epoch tests, can we choose an older assignment epoch?
They should fail if the implementation uses the assignment epoch instead of the
member epoch.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -57,9 +58,9 @@ public void testStableToStable() {
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
- .setAssignedPartitions(mkAssignment(
+ .setAssignedPartitions(toEpochsAssignment(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3),
- mkTopicAssignment(topicId2, 4, 5, 6)))
Review Comment:
For these tests, could we pick an older assignment epoch, to confirm that
the implementation is not resetting them to the latest member epoch?
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/CurrentAssignmentBuilderBenchmark.java:
##########
Review Comment:
```
.setMemberEpoch(memberEpoch)
.setPreviousMemberEpoch(memberEpoch)
```
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java:
##########
@@ -131,12 +131,16 @@ public static GroupSpec createConsumerGroupSpec(
for (Map.Entry<String, ConsumerGroupMember> memberEntry :
members.entrySet()) {
String memberId = memberEntry.getKey();
ConsumerGroupMember member = memberEntry.getValue();
+ Map<Uuid, Set<Integer>> partitions = new HashMap<>();
+ member.assignedPartitions().forEach((topicId, partitionEpochMap) ->
+ partitions.put(topicId, partitionEpochMap.keySet())
+ );
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicResolver),
- new Assignment(member.assignedPartitions())
+ new Assignment(partitions)
Review Comment:
```suggestion
new
Assignment(AssignmentTestUtil.toPartitionMap(member.assignedPartitions()))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -62,6 +62,37 @@ public static Map<Uuid, Set<Integer>>
mkAssignment(Map.Entry<Uuid, Set<Integer>>
return Collections.unmodifiableMap(assignment);
}
+ /**
+ * Converts a regular assignment to an epochs-based assignment using the
given epoch.
+ */
+ public static Map<Uuid, Map<Integer, Integer>> toEpochsAssignment(
Review Comment:
```suggestion
/**
* Adds the given assignment epoch to an assignment without epochs.
*/
public static Map<Uuid, Map<Integer, Integer>> toAssignmentWithEpochs(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]