rdblue commented on a change in pull request #26297: [SPARK-29665][SQL] refine the TableProvider interface
URL: https://github.com/apache/spark/pull/26297#discussion_r341775787
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 ##########
 @@ -41,22 +50,117 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
 
   lazy val sparkSession = SparkSession.active
 
-  protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
+  private def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
     val paths = Option(map.get("paths")).map { pathStr =>
       objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
     }.getOrElse(Seq.empty)
     paths ++ Option(map.get("path")).toSeq
   }
 
-  protected def getTableName(paths: Seq[String]): String = {
-    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
-    Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    // If we need to infer schema, this must be the first time to create file index.
+    assert(fileIndex.isEmpty)
+    val caseSensitiveMap = options.asCaseSensitiveMap().asScala.toMap
+    val paths = getPaths(options)
+    fileIndex = Some(createFileIndex(None, paths, caseSensitiveMap))
+
+    val scalaInsensitiveMap = CaseInsensitiveMap(caseSensitiveMap)
+    val dataSchema = inferDataSchema(fileIndex.get.allFiles(), scalaInsensitiveMap).getOrElse {
+      throw new AnalysisException(
+        s"Unable to infer schema for $shortName. It must be specified manually.")
+    }
+    val partitionSchema = fileIndex.get.partitionSchema
+
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    val partitionNameSet: Set[String] =
+      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
+    // When data and partition schemas have overlapping columns,
+    // tableSchema = dataSchema - overlapSchema + partitionSchema
+    val fields = dataSchema.fields.filterNot { field =>
+      val colName = PartitioningUtils.getColName(field, caseSensitive)
+      partitionNameSet.contains(colName)
+    } ++ partitionSchema.fields
+    StructType(fields)
+  }
+
+  override def inferPartitioning(
 
 Review comment:
   This is the only one I see that uses schema, and that is just to create the file index.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to