This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e87bad6  ZOOKEEPER-3683: Discard requests that are delayed longer than 
a configured threshold
e87bad6 is described below

commit e87bad6774e7269ef21a156aff9dad089ef54794
Author: Jie Huang <jiehu...@fb.com>
AuthorDate: Tue Mar 24 19:38:46 2020 -0700

    ZOOKEEPER-3683: Discard requests that are delayed longer than a configured threshold
    
    Author: Jie Huang <jiehu...@fb.com>
    Author: Ivailo Nedelchev <nedelc...@fb.com>
    
    Reviewers: Michael Han <h...@apache.org>, Allan Lyu <fang...@apache.org>, 
Damien Diederen <d...@crosstwine.com>
    
    Closes #1211 from jhuan31/ZOOKEEPER-3683
---
 .../zookeeper-client-c/include/zookeeper.h         |   3 +-
 .../zookeeper-client-c/src/zookeeper.c             |   2 +
 .../src/main/resources/markdown/zookeeperAdmin.md  |   9 +
 .../java/org/apache/zookeeper/KeeperException.java |  18 +-
 .../zookeeper/server/FinalRequestProcessor.java    |  39 ++--
 .../zookeeper/server/PrepRequestProcessor.java     |  18 +-
 .../java/org/apache/zookeeper/server/Request.java  |  18 ++
 .../apache/zookeeper/server/RequestThrottler.java  |  16 +-
 .../org/apache/zookeeper/server/ServerMetrics.java |   7 +
 .../zookeeper/server/SyncRequestProcessor.java     |   7 +-
 .../apache/zookeeper/server/ZooKeeperServer.java   |  12 ++
 .../zookeeper/server/ZooKeeperServerBean.java      |  10 +
 .../zookeeper/server/ZooKeeperServerMXBean.java    |   3 +
 .../zookeeper/server/quorum/CommitProcessor.java   |  17 +-
 .../server/quorum/FollowerRequestProcessor.java    |   3 +
 .../org/apache/zookeeper/server/quorum/Leader.java |   6 +
 .../apache/zookeeper/server/quorum/Learner.java    |   4 +
 .../server/quorum/ObserverRequestProcessor.java    |   4 +
 .../server/quorum/QuorumZooKeeperServer.java       |   4 +
 .../java/org/apache/zookeeper/test/QuorumBase.java |  60 ++++++
 .../apache/zookeeper/test/ThrottledOpHelper.java   | 224 +++++++++++++++++++++
 .../zookeeper/test/ThrottledOpObserverTest.java    |  73 +++++++
 .../zookeeper/test/ThrottledOpQuorumTest.java      |  98 +++++++++
 .../zookeeper/test/ThrottledOpStandaloneTest.java  |  64 ++++++
 24 files changed, 695 insertions(+), 24 deletions(-)

diff --git a/zookeeper-client/zookeeper-client-c/include/zookeeper.h 
b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
index 243fac2..d33a446 100644
--- a/zookeeper-client/zookeeper-client-c/include/zookeeper.h
+++ b/zookeeper-client/zookeeper-client-c/include/zookeeper.h
@@ -140,7 +140,8 @@ enum ZOO_ERRORS {
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a 
local session */
   ZNOWATCHER = -121, /*!< The watcher couldn't be found */
   ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration 
operation when reconfiguration feature is disabled */
-  ZSESSIONCLOSEDREQUIRESASLAUTH = -124 /*!< The session has been closed by 
server because server requires client to do SASL authentication, but client is 
not configured with SASL authentication or configuted with SASL but failed 
(i.e. wrong credential used.). */
+  ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by 
server because server requires client to do SASL authentication, but client is 
not configured with SASL authentication or configuted with SASL but failed 
(i.e. wrong credential used.). */
+  ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. 
please, retry! */
 };
 
 #ifdef __cplusplus
diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c 
b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
index 0c143e7..2b8053e 100644
--- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c
+++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -4902,6 +4902,8 @@ const char* zerror(int c)
       return "the watcher couldn't be found";
     case ZRECONFIGDISABLED:
       return "attempts to perform a reconfiguration operation when 
reconfiguration feature is disable";
+   case ZTHROTTLEDOP:
+     return "Operation was throttled due to high load";
     }
     if (c > 0) {
       return strerror(c);
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 8d46046..ca1a3f7 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1133,6 +1133,15 @@ property, when available, is noted below.
     effect due to TLS handshake timeout when there are too many in-flight TLS 
     handshakes. Set it to something like 250 is good enough to avoid herd 
effect.
 
+* *throttledOpWaitTime*
+    (Java system property: **zookeeper.throttled_op_wait_time**)
+    The time in the RequestThrottler queue longer than which a request will be 
marked as throttled.
+    A throttled request will not be processed other than being fed down the 
pipeline of the server it belongs to
+    to preserve the order of all requests.
+    The FinalProcessor will issue an error response (new error code: 
ZTHROTTLEDOP) for these undigested requests.
+    The intent is for the clients not to retry them immediately.
+    When set to 0, no requests will be throttled. The default is 0.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
index 5cff6f3..c8b33b7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java
@@ -148,6 +148,8 @@ public abstract class KeeperException extends Exception {
             return new SessionClosedRequireAuthException();
         case REQUESTTIMEOUT:
             return new RequestTimeoutException();
+        case THROTTLEDOP:
+            return new ThrottledOpException();
         case OK:
         default:
             throw new IllegalArgumentException("Invalid exception code");
@@ -404,7 +406,11 @@ public abstract class KeeperException extends Exception {
         /** The session has been closed by server because server requires 
client to do SASL authentication,
          *  but client is not configured with SASL authentication or 
configuted with SASL but failed
          *  (i.e. wrong credential used.). */
-        SESSIONCLOSEDREQUIRESASLAUTH(-124);
+        SESSIONCLOSEDREQUIRESASLAUTH(-124),
+        /** Operation was throttled and not executed at all. This error code 
indicates that zookeeper server
+         *  is under heavy load and can't process incoming requests at full 
speed; please retry with back off.
+         */
+        THROTTLEDOP (-127);
 
         private static final Map<Integer, Code> lookup = new HashMap<Integer, 
Code>();
 
@@ -495,6 +501,8 @@ public abstract class KeeperException extends Exception {
             return "Reconfig is disabled";
         case SESSIONCLOSEDREQUIRESASLAUTH:
             return "Session closed because client failed to authenticate";
+        case THROTTLEDOP:
+            return "Op throttled due to high load";
         default:
             return "Unknown error " + code;
         }
@@ -940,4 +948,12 @@ public abstract class KeeperException extends Exception {
 
     }
 
+    /**
+     * @see Code#THROTTLEDOP
+     */
+    public static class ThrottledOpException extends KeeperException {
+        public ThrottledOpException() {
+            super(Code.THROTTLEDOP);
+        }
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index 9ffde55..6ad61fe 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -106,18 +106,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
         this.requestPathMetricsCollector = 
zks.getRequestPathMetricsCollector();
     }
 
-    public void processRequest(Request request) {
-        LOG.debug("Processing request:: {}", request);
-
-        // request.addRQRec(">final");
-        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
-        if (request.type == OpCode.ping) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
-        }
-
+    private ProcessTxnResult applyRequest(Request request) {
         ProcessTxnResult rc = zks.processTxn(request);
 
         // ZOOKEEPER-558:
@@ -131,7 +120,7 @@ public class FinalRequestProcessor implements 
RequestProcessor {
             // we are just playing diffs from the leader.
             if (closeSession(zks.serverCnxnFactory, request.sessionId)
                 || closeSession(zks.secureServerCnxnFactory, 
request.sessionId)) {
-                return;
+                return rc;
             }
         }
 
@@ -150,6 +139,24 @@ public class FinalRequestProcessor implements 
RequestProcessor {
             }
         }
 
+        return rc;
+    }
+
+    public void processRequest(Request request) {
+        LOG.debug("Processing request:: {}", request);
+
+        // request.addRQRec(">final");
+        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
+        if (request.type == OpCode.ping) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
+        }
+        ProcessTxnResult rc = null;
+        if (!request.isThrottled()) {
+          rc = applyRequest(request);
+        }
         if (request.cnxn == null) {
             return;
         }
@@ -195,7 +202,13 @@ public class FinalRequestProcessor implements 
RequestProcessor {
             if (request.isStale()) {
                 ServerMetrics.getMetrics().STALE_REPLIES.add(1);
             }
+
+            if (request.isThrottled()) {
+              throw KeeperException.create(Code.THROTTLEDOP);
+            }
+
             AuditHelper.addAuditLog(request, rc);
+
             switch (request.type) {
             case OpCode.ping: {
                 lastOp = "PING";
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 74720ed..00ed555 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -765,6 +765,21 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements Req
         request.setHdr(null);
         request.setTxn(null);
 
+        if (!request.isThrottled()) {
+          pRequestHelper(request);
+        }
+
+        request.zxid = zks.getZxid();
+        
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - 
request.prepStartTime);
+        nextProcessor.processRequest(request);
+    }
+
+    /**
+     * This method is a helper to pRequest method
+     *
+     * @param request
+     */
+    private void pRequestHelper(Request request) throws 
RequestProcessorException {
         try {
             switch (request.type) {
             case OpCode.createContainer:
@@ -939,9 +954,6 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements Req
                 request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
             }
         }
-        request.zxid = zks.getZxid();
-        
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - 
request.prepStartTime);
-        nextProcessor.processRequest(request);
     }
 
     private static List<ACL> removeDuplicates(final List<ACL> acls) {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index d0fb7da..43a68ac 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -100,6 +100,8 @@ public class Request {
 
     public long syncQueueStartTime;
 
+    public long requestThrottleQueueTime;
+
     private Object owner;
 
     private KeeperException e;
@@ -108,6 +110,22 @@ public class Request {
 
     private TxnDigest txnDigest;
 
+    private boolean isThrottledFlag = false;
+
+    public boolean isThrottled() {
+      return isThrottledFlag;
+    }
+
+    public void setIsThrottled(boolean val) {
+      isThrottledFlag = val;
+    }
+
+    public boolean isThrottlable() {
+        return this.type != OpCode.ping
+                && this.type != OpCode.closeSession
+                && this.type != OpCode.createSession;
+    }
+
     /**
      * If this is a create or close request for a local-only session.
      */
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
index 16a1c6f..e9cdc5e 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +98,13 @@ public class RequestThrottler extends 
ZooKeeperCriticalThread {
      */
     private static volatile boolean dropStaleRequests = 
Boolean.parseBoolean(System.getProperty("zookeeper.request_throttle_drop_stale",
 "true"));
 
+    protected boolean shouldThrottleOp(Request request, long elapsedTime) {
+        return request.isThrottlable()
+                && zks.getThrottledOpWaitTime() > 0
+                && elapsedTime > zks.getThrottledOpWaitTime();
+    }
+
+
     public RequestThrottler(ZooKeeperServer zks) {
         super("RequestThrottler", zks.getZooKeeperServerListener());
         this.zks = zks;
@@ -171,6 +179,12 @@ public class RequestThrottler extends 
ZooKeeperCriticalThread {
                     if (request.isStale()) {
                         ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                     }
+                    final long elapsedTime = Time.currentElapsedTime() - 
request.requestThrottleQueueTime;
+                    
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
+                    if (shouldThrottleOp(request, elapsedTime)) {
+                      request.setIsThrottled(true);
+                      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
+                    }
                     zks.submitRequestNow(request);
                 }
             }
@@ -230,6 +244,7 @@ public class RequestThrottler extends 
ZooKeeperCriticalThread {
             LOG.debug("Shutdown in progress. Request cannot be processed");
             dropRequest(request);
         } else {
+            request.requestThrottleQueueTime = Time.currentElapsedTime();
             submittedRequests.add(request);
         }
     }
@@ -238,7 +253,6 @@ public class RequestThrottler extends 
ZooKeeperCriticalThread {
         return submittedRequests.size();
     }
 
-    @SuppressFBWarnings("DM_EXIT")
     public void shutdown() {
         // Try to shutdown gracefully
         LOG.info("Shutting down");
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index cbdb234..9521420 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -154,6 +154,8 @@ public final class ServerMetrics {
         READS_ISSUED_IN_COMMIT_PROC = 
metricsContext.getSummary("read_commit_proc_issued", DetailLevel.BASIC);
         WRITES_ISSUED_IN_COMMIT_PROC = 
metricsContext.getSummary("write_commit_proc_issued", DetailLevel.BASIC);
 
+        THROTTLED_OPS = metricsContext.getCounter("throttled_ops");
+
         /**
          * Time spent by a read request in the commit processor.
          */
@@ -223,6 +225,7 @@ public final class ServerMetrics {
         STALE_REQUESTS = metricsContext.getCounter("stale_requests");
         STALE_REQUESTS_DROPPED = 
metricsContext.getCounter("stale_requests_dropped");
         STALE_REPLIES = metricsContext.getCounter("stale_replies");
+        REQUEST_THROTTLE_QUEUE_TIME = 
metricsContext.getSummary("request_throttle_queue_time_ms", 
DetailLevel.ADVANCED);
         REQUEST_THROTTLE_WAIT_COUNT = 
metricsContext.getCounter("request_throttle_wait_count");
         LARGE_REQUESTS_REJECTED = 
metricsContext.getCounter("large_requests_rejected");
 
@@ -381,6 +384,9 @@ public final class ServerMetrics {
     public final Summary READS_ISSUED_IN_COMMIT_PROC;
     public final Summary WRITES_ISSUED_IN_COMMIT_PROC;
 
+    // Request op throttling related
+    public final Counter THROTTLED_OPS;
+
     /**
      * Time spent by a read request in the commit processor.
      */
@@ -435,6 +441,7 @@ public final class ServerMetrics {
     public final Counter STALE_REQUESTS;
     public final Counter STALE_REQUESTS_DROPPED;
     public final Counter STALE_REPLIES;
+    public final Summary REQUEST_THROTTLE_QUEUE_TIME;
     public final Counter REQUEST_THROTTLE_WAIT_COUNT;
     public final Counter LARGE_REQUESTS_REJECTED;
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 66e85b1..4df319f 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -178,7 +178,7 @@ public class SyncRequestProcessor extends 
ZooKeeperCriticalThread implements Req
                 
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - 
si.syncQueueStartTime);
 
                 // track the number of records written to the log
-                if (zks.getZKDatabase().append(si)) {
+                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                     if (shouldSnapshot()) {
                         resetSnapshotStats();
                         // roll the log
@@ -202,9 +202,8 @@ public class SyncRequestProcessor extends 
ZooKeeperCriticalThread implements Req
                     }
                 } else if (toFlush.isEmpty()) {
                     // optimization for read heavy workloads
-                    // iff this is a read, and there are no pending
-                    // flushes (writes), then just pass this to the next
-                    // processor
+                    // iff this is a read or a throttled request(which doesn't 
need to be written to the disk),
+                    // and there are no pending flushes (writes), then just 
pass this to the next processor
                     if (nextProcessor != null) {
                         nextProcessor.processRequest(si);
                         if (nextProcessor instanceof Flushable) {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 1a2d9a7..8205f74 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -155,6 +155,9 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
 
     public static final int DEFAULT_TICK_TIME = 3000;
     protected int tickTime = DEFAULT_TICK_TIME;
+    public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
+    protected static volatile int throttledOpWaitTime =
+        Integer.getInteger("zookeeper.throttled_op_wait_time", 
DEFAULT_THROTTLED_OP_WAIT_TIME);
     /** value of -1 indicates unset, use default */
     protected int minSessionTimeout = -1;
     /** value of -1 indicates unset, use default */
@@ -1237,6 +1240,15 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
         this.tickTime = tickTime;
     }
 
+    public static int getThrottledOpWaitTime() {
+        return throttledOpWaitTime;
+    }
+
+    public static void setThrottledOpWaitTime(int time) {
+        LOG.info("throttledOpWaitTime set to {}", time);
+        throttledOpWaitTime = time;
+    }
+
     public int getMinSessionTimeout() {
         return minSessionTimeout;
     }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index bd9b643..17dd48b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -315,6 +315,16 @@ public class ZooKeeperServerBean implements 
ZooKeeperServerMXBean, ZKMBeanInfo {
     // Request throttling settings
     ///////////////////////////////////////////////////////////////////////////
 
+    public int getThrottledOpWaitTime() {
+        return zks.getThrottledOpWaitTime();
+    }
+
+    public void setThrottledOpWaitTime(int val) {
+        zks.setThrottledOpWaitTime(val);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
     public int getRequestThrottleLimit() {
         return RequestThrottler.getMaxRequests();
     }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 71a9d98..851fc56 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -136,6 +136,9 @@ public interface ZooKeeperServerMXBean {
     boolean getRequestThrottleDropStale();
     void setRequestThrottleDropStale(boolean drop);
 
+    int getThrottledOpWaitTime();
+    void setThrottledOpWaitTime(int val);
+
     boolean getRequestStaleLatencyCheck();
     void setRequestStaleLatencyCheck(boolean check);
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 01f9f0d..86dce2b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -29,12 +29,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.ZooKeeperCriticalThread;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
+import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -165,6 +167,9 @@ public class CommitProcessor extends 
ZooKeeperCriticalThread implements RequestP
     }
 
     protected boolean needCommit(Request request) {
+        if (request.isThrottled()) {
+          return false;
+        }
         switch (request.type) {
         case OpCode.create:
         case OpCode.create2:
@@ -306,6 +311,11 @@ public class CommitProcessor extends 
ZooKeeperCriticalThread implements RequestP
                         // Process committed head
                         request = committedRequests.peek();
 
+                        if (request.isThrottled()) {
+                            LOG.error("Throttled request in committed pool: 
{}. Exiting.", request);
+                            
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+                        }
+
                         /*
                          * Check if this is a local write request is pending,
                          * if so, update it with the committed info. If the 
commit matches
@@ -349,6 +359,10 @@ public class CommitProcessor extends 
ZooKeeperCriticalThread implements RequestP
                                 topPending.zxid = request.zxid;
                                 topPending.commitRecvTime = 
request.commitRecvTime;
                                 request = topPending;
+                                if (request.isThrottled()) {
+                                    LOG.error("Throttled request in committed 
& pending pool: {}. Exiting.", request);
+                                    
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+                                }
                                 // Only decrement if we take a request off the 
queue.
                                 numWriteQueuedRequests.decrementAndGet();
                                 queuedWriteRequests.poll();
@@ -452,7 +466,8 @@ public class CommitProcessor extends 
ZooKeeperCriticalThread implements RequestP
      */
     private void sendToNextProcessor(Request request) {
         numRequestsProcessing.incrementAndGet();
-        workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
+        CommitWorkRequest workRequest = new CommitWorkRequest(request);
+        workerPool.schedule(workRequest, request.sessionId);
     }
 
     private void processWrite(Request request) throws 
RequestProcessorException {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
index 88144de..db51aee 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
@@ -73,6 +73,9 @@ public class FollowerRequestProcessor extends 
ZooKeeperCriticalThread implements
                 // the request to the leader so that we are ready to receive
                 // the response
                 nextProcessor.processRequest(request);
+                if (request.isThrottled()) {
+                    continue;
+                }
 
                 // We now ship the request to the leader. As with all
                 // other quorum operations, sync also follows this code
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 5371bea..0eb3722 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -56,6 +56,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
@@ -68,6 +69,7 @@ import 
org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1218,6 +1220,10 @@ public class Leader extends LearnerMaster {
      * @return the proposal that is queued to send to all the members
      */
     public Proposal propose(Request request) throws XidRolloverException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request send as proposal: {}. Exiting.", 
request);
+            
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
         /**
          * Address the rollover issue. All lower 32bits set indicate a new 
leader
          * election. Force a re-election instead. See ZOOKEEPER-1277
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8a0cac1..da5f113 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -234,6 +234,10 @@ public class Learner {
      * @throws IOException
      */
     void request(Request request) throws IOException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request sent to leader: {}. Exiting", 
request);
+            
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream oa = new DataOutputStream(baos);
         oa.writeLong(request.sessionId);
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
index 4666203..0075ce4 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
@@ -82,6 +82,10 @@ public class ObserverRequestProcessor extends 
ZooKeeperCriticalThread implements
                 // the response
                 nextProcessor.processRequest(request);
 
+                if (request.isThrottled()) {
+                    continue;
+                }
+
                 // We now ship the request to the leader. As with all
                 // other quorum operations, sync also follows this code
                 // path, but different from others, we need to keep track
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index c6cd93b..6fdea82 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -60,6 +60,10 @@ public abstract class QuorumZooKeeperServer extends 
ZooKeeperServer {
     }
 
     public Request checkUpgradeSession(Request request) throws IOException, 
KeeperException {
+        if (request.isThrottled()) {
+            return null;
+        }
+
         // If this is a request for a local session and it is to
         // create an ephemeral node, then upgrade the session and return
         // a new session request for the leader.
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index b91aa19..638275a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -247,6 +247,66 @@ public class QuorumBase extends ClientBase {
         return -1;
     }
 
+    public int getLeaderClientPort() {
+      if (s1.getPeerState() == ServerState.LEADING) {
+        return portClient1;
+      } else if (s2.getPeerState() == ServerState.LEADING) {
+        return portClient2;
+      } else if (s3.getPeerState() == ServerState.LEADING) {
+        return portClient3;
+      } else if (s4.getPeerState() == ServerState.LEADING) {
+        return portClient4;
+      } else if (s5.getPeerState() == ServerState.LEADING) {
+        return portClient5;
+      }
+      return -1;
+    }
+
+    public QuorumPeer getLeaderQuorumPeer() {
+      if (s1.getPeerState() == ServerState.LEADING) {
+        return s1;
+      } else if (s2.getPeerState() == ServerState.LEADING) {
+        return s2;
+      } else if (s3.getPeerState() == ServerState.LEADING) {
+        return s3;
+      } else if (s4.getPeerState() == ServerState.LEADING) {
+        return s4;
+      } else if (s5.getPeerState() == ServerState.LEADING) {
+        return s5;
+      }
+      return null;
+    }
+
+    public QuorumPeer getFirstObserver() {
+      if (s1.getLearnerType() == LearnerType.OBSERVER) {
+        return s1;
+      } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+        return s2;
+      } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+        return s3;
+      } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+        return s4;
+      } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+        return s5;
+      }
+      return null;
+    }
+
+    public int getFirstObserverClientPort() {
+      if (s1.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient1;
+      } else if (s2.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient2;
+      } else if (s3.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient3;
+      } else if (s4.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient4;
+      } else if (s5.getLearnerType() == LearnerType.OBSERVER) {
+        return portClient5;
+      }
+      return -1;
+    }
+
     public String getPeersMatching(ServerState state) {
         StringBuilder hosts = new StringBuilder();
         for (QuorumPeer p : getPeerList()) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java
new file mode 100644
index 0000000..1d9e502
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpHelper.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestThrottler;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThrottledOpHelper {
+    protected static final Logger LOG = LoggerFactory.getLogger(ThrottledOpHelper.class);
+
+    public static final class RequestThrottleMock extends MockUp<RequestThrottler> {
+        public static void throttleEveryNthOp(int n) {
+            everyNthOp = n;
+            opCounter = 0;
+        }
+        private static int everyNthOp = 0;
+        private static int opCounter = 0;
+
+        @Mock
+        private boolean shouldThrottleOp(Request request, long elapsedTime) {
+            if (everyNthOp > 0 && request.isThrottlable() && (++opCounter % everyNthOp == 0)) {
+                opCounter %= everyNthOp;
+                return true;
+            }
+            return false;
+        }
+    }
+
+    public static void applyMockUps() {
+        new RequestThrottleMock();
+    }
+
+    public void testThrottledOp(ZooKeeper zk, ZooKeeperServer zs) throws IOException, InterruptedException, KeeperException {
+        final int N = 5; // must be greater than 3
+        final int COUNT = 100;
+        RequestThrottleMock.throttleEveryNthOp(N);
+        LOG.info("Before create /ivailo nodes");
+        int opCount = 0;
+        for (int i = 0; i < COUNT; i++) {
+            String nodeName = "/ivailo" + i;
+            if (opCount % N == N - 1) {
+                try {
+                    zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNull(stat);
+                    zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 3; // three ops issued
+            } else {
+                zk.create(nodeName, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    (i % 2 == 0) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL);
+                opCount++; // one op issued
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.setData(nodeName, nodeName.getBytes(), -1);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    zk.setData(nodeName, nodeName.getBytes(), -1);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 2; // two ops issued, one for retry
+            } else {
+                zk.setData(nodeName, nodeName.getBytes(), -1);
+                opCount++; // one op issued
+            }
+        }
+        LOG.info("Before delete /ivailo nodes");
+        for (int i = 0; i < COUNT; i++) {
+            String nodeName = "/ivailo" + i;
+            if (opCount % N == N - 1) {
+                try {
+                    zk.exists(nodeName, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNotNull(stat);
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                Stat stat = zk.exists(nodeName, null);
+                Assert.assertNotNull(stat);
+                opCount++;
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.getData(nodeName, null, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    byte[] data = zk.getData(nodeName, null, null);
+                    Assert.assertEquals(nodeName, new String(data));
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                byte[] data = zk.getData(nodeName, null, null);
+                Assert.assertEquals(nodeName, new String(data));
+                opCount++;
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    // version 0 should not trigger BadVersion exception
+                    zk.delete(nodeName, 0);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    zk.delete(nodeName, -1);
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+                opCount += 2; // two ops issued, one for retry
+            } else {
+                zk.delete(nodeName, -1);
+                opCount++; // one op only issued
+            }
+            if (opCount % N == N - 1) {
+                try {
+                    zk.exists(nodeName, null);
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                } catch (KeeperException.ThrottledOpException e) {
+                    // anticipated outcome & retry
+                    Stat stat = zk.exists(nodeName, null);
+                    Assert.assertNull(stat);
+                    opCount += 2; // two ops issued, one is retry
+                } catch (KeeperException e) {
+                    Assert.fail("Should have gotten ThrottledOp exception");
+                }
+            } else {
+                Stat stat = zk.exists(nodeName, null);
+                Assert.assertNull(stat);
+                opCount++;
+            }
+        }
+        LOG.info("After delete /ivailo");
+        zk.close();
+    }
+
+    public void testThrottledAcl(ZooKeeper zk, ZooKeeperServer zs) throws Exception {
+        RequestThrottleMock.throttleEveryNthOp(0);
+
+        final ArrayList<ACL> ACL_PERMS =
+          new ArrayList<ACL>() { {
+            add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+            add(new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+            add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
+        }};
+        String path = "/path1";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.addAuthInfo("digest", "pat:test".getBytes());
+        List<ACL> defaultAcls = zk.getACL(path, null);
+        Assert.assertEquals(1, defaultAcls.size());
+
+        RequestThrottleMock.throttleEveryNthOp(2);
+
+        path = "/path2";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        try {
+            zk.setACL(path, ACL_PERMS, -1);
+            Assert.fail("Should have gotten ThrottledOp exception");
+        } catch (KeeperException.ThrottledOpException e) {
+            // expected
+        } catch (KeeperException e) {
+            Assert.fail("Should have gotten ThrottledOp exception");
+        }
+        List<ACL> acls = zk.getACL(path, null);
+        Assert.assertEquals(1, acls.size());
+
+        RequestThrottleMock.throttleEveryNthOp(0);
+
+        path = "/path3";
+        zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        zk.setACL(path, ACL_PERMS, -1);
+        acls = zk.getACL(path, null);
+        Assert.assertEquals(3, acls.size());
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
new file mode 100644
index 0000000..dfd1964
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpObserverTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpObserverTest extends QuorumBase {
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp(true /* withObservers */);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test
+    public void testThrottledOpObserver() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getFirstObserverClientPort());
+            ZooKeeperServer zs = getFirstObserver().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclObserver() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getFirstObserverClientPort());
+            ZooKeeperServer zs = getFirstObserver().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java
new file mode 100644
index 0000000..2761365
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpQuorumTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpQuorumTest extends QuorumBase {
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Test
+    public void testThrottledOpLeader() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getLeaderClientPort());
+            ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclLeader() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient("localhost:" + getLeaderClientPort());
+            ZooKeeperServer zs = getLeaderQuorumPeer().getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledOpFollower() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+            zk = createClient("localhost:" + clientPort);
+            QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+            ZooKeeperServer zs = qp.getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAclFollower() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            int clientPort = (getLeaderClientPort() == portClient1) ? portClient2 : portClient1;
+            zk = createClient("localhost:" + clientPort);
+            QuorumPeer qp = (getLeaderClientPort() == portClient1) ? s2 : s1;
+            ZooKeeperServer zs = qp.getActiveServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java
new file mode 100644
index 0000000..27ac6a6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ThrottledOpStandaloneTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThrottledOpStandaloneTest extends ClientBase {
+
+    @BeforeClass
+    public static void applyMockUps() {
+        ThrottledOpHelper.applyMockUps();
+    }
+
+    @Test
+    public void testThrottledOp() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient(hostPort);
+            ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledOp(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+    @Test
+    public void testThrottledAcl() throws Exception {
+        ZooKeeper zk = null;
+        try {
+            zk = createClient(hostPort);
+            ZooKeeperServer zs = serverFactory.getZooKeeperServer();
+            ThrottledOpHelper test = new ThrottledOpHelper();
+            test.testThrottledAcl(zk, zs);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+}

Reply via email to