Repository: spark
Updated Branches:
  refs/heads/master 35f7f5ce8 -> 99d2e4e00


[SPARK-24296][CORE] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite, and also verified the change on a
cluster with blocks larger than 2 GB.
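
For context, the streaming path reuses the existing fetch-to-memory threshold
(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM): blocks larger than it are uploaded
with client.uploadStream(), smaller ones still go through the old single-RPC
UploadBlock message. A minimal sketch of forcing the streaming path, mirroring
what the new DistributedSuite helper does below (the config object is
Spark-internal, so this only compiles inside the org.apache.spark namespace;
the app name and master URL are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.internal.config
    import org.apache.spark.storage.StorageLevel

    // With the threshold set to 1 byte, every replicated block takes the
    // streaming upload path instead of being copied into one byte array.
    val conf = new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]")
      .setAppName("replicate-as-stream-demo")
      .set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L)
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY_2).count()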

Closes #21451 from squito/clean_replication.

Authored-by: Imran Rashid <iras...@cloudera.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99d2e4e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99d2e4e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99d2e4e0

Branch: refs/heads/master
Commit: 99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18
Parents: 35f7f5c
Author: Imran Rashid <iras...@cloudera.com>
Authored: Tue Aug 21 11:26:41 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Aug 21 11:26:41 2018 -0700

----------------------------------------------------------------------
 .../network/server/TransportRequestHandler.java |  2 +-
 .../shuffle/protocol/BlockTransferMessage.java  |  3 +-
 .../shuffle/protocol/UploadBlockStream.java     | 89 ++++++++++++++++++++
 .../org/apache/spark/executor/Executor.scala    |  4 +-
 .../apache/spark/internal/config/package.scala  |  7 ++
 .../apache/spark/network/BlockDataManager.scala | 12 +++
 .../network/netty/NettyBlockRpcServer.scala     | 26 +++++-
 .../netty/NettyBlockTransferService.scala       | 39 +++++----
 .../org/apache/spark/storage/BlockManager.scala | 66 ++++++++++++++-
 .../storage/BlockManagerManagedBuffer.scala     |  7 +-
 .../org/apache/spark/storage/DiskStore.scala    |  5 +-
 .../spark/util/io/ChunkedByteBuffer.scala       |  4 +
 .../org/apache/spark/DistributedSuite.scala     | 25 +++++-
 .../spark/security/EncryptionFunSuite.scala     | 12 ++-
 .../apache/spark/storage/DiskStoreSuite.scala   |  3 +-
 project/MimaExcludes.scala                      |  3 +-
 16 files changed, 270 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c6fd56b..9fac96d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -234,7 +234,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
              callback.onSuccess(ByteBuffer.allocate(0));
            } catch (Exception ex) {
              IOException ioExc = new IOException("Failure post-processing complete stream;" +
-               " failing this rpc and leaving channel active");
+               " failing this rpc and leaving channel active", ex);
              callback.onFailure(ioExc);
              streamHandler.onFailure(streamId, ioExc);
            }

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 9af6759..a68a297 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements Encodable {
   /** Preceding every serialized message is its type, which allows us to deserialize it. */
   public enum Type {
     OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
-    HEARTBEAT(5);
+    HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);
 
     private final byte id;
 
@@ -67,6 +67,7 @@ public abstract class BlockTransferMessage implements Encodable {
         case 3: return StreamHandle.decode(buf);
         case 4: return RegisterDriver.decode(buf);
         case 5: return ShuffleServiceHeartbeat.decode(buf);
+        case 6: return UploadBlockStream.decode(buf);
        default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java
new file mode 100644
index 0000000..9df3096
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * A request to Upload a block, which the destination should receive as a stream.
+ *
+ * The actual block data is not contained here.  It will be passed to the StreamCallbackWithID
+ * that is returned from RpcHandler.receiveStream()
+ */
+public class UploadBlockStream extends BlockTransferMessage {
+  public final String blockId;
+  public final byte[] metadata;
+
+  public UploadBlockStream(String blockId, byte[] metadata) {
+    this.blockId = blockId;
+    this.metadata = metadata;
+  }
+
+  @Override
+  protected Type type() { return Type.UPLOAD_BLOCK_STREAM; }
+
+  @Override
+  public int hashCode() {
+    int objectsHashCode = Objects.hashCode(blockId);
+    return objectsHashCode * 41 + Arrays.hashCode(metadata);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("blockId", blockId)
+      .add("metadata size", metadata.length)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof UploadBlockStream) {
+      UploadBlockStream o = (UploadBlockStream) other;
+      return Objects.equal(blockId, o.blockId)
+        && Arrays.equals(metadata, o.metadata);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(blockId)
+      + Encoders.ByteArrays.encodedLength(metadata);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, blockId);
+    Encoders.ByteArrays.encode(buf, metadata);
+  }
+
+  public static UploadBlockStream decode(ByteBuf buf) {
+    String blockId = Encoders.Strings.decode(buf);
+    byte[] metadata = Encoders.ByteArrays.decode(buf);
+    return new UploadBlockStream(blockId, metadata);
+  }
+}
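
A small round-trip sketch of the new message framing (the block id and metadata
bytes below are arbitrary examples); the block payload itself never appears in
this header, it travels separately as a stream:

    import java.nio.ByteBuffer
    import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, UploadBlockStream}

    // Encode the header, then decode it the way the receiving RpcHandler does.
    val original = new UploadBlockStream("rdd_0_0", Array[Byte](1, 2, 3))
    val wire: ByteBuffer = original.toByteBuffer
    val decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire)
      .asInstanceOf[UploadBlockStream]
    assert(decoded == original)  // equals() checks blockId and metadata contents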

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b1856ff..86b1957 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -363,14 +363,14 @@ private[spark] class Executor(
           threadMXBean.getCurrentThreadCpuTime
         } else 0L
         var threwException = true
-        val value = try {
+        val value = Utils.tryWithSafeFinally {
           val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = taskDescription.attemptNumber,
             metricsSystem = env.metricsSystem)
           threwException = false
           res
-        } finally {
+        } {
           val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a8aa691..daf3f07 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -568,6 +568,13 @@ package object config {
       .checkValue(v => v > 0, "The value should be a positive integer.")
       .createWithDefault(2000)
 
+  private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
+    ConfigBuilder("spark.storage.memoryMapLimitForTests")
+      .internal()
+      .doc("For testing only, controls the size of chunks when memory mapping 
a file")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Int.MaxValue)
+
   private[spark] val BARRIER_SYNC_TIMEOUT =
     ConfigBuilder("spark.barrier.sync.timeout")
       .doc("The timeout in seconds for each barrier() call from a barrier 
task. If the " +

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index b3f8bfe..e94a012 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network
 import scala.reflect.ClassTag
 
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
 private[spark]
@@ -44,6 +45,17 @@ trait BlockDataManager {
       classTag: ClassTag[_]): Boolean
 
   /**
+   * Put the given block that will be received as a stream.
+   *
+   * When this method is called, the block data itself is not available -- it will be passed to the
+   * returned StreamCallbackWithID.
+   */
+  def putBlockDataAsStream(
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[_]): StreamCallbackWithID
+
+  /**
    * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
    */
   def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
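
To make the contract concrete, here is a hedged, minimal sketch of the kind of
callback an implementation might return (the temp-file path is purely
illustrative; the real BlockManager implementation further below writes through
the serializer manager and then puts the bytes into the block store):

    import java.io.FileOutputStream
    import java.nio.ByteBuffer
    import java.nio.channels.{Channels, WritableByteChannel}
    import org.apache.spark.network.client.StreamCallbackWithID

    // The block payload is not available when putBlockDataAsStream() returns;
    // netty later feeds it chunk by chunk into the callback returned here.
    def sketchCallback(blockIdName: String): StreamCallbackWithID = {
      val out: WritableByteChannel =
        Channels.newChannel(new FileOutputStream(s"/tmp/$blockIdName.tmp"))
      new StreamCallbackWithID {
        override def getID: String = blockIdName
        override def onData(streamId: String, buf: ByteBuffer): Unit = {
          while (buf.hasRemaining) out.write(buf)  // drain each arriving chunk
        }
        override def onComplete(streamId: String): Unit = out.close()
        override def onFailure(streamId: String, cause: Throwable): Unit = out.close()
      }
    }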

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index eb4cf94..7076701 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -26,9 +26,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.NioManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithID, TransportClient}
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
-import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock}
+import org.apache.spark.network.shuffle.protocol._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
@@ -73,10 +73,32 @@ class NettyBlockRpcServer(
         }
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
+        logDebug(s"Receiving replicated block $blockId with level ${level} " +
+          s"from ${client.getSocketAddress}")
         blockManager.putBlockData(blockId, data, level, classTag)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }
 
+  override def receiveStream(
+      client: TransportClient,
+      messageHeader: ByteBuffer,
+      responseContext: RpcResponseCallback): StreamCallbackWithID = {
+    val message =
+      BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
+    val (level: StorageLevel, classTag: ClassTag[_]) = {
+      serializer
+        .newInstance()
+        .deserialize(ByteBuffer.wrap(message.metadata))
+        .asInstanceOf[(StorageLevel, ClassTag[_])]
+    }
+    val blockId = BlockId(message.blockId)
+    logDebug(s"Receiving replicated block $blockId with level ${level} as 
stream " +
+      s"from ${client.getSocketAddress}")
+    // This will return immediately, but will setup a callback on streamData which will still
+    // do all the processing in the netty thread.
+    blockManager.putBlockDataAsStream(blockId, level, classTag)
+  }
+
   override def getStreamManager(): StreamManager = streamManager
 }
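
For reference, a hedged sketch of the metadata round-trip that pairs this
handler with the sender in NettyBlockTransferService (the storage level and
class tag are chosen arbitrarily here):

    import java.nio.ByteBuffer
    import scala.reflect.ClassTag
    import org.apache.spark.SparkConf
    import org.apache.spark.network.util.JavaUtils
    import org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.storage.StorageLevel

    val serializer = new JavaSerializer(new SparkConf())

    // Sender side: the (StorageLevel, ClassTag) pair is java-serialized into
    // the UploadBlockStream.metadata bytes that precede the streamed payload.
    val metadata: Array[Byte] = JavaUtils.bufferToArray(
      serializer.newInstance().serialize((StorageLevel.MEMORY_AND_DISK_2, ClassTag.Any)))

    // Receiver side: receiveStream() recovers the same pair before any block
    // bytes arrive, exactly as in the hunk above.
    val (level, classTag) = serializer.newInstance()
      .deserialize(ByteBuffer.wrap(metadata))
      .asInstanceOf[(StorageLevel, ClassTag[_])]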

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index b7d8c35..1905632 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -27,13 +27,14 @@ import scala.reflect.ClassTag
 import com.codahale.metrics.{Metric, MetricSet}
 
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config
 import org.apache.spark.network._
-import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager}
-import org.apache.spark.network.shuffle.protocol.UploadBlock
+import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -148,20 +149,28 @@ private[spark] class NettyBlockTransferService(
     // Everything else is encoded using our binary protocol.
    val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
-    // Convert or copy nio buffer into array in order to serialize it.
-    val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
+    val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+    val callback = new RpcResponseCallback {
+      override def onSuccess(response: ByteBuffer): Unit = {
+        logTrace(s"Successfully uploaded block $blockId${if (asStream) " as 
stream" else ""}")
+        result.success((): Unit)
+      }
 
-    client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
-      new RpcResponseCallback {
-        override def onSuccess(response: ByteBuffer): Unit = {
-          logTrace(s"Successfully uploaded block $blockId")
-          result.success((): Unit)
-        }
-        override def onFailure(e: Throwable): Unit = {
-          logError(s"Error while uploading block $blockId", e)
-          result.failure(e)
-        }
-      })
+      override def onFailure(e: Throwable): Unit = {
+        logError(s"Error while uploading $blockId${if (asStream) " as stream" 
else ""}", e)
+        result.failure(e)
+      }
+    }
+    if (asStream) {
+      val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer
+      client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback)
+    } else {
+      // Convert or copy nio buffer into array in order to serialize it.
+      val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
+
+      client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
+        callback)
+    }
 
     result.future
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 5cd21e3..e7cdfab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -41,6 +41,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
@@ -406,6 +407,63 @@ private[spark] class BlockManager(
     putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
   }
 
+  override def putBlockDataAsStream(
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[_]): StreamCallbackWithID = {
+    // TODO if we're going to only put the data in the disk store, we should just write it directly
+    // to the final location, but that would require a deeper refactor of this code.  So instead
+    // we just write to a temp file, and call putBytes on the data in that file.
+    val tmpFile = diskBlockManager.createTempLocalBlock()._2
+    val channel = new CountingWritableChannel(
+      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
+    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
+    new StreamCallbackWithID {
+
+      override def getID: String = blockId.name
+
+      override def onData(streamId: String, buf: ByteBuffer): Unit = {
+        while (buf.hasRemaining) {
+          channel.write(buf)
+        }
+      }
+
+      override def onComplete(streamId: String): Unit = {
+        logTrace(s"Done receiving block $blockId, now putting into local 
blockManager")
+        // Read the contents of the downloaded file as a buffer to put into the blockManager.
+        // Note this is all happening inside the netty thread as soon as it reads the end of the
+        // stream.
+        channel.close()
+        // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
+        // using a lot of memory here.  With encryption, we'll read the whole file into a regular
+        // byte buffer and OOM.  Without encryption, we'll memory map the file and won't get a jvm
+        // OOM, but might get killed by the OS / cluster manager.  We could at least read the tmp
+        // file as a stream in both cases.
+        val buffer = securityManager.getIOEncryptionKey() match {
+          case Some(key) =>
+            // we need to pass in the size of the unencrypted block
+            val blockSize = channel.getCount
+            val allocator = level.memoryMode match {
+              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+            }
+            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
+
+          case None =>
+            ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+        }
+        putBytes(blockId, buffer, level)(classTag)
+        tmpFile.delete()
+      }
+
+      override def onFailure(streamId: String, cause: Throwable): Unit = {
+        // the framework handles the connection itself, we just need to do local cleanup
+        channel.close()
+        tmpFile.delete()
+      }
+    }
+  }
+
   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
    * NOTE: This is mainly for testing.
@@ -667,7 +725,7 @@ private[spark] class BlockManager(
     // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than memory-mapping the file.
     // Until then, replication can cause the process to use too much memory and get killed
-    // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
+    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
     // we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
@@ -1358,12 +1416,16 @@ private[spark] class BlockManager(
       try {
         val onePeerStartTime = System.nanoTime
         logTrace(s"Trying to replicate $blockId of ${data.size} bytes to 
$peer")
+        // This thread keeps a lock on the block, so we do not want the netty 
thread to unlock
+        // block when it finishes sending the message.
+        val buffer = new BlockManagerManagedBuffer(blockInfoManager, blockId, 
data, false,
+          unlockOnDeallocate = false)
         blockTransferService.uploadBlockSync(
           peer.host,
           peer.port,
           peer.executorId,
           blockId,
-          new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false),
+          buffer,
           tLevel,
           classTag)
         logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
index 3d38061..5c12b5c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -38,7 +38,8 @@ private[storage] class BlockManagerManagedBuffer(
     blockInfoManager: BlockInfoManager,
     blockId: BlockId,
     data: BlockData,
-    dispose: Boolean) extends ManagedBuffer {
+    dispose: Boolean,
+    unlockOnDeallocate: Boolean = true) extends ManagedBuffer {
 
   private val refCount = new AtomicInteger(1)
 
@@ -58,7 +59,9 @@ private[storage] class BlockManagerManagedBuffer(
  }
 
   override def release(): ManagedBuffer = {
-    blockInfoManager.unlock(blockId)
+    if (unlockOnDeallocate) {
+      blockInfoManager.unlock(blockId)
+    }
     if (refCount.decrementAndGet() == 0 && dispose) {
       data.dispose()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ef526fd..a820bc7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.Closeables
 import io.netty.channel.DefaultFileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.util.Utils
@@ -44,8 +44,7 @@ private[spark] class DiskStore(
     securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
-  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
-    Int.MaxValue.toString)
+  private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index efed90c..39f050f 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -181,6 +181,10 @@ object ChunkedByteBuffer {
     }
   }
 
+  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
+    map(file, maxChunkSize, 0, file.length())
+  }
+
   def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
     Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel =>
       var remaining = length
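
A hedged usage sketch of the new convenience overload (ChunkedByteBuffer is a
Spark-internal class; the file and chunk size are illustrative): it simply maps
the whole file from offset 0, so callers no longer need to pass the length.

    import java.io.File
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Memory-map an entire (possibly > 2 GB) file as a sequence of buffers of
    // at most 64 MB each, instead of a single ByteBuffer capped at 2 GB.
    val file = new File("/tmp/some-large-block.bin")
    val mapped: ChunkedByteBuffer = ChunkedByteBuffer.map(file, 64 * 1024 * 1024)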

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 28ea0c6..629a323 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.time.{Millis, Span}
 
+import org.apache.spark.internal.config
 import org.apache.spark.security.EncryptionFunSuite
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -154,6 +155,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     sc.parallelize(1 to 10).count()
   }
 
+  private def testCaching(testName: String, conf: SparkConf, storageLevel: StorageLevel): Unit = {
+    test(testName) {
+      testCaching(conf, storageLevel)
+    }
+    if (storageLevel.replication > 1) {
+      // also try with block replication as a stream
+      val uploadStreamConf = new SparkConf()
+      uploadStreamConf.setAll(conf.getAll)
+      uploadStreamConf.set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L)
+      test(s"$testName (with replication as stream)") {
+        testCaching(uploadStreamConf, storageLevel)
+      }
+    }
+  }
+
   private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
     sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
     TestUtils.waitUntilExecutorsUp(sc, 2, 30000)
@@ -169,7 +185,10 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val blockManager = SparkEnv.get.blockManager
     val blockTransfer = blockManager.blockTransferService
     val serializerManager = SparkEnv.get.serializerManager
-    blockManager.master.getLocations(blockId).foreach { cmId =>
+    val locations = blockManager.master.getLocations(blockId)
+    assert(locations.size === storageLevel.replication,
+      s"; got ${locations.size} replicas instead of 
${storageLevel.replication}")
+    locations.foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
         blockId.toString, null)
       val deserialized = serializerManager.dataDeserializeStream(blockId,
@@ -189,8 +208,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
     "caching in memory and disk, serialized, replicated" -> 
StorageLevel.MEMORY_AND_DISK_SER_2
   ).foreach { case (testName, storageLevel) =>
-    encryptionTest(testName) { conf =>
-      testCaching(conf, storageLevel)
+    encryptionTestHelper(testName) { case (name, conf) =>
+      testCaching(name, conf, storageLevel)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala
index 3f52dc4..be6b8a6 100644
--- a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala
@@ -28,11 +28,15 @@ trait EncryptionFunSuite {
    * for the test to modify the provided SparkConf.
    */
   final protected def encryptionTest(name: String)(fn: SparkConf => Unit) {
+    encryptionTestHelper(name) { case (name, conf) =>
+      test(name)(fn(conf))
+    }
+  }
+
+  final protected def encryptionTestHelper(name: String)(fn: (String, SparkConf) => Unit): Unit = {
     Seq(false, true).foreach { encrypt =>
-      test(s"$name (encryption = ${ if (encrypt) "on" else "off" })") {
-        val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt)
-        fn(conf)
-      }
+      val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt)
+      fn(s"$name (encryption = ${ if (encrypt) "on" else "off" })", conf)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 2f880a3..eec961a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -24,6 +24,7 @@ import com.google.common.io.{ByteStreams, Files}
 import io.netty.channel.FileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config
 import org.apache.spark.network.util.{ByteArrayWritableChannel, JavaUtils}
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.util.Utils
@@ -94,7 +95,7 @@ class DiskStoreSuite extends SparkFunSuite {
 
   test("blocks larger than 2gb") {
     val conf = new SparkConf()
-      .set("spark.storage.memoryMapLimitForTests", "10k" )
+      .set(config.MEMORY_MAP_LIMIT_FOR_TESTS.key, "10k")
     val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
    val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 080cdd1..cdc99a4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,11 +36,12 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-24296][CORE] Replicate large blocks as a stream.
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"),
     // [SPARK-23528] Add numIter to ClusteringSummary
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.ClusteringSummary.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansSummary.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.BisectingKMeansSummary.this"),
-
     // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
 

