merlimat closed pull request #1583: Let Proxy handle ZK session expires without 
restarting
URL: https://github.com/apache/incubator-pulsar/pull/1583
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
new file mode 100644
index 000000000..be60294ce
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
@@ -0,0 +1,1365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BkZooWorker.ZooCallable;
+import org.apache.zookeeper.AsyncCallback.ACLCallback;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a ZooKeeper client that handles session expiry.
+ *
+ * Note that this is copied from the BK ZooKeeperClient class. The only addition here is that it
+ * allows specifying read-only mode, which we use when connecting to global ZK.
+ *
+ * TODO: Remove this class once BK-4.8 is released to include read-only in ZooKeeperClient.
+ */
+public class BkZooKeeperClient extends ZooKeeper implements Watcher {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BkZooKeeperClient.class);
+
+    // Default number of threads for the retry executor (overridable via Builder#retryThreadCount).
+    private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1;
+
+    // ZooKeeper client connection variables
+    private final String connectString;
+    private final int sessionTimeoutMs;
+    private final boolean allowReadOnlyMode;
+
+    // state for the zookeeper client
+    // Replacement handle published by clientCreator after a reconnect. While it is unset (null),
+    // the operations in this class fall back to the superclass connection.
+    private final AtomicReference<ZooKeeper> zk = new 
AtomicReference<ZooKeeper>();
+    // Set by close(); checked before scheduling reconnects or retries.
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ZooKeeperWatcherBase watcherManager;
+
+    // Runs delayed retries of asynchronous operations (see backOffAndRetry).
+    private final ScheduledExecutorService retryExecutor;
+    // Runs the reconnect task submitted when the session expires (see onExpired).
+    private final ExecutorService connectExecutor;
+
+    // rate limiter
+    // null when no positive request rate was configured; callers check for null before acquiring.
+    private final RateLimiter rateLimiter;
+
+    // retry policies
+    private final RetryPolicy connectRetryPolicy;
+    private final RetryPolicy operationRetryPolicy;
+
+    // Stats Logger
+    private final OpStatsLogger createStats;
+    private final OpStatsLogger getStats;
+    private final OpStatsLogger setStats;
+    private final OpStatsLogger deleteStats;
+    private final OpStatsLogger getChildrenStats;
+    private final OpStatsLogger existsStats;
+    private final OpStatsLogger multiStats;
+    private final OpStatsLogger getACLStats;
+    private final OpStatsLogger setACLStats;
+    private final OpStatsLogger syncStats;
+    private final OpStatsLogger createClientStats;
+
+    private final Callable<ZooKeeper> clientCreator = new 
Callable<ZooKeeper>() {
+
+        @Override
+        public ZooKeeper call() throws Exception {
+            try {
+                return BkZooWorker.syncCallWithRetries(null, new 
ZooCallable<ZooKeeper>() {
+
+                    @Override
+                    public ZooKeeper call() throws KeeperException, 
InterruptedException {
+                        logger.info("Reconnecting zookeeper {}.", 
connectString);
+                        // close the previous one
+                        closeZkHandle();
+                        ZooKeeper newZk;
+                        try {
+                            newZk = createZooKeeper();
+                        } catch (IOException ie) {
+                            logger.error("Failed to create zookeeper instance 
to " + connectString, ie);
+                            throw 
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+                        }
+                        waitForConnection();
+                        zk.set(newZk);
+                        logger.info("ZooKeeper session {} is created to {}.",
+                                Long.toHexString(newZk.getSessionId()), 
connectString);
+                        return newZk;
+                    }
+
+                    @Override
+                    public String toString() {
+                        return String.format("ZooKeeper Client Creator (%s)", 
connectString);
+                    }
+
+                }, connectRetryPolicy, rateLimiter, createClientStats);
+            } catch (Exception e) {
+                logger.error("Gave up reconnecting to ZooKeeper : ", e);
+                Runtime.getRuntime().exit(-1);
+                return null;
+            }
+        }
+
+    };
+
+    @VisibleForTesting
+    static BkZooKeeperClient createConnectedZooKeeperClient(
+            String connectString, int sessionTimeoutMs, Set<Watcher> 
childWatchers,
+            RetryPolicy operationRetryPolicy)
+                    throws KeeperException, InterruptedException, IOException {
+        return BkZooKeeperClient.newBuilder()
+                .connectString(connectString)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .watchers(childWatchers)
+                .operationRetryPolicy(operationRetryPolicy)
+                .build();
+    }
+
+    /**
+     * Fluent builder for {@link BkZooKeeperClient}. Obtain one through {@link #newBuilder()},
+     * set the options that differ from the defaults and call {@link #build()}.
+     */
+    public static class Builder {
+        // Required; build() rejects a null value.
+        String connectString = null;
+        int sessionTimeoutMs = 10000;
+        // Optional extra watchers; null means none.
+        Set<Watcher> watchers = null;
+        // Both policies get a default in build() when left null.
+        RetryPolicy connectRetryPolicy = null;
+        RetryPolicy operationRetryPolicy = null;
+        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
+        // A value that is not positive disables rate limiting.
+        double requestRateLimit = 0;
+        boolean allowReadOnlyMode = false;
+
+        private Builder() {}
+
+        /** Sets the ZooKeeper connect string. */
+        public Builder connectString(String connectString) {
+            this.connectString = connectString;
+            return this;
+        }
+
+        /** Sets the session timeout in milliseconds. */
+        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        /** Sets the watchers passed to the watcher manager. */
+        public Builder watchers(Set<Watcher> watchers) {
+            this.watchers = watchers;
+            return this;
+        }
+
+        /** Sets the retry policy used when (re)connecting. */
+        public Builder connectRetryPolicy(RetryPolicy retryPolicy) {
+            this.connectRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        /** Sets the retry policy used for ZooKeeper operations. */
+        public Builder operationRetryPolicy(RetryPolicy retryPolicy) {
+            this.operationRetryPolicy = retryPolicy;
+            return this;
+        }
+
+        /** Sets the stats logger. */
+        public Builder statsLogger(StatsLogger statsLogger) {
+            this.statsLogger = statsLogger;
+            return this;
+        }
+
+        /** Sets the request rate limit. */
+        public Builder requestRateLimit(double requestRateLimit) {
+            this.requestRateLimit = requestRateLimit;
+            return this;
+        }
+
+        /** Sets the number of threads of the retry executor. */
+        public Builder retryThreadCount(int numThreads) {
+            this.retryExecThreadCount = numThreads;
+            return this;
+        }
+
+        /** Sets whether the client may connect in read-only mode. */
+        public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
+            this.allowReadOnlyMode = allowReadOnlyMode;
+            return this;
+        }
+
+        /**
+         * Validates the settings, creates the client and waits until it is connected. If waiting
+         * fails, the client is closed before the exception is rethrown.
+         *
+         * @return the connected client
+         */
+        public BkZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
+            checkNotNull(connectString);
+            checkArgument(sessionTimeoutMs > 0);
+            checkNotNull(statsLogger);
+            checkArgument(retryExecThreadCount > 0);
+
+            // Fill in default retry policies where the caller supplied none.
+            if (connectRetryPolicy == null) {
+                connectRetryPolicy = new BoundExponentialBackoffRetryPolicy(
+                        sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
+            }
+            if (operationRetryPolicy == null) {
+                operationRetryPolicy = new BoundExponentialBackoffRetryPolicy(
+                        sessionTimeoutMs, sessionTimeoutMs, 0);
+            }
+
+            // Create a watcher manager
+            final StatsLogger watcherStats = statsLogger.scope("watcher");
+            final ZooKeeperWatcherBase manager;
+            if (watchers == null) {
+                manager = new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStats);
+            } else {
+                manager = new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStats);
+            }
+
+            final BkZooKeeperClient built = new BkZooKeeperClient(
+                    connectString,
+                    sessionTimeoutMs,
+                    manager,
+                    connectRetryPolicy,
+                    operationRetryPolicy,
+                    statsLogger,
+                    retryExecThreadCount,
+                    requestRateLimit,
+                    allowReadOnlyMode);
+
+            // Wait for connection to be established.
+            try {
+                manager.waitForConnection();
+            } catch (KeeperException ke) {
+                built.close();
+                throw ke;
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                built.close();
+                throw ie;
+            }
+            return built;
+        }
+    }
+
+    /**
+     * Creates a new {@link Builder} with default settings.
+     *
+     * @return a fresh builder instance
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    BkZooKeeperClient(String connectString,
+                    int sessionTimeoutMs,
+                    ZooKeeperWatcherBase watcherManager,
+                    RetryPolicy connectRetryPolicy,
+                    RetryPolicy operationRetryPolicy,
+                    StatsLogger statsLogger,
+                    int retryExecThreadCount,
+                    double rate,
+                    boolean allowReadOnlyMode) throws IOException {
+        super(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+        this.connectString = connectString;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.allowReadOnlyMode =  allowReadOnlyMode;
+        this.watcherManager = watcherManager;
+        this.connectRetryPolicy = connectRetryPolicy;
+        this.operationRetryPolicy = operationRetryPolicy;
+        this.rateLimiter = rate > 0 ? RateLimiter.create(rate) : null;
+        this.retryExecutor =
+                Executors.newScheduledThreadPool(retryExecThreadCount,
+                    new 
ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build());
+        this.connectExecutor =
+                Executors.newSingleThreadExecutor(
+                    new 
ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build());
+        // added itself to the watcher
+        watcherManager.addChildWatcher(this);
+
+        // Stats
+        StatsLogger scopedStatsLogger = statsLogger.scope("zk");
+        createClientStats = 
scopedStatsLogger.getOpStatsLogger("create_client");
+        createStats = scopedStatsLogger.getOpStatsLogger("create");
+        getStats = scopedStatsLogger.getOpStatsLogger("get_data");
+        setStats = scopedStatsLogger.getOpStatsLogger("set_data");
+        deleteStats = scopedStatsLogger.getOpStatsLogger("delete");
+        getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children");
+        existsStats = scopedStatsLogger.getOpStatsLogger("exists");
+        multiStats = scopedStatsLogger.getOpStatsLogger("multi");
+        getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl");
+        setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl");
+        syncStats = scopedStatsLogger.getOpStatsLogger("sync");
+    }
+
+    /**
+     * Marks the client as closed, shuts down both executors and closes the active ZooKeeper
+     * handle.
+     *
+     * @throws InterruptedException if closing the handle is interrupted
+     */
+    @Override
+    public void close() throws InterruptedException {
+        // Set the flag first: onExpired() and allowRetry() consult it before scheduling new work.
+        closed.set(true);
+        connectExecutor.shutdown();
+        retryExecutor.shutdown();
+        closeZkHandle();
+    }
+
+    private void closeZkHandle() throws InterruptedException {
+        ZooKeeper zkHandle = zk.get();
+        if (null == zkHandle) {
+            super.close();
+        } else {
+            zkHandle.close();
+        }
+    }
+
+    /**
+     * Waits for the connection by delegating to the watcher manager.
+     */
+    protected void waitForConnection() throws KeeperException, 
InterruptedException {
+        watcherManager.waitForConnection();
+    }
+
+    /**
+     * Creates a new plain {@link ZooKeeper} handle with the same connect string, session
+     * timeout, watcher manager and read-only setting that this client was constructed with.
+     *
+     * @return the newly created handle
+     * @throws IOException if the ZooKeeper constructor fails
+     */
+    protected ZooKeeper createZooKeeper() throws IOException {
+        return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+    }
+
+    /**
+     * Watcher callback: reacts only to session-level events whose state is
+     * {@code Expired} and ignores everything else.
+     */
+    @Override
+    public void process(WatchedEvent event) {
+        final boolean sessionEvent = event.getType() == EventType.None;
+        if (sessionEvent && event.getState() == KeeperState.Expired) {
+            onExpired();
+        }
+    }
+
+    /**
+     * Submits the reconnect task to the connect executor unless the client is already closed.
+     * Submission failures are logged, except for rejections that happen after a close.
+     */
+    private void onExpired() {
+        // we don't schedule any tries if the client is closed.
+        if (!closed.get()) {
+            logger.info("ZooKeeper session {} is expired from {}.",
+                    Long.toHexString(getSessionId()), connectString);
+            try {
+                connectExecutor.submit(clientCreator);
+            } catch (RejectedExecutionException ree) {
+                // A rejection after close() is expected because the executor was shut down.
+                if (closed.get()) {
+                    return;
+                }
+                logger.error("ZooKeeper reconnect task is rejected : ", ree);
+            } catch (Exception t) {
+                logger.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t);
+            }
+        }
+    }
+
+    /**
+     * Base class for asynchronous ZooKeeper operations that can be re-run. Each instance owns a
+     * {@link BkZooWorker} and, when a rate limiter is present, acquires a permit before every
+     * run.
+     */
+    abstract static class ZkRetryRunnable implements Runnable {
+
+        final BkZooWorker worker;
+        final RateLimiter rateLimiter;
+        // Self reference, so nested callbacks can reschedule this runnable.
+        final Runnable that;
+
+        ZkRetryRunnable(RetryPolicy retryPolicy,
+                        RateLimiter rateLimiter,
+                        OpStatsLogger statsLogger) {
+            this.that = this;
+            this.rateLimiter = rateLimiter;
+            this.worker = new BkZooWorker(retryPolicy, statsLogger);
+        }
+
+        @Override
+        public void run() {
+            if (rateLimiter != null) {
+                rateLimiter.acquire();
+            }
+            zkRun();
+        }
+
+        /** Issues the actual ZooKeeper call. */
+        abstract void zkRun();
+    }
+
+    // inherits from ZooKeeper client for all operations
+
+    /** Returns the session id of the replacement handle if present, else of the superclass connection. */
+    @Override
+    public long getSessionId() {
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.getSessionId() : super.getSessionId();
+    }
+
+    /** Returns the session password of the replacement handle if present, else of the superclass connection. */
+    @Override
+    public byte[] getSessionPasswd() {
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.getSessionPasswd() : super.getSessionPasswd();
+    }
+
+    /** Returns the session timeout of the replacement handle if present, else of the superclass connection. */
+    @Override
+    public int getSessionTimeout() {
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.getSessionTimeout() : super.getSessionTimeout();
+    }
+
+    /** Adds auth info to the replacement handle if present, else to the superclass connection. */
+    @Override
+    public void addAuthInfo(String scheme, byte[] auth) {
+        final ZooKeeper handle = zk.get();
+        if (handle != null) {
+            handle.addAuthInfo(scheme, auth);
+        } else {
+            super.addAuthInfo(scheme, auth);
+        }
+    }
+
+    /**
+     * Schedules {@code r} on the retry executor after the given delay. A rejection is logged
+     * unless the client has been closed.
+     */
+    private void backOffAndRetry(Runnable r, long nextRetryWaitTimeMs) {
+        try {
+            retryExecutor.schedule(r, nextRetryWaitTimeMs, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException ree) {
+            if (closed.get()) {
+                // Expected once close() has shut the executor down.
+                return;
+            }
+            logger.error("ZooKeeper Operation {} is rejected : ", r, ree);
+        }
+    }
+
+    /** Returns true when the worker permits a retry for {@code rc} and the client is not closed. */
+    private boolean allowRetry(BkZooWorker worker, int rc) {
+        if (!worker.allowRetry(rc)) {
+            return false;
+        }
+        return !closed.get();
+    }
+
+    /**
+     * Adds the given watcher as a child watcher of the watcher manager.
+     *
+     * @param watcher the watcher to add
+     */
+    @Override
+    public synchronized void register(Watcher watcher) {
+        watcherManager.addChildWatcher(watcher);
+    }
+
+    /**
+     * Synchronous multi, run through {@code BkZooWorker.syncCallWithRetries} with the operation
+     * retry policy. Each attempt uses the replacement handle if present, else the superclass
+     * connection.
+     */
+    @Override
+    public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
+        final ZooCallable<List<OpResult>> multiCall = new ZooCallable<List<OpResult>>() {
+
+            @Override
+            public List<OpResult> call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                return handle != null ? handle.multi(ops) : BkZooKeeperClient.super.multi(ops);
+            }
+
+            @Override
+            public String toString() {
+                return "multi";
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(this, multiCall, operationRetryPolicy, rateLimiter, multiStats);
+    }
+
+    /**
+     * Asynchronous multi. The first attempt runs immediately on the calling thread. When the
+     * result code is retryable and the client is still open, the operation is rescheduled after
+     * the worker's back-off delay; otherwise the result is passed to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param ops the operations to execute
+     * @param cb callback that receives the final result
+     * @param context caller context passed back to {@code cb}
+     */
+    @Override
+    public void multi(final Iterable<Op> ops,
+                      final MultiCallback cb,
+                      final Object context) {
+        // Use multiStats here, matching the synchronous multi(); this previously passed
+        // createStats, which looks like a copy-paste slip from create().
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, multiStats) {
+
+            final MultiCallback multiCb = new MultiCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
+                    // The worker is passed to ZooKeeper as the call context in zkRun().
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, results);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.multi(ops, multiCb, worker);
+                } else {
+                    zkHandle.multi(ops, multiCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "multi";
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    /**
+     * Returns a transaction bound to whichever handle is active at the time of the call.
+     *
+     * @deprecated the transaction keeps using that one handle; prefer {@link #multi}.
+     */
+    @Override
+    @Deprecated
+    public Transaction transaction() {
+        // since there is no reference about which client that the transaction could use
+        // so just use ZooKeeper instance directly.
+        // you'd better to use {@link #multi}.
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.transaction() : super.transaction();
+    }
+
+    /**
+     * Synchronous getACL, run through {@code BkZooWorker.syncCallWithRetries} with the operation
+     * retry policy. Each attempt uses the replacement handle if present, else the superclass
+     * connection.
+     */
+    @Override
+    public List<ACL> getACL(final String path, final Stat stat) throws KeeperException, InterruptedException {
+        final ZooCallable<List<ACL>> getAclCall = new ZooCallable<List<ACL>>() {
+
+            @Override
+            public List<ACL> call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                return handle != null ? handle.getACL(path, stat) : BkZooKeeperClient.super.getACL(path, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(this, getAclCall, operationRetryPolicy, rateLimiter, getACLStats);
+    }
+
+    /**
+     * Asynchronous getACL. The first attempt runs immediately; retryable results are
+     * rescheduled with back-off, and any other result is delivered to {@code cb} with the
+     * caller's {@code context}.
+     */
+    @Override
+    public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) {
+        final Runnable task = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) {
+
+            // Decides between another attempt and handing the result to the caller.
+            final ACLCallback retryingCb = new ACLCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx, List<ACL> resultAcl,
+                                          Stat resultStat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, resultAcl, resultStat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.getACL(path, stat, retryingCb, worker);
+                } else {
+                    BkZooKeeperClient.super.getACL(path, stat, retryingCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getACL (%s, stat = %s)", path, stat);
+            }
+        };
+        // execute it immediately
+        task.run();
+    }
+
+    /**
+     * Synchronous setACL, run through {@code BkZooWorker.syncCallWithRetries} with the operation
+     * retry policy. Each attempt uses the replacement handle if present, else the superclass
+     * connection.
+     */
+    @Override
+    public Stat setACL(final String path, final List<ACL> acl, final int version)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<Stat> setAclCall = new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                return handle != null
+                        ? handle.setACL(path, acl, version)
+                        : BkZooKeeperClient.super.setACL(path, acl, version);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(this, setAclCall, operationRetryPolicy, rateLimiter, setACLStats);
+    }
+
+    /**
+     * Asynchronous setACL. The first attempt runs immediately; retryable results are
+     * rescheduled with back-off, and any other result is delivered to {@code cb} with the
+     * caller's {@code context}.
+     */
+    @Override
+    public void setACL(final String path, final List<ACL> acl, final int version,
+            final StatCallback cb, final Object context) {
+        final Runnable task = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) {
+
+            // Decides between another attempt and handing the result to the caller.
+            final StatCallback retryingCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx, Stat resultStat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, resultStat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.setACL(path, acl, version, retryingCb, worker);
+                } else {
+                    BkZooKeeperClient.super.setACL(path, acl, version, retryingCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
+            }
+        };
+        // execute it immediately
+        task.run();
+    }
+
+    /**
+     * Asynchronous sync. The first attempt runs immediately; retryable results are rescheduled
+     * with back-off, and any other result is delivered to {@code cb} with the caller's
+     * {@code context}.
+     */
+    @Override
+    public void sync(final String path, final VoidCallback cb, final Object context) {
+        final Runnable task = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) {
+
+            // Decides between another attempt and handing the result to the caller.
+            final VoidCallback retryingCb = new VoidCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.sync(path, retryingCb, worker);
+                } else {
+                    BkZooKeeperClient.super.sync(path, retryingCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("sync (%s)", path);
+            }
+        };
+        // execute it immediately
+        task.run();
+    }
+
+    /** Returns the state of the replacement handle if present, else of the superclass connection. */
+    @Override
+    public States getState() {
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.getState() : BkZooKeeperClient.super.getState();
+    }
+
+    /** Returns the string form of the replacement handle if present, else of the superclass connection. */
+    @Override
+    public String toString() {
+        final ZooKeeper handle = zk.get();
+        return handle != null ? handle.toString() : BkZooKeeperClient.super.toString();
+    }
+
+    /**
+     * Synchronous create, run through {@code BkZooWorker.syncCallWithRetries} with the operation
+     * retry policy. Each attempt uses the replacement handle if present, else the superclass
+     * connection.
+     */
+    @Override
+    public String create(final String path, final byte[] data,
+            final List<ACL> acl, final CreateMode createMode)
+                    throws KeeperException, InterruptedException {
+        final ZooCallable<String> createCall = new ZooCallable<String>() {
+
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
+            }
+
+            @Override
+            public String call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                return handle != null
+                        ? handle.create(path, data, acl, createMode)
+                        : BkZooKeeperClient.super.create(path, data, acl, createMode);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(this, createCall, operationRetryPolicy, rateLimiter, createStats);
+    }
+
+    /**
+     * Asynchronous create. The first attempt runs immediately; retryable results are
+     * rescheduled with back-off, and any other result is delivered to {@code cb} with the
+     * caller's {@code context}.
+     */
+    @Override
+    public void create(final String path, final byte[] data, final List<ACL> acl,
+            final CreateMode createMode, final StringCallback cb, final Object context) {
+        final Runnable task = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
+
+            // Decides between another attempt and handing the result to the caller.
+            final StringCallback retryingCb = new StringCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx, String name) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, name);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            public String toString() {
+                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
+            }
+
+            @Override
+            void zkRun() {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.create(path, data, acl, createMode, retryingCb, worker);
+                } else {
+                    BkZooKeeperClient.super.create(path, data, acl, createMode, retryingCb, worker);
+                }
+            }
+        };
+        // execute it immediately
+        task.run();
+    }
+
+    /**
+     * Synchronous delete, run through {@code BkZooWorker.syncCallWithRetries} with the operation
+     * retry policy. Each attempt uses the replacement handle if present, else the superclass
+     * connection.
+     */
+    @Override
+    public void delete(final String path, final int version) throws KeeperException, InterruptedException {
+        final ZooCallable<Void> deleteCall = new ZooCallable<Void>() {
+
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, version);
+            }
+
+            @Override
+            public Void call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.delete(path, version);
+                } else {
+                    BkZooKeeperClient.super.delete(path, version);
+                }
+                return null;
+            }
+
+        };
+        BkZooWorker.syncCallWithRetries(this, deleteCall, operationRetryPolicy, rateLimiter, deleteStats);
+    }
+
+    /**
+     * Asynchronous delete. The first attempt runs immediately; retryable results are
+     * rescheduled with back-off, and any other result is delivered to {@code cb} with the
+     * caller's {@code context}.
+     */
+    @Override
+    public void delete(final String path, final int version, final VoidCallback cb, final Object context) {
+        final Runnable task = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) {
+
+            // Decides between another attempt and handing the result to the caller.
+            final VoidCallback retryingCb = new VoidCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            public String toString() {
+                return String.format("delete (%s, version = %d)", path, version);
+            }
+
+            @Override
+            void zkRun() {
+                final ZooKeeper handle = zk.get();
+                if (handle != null) {
+                    handle.delete(path, version, retryingCb, worker);
+                } else {
+                    BkZooKeeperClient.super.delete(path, version, retryingCb, worker);
+                }
+            }
+        };
+        // execute it immediately
+        task.run();
+    }
+
+    /**
+     * Synchronous exists with an explicit watcher, run through
+     * {@code BkZooWorker.syncCallWithRetries} with the operation retry policy. Each attempt uses
+     * the replacement handle if present, else the superclass connection.
+     */
+    @Override
+    public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException {
+        final ZooCallable<Stat> existsCall = new ZooCallable<Stat>() {
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watcher);
+            }
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                final ZooKeeper handle = zk.get();
+                return handle != null
+                        ? handle.exists(path, watcher)
+                        : BkZooKeeperClient.super.exists(path, watcher);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(this, existsCall, operationRetryPolicy, rateLimiter, existsStats);
+    }
+
+    /**
+     * Synchronous {@code exists} with a boolean watch flag, executed through
+     * {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path node path to check
+     * @param watch watch flag passed through to the underlying call
+     * @return whatever the underlying {@code exists} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public Stat exists(final String path, final boolean watch)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<Stat> existsOp = new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.exists(path, watch);
+                }
+                return BkZooKeeperClient.super.exists(path, watch);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, existsOp, operationRetryPolicy, rateLimiter, existsStats);
+    }
+
+    /**
+     * Asynchronous {@code exists} with a {@link Watcher}, wrapped in a
+     * {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path node path to check
+     * @param watcher watcher passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void exists(final String path, final Watcher watcher,
+            final StatCallback cb, final Object context) {
+        final Runnable retryingExists = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, existsStats) {
+
+            final StatCallback retryCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        Stat stat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, stat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.exists(path, watcher, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.exists(path, watcher, retryCb, worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watcher);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingExists.run();
+    }
+
+    /**
+     * Asynchronous {@code exists} with a boolean watch flag, wrapped in a
+     * {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path node path to check
+     * @param watch watch flag passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void exists(final String path, final boolean watch,
+            final StatCallback cb, final Object context) {
+        final Runnable retryingExists = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, existsStats) {
+
+            final StatCallback retryCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        Stat stat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, stat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.exists(path, watch, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.exists(path, watch, retryCb, worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("exists (%s, watcher = %s)", path, watch);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingExists.run();
+    }
+
+    /**
+     * Synchronous {@code getData} with a {@link Watcher}, executed through
+     * {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path node path to read
+     * @param watcher watcher passed through to the underlying call
+     * @param stat stat object passed through to the underlying call
+     * @return whatever the underlying {@code getData} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public byte[] getData(final String path, final Watcher watcher,
+            final Stat stat)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<byte[]> readOp = new ZooCallable<byte[]>() {
+
+            @Override
+            public byte[] call() throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getData(path, watcher, stat);
+                }
+                return BkZooKeeperClient.super.getData(path, watcher, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watcher);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, readOp, operationRetryPolicy, rateLimiter, getStats);
+    }
+
+    /**
+     * Synchronous {@code getData} with a boolean watch flag, executed through
+     * {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path node path to read
+     * @param watch watch flag passed through to the underlying call
+     * @param stat stat object passed through to the underlying call
+     * @return whatever the underlying {@code getData} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public byte[] getData(final String path, final boolean watch,
+            final Stat stat)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<byte[]> readOp = new ZooCallable<byte[]>() {
+
+            @Override
+            public byte[] call() throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getData(path, watch, stat);
+                }
+                return BkZooKeeperClient.super.getData(path, watch, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, watch);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, readOp, operationRetryPolicy, rateLimiter, getStats);
+    }
+
+    @Override
+    public void getData(final String path, final Watcher watcher, final 
DataCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getStats) {
+
+            final DataCallback dataCb = new DataCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, data, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getData(path, watcher, dataCb, 
worker);
+                } else {
+                    zkHandle.getData(path, watcher, dataCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watcher);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    @Override
+    public void getData(final String path, final boolean watch, final 
DataCallback cb, final Object context) {
+        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, 
rateLimiter, getStats) {
+
+            final DataCallback dataCb = new DataCallback() {
+
+                @Override
+                public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+                    BkZooWorker worker = (BkZooWorker) ctx;
+                    if (allowRetry(worker, rc)) {
+                        backOffAndRetry(that, worker.nextRetryWaitTime());
+                    } else {
+                        cb.processResult(rc, path, context, data, stat);
+                    }
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                ZooKeeper zkHandle = zk.get();
+                if (null == zkHandle) {
+                    BkZooKeeperClient.super.getData(path, watch, dataCb, 
worker);
+                } else {
+                    zkHandle.getData(path, watch, dataCb, worker);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getData (%s, watcher = %s)", path, 
watch);
+            }
+        };
+        // execute it immediately
+        proc.run();
+    }
+
+    /**
+     * Synchronous {@code setData}, executed through
+     * {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path node path to write
+     * @param data payload passed through to the underlying call
+     * @param version version passed through to the underlying call
+     * @return whatever the underlying {@code setData} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public Stat setData(final String path, final byte[] data,
+            final int version)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<Stat> writeOp = new ZooCallable<Stat>() {
+
+            @Override
+            public Stat call() throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.setData(path, data, version);
+                }
+                return BkZooKeeperClient.super.setData(path, data, version);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, version);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, writeOp, operationRetryPolicy, rateLimiter, setStats);
+    }
+
+    /**
+     * Asynchronous {@code setData} wrapped in a {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path node path to write
+     * @param data payload passed through to the underlying call
+     * @param version version passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void setData(final String path, final byte[] data,
+            final int version,
+            final StatCallback cb, final Object context) {
+        final Runnable retryingWrite = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, setStats) {
+
+            final StatCallback retryCb = new StatCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        Stat stat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, stat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.setData(path, data, version, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.setData(path, data, version, retryCb,
+                        worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("setData (%s, version = %d)", path, version);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingWrite.run();
+    }
+
+    /**
+     * Synchronous {@code getChildren} with a {@link Watcher} and a
+     * {@link Stat}, executed through {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path parent node path
+     * @param watcher watcher passed through to the underlying call
+     * @param stat stat object passed through to the underlying call
+     * @return whatever the underlying {@code getChildren} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public List<String> getChildren(final String path, final Watcher watcher,
+            final Stat stat)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<List<String>> listOp = new ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call()
+                    throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getChildren(path, watcher, stat);
+                }
+                return BkZooKeeperClient.super.getChildren(path, watcher, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watcher);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, listOp, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    /**
+     * Synchronous {@code getChildren} with a boolean watch flag and a
+     * {@link Stat}, executed through {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path parent node path
+     * @param watch watch flag passed through to the underlying call
+     * @param stat stat object passed through to the underlying call
+     * @return whatever the underlying {@code getChildren} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public List<String> getChildren(final String path, final boolean watch,
+            final Stat stat)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<List<String>> listOp = new ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call()
+                    throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getChildren(path, watch, stat);
+                }
+                return BkZooKeeperClient.super.getChildren(path, watch, stat);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watch);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, listOp, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    /**
+     * Asynchronous {@code getChildren} with a {@link Watcher} and a
+     * {@code Children2Callback}, wrapped in a {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path parent node path
+     * @param watcher watcher passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void getChildren(final String path, final Watcher watcher,
+            final Children2Callback cb, final Object context) {
+        final Runnable retryingList = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, getChildrenStats) {
+
+            final Children2Callback retryCb = new Children2Callback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        List<String> children, Stat stat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, children, stat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.getChildren(path, watcher, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.getChildren(path, watcher, retryCb,
+                        worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watcher);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingList.run();
+    }
+
+    /**
+     * Asynchronous {@code getChildren} with a boolean watch flag and a
+     * {@code Children2Callback}, wrapped in a {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path parent node path
+     * @param watch watch flag passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void getChildren(final String path, final boolean watch,
+            final Children2Callback cb,
+            final Object context) {
+        final Runnable retryingList = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, getChildrenStats) {
+
+            final Children2Callback retryCb = new Children2Callback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        List<String> children, Stat stat) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, children, stat);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.getChildren(path, watch, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.getChildren(path, watch, retryCb, worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watch);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingList.run();
+    }
+
+
+    /**
+     * Synchronous {@code getChildren} with a {@link Watcher}, executed through
+     * {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path parent node path
+     * @param watcher watcher passed through to the underlying call
+     * @return whatever the underlying {@code getChildren} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public List<String> getChildren(final String path, final Watcher watcher)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<List<String>> listOp = new ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call()
+                    throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getChildren(path, watcher);
+                }
+                return BkZooKeeperClient.super.getChildren(path, watcher);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watcher);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, listOp, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    /**
+     * Synchronous {@code getChildren} with a boolean watch flag, executed
+     * through {@link BkZooWorker#syncCallWithRetries}.
+     *
+     * <p>Each attempt uses the handle currently held in {@code zk}; when that
+     * reference is empty the superclass implementation is called instead.
+     *
+     * @param path parent node path
+     * @param watch watch flag passed through to the underlying call
+     * @return whatever the underlying {@code getChildren} call returns
+     * @throws KeeperException propagated from the retry helper
+     * @throws InterruptedException propagated from the retry helper
+     */
+    @Override
+    public List<String> getChildren(final String path, final boolean watch)
+            throws KeeperException, InterruptedException {
+        final ZooCallable<List<String>> listOp = new ZooCallable<List<String>>() {
+
+            @Override
+            public List<String> call()
+                    throws KeeperException, InterruptedException {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    return current.getChildren(path, watch);
+                }
+                return BkZooKeeperClient.super.getChildren(path, watch);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watch);
+            }
+
+        };
+        return BkZooWorker.syncCallWithRetries(
+                this, listOp, operationRetryPolicy, rateLimiter, getChildrenStats);
+    }
+
+    /**
+     * Asynchronous {@code getChildren} with a {@link Watcher} and a
+     * {@code ChildrenCallback}, wrapped in a {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path parent node path
+     * @param watcher watcher passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void getChildren(final String path, final Watcher watcher,
+            final ChildrenCallback cb, final Object context) {
+        final Runnable retryingList = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, getChildrenStats) {
+
+            final ChildrenCallback retryCb = new ChildrenCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        List<String> children) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, children);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.getChildren(path, watcher, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.getChildren(path, watcher, retryCb,
+                        worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watcher);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingList.run();
+    }
+
+    /**
+     * Asynchronous {@code getChildren} with a boolean watch flag and a
+     * {@code ChildrenCallback}, wrapped in a {@code ZkRetryRunnable}.
+     *
+     * <p>The internal callback asks {@code allowRetry} whether the result code
+     * warrants another attempt. If so the runnable is handed to
+     * {@code backOffAndRetry}; if not the result goes to {@code cb} with the
+     * caller's {@code context}.
+     *
+     * @param path parent node path
+     * @param watch watch flag passed through to the underlying call
+     * @param cb callback receiving the final result
+     * @param context caller context handed back to {@code cb}
+     */
+    @Override
+    public void getChildren(final String path, final boolean watch,
+            final ChildrenCallback cb, final Object context) {
+        final Runnable retryingList = new ZkRetryRunnable(
+                operationRetryPolicy, rateLimiter, getChildrenStats) {
+
+            final ChildrenCallback retryCb = new ChildrenCallback() {
+
+                @Override
+                public void processResult(int rc, String resultPath, Object ctx,
+                        List<String> children) {
+                    final BkZooWorker retryState = (BkZooWorker) ctx;
+                    if (!allowRetry(retryState, rc)) {
+                        cb.processResult(rc, resultPath, context, children);
+                        return;
+                    }
+                    backOffAndRetry(that, retryState.nextRetryWaitTime());
+                }
+
+            };
+
+            @Override
+            void zkRun() {
+                final ZooKeeper current = zk.get();
+                if (current != null) {
+                    current.getChildren(path, watch, retryCb, worker);
+                    return;
+                }
+                BkZooKeeperClient.super.getChildren(path, watch, retryCb, worker);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("getChildren (%s, watcher = %s)", path,
+                        watch);
+            }
+        };
+        // invoke the runnable right away, on the caller's thread
+        retryingList.run();
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
new file mode 100644
index 000000000..634a61e51
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provide a mechanism to perform an operation on ZooKeeper that is safe on
+ * disconnections and recoverable errors.
+ *
+ * <p>An instance carries the retry bookkeeping (attempt count, start time and
+ * elapsed time) for one operation; {@link #syncCallWithRetries} applies the
+ * same retry rules to a synchronous call.
+ *
+ * TODO: Remove this class once BK-4.8 is released to include read-only in
+ * ZooKeeperClient.
+ */
+class BkZooWorker {
+
+    private static final Logger logger =
+            LoggerFactory.getLogger(BkZooWorker.class);
+
+    /** Number of recoverable result codes seen by {@link #allowRetry(int)}. */
+    int attempts = 0;
+    /** Timestamp taken in the constructor; all elapsed times are relative to it. */
+    long startTimeNanos;
+    /** Elapsed time as of the most recent {@link #allowRetry(int)} call. */
+    long elapsedTimeMs = 0L;
+    final RetryPolicy retryPolicy;
+    final OpStatsLogger statsLogger;
+
+    BkZooWorker(RetryPolicy retryPolicy, OpStatsLogger statsLogger) {
+        this.retryPolicy = retryPolicy;
+        this.statsLogger = statsLogger;
+        this.startTimeNanos = MathUtils.nowInNano();
+    }
+
+    /**
+     * Decide whether the operation should be retried after receiving
+     * {@code rc}, and record the final outcome in {@code statsLogger} when it
+     * should not.
+     *
+     * <p>A non-recoverable code ends the operation: {@code OK} is registered
+     * as a successful event, anything else as a failed event. A recoverable
+     * code increments {@code attempts} and defers to the retry policy; when
+     * the policy refuses another attempt the operation is registered as
+     * failed, matching what {@link #syncCallWithRetries} does when it gives
+     * up.
+     *
+     * @param rc result code of the last attempt
+     * @return true if the caller should retry
+     */
+    public boolean allowRetry(int rc) {
+        elapsedTimeMs = MathUtils.elapsedMSec(startTimeNanos);
+        if (!BkZooWorker.isRecoverableException(rc)) {
+            if (KeeperException.Code.OK.intValue() == rc) {
+                statsLogger.registerSuccessfulEvent(
+                        MathUtils.elapsedMicroSec(startTimeNanos),
+                        TimeUnit.MICROSECONDS);
+            } else {
+                statsLogger.registerFailedEvent(
+                        MathUtils.elapsedMicroSec(startTimeNanos),
+                        TimeUnit.MICROSECONDS);
+            }
+            return false;
+        }
+        ++attempts;
+        if (retryPolicy.allowRetry(attempts, elapsedTimeMs)) {
+            return true;
+        }
+        // Retries are exhausted: the caller will report this rc as the final
+        // result, so account for it as a failure instead of dropping it.
+        statsLogger.registerFailedEvent(
+                MathUtils.elapsedMicroSec(startTimeNanos),
+                TimeUnit.MICROSECONDS);
+        return false;
+    }
+
+    /**
+     * Ask the retry policy how long to wait before the next attempt, based on
+     * the state captured by the last {@link #allowRetry(int)} call.
+     *
+     * @return the wait time reported by the retry policy
+     */
+    public long nextRetryWaitTime() {
+        return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs);
+    }
+
+    /**
+     * Check whether the given result code is recoverable by retry.
+     *
+     * @param rc result code
+     * @return true if given result code is recoverable.
+     */
+    public static boolean isRecoverableException(int rc) {
+        return KeeperException.Code.CONNECTIONLOSS.intValue() == rc
+            || KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc
+            || KeeperException.Code.SESSIONMOVED.intValue() == rc
+            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
+    }
+
+    /**
+     * Check whether the given exception is recoverable by retry.
+     *
+     * @param exception given exception
+     * @return true if given exception is recoverable.
+     */
+    public static boolean isRecoverableException(KeeperException exception) {
+        return isRecoverableException(exception.code().intValue());
+    }
+
+    interface ZooCallable<T> {
+        /**
+         * Be compatible with ZooKeeper interface.
+         *
+         * @return value
+         * @throws InterruptedException
+         * @throws KeeperException
+         */
+        T call() throws InterruptedException, KeeperException;
+    }
+
+    /**
+     * Execute a sync zookeeper operation with a given retry policy.
+     *
+     * <p>When {@code client} is null every {@link KeeperException} is eligible
+     * for retry; otherwise only recoverable ones are. In both cases the retry
+     * policy has the final say.
+     *
+     * @param client
+     *          ZooKeeper client.
+     * @param proc
+     *          Synchronous zookeeper operation wrapped in a {@link Callable}.
+     * @param retryPolicy
+     *          Retry policy to execute the synchronous operation.
+     * @param rateLimiter
+     *          Rate limiter for zookeeper calls
+     * @param statsLogger
+     *          Stats Logger for zookeeper client.
+     * @return result of the zookeeper operation
+     * @throws KeeperException any non-recoverable exception or recoverable
+     *          exception exhausted all retries.
+     * @throws InterruptedException the operation is interrupted.
+     */
+    public static<T> T syncCallWithRetries(BkZooKeeperClient client,
+                                           ZooCallable<T> proc,
+                                           RetryPolicy retryPolicy,
+                                           RateLimiter rateLimiter,
+                                           OpStatsLogger statsLogger)
+    throws KeeperException, InterruptedException {
+        T result = null;
+        boolean isDone = false;
+        int attempts = 0;
+        long startTimeNanos = MathUtils.nowInNano();
+        while (!isDone) {
+            try {
+                if (null != client) {
+                    client.waitForConnection();
+                }
+                logger.debug("Execute {} at {} retry attempt.", proc, attempts);
+                if (null != rateLimiter) {
+                    rateLimiter.acquire();
+                }
+                result = proc.call();
+                isDone = true;
+                statsLogger.registerSuccessfulEvent(
+                        MathUtils.elapsedMicroSec(startTimeNanos),
+                        TimeUnit.MICROSECONDS);
+            } catch (KeeperException e) {
+                ++attempts;
+                long elapsedTime = MathUtils.elapsedMSec(startTimeNanos);
+                // Same truth table as the original
+                // ((client != null && recoverable) || client == null).
+                boolean retry = (null == client || isRecoverableException(e))
+                    && retryPolicy.allowRetry(attempts, elapsedTime);
+                if (!retry) {
+                    statsLogger.registerFailedEvent(
+                            MathUtils.elapsedMicroSec(startTimeNanos),
+                            TimeUnit.MICROSECONDS);
+                    logger.debug("Stopped executing {} after {} attempts.",
+                            proc, attempts);
+                    throw e;
+                }
+                TimeUnit.MILLISECONDS.sleep(
+                        retryPolicy.nextRetryWaitTime(attempts, elapsedTime));
+            }
+        }
+        return result;
+    }
+
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c77ace2fb..e79807733 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -21,6 +21,13 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -39,20 +46,11 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Pulsar proxy service
  */
@@ -78,8 +76,6 @@
 
     private BrokerDiscoveryProvider discoveryProvider;
 
-    private LocalZooKeeperConnectionService localZooKeeperConnectionService;
-
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
 
     private static final int numThreads = 
Runtime.getRuntime().availableProcessors();
@@ -124,15 +120,6 @@ public void start() throws Exception {
         authenticationService = new 
AuthenticationService(serviceConfiguration);
 
         if (!isBlank(proxyConfig.getZookeeperServers()) && 
!isBlank(proxyConfig.getGlobalZookeeperServers())) {
-            localZooKeeperConnectionService = new 
LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
-                    proxyConfig.getZookeeperServers(), 
proxyConfig.getZookeeperSessionTimeoutMs());
-            localZooKeeperConnectionService.start(new ShutdownService() {
-                @Override
-                public void shutdown(int exitCode) {
-                    LOG.error("Lost local ZK session. Shutting down the 
proxy");
-                    Runtime.getRuntime().halt(-1);
-                }
-            });
             discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, 
getZooKeeperClientFactory());
             this.configurationCacheService = new 
ConfigurationCacheService(discoveryProvider.globalZkCache);
             authorizationService = new 
AuthorizationService(serviceConfiguration, configurationCacheService);
@@ -182,9 +169,6 @@ public BrokerDiscoveryProvider getDiscoveryProvider() {
     }
 
     public void close() throws IOException {
-        if (localZooKeeperConnectionService != null) {
-            localZooKeeperConnectionService.close();
-        }
         if (discoveryProvider != null) {
             discoveryProvider.close();
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
new file mode 100644
index 000000000..69452fc13
--- /dev/null
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server.util;
+
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.zookeeper.BkZooKeeperClient;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+/**
+ * {@link ZooKeeperClientFactory} that builds {@link BkZooKeeperClient} sessions on an {@link OrderedExecutor}.
+ */
+@Slf4j
+public class BkReadOnlyZookeeperClientFactoryImpl implements ZooKeeperClientFactory {
+
+    private final OrderedExecutor executor;
+
+    /**
+     * @param executor
+     *            executor that runs the session creation
+     */
+    public BkReadOnlyZookeeperClientFactoryImpl(OrderedExecutor executor) {
+        this.executor = executor;
+    }
+
+    /**
+     * Builds a ZooKeeper client asynchronously on the executor.
+     *
+     * @param serverList
+     *            ZooKeeper connect string
+     * @param sessionType
+     *            read-only mode is allowed only for {@link SessionType#AllowReadOnly}
+     * @param zkSessionTimeoutMillis
+     *            session timeout; also passed to the connect retry policy
+     * @return future completed with the client, or exceptionally when the client cannot be built or an
+     *         unrequested read-only session was obtained
+     */
+    @Override
+    public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) {
+        CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
+
+        executor.execute(safeRun(() -> {
+            try {
+                ZooKeeper zk = BkZooKeeperClient.newBuilder().connectString(serverList)
+                        .sessionTimeoutMs(zkSessionTimeoutMillis)
+                        .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(zkSessionTimeoutMillis,
+                                zkSessionTimeoutMillis, 0))
+                        .allowReadOnlyMode(sessionType == SessionType.AllowReadOnly).build();
+
+                if (zk.getState() == States.CONNECTEDREADONLY && sessionType != SessionType.AllowReadOnly) {
+                    // Fail the future first so a close() error cannot replace this cause, then release
+                    // the unusable session and stop: falling through would log success and leak it.
+                    future.completeExceptionally(new IllegalStateException("Cannot use a read-only session"));
+                    zk.close();
+                    return;
+                }
+
+                log.info("ZooKeeper session established: {}", zk);
+                future.complete(zk);
+            } catch (IOException | KeeperException | InterruptedException exception) {
+                log.error("Failed to establish ZooKeeper session: {}", exception.getMessage(), exception);
+                future.completeExceptionally(exception);
+            }
+        }, throwable -> {
+            future.completeExceptionally(throwable);
+        }));
+
+        return future;
+    }
+
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 78223473b..887c5ea2d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server.util;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -28,12 +29,13 @@
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +45,11 @@
  */
 public class ZookeeperCacheLoader implements Closeable {
 
+    private final ZooKeeper zkClient;
     private final ZooKeeperCache localZkCache;
-    private final LocalZooKeeperConnectionService localZkConnectionSvc;
-
     private final ZooKeeperDataCache<LoadManagerReport> brokerInfo;
     private final ZooKeeperChildrenCache availableBrokersCache;
 
-    private volatile Set<String> availableBrokersSet;
     private volatile List<LoadManagerReport> availableBrokers;
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
@@ -63,22 +63,9 @@
      * @param zookeeperServers
      * @throws Exception
      */
-    public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String 
zookeeperServers,
-            int zookeeperSessionTimeoutMs) throws Exception {
-        localZkConnectionSvc = new 
LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
-                zookeeperSessionTimeoutMs);
-        localZkConnectionSvc.start(exitCode -> {
-            log.error("Shutting down ZK sessions: {}", exitCode);
-        });
-
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
-        localZkConnectionSvc.start(exitCode -> {
-            try {
-                localZkCache.getZooKeeper().close();
-            } catch (InterruptedException e) {
-                log.warn("Failed to shutdown ZooKeeper gracefully {}", 
e.getMessage(), e);
-            }
-        });
+    public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String 
zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
+        this.zkClient = factory.create(zookeeperServers, 
SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
+        this.localZkCache = new LocalZooKeeperCache(zkClient, 
this.orderedExecutor);
 
         this.brokerInfo = new 
ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
             @Override
@@ -113,7 +100,21 @@ public ZooKeeperCache getLocalZkCache() {
     }
 
+    /**
+     * Closes the ZooKeeper session and then stops the cache executor.
+     *
+     * @throws IOException
+     *             if the thread is interrupted while the session is closing
+     */
     @Override
-    public void close() {
-        orderedExecutor.shutdown();
+    public void close() throws IOException {
+        try {
+            zkClient.close();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for callers; Thread.interrupted() would only clear it.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } finally {
+            // Stop the executor threads even when closing the session fails.
+            orderedExecutor.shutdown();
+        }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to