AndrewJSchofield commented on code in PR #16963:
URL: https://github.com/apache/kafka/pull/16963#discussion_r1728912783


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -275,98 +242,98 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
                                                                      final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
         heartbeatRequestState.onSendAttempt(currentTimeMs);
-        membershipManager.onHeartbeatRequestGenerated();
+        membershipManager().onHeartbeatRequestGenerated();
         metricsManager.recordHeartbeatSentMs(currentTimeMs);
         heartbeatRequestState.resetTimer();
         return request;
     }
 
+    @SuppressWarnings("unchecked")
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
-        NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
-            coordinatorRequestManager.coordinator());
+        NetworkClientDelegate.UnsentRequest request = buildHeartbeatRequest();
         if (ignoreResponse)
             return logResponse(request);
         else
             return request.whenComplete((response, exception) -> {
                 long completionTimeMs = request.handler().completionTimeMs();
                 if (response != null) {
                     metricsManager.recordRequestLatency(response.requestLatencyMs());
-                    onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
+                    onResponse((HBR) response.responseBody(), completionTimeMs);
                 } else {
                     onFailure(exception, completionTimeMs);
                 }
             });
     }
 
+    @SuppressWarnings("unchecked")
     private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) {
         return request.whenComplete((response, exception) -> {
             if (response != null) {
                 metricsManager.recordRequestLatency(response.requestLatencyMs());
                 Errors error =
-                    Errors.forCode(((ConsumerGroupHeartbeatResponse) response.responseBody()).data().errorCode());
+                    Errors.forCode(errorCodeForResponse((HBR) response.responseBody()));
                 if (error == Errors.NONE)
-                    logger.debug("GroupHeartbeat responded successfully: {}", response);
+                    logger.debug("{} responded successfully: {}", heartbeatRequestName(), response);
                 else
-                    logger.error("GroupHeartbeat failed because of {}: {}", error, response);
+                    logger.error("{} failed because of {}: {}", heartbeatRequestName(), error, response);
             } else {
-                logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
+                logger.error("{} failed because of unexpected exception.", heartbeatRequestName(), exception);
             }
         });
     }
 
     private void onFailure(final Throwable exception, final long responseTimeMs) {
         this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
-        this.heartbeatState.reset();
-        membershipManager.onHeartbeatFailure(exception instanceof RetriableException);
+        resetHeartbeatState();
+        membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
         if (exception instanceof RetriableException) {
-            String message = String.format("GroupHeartbeatRequest failed because of the retriable exception. " +
-                    "Will retry in %s ms: %s",
-                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
-                exception.getMessage());
+            String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s",
+                    heartbeatRequestName(),
+                    heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                    exception.getMessage());
             logger.debug(message);
         } else {
-            logger.error("GroupHeartbeatRequest failed due to fatal error: " + exception.getMessage());
+            logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
             handleFatalFailure(exception);
         }
     }
 
-    private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
-        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
-            heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+    private void onResponse(final HBR response, long currentTimeMs) {
+        if (Errors.forCode(errorCodeForResponse(response)) == Errors.NONE) {
+            heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
             heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
-            membershipManager.onHeartbeatSuccess(response.data());
+            membershipManager().onHeartbeatSuccess(responseData(response));
             return;
         }
         onErrorResponse(response, currentTimeMs);
     }
 
-    private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
+    private void onErrorResponse(final HBR response,
                                  final long currentTimeMs) {
-        Errors error = Errors.forCode(response.data().errorCode());
-        String errorMessage = response.data().errorMessage();
+        Errors error = Errors.forCode(errorCodeForResponse(response));

Review Comment:
   Agreed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to