[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169967506
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+  ) ++ Seq(
+    // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

yup

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169902931
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -295,8 +307,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
    *         describing the shuffle blocks that are stored at that block manager.
    */
-  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int,
+      serializer: Serializer = null): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
--- End diff --

I'd like to pass in a `serializerRelocatable: Boolean` instead of a serializer.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169901683
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+  ) ++ Seq(
+    // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

I see. So I will keep `FetchFailed` as it was (without introducing `numBlocks`). Is that OK?

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169899114
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -295,8 +307,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
    *         describing the shuffle blocks that are stored at that block manager.
    */
-  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int,
+      serializer: Serializer = null): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
--- End diff --

If possible, let's not introduce a default value here.

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169898393
--- Diff: project/MimaExcludes.scala ---
@@ -1129,6 +1129,12 @@ object MimaExcludes {
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.org$apache$spark$ml$param$shared$HasLoss$_setter_$loss_="),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.getLoss"),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasLoss.loss")
+  ) ++ Seq(
+    // [SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs.
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.FetchFailed$"),
--- End diff --

We can get rid of this if we don't touch `FetchFailed`.

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169873362
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +282,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
+  protected def supportsContinuousBlockBatchFetch(serializer: Serializer): Boolean = {
+    if (serializer == null || !serializer.supportsRelocationOfSerializedObjects) {
+      return false
+    }
+    val compressionEnabled: Boolean = conf.getBoolean("spark.shuffle.compress", true)
+    val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
--- End diff --

We only need to create the codec if `compressionEnabled` is true.
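A rough Java sketch of the ordering asked for above (Java to match the network-shuffle module). Everything here is hypothetical: `conf` is a plain `Map`, the serializer check takes the `serializerRelocatable` boolean proposed elsewhere in this review, and `Codec.supportsConcatenation()` stands in for however the PR decides that a codec's compressed segments can be read back to back. The codec is built through a `Supplier`, so it is only instantiated once compression is known to be enabled.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a compression codec; not Spark's CompressionCodec API.
interface Codec {
  // Whether compressed segments written back to back can be decompressed as one stream.
  boolean supportsConcatenation();
}

final class BatchFetchCheck {
  private BatchFetchCheck() {}

  // True when a contiguous run of blocks may be fetched as a single byte range.
  static boolean supportsContinuousBlockBatchFetch(
      boolean serializerRelocatable,
      Map<String, String> conf,
      Supplier<Codec> codecFactory) {
    if (!serializerRelocatable) {
      return false;
    }
    boolean compressionEnabled =
        Boolean.parseBoolean(conf.getOrDefault("spark.shuffle.compress", "true"));
    if (!compressionEnabled) {
      // Relocatable, uncompressed records can be read back to back; no codec is needed.
      return true;
    }
    // Only now is the (possibly expensive) codec instantiated.
    return codecFactory.get().supportsConcatenation();
  }
}
```

Treating uncompressed output as always concatenable relies on the serialized records being relocatable, which is exactly what the first check already enforces.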

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169843676
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +281,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
+  protected def supportsContinuousBlockBulkFetch: Boolean = {
+    // continuousBlockBulkFetch only happens in SparkSQL, it uses UnsafeRowSerializer,
+    // which supports relocation of serialized objects, so we only consider compression
+    val adaptiveEnabled: Boolean = conf.getBoolean("spark.sql.adaptive.enabled", false)
--- End diff --

Got it; `adaptiveEnabled` will be removed as you suggested.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169843133
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -157,21 +157,34 @@ public void registerExecutor(
   }
   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int numBlocks) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
+  }
+
+  /**
+   * This interface is for backward compatible.
+   */
+  public ManagedBuffer getBlockData(
--- End diff --

It is only called in ExternalShuffleBlockResolverSuite, so how about removing it?

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169229650
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -81,16 +81,17 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId, // Note that bmAddress can be null
+    bmAddress: BlockManagerId, // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
     reduceId: Int,
-    message: String)
+    message: String,
+    numBlocks: Int = 1)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
     s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
-      s"message=\n$message\n)"
+      s"numBlocks=$numBlocks, message=\n$message\n)"
--- End diff --

Do we really care about `numBlocks` when a shuffle fetch fails?

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169229539
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -51,11 +51,25 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
+trait ShuffleBlockIdBase {
--- End diff --

We can make this extend `BlockId`.
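The hierarchy suggested above can be modelled in a few lines of Java. This is an illustrative sketch, not Spark's `BlockId.scala`: the class names follow the thread, `numBlocks` follows the rename requested elsewhere in the review, and the constructor check echoes the `>= 1` requirement raised for `getIndex`. Because the shared base extends `BlockId`, either shuffle ID can be passed wherever a `BlockId` is expected, and the existing `ShuffleBlockId` name format stays untouched.

```java
// Illustrative model only, not Spark's BlockId.scala; names follow the review thread.
abstract class BlockId {
  abstract String name();
}

// Fields shared by both shuffle block ID flavours; extends BlockId as suggested.
abstract class ShuffleBlockIdBase extends BlockId {
  final int shuffleId;
  final int mapId;
  final int reduceId;

  ShuffleBlockIdBase(int shuffleId, int mapId, int reduceId) {
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.reduceId = reduceId;
  }
}

// Unchanged single-block ID: "shuffle_<shuffleId>_<mapId>_<reduceId>".
final class ShuffleBlockId extends ShuffleBlockIdBase {
  ShuffleBlockId(int shuffleId, int mapId, int reduceId) {
    super(shuffleId, mapId, reduceId);
  }

  @Override
  String name() {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }
}

// Hypothetical ID for numBlocks contiguous reduce partitions starting at reduceId.
final class ContinuousShuffleBlockId extends ShuffleBlockIdBase {
  final int numBlocks;

  ContinuousShuffleBlockId(int shuffleId, int mapId, int reduceId, int numBlocks) {
    super(shuffleId, mapId, reduceId);
    if (numBlocks < 1) {
      throw new IllegalArgumentException("numBlocks must be >= 1, got " + numBlocks);
    }
    this.numBlocks = numBlocks;
  }

  @Override
  String name() {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + numBlocks;
  }
}
```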

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169229060
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -280,6 +281,16 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
+  protected def supportsContinuousBlockBulkFetch: Boolean = {
+    // continuousBlockBulkFetch only happens in SparkSQL, it uses UnsafeRowSerializer,
+    // which supports relocation of serialized objects, so we only consider compression
+    val adaptiveEnabled: Boolean = conf.getBoolean("spark.sql.adaptive.enabled", false)
--- End diff --

I don't think we can get this conf here. SQL conf is per-session and sits in a layer above Spark conf.

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169228298
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -157,21 +157,34 @@ public void registerExecutor(
   }
   /**
-   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
+   * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numBlocks). We make assumptions
    * about how the hash and sort based shuffles store their data.
    */
   public ManagedBuffer getBlockData(
       String appId,
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int numBlocks) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numBlocks);
+  }
+
+  /**
+   * This interface is for backward compatible.
+   */
+  public ManagedBuffer getBlockData(
--- End diff --

Where will we call this method?

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r169227734
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -196,42 +196,51 @@ private ShuffleMetrics() {
     private final String appId;
     private final String execId;
     private final int shuffleId;
-    // An array containing mapId and reduceId pairs.
-    private final int[] mapIdAndReduceIds;
+    // An array containing mapId, reduceId and numBlocks tuple
+    private final int[] shuffleBlockIds;
     ManagedBufferIterator(String appId, String execId, String[] blockIds) {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      // length == 4: ShuffleBlockId
+      // length == 5: ContinuousShuffleBlockId
+      if (!(blockId0Parts.length == 4 || blockId0Parts.length == 5) ||
+        !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      shuffleBlockIds = new int[3 * blockIds.length];
       for (int i = 0; i < blockIds.length; i++) {
         String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
+        if (!(blockIdParts.length == 4 || blockIdParts.length == 5) ||
+          !blockIdParts[0].equals("shuffle")) {
--- End diff --

Shall we create a `boolean isShuffleBlock(String[] blockIdParts)` helper?
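One possible shape for the suggested helper, written as a standalone Java class rather than as code inside `ManagedBufferIterator`. The names `ShuffleBlockIdParser` and `parseShuffleBlockIds` are hypothetical. The layout of the returned array (one `(mapId, reduceId, numBlocks)` triple per block ID, with `numBlocks` defaulting to 1 for four-part IDs) is an assumption based on the `3 * blockIds.length` allocation in the diff, and the sketch also rejects IDs whose shuffle ID differs from the first one.

```java
// Hypothetical helper sketch; not the code from the PR.
final class ShuffleBlockIdParser {
  private ShuffleBlockIdParser() {}

  // "shuffle_<shuffleId>_<mapId>_<reduceId>" splits into 4 parts (ShuffleBlockId);
  // "shuffle_<shuffleId>_<mapId>_<reduceId>_<numBlocks>" into 5 (ContinuousShuffleBlockId).
  static boolean isShuffleBlock(String[] blockIdParts) {
    return (blockIdParts.length == 4 || blockIdParts.length == 5)
        && blockIdParts[0].equals("shuffle");
  }

  // Flattens the IDs into (mapId, reduceId, numBlocks) triples.
  static int[] parseShuffleBlockIds(String[] blockIds) {
    int shuffleId = -1;
    int[] triples = new int[3 * blockIds.length];
    for (int i = 0; i < blockIds.length; i++) {
      String[] parts = blockIds[i].split("_");
      if (!isShuffleBlock(parts)) {
        throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
      }
      int currentShuffleId = Integer.parseInt(parts[1]);
      if (i == 0) {
        shuffleId = currentShuffleId;
      } else if (currentShuffleId != shuffleId) {
        throw new IllegalArgumentException(
            "Expected shuffleId=" + shuffleId + ", got: " + blockIds[i]);
      }
      triples[3 * i] = Integer.parseInt(parts[2]);
      triples[3 * i + 1] = Integer.parseInt(parts[3]);
      // Four-part IDs describe exactly one block.
      triples[3 * i + 2] = parts.length == 5 ? Integer.parseInt(parts[4]) : 1;
    }
    return triples;
  }
}
```

Keeping the length check and the `"shuffle"` prefix check in one place means the first block ID and every later one are validated by the same rule.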

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r165409015
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
--- End diff --

OK, will use `numBlocks`.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r165408800
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      mapIdAndReduceIds = new int[3 * blockIds.length];
--- End diff --

Thanks, fixed.

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158124202
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
--- End diff --

This format change can cause an incompatibility between the shuffle service and Spark applications, forcing a restart of the cluster and an update of all Spark applications. I wish we had a better way to encode this information that was not so brittle.

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158121392
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
--- End diff --

Please rename the variable: `length` is incorrect (here and in other places). Please rename it to make its meaning clear; `numBlocks`, perhaps?

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158120856
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---
@@ -203,22 +203,23 @@ private ShuffleMetrics() {
       this.appId = appId;
       this.execId = execId;
       String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
+      if (blockId0Parts.length != 5 || !blockId0Parts[0].equals("shuffle")) {
         throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
       }
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
-      mapIdAndReduceIds = new int[2 * blockIds.length];
+      mapIdAndReduceIds = new int[3 * blockIds.length];
--- End diff --

Please update the description of the variable as well.

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158123441
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,10 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
-        for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
-        }
+        val totalSize: Long = (startPartition until endPartition).map(status.getSizeForBlock).sum
+        splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
+          ((ShuffleBlockId(shuffleId, mapId, startPartition, endPartition - startPartition),
+            totalSize))
--- End diff --

This is going to create some very heavy shuffle fetches, and it looks incorrect. This merge should not happen here, but in `ShuffleBlockFetcherIterator`.
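One reading of the suggestion above, sketched in Java: `MapOutputTracker` keeps reporting one `(blockId, size)` pair per reduce partition, and the fetch side coalesces runs of consecutive reduce IDs from the same map output into range requests. `ContiguousRangeMerger`, `Range` and the `maxBytesPerRange` cap are hypothetical and not part of the PR or of `ShuffleBlockFetcherIterator`; the cap is just one simple way to stop a merged request from growing into the very heavy fetch the comment warns about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not part of the PR or of ShuffleBlockFetcherIterator.
final class ContiguousRangeMerger {
  private ContiguousRangeMerger() {}

  // numBlocks consecutive reduce partitions starting at startReduceId, with their summed size.
  static final class Range {
    final int startReduceId;
    final int numBlocks;
    final long totalSize;

    Range(int startReduceId, int numBlocks, long totalSize) {
      this.startReduceId = startReduceId;
      this.numBlocks = numBlocks;
      this.totalSize = totalSize;
    }

    @Override
    public String toString() {
      return "(" + startReduceId + "," + numBlocks + "," + totalSize + ")";
    }
  }

  // reduceIds must be sorted ascending and sizes[i] must be the size of reduceIds[i].
  // A new range starts at a gap in the reduce IDs, or when adding the next block
  // would push the current range above maxBytesPerRange.
  static List<Range> coalesce(int[] reduceIds, long[] sizes, long maxBytesPerRange) {
    List<Range> ranges = new ArrayList<>();
    if (reduceIds.length == 0) {
      return ranges;
    }
    int start = reduceIds[0];
    int count = 1;
    long total = sizes[0];
    for (int i = 1; i < reduceIds.length; i++) {
      boolean contiguous = reduceIds[i] == reduceIds[i - 1] + 1;
      if (contiguous && total + sizes[i] <= maxBytesPerRange) {
        count++;
        total += sizes[i];
      } else {
        ranges.add(new Range(start, count, total));
        start = reduceIds[i];
        count = 1;
        total = sizes[i];
      }
    }
    ranges.add(new Range(start, count, total));
    return ranges;
  }
}
```

With `reduceIds = {0, 1, 2, 4, 5, 7}`, every size equal to 10 and a cap of 100, this yields three ranges: `(0,3,30)`, `(4,2,20)` and `(7,1,10)`.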

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158124746
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java ---
@@ -59,9 +59,9 @@ public int getSize() {
   /**
    * Get index offset for a particular reducer.
    */
-  public ShuffleIndexRecord getIndex(int reduceId) {
+  public ShuffleIndexRecord getIndex(int reduceId, int length) {
--- End diff --

Perhaps `require` that `length` (the number of blocks) is >= 1.
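In Java, `require` would become an explicit argument check (or, for example, Guava's `Preconditions.checkArgument`). The sketch below models the index as a plain `long[]` of partition start offsets followed by the end-of-file offset; `IndexModel` and `IndexRecord` are hypothetical stand-ins for `ShuffleIndexInformation` and `ShuffleIndexRecord`. Under that model, a range of `numBlocks` consecutive partitions is just the difference between two offsets.

```java
// Hypothetical stand-in for ShuffleIndexInformation; not Spark's class.
final class IndexModel {
  // Byte range [offset, offset + length) in the map output data file.
  static final class IndexRecord {
    final long offset;
    final long length;

    IndexRecord(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
  }

  // offsets[i] is where reduce partition i starts; the last entry is the end of the file.
  private final long[] offsets;

  IndexModel(long[] offsets) {
    this.offsets = offsets.clone();
  }

  // Byte range covering numBlocks consecutive partitions starting at reduceId.
  IndexRecord getIndex(int reduceId, int numBlocks) {
    if (numBlocks < 1) {
      throw new IllegalArgumentException("numBlocks must be >= 1, got " + numBlocks);
    }
    if (reduceId < 0 || reduceId + numBlocks >= offsets.length) {
      throw new IndexOutOfBoundsException(
          "reduceId=" + reduceId + ", numBlocks=" + numBlocks
              + " exceeds " + (offsets.length - 1) + " partitions");
    }
    long offset = offsets[reduceId];
    long nextOffset = offsets[reduceId + numBlocks];
    return new IndexRecord(offset, nextOffset - offset);
  }
}
```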

Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r158124569
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
+      String appId,
+      String execId,
+      int shuffleId,
+      int mapId,
+      int reduceId) {
--- End diff --

Remove this method? We don't need it anymore.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153117548
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case SHUFFLE(shuffleId, mapId, reduceId) =>
-      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

Yes, good catch! I will change this after switching to `ContinuousShuffleBlockId`.

Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153117088
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -116,8 +117,8 @@ object BlockId {
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case SHUFFLE(shuffleId, mapId, reduceId) =>
-      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId, n) =>
--- End diff --

nit: `length`?
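Why a separate pattern helps here: a Scala regex extractor in a `case` only succeeds when the whole name matches, so the existing three-group `SHUFFLE` pattern already rejects a five-part name, and giving it a mandatory fourth group would instead stop it from matching every existing `shuffle_<s>_<m>_<r>` name. Java's `Matcher.matches()` has the same whole-input behaviour. A small sketch of dispatching on two patterns; `BlockNameParser` and `CONTINUOUS_SHUFFLE` are hypothetical, and `SHUFFLE` copies the shape of the pattern in `BlockId.scala`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser sketch; only SHUFFLE mirrors a pattern that exists in BlockId.scala.
final class BlockNameParser {
  private BlockNameParser() {}

  // Same shape as the SHUFFLE regex in BlockId.scala: three numeric groups.
  static final Pattern SHUFFLE = Pattern.compile("shuffle_([0-9]+)_([0-9]+)_([0-9]+)");
  // Hypothetical pattern for a ContinuousShuffleBlockId name with a numBlocks group.
  static final Pattern CONTINUOUS_SHUFFLE =
      Pattern.compile("shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)");

  // Returns {shuffleId, mapId, reduceId, numBlocks}, or null for non-shuffle names.
  static int[] parse(String name) {
    Matcher m = SHUFFLE.matcher(name);
    if (m.matches()) {
      return new int[] {group(m, 1), group(m, 2), group(m, 3), 1};
    }
    m = CONTINUOUS_SHUFFLE.matcher(name);
    if (m.matches()) {
      return new int[] {group(m, 1), group(m, 2), group(m, 3), group(m, 4)};
    }
    return null;
  }

  private static int group(Matcher m, int index) {
    return Integer.parseInt(m.group(index));
  }
}
```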

Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153089584
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I get your point; thanks for the explanation.
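The index lookup in `getBlockData` can be tried out in isolation. A minimal Java sketch, assuming the index file is simply a sequence of `long` offsets read with `DataInputStream.readLong`, as the calls in the diff imply: skip `reduceId` entries, read the start offset, skip the `numBlocks - 1` entries in between, then read the end offset. `IndexRangeReader` and its local `skipFully` (a stand-in for Guava's `ByteStreams.skipFully`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch; assumes the index is a plain sequence of 8-byte offsets.
final class IndexRangeReader {
  private IndexRangeReader() {}

  // Local stand-in for Guava's ByteStreams.skipFully: skip exactly n bytes or fail.
  static void skipFully(InputStream in, long n) throws IOException {
    while (n > 0) {
      long skipped = in.skip(n);
      if (skipped <= 0) {
        // skip() may return 0 before EOF, so fall back to reading a single byte.
        if (in.read() < 0) {
          throw new EOFException("index ended with " + n + " bytes left to skip");
        }
        skipped = 1;
      }
      n -= skipped;
    }
  }

  // Returns {offset, length} covering numBlocks partitions starting at reduceId.
  static long[] readRange(byte[] indexBytes, int reduceId, int numBlocks) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(indexBytes))) {
      skipFully(in, reduceId * 8L);
      long offset = in.readLong();
      // Skip the numBlocks - 1 interior offsets to land on the end offset.
      skipFully(in, (numBlocks - 1) * 8L);
      long nextOffset = in.readLong();
      return new long[] {offset, nextOffset - offset};
    }
  }
}
```

Multiplying by `8L` keeps the skip distance in `long` arithmetic. With `numBlocks = 1` the second skip covers zero bytes, so the method reduces to the original single-block lookup.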

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153057489
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
--- End diff --

Thanks, will update.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153049711
--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
         new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
+
+    InputStream block01Stream =
+    resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
+    String block01 = CharStreams.toString(
+    new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
--- End diff --

Thanks, updated!

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153049707
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
-  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

`ContinuousShuffleBlockIds` looks like a good idea; let me try it.

Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153049650
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
--- End diff --

Without this info, it is hard to tell whether a continuous shuffle block read really happens, and I found that `getLocalBytes` has similar debug info:
```
logDebug(s"Getting local block $blockId as bytes")
```
How about keeping it?

Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152980151
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
-  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

These are semi-public interfaces. Can we create a new block ID, `ContinuousShuffleBlockIds`?

Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152976816
--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
         new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
+
+    InputStream block01Stream =
+    resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
+    String block01 = CharStreams.toString(
+    new InputStreamReader(block01Stream, StandardCharsets.UTF_8));
--- End diff --

ditto

Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152976135
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---
@@ -165,13 +165,23 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int length) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, length);
+  }
+
+  public ManagedBuffer getBlockData(
--- End diff --

nit: we should move the original comment here and explain the different usages of these two functions.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152976754

--- Diff: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java ---
@@ -110,6 +110,13 @@ public void testSortShuffleBlocks() throws IOException {
 new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
 block1Stream.close();
 assertEquals(sortBlock1, block1);
+
+InputStream block01Stream =
+resolver.getBlockData("app0", "exec0", 0, 0, 0, 2).createInputStream();
--- End diff --

nit: please follow the indentation format used above.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152978408

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockId.scala ---
@@ -52,8 +52,9 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 // Format of the shuffle block ids (including data and index) should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
-case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
- override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int, length: Int = 1)
+ extends BlockId {
+ override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length
--- End diff --

nit: maybe `s"shuffle_${shuffleId}_${mapId}_${reduceId}_$length"`?
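For reference, the proposed name is `shuffle_<shuffleId>_<mapId>_<reduceId>_<length>`. When writing it with Scala's `s` interpolator, note that `$shuffleId_` would be parsed as a reference to an identifier named `shuffleId_` (underscore is a legal identifier character), so an id followed by an underscore needs braces, as in `${shuffleId}_`. Below is a minimal Java sketch of the same format plus a round-trip parser; `ShuffleBlockName` and `parse` are hypothetical illustrations, not Spark APIs.

```java
class ShuffleBlockName {
    final int shuffleId;
    final int mapId;
    final int reduceId;
    final int length;

    ShuffleBlockName(int shuffleId, int mapId, int reduceId, int length) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
        this.length = length;
    }

    // Mirrors the Scala default argument `length: Int = 1` from the diff.
    ShuffleBlockName(int shuffleId, int mapId, int reduceId) {
        this(shuffleId, mapId, reduceId, 1);
    }

    // Same format as the diff: shuffle_<shuffleId>_<mapId>_<reduceId>_<length>.
    String name() {
        return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + "_" + length;
    }

    // Hypothetical inverse of name(); any real parser must stay in sync with the format.
    static ShuffleBlockName parse(String name) {
        String[] parts = name.split("_");
        if (parts.length != 5 || !parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Unrecognized shuffle block name: " + name);
        }
        return new ShuffleBlockName(
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2]),
            Integer.parseInt(parts[3]),
            Integer.parseInt(parts[4]));
    }

    public static void main(String[] args) {
        System.out.println(new ShuffleBlockName(0, 0, 0).name());     // shuffle_0_0_0_1
        System.out.println(new ShuffleBlockName(0, 0, 0, 2).name());  // shuffle_0_0_0_2
        System.out.println(parse("shuffle_3_1_4_5").length);          // 5
    }
}
```

With the default `length = 1`, every name gains a trailing `_1`, so a four-part name such as `shuffle_0_0_0` is rejected by this parser. That is the kind of mismatch the comment in the diff warns about when it asks that the format be kept in sync with `ExternalShuffleBlockResolver#getBlockData()`.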
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152962372

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
 override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
--- End diff --

OK, I will remove it.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152961094

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
 override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
 ByteStreams.skipFully(in, blockId.reduceId * 8)
 val offset = in.readLong()
+ ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Sure. For example, when startPartition = 3 and endPartition = 8, we need partitions [3, 8), so length = 5.

Line 204: `ByteStreams.skipFully(in, 3 * 8)` skips the entries for partitions 0, 1, 2.
Line 205: `offset = in.readLong()` reads startPartition (3)'s offset.
Line 206: `ByteStreams.skipFully(in, (5 - 1) * 8)` skips the entries for 4, 5, 6, 7.
Line 207: `nextOffset = in.readLong()` reads endPartition (8)'s offset.

When length is 1, skipping zero bytes is correct: we don't need to skip anything, and line 207's `readLong()` returns endPartition's offset.
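The arithmetic above can be checked with a small, self-contained Java sketch. It assumes, as the explanation implies, that the index file is a sequence of 8-byte longs where entry `i` is the start offset of partition `i` and the final entry is the end of the last partition. The in-memory index, the example offsets, and the `IndexRangeSketch` class are illustrative; its `skipFully` helper stands in for Guava's `ByteStreams.skipFully`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class IndexRangeSketch {
    // Skips exactly n bytes or throws; stands in for Guava's ByteStreams.skipFully.
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                throw new EOFException("Reached end of stream while skipping");
            }
            n -= skipped;
        }
    }

    // Serializes partition start offsets (plus the final end offset) as 8-byte longs.
    static byte[] buildIndex(long[] offsets) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long offset : offsets) {
                out.writeLong(offset);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns {offset, nextOffset} bounding partitions [reduceId, reduceId + length).
    static long[] rangeOffsets(byte[] index, int reduceId, int length) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(index))) {
            skipFully(in, reduceId * 8L);       // skip entries 0 .. reduceId - 1
            long offset = in.readLong();        // start of partition reduceId
            skipFully(in, (length - 1) * 8L);   // skip entries reduceId + 1 .. reduceId + length - 1
            long nextOffset = in.readLong();    // entry reduceId + length: end of the range
            return new long[] {offset, nextOffset};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // 10 partitions -> 11 index entries.
        byte[] index = buildIndex(new long[] {0, 5, 12, 20, 33, 40, 41, 60, 75, 90, 100});
        long[] range = rangeOffsets(index, 3, 5);   // partitions [3, 8)
        System.out.println(range[0] + " .. " + range[1]);    // 20 .. 75
        long[] single = rangeOffsets(index, 3, 1);  // partition 3 only; second skip is 0 bytes
        System.out.println(single[0] + " .. " + single[1]);  // 20 .. 33
    }
}
```

With `length = 1` the second skip is zero bytes, which reproduces the original single-partition behaviour of reading two adjacent index entries.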
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891920

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
 override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
 ByteStreams.skipFully(in, blockId.reduceId * 8)
 val offset = in.readLong()
+ ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Also, if length is 1, this will always be zero.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891792

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends Logging {
 logError(errorMessage)
 throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
 } else {
+var totalSize = 0L
 for (part <- startPartition until endPartition) {
- splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+ totalSize += status.getSizeForBlock(part)
--- End diff --

This can be simplified to: `val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum`.
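For comparison, a minimal Java analogue of the suggested simplification, using a hypothetical `long[] sizes` array in place of `status.getSizeForBlock(part)`. Both forms return the same total; the range form just removes the mutable accumulator.

```java
import java.util.stream.IntStream;

class RangeSizeSketch {
    // Loop form, as in the diff: accumulate sizes one partition at a time.
    static long totalSizeLoop(long[] sizes, int startPartition, int endPartition) {
        long totalSize = 0L;
        for (int part = startPartition; part < endPartition; part++) {
            totalSize += sizes[part];
        }
        return totalSize;
    }

    // Range form, analogous to `(startPartition until endPartition).map(...).sum`.
    static long totalSizeRange(long[] sizes, int startPartition, int endPartition) {
        return IntStream.range(startPartition, endPartition)
            .mapToLong(part -> sizes[part])
            .sum();
    }

    public static void main(String[] args) {
        long[] sizes = {5, 7, 8, 13, 7, 1, 19, 15, 15, 10};
        System.out.println(totalSizeLoop(sizes, 3, 8));   // 55
        System.out.println(totalSizeRange(sizes, 3, 8));  // 55
    }
}
```

One design difference: `IntStream.range` is lazy, whereas a Scala `Range.map(...)` builds an intermediate collection before `.sum` runs.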
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891172

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
 override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
--- End diff --

It's not necessary to add this; I guess it is mainly for your debugging purposes.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891438

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
 override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output file for this map, so
 // find out the consolidated file, then the offset within that from our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
 ByteStreams.skipFully(in, blockId.reduceId * 8)
 val offset = in.readLong()
+ ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I suspect this line is not correct; it seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally it should be (3*8), but now it changes to (4*8). Can you please explain more?
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152193203

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,14 @@ private[spark] object MapOutputTracker extends Logging {
 logError(errorMessage)
 throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
 } else {
+var n = 0
+var totalSize = 0L
 for (part <- startPartition until endPartition) {
- splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+ n += 1
+ totalSize += status.getSizeForBlock(part)
 }
--- End diff --

Could `n` be named `numPartitions` and computed directly as `endPartition - startPartition`?
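Because the loop visits each partition in `[startPartition, endPartition)` exactly once, the counter always finishes at `endPartition - startPartition`. The Java sketch below computes it that way while building a single entry for the whole range. The `ContiguousBlock` type and the one-entry-per-map-output shape are assumptions inferred from the removed per-partition lines in the diff, not code taken from the PR.

```java
import java.util.Arrays;

class ContiguousBlockSketch {
    // Hypothetical stand-in for one (block id with length, size) entry covering several partitions.
    static final class ContiguousBlock {
        final int shuffleId;
        final int mapId;
        final int startPartition;
        final int numPartitions;
        final long totalSize;

        ContiguousBlock(int shuffleId, int mapId, int startPartition,
                        int numPartitions, long totalSize) {
            this.shuffleId = shuffleId;
            this.mapId = mapId;
            this.startPartition = startPartition;
            this.numPartitions = numPartitions;
            this.totalSize = totalSize;
        }

        @Override
        public String toString() {
            return "shuffle_" + shuffleId + "_" + mapId + "_" + startPartition + "_"
                + numPartitions + " (" + totalSize + " bytes)";
        }
    }

    // Builds one entry for the whole range instead of one entry per partition.
    static ContiguousBlock aggregate(int shuffleId, int mapId, long[] sizes,
                                     int startPartition, int endPartition) {
        int numPartitions = endPartition - startPartition;  // no loop counter needed
        long totalSize = Arrays.stream(sizes, startPartition, endPartition).sum();
        return new ContiguousBlock(shuffleId, mapId, startPartition, numPartitions, totalSize);
    }

    public static void main(String[] args) {
        long[] sizes = {5, 7, 8, 13, 7, 1, 19, 15, 15, 10};
        System.out.println(aggregate(0, 2, sizes, 3, 8));  // shuffle_0_2_3_5 (55 bytes)
    }
}
```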
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
GitHub user yucai opened a pull request:

https://github.com/apache/spark/pull/19788

[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs

## What changes were proposed in this pull request?

Optimize shuffle fetch of contiguous partition IDs.

## How was this patch tested?

Added new unit tests (AdaptiveSchedulingSuite, BlockIdSuite, etc.).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yucai/spark shuffle_fetch_opt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19788.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19788

commit e947bcb39d1436751e68e033871baba5a77f6d0e
Author: yucai
Date: 2017-11-20T03:04:28Z

[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs