Updated Branches: refs/heads/2.0.1-incubating 11ae23adc -> 6e3c9e27b
temp checkin Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86b82ab6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86b82ab6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86b82ab6 Branch: refs/heads/2.0.1-incubating Commit: 86b82ab61d951429121e2f869bc00ff4cede9407 Parents: 97cda39 Author: randgalt <randg...@apache.org> Authored: Mon May 6 11:56:19 2013 -0700 Committer: randgalt <randg...@apache.org> Committed: Mon May 6 11:56:19 2013 -0700 ---------------------------------------------------------------------- .../curator/utils/CloseableExecutorService.java | 28 ++ .../utils/CloseableExecutorServiceBase.java | 124 +++++++ .../utils/CloseableScheduledExecutorService.java | 100 +++++ .../org/apache/curator/utils/FutureContainer.java | 91 +++++ .../utils/TestCloseableExecutorService.java | 282 +++++++++++++++ .../framework/recipes/cache/PathChildrenCache.java | 25 +- .../curator/framework/recipes/locks/Reaper.java | 14 +- 7 files changed, 642 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java new file mode 100644 index 0000000..cf92ef4 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java @@ -0,0 +1,28 @@ +package org.apache.curator.utils; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ExecutorService; + +/** + * Decorates an {@link ExecutorService} such that submitted tasks + * are recorded and can be closed en masse. + */ +public class CloseableExecutorService extends CloseableExecutorServiceBase +{ + private final ListeningExecutorService executorService; + + /** + * @param executorService the service to decorate + */ + public CloseableExecutorService(ExecutorService executorService) + { + this.executorService = MoreExecutors.listeningDecorator(executorService); + } + + @Override + protected ListeningExecutorService getService() + { + return executorService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java new file mode 100644 index 0000000..92371d7 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java @@ -0,0 +1,124 @@ +package org.apache.curator.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.Closeable; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Decorates an {@link ExecutorService} such that submitted tasks + * are recorded and can be closed en masse. + */ +abstract class CloseableExecutorServiceBase implements Closeable +{ + private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap()); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + protected abstract ListeningExecutorService getService(); + + @Override + public void close() + { + isClosed.set(true); + Iterator<Future<?>> iterator = futures.iterator(); + while ( iterator.hasNext() ) + { + Future<?> future = iterator.next(); + iterator.remove(); + future.cancel(true); + } + } + + /** + * @see ExecutorService#isShutdown() + * @return true/false + */ + public boolean isShutdown() + { + return getService().isShutdown(); + } + + /** + * @see ExecutorService#isTerminated() + * @return true/false + */ + public boolean isTerminated() + { + return getService().isTerminated(); + } + + /** + * Calls {@link ExecutorService#submit(Callable)}, records + * and returns the future + * + * @param task task to submit + * @return the future + */ + public <T> Future<T> submit(Callable<T> task) + { + return record(getService().submit(task)); + } + + /** + * Calls {@link ExecutorService#submit(Runnable)}, records + * and returns the future + * + * @param task task to submit + * @return the future + */ + public Future<?> submit(Runnable task) + { + return record(getService().submit(task)); + } + + @VisibleForTesting + int size() + { + return futures.size(); + } + + protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future) + { + if ( isClosed.get() ) + { + future.cancel(true); + } + else + { + futures.add(future); + } + return future; + } + + protected <T> Future<T> record(final ListenableFuture<T> future) + { + Runnable listener = new Runnable() + { + @Override + public void run() + { + futures.remove(future); + } + }; + if ( isClosed.get() ) + { + future.cancel(true); + } + else + { + futures.add(future); + future.addListener(listener, MoreExecutors.sameThreadExecutor()); + } + return future; + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java new file mode 100644 index 0000000..8638ee6 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java @@ -0,0 +1,100 @@ +package org.apache.curator.utils; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Decorates an {@link ExecutorService} such that submitted tasks + * are recorded and can be closed en masse. + */ +public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase +{ + private final ListeningScheduledExecutorService executorService; + + /** + * @param executorService the service to decorate + */ + public CloseableScheduledExecutorService(ScheduledExecutorService executorService) + { + this.executorService = MoreExecutors.listeningDecorator(executorService); + } + + @Override + protected ListeningExecutorService getService() + { + return executorService; + } + + /** + * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records + * and returns the future + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of + * the task and whose <tt>get()</tt> method will return + * <tt>null</tt> upon completion + */ + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) + { + return record(executorService.schedule(command, delay, unit)); + } + + /** + * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records + * and returns the future + * + * @param callable the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing pending completion of + * the task and whose <tt>get()</tt> method will return + * <tt>null</tt> upon completion + */ + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) + { + return record(executorService.schedule(callable, delay, unit)); + } + + /** + * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records + * and returns the future + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param period the period between successive executions + * @param unit the time unit of the initialDelay and period parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose <tt>get()</tt> method will throw an + * exception upon cancellation + */ + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) + { + return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit)); + } + + /** + * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records + * and returns the future + * + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a ScheduledFuture representing pending completion of + * the task, and whose <tt>get()</tt> method will throw an + * exception upon cancellation + */ + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) + { + return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java new file mode 100644 index 0000000..51fe6a4 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java @@ -0,0 +1,91 @@ +package org.apache.curator.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.Closeable; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; + +public class FutureContainer implements Closeable +{ + private final List<Future<?>> futures = Lists.newArrayList(); + private final ExecutorService executorService; + + private class QueueingFuture<T> extends FutureTask<T> + { + private final RunnableFuture<T> task; + + QueueingFuture(RunnableFuture<T> task) + { + super(task, null); + this.task = task; + futures.add(task); + } + + protected void done() + { + futures.remove(task); + } + } + + public FutureContainer(ExecutorService executorService) + { + this.executorService = executorService; + } + + @VisibleForTesting + int size() + { + return futures.size(); + } + + @Override + public void close() + { + Iterator<Future<?>> iterator = futures.iterator(); + while ( iterator.hasNext() ) + { + Future<?> future = iterator.next(); + iterator.remove(); + if ( !future.cancel(true) ) + { + System.err.println("Could not cancel"); + throw new RuntimeException("Could not cancel"); + } + } + } + + /** + * Submits a value-returning task for execution and returns a Future + * representing the pending results of the task. Upon completion, + * this task may be taken or polled. + * + * @param task the task to submit + */ + public<V> void submit(Callable<V> task) + { + FutureTask<V> futureTask = new FutureTask<V>(task); + executorService.execute(new QueueingFuture<V>(futureTask)); + } + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. Upon completion, this task may be + * taken or polled. + * + * @param task the task to submit + */ + public void submit(Runnable task) + { + FutureTask<Void> futureTask = new FutureTask<Void>(task, null); + executorService.execute(new QueueingFuture<Void>(futureTask)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java new file mode 100644 index 0000000..2cd2901 --- /dev/null +++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java @@ -0,0 +1,282 @@ +package org.apache.curator.utils; + +import com.google.common.collect.Lists; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestCloseableExecutorService +{ + private static final int QTY = 10; + + private volatile ExecutorService executorService; + private volatile AtomicInteger count; + + @BeforeMethod + public void setup() + { + executorService = Executors.newFixedThreadPool(QTY * 2); + count = new AtomicInteger(0); + } + + @AfterMethod + public void tearDown() + { + executorService.shutdownNow(); + } + + @Test + public void testBasicRunnable() throws InterruptedException + { + try + { + FutureContainer service = new FutureContainer(executorService); + CountDownLatch startLatch = new CountDownLatch(QTY); + CountDownLatch latch = new CountDownLatch(QTY); + for ( int i = 0; i < QTY; ++i ) + { + submitRunnable(service, startLatch, latch); + } + + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + service.close(); + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + } + catch ( AssertionError e ) + { + throw e; + } + catch ( Throwable e ) + { + e.printStackTrace(); + } + } + + @Test + public void testBasicCallable() throws InterruptedException + { + CloseableExecutorService service = new CloseableExecutorService(executorService); + List<CountDownLatch> latches = Lists.newArrayList(); + for ( int i = 0; i < QTY; ++i ) + { + final CountDownLatch latch = new CountDownLatch(1); + latches.add(latch); + service.submit + ( + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + finally + { + latch.countDown(); + } + return null; + } + } + ); + } + + service.close(); + for ( CountDownLatch latch : latches ) + { + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); + } + } + + @Test + public void testListeningRunnable() throws InterruptedException + { + CloseableExecutorService service = new CloseableExecutorService(executorService); + List<Future<?>> futures = Lists.newArrayList(); + for ( int i = 0; i < QTY; ++i ) + { + Future<?> future = service.submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + } + } + ); + futures.add(future); + } + + Thread.sleep(100); + + for ( Future<?> future : futures ) + { + future.cancel(true); + } + + Assert.assertEquals(service.size(), 0); + } + + @Test + public void testListeningCallable() throws InterruptedException + { + CloseableExecutorService service = new CloseableExecutorService(executorService); + List<Future<?>> futures = Lists.newArrayList(); + for ( int i = 0; i < QTY; ++i ) + { + Future<?> future = service.submit + ( + new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + return null; + } + } + ); + futures.add(future); + } + + Thread.sleep(100); + + for ( Future<?> future : futures ) + { + future.cancel(true); + } + + Assert.assertEquals(service.size(), 0); + } + + @Test + public void testPartialRunnable() throws InterruptedException + { + try + { + final CountDownLatch outsideLatch = new CountDownLatch(1); + executorService.submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + finally + { + outsideLatch.countDown(); + } + } + } + ); + + FutureContainer service = new FutureContainer(executorService); + CountDownLatch startLatch = new CountDownLatch(QTY); + CountDownLatch latch = new CountDownLatch(QTY); + for ( int i = 0; i < QTY; ++i ) + { + submitRunnable(service, startLatch, latch); + } + + while ( service.size() < QTY ) + { + Thread.sleep(100); + } + + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + service.close(); + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + Assert.assertEquals(outsideLatch.getCount(), 1); + } + catch ( AssertionError e ) + { + throw e; + } + catch ( Throwable e ) + { + e.printStackTrace(); + } + finally + { + executorService.shutdownNow(); + } + } + + private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch) + { + try + { + service.submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + startLatch.countDown(); + count.incrementAndGet(); + Thread.sleep(100000); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + catch ( Throwable e ) + { + e.printStackTrace(); + } + finally + { + // count.decrementAndGet(); + latch.countDown(); + } + } + } + ); + } + catch ( Throwable e ) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 9b25001..f42039c 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -32,6 +32,7 @@ import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableExecutorService; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; @@ -70,7 +71,7 @@ public class PathChildrenCache implements Closeable private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; private final String path; - private final ExecutorService executorService; + private final CloseableExecutorService executorService; private final boolean cacheData; private final boolean dataIsCompressed; private final EnsurePath ensurePath; @@ -197,7 +198,7 @@ public class PathChildrenCache implements Closeable this.path = path; this.cacheData = cacheData; this.dataIsCompressed = dataIsCompressed; - this.executorService = executorService; + this.executorService = new CloseableExecutorService(executorService); ensurePath = client.newNamespaceAwareEnsurePath(path); } @@ -261,17 +262,17 @@ public class PathChildrenCache implements Closeable mode = Preconditions.checkNotNull(mode, "mode cannot be null"); client.getConnectionStateListenable().addListener(connectionStateListener); - executorService.execute - ( - new Runnable() + executorService.submit + ( + new Runnable() + { + @Override + public void run() { - @Override - public void run() - { - mainLoop(); - } + mainLoop(); } - ); + } + ); switch ( mode ) { @@ -357,7 +358,7 @@ public class PathChildrenCache implements Closeable Preconditions.checkState(!executorService.isShutdown(), "has not been started"); client.getConnectionStateListenable().removeListener(connectionStateListener); - executorService.shutdownNow(); + executorService.close(); } /** http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java index 11efefd..b540689 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.CloseableScheduledExecutorService; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -42,7 +43,7 @@ public class Reaper implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; - private final ScheduledExecutorService executor; + private final CloseableScheduledExecutorService executor; private final int reapingThresholdMs; private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap()); private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); @@ -127,7 +128,7 @@ public class Reaper implements Closeable public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs) { this.client = client; - this.executor = executor; + this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD; } @@ -181,14 +182,7 @@ public class Reaper implements Closeable { if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { - try - { - executor.shutdownNow(); - } - catch ( Exception e ) - { - log.error("Canceling task", e); - } + executor.close(); } }