cloud-fan commented on a change in pull request #23383: [SPARK-23817][SQL] Migrate ORC file format read path to data source V2
URL: https://github.com/apache/spark/pull/23383#discussion_r246637679
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
##########
@@ -539,6 +540,72 @@ object PartitioningUtils {
}).asNullable
}
+ def partitionColumns(
+ output: Seq[AttributeReference],
+ partitionSchema: StructType,
+ caseSensitive: Boolean): Seq[AttributeReference] = {
+ val equality = columnNameEquality(caseSensitive)
+ partitionSchema.fields.map { col =>
+ output.find(a => equality(a.name, col.name)).getOrElse {
+ throw new AnalysisException(s"Partition column `$col` not found in $output")
+ }
+ }
+ }
+
+ def mergeDataAndPartitionSchema(
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ caseSensitive: Boolean): (StructType, Map[String, StructField]) = {
+ val overlappedPartCols = mutable.Map.empty[String, StructField]
+ partitionSchema.foreach { partitionField =>
+ val partitionFieldName = getColName(partitionField, caseSensitive)
+ if (dataSchema.exists(getColName(_, caseSensitive) == partitionFieldName)) {
+ overlappedPartCols += partitionFieldName -> partitionField
+ }
+ }
+
+ // When data and partition schemas have overlapping columns, the output
+ // schema respects the order of the data schema for the overlapping columns, and it
+ // respects the data types of the partition schema.
+ val fullSchema =
+ StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f, caseSensitive), f)) ++
+ partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f, caseSensitive))))
+ (fullSchema, overlappedPartCols.toMap)
+ }
+
+ def requestedPartitionColumnIds(
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ caseSensitive: Boolean): Array[Int] = {
+ val columnNameMap =
+ partitionSchema.fields.map(getColName(_, caseSensitive)).zipWithIndex.toMap
+ requiredSchema.fields.map { field =>
+ columnNameMap.getOrElse(getColName(field, caseSensitive), -1)
+ }
+ }
+
+ /**
+ * Returns a new StructType that is a copy of the original StructType, removing any items that
+ * also appear in other StructType. The order is preserved from the original StructType.
+ */
+ def subtractSchema(original: StructType, other: StructType, isCaseSensitive: Boolean)
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]