Fix compaction and flush exception not captured issue

patch by Jay Zhuang; reviewed by marcuse for CASSANDRA-13833


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e80ede6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e80ede6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e80ede6d

Branch: refs/heads/cassandra-3.11
Commit: e80ede6d393460f22ee2b313d4bac7e3fbbfe893
Parents: 4d90573
Author: Jay Zhuang <jay.zhu...@yahoo.com>
Authored: Thu Aug 31 11:07:07 2017 -0700
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Sep 4 15:01:02 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../db/compaction/CompactionExecutorTest.java   | 131 +++++++++++++++++++
 4 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e68ddc..03a78fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.11
+ * Fix compaction and flush exception not captured (CASSANDRA-13833)
  * Make BatchlogManagerMBean.forceBatchlogReplay() blocking (CASSANDRA-13809)
  * Uncaught exceptions in Netty pipeline (CASSANDRA-13649)
  * Prevent integer overflow on exabyte filesystems (CASSANDRA-13067) 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2e52eb2..7e36e11 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -906,9 +906,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logFlush();
             Flush flush = new Flush(false);
             ListenableFutureTask<Void> flushTask = ListenableFutureTask.create(flush, null);
-            flushExecutor.submit(flushTask);
+            flushExecutor.execute(flushTask);
             ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
-            postFlushExecutor.submit(task);
+            postFlushExecutor.execute(task);
 
             @SuppressWarnings("unchecked")
             ListenableFuture<ReplayPosition> future = 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d21f1e8..cd50646 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1457,7 +1457,7 @@ public class CompactionManager implements CompactionManagerMBean
         return CompactionMetrics.getCompactions().size();
     }
 
-    private static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
+    static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
     {
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
@@ -1537,7 +1537,7 @@ public class CompactionManager implements CompactionManagerMBean
             try
             {
                 ListenableFutureTask ret = ListenableFutureTask.create(task);
-                submit(ret);
+                execute(ret);
                 return ret;
             }
             catch (RejectedExecutionException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e80ede6d/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
new file mode 100644
index 0000000..c6feb3f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionExecutorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CompactionExecutorTest
+{
+    static Throwable testTaskThrowable = null;
+    private static class TestTaskExecutor extends CompactionManager.CompactionExecutor
+    {
+        @Override
+        public void afterExecute(Runnable r, Throwable t)
+        {
+            if (t == null)
+            {
+                t = DebuggableThreadPoolExecutor.extractThrowable(r);
+            }
+            testTaskThrowable = t;
+        }
+        @Override
+        protected void beforeExecute(Thread t, Runnable r)
+        {
+        }
+    }
+    private CompactionManager.CompactionExecutor executor;
+
+    @Before
+    public void setup()
+    {
+        executor = new TestTaskExecutor();
+    }
+
+    @After
+    public void destroy() throws Exception
+    {
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void testFailedRunnable() throws Exception
+    {
+        testTaskThrowable = null;
+
+        Future<?> tt = executor.submitIfRunning(
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    assert false : "testFailedRunnable";
+                }
+            }, "compactionExecutorTest"
+        );
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testFailedRunnable");
+    }
+
+    @Test
+    public void testFailedCallable() throws Exception
+    {
+        testTaskThrowable = null;
+        Future<?> tt = executor.submitIfRunning(
+            new Callable<Integer>()
+            {
+                @Override
+                public Integer call() throws Exception
+                {
+                    assert false : "testFailedCallable";
+                    return 1;
+                }
+            }
+            , "compactionExecutorTest");
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testFailedCallable");
+    }
+
+    @Test
+    public void testExceptionRunnable() throws Exception
+    {
+        testTaskThrowable = null;
+        Future<?> tt = executor.submitIfRunning(
+        new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                throw new RuntimeException("testExceptionRunnable");
+            }
+        }
+        , "compactionExecutorTest");
+
+        while (!tt.isDone())
+            Thread.sleep(10);
+        assertNotNull(testTaskThrowable);
+        assertEquals(testTaskThrowable.getMessage(), "testExceptionRunnable");
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to