AndrewJSchofield commented on code in PR #16963:
URL: https://github.com/apache/kafka/pull/16963#discussion_r1728912783
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -275,98 +242,98 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
- membershipManager.onHeartbeatRequestGenerated();
+ membershipManager().onHeartbeatRequestGenerated();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
heartbeatRequestState.resetTimer();
return request;
}
+ @SuppressWarnings("unchecked")
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
boolean ignoreResponse) {
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
- coordinatorRequestManager.coordinator());
+ NetworkClientDelegate.UnsentRequest request = buildHeartbeatRequest();
if (ignoreResponse)
return logResponse(request);
else
return request.whenComplete((response, exception) -> {
long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
- onResponse((ConsumerGroupHeartbeatResponse)
response.responseBody(), completionTimeMs);
+ onResponse((HBR) response.responseBody(),
completionTimeMs);
} else {
onFailure(exception, completionTimeMs);
}
});
}
+ @SuppressWarnings("unchecked")
private NetworkClientDelegate.UnsentRequest logResponse(final
NetworkClientDelegate.UnsentRequest request) {
return request.whenComplete((response, exception) -> {
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
Errors error =
- Errors.forCode(((ConsumerGroupHeartbeatResponse)
response.responseBody()).data().errorCode());
+ Errors.forCode(errorCodeForResponse((HBR)
response.responseBody()));
if (error == Errors.NONE)
- logger.debug("GroupHeartbeat responded successfully: {}",
response);
+ logger.debug("{} responded successfully: {}",
heartbeatRequestName(), response);
else
- logger.error("GroupHeartbeat failed because of {}: {}",
error, response);
+ logger.error("{} failed because of {}: {}",
heartbeatRequestName(), error, response);
} else {
- logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
+ logger.error("{} failed because of unexpected exception.",
heartbeatRequestName(), exception);
}
});
}
private void onFailure(final Throwable exception, final long
responseTimeMs) {
this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
- this.heartbeatState.reset();
- membershipManager.onHeartbeatFailure(exception instanceof
RetriableException);
+ resetHeartbeatState();
+ membershipManager().onHeartbeatFailure(exception instanceof
RetriableException);
if (exception instanceof RetriableException) {
- String message = String.format("GroupHeartbeatRequest failed because of the retriable exception. " +
- "Will retry in %s ms: %s",
- heartbeatRequestState.remainingBackoffMs(responseTimeMs),
- exception.getMessage());
+ String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s",
+ heartbeatRequestName(),
+ heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+ exception.getMessage());
logger.debug(message);
} else {
- logger.error("GroupHeartbeatRequest failed due to fatal error: " +
exception.getMessage());
+ logger.error("{} failed due to fatal error: {}",
heartbeatRequestName(), exception.getMessage());
handleFatalFailure(exception);
}
}
- private void onResponse(final ConsumerGroupHeartbeatResponse response,
long currentTimeMs) {
- if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
- heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+ private void onResponse(final HBR response, long currentTimeMs) {
+ if (Errors.forCode(errorCodeForResponse(response)) == Errors.NONE) {
+ heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
- membershipManager.onHeartbeatSuccess(response.data());
+ membershipManager().onHeartbeatSuccess(responseData(response));
return;
}
onErrorResponse(response, currentTimeMs);
}
- private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+ private void onErrorResponse(final HBR response,
final long currentTimeMs) {
- Errors error = Errors.forCode(response.data().errorCode());
- String errorMessage = response.data().errorMessage();
+ Errors error = Errors.forCode(errorCodeForResponse(response));
Review Comment:
Agreed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]