AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406686948
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long
currentTimeMs) {
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
Collections.singletonList(request));
}
- private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
- // TODO: extract this logic for building the
ConsumerGroupHeartbeatRequestData to a
- // stateful builder (HeartbeatState), that will keep the last data
sent, and determine
- // the fields that changed and need to be included in the next HB
(ex. check
- // subscriptionState changed from last sent to include assignment).
It should also
- // ensure that all fields are sent on failure.
- ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData()
- .setGroupId(membershipManager.groupId())
- .setMemberEpoch(membershipManager.memberEpoch())
- .setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
- if (membershipManager.memberId() != null) {
- data.setMemberId(membershipManager.memberId());
- }
-
- membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
- if (this.subscriptions.hasPatternSubscription()) {
- // TODO: Pass the string to the GC if server side regex is used.
- } else {
- data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
topicPartitions =
-
buildTopicPartitionsList(membershipManager.currentAssignment());
- data.setTopicPartitions(topicPartitions);
- }
-
-
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+ /**
+ * Returns the delay before the next network request for this request
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that
a result can be returned.
+ */
+ @Override
+ public long timeUntilNextPoll(long currentTimeMs) {
+ boolean heartbeatNow = membershipManager.shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight();
+ return heartbeatNow ? 0L :
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+ }
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new ConsumerGroupHeartbeatRequest.Builder(data),
+ new
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
Review Comment:
That style is used throughout this file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]