GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/17702#discussion_r156259778
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
---
@@ -668,4 +672,31 @@ object DataSource extends Logging {
}
globPath
}
+
+ /**
+ * Return all paths represented by the wildcard string.
+ * Follow [[InMemoryFileIndex]].bulkListLeafFile and reuse the conf.
+ */
+ private def getGlobbedPaths(
+ sparkSession: SparkSession,
+ fs: FileSystem,
+ hadoopConf: SerializableConfiguration,
+ qualified: Path): Seq[Path] = {
+ val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified)
+ if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+ } else {
+ val parallelPartitionDiscoveryParallelism =
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
+ val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
+ val expanded = sparkSession.sparkContext
--- End diff --
NMs (NodeManagers) are part of YARN, not HDFS.
This code talks to HDFS NameNodes, and there is generally only one of those
you'll be talking to. Latency is not the issue here; throughput is. So I still
don't see why this shouldn't be done with a local thread pool.
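A minimal sketch of the local-thread-pool alternative, assuming the expensive work is one independent glob call per pattern. `LocalParallelGlob` and `parallelExpand` are hypothetical names, not Spark API. So that the block runs without Hadoop on the classpath, the glob call is replaced by a stand-in function. The pool size is capped by a parallelism setting rather than growing with the number of paths, since every call still ends up at the same NameNode.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object LocalParallelGlob {
  /**
   * Hypothetical helper: runs `expand` on each input using a bounded,
   * driver-local thread pool and concatenates the results in input order.
   */
  def parallelExpand[A, B](inputs: Seq[A], parallelism: Int)(expand: A => Seq[B]): Seq[B] = {
    require(parallelism > 0, "parallelism must be positive")
    if (inputs.isEmpty) {
      Seq.empty
    } else {
      // Never create more threads than there are inputs.
      val pool = Executors.newFixedThreadPool(math.min(inputs.size, parallelism))
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
      try {
        val futures = inputs.map(in => Future(expand(in)))
        // Future.sequence preserves input order; flatten merges per-input results.
        Await.result(Future.sequence(futures), Duration.Inf).flatten
      } finally {
        pool.shutdown()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a real glob: each "pattern" expands to three fake paths.
    val patterns = Seq("/data/a*", "/data/b*")
    val expanded = parallelExpand(patterns, parallelism = 4) { p =>
      (1 to 3).map(i => p.stripSuffix("*") + i)
    }
    println(expanded.mkString(","))
  }
}
```

In real code the stand-in would be a call such as Hadoop's `FileSystem.globStatus(Path)`, which can return `null`, so it would need wrapping in `Option(...)` before mapping each `FileStatus` to `getPath`. The design choice is to keep the work on the driver: nothing needs to be serialized or scheduled as a Spark job, and the NameNode sees the same bounded request rate.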