Repository: spark Updated Branches: refs/heads/master 35f7f5ce8 -> 99d2e4e00
[SPARK-24296][CORE] Replicate large blocks as a stream. When replicating large cached RDD blocks, it can be helpful to replicate them as a stream, to avoid using large amounts of memory during the transfer. This also allows blocks larger than 2GB to be replicated. Added unit tests in DistributedSuite. Also ran tests on a cluster for blocks > 2gb. Closes #21451 from squito/clean_replication. Authored-by: Imran Rashid <iras...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99d2e4e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99d2e4e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99d2e4e0 Branch: refs/heads/master Commit: 99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18 Parents: 35f7f5c Author: Imran Rashid <iras...@cloudera.com> Authored: Tue Aug 21 11:26:41 2018 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Aug 21 11:26:41 2018 -0700 ---------------------------------------------------------------------- .../network/server/TransportRequestHandler.java | 2 +- .../shuffle/protocol/BlockTransferMessage.java | 3 +- .../shuffle/protocol/UploadBlockStream.java | 89 ++++++++++++++++++++ .../org/apache/spark/executor/Executor.scala | 4 +- .../apache/spark/internal/config/package.scala | 7 ++ .../apache/spark/network/BlockDataManager.scala | 12 +++ .../network/netty/NettyBlockRpcServer.scala | 26 +++++- .../netty/NettyBlockTransferService.scala | 39 +++++---- .../org/apache/spark/storage/BlockManager.scala | 66 ++++++++++++++- .../storage/BlockManagerManagedBuffer.scala | 7 +- .../org/apache/spark/storage/DiskStore.scala | 5 +- .../spark/util/io/ChunkedByteBuffer.scala | 4 + .../org/apache/spark/DistributedSuite.scala | 25 +++++- .../spark/security/EncryptionFunSuite.scala | 12 ++- .../apache/spark/storage/DiskStoreSuite.scala | 3 +- project/MimaExcludes.scala | 3 +- 16 files changed, 270 insertions(+), 
37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index c6fd56b..9fac96d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -234,7 +234,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { callback.onSuccess(ByteBuffer.allocate(0)); } catch (Exception ex) { IOException ioExc = new IOException("Failure post-processing complete stream;" + - " failing this rpc and leaving channel active"); + " failing this rpc and leaving channel active", ex); callback.onFailure(ioExc); streamHandler.onFailure(streamId, ioExc); } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 9af6759..a68a297 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every 
serialized message is its type, which allows us to deserialize it. */ public enum Type { OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), - HEARTBEAT(5); + HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6); private final byte id; @@ -67,6 +67,7 @@ public abstract class BlockTransferMessage implements Encodable { case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); case 5: return ShuffleServiceHeartbeat.decode(buf); + case 6: return UploadBlockStream.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java new file mode 100644 index 0000000..9df3096 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol; + +import java.util.Arrays; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.protocol.Encoders; + +// Needed by ScalaDoc. See SPARK-7726 +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; + +/** + * A request to Upload a block, which the destination should receive as a stream. + * + * The actual block data is not contained here. It will be passed to the StreamCallbackWithID + * that is returned from RpcHandler.receiveStream() + */ +public class UploadBlockStream extends BlockTransferMessage { + public final String blockId; + public final byte[] metadata; + + public UploadBlockStream(String blockId, byte[] metadata) { + this.blockId = blockId; + this.metadata = metadata; + } + + @Override + protected Type type() { return Type.UPLOAD_BLOCK_STREAM; } + + @Override + public int hashCode() { + int objectsHashCode = Objects.hashCode(blockId); + return objectsHashCode * 41 + Arrays.hashCode(metadata); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockId", blockId) + .add("metadata size", metadata.length) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof UploadBlockStream) { + UploadBlockStream o = (UploadBlockStream) other; + return Objects.equal(blockId, o.blockId) + && Arrays.equals(metadata, o.metadata); + } + return false; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(blockId) + + Encoders.ByteArrays.encodedLength(metadata); + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, blockId); + Encoders.ByteArrays.encode(buf, metadata); + } + + public static UploadBlockStream decode(ByteBuf buf) { + String blockId = 
Encoders.Strings.decode(buf); + byte[] metadata = Encoders.ByteArrays.decode(buf); + return new UploadBlockStream(blockId, metadata); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b1856ff..86b1957 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -363,14 +363,14 @@ private[spark] class Executor( threadMXBean.getCurrentThreadCpuTime } else 0L var threwException = true - val value = try { + val value = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res - } finally { + } { val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId) val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a8aa691..daf3f07 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -568,6 +568,13 @@ package object config { .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS = + ConfigBuilder("spark.storage.memoryMapLimitForTests") + .internal() + .doc("For testing only, controls the size of chunks when memory mapping a file") + 
.bytesConf(ByteUnit.BYTE) + .createWithDefault(Int.MaxValue) + private[spark] val BARRIER_SYNC_TIMEOUT = ConfigBuilder("spark.barrier.sync.timeout") .doc("The timeout in seconds for each barrier() call from a barrier task. If the " + http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index b3f8bfe..e94a012 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.network import scala.reflect.ClassTag import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.storage.{BlockId, StorageLevel} private[spark] @@ -44,6 +45,17 @@ trait BlockDataManager { classTag: ClassTag[_]): Boolean /** + * Put the given block that will be received as a stream. + * + * When this method is called, the block data itself is not available -- it will be passed to the + * returned StreamCallbackWithID. + */ + def putBlockDataAsStream( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[_]): StreamCallbackWithID + + /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. 
*/ def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index eb4cf94..7076701 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -26,9 +26,9 @@ import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager import org.apache.spark.network.buffer.NioManagedBuffer -import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithID, TransportClient} import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager} -import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock} +import org.apache.spark.network.shuffle.protocol._ import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -73,10 +73,32 @@ class NettyBlockRpcServer( } val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData)) val blockId = BlockId(uploadBlock.blockId) + logDebug(s"Receiving replicated block $blockId with level ${level} " + + s"from ${client.getSocketAddress}") blockManager.putBlockData(blockId, data, level, classTag) responseContext.onSuccess(ByteBuffer.allocate(0)) } } + override def receiveStream( + client: TransportClient, + messageHeader: ByteBuffer, + responseContext: RpcResponseCallback): StreamCallbackWithID = { + val message = + 
BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream] + val (level: StorageLevel, classTag: ClassTag[_]) = { + serializer + .newInstance() + .deserialize(ByteBuffer.wrap(message.metadata)) + .asInstanceOf[(StorageLevel, ClassTag[_])] + } + val blockId = BlockId(message.blockId) + logDebug(s"Receiving replicated block $blockId with level ${level} as stream " + + s"from ${client.getSocketAddress}") + // This will return immediately, but will set up a callback on streamData which will still + // do all the processing in the netty thread. + blockManager.putBlockDataAsStream(blockId, level, classTag) + } + override def getStreamManager(): StreamManager = streamManager } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index b7d8c35..1905632 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -27,13 +27,14 @@ import scala.reflect.ClassTag import com.codahale.metrics.{Metric, MetricSet} import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.internal.config import org.apache.spark.network._ -import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, 
RetryingBlockFetcher, TempFileManager} -import org.apache.spark.network.shuffle.protocol.UploadBlock +import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage.{BlockId, StorageLevel} @@ -148,20 +149,28 @@ private[spark] class NettyBlockTransferService( // Everything else is encoded using our binary protocol. val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) - // Convert or copy nio buffer into array in order to serialize it. - val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) + val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) + val callback = new RpcResponseCallback { + override def onSuccess(response: ByteBuffer): Unit = { + logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") + result.success((): Unit) + } - client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer, - new RpcResponseCallback { - override def onSuccess(response: ByteBuffer): Unit = { - logTrace(s"Successfully uploaded block $blockId") - result.success((): Unit) - } - override def onFailure(e: Throwable): Unit = { - logError(s"Error while uploading block $blockId", e) - result.failure(e) - } - }) + override def onFailure(e: Throwable): Unit = { + logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e) + result.failure(e) + } + } + if (asStream) { + val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer + client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback) + } else { + // Convert or copy nio buffer into array in order to serialize it. 
+ val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) + + client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer, + callback) + } result.future } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5cd21e3..e7cdfab 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -41,6 +41,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.client.StreamCallbackWithID import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager} import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo @@ -406,6 +407,63 @@ private[spark] class BlockManager( putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag) } + override def putBlockDataAsStream( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[_]): StreamCallbackWithID = { + // TODO if we're going to only put the data in the disk store, we should just write it directly + // to the final location, but that would require a deeper refactor of this code. So instead + // we just write to a temp file, and call putBytes on the data in that file. 
+ val tmpFile = diskBlockManager.createTempLocalBlock()._2 + val channel = new CountingWritableChannel( + Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile)))) + logTrace(s"Streaming block $blockId to tmp file $tmpFile") + new StreamCallbackWithID { + + override def getID: String = blockId.name + + override def onData(streamId: String, buf: ByteBuffer): Unit = { + while (buf.hasRemaining) { + channel.write(buf) + } + } + + override def onComplete(streamId: String): Unit = { + logTrace(s"Done receiving block $blockId, now putting into local blockManager") + // Read the contents of the downloaded file as a buffer to put into the blockManager. + // Note this is all happening inside the netty thread as soon as it reads the end of the + // stream. + channel.close() + // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up + // using a lot of memory here. With encryption, we'll read the whole file into a regular + // byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm + // OOM, but might get killed by the OS / cluster manager. We could at least read the tmp + // file as a stream in both cases. 
+ val buffer = securityManager.getIOEncryptionKey() match { + case Some(key) => + // we need to pass in the size of the unencrypted block + val blockSize = channel.getCount + val allocator = level.memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) + + case None => + ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt) + } + putBytes(blockId, buffer, level)(classTag) + tmpFile.delete() + } + + override def onFailure(streamId: String, cause: Throwable): Unit = { + // the framework handles the connection itself, we just need to do local cleanup + channel.close() + tmpFile.delete() + } + } + } + /** * Get the BlockStatus for the block identified by the given ID, if it exists. * NOTE: This is mainly for testing. @@ -667,7 +725,7 @@ private[spark] class BlockManager( // TODO if we change this method to return the ManagedBuffer, then getRemoteValues // could just use the inputStream on the temp file, rather than memory-mapping the file. // Until then, replication can cause the process to use too much memory and get killed - // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though + // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though // we've read the data to disk. logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") @@ -1358,12 +1416,16 @@ private[spark] class BlockManager( try { val onePeerStartTime = System.nanoTime logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") + // This thread keeps a lock on the block, so we do not want the netty thread to unlock the + // block when it finishes sending the message. 
+ val buffer = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false, + unlockOnDeallocate = false) blockTransferService.uploadBlockSync( peer.host, peer.port, peer.executorId, blockId, - new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false), + buffer, tLevel, classTag) logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" + http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala index 3d38061..5c12b5c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala @@ -38,7 +38,8 @@ private[storage] class BlockManagerManagedBuffer( blockInfoManager: BlockInfoManager, blockId: BlockId, data: BlockData, - dispose: Boolean) extends ManagedBuffer { + dispose: Boolean, + unlockOnDeallocate: Boolean = true) extends ManagedBuffer { private val refCount = new AtomicInteger(1) @@ -58,7 +59,9 @@ private[storage] class BlockManagerManagedBuffer( } override def release(): ManagedBuffer = { - blockInfoManager.unlock(blockId) + if (unlockOnDeallocate) { + blockInfoManager.unlock(blockId) + } if (refCount.decrementAndGet() == 0 && dispose) { data.dispose() } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index ef526fd..a820bc7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.Closeables import io.netty.channel.DefaultFileRegion import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils @@ -44,8 +44,7 @@ private[spark] class DiskStore( securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") - private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", - Int.MaxValue.toString) + private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS) private val blockSizes = new ConcurrentHashMap[BlockId, Long]() def getSize(blockId: BlockId): Long = blockSizes.get(blockId) http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index efed90c..39f050f 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -181,6 +181,10 @@ object ChunkedByteBuffer { } } + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { + map(file, maxChunkSize, 0, file.length()) + } + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel => var remaining = length 
http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 28ea0c6..629a323 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} +import org.apache.spark.internal.config import org.apache.spark.security.EncryptionFunSuite import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.io.ChunkedByteBuffer @@ -154,6 +155,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc.parallelize(1 to 10).count() } + private def testCaching(testName: String, conf: SparkConf, storageLevel: StorageLevel): Unit = { + test(testName) { + testCaching(conf, storageLevel) + } + if (storageLevel.replication > 1) { + // also try with block replication as a stream + val uploadStreamConf = new SparkConf() + uploadStreamConf.setAll(conf.getAll) + uploadStreamConf.set(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1L) + test(s"$testName (with replication as stream)") { + testCaching(uploadStreamConf, storageLevel) + } + } + } + private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = { sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test")) TestUtils.waitUntilExecutorsUp(sc, 2, 30000) @@ -169,7 +185,10 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val blockManager = SparkEnv.get.blockManager val blockTransfer = blockManager.blockTransferService val serializerManager = SparkEnv.get.serializerManager - blockManager.master.getLocations(blockId).foreach { cmId 
=> + val locations = blockManager.master.getLocations(blockId) + assert(locations.size === storageLevel.replication, + s"; got ${locations.size} replicas instead of ${storageLevel.replication}") + locations.foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, blockId.toString, null) val deserialized = serializerManager.dataDeserializeStream(blockId, @@ -189,8 +208,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2, "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2 ).foreach { case (testName, storageLevel) => - encryptionTest(testName) { conf => - testCaching(conf, storageLevel) + encryptionTestHelper(testName) { case (name, conf) => + testCaching(name, conf, storageLevel) } } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala index 3f52dc4..be6b8a6 100644 --- a/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala @@ -28,11 +28,15 @@ trait EncryptionFunSuite { * for the test to modify the provided SparkConf. 
*/ final protected def encryptionTest(name: String)(fn: SparkConf => Unit) { + encryptionTestHelper(name) { case (name, conf) => + test(name)(fn(conf)) + } + } + + final protected def encryptionTestHelper(name: String)(fn: (String, SparkConf) => Unit): Unit = { Seq(false, true).foreach { encrypt => - test(s"$name (encryption = ${ if (encrypt) "on" else "off" })") { - val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt) - fn(conf) - } + val conf = new SparkConf().set(IO_ENCRYPTION_ENABLED, encrypt) + fn(s"$name (encryption = ${ if (encrypt) "on" else "off" })", conf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 2f880a3..eec961a 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -24,6 +24,7 @@ import com.google.common.io.{ByteStreams, Files} import io.netty.channel.FileRegion import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.internal.config import org.apache.spark.network.util.{ByteArrayWritableChannel, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils @@ -94,7 +95,7 @@ class DiskStoreSuite extends SparkFunSuite { test("blocks larger than 2gb") { val conf = new SparkConf() - .set("spark.storage.memoryMapLimitForTests", "10k" ) + .set(config.MEMORY_MAP_LIMIT_FOR_TESTS.key, "10k") val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/project/MimaExcludes.scala 
---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 080cdd1..cdc99a4 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,11 +36,12 @@ object MimaExcludes { // Exclude rules for 2.4.x lazy val v24excludes = v23excludes ++ Seq( + // [SPARK-24296][CORE] Replicate large blocks as a stream. + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"), // [SPARK-23528] Add numIter to ClusteringSummary ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.ClusteringSummary.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansSummary.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.BisectingKMeansSummary.this"), - // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org