huaxingao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r699846813
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
  }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need
+   * createRowBaseReader to read data from Parquet and aggregate at the Spark layer. Instead we
+   * get the partial aggregate (Max/Min/Count) results using the statistics information from the
+   * Parquet footer, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet and
+   * PARQUET_VECTORIZED_READER_ENABLED is set to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at the Spark layer. Instead we get the
+   * aggregate (Max/Min/Count) results using the statistics information from the Parquet
+   * footer, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
Review comment:
       Added datetimeRebaseMode; we don't need int96RebaseMode or convertTz because INT96 is not pushed down.
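
       To illustrate the footer-statistics idea described in the doc comments above, here is a minimal, illustrative sketch. It is not the PR's getPushedDownAggResult; it assumes a single INT64 column with the hypothetical name "c" and uses only parquet-mr footer metadata (per-row-group row counts and column statistics):

           // Illustrative sketch only; the hypothetical column name "c" is an assumption.
           import scala.collection.JavaConverters._

           import org.apache.parquet.column.statistics.LongStatistics
           import org.apache.parquet.hadoop.metadata.ParquetMetadata

           def countAndMaxFromFooter(footer: ParquetMetadata): (Long, Option[Long]) = {
             val blocks = footer.getBlocks.asScala
             // COUNT(*) is simply the sum of the per-row-group row counts in the footer.
             val rowCount = blocks.map(_.getRowCount).sum
             // MAX("c") can be answered from the footer only if every row group has
             // statistics for the column; otherwise the data has to be read as usual.
             val statsPerGroup = blocks.map { block =>
               block.getColumns.asScala.find(_.getPath.toDotString == "c").map(_.getStatistics)
             }
             val max = if (statsPerGroup.forall(_.exists(s => s != null && !s.isEmpty))) {
               Some(statsPerGroup.flatten.map(_.asInstanceOf[LongStatistics].getMax).max)
             } else {
               None
             }
             (rowCount, max)
           }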
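
       And a rough sketch of why datetimeRebaseMode still matters here: when a DATE min/max comes straight from the footer statistics of a file written with the legacy (Julian) calendar, the stored day count has to be rebased to the Proleptic Gregorian calendar that Spark 3.x uses internally, whereas INT96 timestamps never reach this code path, so int96RebaseMode/convertTz are not needed. The helper name and its boolean flag below are hypothetical; only RebaseDateTime is Spark's actual utility:

           // Illustrative only; the real code derives the decision from datetimeRebaseModeInRead
           // and the file metadata rather than a boolean flag.
           import org.apache.spark.sql.catalyst.util.RebaseDateTime

           def dateStatToSparkDays(daysFromStats: Int, legacyCalendar: Boolean): Int = {
             if (legacyCalendar) {
               // Rebase a Julian-calendar day count to a Proleptic Gregorian day count.
               RebaseDateTime.rebaseJulianToGregorianDays(daysFromStats)
             } else {
               daysFromStats
             }
           }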
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]