Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/4308#discussion_r23921444
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
@@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient
val sqlContext: SQLContext)
val cacheMetadata = useCache
@transient
- val cachedStatus = selectedPartitions.flatMap(_.files)
+ val cachedStatus = selectedFiles
// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
- val inputFormat =
- if (cacheMetadata) {
- new FilteringParquetRowInputFormat {
- override def listStatus(jobContext: JobContext):
JList[FileStatus] = cachedStatus
- }
- } else {
- new FilteringParquetRowInputFormat
+ val inputFormat = if (cacheMetadata) {
+ new FilteringParquetRowInputFormat {
+ override def listStatus(jobContext: JobContext):
JList[FileStatus] = cachedStatus
}
-
- inputFormat match {
- case configurable: Configurable =>
- configurable.setConf(getConf)
- case _ =>
+ } else {
+ new FilteringParquetRowInputFormat
}
+
val jobContext = newJobContext(getConf, jobId)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val result = new Array[SparkPartition](rawSplits.size)
- for (i <- 0 until rawSplits.size) {
- result(i) =
- new NewHadoopPartition(id, i,
rawSplits(i).asInstanceOf[InputSplit with Writable])
+ val rawSplits = inputFormat.getSplits(jobContext)
+
+ Array.tabulate[SparkPartition](rawSplits.size) { i =>
+ new NewHadoopPartition(id, i,
rawSplits(i).asInstanceOf[InputSplit with Writable])
}
- result
}
}
- // The ordinal for the partition key in the result row, if requested.
- val partitionKeyLocation =
- partitionKeys
- .headOption
- .map(requiredColumns.indexOf(_))
- .getOrElse(-1)
+ // The ordinals for partition keys in the result row, if requested.
+ val partitionKeyLocations =
partitionColumns.fieldNames.zipWithIndex.map {
+ case (name, index) => index -> requiredColumns.indexOf(name)
+ }.toMap.filter {
+ case (_, index) => index >= 0
+ }
// When the data does not include the key and the key is requested
then we must fill it in
// based on information from the input split.
- if (!dataIncludesKey && partitionKeyLocation != -1) {
- baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
- val partValue = "([^=]+)=([^=]+)".r
- val partValues =
- split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
- .getPath
- .toString
- .split("/")
- .flatMap {
- case partValue(key, value) => Some(key -> value)
- case _ => None
- }.toMap
-
- val currentValue = partValues.values.head.toInt
- iter.map { pair =>
- val res = pair._2.asInstanceOf[SpecificMutableRow]
- res.setInt(partitionKeyLocation, currentValue)
- res
+ if (!dataSchemaIncludesPartitionKeys &&
partitionKeyLocations.nonEmpty) {
+ baseRDD.mapPartitionsWithInputSplit { case (split:
ParquetInputSplit, iterator) =>
+ val partValues = selectedPartitions.collectFirst {
+ case p if split.getPath.getParent.toString == p.path => p.values
+ }.get
+
+ iterator.map { pair =>
+ val row = pair._2.asInstanceOf[SpecificMutableRow]
+ var i = 0
+ while (i < partValues.size) {
+ // TODO Avoids boxing cost here!
+ row.update(partitionKeyLocations(i), partValues(i))
+ i += 1
+ }
+ row
}
}
} else {
baseRDD.map(_._2)
}
}
+
+ private def prunePartitions(
+ predicates: Seq[Expression],
+ partitions: Seq[Partition]): Seq[Partition] = {
+ val partitionColumnNames = partitionColumns.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+
+ val rawPredicate =
partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
+ val boundPredicate = InterpretedPredicate(rawPredicate transform {
+ case a: AttributeReference =>
+ val index = partitionColumns.indexWhere(a.name == _.name)
+ BoundReference(index, partitionColumns(index).dataType, nullable =
true)
+ })
+
+ if (isPartitioned && partitionPruningPredicates.nonEmpty) {
+ partitions.filter(p => boundPredicate(p.values))
+ } else {
+ partitions
+ }
+ }
+}
+
+object ParquetRelation2 {
+ // Whether we should merge schemas collected from all Parquet part-files.
+ val MERGE_SCHEMA = "parquet.mergeSchema"
+
+ // Hive Metastore schema, passed in when the Parquet relation is
converted from Metastore
+ val METASTORE_SCHEMA = "parquet.metastoreSchema"
+
+ // Default partition name to use when the partition column value is null
or empty string
+ val DEFAULT_PARTITION_NAME = "partition.defaultName"
+
+ private[parquet] def readSchema(footers: Seq[Footer], sqlContext:
SQLContext): StructType = {
+ footers.map { footer =>
+ val metadata = footer.getParquetMetadata.getFileMetaData
+ val parquetSchema = metadata.getSchema
+ val maybeSparkSchema = metadata
+ .getKeyValueMetaData
+ .toMap
+ .get(RowReadSupport.SPARK_METADATA_KEY)
+ .map(DataType.fromJson(_).asInstanceOf[StructType])
+
+ maybeSparkSchema.getOrElse {
+ // Falls back to Parquet schema if Spark SQL schema is absent.
+ StructType.fromAttributes(
+ // TODO Really no need to use `Attribute` here, we only need to
know the data type.
+ convertToAttributes(parquetSchema,
sqlContext.conf.isParquetBinaryAsString))
+ }
+ }.reduce { (left, right) =>
+ try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
+ throw new SparkException(s"Failed to merge incompatible schemas
$left and $right", e)
+ }
+ }
+ }
+
+ private[parquet] def mergeMetastoreParquetSchema(
+ metastoreSchema: StructType,
+ parquetSchema: StructType): StructType = {
+ def schemaConflictMessage =
+ s"""Converting Hive Metastore Parquet, but detected conflicting
schemas. Metastore schema:
+ |${metastoreSchema.prettyJson}
+ |
+ |Parquet schema:
+ |${parquetSchema.prettyJson}
+ """.stripMargin
+
+ assert(metastoreSchema.size == parquetSchema.size,
schemaConflictMessage)
+
+ val ordinalMap = metastoreSchema.zipWithIndex.map {
+ case (field, index) => field.name.toLowerCase -> index
+ }.toMap
+ val reorderedParquetSchema = parquetSchema.sortBy(f =>
ordinalMap(f.name.toLowerCase))
+
+ StructType(metastoreSchema.zip(reorderedParquetSchema).map {
+ // Uses Parquet field names but retains Metastore data types.
+ case (mSchema, pSchema) if mSchema.name.toLowerCase ==
pSchema.name.toLowerCase =>
+ mSchema.copy(name = pSchema.name)
+ case _ =>
+ throw new SparkException(schemaConflictMessage)
+ })
+ }
+
+ // TODO Data source implementations shouldn't touch Catalyst types
(`Literal`).
+ // However, we are already using Catalyst expressions for partition
pruning and predicate
+ // push-down here...
+ private[parquet] case class PartitionValues(columnNames: Seq[String],
literals: Seq[Literal]) {
+ require(columnNames.size == literals.size)
+ }
+
+ /**
+ * Given a group of qualified paths, tries to parse them and returns a
partition specification.
+ * For example, given:
+ * {{{
+ * hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
+ * hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
+ * }}}
+ * it returns:
+ * {{{
+ * PartitionSpec(
+ * partitionColumns = StructType(
+ * StructField(name = "a", dataType = IntegerType, nullable =
true),
+ * StructField(name = "b", dataType = StringType, nullable = true),
+ * StructField(name = "c", dataType = DoubleType, nullable =
true)),
+ * partitions = Seq(
+ * Partition(
+ * values = Row(1, "hello", 3.14),
+ * path =
"hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
+ * Partition(
+ * values = Row(2, "world", 6.28),
+ * path =
"hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
+ * }}}
+ */
+ private[parquet] def parsePartitions(
+ paths: Seq[Path],
+ defaultPartitionName: String): PartitionSpec = {
+ val partitionValues = resolvePartitions(paths.map(parsePartition(_,
defaultPartitionName)))
+ val fields = {
+ val (PartitionValues(columnNames, literals)) = partitionValues.head
+ columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+ StructField(name, dataType, nullable = true)
+ }
+ }
+
+ val partitions = partitionValues.zip(paths).map {
+ case (PartitionValues(_, literals), path) =>
+ Partition(Row(literals.map(_.value): _*), path.toString)
+ }
+
+ PartitionSpec(StructType(fields), partitions)
+ }
+
+ /**
+ * Parses a single partition, returns column names and values of each
partition column. For
+ * example, given:
+ * {{{
+ * basePath = hdfs://<host>:<port>/path/to/partition
+ * partitionPath =
hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
--- End diff --
Fix outdated comments.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]