This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e626e2a  Let Proxy handle ZK session expires without restarting (#1583)
e626e2a is described below

commit e626e2ac26634b989db344a23f066bf34774363d
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Apr 15 23:21:10 2018 -0700

    Let Proxy handle ZK session expires without restarting (#1583)
---
 .../bookkeeper/zookeeper/BkZooKeeperClient.java    | 1365 ++++++++++++++++++++
 .../apache/bookkeeper/zookeeper/BkZooWorker.java   |  162 +++
 .../apache/pulsar/proxy/server/ProxyService.java   |   30 +-
 .../util/BkReadOnlyZookeeperClientFactoryImpl.java |   74 ++
 .../proxy/server/util/ZookeeperCacheLoader.java    |   35 +-
 5 files changed, 1622 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
new file mode 100644
index 0000000..be60294
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
@@ -0,0 +1,1365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BkZooWorker.ZooCallable;
+import org.apache.zookeeper.AsyncCallback.ACLCallback;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a zookeeper client to handle session expire.
+ *
+ * Note this is copied from BK ZooKeeperClient class. The only addition here 
is that it allows to specify read-only mode
+ * which we use when connecting to global ZK.
+ *
+ * TODO: Remove this class once BK-4.8 is released to include read-only in 
ZooKeeperClient.
+ */
+public class BkZooKeeperClient extends ZooKeeper implements Watcher {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BkZooKeeperClient.class);
+
+    private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1;
+
+    // ZooKeeper client connection variables
+    private final String connectString;
+    private final int sessionTimeoutMs;
+    private final boolean allowReadOnlyMode;
+
+    // state for the zookeeper client
+    private final AtomicReference<ZooKeeper> zk = new 
AtomicReference<ZooKeeper>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ZooKeeperWatcherBase watcherManager;
+
+    private final ScheduledExecutorService retryExecutor;
+    private final ExecutorService connectExecutor;
+
+    // rate limiter
+    private final RateLimiter rateLimiter;
+
+    // retry polices
+    private final RetryPolicy connectRetryPolicy;
+    private final RetryPolicy operationRetryPolicy;
+
+    // Stats Logger
+    private final OpStatsLogger createStats;
+    private final OpStatsLogger getStats;
+    private final OpStatsLogger setStats;
+    private final OpStatsLogger deleteStats;
+    private final OpStatsLogger getChildrenStats;
+    private final OpStatsLogger existsStats;
+    private final OpStatsLogger multiStats;
+    private final OpStatsLogger getACLStats;
+    private final OpStatsLogger setACLStats;
+    private final OpStatsLogger syncStats;
+    private final OpStatsLogger createClientStats;
+
+    private final Callable<ZooKeeper> clientCreator = new 
Callable<ZooKeeper>() {
+
+        @Override
+        public ZooKeeper call() throws Exception {
+            try {
+                return BkZooWorker.syncCallWithRetries(null, new 
ZooCallable<ZooKeeper>() {
+
+                    @Override
+                    public ZooKeeper call() throws KeeperException, 
InterruptedException {
+                        logger.info("Reconnecting zookeeper {}.", 
connectString);
+                        // close the previous one
+                        closeZkHandle();
+                        ZooKeeper newZk;
+                        try {
+                            newZk = createZooKeeper();
+                        } catch (IOException ie) {
+                            logger.error("Failed to create zookeeper instance 
to " + connectString, ie);
+                            throw 
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+                        }
+                        waitForConnection();
+                        zk.set(newZk);
+                        logger.info("ZooKeeper session {} is created to {}.",
+                                Long.toHexString(newZk.getSessionId()), 
connectString);
+                        return newZk;
+                    }
+
+                    @Override
+                    public String toString() {
+                        return String.format("ZooKeeper Client Creator (%s)", 
connectString);
+                    }
+
+                }, connectRetryPolicy, rateLimiter, createClientStats);
+            } catch (Exception e) {
+                logger.error("Gave up reconnecting to ZooKeeper : ", e);
+                Runtime.getRuntime().exit(-1);
+                return null;
+            }
+        }
+
+    };
+
+    @VisibleForTesting
+    static BkZooKeeperClient createConnectedZooKeeperClient(
+            String connectString, int sessionTimeoutMs, Set<Watcher> 
childWatchers,
+            RetryPolicy operationRetryPolicy)
+                    throws KeeperException, InterruptedException, IOException {
+        return BkZooKeeperClient.newBuilder()
+                .connectString(connectString)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .watchers(childWatchers)
+                .operationRetryPolicy(operationRetryPolicy)
+                .build();
+    }
+
+    /**
+     * A builder to build retryable zookeeper client.
+     */
+    public static class Builder {
+        String connectString = null;
+        int sessionTimeoutMs = 10000;
+        Set<Watcher> watchers = null;
+        RetryPolicy connectRetryPolicy = null;
+        RetryPolicy operationRetryPolicy = null;
+        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
+        double requestRateLimit = 0;
+        boolean allowReadOnlyMode = false;
+
+        private Builder() {}
+
+        public Builder connectString(String connectString) {
+            this.connectString = connectString;
+            return this;
+        }
+
+        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder watchers(Set<Watcher> watchers) {
+            this.watchers = watchers;
+            return this;
+        }
+
+        public Builder connectRetryPolicy(RetryPolicy retryPolicy) {
+            this.connectRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        public Builder operationRetryPolicy(RetryPolicy retryPolicy) {
+            this.operationRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        public Builder statsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        public Builder requestRateLimit(double requestRateLimit) {
+            this.requestRateLimit = requestRateLimit;
+            return this;
+        }
+
+        public Builder retryThreadCount(int numThreads) {
+            this.retryExecThreadCount = numThreads;
+            return this;
+        }
+
+        public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
+            this.allowReadOnlyMode = allowReadOnlyMode;
+            return this;
+        }
+
+        public BkZooKeeperClient build() throws IOException, KeeperException, 
InterruptedException {
+            checkNotNull(connectString);
+            checkArgument(sessionTimeoutMs > 0);
+            checkNotNull(statsLogger);
+            checkArgument(retryExecThreadCount > 0);
+
+            if (null == connectRetryPolicy) {
+                connectRetryPolicy =
+                        new 
BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 
Integer.MAX_VALUE);
+            }
+            if (null == operationRetryPolicy) {
+                operationRetryPolicy =
+                        new 
BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0);
+            }
+
+            // Create a watcher manager
+            StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
+            ZooKeeperWatcherBase watcherManager =
+                    null == watchers ? new 
ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
+                            new ZooKeeperWatcherBase(sessionTimeoutMs, 
watchers, watcherStatsLogger);
+            BkZooKeeperClient client = new BkZooKeeperClient(
+                    connectString,
+                    sessionTimeoutMs,
+                    watcherManager,
+                    connectRetryPolicy,
+                    operationRetryPolicy,
+                    statsLogger,
+                    retryExecThreadCount,
+                    requestRateLimit,
+                    allowReadOnlyMode
+            );
+            // Wait for connection to be established.
+            try {
+                watcherManager.waitForConnection();
+            } catch (KeeperException ke) {
+                client.close();
+                throw ke;
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                client.close();
+                throw ie;
+            }
+            return client;
+        }
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    BkZooKeeperClient(String connectString,
+                    int sessionTimeoutMs,
+                    ZooKeeperWatcherBase watcherManager,
+                    RetryPolicy connectRetryPolicy,
+                    RetryPolicy operationRetryPolicy,
+                    StatsLogger statsLogger,
+                    int retryExecThreadCount,
+                    double rate,
+                    boolean allowReadOnlyMode) throws IOException {
+        super(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+        this.connectString = connectString;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.allowReadOnlyMode =  allowReadOnlyMode;
+        this.watcherManager = watcherManager;
+        this.connectRetryPolicy = connectRetryPolicy;
+        this.operationRetryPolicy = operationRetryPolicy;
+        this.rateLimiter = rate > 0 ? RateLimiter.create(rate) : null;
+        this.retryExecutor =
+                Executors.newScheduledThreadPool(retryExecThreadCount,
+                    new 
ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build());
+        this.connectExecutor =
+                Executors.newSingleThreadExecutor(
+                    new 
ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build());
+        // added itself to the watcher
+        watcherManager.addChildWatcher(this);
+
+        // Stats
+        StatsLogger scopedStatsLogger = statsLogger.scope("zk");
+        createClientStats = 
scopedStatsLogger.getOpStatsLogger("create_client");
+        createStats = scopedStatsLogger.getOpStatsLogger("create");
+        getStats = scopedStatsLogger.getOpStatsLogger("get_data");
+        setStats = scopedStatsLogger.getOpStatsLogger("set_data");
+        deleteStats = scopedStatsLogger.getOpStatsLogger("delete");
+        getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children");
+        existsStats = scopedStatsLogger.getOpStatsLogger("exists");
+        multiStats = scopedStatsLogger.getOpStatsLogger("multi");
+        getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl");
+        setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl");
+        syncStats = scopedStatsLogger.getOpStatsLogger("sync");
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        closed.set(true);
+        connectExecutor.shutdown();
+        retryExecutor.shutdown();
+        closeZkHandle();
+    }
+
+    private void closeZkHandle() throws InterruptedException {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            super.close();
+        } else {
+            zkHandle.close();
+        }
+    }
+
+    protected void waitForConnection() throws KeeperException, 
InterruptedException {
+        watcherManager.waitForConnection();
+    }
+
+    protected ZooKeeper createZooKeeper() throws IOException {
+        return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        if (event.getType() == EventType.None
+            && event.getState() == KeeperState.Expired) {
+            onExpired();
+        }
+    }
+
+    private void onExpired() {
+        if (closed.get()) {
+            // we don't schedule any tries if the client is closed.
+            return;
+        }
+
+        logger.info("ZooKeeper session {} is expired from {}.",
+                Long.toHexString(getSessionId()), connectString);
+        try {
+            connectExecutor.submit(clientCreator);
+        } catch (RejectedExecutionException ree) {
+            if (!closed.get()) {
+                logger.error("ZooKeeper reconnect task is rejected : ", ree);
+            }
+        } catch (Exception t) {
+            logger.error("Failed to submit zookeeper reconnect task due to 
runtime exception : ", t);
+        }
+    }
+
+    /**
+     * A runnable that retries zookeeper operations.
+     */
+    abstract static class ZkRetryRunnable implements Runnable {
+
+        final BkZooWorker worker;
+        final RateLimiter rateLimiter;
+        final Runnable that;
+
+        ZkRetryRunnable(RetryPolicy retryPolicy,
+                        RateLimiter rateLimiter,
+                        OpStatsLogger statsLogger) {
+            this.worker = new BkZooWorker(retryPolicy, statsLogger);
+            this.rateLimiter = rateLimiter;
+            that = this;
+        }
+
+        @Override
+        public void run() {
+            if (null != rateLimiter) {
+                rateLimiter.acquire();
+            }
+            zkRun();
+        }
+
+        abstract void zkRun();
+    }
+
+    // inherits from ZooKeeper client for all operations
+
+    @Override
+    public long getSessionId() {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return super.getSessionId();
+        }
+        return zkHandle.getSessionId();
+    }
+
+    @Override
+    public byte[] getSessionPasswd() {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return super.getSessionPasswd();
+        }
+        return zkHandle.getSessionPasswd();
+    }
+
+    @Override
+    public int getSessionTimeout() {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return super.getSessionTimeout();
+        }
+        return zkHandle.getSessionTimeout();
+    }
+
+    @Override
+    public void addAuthInfo(String scheme, byte[] auth) {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            super.addAuthInfo(scheme, auth);
+            return;
+        }
+        zkHandle.addAuthInfo(scheme, auth);
+    }
+
+    private void backOffAndRetry(Runnable r, long nextRetryWaitTimeMs) {
+        try {
+            retryExecutor.schedule(r, nextRetryWaitTimeMs, 
TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException ree) {
+            if (!closed.get()) {
+                logger.error("ZooKeeper Operation {} is rejected : ", r, ree);
+            }
+        }
+    }
+
+    private boolean allowRetry(BkZooWorker worker, int rc) {
+        return worker.allowRetry(rc) && !closed.get();
+    }
+
+    @Override
+    public synchronized void register(Watcher watcher) {
+        watcherManager.addChildWatcher(watcher);
+    }
+
+    @Override
+    public List<OpResult> multi(final Iterable<Op> ops) throws 
InterruptedException, KeeperException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<OpResult>>() {
+
+            @Override
+            public String toString() {
+                return "multi";
+            }
+
+            @Override
+            public List<OpResult> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.multi(ops);
+                }
+                return zkHandle.multi(ops);
+            }
+
+        }, operationRetryPolicy, rateLimiter, multiStats);
+    }
+
+    @Override
+    public void multi(final Iterable<Op> ops,
+                      final MultiCallback cb,
+                      final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, createStats) {
+
+            final MultiCallback multiCb = new MultiCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
List<OpResult> results) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, results);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.multi(ops, multiCb, worker);
+                } else {
+                    zkHandle.multi(ops, multiCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "multi";
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    @Deprecated
+    public Transaction transaction() {
+        // since there is no reference about which client that the transaction 
could use
+        // so just use ZooKeeper instance directly.
+        // you'd better to use {@link #multi}.
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return super.transaction();
+        }
+        return zkHandle.transaction();
+    }
+
+    @Override
+    public List<ACL> getACL(final String path, final Stat stat) throws 
KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<ACL>>() {
+
+            @Override
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+
+            @Override
+            public List<ACL> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getACL(path, stat);
+                }
+                return zkHandle.getACL(path, stat);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getACLStats);
+    }
+
+    @Override
+    public void getACL(final String path, final Stat stat, final ACLCallback 
cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getACLStats) {
+
+            final ACLCallback aclCb = new ACLCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
List<ACL> acl, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, acl, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getACL(path, stat, aclCb, worker);
+                } else {
+                    zkHandle.getACL(path, stat, aclCb, worker);
+                }
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public Stat setACL(final String path, final List<ACL> acl, final int 
version)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+            @Override
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", 
path, acl, version);
+            }
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.setACL(path, acl, version);
+                }
+                return zkHandle.setACL(path, acl, version);
+            }
+
+        }, operationRetryPolicy, rateLimiter, setACLStats);
+    }
+
+    @Override
+    public void setACL(final String path, final List<ACL> acl, final int 
version,
+            final StatCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, setACLStats) {
+
+            final StatCallback stCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", 
path, acl, version);
+            }
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.setACL(path, acl, version, stCb, 
worker);
+                } else {
+                    zkHandle.setACL(path, acl, version, stCb, worker);
+                }
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void sync(final String path, final VoidCallback cb, final Object 
context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, syncStats) {
+
+            final VoidCallback vCb = new VoidCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context);
+                    }
+                }
+
+            };
+
+            @Override
+            public String toString() {
+                return String.format("sync (%s)", path);
+            }
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.sync(path, vCb, worker);
+                } else {
+                    zkHandle.sync(path, vCb, worker);
+                }
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public States getState() {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return BkZooKeeperClient.super.getState();
+        } else {
+            return zkHandle.getState();
+        }
+    }
+
+    @Override
+    public String toString() {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            return BkZooKeeperClient.super.toString();
+        } else {
+            return zkHandle.toString();
+        }
+    }
+
+    @Override
+    public String create(final String path, final byte[] data,
+            final List<ACL> acl, final CreateMode createMode)
+                    throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<String>() 
{
+
+            @Override
+            public String call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.create(path, data, acl, 
createMode);
+                }
+                return zkHandle.create(path, data, acl, createMode);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, 
acl, createMode);
+            }
+
+        }, operationRetryPolicy, rateLimiter, createStats);
+    }
+
+    @Override
+    public void create(final String path, final byte[] data, final List<ACL> 
acl,
+            final CreateMode createMode, final StringCallback cb, final Object 
context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, createStats) {
+
+            final StringCallback createCb = new StringCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
String name) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, name);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.create(path, data, acl, 
createMode, createCb, worker);
+                } else {
+                    zkHandle.create(path, data, acl, createMode, createCb, 
worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, 
acl, createMode);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void delete(final String path, final int version) throws 
KeeperException, InterruptedException {
+        BkZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
+
+            @Override
+            public Void call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.delete(path, version);
+                } else {
+                    zkHandle.delete(path, version);
+                }
+                return null;
+            }
+
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, 
version);
+            }
+
+        }, operationRetryPolicy, rateLimiter, deleteStats);
+    }
+
+    @Override
+    public void delete(final String path, final int version, final 
VoidCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, deleteStats) {
+
+            final VoidCallback deleteCb = new VoidCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.delete(path, version, deleteCb, 
worker);
+                } else {
+                    zkHandle.delete(path, version, deleteCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, 
version);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public Stat exists(final String path, final Watcher watcher) throws 
KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.exists(path, watcher);
+                }
+                return zkHandle.exists(path, watcher);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, 
watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, existsStats);
+    }
+
+    @Override
+    public Stat exists(final String path, final boolean watch) throws 
KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.exists(path, watch);
+                }
+                return zkHandle.exists(path, watch);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, existsStats);
+    }
+
+    @Override
+    public void exists(final String path, final Watcher watcher, final 
StatCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, existsStats) {
+
+            final StatCallback stCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.exists(path, watcher, stCb, 
worker);
+                } else {
+                    zkHandle.exists(path, watcher, stCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, 
watcher);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void exists(final String path, final boolean watch, final 
StatCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, existsStats) {
+
+            final StatCallback stCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.exists(path, watch, stCb, worker);
+                } else {
+                    zkHandle.exists(path, watch, stCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public byte[] getData(final String path, final Watcher watcher, final Stat 
stat)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() 
{
+
+            @Override
+            public byte[] call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getData(path, watcher, 
stat);
+                }
+                return zkHandle.getData(path, watcher, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getStats);
+    }
+
+    @Override
+    public byte[] getData(final String path, final boolean watch, final Stat 
stat)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() 
{
+
+            @Override
+            public byte[] call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getData(path, watch, stat);
+                }
+                return zkHandle.getData(path, watch, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getStats);
+    }
+
+    @Override
+    public void getData(final String path, final Watcher watcher, final 
DataCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getStats) {
+
+            final DataCallback dataCb = new DataCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, data, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getData(path, watcher, dataCb, 
worker);
+                } else {
+                    zkHandle.getData(path, watcher, dataCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watcher);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void getData(final String path, final boolean watch, final 
DataCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getStats) {
+
+            final DataCallback dataCb = new DataCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, data, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getData(path, watch, dataCb, 
worker);
+                } else {
+                    zkHandle.getData(path, watch, dataCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watch);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public Stat setData(final String path, final byte[] data, final int 
version)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.setData(path, data, 
version);
+                }
+                return zkHandle.setData(path, data, version);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, 
version);
+            }
+
+        }, operationRetryPolicy, rateLimiter, setStats);
+    }
+
+    @Override
+    public void setData(final String path, final byte[] data, final int 
version,
+            final StatCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, setStats) {
+
+            final StatCallback stCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.setData(path, data, version, stCb, 
worker);
+                } else {
+                    zkHandle.setData(path, data, version, stCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, 
version);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public List<String> getChildren(final String path, final Watcher watcher, 
final Stat stat)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getChildren(path, watcher, 
stat);
+                }
+                return zkHandle.getChildren(path, watcher, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    @Override
+    public List<String> getChildren(final String path, final boolean watch, 
final Stat stat)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getChildren(path, watch, 
stat);
+                }
+                return zkHandle.getChildren(path, watch, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    @Override
+    public void getChildren(final String path, final Watcher watcher,
+            final Children2Callback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getChildrenStats) {
+
+            final Children2Callback childCb = new Children2Callback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                        List<String> children, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, children, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getChildren(path, watcher, 
childCb, worker);
+                } else {
+                    zkHandle.getChildren(path, watcher, childCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watcher);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void getChildren(final String path, final boolean watch, final 
Children2Callback cb,
+            final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getChildrenStats) {
+
+            final Children2Callback childCb = new Children2Callback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                        List<String> children, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, children, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getChildren(path, watch, childCb, 
worker);
+                } else {
+                    zkHandle.getChildren(path, watch, childCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watch);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+
+    @Override
+    public List<String> getChildren(final String path, final Watcher watcher)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getChildren(path, watcher);
+                }
+                return zkHandle.getChildren(path, watcher);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watcher);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    @Override
+    public List<String> getChildren(final String path, final boolean watch)
+            throws KeeperException, InterruptedException {
+        return BkZooWorker.syncCallWithRetries(this, new 
ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call() throws KeeperException, 
InterruptedException {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    return BkZooKeeperClient.super.getChildren(path, watch);
+                }
+                return zkHandle.getChildren(path, watch);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watch);
+            }
+
+        }, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    @Override
+    public void getChildren(final String path, final Watcher watcher,
+            final ChildrenCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getChildrenStats) {
+
+            final ChildrenCallback childCb = new ChildrenCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                        List<String> children) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, children);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getChildren(path, watcher, 
childCb, worker);
+                } else {
+                    zkHandle.getChildren(path, watcher, childCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watcher);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void getChildren(final String path, final boolean watch,
+            final ChildrenCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getChildrenStats) {
+
+            final ChildrenCallback childCb = new ChildrenCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                        List<String> children) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, children);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getChildren(path, watch, childCb, 
worker);
+                } else {
+                    zkHandle.getChildren(path, watch, childCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path, 
watch);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
new file mode 100644
index 0000000..634a61e
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a mechanism to perform an operation on ZooKeeper that is safe on 
disconnections
+ * and recoverable errors.
+ *
+ * TODO: Remove this class once BK-4.8 is released to include read-only in 
ZooKeeperClient.
+ */
+class BkZooWorker {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BkZooWorker.class);
+
+    int attempts = 0;
+    long startTimeNanos;
+    long elapsedTimeMs = 0L;
+    final RetryPolicy retryPolicy;
+    final OpStatsLogger statsLogger;
+
+    BkZooWorker(RetryPolicy retryPolicy, OpStatsLogger statsLogger) {
+        this.retryPolicy = retryPolicy;
+        this.statsLogger = statsLogger;
+        this.startTimeNanos = MathUtils.nowInNano();
+    }
+
+    public boolean allowRetry(int rc) {
+        elapsedTimeMs = MathUtils.elapsedMSec(startTimeNanos);
+        if (!BkZooWorker.isRecoverableException(rc)) {
+            if (KeeperException.Code.OK.intValue() == rc) {
+                
statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), 
TimeUnit.MICROSECONDS);
+            } else {
+                
statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), 
TimeUnit.MICROSECONDS);
+            }
+            return false;
+        }
+        ++attempts;
+        return retryPolicy.allowRetry(attempts, elapsedTimeMs);
+    }
+
+    public long nextRetryWaitTime() {
+        return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs);
+    }
+
+    /**
+     * Check whether the given result code is recoverable by retry.
+     *
+     * @param rc result code
+     * @return true if given result code is recoverable.
+     */
+    public static boolean isRecoverableException(int rc) {
+        return KeeperException.Code.CONNECTIONLOSS.intValue() == rc
+            || KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc
+            || KeeperException.Code.SESSIONMOVED.intValue() == rc
+            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
+    }
+
+    /**
+     * Check whether the given exception is recoverable by retry.
+     *
+     * @param exception given exception
+     * @return true if given exception is recoverable.
+     */
+    public static boolean isRecoverableException(KeeperException exception) {
+        return isRecoverableException(exception.code().intValue());
+    }
+
+    interface ZooCallable<T> {
+        /**
+         * Be compatible with ZooKeeper interface.
+         *
+         * @return value
+         * @throws InterruptedException
+         * @throws KeeperException
+         */
+        T call() throws InterruptedException, KeeperException;
+    }
+
+    /**
+     * Execute a sync zookeeper operation with a given retry policy.
+     *
+     * @param client
+     *          ZooKeeper client.
+     * @param proc
+     *          Synchronous zookeeper operation wrapped in a {@link Callable}.
+     * @param retryPolicy
+     *          Retry policy to execute the synchronous operation.
+     * @param rateLimiter
+     *          Rate limiter for zookeeper calls
+     * @param statsLogger
+     *          Stats Logger for zookeeper client.
+     * @return result of the zookeeper operation
+     * @throws KeeperException any non-recoverable exception or recoverable 
exception exhausted all retires.
+     * @throws InterruptedException the operation is interrupted.
+     */
+    public static<T> T syncCallWithRetries(BkZooKeeperClient client,
+                                           ZooCallable<T> proc,
+                                           RetryPolicy retryPolicy,
+                                           RateLimiter rateLimiter,
+                                           OpStatsLogger statsLogger)
+    throws KeeperException, InterruptedException {
+        T result = null;
+        boolean isDone = false;
+        int attempts = 0;
+        long startTimeNanos = MathUtils.nowInNano();
+        while (!isDone) {
+            try {
+                if (null != client) {
+                    client.waitForConnection();
+                }
+                logger.debug("Execute {} at {} retry attempt.", proc, 
attempts);
+                if (null != rateLimiter) {
+                    rateLimiter.acquire();
+                }
+                result = proc.call();
+                isDone = true;
+                
statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), 
TimeUnit.MICROSECONDS);
+            } catch (KeeperException e) {
+                ++attempts;
+                boolean rethrow = true;
+                long elapsedTime = MathUtils.elapsedMSec(startTimeNanos);
+                if (((null != client && isRecoverableException(e)) || null == 
client)
+                    && retryPolicy.allowRetry(attempts, elapsedTime)) {
+                    rethrow = false;
+                }
+                if (rethrow) {
+                    
statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), 
TimeUnit.MICROSECONDS);
+                    logger.debug("Stopped executing {} after {} attempts.", 
proc, attempts);
+                    throw e;
+                }
+                
TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, 
elapsedTime));
+            }
+        }
+        return result;
+    }
+
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c77ace2..e798077 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,6 +21,13 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -39,20 +46,11 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Pulsar proxy service
  */
@@ -78,8 +76,6 @@ public class ProxyService implements Closeable {
 
     private BrokerDiscoveryProvider discoveryProvider;
 
-    private LocalZooKeeperConnectionService localZooKeeperConnectionService;
-
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
 
     private static final int numThreads = 
Runtime.getRuntime().availableProcessors();
@@ -124,15 +120,6 @@ public class ProxyService implements Closeable {
         authenticationService = new 
AuthenticationService(serviceConfiguration);
 
         if (!isBlank(proxyConfig.getZookeeperServers()) && 
!isBlank(proxyConfig.getGlobalZookeeperServers())) {
-            localZooKeeperConnectionService = new 
LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
-                    proxyConfig.getZookeeperServers(), 
proxyConfig.getZookeeperSessionTimeoutMs());
-            localZooKeeperConnectionService.start(new ShutdownService() {
-                @Override
-                public void shutdown(int exitCode) {
-                    LOG.error("Lost local ZK session. Shutting down the 
proxy");
-                    Runtime.getRuntime().halt(-1);
-                }
-            });
             discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, 
getZooKeeperClientFactory());
             this.configurationCacheService = new 
ConfigurationCacheService(discoveryProvider.globalZkCache);
             authorizationService = new 
AuthorizationService(serviceConfiguration, configurationCacheService);
@@ -182,9 +169,6 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
-        if (localZooKeeperConnectionService != null) {
-            localZooKeeperConnectionService.close();
-        }
         if (discoveryProvider != null) {
             discoveryProvider.close();
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
new file mode 100644
index 0000000..69452fc
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server.util;
+
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.zookeeper.BkZooKeeperClient;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+@Slf4j
+public class BkReadOnlyZookeeperClientFactoryImpl implements 
ZooKeeperClientFactory {
+
+    private final OrderedExecutor executor;
+
+    public BkReadOnlyZookeeperClientFactoryImpl(OrderedExecutor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<ZooKeeper> create(String serverList, SessionType 
sessionType, int zkSessionTimeoutMillis) {
+        CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
+
+        executor.execute(safeRun(() -> {
+            try {
+                ZooKeeper zk = 
BkZooKeeperClient.newBuilder().connectString(serverList)
+                        .sessionTimeoutMs(zkSessionTimeoutMillis)
+                        .connectRetryPolicy(new 
BoundExponentialBackoffRetryPolicy(zkSessionTimeoutMillis,
+                                zkSessionTimeoutMillis, 0))
+                        .allowReadOnlyMode(sessionType == 
SessionType.AllowReadOnly).build();
+
+                if (zk.getState() == States.CONNECTEDREADONLY && sessionType 
!= SessionType.AllowReadOnly) {
+                    future.completeExceptionally(new 
IllegalStateException("Cannot use a read-only session"));
+                }
+
+                log.info("ZooKeeper session established: {}", zk);
+                future.complete(zk);
+            } catch (IOException | KeeperException | InterruptedException 
exception) {
+                log.error("Failed to establish ZooKeeper session: {}", 
exception.getMessage());
+                future.completeExceptionally(exception);
+            }
+        }, throwable -> {
+            future.completeExceptionally(throwable);
+        }));
+
+        return future;
+    }
+
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 7822347..887c5ea 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server.util;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -28,12 +29,13 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +45,11 @@ import org.slf4j.LoggerFactory;
  */
 public class ZookeeperCacheLoader implements Closeable {
 
+    private final ZooKeeper zkClient;
     private final ZooKeeperCache localZkCache;
-    private final LocalZooKeeperConnectionService localZkConnectionSvc;
-
     private final ZooKeeperDataCache<LoadManagerReport> brokerInfo;
     private final ZooKeeperChildrenCache availableBrokersCache;
 
-    private volatile Set<String> availableBrokersSet;
     private volatile List<LoadManagerReport> availableBrokers;
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
@@ -63,22 +63,9 @@ public class ZookeeperCacheLoader implements Closeable {
      * @param zookeeperServers
      * @throws Exception
      */
-    public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String 
zookeeperServers,
-            int zookeeperSessionTimeoutMs) throws Exception {
-        localZkConnectionSvc = new 
LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
-                zookeeperSessionTimeoutMs);
-        localZkConnectionSvc.start(exitCode -> {
-            log.error("Shutting down ZK sessions: {}", exitCode);
-        });
-
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
-        localZkConnectionSvc.start(exitCode -> {
-            try {
-                localZkCache.getZooKeeper().close();
-            } catch (InterruptedException e) {
-                log.warn("Failed to shutdown ZooKeeper gracefully {}", 
e.getMessage(), e);
-            }
-        });
+    public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String 
zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
+        this.zkClient = factory.create(zookeeperServers, 
SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
+        this.localZkCache = new LocalZooKeeperCache(zkClient, 
this.orderedExecutor);
 
         this.brokerInfo = new 
ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
             @Override
@@ -113,7 +100,13 @@ public class ZookeeperCacheLoader implements Closeable {
     }
 
     @Override
-    public void close() {
+    public void close() throws IOException {
+        try {
+            zkClient.close();
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw new IOException(e);
+        }
         orderedExecutor.shutdown();
     }
 

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to