Author: szetszwo Date: Tue Jun 26 09:43:06 2012 New Revision: 1353894 URL: http://svn.apache.org/viewvc?rev=1353894&view=rev Log: svn merge -c 1353893 from branch-1 for HDFS-3504.
Modified: hadoop/common/branches/branch-1.1/ (props changed) hadoop/common/branches/branch-1.1/CHANGES.txt (contents, props changed) hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Propchange: hadoop/common/branches/branch-1.1/ ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-1:r1353893 Modified: hadoop/common/branches/branch-1.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Jun 26 09:43:06 2012 @@ -103,6 +103,10 @@ Release 1.1.0 - unreleased HADOOP-8430. Backport new FileSystem methods introduced by HADOOP-8014. (eli) + HDFS-3504. Support configurable retry policy in DFSClient for RPC + connections and RPC calls, and add MultipleLinearRandomRetry, a new retry + policy. (szetszwo) + BUG FIXES MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can Propchange: hadoop/common/branches/branch-1.1/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1353893 Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryInvocationHandler.java Tue Jun 26 09:43:06 2012 @@ -35,14 +35,17 @@ class RetryInvocationHandler implements private Map<String,RetryPolicy> methodNameToPolicyMap; public RetryInvocationHandler(Object implementation, RetryPolicy retryPolicy) { - this.implementation = implementation; - this.defaultPolicy = retryPolicy; - this.methodNameToPolicyMap = Collections.emptyMap(); + this(implementation, retryPolicy, Collections.<String, RetryPolicy>emptyMap()); } public RetryInvocationHandler(Object implementation, Map<String, RetryPolicy> methodNameToPolicyMap) { + this(implementation, RetryPolicies.TRY_ONCE_THEN_FAIL, methodNameToPolicyMap); + } + + public RetryInvocationHandler(Object implementation, + RetryPolicy defaultPolicy, Map<String, RetryPolicy> methodNameToPolicyMap) { this.implementation = implementation; - this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + this.defaultPolicy = defaultPolicy; this.methodNameToPolicyMap = methodNameToPolicyMap; } Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryPolicies.java Tue Jun 26 09:43:06 2012 @@ -17,13 +17,17 @@ */ package org.apache.hadoop.io.retry; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; import java.util.Map.Entry; +import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RemoteException; /** @@ -32,7 +36,15 @@ import org.apache.hadoop.ipc.RemoteExcep * </p> */ public class RetryPolicies { - + private static final Log LOG = LogFactory.getLog(RetryPolicies.class); + + private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() { + @Override + protected Random initialValue() { + return new Random(); + } + }; + /** * <p> * Try once, and fail by re-throwing the exception. @@ -139,17 +151,35 @@ public class RetryPolicies { } } + /** + * Retry up to maxRetries. + * The actual sleep time of the n-th retry is f(n, sleepTime), + * where f is a function provided by the subclass implementation. + * + * The object of the subclasses should be immutable; + * otherwise, the subclass must override hashCode(), equals(..) and toString(). + */ static abstract class RetryLimited implements RetryPolicy { - int maxRetries; - long sleepTime; - TimeUnit timeUnit; + final int maxRetries; + final long sleepTime; + final TimeUnit timeUnit; + + private String myString; - public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) { + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0"); + } + if (sleepTime < 0) { + throw new IllegalArgumentException("sleepTime = " + sleepTime + " < 0"); + } + this.maxRetries = maxRetries; this.sleepTime = sleepTime; this.timeUnit = timeUnit; } + @Override public boolean shouldRetry(Exception e, int retries) throws Exception { if (retries >= maxRetries) { throw e; @@ -163,6 +193,30 @@ public class RetryPolicies { } protected abstract long calculateSleepTime(int retries); + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries + + ", sleepTime=" + sleepTime + " " + timeUnit + ")"; + } + return myString; + } } static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited { @@ -192,6 +246,169 @@ public class RetryPolicies { return sleepTime * (retries + 1); } } + + /** + * Given pairs of number of retries and sleep time (n0, t0), (n1, t1), ..., + * the first n0 retries sleep t0 milliseconds on average, + * the following n1 retries sleep t1 milliseconds on average, and so on. + * + * For all the sleep, the actual sleep time is randomly uniform distributed + * in the close interval [0.5t, 1.5t], where t is the sleep time specified. + * + * The objects of this class are immutable. + */ + public static class MultipleLinearRandomRetry implements RetryPolicy { + /** Pairs of numRetries and sleepSeconds */ + public static class Pair { + final int numRetries; + final int sleepMillis; + + public Pair(final int numRetries, final int sleepMillis) { + if (numRetries < 0) { + throw new IllegalArgumentException("numRetries = " + numRetries+" < 0"); + } + if (sleepMillis < 0) { + throw new IllegalArgumentException("sleepMillis = " + sleepMillis + " < 0"); + } + + this.numRetries = numRetries; + this.sleepMillis = sleepMillis; + } + + @Override + public String toString() { + return numRetries + "x" + sleepMillis + "ms"; + } + } + + private final List<Pair> pairs; + private String myString; + + public MultipleLinearRandomRetry(List<Pair> pairs) { + if (pairs == null || pairs.isEmpty()) { + throw new IllegalArgumentException("pairs must be neither null nor empty."); + } + this.pairs = Collections.unmodifiableList(pairs); + } + + @Override + public boolean shouldRetry(Exception e, int curRetry) throws Exception { + final Pair p = searchPair(curRetry); + if (p == null) { + //no more retries, re-throw the original exception. + throw e; + } + + //sleep and return true. + //If the sleep is interrupted, throw the InterruptedException out. + final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5 + Thread.sleep(Math.round(p.sleepMillis * ratio)); + return true; + } + + /** + * Given the current number of retry, search the corresponding pair. + * @return the corresponding pair, + * or null if the current number of retry > maximum number of retry. + */ + private Pair searchPair(int curRetry) { + int i = 0; + for(; i < pairs.size() && curRetry > pairs.get(i).numRetries; i++) { + curRetry -= pairs.get(i).numRetries; + } + return i == pairs.size()? null: pairs.get(i); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(final Object that) { + if (this == that) { + return true; + } else if (that == null || this.getClass() != that.getClass()) { + return false; + } + return this.toString().equals(that.toString()); + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + pairs; + } + return myString; + } + + /** + * Parse the given string as a MultipleLinearRandomRetry object. + * The format of the string is "t_1, n_1, t_2, n_2, ...", + * where t_i and n_i are the i-th pair of sleep time and number of retires. + * Note that the white spaces in the string are ignored. + * + * @return the parsed object, or null if the parsing fails. + */ + public static MultipleLinearRandomRetry parseCommaSeparatedString(String s) { + final String[] elements = s.split(","); + if (elements.length == 0) { + LOG.warn("Illegal value: there is no element in \"" + s + "\"."); + return null; + } + if (elements.length % 2 != 0) { + LOG.warn("Illegal value: the number of elements in \"" + s + "\" is " + + elements.length + " but an even number of elements is expected."); + return null; + } + + final List<RetryPolicies.MultipleLinearRandomRetry.Pair> pairs + = new ArrayList<RetryPolicies.MultipleLinearRandomRetry.Pair>(); + + for(int i = 0; i < elements.length; ) { + //parse the i-th sleep-time + final int sleep = parsePositiveInt(elements, i++, s); + if (sleep == -1) { + return null; //parse fails + } + + //parse the i-th number-of-retries + final int retries = parsePositiveInt(elements, i++, s); + if (retries == -1) { + return null; //parse fails + } + + pairs.add(new RetryPolicies.MultipleLinearRandomRetry.Pair(retries, sleep)); + } + return new RetryPolicies.MultipleLinearRandomRetry(pairs); + } + + /** + * Parse the i-th element as an integer. + * @return -1 if the parsing fails or the parsed value <= 0; + * otherwise, return the parsed value. + */ + private static int parsePositiveInt(final String[] elements, + final int i, final String originalString) { + final String s = elements[i].trim(); + final int n; + try { + n = Integer.parseInt(s); + } catch(NumberFormatException nfe) { + LOG.warn("Failed to parse \"" + s + "\", which is the index " + i + + " element in \"" + originalString + "\"", nfe); + return -1; + } + + if (n <= 0) { + LOG.warn("The value " + n + " <= 0: it is parsed from the string \"" + + s + "\" which is the index " + i + " element in \"" + + originalString + "\""); + return -1; + } + return n; + } + } static class ExceptionDependentRetry implements RetryPolicy { @@ -248,6 +465,13 @@ public class RetryPolicies { public ExponentialBackoffRetry( int maxRetries, long sleepTime, TimeUnit timeUnit) { super(maxRetries, sleepTime, timeUnit); + + if (maxRetries < 0) { + throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0"); + } else if (maxRetries > 30) { + //if maxRetries > 30, calculateSleepTime will overflow. + throw new IllegalArgumentException("maxRetries = " + maxRetries + " > 30"); + } } @Override Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/io/retry/RetryProxy.java Tue Jun 26 09:43:06 2012 @@ -65,4 +65,13 @@ public class RetryProxy { new RetryInvocationHandler(implementation, methodNameToPolicyMap) ); } + + public static Object create(Class<?> iface, Object implementation, + RetryPolicy defaultPolicy, Map<String,RetryPolicy> methodNameToPolicyMap) { + return Proxy.newProxyInstance( + implementation.getClass().getClassLoader(), + new Class<?>[] { iface }, + new RetryInvocationHandler(implementation, defaultPolicy, methodNameToPolicyMap) + ); + } } Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/Client.java Tue Jun 26 09:43:06 2012 @@ -18,40 +18,41 @@ package org.apache.hadoop.ipc; -import java.net.InetAddress; -import java.net.Socket; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.net.ConnectException; - -import java.io.IOException; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.FilterInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.Hashtable; import java.util.Iterator; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; -import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; -import org.apache.commons.logging.*; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SaslRpcClient; @@ -60,8 +61,8 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ReflectionUtils; /** A client for an IPC service. IPC calls take a single {@link Writable} as a @@ -71,7 +72,9 @@ import org.apache.hadoop.util.Reflection * @see Server */ public class Client { - + public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = "ipc.client.connect.max.retries"; + public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10; + public static final Log LOG = LogFactory.getLog(Client.class); private Hashtable<ConnectionId, Connection> connections = @@ -197,9 +200,10 @@ public class Client { private int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private int maxRetries; //the max. no. of retries for socket connections + private final RetryPolicy connectionRetryPolicy; private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private int pingInterval; // how often sends ping to the server in msecs + // currently active calls private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); @@ -215,7 +219,7 @@ public class Client { remoteId.getAddress().getHostName()); } this.maxIdleTime = remoteId.getMaxIdleTime(); - this.maxRetries = remoteId.getMaxRetries(); + this.connectionRetryPolicy = remoteId.connectionRetryPolicy; this.tcpNoDelay = remoteId.getTcpNoDelay(); this.pingInterval = remoteId.getPingInterval(); if (LOG.isDebugEnabled()) { @@ -453,7 +457,7 @@ public class Client { if (updateAddress()) { timeoutFailures = ioFailures = 0; } - handleConnectionFailure(ioFailures++, maxRetries, ie); + handleConnectionFailure(ioFailures++, ie); } } } @@ -663,8 +667,26 @@ public class Client { Thread.sleep(1000); } catch (InterruptedException ignored) {} - LOG.info("Retrying connect to server: " + server + - ". Already tried " + curRetries + " time(s)."); + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); maxRetries=" + maxRetries); + } + + private void handleConnectionFailure(int curRetries, IOException ioe + ) throws IOException { + closeConnection(); + + final boolean retry; + try { + retry = connectionRetryPolicy.shouldRetry(ioe, curRetries); + } catch(Exception e) { + throw e instanceof IOException? (IOException)e: new IOException(e); + } + if (!retry) { + throw ioe; + } + + LOG.info("Retrying connect to server: " + server + ". Already tried " + + curRetries + " time(s); retry policy is " + connectionRetryPolicy); } /* Write the RPC header */ @@ -1220,14 +1242,15 @@ public class Client { private String serverPrincipal; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs - private int maxRetries; //the max. no. of retries for socket connections + private final RetryPolicy connectionRetryPolicy; private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private int pingInterval; // how often sends ping to the server in msecs + ConnectionId(InetSocketAddress address, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, String serverPrincipal, int maxIdleTime, - int maxRetries, boolean tcpNoDelay, + RetryPolicy connectionRetryPolicy, boolean tcpNoDelay, int pingInterval) { this.protocol = protocol; this.address = address; @@ -1235,7 +1258,7 @@ public class Client { this.rpcTimeout = rpcTimeout; this.serverPrincipal = serverPrincipal; this.maxIdleTime = maxIdleTime; - this.maxRetries = maxRetries; + this.connectionRetryPolicy = connectionRetryPolicy; this.tcpNoDelay = tcpNoDelay; this.pingInterval = pingInterval; } @@ -1264,10 +1287,6 @@ public class Client { return maxIdleTime; } - int getMaxRetries() { - return maxRetries; - } - boolean getTcpNoDelay() { return tcpNoDelay; } @@ -1285,11 +1304,26 @@ public class Client { static ConnectionId getConnectionId(InetSocketAddress addr, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws IOException { + return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf); + } + + static ConnectionId getConnectionId(InetSocketAddress addr, + Class<?> protocol, UserGroupInformation ticket, int rpcTimeout, + RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException { + + if (connectionRetryPolicy == null) { + final int max = conf.getInt( + IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT); + connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( + max, 1, TimeUnit.SECONDS); + } + String remotePrincipal = getRemotePrincipal(conf, addr, protocol); return new ConnectionId(addr, protocol, ticket, rpcTimeout, remotePrincipal, conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s - conf.getInt("ipc.client.connect.max.retries", 10), + connectionRetryPolicy, conf.getBoolean("ipc.client.tcpnodelay", false), Client.getPingInterval(conf)); } @@ -1326,7 +1360,7 @@ public class Client { ConnectionId that = (ConnectionId) obj; return isEqual(this.address, that.address) && this.maxIdleTime == that.maxIdleTime - && this.maxRetries == that.maxRetries + && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy) && this.pingInterval == that.pingInterval && isEqual(this.protocol, that.protocol) && this.rpcTimeout == that.rpcTimeout @@ -1339,10 +1373,9 @@ public class Client { @Override public int hashCode() { - int result = 1; + int result = connectionRetryPolicy.hashCode(); result = PRIME * result + ((address == null) ? 0 : address.hashCode()); result = PRIME * result + maxIdleTime; - result = PRIME * result + maxRetries; result = PRIME * result + pingInterval; result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); result = PRIME * rpcTimeout; Modified: hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java (original) +++ hadoop/common/branches/branch-1.1/src/core/org/apache/hadoop/ipc/RPC.java Tue Jun 26 09:43:06 2012 @@ -18,31 +18,35 @@ package org.apache.hadoop.ipc; -import java.lang.reflect.Proxy; -import java.lang.reflect.Method; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.lang.reflect.Array; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; - +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.io.*; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import javax.net.SocketFactory; -import org.apache.commons.logging.*; - -import org.apache.hadoop.io.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.conf.*; - -import org.apache.hadoop.net.NetUtils; /** A simple RPC mechanism. * @@ -205,12 +209,12 @@ public class RPC { private Client client; private boolean isClosed = false; - public Invoker(Class<? extends VersionedProtocol> protocol, + private Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout) throws IOException { + int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, - ticket, rpcTimeout, conf); + ticket, rpcTimeout, connectionRetryPolicy, conf); this.client = CLIENTS.getClient(conf, factory); } @@ -385,14 +389,25 @@ public class RPC { Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { + return getProxy(protocol, clientVersion, addr, ticket, conf, factory, + rpcTimeout, null); + } + + /** Construct a client-side proxy object that implements the named protocol, + * talking to a server at the named address. */ + public static VersionedProtocol getProxy( + Class<? extends VersionedProtocol> protocol, + long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, + Configuration conf, SocketFactory factory, int rpcTimeout, + RetryPolicy connectionRetryPolicy) throws IOException { if (UserGroupInformation.isSecurityEnabled()) { SaslRpcServer.init(conf); } - VersionedProtocol proxy = - (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); + final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, + rpcTimeout, connectionRetryPolicy); + VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion == clientVersion) { Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 26 09:43:06 2012 @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -115,7 +116,7 @@ public class DFSClient implements FSCons public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { return createNamenode(createRPCNamenode(nameNodeAddr, conf, - UserGroupInformation.getCurrentUser())); + UserGroupInformation.getCurrentUser()), conf); } private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr, @@ -123,11 +124,93 @@ public class DFSClient implements FSCons throws IOException { return (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, nameNodeAddr, ugi, conf, - NetUtils.getSocketFactory(conf, ClientProtocol.class)); + NetUtils.getSocketFactory(conf, ClientProtocol.class), 0, + getMultipleLinearRandomRetry(conf)); } - private static ClientProtocol createNamenode(ClientProtocol rpcNamenode) - throws IOException { + /** + * Return the default retry policy used in RPC. + * + * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL. + * + * Otherwise, + * (1) use multipleLinearRandomRetry for + * - SafeModeException, or + * - IOException other than RemoteException; and + * (2) use TRY_ONCE_THEN_FAIL for + * - non-SafeMode RemoteException, or + * - non-IOException. + * + * Note that dfs.client.retry.max < 0 is not allowed. + */ + private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) { + final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry); + } + + if (multipleLinearRandomRetry == null) { + //no retry + return RetryPolicies.TRY_ONCE_THEN_FAIL; + } else { + //use exponential backoff + return new RetryPolicy() { + @Override + public boolean shouldRetry(Exception e, int retries) throws Exception { + //see (1) and (2) in the javadoc of this method. + final RetryPolicy p; + if (e instanceof RemoteException) { + final RemoteException re = (RemoteException)e; + p = SafeModeException.class.getName().equals(re.getClassName())? + multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL; + } else if (e instanceof IOException) { + p = multipleLinearRandomRetry; + } else { //non-IOException + p = RetryPolicies.TRY_ONCE_THEN_FAIL; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("RETRY " + retries + ") policy=" + + p.getClass().getSimpleName() + ", exception=" + e); + } + return p.shouldRetry(e, retries); + } + }; + } + } + + /** + * Return the MultipleLinearRandomRetry policy specified in the conf, + * or null if the feature is disabled. + * If the policy is specified in the conf but the policy cannot be parsed, + * the default policy is returned. + * + * Conf property: N pairs of sleep-time and number-of-retries + * dfs.client.retry.policy = "s1,n1,s2,n2,..." + */ + private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) { + final boolean enabled = conf.getBoolean( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT); + if (!enabled) { + return null; + } + + final String policy = conf.get( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); + + final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy); + return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString( + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); + } + + private static ClientProtocol createNamenode(ClientProtocol rpcNamenode, + Configuration conf) throws IOException { + //default policy + final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf); + + //create policy RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); @@ -139,15 +222,15 @@ public class DFSClient implements FSCons new HashMap<Class<? extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies.retryByRemoteException( - RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); + defaultPolicy, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( - RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + defaultPolicy, exceptionToPolicyMap); Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); return (ClientProtocol) RetryProxy.create(ClientProtocol.class, - rpcNamenode, methodNameToPolicyMap); + rpcNamenode, defaultPolicy, methodNameToPolicyMap); } /** Create {@link ClientDatanodeProtocol} proxy with block/token */ @@ -244,7 +327,7 @@ public class DFSClient implements FSCons if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi); - this.namenode = createNamenode(this.rpcNamenode); + this.namenode = createNamenode(this.rpcNamenode, conf); } else if (nameNodeAddr == null && rpcNamenode != null) { //This case is used for testing. this.namenode = this.rpcNamenode = rpcNamenode; Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Jun 26 09:43:06 2012 @@ -36,6 +36,10 @@ public class DFSConfigKeys extends Commo public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096; public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled"; + public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; + public static final String DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec"; + public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname"; @@ -118,7 +122,6 @@ public class DFSConfigKeys extends Commo public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir"; public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir"; public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; - public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base"; public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id"; public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname"; public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId"; Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Jun 26 09:43:06 2012 @@ -583,27 +583,35 @@ public class MiniDFSCluster { } } - /** - * Restart namenode. Waits for exit from safemode. - */ - public synchronized void restartNameNode() - throws IOException { - restartNameNode(true); + /** Same as restartNameNode(true, true). */ + public synchronized void restartNameNode() throws IOException { + restartNameNode(true, true); } + /** Same as restartNameNode(waitSafemodeExit, true). */ + public synchronized void restartNameNode(boolean waitSafemodeExit + ) throws IOException { + restartNameNode(waitSafemodeExit, true); + } + /** * Restart namenode. + * + * @param waitSafemodeExit Should it wait for safe mode to turn off? + * @param waitClusterActive Should it wait for cluster to be active? + * @throws IOException */ - public synchronized void restartNameNode(boolean waitSafemodeExit) - throws IOException { + public synchronized void restartNameNode(boolean waitSafemodeExit, + boolean waitClusterActive) throws IOException { shutdownNameNode(); nameNode = NameNode.createNameNode(new String[] {}, conf); if (waitSafemodeExit) { waitClusterUp(); } System.out.println("Restarted the namenode"); + int failedCount = 0; - while (true) { + while(waitClusterActive) { try { waitActive(); break; @@ -618,7 +626,6 @@ public class MiniDFSCluster { } } } - System.out.println("Cluster is active"); } /* @@ -860,6 +867,7 @@ public class MiniDFSCluster { } client.close(); + System.out.println("Cluster is active"); } private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) { Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1353894&r1=1353893&r2=1353894&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original) +++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Jun 26 09:43:06 2012 @@ -17,50 +17,66 @@ */ package org.apache.hadoop.hdfs; -import java.net.SocketTimeoutException; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.LongWritable; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; -import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; +import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient.DFSInputStream; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; - -import org.apache.hadoop.hdfs.server.common.*; -import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol; -import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; +import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; - -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; - -import junit.framework.TestCase; -import static org.mockito.Mockito.*; -import org.mockito.stubbing.Answer; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Level; import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * These tests make sure that DFSClient retries fetching data from DFS @@ -509,4 +525,148 @@ public class TestDFSClientRetries extend cluster.shutdown(); } } + + /** Test client retry with namenode restarting. */ + public void testNamenodeRestart() throws Exception { + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + + final List<Exception> exceptions = new ArrayList<Exception>(); + + final Path dir = new Path("/testNamenodeRestart"); + + final Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true); + + final short numDatanodes = 3; + final MiniDFSCluster cluster = new MiniDFSCluster( + conf, numDatanodes, true, null); + try { + cluster.waitActive(); + + //create a file + final DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); + final long length = 1L << 20; + final Path file1 = new Path(dir, "foo"); + DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L); + + //get file status + final FileStatus s1 = dfs.getFileStatus(file1); + assertEquals(length, s1.getLen()); + + //shutdown namenode + cluster.shutdownNameNode(); + + //namenode is down, create another file in a thread + final Path file3 = new Path(dir, "file"); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + //it should retry till namenode is up. + final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf); + DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + thread.start(); + + //restart namenode in a new thread + new Thread(new Runnable() { + @Override + public void run() { + try { + //sleep, restart, and then wait active + TimeUnit.SECONDS.sleep(30); + cluster.restartNameNode(false, false); + cluster.waitActive(); + } catch (Exception e) { + exceptions.add(e); + } + } + }).start(); + + //namenode is down, it should retry until namenode is up again. + final FileStatus s2 = dfs.getFileStatus(file1); + assertEquals(s1, s2); + + //check file1 and file3 + thread.join(); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3)); + + //enter safe mode + dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + + //leave safe mode in a new thread + new Thread(new Runnable() { + @Override + public void run() { + try { + //sleep and then leave safe mode + TimeUnit.SECONDS.sleep(30); + dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + } catch (Exception e) { + exceptions.add(e); + } + } + }).start(); + + //namenode is in safe mode, create should retry until it leaves safe mode. + final Path file2 = new Path(dir, "bar"); + DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L); + assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2)); + + //make sure it won't retry on exceptions like FileNotFoundException + final Path nonExisting = new Path(dir, "nonExisting"); + LOG.info("setPermission: " + nonExisting); + try { + dfs.setPermission(nonExisting, new FsPermission((short)0)); + fail(); + } catch(FileNotFoundException fnfe) { + LOG.info("GOOD!", fnfe); + } + + if (!exceptions.isEmpty()) { + LOG.error("There are " + exceptions.size() + " exception(s):"); + for(int i = 0; i < exceptions.size(); i++) { + LOG.error("Exception " + i, exceptions.get(i)); + } + fail(); + } + } finally { + cluster.shutdown(); + } + } + + public void testMultipleLinearRandomRetry() { + parseMultipleLinearRandomRetry(null, ""); + parseMultipleLinearRandomRetry(null, "11"); + parseMultipleLinearRandomRetry(null, "11,22,33"); + parseMultipleLinearRandomRetry(null, "11,22,33,44,55"); + parseMultipleLinearRandomRetry(null, "AA"); + parseMultipleLinearRandomRetry(null, "11,AA"); + parseMultipleLinearRandomRetry(null, "11,22,33,FF"); + parseMultipleLinearRandomRetry(null, "11,-22"); + parseMultipleLinearRandomRetry(null, "-11,22"); + + parseMultipleLinearRandomRetry("[22x11ms]", + "11,22"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms]", + "11,22,33,44"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]", + "11,22,33,44,55,66"); + parseMultipleLinearRandomRetry("[22x11ms, 44x33ms, 66x55ms]", + " 11, 22, 33, 44, 55, 66 "); + } + + static void parseMultipleLinearRandomRetry(String expected, String s) { + final MultipleLinearRandomRetry r = MultipleLinearRandomRetry.parseCommaSeparatedString(s); + LOG.info("input=" + s + ", parsed=" + r + ", expected=" + expected); + if (r == null) { + Assert.assertEquals(expected, null); + } else { + Assert.assertEquals("MultipleLinearRandomRetry" + expected, r.toString()); + } + } }