Repository: hadoop
Updated Branches:
  refs/heads/branch-2 a942d968c -> b77239b46


Revert "HDFS-11114. Support for running async disk checks in DataNode."

This reverts commit 732eaadddbe9a2682bf96ba06ce5e7ebfa58ba87.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b77239b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b77239b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b77239b4

Branch: refs/heads/branch-2
Commit: b77239b46d5918b3a3f4f7032e6c20a2b4fcbe56
Parents: a942d96
Author: Arpit Agarwal <a...@apache.org>
Authored: Tue Nov 8 09:04:58 2016 -0800
Committer: Arpit Agarwal <a...@apache.org>
Committed: Tue Nov 8 09:04:58 2016 -0800

----------------------------------------------------------------------
 .../server/datanode/checker/AsyncChecker.java   |  63 -----
 .../hdfs/server/datanode/checker/Checkable.java |  49 ----
 .../datanode/checker/ThrottledAsyncChecker.java | 224 ---------------
 .../server/datanode/checker/package-info.java   |  26 --
 .../checker/TestThrottledAsyncChecker.java      | 276 -------------------
 5 files changed, 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
deleted file mode 100644
index 1d534a3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.checker;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A class that can be used to schedule an asynchronous check on a given
- * {@link Checkable}. If the check is successfully scheduled then a
- * {@link ListenableFuture} is returned.
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface AsyncChecker<K, V> {
-
-  /**
-   * Schedule an asynchronous check for the given object.
-   *
-   * @param target object to be checked.
-   *
-   * @param context the interpretation of the context depends on the
-   *                target.
-   *
-   * @return returns a {@link ListenableFuture} that can be used to
-   *         retrieve the result of the asynchronous check.
-   */
-  ListenableFuture<V> schedule(Checkable<K, V> target, K context);
-
-  /**
-   * Cancel all executing checks and wait for them to complete.
-   * First attempts a graceful cancellation, then cancels forcefully.
-   * Waits for the supplied timeout after both attempts.
-   *
-   * See {@link ExecutorService#awaitTermination} for a description of
-   * the parameters.
-   *
-   * @throws InterruptedException
-   */
-  void shutdownAndWait(long timeout, TimeUnit timeUnit)
-      throws InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
deleted file mode 100644
index 833ebda..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.checker;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-
-/**
- * A Checkable is an object whose health can be probed by invoking its
- * {@link #check} method.
- *
- * e.g. a {@link Checkable} instance may represent a single hardware
- * resource.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface Checkable<K, V> {
-
-  /**
-   * Query the health of this object. This method may hang
-   * indefinitely depending on the status of the target resource.
-   *
-   * @param context for the probe operation. May be null depending
-   *                on the implementation.
-   *
-   * @return result of the check operation.
-   *
-   * @throws Exception encountered during the check operation. An
-   *                   exception indicates that the check failed.
-   */
-  V check(K context) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
deleted file mode 100644
index d0ee3d2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.checker;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.Timer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An implementation of {@link AsyncChecker} that skips checking recently
- * checked objects. It will enforce at least {@link minMsBetweenChecks}
- * milliseconds between two successive checks of any one object.
- *
- * It is assumed that the total number of Checkable objects in the system
- * is small, (not more than a few dozen) since the checker uses O(Checkables)
- * storage and also potentially O(Checkables) threads.
- *
- * {@link minMsBetweenChecks} should be configured reasonably
- * by the caller to avoid spinning up too many threads frequently.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(ThrottledAsyncChecker.class);
-
-  private final Timer timer;
-
-  /**
-   * The ExecutorService used to schedule asynchronous checks.
-   */
-  private final ListeningExecutorService executorService;
-
-  /**
-   * The minimum gap in milliseconds between two successive checks
-   * of the same object. This is the throttle.
-   */
-  private final long minMsBetweenChecks;
-
-  /**
-   * Map of checks that are currently in progress. Protected by the object
-   * lock.
-   */
-  private final Map<Checkable, ListenableFuture<V>> checksInProgress;
-
-  /**
-   * Maps Checkable objects to a future that can be used to retrieve
-   * the results of the operation.
-   * Protected by the object lock.
-   */
-  private final Map<Checkable, LastCheckResult<V>> completedChecks;
-
-  ThrottledAsyncChecker(final Timer timer,
-                        final long minMsBetweenChecks,
-                        final ExecutorService executorService) {
-    this.timer = timer;
-    this.minMsBetweenChecks = minMsBetweenChecks;
-    this.executorService = MoreExecutors.listeningDecorator(executorService);
-    this.checksInProgress = new HashMap<>();
-    this.completedChecks = new WeakHashMap<>();
-  }
-
-  /**
-   * See {@link AsyncChecker#schedule}
-   *
-   * If the object has been checked recently then the check will
-   * be skipped. Multiple concurrent checks for the same object
-   * will receive the same Future.
-   */
-  @Override
-  public synchronized ListenableFuture<V> schedule(
-      final Checkable<K, V> target,
-      final K context) {
-    LOG.debug("Scheduling a check of {}", target);
-
-    if (checksInProgress.containsKey(target)) {
-      return checksInProgress.get(target);
-    }
-
-    if (completedChecks.containsKey(target)) {
-      final LastCheckResult<V> result = completedChecks.get(target);
-      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
-      if (msSinceLastCheck < minMsBetweenChecks) {
-        LOG.debug("Skipped checking {}. Time since last check {}ms " +
-            "is less than the min gap {}ms.",
-            target, msSinceLastCheck, minMsBetweenChecks);
-        return result.result != null ?
-            Futures.immediateFuture(result.result) :
-            Futures.immediateFailedFuture(result.exception);
-      }
-    }
-
-    final ListenableFuture<V> lf = executorService.submit(
-        new Callable<V>() {
-          @Override
-          public V call() throws Exception {
-            return target.check(context);
-          }
-        });
-    checksInProgress.put(target, lf);
-    addResultCachingCallback(target, lf);
-    return lf;
-  }
-
-  /**
-   * Register a callback to cache the result of a check.
-   * @param target
-   * @param lf
-   */
-  private void addResultCachingCallback(
-      Checkable<K, V> target, ListenableFuture<V> lf) {
-    Futures.addCallback(lf, new FutureCallback<V>() {
-      @Override
-      public void onSuccess(@Nullable V result) {
-        synchronized (ThrottledAsyncChecker.this) {
-          checksInProgress.remove(target);
-          completedChecks.put(target, new LastCheckResult<>(
-              result, timer.monotonicNow()));
-        }
-      }
-
-      @Override
-      public void onFailure(@Nonnull Throwable t) {
-        synchronized (ThrottledAsyncChecker.this) {
-          checksInProgress.remove(target);
-          completedChecks.put(target, new LastCheckResult<>(
-              t, timer.monotonicNow()));
-        }
-      }
-    });
-  }
-
-  /**
-   * {@inheritDoc}.
-   */
-  @Override
-  public void shutdownAndWait(long timeout, TimeUnit timeUnit)
-      throws InterruptedException {
-    // Try orderly shutdown.
-    executorService.shutdown();
-
-    if (!executorService.awaitTermination(timeout, timeUnit)) {
-      // Interrupt executing tasks and wait again.
-      executorService.shutdownNow();
-      executorService.awaitTermination(timeout, timeUnit);
-    }
-  }
-
-  /**
-   * Status of running a check. It can either be a result or an
-   * exception, depending on whether the check completed or threw.
-   */
-  private static final class LastCheckResult<V> {
-    /**
-     * Timestamp at which the check completed.
-     */
-    private final long completedAt;
-
-    /**
-     * Result of running the check if it completed. null if it threw.
-     */
-    @Nullable
-    private final V result;
-
-    /**
-     * Exception thrown by the check. null if it returned a result.
-     */
-    private final Throwable exception; // null on success.
-
-    /**
-     * Initialize with a result.
-     * @param result
-     */
-    private LastCheckResult(V result, long completedAt) {
-      this.result = result;
-      this.exception = null;
-      this.completedAt = completedAt;
-    }
-
-    /**
-     * Initialize with an exception.
-     * @param completedAt
-     * @param t
-     */
-    private LastCheckResult(Throwable t, long completedAt) {
-      this.result = null;
-      this.exception = t;
-      this.completedAt = completedAt;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
deleted file mode 100644
index 52822e9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Datanode support for running disk checks.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS"})
-@InterfaceStability.Evolving
-package org.apache.hadoop.hdfs.server.datanode.checker;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b77239b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
deleted file mode 100644
index 70795ca..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.checker;
-
-import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Verify functionality of {@link ThrottledAsyncChecker}.
- */
-public class TestThrottledAsyncChecker {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
-  private static final long MIN_ERROR_CHECK_GAP = 1000;
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  /**
-   * Test various scheduling combinations to ensure scheduling and
-   * throttling behave as expected.
-   */
-  @Test(timeout=60000)
-  public void testScheduler() throws Exception {
-    final NoOpCheckable target1 = new NoOpCheckable();
-    final NoOpCheckable target2 = new NoOpCheckable();
-    final FakeTimer timer = new FakeTimer();
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-                                    getExecutorService());
-
-    // check target1 and ensure we get back the expected result.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-
-    // Check target1 again without advancing the timer. target1 should not
-    // be checked again and the cached result should be returned.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-
-    // Schedule target2 scheduled without advancing the timer.
-    // target2 should be checked as it has never been checked before.
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
-
-    // Advance the timer but just short of the min gap.
-    // Neither target1 nor target2 should be checked again.
-    timer.advance(MIN_ERROR_CHECK_GAP - 1);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
-
-    // Advance the timer again.
-    // Both targets should be checked now.
-    timer.advance(MIN_ERROR_CHECK_GAP);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-  }
-
-  @Test (timeout=60000)
-  public void testCancellation() throws Exception {
-    LatchedCheckable target = new LatchedCheckable();
-    final FakeTimer timer = new FakeTimer();
-    final LatchedCallback callback = new LatchedCallback(target);
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-                                    getExecutorService());
-
-    ListenableFuture<Boolean> lf = checker.schedule(target, true);
-    Futures.addCallback(lf, callback);
-
-    // Request immediate cancellation.
-    checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
-    try {
-      assertFalse(lf.get());
-      fail("Failed to get expected InterruptedException");
-    } catch (ExecutionException ee) {
-      assertTrue(ee.getCause() instanceof InterruptedException);
-    }
-    callback.failureLatch.await();
-  }
-
-  @Test (timeout=60000)
-  public void testConcurrentChecks() throws Exception {
-    LatchedCheckable target = new LatchedCheckable();
-    final FakeTimer timer = new FakeTimer();
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-                                    getExecutorService());
-    final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
-    final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
-
-    // Ensure that concurrent requests return the same future object.
-    assertTrue(lf1 == lf2);
-
-    // Unblock the latch and wait for it to finish execution.
-    target.latch.countDown();
-    lf1.get();
-
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        // We should not get back the same future as before.
-        // This can take a short while until the internal callback in
-        // ThrottledAsyncChecker is scheduled for execution.
-        // Also this should not trigger a new check operation as the timer
-        // was not advanced. If it does trigger a new check then the test
-        // will fail with a timeout.
-        final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
-        return lf3 != lf2;
-      }
-    }, 100, 10000);
-  }
-
-  /**
-   * Ensure that the context is passed through to the Checkable#check
-   * method.
-   * @throws Exception
-   */
-  @Test(timeout=60000)
-  public void testContextIsPassed() throws Exception {
-    final NoOpCheckable target1 = new NoOpCheckable();
-    final FakeTimer timer = new FakeTimer();
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-            getExecutorService());
-
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    timer.advance(MIN_ERROR_CHECK_GAP + 1);
-    assertFalse(checker.schedule(target1, false).get());
-    assertThat(target1.numChecks.get(), is(2L));
-  }
-
-  /**
-   * Ensure that the exeption from a failed check is cached
-   * and returned without re-running the check when the minimum
-   * gap has not elapsed.
-   *
-   * @throws Exception
-   */
-  @Test(timeout=60000)
-  public void testExceptionCaching() throws Exception {
-    final ThrowingCheckable target1 = new ThrowingCheckable();
-    final FakeTimer timer = new FakeTimer();
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
-            getExecutorService());
-
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(1L));
-
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(2L));
-  }
-
-  /**
-   * A simple ExecutorService for testing.
-   */
-  private ExecutorService getExecutorService() {
-    return new ScheduledThreadPoolExecutor(1);
-  }
-
-  /**
-   * A Checkable that just returns its input.
-   */
-  private static class NoOpCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
-    @Override
-    public Boolean check(Boolean context) {
-      numChecks.incrementAndGet();
-      return context;
-    }
-  }
-
-  private static class ThrowingCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
-    @Override
-    public Boolean check(Boolean context) throws DummyException {
-      numChecks.incrementAndGet();
-      throw new DummyException();
-    }
-
-  }
-
-  private static class DummyException extends Exception {
-  }
-
-  /**
-   * A checkable that hangs until signaled.
-   */
-  private static class LatchedCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final CountDownLatch latch = new CountDownLatch(1);
-
-    @Override
-    public Boolean check(Boolean ignored) throws InterruptedException {
-      LOG.info("LatchedCheckable {} waiting.", this);
-      latch.await();
-      return true;  // Unreachable.
-    }
-  }
-
-  /**
-   * A {@link FutureCallback} that counts its invocations.
-   */
-  private static final class LatchedCallback
-      implements FutureCallback<Boolean> {
-    private final CountDownLatch successLatch = new CountDownLatch(1);
-    private final CountDownLatch failureLatch = new CountDownLatch(1);
-    private final Checkable target;
-
-    private LatchedCallback(Checkable target) {
-      this.target = target;
-    }
-
-    @Override
-    public void onSuccess(@Nonnull Boolean result) {
-      LOG.info("onSuccess callback invoked for {}", target);
-      successLatch.countDown();
-    }
-
-    @Override
-    public void onFailure(@Nonnull Throwable t) {
-      LOG.info("onFailure callback invoked for {} with exception", target, t);
-      failureLatch.countDown();
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to