Author: szetszwo
Date: Tue Jun 26 09:43:06 2012
New Revision: 1353894

URL: http://svn.apache.org/viewvc?rev=1353894&view=rev
Log:
svn merge -c 1353893 from branch-1 for HDFS-3504.

Modified:
    hadoop/common/branches/branch-1.1/   (props changed)
    hadoop/common/branches/branch-1.1/CHANGES.txt   (contents, props changed)
    
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java
    
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java
    hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java
    
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Propchange: hadoop/common/branches/branch-1.1/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1353893

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Jun 26 09:43:06 2012
@@ -103,6 +103,10 @@ Release 1.1.0 - unreleased
 
     HADOOP-8430. Backport new FileSystem methods introduced by HADOOP-8014. 
(eli)
 
+    HDFS-3504. Support configurable retry policy in DFSClient for RPC
+    connections and RPC calls, and add MultipleLinearRandomRetry, a new retry
+    policy.  (szetszwo)
+
   BUG FIXES
 
     MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can

Propchange: hadoop/common/branches/branch-1.1/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1353893

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java
 Tue Jun 26 09:43:06 2012
@@ -35,14 +35,17 @@ class RetryInvocationHandler implements 
   private Map<String,RetryPolicy> methodNameToPolicyMap;
   
   public RetryInvocationHandler(Object implementation, RetryPolicy 
retryPolicy) {
-    this.implementation = implementation;
-    this.defaultPolicy = retryPolicy;
-    this.methodNameToPolicyMap = Collections.emptyMap();
+    this(implementation, retryPolicy, Collections.<String, 
RetryPolicy>emptyMap());
   }
   
   public RetryInvocationHandler(Object implementation, Map<String, 
RetryPolicy> methodNameToPolicyMap) {
+    this(implementation, RetryPolicies.TRY_ONCE_THEN_FAIL, 
methodNameToPolicyMap);
+  }
+
+  public RetryInvocationHandler(Object implementation,
+      RetryPolicy defaultPolicy, Map<String, RetryPolicy> 
methodNameToPolicyMap) {
     this.implementation = implementation;
-    this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+    this.defaultPolicy = defaultPolicy;
     this.methodNameToPolicyMap = methodNameToPolicyMap;
   }
 

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java
 Tue Jun 26 09:43:06 2012
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.io.retry;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -32,7 +36,15 @@ import org.apache.hadoop.ipc.RemoteExcep
  * </p>
  */
 public class RetryPolicies {
-  
+  private static final Log LOG = LogFactory.getLog(RetryPolicies.class);
+
+  private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
+    @Override
+    protected Random initialValue() {
+      return new Random();
+    }
+  };
+
   /**
    * <p>
    * Try once, and fail by re-throwing the exception.
@@ -139,17 +151,35 @@ public class RetryPolicies {
     }
   }
   
+  /**
+   * Retry up to maxRetries.
+   * The actual sleep time of the n-th retry is f(n, sleepTime),
+   * where f is a function provided by the subclass implementation.
+   *
+   * The object of the subclasses should be immutable;
+   * otherwise, the subclass must override hashCode(), equals(..) and 
toString().
+   */
   static abstract class RetryLimited implements RetryPolicy {
-    int maxRetries;
-    long sleepTime;
-    TimeUnit timeUnit;
+    final int maxRetries;
+    final long sleepTime;
+    final TimeUnit timeUnit;
+
+    private String myString;
     
-    public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 
0");
+      }
+      if (sleepTime < 0) {
+        throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 
0");
+      }
+
       this.maxRetries = maxRetries;
       this.sleepTime = sleepTime;
       this.timeUnit = timeUnit;
     }
 
+    @Override
     public boolean shouldRetry(Exception e, int retries) throws Exception {
       if (retries >= maxRetries) {
         throw e;
@@ -163,6 +193,30 @@ public class RetryPolicies {
     }
     
     protected abstract long calculateSleepTime(int retries);
+    
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+    
+    @Override
+    public boolean equals(final Object that) {
+      if (this == that) {
+        return true;
+      } else if (that == null || this.getClass() != that.getClass()) {
+        return false;
+      }
+      return this.toString().equals(that.toString());
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+            + ", sleepTime=" + sleepTime + " " + timeUnit + ")";
+      }
+      return myString;
+    }
   }
   
   static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
@@ -192,6 +246,169 @@ public class RetryPolicies {
       return sleepTime * (retries + 1);
     }
   }
+
+  /**
+   * Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ...,
+   * the first n0 retries sleep t0 milliseconds on average,
+   * the following n1 retries sleep t1 milliseconds on average, and so on.
+   * 
+   * For every sleep, the actual sleep time is uniformly distributed at random
+   * in the closed interval [0.5t, 1.5t], where t is the sleep time specified.
+   *
+   * The objects of this class are immutable.
+   */
+  public static class MultipleLinearRandomRetry implements RetryPolicy {
+    /** Pairs of numRetries and sleepMillis */
+    public static class Pair {
+      final int numRetries;
+      final int sleepMillis;
+      
+      public Pair(final int numRetries, final int sleepMillis) {
+        if (numRetries < 0) {
+          throw new IllegalArgumentException("numRetries = " + numRetries+" < 
0");
+        }
+        if (sleepMillis < 0) {
+          throw new IllegalArgumentException("sleepMillis = " + sleepMillis + 
" < 0");
+        }
+
+        this.numRetries = numRetries;
+        this.sleepMillis = sleepMillis;
+      }
+      
+      @Override
+      public String toString() {
+        return numRetries + "x" + sleepMillis + "ms";
+      }
+    }
+
+    private final List<Pair> pairs;
+    private String myString;
+
+    public MultipleLinearRandomRetry(List<Pair> pairs) {
+      if (pairs == null || pairs.isEmpty()) {
+        throw new IllegalArgumentException("pairs must be neither null nor 
empty.");
+      }
+      this.pairs = Collections.unmodifiableList(pairs);
+    }
+
+    @Override
+    public boolean shouldRetry(Exception e, int curRetry) throws Exception {
+      final Pair p = searchPair(curRetry);
+      if (p == null) {
+        //no more retries, re-throw the original exception.
+        throw e;
+      }
+
+      //sleep and return true.
+      //If the sleep is interrupted, throw the InterruptedException out.
+      final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5
+      Thread.sleep(Math.round(p.sleepMillis * ratio));
+      return true;
+    }
+
+    /**
+     * Given the current retry count, search for the corresponding pair.
+     * @return the corresponding pair,
+     *   or null if the current retry count > the maximum number of retries.
+     */
+    private Pair searchPair(int curRetry) {
+      int i = 0;
+      for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) {
+        curRetry -= pairs.get(i).numRetries;
+      }
+      return i == pairs.size()? null: pairs.get(i);
+    }
+    
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+    
+    @Override
+    public boolean equals(final Object that) {
+      if (this == that) {
+        return true;
+      } else if (that == null || this.getClass() != that.getClass()) {
+        return false;
+      }
+      return this.toString().equals(that.toString());
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + pairs;
+      }
+      return myString;
+    }
+
+    /**
+     * Parse the given string as a MultipleLinearRandomRetry object.
+     * The format of the string is "t_1, n_1, t_2, n_2, ...",
+     * where t_i and n_i are the i-th pair of sleep time and number of retries.
+     * Note that the white spaces in the string are ignored.
+     *
+     * @return the parsed object, or null if the parsing fails.
+     */
+    public static MultipleLinearRandomRetry parseCommaSeparatedString(String 
s) {
+      final String[] elements = s.split(",");
+      if (elements.length == 0) {
+        LOG.warn("Illegal value: there is no element in \"" + s + "\".");
+        return null;
+      }
+      if (elements.length % 2 != 0) {
+        LOG.warn("Illegal value: the number of elements in \"" + s + "\" is "
+            + elements.length + " but an even number of elements is 
expected.");
+        return null;
+      }
+
+      final List<RetryPolicies.MultipleLinearRandomRetry.Pair> pairs
+          = new ArrayList<RetryPolicies.MultipleLinearRandomRetry.Pair>();
+   
+      for(int i = 0; i < elements.length; ) {
+        //parse the i-th sleep-time
+        final int sleep = parsePositiveInt(elements, i++, s);
+        if (sleep == -1) {
+          return null; //parse fails
+        }
+
+        //parse the i-th number-of-retries
+        final int retries = parsePositiveInt(elements, i++, s);
+        if (retries == -1) {
+          return null; //parse fails
+        }
+
+        pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, 
sleep));
+      }
+      return new RetryPolicies.MultipleLinearRandomRetry(pairs);
+    }
+
+    /**
+     * Parse the i-th element as an integer.
+     * @return -1 if the parsing fails or the parsed value <= 0;
+     *   otherwise, return the parsed value.
+     */
+    private static int parsePositiveInt(final String[] elements,
+        final int i, final String originalString) {
+      final String s = elements[i].trim();
+      final int n;
+      try {
+        n = Integer.parseInt(s);
+      } catch(NumberFormatException nfe) {
+        LOG.warn("Failed to parse \"" + s + "\", which is the index " + i
+            + " element in \"" + originalString + "\"", nfe);
+        return -1;
+      }
+
+      if (n <= 0) {
+        LOG.warn("The value " + n + " <= 0: it is parsed from the string \""
+            + s + "\" which is the index " + i + " element in \""
+            + originalString + "\"");
+        return -1;
+      }
+      return n;
+    }
+  }
   
   static class ExceptionDependentRetry implements RetryPolicy {
 
@@ -248,6 +465,13 @@ public class RetryPolicies {
     public ExponentialBackoffRetry(
         int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
+
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 
0");
+      } else if (maxRetries > 30) {
+        //if maxRetries > 30, calculateSleepTime will overflow. 
+        throw new IllegalArgumentException("maxRetries = " + maxRetries + " > 
30");
+      }
     }
     
     @Override

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java
 Tue Jun 26 09:43:06 2012
@@ -65,4 +65,13 @@ public class RetryProxy {
                                   new RetryInvocationHandler(implementation, 
methodNameToPolicyMap)
                                   );
   }
+
+  public static Object create(Class<?> iface, Object implementation,
+      RetryPolicy defaultPolicy, Map<String,RetryPolicy> 
methodNameToPolicyMap) {
+    return Proxy.newProxyInstance(
+        implementation.getClass().getClassLoader(),
+        new Class<?>[] { iface },
+        new RetryInvocationHandler(implementation, defaultPolicy, 
methodNameToPolicyMap)
+        );
+  }
 }

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java 
(original)
+++ 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java 
Tue Jun 26 09:43:06 2012
@@ -18,40 +18,41 @@
 
 package org.apache.hadoop.ipc;
 
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.net.ConnectException;
-
-import java.io.IOException;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.FilterInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
-import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcClient;
@@ -60,8 +61,8 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
@@ -71,7 +72,9 @@ import org.apache.hadoop.util.Reflection
  * @see Server
  */
 public class Client {
-  
+  public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = 
"ipc.client.connect.max.retries";
+  public static final int    IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
+
   public static final Log LOG =
     LogFactory.getLog(Client.class);
   private Hashtable<ConnectionId, Connection> connections =
@@ -197,9 +200,10 @@ public class Client {
     private int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for
          //maxIdleTime msecs
-    private int maxRetries; //the max. no. of retries for socket connections
+    private final RetryPolicy connectionRetryPolicy;
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private int pingInterval; // how often sends ping to the server in msecs
+
     
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -215,7 +219,7 @@ public class Client {
                                        remoteId.getAddress().getHostName());
       }
       this.maxIdleTime = remoteId.getMaxIdleTime();
-      this.maxRetries = remoteId.getMaxRetries();
+      this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.pingInterval = remoteId.getPingInterval();
       if (LOG.isDebugEnabled()) {
@@ -453,7 +457,7 @@ public class Client {
           if (updateAddress()) {
             timeoutFailures = ioFailures = 0;
           }
-          handleConnectionFailure(ioFailures++, maxRetries, ie);
+          handleConnectionFailure(ioFailures++, ie);
         }
       }
     }
@@ -663,8 +667,26 @@ public class Client {
         Thread.sleep(1000);
       } catch (InterruptedException ignored) {}
       
-      LOG.info("Retrying connect to server: " + server + 
-          ". Already tried " + curRetries + " time(s).");
+      LOG.info("Retrying connect to server: " + server + ". Already tried "
+          + curRetries + " time(s); maxRetries=" + maxRetries);
+    }
+
+    private void handleConnectionFailure(int curRetries, IOException ioe
+        ) throws IOException {
+      closeConnection();
+
+      final boolean retry;
+      try {
+        retry = connectionRetryPolicy.shouldRetry(ioe, curRetries);
+      } catch(Exception e) {
+        throw e instanceof IOException? (IOException)e: new IOException(e);
+      }
+      if (!retry) {
+        throw ioe;
+      }
+
+      LOG.info("Retrying connect to server: " + server + ". Already tried "
+          + curRetries + " time(s); retry policy is " + connectionRetryPolicy);
     }
 
     /* Write the RPC header */
@@ -1220,14 +1242,15 @@ public class Client {
      private String serverPrincipal;
      private int maxIdleTime; //connections will be culled if it was idle for 
      //maxIdleTime msecs
-     private int maxRetries; //the max. no. of retries for socket connections
+     private final RetryPolicy connectionRetryPolicy;
      private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
      private int pingInterval; // how often sends ping to the server in msecs
+
      
      ConnectionId(InetSocketAddress address, Class<?> protocol, 
                   UserGroupInformation ticket, int rpcTimeout,
                   String serverPrincipal, int maxIdleTime, 
-                  int maxRetries, boolean tcpNoDelay,
+                  RetryPolicy connectionRetryPolicy, boolean tcpNoDelay,
                   int pingInterval) {
        this.protocol = protocol;
        this.address = address;
@@ -1235,7 +1258,7 @@ public class Client {
        this.rpcTimeout = rpcTimeout;
        this.serverPrincipal = serverPrincipal;
        this.maxIdleTime = maxIdleTime;
-       this.maxRetries = maxRetries;
+       this.connectionRetryPolicy = connectionRetryPolicy;
        this.tcpNoDelay = tcpNoDelay;
        this.pingInterval = pingInterval;
      }
@@ -1264,10 +1287,6 @@ public class Client {
        return maxIdleTime;
      }
      
-     int getMaxRetries() {
-       return maxRetries;
-     }
-     
      boolean getTcpNoDelay() {
        return tcpNoDelay;
      }
@@ -1285,11 +1304,26 @@ public class Client {
      static ConnectionId getConnectionId(InetSocketAddress addr,
          Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
          Configuration conf) throws IOException {
+       return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
+     }
+
+     static ConnectionId getConnectionId(InetSocketAddress addr,
+         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+         RetryPolicy connectionRetryPolicy, Configuration conf) throws 
IOException {
+
+       if (connectionRetryPolicy == null) {
+         final int max = conf.getInt(
+             IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+             IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
+         connectionRetryPolicy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+             max, 1, TimeUnit.SECONDS);
+       }
+
        String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
        return new ConnectionId(addr, protocol, ticket,
            rpcTimeout, remotePrincipal,
            conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
-           conf.getInt("ipc.client.connect.max.retries", 10),
+           connectionRetryPolicy,
            conf.getBoolean("ipc.client.tcpnodelay", false),
            Client.getPingInterval(conf));
      }
@@ -1326,7 +1360,7 @@ public class Client {
          ConnectionId that = (ConnectionId) obj;
          return isEqual(this.address, that.address)
              && this.maxIdleTime == that.maxIdleTime
-             && this.maxRetries == that.maxRetries
+             && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
              && this.pingInterval == that.pingInterval
              && isEqual(this.protocol, that.protocol)
              && this.rpcTimeout == that.rpcTimeout
@@ -1339,10 +1373,9 @@ public class Client {
      
      @Override
      public int hashCode() {
-       int result = 1;
+       int result = connectionRetryPolicy.hashCode();
        result = PRIME * result + ((address == null) ? 0 : address.hashCode());
        result = PRIME * result + maxIdleTime;
-       result = PRIME * result + maxRetries;
        result = PRIME * result + pingInterval;
        result = PRIME * result + ((protocol == null) ? 0 : 
protocol.hashCode());
        result = PRIME * rpcTimeout;

Modified: 
hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java 
(original)
+++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java 
Tue Jun 26 09:43:06 2012
@@ -18,31 +18,35 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
-
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.io.*;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.conf.*;
-
-import org.apache.hadoop.net.NetUtils;
 
 /** A simple RPC mechanism.
  *
@@ -205,12 +209,12 @@ public class RPC {
     private Client client;
     private boolean isClosed = false;
 
-    public Invoker(Class<? extends VersionedProtocol> protocol,
+    private Invoker(Class<? extends VersionedProtocol> protocol,
         InetSocketAddress address, UserGroupInformation ticket,
         Configuration conf, SocketFactory factory,
-        int rpcTimeout) throws IOException {
+        int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
-          ticket, rpcTimeout, conf);
+          ticket, rpcTimeout, connectionRetryPolicy, conf);
       this.client = CLIENTS.getClient(conf, factory);
     }
 
@@ -385,14 +389,25 @@ public class RPC {
       Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout) throws 
IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, null);
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol,
+      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+      Configuration conf, SocketFactory factory, int rpcTimeout,
+      RetryPolicy connectionRetryPolicy) throws IOException {
 
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
-    VersionedProtocol proxy =
-        (VersionedProtocol) Proxy.newProxyInstance(
-            protocol.getClassLoader(), new Class[] { protocol },
-            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy);
+    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[]{protocol}, invoker);
     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                   clientVersion);
     if (serverVersion == clientVersion) {

Modified: 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
 Tue Jun 26 09:43:06 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -115,7 +116,7 @@ public class DFSClient implements FSCons
   public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
       Configuration conf) throws IOException {
     return createNamenode(createRPCNamenode(nameNodeAddr, conf,
-      UserGroupInformation.getCurrentUser()));
+      UserGroupInformation.getCurrentUser()), conf);
   }
 
   private static ClientProtocol createRPCNamenode(InetSocketAddress 
nameNodeAddr,
@@ -123,11 +124,93 @@ public class DFSClient implements FSCons
     throws IOException {
     return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
         ClientProtocol.versionID, nameNodeAddr, ugi, conf,
-        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+        NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,
+        getMultipleLinearRandomRetry(conf));
   }
 
-  private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
-    throws IOException {
+  /**
+   * Return the default retry policy used in RPC.
+   * 
+   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
+   * 
+   * Otherwise, 
+   * (1) use multipleLinearRandomRetry for
+   *     - SafeModeException, or
+   *     - IOException other than RemoteException; and
+   * (2) use TRY_ONCE_THEN_FAIL for
+   *     - non-SafeMode RemoteException, or
+   *     - non-IOException.
+   *     
+   * Note that dfs.client.retry.max < 0 is not allowed.
+   */
+  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+    final RetryPolicy multipleLinearRandomRetry = 
getMultipleLinearRandomRetry(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
+    }
+
+    if (multipleLinearRandomRetry == null) {
+      //no retry
+      return RetryPolicies.TRY_ONCE_THEN_FAIL;
+    } else {
+      //retry with multipleLinearRandomRetry, except for the cases in (2) above
+      return new RetryPolicy() {
+        @Override
+        public boolean shouldRetry(Exception e, int retries) throws Exception {
+          //see (1) and (2) in the javadoc of this method.
+          final RetryPolicy p;
+          if (e instanceof RemoteException) {
+            final RemoteException re = (RemoteException)e;
+            p = SafeModeException.class.getName().equals(re.getClassName())?
+                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
+          } else if (e instanceof IOException) {
+            p = multipleLinearRandomRetry;
+          } else { //non-IOException
+            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RETRY " + retries + ") policy="
+                + p.getClass().getSimpleName() + ", exception=" + e);
+          }
+          return p.shouldRetry(e, retries);
+        }
+      };
+    }
+  }
+
+  /**
+   * Return the MultipleLinearRandomRetry policy specified in the conf,
+   * or null if the feature is disabled.
+   * If the policy is specified in the conf but the policy cannot be parsed,
+   * the default policy is returned.
+   * 
+   * Conf property: N pairs of sleep-time and number-of-retries
+   *   dfs.client.retry.policy.spec = "s1,n1,s2,n2,..."
+   */
+  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
+    if (!enabled) {
+      return null;
+    }
+
+    final String policy = conf.get(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    final RetryPolicy r = 
RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
+    return r != null? r: 
RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+  }
+
+  private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
+      Configuration conf) throws IOException {
+    //default policy
+    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+
+    //create policy
     RetryPolicy createPolicy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
     
@@ -139,15 +222,15 @@ public class DFSClient implements FSCons
       new HashMap<Class<? extends Exception>, RetryPolicy>();
     exceptionToPolicyMap.put(RemoteException.class, 
         RetryPolicies.retryByRemoteException(
-            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
+            defaultPolicy, remoteExceptionToPolicyMap));
     RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+        defaultPolicy, exceptionToPolicyMap);
     Map<String,RetryPolicy> methodNameToPolicyMap = new 
HashMap<String,RetryPolicy>();
     
     methodNameToPolicyMap.put("create", methodPolicy);
 
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        rpcNamenode, methodNameToPolicyMap);
+        rpcNamenode, defaultPolicy, methodNameToPolicyMap);
   }
 
   /** Create {@link ClientDatanodeProtocol} proxy with block/token */
@@ -244,7 +327,7 @@ public class DFSClient implements FSCons
 
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-      this.namenode = createNamenode(this.rpcNamenode);
+      this.namenode = createNamenode(this.rpcNamenode, conf);
     } else if (nameNodeAddr == null && rpcNamenode != null) {
       //This case is used for testing.
       this.namenode = this.rpcNamenode = rpcNamenode;

Modified: 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
 Tue Jun 26 09:43:06 2012
@@ -36,6 +36,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = 
"dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = 
"dfs.client.retry.policy.enabled";
+  public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; 
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_KEY = 
"dfs.client.retry.policy.spec";
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = 
"10000,6,60000,10"; //t1,n1,t2,n2,... 
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = 
"dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final String  DFS_CLIENT_USE_DN_HOSTNAME = 
"dfs.client.use.datanode.hostname";
@@ -118,7 +122,6 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = 
"dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = 
"dfs.namenode.edits.dir";
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = 
"dfs.client.read.prefetch.size"; 
-  public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= 
"dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = 
"dfs.metrics.session-id";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = 
"dfs.datanode.hostname";
   public static final String  DFS_DATANODE_STORAGEID_KEY = 
"dfs.datanode.StorageId";

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
 Tue Jun 26 09:43:06 2012
@@ -583,27 +583,35 @@ public class MiniDFSCluster {
     }
   }
 
-  /**
-   * Restart namenode. Waits for exit from safemode.
-   */
-  public synchronized void restartNameNode()
-      throws IOException {
-    restartNameNode(true);
+  /** Same as restartNameNode(true, true). */
+  public synchronized void restartNameNode() throws IOException {
+    restartNameNode(true, true);
   }
   
+  /** Same as restartNameNode(waitSafemodeExit, true). */
+  public synchronized void restartNameNode(boolean waitSafemodeExit
+      ) throws IOException {
+    restartNameNode(waitSafemodeExit, true);
+  }
+
   /**
    * Restart namenode.
+   * 
+   * @param waitSafemodeExit Should it wait for safe mode to turn off?
+   * @param waitClusterActive Should it wait for cluster to be active?
+   * @throws IOException
    */
-  public synchronized void restartNameNode(boolean waitSafemodeExit)
-      throws IOException {
+  public synchronized void restartNameNode(boolean waitSafemodeExit,
+      boolean waitClusterActive) throws IOException {
     shutdownNameNode();
     nameNode = NameNode.createNameNode(new String[] {}, conf);
     if (waitSafemodeExit) {
       waitClusterUp();
     }
     System.out.println("Restarted the namenode");
+
     int failedCount = 0;
-    while (true) {
+    while(waitClusterActive) {
       try {
         waitActive();
         break;
@@ -618,7 +626,6 @@ public class MiniDFSCluster {
         }
       }
     }
-    System.out.println("Cluster is active");
   }
 
   /*
@@ -860,6 +867,7 @@ public class MiniDFSCluster {
     }
 
     client.close();
+    System.out.println("Cluster is active");
   }
 
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {

Modified: 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1353894&r1=1353893&r2=1353894&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
 (original)
+++ 
hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
 Tue Jun 26 09:43:06 2012
@@ -17,50 +17,66 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.net.SocketTimeoutException;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
-import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-
-import org.apache.hadoop.hdfs.server.common.*;
-import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
-
-import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-
-import junit.framework.TestCase;
-import static org.mockito.Mockito.*;
-import org.mockito.stubbing.Answer;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
@@ -509,4 +525,148 @@ public class TestDFSClientRetries extend
       cluster.shutdown();
     }
   }
+
+  /** Test client retry with namenode restarting. */
+  public void testNamenodeRestart() throws Exception {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    final Path dir = new Path("/testNamenodeRestart");
+
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+
+    final short numDatanodes = 3;
+    final MiniDFSCluster cluster = new MiniDFSCluster(
+        conf, numDatanodes, true, null);
+    try {
+      cluster.waitActive();
+
+      //create a file
+      final DistributedFileSystem dfs = 
(DistributedFileSystem)cluster.getFileSystem();
+      final long length = 1L << 20;
+      final Path file1 = new Path(dir, "foo"); 
+      DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
+
+      //get file status
+      final FileStatus s1 = dfs.getFileStatus(file1);
+      assertEquals(length, s1.getLen());
+
+      //shutdown namenode
+      cluster.shutdownNameNode();
+
+      //namenode is down, create another file in a thread
+      final Path file3 = new Path(dir, "file"); 
+      final Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //it should retry till namenode is up.
+            final FileSystem fs = 
AppendTestUtil.createHdfsWithDifferentUsername(conf);
+            DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      });
+      thread.start();
+ 
+      //restart namenode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep, restart, and then wait active
+            TimeUnit.SECONDS.sleep(30);
+            cluster.restartNameNode(false, false);
+            cluster.waitActive();
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is down, it should retry until namenode is up again. 
+      final FileStatus s2 = dfs.getFileStatus(file1);
+      assertEquals(s1, s2);
+
+      //check file1 and file3
+      thread.join();
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
+
+      //enter safe mode
+      dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      
+      //leave safe mode in a new thread
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            //sleep and then leave safe mode
+            TimeUnit.SECONDS.sleep(30);
+            dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+          } catch (Exception e) {
+            exceptions.add(e);
+          }
+        }
+      }).start();
+
+      //namenode is in safe mode, create should retry until it leaves safe mode.
+      final Path file2 = new Path(dir, "bar");
+      DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
+      assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
+      
+      //make sure it won't retry on exceptions like FileNotFoundException
+      final Path nonExisting = new Path(dir, "nonExisting");
+      LOG.info("setPermission: " + nonExisting);
+      try {
+        dfs.setPermission(nonExisting, new FsPermission((short)0));
+        fail();
+      } catch(FileNotFoundException fnfe) {
+        LOG.info("GOOD!", fnfe);
+      }
+
+      if (!exceptions.isEmpty()) {
+        LOG.error("There are " + exceptions.size() + " exception(s):");
+        for(int i = 0; i < exceptions.size(); i++) {
+          LOG.error("Exception " + i, exceptions.get(i));
+        }
+        fail();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public void testMultipleLinearRandomRetry() {
+    parseMultipleLinearRandomRetry(null, "");
+    parseMultipleLinearRandomRetry(null, "11");
+    parseMultipleLinearRandomRetry(null, "11,22,33");
+    parseMultipleLinearRandomRetry(null, "11,22,33,44,55");
+    parseMultipleLinearRandomRetry(null, "AA");
+    parseMultipleLinearRandomRetry(null, "11,AA");
+    parseMultipleLinearRandomRetry(null, "11,22,33,FF");
+    parseMultipleLinearRandomRetry(null, "11,-22");
+    parseMultipleLinearRandomRetry(null, "-11,22");
+
+    parseMultipleLinearRandomRetry("[22x11ms]",
+        "11,22");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]",
+        "11,22,33,44");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "11,22,33,44,55,66");
+    parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]",
+        "   11,   22, 33,  44, 55,  66   ");
+  }
+  
+  static void parseMultipleLinearRandomRetry(String expected, String s) {
+    final MultipleLinearRandomRetry r = 
MultipleLinearRandomRetry.parseCommaSeparatedString(s);
+    LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected);
+    if (r == null) {
+      Assert.assertEquals(expected, null);
+    } else {
+      Assert.assertEquals("MultipleLinearRandomRetry" + expected, 
r.toString());
+    }
+  }
 }


Reply via email to