[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218942035
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -77,17 +82,54 @@
   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
 
+  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
+  // max number of TransportServer worker threads that are blocked on writing response
+  // of ChunkFetchRequest message back to the client via the underlying channel.
+  private static EventLoopGroup chunkFetchWorkers;
+
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
-this(conf, rpcHandler, false);
+this(conf, rpcHandler, false, false);
   }
 
   public TransportContext(
   TransportConf conf,
   RpcHandler rpcHandler,
   boolean closeIdleConnections) {
+this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+/**
+ *
+ * @param conf TransportConf
+ * @param rpcHandler RpcHandler responsible for handling requests and responses.
+ * @param closeIdleConnections Close idle connections if it is set to true.
+ * @param isClientOnly This config is more important when external shuffle is enabled.
+ *                     It stops creating extra event loop and subsequent thread pool
+ *                     for shuffle clients to handle chunked fetch requests.
+ *                     In the case when external shuffle is disabled, the executors are both
+ *                     client and server so both share the same event loop which is trivial.
--- End diff --

I hope we follow a similar indentation for all other comments


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218939333
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -98,21 +98,32 @@ public TransportContext(
 this(conf, rpcHandler, closeIdleConnections, false);
   }
 
+/**
+ *
+ * @param conf TransportConf
+ * @param rpcHandler RpcHandler responsible for handling requests and responses.
+ * @param closeIdleConnections Close idle connections if is set to true.
+ * @param isClientOnly This config is more important when external shuffle is enabled.
--- End diff --

I think for comments we follow the same spacing convention as observed here so sticking with it...


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218930135
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -77,17 +82,43 @@
   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
 
+  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
+  // max number of TransportServer worker threads that are blocked on writing response
+  // of ChunkFetchRequest message back to the client via the underlying channel.
+  private static EventLoopGroup chunkFetchWorkers;
+
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
-this(conf, rpcHandler, false);
+this(conf, rpcHandler, false, false);
   }
 
   public TransportContext(
   TransportConf conf,
   RpcHandler rpcHandler,
   boolean closeIdleConnections) {
+this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public TransportContext(
+  TransportConf conf,
+  RpcHandler rpcHandler,
+  boolean closeIdleConnections,
+  boolean isClient) {
--- End diff --

sure... anything to make it more clear


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218926387
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -77,17 +82,43 @@
   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
 
+  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
+  // max number of TransportServer worker threads that are blocked on writing response
+  // of ChunkFetchRequest message back to the client via the underlying channel.
+  private static EventLoopGroup chunkFetchWorkers;
+
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
-this(conf, rpcHandler, false);
+this(conf, rpcHandler, false, false);
   }
 
   public TransportContext(
   TransportConf conf,
   RpcHandler rpcHandler,
   boolean closeIdleConnections) {
+this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  public TransportContext(
+  TransportConf conf,
+  RpcHandler rpcHandler,
+  boolean closeIdleConnections,
+  boolean isClient) {
--- End diff --

we should make this more clear.  Because with external shuffle off, we want this on in client mode as well since it's really both a client and server.  Perhaps we could change the name to isClientOnly, and we should put some javadocs on the function to describe the parameter as well.
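
A rough sketch of what the rename plus javadoc might look like (wording and exact signature here are illustrative, not the final patch):

  /**
   * @param conf TransportConf
   * @param rpcHandler RpcHandler responsible for handling requests and responses.
   * @param closeIdleConnections Close idle connections if it is set to true.
   * @param isClientOnly Whether this context is only used by shuffle clients, which never
   *                     serve ChunkFetchRequest messages, so the dedicated chunk-fetch
   *                     event loop and thread pool can be skipped.
   */
  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections,
      boolean isClientOnly) {
    // body unchanged from the diff above
  }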


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218925291
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.channel.Channel;
+import org.apache.spark.network.server.ChunkFetchRequestHandler;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.server.NoOpRpcHandler;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+
+public class ChunkFetchRequestHandlerSuite {
+
+  @Test
+  public void handleChunkFetchRequest() throws Exception {
+RpcHandler rpcHandler = new NoOpRpcHandler();
+OneForOneStreamManager streamManager = (OneForOneStreamManager) 
(rpcHandler.getStreamManager());
+Channel channel = mock(Channel.class);
+ChannelHandlerContext context = mock(ChannelHandlerContext.class);
+when(context.channel())
+.thenAnswer(invocationOnMock0 -> {
--- End diff --

change spacing to 2 


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218924768
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
--- End diff --

remove extra empty line


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218924588
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -173,5 +213,14 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
   conf.connectionTimeoutMs(), closeIdleConnections);
   }
 
+  /**
+   * Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
+   */
+  private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
+  RpcHandler rpcHandler) {
+return new ChunkFetchRequestHandler(channelHandler.getClient(),
+rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
--- End diff --

indentation 2 spaces inside return
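
i.e. something like:

    return new ChunkFetchRequestHandler(channelHandler.getClient(),
      rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());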


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218874415
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +284,44 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Check if it is a shuffle client
+   * and avoid creating unnecessary event loops
+   * in the TransportChannelHandler
+   */
+  public boolean shuffleClient() {
--- End diff --

I think since this is so specialized and really only used from one place (the external shuffle service client) perhaps putting it here isn't that useful.  I think if we add a new constructor to the TransportContext with that optional parameter it might make more sense.
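
For example, the single caller could pass the flag straight into the new constructor instead of mutating TransportConf (the flags at the call site below are illustrative only):

  // in the external shuffle service client, rather than conf.setShuffleClient(true):
  TransportContext context =
    new TransportContext(conf, rpcHandler, closeIdleConnections, true /* isClientOnly */);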


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218812564
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +284,44 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Check if it is a shuffle client
+   * and avoid creating unnecessary event loops
+   * in the TransportChannelHandler
+   */
+  public boolean shuffleClient() {
+return this.isShuffleClient;
+  }
+
+  public void setShuffleClient(boolean isShuffleClient) {
+this.isShuffleClient = isShuffleClient;
+  }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying channel to send
+   * response back to client, the I/O on the channel is still being handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
+   * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to put a limit on
+   * the max number of threads from TransportServer's default EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
+   * and could take long time to process due to disk contentions. By configuring a slightly
+   * higher number of shuffler server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the Client less likely to experience timeout
+   * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
+   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
+   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
+return this.serverThreads() > 0 ? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100:
+(2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100;
--- End diff --

fix indentation
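
For example, one possible layout after the fix (the exact continuation indent is whatever the style checker expects):

    int chunkFetchHandlerThreadsPercent =
      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
    return this.serverThreads() > 0 ?
      (this.serverThreads() * chunkFetchHandlerThreadsPercent) / 100 :
      (2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent) / 100;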


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218812503
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +284,44 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Check if it is a shuffle client
+   * and avoid creating unnecessary event loops
+   * in the TransportChannelHandler
+   */
+  public boolean shuffleClient() {
+return this.isShuffleClient;
+  }
+
+  public void setShuffleClient(boolean isShuffleClient) {
+this.isShuffleClient = isShuffleClient;
+  }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying channel to send
+   * response back to client, the I/O on the channel is still being handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
+   * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to put a limit on
+   * the max number of threads from TransportServer's default EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
+   * and could take long time to process due to disk contentions. By configuring a slightly
+   * higher number of shuffler server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the Client less likely to experience timeout
+   * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
+   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
+   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
--- End diff --

fix indentation


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218811827
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -32,7 +41,6 @@
 import org.apache.spark.network.client.*;
 import org.apache.spark.network.protocol.*;
 import org.apache.spark.network.util.TransportFrameDecoder;
-
--- End diff --

put extra line back to keep separation


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218811721
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -24,6 +24,15 @@
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+
+import org.apache.spark.network.protocol.OneWayMessage;
+import org.apache.spark.network.protocol.RpcFailure;
+import org.apache.spark.network.protocol.RpcRequest;
--- End diff --

these don't seem like they should be necessary, and they're in the wrong location


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r21885
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()), cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, final 
ChunkFetchRequest msg)
+throws Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = streamManager.getChunk(msg.streamChunkId.streamId, 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218807868
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()), cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, final 
ChunkFetchRequest msg)
+throws Exception {
--- End diff --

indent this 2 more spaces
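
i.e. something like:

  protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg)
      throws Exception {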


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218621553
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218590278
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218562181
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -88,6 +97,16 @@ public TransportContext(
 this.conf = conf;
 this.rpcHandler = rpcHandler;
 this.closeIdleConnections = closeIdleConnections;
+
+synchronized(this.getClass()) {
+  if (chunkFetchWorkers == null && conf.getModuleName() != null &&
+  conf.getModuleName().equalsIgnoreCase("shuffle")) {
+chunkFetchWorkers = NettyUtils.createEventLoop(
+IOMode.valueOf(conf.ioMode()),
+conf.chunkFetchHandlerThreads(),
+"chunk-fetch-handler");
--- End diff --

like you mention, if we can avoid creating the event loop when on the client side, that would be best
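
A minimal sketch of that guard, assuming the isClientOnly constructor flag discussed in the other thread (illustrative only, not the final patch):

    synchronized(this.getClass()) {
      // skip creating the dedicated pool entirely when this context is client-only
      if (chunkFetchWorkers == null && !isClientOnly && conf.getModuleName() != null &&
          conf.getModuleName().equalsIgnoreCase("shuffle")) {
        chunkFetchWorkers = NettyUtils.createEventLoop(
            IOMode.valueOf(conf.ioMode()),
            conf.chunkFetchHandlerThreads(),
            "chunk-fetch-handler");
      }
    }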


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218559203
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying channel to send
+   * response back to client, the I/O on the channel is still being handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
+   * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to put a limit on
+   * the max number of threads from TransportServer's default EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
+   * and could take long time to process due to disk contentions. By configuring a slightly
+   * higher number of shuffler server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the Client less likely to experience timeout
+   * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
+   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
+   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
--- End diff --

Yes it is documented above... if it is 0 or 100 it is 2*#cores or io.serverThreads
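
As a concrete illustration of the math described in the javadoc (numbers made up):

  // io.serverThreads unset, 8 cores: base = 2 * 8 = 16; percent = 90 -> (16 * 90) / 100 = 14 threads
  // io.serverThreads = 40:           base = 40;         percent = 90 -> (40 * 90) / 100 = 36 threads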


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218508374
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218466733
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
--- End diff --

this can actually be unwrapped here
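
i.e. it fits on a single line:

    logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);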


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218533001
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying channel to send
+   * response back to client, the I/O on the channel is still being handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
+   * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to put a limit on
+   * the max number of threads from TransportServer's default EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
+   * and could take long time to process due to disk contentions. By configuring a slightly
+   * higher number of shuffler server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the Client less likely to experience timeout
+   * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
+   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
+   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
--- End diff --

fix spacing


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218465153
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -88,6 +97,16 @@ public TransportContext(
 this.conf = conf;
 this.rpcHandler = rpcHandler;
 this.closeIdleConnections = closeIdleConnections;
+
+synchronized(this.getClass()) {
+  if (chunkFetchWorkers == null && conf.getModuleName() != null &&
+  conf.getModuleName().equalsIgnoreCase("shuffle")) {
+chunkFetchWorkers = NettyUtils.createEventLoop(
+IOMode.valueOf(conf.ioMode()),
+conf.chunkFetchHandlerThreads(),
+"chunk-fetch-handler");
--- End diff --

for consistency perhaps name the thread shuffle-chunk-fetch-handler


---




[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218468244
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218533702
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of 
io.serverThreads
+   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
--- End diff --

What happens if this is actually 0? You are going to get 0 chunkFetchHandlerThreads, so what does the event loop group do with that?
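
For what it's worth - this is just my reading of Netty and worth double-checking against the version we ship - a zero thread count is not treated as an error by MultithreadEventLoopGroup; it silently falls back to the default pool size. A tiny standalone check:

    import io.netty.channel.nio.NioEventLoopGroup;

    public class ZeroThreadsCheck {
      public static void main(String[] args) {
        // Passing 0 makes MultithreadEventLoopGroup fall back to its default size
        // (2 * availableProcessors unless io.netty.eventLoopThreads overrides it),
        // so the group still ends up with threads rather than failing fast.
        NioEventLoopGroup group = new NioEventLoopGroup(0);
        System.out.println("event loops created: " + group.executorCount());
        group.shutdownGracefully();
      }
    }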


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218462401
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -88,6 +97,16 @@ public TransportContext(
 this.conf = conf;
 this.rpcHandler = rpcHandler;
 this.closeIdleConnections = closeIdleConnections;
+
+synchronized(this.getClass()) {
--- End diff --

I think synchronized(this.getClass()) is not recommended because it does not handle inheritance well: a subclass would lock on its own Class object rather than on TransportContext.class, so the two would not actually be mutually exclusive. Use synchronized(TransportContext.class).
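
Roughly, the suggested pattern would look like the sketch below (the surrounding condition mirrors the diff quoted elsewhere in this thread; the NettyUtils.createEventLoop call and the thread-prefix name are assumptions for illustration, not the exact patch):

    // Guard lazy creation of the shared pool with the class literal so every
    // instance - including instances of any subclass - locks the same monitor.
    synchronized (TransportContext.class) {
      if (chunkFetchWorkers == null && conf.getModuleName() != null &&
          conf.getModuleName().equalsIgnoreCase("shuffle")) {
        chunkFetchWorkers = NettyUtils.createEventLoop(
            IOMode.valueOf(conf.ioMode()),
            conf.chunkFetchHandlerThreads(),
            "shuffle-chunk-fetch-handler");
      }
    }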


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218467664
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending a response
+ * to a ChunkFetchRequest message, the thread performing the I/O on the underlying channel could
+ * potentially be blocked due to disk contention. If several hundred clients send
+ * ChunkFetchRequests to the server at the same time, they could potentially occupy all threads
+ * from TransportServer's default EventLoopGroup, each waiting on disk reads before it can send
+ * the block data back to the client as part of a ChunkFetchSuccess message. As a result, no
+ * threads would be left to process other RPC messages, which take much less time to process,
+ * and clients could time out while performing SASL authentication, registering executors, or
+ * waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
--- End diff --

fix wrapping
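
Presumably something along these lines, i.e. each parameter on its own line with a four-space continuation indent (spelling out the suggestion, not the committed code):

      @Override
      protected void channelRead0(
          ChannelHandlerContext ctx,
          final ChunkFetchRequest msg) throws Exception {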


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218465817
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -144,14 +163,21 @@ public TransportChannelHandler initializePipeline(
   RpcHandler channelRpcHandler) {
 try {
   TransportChannelHandler channelHandler = 
createChannelHandler(channel, channelRpcHandler);
-  channel.pipeline()
+  ChunkFetchRequestHandler chunkFetchHandler =
+  createChunkFetchHandler(channelHandler, channelRpcHandler);
--- End diff --

fix spacing, only indented 2 spaces


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218490559
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -144,14 +163,21 @@ public TransportChannelHandler initializePipeline(
   RpcHandler channelRpcHandler) {
 try {
   TransportChannelHandler channelHandler = 
createChannelHandler(channel, channelRpcHandler);
-  channel.pipeline()
+  ChunkFetchRequestHandler chunkFetchHandler =
+  createChunkFetchHandler(channelHandler, channelRpcHandler);
+  ChannelPipeline pipeline = channel.pipeline()
 .addLast("encoder", ENCODER)
 .addLast(TransportFrameDecoder.HANDLER_NAME, 
NettyUtils.createFrameDecoder())
 .addLast("decoder", DECODER)
-.addLast("idleStateHandler", new IdleStateHandler(0, 0, 
conf.connectionTimeoutMs() / 1000))
+.addLast("idleStateHandler",
+new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 
1000))
--- End diff --

fix indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218494508
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -24,15 +24,27 @@
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.*;
-import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.client.RpcResponseCallback;
--- End diff --

revert the imports to be .*


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218463768
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -88,6 +97,16 @@ public TransportContext(
 this.conf = conf;
 this.rpcHandler = rpcHandler;
 this.closeIdleConnections = closeIdleConnections;
+
+synchronized(this.getClass()) {
+  if (chunkFetchWorkers == null && conf.getModuleName() != null &&
+  conf.getModuleName().equalsIgnoreCase("shuffle")) {
--- End diff --

fix spacing here, line up conf.getModuleName with the chunkFetchWorkers


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-18 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218466397
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending a response
+ * to a ChunkFetchRequest message, the thread performing the I/O on the underlying channel could
+ * potentially be blocked due to disk contention. If several hundred clients send
+ * ChunkFetchRequests to the server at the same time, they could potentially occupy all threads
+ * from TransportServer's default EventLoopGroup, each waiting on disk reads before it can send
+ * the block data back to the client as part of a ChunkFetchSuccess message. As a result, no
+ * threads would be left to process other RPC messages, which take much less time to process,
+ * and clients could time out while performing SASL authentication, registering executors, or
+ * waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
--- End diff --

spacing 2, fix throughout the file


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-17 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r218163618
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending a response
+ * to a ChunkFetchRequest message, the thread performing the I/O on the underlying channel could
+ * potentially be blocked due to disk contention. If several hundred clients send
+ * ChunkFetchRequests to the server at the same time, they could potentially occupy all threads
+ * from TransportServer's default EventLoopGroup, each waiting on disk reads before it can send
+ * the block data back to the client as part of a ChunkFetchSuccess message. As a result, no
+ * threads would be left to process other RPC messages, which take much less time to process,
+ * and clients could time out while performing SASL authentication, registering executors, or
+ * waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216074852
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending a response
+ * to a ChunkFetchRequest message, the thread performing the I/O on the underlying channel could
+ * potentially be blocked due to disk contention. If several hundred clients send
+ * ChunkFetchRequests to the server at the same time, they could potentially occupy all threads
+ * from TransportServer's default EventLoopGroup, each waiting on disk reads before it can send
+ * the block data back to the client as part of a ChunkFetchSuccess message. As a result, no
+ * threads would be left to process other RPC messages, which take much less time to process,
+ * and clients could time out while performing SASL authentication, registering executors, or
+ * waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216069578
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
+   * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
+return this.serverThreads() > 0? (this.serverThreads() * 
chunkFetchHandlerThreadsPercent)/100:
--- End diff --

I think it is a good idea to document both as this is an important config. 
Let me know your thoughts


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216068689
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
--- End diff --

No. I have not tested how many threads the other RPC calls require, but the whole point is to reduce the dependency on how much time the ChunkFetchRequests spend doing disk I/O.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216004740
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
+   * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
+return this.serverThreads() > 0? (this.serverThreads() * 
chunkFetchHandlerThreadsPercent)/100:
--- End diff --

space between 0 and ?, and space between 100 and :


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216063759
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending a response
+ * to a ChunkFetchRequest message, the thread performing the I/O on the underlying channel could
+ * potentially be blocked due to disk contention. If several hundred clients send
+ * ChunkFetchRequests to the server at the same time, they could potentially occupy all threads
+ * from TransportServer's default EventLoopGroup, each waiting on disk reads before it can send
+ * the block data back to the client as part of a ChunkFetchSuccess message. As a result, no
+ * threads would be left to process other RPC messages, which take much less time to process,
+ * and clients could time out while performing SASL authentication, registering executors, or
+ * waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+  TransportClient client,
+  StreamManager streamManager,
+  Long maxChunksBeingTransferred) {
+this.client = client;
+this.streamManager = streamManager;
+this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()),
+cause);
+ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx,
+  final ChunkFetchRequest msg) throws 
Exception {
+Channel channel = ctx.channel();
+if (logger.isTraceEnabled()) {
+  logger.trace("Received req from {} to fetch block {}", 
getRemoteAddress(channel),
+  msg.streamChunkId);
+}
+long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+  logger.warn("The number of chunks being transferred {} is above {}, 
close the connection.",
+  chunksBeingTransferred, maxChunksBeingTransferred);
+  channel.close();
+  return;
+}
+ManagedBuffer buf;
+try {
+  streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+  streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+  buf = 

[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216006120
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+
--- End diff --

remove extra line.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216057933
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
+   * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
+return this.serverThreads() > 0? (this.serverThreads() * 
chunkFetchHandlerThreadsPercent)/100:
--- End diff --

I assume we aren't documenting chunkFetchHandlerThreadsPercent since the serverThreads config isn't documented.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216005883
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
--- End diff --

I realize these are just examples, but normally I would expect a user to give the threads processing chunk requests many more than those doing other things, so perhaps the example should be something like 90%.

Were any tests done to see how many threads were needed for the others? I would expect very few.
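
As a rough worked example of the formula (hypothetical numbers, just mirroring the (threads * percent) / 100 arithmetic in the diff above):

    // Hypothetical sizing on a 16-core shuffle server with io.serverThreads unset.
    int cores = 16;
    int serverThreads = 2 * cores;                            // 32 event-loop threads in total
    int percent = 90;                                         // chunkFetchHandlerThreadsPercent
    int chunkFetchThreads = (serverThreads * percent) / 100;  // 28 threads may block on chunk I/O
    int reservedForRpc = serverThreads - chunkFetchThreads;   // ~4 threads stay free for fast RPCs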


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216005916
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
+   * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
+  return 0;
+}
+int chunkFetchHandlerThreadsPercent =
+
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
+return this.serverThreads() > 0? (this.serverThreads() * 
chunkFetchHandlerThreadsPercent)/100:
+(2* NettyRuntime.availableProcessors() * 
chunkFetchHandlerThreadsPercent)/100;
--- End diff --

space between 2 and *


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-09-07 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r216005137
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 ---
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
   public long maxChunksBeingTransferred() {
 return conf.getLong("spark.shuffle.maxChunksBeingTransferred", 
Long.MAX_VALUE);
   }
+
+  /**
+   * Percentage of io.serverThreads used by netty to process 
ChunkFetchRequest.
+   * Shuffle server will use a separate EventLoopGroup to process 
ChunkFetchRequest messages.
+   * Although when calling the async writeAndFlush on the underlying 
channel to send
+   * response back to client, the I/O on the channel is still being 
handled by
+   * {@link org.apache.spark.network.server.TransportServer}'s default 
EventLoopGroup
+   * that's registered with the Channel, by waiting inside the 
ChunkFetchRequest handler
+   * threads for the completion of sending back responses, we are able to 
put a limit on
+   * the max number of threads from TransportServer's default 
EventLoopGroup that are
+   * going to be consumed by writing response to ChunkFetchRequest, which 
are I/O intensive
+   * and could take a long time to process due to disk contention. By configuring a slightly
+   * higher number of shuffle server threads, we are able to reserve some threads for
+   * handling other RPC messages, thus making the client less likely to experience timeouts
+   * when sending RPC messages to the shuffle server. Defaults to 0, which means 2*#cores
+   * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of 
io.serverThreads
+   * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
+   */
+  public int chunkFetchHandlerThreads() {
+if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
--- End diff --

space after if before (


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-08-24 Thread redsanket
Github user redsanket commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r212639516
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -144,14 +161,17 @@ public TransportChannelHandler initializePipeline(
   RpcHandler channelRpcHandler) {
 try {
   TransportChannelHandler channelHandler = 
createChannelHandler(channel, channelRpcHandler);
+  ChunkFetchRequestHandler chunkFetchHandler = 
createChunkFetchHandler(channelHandler, channelRpcHandler);
   channel.pipeline()
 .addLast("encoder", ENCODER)
 .addLast(TransportFrameDecoder.HANDLER_NAME, 
NettyUtils.createFrameDecoder())
 .addLast("decoder", DECODER)
 .addLast("idleStateHandler", new IdleStateHandler(0, 0, 
conf.connectionTimeoutMs() / 1000))
 // NOTE: Chunks are currently guaranteed to be returned in the 
order of request, but this
 // would require more logic to guarantee if this were not part of 
the same event loop.
-.addLast("handler", channelHandler);
+.addLast("handler", channelHandler)
+// Use a separate EventLoopGroup to handle ChunkFetchRequest 
messages.
+.addLast(chunkFetchWorkers, "chunkFetchHandler", 
chunkFetchHandler);
--- End diff --

yes i did notice that... makes sense


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-08-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r212070669
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -144,14 +161,17 @@ public TransportChannelHandler initializePipeline(
   RpcHandler channelRpcHandler) {
 try {
   TransportChannelHandler channelHandler = 
createChannelHandler(channel, channelRpcHandler);
+  ChunkFetchRequestHandler chunkFetchHandler = 
createChunkFetchHandler(channelHandler, channelRpcHandler);
   channel.pipeline()
 .addLast("encoder", ENCODER)
 .addLast(TransportFrameDecoder.HANDLER_NAME, 
NettyUtils.createFrameDecoder())
 .addLast("decoder", DECODER)
 .addLast("idleStateHandler", new IdleStateHandler(0, 0, 
conf.connectionTimeoutMs() / 1000))
 // NOTE: Chunks are currently guaranteed to be returned in the 
order of request, but this
 // would require more logic to guarantee if this were not part of 
the same event loop.
-.addLast("handler", channelHandler);
+.addLast("handler", channelHandler)
+// Use a separate EventLoopGroup to handle ChunkFetchRequest 
messages.
+.addLast(chunkFetchWorkers, "chunkFetchHandler", 
chunkFetchHandler);
--- End diff --

Hmm... I think there is some waste here. Not all channels actually need the chunk fetch handler; basically only the shuffle server (external or not) does. So for all other cases - RpcEnv servers and clients, shuffle clients - you'd have this new thread pool just sitting there. Before, you'd only have the extra handler in the pipeline, which is just a few bytes of wasted memory.

It would be good to avoid that.
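
A minimal sketch of the kind of guard that would avoid it, building on the pipeline code quoted earlier in this thread (illustration only, not the final patch):

    // Only shuffle servers ever create the dedicated worker group, so everyone
    // else (RpcEnv servers/clients, shuffle clients) keeps the plain pipeline
    // and never pays for the extra threads.
    ChannelPipeline pipeline = channel.pipeline()
      .addLast("encoder", ENCODER)
      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
      .addLast("decoder", DECODER)
      .addLast("idleStateHandler",
        new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
      .addLast("handler", channelHandler);
    if (chunkFetchWorkers != null) {
      pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
    }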


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...

2018-08-21 Thread redsanket
GitHub user redsanket opened a pull request:

https://github.com/apache/spark/pull/22173

[SPARK-24335] Spark external shuffle server improvement to better handle 
block fetch requests.

## What changes were proposed in this pull request?

This is a continuation PR from https://github.com/apache/spark/pull/21402
Since there is no activity, I am willing to take this over; I have made a few minor changes and tested them.
Adding the description from the earlier PR

Description:
Right now, the default number of server-side netty handler threads is 2 * # cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads.
Processing a client request requires one available server netty handler thread.
However, when the server netty handler threads start to process ChunkFetchRequests, they become blocked on disk I/O, mostly due to disk contention from the random read operations initiated by all the ChunkFetchRequests received from clients.
As a result, when the shuffle server is serving many concurrent 
ChunkFetchRequests, the server side netty handler threads could all be blocked 
on reading shuffle files, thus leaving no handler thread available to process 
other types of requests which should all be very quick to process.

This issue could potentially be fixed by limiting the number of netty handler threads that can get blocked while processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequest messages. This lets the shuffle server reserve netty handler threads for non-ChunkFetchRequest messages, giving consistent processing time for those requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeout issues with either executor registration with the local shuffle server or shuffle clients establishing connections with remote shuffle servers. A rough sketch of the mechanism follows.
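The throttling works roughly like the sketch below (names are assumptions loosely based on the review diffs in this thread, not the exact code): the dedicated handler runs on its own EventLoopGroup and blocks until the channel's own event loop has finished writing the response, which caps how many chunk-fetch writes the default group can have in flight at once.

    // Runs on the dedicated chunk-fetch EventLoopGroup. The write is still executed
    // by the channel's default event loop, but waiting here means at most
    // chunkFetchHandlerThreads chunk responses are being written concurrently,
    // leaving the remaining default-group threads free for cheap RPC messages.
    private void respond(Channel channel, Encodable result) throws InterruptedException {
      channel.writeAndFlush(result).await();
    }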

## How was this patch tested?

Unit tests and stress testing.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/redsanket/spark SPARK-24335

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22173.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22173


commit 44bb55759a4059d8bb0e60c361a8a3210a234f92
Author: Sanket Chintapalli 
Date:   2018-08-21T17:34:31Z

SPARK-24355 Spark external shuffle server improvement to better handle 
block fetch requests.

commit 3bab74ca84fe1b6682000741b958c8792f792472
Author: Sanket Chintapalli 
Date:   2018-08-21T16:49:50Z

make chunk fetch handler threads as a percentage of transport server threads




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org