GitHub user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921551
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient 
val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): 
JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): 
JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, 
rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = 
partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested 
then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && 
partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: 
ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = 
partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = 
true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is 
converted from Metastore
    +  val METASTORE_SCHEMA = "parquet.metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null 
or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: 
SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to 
know the data type.
    +          convertToAttributes(parquetSchema, 
sqlContext.conf.isParquetBinaryAsString))
    +      }
    +    }.reduce { (left, right) =>
    +      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas 
$left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting 
schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, 
schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => 
ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == 
pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types 
(`Literal`).
    +  // However, we are already using Catalyst expressions for partition 
pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], 
literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a 
partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = 
true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = 
true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = 
"hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = 
"hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, 
defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each 
partition column.  For
    +   * example, given:
    +   * {{{
    +   *   basePath = hdfs://<host>:<port>/path/to/partition
    +   *   partitionPath = 
hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionDesc(
    +   *     Seq("a", "b", "c"),
    +   *     Seq(
    +   *       Literal(42, IntegerType),
    +   *       Literal("hello", StringType),
    +   *       Literal(3.14, FloatType)))
    +   * }}}
    +   */
    +  private[parquet] def parsePartition(
    +      path: Path,
    +      defaultPartitionName: String): PartitionValues = {
    +    val columns = ArrayBuffer.empty[(String, Literal)]
    +    var finished = path.isRoot
    +    var chopped = path
    +
    +    while (!finished) {
    +      val maybeColumn = parsePartitionColumn(chopped.getName, 
defaultPartitionName)
    +      maybeColumn.foreach(columns += _)
    +      chopped = chopped.getParent
    +      finished = maybeColumn.isEmpty || chopped.isRoot
    +    }
    +
    +    val (columnNames, values) = columns.unzip
    +    PartitionValues(columnNames, values)
    +  }
    +
    +  private def parsePartitionColumn(
    +      columnSpec: String,
    +      defaultPartitionName: String): Option[(String, Literal)] = {
    +    val equalSignIndex = columnSpec.indexOf('=')
    +    if (equalSignIndex == -1) {
    +      None
    +    } else {
    +      val columnName = columnSpec.take(equalSignIndex)
    +      val literal = inferPartitionColumnValue(
    +        columnSpec.drop(equalSignIndex + 1), defaultPartitionName)
    +      Some(columnName -> literal)
    +    }
    +  }
    +
    +  /**
    +   * Resolves possible type conflicts between partitions by up-casting 
"lower" types.  The up-
    +   * casting order is:
    +   * {{{
    +   *   NullType ->
    +   *   IntegerType -> LongType ->
    +   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
    +   *   StringType
    +   * }}}
    +   */
    +  private[parquet] def resolvePartitions(descs: Seq[PartitionValues]): 
Seq[PartitionValues] = {
    +    val distinctColNamesOfPartitions = descs.map(_.columnNames).distinct
    +    val columnCount = descs.head.columnNames.size
    +
    +    // Column names of all partitions must match
    +    assert(distinctColNamesOfPartitions.size == 1, {
    +      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
    +      s"Conflicting partition column names detected:\n$list"
    +    })
    +
    +    // Resolves possible type conflicts for each column
    +    val resolvedValues = (0 until columnCount).map { i =>
    +      resolveTypeConflicts(descs.map(_.literals(i)))
    +    }
    +
    +    // Fills resolved literals back to each partition
    +    descs.zipWithIndex.map { case (d, index) =>
    +      d.copy(literals = resolvedValues.map(_(index)))
    +    }
    +  }
    +
    +  /**
    +   * Converts a string to a `Literal` with automatic type inference.  
Currently only supports
    +   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], 
[[DecimalType.Unlimited]], and
    +   * [[StringType]].
    +   */
    +  private[parquet] def inferPartitionColumnValue(
    +      raw: String,
    +      defaultPartitionName: String): Literal = {
    +    // First tries integral types
    +    Try(Literal(Integer.parseInt(raw), IntegerType))
    +      .orElse(Try(Literal(JLong.parseLong(raw), LongType)))
    +      // Then falls back to fractional types
    +      .orElse(Try(Literal(JFloat.parseFloat(raw), FloatType)))
    +      .orElse(Try(Literal(JDouble.parseDouble(raw), DoubleType)))
    +      .orElse(Try(Literal(new JBigDecimal(raw), DecimalType.Unlimited)))
    +      // Then falls back to string
    +      .getOrElse {
    +        if (raw == defaultPartitionName) Literal(null, NullType) else 
Literal(raw, StringType)
    --- End diff --
    
    For string-typed partition values, should we convert the default partition name 
to `null` or to an empty string? We need to check Hive's behaviour here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to