Copilot commented on code in PR #22551:
URL: https://github.com/apache/kafka/pull/22551#discussion_r3411883832
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -297,7 +302,9 @@ public GroupCoordinatorService build() {
groupConfigManager,
persister,
timer,
- partitionMetadataClient
+ partitionMetadataClient,
+ streamsGroupTopologyDescriptionPlugin,
+ time
);
Review Comment:
The topology description plugin is instantiated in `Builder.build()` via
`config.getConfiguredInstance(...)` and the plugin interface extends
`AutoCloseable`, but the service currently doesn't retain/close the plugin on
shutdown (only `runtime`, `metrics`, and `groupConfigManager` are closed). This
can leak plugin resources (threads, network clients, etc.) across broker
shutdown/restart cycles.
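A minimal sketch of one possible fix. It assumes the service stores the plugin instance created in `Builder.build()` in a field and closes it next to the other components during shutdown. `CoordinatorServiceSketch` and `topologyDescriptionPlugin` are hypothetical stand-ins, not the real `GroupCoordinatorService` members. The close order and the collect-and-continue error handling are also assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GroupCoordinatorService: it keeps a reference to
// every AutoCloseable it owns, including the topology description plugin.
class CoordinatorServiceSketch {

    private final AutoCloseable runtime;
    private final AutoCloseable metrics;
    private final AutoCloseable groupConfigManager;
    // Retained (instead of only being handed to the shards) so shutdown() can close it.
    private final AutoCloseable topologyDescriptionPlugin;

    CoordinatorServiceSketch(AutoCloseable runtime,
                             AutoCloseable metrics,
                             AutoCloseable groupConfigManager,
                             AutoCloseable topologyDescriptionPlugin) {
        this.runtime = runtime;
        this.metrics = metrics;
        this.groupConfigManager = groupConfigManager;
        this.topologyDescriptionPlugin = topologyDescriptionPlugin;
    }

    // Closes every component even if an earlier close() throws, and returns
    // the collected failures so the caller can log them.
    List<Exception> shutdown() {
        List<Exception> failures = new ArrayList<>();
        closeQuietly(runtime, failures);
        closeQuietly(metrics, failures);
        closeQuietly(groupConfigManager, failures);
        closeQuietly(topologyDescriptionPlugin, failures);
        return failures;
    }

    private static void closeQuietly(AutoCloseable closeable, List<Exception> failures) {
        if (closeable == null) {
            return; // assumption: the plugin may be absent when none is configured
        }
        try {
            closeable.close();
        } catch (Exception e) {
            failures.add(e);
        }
    }
}
```

Kafka already ships `org.apache.kafka.common.utils.Utils.closeQuietly(AutoCloseable, String)` for swallow-and-log closing. The real change may therefore only need a new field and one extra close call in the existing shutdown path.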
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory per-group back-off that throttles broker re-solicitation of a topology
+ * description push. An entry is armed when the broker decides to set
+ * {@code TopologyDescriptionRequired=true} on a heartbeat or after a transient plugin
+ * failure; consecutive arms at the same topology epoch double the window from
+ * {@value #INITIAL_DELAY_MS} ms up to {@value #MAX_DELAY_MS} ms. Successful pushes,
+ * permanent plugin failures, and topology-epoch advances clear the entry.
+ */
+public class StreamsGroupTopologyDescriptionBackoff {
+
+ static final long INITIAL_DELAY_MS = 30_000L;
+ static final long MAX_DELAY_MS = 3_600_000L;
+
+ private final Time time;
+ private final ConcurrentHashMap<String, Entry> state = new ConcurrentHashMap<>();
+
+ record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { }
+
+ public StreamsGroupTopologyDescriptionBackoff(Time time) {
+ this.time = time;
+ }
+
+ /**
+ * @return true if a back-off window is in effect for the given group at the given
+ * topology epoch and the broker should suppress soliciting another push.
+ */
+ public boolean isActive(String groupId, int topologyEpoch) {
+ Entry entry = state.get(groupId);
+ return entry != null
+ && entry.topologyEpoch() == topologyEpoch
+ && time.milliseconds() < entry.nextAttemptMs();
+ }
+
+ /**
+ * Atomic check-and-arm. Returns true if no window was in effect and a new one was
+ * armed, false if a window was already active and nothing changed. Used on the
+ * heartbeat path to fold the "check + arm" pair into a single compute so two
+ * concurrent heartbeats for the same group cannot both arm the back-off.
+ */
+ public boolean armIfNotActive(String groupId, int topologyEpoch) {
+ final long now = time.milliseconds();
+ final boolean[] armed = new boolean[]{false};
+ state.compute(groupId, (key, existing) -> {
+ if (existing != null
+ && existing.topologyEpoch() == topologyEpoch
+ && now < existing.nextAttemptMs()) {
+ return existing;
+ }
+ armed[0] = true;
+ return new Entry(topologyEpoch, INITIAL_DELAY_MS, now + INITIAL_DELAY_MS);
+ });
+ return armed[0];
+ }
Review Comment:
`armIfNotActive` always re-arms with `INITIAL_DELAY_MS` when the window has
expired, even if the same group/epoch has been re-solicited before. This
prevents the documented exponential backoff behavior (doubling up to
`MAX_DELAY_MS`) from ever taking effect on repeated solicitations at the same
topology epoch, and also makes the comment about preventing concurrent
heartbeats from "both arm and double" inaccurate in practice.
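A minimal sketch of the doubling described in the class Javadoc. It assumes two rules:

- An expired entry at the same topology epoch is re-armed with twice its previous `currentDelayMs`, capped at `MAX_DELAY_MS`.
- A missing entry or a different epoch starts over at `INITIAL_DELAY_MS`.

To stay self-contained, the sketch replaces Kafka's `Time` with a `java.util.function.LongSupplier` clock. `BackoffSketch` and the `currentDelayMs(String)` helper are hypothetical names used for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class BackoffSketch {

    static final long INITIAL_DELAY_MS = 30_000L;
    static final long MAX_DELAY_MS = 3_600_000L;

    record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { }

    private final LongSupplier clock; // stands in for Kafka's Time#milliseconds()
    private final ConcurrentHashMap<String, Entry> state = new ConcurrentHashMap<>();

    BackoffSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Returns true if a new window was armed, false if one was already active.
    // ConcurrentHashMap.compute runs the remapping function atomically per key,
    // so two concurrent heartbeats for the same group cannot both arm (and double).
    boolean armIfNotActive(String groupId, int topologyEpoch) {
        final long now = clock.getAsLong();
        final boolean[] armed = {false};
        state.compute(groupId, (key, existing) -> {
            boolean sameEpoch = existing != null && existing.topologyEpoch() == topologyEpoch;
            if (sameEpoch && now < existing.nextAttemptMs()) {
                return existing; // window still active: leave it unchanged
            }
            // Same epoch, expired window: double the previous delay, capped at MAX_DELAY_MS.
            // No entry or a new epoch: start over at INITIAL_DELAY_MS.
            long delay = sameEpoch
                ? Math.min(existing.currentDelayMs() * 2, MAX_DELAY_MS)
                : INITIAL_DELAY_MS;
            armed[0] = true;
            return new Entry(topologyEpoch, delay, now + delay);
        });
        return armed[0];
    }

    // Hypothetical test helper: the current delay for a group, or -1 if no entry exists.
    long currentDelayMs(String groupId) {
        Entry entry = state.get(groupId);
        return entry == null ? -1L : entry.currentDelayMs();
    }
}
```

Doubling depends on the entry surviving after its window expires. In this sketch, entries are replaced only on a re-arm. The clear paths the Javadoc lists (successful push, permanent plugin failure, topology-epoch advance) would still need their own removal logic.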
--
This is an automated message from the Apache Git Service.