Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/4215#discussion_r23878550
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -431,6 +458,155 @@ object SparkSubmit {
}
}
+/** Provides utility functions to be used inside SparkSubmit. */
+private[spark] object SparkSubmitUtils extends Logging {
+
+ // Directories for caching downloads through ivy and storing the jars when maven coordinates are
+ // supplied to spark-submit
+ private var PACKAGES_DIRECTORY: File = null
+
+ /**
+ * Represents a Maven Coordinate
+ * @param groupId the groupId of the coordinate
+ * @param artifactId the artifactId of the coordinate
+ * @param version the version of the coordinate
+ */
+ private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)
+
+ /**
+ * Resolves any dependencies that were supplied through maven coordinates
+ * @param coordinates Comma-delimited string of maven coordinates
+ * @param remoteRepos Comma-delimited string of remote repositories other than maven central
+ * @param ivyPath The path to the local ivy repository
+ * @return The comma-delimited path to the jars of the given maven artifacts including their
+ * transitive dependencies
+ */
+ private[spark] def resolveMavenCoordinates(
+ coordinates: String,
+ remoteRepos: String,
+ ivyPath: String,
+ isTest: Boolean = false): String = {
+ if (coordinates == null || coordinates.trim.isEmpty) {
+ ""
+ } else {
+ val artifacts = coordinates.split(",").map { p =>
+ val splits = p.split(":")
+ require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
+ s"'groupId:artifactId:version'. The coordinate provided is: $p")
+ require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
+ s"be whitespace. The groupId provided is: ${splits(0)}")
+ require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
+ s"be whitespace. The artifactId provided is: ${splits(1)}")
+ require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
+ s"be whitespace. The version provided is: ${splits(2)}")
+ new MavenCoordinate(splits(0), splits(1), splits(2))
+ }
+ // Default configuration name for ivy
+ val conf = "default"
+ // set ivy settings for location of cache
+ val ivySettings: IvySettings = new IvySettings
+ if (ivyPath == null || ivyPath.trim.isEmpty) {
+ PACKAGES_DIRECTORY = new File(ivySettings.getDefaultIvyUserDir, "jars")
+ } else {
+ ivySettings.setDefaultCache(new File(ivyPath, "cache"))
+ PACKAGES_DIRECTORY = new File(ivyPath, "jars")
+ }
+ logInfo(s"Ivy Default Cache set to:
${ivySettings.getDefaultCache.getAbsolutePath}")
+ logInfo(s"The jars for the packages stored in: $PACKAGES_DIRECTORY")
+
+ // create a pattern matcher
+ ivySettings.addMatcher(new GlobPatternMatcher)
+
+ // the biblio resolver resolves POM declared dependencies
+ val br: IBiblioResolver = new IBiblioResolver
+ br.setM2compatible(true)
+ br.setUsepoms(true)
+ br.setName("central")
+
+ // We need a chain resolver if we want to check multiple repositories
+ val cr = new ChainResolver
+ cr.setName("list")
+ cr.add(br)
+
+ // Add an exclusion rule for Spark
+ val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
+ val sparkDependencyExcludeRule =
+ new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+ sparkDependencyExcludeRule.addConfiguration(conf)
+
+ // add any other remote repositories other than maven central
+ if (remoteRepos != null && remoteRepos.trim.nonEmpty) {
+ var i = 1
+ remoteRepos.split(",").foreach { repo =>
+ val brr: IBiblioResolver = new IBiblioResolver
+ brr.setM2compatible(true)
+ brr.setUsepoms(true)
+ brr.setRoot(repo)
+ brr.setName(s"repo-$i")
+ cr.add(brr)
+ logInfo(s"$repo added as a remote repository with the name:
${brr.getName}")
+ i += 1
+ }
+ }
+ ivySettings.addResolver(cr)
+ ivySettings.setDefaultResolver(cr.getName)
+ val ivy = Ivy.newInstance(ivySettings)
+ // Set resolve options to download transitive dependencies as well
+ val resolveOptions = new ResolveOptions
+ resolveOptions.setTransitive(true)
+ val retrieveOptions = new RetrieveOptions
+ // Turn downloading and logging off for testing
+ if (isTest) {
+ resolveOptions.setDownload(false)
+ resolveOptions.setLog(LogOptions.LOG_QUIET)
+ retrieveOptions.setLog(LogOptions.LOG_QUIET)
+ } else {
+ resolveOptions.setDownload(true)
+ }
+
+ // A Module descriptor must be specified. Entries are dummy strings
+ val md = DefaultModuleDescriptor.newDefaultInstance(
+ ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0"))
+ md.setDefaultConf(conf)
+
+ md.addExcludeRule(sparkDependencyExcludeRule)
+
+ artifacts.foreach { mvn =>
+ val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
+ val dd = new DefaultDependencyDescriptor(ri, false, false)
+ dd.addDependencyConfiguration(conf, conf)
+ logInfo(s"${dd.getDependencyId} added as a dependency")
+ md.addDependency(dd)
+ }
+
+ // resolve dependencies
+ val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+ if (rr.hasError) {
+ throw new RuntimeException(rr.getAllProblemMessages.toString)
+ }
+ // Log the callers for each dependency
+ rr.getDependencies.toArray.foreach { case dependency: IvyNode =>
+ logInfo(s"$dependency will be retrieved as a dependency for:")
--- End diff --
This `logInfo` call and the one that follows it might end up with other logging
statements interleaved between them, so I think it would be preferable to build up a single
multi-line log string rather than making multiple calls here.
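
For illustration, a rough sketch of that approach (this assumes the follow-up call iterates `dependency.getAllCallers`, which isn't visible in the quoted hunk, so treat it as a sketch rather than the actual patch):

```scala
// Sketch only: assemble the per-dependency message up front and emit it with a
// single logInfo call, so output from other log statements cannot be
// interleaved between the header line and the list of callers.
rr.getDependencies.toArray.foreach { case dependency: IvyNode =>
  val callers = dependency.getAllCallers
    .map(caller => s"\t$caller")
    .mkString("\n")
  logInfo(s"$dependency will be retrieved as a dependency for:\n$callers")
}
```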