mjsax commented on code in PR #22213:
URL: https://github.com/apache/kafka/pull/22213#discussion_r3417141917
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -579,10 +581,20 @@ private void onSuccessResponse(final
StreamsGroupHeartbeatResponse response, fin
if (statuses != null) {
streamsRebalanceData.setStatuses(statuses);
if (!statuses.isEmpty()) {
- String statusDetails = statuses.stream()
- .map(status -> "(" + status.statusCode() + ") " +
status.statusDetail())
- .collect(Collectors.joining(", "));
- logger.warn("Membership is in the following statuses: {}",
statusDetails);
+ List<String> statusesToLog = new ArrayList<>();
+ for (StreamsGroupHeartbeatResponseData.Status status :
statuses) {
+ if (status.statusCode() ==
StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code()) {
+ if
(!status.statusDetail().equals(lastMissingClientTagsDetail)) {
+ lastMissingClientTagsDetail =
status.statusDetail();
Review Comment:
I think we also need to reset `lastMissingClientTagsDetail = null` if no
reported `status == MISSING_CLIENT_TAGS`?
Assume the client has configured tag `foo`. The broker has also configured
`foo`. All good. Next, the broker-side config changes to require `foo,bar`. The
client logs an error once. -- Next, the broker is reverted to only require `foo`, so we
should clear the flag, as we are no longer in any error state.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -1700,6 +1701,94 @@ private ClientResponse buildClientErrorResponse(final
Errors error, final String
);
}
+ @Test
+ public void testMissingClientTagsStatusLogsWarningOnlyOnce() {
+ try (
+ final MockedConstruction<HeartbeatRequestState> ignored =
mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
+ final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+ ) {
+
logAppender.setClassLogger(StreamsGroupHeartbeatRequestManager.class,
Level.WARN);
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+ when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+ final String statusDetail = "Missing required client tags for
rack-aware standby assignment: [zone, cluster]";
+
+ // First heartbeat with MISSING_CLIENT_TAGS status
+ final NetworkClientDelegate.PollResult result1 =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result1.unsentRequests.size());
+
+ final ClientResponse response1 = new ClientResponse(
+ new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1,
"", 1),
+ null, "-1", time.milliseconds(), time.milliseconds(), false,
null, null,
+ new StreamsGroupHeartbeatResponse(
+ new StreamsGroupHeartbeatResponseData()
+ .setHeartbeatIntervalMs((int)
RECEIVED_HEARTBEAT_INTERVAL_MS)
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+ .setStatusDetail(statusDetail)))
+ )
+ );
+ result1.unsentRequests.get(0).handler().onComplete(response1);
+
+ long firstWarnCount = logAppender.getMessages("WARN").stream()
+ .filter(m -> m.contains("Missing required client tags"))
+ .count();
+ assertEquals(1, firstWarnCount);
+
+ // Second heartbeat with the same status — should NOT log again
+ final NetworkClientDelegate.PollResult result2 =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result2.unsentRequests.size());
+
+ final ClientResponse response2 = new ClientResponse(
+ new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1,
"", 1),
+ null, "-1", time.milliseconds(), time.milliseconds(), false,
null, null,
+ new StreamsGroupHeartbeatResponse(
+ new StreamsGroupHeartbeatResponseData()
+ .setHeartbeatIntervalMs((int)
RECEIVED_HEARTBEAT_INTERVAL_MS)
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+ .setStatusDetail(statusDetail)))
+ )
+ );
+ result2.unsentRequests.get(0).handler().onComplete(response2);
+
+ long secondWarnCount = logAppender.getMessages("WARN").stream()
+ .filter(m -> m.contains("Missing required client tags"))
+ .count();
+ assertEquals(1, secondWarnCount, "MISSING_CLIENT_TAGS warning
should not be logged again for the same detail");
+
+ // Third heartbeat with a DIFFERENT status detail — should log
again
+ final String changedStatusDetail = "Missing required client tags
for rack-aware standby assignment: [zone]";
+
+ final NetworkClientDelegate.PollResult result3 =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result3.unsentRequests.size());
+
+ final ClientResponse response3 = new ClientResponse(
+ new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1,
"", 1),
+ null, "-1", time.milliseconds(), time.milliseconds(), false,
null, null,
+ new StreamsGroupHeartbeatResponse(
+ new StreamsGroupHeartbeatResponseData()
+ .setHeartbeatIntervalMs((int)
RECEIVED_HEARTBEAT_INTERVAL_MS)
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+ .setStatusDetail(changedStatusDetail)))
+ )
+ );
+ result3.unsentRequests.get(0).handler().onComplete(response3);
+
+ long thirdWarnCount = logAppender.getMessages("WARN").stream()
+ .filter(m -> m.contains("Missing required client tags"))
+ .count();
+ assertEquals(2, thirdWarnCount, "MISSING_CLIENT_TAGS warning
should be logged again when the detail changes");
Review Comment:
Also check that only `[zone]` is logged now in the second log line (might
require a second `assertEquals`).
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java:
##########
@@ -132,6 +133,7 @@ public void testFromPropsInvalid() {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_boolean");
+ } else if
(GroupConfig.STREAMS_RACK_AWARE_ASSIGNMENT_TAGS_CONFIG.equals(name)) {
Review Comment:
Why is the block empty?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -1700,6 +1701,94 @@ private ClientResponse buildClientErrorResponse(final
Errors error, final String
);
}
+ @Test
+ public void testMissingClientTagsStatusLogsWarningOnlyOnce() {
+ try (
+ final MockedConstruction<HeartbeatRequestState> ignored =
mockConstruction(
+ HeartbeatRequestState.class,
+ (mock, context) ->
when(mock.canSendRequest(time.milliseconds())).thenReturn(true));
+ final LogCaptureAppender logAppender =
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+ ) {
+
logAppender.setClassLogger(StreamsGroupHeartbeatRequestManager.class,
Level.WARN);
+ final StreamsGroupHeartbeatRequestManager heartbeatRequestManager
= createStreamsGroupHeartbeatRequestManager();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+ when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+ final String statusDetail = "Missing required client tags for
rack-aware standby assignment: [zone, cluster]";
+
+ // First heartbeat with MISSING_CLIENT_TAGS status
+ final NetworkClientDelegate.PollResult result1 =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result1.unsentRequests.size());
+
+ final ClientResponse response1 = new ClientResponse(
+ new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1,
"", 1),
+ null, "-1", time.milliseconds(), time.milliseconds(), false,
null, null,
+ new StreamsGroupHeartbeatResponse(
+ new StreamsGroupHeartbeatResponseData()
+ .setHeartbeatIntervalMs((int)
RECEIVED_HEARTBEAT_INTERVAL_MS)
+ .setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_CLIENT_TAGS.code())
+ .setStatusDetail(statusDetail)))
+ )
+ );
+ result1.unsentRequests.get(0).handler().onComplete(response1);
+
+ long firstWarnCount = logAppender.getMessages("WARN").stream()
+ .filter(m -> m.contains("Missing required client tags"))
Review Comment:
Should we also check that `[zone, cluster]` is correctly contained?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2206,6 +2207,25 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
)
));
+ String rackAwareTagsValue =
currentAssignmentConfigs.getOrDefault("rack.aware.assignment.tags", "");
Review Comment:
@lucasbru -- Wondering why `currentAssignmentConfigs` is `Map<String,
String>`? It would be convenient if the values were properly typed here,
so we don't have to parse the tag list. Atm, we would parse it on every
heartbeat.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]