zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r928088166
##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala:
##########
@@ -380,6 +550,472 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
}
}
+ test("Consistency in AppPathInfo between in-memory hashmap and the DB") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Data)
+ val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+ s1.initializeApplication(app2Attempt1Data)
+ val app2Attempt2Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt2Data = makeAppInfo("user", app2Attempt2Id)
+ s1.initializeApplication(app2Attempt2Data)
+ val app3IdNoAttemptId = ApplicationId.newInstance(0, 3)
+ val app3NoAttemptIdData = makeAppInfo("user", app3IdNoAttemptId)
+ s1.initializeApplication(app3NoAttemptIdData)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+ val mergedShuffleInfo1 =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath), 3,
+ SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt2 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy2/bippy2").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+ val mergedShuffleInfo3NoAttemptId =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath),
+ 4, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID)
+
+ val localDirs1 = Array(new File(tempDir,
"foo/merge_manager_1").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt2 = Array(new File(tempDir,
"bippy2/merge_manager_2").getAbsolutePath)
+ val localDirs3NoAttempt = Array(new File(tempDir,
"foo/merge_manager").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager").getAbsolutePath)
+ val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+ val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+ val appPathsInfo2Attempt2 = new AppPathsInfo(localDirs2Attempt2, 5)
+ val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ val mergeManager1DB =
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+ ShuffleTestAccessor.reloadAppShuffleInfo(
+ mergeManager1, mergeManager1DB).size() equals 0
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ var appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 1
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+
+ mergeManager1.registerExecutor(app2Attempt1Id.toString,
mergedShuffleInfo2Attempt1)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+
+ mergeManager1.registerExecutor(app3IdNoAttemptId.toString,
mergedShuffleInfo3NoAttemptId)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 3
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 3
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ mergeManager1.registerExecutor(app2Attempt2Id.toString,
mergedShuffleInfo2Attempt2)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 3
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt2Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt2)
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 3
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt2Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt2)
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ mergeManager1.applicationRemoved(app2Attempt2Id.toString, true)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ assert(!appShuffleInfo.containsKey(app2Attempt2Id.toString))
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ assert(!appShuffleInfoAfterReload.containsKey(app2Attempt2Id.toString))
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ s1.stop()
+ }
+
+ test("Finalized merged shuffle are written into DB and cleaned up after
application stopped") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Data)
+ val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+ s1.initializeApplication(app2Attempt1Data)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+ val mergedShuffleInfo1 =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath), 3,
+ SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+ val localDirs1 = Array(new File(tempDir,
"foo/merge_manager_1").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+ val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ val mergeManager1DB =
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+ ShuffleTestAccessor.reloadAppShuffleInfo(
+ mergeManager1, mergeManager1DB).size() equals 0
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ mergeManager1.registerExecutor(app2Attempt1Id.toString,
mergedShuffleInfo2Attempt1)
+ val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+ val partitionId2 = new AppAttemptShuffleMergeId(app2Attempt1Id.toString,
1, 2, 1)
+ prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+ prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(!appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ var appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.isEmpty)
+
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+ mergeManager1.applicationRemoved(app1Id.toString, true)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ assert(!appShuffleInfo.containsKey(app1Id.toString))
+
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 1
+ assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+ s1.stop()
+ }
+
+ test("Dangling finalized merged partition info in DB will be removed during
restart") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1._conf = yarnConfig
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ val transportConf = new TransportConf("shuffle", new
HadoopConfigProvider(yarnConfig))
+ val testShuffleMergeManager =
+
ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(
+ transportConf,
+
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+ s1.setShuffleMergeManager(testShuffleMergeManager)
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Data)
+ val app2Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt1Data = makeAppInfo("user", app2Id)
+ s1.initializeApplication(app2Attempt1Data)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+ val mergedShuffleInfo1 =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath), 3,
+ SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+ val localDirs1 = Array(new File(tempDir,
"foo/merge_manager_1").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt2 = Array(new File(tempDir,
"bippy2/merge_manager_2").getAbsolutePath)
+ val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+ val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ val mergeManager1DB =
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ mergeManager1.registerExecutor(app2Id.toString, mergedShuffleInfo2Attempt1)
+ val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+ val partitionId2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1)
+ prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+ prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+ assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+ assert(!appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+ assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+ var appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+ // The applicationRemove will not clean up the finalized merged shuffle
partition in DB
+ // as of the NoOp mergedShuffleFileManager removeAppShuffleInfoFromDB
method
+ mergeManager1.applicationRemoved(app1Id.toString, true)
+
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ assert(!appShuffleInfo.containsKey(app1Id.toString))
+ assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+ // Clear the AppsShuffleInfo hashmap and reload the hashmap from DB
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 1
+ assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+ // Register application app1Id again and reload the DB again
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.isEmpty)
+ assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ // The merged partition information for App1 should be empty as they have
been removed from DB
+ assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+ s1.stop()
+ }
+
+ test("Dangling application attempt local path information in DB will be
removed during restart") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1._conf = yarnConfig
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ val transportConf = new TransportConf("shuffle", new
HadoopConfigProvider(yarnConfig))
+ val testShuffleMergeManager =
+ ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoDBCleanup(
+ transportConf,
+
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+ s1.setShuffleMergeManager(testShuffleMergeManager)
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 2)
+ val app1Attempt1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Attempt1Data)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+
+ val mergedShuffleInfo1Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo1Attempt2 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy2/bippy2").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+ val localDirs1Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val localDirs1Attempt2 = Array(new File(tempDir,
"bippy2/merge_manager_2").getAbsolutePath)
+ val appPathsInfo1Attempt1 = new AppPathsInfo(localDirs1Attempt1, 5)
+ val appPathsInfo1Attempt2 = new AppPathsInfo(localDirs1Attempt2, 5)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt1)
+ val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 2, 1)
+ prepareAppShufflePartition(mergeManager1, partitionId1, 2, "4")
+
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ appShuffleInfo.get(
+ app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt1)
+ assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+ // Register Attempt 2
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt2)
+ val partitionId2 = new AppAttemptShuffleMergeId(app1Id.toString, 2, 2, 1)
+ prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ appShuffleInfo.get(
+ app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+ assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+ // now we pretend the shuffle service goes down, since the DB deletion are
NoOp,
+ // it should have multiple app attempt local paths info and finalized
merge info
+ s1.stop()
+ // Yarn Shuffle service comes back up without custom mergeManager
+ s2 = new YarnShuffleService
+ s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s2.init(yarnConfig)
+ s2.mergeManagerFile should be (mergeMgrFile)
+
+ val mergeManager2 =
s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager2)
+ appShuffleInfo.size() equals 1
+ appShuffleInfo.get(
+ app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+ s2.stop()
+ }
+
+ test("Cleanup for former attempts local path info should be triggered in
applicationRemoved") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1._conf = yarnConfig
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ val transportConf = new TransportConf("shuffle", new
HadoopConfigProvider(yarnConfig))
+ val testShuffleMergeManager =
+ ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoDBCleanup(
+ transportConf,
+
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+ s1.setShuffleMergeManager(testShuffleMergeManager)
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Attempt1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Attempt1Data)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+
+ val mergedShuffleInfo1Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo1Attempt2 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy2/bippy2").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+ val localDirs1Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val localDirs1Attempt2 = Array(new File(tempDir,
"bippy2/merge_manager_2").getAbsolutePath)
+ val appPathsInfo1Attempt1 = new AppPathsInfo(localDirs1Attempt1, 5)
Review Comment:
Removed the unused variables.
##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala:
##########
@@ -380,6 +550,472 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
}
}
+ test("Consistency in AppPathInfo between in-memory hashmap and the DB") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Data)
+ val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+ s1.initializeApplication(app2Attempt1Data)
+ val app2Attempt2Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt2Data = makeAppInfo("user", app2Attempt2Id)
+ s1.initializeApplication(app2Attempt2Data)
+ val app3IdNoAttemptId = ApplicationId.newInstance(0, 3)
+ val app3NoAttemptIdData = makeAppInfo("user", app3IdNoAttemptId)
+ s1.initializeApplication(app3NoAttemptIdData)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+ val mergedShuffleInfo1 =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath), 3,
+ SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt2 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy2/bippy2").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+ val mergedShuffleInfo3NoAttemptId =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath),
+ 4, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID)
+
+ val localDirs1 = Array(new File(tempDir,
"foo/merge_manager_1").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt2 = Array(new File(tempDir,
"bippy2/merge_manager_2").getAbsolutePath)
+ val localDirs3NoAttempt = Array(new File(tempDir,
"foo/merge_manager").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager").getAbsolutePath)
+ val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+ val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+ val appPathsInfo2Attempt2 = new AppPathsInfo(localDirs2Attempt2, 5)
+ val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ val mergeManager1DB =
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+ ShuffleTestAccessor.reloadAppShuffleInfo(
+ mergeManager1, mergeManager1DB).size() equals 0
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ var appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 1
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+
+ mergeManager1.registerExecutor(app2Attempt1Id.toString,
mergedShuffleInfo2Attempt1)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+
+ mergeManager1.registerExecutor(app3IdNoAttemptId.toString,
mergedShuffleInfo3NoAttemptId)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 3
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 3
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ mergeManager1.registerExecutor(app2Attempt2Id.toString,
mergedShuffleInfo2Attempt2)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 3
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt2Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt2)
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 3
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt2Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt2)
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ mergeManager1.applicationRemoved(app2Attempt2Id.toString, true)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ assert(!appShuffleInfo.containsKey(app2Attempt2Id.toString))
+ appShuffleInfo.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ assert(!appShuffleInfoAfterReload.containsKey(app2Attempt2Id.toString))
+ appShuffleInfoAfterReload.get(
+ app3IdNoAttemptId.toString).getAppPathsInfo should be
(appPathsInfo3NoAttempt)
+
+ s1.stop()
+ }
+
+ test("Finalized merged shuffle are written into DB and cleaned up after
application stopped") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1.init(yarnConfig)
+
+ val app1Id = ApplicationId.newInstance(0, 1)
+ val app1Data = makeAppInfo("user", app1Id)
+ s1.initializeApplication(app1Data)
+ val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+ val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+ s1.initializeApplication(app2Attempt1Data)
+
+ val mergeMgrFile = s1.mergeManagerFile
+ mergeMgrFile should not be (null)
+ val mergedShuffleInfo1 =
+ new ExecutorShuffleInfo(
+ Array(new File(tempDir, "foo/foo").getAbsolutePath,
+ new File(tempDir, "bar/bar").getAbsolutePath), 3,
+ SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+ val mergedShuffleInfo2Attempt1 =
+ new ExecutorShuffleInfo(Array(new File(tempDir,
"bippy1/bippy1").getAbsolutePath),
+ 5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+ val localDirs1 = Array(new File(tempDir,
"foo/merge_manager_1").getAbsolutePath,
+ new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+ val localDirs2Attempt1 = Array(new File(tempDir,
"bippy1/merge_manager_1").getAbsolutePath)
+ val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+ val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+ val mergeManager1 =
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+ val mergeManager1DB =
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+ ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+ ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+ ShuffleTestAccessor.reloadAppShuffleInfo(
+ mergeManager1, mergeManager1DB).size() equals 0
+
+ mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+ mergeManager1.registerExecutor(app2Attempt1Id.toString,
mergedShuffleInfo2Attempt1)
+ val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+ val partitionId2 = new AppAttemptShuffleMergeId(app2Attempt1Id.toString,
1, 2, 1)
+ prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+ prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+ var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 2
+ appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfo.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(!appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ var appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 2
+ appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be
(appPathsInfo1)
+ appShuffleInfoAfterReload.get(
+ app2Attempt1Id.toString).getAppPathsInfo should be
(appPathsInfo2Attempt1)
+ assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.isEmpty)
+
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+ ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+ mergeManager1.applicationRemoved(app1Id.toString, true)
+ appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+ appShuffleInfo.size() equals 1
+ assert(!appShuffleInfo.containsKey(app1Id.toString))
+
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+ appShuffleInfoAfterReload =
+ ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+ appShuffleInfoAfterReload.size() equals 1
+ assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+ s1.stop()
+ }
+
+ test("Dangling finalized merged partition info in DB will be removed during
restart") {
+ s1 = new YarnShuffleService
+ s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+ s1._conf = yarnConfig
+ yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+ val transportConf = new TransportConf("shuffle", new
HadoopConfigProvider(yarnConfig))
+ val testShuffleMergeManager =
+
ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(
Review Comment:
Created a helper method and refactored the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]