mjsax commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2408950220
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -837,7 +853,7 @@ private void maybeUpdateGroupState() {
if (members.isEmpty()) {
newState = EMPTY;
clearShutdownRequestMemberId();
- } else if (topology().isEmpty() || configuredTopology().isEmpty() || !configuredTopology().get().isReady()) {
+ } else if (topology().filter(x -> x.topologyEpoch() == validatedTopologyEpoch.get()).isEmpty()) {
Review Comment:
```suggestion
} else if (topology().filter(t -> t.topologyEpoch() == validatedTopologyEpoch.get()).isEmpty()) {
```
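For anyone following the new condition, here is a minimal sketch of what `filter(...).isEmpty()` evaluates to in each case. `Topology` is a hypothetical stand-in record, and a plain `int` replaces the `TimelineInteger` from the PR; only `Optional.filter` and `Optional.isEmpty` (Java 11+) are real APIs here.

```java
import java.util.Optional;

public class TopologyEpochCheck {
    // Hypothetical stand-in for the real topology type; only the epoch matters here.
    record Topology(int topologyEpoch) {}

    // True when there is no topology, or when its epoch differs from the validated epoch.
    static boolean notReady(Optional<Topology> topology, int validatedTopologyEpoch) {
        return topology.filter(t -> t.topologyEpoch() == validatedTopologyEpoch).isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(notReady(Optional.empty(), 3));              // true: no topology
        System.out.println(notReady(Optional.of(new Topology(2)), 3));  // true: epoch mismatch
        System.out.println(notReady(Optional.of(new Topology(3)), 3));  // false: epochs match
    }
}
```

So the single expression covers both the "no topology" case and the "topology not yet validated" case.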
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -148,9 +148,9 @@ public static class DeadlineAndEpoch {
private final TimelineHashMap<String, String> staticMembers;
/**
- * The metadata associated with each subscribed topic name.
+ * The topology epoch for which the subscribed topics identified by metadataHash are validated.
*/
- private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+ protected final TimelineInteger validatedTopologyEpoch;
Review Comment:
Why do we add this as `protected` rather than `private`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16035,7 +16038,7 @@ public void testStreamsGroupMemberEpochValidation() {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member));
- context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 100, 0));
+ context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 100, 0, 0));
Review Comment:
Is there any reason we use `0` instead of `-1` here (i.e., compared to the test changes above)?
##########
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json:
##########
@@ -24,6 +24,8 @@
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
- "about": "The hash of all topics in the group." }
+ "about": "The hash of all topics in the group." },
+ { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
Review Comment:
Given that we are in EA only, is there any reason (besides being nice) to do it this way, or any disadvantage to doing it this way?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -250,19 +250,20 @@ public void testNewStreamsGroupMemberTombstoneRecord() {
}
@Test
- public void testNewStreamsGroupEpochRecord() {
+ public void testNewStreamsGroupMetadataRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new StreamsGroupMetadataKey()
.setGroupId(GROUP_ID),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(42)
- .setMetadataHash(42),
+ .setMetadataHash(42)
+ .setValidatedTopologyEpoch(43),
(short) 0
)
);
- assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
+ assertEquals(expectedRecord, StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 42, 43));
Review Comment:
Using 42 twice and then 43 is a bit odd. Should we use a different value for each of the three numbers, or the same value three times?
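One possible argument for three distinct values, sketched below under stated assumptions: `Metadata` and `buggyNewRecord` are hypothetical stand-ins (not the real Kafka classes or helper), and the helper deliberately swaps its first two arguments. With repeated values the swap goes unnoticed; with distinct values the comparison fails.

```java
public class DistinctTestValues {
    // Hypothetical stand-in for the record value; not the real StreamsGroupMetadataValue.
    record Metadata(int epoch, long metadataHash, int validatedTopologyEpoch) {}

    // Deliberately buggy helper: it swaps epoch and metadataHash.
    static Metadata buggyNewRecord(int epoch, long metadataHash, int validatedTopologyEpoch) {
        return new Metadata((int) metadataHash, epoch, validatedTopologyEpoch);
    }

    public static void main(String[] args) {
        // Repeated values: the swapped arguments still produce an equal record.
        System.out.println(buggyNewRecord(42, 42, 43).equals(new Metadata(42, 42, 43))); // true
        // Distinct values: the swap is detected because the records differ.
        System.out.println(buggyNewRecord(42, 43, 44).equals(new Metadata(42, 43, 44))); // false
    }
}
```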