This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f6a044c [SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover `Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION` f6a044c is described below commit f6a044cf8cd83e6b3b30e515acbac0ec81607463 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Tue Nov 9 08:07:38 2021 -0800 [SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover `Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION` ### What changes were proposed in this pull request? This PR adds a new UT to cover the `o.a.s.deploy.yarn.Client.prepareLocalResources` method with a custom `STAGING_FILE_REPLICATION` configuration and changes other related UTs to explicitly verify that the `replication` passed into the `copyFileToRemote` method is `None`. ### Why are the changes needed? Add new UT. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passes Jenkins or GitHub Actions. Closes #34531 from LuciferYang/SPARK-37239-followup. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 +++++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 58e49c9..a8815dc 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -132,7 +132,7 @@ class ClientSuite extends SparkFunSuite with Matchers { .set("spark.yarn.dist.jars", ADDED) val client = createClient(sparkConf, args = Array("--jar", USER)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), any(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + any(classOf[Path]), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) val tempDir = Utils.createTempDir() try { @@ -308,12 +308,12 @@ class ClientSuite extends SparkFunSuite with Matchers { assert(sparkConf.get(SPARK_JARS) === Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), any(), - any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new 
Path(jar2.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), + meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) val cp = classpath(client) cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) @@ -330,7 +330,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), any(), + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) @@ -340,6 +340,25 @@ class ClientSuite extends SparkFunSuite with Matchers { } } + test("SPARK-37239: distribute jars archive with set STAGING_FILE_REPLICATION") { + val temp = Utils.createTempDir() + val archive = TestUtils.createJarWithFiles(Map(), temp) + val replication = 5 + + val sparkConf = new SparkConf() + .set(SPARK_ARCHIVE, archive.getPath()) + .set(STAGING_FILE_REPLICATION, replication) + val client = createClient(sparkConf) + client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) + + // It is difficult to assert the result of `setReplication` in UT because this method in + // `RawLocalFileSystem` always return true and not change the value of `replication`. + // So we can only assert the call of `client.copyFileToRemote` has passed in a non `None`. 
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), + meq(Some(replication.toShort)), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + } + test("distribute archive multiple times") { val libs = Utils.createTempDir() // Create jars dir and RELEASE file to avoid IllegalStateException. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org