This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3119b9  HDFS-14648. Implement DeadNodeDetector basic model. Contributed by Lisheng Sun.
b3119b9 is described below

commit b3119b9ab60a19d624db476c4e1c53410870c7a6
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Sat Nov 16 11:32:41 2019 +0800

    HDFS-14648. Implement DeadNodeDetector basic model. Contributed by Lisheng Sun.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  49 ++++++
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |  98 +++++++++++
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  98 +++++++----
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  |   6 +-
 .../org/apache/hadoop/hdfs/DeadNodeDetector.java   | 185 +++++++++++++++++++++
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   4 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  31 +++-
 .../src/main/resources/hdfs-default.xml            |   9 +
 .../apache/hadoop/hdfs/TestDeadNodeDetection.java  | 183 ++++++++++++++++++++
 9 files changed, 617 insertions(+), 46 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index ad1b359..abb039c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -118,6 +119,19 @@ public class ClientContext {
   private NodeBase clientNode;
   private boolean topologyResolutionEnabled;
 
+  private Daemon deadNodeDetectorThr = null;
+
+  /**
+   * The switch for DeadNodeDetector.
+   */
+  private boolean deadNodeDetectionEnabled = false;
+
+  /**
+   * Detect the dead datanodes in advance, and share this information among all
+   * the DFSInputStreams in the same client.
+   */
+  private DeadNodeDetector deadNodeDetector = null;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
@@ -134,6 +148,12 @@ public class ClientContext {
 
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());
+    this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
+    if (deadNodeDetectionEnabled && deadNodeDetector == null) {
+      deadNodeDetector = new DeadNodeDetector(name);
+      deadNodeDetectorThr = new Daemon(deadNodeDetector);
+      deadNodeDetectorThr.start();
+    }
     initTopologyResolution(config);
   }
 
@@ -251,4 +271,33 @@ public class ClientContext {
         datanodeInfo.getNetworkLocation());
     return NetworkTopology.getDistanceByPath(clientNode, node);
   }
+
+  /**
+   * The switch for DeadNodeDetector. If true, DeadNodeDetector is enabled.
+   */
+  public boolean isDeadNodeDetectionEnabled() {
+    return deadNodeDetectionEnabled;
+  }
+
+  /**
+   * Obtain DeadNodeDetector of the current client.
+   */
+  public DeadNodeDetector getDeadNodeDetector() {
+    return deadNodeDetector;
+  }
+
+  /**
+   * Close dead node detector thread.
+   */
+  public void stopDeadNodeDetectorThread() {
+    if (deadNodeDetectorThr != null) {
+      deadNodeDetectorThr.interrupt();
+      try {
+        deadNodeDetectorThr.join(3000);
+      } catch (InterruptedException e) {
+        LOG.warn("Encountered exception while waiting to join on dead " +
+            "node detector thread.", e);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 56280f3..c19aa96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -44,6 +44,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -631,6 +633,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       // lease renewal stops when all files are closed
       closeAllFilesBeingWritten(false);
       clientRunning = false;
+      // close dead node detector thread
+      clientContext.stopDeadNodeDetectorThread();
       // close connections to the namenode
       closeConnectionToNamenode();
     }
@@ -3226,4 +3230,98 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throws IOException {
     return namenode.getHAServiceState();
   }
+
+  /**
+   * If deadNodeDetectionEnabled is true, return the dead nodes detected by
+   * all the DFSInputStreams in the same client. Otherwise, return the dead
+   * nodes detected by the given DFSInputStream.
+   */
+  public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes(
+      DFSInputStream dfsInputStream) {
+    if (clientContext.isDeadNodeDetectionEnabled()) {
+      ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
+          new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+      if (dfsInputStream != null) {
+        deadNodes.putAll(dfsInputStream.getLocalDeadNodes());
+      }
+
+      Set<DatanodeInfo> detectDeadNodes =
+          clientContext.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
+      for (DatanodeInfo detectDeadNode : detectDeadNodes) {
+        deadNodes.put(detectDeadNode, detectDeadNode);
+      }
+      return deadNodes;
+    } else {
+      return dfsInputStream.getLocalDeadNodes();
+    }
+  }
+
+  /**
+   * If deadNodeDetectionEnabled is true, the judgment is based on whether the
+   * datanode is recorded in DeadNodeDetector. Otherwise, it is based on the
+   * dead nodes of the given DFSInputStream.
+   */
+  public boolean isDeadNode(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (isDeadNodeDetectionEnabled()) {
+      boolean isDeadNode =
+          clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo);
+      if (dfsInputStream != null) {
+        isDeadNode = isDeadNode
+            || dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
+      }
+      return isDeadNode;
+    } else {
+      return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
+    }
+  }
+
+  /**
+   * Add the given datanode to DeadNodeDetector.
+   */
+  public void addNodeToDeadNodeDetector(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (!isDeadNodeDetectionEnabled()) {
+      LOG.debug("DeadNode detection is not enabled, skip to add node {}.",
+          datanodeInfo);
+      return;
+    }
+    clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream,
+        datanodeInfo);
+  }
+
+  /**
+   * Remove the given datanode from DeadNodeDetector.
+   */
+  public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    if (!isDeadNodeDetectionEnabled()) {
+      LOG.debug("Dead node detection is not enabled, skip removing node {}.",
+          datanodeInfo);
+      return;
+    }
+    clientContext.getDeadNodeDetector()
+        .removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
+  }
+
+  /**
+   * Remove the datanodes that the given blocks are placed on from DeadNodeDetector.
+   */
+  public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
+      LocatedBlocks locatedBlocks) {
+    if (!isDeadNodeDetectionEnabled() || locatedBlocks == null) {
+      LOG.debug("Dead node detection is not enabled or the given blocks {} " +
+          "are null, skip removing nodes.", locatedBlocks);
+      return;
+    }
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
+        removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
+      }
+    }
+  }
+
+  private boolean isDeadNodeDetectionEnabled() {
+    return clientContext.isDeadNodeDetectionEnabled();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 757924d..7323797 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -171,10 +171,26 @@ public class DFSInputStream extends FSInputStream
 
   private byte[] oneByteBuf; // used for 'int read()'
 
-  void addToDeadNodes(DatanodeInfo dnInfo) {
+  protected void addToLocalDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }
 
+  protected void removeFromLocalDeadNodes(DatanodeInfo dnInfo) {
+    deadNodes.remove(dnInfo);
+  }
+
+  protected ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() {
+    return deadNodes;
+  }
+
+  private void clearLocalDeadNodes() {
+    deadNodes.clear();
+  }
+
+  protected DFSClient getDFSClient() {
+    return dfsClient;
+  }
+
   DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       LocatedBlocks locatedBlocks) throws IOException {
     this.dfsClient = dfsClient;
@@ -612,7 +628,8 @@ public class DFSInputStream extends FSInputStream
                   + "{}, add to deadNodes and continue. ", targetAddr, src,
               targetBlock.getBlock(), ex);
           // Put chosen node into dead list, continue
-          addToDeadNodes(chosenNode);
+          addToLocalDeadNodes(chosenNode);
+          dfsClient.addNodeToDeadNodeDetector(this, chosenNode);
         }
       }
     }
@@ -663,28 +680,40 @@ public class DFSInputStream extends FSInputStream
    */
   @Override
   public synchronized void close() throws IOException {
-    if (!closed.compareAndSet(false, true)) {
-      DFSClient.LOG.debug("DFSInputStream has been closed already");
-      return;
-    }
-    dfsClient.checkOpen();
-
-    if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
-      final StringBuilder builder = new StringBuilder();
-      extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
-        private String prefix = "";
-        @Override
-        public void accept(ByteBuffer k, Object v) {
-          builder.append(prefix).append(k);
-          prefix = ", ";
-        }
-      });
-      DFSClient.LOG.warn("closing file " + src + ", but there are still " +
-          "unreleased ByteBuffers allocated by read().  " +
-          "Please release " + builder.toString() + ".");
+    try {
+      if (!closed.compareAndSet(false, true)) {
+        DFSClient.LOG.debug("DFSInputStream has been closed already");
+        return;
+      }
+      dfsClient.checkOpen();
+
+      if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
+        final StringBuilder builder = new StringBuilder();
+        extendedReadBuffers
+            .visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+              private String prefix = "";
+
+              @Override
+              public void accept(ByteBuffer k, Object v) {
+                builder.append(prefix).append(k);
+                prefix = ", ";
+              }
+            });
+        DFSClient.LOG.warn("closing file " + src + ", but there are still "
+            + "unreleased ByteBuffers allocated by read().  "
+            + "Please release " + builder.toString() + ".");
+      }
+      closeCurrentBlockReaders();
+      super.close();
+    } finally {
+      /**
+       * If this DFSInputStream is closed and a datanode is in
+       * DeadNodeDetector#dfsInputStreamNodes, we need to remove the datanode
+       * from DeadNodeDetector#dfsInputStreamNodes, since the user should not
+       * use this DFSInputStream anymore.
+       */
+      dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
     }
-    closeCurrentBlockReaders();
-    super.close();
   }
 
   @Override
@@ -741,7 +770,8 @@ public class DFSInputStream extends FSInputStream
          */
         sourceFound = seekToBlockSource(pos);
       } else {
-        addToDeadNodes(currentNode);
+        addToLocalDeadNodes(currentNode);
+        dfsClient.addNodeToDeadNodeDetector(this, currentNode);
         sourceFound = seekToNewSource(pos);
       }
       if (!sourceFound) {
@@ -801,7 +831,8 @@ public class DFSInputStream extends FSInputStream
           }
           blockEnd = -1;
           if (currentNode != null) {
-            addToDeadNodes(currentNode);
+            addToLocalDeadNodes(currentNode);
+            dfsClient.addNodeToDeadNodeDetector(this, currentNode);
           }
           if (--retries == 0) {
             throw e;
@@ -883,7 +914,7 @@ public class DFSInputStream extends FSInputStream
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
-        deadNodes, ignoredNodes);
+            dfsClient.getDeadNodes(this), ignoredNodes);
     String blockInfo = block.getBlock() + " file=" + src;
     if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
       String description = "Could not obtain block: " + blockInfo;
@@ -924,7 +955,7 @@ public class DFSInputStream extends FSInputStream
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
@@ -945,7 +976,7 @@ public class DFSInputStream extends FSInputStream
     StorageType storageType = null;
     if (nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])
+        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
             && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
@@ -1097,7 +1128,7 @@ public class DFSInputStream extends FSInputStream
         DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
         corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
-        addToDeadNodes(datanode.info);
+        addToLocalDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
         checkInterrupted(e);
@@ -1119,7 +1150,8 @@ public class DFSInputStream extends FSInputStream
           String msg = "Failed to connect to " + datanode.addr + " for file "
               + src + " for block " + block.getBlock() + ":" + e;
           DFSClient.LOG.warn("Connection failure: " + msg, e);
-          addToDeadNodes(datanode.info);
+          addToLocalDeadNodes(datanode.info);
+          dfsClient.addNodeToDeadNodeDetector(this, datanode.info);
           throw new IOException(msg);
         }
         // Refresh the block for updated tokens in case of token failures or
@@ -1522,14 +1554,14 @@ public class DFSInputStream extends FSInputStream
     if (currentNode == null) {
       return seekToBlockSource(targetPos);
     }
-    boolean markedDead = deadNodes.containsKey(currentNode);
-    addToDeadNodes(currentNode);
+    boolean markedDead = dfsClient.isDeadNode(this, currentNode);
+    addToLocalDeadNodes(currentNode);
     DatanodeInfo oldNode = currentNode;
     DatanodeInfo newNode = blockSeekTo(targetPos);
     if (!markedDead) {
       /* remove it from deadNodes. blockSeekTo could have cleared
        * deadNodes and added currentNode again. Thats ok. */
-      deadNodes.remove(oldNode);
+      removeFromLocalDeadNodes(oldNode);
     }
     if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
       currentNode = newNode;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index cf29791..ba35d51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -147,10 +147,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     return src;
   }
 
-  protected DFSClient getDFSClient() {
-    return dfsClient;
-  }
-
   protected LocatedBlocks getLocatedBlocks() {
     return locatedBlocks;
   }
@@ -283,7 +279,7 @@ public class DFSStripedInputStream extends DFSInputStream {
               "block" + block.getBlock(), e);
           // re-fetch the block in case the block has been moved
           fetchBlockAt(block.getStartOffset());
-          addToDeadNodes(dnInfo.info);
+          addToLocalDeadNodes(dnInfo.info);
         }
       }
       if (reader != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
new file mode 100644
index 0000000..1ac29a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Detect the dead nodes in advance, and share this information among all the
+ * DFSInputStreams in the same client.
+ */
+public class DeadNodeDetector implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DeadNodeDetector.class);
+
+  /**
+   * Waiting time when DeadNodeDetector encounters an error.
+   */
+  private static final long ERROR_SLEEP_MS = 5000;
+
+  /**
+   * Waiting time when DeadNodeDetector's state is idle.
+   */
+  private static final long IDLE_SLEEP_MS = 10000;
+
+  /**
+   * Client context name.
+   */
+  private String name;
+
+  /**
+   * Dead nodes shared by all the DFSInputStreams of the client.
+   */
+  private final ConcurrentHashMap<String, DatanodeInfo> deadNodes;
+
+  /**
+   * Record the dead nodes of each DFSInputStream. When a dead node is no
+   * longer used by a DFSInputStream, remove it from that stream's entry. If a
+   * DFSInputStream does not include any dead node anymore, remove the
+   * DFSInputStream from dfsInputStreamNodes.
+   */
+  private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>
+          dfsInputStreamNodes;
+
+  /**
+   * The state of DeadNodeDetector.
+   */
+  private enum State {
+    INIT, CHECK_DEAD, IDLE, ERROR
+  }
+
+  private State state;
+
+  public DeadNodeDetector(String name) {
+    this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
+    this.dfsInputStreamNodes =
+        new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
+    this.name = name;
+
+    LOG.info("Start dead node detector for DFSClient {}.", this.name);
+    state = State.INIT;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      clearAndGetDetectedDeadNodes();
+      LOG.debug("Current detector state {}, the detected nodes: {}.", state,
+          deadNodes.values());
+      switch (state) {
+      case INIT:
+        init();
+        break;
+      case IDLE:
+        idle();
+        break;
+      case ERROR:
+        try {
+          Thread.sleep(ERROR_SLEEP_MS);
+        } catch (InterruptedException e) {
+        }
+        return;
+      default:
+        break;
+      }
+    }
+  }
+
+  private void idle() {
+    try {
+      Thread.sleep(IDLE_SLEEP_MS);
+    } catch (InterruptedException e) {
+
+    }
+
+    state = State.IDLE;
+  }
+
+  private void init() {
+    state = State.IDLE;
+  }
+
+  private void addToDead(DatanodeInfo datanodeInfo) {
+    deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
+  }
+
+  public boolean isDeadNode(DatanodeInfo datanodeInfo) {
+    return deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
+  }
+
+  /**
+   * Add the datanode to deadNodes and dfsInputStreamNodes. The node is
+   * considered a dead node and is shared by all the DFSInputStreams in the
+   * same client.
+   */
+  public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
+      DatanodeInfo datanodeInfo) {
+    HashSet<DatanodeInfo> datanodeInfos =
+        dfsInputStreamNodes.get(dfsInputStream);
+    if (datanodeInfos == null) {
+      datanodeInfos = new HashSet<DatanodeInfo>();
+      datanodeInfos.add(datanodeInfo);
+      dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
+    } else {
+      datanodeInfos.add(datanodeInfo);
+    }
+
+    addToDead(datanodeInfo);
+  }
+
+  /**
+   * Remove dead nodes that are no longer referenced by any DFSInputStream
+   * from deadNodes.
+   * @return the dead nodes shared by all DFSInputStreams.
+   */
+  public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
+    // First remove the dead nodes that are not referenced by any input stream.
+    Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
+    for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
+      newDeadNodes.addAll(datanodeInfos);
+    }
+
+    for (DatanodeInfo datanodeInfo : deadNodes.values()) {
+      if (!newDeadNodes.contains(datanodeInfo)) {
+        deadNodes.remove(datanodeInfo.getDatanodeUuid());
+      }
+    }
+    return new HashSet<>(deadNodes.values());
+  }
+
+  /**
+   * Remove the dead node from the given DFSInputStream's entry in
+   * dfsInputStreamNodes. If that entry does not contain any dead node anymore,
+   * remove the DFSInputStream from dfsInputStreamNodes.
+   */
+  public synchronized void removeNodeFromDeadNodeDetector(
+      DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
+    Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
+    if (datanodeInfos != null) {
+      datanodeInfos.remove(datanodeInfo);
+      if (datanodeInfos.isEmpty()) {
+        dfsInputStreamNodes.remove(dfsInputStream);
+      }
+    }
+  }
+}
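
The snippet below is not part of the patch; it is a rough illustration of how the DeadNodeDetector API added above fits together, using the same Daemon wiring that ClientContext uses in this commit. The class DeadNodeDetectorSketch and its example() method are hypothetical, and the DFSInputStream and DatanodeInfo instances are assumed to come from an existing client read path.

import java.util.Set;

import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DeadNodeDetector;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.Daemon;

public class DeadNodeDetectorSketch {
  static void example(DFSInputStream in, DatanodeInfo failedNode)
      throws InterruptedException {
    DeadNodeDetector detector = new DeadNodeDetector("example-client");
    // Same wiring as ClientContext in this patch: run the detector as a daemon.
    Daemon daemon = new Daemon(detector);
    daemon.start();

    // A failed read registers the node; it becomes visible to every stream.
    detector.addNodeToDetect(in, failedNode);
    assert detector.isDeadNode(failedNode);

    // The shared view, pruned of nodes no stream references anymore.
    Set<DatanodeInfo> shared = detector.clearAndGetDetectedDeadNodes();
    System.out.println("Shared dead nodes: " + shared);

    // When the stream is closed, its nodes are dropped from the detector.
    detector.removeNodeFromDeadNodeDetector(in, failedNode);

    // Shut the detector thread down, as ClientContext#stopDeadNodeDetectorThread does.
    daemon.interrupt();
    daemon.join(3000);
  }
}
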
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index d1ca63d..e32a3bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -152,6 +152,10 @@ public interface HdfsClientConfigKeys {
       "dfs.client.block.reader.remote.buffer.size";
   int DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT = 8192;
 
+  String DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY =
+          "dfs.client.deadnode.detection.enabled";
+  boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 04bdfe4..804375c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -26,14 +26,19 @@ import org.apache.hadoop.fs.Options.ChecksumCombineMode;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
@@ -47,6 +52,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACH
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
@@ -61,8 +68,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -71,6 +76,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
@@ -87,11 +94,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * DFSClient configuration.
  */
@@ -145,6 +147,8 @@ public class DfsClientConf {
 
   private final boolean dataTransferTcpNoDelay;
 
+  private final boolean deadNodeDetectionEnabled;
+
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
     hdfsTimeout = Client.getRpcTimeout(conf);
@@ -262,6 +266,10 @@ public class DfsClientConf {
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
 
+    deadNodeDetectionEnabled =
+        conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY,
+            DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT);
+
     stripedReadThreadpoolSize = conf.getInt(
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
@@ -596,6 +604,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the deadNodeDetectionEnabled
+   */
+  public boolean isDeadNodeDetectionEnabled() {
+    return deadNodeDetectionEnabled;
+  }
+
+  /**
    * @return the replicaAccessorBuilderClasses
    */
   public List<Class<? extends ReplicaAccessorBuilder>>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7effbd0..d2103fb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2987,6 +2987,15 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.client.deadnode.detection.enabled</name>
+    <value>false</value>
+    <description>
+      Set to true to enable dead node detection on the client side. All the
+      DFSInputStreams of the same client can then share the dead node information.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>
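
For reference only (not part of the patch): a minimal sketch of how a client application might turn the new switch on programmatically instead of via hdfs-site.xml. The property name and its false default come from the patch above; the file path used in the example is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableDeadNodeDetection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Off by default; once enabled, all DFSInputStreams created by this client
    // share the detected dead node information.
    conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
    try (FileSystem fs = FileSystem.get(conf)) {
      // Hypothetical path, just to exercise the client read path.
      fs.open(new Path("/some/file")).close();
    }
  }
}
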
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
new file mode 100644
index 0000000..9b997ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for dead node detection in DFSClient.
+ */
+public class TestDeadNodeDetection {
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    cluster = null;
+    conf = new HdfsConfiguration();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionInBackground() throws IOException {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    // We'll be using a 512-byte block size just for tests,
+    // so make sure the checksum size matches it.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDetectDeadNodeInBackground");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index = 0; index < bytes.length; index++) {
+      bytes[index] = '0';
+    }
+
+    // File with a 512 bytes block size
+    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+    // Write a block to all 3 DNs (2x256bytes).
+    out.write(bytes);
+    out.write(bytes);
+    out.hflush();
+    out.close();
+
+    // Stop all three DNs.
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in = fs.open(filePath);
+    DFSInputStream din = null;
+    DFSClient dfsClient = null;
+    try {
+      try {
+        in.read();
+      } catch (BlockMissingException e) {
+      }
+
+      din = (DFSInputStream) in.getWrappedStream();
+      dfsClient = din.getDFSClient();
+      assertEquals(3, dfsClient.getDeadNodes(din).size());
+      assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    } finally {
+      in.close();
+      fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
+      // Check the dead nodes again here; they are expected to be removed.
+      assertEquals(0, dfsClient.getDeadNodes(din).size());
+      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+
+  @Test
+  public void testDeadNodeDetectionInMultipleDFSInputStream()
+      throws IOException {
+    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
+    // We'll be using a 512-byte block size just for tests,
+    // so make sure the checksum size matches it.
+    conf.setInt("io.bytes.per.checksum", 512);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testDeadNodeMultipleDFSInputStream");
+
+    // 256 bytes data chunk for writes
+    byte[] bytes = new byte[256];
+    for (int index = 0; index < bytes.length; index++) {
+      bytes[index] = '0';
+    }
+
+    // File with a 512 bytes block size
+    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+
+    // Write a block to DN (2x256bytes).
+    out.write(bytes);
+    out.write(bytes);
+    out.hflush();
+    out.close();
+
+    String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
+    FSDataInputStream in1 = fs.open(filePath);
+    DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream();
+    DFSClient dfsClient1 = din1.getDFSClient();
+    cluster.stopDataNode(0);
+
+    FSDataInputStream in2 = fs.open(filePath);
+    DFSInputStream din2 = null;
+    DFSClient dfsClient2 = null;
+    try {
+      try {
+        in1.read();
+      } catch (BlockMissingException e) {
+      }
+
+      din2 = (DFSInputStream) in2.getWrappedStream();
+      dfsClient2 = din2.getDFSClient();
+      assertEquals(1, dfsClient1.getDeadNodes(din1).size());
+      assertEquals(1, dfsClient2.getDeadNodes(din2).size());
+      assertEquals(1, dfsClient1.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      assertEquals(1, dfsClient2.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      // Check the DN uuid of the dead node to see if it is the expected dead node.
+      assertEquals(datanodeUuid,
+          ((DatanodeInfo) dfsClient1.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
+      assertEquals(datanodeUuid,
+          ((DatanodeInfo) dfsClient2.getClientContext().getDeadNodeDetector()
+              .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
+    } finally {
+      in1.close();
+      in2.close();
+      fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true);
+      // Check the dead nodes again here; they are expected to be removed.
+      assertEquals(0, dfsClient1.getDeadNodes(din1).size());
+      assertEquals(0, dfsClient2.getDeadNodes(din2).size());
+      assertEquals(0, dfsClient1.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+      assertEquals(0, dfsClient2.getClientContext().getDeadNodeDetector()
+          .clearAndGetDetectedDeadNodes().size());
+    }
+  }
+}
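
The sketch below is not part of the patch; it condenses the test flow above into an application-style check of the shared dead node view after a failed read. It is assumed to live in the org.apache.hadoop.hdfs package (like TestDeadNodeDetection) so that the protected DFSInputStream#getDFSClient introduced in this commit is accessible; the class and method names are hypothetical.

package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeadNodeViewSketch {
  static void inspect(FileSystem fs, Path path) throws Exception {
    try (FSDataInputStream in = fs.open(path)) {
      try {
        in.read();
      } catch (BlockMissingException e) {
        // Expected when all replicas are down, as in the tests above.
      }
      DFSInputStream din = (DFSInputStream) in.getWrappedStream();
      DFSClient client = din.getDFSClient();
      // Union of the stream's local dead nodes and the detector's shared view.
      System.out.println("Dead nodes: " + client.getDeadNodes(din).keySet());
    }
  }
}
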


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
