Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921277
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -81,116 +101,158 @@ private[parquet] case class 
Partition(partitionValues: Map[String, Any], files:
      * discovery.
      */
     @DeveloperApi
    -case class ParquetRelation2(path: String)(@transient val sqlContext: 
SQLContext)
    +case class ParquetRelation2
    +    (paths: Seq[String], parameters: Map[String, String], maybeSchema: 
Option[StructType] = None)
    +    (@transient val sqlContext: SQLContext)
       extends CatalystScan with Logging {
     
    +  // Should we merge schemas from all Parquet part-files?
    +  private val shouldMergeSchemas =
    +    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
    +
       def sparkContext = sqlContext.sparkContext
     
    -  // Minor Hack: scala doesnt seem to respect @transient for vals declared 
via extraction
    -  @transient
    -  private var partitionKeys: Seq[String] = _
    -  @transient
    -  private var partitions: Seq[Partition] = _
    -  discoverPartitions()
    +  private val fs = FileSystem.get(sparkContext.hadoopConfiguration)
     
    -  // TODO: Only finds the first partition, assumes the key is of type 
Integer...
    -  private def discoverPartitions() = {
    -    val fs = FileSystem.get(new java.net.URI(path), 
sparkContext.hadoopConfiguration)
    -    val partValue = "([^=]+)=([^=]+)".r
    +  private val baseStatuses = {
    +    val statuses = paths.distinct.map(p => 
fs.getFileStatus(fs.makeQualified(new Path(p))))
    +    assert(statuses.forall(_.isFile) || statuses.forall(_.isDir))
    +    statuses
    +  }
     
    -    val childrenOfPath = fs.listStatus(new 
Path(path)).filterNot(_.getPath.getName.startsWith("_"))
    -    val childDirs = childrenOfPath.filter(s => s.isDir)
    +  private val leafStatuses = baseStatuses.flatMap { f =>
    +    val statuses = SparkHadoopUtil.get.listLeafStatuses(fs, 
f.getPath).filter { f =>
    +      isSummaryFile(f.getPath) ||
    +        !(f.getPath.getName.startsWith("_") || 
f.getPath.getName.startsWith("."))
    +    }
    +    assert(statuses.nonEmpty, s"${f.getPath} is an empty folder.")
    +    statuses
    +  }
     
    -    if (childDirs.size > 0) {
    -      val partitionPairs = childDirs.map(_.getPath.getName).map {
    -        case partValue(key, value) => (key, value)
    -      }
    +  private val (dataStatuses, metadataStatuses, commonMetadataStatuses) = {
    +    (leafStatuses.filterNot(f => isSummaryFile(f.getPath)).toSeq,
    +      leafStatuses.filter(f => f.getPath.getName == 
ParquetFileWriter.PARQUET_METADATA_FILE),
    +      leafStatuses.filter(f => f.getPath.getName == 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))
    +  }
     
    -      val foundKeys = partitionPairs.map(_._1).distinct
    -      if (foundKeys.size > 1) {
    -        sys.error(s"Too many distinct partition keys: $foundKeys")
    -      }
    +  private val footers = {
    +    // TODO Issue a Spark job to gather footers if there are too many files
    +    (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { 
f =>
    +      val parquetMetadata = ParquetFileReader.readFooter(
    +        sparkContext.hadoopConfiguration, f, 
ParquetMetadataConverter.NO_FILTER)
    +      f -> new Footer(f.getPath, parquetMetadata)
    +    }.seq.toMap
    +  }
     
    -      // Do a parallel lookup of partition metadata.
    -      val partitionFiles =
    -        childDirs.par.map { d =>
    -          fs.listStatus(d.getPath)
    -            // TODO: Is there a standard hadoop function for this?
    -            .filterNot(_.getPath.getName.startsWith("_"))
    -            .filterNot(_.getPath.getName.startsWith("."))
    -        }.seq
    -
    -      partitionKeys = foundKeys.toSeq
    -      partitions = partitionFiles.zip(partitionPairs).map { case (files, 
(key, value)) =>
    -        Partition(Map(key -> value.toInt), files)
    -      }.toSeq
    +  private val partitionSpec = {
    +    val partitionDirs =
    +      dataStatuses
    +        .filterNot(baseStatuses.contains)
    +        .map(_.getPath.getParent)
    +        .distinct
    +
    +    // Hive uses this as part of the default partition name when the 
partition column value is null
    +    // or empty string
    +    val defaultPartitionName = parameters.getOrElse(
    +      ParquetRelation2.DEFAULT_PARTITION_NAME,
    +      "__HIVE_DEFAULT_PARTITION__")
    +
    +    if (partitionDirs.nonEmpty) {
    +      ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
         } else {
    -      partitionKeys = Nil
    -      partitions = Partition(Map.empty, childrenOfPath) :: Nil
    +      // No partition directories found, makes an empty specification
    +      PartitionSpec(StructType(Seq.empty[StructField]), 
Seq.empty[Partition])
         }
       }
     
    -  override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
    +  private val PartitionSpec(partitionColumns, partitions) = partitionSpec
    +
    +  private def isPartitioned = partitionColumns.nonEmpty
     
    -  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should 
not deal with attributes.
    -    ParquetTypesConverter.readSchemaFromFile(
    -      partitions.head.files.head.getPath,
    -      Some(sparkContext.hadoopConfiguration),
    -      sqlContext.conf.isParquetBinaryAsString))
    +  private val dataSchema = maybeSchema.getOrElse(readSchema())
     
    -  val dataIncludesKey =
    -    
partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
    +  private val dataSchemaIncludesPartitionKeys =
    +    isPartitioned && partitionColumns.forall(f => 
dataSchema.fieldNames.contains(f.name))
     
    -  override val schema =
    -    if (dataIncludesKey) {
    +  override val schema = {
    +    val fullParquetSchema = if (dataSchemaIncludesPartitionKeys) {
           dataSchema
         } else {
    -      StructType(dataSchema.fields :+ StructField(partitionKeys.head, 
IntegerType))
    +      StructType(dataSchema.fields ++ partitionColumns.fields)
         }
     
    -  override def buildScan(output: Seq[Attribute], predicates: 
Seq[Expression]): RDD[Row] = {
    -    // This is mostly a hack so that we can use the existing parquet 
filter code.
    -    val requiredColumns = output.map(_.name)
    +    val maybeMetastoreSchema =
    +      parameters
    +        .get(ParquetRelation2.METASTORE_SCHEMA)
    +        .map(s => DataType.fromJson(s).asInstanceOf[StructType])
     
    -    val job = new Job(sparkContext.hadoopConfiguration)
    -    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
    -    val jobConf: Configuration = ContextUtil.getConfiguration(job)
    +    maybeMetastoreSchema
    +      .map(ParquetRelation2.mergeMetastoreParquetSchema(_, 
fullParquetSchema))
    +      .getOrElse(fullParquetSchema)
    +  }
     
    -    val requestedSchema = StructType(requiredColumns.map(schema(_)))
    +  private def readSchema(): StructType = {
    +    // Sees which file(s) we need to touch in order to figure out the 
schema.
    +    val filesToTouch =
    +      // Always tries the summary files first if users don't require a 
merged schema.  In this case,
    +      // "_common_metadata" is more preferable than "_metadata" because it 
doesn't contain row
    +      // groups information, and could be much smaller for large Parquet 
files with lots of row
    +      // groups.
    +      //
    +      // NOTE: Metadata stored in the summary files are merged from all 
part-files.  However, for
    +      // user defined key-value metadata (in which we store Spark SQL 
schema), Parquet doesn't know
    +      // how to merge them correctly if some key is associated with 
different values in different
    +      // part-files.  When this happens, Parquet simply gives up 
generating the summary file.  This
    +      // implies that if a summary file presents, then:
    +      //
    +      //   1. Either all part-files have exactly the same Spark SQL 
schema, or
    +      //   2. Some part-files don't contain Spark SQL schema in the 
key-value metadata at all (thus
    +      //      their schemas may differ from each other).
    +      //
    +      // Here we tend to be pessimistic and take the second case into 
account.  Basically this means
    +      // we can't trust the summary files if users require a merged 
schema, and must touch all part-
    +      // files to do the merge.
    +      if (shouldMergeSchemas) {
    +        dataStatuses.toSeq
    +      } else {
    +        commonMetadataStatuses.headOption
    +          .orElse(metadataStatuses.headOption)
    +          // Summary file(s) not found, falls back to the first part-file.
    +          .orElse(dataStatuses.headOption)
    +          .toSeq
    +      }
     
    -    val partitionKeySet = partitionKeys.toSet
    -    val rawPredicate =
    -      predicates
    -        .filter(_.references.map(_.name).toSet.subsetOf(partitionKeySet))
    -        .reduceOption(And)
    -        .getOrElse(Literal(true))
    +    ParquetRelation2.readSchema(filesToTouch.map(footers.apply), 
sqlContext)
    +  }
     
    -    // Translate the predicate so that it reads from the information 
derived from the
    -    // folder structure
    -    val castedPredicate = rawPredicate transform {
    -      case a: AttributeReference =>
    -        val idx = partitionKeys.indexWhere(a.name == _)
    -        BoundReference(idx, IntegerType, nullable = true)
    -    }
    +  private def isSummaryFile(file: Path): Boolean = {
    +    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
    +      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
    +  }
     
    -    val inputData = new GenericMutableRow(partitionKeys.size)
    -    val pruningCondition = InterpretedPredicate(castedPredicate)
    +  // TODO Should calculate per scan size
    +  // It's common that a query only scans a fraction of a large Parquet 
file.  Returning size of the
    +  // whole Parquet file disables some optimizations in this case (e.g. 
broadcast join).
    +  override val sizeInBytes = dataStatuses.map(_.getLen).sum
     
    -    val selectedPartitions =
    -      if (partitionKeys.nonEmpty && predicates.nonEmpty) {
    -        partitions.filter { part =>
    -          inputData(0) = part.partitionValues.values.head
    -          pruningCondition(inputData)
    -        }
    -      } else {
    -        partitions
    +  // This is mostly a hack so that we can use the existing parquet filter 
code.
    --- End diff --
    
    We should stop using Catalyst expressions and remove `CatalystScan` after 
lifting partition discovery and partition pruning to the data source API level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to