cmccabe commented on code in PR #16672:
URL: https://github.com/apache/kafka/pull/16672#discussion_r1693688012


##########
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##########
@@ -30,442 +33,436 @@
 import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
-public class AssignmentsManager {
+import static 
org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
 
-    private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+public final class AssignmentsManager {
+    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
+            TimeUnit.MILLISECONDS.toNanos(100),
+            2,
+            TimeUnit.SECONDS.toNanos(10),
+            0.02);
 
     /**
-     * Assignments are dispatched to the controller this long after
-     * being submitted to {@link AssignmentsManager}, if there
-     * is no request in flight already.
-     * The interval is reset when a new assignment is submitted.
-     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
-     * is reached, we ignore this interval and dispatch immediately.
+     * The minimum amount of time we will wait before logging individual 
assignment failures.
      */
-    private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.MILLISECONDS.toNanos(500);
+    static final long MIN_NOISY_FAILURE_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(2);
 
-    private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+    /**
+     * The metric reflecting the number of pending assignments.
+     */
+    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
+            metricName("QueuedReplicaToDirAssignments");
+
+    /**
+     * The event at which we send assignments, if appropriate.
+     */
+    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = 
"MaybeSendAssignmentsEvent";
+
+    /**
+     * The log4j object.
+     */
+    private final Logger log;
 
-    // visible for testing.
-    static final String QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME = 
"QueuedReplicaToDirAssignments";
+    /**
+     * The exponential backoff strategy to use.
+     */
+    private final ExponentialBackoff backoff;
 
+    /**
+     * The clock object to use.
+     */
     private final Time time;
+
+    /**
+     * Used to send messages to the controller.
+     */
     private final NodeToControllerChannelManager channelManager;
-    private final int brokerId;
-    private final Supplier<Long> brokerEpochSupplier;
-    private final KafkaEventQueue eventQueue;
+
+    /**
+     * The node ID.
+     */
+    private final int nodeId;
+
+    /**
+     * Supplies the latest MetadataImage.
+     */
+    private final Supplier<MetadataImage> metadataImageSupplier;
+
+    /**
+     * Maps partitions to assignments that are ready to send.
+     */
+    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;
+
+    /**
+     * Maps partitions to assignments that are in-flight. Older entries come 
first.
+     */
+    private volatile Map<TopicIdPartition, Assignment> inflight;
+
+    /**
+     * The registry to register our metrics with.
+     */
+    private final MetricsRegistry metricsRegistry;
+
+    /**
+     * Contains the metrics for this class.
+     */
     private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
 
-    // These variables should only be mutated from the KafkaEventQueue thread,
-    // but `inflight` and `pending` are also read from a Yammer metrics gauge.
-    private volatile Map<TopicIdPartition, AssignmentEvent> inflight = null;
-    private volatile Map<TopicIdPartition, AssignmentEvent> pending = new 
HashMap<>();
-    private final ExponentialBackoff resendExponentialBackoff =
-            new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
-    private final Function<Uuid, Optional<String>> dirIdToPath;
-    private final Function<Uuid, Optional<String>> topicIdToName;
-    private int failedAttempts = 0;
-
-    public AssignmentsManager(Time time,
-                              NodeToControllerChannelManager channelManager,
-                              int brokerId,
-                              Supplier<Long> brokerEpochSupplier,
-                              Function<Uuid, Optional<String>> dirIdToPath,
-                              Function<Uuid, Optional<String>> topicIdToName) {
+    /**
+     * The number of global failures we had previously (cleared after any 
success).
+     */
+    private int previousGlobalFailures;
+
+    /**
+     * The event queue.
+     */
+    private final KafkaEventQueue eventQueue;
+
+    static MetricName metricName(String name) {
+        return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", 
"AssignmentsManager", name);
+    }
+
+    public AssignmentsManager(
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier
+    ) {
+        this(STANDARD_BACKOFF,
+            time,
+            channelManager,
+            nodeId,
+            metadataImageSupplier,
+            KafkaYammerMetrics.defaultRegistry());
+    }
+
+    AssignmentsManager(
+        ExponentialBackoff backoff,
+        Time time,
+        NodeToControllerChannelManager channelManager,
+        int nodeId,
+        Supplier<MetadataImage> metadataImageSupplier,
+        MetricsRegistry metricsRegistry
+    ) {
+        this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").
+            logger(AssignmentsManager.class);
+        this.backoff = backoff;
         this.time = time;
         this.channelManager = channelManager;
-        this.brokerId = brokerId;
-        this.brokerEpochSupplier = brokerEpochSupplier;
+        this.nodeId = nodeId;
+        this.metadataImageSupplier = metadataImageSupplier;
+        this.ready = new ConcurrentHashMap<>();
+        this.inflight = Collections.emptyMap();
+        this.metricsRegistry = metricsRegistry;
+        
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new 
Gauge<Integer>() {
+                @Override
+                public Integer value() {
+                    return numPending();
+                }
+            });
+        this.previousGlobalFailures = 0;
         this.eventQueue = new KafkaEventQueue(time,
-                new LogContext("[AssignmentsManager id=" + brokerId + "]"),
-                "broker-" + brokerId + "-directory-assignments-manager-",
-                new ShutdownEvent());
+            new LogContext("[AssignmentsManager id=" + nodeId + "]"),
+            "broker-" + nodeId + "-directory-assignments-manager-",
+            new ShutdownEvent());
         channelManager.start();
-        
this.metricsGroup.newGauge(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME, () -> 
getMapSize(inflight) + getMapSize(pending));
-        if (dirIdToPath == null) dirIdToPath = id -> Optional.empty();
-        this.dirIdToPath = dirIdToPath;
-        if (topicIdToName == null) topicIdToName = id -> Optional.empty();
-        this.topicIdToName = topicIdToName;
     }
 
-    public void close() throws InterruptedException {
-        try {
-            eventQueue.close();
-        } finally {
-            
metricsGroup.removeMetric(QUEUE_REPLICA_TO_DIR_ASSIGNMENTS_METRIC_NAME);
-        }
+    public int numPending() {
+        return ready.size() + inflight.size();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason) {
-        onAssignment(topicPartition, dirId, reason, null);
+    public void close() throws InterruptedException {
+        eventQueue.close();
     }
 
-    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId, 
String reason, Runnable callback) {
-        if (callback == null) {
-            callback = () -> { };
-        }
-        AssignmentEvent assignment = new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId, reason, callback);
-        if (log.isDebugEnabled()) {
-            log.debug("Queued assignment {}", assignment);
+    public void onAssignment(
+        TopicIdPartition topicIdPartition,
+        Uuid directoryId,
+        String reason,
+        Runnable successCallback
+    ) {
+        long nowNs = time.nanoseconds();
+        Assignment assignment = new Assignment(
+                topicIdPartition, directoryId, nowNs, successCallback);
+        ready.put(topicIdPartition, assignment);
+        if (log.isTraceEnabled()) {
+            log.trace("Registered assignment {}: {}", assignment, reason);
         }
-        eventQueue.append(assignment);
+        rescheduleMaybeSendAssignmentsEvent(nowNs);
     }
 
-    // only for testing
-    void wakeup() {
-        eventQueue.wakeup();
+    void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
+        eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT,
+            new AssignmentsManagerDeadlineFunction(backoff,
+                nowNs, previousGlobalFailures, !inflight.isEmpty(), 
ready.size()),
+            new MaybeSendAssignmentsEvent());
     }
 
     /**
-     * Base class for all the events handled by {@link AssignmentsManager}.
+     * Handles shutdown.
      */
-    private abstract static class Event implements EventQueue.Event {
-        /**
-         * Override the default behavior in
-         * {@link EventQueue.Event#handleException}
-         * which swallows the exception.
-         */
+    private class ShutdownEvent implements EventQueue.Event {
         @Override
-        public void handleException(Throwable e) {
-            log.error("Unexpected error handling {}", this, e);
+        public void run() {
+            log.info("shutting down.");
+            try {
+                channelManager.shutdown();
+            } catch (Exception e) {
+                log.error("Unexpected exception shutting down 
NodeToControllerChannelManager", e);
+            }
+            try {
+                
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
+            } catch (Exception e) {
+                log.error("Unexpected exception removing metrics.", e);
+            }
         }
     }
 
     /**
-     * Handles shutdown of the {@link AssignmentsManager}.
+     * An event that processes the assignments in the ready map.
      */
-    private class ShutdownEvent extends Event {
+    private class MaybeSendAssignmentsEvent implements EventQueue.Event {
         @Override
         public void run() {
-            channelManager.shutdown();
+            try {
+                maybeSendAssignments();
+            } catch (Exception e) {
+                log.error("Unexpected exception in MaybeSendAssignmentsEvent", 
e);
+            }
         }
     }
 
     /**
-     * Handles new generated assignments, to be propagated to the controller.
-     * Assignment events may be handled out of order, so for any two assignment
-     * events for the same topic partition, the one with the oldest timestamp 
is
-     * disregarded.
+     * An event that handles the controller's response to our request.
      */
-    private class AssignmentEvent extends Event {
-        final long timestampNs;
-        final TopicIdPartition partition;
-        final Uuid dirId;
-        final String reason;
-        final List<Runnable> completionHandlers;
-        AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid 
dirId, String reason, Runnable onComplete) {
-            this.timestampNs = timestampNs;
-            this.partition = Objects.requireNonNull(partition);
-            this.dirId = Objects.requireNonNull(dirId);
-            this.reason = reason;
-            this.completionHandlers = new ArrayList<>();
-            if (onComplete != null) {
-                completionHandlers.add(onComplete);
-            }
-        }
-        void merge(AssignmentEvent other) {
-            if (!partition.equals(other.partition)) {
-                throw new IllegalArgumentException("Cannot merge events for 
different partitions");
-            }
-            completionHandlers.addAll(other.completionHandlers);
-        }
-        void onComplete() {
-            for (Runnable onComplete : completionHandlers) {
-                onComplete.run();
-            }
+    private class HandleResponseEvent implements EventQueue.Event {
+        private final Map<TopicIdPartition, Assignment> sent;
+        private final Optional<ClientResponse> response;
+
+        HandleResponseEvent(
+            Map<TopicIdPartition, Assignment> sent,
+            Optional<ClientResponse> response
+        ) {
+            this.sent = sent;
+            this.response = response;
         }
+
         @Override
         public void run() {
-            log.trace("Received assignment {}", this);
-            AssignmentEvent existing = pending.getOrDefault(partition, null);
-            boolean existingIsInFlight = false;
-            if (existing == null && inflight != null) {
-                existing = inflight.getOrDefault(partition, null);
-                existingIsInFlight = true;
-            }
-            if (existing != null) {
-                if (existing.dirId.equals(dirId)) {
-                    existing.merge(this);
-                    log.debug("Ignoring duplicate assignment {}", this);
-                    return;
-                }
-                if (existing.timestampNs > timestampNs) {
-                    existing.merge(this);
-                    log.debug("Dropping assignment {} because it's older than 
existing {}", this, existing);
-                    return;
-                } else if (!existingIsInFlight) {
-                    this.merge(existing);
-                    log.debug("Dropping existing assignment {} because it's 
older than {}", existing, this);
+            try {
+                handleResponse(sent, response);
+            } catch (Exception e) {
+                log.error("Unexpected exception in HandleResponseEvent", e);
+            } finally {
+                if (!ready.isEmpty()) {
+                    rescheduleMaybeSendAssignmentsEvent(time.nanoseconds());
                 }
             }
-            log.debug("Queueing new assignment {}", this);
-            pending.put(partition, this);
-
-            if (inflight == null || inflight.isEmpty()) {
-                scheduleDispatch();
-            }
         }
-        @Override
-        public String toString() {
-            String partitionString = topicIdToName.apply(partition.topicId())
-                    .map(name -> name + ":" + partition.partitionId())
-                    .orElseGet(() -> "<topic name unknown id: " + 
partition.topicId() + " partition: " + partition.partitionId() + ">");
-            String dirString = dirIdToPath.apply(dirId)
-                    .orElseGet(() -> "<dir path unknown id:" + dirId + ">");
-            return "Assignment{" +
-                    "timestampNs=" + timestampNs +
-                    ", partition=" + partitionString +
-                    ", dir=" + dirString +
-                    ", reason='" + reason + '\'' +
-                    '}';
+    }
+
+    /**
+     * A callback object that handles the controller's response to our request.
+     */
+    private class CompletionHandler implements 
ControllerRequestCompletionHandler {
+        private final Map<TopicIdPartition, Assignment> sent;
+
+        CompletionHandler(Map<TopicIdPartition, Assignment> sent) {
+            this.sent = sent;
         }
+
         @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            AssignmentEvent that = (AssignmentEvent) o;
-            return timestampNs == that.timestampNs
-                    && Objects.equals(partition, that.partition)
-                    && Objects.equals(dirId, that.dirId)
-                    && Objects.equals(reason, that.reason);
+        public void onTimeout() {
+            eventQueue.append(new HandleResponseEvent(sent, Optional.empty()));
         }
+
         @Override
-        public int hashCode() {
-            return Objects.hash(timestampNs, partition, dirId, reason);
+        public void onComplete(ClientResponse response) {
+            eventQueue.append(new HandleResponseEvent(sent, 
Optional.of(response)));
         }
     }
 
-    /**
-     * Gathers pending assignments and pushes them to the controller in a 
{@link AssignReplicasToDirsRequest}.
-     */
-    private class DispatchEvent extends Event {
-        static final String TAG = "dispatch";
-        @Override
-        public void run() {
-            if (inflight != null) {
-                throw new IllegalStateException("Bug. Should not be 
dispatching while there are assignments in flight");
-            }
-            if (pending.isEmpty()) {
-                log.trace("No pending assignments, no-op dispatch");
-                return;
-            }
-            Collection<AssignmentEvent> events = pending.values();
-            pending = new HashMap<>();
-            inflight = new HashMap<>();
-            for (AssignmentEvent event : events) {
-                if (inflight.size() < 
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
-                    inflight.put(event.partition, event);
-                } else {
-                    pending.put(event.partition, event);
-                }
-            }
-            if (!pending.isEmpty()) {
-                log.warn("Too many assignments ({}) to fit in one call, 
sending only {} and queueing the rest",
-                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST + pending.size(),
-                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST);
+    void maybeSendAssignments() {
+        int inflightSize = inflight.size();
+        if (log.isTraceEnabled()) {
+            log.info("maybeSendAssignments: inflightSize = {}.", inflightSize);

Review Comment:
   No, this was supposed to be logged at TRACE rather than INFO, to match the `isTraceEnabled()` guard around it. Good catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to