[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20422


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-02-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165256701
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 } {
   out2.close()
 }
-resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+assert(indexFile.length() === (lengths.length + 1) * 8)
 assert(lengths2.toSeq === lengths.toSeq)
 assert(dataFile.exists())
 assert(dataFile.length() === 30)
 assert(!dataTmp2.exists())
 
 // The dataFile should be the previous one
 val firstByte = new Array[Byte](1)
-val in = new FileInputStream(dataFile)
+val dataIn = new FileInputStream(dataFile)
 Utils.tryWithSafeFinally {
-  in.read(firstByte)
+  dataIn.read(firstByte)
 } {
-  in.close()
+  dataIn.close()
 }
 assert(firstByte(0) === 0)
 
+// The index file should not change
+val secondValueOffset = new Array[Byte](8)
+val indexIn = new FileInputStream(indexFile)
+Utils.tryWithSafeFinally {
+  indexIn.read(secondValueOffset)
+  indexIn.read(secondValueOffset)
+} {
+  indexIn.close()
+}
+assert(secondValueOffset(7) === 10, "The index file should not change")
--- End diff --

minor: here and below, it would be clearer to use 
`DataInputStream.readLong()` (no magic offset of 7, and you check all of the 
bytes):
```scala
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
  indexIn.readLong()  // first offset is always 0
  assert(10 === indexIn.readLong(), "The index file should not change")
} {
  indexIn.close()
}
```


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165250968
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 } {
   out2.close()
 }
-resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+assert(indexFile.length() === (lengths.length + 1) * 8)
 assert(lengths2.toSeq === lengths.toSeq)
 assert(dataFile.exists())
 assert(dataFile.length() === 30)
 assert(!dataTmp2.exists())
 
 // The dataFile should be the previous one
 val firstByte = new Array[Byte](1)
-val in = new FileInputStream(dataFile)
+val dataIn = new FileInputStream(dataFile)
 Utils.tryWithSafeFinally {
-  in.read(firstByte)
+  dataIn.read(firstByte)
 } {
-  in.close()
+  dataIn.close()
 }
 assert(firstByte(0) === 0)
 
+// The index file should not change
+val secondBytes = new Array[Byte](8)
--- End diff --

not sure whether we should change this, just my two cents.


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165250833
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -89,26 +96,39 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 } {
   out2.close()
 }
-resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+assert(indexFile.length() === (lengths.length + 1) * 8)
 assert(lengths2.toSeq === lengths.toSeq)
 assert(dataFile.exists())
 assert(dataFile.length() === 30)
 assert(!dataTmp2.exists())
 
 // The dataFile should be the previous one
 val firstByte = new Array[Byte](1)
-val in = new FileInputStream(dataFile)
+val dataIn = new FileInputStream(dataFile)
 Utils.tryWithSafeFinally {
-  in.read(firstByte)
+  dataIn.read(firstByte)
 } {
-  in.close()
+  dataIn.close()
 }
 assert(firstByte(0) === 0)
 
+// The index file should not change
+val secondBytes = new Array[Byte](8)
--- End diff --

nit: this should have a better name, perhaps `secondValueIndex`?


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165249708
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io._
--- End diff --

nit: this change is not necessary


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165226127
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 }
 assert(firstByte2(0) === 2)
   }
+
+  test("SPARK-23253: index files should be created properly") {
--- End diff --

+1, we can add a check of the index file in the original test case.


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165225733
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -123,7 +123,7 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 assert(dataFile.length() === 35)
 assert(!dataTmp2.exists())
--- End diff --

not related to the change, but this should be `assert(!dataTmp3.exists())`


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165161260
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 }
 assert(firstByte2(0) === 2)
   }
+
+  test("SPARK-23253: index files should be created properly") {
--- End diff --

thanks for adding this, but actually I'm not sure this covers any cases 
beyond the previous test, does it?  I was thinking of just adding something to 
read the actual index file, and make sure it has the right values to go with 
the update to the data file (or no updates in some cases).

you may have added a couple more asserts than the original test -- if so, 
maybe they can just be added to the original?


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r165159188
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 ---
@@ -133,4 +133,65 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
 }
 assert(firstByte2(0) === 2)
   }
+
+  test("SPARK-23253: index files should be created properly") {
+val shuffleId = 1
+val mapId = 2
+val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
+val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+
+val lengths = (1 to 2).map(_ => 8L).toArray
--- End diff --

you could do `Array.fill(2)(8L)`
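For anyone following along, a quick stand-alone sketch showing the two expressions are equivalent (plain Scala, no Spark dependencies):

```scala
// Both expressions build an Array[Long] with two elements, each 8L.
val viaMap  = (1 to 2).map(_ => 8L).toArray
val viaFill = Array.fill(2)(8L)

assert(viaMap.sameElements(viaFill))
assert(viaFill.toSeq == Seq(8L, 8L))
```

`Array.fill(2)(8L)` also reads as the intent ("two identical entries") rather than a mapped range.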


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20422#discussion_r164635886
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -166,8 +153,20 @@ private[spark] class IndexShuffleBlockResolver(
   if (dataTmp != null && dataTmp.exists()) {
 dataTmp.delete()
   }
-  indexTmp.delete()
 } else {
+  val out = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(indexTmp)))
--- End diff --

move this below the comment "This is the first successful attempt".

I'd also include a comment about why we write to a temporary file, even 
though we're always going to rename (because in case the task dies somehow, 
we'd prefer to not leave a half-written index file in the final location).
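The write-to-temp-then-rename pattern under discussion can be sketched like this (a simplified stand-alone illustration, not the actual `IndexShuffleBlockResolver` code; the method name and parameters are made up for the example):

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, IOException}

// Write the offsets to a temporary file first, then rename it into place.
// If the task dies mid-write, only the temp file is left half-written;
// the final location never contains a partial index file.
def writeIndexAtomically(indexTmp: File, indexFile: File, offsets: Array[Long]): Unit = {
  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
  try {
    offsets.foreach(out.writeLong)
  } finally {
    out.close()
  }
  if (indexFile.exists()) {
    indexFile.delete()
  }
  if (!indexTmp.renameTo(indexFile)) {
    throw new IOException(s"fail to rename file $indexTmp to $indexFile")
  }
}
```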


---




[GitHub] spark pull request #20422: [SPARK-23253][Core][Shuffle]Only write shuffle te...

2018-01-29 Thread yaooqinn
GitHub user yaooqinn opened a pull request:

https://github.com/apache/spark/pull/20422

[SPARK-23253][Core][Shuffle]Only write shuffle temporary index file when 
there is not an existing one


## What changes were proposed in this pull request?

The shuffle index temporary file is used to atomically create the shuffle index 
file; it is not needed when the index file already exists because another 
attempt of the same task has already written it.
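The proposed control flow can be sketched roughly as follows (a hypothetical simplification; `checkIndexAndDataFile` stands in for the existing consistency check, and the locking in the real method is omitted):

```scala
import java.io.File

// Hypothetical sketch: when a previous attempt of the same task has already
// committed a consistent index/data pair, adopt its partition lengths and
// skip writing a temporary index file entirely; otherwise write the temp
// file and rename it into the final location.
def commitOrReuse(
    indexFile: File,
    dataFile: File,
    lengths: Array[Long],
    checkIndexAndDataFile: () => Array[Long],
    writeTmpIndexAndRename: () => Unit): Unit = {
  val existingLengths = checkIndexAndDataFile()
  if (existingLengths != null) {
    // Another attempt already committed our map outputs: reuse its
    // lengths and do not touch the files on disk.
    System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
  } else {
    // First successful attempt: write the temporary index file, then
    // rename it into the final location.
    writeTmpIndexAndRename()
  }
}
```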

## How was this patch tested?

existing unit tests

cc @squito 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yaooqinn/spark SPARK-23253

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20422


commit 98ea6a742143da803eb728c352e7424f504fabba
Author: Kent Yao 
Date:   2018-01-29T10:11:50Z

Only write shuffle temporary index file when there is not an existing one




---
