cmccabe commented on code in PR #16672:
URL: https://github.com/apache/kafka/pull/16672#discussion_r1693689335


##########
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##########
@@ -30,442 +33,436 @@
 import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
-public class AssignmentsManager {
+import static 
org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
 
-    private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+public final class AssignmentsManager {
+    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
+            TimeUnit.MILLISECONDS.toNanos(100),
+            2,
+            TimeUnit.SECONDS.toNanos(10),
+            0.02);
 
     /**
-     * Assignments are dispatched to the controller this long after
-     * being submitted to {@link AssignmentsManager}, if there
-     * is no request in flight already.
-     * The interval is reset when a new assignment is submitted.
-     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
-     * is reached, we ignore this interval and dispatch immediately.
+     * The minimum amount of time we will wait before logging individual 
assignment failures.
      */
-    private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.MILLISECONDS.toNanos(500);
+    static final long MIN_NOISY_FAILURE_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(2);
 
-    private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+    /**
+     * The metric reflecting the number of pending assignments.
+     */
+    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+            metricName("QueuedReplicaToDirAssignments");
+
+    /**
+     * The event at which we send assignments, if appropriate.
+     */
+    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = 
"MaybeSendAssignmentsEvent";
+
+    /**
+     * The log4j object.
+     */
+    private final Logger log;
 
-    // visible for testing.
-    static final String QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME = 
"QueuedReplicaToDirAssignments";
+    /**
+     * The exponential backoff strategy to use.
+     */
+    private final ExponentialBackoff backoff;
 
+    /**
+     * The clock object to use.
+     */
     private final Time time;
+
+    /**
+     * Used to send messages to the controller.
+     */
     private final NodeToControllerChannelManager channelManager;
-    private final int brokerId;
-    private final Supplier<Long> brokerEpochSupplier;
-    private final KafkaEventQueue eventQueue;
+
+    /**
+     * The node ID.
+     */
+    private final int nodeId;
+
+    /**
+     * Supplies the latest MetadataImage.
+     */
+    private final Supplier<MetadataImage> metadataImageSupplier;
+
+    /**
+     * Maps partitions to assignments that are ready to send.
+     */
+    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;
+
+    /**
+     * Maps partitions to assignments that are in-flight. Older entries come 
first.
+     */
+    private volatile Map<TopicIdPartition, Assignment> inflight;
+
+    /**
+     * The registry to register our metrics with.
+     */
+    private final MetricsRegistry metricsRegistry;
+
+    /**
+     * Contains the metrics for this class.
+     */
     private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
 
-    // These variables should only be mutated from the KafkaEventQueue thread,
-    // but `inflight` and `pending` are also read from a Yammer metrics gauge.
-    private volatile Map<TopicIdPartition, AssignmentEvent> inflight = null;
-    private volatile Map<TopicIdPartition, AssignmentEvent> pending = new 
HashMap<>();
-    private final ExponentialBackoff resendExponentialBackoff =
-            new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
-    private final Function<Uuid, Optional<String>> dirIdToPath;
-    private final Function<Uuid, Optional<String>> topicIdToName;
-    private int failedAttempts = 0;
-
-    public AssignmentsManager(Time time,
-                              NodeToControllerChannelManager channelManager,
-                              int brokerId,
-                              Supplier<Long> brokerEpochSupplier,
-                              Function<Uuid, Optional<String>> dirIdToPath,
-                              Function<Uuid, Optional<String>> topicIdToName) {
+    /**
+     * The number of global failures we had previously (cleared after any 
success).
+     */
+    private int previousGlobalFailures;
+
+    /**
+     * The event queue.
+     */
+    private final KafkaEventQueue eventQueue;
+
+    static MetricName metricName(String name) {
+        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", 
"AssignmentsManager", name);
+    }
+
+    public AssignmentsManager(
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier
+    ) {
+        this(STANDARD_BACKOFF,
+            time,
+            channelManager,
+            nodeId,
+            metadataImageSupplier,
+            KafkaYammerMetrics.defaultRegistry());
+    }
+
+    AssignmentsManager(
+        ExponentialBackoff backoff,
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier,
+        MetricsRegistry metricsRegistry
+    ) {
+        this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").
+            logger(AssignmentsManager.class);
+        this.backoff = backoff;
         this.time = time;
         this.channelManager = channelManager;
-        this.brokerId = brokerId;
-        this.brokerEpochSupplier = brokerEpochSupplier;
+        this.nodeId = nodeId;
+        this.metadataImageSupplier = metadataImageSupplier;
+        this.ready = new ConcurrentHashMap<>();
+        this.inflight = Collections.emptyMap();
+        this.metricsRegistry = metricsRegistry;
+        
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new 
Gauge<Integer>() {
+                @Override
+                public Integer value() {
+                    return numPending();
+                }
+            });
+        this.previousGlobalFailures = 0;
         this.eventQueue = new KafkaEventQueue(time,
-                new LogContext("[AssignmentsManager id=" + brokerId + "]"),
-                "broker-" + brokerId + "-directory-assignments-manager-",
-                new ShutdownEvent());
+            new LogContext("[AssignmentsManager id=" + nodeId + "]"),
+            "broker-" + nodeId + "-directory-assignments-manager-",
+            new ShutdownEvent());
         channelManager.start();
-        
this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, () -> 
getMapSize(inflight) + getMapSize(pending));
-        if (dirIdToPath == null) dirIdToPath = id -> Optional.empty();
-        this.dirIdToPath = dirIdToPath;
-        if (topicIdToName == null) topicIdToName = id -> Optional.empty();
-        this.topicIdToName = topicIdToName;
     }
 
-    public void close() throws InterruptedException {
-        try {
-            eventQueue.close();
-        } finally {
-            
metricsGroup.removeMetric(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
-        }
+    public int numPending() {
+        return ready.size() + inflight.size();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason) {
-        onAssignment(topicPartition, dirId, reason, null);
+    public void close() throws InterruptedException {
+        eventQueue.close();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason, Runnable callback) {
-        if (callback == null) {
-            callback = () -> { };
-        }
-        AssignmentEvent assignment = new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId, reason, callback);
-        if (log.isDebugEnabled()) {
-            log.debug("Queued assignment {}", assignment);
+    public void onAssignment(
+        TopicIdPartition topicIdPartition,
+        Uuid directoryId,
+        String reason,
+        Runnable successCallback
+    ) {
+        long nowNs = time.nanoseconds();
+        Assignment assignment = new Assignment(
+                topicIdPartition, directoryId, nowNs, successCallback);
+        ready.put(topicIdPartition, assignment);
+        if (log.isTraceEnabled()) {
+            log.trace("Registered assignment {}: {}", assignment, reason);
         }
-        eventQueue.append(assignment);
+        rescheduleMaybeSendAssignmentsEvent(nowNs);
     }
 
-    // only for testing
-    void wakeup() {
-        eventQueue.wakeup();
+    void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
+        eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT,
+            new AssignmentsManagerDeadlineFunction(backoff,
+                nowNs, previousGlobalFailures, !inflight.isEmpty(), 
ready.size()),
+            new MaybeSendAssignmentsEvent());
     }
 
     /**
-     * Base class for all the events handled by {@link AssignmentsManager}.
+     * Handles shutdown.
      */
-    private abstract static class Event implements EventQueue.Event {
-        /**
-         * Override the default behavior in
-         * {@link EventQueue.Event#handleException}
-         * which swallows the exception.
-         */
+    private class ShutdownEvent implements EventQueue.Event {
         @Override
-        public void handleException(Throwable e) {
-            log.error("Unexpected error handling {}", this, e);
+        public void run() {
+            log.info("shutting down.");
+            try {
+                channelManager.shutdown();
+            } catch (Exception e) {
+                log.error("Unexpected exception shutting down 
NodeToControllerChannelManager", e);
+            }
+            try {
+                
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+            } catch (Exception e) {
+                log.error("Unexpected exception removing metrics.", e);
+            }

Review Comment:
   Well, the "common functionality" here is just a `try..catch`. I don't know if 
that's worth creating a class hierarchy for. I guess I don't feel that 
strongly about it, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to