lucasbru commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1407452521
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,128 @@ private void updateHeartbeatIntervalMs(final long
heartbeatIntervalMs) {
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
+
+ /**
+ * Builds the heartbeat requests correctly, ensuring that all information
is sent according to
+ * the protocol, but subsequent requests do not send information which has
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+ static class HeartbeatState {
+ private final SubscriptionState subscriptions;
+ private final MembershipManager membershipManager;
+ private final int rebalanceTimeoutMs;
+ private final SentFields sentFields;
+
+ public HeartbeatState(
+ final SubscriptionState subscriptions,
+ final MembershipManager membershipManager,
+ final int rebalanceTimeoutMs) {
+ this.subscriptions = subscriptions;
+ this.membershipManager = membershipManager;
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ this.sentFields = new SentFields();
+ }
+
+
+ public void reset() {
+ sentFields.reset();
+ }
+
+ public ConsumerGroupHeartbeatRequestData buildRequestData() {
+ ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData();
+
+ // GroupId - always sent
+ data.setGroupId(membershipManager.groupId());
+
+ // MemberId - always sent, empty until it has been received from
the coordinator
+ data.setMemberId(membershipManager.memberId());
+
+ // MemberEpoch - always sent
+ data.setMemberEpoch(membershipManager.memberEpoch());
+
+ // InstanceId - sent if hasn't changed since the last heartbeat
+ membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
+ if (!groupInstanceId.equals(sentFields.instanceId)) {
+ data.setInstanceId(groupInstanceId);
+ sentFields.instanceId = groupInstanceId;
+ }
+ });
+
+ // RebalanceTimeoutMs - sent if hasn't changed since the last
heartbeat
+ if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+ data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+ sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ }
+
+ if (!this.subscriptions.hasPatternSubscription()) {
+ // SubscribedTopicNames - sent if hasn't changed since the
last heartbeat
+ TreeSet<String> subscribedTopicNames = new
TreeSet<>(this.subscriptions.subscription());
+ if
(!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+ data.setSubscribedTopicNames(new
ArrayList<>(this.subscriptions.subscription()));
+ sentFields.subscribedTopicNames = subscribedTopicNames;
+ }
+ } else {
+ // SubscribedTopicRegex - sent if hasn't changed since the
last heartbeat
+ // - not supported yet
+ }
+
+ // ServerAssignor - sent if hasn't changed since the last heartbeat
+ this.membershipManager.serverAssignor().ifPresent(serverAssignor
-> {
+ if (!serverAssignor.equals(sentFields.serverAssignor)) {
+ data.setServerAssignor(serverAssignor);
+ sentFields.serverAssignor = serverAssignor;
+ }
+ });
+
+ // ClientAssignors - not supported yet
+
+ // TopicPartitions - sent if hasn't changed since the last
heartbeat
+ TreeSet<String> assignedPartitions = new
TreeSet<>(membershipManager.currentAssignment().stream()
+ .map(tp -> tp.topicId() + "-" +
tp.partition()).collect(Collectors.toList()));
Review Comment:
`.collect(Collectors.toCollection(TreeSet::new)` and skip the intermediate
list.
Also, you seem to avoid using `TopicIdPartition.toString` or just
`TopicIdPartition.equals` here - is it to support topic renaming? If so, I
think it's quite subtle and would deserve a little comment.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -128,13 +136,15 @@ public HeartbeatRequestManager(
final CoordinatorRequestManager coordinatorRequestManager,
final SubscriptionState subscriptions,
final MembershipManager membershipManager,
+ final HeartbeatState heartbeatState,
final HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler) {
this.logger = logContext.logger(this.getClass());
this.subscriptions = subscriptions;
Review Comment:
Looks like the subscription field is now unused.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,128 @@ private void updateHeartbeatIntervalMs(final long
heartbeatIntervalMs) {
this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
}
}
+
+ /**
+ * Builds the heartbeat requests correctly, ensuring that all information
is sent according to
+ * the protocol, but subsequent requests do not send information which has
not changed. This
+ * is important to ensure that reconciliation completes successfully.
+ */
+ static class HeartbeatState {
+ private final SubscriptionState subscriptions;
+ private final MembershipManager membershipManager;
+ private final int rebalanceTimeoutMs;
+ private final SentFields sentFields;
+
+ public HeartbeatState(
+ final SubscriptionState subscriptions,
+ final MembershipManager membershipManager,
+ final int rebalanceTimeoutMs) {
+ this.subscriptions = subscriptions;
+ this.membershipManager = membershipManager;
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ this.sentFields = new SentFields();
+ }
+
+
+ public void reset() {
+ sentFields.reset();
+ }
+
+ public ConsumerGroupHeartbeatRequestData buildRequestData() {
+ ConsumerGroupHeartbeatRequestData data = new
ConsumerGroupHeartbeatRequestData();
+
+ // GroupId - always sent
+ data.setGroupId(membershipManager.groupId());
+
+ // MemberId - always sent, empty until it has been received from
the coordinator
+ data.setMemberId(membershipManager.memberId());
+
+ // MemberEpoch - always sent
+ data.setMemberEpoch(membershipManager.memberEpoch());
+
+ // InstanceId - sent if hasn't changed since the last heartbeat
Review Comment:
I think in this comment and the similar ones below, you wanted to say `send
if it _has_ changed`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]