HDFS-8397. Refactor the error handling code in DataStreamer. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f378733
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f378733
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f378733

Branch: refs/heads/HDFS-7240
Commit: 8f378733423a5244461df79a92c00239514b8b93
Parents: f7e051c
Author: Jing Zhao <ji...@apache.org>
Authored: Fri May 15 16:14:54 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri May 15 16:14:54 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 543 +++++++++++--------
 2 files changed, 308 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
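
In short, the DataStreamer.java hunks below fold the streamer's loose volatile flags (hasError, errorIndex, restartingNodeIndex, restartDeadline) into a single package-private ErrorState class whose methods are all synchronized, and the repeated "streamerClosed || hasError || !dfsClient.clientRunning" checks in the run loop collapse into one shouldStop() guard. The outline below is an illustrative sketch only, not the committed class; it keeps the names from the diff but omits the restart-deadline handling shown in the full hunk.

// Illustrative outline of the consolidation (see the full ErrorState in the
// DataStreamer.java hunk below for the committed version).
class ErrorStateOutline {
  private boolean error = false;
  private int badNodeIndex = -1;          // was the volatile field errorIndex
  private int restartingNodeIndex = -1;   // was an AtomicInteger

  synchronized boolean hasError() { return error; }
  synchronized void setError(boolean err) { this.error = err; }
  synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; }
  synchronized boolean isRestartingNode() { return restartingNodeIndex >= 0; }
  synchronized boolean isNodeMarked() { return badNodeIndex >= 0 || isRestartingNode(); }

  // Called when something failed without an explicit error report:
  // blame the first node unless a node is already marked.
  synchronized void markFirstNodeIfNotMarked() {
    if (!isNodeMarked()) {
      badNodeIndex = 0;
    }
  }
}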


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f378733/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6c0923c..35e81f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -554,6 +554,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
 
+    HDFS-8397. Refactor the error handling code in DataStreamer.
+    (Tsz Wo Nicholas Sze via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f378733/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index b472b8c..cecd5a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -38,7 +38,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -208,6 +208,133 @@ class DataStreamer extends Daemon {
     }
   }
 
+  static class ErrorState {
+    private boolean error = false;
+    private int badNodeIndex = -1;
+    private int restartingNodeIndex = -1;
+    private long restartingNodeDeadline = 0;
+    private final long datanodeRestartTimeout;
+
+    ErrorState(long datanodeRestartTimeout) {
+      this.datanodeRestartTimeout = datanodeRestartTimeout;
+    }
+
+    synchronized void reset() {
+      error = false;
+      badNodeIndex = -1;
+      restartingNodeIndex = -1;
+      restartingNodeDeadline = 0;
+    }
+
+    synchronized boolean hasError() {
+      return error;
+    }
+
+    synchronized boolean hasDatanodeError() {
+      return error && isNodeMarked();
+    }
+
+    synchronized void setError(boolean err) {
+      this.error = err;
+    }
+
+    synchronized void setBadNodeIndex(int index) {
+      this.badNodeIndex = index;
+    }
+
+    synchronized int getBadNodeIndex() {
+      return badNodeIndex;
+    }
+
+    synchronized int getRestartingNodeIndex() {
+      return restartingNodeIndex;
+    }
+
+    synchronized void initRestartingNode(int i, String message) {
+      restartingNodeIndex = i;
+      restartingNodeDeadline =  Time.monotonicNow() + datanodeRestartTimeout;
+      // If the data streamer has already set the primary node
+      // bad, clear it. It is likely that the write failed due to
+      // the DN shutdown. Even if it was a real failure, the pipeline
+      // recovery will take care of it.
+      badNodeIndex = -1;
+      LOG.info(message);
+    }
+
+    synchronized boolean isRestartingNode() {
+      return restartingNodeIndex >= 0;
+    }
+
+    synchronized boolean isNodeMarked() {
+      return badNodeIndex >= 0 || isRestartingNode();
+    }
+
+    /**
+     * This method is used when no explicit error report was received but
+     * something failed. Either the first node is the suspect or the cause is
+     * unknown, so the first node is marked as failed.
+     */
+    synchronized void markFirstNodeIfNotMarked() {
+      // There should be no existing error and no ongoing restart.
+      if (!isNodeMarked()) {
+        badNodeIndex = 0;
+      }
+    }
+
+    synchronized void adjustState4RestartingNode() {
+      // Just took care of a node error while waiting for a node restart
+      if (restartingNodeIndex >= 0) {
+        // If the error came from a node further away than the restarting
+        // node, the restart must have been complete.
+        if (badNodeIndex > restartingNodeIndex) {
+          restartingNodeIndex = -1;
+        } else if (badNodeIndex < restartingNodeIndex) {
+          // the node index has shifted.
+          restartingNodeIndex--;
+        } else {
+          throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+              + " = restartingNodeIndex = " + restartingNodeIndex);
+        }
+      }
+
+      if (!isRestartingNode()) {
+        error = false;
+      }
+      badNodeIndex = -1;
+    }
+
+    synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
+      if (restartingNodeIndex >= 0) {
+        if (!error) {
+          throw new IllegalStateException("error=false while checking" +
+              " restarting node deadline");
+        }
+
+        // check badNodeIndex
+        if (badNodeIndex == restartingNodeIndex) {
+          // ignore, if came from the restarting node
+          badNodeIndex = -1;
+        }
+        // not within the deadline
+        if (Time.monotonicNow() >= restartingNodeDeadline) {
+          // expired. declare the restarting node dead
+          restartingNodeDeadline = 0;
+          final int i = restartingNodeIndex;
+          restartingNodeIndex = -1;
+          LOG.warn("Datanode " + i + " did not restart within "
+              + datanodeRestartTimeout + "ms: " + nodes[i]);
+          // Mark the restarting node as failed. If there is any other failed
+          // node during the last pipeline construction attempt, it will not be
+          // overwritten/dropped. In this case, the restarting node will get
+          // excluded in the following attempt, if it still does not come up.
+          if (badNodeIndex == -1) {
+            badNodeIndex = i;
+          }
+        }
+      }
+    }
+  }
+
   private volatile boolean streamerClosed = false;
   private ExtendedBlock block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
@@ -217,11 +344,8 @@ class DataStreamer extends Daemon {
  private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
-  volatile boolean hasError = false;
-  volatile int errorIndex = -1;
-  // Restarting node index
-  AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
-  private long restartDeadline = 0; // Deadline of DN restart
+  private final ErrorState errorState;
+
   private BlockConstructionStage stage;  // block construction stage
   private long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
@@ -287,11 +411,13 @@ class DataStreamer extends Daemon {
     this.cachingStrategy = cachingStrategy;
     this.byteArrayManager = byteArrayManage;
     this.isLazyPersistFile = isLazyPersist(stat);
-    this.dfsclientSlowLogThresholdMs =
-        dfsClient.getConf().getSlowIoWarningThresholdMs();
-    this.excludedNodes = initExcludedNodes();
     this.isAppend = isAppend;
     this.favoredNodes = favoredNodes;
+
+    final DfsClientConf conf = dfsClient.getConf();
+    this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
+    this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
+    this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
   }
 
   /**
@@ -334,7 +460,6 @@ class DataStreamer extends Daemon {
   void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
     // setup pipeline to append to the last block XXX retries??
     setPipeline(lastBlock);
-    errorIndex = -1;   // no errors yet.
     if (nodes.length < 1) {
       throw new IOException("Unable to retrieve blocks locations " +
           " for last block " + block +
@@ -375,6 +500,10 @@ class DataStreamer extends Daemon {
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
 
+  private boolean shouldStop() {
+    return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
+  }
+
   /*
    * streamer thread is the only thread that opens streams to datanode,
    * and closes them. Any error recovery is also done by this thread.
@@ -385,7 +514,7 @@ class DataStreamer extends Daemon {
     TraceScope scope = NullScope.INSTANCE;
     while (!streamerClosed && dfsClient.clientRunning) {
       // if the Responder encountered an error, shutdown Responder
-      if (hasError && response != null) {
+      if (errorState.hasError() && response != null) {
         try {
           response.close();
           response.join();
@@ -398,17 +527,13 @@ class DataStreamer extends Daemon {
       DFSPacket one;
       try {
         // process datanode IO errors if any
-        boolean doSleep = false;
-        if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
-          doSleep = processDatanodeError();
-        }
+        boolean doSleep = processDatanodeError();
 
        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
         synchronized (dataQueue) {
           // wait for a packet to be sent.
           long now = Time.monotonicNow();
-          while ((!streamerClosed && !hasError && dfsClient.clientRunning
-              && dataQueue.size() == 0 &&
+          while ((!shouldStop() && dataQueue.size() == 0 &&
               (stage != BlockConstructionStage.DATA_STREAMING ||
                   stage == BlockConstructionStage.DATA_STREAMING &&
                       now - lastPacket < halfSocketTimeout)) || doSleep ) {
@@ -424,13 +549,12 @@ class DataStreamer extends Daemon {
             doSleep = false;
             now = Time.monotonicNow();
           }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
             continue;
           }
           // get packet to be sent.
           if (dataQueue.isEmpty()) {
             one = createHeartbeatPacket();
-            assert one != null;
           } else {
             try {
               backOffIfNecessary();
@@ -460,7 +584,7 @@ class DataStreamer extends Daemon {
             LOG.debug("Append to block " + block);
           }
           setupPipelineForAppendOrRecovery();
-          if (true == streamerClosed) {
+          if (streamerClosed) {
             continue;
           }
           initDataStreaming();
@@ -478,8 +602,7 @@ class DataStreamer extends Daemon {
         if (one.isLastPacketInBlock()) {
           // wait for all data packets have been successfully acked
           synchronized (dataQueue) {
-            while (!streamerClosed && !hasError &&
-                ackQueue.size() != 0 && dfsClient.clientRunning) {
+            while (!shouldStop() && ackQueue.size() != 0) {
               try {
                 // wait for acks to arrive from datanodes
                 dataQueue.wait(1000);
@@ -488,7 +611,7 @@ class DataStreamer extends Daemon {
               }
             }
           }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
             continue;
           }
           stage = BlockConstructionStage.PIPELINE_CLOSE;
@@ -524,7 +647,7 @@ class DataStreamer extends Daemon {
           // effect. Pipeline recovery can handle only one node error at a
           // time. If the primary node fails again during the recovery, it
           // will be taken out then.
-          tryMarkPrimaryDatanodeFailed();
+          errorState.markFirstNodeIfNotMarked();
           throw e;
         } finally {
           writeScope.close();
@@ -537,7 +660,7 @@ class DataStreamer extends Daemon {
           bytesSent = tmpBytesSent;
         }
 
-        if (streamerClosed || hasError || !dfsClient.clientRunning) {
+        if (shouldStop()) {
           continue;
         }
 
@@ -545,12 +668,11 @@ class DataStreamer extends Daemon {
         if (one.isLastPacketInBlock()) {
           // wait for the close packet has been acked
           synchronized (dataQueue) {
-            while (!streamerClosed && !hasError &&
-                ackQueue.size() != 0 && dfsClient.clientRunning) {
+            while (!shouldStop() && ackQueue.size() != 0) {
               dataQueue.wait(1000);// wait for acks to arrive from datanodes
             }
           }
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          if (shouldStop()) {
             continue;
           }
 
@@ -564,7 +686,7 @@ class DataStreamer extends Daemon {
         }
       } catch (Throwable e) {
         // Log warning if there was a real error.
-        if (restartingNodeIndex.get() == -1) {
+        if (!errorState.isRestartingNode()) {
           // Since their messages are descriptive enough, do not always
           // log a verbose stack-trace WARN for quota exceptions.
           if (e instanceof QuotaExceededException) {
@@ -575,8 +697,8 @@ class DataStreamer extends Daemon {
         }
         lastException.set(e);
         assert !(e instanceof NullPointerException);
-        hasError = true;
-        if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
+        errorState.setError(true);
+        if (!errorState.isNodeMarked()) {
           // Not a datanode issue
           streamerClosed = true;
         }
@@ -773,40 +895,6 @@ class DataStreamer extends Daemon {
     }
   }
 
-  // The following synchronized methods are used whenever
-  // errorIndex or restartingNodeIndex is set. This is because
-  // check & set needs to be atomic. Simply reading variables
-  // does not require a synchronization. When responder is
-  // not running (e.g. during pipeline recovery), there is no
-  // need to use these methods.
-
-  /** Set the error node index. Called by responder */
-  synchronized void setErrorIndex(int idx) {
-    errorIndex = idx;
-  }
-
-  /** Set the restarting node index. Called by responder */
-  synchronized void setRestartingNodeIndex(int idx) {
-    restartingNodeIndex.set(idx);
-    // If the data streamer has already set the primary node
-    // bad, clear it. It is likely that the write failed due to
-    // the DN shutdown. Even if it was a real failure, the pipeline
-    // recovery will take care of it.
-    errorIndex = -1;
-  }
-
-  /**
-   * This method is used when no explicit error report was received,
-   * but something failed. When the primary node is a suspect or
-   * unsure about the cause, the primary node is marked as failed.
-   */
-  synchronized void tryMarkPrimaryDatanodeFailed() {
-    // There should be no existing error and no ongoing restart.
-    if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
-      errorIndex = 0;
-    }
-  }
-
   /**
    * Examine whether it is worth waiting for a node to restart.
    * @param index the node index
@@ -883,20 +971,16 @@ class DataStreamer extends Daemon {
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply) &&
                 shouldWaitForRestart(i)) {
-              restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
-                  + Time.monotonicNow();
-              setRestartingNodeIndex(i);
-              String message = "A datanode is restarting: " + targets[i];
-              LOG.info(message);
+              final String message = "Datanode " + i + " is restarting: "
+                  + targets[i];
+              errorState.initRestartingNode(i, message);
               throw new IOException(message);
             }
             // node error
             if (reply != SUCCESS) {
-              setErrorIndex(i); // first bad datanode
+              errorState.setBadNodeIndex(i); // mark bad datanode
               throw new IOException("Bad response " + reply +
-                  " for block " + block +
-                  " from datanode " +
-                  targets[i]);
+                  " for " + block + " from datanode " + targets[i]);
             }
           }
 
@@ -954,14 +1038,12 @@ class DataStreamer extends Daemon {
         } catch (Exception e) {
           if (!responderClosed) {
             lastException.set(e);
-            hasError = true;
-            // If no explicit error report was received, mark the primary
-            // node as failed.
-            tryMarkPrimaryDatanodeFailed();
+            errorState.setError(true);
+            errorState.markFirstNodeIfNotMarked();
             synchronized (dataQueue) {
               dataQueue.notifyAll();
             }
-            if (restartingNodeIndex.get() == -1) {
+            if (!errorState.isRestartingNode()) {
               LOG.warn("Exception for " + block, e);
             }
             responderClosed = true;
@@ -978,11 +1060,16 @@ class DataStreamer extends Daemon {
     }
   }
 
-  // If this stream has encountered any errors so far, shutdown
-  // threads and mark stream as closed. Returns true if we should
-  // sleep for a while after returning from this call.
-  //
+  /**
+   * If this stream has encountered any errors, shutdown threads
+   * and mark the stream as closed.
+   *
+   * @return true if it should sleep for a while after returning.
+   */
   private boolean processDatanodeError() throws IOException {
+    if (!errorState.hasDatanodeError()) {
+      return false;
+    }
     if (response != null) {
       LOG.info("Error Recovery for " + block +
           " waiting for responder to exit. ");
@@ -1064,7 +1151,7 @@ class DataStreamer extends Daemon {
               .append("The current failed datanode replacement policy is ")
               .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
               .append("a client may configure this via '")
-              .append(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
+              .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
               .append("' in its configuration.")
               .toString());
     }
@@ -1190,157 +1277,141 @@ class DataStreamer extends Daemon {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
-      // Sleep before reconnect if a dn is restarting.
-      // This process will be repeated until the deadline or the datanode
-      // starts back up.
-      if (restartingNodeIndex.get() >= 0) {
-        // 4 seconds or the configured deadline period, whichever is shorter.
-        // This is the retry interval and recovery will be retried in this
-        // interval until timeout or success.
-        long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
-            4000L);
-        try {
-          Thread.sleep(delay);
-        } catch (InterruptedException ie) {
-          lastException.set(new IOException("Interrupted while waiting for " +
-              "datanode to restart. " + nodes[restartingNodeIndex.get()]));
-          streamerClosed = true;
-          return false;
-        }
+      if (!handleRestartingDatanode()) {
+        return false;
       }
-      boolean isRecovery = hasError;
-      // remove bad datanode from list of datanodes.
-      // If errorIndex was not set (i.e. appends), then do not remove
-      // any datanodes
-      //
-      if (errorIndex >= 0) {
-        StringBuilder pipelineMsg = new StringBuilder();
-        for (int j = 0; j < nodes.length; j++) {
-          pipelineMsg.append(nodes[j]);
-          if (j < nodes.length - 1) {
-            pipelineMsg.append(", ");
-          }
-        }
-        if (nodes.length <= 1) {
-          lastException.set(new IOException("All datanodes " + pipelineMsg
-              + " are bad. Aborting..."));
-          streamerClosed = true;
-          return false;
-        }
-        LOG.warn("Error Recovery for block " + block +
-            " in pipeline " + pipelineMsg +
-            ": bad datanode " + nodes[errorIndex]);
-        failed.add(nodes[errorIndex]);
-
-        DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-        arraycopy(nodes, newnodes, errorIndex);
-
-        final StorageType[] newStorageTypes = new StorageType[newnodes.length];
-        arraycopy(storageTypes, newStorageTypes, errorIndex);
-
-        final String[] newStorageIDs = new String[newnodes.length];
-        arraycopy(storageIDs, newStorageIDs, errorIndex);
-
-        setPipeline(newnodes, newStorageTypes, newStorageIDs);
-
-        // Just took care of a node error while waiting for a node restart
-        if (restartingNodeIndex.get() >= 0) {
-          // If the error came from a node further away than the restarting
-          // node, the restart must have been complete.
-          if (errorIndex > restartingNodeIndex.get()) {
-            restartingNodeIndex.set(-1);
-          } else if (errorIndex < restartingNodeIndex.get()) {
-            // the node index has shifted.
-            restartingNodeIndex.decrementAndGet();
-          } else {
-            // this shouldn't happen...
-            assert false;
-          }
-        }
 
-        if (restartingNodeIndex.get() == -1) {
-          hasError = false;
-        }
-        lastException.clear();
-        errorIndex = -1;
+      final boolean isRecovery = errorState.hasError();
+      if (!handleBadDatanode()) {
+        return false;
       }
 
-      // Check if replace-datanode policy is satisfied.
-      if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
-          nodes, isAppend, isHflushed)) {
-        try {
-          addDatanode2ExistingPipeline();
-        } catch(IOException ioe) {
-          if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
-            throw ioe;
-          }
-          LOG.warn("Failed to replace datanode."
-              + " Continue with the remaining datanodes since "
-              + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
-              + " is set to true.", ioe);
-        }
-      }
+      handleDatanodeReplacement();
 
       // get a new generation stamp and an access token
-      LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+      final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
       // set up the pipeline again with the remaining nodes
-      if (failPacket) { // for testing
-        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-        failPacket = false;
-        try {
-          // Give DNs time to send in bad reports. In real situations,
-          // good reports should follow bad ones, if client committed
-          // with those nodes.
-          Thread.sleep(2000);
-        } catch (InterruptedException ie) {}
-      } else {
-        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-      }
+      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
 
-      if (restartingNodeIndex.get() >= 0) {
-        assert hasError == true;
-        // check errorIndex set above
-        if (errorIndex == restartingNodeIndex.get()) {
-          // ignore, if came from the restarting node
-          errorIndex = -1;
-        }
-        // still within the deadline
-        if (Time.monotonicNow() < restartDeadline) {
-          continue; // with in the deadline
-        }
-        // expired. declare the restarting node dead
-        restartDeadline = 0;
-        int expiredNodeIndex = restartingNodeIndex.get();
-        restartingNodeIndex.set(-1);
-        LOG.warn("Datanode did not restart in time: " +
-            nodes[expiredNodeIndex]);
-        // Mark the restarting node as failed. If there is any other failed
-        // node during the last pipeline construction attempt, it will not be
-        // overwritten/dropped. In this case, the restarting node will get
-        // excluded in the following attempt, if it still does not come up.
-        if (errorIndex == -1) {
-          errorIndex = expiredNodeIndex;
-        }
-        // From this point on, normal pipeline recovery applies.
-      }
+      failPacket4Testing();
+
+      errorState.checkRestartingNodeDeadline(nodes);
     } // while
 
     if (success) {
-      // update pipeline at the namenode
-      ExtendedBlock newBlock = new ExtendedBlock(
-          block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-      dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-          nodes, storageIDs);
-      // update client side generation stamp
-      block = newBlock;
+      block = updatePipeline(newGS);
     }
     return false; // do not sleep, continue processing
   }
 
   /**
+   * Sleep if a node is restarting.
+   * This process is repeated until the deadline or the node starts back up.
+   * @return true if it should continue.
+   */
+  private boolean handleRestartingDatanode() {
+    if (errorState.isRestartingNode()) {
+      // 4 seconds or the configured deadline period, whichever is shorter.
+      // This is the retry interval and recovery will be retried in this
+      // interval until timeout or success.
+      final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException ie) {
+        lastException.set(new IOException(
+            "Interrupted while waiting for restarting "
+            + nodes[errorState.getRestartingNodeIndex()]));
+        streamerClosed = true;
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Remove bad node from list of nodes if badNodeIndex was set.
+   * @return true if it should continue.
+   */
+  private boolean handleBadDatanode() {
+    final int badNodeIndex = errorState.getBadNodeIndex();
+    if (badNodeIndex >= 0) {
+      if (nodes.length <= 1) {
+        lastException.set(new IOException("All datanodes "
+            + Arrays.toString(nodes) + " are bad. Aborting..."));
+        streamerClosed = true;
+        return false;
+      }
+
+      LOG.warn("Error Recovery for " + block + " in pipeline "
+          + Arrays.toString(nodes) + ": datanode " + badNodeIndex
+          + "("+ nodes[badNodeIndex] + ") is bad.");
+      failed.add(nodes[badNodeIndex]);
+
+      DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
+      arraycopy(nodes, newnodes, badNodeIndex);
+
+      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+      arraycopy(storageTypes, newStorageTypes, badNodeIndex);
+
+      final String[] newStorageIDs = new String[newnodes.length];
+      arraycopy(storageIDs, newStorageIDs, badNodeIndex);
+
+      setPipeline(newnodes, newStorageTypes, newStorageIDs);
+
+      errorState.adjustState4RestartingNode();
+      lastException.clear();
+    }
+    return true;
+  }
+
+  /** Add a datanode if replace-datanode policy is satisfied. */
+  private void handleDatanodeReplacement() throws IOException {
+    if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
+        nodes, isAppend, isHflushed)) {
+      try {
+        addDatanode2ExistingPipeline();
+      } catch(IOException ioe) {
+        if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+          throw ioe;
+        }
+        LOG.warn("Failed to replace datanode."
+            + " Continue with the remaining datanodes since "
+            + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
+            + " is set to true.", ioe);
+      }
+    }
+  }
+
+  private void failPacket4Testing() {
+    if (failPacket) { // for testing
+      failPacket = false;
+      try {
+        // Give DNs time to send in bad reports. In real situations,
+        // good reports should follow bad ones, if client committed
+        // with those nodes.
+        Thread.sleep(2000);
+      } catch (InterruptedException ie) {}
+    }
+  }
+
+  LocatedBlock updateBlockForPipeline() throws IOException {
+    return dfsClient.namenode.updateBlockForPipeline(
+        block, dfsClient.clientName);
+  }
+
+  /** update pipeline at the namenode */
+  ExtendedBlock updatePipeline(long newGS) throws IOException {
+    final ExtendedBlock newBlock = new ExtendedBlock(
+        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+        nodes, storageIDs);
+    return newBlock;
+  }
+
+  /**
    * Open a DataStreamer to a DataNode so that it can be written to.
   * This happens when a file is created and each time a new block is allocated.
    * Must get block ID and the IDs of the destinations from the namenode.
@@ -1354,9 +1425,8 @@ class DataStreamer extends Daemon {
     boolean success = false;
     ExtendedBlock oldBlock = block;
     do {
-      hasError = false;
+      errorState.reset();
       lastException.clear();
-      errorIndex = -1;
       success = false;
 
       DatanodeInfo[] excluded =
@@ -1382,8 +1452,9 @@ class DataStreamer extends Daemon {
         dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
             dfsClient.clientName);
         block = null;
-        LOG.info("Excluding datanode " + nodes[errorIndex]);
-        excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
+        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+        LOG.info("Excluding datanode " + badNode);
+        excludedNodes.put(badNode, badNode);
       }
     } while (!success && --count >= 0);
 
@@ -1464,7 +1535,7 @@ class DataStreamer extends Daemon {
         // from the local datanode. Thus it is safe to treat this as a
         // regular node error.
         if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
-            restartingNodeIndex.get() == -1) {
+            !errorState.isRestartingNode()) {
           checkRestart = true;
           throw new IOException("A datanode is restarting.");
         }
@@ -1475,10 +1546,9 @@ class DataStreamer extends Daemon {
         assert null == blockStream : "Previous blockStream unclosed";
         blockStream = out;
         result =  true; // success
-        restartingNodeIndex.set(-1);
-        hasError = false;
+        errorState.reset();
       } catch (IOException ie) {
-        if (restartingNodeIndex.get() == -1) {
+        if (!errorState.isRestartingNode()) {
           LOG.info("Exception in createBlockOutputStream", ie);
         }
        if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@@ -1498,24 +1568,21 @@ class DataStreamer extends Daemon {
           for (int i = 0; i < nodes.length; i++) {
             // NB: Unconditionally using the xfer addr w/o hostname
             if (firstBadLink.equals(nodes[i].getXferAddr())) {
-              errorIndex = i;
+              errorState.setBadNodeIndex(i);
               break;
             }
           }
         } else {
           assert checkRestart == false;
-          errorIndex = 0;
+          errorState.setBadNodeIndex(0);
         }
+
+        final int i = errorState.getBadNodeIndex();
         // Check whether there is a restart worth waiting for.
-        if (checkRestart && shouldWaitForRestart(errorIndex)) {
-          restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
-              + Time.monotonicNow();
-          restartingNodeIndex.set(errorIndex);
-          errorIndex = -1;
-          LOG.info("Waiting for the datanode to be restarted: " +
-              nodes[restartingNodeIndex.get()]);
+        if (checkRestart && shouldWaitForRestart(i)) {
+          errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
         }
-        hasError = true;
+        errorState.setError(true);
         lastException.set(ie);
         result =  false;  // error
       } finally {
@@ -1699,10 +1766,10 @@ class DataStreamer extends Daemon {
     return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
   }
 
-  private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
-    return CacheBuilder.newBuilder().expireAfterWrite(
-        dfsClient.getConf().getExcludedNodesCacheExpiry(),
-        TimeUnit.MILLISECONDS)
+  private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
+      long excludedNodesCacheExpiry) {
+    return CacheBuilder.newBuilder()
+        .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
         .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
           @Override
           public void onRemoval(

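For readers who want to follow the new restart handling in isolation, the following standalone driver is a hypothetical walk-through, not part of the patch: it re-declares a trimmed copy of the package-private ErrorState (logging, Time.monotonicNow() and the DatanodeInfo-based deadline check are omitted) so it compiles and runs outside the Hadoop source tree, and it steps through the initRestartingNode / adjustState4RestartingNode path that handleRestartingDatanode and handleBadDatanode now rely on.

// Hypothetical, self-contained walk-through of the restart-handling state
// machine added by this patch. NOT the committed code: a trimmed re-declaration
// of DataStreamer.ErrorState, kept just close enough to show the transitions.
public class ErrorStateWalkthrough {

  static class ErrorState {
    private boolean error = false;
    private int badNodeIndex = -1;
    private int restartingNodeIndex = -1;

    synchronized void setError(boolean err) { this.error = err; }
    synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; }
    synchronized boolean isRestartingNode() { return restartingNodeIndex >= 0; }

    synchronized void initRestartingNode(int i, String message) {
      restartingNodeIndex = i;
      badNodeIndex = -1;                  // clear any earlier suspicion
      System.out.println(message);
    }

    synchronized void adjustState4RestartingNode() {
      if (restartingNodeIndex >= 0) {
        if (badNodeIndex > restartingNodeIndex) {
          restartingNodeIndex = -1;       // restart must have completed
        } else if (badNodeIndex < restartingNodeIndex) {
          restartingNodeIndex--;          // index shifted after node removal
        } else {
          throw new IllegalStateException("badNodeIndex = " + badNodeIndex
              + " = restartingNodeIndex = " + restartingNodeIndex);
        }
      }
      if (!isRestartingNode()) {
        error = false;
      }
      badNodeIndex = -1;
    }

    @Override
    public synchronized String toString() {
      return "error=" + error + ", badNodeIndex=" + badNodeIndex
          + ", restartingNodeIndex=" + restartingNodeIndex;
    }
  }

  public static void main(String[] args) {
    ErrorState state = new ErrorState();

    // A restart OOB ack arrives for datanode 1: the streamer records it.
    state.setError(true);
    state.initRestartingNode(1, "Datanode 1 is restarting: dn1");
    System.out.println(state); // error=true, badNodeIndex=-1, restartingNodeIndex=1

    // While waiting, datanode 2 returns a bad response. handleBadDatanode()
    // would drop it from the pipeline and call adjustState4RestartingNode(),
    // which concludes the restart of datanode 1 must have completed.
    state.setBadNodeIndex(2);
    state.adjustState4RestartingNode();
    System.out.println(state); // error=false, badNodeIndex=-1, restartingNodeIndex=-1
  }
}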