mjsax commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3203826150
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -553,6 +557,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
+ private final List<String> streamsGroupRackAwareAssignmentTags;
Review Comment:
Doesn't really matter, but why do insert this here, and not at the end of he
list?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2140,11 +2141,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
}
// Prepare the response.
+ String rackAwareTagsValue =
currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
+ List<String> rackAwareAssignmentTags = rackAwareTagsValue.isEmpty()
+ ? Collections.emptyList()
+ : Arrays.asList(rackAwareTagsValue.split(",", -1));
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+ .setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId))
+ .setRackAwareAssignmentTags(rackAwareAssignmentTags);
Review Comment:
Yes.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -533,6 +533,18 @@ private void onSuccessResponse(final
StreamsGroupHeartbeatResponse response, fin
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
+ if (data.rackAwareAssignmentTags() != null) {
+ Set<String> clientTagKeys =
streamsRebalanceData.clientTags().keySet();
+ for (String requiredTag : data.rackAwareAssignmentTags()) {
+ if (!clientTagKeys.contains(requiredTag)) {
+ logger.warn("Broker requires client tag '{}' for
rack-aware standby assignment, " +
+ "but this client does not have it configured. " +
+ "Configure it via 'client.tag.{}' in your Streams
config.",
+ requiredTag, requiredTag);
+ }
Review Comment:
Not sure what you mean by "reject it" @chickenchickenlove ?
`rack.aware.assignment.tags` are the tags configure for the assignor to use,
to compute the assignment. The client is not using them by itself, so there is
nothing to be rejected.
The logging we do here is to inform the developer/operator of the client,
that the broker uses some tags for rack aware assignment, but the client
doesn't set these tags, to rack aware assignment might not work as expected.
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
Review Comment:
This will be fixed via https://github.com/apache/kafka/pull/21799
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -52,6 +52,8 @@
"about": "The maximal lag a warm-up task can have to be considered
caught-up." },
{ "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
"about": "The interval in which the task changelog offsets on a client
are updated on the broker. The offsets are sent with the next heartbeat after
this time has passed." },
+ { "name": "RackAwareAssignmentTags", "type": "[]string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
+ "about": "The list of client tag keys used for rack-aware standby task
assignment. Null if unchanged since last heartbeat." },
Review Comment:
Yes, this need to get changed to `"versions": "1+"`, and
`"nullableVersions": "1+"`
But this PR depends on https://github.com/apache/kafka/pull/21799 -- do I
guess it's ok for now and we can fix after the other PR got merged
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -292,6 +298,11 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+ .define(STREAMS_RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ LIST,
+
GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DEFAULT,
+ MEDIUM,
+
GroupCoordinatorConfig.STREAMS_GROUP_RACK_AWARE_ASSIGNMENT_TAGS_DOC)
Review Comment:
I believe that `LIST` type does take care of this already? We have an
existing test for the "classic" client side config,
`StreamsConfigTest#shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithEmptyTag`,
and it should work the same broker side?
Might be good to add a test for it (maybe both, coordinator-config, and
group-config)?
What also reminds me, that we might want to also have guard on both configs
(especially the coordinator one -- for the group one, we can simply reject the
config change with an error?) to also ensure that the group-coordinator starts
up, even if `group.streams.rack.aware.assignment.tags` is set so something
invalid (and maybe fall back to default empty "string"). -- Would be good to
hear from one of @lucasbru @lianetm @AndrewJSchofield @dajac @squah-confluent
about it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -693,7 +699,6 @@ public GroupCoordinatorConfig(AbstractConfig config) {
String.format("%s must be less than or equal to %s",
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
-
Review Comment:
nit: keep double empty line to separate "share" from "streams" section
clearly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]