This is an automated email from the ASF dual-hosted git repository. arp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 3b49d7a HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal. 3b49d7a is described below commit 3b49d7aeae8819ce7c2c4f4fec057dd9e75dedf1 Author: Arpit Agarwal <a...@apache.org> AuthorDate: Sun Jan 27 11:18:30 2019 -0800 HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal. --- .../container/common/volume/AbstractFuture.java | 1291 ++++++++++++++++++++ .../ozone/container/common/volume/HddsVolume.java | 24 +- .../container/common/volume/HddsVolumeChecker.java | 418 +++++++ .../common/volume/ThrottledAsyncChecker.java | 245 ++++ .../container/common/volume/TimeoutFuture.java | 161 +++ .../ozone/container/common/volume/VolumeSet.java | 116 +- .../ozone/container/ozoneimpl/OzoneContainer.java | 1 + .../common/volume/TestHddsVolumeChecker.java | 212 ++++ .../common/volume/TestVolumeSetDiskChecks.java | 185 +++ 9 files changed, 2643 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java new file mode 100644 index 0000000..438692c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AbstractFuture.java @@ -0,0 +1,1291 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Some portions of this class have been modified to make it functional in this + * package. + */ +package org.apache.hadoop.ozone.container.common.volume; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.GwtCompatible; +import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater + .newUpdater; + +import javax.annotation.Nullable; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An abstract implementation of {@link ListenableFuture}, intended for + * advanced users only. 
+ * More common ways to create a {@code ListenableFuture}
+ * include instantiating a {@link SettableFuture}, submitting a task to a
+ * {@link ListeningExecutorService}, and deriving a {@code Future} from an
+ * existing one, typically using methods like {@link Futures#transform
+ * (ListenableFuture, com.google.common.base.Function) Futures.transform}
+ * and its overloaded versions.
+ * <p>
+ * <p>This class implements all methods in {@code ListenableFuture}.
+ * Subclasses should provide a way to set the result of the computation
+ * through the protected methods {@link #set(Object)},
+ * {@link #setFuture(ListenableFuture)} and {@link #setException(Throwable)}.
+ * Subclasses may also override {@link #interruptTask()}, which will be
+ * invoked automatically if a call to {@link #cancel(boolean) cancel(true)}
+ * succeeds in canceling the future. Subclasses should rarely override other
+ * methods.
+ */
+
+@GwtCompatible(emulated = true)
+public abstract class AbstractFuture<V> implements ListenableFuture<V> {
+  // NOTE: Whenever both tests are cheap and functional, it's faster to use &,
+  // | instead of &&, ||
+
+  /**
+   * Whether {@link #cancel} attaches a {@link CancellationException} as the
+   * cause stored with the cancellation. Read once, when this class is
+   * initialized, from the system property
+   * {@code guava.concurrent.generate_cancellation_cause}; an absent property
+   * is treated as {@code "false"}.
+   */
+  private static final boolean GENERATE_CANCELLATION_CAUSES =
+      Boolean.parseBoolean(
+          System.getProperty("guava.concurrent.generate_cancellation_cause",
+              "false"));
+
+  /**
+   * A less abstract subclass of AbstractFuture. This can be used to optimize
+   * setFuture by ensuring that {@link #get} calls exactly the implementation
+   * of {@link AbstractFuture#get}.
+ */ + abstract static class TrustedFuture<V> extends AbstractFuture<V> { + @Override + public final V get() throws InterruptedException, ExecutionException { + return super.get(); + } + + @Override + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return super.get(timeout, unit); + } + + @Override + public final boolean isDone() { + return super.isDone(); + } + + @Override + public final boolean isCancelled() { + return super.isCancelled(); + } + + @Override + public final void addListener(Runnable listener, Executor executor) { + super.addListener(listener, executor); + } + + @Override + public final boolean cancel(boolean mayInterruptIfRunning) { + return super.cancel(mayInterruptIfRunning); + } + } + + // Logger to log exceptions caught when running listeners. + private static final Logger log = Logger + .getLogger(AbstractFuture.class.getName()); + + // A heuristic for timed gets. If the remaining timeout is less than this, + // spin instead of + // blocking. This value is what AbstractQueuedSynchronizer uses. 
+  private static final long SPIN_THRESHOLD_NANOS = 1000L;
+
+  // Strategy object used for the CAS operations on the value, listeners and
+  // waiters fields and for the field writes made by Waiter's constructor and
+  // Waiter.setNext. Assigned exactly once by the static initializer below.
+  private static final AtomicHelper ATOMIC_HELPER;
+
+  // Chooses the ATOMIC_HELPER implementation: UnsafeAtomicHelper first, then
+  // SafeAtomicHelper (built from AtomicReferenceFieldUpdaters), and finally
+  // SynchronizedHelper. The two failures are logged only when the last
+  // fallback is taken. This block uses 'log', so it must stay textually
+  // after that field's declaration.
+  static {
+    AtomicHelper helper;
+
+    try {
+      helper = new UnsafeAtomicHelper();
+    } catch (Throwable unsafeFailure) {
+      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
+      // The access control checks that ARFU does means the caller class has
+      // to be AbstractFuture instead of SafeAtomicHelper, so we annoyingly
+      // define these here
+      try {
+        helper =
+            new SafeAtomicHelper(
+                newUpdater(Waiter.class, Thread.class, "thread"),
+                newUpdater(Waiter.class, Waiter.class, "next"),
+                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
+                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
+                newUpdater(AbstractFuture.class, Object.class, "value"));
+      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
+        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs
+        // that cause getDeclaredField to throw a NoSuchFieldException when
+        // the field is definitely there.
+        // For these users fallback to a suboptimal implementation, based on
+        // synchronized. This will be a definite performance hit to those users.
+        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
+        log.log(
+            Level.SEVERE, "SafeAtomicHelper is broken!",
+            atomicReferenceFieldUpdaterFailure);
+        helper = new SynchronizedHelper();
+      }
+    }
+    ATOMIC_HELPER = helper;
+
+    // Prevent rare disastrous classloading in first call to LockSupport.park.
+    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
+    @SuppressWarnings("unused")
+    Class<?> ensureLoaded = LockSupport.class;
+  }
+
+  /**
+   * Waiter links form a Treiber stack, in the {@link #waiters} field.
+   */
+  private static final class Waiter {
+    // Sentinel that releaseWaiters() stores in the waiters field; observing
+    // it there means the waiters have already been released.
+    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
+
+    // The waiting thread. null marks this node as deleted (see removeWaiter)
+    // or as already unparked (see unpark).
+    @Nullable volatile Thread thread;
+    // The next node down the stack.
+    @Nullable volatile Waiter next;
+
+    /**
+     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this
+     * class is loaded before the ATOMIC_HELPER. Apparently this is possible
+     * on some android platforms.
+     */
+    Waiter(boolean unused) {
+    }
+
+    // Creates a node for the calling thread.
+    Waiter() {
+      // avoid volatile write, write is made visible by subsequent CAS on
+      // waiters field
+      ATOMIC_HELPER.putThread(this, Thread.currentThread());
+    }
+
+    // non-volatile write to the next field. Should be made visible by
+    // subsequent CAS on waiters field.
+    void setNext(Waiter next) {
+      ATOMIC_HELPER.putNext(this, next);
+    }
+
+    // Wakes the thread recorded in this node, if any, and clears the field so
+    // the node then reads as deleted.
+    void unpark() {
+      // This is racy with removeWaiter. The consequence of the race is that
+      // we may spuriously call unpark even though the thread has already
+      // removed itself from the list. But even if we did use a CAS, that
+      // race would still exist (it would just be ever so slightly smaller).
+      Thread w = thread;
+      if (w != null) {
+        thread = null;
+        LockSupport.unpark(w);
+      }
+    }
+  }
+
+  /**
+   * Marks the given node as 'deleted' (null waiter) and then scans the list
+   * to unlink all deleted nodes. This is an O(n) operation in the common
+   * case (and O(n^2) in the worst), but we are saved by two things.
+   * <ul>
+   * <li>This is only called when a waiting thread times out or is
+   * interrupted. Both of which should be rare.
+   * <li>The waiters list should be very short.
+   * </ul>
+   *
+   * @param node the waiter node of the calling thread
+   */
+  private void removeWaiter(Waiter node) {
+    node.thread = null; // mark as 'deleted'
+    // Each pass walks the stack from the current head; any detected race
+    // restarts the walk from a freshly read head.
+    restart:
+    while (true) {
+      Waiter pred = null;
+      Waiter curr = waiters;
+      if (curr == Waiter.TOMBSTONE) {
+        return; // give up if someone is calling complete
+      }
+      Waiter succ;
+      while (curr != null) {
+        succ = curr.next;
+        if (curr.thread != null) {
+          // We aren't unlinking this node, update pred.
+          pred = curr;
+        } else if (pred != null) {
+          // We are unlinking this node and it has a predecessor.
+          pred.next = succ;
+          if (pred.thread == null) {
+            // We raced with another node that unlinked pred. Restart.
+            continue restart;
+          }
+        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) {
+          // We are unlinking head, and the CAS failed: we raced with an add
+          // or complete.
+          continue restart;
+        }
+        curr = succ;
+      }
+      break;
+    }
+  }
+
+  /**
+   * Listeners also form a stack through the {@link #listeners} field.
+   */
+  private static final class Listener {
+    // Sentinel that clearListeners() stores in the listeners field; once
+    // addListener sees it, the listener is executed immediately instead of
+    // being queued.
+    static final Listener TOMBSTONE = new Listener(null, null);
+    final Runnable task;
+    final Executor executor;
+
+    // writes to next are made visible by subsequent CAS's on the listeners
+    // field
+    @Nullable Listener next;
+
+    Listener(Runnable task, Executor executor) {
+      this.task = task;
+      this.executor = executor;
+    }
+  }
+
+  /**
+   * A special value to represent {@code null}.
+   */
+  private static final Object NULL = new Object();
+
+  /**
+   * A special value to represent failure, when {@link #setException} is
+   * called successfully.
+   */
+  private static final class Failure {
+    // Preallocated instance used by setFuture when constructing a regular
+    // Failure itself throws.
+    static final Failure FALLBACK_INSTANCE =
+        new Failure(
+            new Throwable("Failure occurred while trying to finish a future" +
+                ".") {
+              @Override
+              public synchronized Throwable fillInStackTrace() {
+                return this; // no stack trace
+              }
+            });
+    final Throwable exception;
+
+    // Rejects a null exception via checkNotNull.
+    Failure(Throwable exception) {
+      this.exception = checkNotNull(exception);
+    }
+  }
+
+  /**
+   * A special value to represent cancellation and the 'wasInterrupted' bit.
+   */
+  private static final class Cancellation {
+    // The mayInterruptIfRunning argument of the cancel call.
+    final boolean wasInterrupted;
+    // Optional cause; passed to cancellationExceptionWithCause by
+    // getDoneValue.
+    @Nullable final Throwable cause;
+
+    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
+      this.wasInterrupted = wasInterrupted;
+      this.cause = cause;
+    }
+  }
+
+  /**
+   * A special value that encodes the 'setFuture' state.
+   */
+  private static final class SetFuture<V> implements Runnable {
+    // The future whose value field holds this SetFuture.
+    final AbstractFuture<V> owner;
+    // The delegate future whose outcome the owner will adopt.
+    final ListenableFuture<? extends V> future;
+
+    SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
+      this.owner = owner;
+      this.future = future;
+    }
+
+    /**
+     * Registered by setFuture as a listener on the delegate. If the owner's
+     * value is still this SetFuture, copies the delegate's outcome into the
+     * owner with a CAS and, when the CAS succeeds, completes the owner.
+     */
+    @Override
+    public void run() {
+      if (owner.value != this) {
+        // nothing to do, we must have been cancelled, don't bother inspecting
+        // the future.
+        return;
+      }
+      Object valueToSet = getFutureValue(future);
+      if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
+        complete(owner);
+      }
+    }
+  }
+
+  /**
+   * This field encodes the current state of the future.
+   * <p>
+   * <p>The valid values are:
+   * <ul>
+   * <li>{@code null} initial state, nothing has happened.
+   * <li>{@link Cancellation} terminal state, {@code cancel} was called.
+   * <li>{@link Failure} terminal state, {@code setException} was called.
+   * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
+   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
+   * <li>Any other non-null value, terminal state, {@code set} was called with
+   * a non-null argument.
+   * </ul>
+   */
+  private volatile Object value;
+
+  /**
+   * All listeners. Holds {@link Listener#TOMBSTONE} once the listeners have
+   * been cleared by {@code clearListeners}.
+   */
+  private volatile Listener listeners;
+
+  /**
+   * All waiting threads. Holds {@link Waiter#TOMBSTONE} once the waiters have
+   * been released by {@code releaseWaiters}.
+   */
+  private volatile Waiter waiters;
+
+  /**
+   * Constructor for use by subclasses.
+   */
+  protected AbstractFuture() {
+  }
+
+  // Gets and Timed Gets
+  //
+  // * Be responsive to interruption
+  // * Don't create Waiter nodes if you aren't going to park, this helps
+  // reduce contention on the waiters field.
+ // * Future completion is defined by when #value becomes non-null/non + // SetFuture + // * Future completion can be observed if the waiters field contains a + // TOMBSTONE + + // Timed Get + // There are a few design constraints to consider + // * We want to be responsive to small timeouts, unpark() has non trivial + // latency overheads (I have observed 12 micros on 64 bit linux systems to + // wake up a parked thread). So if the timeout is small we shouldn't park(). + // This needs to be traded off with the cpu overhead of spinning, so we use + // SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for + // similar purposes. + // * We want to behave reasonably for timeouts of 0 + // * We are more responsive to completion than timeouts. This is because + // parkNanos depends on system scheduling and as such we could either miss + // our deadline, or unpark() could be delayed so that it looks like we + // timed out even though we didn't. For comparison FutureTask respects + // completion preferably and AQS is non-deterministic (depends on where in + // the queue the waiter is). If we wanted to be strict about it, we could + // store the unpark() time in the Waiter node and we could use that to make + // a decision about whether or not we timed out prior to being unparked. + + /* + * Improve the documentation of when InterruptedException is thrown. Our + * behavior matches the JDK's, but the JDK's documentation is misleading. + */ + + /** + * {@inheritDoc} + * <p> + * <p>The default {@link AbstractFuture} implementation throws {@code + * InterruptedException} if the current thread is interrupted before or + * during the call, even if the value is already available. + * + * @throws InterruptedException if the current thread was interrupted + * before or during the call + * (optional but recommended). 
+ * @throws CancellationException {@inheritDoc} + */ + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into + // the while(true) loop at the bottom and throw a timeoutexception. + long remainingNanos = unit + .toNanos(timeout); // we rely on the implicit null check on unit. + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + // we delay calling nanoTime until we know we will need to either park or + // spin + final long endNanos = remainingNanos > 0 ? System + .nanoTime() + remainingNanos : 0; + long_wait_loop: + if (remainingNanos >= SPIN_THRESHOLD_NANOS) { + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + while (true) { + LockSupport.parkNanos(this, remainingNanos); + // Check interruption first, if we woke up due to interruption + // we need to honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + + // Otherwise re-read and check doneness. If we loop then it must + // have been a spurious wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + + // timed out? + remainingNanos = endNanos - System.nanoTime(); + if (remainingNanos < SPIN_THRESHOLD_NANOS) { + // Remove the waiter, one way or another we are done parking + // this thread. + removeWaiter(node); + break long_wait_loop; // jump down to the busy wait loop + } + } + } + oldHead = waiters; // re-read and loop. 
+ } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE + // while trying to add a waiter. + return getDoneValue(value); + } + // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and + // there is no node on the waiters list + while (remainingNanos > 0) { + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + if (Thread.interrupted()) { + throw new InterruptedException(); + } + remainingNanos = endNanos - System.nanoTime(); + } + throw new TimeoutException(); + } + + /* + * Improve the documentation of when InterruptedException is thrown. Our + * behavior matches the JDK's, but the JDK's documentation is misleading. + */ + + /** + * {@inheritDoc} + * <p> + * <p>The default {@link AbstractFuture} implementation throws {@code + * InterruptedException} if the current thread is interrupted before or + * during the call, even if the value is already available. + * + * @throws InterruptedException if the current thread was interrupted + * before or during the call + * (optional but recommended). + * @throws CancellationException {@inheritDoc} + */ + @Override + public V get() throws InterruptedException, ExecutionException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + // we are on the stack, now wait for completion. + while (true) { + LockSupport.park(this); + // Check interruption first, if we woke up due to interruption we + // need to honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + // Otherwise re-read and check doneness. 
If we loop then it must + // have been a spurious wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE + // while trying to add a waiter. + return getDoneValue(value); + } + + /** + * Unboxes {@code obj}. Assumes that obj is not {@code null} or a + * {@link SetFuture}. + */ + private V getDoneValue(Object obj) throws ExecutionException { + // While this seems like it might be too branch-y, simple benchmarking + // proves it to be unmeasurable (comparing done AbstractFutures with + // immediateFuture) + if (obj instanceof Cancellation) { + throw cancellationExceptionWithCause( + "Task was cancelled.", ((Cancellation) obj).cause); + } else if (obj instanceof Failure) { + throw new ExecutionException(((Failure) obj).exception); + } else if (obj == NULL) { + return null; + } else { + @SuppressWarnings("unchecked") // this is the only other option + V asV = (V) obj; + return asV; + } + } + + @Override + public boolean isDone() { + final Object localValue = value; + return localValue != null & !(localValue instanceof SetFuture); + } + + @Override + public boolean isCancelled() { + final Object localValue = value; + return localValue instanceof Cancellation; + } + + /** + * {@inheritDoc} + * <p> + * <p>If a cancellation attempt succeeds on a {@code Future} that had + * previously been {@linkplain#setFuture set asynchronously}, then the + * cancellation will also be propagated to the delegate {@code Future} that + * was supplied in the {@code setFuture} call. + */ + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + Object localValue = value; + boolean rValue = false; + if (localValue == null | localValue instanceof SetFuture) { + // Try to delay allocating the exception. 
At this point we may still + // lose the CAS, but it is certainly less likely. + Throwable cause = + GENERATE_CANCELLATION_CAUSES + ? new CancellationException("Future.cancel() was called.") + : null; + Object valueToSet = new Cancellation(mayInterruptIfRunning, cause); + AbstractFuture<?> abstractFuture = this; + while (true) { + if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { + rValue = true; + // We call interuptTask before calling complete(), which is + // consistent with FutureTask + if (mayInterruptIfRunning) { + abstractFuture.interruptTask(); + } + complete(abstractFuture); + if (localValue instanceof SetFuture) { + // propagate cancellation to the future set in setfuture, this is + // racy, and we don't care if we are successful or not. + ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue) + .future; + if (futureToPropagateTo instanceof TrustedFuture) { + // If the future is a TrustedFuture then we specifically avoid + // calling cancel() this has 2 benefits + // 1. for long chains of futures strung together with setFuture + // we consume less stack + // 2. we avoid allocating Cancellation objects at every level of + // the cancellation chain + // We can only do this for TrustedFuture, because + // TrustedFuture.cancel is final and does nothing but delegate + // to this method. + AbstractFuture<?> trusted = (AbstractFuture<?>) + futureToPropagateTo; + localValue = trusted.value; + if (localValue == null | localValue instanceof SetFuture) { + abstractFuture = trusted; + continue; // loop back up and try to complete the new future + } + } else { + // not a TrustedFuture, call cancel directly. + futureToPropagateTo.cancel(mayInterruptIfRunning); + } + } + break; + } + // obj changed, reread + localValue = abstractFuture.value; + if (!(localValue instanceof SetFuture)) { + // obj cannot be null at this point, because value can only change + // from null to non-null. 
So if value changed (and it did since we + // lost the CAS), then it cannot be null and since it isn't a + // SetFuture, then the future must be done and we should exit the loop + break; + } + } + } + return rValue; + } + + /** + * Subclasses can override this method to implement interruption of the + * future's computation. The method is invoked automatically by a + * successful call to {@link #cancel(boolean) cancel(true)}. + * <p> + * <p>The default implementation does nothing. + * + * @since 10.0 + */ + protected void interruptTask() { + } + + /** + * Returns true if this future was cancelled with {@code + * mayInterruptIfRunning} set to {@code true}. + * + * @since 14.0 + */ + protected final boolean wasInterrupted() { + final Object localValue = value; + return (localValue instanceof Cancellation) && ((Cancellation) localValue) + .wasInterrupted; + } + + /** + * {@inheritDoc} + * + * @since 10.0 + */ + @Override + public void addListener(Runnable listener, Executor executor) { + checkNotNull(listener, "Runnable was null."); + checkNotNull(executor, "Executor was null."); + Listener oldHead = listeners; + if (oldHead != Listener.TOMBSTONE) { + Listener newNode = new Listener(listener, executor); + do { + newNode.next = oldHead; + if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { + return; + } + oldHead = listeners; // re-read + } while (oldHead != Listener.TOMBSTONE); + } + // If we get here then the Listener TOMBSTONE was set, which means the + // future is done, call the listener. + executeListener(listener, executor); + } + + /** + * Sets the result of this {@code Future} unless this {@code Future} has + * already been cancelled or set (including + * {@linkplain #setFuture set asynchronously}). When a call to this method + * returns, the {@code Future} is guaranteed to be + * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which + * case it returns {@code true}). 
If it returns {@code false}, the {@code + * Future} may have previously been set asynchronously, in which case its + * result may not be known yet. That result, though not yet known, cannot + * be overridden by a call to a {@code set*} method, only by a call to + * {@link #cancel}. + * + * @param value the value to be used as the result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean set(@Nullable V value) { + Object valueToSet = value == null ? NULL : value; + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the failed result of this {@code Future} unless this {@code Future} + * has already been cancelled or set (including + * {@linkplain #setFuture set asynchronously}). When a call to this method + * returns, the {@code Future} is guaranteed to be + * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which + * case it returns {@code true}). If it returns {@code false}, the + * {@code Future} may have previously been set asynchronously, in which case + * its result may not be known yet. That result, though not yet known, + * cannot be overridden by a call to a {@code set*} method, only by a call + * to {@link #cancel}. + * + * @param throwable the exception to be used as the failed result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean setException(Throwable throwable) { + Object valueToSet = new Failure(checkNotNull(throwable)); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the result of this {@code Future} to match the supplied input + * {@code Future} once the supplied {@code Future} is done, unless this + * {@code Future} has already been cancelled or set (including "set + * asynchronously," defined below). 
+ * <p> + * <p>If the supplied future is {@linkplain #isDone done} when this method + * is called and the call is accepted, then this future is guaranteed to + * have been completed with the supplied future by the time this method + * returns. If the supplied future is not done and the call is accepted, then + * the future will be <i>set asynchronously</i>. Note that such a result, + * though not yet known, cannot be overridden by a call to a {@code set*} + * method, only by a call to {@link #cancel}. + * <p> + * <p>If the call {@code setFuture(delegate)} is accepted and this {@code + * Future} is later cancelled, cancellation will be propagated to {@code + * delegate}. Additionally, any call to {@code setFuture} after any + * cancellation will propagate cancellation to the supplied {@code Future}. + * + * @param future the future to delegate to + * @return true if the attempt was accepted, indicating that the {@code + * Future} was not previously cancelled or set. + * @since 19.0 + */ + @Beta + protected boolean setFuture(ListenableFuture<? extends V> future) { + checkNotNull(future); + Object localValue = value; + if (localValue == null) { + if (future.isDone()) { + Object value = getFutureValue(future); + if (ATOMIC_HELPER.casValue(this, null, value)) { + complete(this); + return true; + } + return false; + } + SetFuture valueToSet = new SetFuture<V>(this, future); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + // the listener is responsible for calling completeWithFuture, + // directExecutor is appropriate since all we are doing is unpacking + // a completed future which should be fast. + try { + future.addListener(valueToSet, directExecutor()); + } catch (Throwable t) { + // addListener has thrown an exception! SetFuture.run can't throw + // any exceptions so this must have been caused by addListener + // itself. The most likely explanation is a misconfigured mock. Try + // to switch to Failure. 
+ Failure failure; + try { + failure = new Failure(t); + } catch (Throwable oomMostLikely) { + failure = Failure.FALLBACK_INSTANCE; + } + // Note: The only way this CAS could fail is if cancel() has raced + // with us. That is ok. + boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); + } + return true; + } + localValue = value; // we lost the cas, fall through and maybe cancel + } + // The future has already been set to something. If it is cancellation we + // should cancel the incoming future. + if (localValue instanceof Cancellation) { + // we don't care if it fails, this is best-effort. + future.cancel(((Cancellation) localValue).wasInterrupted); + } + return false; + } + + /** + * Returns a value, suitable for storing in the {@link #value} field. From + * the given future, which is assumed to be done. + * <p> + * <p>This is approximately the inverse of {@link #getDoneValue(Object)} + */ + private static Object getFutureValue(ListenableFuture<?> future) { + Object valueToSet; + if (future instanceof TrustedFuture) { + // Break encapsulation for TrustedFuture instances since we know that + // subclasses cannot override .get() (since it is final) and therefore + // this is equivalent to calling .get() and unpacking the exceptions + // like we do below (just much faster because it is a single field read + // instead of a read, several branches and possibly creating exceptions). + return ((AbstractFuture<?>) future).value; + } else { + // Otherwise calculate valueToSet by calling .get() + try { + Object v = getDone(future); + valueToSet = v == null ? NULL : v; + } catch (ExecutionException exception) { + valueToSet = new Failure(exception.getCause()); + } catch (CancellationException cancellation) { + valueToSet = new Cancellation(false, cancellation); + } catch (Throwable t) { + valueToSet = new Failure(t); + } + } + return valueToSet; + } + + /** + * Unblocks all threads and runs all listeners. 
+   * <p>
+   * <p>Written as a loop rather than recursively: when a listener turns out
+   * to be a {@link SetFuture} whose owner this call manages to complete, the
+   * outer loop continues with that owner, and its listeners are pushed onto
+   * the same pending stack.
+   *
+   * @param future the future whose value field has just reached a terminal
+   *     state
+   */
+  private static void complete(AbstractFuture<?> future) {
+    // Stack of listeners still to be run, shared across all futures
+    // completed by this call.
+    Listener next = null;
+    outer:
+    while (true) {
+      future.releaseWaiters();
+      // We call this before the listeners in order to avoid needing to manage
+      // a separate stack data structure for them. afterDone() should be
+      // generally fast and only used for cleanup work... but in theory can
+      // also be recursive and create StackOverflowErrors
+      future.afterDone();
+      // push the current set of listeners onto next
+      next = future.clearListeners(next);
+      future = null;
+      while (next != null) {
+        Listener curr = next;
+        next = next.next;
+        Runnable task = curr.task;
+        if (task instanceof SetFuture) {
+          SetFuture<?> setFuture = (SetFuture<?>) task;
+          // We unwind setFuture specifically to avoid StackOverflowErrors in
+          // the case of long chains of SetFutures
+          // Handling this special case is important because there is no way
+          // to pass an executor to setFuture, so a user couldn't break the
+          // chain by doing this themselves. It is also potentially common
+          // if someone writes a recursive Futures.transformAsync transformer.
+          future = setFuture.owner;
+          if (future.value == setFuture) {
+            Object valueToSet = getFutureValue(setFuture.future);
+            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
+              continue outer;
+            }
+          }
+          // otherwise the future we were trying to set is already done.
+        } else {
+          executeListener(task, curr.executor);
+        }
+      }
+      break;
+    }
+  }
+
+  /**
+   * Returns the result of the given future, which must already be done.
+   *
+   * @param future the future to read; must report {@code isDone()}
+   * @return the value produced by the future
+   * @throws ExecutionException if the future's {@code get} throws it
+   * @throws IllegalStateException if the future is not done
+   */
+  public static <V> V getDone(Future<V> future) throws ExecutionException {
+    /*
+     * We throw IllegalStateException, since the call could succeed later.
+     * Perhaps we "should" throw IllegalArgumentException, since the call
+     * could succeed with a different argument. Those exceptions' docs
+     * suggest that either is acceptable. Google's Java Practices page
+     * recommends IllegalArgumentException here, in part to keep its
+     * recommendation simple: Static methods should throw
+     * IllegalStateException only when they use static state.
+     *
+     *
+     * Why do we deviate here? The answer: We want for fluentFuture.getDone()
+     * to throw the same exception as Futures.getDone(fluentFuture).
+     */
+    Preconditions.checkState(future.isDone(), "Future was expected to be " +
+        "done:" +
+        " %s", future);
+    return Uninterruptibles.getUninterruptibly(future);
+  }
+
+  /**
+   * Callback method that is called exactly once after the future is completed.
+   * <p>
+   * <p>If {@link #interruptTask} is also run during completion,
+   * {@link #afterDone} runs after it.
+   * <p>
+   * <p>The default implementation of this method in {@code AbstractFuture}
+   * does nothing. This is intended for very lightweight cleanup work, for
+   * example, timing statistics or clearing fields.
+   * If your task does anything heavier, consider just using a listener with
+   * an executor.
+   *
+   * @since 20.0
+   */
+  @Beta
+  protected void afterDone() {
+  }
+
+  /**
+   * If this future has been cancelled (and possibly interrupted), cancels
+   * (and possibly interrupts) the given future (if available).
+   * <p>
+   * <p>This method should be used only when this future is completed. It is
+   * designed to be called from {@code done}.
+   *
+   * @param related the future to cancel; {@code null} is ignored
+   */
+  final void maybePropagateCancellation(@Nullable Future<?> related) {
+    if (related != null & isCancelled()) {
+      related.cancel(wasInterrupted());
+    }
+  }
+
+  /**
+   * Releases all threads in the {@link #waiters} list, and clears the list.
+   */
+  private void releaseWaiters() {
+    Waiter head;
+    // Swap the whole stack for TOMBSTONE, retrying until the CAS wins.
+    do {
+      head = waiters;
+    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
+    for (
+        Waiter currentWaiter = head;
+        currentWaiter != null;
+        currentWaiter = currentWaiter.next) {
+      currentWaiter.unpark();
+    }
+  }
+
+  /**
+   * Clears the {@link #listeners} list and prepends its contents to {@code
+   * onto}, least recently added first.
+   *
+   * @param onto the stack of already pending listeners; may be null
+   * @return the new head of the combined stack
+   */
+  private Listener clearListeners(Listener onto) {
+    // We need to
+    // 1. atomically swap the listeners with TOMBSTONE, this is because
+    //    addListener uses that to synchronize with us
+    // 2. reverse the linked list, because despite our rather clear contract,
+    //    people depend on us executing listeners in the order they were added
+    // 3. push all the items onto 'onto' and return the new head of the stack
+    Listener head;
+    do {
+      head = listeners;
+    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
+    Listener reversedList = onto;
+    while (head != null) {
+      Listener tmp = head;
+      head = head.next;
+      tmp.next = reversedList;
+      reversedList = tmp;
+    }
+    return reversedList;
+  }
+
+  /**
+   * Submits the given runnable to the given {@link Executor} catching and
+   * logging all {@linkplain RuntimeException runtime exceptions} thrown by
+   * the executor.
+   */
+  private static void executeListener(Runnable runnable, Executor executor) {
+    try {
+      executor.execute(runnable);
+    } catch (RuntimeException e) {
+      // Log it and keep going -- bad runnable and/or executor. Don't punish
+      // the other runnables if we're given a bad one. We only catch
+      // RuntimeException because we want Errors to propagate up.
+      log.log(
+          Level.SEVERE,
+          "RuntimeException while executing runnable " + runnable + " with " +
+              "executor " + executor,
+          e);
+    }
+  }
+
+  private abstract static class AtomicHelper {
+    /**
+     * Non volatile write of the thread to the {@link Waiter#thread} field.
+     */
+    abstract void putThread(Waiter waiter, Thread newValue);
+
+    /**
+     * Non volatile write of the waiter to the {@link Waiter#next} field.
+     */
+    abstract void putNext(Waiter waiter, Waiter newValue);
+
+    /**
+     * Performs a CAS operation on the {@link #waiters} field.
+     */
+    abstract boolean casWaiters(
+        AbstractFuture<?> future, Waiter expect,
+        Waiter update);
+
+    /**
+     * Performs a CAS operation on the {@link #listeners} field.
+ */ + abstract boolean casListeners( + AbstractFuture<?> future, Listener expect, + Listener update); + + /** + * Performs a CAS operation on the {@link #value} field. + */ + abstract boolean casValue( + AbstractFuture<?> future, Object expect, Object update); + } + + /** + * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. + * <p> + * <p>Static initialization of this class will fail if the + * {@link sun.misc.Unsafe} object cannot be accessed. + */ + private static final class UnsafeAtomicHelper extends AtomicHelper { + static final sun.misc.Unsafe UNSAFE; + static final long LISTENERS_OFFSET; + static final long WAITERS_OFFSET; + static final long VALUE_OFFSET; + static final long WAITER_THREAD_OFFSET; + static final long WAITER_NEXT_OFFSET; + + static { + sun.misc.Unsafe unsafe = null; + try { + unsafe = sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) { + try { + unsafe = + AccessController.doPrivileged( + new PrivilegedExceptionAction<sun.misc.Unsafe>() { + @Override + public sun.misc.Unsafe run() throws Exception { + Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) { + return k.cast(x); + } + } + throw new NoSuchFieldError("the Unsafe"); + } + }); + } catch (PrivilegedActionException e) { + throw new RuntimeException( + "Could not initialize intrinsics", e.getCause()); + } + } + try { + Class<?> abstractFuture = AbstractFuture.class; + WAITERS_OFFSET = unsafe + .objectFieldOffset(abstractFuture.getDeclaredField("waiters")); + LISTENERS_OFFSET = unsafe + .objectFieldOffset(abstractFuture.getDeclaredField("listeners")); + VALUE_OFFSET = unsafe + .objectFieldOffset(abstractFuture.getDeclaredField("value")); + WAITER_THREAD_OFFSET = unsafe + .objectFieldOffset(Waiter.class.getDeclaredField("thread")); + WAITER_NEXT_OFFSET = unsafe + .objectFieldOffset(Waiter.class.getDeclaredField("next")); + 
UNSAFE = unsafe; + } catch (Exception e) { + throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + public static void throwIfUnchecked(Throwable throwable) { + checkNotNull(throwable); + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + if (throwable instanceof Error) { + throw (Error) throwable; + } + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); + } + + /** + * Performs a CAS operation on the {@link #waiters} field. + */ + @Override + boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter + update) { + return UNSAFE + .compareAndSwapObject(future, WAITERS_OFFSET, expect, update); + } + + /** + * Performs a CAS operation on the {@link #listeners} field. + */ + @Override + boolean casListeners( + AbstractFuture<?> future, Listener expect, Listener update) { + return UNSAFE + .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); + } + + /** + * Performs a CAS operation on the {@link #value} field. + */ + @Override + boolean casValue(AbstractFuture<?> future, Object expect, Object update) { + return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); + } + } + + /** + * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. 
+ */ + private static final class SafeAtomicHelper extends AtomicHelper { + final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; + final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; + final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; + final AtomicReferenceFieldUpdater<AbstractFuture, Listener> + listenersUpdater; + final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; + + SafeAtomicHelper( + AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, + AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, + AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, + AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, + AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { + this.waiterThreadUpdater = waiterThreadUpdater; + this.waiterNextUpdater = waiterNextUpdater; + this.waitersUpdater = waitersUpdater; + this.listenersUpdater = listenersUpdater; + this.valueUpdater = valueUpdater; + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiterThreadUpdater.lazySet(waiter, newValue); + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiterNextUpdater.lazySet(waiter, newValue); + } + + @Override + boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter + update) { + return waitersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casListeners( + AbstractFuture<?> future, Listener expect, Listener update) { + return listenersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casValue(AbstractFuture<?> future, Object expect, Object update) { + return valueUpdater.compareAndSet(future, expect, update); + } + } + + /** + * {@link AtomicHelper} based on {@code synchronized} and volatile writes. + * <p> + * <p>This is an implementation of last resort for when certain basic VM + * features are broken (like AtomicReferenceFieldUpdater). 
+ */ + private static final class SynchronizedHelper extends AtomicHelper { + @Override + void putThread(Waiter waiter, Thread newValue) { + waiter.thread = newValue; + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiter.next = newValue; + } + + @Override + boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter + update) { + synchronized (future) { + if (future.waiters == expect) { + future.waiters = update; + return true; + } + return false; + } + } + + @Override + boolean casListeners( + AbstractFuture<?> future, Listener expect, Listener update) { + synchronized (future) { + if (future.listeners == expect) { + future.listeners = update; + return true; + } + return false; + } + } + + @Override + boolean casValue(AbstractFuture<?> future, Object expect, Object update) { + synchronized (future) { + if (future.value == expect) { + future.value = update; + return true; + } + return false; + } + } + } + + private static CancellationException cancellationExceptionWithCause( + @Nullable String message, @Nullable Throwable cause) { + CancellationException exception = new CancellationException(message); + exception.initCause(cause); + return exception; + } + + /** + * Returns an {@link Executor} that runs each task in the thread that invokes + * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}. + * <p> + * <p>This instance is equivalent to: <pre> {@code + * final class DirectExecutor implements Executor { + * public void execute(Runnable r) { + * r.run(); + * } + * }}</pre> + */ + public static Executor directExecutor() { + return DirectExecutor.INSTANCE; + } + + /** + * See {@link #directExecutor} for behavioral notes. 
+ */ + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "MoreExecutors.directExecutor()"; + } + } + +} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index d088826..4cf6c3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -20,17 +20,23 @@ package org.apache.hadoop.ozone.container.common.volume; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.sun.istack.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.GetSpaceUsed; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.ozone.common.InconsistentStorageStateException; import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion; import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile; import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion; import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Time; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +64,10 @@ import java.util.UUID; * During DN startup, if the VERSION file 
exists, we verify that the * clusterID in the version file matches the clusterID from SCM. */ -public final class HddsVolume { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HddsVolume + implements Checkable<Boolean, VolumeCheckResult> { private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class); @@ -77,6 +86,19 @@ public final class HddsVolume { private int layoutVersion; // layout version of the storage data /** + * Run a check on the current volume to determine if it is healthy. + * @param unused context for the check, ignored. + * @return result of checking the volume. + * @throws Exception if an exception was encountered while running + * the volume check. + */ + @Override + public VolumeCheckResult check(@Nullable Boolean unused) throws Exception { + DiskChecker.checkDir(hddsRootDir); + return VolumeCheckResult.HEALTHY; + } + + /** * Builder for HddsVolume. */ public static class Builder { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java new file mode 100644 index 0000000..6df81df --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolumeChecker.java @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.volume; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker; +import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY; + + +/** + * A class that encapsulates running disk checks against each HDDS volume and + * allows retrieving a list of failed volumes. + */ +public class HddsVolumeChecker { + + public static final Logger LOG = + LoggerFactory.getLogger(HddsVolumeChecker.class); + + private AsyncChecker<Boolean, VolumeCheckResult> delegateChecker; + + private final AtomicLong numVolumeChecks = new AtomicLong(0); + private final AtomicLong numAllVolumeChecks = new AtomicLong(0); + private final AtomicLong numSkippedChecks = new AtomicLong(0); + + /** + * Max allowed time for a disk check in milliseconds. If the check + * doesn't complete within this time we declare the disk as dead. + */ + private final long maxAllowedTimeForCheckMs; + + /** + * Minimum time between two successive disk checks of a volume. + */ + private final long minDiskCheckGapMs; + + /** + * Timestamp of the last check of all volumes. + */ + private long lastAllVolumesCheck; + + private final Timer timer; + + private final ExecutorService checkVolumeResultHandlerExecutorService; + + /** + * @param conf Configuration object. + * @param timer {@link Timer} object used for throttling checks. 
+ */ + public HddsVolumeChecker(Configuration conf, Timer timer) + throws DiskErrorException { + maxAllowedTimeForCheckMs = conf.getTimeDuration( + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, + DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + if (maxAllowedTimeForCheckMs <= 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - " + + maxAllowedTimeForCheckMs + " (should be > 0)"); + } + + this.timer = timer; + + /** + * Maximum number of volume failures that can be tolerated without + * declaring a fatal error. + */ + int maxVolumeFailuresTolerated = conf.getInt( + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); + + minDiskCheckGapMs = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT, + TimeUnit.MILLISECONDS); + + if (minDiskCheckGapMs < 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - " + + minDiskCheckGapMs + " (should be >= 0)"); + } + + long diskCheckTimeout = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + if (diskCheckTimeout < 0) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - " + + diskCheckTimeout + " (should be >= 0)"); + } + + lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs; + + if (maxVolumeFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) { + throw new DiskErrorException("Invalid value configured for " + + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - " + + maxVolumeFailuresTolerated + " " + + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG); + } + + delegateChecker = new ThrottledAsyncChecker<>( + timer, minDiskCheckGapMs, diskCheckTimeout, + Executors.newCachedThreadPool( + new 
ThreadFactoryBuilder() + .setNameFormat("DataNode DiskChecker thread %d") + .setDaemon(true) + .build())); + + checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("VolumeCheck ResultHandler thread %d") + .setDaemon(true) + .build()); + } + + /** + * Run checks against all HDDS volumes. + * + * This check may be performed at service startup and subsequently at + * regular intervals to detect and handle failed volumes. + * + * @param volumes - Set of volumes to be checked. This set must be immutable + * for the duration of the check else the results will be + * unexpected. + * + * @return set of failed volumes. + */ + public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes) + throws InterruptedException { + final long gap = timer.monotonicNow() - lastAllVolumesCheck; + if (gap < minDiskCheckGapMs) { + numSkippedChecks.incrementAndGet(); + LOG.trace( + "Skipped checking all volumes, time since last check {} is less " + + "than the minimum gap between checks ({} ms).", + gap, minDiskCheckGapMs); + return Collections.emptySet(); + } + + lastAllVolumesCheck = timer.monotonicNow(); + final Set<HddsVolume> healthyVolumes = new HashSet<>(); + final Set<HddsVolume> failedVolumes = new HashSet<>(); + final Set<HddsVolume> allVolumes = new HashSet<>(); + + final AtomicLong numVolumes = new AtomicLong(volumes.size()); + final CountDownLatch latch = new CountDownLatch(1); + + for (HddsVolume v : volumes) { + Optional<ListenableFuture<VolumeCheckResult>> olf = + delegateChecker.schedule(v, null); + LOG.info("Scheduled health check for volume {}", v); + if (olf.isPresent()) { + allVolumes.add(v); + Futures.addCallback(olf.get(), + new ResultHandler(v, healthyVolumes, failedVolumes, + numVolumes, (ignored1, ignored2) -> latch.countDown())); + } else { + if (numVolumes.decrementAndGet() == 0) { + latch.countDown(); + } + } + } + + // Wait until our timeout elapses, after which we give up on + // the 
remaining volumes. + if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { + LOG.warn("checkAllVolumes timed out after {} ms" + + maxAllowedTimeForCheckMs); + } + + numAllVolumeChecks.incrementAndGet(); + synchronized (this) { + // All volumes that have not been detected as healthy should be + // considered failed. This is a superset of 'failedVolumes'. + // + // Make a copy under the mutex as Sets.difference() returns a view + // of a potentially changing set. + return new HashSet<>(Sets.difference(allVolumes, healthyVolumes)); + } + } + + /** + * A callback interface that is supplied the result of running an + * async disk check on multiple volumes. + */ + public interface Callback { + /** + * @param healthyVolumes set of volumes that passed disk checks. + * @param failedVolumes set of volumes that failed disk checks. + */ + void call(Set<HddsVolume> healthyVolumes, + Set<HddsVolume> failedVolumes); + } + + /** + * Check a single volume asynchronously, returning a {@link ListenableFuture} + * that can be used to retrieve the final result. + * + * If the volume cannot be referenced then it is already closed and + * cannot be checked. No error is propagated to the callback. + * + * @param volume the volume that is to be checked. + * @param callback callback to be invoked when the volume check completes. + * @return true if the check was scheduled and the callback will be invoked. + * false otherwise. 
+ */ + public boolean checkVolume(final HddsVolume volume, Callback callback) { + if (volume == null) { + LOG.debug("Cannot schedule check on null volume"); + return false; + } + + Optional<ListenableFuture<VolumeCheckResult>> olf = + delegateChecker.schedule(volume, null); + if (olf.isPresent()) { + numVolumeChecks.incrementAndGet(); + Futures.addCallback(olf.get(), + new ResultHandler(volume, new HashSet<>(), new HashSet<>(), + new AtomicLong(1), callback), + checkVolumeResultHandlerExecutorService + ); + return true; + } + return false; + } + + /** + * A callback to process the results of checking a volume. + */ + private class ResultHandler + implements FutureCallback<VolumeCheckResult> { + private final HddsVolume volume; + private final Set<HddsVolume> failedVolumes; + private final Set<HddsVolume> healthyVolumes; + private final AtomicLong volumeCounter; + + @Nullable + private final Callback callback; + + /** + * + * @param healthyVolumes set of healthy volumes. If the disk check is + * successful, add the volume here. + * @param failedVolumes set of failed volumes. If the disk check fails, + * add the volume here. + * @param volumeCounter volumeCounter used to trigger callback invocation. + * @param callback invoked when the volumeCounter reaches 0. 
+ */ + ResultHandler(HddsVolume volume, + Set<HddsVolume> healthyVolumes, + Set<HddsVolume> failedVolumes, + AtomicLong volumeCounter, + @Nullable Callback callback) { + this.volume = volume; + this.healthyVolumes = healthyVolumes; + this.failedVolumes = failedVolumes; + this.volumeCounter = volumeCounter; + this.callback = callback; + } + + @Override + public void onSuccess(@Nonnull VolumeCheckResult result) { + switch(result) { + case HEALTHY: + case DEGRADED: + LOG.debug("Volume {} is {}.", volume, result); + markHealthy(); + break; + case FAILED: + LOG.warn("Volume {} detected as being unhealthy", volume); + markFailed(); + break; + default: + LOG.error("Unexpected health check result {} for volume {}", + result, volume); + markHealthy(); + break; + } + cleanup(); + } + + @Override + public void onFailure(@Nonnull Throwable t) { + Throwable exception = (t instanceof ExecutionException) ? + t.getCause() : t; + LOG.warn("Exception running disk checks against volume " + + volume, exception); + markFailed(); + cleanup(); + } + + private void markHealthy() { + synchronized (HddsVolumeChecker.this) { + healthyVolumes.add(volume); + } + } + + private void markFailed() { + synchronized (HddsVolumeChecker.this) { + failedVolumes.add(volume); + } + } + + private void cleanup() { + invokeCallback(); + } + + private void invokeCallback() { + try { + final long remaining = volumeCounter.decrementAndGet(); + if (callback != null && remaining == 0) { + callback.call(healthyVolumes, failedVolumes); + } + } catch(Exception e) { + // Propagating this exception is unlikely to be helpful. + LOG.warn("Unexpected exception", e); + } + } + } + + /** + * Shutdown the checker and its associated ExecutorService. + * + * See {@link ExecutorService#awaitTermination} for the interpretation + * of the parameters. 
+ */ + void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) { + try { + delegateChecker.shutdownAndWait(gracePeriod, timeUnit); + } catch (InterruptedException e) { + LOG.warn("{} interrupted during shutdown.", this.getClass().getSimpleName()); + Thread.currentThread().interrupt(); + } + } + + /** + * This method is for testing only. + * + * @param testDelegate + */ + @VisibleForTesting + void setDelegateChecker( + AsyncChecker<Boolean, VolumeCheckResult> testDelegate) { + delegateChecker = testDelegate; + } + + /** + * Return the number of {@link #checkVolume} invocations. + */ + public long getNumVolumeChecks() { + return numVolumeChecks.get(); + } + + /** + * Return the number of {@link #checkAllVolumes} invocations. + */ + public long getNumAllVolumeChecks() { + return numAllVolumeChecks.get(); + } + + /** + * Return the number of checks skipped because the minimum gap since the + * last check had not elapsed. + */ + public long getNumSkippedChecks() { + return numSkippedChecks.get(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java new file mode 100644 index 0000000..3be24e4 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/ThrottledAsyncChecker.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.volume; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker; +import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; +import org.apache.hadoop.util.Timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An implementation of {@link AsyncChecker} that skips checking recently + * checked objects. It will enforce at least minMsBetweenChecks + * milliseconds between two successive checks of any one object. + * + * It is assumed that the total number of Checkable objects in the system + * is small, (not more than a few dozen) since the checker uses O(Checkables) + * storage and also potentially O(Checkables) threads. 
+ *
+ * minMsBetweenChecks should be configured reasonably
+ * by the caller to avoid spinning up too many threads frequently.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ThrottledAsyncChecker.class);
+
+  private final Timer timer;
+
+  /**
+   * The ExecutorService used to schedule asynchronous checks.
+   */
+  private final ListeningExecutorService executorService;
+  private final ScheduledExecutorService scheduledExecutorService;
+
+  /**
+   * The minimum gap in milliseconds between two successive checks
+   * of the same object. This is the throttle.
+   */
+  private final long minMsBetweenChecks;
+  private final long diskCheckTimeout;
+
+  /**
+   * Map of checks that are currently in progress. Protected by the object
+   * lock.
+   */
+  private final Map<Checkable, ListenableFuture<V>> checksInProgress;
+
+  /**
+   * Maps Checkable objects to the outcome and completion time of their
+   * most recently finished check. Weakly keyed.
+   * Protected by the object lock.
+   */
+  private final Map<Checkable, LastCheckResult<V>> completedChecks;
+
+  public ThrottledAsyncChecker(final Timer timer,
+      final long minMsBetweenChecks,
+      final long diskCheckTimeout,
+      final ExecutorService executorService) {
+    this.timer = timer;
+    this.minMsBetweenChecks = minMsBetweenChecks;
+    this.diskCheckTimeout = diskCheckTimeout;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.checksInProgress = new HashMap<>();
+    this.completedChecks = new WeakHashMap<>();
+
+    if (this.diskCheckTimeout > 0) {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
+          ScheduledThreadPoolExecutor(1);
+      this.scheduledExecutorService = MoreExecutors
+          .getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
+    } else {
+      this.scheduledExecutorService = null;
+    }
+  }
+
+  /**
+   * See {@link AsyncChecker#schedule}.
+   *
+   * Returns {@link Optional#absent()} without scheduling anything if a
+   * check of the same object is still in progress or finished less than
+   * minMsBetweenChecks ago. Holds the object lock while using the maps.
+   */
+  @Override
+  public synchronized Optional<ListenableFuture<V>> schedule(
+      Checkable<K, V> target, K context) {
+    if (checksInProgress.containsKey(target)) {
+      return Optional.absent();
+    }
+
+    if (completedChecks.containsKey(target)) {
+      final LastCheckResult<V> result = completedChecks.get(target);
+      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
+      if (msSinceLastCheck < minMsBetweenChecks) {
+        LOG.debug("Skipped checking {}. Time since last check {}ms " +
+                "is less than the min gap {}ms.",
+            target, msSinceLastCheck, minMsBetweenChecks);
+        return Optional.absent();
+      }
+    }
+
+    LOG.info("Scheduling a check for {}", target);
+    final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
+        () -> target.check(context));
+    final ListenableFuture<V> lf;
+
+    if (diskCheckTimeout > 0) {
+      lf = TimeoutFuture
+          .create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
+              scheduledExecutorService);
+    } else {
+      lf = lfWithoutTimeout;
+    }
+
+    checksInProgress.put(target, lf);
+    addResultCachingCallback(target, lf);
+    return Optional.of(lf);
+  }
+
+  /**
+   * Register a callback to cache the result of a check.
+   * @param target the object being checked; used as the key in both maps.
+   * @param lf future of the check whose outcome is recorded on completion.
+   */
+  private void addResultCachingCallback(
+      Checkable<K, V> target, ListenableFuture<V> lf) {
+    Futures.addCallback(lf, new FutureCallback<V>() {
+      @Override
+      public void onSuccess(@Nullable V result) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              result, timer.monotonicNow()));
+        }
+      }
+
+      @Override
+      public void onFailure(@Nonnull Throwable t) {
+        synchronized (ThrottledAsyncChecker.this) {
+          checksInProgress.remove(target);
+          completedChecks.put(target, new LastCheckResult<>(
+              t, timer.monotonicNow()));
+        }
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * The results of in-progress checks are not useful during shutdown,
+   * so we optimize for faster shutdown by interrupting all actively
+   * executing checks.
+   */
+  @Override
+  public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+      throws InterruptedException {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService.awaitTermination(timeout, timeUnit);
+    }
+
+    executorService.shutdownNow();
+    executorService.awaitTermination(timeout, timeUnit);
+  }
+
+  /**
+   * Status of running a check. It can either be a result or an
+   * exception, depending on whether the check completed or threw.
+   */
+  private static final class LastCheckResult<V> {
+    /**
+     * Timestamp at which the check completed.
+     */
+    private final long completedAt;
+
+    /**
+     * Result of running the check if it completed. null if it threw.
+     */
+    @Nullable
+    private final V result;
+
+    /**
+     * Exception thrown by the check. null if it returned a result.
+     */
+    private final Throwable exception; // null on success.
+
+    /**
+     * Initialize with a result and the time at which it was obtained.
+     * @param result value returned by the check.
+     */
+    private LastCheckResult(V result, long completedAt) {
+      this.result = result;
+      this.exception = null;
+      this.completedAt = completedAt;
+    }
+
+    /**
+     * Initialize with an exception.
+     * @param t exception thrown by the check.
+     * @param completedAt timestamp at which the check failed.
+     */
+    private LastCheckResult(Throwable t, long completedAt) {
+      this.result = null;
+      this.exception = t;
+      this.completedAt = completedAt;
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
new file mode 100644
index 0000000..a7a492a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/TimeoutFuture.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Some portions of this class have been modified to make it functional in this
+ * package.
+ */
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Implementation of {@code Futures#withTimeout}.
+ *
+ * <p>Future that delegates to another but will finish early (via a
+ * {@link TimeoutException} wrapped in an
+ * {@link java.util.concurrent.ExecutionException}) if the specified duration
+ * expires. The delegate future is interrupted and cancelled if it times out.
+ */
+final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TimeoutFuture.class);
+
+  static <V> ListenableFuture<V> create(
+      ListenableFuture<V> delegate,
+      long time,
+      TimeUnit unit,
+      ScheduledExecutorService scheduledExecutor) {
+    TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
+    TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
+    result.timer = scheduledExecutor.schedule(fire, time, unit);
+    delegate.addListener(fire, directExecutor());
+    return result;
+  }
+
+  /*
+   * Memory visibility of these fields. There are two cases to consider.
+   *
+   * 1. visibility of the writes to these fields to Fire.run:
+   *
+   * The initial write to delegateRef is made definitely visible via the
+   * semantics of addListener/SES.schedule. The later racy write in cancel()
+   * is not guaranteed to be observed, however that is fine since the
+   * correctness is based on the atomic state in our base class. The initial
+   * write to timer is never definitely visible to Fire.run since it is
+   * assigned after SES.schedule is called. Therefore Fire.run has to check
+   * for null. However, it should be visible if Fire.run is called by
+   * delegate.addListener since addListener is called after the assignment
+   * to timer, and importantly this is the main situation in which we need to
+   * be able to see the write.
+   *
+   * 2. visibility of the writes to an afterDone() call triggered by cancel():
+   *
+   * Since these fields are non-final that means that TimeoutFuture is not
+   * being 'safely published', thus a motivated caller may be able to expose
+   * the reference to another thread that would then call cancel() and be
+   * unable to cancel the delegate. There are a number of ways to solve this,
+   * none of which are very pretty, and it is currently believed to be a
+   * purely theoretical problem (since the other actions should supply
+   * sufficient write-barriers).
+   */
+
+  @Nullable private ListenableFuture<V> delegateRef;
+  @Nullable private Future<?> timer;
+
+  private TimeoutFuture(ListenableFuture<V> delegate) {
+    this.delegateRef = Preconditions.checkNotNull(delegate);
+  }
+
+  /**
+   * A runnable that is called when the delegate or the timer completes.
+   */
+  private static final class Fire<V> implements Runnable {
+    @Nullable
+    TimeoutFuture<V> timeoutFutureRef;
+
+    Fire(
+        TimeoutFuture<V> timeoutFuture) {
+      this.timeoutFutureRef = timeoutFuture;
+    }
+
+    @Override
+    public void run() {
+      // If either of these reads returns null then we must be after a
+      // successful cancel or another call to this method.
+      TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
+      if (timeoutFuture == null) {
+        return;
+      }
+      ListenableFuture<V> delegate = timeoutFuture.delegateRef;
+      if (delegate == null) {
+        return;
+      }
+
+      /*
+       * If we're about to complete the TimeoutFuture, we want to release our
+       * reference to it. Otherwise, we'll pin it (and its result) in memory
+       * until the timeout task is GCed. (The need to clear our reference to
+       * the TimeoutFuture is the reason we use a *static* nested class with
+       * a manual reference back to the "containing" class.)
+       *
+       * This has the nice-ish side effect of limiting reentrancy: run() calls
+       * timeoutFuture.setException() calls run(). That reentrancy would
+       * already be harmless, since timeoutFuture can be set (and delegate
+       * cancelled) only once. (And "set only once" is important for other
+       * reasons: run() can still be invoked concurrently in different threads,
+       * even with the above null checks.)
+       */
+      timeoutFutureRef = null;
+      if (delegate.isDone()) {
+        timeoutFuture.setFuture(delegate);
+      } else {
+        try {
+          timeoutFuture.setException(
+              new TimeoutException("Future timed out: " + delegate));
+        } finally {
+          delegate.cancel(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void afterDone() {
+    maybePropagateCancellation(delegateRef);
+
+    Future<?> localTimer = timer;
+    // Try to cancel the timer as an optimization.
+    // timer may be null if this call to run was by the timer task since there
+    // is no happens-before edge between the assignment to timer and an
+    // execution of the timer task.
+    if (localTimer != null) {
+      localTimer.cancel(false);
+    }
+
+    delegateRef = null;
+    timer = null;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index d30dd89..7addd63 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.volume;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -37,8 +38,10 @@ import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,11 +51,18 @@ import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import 
java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * VolumeSet to manage volumes in a DataNode.
+ * VolumeSet to manage HDDS volumes in a DataNode.
  */
 public class VolumeSet {
 
@@ -79,6 +89,14 @@ public class VolumeSet {
   private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
 
   /**
+   * An executor for periodic disk checks.
+   */
+  final ScheduledExecutorService diskCheckerservice;
+  final ScheduledFuture<?> periodicDiskChecker;
+
+  private static final long DISK_CHECK_INTERVAL_MINUTES = 15;
+
+  /**
    * A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
    * Any update to {@link VolumeSet#volumeMap},
    * {@link VolumeSet#failedVolumeMap}, or {@link VolumeSet#volumeStateMap}
@@ -90,6 +108,7 @@ public class VolumeSet {
   private String clusterID;
 
   private Runnable shutdownHook;
+  private final HddsVolumeChecker volumeChecker;
 
   public VolumeSet(String dnUuid, Configuration conf)
       throws IOException {
@@ -102,11 +121,30 @@ public class VolumeSet {
     this.clusterID = clusterID;
     this.conf = conf;
     this.volumeSetRWLock = new ReentrantReadWriteLock();
-
+    this.volumeChecker = getVolumeChecker(conf);
+    this.diskCheckerservice = Executors.newScheduledThreadPool(
+        1, r -> new Thread(r, "Periodic HDDS volume checker"));
+    this.periodicDiskChecker =
+      diskCheckerservice.scheduleWithFixedDelay(() -> {
+        try {
+          checkAllVolumes();
+        } catch (IOException e) {
+          LOG.warn("Exception while checking disks", e);
+        }
+      }, DISK_CHECK_INTERVAL_MINUTES, DISK_CHECK_INTERVAL_MINUTES,
+        TimeUnit.MINUTES);
     initializeVolumeSet();
   }
 
-  // Add DN volumes configured through ConfigKeys to volumeMap.
+  @VisibleForTesting
+  HddsVolumeChecker getVolumeChecker(Configuration conf)
+      throws DiskChecker.DiskErrorException {
+    return new HddsVolumeChecker(conf, new Timer());
+  }
+
+  /**
+   * Add DN volumes configured through ConfigKeys to volumeMap.
+   */
   private void initializeVolumeSet() throws IOException {
     volumeMap = new ConcurrentHashMap<>();
     failedVolumeMap = new ConcurrentHashMap<>();
@@ -123,7 +161,7 @@ public class VolumeSet {
     }
 
     for (StorageType storageType : StorageType.values()) {
-      volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
+      volumeStateMap.put(storageType, new ArrayList<>());
     }
 
     for (String locationString : rawLocations) {
@@ -139,6 +177,12 @@ public class VolumeSet {
         volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
         LOG.info("Added Volume : {} to VolumeSet",
             hddsVolume.getHddsRootDir().getPath());
+
+        if (!hddsVolume.getHddsRootDir().mkdirs() &&
+            !hddsVolume.getHddsRootDir().exists()) {
+          throw new IOException("Failed to create HDDS storage dir " +
+              hddsVolume.getHddsRootDir());
+        }
       } catch (IOException e) {
         HddsVolume volume = new HddsVolume.Builder(locationString)
             .failedVolume(true).build();
@@ -147,8 +191,10 @@ public class VolumeSet {
       }
     }
 
+    checkAllVolumes();
+
     if (volumeMap.size() == 0) {
-      throw new DiskOutOfSpaceException("No storage location configured");
+      throw new DiskOutOfSpaceException("No storage locations configured");
     }
 
     // Ensure volume threads are stopped and scm df is saved during shutdown.
@@ -160,6 +206,54 @@ public class VolumeSet {
   }
 
   /**
+   * Run a synchronous parallel check of all HDDS volumes, removing
+   * failed volumes.
+   */
+  private void checkAllVolumes() throws IOException {
+    List<HddsVolume> allVolumes = getVolumesList();
+    Set<HddsVolume> failedVolumes;
+    try {
+      failedVolumes = volumeChecker.checkAllVolumes(allVolumes);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while running disk check", e);
+    }
+
+    if (failedVolumes.size() > 0) {
+      LOG.warn("checkAllVolumes got {} failed volumes - {}",
+          failedVolumes.size(), failedVolumes);
+      handleVolumeFailures(failedVolumes);
+    } else {
+      LOG.debug("checkAllVolumes encountered no failures");
+    }
+  }
+
+  /**
+   * Handle one or more failed volumes.
+   * @param failedVolumes volumes to move from volumeMap to failedVolumeMap.
+   */
+  private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
+    for (HddsVolume v: failedVolumes) {
+      this.writeLock();
+      try {
+        // Immediately mark the volume as failed so it is unavailable
+        // for new containers.
+        volumeMap.remove(v.getHddsRootDir().getPath());
+        failedVolumeMap.putIfAbsent(v.getHddsRootDir().getPath(), v);
+      } finally {
+        this.writeUnlock();
+      }
+
+      // TODO:
+      // 1. Mark all closed containers on the volume as unhealthy.
+      // 2. Consider stopping IO on open containers and tearing down
+      //    active pipelines.
+      // 3. Handle Ratis log disk failure.
+    }
+  }
+
+  /**
    * If Version file exists and the {@link VolumeSet#clusterID} is not set yet,
    * assign it the value from Version file. Otherwise, check that the given
    * id matches with the id from version file.
@@ -225,12 +319,12 @@ public class VolumeSet {
 
 
   // Add a volume to VolumeSet
-  public boolean addVolume(String dataDir) {
+  boolean addVolume(String dataDir) {
     return addVolume(dataDir, StorageType.DEFAULT);
   }
 
   // Add a volume to VolumeSet
-  public boolean addVolume(String volumeRoot, StorageType storageType) {
+  private boolean addVolume(String volumeRoot, StorageType storageType) {
     String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
     boolean success;
 
@@ -330,16 +424,22 @@ public class VolumeSet {
   }
 
   /**
-   * Shutdown's the volumeset, if saveVolumeSetUsed is false, call's
-   * {@link VolumeSet#saveVolumeSetUsed}.
+   * Shutdown the volumeset.
+   */
   public void shutdown() {
     saveVolumeSetUsed();
+    stopDiskChecker();
     if (shutdownHook != null) {
       ShutdownHookManager.get().removeShutdownHook(shutdownHook);
     }
   }
 
+  private void stopDiskChecker() {
+    periodicDiskChecker.cancel(true);
+    volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
+    diskCheckerservice.shutdownNow();
+  }
+
   @VisibleForTesting
   public List<HddsVolume> getVolumesList() {
     return ImmutableList.copyOf(volumeMap.values());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 80ce13d..92d76ef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -160,6 +160,7 @@ public class OzoneContainer {
     writeChannel.stop();
     readChannel.stop();
     hddsDispatcher.shutdown();
+    volumeSet.shutdown();
   }
 
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
new file mode 100644
index 0000000..f2a0c25
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolumeChecker.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link HddsVolumeChecker}.
+ */
+@RunWith(Parameterized.class)
+public class TestHddsVolumeChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestHddsVolumeChecker.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30_000);
+
+  /**
+   * Run each test case for each possible value of {@link VolumeCheckResult}.
+   * Including "null" for 'throw exception'.
+   * @return one single-element parameter array per value, plus one for null.
+   */
+  @Parameters(name="{0}")
+  public static Collection<Object[]> data() {
+    List<Object[]> values = new ArrayList<>();
+    for (VolumeCheckResult result : VolumeCheckResult.values()) {
+      values.add(new Object[] {result});
+    }
+    values.add(new Object[] {null});
+    return values;
+  }
+
+  /**
+   * When null, the check call should throw an exception.
+   */
+  private final VolumeCheckResult expectedVolumeHealth;
+  private static final int NUM_VOLUMES = 2;
+
+
+  public TestHddsVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
+    this.expectedVolumeHealth = expectedVolumeHealth;
+  }
+
+  /**
+   * Test {@link HddsVolumeChecker#checkVolume} propagates the
+   * check to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCheckOneVolume() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+    final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
+    final HddsVolumeChecker checker =
+        new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+    final AtomicLong numCallbackInvocations = new AtomicLong(0);
+
+    /*
+     * Request a check and ensure it triggered HddsVolume#check.
+     */
+    boolean result =
+        checker.checkVolume(volume, (healthyVolumes, failedVolumes) -> {
+          numCallbackInvocations.incrementAndGet();
+          if (expectedVolumeHealth != null &&
+              expectedVolumeHealth != FAILED) {
+            assertThat(healthyVolumes.size(), is(1));
+            assertThat(failedVolumes.size(), is(0));
+          } else {
+            assertThat(healthyVolumes.size(), is(0));
+            assertThat(failedVolumes.size(), is(1));
+          }
+        });
+
+    GenericTestUtils.waitFor(() -> numCallbackInvocations.get() > 0, 5, 10000);
+
+    // Ensure that the check was invoked exactly once.
+    verify(volume, times(1)).check(anyObject());
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
+  }
+
+  /**
+   * Test {@link HddsVolumeChecker#checkAllVolumes} propagates
+   * checks for all volumes to the delegate checker.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCheckAllVolumes() throws Exception {
+    LOG.info("Executing {}", testName.getMethodName());
+
+    final List<HddsVolume> volumes = makeVolumes(
+        NUM_VOLUMES, expectedVolumeHealth);
+    final HddsVolumeChecker checker =
+        new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    checker.setDelegateChecker(new DummyChecker());
+
+    Set<HddsVolume> failedVolumes = checker.checkAllVolumes(volumes);
+    LOG.info("Got back {} failed volumes", failedVolumes.size());
+
+    if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
+      assertThat(failedVolumes.size(), is(NUM_VOLUMES));
+    } else {
+      assertTrue(failedVolumes.isEmpty());
+    }
+
+    // Ensure each volume's check() method was called exactly once.
+    for (HddsVolume volume : volumes) {
+      verify(volume, times(1)).check(anyObject());
+    }
+  }
+
+  /**
+   * A checker that runs {@link HddsVolume#check} synchronously and wraps
+   * its result, or the exception it threw, in an immediate future.
+   */
+  static class DummyChecker
+      implements AsyncChecker<Boolean, VolumeCheckResult> {
+
+    @Override
+    public Optional<ListenableFuture<VolumeCheckResult>> schedule(
+        Checkable<Boolean, VolumeCheckResult> target,
+        Boolean context) {
+      try {
+        LOG.info("Returning success for volume check");
+        return Optional.of(
+            Futures.immediateFuture(target.check(context)));
+      } catch (Exception e) {
+        LOG.info("check routine threw exception " + e);
+        return Optional.of(Futures.immediateFailedFuture(e));
+      }
+    }
+
+    @Override
+    public void shutdownAndWait(long timeout, TimeUnit timeUnit)
+        throws InterruptedException {
+      // Nothing to cancel.
+    }
+  }
+
+  static List<HddsVolume> makeVolumes(
+      int numVolumes, VolumeCheckResult health) throws Exception {
+    final List<HddsVolume> volumes = new ArrayList<>(numVolumes);
+    for (int i = 0; i < numVolumes; ++i) {
+      final HddsVolume volume = mock(HddsVolume.class);
+
+      if (health != null) {
+        when(volume.check(any(Boolean.class))).thenReturn(health);
+        when(volume.check(isNull())).thenReturn(health);
+      } else {
+        final DiskErrorException de = new DiskErrorException("Fake Exception");
+        when(volume.check(any(Boolean.class))).thenThrow(de);
+        when(volume.check(isNull())).thenThrow(de);
+      }
+      volumes.add(volume);
+    }
+    return volumes;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
new file mode 100644
index 0000000..687a12d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Timer;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Verify that {@link VolumeSet} correctly checks for failed disks
+ * during initialization.
+ */
+public class TestVolumeSetDiskChecks {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestVolumeSetDiskChecks.class);
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30_000);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  Configuration conf = null;
+
+  /**
+   * Delete the storage directories listed in the test configuration.
+   */
+  @After
+  public void cleanup() {
+    final Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFS_DATANODE_DATA_DIR_KEY);
+
+    for (String d: dirs) {
+      FileUtils.deleteQuietly(new File(d));
+    }
+  }
+
+  /**
+   * Verify that VolumeSet creates volume root directories at startup.
+   * @throws IOException
+   */
+  @Test
+  public void testOzoneDirsAreCreated() throws IOException {
+    final int numVolumes = 2;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    final VolumeSet volumeSet =
+        new VolumeSet(UUID.randomUUID().toString(), conf);
+
+    assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
+    assertThat(volumeSet.getFailedVolumesList().size(), is(0));
+
+    // Verify that the Ozone dirs were created during initialization.
+    Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFS_DATANODE_DATA_DIR_KEY);
+    for (String d : dirs) {
+      assertTrue(new File(d).isDirectory());
+    }
+  }
+
+  /**
+   * Verify that bad volumes are filtered at startup.
+   * @throws IOException
+   */
+  @Test
+  public void testBadDirectoryDetection() throws IOException {
+    final int numVolumes = 5;
+    final int numBadVolumes = 2;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    final VolumeSet volumeSet = new VolumeSet(
+        UUID.randomUUID().toString(), conf) {
+      @Override
+      HddsVolumeChecker getVolumeChecker(Configuration conf)
+          throws DiskErrorException {
+        return new DummyChecker(conf, new Timer(), numBadVolumes);
+      }
+    };
+
+    assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
+    assertThat(volumeSet.getVolumesList().size(), is(numVolumes - numBadVolumes));
+  }
+
+  /**
+   * Verify that initialization fails if all volumes are bad.
+   */
+  @Test
+  public void testAllVolumesAreBad() throws IOException {
+    final int numVolumes = 5;
+
+    conf = getConfWithDataNodeDirs(numVolumes);
+    thrown.expect(IOException.class);
+    final VolumeSet volumeSet = new VolumeSet(
+        UUID.randomUUID().toString(), conf) {
+      @Override
+      HddsVolumeChecker getVolumeChecker(Configuration conf)
+          throws DiskErrorException {
+        return new DummyChecker(conf, new Timer(), numVolumes);
+      }
+    };
+  }
+
+  /**
+   * Create a configuration with the specified number of Datanode
+   * storage directories, each a randomized test directory path.
+   * @param numDirs number of storage directories to configure.
+   * @return a new configuration listing the directories.
+   */
+  private Configuration getConfWithDataNodeDirs(int numDirs) {
+    final Configuration conf = new OzoneConfiguration();
+    final List<String> dirs = new ArrayList<>();
+    for (int i = 0; i < numDirs; ++i) {
+      dirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
+    }
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, String.join(",", dirs));
+    return conf;
+  }
+
+  /**
+   * A no-op checker that fails the given number of volumes and succeeds
+   * the rest.
+   */
+  static class DummyChecker extends HddsVolumeChecker {
+    private final int numBadVolumes;
+
+    public DummyChecker(Configuration conf, Timer timer, int numBadVolumes)
+        throws DiskErrorException {
+      super(conf, timer);
+      this.numBadVolumes = numBadVolumes;
+    }
+
+    @Override
+    public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
+        throws InterruptedException {
+      // Return the first 'numBadVolumes' as failed.
+      return ImmutableSet.copyOf(Iterables.limit(volumes, numBadVolumes));
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org