This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b49d7a  HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal.
3b49d7a is described below

commit 3b49d7aeae8819ce7c2c4f4fec057dd9e75dedf1
Author: Arpit Agarwal <a...@apache.org>
AuthorDate: Sun Jan 27 11:18:30 2019 -0800

    HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal.
---
 .../container/common/volume/AbstractFuture.java    | 1291 ++++++++++++++++++++
 .../ozone/container/common/volume/HddsVolume.java  |   24 +-
 .../container/common/volume/HddsVolumeChecker.java |  418 +++++++
 .../common/volume/ThrottledAsyncChecker.java       |  245 ++++
 .../container/common/volume/TimeoutFuture.java     |  161 +++
 .../ozone/container/common/volume/VolumeSet.java   |  116 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |    1 +
 .../common/volume/TestHddsVolumeChecker.java       |  212 ++++
 .../common/volume/TestVolumeSetDiskChecks.java     |  185 +++
 9 files changed, 2643 insertions(+), 10 deletions(-)
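
For orientation, the new HddsVolumeChecker added below is driven roughly as
follows. This is an illustrative sketch only: the constructor and
checkAllVolumes() signatures are taken from the diff, while the volume
collection and the handling of failures are assumed.

    HddsVolumeChecker checker = new HddsVolumeChecker(conf, new Timer());
    Collection<HddsVolume> volumes = ...;  // e.g. the volumes in the VolumeSet
    Set<HddsVolume> failed = checker.checkAllVolumes(volumes);
    if (!failed.isEmpty()) {
      // take failed volumes out of service; fail the datanode if the
      // configured failure tolerance is exceeded
    }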

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java
new file mode 100644
index 0000000..438692c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java
@@ -0,0 +1,1291 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.GwtCompatible;
+import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+    .newUpdater;
+
+import javax.annotation.Nullable;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An abstract implementation of {@link ListenableFuture}, intended for
+ * advanced users only. More common ways to create a {@code ListenableFuture}
+ * include instantiating a {@link SettableFuture}, submitting a task to a
+ * {@link ListeningExecutorService}, and deriving a {@code Future} from an
+ * existing one, typically using methods like {@link Futures#transform
+ * (ListenableFuture, com.google.common.base.Function) Futures.transform}
+ * and its overloaded versions.
+ * <p>
+ * <p>This class implements all methods in {@code ListenableFuture}.
+ * Subclasses should provide a way to set the result of the computation
+ * through the protected methods {@link #set(Object)},
+ * {@link #setFuture(ListenableFuture)} and {@link #setException(Throwable)}.
+ * Subclasses may also override {@link #interruptTask()}, which will be
+ * invoked automatically if a call to {@link #cancel(boolean) cancel(true)}
+ * succeeds in canceling the future. Subclasses should rarely override other
+ * methods.
+ */
+
+@GwtCompatible(emulated = true)
+public abstract class AbstractFuture<V> implements ListenableFuture<V> {
+  // NOTE: Whenever both tests are cheap and functional, it's faster to use &,
+  // | instead of &&, ||
+
+  private static final boolean GENERATE_CANCELLATION_CAUSES =
+      Boolean.parseBoolean(
+          System.getProperty("guava.concurrent.generate_cancellation_cause",
+              "false"));
+
+  /**
+   * A less abstract subclass of AbstractFuture. This can be used to optimize
+   * setFuture by ensuring that {@link #get} calls exactly the implementation
+   * of {@link AbstractFuture#get}.
+   */
+  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
+    @Override
+    public final V get() throws InterruptedException, ExecutionException {
+      return super.get();
+    }
+
+    @Override
+    public final V get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return super.get(timeout, unit);
+    }
+
+    @Override
+    public final boolean isDone() {
+      return super.isDone();
+    }
+
+    @Override
+    public final boolean isCancelled() {
+      return super.isCancelled();
+    }
+
+    @Override
+    public final void addListener(Runnable listener, Executor executor) {
+      super.addListener(listener, executor);
+    }
+
+    @Override
+    public final boolean cancel(boolean mayInterruptIfRunning) {
+      return super.cancel(mayInterruptIfRunning);
+    }
+  }
+
+  // Logger to log exceptions caught when running listeners.
+  private static final Logger log = Logger
+      .getLogger(AbstractFuture.class.getName());
+
+  // A heuristic for timed gets. If the remaining timeout is less than this,
+  // spin instead of blocking. This value is what AbstractQueuedSynchronizer
+  // uses.
+  private static final long SPIN_THRESHOLD_NANOS = 1000L;
+
+  private static final AtomicHelper ATOMIC_HELPER;
+
+  static {
+    AtomicHelper helper;
+
+    try {
+      helper = new UnsafeAtomicHelper();
+    } catch (Throwable unsafeFailure) {
+      // catch absolutely everything and fall through to our
+      // 'SafeAtomicHelper'. The access control checks that ARFU does mean
+      // the caller class has to be AbstractFuture instead of
+      // SafeAtomicHelper, so we annoyingly define these here.
+      try {
+        helper =
+            new SafeAtomicHelper(
+                newUpdater(Waiter.class, Thread.class, "thread"),
+                newUpdater(Waiter.class, Waiter.class, "next"),
+                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
+                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
+                newUpdater(AbstractFuture.class, Object.class, "value"));
+      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
+        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs
+        // that cause getDeclaredField to throw a NoSuchFieldException when
+        // the field is definitely there.
+        // For these users, fall back to a suboptimal implementation based
+        // on synchronized. This will be a definite performance hit to those
+        // users.
+        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
+        log.log(
+            Level.SEVERE, "SafeAtomicHelper is broken!",
+            atomicReferenceFieldUpdaterFailure);
+        helper = new SynchronizedHelper();
+      }
+    }
+    ATOMIC_HELPER = helper;
+
+    // Prevent rare disastrous classloading in first call to LockSupport.park.
+    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
+    @SuppressWarnings("unused")
+    Class<?> ensureLoaded = LockSupport.class;
+  }
+
+  /**
+   * Waiter links form a Treiber stack, in the {@link #waiters} field.
+   */
+  private static final class Waiter {
+    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
+
+    @Nullable volatile Thread thread;
+    @Nullable volatile Waiter next;
+
+    /**
+     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this
+     * class is loaded before the ATOMIC_HELPER. Apparently this is possible
+     * on some android platforms.
+     */
+    Waiter(boolean unused) {
+    }
+
+    Waiter() {
+      // avoid volatile write, write is made visible by subsequent CAS on
+      // waiters field
+      ATOMIC_HELPER.putThread(this, Thread.currentThread());
+    }
+
+    // non-volatile write to the next field. Should be made visible by
+    // subsequent CAS on waiters field.
+    void setNext(Waiter next) {
+      ATOMIC_HELPER.putNext(this, next);
+    }
+
+    void unpark() {
+      // This is racy with removeWaiter. The consequence of the race is that
+      // we may spuriously call unpark even though the thread has already
+      // removed itself from the list. But even if we did use a CAS, that
+      // race would still exist (it would just be ever so slightly smaller).
+      Thread w = thread;
+      if (w != null) {
+        thread = null;
+        LockSupport.unpark(w);
+      }
+    }
+  }
+
+  /**
+   * Marks the given node as 'deleted' (null waiter) and then scans the list
+   * to unlink all deleted nodes. This is an O(n) operation in the common
+   * case (and O(n^2) in the worst), but we are saved by two things.
+   * <ul>
+   * <li>This is only called when a waiting thread times out or is
+   * interrupted. Both of which should be rare.
+   * <li>The waiters list should be very short.
+   * </ul>
+   */
+  private void removeWaiter(Waiter node) {
+    node.thread = null; // mark as 'deleted'
+    restart:
+    while (true) {
+      Waiter pred = null;
+      Waiter curr = waiters;
+      if (curr == Waiter.TOMBSTONE) {
+        return; // give up if someone is calling complete
+      }
+      Waiter succ;
+      while (curr != null) {
+        succ = curr.next;
+        if (curr.thread != null) { // we aren't unlinking this node, update
+          // pred.
+          pred = curr;
+        } else if (pred != null) { // We are unlinking this node and it has a
+          // predecessor.
+          pred.next = succ;
+          if (pred.thread == null) { // We raced with another node that
+            // unlinked pred. Restart.
+            continue restart;
+          }
+        } else if (!ATOMIC_HELPER
+            .casWaiters(this, curr, succ)) { // We are unlinking head
+          continue restart; // We raced with an add or complete
+        }
+        curr = succ;
+      }
+      break;
+    }
+  }
+
+  /**
+   * Listeners also form a stack through the {@link #listeners} field.
+   */
+  private static final class Listener {
+    static final Listener TOMBSTONE = new Listener(null, null);
+    final Runnable task;
+    final Executor executor;
+
+    // writes to next are made visible by subsequent CAS's on the listeners
+    // field
+    @Nullable Listener next;
+
+    Listener(Runnable task, Executor executor) {
+      this.task = task;
+      this.executor = executor;
+    }
+  }
+
+  /**
+   * A special value to represent {@code null}.
+   */
+  private static final Object NULL = new Object();
+
+  /**
+   * A special value to represent failure, when {@link #setException} is
+   * called successfully.
+   */
+  private static final class Failure {
+    static final Failure FALLBACK_INSTANCE =
+        new Failure(
+            new Throwable("Failure occurred while trying to finish a future.") {
+              @Override
+              public synchronized Throwable fillInStackTrace() {
+                return this; // no stack trace
+              }
+            });
+    final Throwable exception;
+
+    Failure(Throwable exception) {
+      this.exception = checkNotNull(exception);
+    }
+  }
+
+  /**
+   * A special value to represent cancellation and the 'wasInterrupted' bit.
+   */
+  private static final class Cancellation {
+    final boolean wasInterrupted;
+    @Nullable final Throwable cause;
+
+    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
+      this.wasInterrupted = wasInterrupted;
+      this.cause = cause;
+    }
+  }
+
+  /**
+   * A special value that encodes the 'setFuture' state.
+   */
+  private static final class SetFuture<V> implements Runnable {
+    final AbstractFuture<V> owner;
+    final ListenableFuture<? extends V> future;
+
+    SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
+      this.owner = owner;
+      this.future = future;
+    }
+
+    @Override
+    public void run() {
+      if (owner.value != this) {
+        // nothing to do, we must have been cancelled, don't bother inspecting
+        // the future.
+        return;
+      }
+      Object valueToSet = getFutureValue(future);
+      if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
+        complete(owner);
+      }
+    }
+  }
+
+  /**
+   * This field encodes the current state of the future.
+   * <p>
+   * <p>The valid values are:
+   * <ul>
+   * <li>{@code null} initial state, nothing has happened.
+   * <li>{@link Cancellation} terminal state, {@code cancel} was called.
+   * <li>{@link Failure} terminal state, {@code setException} was called.
+   * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
+   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
+   * <li>Any other non-null value, terminal state, {@code set} was called with
+   * a non-null argument.
+   * </ul>
+   */
+  private volatile Object value;
+
+  /**
+   * All listeners.
+   */
+  private volatile Listener listeners;
+
+  /**
+   * All waiting threads.
+   */
+  private volatile Waiter waiters;
+
+  /**
+   * Constructor for use by subclasses.
+   */
+  protected AbstractFuture() {
+  }
+
+  // Gets and Timed Gets
+  //
+  // * Be responsive to interruption
+  // * Don't create Waiter nodes if you aren't going to park, this helps
+  // reduce contention on the waiters field.
+  // * Future completion is defined by when #value becomes non-null/non
+  // SetFuture
+  // * Future completion can be observed if the waiters field contains a
+  // TOMBSTONE
+
+  // Timed Get
+  // There are a few design constraints to consider
+  // * We want to be responsive to small timeouts, unpark() has non trivial
+  // latency overheads (I have observed 12 micros on 64 bit linux systems to
+  // wake up a parked thread). So if the timeout is small we shouldn't park().
+  // This needs to be traded off with the cpu overhead of spinning, so we use
+  // SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
+  // similar purposes.
+  // * We want to behave reasonably for timeouts of 0
+  // * We are more responsive to completion than timeouts. This is because
+  // parkNanos depends on system scheduling and as such we could either miss
+  // our deadline, or unpark() could be delayed so that it looks like we
+  // timed out even though we didn't. For comparison FutureTask respects
+  // completion preferably and AQS is non-deterministic (depends on where in
+  // the queue the waiter is). If we wanted to be strict about it, we could
+  // store the unpark() time in the Waiter node and we could use that to make
+  // a decision about whether or not we timed out prior to being unparked.
+
+  /*
+   * Improve the documentation of when InterruptedException is thrown. Our
+   * behavior matches the JDK's, but the JDK's documentation is misleading.
+   */
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * <p>The default {@link AbstractFuture} implementation throws {@code
+   * InterruptedException} if the current thread is interrupted before or
+   * during the call, even if the value is already available.
+   *
+   * @throws InterruptedException  if the current thread was interrupted
+   * before or during the call
+   *                               (optional but recommended).
+   * @throws CancellationException {@inheritDoc}
+   */
+  @Override
+  public V get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException, ExecutionException {
+    // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into
+    // the while(true) loop at the bottom and throw a TimeoutException.
+    long remainingNanos =
+        unit.toNanos(timeout); // we rely on the implicit null check on unit.
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+    Object localValue = value;
+    if (localValue != null & !(localValue instanceof SetFuture)) {
+      return getDoneValue(localValue);
+    }
+    // we delay calling nanoTime until we know we will need to either park or
+    // spin
+    final long endNanos =
+        remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
+    long_wait_loop:
+    if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
+      Waiter oldHead = waiters;
+      if (oldHead != Waiter.TOMBSTONE) {
+        Waiter node = new Waiter();
+        do {
+          node.setNext(oldHead);
+          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+            while (true) {
+              LockSupport.parkNanos(this, remainingNanos);
+              // Check interruption first, if we woke up due to interruption
+              // we need to honor that.
+              if (Thread.interrupted()) {
+                removeWaiter(node);
+                throw new InterruptedException();
+              }
+
+              // Otherwise re-read and check doneness. If we loop then it must
+              // have been a spurious wakeup
+              localValue = value;
+              if (localValue != null & !(localValue instanceof SetFuture)) {
+                return getDoneValue(localValue);
+              }
+
+              // timed out?
+              remainingNanos = endNanos - System.nanoTime();
+              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
+                // Remove the waiter, one way or another we are done parking
+                // this thread.
+                removeWaiter(node);
+                break long_wait_loop; // jump down to the busy wait loop
+              }
+            }
+          }
+          oldHead = waiters; // re-read and loop.
+        } while (oldHead != Waiter.TOMBSTONE);
+      }
+      // re-read value, if we get here then we must have observed a TOMBSTONE
+      // while trying to add a waiter.
+      return getDoneValue(value);
+    }
+    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and
+    // there is no node on the waiters list
+    while (remainingNanos > 0) {
+      localValue = value;
+      if (localValue != null & !(localValue instanceof SetFuture)) {
+        return getDoneValue(localValue);
+      }
+      if (Thread.interrupted()) {
+        throw new InterruptedException();
+      }
+      remainingNanos = endNanos - System.nanoTime();
+    }
+    throw new TimeoutException();
+  }
+
+  /*
+   * Improve the documentation of when InterruptedException is thrown. Our
+   * behavior matches the JDK's, but the JDK's documentation is misleading.
+   */
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * <p>The default {@link AbstractFuture} implementation throws {@code
+   * InterruptedException} if the current thread is interrupted before or
+   * during the call, even if the value is already available.
+   *
+   * @throws InterruptedException  if the current thread was interrupted
+   * before or during the call
+   *                               (optional but recommended).
+   * @throws CancellationException {@inheritDoc}
+   */
+  @Override
+  public V get() throws InterruptedException, ExecutionException {
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
+    Object localValue = value;
+    if (localValue != null & !(localValue instanceof SetFuture)) {
+      return getDoneValue(localValue);
+    }
+    Waiter oldHead = waiters;
+    if (oldHead != Waiter.TOMBSTONE) {
+      Waiter node = new Waiter();
+      do {
+        node.setNext(oldHead);
+        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+          // we are on the stack, now wait for completion.
+          while (true) {
+            LockSupport.park(this);
+            // Check interruption first, if we woke up due to interruption we
+            // need to honor that.
+            if (Thread.interrupted()) {
+              removeWaiter(node);
+              throw new InterruptedException();
+            }
+            // Otherwise re-read and check doneness. If we loop then it must
+            // have been a spurious wakeup
+            localValue = value;
+            if (localValue != null & !(localValue instanceof SetFuture)) {
+              return getDoneValue(localValue);
+            }
+          }
+        }
+        oldHead = waiters; // re-read and loop.
+      } while (oldHead != Waiter.TOMBSTONE);
+    }
+    // re-read value, if we get here then we must have observed a TOMBSTONE
+    // while trying to add a waiter.
+    return getDoneValue(value);
+  }
+
+  /**
+   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a
+   * {@link SetFuture}.
+   */
+  private V getDoneValue(Object obj) throws ExecutionException {
+    // While this seems like it might be too branch-y, simple benchmarking
+    // proves it to be unmeasurable (comparing done AbstractFutures with
+    // immediateFuture)
+    if (obj instanceof Cancellation) {
+      throw cancellationExceptionWithCause(
+          "Task was cancelled.", ((Cancellation) obj).cause);
+    } else if (obj instanceof Failure) {
+      throw new ExecutionException(((Failure) obj).exception);
+    } else if (obj == NULL) {
+      return null;
+    } else {
+      @SuppressWarnings("unchecked") // this is the only other option
+      V asV = (V) obj;
+      return asV;
+    }
+  }
+
+  @Override
+  public boolean isDone() {
+    final Object localValue = value;
+    return localValue != null & !(localValue instanceof SetFuture);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    final Object localValue = value;
+    return localValue instanceof Cancellation;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * <p>If a cancellation attempt succeeds on a {@code Future} that had
+   * previously been {@linkplain #setFuture set asynchronously}, then the
+   * cancellation will also be propagated to the delegate {@code Future} that
+   * was supplied in the {@code setFuture} call.
+   */
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    Object localValue = value;
+    boolean rValue = false;
+    if (localValue == null | localValue instanceof SetFuture) {
+      // Try to delay allocating the exception. At this point we may still
+      // lose the CAS, but it is certainly less likely.
+      Throwable cause =
+          GENERATE_CANCELLATION_CAUSES
+              ? new CancellationException("Future.cancel() was called.")
+              : null;
+      Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
+      AbstractFuture<?> abstractFuture = this;
+      while (true) {
+        if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
+          rValue = true;
+          // We call interruptTask before calling complete(), which is
+          // consistent with FutureTask
+          if (mayInterruptIfRunning) {
+            abstractFuture.interruptTask();
+          }
+          complete(abstractFuture);
+          if (localValue instanceof SetFuture) {
+            // propagate cancellation to the future set in setfuture, this is
+            // racy, and we don't care if we are successful or not.
+            ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue)
+                .future;
+            if (futureToPropagateTo instanceof TrustedFuture) {
+              // If the future is a TrustedFuture then we specifically avoid
+              // calling cancel(). This has 2 benefits:
+              // 1. for long chains of futures strung together with setFuture
+              // we consume less stack
+              // 2. we avoid allocating Cancellation objects at every level of
+              // the cancellation chain
+              // We can only do this for TrustedFuture, because
+              // TrustedFuture.cancel is final and does nothing but delegate
+              // to this method.
+              AbstractFuture<?> trusted = (AbstractFuture<?>)
+                  futureToPropagateTo;
+              localValue = trusted.value;
+              if (localValue == null | localValue instanceof SetFuture) {
+                abstractFuture = trusted;
+                continue;  // loop back up and try to complete the new future
+              }
+            } else {
+              // not a TrustedFuture, call cancel directly.
+              futureToPropagateTo.cancel(mayInterruptIfRunning);
+            }
+          }
+          break;
+        }
+        // obj changed, reread
+        localValue = abstractFuture.value;
+        if (!(localValue instanceof SetFuture)) {
+          // obj cannot be null at this point, because value can only change
+          // from null to non-null. So if value changed (and it did since we
+          // lost the CAS), then it cannot be null and since it isn't a
+          // SetFuture, then the future must be done and we should exit
+          // the loop.
+          break;
+        }
+      }
+    }
+    return rValue;
+  }
+
+  /**
+   * Subclasses can override this method to implement interruption of the
+   * future's computation. The method is invoked automatically by a
+   * successful call to {@link #cancel(boolean) cancel(true)}.
+   * <p>
+   * <p>The default implementation does nothing.
+   *
+   * @since 10.0
+   */
+  protected void interruptTask() {
+  }
+
+  /**
+   * Returns true if this future was cancelled with {@code
+   * mayInterruptIfRunning} set to {@code true}.
+   *
+   * @since 14.0
+   */
+  protected final boolean wasInterrupted() {
+    final Object localValue = value;
+    return (localValue instanceof Cancellation) && ((Cancellation) localValue)
+        .wasInterrupted;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @since 10.0
+   */
+  @Override
+  public void addListener(Runnable listener, Executor executor) {
+    checkNotNull(listener, "Runnable was null.");
+    checkNotNull(executor, "Executor was null.");
+    Listener oldHead = listeners;
+    if (oldHead != Listener.TOMBSTONE) {
+      Listener newNode = new Listener(listener, executor);
+      do {
+        newNode.next = oldHead;
+        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
+          return;
+        }
+        oldHead = listeners; // re-read
+      } while (oldHead != Listener.TOMBSTONE);
+    }
+    // If we get here then the Listener TOMBSTONE was set, which means the
+    // future is done, call the listener.
+    executeListener(listener, executor);
+  }
+
+  /**
+   * Sets the result of this {@code Future} unless this {@code Future} has
+   * already been cancelled or set (including
+   * {@linkplain #setFuture set asynchronously}). When a call to this method
+   * returns, the {@code Future} is guaranteed to be
+   * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+   * case it returns {@code true}). If it returns {@code false}, the {@code
+   * Future} may have previously been set asynchronously, in which case its
+   * result may not be known yet. That result, though not yet known, cannot
+   * be overridden by a call to a {@code set*} method, only by a call to
+   * {@link #cancel}.
+   *
+   * @param value the value to be used as the result
+   * @return true if the attempt was accepted, completing the {@code Future}
+   */
+  protected boolean set(@Nullable V value) {
+    Object valueToSet = value == null ? NULL : value;
+    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+      complete(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Sets the failed result of this {@code Future} unless this {@code Future}
+   * has already been cancelled or set (including
+   * {@linkplain #setFuture set asynchronously}). When a call to this method
+   * returns, the {@code Future} is guaranteed to be
+   * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which
+   * case it returns {@code true}). If it returns {@code false}, the
+   * {@code Future} may have previously been set asynchronously, in which case
+   * its result may not be known yet. That result, though not yet known,
+   * cannot be overridden by a call to a {@code set*} method, only by a call
+   * to {@link #cancel}.
+   *
+   * @param throwable the exception to be used as the failed result
+   * @return true if the attempt was accepted, completing the {@code Future}
+   */
+  protected boolean setException(Throwable throwable) {
+    Object valueToSet = new Failure(checkNotNull(throwable));
+    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+      complete(this);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Sets the result of this {@code Future} to match the supplied input
+   * {@code Future} once the supplied {@code Future} is done, unless this
+   * {@code Future} has already been cancelled or set (including "set
+   * asynchronously," defined below).
+   * <p>
+   * <p>If the supplied future is {@linkplain #isDone done} when this method
+   * is called and the call is accepted, then this future is guaranteed to
+   * have been completed with the supplied future by the time this method
+   * returns. If the supplied future is not done and the call is accepted, then
+   * the future will be <i>set asynchronously</i>. Note that such a result,
+   * though not yet known, cannot be overridden by a call to a {@code set*}
+   * method, only by a call to {@link #cancel}.
+   * <p>
+   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code
+   * Future} is later cancelled, cancellation will be propagated to {@code
+   * delegate}. Additionally, any call to {@code setFuture} after any
+   * cancellation will propagate cancellation to the supplied {@code Future}.
+   *
+   * @param future the future to delegate to
+   * @return true if the attempt was accepted, indicating that the {@code
+   * Future} was not previously cancelled or set.
+   * @since 19.0
+   */
+  @Beta
+  protected boolean setFuture(ListenableFuture<? extends V> future) {
+    checkNotNull(future);
+    Object localValue = value;
+    if (localValue == null) {
+      if (future.isDone()) {
+        Object value = getFutureValue(future);
+        if (ATOMIC_HELPER.casValue(this, null, value)) {
+          complete(this);
+          return true;
+        }
+        return false;
+      }
+      SetFuture valueToSet = new SetFuture<V>(this, future);
+      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+        // the listener is responsible for calling completeWithFuture,
+        // directExecutor is appropriate since all we are doing is unpacking
+        // a completed future which should be fast.
+        try {
+          future.addListener(valueToSet, directExecutor());
+        } catch (Throwable t) {
+          // addListener has thrown an exception! SetFuture.run can't throw
+          // any exceptions so this must have been caused by addListener
+          // itself. The most likely explanation is a misconfigured mock. Try
+          // to switch to Failure.
+          Failure failure;
+          try {
+            failure = new Failure(t);
+          } catch (Throwable oomMostLikely) {
+            failure = Failure.FALLBACK_INSTANCE;
+          }
+          // Note: The only way this CAS could fail is if cancel() has raced
+          // with us. That is ok.
+          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
+        }
+        return true;
+      }
+      localValue = value; // we lost the cas, fall through and maybe cancel
+    }
+    // The future has already been set to something. If it is cancellation we
+    // should cancel the incoming future.
+    if (localValue instanceof Cancellation) {
+      // we don't care if it fails, this is best-effort.
+      future.cancel(((Cancellation) localValue).wasInterrupted);
+    }
+    return false;
+  }
+
+  /**
+   * Returns a value suitable for storing in the {@link #value} field, from
+   * the given future, which is assumed to be done.
+   * <p>
+   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
+   */
+  private static Object getFutureValue(ListenableFuture<?> future) {
+    Object valueToSet;
+    if (future instanceof TrustedFuture) {
+      // Break encapsulation for TrustedFuture instances since we know that
+      // subclasses cannot override .get() (since it is final) and therefore
+      // this is equivalent to calling .get() and unpacking the exceptions
+      // like we do below (just much faster because it is a single field read
+      // instead of a read, several branches and possibly creating exceptions).
+      return ((AbstractFuture<?>) future).value;
+    } else {
+      // Otherwise calculate valueToSet by calling .get()
+      try {
+        Object v = getDone(future);
+        valueToSet = v == null ? NULL : v;
+      } catch (ExecutionException exception) {
+        valueToSet = new Failure(exception.getCause());
+      } catch (CancellationException cancellation) {
+        valueToSet = new Cancellation(false, cancellation);
+      } catch (Throwable t) {
+        valueToSet = new Failure(t);
+      }
+    }
+    return valueToSet;
+  }
+
+  /**
+   * Unblocks all threads and runs all listeners.
+   */
+  private static void complete(AbstractFuture<?> future) {
+    Listener next = null;
+    outer:
+    while (true) {
+      future.releaseWaiters();
+      // We call this before the listeners in order to avoid needing to manage
+      // a separate stack data structure for them. afterDone() should be
+      // generally fast and only used for cleanup work... but in theory can
+      // also be recursive and create StackOverflowErrors
+      future.afterDone();
+      // push the current set of listeners onto next
+      next = future.clearListeners(next);
+      future = null;
+      while (next != null) {
+        Listener curr = next;
+        next = next.next;
+        Runnable task = curr.task;
+        if (task instanceof SetFuture) {
+          SetFuture<?> setFuture = (SetFuture<?>) task;
+          // We unwind setFuture specifically to avoid StackOverflowErrors in
+          // the case of long chains of SetFutures
+          // Handling this special case is important because there is no way
+          // to pass an executor to setFuture, so a user couldn't break the
+          // chain by doing this themselves.  It is also potentially common
+          // if someone writes a recursive Futures.transformAsync transformer.
+          future = setFuture.owner;
+          if (future.value == setFuture) {
+            Object valueToSet = getFutureValue(setFuture.future);
+            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
+              continue outer;
+            }
+          }
+          // otherwise the future we were trying to set is already done.
+        } else {
+          executeListener(task, curr.executor);
+        }
+      }
+      break;
+    }
+  }
+
+  public static <V> V getDone(Future<V> future) throws ExecutionException {
+    /*
+     * We throw IllegalStateException, since the call could succeed later.
+     * Perhaps we "should" throw IllegalArgumentException, since the call
+     * could succeed with a different argument. Those exceptions' docs
+     * suggest that either is acceptable. Google's Java Practices page
+     * recommends IllegalArgumentException here, in part to keep its
+     * recommendation simple: Static methods should throw
+     * IllegalStateException only when they use static state.
+     *
+     * Why do we deviate here? The answer: We want fluentFuture.getDone()
+     * to throw the same exception as Futures.getDone(fluentFuture).
+     */
+    Preconditions.checkState(
+        future.isDone(), "Future was expected to be done: %s", future);
+    return Uninterruptibles.getUninterruptibly(future);
+  }
+
+  /**
+   * Callback method that is called exactly once after the future is completed.
+   * <p>
+   * <p>If {@link #interruptTask} is also run during completion,
+   * {@link #afterDone} runs after it.
+   * <p>
+   * <p>The default implementation of this method in {@code AbstractFuture}
+   * does nothing.  This is intended for very lightweight cleanup work, for
+   * example, timing statistics or clearing fields.
+   * If your task does anything heavier, consider just using a listener with
+   * an executor.
+   *
+   * @since 20.0
+   */
+  @Beta
+  protected void afterDone() {
+  }
+
+  /**
+   * If this future has been cancelled (and possibly interrupted), cancels
+   * (and possibly interrupts) the given future (if available).
+   * <p>
+   * <p>This method should be used only when this future is completed. It is
+   * designed to be called from {@code done}.
+   */
+  final void maybePropagateCancellation(@Nullable Future<?> related) {
+    if (related != null & isCancelled()) {
+      related.cancel(wasInterrupted());
+    }
+  }
+
+  /**
+   * Releases all threads in the {@link #waiters} list, and clears the list.
+   */
+  private void releaseWaiters() {
+    Waiter head;
+    do {
+      head = waiters;
+    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
+    for (
+        Waiter currentWaiter = head;
+        currentWaiter != null;
+        currentWaiter = currentWaiter.next) {
+      currentWaiter.unpark();
+    }
+  }
+
+  /**
+   * Clears the {@link #listeners} list and prepends its contents to {@code
+   * onto}, least recently added first.
+   */
+  private Listener clearListeners(Listener onto) {
+    // We need to
+    // 1. atomically swap the listeners with TOMBSTONE, this is because
+    // addListener uses that to synchronize with us
+    // 2. reverse the linked list, because despite our rather clear contract,
+    // people depend on us executing listeners in the order they were added
+    // 3. push all the items onto 'onto' and return the new head of the stack
+    Listener head;
+    do {
+      head = listeners;
+    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
+    Listener reversedList = onto;
+    while (head != null) {
+      Listener tmp = head;
+      head = head.next;
+      tmp.next = reversedList;
+      reversedList = tmp;
+    }
+    return reversedList;
+  }
+
+  /**
+   * Submits the given runnable to the given {@link Executor} catching and
+   * logging all {@linkplain RuntimeException runtime exceptions} thrown by
+   * the executor.
+   */
+  private static void executeListener(Runnable runnable, Executor executor) {
+    try {
+      executor.execute(runnable);
+    } catch (RuntimeException e) {
+      // Log it and keep going -- bad runnable and/or executor. Don't punish
+      // the other runnables if we're given a bad one. We only catch
+      // RuntimeException because we want Errors to propagate up.
+      log.log(
+          Level.SEVERE,
+          "RuntimeException while executing runnable " + runnable + " with " +
+              "executor " + executor,
+          e);
+    }
+  }
+
+  private abstract static class AtomicHelper {
+    /**
+     * Non volatile write of the thread to the {@link Waiter#thread} field.
+     */
+    abstract void putThread(Waiter waiter, Thread newValue);
+
+    /**
+     * Non volatile write of the waiter to the {@link Waiter#next} field.
+     */
+    abstract void putNext(Waiter waiter, Waiter newValue);
+
+    /**
+     * Performs a CAS operation on the {@link #waiters} field.
+     */
+    abstract boolean casWaiters(
+        AbstractFuture<?> future, Waiter expect,
+        Waiter update);
+
+    /**
+     * Performs a CAS operation on the {@link #listeners} field.
+     */
+    abstract boolean casListeners(
+        AbstractFuture<?> future, Listener expect,
+        Listener update);
+
+    /**
+     * Performs a CAS operation on the {@link #value} field.
+     */
+    abstract boolean casValue(
+        AbstractFuture<?> future, Object expect, Object update);
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
+   * <p>
+   * <p>Static initialization of this class will fail if the
+   * {@link sun.misc.Unsafe} object cannot be accessed.
+   */
+  private static final class UnsafeAtomicHelper extends AtomicHelper {
+    static final sun.misc.Unsafe UNSAFE;
+    static final long LISTENERS_OFFSET;
+    static final long WAITERS_OFFSET;
+    static final long VALUE_OFFSET;
+    static final long WAITER_THREAD_OFFSET;
+    static final long WAITER_NEXT_OFFSET;
+
+    static {
+      sun.misc.Unsafe unsafe = null;
+      try {
+        unsafe = sun.misc.Unsafe.getUnsafe();
+      } catch (SecurityException tryReflectionInstead) {
+        try {
+          unsafe =
+              AccessController.doPrivileged(
+                  new PrivilegedExceptionAction<sun.misc.Unsafe>() {
+                    @Override
+                    public sun.misc.Unsafe run() throws Exception {
+                      Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
+                      for (java.lang.reflect.Field f : k.getDeclaredFields()) {
+                        f.setAccessible(true);
+                        Object x = f.get(null);
+                        if (k.isInstance(x)) {
+                          return k.cast(x);
+                        }
+                      }
+                      throw new NoSuchFieldError("the Unsafe");
+                    }
+                  });
+        } catch (PrivilegedActionException e) {
+          throw new RuntimeException(
+              "Could not initialize intrinsics", e.getCause());
+        }
+      }
+      try {
+        Class<?> abstractFuture = AbstractFuture.class;
+        WAITERS_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
+        LISTENERS_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
+        VALUE_OFFSET = unsafe
+            .objectFieldOffset(abstractFuture.getDeclaredField("value"));
+        WAITER_THREAD_OFFSET = unsafe
+            .objectFieldOffset(Waiter.class.getDeclaredField("thread"));
+        WAITER_NEXT_OFFSET = unsafe
+            .objectFieldOffset(Waiter.class.getDeclaredField("next"));
+        UNSAFE = unsafe;
+      } catch (Exception e) {
+        throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    public static void throwIfUnchecked(Throwable throwable) {
+      checkNotNull(throwable);
+      if (throwable instanceof RuntimeException) {
+        throw (RuntimeException) throwable;
+      }
+      if (throwable instanceof Error) {
+        throw (Error) throwable;
+      }
+    }
+
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #waiters} field.
+     */
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      return UNSAFE
+          .compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #listeners} field.
+     */
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      return UNSAFE
+          .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
+    }
+
+    /**
+     * Performs a CAS operation on the {@link #value} field.
+     */
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
+    }
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
+   */
+  private static final class SafeAtomicHelper extends AtomicHelper {
+    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
+    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Listener>
+        listenersUpdater;
+    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
+
+    SafeAtomicHelper(
+        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
+        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
+        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
+      this.waiterThreadUpdater = waiterThreadUpdater;
+      this.waiterNextUpdater = waiterNextUpdater;
+      this.waitersUpdater = waitersUpdater;
+      this.listenersUpdater = listenersUpdater;
+      this.valueUpdater = valueUpdater;
+    }
+
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      waiterThreadUpdater.lazySet(waiter, newValue);
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      waiterNextUpdater.lazySet(waiter, newValue);
+    }
+
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      return waitersUpdater.compareAndSet(future, expect, update);
+    }
+
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      return listenersUpdater.compareAndSet(future, expect, update);
+    }
+
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      return valueUpdater.compareAndSet(future, expect, update);
+    }
+  }
+
+  /**
+   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
+   * <p>
+   * <p>This is an implementation of last resort for when certain basic VM
+   * features are broken (like AtomicReferenceFieldUpdater).
+   */
+  private static final class SynchronizedHelper extends AtomicHelper {
+    @Override
+    void putThread(Waiter waiter, Thread newValue) {
+      waiter.thread = newValue;
+    }
+
+    @Override
+    void putNext(Waiter waiter, Waiter newValue) {
+      waiter.next = newValue;
+    }
+
+    @Override
+    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
+        update) {
+      synchronized (future) {
+        if (future.waiters == expect) {
+          future.waiters = update;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    boolean casListeners(
+        AbstractFuture<?> future, Listener expect, Listener update) {
+      synchronized (future) {
+        if (future.listeners == expect) {
+          future.listeners = update;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
+      synchronized (future) {
+        if (future.value == expect) {
+          future.value = update;
+          return true;
+        }
+        return false;
+      }
+    }
+  }
+
+  private static CancellationException cancellationExceptionWithCause(
+      @Nullable String message, @Nullable Throwable cause) {
+    CancellationException exception = new CancellationException(message);
+    exception.initCause(cause);
+    return exception;
+  }
+
+  /**
+   * Returns an {@link Executor} that runs each task in the thread that invokes
+   * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
+   * <p>
+   * <p>This instance is equivalent to: <pre>   {@code
+   *   final class DirectExecutor implements Executor {
+   *     public void execute(Runnable r) {
+   *       r.run();
+   *     }
+   *   }}</pre>
+   */
+  public static Executor directExecutor() {
+    return DirectExecutor.INSTANCE;
+  }
+
+  /**
+   * See {@link #directExecutor} for behavioral notes.
+   */
+  private enum DirectExecutor implements Executor {
+    INSTANCE;
+
+    @Override
+    public void execute(Runnable command) {
+      command.run();
+    }
+
+    @Override
+    public String toString() {
+      return "MoreExecutors.directExecutor()";
+    }
+  }
+
+}
\ No newline at end of file
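
The class above is completed by subclasses through its protected set*
methods, as its class javadoc describes. A minimal sketch of that pattern
(illustrative only, not part of this commit):

    final class SimpleFuture<V> extends AbstractFuture<V> {
      boolean complete(V result) {
        return set(result);        // accepted only if not already done
      }
      boolean fail(Throwable t) {
        return setException(t);    // completes the future exceptionally
      }
    }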
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index d088826..4cf6c3d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -20,17 +20,23 @@ package org.apache.hadoop.ozone.container.common.volume;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
 import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.Time;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +64,10 @@ import java.util.UUID;
  * During DN startup, if the VERSION file exists, we verify that the
  * clusterID in the version file matches the clusterID from SCM.
  */
-public final class HddsVolume {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HddsVolume
+    implements Checkable<Boolean, VolumeCheckResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
 
@@ -77,6 +86,19 @@ public final class HddsVolume {
   private int layoutVersion;      // layout version of the storage data
 
   /**
+   * Run a check on the current volume to determine if it is healthy.
+   * @param unused context for the check, ignored.
+   * @return result of checking the volume.
+   * @throws Exception if an exception was encountered while running
+   *            the volume check.
+   */
+  @Override
+  public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
+    DiskChecker.checkDir(hddsRootDir);
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  /**
    * Builder for HddsVolume.
    */
   public static class Builder {
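
With the Checkable implementation above, a single volume can also be
spot-checked synchronously. A minimal sketch, assuming direct invocation
(the datanode instead goes through the asynchronous HddsVolumeChecker in
the next file):

    try {
      if (volume.check(null) != VolumeCheckResult.HEALTHY) {  // context ignored
        // treat the volume as failed
      }
    } catch (Exception e) {
      // DiskChecker.checkDir threw; the volume is unusable
    }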
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
new file mode 100644
index 0000000..6df81df
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
+
+
+/**
+ * A class that encapsulates running disk checks against each HDDS volume and
+ * allows retrieving a list of failed volumes.
+ */
+public class HddsVolumeChecker {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(HddsVolumeChecker.class);
+
+  private AsyncChecker<Boolean, VolumeCheckResult> delegateChecker;
+
+  private final AtomicLong numVolumeChecks = new AtomicLong(0);
+  private final AtomicLong numAllVolumeChecks = new AtomicLong(0);
+  private final AtomicLong numSkippedChecks = new AtomicLong(0);
+
+  /**
+   * Max allowed time for a disk check in milliseconds. If the check
+   * doesn't complete within this time we declare the disk as dead.
+   */
+  private final long maxAllowedTimeForCheckMs;
+
+  /**
+   * Minimum time between two successive disk checks of a volume.
+   */
+  private final long minDiskCheckGapMs;
+
+  /**
+   * Timestamp of the last check of all volumes.
+   */
+  private long lastAllVolumesCheck;
+
+  private final Timer timer;
+
+  private final ExecutorService checkVolumeResultHandlerExecutorService;
+
+  /**
+   * @param conf Configuration object.
+   * @param timer {@link Timer} object used for throttling checks.
+   */
+  public HddsVolumeChecker(Configuration conf, Timer timer)
+      throws DiskErrorException {
+    maxAllowedTimeForCheckMs = conf.getTimeDuration(
+        DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    if (maxAllowedTimeForCheckMs <= 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+          + maxAllowedTimeForCheckMs + " (should be > 0)");
+    }
+
+    this.timer = timer;
+
+    /**
+     * Maximum number of volume failures that can be tolerated without
+     * declaring a fatal error.
+     */
+    int maxVolumeFailuresTolerated = conf.getInt(
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
+
+    minDiskCheckGapMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    if (minDiskCheckGapMs < 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+          + minDiskCheckGapMs + " (should be >= 0)");
+    }
+
+    long diskCheckTimeout = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    if (diskCheckTimeout < 0) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+          + diskCheckTimeout + " (should be >= 0)");
+    }
+
+    lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
+
+    if (maxVolumeFailuresTolerated <
+        DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
+      throw new DiskErrorException("Invalid value configured for "
+          + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+          + maxVolumeFailuresTolerated + " "
+          + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
+    }
+
+    delegateChecker = new ThrottledAsyncChecker<>(
+        timer, minDiskCheckGapMs, diskCheckTimeout,
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataNode DiskChecker thread %d")
+                .setDaemon(true)
+                .build()));
+
+    checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+            .setNameFormat("VolumeCheck ResultHandler thread %d")
+            .setDaemon(true)
+            .build());
+  }
+
+  /**
+   * Run checks against all HDDS volumes.
+   *
+   * This check may be performed at service startup and subsequently at
+   * regular intervals to detect and handle failed volumes.
+   *
+   * @param volumes - Set of volumes to be checked. This set must not be
+   *                mutated for the duration of the check, or the results
+   *                will be unpredictable.
+   *
+   * @return set of failed volumes.
+   */
+  public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+      throws InterruptedException {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
+      numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} ms is " +
+              "less than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
+      return Collections.emptySet();
+    }
+
+    lastAllVolumesCheck = timer.monotonicNow();
+    final Set<HddsVolume> healthyVolumes = new HashSet<>();
+    final Set<HddsVolume> failedVolumes = new HashSet<>();
+    final Set<HddsVolume> allVolumes = new HashSet<>();
+
+    final AtomicLong numVolumes = new AtomicLong(volumes.size());
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    for (HddsVolume v : volumes) {
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
+          delegateChecker.schedule(v, null);
+      LOG.info("Scheduled health check for volume {}", v);
+      if (olf.isPresent()) {
+        allVolumes.add(v);
+        Futures.addCallback(olf.get(),
+            new ResultHandler(v, healthyVolumes, failedVolumes,
+                numVolumes, (ignored1, ignored2) -> latch.countDown()));
+      } else {
+        if (numVolumes.decrementAndGet() == 0) {
+          latch.countDown();
+        }
+      }
+    }
+
+    // Wait until our timeout elapses, after which we give up on
+    // the remaining volumes.
+    if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+      LOG.warn("checkAllVolumes timed out after {} ms" +
+          maxAllowedTimeForCheckMs);
+    }
+
+    numAllVolumeChecks.incrementAndGet();
+    synchronized (this) {
+      // All volumes that have not been detected as healthy should be
+      // considered failed. This is a superset of 'failedVolumes'.
+      //
+      // Make a copy under the mutex as Sets.difference() returns a view
+      // of a potentially changing set.
+      return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
+    }
+  }
+
+  /**
+   * A callback interface that is supplied the result of running an
+   * async disk check on multiple volumes.
+   */
+  public interface Callback {
+    /**
+     * @param healthyVolumes set of volumes that passed disk checks.
+     * @param failedVolumes set of volumes that failed disk checks.
+     */
+    void call(Set<HddsVolume> healthyVolumes,
+              Set<HddsVolume> failedVolumes);
+  }
+
+  /**
+   * Check a single volume asynchronously, returning a {@link ListenableFuture}
+   * that can be used to retrieve the final result.
+   *
+   * If the volume cannot be referenced then it is already closed and
+   * cannot be checked. No error is propagated to the callback.
+   *
+   * @param volume the volume that is to be checked.
+   * @param callback callback to be invoked when the volume check completes.
+   * @return true if the check was scheduled and the callback will be invoked.
+   *         false otherwise.
+   */
+  public boolean checkVolume(final HddsVolume volume, Callback callback) {
+    if (volume == null) {
+      LOG.debug("Cannot schedule check on null volume");
+      return false;
+    }
+
+    Optional<ListenableFuture<VolumeCheckResult>> olf =
+        delegateChecker.schedule(volume, null);
+    if (olf.isPresent()) {
+      numVolumeChecks.incrementAndGet();
+      Futures.addCallback(olf.get(),
+          new ResultHandler(volume, new HashSet<>(), new HashSet<>(),
+              new AtomicLong(1), callback),
+          checkVolumeResultHandlerExecutorService
+      );
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * A callback to process the results of checking a volume.
+   */
+  private class ResultHandler
+      implements FutureCallback<VolumeCheckResult> {
+    private final HddsVolume volume;
+    private final Set<HddsVolume> failedVolumes;
+    private final Set<HddsVolume> healthyVolumes;
+    private final AtomicLong volumeCounter;
+
+    @Nullable
+    private final Callback callback;
+
+    /**
+     * @param volume the volume whose check result is being handled.
+     * @param healthyVolumes set of healthy volumes. If the disk check is
+     *                       successful, add the volume here.
+     * @param failedVolumes set of failed volumes. If the disk check fails,
+     *                      add the volume here.
+     * @param volumeCounter counter decremented per result; used to trigger
+     *                      callback invocation.
+     * @param callback invoked when the volumeCounter reaches 0.
+     */
+    ResultHandler(HddsVolume volume,
+                  Set<HddsVolume> healthyVolumes,
+                  Set<HddsVolume> failedVolumes,
+                  AtomicLong volumeCounter,
+                  @Nullable Callback callback) {
+      this.volume = volume;
+      this.healthyVolumes = healthyVolumes;
+      this.failedVolumes = failedVolumes;
+      this.volumeCounter = volumeCounter;
+      this.callback = callback;
+    }
+
+    @Override
+    public void onSuccess(@Nonnull VolumeCheckResult result) {
+      switch(result) {
+        case HEALTHY:
+        case DEGRADED:
+          LOG.debug("Volume {} is {}.", volume, result);
+          markHealthy();
+          break;
+        case FAILED:
+          LOG.warn("Volume {} detected as being unhealthy", volume);
+          markFailed();
+          break;
+        default:
+          LOG.error("Unexpected health check result {} for volume {}",
+              result, volume);
+          markHealthy();
+          break;
+      }
+      cleanup();
+    }
+
+    @Override
+    public void onFailure(@Nonnull Throwable t) {
+      Throwable exception = (t instanceof ExecutionException) ?
+          t.getCause() : t;
+      LOG.warn("Exception running disk checks against volume " +
+          volume, exception);
+      markFailed();
+      cleanup();
+    }
+
+    private void markHealthy() {
+      synchronized (HddsVolumeChecker.this) {
+        healthyVolumes.add(volume);
+      }
+    }
+
+    private void markFailed() {
+      synchronized (HddsVolumeChecker.this) {
+        failedVolumes.add(volume);
+      }
+    }
+
+    private void cleanup() {
+      invokeCallback();
+    }
+
+    private void invokeCallback() {
+      try {
+        final long remaining = volumeCounter.decrementAndGet();
+        if (callback != null && remaining == 0) {
+          callback.call(healthyVolumes, failedVolumes);
+        }
+      } catch(Exception e) {
+        // Propagating this exception is unlikely to be helpful.
+        LOG.warn("Unexpected exception", e);
+      }
+    }
+  }
+
+  /**
+   * Shutdown the checker and its associated ExecutorService.
+   *
+   * See {@link ExecutorService#awaitTermination} for the interpretation
+   * of the parameters.
+   */
+  void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
+    try {
+      delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
+    } catch (InterruptedException e) {
+      LOG.warn("{} interrupted during shutdown.", 
this.getClass().getSimpleName());
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Set the delegate checker. This method is for testing only.
+   *
+   * @param testDelegate the checker to use in place of the real delegate.
+   */
+  @VisibleForTesting
+  void setDelegateChecker(
+      AsyncChecker<Boolean, VolumeCheckResult> testDelegate) {
+    delegateChecker = testDelegate;
+  }
+
+  /**
+   * Return the number of {@link #checkVolume} invocations.
+   */
+  public long getNumVolumeChecks() {
+    return numVolumeChecks.get();
+  }
+
+  /**
+   * Return the number of {@link #checkAllVolumes} invocations.
+   */
+  public long getNumAllVolumeChecks() {
+    return numAllVolumeChecks.get();
+  }
+
+  /**
+   * Return the number of checks skipped because the minimum gap since the
+   * last check had not elapsed.
+   */
+  public long getNumSkippedChecks() {
+    return numSkippedChecks.get();
+  }
+}
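
A minimal usage sketch of the new checker API (illustrative only, not part
of the patch): it exercises checkVolume with the Callback defined above.
The harness class, the "/data/hdds" path, and the bare HddsVolume.Builder
call are assumptions for illustration; real volume construction goes
through VolumeSet below.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.util.Timer;

public class VolumeCheckSketch {
  public static void main(String[] args) throws Exception {
    // Same-package sketch; HddsVolumeChecker reads its timeout and
    // min-gap settings from the configuration.
    HddsVolumeChecker checker =
        new HddsVolumeChecker(new OzoneConfiguration(), new Timer());

    // Hypothetical volume; the Builder is used the same way in
    // VolumeSet#initializeVolumeSet.
    HddsVolume volume = new HddsVolume.Builder("/data/hdds").build();

    // checkVolume returns false if the check could not be scheduled,
    // e.g. because a check for this volume is already in flight or ran
    // within the minimum gap.
    boolean scheduled = checker.checkVolume(volume,
        (healthyVolumes, failedVolumes) -> {
          if (!failedVolumes.isEmpty()) {
            System.err.println("Volume failed checks: " + failedVolumes);
          }
        });

    if (!scheduled) {
      System.out.println("Check skipped by the throttle.");
    }
  }
}
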
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
new file mode 100644
index 0000000..3be24e4
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.util.Timer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An implementation of {@link AsyncChecker} that skips checking recently
+ * checked objects. It will enforce at least minMsBetweenChecks
+ * milliseconds between two successive checks of any one object.
+ *
+ * It is assumed that the total number of Checkable objects in the system
+ * is small (not more than a few dozen), since the checker uses
+ * O(Checkables) storage and also potentially O(Checkables) threads.
+ *
+ * minMsBetweenChecks should be configured reasonably by the caller to
+ * avoid spinning up too many threads too frequently.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ThrottledAsyncChecker.class);
+
+  private final Timer timer;
+
+  /**
+   * The ExecutorService used to schedule asynchronous checks.
+   */
+  private final ListeningExecutorService executorService;
+  private final ScheduledExecutorService scheduledExecutorService;
+
+  /**
+   * The minimum gap in milliseconds between two successive checks
+   * of the same object. This is the throttle.
+   */
+  private final long minMsBetweenChecks;
+  private final long diskCheckTimeout;
+
+  /**
+   * Map of checks that are currently in progress. Protected by the object
+   * lock.
+   */
+  private final Map<Checkable, ListenableFuture<V>> checksInProgress;
+
+  /**
+   * Maps Checkable objects to a future that can be used to retrieve
+   * the results of the operation.
+   * Protected by the object lock.
+   */
+  private final Map<Checkable, ThrottledAsyncChecker.LastCheckResult<V>>
+      completedChecks;
+
+  public ThrottledAsyncChecker(final Timer timer,
+                               final long minMsBetweenChecks,
+                               final long diskCheckTimeout,
+                               final ExecutorService executorService) {
+    this.timer = timer;
+    this.minMsBetweenChecks = minMsBetweenChecks;
+    this.diskCheckTimeout = diskCheckTimeout;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.checksInProgress = new HashMap<>();
+    this.completedChecks = new WeakHashMap<>();
+
+    if (this.diskCheckTimeout > 0) {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
+          ScheduledThreadPoolExecutor(1);
+      this.scheduledExecutorService = MoreExecutors
+          .getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
+    } else {
+      this.scheduledExecutorService = null;
+    }
+  }
+
+  /**
+   * See {@link AsyncChecker#schedule}
+   *
+   * If the object has been checked recently then the check will
+   * be skipped. Multiple concurrent checks for the same object
+   * will receive the same Future.
+   */
+  @Override
+  public synchronized Optional<ListenableFuture<V>> schedule(
+      Checkable<K, V> target, K context) {
+    if (checksInProgress.containsKey(target)) {
+      return Optional.absent();
+    }
+
+    if (completedChecks.containsKey(target)) {
+      final ThrottledAsyncChecker.LastCheckResult<V> result =
+          completedChecks.get(target);
+      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
+      if (msSinceLastCheck < minMsBetweenChecks) {
+        LOG.debug("Skipped checking {}. Time since last check {}ms " +
+                "is less than the min gap {}ms.",
+            target, msSinceLastCheck, minMsBetweenChecks);
+        return Optional.absent();
+      }
+    }
+
+    LOG.info("Scheduling a check for {}", target);
+    final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
+        () -> target.check(context));
+    final ListenableFuture<V> lf;
+
+    if (diskCheckTimeout > 0) {
+      lf = TimeoutFuture
+          .create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
+              scheduledExecutorService);
+    } else {
+      lf = lfWithoutTimeout;
+    }
+
+    checksInProgress.put(target, lf);
+    addResultCachingCallback(target, lf);
+    return Optional.of(lf);
+  }
+
+  /**
+   * Register a callback to cache the result of a check when it completes.
+   * @param target the object that was checked.
+   * @param lf the pending result of the check.
+   */
+  private void addResultCachingCallback(
+      Checkable<K, V> target, ListenableFuture<V> lf) {
+    Futures.addCallback(lf, new FutureCallback<V>() {
+      @Override
+      public void onSuccess(@Nullable V result) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              result, timer.monotonicNow()));
+        }
+      }
+
+      @Override
+      public void onFailure(@Nonnull Throwable t) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              t, timer.monotonicNow()));
+        }
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * The results of in-progress checks are not useful during shutdown,
+   * so we optimize for faster shutdown by interrupting all actively
+   * executing checks.
+   */
+  @Override
+  public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+      throws InterruptedException {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService.awaitTermination(timeout, timeUnit);
+    }
+
+    executorService.shutdownNow();
+    executorService.awaitTermination(timeout, timeUnit);
+  }
+
+  /**
+   * Status of running a check. It can either be a result or an
+   * exception, depending on whether the check completed or threw.
+   */
+  private static final class LastCheckResult<V> {
+    /**
+     * Timestamp at which the check completed.
+     */
+    private final long completedAt;
+
+    /**
+     * Result of running the check if it completed. null if it threw.
+     */
+    @Nullable
+    private final V result;
+
+    /**
+     * Exception thrown by the check. null if it returned a result.
+     */
+    private final Throwable exception; // null on success.
+
+    /**
+     * Initialize with a result.
+     * @param result the value returned by the check.
+     * @param completedAt timestamp at which the check completed.
+     */
+    private LastCheckResult(V result, long completedAt) {
+      this.result = result;
+      this.exception = null;
+      this.completedAt = completedAt;
+    }
+
+    /**
+     * Initialize with an exception.
+     * @param t the exception thrown by the check.
+     * @param completedAt timestamp at which the check completed.
+     */
+    private LastCheckResult(Throwable t, long completedAt) {
+      this.result = null;
+      this.exception = t;
+      this.completedAt = completedAt;
+    }
+  }
+}
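
A self-contained sketch of the throttling contract described in the class
javadoc (illustrative only; assumes Hadoop's FakeTimer test utility, and
that Checkable is a single-method interface so a lambda works). A second
schedule() of the same object inside minMsBetweenChecks yields
Optional.absent(); advancing the clock makes it schedulable again.

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.util.FakeTimer;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThrottleSketch {
  public static void main(String[] args) throws Exception {
    FakeTimer timer = new FakeTimer();
    ThrottledAsyncChecker<Void, Boolean> checker =
        new ThrottledAsyncChecker<>(timer,
            60_000,   // min gap between checks of one object, in ms
            10_000,   // per-check timeout, in ms
            Executors.newCachedThreadPool());

    Checkable<Void, Boolean> disk = context -> true;  // always healthy

    Optional<ListenableFuture<Boolean>> first = checker.schedule(disk, null);
    first.get().get();  // wait for the first check to complete

    // A second attempt inside the 60s window is skipped.
    System.out.println("rescheduled? " +
        checker.schedule(disk, null).isPresent());  // false

    timer.advance(60_000);  // move the fake clock past the min gap
    System.out.println("rescheduled? " +
        checker.schedule(disk, null).isPresent());  // true

    checker.shutdownAndWait(1, TimeUnit.SECONDS);
  }
}
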
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
new file mode 100644
index 0000000..a7a492a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+/**
+ * Implementation of {@code Futures#withTimeout}.
+ * <p>
+ * <p>Future that delegates to another but will finish early (via a
+ * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
+ * specified duration expires. The delegate future is interrupted and
+ * cancelled if it times out.
+ */
+final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TimeoutFuture.class);
+
+  static <V> ListenableFuture<V> create(
+      ListenableFuture<V> delegate,
+      long time,
+      TimeUnit unit,
+      ScheduledExecutorService scheduledExecutor) {
+    TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
+    TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
+    result.timer = scheduledExecutor.schedule(fire, time, unit);
+    delegate.addListener(fire, directExecutor());
+    return result;
+  }
+
+  /*
+   * Memory visibility of these fields. There are two cases to consider.
+   *
+   * 1. visibility of the writes to these fields to Fire.run:
+   *
+   * The initial write to delegateRef is made definitely visible via the
+   * semantics of addListener/SES.schedule. The later racy write in cancel()
+   * is not guaranteed to be observed, however that is fine since the
+   * correctness is based on the atomic state in our base class. The initial
+   * write to timer is never definitely visible to Fire.run since it is
+   * assigned after SES.schedule is called. Therefore Fire.run has to check
+   * for null. However, it should be visible if Fire.run is called by
+   * delegate.addListener since addListener is called after the assignment
+   * to timer, and importantly this is the main situation in which we need to
+   * be able to see the write.
+   *
+   * 2. visibility of the writes to an afterDone() call triggered by cancel():
+   *
+   * Since these fields are non-final that means that TimeoutFuture is not
+   * being 'safely published', thus a motivated caller may be able to expose
+   * the reference to another thread that would then call cancel() and be
+   * unable to cancel the delegate. There are a number of ways to solve this,
+   * none of which are very pretty, and it is currently believed to be a
+   * purely theoretical problem (since the other actions should supply
+   * sufficient write-barriers).
+   */
+
+  @Nullable private ListenableFuture<V> delegateRef;
+  @Nullable private Future<?> timer;
+
+  private TimeoutFuture(ListenableFuture<V> delegate) {
+    this.delegateRef = Preconditions.checkNotNull(delegate);
+  }
+
+  /**
+   * A runnable that is called when the delegate or the timer completes.
+   */
+  private static final class Fire<V> implements Runnable {
+    @Nullable
+    TimeoutFuture<V> timeoutFutureRef;
+
+    Fire(TimeoutFuture<V> timeoutFuture) {
+      this.timeoutFutureRef = timeoutFuture;
+    }
+
+    @Override
+    public void run() {
+      // If either of these reads returns null then we must be after a
+      // successful cancel or another call to this method.
+      TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
+      if (timeoutFuture == null) {
+        return;
+      }
+      ListenableFuture<V> delegate = timeoutFuture.delegateRef;
+      if (delegate == null) {
+        return;
+      }
+
+      /*
+       * If we're about to complete the TimeoutFuture, we want to release our
+       * reference to it. Otherwise, we'll pin it (and its result) in memory
+       * until the timeout task is GCed. (The need to clear our reference to
+       * the TimeoutFuture is the reason we use a *static* nested class with
+       * a manual reference back to the "containing" class.)
+       *
+       * This has the nice-ish side effect of limiting reentrancy: run() calls
+       * timeoutFuture.setException() calls run(). That reentrancy would
+       * already be harmless, since timeoutFuture can be set (and delegate
+       * cancelled) only once. (And "set only once" is important for other
+       * reasons: run() can still be invoked concurrently in different threads,
+       * even with the above null checks.)
+       */
+      timeoutFutureRef = null;
+      if (delegate.isDone()) {
+        timeoutFuture.setFuture(delegate);
+      } else {
+        try {
+          timeoutFuture.setException(
+              new TimeoutException("Future timed out: " + delegate));
+        } finally {
+          delegate.cancel(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void afterDone() {
+    maybePropagateCancellation(delegateRef);
+
+    Future<?> localTimer = timer;
+    // Try to cancel the timer as an optimization.
+    // timer may be null if this call to run was by the timer task since there
+    // is no happens-before edge between the assignment to timer and an
+    // execution of the timer task.
+    if (localTimer != null) {
+      localTimer.cancel(false);
+    }
+
+    delegateRef = null;
+    timer = null;
+  }
+}
\ No newline at end of file
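
A usage sketch for the timeout wrapper (illustrative only; it would have
to live in this package since TimeoutFuture and create() are
package-private). A deliberately slow task guarded with a 100 ms limit
surfaces as a TimeoutException wrapped in an ExecutionException, and the
delegate is cancelled and interrupted.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeoutSketch {
  public static void main(String[] args) throws Exception {
    ListeningExecutorService workers = MoreExecutors.listeningDecorator(
        Executors.newSingleThreadExecutor());
    ScheduledExecutorService timerPool =
        Executors.newSingleThreadScheduledExecutor();

    // A "disk check" that takes far longer than the allowed 100 ms.
    ListenableFuture<String> slow = workers.submit(() -> {
      Thread.sleep(10_000);
      return "healthy";
    });

    ListenableFuture<String> guarded =
        TimeoutFuture.create(slow, 100, TimeUnit.MILLISECONDS, timerPool);

    try {
      guarded.get();
    } catch (ExecutionException e) {
      // The cause is the TimeoutException set by Fire#run; the slow
      // delegate has been cancelled and its thread interrupted.
      System.out.println("Timed out: " + e.getCause());
    } finally {
      workers.shutdownNow();
      timerPool.shutdownNow();
    }
  }
}
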
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index d30dd89..7addd63 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.volume;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 
@@ -37,8 +38,10 @@ import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,11 +51,18 @@ import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * VolumeSet to manage volumes in a DataNode.
+ * VolumeSet to manage HDDS volumes in a DataNode.
  */
 public class VolumeSet {
 
@@ -79,6 +89,14 @@ public class VolumeSet {
   private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
 
   /**
+   * An executor for periodic disk checks.
+   */
+  final ScheduledExecutorService diskCheckerService;
+  final ScheduledFuture<?> periodicDiskChecker;
+
+  private static final long DISK_CHECK_INTERVAL_MINUTES = 15;
+
+  /**
    * A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
    * Any update to {@link VolumeSet#volumeMap},
    * {@link VolumeSet#failedVolumeMap}, or {@link VolumeSet#volumeStateMap}
@@ -90,6 +108,7 @@ public class VolumeSet {
   private String clusterID;
 
   private Runnable shutdownHook;
+  private final HddsVolumeChecker volumeChecker;
 
   public VolumeSet(String dnUuid, Configuration conf)
       throws IOException {
@@ -102,11 +121,30 @@ public class VolumeSet {
     this.clusterID = clusterID;
     this.conf = conf;
     this.volumeSetRWLock = new ReentrantReadWriteLock();
-
+    this.volumeChecker = getVolumeChecker(conf);
+    this.diskCheckerService = Executors.newScheduledThreadPool(
+        1, r -> new Thread(r, "Periodic HDDS volume checker"));
+    this.periodicDiskChecker =
+        diskCheckerService.scheduleWithFixedDelay(() -> {
+            try {
+              checkAllVolumes();
+            } catch (IOException e) {
+              LOG.warn("Exception while checking disks", e);
+            }
+          }, DISK_CHECK_INTERVAL_MINUTES, DISK_CHECK_INTERVAL_MINUTES,
+              TimeUnit.MINUTES);
     initializeVolumeSet();
   }
 
-  // Add DN volumes configured through ConfigKeys to volumeMap.
+  @VisibleForTesting
+  HddsVolumeChecker getVolumeChecker(Configuration conf)
+      throws DiskChecker.DiskErrorException {
+    return new HddsVolumeChecker(conf, new Timer());
+  }
+
+  /**
+   * Add DN volumes configured through ConfigKeys to volumeMap.
+   */
   private void initializeVolumeSet() throws IOException {
     volumeMap = new ConcurrentHashMap<>();
     failedVolumeMap = new ConcurrentHashMap<>();
@@ -123,7 +161,7 @@ public class VolumeSet {
     }
 
     for (StorageType storageType : StorageType.values()) {
-      volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
+      volumeStateMap.put(storageType, new ArrayList<>());
     }
 
     for (String locationString : rawLocations) {
@@ -139,6 +177,12 @@ public class VolumeSet {
         volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
         LOG.info("Added Volume : {} to VolumeSet",
             hddsVolume.getHddsRootDir().getPath());
+
+        if (!hddsVolume.getHddsRootDir().mkdirs() &&
+            !hddsVolume.getHddsRootDir().exists()) {
+          throw new IOException("Failed to create HDDS storage dir " +
+              hddsVolume.getHddsRootDir());
+        }
       } catch (IOException e) {
         HddsVolume volume = new HddsVolume.Builder(locationString)
             .failedVolume(true).build();
@@ -147,8 +191,10 @@ public class VolumeSet {
       }
     }
 
+    checkAllVolumes();
+
     if (volumeMap.size() == 0) {
-      throw new DiskOutOfSpaceException("No storage location configured");
+      throw new DiskOutOfSpaceException("No storage locations configured");
     }
 
     // Ensure volume threads are stopped and scm df is saved during shutdown.
@@ -160,6 +206,52 @@ public class VolumeSet {
   }
 
   /**
+   * Run a synchronous parallel check of all HDDS volumes, removing
+   * failed volumes.
+   */
+  private void checkAllVolumes() throws IOException {
+    List<HddsVolume> allVolumes = getVolumesList();
+    Set<HddsVolume> failedVolumes;
+    try {
+      failedVolumes = volumeChecker.checkAllVolumes(allVolumes);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while running disk check", e);
+    }
+
+    if (failedVolumes.size() > 0) {
+      LOG.warn("checkAllVolumes got {} failed volumes - {}",
+          failedVolumes.size(), failedVolumes);
+      handleVolumeFailures(failedVolumes);
+    } else {
+      LOG.debug("checkAllVolumes encountered no failures");
+    }
+  }
+
+  /**
+   * Handle one or more failed volumes.
+   * @param failedVolumes set of volumes detected as failed.
+   */
+  private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
+    for (HddsVolume v: failedVolumes) {
+      this.writeLock();
+      try {
+        // Immediately mark the volume as failed so it is unavailable
+        // for new containers.
+        volumeMap.remove(v.getHddsRootDir().getPath());
+        failedVolumeMap.putIfAbsent(v.getHddsRootDir().getPath(), v);
+      } finally {
+        this.writeUnlock();
+      }
+
+      // TODO:
+      // 1. Mark all closed containers on the volume as unhealthy.
+      // 2. Consider stopping IO on open containers and tearing down
+      //    active pipelines.
+      // 3. Handle Ratis log disk failure.
+    }
+  }
+
+  /**
    * If Version file exists and the {@link VolumeSet#clusterID} is not set yet,
    * assign it the value from Version file. Otherwise, check that the given
    * id matches with the id from version file.
@@ -225,12 +317,12 @@ public class VolumeSet {
 
 
   // Add a volume to VolumeSet
-  public boolean addVolume(String dataDir) {
+  boolean addVolume(String dataDir) {
     return addVolume(dataDir, StorageType.DEFAULT);
   }
 
   // Add a volume to VolumeSet
-  public boolean addVolume(String volumeRoot, StorageType storageType) {
+  private boolean addVolume(String volumeRoot, StorageType storageType) {
     String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
     boolean success;
 
@@ -330,16 +422,22 @@ public class VolumeSet {
   }
 
   /**
-   * Shutdown's the volumeset, if saveVolumeSetUsed is false, call's
-   * {@link VolumeSet#saveVolumeSetUsed}.
+   * Shutdown the volumeset.
    */
   public void shutdown() {
     saveVolumeSetUsed();
+    stopDiskChecker();
     if (shutdownHook != null) {
       ShutdownHookManager.get().removeShutdownHook(shutdownHook);
     }
   }
 
+  private void stopDiskChecker() {
+    periodicDiskChecker.cancel(true);
+    volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
+    diskCheckerService.shutdownNow();
+  }
+
   @VisibleForTesting
   public List<HddsVolume> getVolumesList() {
     return ImmutableList.copyOf(volumeMap.values());
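
The periodic wiring above stacks two throttles: scheduleWithFixedDelay
spaces runs DISK_CHECK_INTERVAL_MINUTES apart measured from the end of the
previous run, and HddsVolumeChecker itself skips a pass if the configured
min gap (DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY) has not elapsed. A
stripped-down sketch of that schedule/shutdown pattern (illustrative only):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PeriodicCheckSketch {
  public static void main(String[] args) throws Exception {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
        1, r -> new Thread(r, "Periodic HDDS volume checker"));

    // Fixed *delay*, not fixed rate: a slow check pushes the next run
    // out, so passes never overlap or pile up.
    ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
        () -> System.out.println("running disk check"),
        15, 15, TimeUnit.MINUTES);

    // Shutdown mirrors VolumeSet#stopDiskChecker: cancel the task,
    // interrupting it if mid-check, then stop the executor.
    handle.cancel(true);
    scheduler.shutdownNow();
    scheduler.awaitTermination(1, TimeUnit.SECONDS);
  }
}
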
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 80ce13d..92d76ef 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -160,6 +160,7 @@ public class OzoneContainer {
     writeChannel.stop();
     readChannel.stop();
     hddsDispatcher.shutdown();
+    volumeSet.shutdown();
   }
 
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
new file mode 100644
index 0000000..f2a0c25
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link HddsVolumeChecker}.
+ */
+@RunWith(Parameterized.class)
+public class TestHddsVolumeChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestHddsVolumeChecker.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30_000);
+
+  /**
+   * Run each test case for each possible value of {@link VolumeCheckResult},
+   * including "null" for 'throw exception'.
+   * @return the test parameters.
+   */
+  @Parameters(name="{0}")
+  public static Collection<Object[]> data() {
+    List<Object[]> values = new ArrayList<>();
+    for (VolumeCheckResult result : VolumeCheckResult.values()) {
+      values.add(new Object[] {result});
+    }
+    values.add(new Object[] {null});
+    return values;
+  }
+
+  /**
+   * When null, the check call should throw an exception.
+   */
+  private final VolumeCheckResult expectedVolumeHealth;
+  private static final int NUM_VOLUMES = 2;
+
+
+  public TestHddsVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
+    this.expectedVolumeHealth = expectedVolumeHealth;
+  }
+
+  /**
+   * Test {@link HddsVolumeChecker#checkVolume} propagates the
+   * check to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCheckOneVolume() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+    final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
+    final HddsVolumeChecker checker =
+        new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+    final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+    /**
+     * Request a check and ensure it triggered {@link HddsVolume#check}.
+     */
+    boolean result =
+        checker.checkVolume(volume, (healthyVolumes, failedVolumes) -> {
+          numCallbackInvocations.incrementAndGet();
+          if (expectedVolumeHealth != null &&
+              expectedVolumeHealth != FAILED) {
+            assertThat(healthyVolumes.size(), is(1));
+            assertThat(failedVolumes.size(), is(0));
+          } else {
+            assertThat(healthyVolumes.size(), is(0));
+            assertThat(failedVolumes.size(), is(1));
+          }
+        });
+
+    GenericTestUtils.waitFor(() -> numCallbackInvocations.get() > 0, 5, 10000);
+
+    // Ensure that the check was invoked at least once.
+    verify(volume, times(1)).check(anyObject());
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
+  }
+
+  /**
+   * Test {@link HddsVolumeChecker#checkAllVolumes} propagates
+   * checks for all volumes to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCheckAllVolumes() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final List<HddsVolume> volumes = makeVolumes(
+        NUM_VOLUMES, expectedVolumeHealth);
+    final HddsVolumeChecker checker =
+        new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+
+    Set<HddsVolume> failedVolumes = checker.checkAllVolumes(volumes);
+    LOG.info("Got back {} failed volumes", failedVolumes.size());
+
+    if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
+      assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+    } else {
+      assertTrue(failedVolumes.isEmpty());
+    }
+
+    // Ensure each volume's check() method was called exactly once.
+    for (HddsVolume volume : volumes) {
+      verify(volume, times(1)).check(anyObject());
+    }
+  }
+
+  /**
+   * A checker that wraps the result of {@link HddsVolume#check} in
+   * an immediate future.
+   */
+  static class DummyChecker
+      implements AsyncChecker<Boolean, VolumeCheckResult> {
+
+    @Override
+    public Optional<ListenableFuture<VolumeCheckResult>> schedule(
+        Checkable<Boolean, VolumeCheckResult> target,
+        Boolean context) {
+      try {
+        LOG.info("Returning success for volume check");
+        return Optional.of(
+            Futures.immediateFuture(target.check(context)));
+      } catch (Exception e) {
+        LOG.info("check routine threw exception " + e);
+        return Optional.of(Futures.immediateFailedFuture(e));
+      }
+    }
+
+    @Override
+    public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+        throws InterruptedException {
+      // Nothing to cancel.
+    }
+  }
+
+  static List<HddsVolume> makeVolumes(
+      int numVolumes, VolumeCheckResult health) throws Exception {
+    final List<HddsVolume> volumes = new ArrayList<>(numVolumes);
+    for (int i = 0; i < numVolumes; ++i) {
+      final HddsVolume volume = mock(HddsVolume.class);
+
+      if (health != null) {
+        when(volume.check(any(Boolean.class))).thenReturn(health);
+        when(volume.check(isNull())).thenReturn(health);
+      } else {
+        final DiskErrorException de = new DiskErrorException("Fake Exception");
+        when(volume.check(any(Boolean.class))).thenThrow(de);
+        when(volume.check(isNull())).thenThrow(de);
+      }
+      volumes.add(volume);
+    }
+    return volumes;
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
new file mode 100644
index 0000000..687a12d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Verify that {@link VolumeSet} correctly checks for failed disks
+ * during initialization.
+ */
+public class TestVolumeSetDiskChecks {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestVolumeSetDiskChecks.class);
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30_000);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  Configuration conf = null;
+
+  /**
+   * Cleanup volume directories.
+   */
+  @After
+  public void cleanup() {
+    final Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFS_DATANODE_DATA_DIR_KEY);
+
+    for (String d: dirs) {
+      FileUtils.deleteQuietly(new File(d));
+    }
+  }
+
+  /**
+   * Verify that VolumeSet creates volume root directories at startup.
+   * @throws IOException
+   */
+  @Test
+  public void testOzoneDirsAreCreated() throws IOException {
+    final int numVolumes = 2;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    final VolumeSet volumeSet =
+        new VolumeSet(UUID.randomUUID().toString(), conf);
+
+    assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
+    assertThat(volumeSet.getFailedVolumesList().size(), is(0));
+
+    // Verify that the Ozone dirs were created during initialization.
+    Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFS_DATANODE_DATA_DIR_KEY);
+    for (String d : dirs) {
+      assertTrue(new File(d).isDirectory());
+    }
+  }
+
+  /**
+   * Verify that bad volumes are filtered at startup.
+   * @throws IOException
+   */
+  @Test
+  public void testBadDirectoryDetection() throws IOException {
+    final int numVolumes = 5;
+    final int numBadVolumes = 2;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    final VolumeSet volumeSet = new VolumeSet(
+        UUID.randomUUID().toString(), conf) {
+      @Override
+      HddsVolumeChecker getVolumeChecker(Configuration conf)
+          throws DiskErrorException {
+        return new DummyChecker(conf, new Timer(), numBadVolumes);
+      }
+    };
+
+    assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
+    assertThat(volumeSet.getVolumesList().size(),
+        is(numVolumes - numBadVolumes));
+  }
+
+  /**
+   * Verify that initialization fails if all volumes are bad.
+   */
+  @Test
+  public void testAllVolumesAreBad() throws IOException {
+    final int numVolumes = 5;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    thrown.expect(IOException.class);
+    final VolumeSet volumeSet = new VolumeSet(
+        UUID.randomUUID().toString(), conf) {
+      @Override
+      HddsVolumeChecker getVolumeChecker(Configuration conf)
+          throws DiskErrorException {
+        return new DummyChecker(conf, new Timer(), numVolumes);
+      }
+    };
+  }
+
+  /**
+   * Create a configuration with the given number of DataNode
+   * storage directories.
+   * @param numDirs number of storage directories to configure.
+   * @return the generated configuration.
+   */
+  private Configuration getConfWithDataNodeDirs(int numDirs) {
+    final Configuration conf = new OzoneConfiguration();
+    final List<String> dirs = new ArrayList<>();
+    for (int i = 0; i < numDirs; ++i) {
+      dirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
+    }
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, String.join(",", dirs));
+    return conf;
+  }
+
+  /**
+   * A no-op checker that fails the given number of volumes and succeeds
+   * the rest.
+   */
+  static class DummyChecker extends HddsVolumeChecker {
+    private final int numBadVolumes;
+
+    public DummyChecker(Configuration conf, Timer timer, int numBadVolumes)
+        throws DiskErrorException {
+      super(conf, timer);
+      this.numBadVolumes = numBadVolumes;
+    }
+
+    @Override
+    public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+        throws InterruptedException {
+      // Return the first 'numBadVolumes' as failed.
+      return ImmutableSet.copyOf(Iterables.limit(volumes, numBadVolumes));
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
