Author: tgraves Date: Tue Apr 24 13:06:35 2012 New Revision: 1329696 URL: http://svn.apache.org/viewvc?rev=1329696&view=rev Log: MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.(bobby via tgraves)
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java Tue Apr 24 13:06:35 2012 @@ -940,11 +940,57 @@ public class Configuration implements It * bound may be omitted meaning all values up to or over. So the string * above means 2, 3, 5, and 7, 8, 9, ... */ - public static class IntegerRanges { + public static class IntegerRanges implements Iterable<Integer>{ private static class Range { int start; int end; } + + private static class RangeNumberIterator implements Iterator<Integer> { + Iterator<Range> internal; + int at; + int end; + + public RangeNumberIterator(List<Range> ranges) { + if (ranges != null) { + internal = ranges.iterator(); + } + at = -1; + end = -2; + } + + @Override + public boolean hasNext() { + if (at <= end) { + return true; + } else if (internal != null){ + return internal.hasNext(); + } + return false; + } + + @Override + public Integer next() { + if (at <= end) { + at++; + return at - 1; + } else if (internal != null){ + Range found = internal.next(); + if (found != null) { + at = found.start; + end = found.end; + at++; + return at - 1; + } + } + return null; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; List<Range> ranges = new ArrayList<Range>(); @@ -1003,6 +1049,13 @@ public class Configuration implements It return false; } + /** + * @return true if there are no values in this range, else false. + */ + public boolean isEmpty() { + return ranges == null || ranges.isEmpty(); + } + @Override public String toString() { StringBuilder result = new StringBuilder(); @@ -1019,6 +1072,12 @@ public class Configuration implements It } return result.toString(); } + + @Override + public Iterator<Integer> iterator() { + return new RangeNumberIterator(ranges); + } + } /** Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Tue Apr 24 13:06:35 2012 @@ -223,16 +223,19 @@ public class AvroRpcEngine implements Rp /** Construct a server for a protocol implementation instance listening on a * port and address. */ + @Override public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager<? extends TokenIdentifier> secretManager + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig ) throws IOException { return ENGINE.getServer(TunnelProtocol.class, new TunnelResponder(iface, impl), bindAddress, port, numHandlers, numReaders, - queueSizePerHandler, verbose, conf, secretManager); + queueSizePerHandler, verbose, conf, secretManager, + portRangeConfig); } } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Tue Apr 24 13:06:35 2012 @@ -514,7 +514,8 @@ public class RPC { final boolean verbose, Configuration conf) throws IOException { return getServer(instance.getClass(), // use impl class for protocol - instance, bindAddress, port, numHandlers, false, conf, null); + instance, bindAddress, port, numHandlers, false, conf, null, + null); } /** Construct a server for a protocol implementation instance. */ @@ -522,7 +523,8 @@ public class RPC { Object instance, String bindAddress, int port, Configuration conf) throws IOException { - return getServer(protocol, instance, bindAddress, port, 1, false, conf, null); + return getServer(protocol, instance, bindAddress, port, 1, false, conf, null, + null); } /** Construct a server for a protocol implementation instance. @@ -536,7 +538,7 @@ public class RPC { throws IOException { return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, - conf, null); + conf, null, null); } /** Construct a server for a protocol implementation instance. */ @@ -546,10 +548,20 @@ public class RPC { boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { - + return getServer(protocol, instance, bindAddress, port, numHandlers, verbose, + conf, secretManager, null); + } + + public static Server getServer(Class<?> protocol, + Object instance, String bindAddress, int port, + int numHandlers, + boolean verbose, Configuration conf, + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) + throws IOException { return getProtocolEngine(protocol, conf) .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, - verbose, conf, secretManager); + verbose, conf, secretManager, portRangeConfig); } /** Construct a server for a protocol implementation instance. */ @@ -562,7 +574,8 @@ public class RPC { return getProtocolEngine(protocol, conf) .getServer(protocol, instance, bindAddress, port, numHandlers, - numReaders, queueSizePerHandler, verbose, conf, secretManager); + numReaders, queueSizePerHandler, verbose, conf, secretManager, + null); } /** An RPC Server. */ @@ -572,9 +585,10 @@ public class RPC { Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, - SecretManager<? extends TokenIdentifier> secretManager) throws IOException { + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, - conf, serverName, secretManager); + conf, serverName, secretManager, portRangeConfig); } } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java Tue Apr 24 13:06:35 2012 @@ -49,12 +49,30 @@ public interface RpcEngine { UserGroupInformation ticket, Configuration conf) throws IOException, InterruptedException; - /** Construct a server for a protocol implementation instance. */ + /** + * Construct a server for a protocol implementation instance. + * + * @param protocol the class of protocol to use + * @param instance the instance of protocol whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param numReaders the number of reader threads to run + * @param queueSizePerHandler the size of the queue per hander thread + * @param verbose whether each call should be logged + * @param secretManager The secret manager to use to validate incoming requests. + * @param portRangeConfig A config parameter that can be used to restrict + * the range of ports used when port is 0 (an ephemeral port) + * @return The Server instance + * @throws IOException on any error + */ RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager<? extends TokenIdentifier> secretManager + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig ) throws IOException; } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Tue Apr 24 13:06:35 2012 @@ -60,6 +60,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.BytesWritable; @@ -193,6 +194,7 @@ public abstract class Server { protected RpcDetailedMetrics rpcDetailedMetrics; private Configuration conf; + private String portRangeConfig = null; private SecretManager<TokenIdentifier> secretManager; private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager(); @@ -225,8 +227,33 @@ public abstract class Server { */ public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException { + bind(socket, address, backlog, null, null); + } + + public static void bind(ServerSocket socket, InetSocketAddress address, + int backlog, Configuration conf, String rangeConf) throws IOException { try { - socket.bind(address, backlog); + IntegerRanges range = null; + if (rangeConf != null) { + range = conf.getRange(rangeConf, ""); + } + if (range == null || range.isEmpty() || (address.getPort() != 0)) { + socket.bind(address, backlog); + } else { + for (Integer port : range) { + if (socket.isBound()) break; + try { + InetSocketAddress temp = new InetSocketAddress(address.getAddress(), + port); + socket.bind(temp, backlog); + } catch(BindException e) { + //Ignored + } + } + if (!socket.isBound()) { + throw new BindException("Could not find a free port in "+range); + } + } } catch (SocketException e) { throw NetUtils.wrapException(null, 0, @@ -310,7 +337,7 @@ public abstract class Server { acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port - bind(acceptChannel.socket(), address, backlogLength); + bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); @@ -1543,9 +1570,18 @@ public abstract class Server { Configuration conf) throws IOException { - this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null); + this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null, null); } + protected Server(String bindAddress, int port, + Class<? extends Writable> rpcRequestClass, int handlerCount, + int numReaders, int queueSizePerHandler, Configuration conf, + String serverName, SecretManager<? extends TokenIdentifier> secretManager) + throws IOException { + this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, + queueSizePerHandler, conf, serverName, secretManager, null); + } + /** Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The <code>handlerCount</handlerCount> determines * the number of handler threads that will be used to process calls. @@ -1554,11 +1590,13 @@ public abstract class Server { */ @SuppressWarnings("unchecked") protected Server(String bindAddress, int port, - Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler, - Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) - throws IOException { + Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler, + Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) + throws IOException { this.bindAddress = bindAddress; this.conf = conf; + this.portRangeConfig = portRangeConfig; this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue Apr 24 13:06:35 2012 @@ -282,6 +282,18 @@ public class WritableRpcEngine implement return new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager); } + + @Override + public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol, + Object instance, String bindAddress, int port, int numHandlers, + int numReaders, int queueSizePerHandler, boolean verbose, + Configuration conf, + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) throws IOException { + return new Server(instance, conf, bindAddress, port, numHandlers, + numReaders, queueSizePerHandler, verbose, secretManager, + portRangeConfig); + } /** An RPC Server. */ public static class Server extends RPC.Server { @@ -316,12 +328,22 @@ public class WritableRpcEngine implement * @param verbose whether each call should be logged */ public Server(Object instance, Configuration conf, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, + SecretManager<? extends TokenIdentifier> secretManager) + throws IOException { + this(instance, conf, bindAddress, port, numHandlers, numReaders, + queueSizePerHandler, verbose, secretManager, null); + } + + public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, - SecretManager<? extends TokenIdentifier> secretManager) + SecretManager<? extends TokenIdentifier> secretManager, + String portRangeConfig) throws IOException { super(bindAddress, port, Invocation.class, numHandlers, numReaders, queueSizePerHandler, conf, - classNameBase(instance.getClass().getName()), secretManager); + classNameBase(instance.getClass().getName()), secretManager, + portRangeConfig); this.instance = instance; this.verbose = verbose; } Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java?rev=1329696&r1=1329695&r2=1329696&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java Tue Apr 24 13:06:35 2012 @@ -25,16 +25,20 @@ import java.io.IOException; import java.io.StringWriter; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.regex.Pattern; import junit.framework.TestCase; import static org.junit.Assert.assertArrayEquals; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; import org.codehaus.jackson.map.ObjectMapper; @@ -362,6 +366,35 @@ public class TestConfiguration extends T assertEquals(true, range.isIncluded(34)); assertEquals(true, range.isIncluded(100000000)); } + + public void testGetRangeIterator() throws Exception { + Configuration config = new Configuration(false); + IntegerRanges ranges = config.getRange("Test", ""); + assertFalse("Empty range has values", ranges.iterator().hasNext()); + ranges = config.getRange("Test", "5"); + Set<Integer> expected = new HashSet<Integer>(Arrays.asList(5)); + Set<Integer> found = new HashSet<Integer>(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + + ranges = config.getRange("Test", "5-10,13-14"); + expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,13,14)); + found = new HashSet<Integer>(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + + ranges = config.getRange("Test", "8-12, 5- 7"); + expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,11,12)); + found = new HashSet<Integer>(); + for(Integer i: ranges) { + found.add(i); + } + assertEquals(expected, found); + } public void testHexValues() throws IOException{ out=new BufferedWriter(new FileWriter(CONFIG));