AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406686948


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
     }
 
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
-        // TODO: extract this logic for building the ConsumerGroupHeartbeatRequestData to a
-        //  stateful builder (HeartbeatState), that will keep the last data sent, and determine
-        //  the fields that changed and need to be included in the next HB (ex. check
-        //  subscriptionState changed from last sent to include assignment). It should also
-        //  ensure that all fields are sent on failure.
-        ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
-            .setGroupId(membershipManager.groupId())
-            .setMemberEpoch(membershipManager.memberEpoch())
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
-        if (membershipManager.memberId() != null) {
-            data.setMemberId(membershipManager.memberId());
-        }
-
-        membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
-        if (this.subscriptions.hasPatternSubscription()) {
-            // TODO: Pass the string to the GC if server side regex is used.
-        } else {
-            data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
-            List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions =
-                    buildTopicPartitionsList(membershipManager.currentAssignment());
-            data.setTopicPartitions(topicPartitions);
-        }
-
-        this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+    /**
+     * Returns the delay before the next network request for this request manager. Used to ensure that
+     * waiting in the application thread does not delay beyond the point that a result can be returned.
+     */
+    @Override
+    public long timeUntilNextPoll(long currentTimeMs) {
+        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+        return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+    }
 
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
         NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
-            new ConsumerGroupHeartbeatRequest.Builder(data),
+            new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),

Review Comment:
   That style is used throughout this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to