[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle] Only write shuffle temporary index file when there is not an existing one
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20422

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165256701

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
@@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp2.exists())
 
     // The dataFile should be the previous one
     val firstByte = new Array[Byte](1)
-    val in = new FileInputStream(dataFile)
+    val dataIn = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in.read(firstByte)
+      dataIn.read(firstByte)
     } {
-      in.close()
+      dataIn.close()
     }
     assert(firstByte(0) === 0)
+
+    // The index file should not change
+    val secondValueOffset = new Array[Byte](8)
+    val indexIn = new FileInputStream(indexFile)
+    Utils.tryWithSafeFinally {
+      indexIn.read(secondValueOffset)
+      indexIn.read(secondValueOffset)
+    } {
+      indexIn.close()
+    }
+    assert(secondValueOffset(7) === 10, "The index file should not change")
```

--- End diff ---

minor: here and below, it would be clearer to use `DataInputStream.readLong()` (no magic offset 7, and you check the rest of the bytes):

```scala
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
  indexIn.readLong() // first offset is always 0
  assert(10 === indexIn.readLong(), "The index file should not change")
} {
  indexIn.close()
}
```
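The suggested `readLong()` approach can be sketched standalone with plain JDK streams (no Spark test utilities; the temp file and the offset values 0/10/30 below are made up for illustration). A shuffle index file is a sequence of big-endian longs, so `DataInputStream.readLong()` reads each offset whole instead of poking at byte 7 of a buffer:

```scala
import java.io.{DataInputStream, DataOutputStream, FileInputStream, FileOutputStream}

object IndexReadSketch {
  def main(args: Array[String]): Unit = {
    val indexFile = java.io.File.createTempFile("shuffle", ".index")
    indexFile.deleteOnExit()

    // An index file holds (numPartitions + 1) big-endian longs:
    // 0, then the cumulative block offsets.
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    try {
      Seq(0L, 10L, 30L).foreach(out.writeLong)
    } finally {
      out.close()
    }

    // readLong() checks all 8 bytes of each offset, not just the low byte.
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      assert(in.readLong() == 0L)  // first offset is always 0
      assert(in.readLong() == 10L) // second offset
      assert(in.readLong() == 30L) // final offset == data file length
    } finally {
      in.close()
    }
    println("index offsets verified")
  }
}
```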
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165250968

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
+// The index file should not change
+val secondBytes = new Array[Byte](8)
```

--- End diff ---

not sure whether we should change this, just my two cents.
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165250833

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
+// The index file should not change
+val secondBytes = new Array[Byte](8)
```

--- End diff ---

nit: this should have a better name, perhaps `secondValueIndex`?
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165249708

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io._
```

--- End diff ---

nit: this change is not necessary
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165226127

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
+  test("SPARK-23253: index files should be created properly") {
```

--- End diff ---

+1, we can add a check of the index file to the original test case.
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165225733

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
@@ -123,7 +123,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
   assert(dataFile.length() === 35)
   assert(!dataTmp2.exists())
```

--- End diff ---

not related to this change, but this should be `assert(!dataTmp3.exists())`
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165161260

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
+  test("SPARK-23253: index files should be created properly") {
```

--- End diff ---

thanks for adding this, but actually I'm not sure this covers any cases beyond the previous test, does it? I was thinking of just adding something that reads the actual index file and makes sure it has the right values to go with the update to the data file (or no updates, in some cases). You may have added a couple more asserts than the original test; if so, maybe they can just be added to the original?
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r165159188

--- Diff: core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---

```diff
+  test("SPARK-23253: index files should be created properly") {
+    val shuffleId = 1
+    val mapId = 2
+    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
+    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+
+    val lengths = (1 to 2).map(_ => 8L).toArray
```

--- End diff ---

you could do `Array.fill(2)(8L)`
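A quick standalone illustration of the suggested simplification (the values are arbitrary): `Array.fill` produces the same array as the range-plus-`map` form, states the intent more directly, and also re-evaluates its argument per element, so it works for non-constant initializers too.

```scala
object FillSketch {
  def main(args: Array[String]): Unit = {
    val viaMap  = (1 to 2).map(_ => 8L).toArray
    val viaFill = Array.fill(2)(8L) // same result, clearer intent
    assert(viaMap.toSeq == viaFill.toSeq)

    // The by-name argument is evaluated once per element, so Array.fill
    // also handles stateful initializers:
    val it = Iterator.from(0)
    assert(Array.fill(3)(it.next()).toSeq == Seq(0, 1, 2))
    println(viaFill.mkString(","))
  }
}
```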
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20422#discussion_r164635886

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---

```diff
@@ -166,8 +153,20 @@ private[spark] class IndexShuffleBlockResolver(
       if (dataTmp != null && dataTmp.exists()) {
         dataTmp.delete()
       }
-      indexTmp.delete()
     } else {
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
```

--- End diff ---

move this below the comment "This is the first successful attempt". I'd also include a comment about why we write to a temporary file even though we're always going to rename it (in case the task dies somehow, we'd prefer not to leave a half-written index file in the final location).
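The write-temp-then-rename pattern described above can be sketched outside of Spark (plain `java.io`; `AtomicIndexWrite` and its parameter names are illustrative, and `File.renameTo` stands in for the resolver's rename performed under a lock):

```scala
import java.io.{DataOutputStream, File, FileOutputStream, IOException}

object AtomicIndexWrite {
  // Write the offsets to a temp file in the same directory first, so a task
  // that dies mid-write never leaves a half-written index file at the final
  // location; the rename is then a single, effectively atomic step on the
  // same volume.
  def writeIndex(indexFile: File, offsets: Seq[Long]): Unit = {
    val indexTmp =
      File.createTempFile(indexFile.getName, ".tmp", indexFile.getParentFile)
    val out = new DataOutputStream(new FileOutputStream(indexTmp))
    try {
      offsets.foreach(out.writeLong)
    } finally {
      out.close()
    }
    if (indexFile.exists()) indexFile.delete()
    if (!indexTmp.renameTo(indexFile)) {
      throw new IOException(s"fail to rename file $indexTmp to $indexFile")
    }
  }
}
```

Note that `File.renameTo` is not guaranteed atomic across filesystems, which is why the temp file is created in the destination's own directory.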
GitHub user yaooqinn opened a pull request: https://github.com/apache/spark/pull/20422

[SPARK-23253][Core][Shuffle] Only write shuffle temporary index file when there is not an existing one

## What changes were proposed in this pull request?

The shuffle index temporary file is used to create the shuffle index file atomically. It is not needed when the index file already exists, i.e. when another attempt of the same task has already written it.

## How was this patch tested?

Existing unit tests.

cc @squito

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yaooqinn/spark SPARK-23253

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20422.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20422

commit 98ea6a742143da803eb728c352e7424f504fabba
Author: Kent Yao
Date: 2018-01-29T10:11:50Z

    Only write shuffle temporary index file when there is not an existing one
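The shape of the change can be sketched independently of Spark. `CommitSketch` and its `existingOutputIsValid` helper are hypothetical stand-ins for `IndexShuffleBlockResolver.writeIndexFileAndCommit` and its consistency check; the real check also compares recorded block lengths, which is omitted here:

```scala
import java.io.File

object CommitSketch {
  // Stand-in for the resolver's consistency check: a previous attempt's
  // output is reusable when both files exist and the index file holds
  // (numBlocks + 1) 8-byte offsets. (Illustrative; the real check is
  // stricter.)
  def existingOutputIsValid(indexFile: File, dataFile: File, numBlocks: Int): Boolean =
    indexFile.exists() && dataFile.exists() &&
      indexFile.length() == (numBlocks + 1) * 8L

  def commit(indexFile: File, dataFile: File, numBlocks: Int)(writeTmp: => Unit): Unit = {
    if (existingOutputIsValid(indexFile, dataFile, numBlocks)) {
      // Another attempt of the same task already committed its output;
      // writing (and renaming) a temporary index file would be wasted work.
      ()
    } else {
      // First successful attempt: write the temp file, then rename it
      // into place.
      writeTmp
    }
  }
}
```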