Repository: bookkeeper
Updated Branches:
  refs/heads/master 5662416d8 -> 6cfecea6c


BOOKKEEPER-862: Add tracing and stats to OrderedSafeExecutor for debugging slow tasks (Leigh Stewart via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/6cfecea6
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/6cfecea6
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/6cfecea6

Branch: refs/heads/master
Commit: 6cfecea6c3e2b6e327fb53ac85f1894df81a10b2
Parents: 5662416
Author: Sijie Guo <si...@apache.org>
Authored: Tue Oct 6 01:05:18 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Oct 6 01:05:18 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../bookkeeper/benchmark/BenchBookie.java       |   5 +-
 .../apache/bookkeeper/client/BookKeeper.java    |   9 +-
 .../bookkeeper/client/LedgerDeleteOp.java       |   5 +
 .../client/LedgerFragmentReplicator.java        |   4 +
 .../apache/bookkeeper/client/LedgerHandle.java  |  38 ++++
 .../apache/bookkeeper/client/LedgerOpenOp.java  |   4 +
 .../bookkeeper/client/ReadOnlyLedgerHandle.java |   5 +
 .../bookkeeper/conf/ClientConfiguration.java    |  46 ++++
 .../apache/bookkeeper/proto/BookieClient.java   |   6 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java |   4 +
 .../proto/PerChannelBookieClient.java           |  17 ++
 .../bookkeeper/proto/ReadEntryProcessor.java    |   5 +
 .../bookkeeper/proto/WriteEntryProcessor.java   |   6 +
 .../bookkeeper/util/OrderedSafeExecutor.java    | 221 ++++++++++++++++---
 .../proto/TestPerChannelBookieClient.java       |  21 +-
 .../bookkeeper/test/BookieClientTest.java       |   5 +-
 .../apache/bookkeeper/test/LoopbackClient.java  |   5 +-
 .../server/benchmark/BookieBenchmark.java       |   6 +-
 .../server/persistence/ReadAheadCache.java      |   6 +-
 20 files changed, 372 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36d4372..14a9bed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-802: Bookkeeper protocol documentation (ivank via sijie)
 
+      BOOKKEEPER-862: Add tracing and stats to OrderedSafeExecutor for debugging slow tasks (Leigh Stewart via sijie)
+
       bookkeeper-client:
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 258a3fb..89ffb82 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -142,7 +142,10 @@ public class BenchBookie {
         ClientSocketChannelFactory channelFactory
             = new 
NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
                                                 .newCachedThreadPool());
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, 
"BenchBookieClientScheduler");
+        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+                .name("BenchBookieClientScheduler")
+                .numThreads(1)
+                .build();
 
         ClientConfiguration conf = new ClientConfiguration();
         BookieClient bc = new BookieClient(conf, channelFactory, executor);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 6fe1371..6bb71fa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -295,8 +295,13 @@ public class BookKeeper {
         this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
 
         // initialize main worker pool
-        this.mainWorkerPool = new 
OrderedSafeExecutor(conf.getNumWorkerThreads(),
-                "BookKeeperClientWorker");
+        this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
+                .name("BookKeeperClientWorker")
+                .numThreads(conf.getNumWorkerThreads())
+                .statsLogger(statsLogger)
+                .traceTaskExecution(conf.getEnableTaskExecutionStats())
+                
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
+                .build();
 
         // initialize bookie client
         this.bookieClient = new BookieClient(conf, this.channelFactory, 
this.mainWorkerPool, statsLogger);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
index 13ce1fd..50fe54a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
@@ -89,4 +89,9 @@ class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> 
{
         }
         cb.deleteComplete(rc, this.ctx);
     }
+
+    @Override
+    public String toString() {
+        return String.format("LedgerDeleteOp(%d)", ledgerId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 2078245..b4c8cc8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -412,6 +412,10 @@ public class LedgerFragmentReplicator {
                                             newBookie);
                                 }
                             }
+                            @Override
+                            public String toString() {
+                                return 
String.format("ReReadMetadataForUpdateEnsemble(%d)", lh.getId());
+                            }
                         });
                 return;
             } else if (rc != BKException.Code.OK) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index f0d79b2..4ed3c03 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -377,6 +377,11 @@ public class LedgerHandle {
                                         }
                                     }
                                 }
+
+                                @Override
+                                public String toString() {
+                                    return 
String.format("ReReadMetadataForClose(%d)", ledgerId);
+                                }
                             });
                         } else if (rc != BKException.Code.OK) {
                             LOG.error("Error update ledger metadata for ledger 
" + ledgerId + " : " + rc);
@@ -385,11 +390,21 @@ public class LedgerHandle {
                             cb.closeComplete(BKException.Code.OK, 
LedgerHandle.this, ctx);
                         }
                     }
+
+                    @Override
+                    public String toString() {
+                        return String.format("WriteLedgerConfigForClose(%d)", 
ledgerId);
+                    }
                 };
 
                 writeLedgerConfig(new CloseCb());
 
             }
+
+            @Override
+            public String toString() {
+                return String.format("CloseLedgerHandle(%d)", ledgerId);
+            }
         });
     }
 
@@ -597,6 +612,10 @@ public class LedgerHandle {
                                                entryId, lastAddConfirmed, 
currentLength, data, offset, length);
                     op.initiate(toSend, length);
                 }
+                @Override
+                public String toString() {
+                    return String.format("AsyncAddEntry(lid=%d, eid=%d)", 
ledgerId, entryId);
+                }
             });
         } catch (RejectedExecutionException e) {
             
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
@@ -938,6 +957,11 @@ public class LedgerHandle {
             // the failed bookie has been replaced
             unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
         }
+
+        @Override
+        public String toString() {
+            return String.format("ChangeEnsemble(%d)", ledgerId);
+        }
     };
 
     /**
@@ -1059,6 +1083,10 @@ public class LedgerHandle {
             return true;
         }
 
+        @Override
+        public String toString() {
+            return String.format("ReReadLedgerMetadata(%d)", ledgerId);
+        }
     };
 
     void unsetSuccessAndSendWriteRequest(final int bookieIndex) {
@@ -1119,6 +1147,11 @@ public class LedgerHandle {
                                 recover(cb);
                             }
                         }
+
+                        @Override
+                        public String toString() {
+                            return 
String.format("ReReadMetadataForRecover(%d)", ledgerId);
+                        }
                     });
                 } else if (rc == BKException.Code.OK) {
                     new LedgerRecoveryOp(LedgerHandle.this, cb).initiate();
@@ -1127,6 +1160,11 @@ public class LedgerHandle {
                     cb.operationComplete(rc, null);
                 }
             }
+
+            @Override
+            public String toString() {
+                return String.format("WriteLedgerConfigForRecover(%d)", 
ledgerId);
+            }
         });
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
index 69a09e9..cc97866 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
@@ -176,6 +176,10 @@ class LedgerOpenOp implements 
GenericCallback<LedgerMetadata> {
                         
openComplete(bk.getReturnRc(BKException.Code.LedgerRecoveryException), null);
                     }
                 }
+                @Override
+                public String toString() {
+                    return String.format("Recover(%d)", ledgerId);
+                }
             });
         } else {
             lh.asyncReadLastConfirmed(new ReadLastConfirmedCallback() {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 8de4092..711f209 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -57,6 +57,11 @@ class ReadOnlyLedgerHandle extends LedgerHandle implements 
LedgerMetadataListene
                 ReadOnlyLedgerHandle.this.metadata = this.m;
             }
         }
+
+        @Override
+        public String toString() {
+            return String.format("MetadataUpdater(%d)", ledgerId);
+        }
     }
 
     ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index ae63c70..dde6d3a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -70,6 +70,10 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     // Ensemble Placement Policy
     protected final static String ENSEMBLE_PLACEMENT_POLICY = 
"ensemblePlacementPolicy";
 
+    // Stats
+    protected final static String ENABLE_TASK_EXECUTION_STATS = 
"enableTaskExecutionStats";
+    protected final static String TASK_EXECUTION_WARN_TIME_MICROS = 
"taskExecutionWarnTimeMicros";
+
     /**
      * Construct a default client-side configuration
      */
@@ -619,4 +623,46 @@ public class ClientConfiguration extends 
AbstractConfiguration {
         setProperty(ENSEMBLE_PLACEMENT_POLICY, policyClass.getName());
         return this;
     }
+
+    /**
+     * Whether to enable recording task execution stats.
+     *
+     * @return flag to enable/disable recording task execution stats.
+     */
+    public boolean getEnableTaskExecutionStats() {
+        return getBoolean(ENABLE_TASK_EXECUTION_STATS, false);
+    }
+
+    /**
+     * Enable/Disable recording task execution stats.
+     *
+     * @param enabled
+     *          flag to enable/disable recording task execution stats.
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnableTaskExecutionStats(boolean enabled) {
+        setProperty(ENABLE_TASK_EXECUTION_STATS, enabled);
+        return this;
+    }
+
+    /**
+     * Get task execution duration which triggers a warning.
+     *
+     * @return time in microseconds which triggers a warning.
+     */
+    public long getTaskExecutionWarnTimeMicros() {
+        return getLong(TASK_EXECUTION_WARN_TIME_MICROS, 
TimeUnit.SECONDS.toMicros(1));
+    }
+
+    /**
+     * Set task execution duration which triggers a warning.
+     *
+     * @param warnTime
+     *          time in microseconds which triggers a warning.
+     * @return client configuration.
+     */
+    public ClientConfiguration setTaskExecutionWarnTimeMicros(long warnTime) {
+        setProperty(TASK_EXECUTION_WARN_TIME_MICROS, warnTime);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index f455c90..87d1865 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -318,8 +318,10 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                         "BookKeeper-NIOBoss-%d").build()),
                 Executors.newCachedThreadPool(tfb.setNameFormat(
                         "BookKeeper-NIOWorker-%d").build()));
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
-                "BookieClientWorker");
+        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+                .name("BookieClientWorker")
+                .numThreads(1)
+                .build();
         BookieClient bc = new BookieClient(new ClientConfiguration(), 
channelFactory, executor);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], 
Integer.parseInt(args[1]));
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 0d7bbee..9ffca53 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -71,4 +71,8 @@ public abstract class PacketProcessorBaseV3 {
         return header.build();
     }
 
+    @Override
+    public String toString() {
+        return request.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 34b0362..6d8058f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -550,6 +550,11 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                 readCompletion.cb.readEntryComplete(rc, 
readCompletion.ledgerId, readCompletion.entryId,
                                                     null, readCompletion.ctx);
             }
+
+            @Override
+            public String toString() {
+                return String.format("ErrorOutReadKey(%s)", key);
+            }
         });
     }
 
@@ -577,6 +582,11 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                                                addr, addCompletion.ctx);
                 LOG.debug("Invoked callback method: {}", 
addCompletion.entryId);
             }
+
+            @Override
+            public String toString() {
+                return String.format("ErrorOutAddKey(%s)", key);
+            }
         });
     }
 
@@ -724,6 +734,13 @@ public class PerChannelBookieClient extends 
SimpleChannelHandler implements Chan
                             break;
                     }
                 }
+
+                @Override
+                public String toString() {
+                    return String.format("HandleResponse(Txn=%d, Type=%s, 
Entry=(%d, %d))",
+                                         header.getTxnId(), 
header.getOperation(),
+                                         completionValue.ledgerId, 
completionValue.entryId);
+                }
             });
         }
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 7c17bb1..43360fa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -132,4 +132,9 @@ class ReadEntryProcessor extends PacketProcessorBase {
                          requestProcessor.readRequestStats);
         }
     }
+
+    @Override
+    public String toString() {
+        return String.format("ReadEntry(%d, %d)", request.getLedgerId(), 
request.getEntryId());
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index f493d73..b314998 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -100,4 +100,10 @@ class WriteEntryProcessor extends PacketProcessorBase 
implements WriteCallback {
                      ResponseBuilder.buildAddResponse(request),
                      requestProcessor.addRequestStats);
     }
+
+    @Override
+    public String toString() {
+        return String.format("WriteEntry(%d, %d)",
+                             request.getLedgerId(), request.getEntryId());
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
index 597c886..f1d0e9f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
@@ -17,16 +17,25 @@
  */
 package org.apache.bookkeeper.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,42 +56,142 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class OrderedSafeExecutor {
-    final ExecutorService threads[];
+    final static long WARN_TIME_MICRO_SEC_DEFAULT = 
TimeUnit.SECONDS.toMicros(1);
+    final String name;
+    final ThreadPoolExecutor threads[];
     final long threadIds[];
+    final BlockingQueue<Runnable> queues[];
     final Random rand = new Random();
+    final OpStatsLogger taskExecutionStats;
+    final OpStatsLogger taskPendingStats;
+    final boolean traceTaskExecution;
+    final long warnTimeMicroSec;
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String name = "OrderedSafeExecutor";
+        private int numThreads = Runtime.getRuntime().availableProcessors();
+        private ThreadFactory threadFactory = null;
+        private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        private boolean traceTaskExecution = false;
+        private long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
+
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder numThreads(int num) {
+            this.numThreads = num;
+            return this;
+        }
+
+        public Builder threadFactory(ThreadFactory threadFactory) {
+            this.threadFactory = threadFactory;
+            return this;
+        }
+
+        public Builder statsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        public Builder traceTaskExecution(boolean enabled) {
+            this.traceTaskExecution = enabled;
+            return this;
+        }
+
+        public Builder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
+            this.warnTimeMicroSec = warnTimeMicroSec;
+            return this;
+        }
+
+        public OrderedSafeExecutor build() {
+            if (null == threadFactory) {
+                threadFactory = Executors.defaultThreadFactory();
+            }
+            return new OrderedSafeExecutor(name, numThreads, threadFactory, 
statsLogger,
+                                           traceTaskExecution, 
warnTimeMicroSec);
+        }
+
+    }
+
+    private class TimedRunnable extends SafeRunnable {
+        final SafeRunnable runnable;
+        final long initNanos;
+
+        TimedRunnable(SafeRunnable runnable) {
+            this.runnable = runnable;
+            this.initNanos = MathUtils.nowInNano();
+         }
+
+        @Override
+        public void safeRun() {
+            taskPendingStats.registerSuccessfulEvent(initNanos, 
TimeUnit.NANOSECONDS);
+            long startNanos = MathUtils.nowInNano();
+            this.runnable.safeRun();
+            long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
+            taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, 
TimeUnit.MICROSECONDS);
+            if (elapsedMicroSec >= warnTimeMicroSec) {
+                logger.warn("Runnable {}:{} took too long {} micros to 
execute.",
+                            new Object[] { runnable, runnable.getClass(), 
elapsedMicroSec });
+            }
+        }
+     }
+
+    @Deprecated
+    public OrderedSafeExecutor(int numThreads, String threadName) {
+        this(threadName, numThreads, Executors.defaultThreadFactory(), 
NullStatsLogger.INSTANCE,
+             false, WARN_TIME_MICRO_SEC_DEFAULT);
+    }
 
     /**
      * Constructs Safe executor
      *
      * @param numThreads
      *            - number of threads
-     * @param threadName
-     *            - name of the thread
+     * @param baseName
+     *            - base name of executor threads
+     * @param threadFactory
+     *            - for constructing threads
+     * @param statsLogger
+     *            - for reporting executor stats
+     * @param traceTaskExecution
+     *            - should we stat task execution
+     * @param warnTimeMicroSec
+     *            - log long task exec warning after this interval
      */
-    public OrderedSafeExecutor(int numThreads, String threadName) {
-        if (numThreads <= 0) {
-            throw new IllegalArgumentException();
-        }
-        if (StringUtils.isBlank(threadName)) {
-            // sets default name
-            threadName = "OrderedSafeExecutor";
-        }
-        threads = new ExecutorService[numThreads];
+    @SuppressWarnings("unchecked")
+    private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory 
threadFactory,
+                                StatsLogger statsLogger, boolean 
traceTaskExecution,
+                                long warnTimeMicroSec) {
+        Preconditions.checkArgument(numThreads > 0);
+        Preconditions.checkArgument(!StringUtils.isBlank(baseName));
+
+        this.warnTimeMicroSec = warnTimeMicroSec;
+        name = baseName;
+        threads = new ThreadPoolExecutor[numThreads];
         threadIds = new long[numThreads];
+        queues = new BlockingQueue[numThreads];
         for (int i = 0; i < numThreads; i++) {
-            StringBuilder thName = new StringBuilder(threadName);
-            thName.append("-");
-            thName.append(i);
-            thName.append("-%d");
-            ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
-                    .setNameFormat(thName.toString());
-            threads[i] = Executors.newSingleThreadExecutor(tfb.build());
-            final int tid = i;
+            queues[i] = new LinkedBlockingQueue<Runnable>();
+            threads[i] =  new ThreadPoolExecutor(1, 1,
+                    0L, TimeUnit.MILLISECONDS, queues[i],
+                    new ThreadFactoryBuilder()
+                        .setNameFormat(name + "-orderedsafeexecutor-" + i + 
"-%d")
+                        .setThreadFactory(threadFactory)
+                        .build());
+
+            // Save thread ids
+            final int idx = i;
             try {
-                threads[i].submit(new SafeRunnable() {
+                threads[idx].submit(new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        threadIds[tid] = Thread.currentThread().getId();
+                        threadIds[idx] = Thread.currentThread().getId();
                     }
                 }).get();
             } catch (InterruptedException e) {
@@ -90,7 +199,47 @@ public class OrderedSafeExecutor {
             } catch (ExecutionException e) {
                 throw new RuntimeException("Couldn't start thread " + i, e);
             }
+
+            // Register gauges
+            statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), 
new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return queues[idx].size();
+                }
+            });
+            statsLogger.registerGauge(String.format("%s-completed-tasks-%d", 
name, idx), new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return threads[idx].getCompletedTaskCount();
+                }
+            });
+            statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, 
idx), new Gauge<Number>() {
+                @Override
+                public Number getDefaultValue() {
+                    return 0;
+                }
+
+                @Override
+                public Number getSample() {
+                    return threads[idx].getTaskCount();
+                }
+            });
         }
+
+        // Stats
+        this.taskExecutionStats = 
statsLogger.scope(name).getOpStatsLogger("task_execution");
+        this.taskPendingStats = 
statsLogger.scope(name).getOpStatsLogger("task_queued");
+        this.traceTaskExecution = traceTaskExecution;
     }
 
     ExecutorService chooseThread() {
@@ -113,11 +262,19 @@ public class OrderedSafeExecutor {
 
     }
 
+    private SafeRunnable timedRunnable(SafeRunnable r) {
+        if (traceTaskExecution) {
+            return new TimedRunnable(r);
+        } else {
+            return r;
+        }
+    }
+
     /**
      * schedules a one time action to execute
      */
     public void submit(SafeRunnable r) {
-        chooseThread().submit(r);
+        chooseThread().submit(timedRunnable(r));
     }
 
     /**
@@ -126,7 +283,7 @@ public class OrderedSafeExecutor {
      * @param r
      */
     public void submitOrdered(Object orderingKey, SafeRunnable r) {
-        chooseThread(orderingKey).submit(r);
+        chooseThread(orderingKey).submit(timedRunnable(r));
     }
 
     private long getThreadID(Object orderingKey) {
@@ -184,11 +341,17 @@ public class OrderedSafeExecutor {
             } else {
                 try {
                     executor.submitOrdered(orderingKey, new SafeRunnable() {
-                            @Override
-                            public void safeRun() {
-                                safeOperationComplete(rc, result);
-                            }
-                        });
+                        @Override
+                        public void safeRun() {
+                            safeOperationComplete(rc, result);
+                        }
+                        @Override
+                        public String toString() {
+                            return String.format("Callback(key=%s, name=%s)",
+                                                 orderingKey,
+                                                 OrderedSafeGenericCallback.this);
+                        }
+                    });
                 } catch (RejectedExecutionException re) {
                     LOG.warn("Failed to submit callback for {} : ", orderingKey, re);
                 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 675c2fd..ac6bd8d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -70,8 +70,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         ClientSocketChannelFactory channelFactory
             = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                 Executors.newCachedThreadPool());
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
-                "BKClientOrderedSafeExecutor");
+        OrderedSafeExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 1000; i++) {
@@ -89,6 +88,15 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         executor.shutdown();
     }
 
+    public OrderedSafeExecutor getOrderedSafeExecutor() {
+        return OrderedSafeExecutor.newBuilder()
+            .name("PCBC")
+            .numThreads(1)
+            .traceTaskExecution(true)
+            .traceTaskWarnTimeMicroSec(TimeUnit.MILLISECONDS.toMicros(100))
+            .build();
+    }
+
     /**
      * Test race scenario found in {@link https://issues.apache.org/jira/browse/BOOKKEEPER-5}
      * where multiple clients try to connect a channel simultaneously. If not synchronised
@@ -106,8 +114,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         ClientSocketChannelFactory channelFactory
             = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                 Executors.newCachedThreadPool());
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
-                "BKClientOrderedSafeExecutor");
+        OrderedSafeExecutor executor = getOrderedSafeExecutor();
 
         BookieSocketAddress addr = getBookie(0);
         for (int i = 0; i < 100; i++) {
@@ -140,8 +147,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         ClientSocketChannelFactory channelFactory
             = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                 Executors.newCachedThreadPool());
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
-                "BKClientOrderedSafeExecutor");
+        OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);
@@ -238,8 +244,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
         ClientSocketChannelFactory channelFactory
             = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                 Executors.newCachedThreadPool());
-        final OrderedSafeExecutor executor = new OrderedSafeExecutor(1,
-                "BKClientOrderedSafeExecutor");
+        final OrderedSafeExecutor executor = getOrderedSafeExecutor();
         BookieSocketAddress addr = getBookie(0);
 
         final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 285cf71..a170cee 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -73,7 +73,10 @@ public class BookieClientTest {
         bs.start();
         channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
                 .newCachedThreadPool());
-        executor = new OrderedSafeExecutor(2, "BKClientOrderedSafeExecutor");
+        executor = OrderedSafeExecutor.newBuilder()
+                .name("BKClientOrderedSafeExecutor")
+                .numThreads(2)
+                .build();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
index 778a804..3a36129 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java
@@ -96,7 +96,10 @@ class LoopbackClient implements WriteCallback {
         LoopbackClient lb;
         ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
                 .newCachedThreadPool());
-        OrderedSafeExecutor executor = new OrderedSafeExecutor(2, "BookieClientScheduler");
+        OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+                .name("BookieClientScheduler")
+                .numThreads(2)
+                .build();
         try {
             BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue());
             lb = new LoopbackClient(channelFactory, executor, begin, limit.intValue());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
index 7985d39..d58883d 100644
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
+++ b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
@@ -41,8 +41,10 @@ public class BookieBenchmark extends AbstractBenchmark {
     BookieClient bkc;
     BookieSocketAddress addr;
     ClientSocketChannelFactory channelFactory;
-    OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "BookieBenchmarkScheduler");
-
+    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
+            .name("BookieBenchmarkScheduler")
+            .numThreads(1)
+            .build();
 
     public BookieBenchmark(String bookieHostPort)  throws Exception {
         channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/6cfecea6/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
index 2235282..48be3e8 100644
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
+++ b/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
@@ -154,8 +154,10 @@ public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
         this.realPersistenceManager = realPersistenceManager;
         this.cfg = cfg;
         numCacheWorkers = cfg.getNumReadAheadCacheThreads();
-        cacheWorkers = new OrderedSafeExecutor(numCacheWorkers,
-                "ReadAheadCacheScheduler");
+        cacheWorkers = OrderedSafeExecutor.newBuilder()
+                .name("ReadAheadCacheScheduler")
+                .numThreads(numCacheWorkers)
+                .build();
         reloadConf(cfg);
     }
 

Reply via email to