Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15951#discussion_r88984227
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -84,30 +84,90 @@ case class DataSource(
       private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     
       /**
    -   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    +   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    +   * it. In the read path, only Hive managed tables provide the partition columns properly when
    +   * initializing this class. All other file based data sources will try to infer the partitioning,
    +   * and then cast the inferred types to user specified dataTypes if the partition columns exist
    +   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
    +   * This method will try to do the least amount of work given whether `userSpecifiedSchema` and
    +   * `partitionColumns` are provided. Here are some code paths that use this method:
    +   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
    +   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
    +   *     dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
    +   *     dataType if they don't.
    +   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
    +   *     provide the schema. Here, we also perform partition inference like 2, and try to use
    +   *     dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    +   *     this information, therefore calls to this method should be very cheap, i.e. there won't
    +   *     be any further inference in any triggers.
    +   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
    +   *     existing table's partitioning scheme. This is achieved by not providing
    +   *     `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
    +   *     exit, if we don't care about the schema of the original table.
    +   *
    +   * Returns a pair of schema and partition column names.
        */
    -  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
    -    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
    -      val allPaths = caseInsensitiveOptions.get("path")
    +  private def getOrInferFileFormatSchema(
    --- End diff --
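    As a side note, the rule described for path 2 (prefer the user-specified dataType for a partition column when present, otherwise keep the inferred one) can be sketched without any Spark dependency. This is a minimal illustration of the contract only; the names and the `(column, typeName)` representation are assumptions, not Spark's actual `StructType`-based implementation:

```scala
// Illustrative sketch, not Spark code: resolve partition column types the
// way the doc comment describes for `spark.read.schema(userSpecifiedSchema)`.
object PartitionTypeResolution {
  // (columnName -> typeName) pairs stand in for StructField entries.
  def resolve(
      inferredPartitions: Seq[(String, String)],
      userSpecifiedSchema: Map[String, String]): Seq[(String, String)] = {
    inferredPartitions.map { case (name, inferredType) =>
      // Prefer the user-specified dataType if the column exists in the
      // user schema; otherwise fall back to the inferred dataType.
      name -> userSpecifiedSchema.getOrElse(name, inferredType)
    }
  }
}
```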
    
    can you add param docs to define what `justPartitioning` means?
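    For context, a hypothetical sketch of the early-exit behavior such a param doc would describe (the method body, the placeholder columns, and the simplified `(Seq[String], Seq[String])` return type are all illustrative assumptions, not the actual Spark implementation):

```scala
object GetOrInferSketch {
  /**
   * @param justPartitioning when true, the caller (e.g. `df.saveAsTable` on an
   *   existing table) only needs the partition column names, so the method
   *   returns early without resolving the full data schema.
   */
  def getOrInferFileFormatSchema(justPartitioning: Boolean): (Seq[String], Seq[String]) = {
    val partitionColumns = Seq("year", "month") // assumed, for illustration
    if (justPartitioning) {
      // Early exit: skip the (potentially expensive) schema resolution.
      return (Seq.empty, partitionColumns)
    }
    val fullSchema = Seq("id", "value", "year", "month") // assumed columns
    (fullSchema, partitionColumns)
  }
}
```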


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
