huaxingao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r687325426
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,260 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to get the partial Aggregates (Max/Min/Count) result using the statistics
information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def createAggInternalRowFromFooter(
+ footer: ParquetMetadata,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ aggregation: Aggregation,
+ aggSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ isCaseSensitive: Boolean): InternalRow = {
+ val (parquetTypes, values) =
+ getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation,
isCaseSensitive)
+ val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+ parquetTypes.zipWithIndex.foreach {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ aggSchema.fields(i).dataType match {
+ case ByteType =>
+ mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+ mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+ mutableRow.update(i,
dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Integer].toLong,
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new SparkException(s"Unexpected type
${aggSchema.fields(i).dataType}" +
+ " for INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ aggSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Long], d.precision,
d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new SparkException(s"Unexpected type
${aggSchema.fields(i).dataType}" +
+ " for INT64")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+ case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+ mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+ case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ aggSchema.fields(i).dataType match {
+ case StringType =>
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
+ case BinaryType =>
+ mutableRow.update(i, bytes)
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new SparkException(s"Unexpected type
${aggSchema.fields(i).dataType}" +
+ " for Binary")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ aggSchema.fields(i).dataType match {
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new SparkException(s"Unexpected type
${aggSchema.fields(i).dataType}" +
+ " for FIXED_LEN_BYTE_ARRAY")
+ }
+ case _ =>
+ throw new SparkException("Unexpected parquet type name")
+ }
+ mutableRow
+ }
+
+ /**
+ * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the
case of
+ * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need
buildColumnarReader
+ * to read data from parquet and aggregate at spark layer. Instead we want
+ * to get the Aggregates (Max/Min/Count) result using the statistics
information
+ * from parquet footer file, and then construct a ColumnarBatch from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of ColumnarBatch
+ */
+ private[sql] def createAggColumnarBatchFromFooter(
Review comment:
There seems to be a problem with `RowToColumnConverter`. I will take a look
at it first before switching to `RowToColumnConverter`.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -80,43 +84,90 @@ case class ParquetPartitionReaderFactory(
private val datetimeRebaseModeInRead =
parquetOptions.datetimeRebaseModeInRead
private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ private def getFooter(file: PartitionedFile): ParquetMetadata = {
+ val conf = broadcastedConf.value.value
+ val filePath = new Path(new URI(file.filePath))
+
+ if (aggregation.isEmpty) {
+ ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
+ } else {
+ ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
+ }
+ }
+
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
}
override def buildReader(file: PartitionedFile):
PartitionReader[InternalRow] = {
- val reader = if (enableVectorizedReader) {
- createVectorizedReader(file)
+ val fileReader = if (aggregation.isEmpty) {
+ val reader = if (enableVectorizedReader) {
+ createVectorizedReader(file)
+ } else {
+ createRowBaseReader(file)
+ }
+
+ new PartitionReader[InternalRow] {
+ override def next(): Boolean = reader.nextKeyValue()
+
+ override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+
+ override def close(): Unit = reader.close()
+ }
} else {
- createRowBaseReader(file)
- }
+ new PartitionReader[InternalRow] {
+ var count = 0
- val fileReader = new PartitionReader[InternalRow] {
- override def next(): Boolean = reader.nextKeyValue()
+ override def next(): Boolean = if (count == 0) true else false
- override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+ override def get(): InternalRow = {
+ count += 1
+ val footer = getFooter(file)
+ ParquetUtils.createAggInternalRowFromFooter(footer, dataSchema,
partitionSchema,
+ aggregation.get, readDataSchema, datetimeRebaseModeInRead,
isCaseSensitive)
+ }
- override def close(): Unit = reader.close()
+ override def close(): Unit = return
+ }
}
new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
partitionSchema, file.partitionValues)
}
override def buildColumnarReader(file: PartitionedFile):
PartitionReader[ColumnarBatch] = {
- val vectorizedReader = createVectorizedReader(file)
- vectorizedReader.enableReturningBatches()
+ val fileReader = if (aggregation.isEmpty) {
+ val vectorizedReader = createVectorizedReader(file)
+ vectorizedReader.enableReturningBatches()
+
+ new PartitionReader[ColumnarBatch] {
+ override def next(): Boolean = vectorizedReader.nextKeyValue()
- new PartitionReader[ColumnarBatch] {
- override def next(): Boolean = vectorizedReader.nextKeyValue()
+ override def get(): ColumnarBatch =
+ vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch]
- override def get(): ColumnarBatch =
- vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch]
+ override def close(): Unit = vectorizedReader.close()
+ }
+ } else {
+ new PartitionReader[ColumnarBatch] {
+ var count = 0
+
+ override def next(): Boolean = if (count == 0) true else false
- override def close(): Unit = vectorizedReader.close()
+ override def get(): ColumnarBatch = {
+ count += 1
Review comment:
Done
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -80,43 +84,90 @@ case class ParquetPartitionReaderFactory(
private val datetimeRebaseModeInRead =
parquetOptions.datetimeRebaseModeInRead
private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ private def getFooter(file: PartitionedFile): ParquetMetadata = {
+ val conf = broadcastedConf.value.value
+ val filePath = new Path(new URI(file.filePath))
+
+ if (aggregation.isEmpty) {
+ ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
+ } else {
+ ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
+ }
+ }
+
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
}
override def buildReader(file: PartitionedFile):
PartitionReader[InternalRow] = {
- val reader = if (enableVectorizedReader) {
- createVectorizedReader(file)
+ val fileReader = if (aggregation.isEmpty) {
+ val reader = if (enableVectorizedReader) {
+ createVectorizedReader(file)
+ } else {
+ createRowBaseReader(file)
+ }
+
+ new PartitionReader[InternalRow] {
+ override def next(): Boolean = reader.nextKeyValue()
+
+ override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+
+ override def close(): Unit = reader.close()
+ }
} else {
- createRowBaseReader(file)
- }
+ new PartitionReader[InternalRow] {
+ var count = 0
- val fileReader = new PartitionReader[InternalRow] {
- override def next(): Boolean = reader.nextKeyValue()
+ override def next(): Boolean = if (count == 0) true else false
- override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+ override def get(): InternalRow = {
+ count += 1
+ val footer = getFooter(file)
+ ParquetUtils.createAggInternalRowFromFooter(footer, dataSchema,
partitionSchema,
+ aggregation.get, readDataSchema, datetimeRebaseModeInRead,
isCaseSensitive)
+ }
- override def close(): Unit = reader.close()
+ override def close(): Unit = return
+ }
}
new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
partitionSchema, file.partitionValues)
}
override def buildColumnarReader(file: PartitionedFile):
PartitionReader[ColumnarBatch] = {
- val vectorizedReader = createVectorizedReader(file)
- vectorizedReader.enableReturningBatches()
+ val fileReader = if (aggregation.isEmpty) {
+ val vectorizedReader = createVectorizedReader(file)
+ vectorizedReader.enableReturningBatches()
+
+ new PartitionReader[ColumnarBatch] {
+ override def next(): Boolean = vectorizedReader.nextKeyValue()
- new PartitionReader[ColumnarBatch] {
- override def next(): Boolean = vectorizedReader.nextKeyValue()
+ override def get(): ColumnarBatch =
+ vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch]
- override def get(): ColumnarBatch =
- vectorizedReader.getCurrentValue.asInstanceOf[ColumnarBatch]
+ override def close(): Unit = vectorizedReader.close()
+ }
+ } else {
+ new PartitionReader[ColumnarBatch] {
+ var count = 0
+
+ override def next(): Boolean = if (count == 0) true else false
- override def close(): Unit = vectorizedReader.close()
+ override def get(): ColumnarBatch = {
+ count += 1
+ val footer = getFooter(file)
+ ParquetUtils.createAggColumnarBatchFromFooter(footer, dataSchema,
partitionSchema,
+ aggregation.get, readDataSchema, enableOffHeapColumnVector,
datetimeRebaseModeInRead,
+ isCaseSensitive)
+ }
+
+ override def close(): Unit = return
Review comment:
Done
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -80,8 +87,82 @@ case class ParquetScanBuilder(
// All filters that can be converted to Parquet are pushed down.
override def pushedFilters(): Array[Filter] = pushedParquetFilters
+ override def pushAggregation(aggregation: Aggregation): Boolean = {
+
+ def getStructFieldForCol(col: FieldReference): StructField = {
+ schema.fields(schema.fieldNames.toList.indexOf(col.fieldNames.head))
+ }
+
+ def isPartitionCol(col: FieldReference) = {
+ if (readPartitionSchema().fields.map(PartitioningUtils
Review comment:
Done
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -80,8 +87,82 @@ case class ParquetScanBuilder(
// All filters that can be converted to Parquet are pushed down.
override def pushedFilters(): Array[Filter] = pushedParquetFilters
+ override def pushAggregation(aggregation: Aggregation): Boolean = {
+
+ def getStructFieldForCol(col: FieldReference): StructField = {
+ schema.fields(schema.fieldNames.toList.indexOf(col.fieldNames.head))
+ }
+
+ def isPartitionCol(col: FieldReference) = {
+ if (readPartitionSchema().fields.map(PartitioningUtils
+ .getColName(_, sparkSession.sessionState.conf.caseSensitiveAnalysis))
+ .toSet.contains(col.fieldNames.head)) {
+ true
+ } else {
+ false
+ }
+ }
+
+ if (!sparkSession.sessionState.conf.parquetAggregatePushDown ||
+ // parquet footer has max/min/count for columns
+ // e.g. SELECT COUNT(col1) FROM t
+ // but footer doesn't have max/min/count for a column if max/min/count
+ // are combined with filter or group by
+ // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+ // SELECT COUNT(col1) FROM t GROUP BY col2
+ // Todo: 1. add support if groupby column is partition col
+ // 2. add support if filter col is partition col
+ aggregation.groupByColumns.nonEmpty || filters.length > 0) {
+ return false
+ }
+
+ aggregation.groupByColumns.foreach { col =>
+ if (col.fieldNames.length != 1) return false
+ finalSchema = finalSchema.add(getStructFieldForCol(col))
+ }
+
+ aggregation.aggregateExpressions.foreach {
+ case max: Max =>
Review comment:
Done
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -80,8 +87,82 @@ case class ParquetScanBuilder(
// All filters that can be converted to Parquet are pushed down.
override def pushedFilters(): Array[Filter] = pushedParquetFilters
+ override def pushAggregation(aggregation: Aggregation): Boolean = {
+
+ def getStructFieldForCol(col: FieldReference): StructField = {
+ schema.fields(schema.fieldNames.toList.indexOf(col.fieldNames.head))
+ }
+
+ def isPartitionCol(col: FieldReference) = {
+ if (readPartitionSchema().fields.map(PartitioningUtils
+ .getColName(_, sparkSession.sessionState.conf.caseSensitiveAnalysis))
+ .toSet.contains(col.fieldNames.head)) {
+ true
+ } else {
+ false
+ }
+ }
+
+ if (!sparkSession.sessionState.conf.parquetAggregatePushDown ||
+ // parquet footer has max/min/count for columns
+ // e.g. SELECT COUNT(col1) FROM t
+ // but footer doesn't have max/min/count for a column if max/min/count
+ // are combined with filter or group by
+ // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
+ // SELECT COUNT(col1) FROM t GROUP BY col2
+ // Todo: 1. add support if groupby column is partition col
+ // 2. add support if filter col is partition col
+ aggregation.groupByColumns.nonEmpty || filters.length > 0) {
+ return false
+ }
+
+ aggregation.groupByColumns.foreach { col =>
Review comment:
Right, it's empty. I'm leaving it here because I will need it later to
support group-by on partition columns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]