Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6648#discussion_r32022303
--- Diff: core/src/test/scala/org/apache/spark/shuffle/ShuffleSuite.scala ---
@@ -315,8 +320,115 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(metrics.bytesWritten === metrics.bytesRead)
assert(metrics.bytesWritten > 0)
}
+
+ def multipleAttemptConfs: Seq[(String, SparkConf)] = Seq("basic" -> conf)
+
+ multipleAttemptConfs.foreach { case (name, multipleAttemptConf) =>
+ test("multiple attempts for one task: conf = " + name) {
+ sc = new SparkContext("local", "test", multipleAttemptConf)
+ val mapTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val manager = sc.env.shuffleManager
+ val taskMemoryManager = new TaskMemoryManager(sc.env.executorMemoryManager)
+ val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val shuffleHandle = manager.registerShuffle(0, 1, shuffleDep)
+
+ // first attempt -- it's successful
+ val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, 0,
+ new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, false, stageAttemptId = 0,
+ taskMetrics = new TaskMetrics))
+ val data1 = (1 to 10).map { x => x -> x}
+
+ // second attempt -- also successful. We'll write out different data,
+ // just to simulate the fact that the records may get written differently
+ // depending on what gets spilled, what gets combined, etc.
+ val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0, 1,
+ new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, false, stageAttemptId = 1,
+ taskMetrics = new TaskMetrics))
+ val data2 = (11 to 20).map { x => x -> x}
+
+ // interleave writes of both attempts -- we want to test that both attempts can occur
+ // simultaneously, and everything is still OK
+ val interleaver = new InterleavingIterator(
+ data1, {iter: Iterator[(Int, Int)] => writer1.write(iter); writer1.stop(true)},
+ data2, {iter: Iterator[(Int, Int)] => writer2.write(iter); writer2.stop(true)})
+ val (mapOutput1, mapOutput2) = interleaver.run()
+
+
+ // register the output from attempt 1, and try to read it
+ mapOutput1.foreach { mapStatus => mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))}
+ val reader1 = manager.getReader[Int, Int](shuffleHandle, 0, 1,
+ new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, false, taskMetrics = new TaskMetrics))
+ reader1.read().toIndexedSeq should be (data1.toIndexedSeq)
+
+ // now for attempt 2 (registerMapOutputs always blows away all previous outputs, so we
+ // won't find the output for attempt 1)
+ mapOutput2.foreach { mapStatus => mapTrackerMaster.registerMapOutputs(0, Array(mapStatus))}
+
+ val reader2 = manager.getReader[Int, Int](shuffleHandle, 0, 1,
+ new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, false, taskMetrics = new TaskMetrics))
+ reader2.read().toIndexedSeq should be(data2.toIndexedSeq)
+
+
+ // make sure that when the shuffle gets unregistered, we clean up from all attempts
+ val shuffleFiles1 = manager.getShuffleFiles(shuffleHandle, 0, 0, 0)
--- End diff ---
This also tests that the output gets cleaned up properly from all attempts.
To test that, I added a somewhat hacky `getShuffleFiles` method to
`ShuffleManager`. A shuffle manager could easily game this test, since the
method's only purpose is to serve this test ... but there are some checks
below to make sure it's doing something reasonable, so I think it's OK. Open
to other suggestions.
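
A rough sketch of what those checks could look like (the diff is truncated
above, so the assertions below are illustrative assumptions; `unregisterShuffle`
is the existing `ShuffleManager` API, while the attempt-id argument in the
second `getShuffleFiles` call is assumed):

```scala
// Illustrative only -- the real checks sit below the truncation point in the diff.
// Assumes getShuffleFiles(...) returns the java.io.Files backing one attempt's
// output, with the last argument being the task attempt id (an assumption).
val shuffleFiles1 = manager.getShuffleFiles(shuffleHandle, 0, 0, 0)
val shuffleFiles2 = manager.getShuffleFiles(shuffleHandle, 0, 0, 1)

// guard against a trivial implementation that games the test by returning nothing
assert(shuffleFiles1.nonEmpty)
assert(shuffleFiles2.nonEmpty)
(shuffleFiles1 ++ shuffleFiles2).foreach { f => assert(f.exists()) }

// unregistering the shuffle should remove the output of *every* attempt
manager.unregisterShuffle(0)
(shuffleFiles1 ++ shuffleFiles2).foreach { f => assert(!f.exists()) }
```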
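
Separately, `InterleavingIterator` isn't shown in this hunk. A minimal sketch
of how such a helper might work, assuming only the constructor shape visible at
the call site above (two threads whose wrapped iterators hand off turns element
by element; the locking details are my assumption, not the PR's code):

```scala
// Hypothetical sketch of the InterleavingIterator used above; the real
// implementation lives elsewhere in ShuffleSuite.scala and may differ.
class InterleavingIterator[T, R](
    data1: Seq[T], f1: Iterator[T] => R,
    data2: Seq[T], f2: Iterator[T] => R) {

  private val lock = new Object
  private var turn = 0                    // which consumer may take the next element
  private val done = Array(false, false)  // set once a consumer exhausts its data

  private def wrap(data: Seq[T], me: Int): Iterator[T] = new Iterator[T] {
    private val underlying = data.iterator
    override def hasNext: Boolean = {
      val more = underlying.hasNext
      // once out of elements, stop making the other consumer wait for us
      if (!more) lock.synchronized { done(me) = true; lock.notifyAll() }
      more
    }
    override def next(): T = lock.synchronized {
      // wait for our turn, unless the other side has already finished
      while (turn != me && !done(1 - me)) lock.wait()
      val elem = underlying.next()
      turn = 1 - me                       // hand the turn to the other consumer
      lock.notifyAll()
      elem
    }
  }

  /** Runs f1 and f2 on separate threads, forcing their reads to alternate. */
  def run(): (R, R) = {
    var r1: Option[R] = None
    var r2: Option[R] = None
    val t1 = new Thread { override def run(): Unit = { r1 = Some(f1(wrap(data1, 0))) } }
    val t2 = new Thread { override def run(): Unit = { r2 = Some(f2(wrap(data2, 1))) } }
    t1.start(); t2.start()
    t1.join(); t2.join()                  // join() publishes r1/r2 safely
    (r1.get, r2.get)
  }
}
```

Given the two writer lambdas from the test, `run()` would return the two
`Option[MapStatus]` results that the test then registers one at a time.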