Repository: hadoop Updated Branches: refs/heads/branch-2 a942d968c -> b77239b46
Revert "HDFS-11114. Support for running async disk checks in DataNode." This reverts commit 732eaadddbe9a2682bf96ba06ce5e7ebfa58ba87. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b77239b4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b77239b4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b77239b4 Branch: refs/heads/branch-2 Commit: b77239b46d5918b3a3f4f7032e6c20a2b4fcbe56 Parents: a942d96 Author: Arpit Agarwal <a...@apache.org> Authored: Tue Nov 8 09:04:58 2016 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Tue Nov 8 09:04:58 2016 -0800 ---------------------------------------------------------------------- .../server/datanode/checker/AsyncChecker.java | 63 ----- .../hdfs/server/datanode/checker/Checkable.java | 49 ---- .../datanode/checker/ThrottledAsyncChecker.java | 224 --------------- .../server/datanode/checker/package-info.java | 26 -- .../checker/TestThrottledAsyncChecker.java | 276 ------------------- 5 files changed, 638 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java deleted file mode 100644 index 1d534a3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.checker; - -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A class that can be used to schedule an asynchronous check on a given - * {@link Checkable}. If the check is successfully scheduled then a - * {@link ListenableFuture} is returned. - * - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public interface AsyncChecker<K, V> { - - /** - * Schedule an asynchronous check for the given object. - * - * @param target object to be checked. - * - * @param context the interpretation of the context depends on the - * target. - * - * @return returns a {@link ListenableFuture} that can be used to - * retrieve the result of the asynchronous check. - */ - ListenableFuture<V> schedule(Checkable<K, V> target, K context); - - /** - * Cancel all executing checks and wait for them to complete. - * First attempts a graceful cancellation, then cancels forcefully. - * Waits for the supplied timeout after both attempts. - * - * See {@link ExecutorService#awaitTermination} for a description of - * the parameters. - * - * @throws InterruptedException - */ - void shutdownAndWait(long timeout, TimeUnit timeUnit) - throws InterruptedException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java deleted file mode 100644 index 833ebda..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.checker; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - - -/** - * A Checkable is an object whose health can be probed by invoking its - * {@link #check} method. - * - * e.g. a {@link Checkable} instance may represent a single hardware - * resource. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public interface Checkable<K, V> { - - /** - * Query the health of this object. This method may hang - * indefinitely depending on the status of the target resource. - * - * @param context for the probe operation. May be null depending - * on the implementation. - * - * @return result of the check operation. - * - * @throws Exception encountered during the check operation. An - * exception indicates that the check failed. - */ - V check(K context) throws Exception; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java deleted file mode 100644 index d0ee3d2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.checker; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.util.Timer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * An implementation of {@link AsyncChecker} that skips checking recently - * checked objects. It will enforce at least {@link minMsBetweenChecks} - * milliseconds between two successive checks of any one object. - * - * It is assumed that the total number of Checkable objects in the system - * is small, (not more than a few dozen) since the checker uses O(Checkables) - * storage and also potentially O(Checkables) threads. - * - * {@link minMsBetweenChecks} should be configured reasonably - * by the caller to avoid spinning up too many threads frequently. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> { - public static final Logger LOG = - LoggerFactory.getLogger(ThrottledAsyncChecker.class); - - private final Timer timer; - - /** - * The ExecutorService used to schedule asynchronous checks. - */ - private final ListeningExecutorService executorService; - - /** - * The minimum gap in milliseconds between two successive checks - * of the same object. This is the throttle. - */ - private final long minMsBetweenChecks; - - /** - * Map of checks that are currently in progress. Protected by the object - * lock. - */ - private final Map<Checkable, ListenableFuture<V>> checksInProgress; - - /** - * Maps Checkable objects to a future that can be used to retrieve - * the results of the operation. - * Protected by the object lock. - */ - private final Map<Checkable, LastCheckResult<V>> completedChecks; - - ThrottledAsyncChecker(final Timer timer, - final long minMsBetweenChecks, - final ExecutorService executorService) { - this.timer = timer; - this.minMsBetweenChecks = minMsBetweenChecks; - this.executorService = MoreExecutors.listeningDecorator(executorService); - this.checksInProgress = new HashMap<>(); - this.completedChecks = new WeakHashMap<>(); - } - - /** - * See {@link AsyncChecker#schedule} - * - * If the object has been checked recently then the check will - * be skipped. Multiple concurrent checks for the same object - * will receive the same Future. - */ - @Override - public synchronized ListenableFuture<V> schedule( - final Checkable<K, V> target, - final K context) { - LOG.debug("Scheduling a check of {}", target); - - if (checksInProgress.containsKey(target)) { - return checksInProgress.get(target); - } - - if (completedChecks.containsKey(target)) { - final LastCheckResult<V> result = completedChecks.get(target); - final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; - if (msSinceLastCheck < minMsBetweenChecks) { - LOG.debug("Skipped checking {}. Time since last check {}ms " + - "is less than the min gap {}ms.", - target, msSinceLastCheck, minMsBetweenChecks); - return result.result != null ? - Futures.immediateFuture(result.result) : - Futures.immediateFailedFuture(result.exception); - } - } - - final ListenableFuture<V> lf = executorService.submit( - new Callable<V>() { - @Override - public V call() throws Exception { - return target.check(context); - } - }); - checksInProgress.put(target, lf); - addResultCachingCallback(target, lf); - return lf; - } - - /** - * Register a callback to cache the result of a check. - * @param target - * @param lf - */ - private void addResultCachingCallback( - Checkable<K, V> target, ListenableFuture<V> lf) { - Futures.addCallback(lf, new FutureCallback<V>() { - @Override - public void onSuccess(@Nullable V result) { - synchronized (ThrottledAsyncChecker.this) { - checksInProgress.remove(target); - completedChecks.put(target, new LastCheckResult<>( - result, timer.monotonicNow())); - } - } - - @Override - public void onFailure(@Nonnull Throwable t) { - synchronized (ThrottledAsyncChecker.this) { - checksInProgress.remove(target); - completedChecks.put(target, new LastCheckResult<>( - t, timer.monotonicNow())); - } - } - }); - } - - /** - * {@inheritDoc}. - */ - @Override - public void shutdownAndWait(long timeout, TimeUnit timeUnit) - throws InterruptedException { - // Try orderly shutdown. - executorService.shutdown(); - - if (!executorService.awaitTermination(timeout, timeUnit)) { - // Interrupt executing tasks and wait again. - executorService.shutdownNow(); - executorService.awaitTermination(timeout, timeUnit); - } - } - - /** - * Status of running a check. It can either be a result or an - * exception, depending on whether the check completed or threw. - */ - private static final class LastCheckResult<V> { - /** - * Timestamp at which the check completed. - */ - private final long completedAt; - - /** - * Result of running the check if it completed. null if it threw. - */ - @Nullable - private final V result; - - /** - * Exception thrown by the check. null if it returned a result. - */ - private final Throwable exception; // null on success. - - /** - * Initialize with a result. - * @param result - */ - private LastCheckResult(V result, long completedAt) { - this.result = result; - this.exception = null; - this.completedAt = completedAt; - } - - /** - * Initialize with an exception. - * @param completedAt - * @param t - */ - private LastCheckResult(Throwable t, long completedAt) { - this.result = null; - this.exception = t; - this.completedAt = completedAt; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java deleted file mode 100644 index 52822e9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Datanode support for running disk checks. - */ -@InterfaceAudience.LimitedPrivate({"HDFS"}) -@InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.datanode.checker; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java deleted file mode 100644 index 70795ca..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.checker; - -import com.google.common.base.Supplier; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.FakeTimer; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.core.Is.isA; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Verify functionality of {@link ThrottledAsyncChecker}. - */ -public class TestThrottledAsyncChecker { - public static final Logger LOG = - LoggerFactory.getLogger(TestThrottledAsyncChecker.class); - private static final long MIN_ERROR_CHECK_GAP = 1000; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - /** - * Test various scheduling combinations to ensure scheduling and - * throttling behave as expected. - */ - @Test(timeout=60000) - public void testScheduler() throws Exception { - final NoOpCheckable target1 = new NoOpCheckable(); - final NoOpCheckable target2 = new NoOpCheckable(); - final FakeTimer timer = new FakeTimer(); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, - getExecutorService()); - - // check target1 and ensure we get back the expected result. - assertTrue(checker.schedule(target1, true).get()); - assertThat(target1.numChecks.get(), is(1L)); - - // Check target1 again without advancing the timer. target1 should not - // be checked again and the cached result should be returned. - assertTrue(checker.schedule(target1, true).get()); - assertThat(target1.numChecks.get(), is(1L)); - - // Schedule target2 scheduled without advancing the timer. - // target2 should be checked as it has never been checked before. - assertTrue(checker.schedule(target2, true).get()); - assertThat(target2.numChecks.get(), is(1L)); - - // Advance the timer but just short of the min gap. - // Neither target1 nor target2 should be checked again. - timer.advance(MIN_ERROR_CHECK_GAP - 1); - assertTrue(checker.schedule(target1, true).get()); - assertThat(target1.numChecks.get(), is(1L)); - assertTrue(checker.schedule(target2, true).get()); - assertThat(target2.numChecks.get(), is(1L)); - - // Advance the timer again. - // Both targets should be checked now. - timer.advance(MIN_ERROR_CHECK_GAP); - assertTrue(checker.schedule(target1, true).get()); - assertThat(target1.numChecks.get(), is(2L)); - assertTrue(checker.schedule(target2, true).get()); - assertThat(target1.numChecks.get(), is(2L)); - } - - @Test (timeout=60000) - public void testCancellation() throws Exception { - LatchedCheckable target = new LatchedCheckable(); - final FakeTimer timer = new FakeTimer(); - final LatchedCallback callback = new LatchedCallback(target); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, - getExecutorService()); - - ListenableFuture<Boolean> lf = checker.schedule(target, true); - Futures.addCallback(lf, callback); - - // Request immediate cancellation. - checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); - try { - assertFalse(lf.get()); - fail("Failed to get expected InterruptedException"); - } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof InterruptedException); - } - callback.failureLatch.await(); - } - - @Test (timeout=60000) - public void testConcurrentChecks() throws Exception { - LatchedCheckable target = new LatchedCheckable(); - final FakeTimer timer = new FakeTimer(); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, - getExecutorService()); - final ListenableFuture<Boolean> lf1 = checker.schedule(target, true); - final ListenableFuture<Boolean> lf2 = checker.schedule(target, true); - - // Ensure that concurrent requests return the same future object. - assertTrue(lf1 == lf2); - - // Unblock the latch and wait for it to finish execution. - target.latch.countDown(); - lf1.get(); - - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - // We should not get back the same future as before. - // This can take a short while until the internal callback in - // ThrottledAsyncChecker is scheduled for execution. - // Also this should not trigger a new check operation as the timer - // was not advanced. If it does trigger a new check then the test - // will fail with a timeout. - final ListenableFuture<Boolean> lf3 = checker.schedule(target, true); - return lf3 != lf2; - } - }, 100, 10000); - } - - /** - * Ensure that the context is passed through to the Checkable#check - * method. - * @throws Exception - */ - @Test(timeout=60000) - public void testContextIsPassed() throws Exception { - final NoOpCheckable target1 = new NoOpCheckable(); - final FakeTimer timer = new FakeTimer(); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, - getExecutorService()); - - assertTrue(checker.schedule(target1, true).get()); - assertThat(target1.numChecks.get(), is(1L)); - timer.advance(MIN_ERROR_CHECK_GAP + 1); - assertFalse(checker.schedule(target1, false).get()); - assertThat(target1.numChecks.get(), is(2L)); - } - - /** - * Ensure that the exeption from a failed check is cached - * and returned without re-running the check when the minimum - * gap has not elapsed. - * - * @throws Exception - */ - @Test(timeout=60000) - public void testExceptionCaching() throws Exception { - final ThrowingCheckable target1 = new ThrowingCheckable(); - final FakeTimer timer = new FakeTimer(); - ThrottledAsyncChecker<Boolean, Boolean> checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, - getExecutorService()); - - thrown.expectCause(isA(DummyException.class)); - checker.schedule(target1, true).get(); - assertThat(target1.numChecks.get(), is(1L)); - - thrown.expectCause(isA(DummyException.class)); - checker.schedule(target1, true).get(); - assertThat(target1.numChecks.get(), is(2L)); - } - - /** - * A simple ExecutorService for testing. - */ - private ExecutorService getExecutorService() { - return new ScheduledThreadPoolExecutor(1); - } - - /** - * A Checkable that just returns its input. - */ - private static class NoOpCheckable - implements Checkable<Boolean, Boolean> { - private final AtomicLong numChecks = new AtomicLong(0); - @Override - public Boolean check(Boolean context) { - numChecks.incrementAndGet(); - return context; - } - } - - private static class ThrowingCheckable - implements Checkable<Boolean, Boolean> { - private final AtomicLong numChecks = new AtomicLong(0); - @Override - public Boolean check(Boolean context) throws DummyException { - numChecks.incrementAndGet(); - throw new DummyException(); - } - - } - - private static class DummyException extends Exception { - } - - /** - * A checkable that hangs until signaled. - */ - private static class LatchedCheckable - implements Checkable<Boolean, Boolean> { - private final CountDownLatch latch = new CountDownLatch(1); - - @Override - public Boolean check(Boolean ignored) throws InterruptedException { - LOG.info("LatchedCheckable {} waiting.", this); - latch.await(); - return true; // Unreachable. - } - } - - /** - * A {@link FutureCallback} that counts its invocations. - */ - private static final class LatchedCallback - implements FutureCallback<Boolean> { - private final CountDownLatch successLatch = new CountDownLatch(1); - private final CountDownLatch failureLatch = new CountDownLatch(1); - private final Checkable target; - - private LatchedCallback(Checkable target) { - this.target = target; - } - - @Override - public void onSuccess(@Nonnull Boolean result) { - LOG.info("onSuccess callback invoked for {}", target); - successLatch.countDown(); - } - - @Override - public void onFailure(@Nonnull Throwable t) { - LOG.info("onFailure callback invoked for {} with exception", target, t); - failureLatch.countDown(); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org