This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e626e2a Let Proxy handle ZK session expires without restarting (#1583) e626e2a is described below commit e626e2ac26634b989db344a23f066bf34774363d Author: Matteo Merli <mme...@apache.org> AuthorDate: Sun Apr 15 23:21:10 2018 -0700 Let Proxy handle ZK session expires without restarting (#1583) --- .../bookkeeper/zookeeper/BkZooKeeperClient.java | 1365 ++++++++++++++++++++ .../apache/bookkeeper/zookeeper/BkZooWorker.java | 162 +++ .../apache/pulsar/proxy/server/ProxyService.java | 30 +- .../util/BkReadOnlyZookeeperClientFactoryImpl.java | 74 ++ .../proxy/server/util/ZookeeperCacheLoader.java | 35 +- 5 files changed, 1622 insertions(+), 44 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java new file mode 100644 index 0000000..be60294 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java @@ -0,0 +1,1365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.zookeeper; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.BkZooWorker.ZooCallable; +import org.apache.zookeeper.AsyncCallback.ACLCallback; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback.MultiCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import 
org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a ZooKeeper client that handles session expiry. + * + * Note: this is copied from the BK ZooKeeperClient class. The only addition here is that it allows specifying read-only mode, + * which we use when connecting to global ZK. + * + * TODO: Remove this class once BK-4.8 is released to include read-only in ZooKeeperClient. + */ +public class BkZooKeeperClient extends ZooKeeper implements Watcher { + + private static final Logger logger = LoggerFactory.getLogger(BkZooKeeperClient.class); + + private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1; + + // ZooKeeper client connection variables + private final String connectString; + private final int sessionTimeoutMs; + private final boolean allowReadOnlyMode; + + // state for the zookeeper client + // zk holds the handle created by a reconnect; while it is unset, calls go to the superclass connection + private final AtomicReference<ZooKeeper> zk = new AtomicReference<ZooKeeper>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ZooKeeperWatcherBase watcherManager; + + // retryExecutor reschedules failed operations; connectExecutor runs the reconnect task + private final ScheduledExecutorService retryExecutor; + private final ExecutorService connectExecutor; + + // rate limiter (null when the configured rate is not positive) + private final RateLimiter rateLimiter; + + // retry policies + private final RetryPolicy connectRetryPolicy; + private final RetryPolicy operationRetryPolicy; + + // Stats Logger + private final OpStatsLogger createStats; + private final OpStatsLogger getStats; + private final OpStatsLogger setStats; + private final OpStatsLogger deleteStats; + private final OpStatsLogger getChildrenStats; + private final OpStatsLogger existsStats; + private final OpStatsLogger multiStats; + private final OpStatsLogger getACLStats; + private final OpStatsLogger setACLStats; + private final OpStatsLogger syncStats; + private final OpStatsLogger createClientStats; + + // reconnect task submitted to connectExecutor when the session expires + private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() { + + @Override + public ZooKeeper call() throws Exception { + try { +
return BkZooWorker.syncCallWithRetries(null, new ZooCallable<ZooKeeper>() { + + @Override + public ZooKeeper call() throws KeeperException, InterruptedException { + logger.info("Reconnecting zookeeper {}.", connectString); + // close the previous one + closeZkHandle(); + ZooKeeper newZk; + try { + newZk = createZooKeeper(); + } catch (IOException ie) { + logger.error("Failed to create zookeeper instance to " + connectString, ie); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } + waitForConnection(); + zk.set(newZk); + logger.info("ZooKeeper session {} is created to {}.", + Long.toHexString(newZk.getSessionId()), connectString); + return newZk; + } + + @Override + public String toString() { + return String.format("ZooKeeper Client Creator (%s)", connectString); + } + + }, connectRetryPolicy, rateLimiter, createClientStats); + } catch (Exception e) { + logger.error("Gave up reconnecting to ZooKeeper : ", e); + Runtime.getRuntime().exit(-1); + return null; + } + } + + }; + + @VisibleForTesting + static BkZooKeeperClient createConnectedZooKeeperClient( + String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers, + RetryPolicy operationRetryPolicy) + throws KeeperException, InterruptedException, IOException { + return BkZooKeeperClient.newBuilder() + .connectString(connectString) + .sessionTimeoutMs(sessionTimeoutMs) + .watchers(childWatchers) + .operationRetryPolicy(operationRetryPolicy) + .build(); + } + + /** + * A builder to build retryable zookeeper client. 
+ */ + public static class Builder { + String connectString = null; + int sessionTimeoutMs = 10000; + Set<Watcher> watchers = null; + RetryPolicy connectRetryPolicy = null; + RetryPolicy operationRetryPolicy = null; + StatsLogger statsLogger = NullStatsLogger.INSTANCE; + int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT; + double requestRateLimit = 0; + boolean allowReadOnlyMode = false; + + private Builder() {} + + public Builder connectString(String connectString) { + this.connectString = connectString; + return this; + } + + public Builder sessionTimeoutMs(int sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + return this; + } + + public Builder watchers(Set<Watcher> watchers) { + this.watchers = watchers; + return this; + } + + public Builder connectRetryPolicy(RetryPolicy retryPolicy) { + this.connectRetryPolicy = retryPolicy; + return this; + } + + public Builder operationRetryPolicy(RetryPolicy retryPolicy) { + this.operationRetryPolicy = retryPolicy; + return this; + } + + public Builder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public Builder requestRateLimit(double requestRateLimit) { + this.requestRateLimit = requestRateLimit; + return this; + } + + public Builder retryThreadCount(int numThreads) { + this.retryExecThreadCount = numThreads; + return this; + } + + public Builder allowReadOnlyMode(boolean allowReadOnlyMode) { + this.allowReadOnlyMode = allowReadOnlyMode; + return this; + } + + public BkZooKeeperClient build() throws IOException, KeeperException, InterruptedException { + checkNotNull(connectString); + checkArgument(sessionTimeoutMs > 0); + checkNotNull(statsLogger); + checkArgument(retryExecThreadCount > 0); + + if (null == connectRetryPolicy) { + connectRetryPolicy = + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE); + } + if (null == operationRetryPolicy) { + operationRetryPolicy = + new 
BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0); + } + + // Create a watcher manager + StatsLogger watcherStatsLogger = statsLogger.scope("watcher"); + ZooKeeperWatcherBase watcherManager = + null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) : + new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger); + BkZooKeeperClient client = new BkZooKeeperClient( + connectString, + sessionTimeoutMs, + watcherManager, + connectRetryPolicy, + operationRetryPolicy, + statsLogger, + retryExecThreadCount, + requestRateLimit, + allowReadOnlyMode + ); + // Wait for connection to be established. + try { + watcherManager.waitForConnection(); + } catch (KeeperException ke) { + client.close(); + throw ke; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + client.close(); + throw ie; + } + return client; + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + BkZooKeeperClient(String connectString, + int sessionTimeoutMs, + ZooKeeperWatcherBase watcherManager, + RetryPolicy connectRetryPolicy, + RetryPolicy operationRetryPolicy, + StatsLogger statsLogger, + int retryExecThreadCount, + double rate, + boolean allowReadOnlyMode) throws IOException { + super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + this.connectString = connectString; + this.sessionTimeoutMs = sessionTimeoutMs; + this.allowReadOnlyMode = allowReadOnlyMode; + this.watcherManager = watcherManager; + this.connectRetryPolicy = connectRetryPolicy; + this.operationRetryPolicy = operationRetryPolicy; + this.rateLimiter = rate > 0 ? 
RateLimiter.create(rate) : null; + this.retryExecutor = + Executors.newScheduledThreadPool(retryExecThreadCount, + new ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build()); + this.connectExecutor = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build()); + // added itself to the watcher + watcherManager.addChildWatcher(this); + + // Stats + StatsLogger scopedStatsLogger = statsLogger.scope("zk"); + createClientStats = scopedStatsLogger.getOpStatsLogger("create_client"); + createStats = scopedStatsLogger.getOpStatsLogger("create"); + getStats = scopedStatsLogger.getOpStatsLogger("get_data"); + setStats = scopedStatsLogger.getOpStatsLogger("set_data"); + deleteStats = scopedStatsLogger.getOpStatsLogger("delete"); + getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children"); + existsStats = scopedStatsLogger.getOpStatsLogger("exists"); + multiStats = scopedStatsLogger.getOpStatsLogger("multi"); + getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl"); + setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl"); + syncStats = scopedStatsLogger.getOpStatsLogger("sync"); + } + + @Override + public void close() throws InterruptedException { + closed.set(true); + connectExecutor.shutdown(); + retryExecutor.shutdown(); + closeZkHandle(); + } + + private void closeZkHandle() throws InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.close(); + } else { + zkHandle.close(); + } + } + + protected void waitForConnection() throws KeeperException, InterruptedException { + watcherManager.waitForConnection(); + } + + protected ZooKeeper createZooKeeper() throws IOException { + return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + } + + @Override + public void process(WatchedEvent event) { + if (event.getType() == EventType.None + && event.getState() == KeeperState.Expired) { + onExpired(); + } + } + + private void 
onExpired() { + if (closed.get()) { + // we don't schedule any tries if the client is closed. + return; + } + + logger.info("ZooKeeper session {} is expired from {}.", + Long.toHexString(getSessionId()), connectString); + try { + connectExecutor.submit(clientCreator); + } catch (RejectedExecutionException ree) { + if (!closed.get()) { + logger.error("ZooKeeper reconnect task is rejected : ", ree); + } + } catch (Exception t) { + logger.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t); + } + } + + /** + * A runnable that retries zookeeper operations. + */ + abstract static class ZkRetryRunnable implements Runnable { + + final BkZooWorker worker; + final RateLimiter rateLimiter; + final Runnable that; + + ZkRetryRunnable(RetryPolicy retryPolicy, + RateLimiter rateLimiter, + OpStatsLogger statsLogger) { + this.worker = new BkZooWorker(retryPolicy, statsLogger); + this.rateLimiter = rateLimiter; + that = this; + } + + @Override + public void run() { + if (null != rateLimiter) { + rateLimiter.acquire(); + } + zkRun(); + } + + abstract void zkRun(); + } + + // inherits from ZooKeeper client for all operations + + @Override + public long getSessionId() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionId(); + } + return zkHandle.getSessionId(); + } + + @Override + public byte[] getSessionPasswd() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionPasswd(); + } + return zkHandle.getSessionPasswd(); + } + + @Override + public int getSessionTimeout() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.getSessionTimeout(); + } + return zkHandle.getSessionTimeout(); + } + + @Override + public void addAuthInfo(String scheme, byte[] auth) { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + super.addAuthInfo(scheme, auth); + return; + } + zkHandle.addAuthInfo(scheme, auth); + } + + private void backOffAndRetry(Runnable r, long 
nextRetryWaitTimeMs) { + try { + retryExecutor.schedule(r, nextRetryWaitTimeMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ree) { + if (!closed.get()) { + logger.error("ZooKeeper Operation {} is rejected : ", r, ree); + } + } + } + + private boolean allowRetry(BkZooWorker worker, int rc) { + return worker.allowRetry(rc) && !closed.get(); + } + + @Override + public synchronized void register(Watcher watcher) { + watcherManager.addChildWatcher(watcher); + } + + @Override + public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<OpResult>>() { + + @Override + public String toString() { + return "multi"; + } + + @Override + public List<OpResult> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.multi(ops); + } + return zkHandle.multi(ops); + } + + }, operationRetryPolicy, rateLimiter, multiStats); + } + + @Override + public void multi(final Iterable<Op> ops, + final MultiCallback cb, + final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) { + + final MultiCallback multiCb = new MultiCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, List<OpResult> results) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, results); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.multi(ops, multiCb, worker); + } else { + zkHandle.multi(ops, multiCb, worker); + } + } + + @Override + public String toString() { + return "multi"; + } + }; + // execute it immediately + proc.run(); + } + + @Override + @Deprecated + public Transaction transaction() { + // since there 
is no reference about which client that the transaction could use + // so just use ZooKeeper instance directly. + // you'd better to use {@link #multi}. + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return super.transaction(); + } + return zkHandle.transaction(); + } + + @Override + public List<ACL> getACL(final String path, final Stat stat) throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<ACL>>() { + + @Override + public String toString() { + return String.format("getACL (%s, stat = %s)", path, stat); + } + + @Override + public List<ACL> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getACL(path, stat); + } + return zkHandle.getACL(path, stat); + } + + }, operationRetryPolicy, rateLimiter, getACLStats); + } + + @Override + public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) { + + final ACLCallback aclCb = new ACLCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, acl, stat); + } + } + + }; + + @Override + public String toString() { + return String.format("getACL (%s, stat = %s)", path, stat); + } + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getACL(path, stat, aclCb, worker); + } else { + zkHandle.getACL(path, stat, aclCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setACL(final String path, final List<ACL> acl, final int version) + throws KeeperException, InterruptedException { + return 
BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() { + + @Override + public String toString() { + return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version); + } + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.setACL(path, acl, version); + } + return zkHandle.setACL(path, acl, version); + } + + }, operationRetryPolicy, rateLimiter, setACLStats); + } + + @Override + public void setACL(final String path, final List<ACL> acl, final int version, + final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + public String toString() { + return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version); + } + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.setACL(path, acl, version, stCb, worker); + } else { + zkHandle.setACL(path, acl, version, stCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void sync(final String path, final VoidCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) { + + final VoidCallback vCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context); + } + } + 
+ }; + + @Override + public String toString() { + return String.format("sync (%s)", path); + } + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.sync(path, vCb, worker); + } else { + zkHandle.sync(path, vCb, worker); + } + } + }; + // execute it immediately + proc.run(); + } + + @Override + public States getState() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getState(); + } else { + return zkHandle.getState(); + } + } + + @Override + public String toString() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.toString(); + } else { + return zkHandle.toString(); + } + } + + @Override + public String create(final String path, final byte[] data, + final List<ACL> acl, final CreateMode createMode) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<String>() { + + @Override + public String call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.create(path, data, acl, createMode); + } + return zkHandle.create(path, data, acl, createMode); + } + + @Override + public String toString() { + return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode); + } + + }, operationRetryPolicy, rateLimiter, createStats); + } + + @Override + public void create(final String path, final byte[] data, final List<ACL> acl, + final CreateMode createMode, final StringCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) { + + final StringCallback createCb = new StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } 
else { + cb.processResult(rc, path, context, name); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker); + } else { + zkHandle.create(path, data, acl, createMode, createCb, worker); + } + } + + @Override + public String toString() { + return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void delete(final String path, final int version) throws KeeperException, InterruptedException { + BkZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() { + + @Override + public Void call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.delete(path, version); + } else { + zkHandle.delete(path, version); + } + return null; + } + + @Override + public String toString() { + return String.format("delete (%s, version = %d)", path, version); + } + + }, operationRetryPolicy, rateLimiter, deleteStats); + } + + @Override + public void delete(final String path, final int version, final VoidCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) { + + final VoidCallback deleteCb = new VoidCallback() { + + @Override + public void processResult(int rc, String path, Object ctx) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.delete(path, version, deleteCb, worker); + } else { + zkHandle.delete(path, version, deleteCb, worker); + } + } + + @Override + public String toString() { + return String.format("delete (%s, version = %d)", path, version); 
+ } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.exists(path, watcher); + } + return zkHandle.exists(path, watcher); + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, existsStats); + } + + @Override + public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.exists(path, watch); + } + return zkHandle.exists(path, watch); + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, existsStats); + } + + @Override + public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.exists(path, watcher, stCb, worker); + } else 
{ + zkHandle.exists(path, watcher, stCb, worker); + } + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.exists(path, watch, stCb, worker); + } else { + zkHandle.exists(path, watch, stCb, worker); + } + } + + @Override + public String toString() { + return String.format("exists (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public byte[] getData(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() { + + @Override + public byte[] call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getData(path, watcher, stat); + } + return zkHandle.getData(path, watcher, stat); + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getStats); + } + + @Override + public byte[] getData(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return 
BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() { + + @Override + public byte[] call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getData(path, watch, stat); + } + return zkHandle.getData(path, watch, stat); + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getStats); + } + + @Override + public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, data, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getData(path, watcher, dataCb, worker); + } else { + zkHandle.getData(path, watcher, dataCb, worker); + } + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) { + + final DataCallback dataCb = new DataCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, 
context, data, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getData(path, watch, dataCb, worker); + } else { + zkHandle.getData(path, watch, dataCb, worker); + } + } + + @Override + public String toString() { + return String.format("getData (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public Stat setData(final String path, final byte[] data, final int version) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() { + + @Override + public Stat call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.setData(path, data, version); + } + return zkHandle.setData(path, data, version); + } + + @Override + public String toString() { + return String.format("setData (%s, version = %d)", path, version); + } + + }, operationRetryPolicy, rateLimiter, setStats); + } + + @Override + public void setData(final String path, final byte[] data, final int version, + final StatCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) { + + final StatCallback stCb = new StatCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.setData(path, data, version, stCb, worker); + } else { + zkHandle.setData(path, data, version, stCb, worker); + } + } + + @Override + public String toString() { + return String.format("setData (%s, version = %d)", path, version); + } + }; + 
// execute it immediately + proc.run(); + } + + @Override + public List<String> getChildren(final String path, final Watcher watcher, final Stat stat) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() { + + @Override + public List<String> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getChildren(path, watcher, stat); + } + return zkHandle.getChildren(path, watcher, stat); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public List<String> getChildren(final String path, final boolean watch, final Stat stat) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() { + + @Override + public List<String> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getChildren(path, watch, stat); + } + return zkHandle.getChildren(path, watch, stat); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final Children2Callback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List<String> children, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + 
cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getChildren(final String path, final boolean watch, final Children2Callback cb, + final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final Children2Callback childCb = new Children2Callback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List<String> children, Stat stat) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children, stat); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getChildren(path, watch, childCb, worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } + + + @Override + public List<String> getChildren(final String path, final Watcher watcher) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() { + + @Override + public List<String> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getChildren(path, watcher); + } + return zkHandle.getChildren(path, watcher); + } + + @Override + 
public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public List<String> getChildren(final String path, final boolean watch) + throws KeeperException, InterruptedException { + return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() { + + @Override + public List<String> call() throws KeeperException, InterruptedException { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + return BkZooKeeperClient.super.getChildren(path, watch); + } + return zkHandle.getChildren(path, watch); + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + + }, operationRetryPolicy, rateLimiter, getChildrenStats); + } + + @Override + public void getChildren(final String path, final Watcher watcher, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List<String> children) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getChildren(path, watcher, childCb, worker); + } else { + zkHandle.getChildren(path, watcher, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watcher); + } + }; + // execute it immediately + proc.run(); + } + + @Override + public void getChildren(final String path, final boolean watch, + final ChildrenCallback cb, final Object context) { + final Runnable proc = new 
ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) { + + final ChildrenCallback childCb = new ChildrenCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, + List<String> children) { + BkZooWorker worker = (BkZooWorker) ctx; + if (allowRetry(worker, rc)) { + backOffAndRetry(that, worker.nextRetryWaitTime()); + } else { + cb.processResult(rc, path, context, children); + } + } + + }; + + @Override + void zkRun() { + ZooKeeper zkHandle = zk.get(); + if (null == zkHandle) { + BkZooKeeperClient.super.getChildren(path, watch, childCb, worker); + } else { + zkHandle.getChildren(path, watch, childCb, worker); + } + } + + @Override + public String toString() { + return String.format("getChildren (%s, watcher = %s)", path, watch); + } + }; + // execute it immediately + proc.run(); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java new file mode 100644 index 0000000..634a61e --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.zookeeper; + +import com.google.common.util.concurrent.RateLimiter; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.util.MathUtils; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provide a mechanism to perform an operation on ZooKeeper that is safe on disconnections + * and recoverable errors. + * + * TODO: Remove this class once BK-4.8 is released to include read-only in ZooKeeperClient. + */ +class BkZooWorker { + + private static final Logger logger = LoggerFactory.getLogger(BkZooWorker.class); + + int attempts = 0; + long startTimeNanos; + long elapsedTimeMs = 0L; + final RetryPolicy retryPolicy; + final OpStatsLogger statsLogger; + + BkZooWorker(RetryPolicy retryPolicy, OpStatsLogger statsLogger) { + this.retryPolicy = retryPolicy; + this.statsLogger = statsLogger; + this.startTimeNanos = MathUtils.nowInNano(); + } + + public boolean allowRetry(int rc) { + elapsedTimeMs = MathUtils.elapsedMSec(startTimeNanos); + if (!BkZooWorker.isRecoverableException(rc)) { + if (KeeperException.Code.OK.intValue() == rc) { + statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } else { + statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } + return false; + } + ++attempts; + return retryPolicy.allowRetry(attempts, elapsedTimeMs); + } + + public long nextRetryWaitTime() { + return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs); + } + + /** + * Check whether the given result code is recoverable by retry. + * + * @param rc result code + * @return true if given result code is recoverable. 
+ */ + public static boolean isRecoverableException(int rc) { + return KeeperException.Code.CONNECTIONLOSS.intValue() == rc + || KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc + || KeeperException.Code.SESSIONMOVED.intValue() == rc + || KeeperException.Code.SESSIONEXPIRED.intValue() == rc; + } + + /** + * Check whether the given exception is recoverable by retry. + * + * @param exception given exception + * @return true if given exception is recoverable. + */ + public static boolean isRecoverableException(KeeperException exception) { + return isRecoverableException(exception.code().intValue()); + } + + interface ZooCallable<T> { + /** + * Be compatible with ZooKeeper interface. + * + * @return value + * @throws InterruptedException + * @throws KeeperException + */ + T call() throws InterruptedException, KeeperException; + } + + /** + * Execute a sync zookeeper operation with a given retry policy. + * + * @param client + * ZooKeeper client. + * @param proc + * Synchronous zookeeper operation wrapped in a {@link Callable}. + * @param retryPolicy + * Retry policy to execute the synchronous operation. + * @param rateLimiter + * Rate limiter for zookeeper calls + * @param statsLogger + * Stats Logger for zookeeper client. + * @return result of the zookeeper operation + * @throws KeeperException any non-recoverable exception or recoverable exception exhausted all retires. + * @throws InterruptedException the operation is interrupted. 
+ */ + public static<T> T syncCallWithRetries(BkZooKeeperClient client, + ZooCallable<T> proc, + RetryPolicy retryPolicy, + RateLimiter rateLimiter, + OpStatsLogger statsLogger) + throws KeeperException, InterruptedException { + T result = null; + boolean isDone = false; + int attempts = 0; + long startTimeNanos = MathUtils.nowInNano(); + while (!isDone) { + try { + if (null != client) { + client.waitForConnection(); + } + logger.debug("Execute {} at {} retry attempt.", proc, attempts); + if (null != rateLimiter) { + rateLimiter.acquire(); + } + result = proc.call(); + isDone = true; + statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + } catch (KeeperException e) { + ++attempts; + boolean rethrow = true; + long elapsedTime = MathUtils.elapsedMSec(startTimeNanos); + if (((null != client && isRecoverableException(e)) || null == client) + && retryPolicy.allowRetry(attempts, elapsedTime)) { + rethrow = false; + } + if (rethrow) { + statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS); + logger.debug("Stopped executing {} after {} attempts.", proc, attempts); + throw e; + } + TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, elapsedTime)); + } + } + return result; + } + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index c77ace2..e798077 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -21,6 +21,13 @@ package org.apache.pulsar.proxy.server; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; 
+import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; @@ -39,20 +46,11 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; -import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; -import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Pulsar proxy service */ @@ -78,8 +76,6 @@ public class ProxyService implements Closeable { private BrokerDiscoveryProvider discoveryProvider; - private LocalZooKeeperConnectionService localZooKeeperConnectionService; - protected final AtomicReference<Semaphore> lookupRequestSemaphore; private static final int numThreads = Runtime.getRuntime().availableProcessors(); @@ -124,15 +120,6 @@ public class ProxyService implements Closeable { authenticationService = new AuthenticationService(serviceConfiguration); if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) { - localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), - proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs()); - localZooKeeperConnectionService.start(new ShutdownService() { - @Override - public void 
shutdown(int exitCode) { - LOG.error("Lost local ZK session. Shutting down the proxy"); - Runtime.getRuntime().halt(-1); - } - }); discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory()); this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService); @@ -182,9 +169,6 @@ public class ProxyService implements Closeable { } public void close() throws IOException { - if (localZooKeeperConnectionService != null) { - localZooKeeperConnectionService.close(); - } if (discoveryProvider != null) { discoveryProvider.close(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java new file mode 100644 index 0000000..69452fc --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server.util; + +import static org.apache.bookkeeper.util.SafeRunnable.safeRun; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.zookeeper.BkZooKeeperClient; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; + +@Slf4j +public class BkReadOnlyZookeeperClientFactoryImpl implements ZooKeeperClientFactory { + + private final OrderedExecutor executor; + + public BkReadOnlyZookeeperClientFactoryImpl(OrderedExecutor executor) { + this.executor = executor; + } + + @Override + public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) { + CompletableFuture<ZooKeeper> future = new CompletableFuture<>(); + + executor.execute(safeRun(() -> { + try { + ZooKeeper zk = BkZooKeeperClient.newBuilder().connectString(serverList) + .sessionTimeoutMs(zkSessionTimeoutMillis) + .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(zkSessionTimeoutMillis, + zkSessionTimeoutMillis, 0)) + .allowReadOnlyMode(sessionType == SessionType.AllowReadOnly).build(); + + if (zk.getState() == States.CONNECTEDREADONLY && sessionType != SessionType.AllowReadOnly) { + future.completeExceptionally(new IllegalStateException("Cannot use a read-only session")); + } + + log.info("ZooKeeper session established: {}", zk); + future.complete(zk); + } catch (IOException | KeeperException | InterruptedException exception) { + log.error("Failed to establish ZooKeeper session: {}", exception.getMessage()); + future.completeExceptionally(exception); + } + }, throwable -> { + future.completeExceptionally(throwable); + })); + + return future; + } + +} 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java index 7822347..887c5ea 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java @@ -19,6 +19,7 @@ package org.apache.pulsar.proxy.server.util; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -28,12 +29,13 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.apache.pulsar.zookeeper.LocalZooKeeperCache; -import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; +import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +45,11 @@ import org.slf4j.LoggerFactory; */ public class ZookeeperCacheLoader implements Closeable { + private final ZooKeeper zkClient; private final ZooKeeperCache localZkCache; - private final LocalZooKeeperConnectionService localZkConnectionSvc; - private final ZooKeeperDataCache<LoadManagerReport> brokerInfo; private final ZooKeeperChildrenCache availableBrokersCache; - private volatile Set<String> availableBrokersSet; private volatile List<LoadManagerReport> availableBrokers; private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) 
@@ -63,22 +63,9 @@ public class ZookeeperCacheLoader implements Closeable { * @param zookeeperServers * @throws Exception */ - public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zookeeperServers, - int zookeeperSessionTimeoutMs) throws Exception { - localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers, - zookeeperSessionTimeoutMs); - localZkConnectionSvc.start(exitCode -> { - log.error("Shutting down ZK sessions: {}", exitCode); - }); - - this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor); - localZkConnectionSvc.start(exitCode -> { - try { - localZkCache.getZooKeeper().close(); - } catch (InterruptedException e) { - log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e); - } - }); + public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception { + this.zkClient = factory.create(zookeeperServers, SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get(); + this.localZkCache = new LocalZooKeeperCache(zkClient, this.orderedExecutor); this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) { @Override @@ -113,7 +100,13 @@ public class ZookeeperCacheLoader implements Closeable { } @Override - public void close() { + public void close() throws IOException { + try { + zkClient.close(); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new IOException(e); + } orderedExecutor.shutdown(); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.