lucasbru commented on code in PR #20702:
URL: https://github.com/apache/kafka/pull/20702#discussion_r2431739687
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java:
##########
@@ -85,7 +85,7 @@ public List<CoordinatorRecord> build() {
         // Add group epoch record.
         records.add(
-            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, Map.of()));
Review Comment:
I think we should add `assignmentConfigs` to the builder as well and pass it here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -231,6 +237,9 @@ public StreamsGroup(
         this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
         this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
+        this.assignmentConfigs = new TimelineHashMap<>(snapshotRegistry, 0);
+        // Set default assignment configuration
Review Comment:
I would put any default assignment configs here. We will update the value
inside `GroupMetadataManager`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -210,6 +210,12 @@ public static class DeadlineAndEpoch {
      */
     private int endpointInformationEpoch = -1;
+    /**
+     * The assignment configurations for this streams group.
+     * This is used to determine when assignment configuration changes should trigger a rebalance.
+     */
+    private TimelineHashMap<String, String> assignmentConfigs;
Review Comment:
This may differ from the current assignment configuration for the group, so I wonder if we shouldn't just call it `lastUsedAssignmentConfig` instead.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -5419,6 +5429,13 @@ public void replay(
         streamsGroup.setGroupEpoch(value.epoch());
         streamsGroup.setMetadataHash(value.metadataHash());
         streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
+
+        if (value.assignmentConfigs() != null) {
+            for (StreamsGroupMetadataValue.AssignmentConfig config : value.assignmentConfigs()) {
+                streamsGroup.setAssignmentConfig(config.key(), config.value());
Review Comment:
I think this would never "unset" a configuration. For example, if we remove a configuration, we'd probably want to also remove it from the `StreamsGroup` object. I would change `setAssignmentConfig` to `setAssignmentConfigs` and pass in a whole collection. Inside the method you can `clear()` and `putAll()` on the internal collection.
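The replace-all setter described above could look roughly like this standalone sketch (a plain `HashMap` stands in for the real `TimelineHashMap`, and the class and method names are only illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in for StreamsGroup; the real field is a
// TimelineHashMap backed by the snapshot registry.
class StreamsGroupSketch {
    private final Map<String, String> assignmentConfigs = new HashMap<>();

    // Replaces the stored configs wholesale, so keys removed from the
    // replayed record are also removed from the group object.
    public void setAssignmentConfigs(Map<String, String> newConfigs) {
        Objects.requireNonNull(newConfigs, "newConfigs should not be null");
        assignmentConfigs.clear();
        assignmentConfigs.putAll(newConfigs);
    }

    public Map<String, String> assignmentConfigs() {
        return Map.copyOf(assignmentConfigs);
    }
}
```

Because the setter clears before copying, replaying a record that no longer carries a key drops that key from the group state, which the per-key `setAssignmentConfig` variant cannot do.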
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -100,18 +100,30 @@ public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
         long metadataHash,
-        int validatedTopologyEpoch
+        int validatedTopologyEpoch,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
Review Comment:
I think we should also require assignmentConfigs to be non-null here and add
a corresponding test. When we read a record, it may happen that the record does
not have the field (yet) and is null. But when we write a record, we should
always have assignmentConfigs.
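A minimal sketch of the writer-side guard (standalone; the `String` return value is just a placeholder for the real `CoordinatorRecord`):

```java
import java.util.Map;
import java.util.Objects;

// Illustrative sketch: when writing a record, assignmentConfigs must be
// present (possibly empty); only the read path may ever see null.
class RecordHelperSketch {
    static String newStreamsGroupMetadataRecord(
        String groupId,
        Map<String, String> assignmentConfigs
    ) {
        Objects.requireNonNull(groupId, "groupId should not be null here");
        Objects.requireNonNull(assignmentConfigs, "assignmentConfigs should not be null here");
        return groupId + ":" + assignmentConfigs; // placeholder for the record
    }
}
```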
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -100,18 +100,30 @@ public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
         long metadataHash,
-        int validatedTopologyEpoch
+        int validatedTopologyEpoch,
+        Map<String, String> assignmentConfigs
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
+        List<StreamsGroupMetadataValue.AssignmentConfig> assignmentConfigList = new ArrayList<>();
+        if (assignmentConfigs != null && !assignmentConfigs.isEmpty()) {
Review Comment:
If we require non-null above, remove the check here.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
Review Comment:
Please add assignment configs to this test.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18815,6 +18815,66 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
+    @Test
+    public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+            )
+            .build();
+
+        // Change the assignment config
+        Properties newConfig = new Properties();
+        newConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+        context.groupConfigManager.updateGroupConfig(groupId, newConfig);
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5),
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+        );
+
+        context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // Verify that group epoch was bumped
+        StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId);
+        int newGroupEpoch = group.groupEpoch();
+        assertEquals(11, newGroupEpoch);
+        assertEquals("2", group.assignmentConfigs().get("num.standby.replicas"));
Review Comment:
Please also check that the records returned by `streamsGroupHeartbeat`
contain the record with the correct updated config.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8647,10 +8664,21 @@ private TaskAssignor streamsGroupAssignor(String groupId) {
      * Get the assignor of the provided streams group.
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
Review Comment:
I don't think we need to change this method; it will just return the current assignment config.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1949,6 +1949,15 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             reconfigureTopology = true;
         }
+        // Check if assignment configurations have changed
+        Map<String, String> currentAssignmentConfigs = streamsGroupAssignmentConfigs(groupId);
+        Map<String, String> storedAssignmentConfigs = group.assignmentConfigs();
+        if (!currentAssignmentConfigs.equals(storedAssignmentConfigs)) {
Review Comment:
Maybe also skip this branch if `bumpGroupEpoch` is already true; otherwise we will get the log message even for new groups.
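One way to express the suggested guard, extracted into a standalone helper (names follow the diff; the surrounding heartbeat logic is elided):

```java
import java.util.Map;

// Simplified sketch: only compare the configs (and log) when the epoch is
// not already being bumped for another reason, e.g. a brand-new group.
class EpochBumpSketch {
    static boolean shouldBumpForConfigs(
        boolean bumpGroupEpoch,
        Map<String, String> currentAssignmentConfigs,
        Map<String, String> storedAssignmentConfigs
    ) {
        if (!bumpGroupEpoch && !currentAssignmentConfigs.equals(storedAssignmentConfigs)) {
            // The "assignment configs changed" log line belongs here, so it
            // is not emitted for groups that bump the epoch anyway.
            return true;
        }
        return bumpGroupEpoch;
    }
}
```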
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4298,7 +4308,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch()));
+        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, group.metadataHash(), group.validatedTopologyEpoch(), Map.of()));
Review Comment:
Wait, why are we dropping the assignment configuration on the floor here? We should use `group.assignmentConfigs()` here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1978,7 +1987,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch));
+            Map<String, String> assignmentConfigs = streamsGroupAssignmentConfigs(groupId);
Review Comment:
We already have currentAssignmentConfigs above, can we not use it here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1949,6 +1949,15 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             reconfigureTopology = true;
         }
+        // Check if assignment configurations have changed
Review Comment:
I think this should be moved outside of this if. We can check it right
before actually bumping the group epoch, so in a sense, it would become step 3c
after step 3b.
##########
group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json:
##########
@@ -26,6 +26,20 @@
     { "name": "MetadataHash", "versions": "0+", "type": "int64",
       "about": "The hash of all topics in the group." },
     { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": -1, "type": "int32",
-      "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." }
+      "about": "The topology epoch whose topics where validated to be present in a valid configuration in the metadata." },
+    {
+      "name": "AssignmentConfigs",
+      "type": "[]AssignmentConfig",
+      "versions": "0+",
Review Comment:
I think we want to add this as a tagged field, so that brokers that were
using EA in 4.1 already can still read the record. Something like
```"taggedVersions": "0+", "nullableVersions": "0+", "tag": 1, "default":
null, ```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
Review Comment:
I would pass currentAssignmentConfigs into `updateStreamsTargetAssignment`
so that we do not have to fetch it again inside.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18815,6 +18815,66 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
+    @Test
+    public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 0)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
Review Comment:
I think we have to set `validatedTopologyEpoch` to 0 here, otherwise the group epoch will be bumped because of the topology verification.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9739,7 +9739,7 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
         StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
         context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
         context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
-        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1));
+        context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId, epoch + 1, 0, -1, null));
Review Comment:
These tests will have to be updated to pass `Map.of()` instead of `null`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]