junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181



##########
File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import org.slf4j.Logger;
+
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public interface EventQueue extends AutoCloseable {
+    interface Event {
+        /**
+         * Run the event.
+         */
+        void run() throws Exception;
+
+        /**
+         * Handle an exception that was either generated by running the event, 
or by the
+         * event queue's inability to run the event.
+         *
+         * @param e     The exception.  This will be a TimeoutException if the 
event hit
+         *              its deadline before it could be scheduled.
+         *              It will be a RejectedExecutionException if the event 
could not be
+         *              scheduled because the event queue has already been 
closed.
+         *              Otherwise, it will be whatever exception was thrown 
by run().
+         */
+        default void handleException(Throwable e) {}
+    }
+
+    abstract class FailureLoggingEvent implements Event {
+        private final Logger log;
+
+        public FailureLoggingEvent(Logger log) {
+            this.log = log;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            if (e instanceof RejectedExecutionException) {
+                log.info("Not processing {} because the event queue is closed.",
+                    this.toString());
+            } else {
+                log.error("Unexpected error handling {}", this.toString(), e);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return this.getClass().getSimpleName();
+        }
+    }
+
+    class NoDeadlineFunction implements Function<OptionalLong, OptionalLong> {
+        public static final NoDeadlineFunction INSTANCE = new 
NoDeadlineFunction();
+
+        @Override
+        public OptionalLong apply(OptionalLong ignored) {
+            return OptionalLong.empty();
+        }
+    }
+
+    class DeadlineFunction implements Function<OptionalLong, OptionalLong> {
+        private final long deadlineNs;
+
+        public DeadlineFunction(long deadlineNs) {
+            this.deadlineNs = deadlineNs;
+        }
+
+        @Override
+        public OptionalLong apply(OptionalLong ignored) {
+            return OptionalLong.of(deadlineNs);
+        }
+    }
+
+    class EarliestDeadlineFunction implements Function<OptionalLong, 
OptionalLong> {
+        private final long newDeadlineNs;
+
+        public EarliestDeadlineFunction(long newDeadlineNs) {
+            this.newDeadlineNs = newDeadlineNs;
+        }
+
+        @Override
+        public OptionalLong apply(OptionalLong prevDeadlineNs) {
+            if (!prevDeadlineNs.isPresent()) {
+                return OptionalLong.of(newDeadlineNs);
+            } else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
+                return prevDeadlineNs;
+            } else {
+                return OptionalLong.of(newDeadlineNs);
+            }
+        }
+    }
+
+    class VoidEvent implements Event {
+        public final static VoidEvent INSTANCE = new VoidEvent();
+
+        @Override
+        public void run() throws Exception {
+        }
+    }
+
+    /**
+     * Add an element to the front of the queue.
+     *
+     * @param event             The mandatory event to prepend.
+     */
+    default void prepend(Event event) {
+        enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+    }
+
+    /**
+     * Add an element to the end of the queue.
+     *
+     * @param event             The event to append.
+     */
+    default void append(Event event) {
+        enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+    }
+
+    /**
+     * Add an event to the end of the queue.
+     *
+     * @param deadlineNs        The deadline for starting the event, in 
monotonic
+     *                          nanoseconds.  If the event has not started by 
this
+     *                          deadline, handleException is called with a
+     *                          {@link org.apache.kafka.common.errors.TimeoutException},
+     *                          and the event is cancelled.
+     * @param event             The event to append.
+     */
+    default void appendWithDeadline(long deadlineNs, Event event) {
+        enqueue(EventInsertionType.APPEND, null, new 
DeadlineFunction(deadlineNs), event);
+    }
+
+    /**
+     * Schedule an event to be run at a specific time.
+     *
+     * @param tag                   If this is non-null, the unique tag to use 
for this
+     *                              event.  If an event with this tag already 
exists, it
+     *                              will be cancelled.
+     * @param deadlineNsCalculator  A function which takes as an argument the 
existing
+     *                              deadline for the event with this tag (or 
empty if the
+     *                              event has no tag, or if there is none 
such), and
+     *                              produces the deadline to use for this 
event.
+     *                              Once the deadline has arrived, the event 
will be
+     *                              prepended to the queue.  Events whose 
deadlines are

Review comment:
       "the event will be prepended to the queue" : This is no longer true in 
the implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to