kirktrue commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406656043
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -159,11 +159,19 @@ public NetworkClientDelegate.PollResult poll(final long
currentTimeMs) {
return EMPTY;
List<NetworkClientDelegate.UnsentRequest> requests =
pendingRequests.drain(currentTimeMs);
+ return new
NetworkClientDelegate.PollResult(timeUntilNextPoll(currentTimeMs), requests);
+ }
+
+ /**
+ * Returns the delay before the next network request for this request
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that
a result can be returned.
+ */
+ @Override
+ public long timeUntilNextPoll(long currentTimeMs) {
// min of the remainingBackoffMs of all the request that are still
backing off
- final long timeUntilNextPoll = Math.min(
- findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
- findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
- return new NetworkClientDelegate.PollResult(timeUntilNextPoll,
requests);
+ return Math.min(
+ findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+ findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
Review Comment:
Nit: leaving the indentation as is conforms to the existing 'four-spaces per
tab' style.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long
currentTimeMs) {
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
Collections.singletonList(request));
}
- private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
- // TODO: extract this logic for building the
ConsumerGroupHeartbeatRequestData to a
- // stateful builder (HeartbeatState), that will keep the last data
sent, and determine
- // the fields that changed and need to be included in the next HB
(ex. check
- // subscriptionState changed from last sent to include assignment).
It should also
- // ensure that all fields are sent on failure.
- ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData()
- .setGroupId(membershipManager.groupId())
- .setMemberEpoch(membershipManager.memberEpoch())
- .setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
- if (membershipManager.memberId() != null) {
- data.setMemberId(membershipManager.memberId());
- }
-
- membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
- if (this.subscriptions.hasPatternSubscription()) {
- // TODO: Pass the string to the GC if server side regex is used.
- } else {
- data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
- List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
topicPartitions =
-
buildTopicPartitionsList(membershipManager.currentAssignment());
- data.setTopicPartitions(topicPartitions);
- }
-
-
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+ /**
+ * Returns the delay before the next network request for this request
manager. Used to ensure that
+ * waiting in the application thread does not delay beyond the point that
a result can be returned.
+ */
+ @Override
+ public long timeUntilNextPoll(long currentTimeMs) {
+ boolean heartbeatNow = membershipManager.shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight();
+ return heartbeatNow ? 0L :
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+ }
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new ConsumerGroupHeartbeatRequest.Builder(data),
+ new
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
Review Comment:
Nit: we got knocked in other reviews for unnecessarily qualifying with
`this`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long
heartbeatIntervalMs) {
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
+
+ /**
+ * Builds the heartbeat requests correctly, ensuring that all information
is sent according to
+ * the protocol, but subsequent requests do not send information which has
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+ static class HeartbeatState {
+ private final SubscriptionState subscriptions;
+ private final MembershipManager membershipManager;
+ private final int rebalanceTimeoutMs;
+
+ // Fields of ConsumerHeartbeatRequest sent in the most recent request
+ private String sentInstanceId;
+ private int sentRebalanceTimeoutMs;
+ private TreeSet<String> sentSubscribedTopicNames;
+ // private String sentSubscribedTopicRegex;
+ private String sentServerAssignor;
+ private TreeSet<String> sentTopicPartitions;
+
+ public HeartbeatState(
+ final SubscriptionState subscriptions,
+ final MembershipManager membershipManager,
+ final int rebalanceTimeoutMs) {
+ this.subscriptions = subscriptions;
+ this.membershipManager = membershipManager;
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ this.sentInstanceId = null;
+ this.sentRebalanceTimeoutMs = -1;
+ this.sentSubscribedTopicNames = null;
+ this.sentServerAssignor = null;
+ this.sentTopicPartitions = null;
+ }
+
+
+ public void reset() {
+ sentInstanceId = null;
+ sentRebalanceTimeoutMs = -1;
+ sentSubscribedTopicNames = null;
+ sentServerAssignor = null;
+ sentTopicPartitions = null;
+ }
+
+ public ConsumerGroupHeartbeatRequestData buildRequestData() {
+ ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData();
+
+ // GroupId - always sent
+ data.setGroupId(membershipManager.groupId());
+
+ // MemberId - always sent, empty until it has been received from
the coordinator
+ data.setMemberId(membershipManager.memberId());
+
+ // MemberEpoch - always sent
+ data.setMemberEpoch(membershipManager.memberEpoch());
+
+ // InstanceId - sent if hasn't changed since the last heartbeat
+ membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
+ if (!groupInstanceId.equals(sentInstanceId)) {
+ data.setInstanceId(groupInstanceId);
+ sentInstanceId = groupInstanceId;
+ }
+ });
+
+ // RebalanceTimeoutMs - sent if hasn't changed since the last
heartbeat
+ if (sentRebalanceTimeoutMs != rebalanceTimeoutMs) {
+ data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+ sentRebalanceTimeoutMs = rebalanceTimeoutMs;
+ }
+
+ if (!this.subscriptions.hasPatternSubscription()) {
+ // SubscribedTopicNames - sent if hasn't changed since the
last heartbeat
+ TreeSet<String> subscribedTopicNames = new
TreeSet<>(this.subscriptions.subscription());
+ if (!subscribedTopicNames.equals(sentSubscribedTopicNames)) {
+ data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
+ sentSubscribedTopicNames = subscribedTopicNames;
+ }
+ } else {
+ // SubscribedTopicRegex - sent if hasn't changed since the
last heartbeat
+ // - not supported yet
+ }
+
+ // ServerAssignor - sent if hasn't changed since the last heartbeat
+ this.membershipManager.serverAssignor().ifPresent(serverAssignor
-> {
+ if (!serverAssignor.equals(sentServerAssignor)) {
+ data.setServerAssignor(serverAssignor);
+ sentServerAssignor = serverAssignor;
+ }
+ });
+
+ // ClientAssignors - not supported yet
+
+ // TopicPartitions - sent if hasn't changed since the last
heartbeat
+ TreeSet<String> assignedPartitions = new
TreeSet<>(membershipManager.currentAssignment().stream()
+ .map(tp -> tp.topicId() + "-" +
tp.partition()).collect(Collectors.toList()));
+ if (!assignedPartitions.equals(sentTopicPartitions)) {
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
topicPartitions =
+
buildTopicPartitionsList(membershipManager.currentAssignment());
+ data.setTopicPartitions(topicPartitions);
+ sentTopicPartitions = assignedPartitions;
+ }
+
+ return data;
+ }
+
+ private List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result =
new ArrayList<>();
+ Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+ for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+ Uuid topicId = topicIdPartition.topicId();
+ if (!partitionsPerTopicId.containsKey(topicId)) {
+ partitionsPerTopicId.put(topicId, new ArrayList<>());
+ }
+
partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());
Review Comment:
```suggestion
partitionsPerTopicId.computeIfAbsent(topicId, __ -> new
ArrayList<>()).add(partitionId);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long
heartbeatIntervalMs) {
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
+
+ /**
+ * Builds the heartbeat requests correctly, ensuring that all information
is sent according to
+ * the protocol, but subsequent requests do not send information which has
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+ static class HeartbeatState {
+ private final SubscriptionState subscriptions;
+ private final MembershipManager membershipManager;
+ private final int rebalanceTimeoutMs;
+
+ // Fields of ConsumerHeartbeatRequest sent in the most recent request
+ private String sentInstanceId;
+ private int sentRebalanceTimeoutMs;
+ private TreeSet<String> sentSubscribedTopicNames;
+ // private String sentSubscribedTopicRegex;
+ private String sentServerAssignor;
+ private TreeSet<String> sentTopicPartitions;
Review Comment:
IMO, we should group all of the 'last sent' attributes in a separate little
struct/class, just to keep their relation and their (re-)initializing logic
together.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long
heartbeatIntervalMs) {
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
+
+ /**
+ * Builds the heartbeat requests correctly, ensuring that all information
is sent according to
+ * the protocol, but subsequent requests do not send information which has
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+ static class HeartbeatState {
Review Comment:
Any reason `HeartbeatState` needs to be `static`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]