Ngone51 commented on a change in pull request #31664:
URL: https://github.com/apache/spark/pull/31664#discussion_r583633920
##########
File path:
core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
##########
@@ -131,4 +141,29 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
)))
}
+ test("SPARK-34541 Data could not be cleaned up when unregisterShuffle") {
+ val conf = new SparkConf(loadDefaults = false)
+ val tempDir: File = Utils.createTempDir()
Review comment:
use `withTempDir`?
##########
File path:
core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
##########
@@ -131,4 +141,29 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
)))
}
+ test("SPARK-34541 Data could not be cleaned up when unregisterShuffle") {
+ val conf = new SparkConf(loadDefaults = false)
+ val tempDir: File = Utils.createTempDir()
+ conf.set("spark.local.dir", tempDir.getAbsolutePath)
+ val sc: SparkContext = new SparkContext("local", "SPARK-34541", conf)
+ val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
+ // Create a shuffleRdd
+ val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
+ .setSerializer(new JavaSerializer(conf))
+ def getAllFiles: Set[File] =
+ FileUtils.listFiles(tempDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
+ val filesBeforeShuffle = getAllFiles
+ // Force the shuffle to be performed
+ shuffledRdd.count()
+ // Ensure that the shuffle actually created files that will need to be cleaned up
+ val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
+ filesCreatedByShuffle.map(_.getName) should be
+ Set("shuffle_0_0_0.data", "shuffle_0_0_0.index")
+ // Check that the cleanup actually removes the files
+ sc.env.blockManager.master.removeShuffle(0, blocking = true)
+ for (file <- filesCreatedByShuffle) {
+ assert (!file.exists(), s"Shuffle file $file was not cleaned up")
+ }
+ }
+
Review comment:
nit: unnecessary line
##########
File path:
core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
##########
@@ -17,13 +17,23 @@
package org.apache.spark.shuffle.sort
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
import org.apache.spark._
+import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
+import org.apache.spark.util.Utils
+
Review comment:
nit: redundant empty line.
##########
File path:
core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
##########
@@ -131,4 +141,29 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
)))
}
+ test("SPARK-34541 Data could not be cleaned up when unregisterShuffle") {
Review comment:
nit: `SPARK-34541:`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]