lucasbru commented on code in PR #21737:
URL: https://github.com/apache/kafka/pull/21737#discussion_r2940801272
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -645,6 +660,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
String.format("%s must be less than or equal to %s",
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
Review Comment:
nit: why the blank
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -608,6 +622,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
require(consumerGroupAssignmentIntervalMs() <=
consumerGroupMaxAssignmentIntervalMs,
String.format("%s must be less than or equal to %s",
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
Review Comment:
nit: why the blank
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -528,6 +546,33 @@ private static void clampToMax(
}
}
+ /**
+ * Clamp a config value to at most min. A WARN log is emitted on
adjustment.
+ * No-op when the key is absent from props.
+ *
+ * @param props The properties to modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ */
+ private static void clampToMin(
+ Properties props,
+ String groupId,
+ String key,
+ int min
+ ) {
+ Object rawValue = props.get(key);
+ if (rawValue == null) return;
+
+ int value = Integer.parseInt(rawValue.toString());
+ if (value < min) {
+ log.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
Review Comment:
exceeds the minimum?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2139,7 +2140,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId)); // not
sure if we can send this each time?
Review Comment:
should be fine, in line with heartbeat interval ms
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]