[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198652991
  
--- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---
@@ -43,6 +44,17 @@ trait BlockDataManager {
   level: StorageLevel,
   classTag: ClassTag[_]): Boolean
 
+  /**
+   * Put the given block that will be received as a stream.
+   *
+   * When this method is called, the data itself is not available -- it needs to be handled within
+   * the callbacks of streamData.
--- End diff --

Need to update comment.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198657121
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1341,12 +1390,16 @@ private[spark] class BlockManager(
   try {
 val onePeerStartTime = System.nanoTime
 logTrace(s"Trying to replicate $blockId of ${data.size} bytes to 
$peer")
+// This thread keeps a lock on the block, so we do not want the 
netty thread to unlock
+// block when it finishes sending the message.
+val mb = new BlockManagerManagedBuffer(blockInfoManager, blockId, 
data, false,
--- End diff --

s/mb/buffer

Confusing in a place that deals with sizes all over.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198652931
  
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * A request to upload a block, which the destination should receive as a stream.
+ *
+ * The actual block data is not contained here.  It is in the streamData in the RpcHandler.receive()
--- End diff --

Need to update to match API.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198656665
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -659,6 +701,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+// could just use the inputStream on the temp file, rather than memory-mapping the file.
+// Until then, replication can cause the process to use too much memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
--- End diff --

it's
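
For context on the TODO quoted above, a hedged sketch of the idea, written as if inside `BlockManager`: if the remote fetch returned a `ManagedBuffer`, values could be deserialized straight off its input stream. `getRemoteManagedBuffer` is a hypothetical helper, not an existing method; `dataDeserializeStream` and `createInputStream` are existing Spark APIs.

```scala
// Sketch only: assumes BlockManager's surrounding imports (BlockId, ClassTag,
// ManagedBuffer) and a hypothetical getRemoteManagedBuffer(blockId): Option[ManagedBuffer].
def getRemoteValuesStreamed[T](blockId: BlockId)(
    implicit classTag: ClassTag[T]): Option[Iterator[T]] = {
  getRemoteManagedBuffer(blockId).map { buf =>
    // Deserialize from the buffer's stream (plain disk reads for a file-backed
    // buffer) instead of memory-mapping the whole temp file.
    serializerManager.dataDeserializeStream(blockId, buf.createInputStream())(classTag)
  }
}
```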


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198653427
  
--- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---
@@ -73,10 +73,34 @@ class NettyBlockRpcServer(
 }
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
 val blockId = BlockId(uploadBlock.blockId)
+logInfo(s"Receiving replicated block $blockId with level ${level} " +
+  s"from ${client.getSocketAddress}")
 blockManager.putBlockData(blockId, data, level, classTag)
 responseContext.onSuccess(ByteBuffer.allocate(0))
 }
   }
 
+  override def receiveStream(
+  client: TransportClient,
+  messageHeader: ByteBuffer,
+  responseContext: RpcResponseCallback): StreamCallbackWithID = {
+val message = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader)
+message match {
+  case uploadBlockStream: UploadBlockStream =>
+   val (level: StorageLevel, classTag: ClassTag[_]) = {
--- End diff --

Indentation is off here.

Using `.asInstanceOf[UploadBlockStream]` would achieve the same goal here with less indentation, just with a different exception...
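
For illustration, a minimal self-contained sketch of the trade-off (generic types, not the PR's classes): the match throws a `MatchError` on an unexpected message, while the cast throws a `ClassCastException`, at one less level of nesting.

```scala
sealed trait Msg
case class Upload(id: String) extends Msg

// Pattern match: one extra indentation level; scala.MatchError on other subtypes.
def handleWithMatch(m: Msg): String = m match {
  case u: Upload =>
    u.id
}

// Cast: flat; java.lang.ClassCastException on other subtypes.
def handleWithCast(m: Msg): String = {
  val u = m.asInstanceOf[Upload]
  u.id
}
```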


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r198656910
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -723,7 +770,9 @@ private[spark] class BlockManager(
   }
 
   if (data != null) {
-return Some(new ChunkedByteBuffer(data))
+val chunkSize =
+  conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
--- End diff --

Want to turn this into a config constant? I'm seeing it in a bunch of places.
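
For reference, a hedged sketch of what such a constant could look like using Spark's internal `ConfigBuilder` (the object and constant names here are assumptions, not code from this PR):

```scala
package org.apache.spark.internal.config

import org.apache.spark.network.util.ByteUnit

// Hypothetical object and constant name, for illustration only.
private[spark] object StorageConfig {
  val MEMORY_MAP_LIMIT_FOR_TESTS =
    ConfigBuilder("spark.storage.memoryMapLimitForTests")
      .internal()
      .doc("For testing only: max chunk size used when memory-mapping block files.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Int.MaxValue)
}
```

Call sites could then read `conf.get(StorageConfig.MEMORY_MAP_LIMIT_FOR_TESTS).toInt`, keeping the key and the default in one place.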


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-06-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r192796600
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
+   * The rpc *might* include a data stream in streamData (e.g. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback before this
+   * method returns.
+   *
* @param client A channel client which enables the handler to make requests back to the sender
*   of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

I'm gonna move discussion to https://github.com/apache/spark/pull/21346 since that is the PR that will introduce this API.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-31 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r192279111
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
+   * The rpc *might* include a data stream in streamData (e.g. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback before this
+   * method returns.
+   *
* @param client A channel client which enables the handler to make requests back to the sender
*   of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

What about incorporating parameter `message` into parameter `streamData`?


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r192136490
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
+   * The rpc *might* include a data stream in streamData (e.g. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback before this
+   * method returns.
+   *
* @param client A channel client which enables the handler to make requests back to the sender
*   of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

Yes, there are other ways to do this, but I wanted to leave the old code paths as close to untouched as possible, to minimize the behavior change / risk of bugs.  I also think it's helpful to clearly separate out the portion that is read entirely into memory vs. the streaming portion; it makes it easier to work with.  Also, InputStream suggests the data is getting pulled instead of pushed.

Your earlier approach definitely gave a lot of inspiration for this change.  I'm hoping that making it a more isolated change helps us make progress here.
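
To make the proposed contract concrete, a hedged sketch of a handler against the signature quoted above. Only the requirement to call `registerStreamCallback` before returning comes from the quoted javadoc; the callback method names follow Spark's existing `org.apache.spark.network.client.StreamCallback`, and the exact argument list of `registerStreamCallback`, the class name, and the response payload are assumptions for illustration.

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.client.{RpcResponseCallback, StreamCallback, TransportClient}
import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamData, StreamManager}

// Sketch of the proposed receive() signature; not the PR's actual code.
class UploadRpcHandler extends RpcHandler {

  override def receive(
      client: TransportClient,
      message: ByteBuffer,      // fully buffered metadata (e.g. block id, storage level)
      streamData: StreamData,   // null unless this rpc carries a data stream
      callback: RpcResponseCallback): Unit = {
    if (streamData != null) {
      // Per the javadoc above: must register before receive() returns,
      // or the whole connection is failed.
      streamData.registerStreamCallback(new StreamCallback {
        override def onData(streamId: String, buf: ByteBuffer): Unit = {
          // consume each chunk as it is pushed (e.g. append to a temp file)
        }
        override def onComplete(streamId: String): Unit = {
          callback.onSuccess(ByteBuffer.allocate(0))
        }
        override def onFailure(streamId: String, cause: Throwable): Unit = {
          callback.onFailure(cause)
        }
      })
    }
  }

  override def getStreamManager: StreamManager = new OneForOneStreamManager()
}
```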


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r191628993
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
+   * The rpc *might* include a data stream in streamData (e.g. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback before this
+   * method returns.
+   *
* @param client A channel client which enables the handler to make requests back to the sender
*   of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

It's not necessary to add a parameter.  Change the `message` parameter to an InputStream.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21451

[SPARK-24296][CORE][WIP] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite.  Also ran tests on a cluster for
blocks > 2gb.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark clean_replication

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21451


commit 05967808f5440835919f02d6c5d0d3563482d304
Author: Imran Rashid 
Date:   2018-05-02T14:55:15Z

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests.

commit 43658df6d6b7dffacd528a2573e8846ab6469e81
Author: Imran Rashid 
Date:   2018-05-23T03:59:40Z

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Also tested on a cluster with remote cache reads >
2gb (in memory and on disk).
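
As a self-contained illustration of the chunked memory-mapping described in this commit (helper name and chunking details are assumptions; the actual implementation differs):

```scala
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import scala.collection.mutable.ArrayBuffer

// Map a file of arbitrary size as a sequence of mappings of at most chunkSize
// bytes each, sidestepping the Int.MaxValue limit on a single ByteBuffer.
def mapFileInChunks(file: File, chunkSize: Int): Array[MappedByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    val chunks = new ArrayBuffer[MappedByteBuffer]
    var offset = 0L
    while (offset < channel.size()) {
      val length = math.min(chunkSize.toLong, channel.size() - offset)
      chunks += channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
      offset += length
    }
    // The mappings stay valid after the channel is closed.
    chunks.toArray
  } finally {
    channel.close()
  }
}
```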

commit 7e517e4ea0ff66dc57121b54fdd71f8391edd8f2
Author: Imran Rashid 
Date:   2018-05-15T16:48:51Z

[SPARK-24296][CORE] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite.  Also ran tests on a cluster for
blocks > 2gb.




---
