This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dff6d03a59 Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array (#3546)
dff6d03a59 is described below

commit dff6d03a59b293019a3c1144515e6f931986f82f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Oct 20 13:24:21 2022 -0700

    Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array (#3546)
    
    * Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array
    
    * Added metrics to executor
    
    * Fixed checkstyle
    
    * Made the test more resilient
    
    * Made the tests not relying on thread.sleep()
    
    * Fixed testRejectWhenQueueIsFull
    
    * Ignore spotbugs warning
    
    * Fixed annotation formatting
    
    * Removed test assertion that had already been changed
---
 bookkeeper-common/pom.xml                          |   5 +
 .../bookkeeper/common/util/OrderedExecutor.java    | 107 ++++----
 .../bookkeeper/common/util/OrderedScheduler.java   |  10 +-
 .../common/util/SingleThreadExecutor.java          | 296 +++++++++++++++++++++
 .../common/util/TestSingleThreadExecutor.java      | 290 ++++++++++++++++++++
 5 files changed, 652 insertions(+), 56 deletions(-)

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 62aad8735e..29d77bccde 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -85,6 +85,11 @@
       <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index 9ee84ba9ed..40c3fb0283 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -29,19 +29,16 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
 import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -294,20 +291,17 @@ public class OrderedExecutor implements ExecutorService {
         }
     }
 
-    protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory 
factory) {
-        BlockingQueue<Runnable> queue;
-        if (enableBusyWait) {
-            // Use queue with busy-wait polling strategy
-            queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ? 
maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE);
+    protected ExecutorService createSingleThreadExecutor(ThreadFactory 
factory) {
+        if (maxTasksInQueue > 0) {
+            return new SingleThreadExecutor(factory, maxTasksInQueue, true);
         } else {
-            // By default, use regular JDK LinkedBlockingQueue
-            queue = new LinkedBlockingQueue<>();
+            return new SingleThreadExecutor(factory);
         }
-        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, 
factory);
     }
 
-    protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
-        return new BoundedExecutorService(executor, this.maxTasksInQueue);
+    protected ExecutorService getBoundedExecutor(ExecutorService executor) {
+        checkArgument(executor instanceof ThreadPoolExecutor);
+        return new BoundedExecutorService((ThreadPoolExecutor) executor, 
this.maxTasksInQueue);
     }
 
     protected ExecutorService addExecutorDecorators(ExecutorService executor) {
@@ -400,11 +394,14 @@ public class OrderedExecutor implements ExecutorService {
         threads = new ExecutorService[numThreads];
         threadIds = new long[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            ThreadPoolExecutor thread = createSingleThreadExecutor(
+            ExecutorService thread = createSingleThreadExecutor(
                     new ThreadFactoryBuilder().setNameFormat(name + "-" + 
getClass().getSimpleName() + "-" + i + "-%d")
                     .setThreadFactory(threadFactory).build());
 
-            threads[i] = addExecutorDecorators(getBoundedExecutor(thread));
+            if (traceTaskExecution || preserveMdcForTaskExecution) {
+                thread = addExecutorDecorators(thread);
+            }
+            threads[i] = thread;
 
             final int idx = i;
             try {
@@ -434,43 +431,49 @@ public class OrderedExecutor implements ExecutorService {
                 throw new RuntimeException("Couldn't start thread " + i, e);
             }
 
-            // Register gauges
-            statsLogger.scopeLabel("thread", String.valueOf(idx))
-                    .registerGauge(String.format("%s-queue", name), new 
Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getQueue().size();
-                }
-            });
-            statsLogger.scopeLabel("thread", String.valueOf(idx))
-                    .registerGauge(String.format("%s-completed-tasks", name), 
new Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getCompletedTaskCount();
-                }
-            });
-            statsLogger.scopeLabel("thread", String.valueOf(idx))
-                    .registerGauge(String.format("%s-total-tasks", name), new 
Gauge<Number>() {
-                @Override
-                public Number getDefaultValue() {
-                    return 0;
-                }
-
-                @Override
-                public Number getSample() {
-                    return thread.getTaskCount();
-                }
-            });
+            if (thread instanceof SingleThreadExecutor) {
+                SingleThreadExecutor ste = (SingleThreadExecutor) thread;
+                ste.registerMetrics(statsLogger);
+            } else if (thread instanceof ThreadPoolExecutor) {
+                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) 
thread;
+                // Register gauges
+                statsLogger.scopeLabel("thread", String.valueOf(idx))
+                        .registerGauge(String.format("%s-queue", name), new 
Gauge<Number>() {
+                            @Override
+                            public Number getDefaultValue() {
+                                return 0;
+                            }
+
+                            @Override
+                            public Number getSample() {
+                                return threadPoolExecutor.getQueue().size();
+                            }
+                        });
+                statsLogger.scopeLabel("thread", String.valueOf(idx))
+                        .registerGauge(String.format("%s-completed-tasks", 
name), new Gauge<Number>() {
+                            @Override
+                            public Number getDefaultValue() {
+                                return 0;
+                            }
+
+                            @Override
+                            public Number getSample() {
+                                return 
threadPoolExecutor.getCompletedTaskCount();
+                            }
+                        });
+                statsLogger.scopeLabel("thread", String.valueOf(idx))
+                        .registerGauge(String.format("%s-total-tasks", name), 
new Gauge<Number>() {
+                            @Override
+                            public Number getDefaultValue() {
+                                return 0;
+                            }
+
+                            @Override
+                            public Number getSample() {
+                                return threadPoolExecutor.getTaskCount();
+                            }
+                        });
+            }
         }
 
         statsLogger.registerGauge(String.format("%s-threads", name), new 
Gauge<Number>() {
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index cc2e9c8405..377ab202a6 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -118,17 +117,20 @@ public class OrderedScheduler extends OrderedExecutor implements ScheduledExecut
     }
 
     @Override
-    protected ScheduledThreadPoolExecutor 
createSingleThreadExecutor(ThreadFactory factory) {
-        return new ScheduledThreadPoolExecutor(1, factory);
+    protected ExecutorService createSingleThreadExecutor(ThreadFactory 
factory) {
+        return new BoundedScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1, factory), this.maxTasksInQueue);
     }
 
     @Override
-    protected ListeningScheduledExecutorService 
getBoundedExecutor(ThreadPoolExecutor executor) {
+    protected ListeningScheduledExecutorService 
getBoundedExecutor(ExecutorService executor) {
         return new 
BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, 
this.maxTasksInQueue);
     }
 
     @Override
     protected ListeningScheduledExecutorService 
addExecutorDecorators(ExecutorService executor) {
+        if (!(executor instanceof ListeningScheduledExecutorService)) {
+            executor = new 
BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, 0);
+        }
         return new 
OrderedSchedulerDecoratedThread((ListeningScheduledExecutorService) executor);
     }
 
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
new file mode 100644
index 0000000000..318aacb8cf
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Implements a single thread executor that drains the queue in batches to 
minimize contention between threads.
+ *
+ * <p>Tasks are executed in a safe manner: if there are exceptions they are 
logged and the executor will
+ * proceed with the next tasks.
+ */
+@Slf4j
+public class SingleThreadExecutor extends AbstractExecutorService implements 
ExecutorService, Runnable {
+    private final BlockingQueue<Runnable> queue;
+    private final Thread runner;
+
+    private final boolean rejectExecution;
+
+    private final LongAdder tasksCount = new LongAdder();
+    private final LongAdder tasksCompleted = new LongAdder();
+    private final LongAdder tasksRejected = new LongAdder();
+    private final LongAdder tasksFailed = new LongAdder();
+
+    enum State {
+        Running,
+        Shutdown,
+        Terminated
+    }
+
+    private volatile State state;
+
+    private final CountDownLatch startLatch;
+
+    public SingleThreadExecutor(ThreadFactory tf) {
+        this(tf, 64 * 1024, false);
+    }
+
+    @SneakyThrows
+    @SuppressFBWarnings(value = {"SC_START_IN_CTOR"})
+    public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, 
boolean rejectExecution) {
+        this.queue = new ArrayBlockingQueue<>(maxQueueCapacity);
+        this.runner = tf.newThread(this);
+        this.state = State.Running;
+        this.rejectExecution = rejectExecution;
+        this.startLatch = new CountDownLatch(1);
+        this.runner.start();
+
+        // Ensure the runner is already fully working by the time the 
constructor is done
+        this.startLatch.await();
+    }
+
+    public void run() {
+        try {
+            boolean isInitialized = false;
+            List<Runnable> localTasks = new ArrayList<>();
+
+            while (state == State.Running) {
+                if (!isInitialized) {
+                    startLatch.countDown();
+                    isInitialized = true;
+                }
+
+                int n = queue.drainTo(localTasks);
+                if (n > 0) {
+                    for (int i = 0; i < n; i++) {
+                        if (!safeRunTask(localTasks.get(i))) {
+                            return;
+                        }
+                    }
+                    localTasks.clear();
+                } else {
+                    if (!safeRunTask(queue.take())) {
+                        return;
+                    }
+                }
+            }
+
+            // Clear the queue in orderly shutdown
+            int n = queue.drainTo(localTasks);
+            for (int i = 0; i < n; i++) {
+                safeRunTask(localTasks.get(i));
+            }
+        } catch (InterruptedException ie) {
+            // Exit loop when interrupted
+            Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+            log.error("Exception in executor: {}", t.getMessage(), t);
+            throw t;
+        } finally {
+            state = State.Terminated;
+        }
+    }
+
+    private boolean safeRunTask(Runnable r) {
+        try {
+            r.run();
+            tasksCompleted.increment();
+        } catch (Throwable t) {
+            if (t instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+                return false;
+            } else {
+                tasksFailed.increment();
+                log.error("Error while running task: {}", t.getMessage(), t);
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public void shutdown() {
+        state = State.Shutdown;
+        if (queue.isEmpty()) {
+            runner.interrupt();
+        }
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        this.state = State.Shutdown;
+        this.runner.interrupt();
+        List<Runnable> remainingTasks = new ArrayList<>();
+        queue.drainTo(remainingTasks);
+        return remainingTasks;
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return state != State.Running;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return state == State.Terminated;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+        runner.join(unit.toMillis(timeout));
+        return runner.isAlive();
+    }
+
+    public long getQueuedTasksCount() {
+        return Math.max(0, getSubmittedTasksCount() - 
getCompletedTasksCount());
+    }
+
+    public long getSubmittedTasksCount() {
+        return tasksCount.sum();
+    }
+
+    public long getCompletedTasksCount() {
+        return tasksCompleted.sum();
+    }
+
+    public long getRejectedTasksCount() {
+        return tasksRejected.sum();
+    }
+
+    public long getFailedTasksCount() {
+        return tasksFailed.sum();
+    }
+
+    @Override
+    public void execute(Runnable r) {
+        if (state != State.Running) {
+            throw new RejectedExecutionException("Executor is shutting down");
+        }
+
+        try {
+            if (!rejectExecution) {
+                queue.put(r);
+                tasksCount.increment();
+            } else {
+                if (queue.offer(r)) {
+                    tasksCount.increment();
+                } else {
+                    tasksRejected.increment();
+                    throw new ExecutorRejectedException("Executor queue is 
full");
+                }
+            }
+        } catch (InterruptedException e) {
+            throw new RejectedExecutionException("Executor thread was 
interrupted", e);
+        }
+    }
+
+    public void registerMetrics(StatsLogger statsLogger) {
+        // Register gauges
+        statsLogger.scopeLabel("thread", runner.getName())
+                .registerGauge("thread_executor_queue", new Gauge<Number>() {
+                    @Override
+                    public Number getDefaultValue() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Number getSample() {
+                        return getQueuedTasksCount();
+                    }
+                });
+        statsLogger.scopeLabel("thread", runner.getName())
+                .registerGauge("thread_executor_completed", new 
Gauge<Number>() {
+                    @Override
+                    public Number getDefaultValue() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Number getSample() {
+                        return getCompletedTasksCount();
+                    }
+                });
+        statsLogger.scopeLabel("thread", runner.getName())
+                .registerGauge("thread_executor_tasks_completed", new 
Gauge<Number>() {
+                    @Override
+                    public Number getDefaultValue() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Number getSample() {
+                        return getCompletedTasksCount();
+                    }
+                });
+        statsLogger.scopeLabel("thread", runner.getName())
+                .registerGauge("thread_executor_tasks_rejected", new 
Gauge<Number>() {
+                    @Override
+                    public Number getDefaultValue() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Number getSample() {
+                        return getRejectedTasksCount();
+                    }
+                });
+        statsLogger.scopeLabel("thread", runner.getName())
+                .registerGauge("thread_executor_tasks_failed", new 
Gauge<Number>() {
+                    @Override
+                    public Number getDefaultValue() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Number getSample() {
+                        return getFailedTasksCount();
+                    }
+                });
+    }
+
+    private static class ExecutorRejectedException extends 
RejectedExecutionException {
+
+        private ExecutorRejectedException(String msg) {
+            super(msg);
+        }
+        @Override
+        public Throwable fillInStackTrace() {
+            // Avoid the stack traces to be generated for this exception. This 
is done
+            // because when rejectExecution=true, there could be many such 
exceptions
+            // getting thrown, and filling the stack traces is very expensive
+            return this;
+        }
+    }
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
new file mode 100644
index 0000000000..1f03bfb85f
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link SingleThreadExecutor}.
+ */
+public class TestSingleThreadExecutor {
+
+    private static final ThreadFactory THREAD_FACTORY = new 
DefaultThreadFactory("test");
+
+    @Test
+    public void testSimple() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+        AtomicInteger count = new AtomicInteger();
+
+        assertEquals(0, ste.getSubmittedTasksCount());
+        assertEquals(0, ste.getCompletedTasksCount());
+        assertEquals(0, ste.getQueuedTasksCount());
+
+        for (int i = 0; i < 10; i++) {
+            ste.execute(() -> count.incrementAndGet());
+        }
+
+        assertEquals(10, ste.getSubmittedTasksCount());
+
+        ste.submit(() -> {
+        }).get();
+
+        assertEquals(10, count.get());
+        assertEquals(11, ste.getSubmittedTasksCount());
+
+        Awaitility.await().untilAsserted(() -> assertEquals(11, 
ste.getCompletedTasksCount()));
+        assertEquals(0, ste.getRejectedTasksCount());
+        assertEquals(0, ste.getFailedTasksCount());
+        assertEquals(0, ste.getQueuedTasksCount());
+    }
+
+    @Test
+    public void testRejectWhenQueueIsFull() throws Exception {
+        @Cleanup("shutdownNow")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 
10, true);
+
+        CyclicBarrier barrier = new CyclicBarrier(10 + 1);
+        CountDownLatch startedLatch = new CountDownLatch(1);
+
+        for (int i = 0; i < 10; i++) {
+            ste.execute(() -> {
+                startedLatch.countDown();
+
+                try {
+                    barrier.await();
+                } catch (InterruptedException | BrokenBarrierException e) {
+                    // ignore
+                }
+            });
+
+            // Wait until the first task is already running in the thread
+            startedLatch.await();
+        }
+
+        // Next task should go through, because the runner thread has already 
pulled out 1 item from the
+        // queue: the first tasks which is currently stuck
+        ste.execute(() -> {
+        });
+
+        // Now the queue is really full and should reject tasks
+        try {
+            ste.execute(() -> {
+            });
+            fail("should have rejected the task");
+        } catch (RejectedExecutionException e) {
+            // Expected
+        }
+
+        assertTrue(ste.getSubmittedTasksCount() >= 11);
+        assertTrue(ste.getRejectedTasksCount() >= 1);
+        assertEquals(0, ste.getFailedTasksCount());
+    }
+
+    @Test
+    public void testBlockWhenQueueIsFull() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 
10, false);
+
+        CyclicBarrier barrier = new CyclicBarrier(10 + 1);
+
+        for (int i = 0; i < 10; i++) {
+            ste.execute(() -> {
+                try {
+                    barrier.await(1, TimeUnit.SECONDS);
+                } catch (TimeoutException te) {
+                    // ignore
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+
+        assertEquals(10, ste.getQueuedTasksCount());
+
+        ste.submit(() -> {
+        }).get();
+
+        assertEquals(11, ste.getSubmittedTasksCount());
+        assertEquals(0, ste.getRejectedTasksCount());
+    }
+
+    @Test
+    public void testShutdown() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+        assertFalse(ste.isShutdown());
+        assertFalse(ste.isTerminated());
+
+        AtomicInteger count = new AtomicInteger();
+
+        for (int i = 0; i < 3; i++) {
+            ste.execute(() -> {
+                try {
+                    Thread.sleep(1000);
+                    count.incrementAndGet();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+
+        ste.shutdown();
+        assertTrue(ste.isShutdown());
+        assertFalse(ste.isTerminated());
+
+        try {
+            ste.execute(() -> {
+            });
+            fail("should have rejected the task");
+        } catch (RejectedExecutionException e) {
+            // Expected
+        }
+
+        ste.awaitTermination(10, TimeUnit.SECONDS);
+        assertTrue(ste.isShutdown());
+        assertTrue(ste.isTerminated());
+
+        assertEquals(3, count.get());
+    }
+
+    @Test
+    public void testShutdownNow() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+        assertFalse(ste.isShutdown());
+        assertFalse(ste.isTerminated());
+
+        AtomicInteger count = new AtomicInteger();
+
+        for (int i = 0; i < 3; i++) {
+            ste.execute(() -> {
+                try {
+                    Thread.sleep(2000);
+                    count.incrementAndGet();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            // Ensure the 3 tasks are not picked up in one shot by the runner 
thread
+            Thread.sleep(500);
+        }
+
+        List<Runnable> remainingTasks = ste.shutdownNow();
+        assertEquals(2, remainingTasks.size());
+        assertTrue(ste.isShutdown());
+
+        try {
+            ste.execute(() -> {
+            });
+            fail("should have rejected the task");
+        } catch (RejectedExecutionException e) {
+            // Expected
+        }
+
+        ste.awaitTermination(10, TimeUnit.SECONDS);
+        assertTrue(ste.isShutdown());
+        assertTrue(ste.isTerminated());
+
+        assertEquals(0, count.get());
+    }
+
+    @Test
+    public void testTasksWithException() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+        AtomicInteger count = new AtomicInteger();
+
+        for (int i = 0; i < 10; i++) {
+            ste.execute(() -> {
+                count.incrementAndGet();
+                throw new RuntimeException("xyz");
+            });
+        }
+
+        ste.submit(() -> {
+        }).get();
+        assertEquals(10, count.get());
+
+        assertEquals(11, ste.getSubmittedTasksCount());
+        Awaitility.await().untilAsserted(() -> assertEquals(1, 
ste.getCompletedTasksCount()));
+        assertEquals(0, ste.getRejectedTasksCount());
+        assertEquals(10, ste.getFailedTasksCount());
+    }
+
+    @Test
+    public void testTasksWithNPE() throws Exception {
+        @Cleanup("shutdown")
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+        AtomicInteger count = new AtomicInteger();
+        String npeTest = null;
+
+        for (int i = 0; i < 10; i++) {
+            ste.execute(() -> {
+                count.incrementAndGet();
+
+                // Trigger the NPE exception
+                System.out.println(npeTest.length());
+            });
+        }
+
+        ste.submit(() -> {
+        }).get();
+        assertEquals(10, count.get());
+
+        assertEquals(11, ste.getSubmittedTasksCount());
+        Awaitility.await().untilAsserted(() -> assertEquals(1, 
ste.getCompletedTasksCount()));
+        assertEquals(0, ste.getRejectedTasksCount());
+        assertEquals(10, ste.getFailedTasksCount());
+    }
+
+    @Test
+    public void testShutdownEmpty() throws Exception {
+        SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+        ste.shutdown();
+        assertTrue(ste.isShutdown());
+
+        ste.awaitTermination(10, TimeUnit.SECONDS);
+        assertTrue(ste.isShutdown());
+        assertTrue(ste.isTerminated());
+    }
+}

Reply via email to