dongjoon-hyun commented on a change in pull request #24327: [SPARK-27418][SQL] Migrate Parquet to File Data Source V2 URL: https://github.com/apache/spark/pull/24327#discussion_r289261872
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.parquet + +import java.net.URI +import java.util.TimeZone + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader} + +import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator} +import 
org.apache.spark.sql.execution.datasources.parquet._ +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader} +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + +/** + * A factory used to create Parquet readers. + * + * @param sqlConf SQL configuration. + * @param broadcastedConf Broadcast serializable Hadoop Configuration. + * @param dataSchema Schema of Parquet files. + * @param readDataSchema Required schema of Parquet files. + * @param partitionSchema Schema of partitions. + * @param filters Filters of the batch scan. + */ +case class ParquetPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + filters: Array[Filter]) extends FilePartitionReaderFactory with Logging { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields) + private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled + private val enableVectorizedReader: Boolean = sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + private val capacity = sqlConf.parquetVectorizedReaderBatchSize + private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + private val pushDownDate = sqlConf.parquetFilterPushDownDate + private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + private val pushDownDecimal = 
sqlConf.parquetFilterPushDownDecimal + private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + + override def supportColumnarReads(partition: InputPartition): Boolean = { + sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled && + resultSchema.length <= sqlConf.wholeStageMaxNumFields && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + } + + override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { + val reader = if (enableVectorizedReader) { + createVectorizedReader(file) + } else { + createRowBaseReader(file) + } + + val fileReader = new PartitionReader[InternalRow] { + override def next(): Boolean = reader.nextKeyValue() + + override def get(): InternalRow = reader.getCurrentValue.asInstanceOf[InternalRow] + + override def close(): Unit = reader.close() + } + + new PartitionReaderWithPartitionValues(fileReader, readDataSchema, + partitionSchema, file.partitionValues) + } + + override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { + val vectorizedReader = createVectorizedReader(file) + vectorizedReader.enableReturningBatches() + + new PartitionReader[ColumnarBatch] { + override def next(): Boolean = vectorizedReader.nextKeyValue() + + override def get(): ColumnarBatch = + vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch] + + override def close(): Unit = vectorizedReader.close() + } + } + + private def buildReaderBase[T]( + file: PartitionedFile, + buildReaderFunc: ( + ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], + Option[TimeZone]) => RecordReader[Void, T]): RecordReader[Void, T] = { + val conf = broadcastedConf.value.value + + val filePath = new Path(new URI(file.filePath)) + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + file.start, + file.start + file.length, + file.length, + 
Array.empty, + null) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp, + pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) Review comment: `ParquetScanBuilder` already did most of this work. Here, I guess we only need `.reduceOption(FilterApi.and)`. Please correct me if I'm wrong. Also, cc @cloud-fan ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
