cmccabe commented on a change in pull request #10030: URL: https://github.com/apache/kafka/pull/10030#discussion_r569827578
########## File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.queue; + +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public interface EventQueue extends AutoCloseable { + interface Event { + void run() throws Exception; + default void handleException(Throwable e) {} + } + + abstract class FailureLoggingEvent implements Event { + private final Logger log; + + public FailureLoggingEvent(Logger log) { + this.log = log; + } + + @Override + public void handleException(Throwable e) { + if (e instanceof EventQueueClosedException) { + log.info("Not processing {} because the event queue is closed.", + this.toString()); + } else { + log.error("Unexpected error handling {}", this.toString(), e); + } + } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } + } + + class DeadlineFunction implements Function<Long, Long> { + private final long deadlineNs; + + public DeadlineFunction(long deadlineNs) { + this.deadlineNs = deadlineNs; + } + + @Override + public Long apply(Long t) { + return deadlineNs; + } + } 
+ + class EarliestDeadlineFunction implements Function<Long, Long> { + private final long newDeadlineNs; + + public EarliestDeadlineFunction(long newDeadlineNs) { + this.newDeadlineNs = newDeadlineNs; + } + + @Override + public Long apply(Long prevDeadlineNs) { + if (prevDeadlineNs == null) { + return newDeadlineNs; + } else if (prevDeadlineNs < newDeadlineNs) { + return prevDeadlineNs; + } else { + return newDeadlineNs; + } + } + } + + class VoidEvent implements Event { + public final static VoidEvent INSTANCE = new VoidEvent(); + + @Override + public void run() throws Exception { + } + } + + /** + * Add an element to the front of the queue. + * + * @param event The mandatory event to prepend. + */ + default void prepend(Event event) { + enqueue(EventInsertionType.PREPEND, null, null, event); + } + + /** + * Add an element to the end of the queue. + * + * @param event The event to append. + */ + default void append(Event event) { + enqueue(EventInsertionType.APPEND, null, null, event); + } + + /** + * Enqueue an event to be run in FIFO order. + * + * @param deadlineNs The time in monotonic nanoseconds after which the future + * is completed with a + * @{org.apache.kafka.common.errors.TimeoutException}, + * and the event is cancelled. + * @param event The event to append. + */ + default void appendWithDeadline(long deadlineNs, Event event) { + enqueue(EventInsertionType.APPEND, null, __ -> deadlineNs, event); + } + + /** + * Schedule an event to be run at a specific time. + * + * @param tag If this is non-null, the unique tag to use for this + * event. If an event with this tag already exists, it + * will be cancelled. Review comment: > One way to change the deadline is to specify the previous event that is being rescheduled. Having to store the previous event object somewhere would be very annoying. Since the thread triggering deferred events is often not the controller thread itself, it would require complex locking or volatile objects. 
Also, you then run into questions such as whether the equals function or object identity should be used. It's a lot easier to just pass a string (which is immutable). This also has the nice effect of describing the deferred event. > This API allows the deadline to either increase or decrease. Are both cases used by the Controller? In the controller we are mainly interested in making the deadline closer to "now" (decreasing it) or cancelling the deferred event. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org