Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15951#discussion_r89030586
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
---
@@ -84,30 +84,95 @@ case class DataSource(
private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
/**
- * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+ * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+ * it. In the read path, only Hive managed tables provide the partition columns properly when
+ * initializing this class. All other file based data sources will try to infer the partitioning,
+ * and then cast the inferred types to user specified dataTypes if the partition columns exist
+ * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+ * This method will try to skip file scanning whether `userSpecifiedSchema` and
+ * `partitionColumns` are provided. Here are some code paths that use this method:
+ * 1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+ * 2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+ * dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+ * dataType if they don't.
+ * 3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+ * provide the schema. Here, we also perform partition inference like 2, and try to use
+ * dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+ * this information, therefore calls to this method should be very cheap, i.e. there won't
+ * be any further inference in any triggers.
+ * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+ * existing table's partitioning scheme. This is achieved by not providing
+ * `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+ * exit, if we don't care about the schema of the original table.
+ *
+ * Returns a pair of the data schema (excluding partition columns) and the schema of the partition
+ * columns.
+ *
+ * @param justPartitioning Whether to exit early and provide just the schema partitioning. The
+ * data schema is incorrect in this case and should not be used.
*/
- private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
- userSpecifiedSchema.map(_ -> partitionColumns).orElse {
- val allPaths = caseInsensitiveOptions.get("path")
+ private def getOrInferFileFormatSchema(
+ format: FileFormat,
+ justPartitioning: Boolean = false): (StructType, StructType) = {
+ // the operations below are expensive therefore try not to do them if we don't need to
+ lazy val tempFileCatalog = {
+ val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
- val partitionSchema = fileCatalog.partitionSpec().partitionColumns
- val inferred = format.inferSchema(
+ new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+ }
+ val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+ // Try to infer partitioning, because no DataSource in the read path provides the partitioning
+ // columns properly unless it is a Hive DataSource
+ val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+ val equality = sparkSession.sessionState.conf.resolver
+ // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+ userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+ partitionField)
+ }
+ StructType(resolved)
+ } else {
+ // in streaming mode, we have already inferred and registered partition columns, we will
+ // never have to re-use this
+ lazy val inferredPartitions = tempFileCatalog.partitionSchema
--- End diff --
Once we create the `FileStreamSource`, everything is already defined in
`userSpecifiedSchema`, so this `lazy val` is never accessed and therefore
never materialized.
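
To illustrate why an unused `lazy val` costs nothing, here is a minimal sketch. It is not Spark code: `LazyValSketch`, `expensiveListing`, `listings`, and `partitionNames` are hypothetical names, with `expensiveListing` standing in for the file scan behind `tempFileCatalog.partitionSchema`. It relies only on two standard Scala behaviors: a `lazy val` is evaluated on first access, and `Option.getOrElse` takes its default by name.

```scala
object LazyValSketch {
  // Counts how often the (hypothetical) expensive scan actually runs.
  var listings = 0

  // Stand-in for an expensive file listing; assumed for illustration only.
  def expensiveListing(): Seq[String] = {
    listings += 1
    Seq("year", "month")
  }

  def partitionNames(userSpecified: Option[Seq[String]]): Seq[String] = {
    // Declaring the lazy val does not run expensiveListing().
    lazy val inferred = expensiveListing()
    // getOrElse evaluates its argument only when the Option is empty,
    // so `inferred` is forced only on the fallback path.
    userSpecified.getOrElse(inferred)
  }
}
```

When the caller supplies the names, as the streaming path does with `userSpecifiedSchema`, the scan never runs; it runs once only when the fallback is needed.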