Author: jeagles Date: Wed Mar 6 00:12:09 2013 New Revision: 1453100 URL: http://svn.apache.org/r1453100 Log: MAPREDUCE-5027. Shuffle does not limit number of outstanding connections (Robert Parker via jeagles)
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1453100&r1=1453099&r2=1453100&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Mar 6 00:12:09 2013 @@ -25,6 +25,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi Prakash via jlowe) + MAPREDUCE-5027. Shuffle does not limit number of outstanding connections + (Robert Parker via jeagles) + OPTIMIZATIONS MAPREDUCE-4946. 
Fix a performance problem for large jobs by reducing the Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1453100&r1=1453099&r2=1453100&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Mar 6 00:12:09 2013 @@ -322,6 +322,14 @@ </property> <property> + <name>mapreduce.shuffle.max.connections</name> + <value>0</value> + <description>Max allowed connections for the shuffle. Set to 0 (zero) + to indicate no limit on the number of connections. 
+ </description> +</property> + +<property> <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name> <value>60000</value> <description>The maximum number of ms the reducer will delay before retrying Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1453100&r1=1453099&r2=1453100&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Mar 6 00:12:09 2013 @@ -85,6 +85,7 @@ import org.jboss.netty.channel.ChannelFu import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; @@ -122,6 +123,7 @@ public class ShuffleHandler extends Abst private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); + private int maxShuffleConnections; public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -134,6 +136,11 @@ public class ShuffleHandler extends Abst public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 8080; 
+ public static final String MAX_SHUFFLE_CONNECTIONS = + "mapreduce.shuffle.max.connections"; + // Default is no limit + public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; + @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @Metric("Shuffle output in bytes") @@ -239,6 +246,8 @@ public class ShuffleHandler extends Abst @Override public synchronized void init(Configuration conf) { + maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, + DEFAULT_MAX_SHUFFLE_CONNECTIONS); ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -345,6 +354,20 @@ public class ShuffleHandler extends Abst } @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) + throws Exception { + if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) { + LOG.info(String.format("Current number of shuffle connections (%d) is " + + "greater than or equal to the max allowed shuffle connections (%d)", + accepted.size(), maxShuffleConnections)); + evt.getChannel().close(); + return; + } + accepted.add(evt.getChannel()); + super.channelOpen(ctx, evt); + } + + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception { HttpRequest request = (HttpRequest) evt.getMessage(); @@ -553,6 +576,5 @@ public class ShuffleHandler extends Abst sendError(ctx, INTERNAL_SERVER_ERROR); } } - } } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1453100&r1=1453099&r2=1453100&view=diff 
============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Mar 6 00:12:09 2013 @@ -28,9 +28,12 @@ import static org.junit.Assert.assertEqu import java.io.DataInputStream; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketException; import java.net.URL; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -49,8 +52,10 @@ import org.junit.Test; public class TestShuffleHandler { static final long MiB = 1024 * 1024; + private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); - @Test public void testSerializeMeta() throws Exception { + @Test(timeout = 10000) + public void testSerializeMeta() throws Exception { assertEquals(1, ShuffleHandler.deserializeMetaData( ShuffleHandler.serializeMetaData(1))); assertEquals(-1, ShuffleHandler.deserializeMetaData( @@ -59,7 +64,8 @@ public class TestShuffleHandler { ShuffleHandler.serializeMetaData(8080))); } - @Test public void testShuffleMetrics() throws Exception { + @Test(timeout = 10000) + public void testShuffleMetrics() throws Exception { MetricsSystem ms = new MetricsSystemImpl(); ShuffleHandler sh = new ShuffleHandler(ms); ChannelFuture cf = make(stub(ChannelFuture.class). 
@@ -88,7 +94,7 @@ public class TestShuffleHandler { assertGauge("ShuffleConnections", connections, rb); } - @Test + @Test(timeout = 10000) public void testClientClosesConnection() throws Exception { final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); Configuration conf = new Configuration(); @@ -103,6 +109,7 @@ public class TestShuffleHandler { HttpRequest request, HttpResponse response, URL requestUri) throws IOException { } + @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String jobId, String mapId, int reduce) @@ -120,6 +127,7 @@ public class TestShuffleHandler { } return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); } + @Override protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { @@ -128,6 +136,7 @@ public class TestShuffleHandler { ctx.getChannel().close(); } } + @Override protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { @@ -159,4 +168,82 @@ public class TestShuffleHandler { Assert.assertTrue("sendError called when client closed connection", failures.size() == 0); } + + @Test(timeout = 10000) + public void testMaxConnections() throws Exception { + + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String jobId, String mapId, int reduce) + throws IOException { + // send a shuffle header and a lot of data down 
the channel + // to trigger a broken pipe + ShuffleHeader header = new ShuffleHeader("dummy_header", 5678, + 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + // setup connections + Integer connAttempts = 3; + HttpURLConnection conns[] = new HttpURLConnection[connAttempts]; + for (Integer i = 0; i < connAttempts; i++) { + String URLstring = "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_" + + i.toString() + "_0"; + URL url = new URL(URLstring); + conns[i] = (HttpURLConnection) url.openConnection(); + } + // Try to open numerous connections + for (Integer i = 0; i < connAttempts; i++) { + conns[i].connect(); + } + // Check that the first 2 connections are okay + conns[0].getInputStream(); + int rc = conns[0].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + Thread.sleep(100); + conns[1].getInputStream(); + rc = conns[1].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // This connection should be closed because it is above the limit + try { + conns[2].getInputStream(); + rc = conns[2].getResponseCode(); + Assert.fail("Expected a SocketException"); + } catch (SocketException se) { + LOG.info("Expected - connection should not be open"); + } catch (Exception e) { + Assert.fail("Expected a SocketException"); + } + shuffleHandler.stop(); + } }