asfgit closed pull request #23317: [SPARK-26368][SQL] Make it clear that
getOrInferFileFormatSchema doesn't create InMemoryFileIndex
URL: https://github.com/apache/spark/pull/23317
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 795a6d0b6b040..fefff68c4ba8b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -122,21 +122,14 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
- * @param fileIndex optional [[InMemoryFileIndex]] for getting partition
schema and file list
+ * @param getFileIndex [[InMemoryFileIndex]] for getting partition schema
and file list
* @return A pair of the data schema (excluding partition columns) and the
schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
- fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) =
{
- // The operations below are expensive therefore try not to do them if we
don't need to, e.g.,
- // in streaming mode, we have already inferred and registered partition
columns, we will
- // never have to materialize the lazy val below
- lazy val tempFileIndex = fileIndex.getOrElse {
- val globbedPaths =
- checkAndGlobPathIfNecessary(checkEmptyGlobPath = false,
checkFilesExist = false)
- createInMemoryFileIndex(globbedPaths)
- }
+ getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = {
+ lazy val tempFileIndex = getFileIndex()
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path
provides the partitioning
@@ -236,7 +229,15 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory
with " +
"'spark.read.load(directory)' and infer schema from it.")
}
- val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format,
() => {
+ // The operations below are expensive therefore try not to do them
if we don't need to,
+ // e.g., in streaming mode, we have already inferred and registered
partition columns,
+ // we will never have to materialize the lazy val below
+ val globbedPaths =
+ checkAndGlobPathIfNecessary(checkEmptyGlobPath = false,
checkFilesExist = false)
+ createInMemoryFileIndex(globbedPaths)
+ })
SourceInfo(
s"FileSource[$path]",
StructType(dataSchema ++ partitionSchema),
@@ -370,7 +371,7 @@ case class DataSource(
} else {
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
- getOrInferFileFormatSchema(format, Some(index))
+ getOrInferFileFormatSchema(format, () => index)
(index, resultDataSchema, resultPartitionSchema)
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
pull request URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]