[CURATOR-223] Add executorService methods to ServiceCacheBuilder Add executorService methods to ServiceCacheBuilder to allow the caller to specify an ExecutorService or a CloseableExecutorService to be used by the PathChildrenCache embedded in ServiceCacheImpl.
Extracts ExecuteCalledWatchingExecutorService (and DelegatingExecutorService) into the curator-test module for use by TestServiceCache. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6ca77776 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6ca77776 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6ca77776 Branch: refs/heads/CURATOR-3.0 Commit: 6ca77776d3d2c71b1e541c0edd60d2c17efe9c66 Parents: 20e92a5 Author: Tom Dyas <td...@foursquare.com> Authored: Tue Jun 16 17:38:18 2015 -0400 Committer: Tom Dyas <td...@foursquare.com> Committed: Wed Jun 17 13:03:17 2015 -0400 ---------------------------------------------------------------------- .../recipes/cache/TestPathChildrenCache.java | 124 +------------------ .../curator/test/DelegatingExecutorService.java | 119 ++++++++++++++++++ .../ExecuteCalledWatchingExecutorService.java | 48 +++++++ .../x/discovery/ServiceCacheBuilder.java | 24 +++- .../details/ServiceCacheBuilderImpl.java | 39 +++++- .../x/discovery/details/ServiceCacheImpl.java | 17 ++- .../curator/x/discovery/TestServiceCache.java | 53 ++++++++ 7 files changed, 297 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java index b904bdc..216660f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java @@ -31,6 +31,7 @@ import 
org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.KillSession; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; @@ -1039,127 +1040,4 @@ public class TestPathChildrenCache extends BaseClassForTests CloseableUtils.closeQuietly(client); } } - - public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService - { - boolean executeCalled = false; - - public ExecuteCalledWatchingExecutorService(ExecutorService delegate) - { - super(delegate); - } - - @Override - public synchronized void execute(Runnable command) - { - executeCalled = true; - super.execute(command); - } - - public synchronized boolean isExecuteCalled() - { - return executeCalled; - } - - public synchronized void setExecuteCalled(boolean executeCalled) - { - this.executeCalled = executeCalled; - } - } - - public static class DelegatingExecutorService implements ExecutorService - { - private final ExecutorService delegate; - - public DelegatingExecutorService( - ExecutorService delegate - ) - { - this.delegate = delegate; - } - - - @Override - public void shutdown() - { - delegate.shutdown(); - } - - @Override - public List<Runnable> shutdownNow() - { - return delegate.shutdownNow(); - } - - @Override - public boolean isShutdown() - { - return delegate.isShutdown(); - } - - @Override - public boolean isTerminated() - { - return delegate.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException - { - return delegate.awaitTermination(timeout, unit); - } - - @Override - public <T> Future<T> submit(Callable<T> task) - { - return delegate.submit(task); - } - - @Override - public <T> Future<T> submit(Runnable task, T 
result) - { - return delegate.submit(task, result); - } - - @Override - public Future<?> submit(Runnable task) - { - return delegate.submit(task); - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException - { - return delegate.invokeAll(tasks); - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException - { - return delegate.invokeAll(tasks, timeout, unit); - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException - { - return delegate.invokeAny(tasks); - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException - { - return delegate.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) - { - delegate.execute(command); - } - } } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java new file mode 100644 index 0000000..eff34dd --- /dev/null +++ b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.test; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +public class DelegatingExecutorService implements ExecutorService +{ + private final ExecutorService delegate; + + public DelegatingExecutorService( + ExecutorService delegate + ) + { + this.delegate = delegate; + } + + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) + { + return delegate.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) + { + return delegate.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? 
extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) + { + delegate.execute(command); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java new file mode 100644 index 0000000..da7bc66 --- /dev/null +++ b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.test; + +import java.util.concurrent.ExecutorService; + +public class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService +{ + boolean executeCalled = false; + + public ExecuteCalledWatchingExecutorService(ExecutorService delegate) + { + super(delegate); + } + + @Override + public synchronized void execute(Runnable command) + { + executeCalled = true; + super.execute(command); + } + + public synchronized boolean isExecuteCalled() + { + return executeCalled; + } + + public synchronized void setExecuteCalled(boolean executeCalled) + { + this.executeCalled = executeCalled; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java index 10ce305..290d9b1 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.curator.x.discovery; +import org.apache.curator.utils.CloseableExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; public interface ServiceCacheBuilder<T> @@ -38,10 +40,30 @@ public interface ServiceCacheBuilder<T> public ServiceCacheBuilder<T> name(String name); /** - * Optional thread factory to use for the cache's internal thread + * Optional thread factory to use for the cache's internal thread. The specified ThreadFactory + * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder. 
* * @param threadFactory factory * @return this */ public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory); + + /** + * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService + * will be wrapped in a CloseableExecutorService and overrides any prior ThreadFactory or ExecutorService + * set on the ServiceCacheBuilder. + * + * @param executorService executor service + * @return this + */ + public ServiceCacheBuilder<T> executorService(ExecutorService executorService); + + /** + * Optional CloseableExecutorService to use for the cache's background thread. The specified CloseableExecutorService + * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder. + * + * @param executorService an instance of CloseableExecutorService + * @return this + */ + public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService); } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java index c4104f4..8922233 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java @@ -18,8 +18,10 @@ */ package org.apache.curator.x.discovery.details; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.x.discovery.ServiceCache; import org.apache.curator.x.discovery.ServiceCacheBuilder; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; /** @@ -30,6 +32,7 @@ class 
ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> private ServiceDiscoveryImpl<T> discovery; private String name; private ThreadFactory threadFactory; + private CloseableExecutorService executorService; ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery) { @@ -44,7 +47,14 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> @Override public ServiceCache<T> build() { - return new ServiceCacheImpl<T>(discovery, name, threadFactory); + if (executorService != null) + { + return new ServiceCacheImpl<T>(discovery, name, executorService); + } + else + { + return new ServiceCacheImpl<T>(discovery, name, threadFactory); + } } /** @@ -70,6 +80,33 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T> public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; + this.executorService = null; + return this; + } + + /** + * Optional executor service to use for the cache's background thread + * + * @param executorService executor service + * @return this + */ + @Override + public ServiceCacheBuilder<T> executorService(ExecutorService executorService) { + this.executorService = new CloseableExecutorService(executorService); + this.threadFactory = null; + return this; + } + + /** + * Optional CloseableExecutorService to use for the cache's background thread + * + * @param executorService an instance of CloseableExecutorService + * @return this + */ + @Override + public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) { + this.executorService = executorService; + this.threadFactory = null; return this; } } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java 
b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java index 0269d24..b8f39d5 100644 --- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java +++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; @@ -54,15 +56,26 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi STOPPED } + private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory) + { + Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); + return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory)); + } + ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory) { + this(discovery, name, convertThreadFactory(threadFactory)); + } + + ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService) + { Preconditions.checkNotNull(discovery, "discovery cannot be null"); Preconditions.checkNotNull(name, "name cannot be null"); - Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null"); + Preconditions.checkNotNull(executorService, "executorService cannot be null"); this.discovery = discovery; - cache = new 
PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory); + cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService); cache.getListenable().addListener(this); } http://git-wip-us.apache.org/repos/asf/curator/blob/6ca77776/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java ---------------------------------------------------------------------- diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java index be114d4..5850961 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java @@ -20,6 +20,7 @@ package org.apache.curator.x.discovery; import com.google.common.collect.Lists; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.ExecuteCalledWatchingExecutorService; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; @@ -35,6 +36,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -255,4 +257,55 @@ public class TestServiceCache extends BaseClassForTests } } } + + @Test + public void testExecutorServiceIsInvoked() throws Exception { + List<Closeable> closeables = Lists.newArrayList(); + try { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + closeables.add(client); + client.start(); + + ServiceDiscovery<String> discovery = 
ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build(); + closeables.add(discovery); + discovery.start(); + + ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); + Assert.assertFalse(exec.isExecuteCalled()); + + ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").executorService(exec).build(); + closeables.add(cache); + cache.start(); + + final Semaphore semaphore = new Semaphore(0); + ServiceCacheListener listener = new ServiceCacheListener() + { + @Override + public void cacheChanged() + { + semaphore.release(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + cache.addListener(listener); + + ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); + discovery.registerService(instance1); + Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS)); + + Assert.assertTrue(exec.isExecuteCalled()); + } + finally + { + Collections.reverse(closeables); + for ( Closeable c : closeables ) + { + CloseableUtils.closeQuietly(c); + } + } + } }