huaxingao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r683981070



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
##########
@@ -40,7 +42,8 @@ import org.apache.spark.util.Utils
 /**
  * A test suite that tests various Parquet queries.
  */
-abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession {
+abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession
+  with ExplainSuiteHelper{

Review comment:
       Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        aggSchema.fields(i).dataType match {
+          case LongType =>
+            mutableRow.setLong(i, values(i).asInstanceOf[Long])
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Long], d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT64")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case StringType =>
+            mutableRow.update(i, UTF8String.fromBytes(bytes))
+          case BinaryType =>
+            mutableRow.update(i, bytes)
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for Binary")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for FIXED_LEN_BYTE_ARRAY")
+        }
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    mutableRow
+  }
+
+  /**
+   * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct a ColumnarBatch from these Aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createColumnarBatchFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val capacity = 4 * 1024
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(capacity, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(capacity, aggSchema)
+    }
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            columnVectors(i).appendByte(values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            columnVectors(i).appendShort(values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            columnVectors(i).appendInt(values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            columnVectors(i).appendInt(dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        columnVectors(i).appendLong(values(i).asInstanceOf[Long])
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        columnVectors(i).appendFloat(values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        columnVectors(i).appendDouble(values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        columnVectors(i).appendBoolean(values(i).asInstanceOf[Boolean])
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down Aggregates (Max/Min/Count) result using the statistics
+   * information from parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the Aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType.PrimitiveTypeName], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val typesBuilder = ArrayBuilder.make[PrimitiveType.PrimitiveTypeName]
+    val valuesBuilder = ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns()
+        agg match {
+          case max: Max =>
+            index = dataSchema.fieldNames.toList.indexOf(max.column.fieldNames.head)
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (currentMax != None &&
+              (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0)) {
+              value = currentMax
+            }
+          case min: Min =>
+            index = dataSchema.fieldNames.toList.indexOf(min.column.fieldNames.head)
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (currentMin != None &&
+              (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0)) {
+              value = currentMin
+            }
+          case count: Count =>
+            rowCount += block.getRowCount
+            var isPartitionCol = false;
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        typesBuilder += PrimitiveType.PrimitiveTypeName.INT64
+      } else {
+        valuesBuilder += value
+        typesBuilder += fields.get(index).asPrimitiveType.getPrimitiveTypeName
+      }
+    }
+    (typesBuilder.result(), valuesBuilder.result())
+  }
+
+  /**
+   * get the Max or Min value for ith column in the current block
+   *
+   * @return the Max or Min value
+   */
+  private def getCurrentBlockMaxOrMin(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int,
+      isMax: Boolean): Any = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()
+    if (!statistics.hasNonNullValue) {
+      throw new UnsupportedOperationException("No min/max found for parquet file, Set SQLConf" +
+        " PARQUET_AGGREGATE_PUSHDOWN_ENABLED to false and execute again")
+    } else {
+      if (isMax) statistics.genericGetMax() else statistics.genericGetMin()
+    }
+  }
+
+  private def getNumNulls(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int): Long = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()
+    if (!statistics.isNumNullsSet()) {

Review comment:
       I am not sure. During my test, this only happened for partition col. I will take a look at the parquet code to see if I can find more info there.
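
       For reference, here is a minimal standalone sketch (not part of this PR; the file path and object name are placeholders) that dumps isNumNullsSet/getNumNulls per column chunk through the parquet-mr footer API. It is a quick way to check for which columns the writer actually recorded a null count:

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object InspectFooterNullCounts {
  def main(args: Array[String]): Unit = {
    // Placeholder path; point it at a parquet file produced by the case in question.
    val file = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration())
    val reader = ParquetFileReader.open(file)
    try {
      reader.getFooter.getBlocks.asScala.foreach { block =>
        block.getColumns.asScala.foreach { chunk =>
          val stats = chunk.getStatistics
          // getNumNulls is only meaningful when isNumNullsSet returns true.
          val nulls = if (stats.isNumNullsSet) stats.getNumNulls.toString else "not set"
          println(s"${chunk.getPath}: numNulls=$nulls")
        }
      }
    } finally {
      reader.close()
    }
  }
}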

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        aggSchema.fields(i).dataType match {
+          case LongType =>
+            mutableRow.setLong(i, values(i).asInstanceOf[Long])
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Long], d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT64")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case StringType =>
+            mutableRow.update(i, UTF8String.fromBytes(bytes))
+          case BinaryType =>
+            mutableRow.update(i, bytes)
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for Binary")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for FIXED_LEN_BYTE_ARRAY")
+        }
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    mutableRow
+  }
+
+  /**
+   * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct a ColumnarBatch from these Aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createColumnarBatchFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val capacity = 4 * 1024
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(capacity, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(capacity, aggSchema)
+    }
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            columnVectors(i).appendByte(values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            columnVectors(i).appendShort(values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            columnVectors(i).appendInt(values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            columnVectors(i).appendInt(dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        columnVectors(i).appendLong(values(i).asInstanceOf[Long])
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        columnVectors(i).appendFloat(values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        columnVectors(i).appendDouble(values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        columnVectors(i).appendBoolean(values(i).asInstanceOf[Boolean])
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down Aggregates (Max/Min/Count) result using the statistics
+   * information from parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the Aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType.PrimitiveTypeName], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val typesBuilder = ArrayBuilder.make[PrimitiveType.PrimitiveTypeName]
+    val valuesBuilder = ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns()
+        agg match {
+          case max: Max =>
+            index = dataSchema.fieldNames.toList.indexOf(max.column.fieldNames.head)
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (currentMax != None &&
+              (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0)) {
+              value = currentMax
+            }
+          case min: Min =>
+            index = dataSchema.fieldNames.toList.indexOf(min.column.fieldNames.head)
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (currentMin != None &&
+              (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0)) {
+              value = currentMin
+            }
+          case count: Count =>
+            rowCount += block.getRowCount
+            var isPartitionCol = false;
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        typesBuilder += PrimitiveType.PrimitiveTypeName.INT64
+      } else {
+        valuesBuilder += value
+        typesBuilder += fields.get(index).asPrimitiveType.getPrimitiveTypeName
+      }
+    }
+    (typesBuilder.result(), valuesBuilder.result())
+  }
+
+  /**
+   * get the Max or Min value for ith column in the current block
+   *
+   * @return the Max or Min value
+   */
+  private def getCurrentBlockMaxOrMin(
+      columnChunkMetaData: util.List[ColumnChunkMetaData],
+      i: Int,
+      isMax: Boolean): Any = {
+    val statistics = columnChunkMetaData.get(i).getStatistics()
+    if (!statistics.hasNonNullValue) {
+      throw new UnsupportedOperationException("No min/max found for parquet file, Set SQLConf" +
+        " PARQUET_AGGREGATE_PUSHDOWN_ENABLED to false and execute again")

Review comment:
       Fixed. Thanks!

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -870,6 +870,12 @@ object SQLConf {
       .checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
       .createWithDefault(10)
 
+  val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
+    .doc("Enables Parquet aggregate push-down optimization when set to true.")

Review comment:
       Done
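
       A hedged usage sketch for the new flag (path and column names are placeholders, and whether the aggregates actually get pushed also depends on the query shape and on the scan going through the v2 Parquet path):

import org.apache.spark.sql.SparkSession

object ParquetAggPushdownDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.parquet.aggregatePushdown", "true")
      .getOrCreate()

    // MAX/MIN/COUNT with no filter and no GROUP BY is the shape this PR targets;
    // explain() is just a quick way to see whether the aggregates were pushed.
    spark.read.parquet("/tmp/events")
      .selectExpr("max(value)", "min(value)", "count(value)", "count(*)")
      .explain()

    spark.stop()
  }
}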

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(

Review comment:
       Fixed. Thanks!

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        aggSchema.fields(i).dataType match {
+          case LongType =>
+            mutableRow.setLong(i, values(i).asInstanceOf[Long])
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Long], d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT64")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case StringType =>
+            mutableRow.update(i, UTF8String.fromBytes(bytes))
+          case BinaryType =>
+            mutableRow.update(i, bytes)
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for Binary")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for FIXED_LEN_BYTE_ARRAY")
+        }
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    mutableRow
+  }
+
+  /**
+   * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct a ColumnarBatch from these Aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createColumnarBatchFromAggResult(

Review comment:
       Fixed.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT32")

Review comment:
       Fixed. Thanks!

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,255 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial Aggregates (Max/Min/Count) are pushed down to parquet, we don't need to
+   * createRowBaseReader to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the partial Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct an InternalRow from these Aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createInternalRowFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val mutableRow = new SpecificInternalRow(aggSchema.fields.map(x => x.dataType))
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            mutableRow.update(i, dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Integer].toLong, d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        aggSchema.fields(i).dataType match {
+          case LongType =>
+            mutableRow.setLong(i, values(i).asInstanceOf[Long])
+          case d: DecimalType =>
+            val decimal = Decimal(values(i).asInstanceOf[Long], d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for INT64")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case StringType =>
+            mutableRow.update(i, UTF8String.fromBytes(bytes))
+          case BinaryType =>
+            mutableRow.update(i, bytes)
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for Binary")
+        }
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        aggSchema.fields(i).dataType match {
+          case d: DecimalType =>
+            val decimal =
+              Decimal(new BigDecimal(new BigInteger(bytes), d.scale), d.precision, d.scale)
+            mutableRow.setDecimal(i, decimal, d.precision)
+          case _ => throw new SparkException("Unexpected type for FIXED_LEN_BYTE_ARRAY")
+        }
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    mutableRow
+  }
+
+  /**
+   * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from parquet and aggregate at spark layer. Instead we want
+   * to get the Aggregates (Max/Min/Count) result using the statistics information
+   * from parquet footer file, and then construct a ColumnarBatch from these Aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createColumnarBatchFromAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val (parquetTypes, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+    val capacity = 4 * 1024
+    val footerFileMetaData = footer.getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(capacity, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(capacity, aggSchema)
+    }
+
+    parquetTypes.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        aggSchema.fields(i).dataType match {
+          case ByteType =>
+            columnVectors(i).appendByte(values(i).asInstanceOf[Integer].toByte)
+          case ShortType =>
+            columnVectors(i).appendShort(values(i).asInstanceOf[Integer].toShort)
+          case IntegerType =>
+            columnVectors(i).appendInt(values(i).asInstanceOf[Integer])
+          case DateType =>
+            val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+              datetimeRebaseMode, "Parquet")
+            columnVectors(i).appendInt(dateRebaseFunc(values(i).asInstanceOf[Integer]))
+          case _ => throw new SparkException("Unexpected type for INT32")
+        }
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        columnVectors(i).appendLong(values(i).asInstanceOf[Long])
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        columnVectors(i).appendFloat(values(i).asInstanceOf[Float])
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        columnVectors(i).appendDouble(values(i).asInstanceOf[Double])
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val bytes = values(i).asInstanceOf[Binary].getBytes
+        columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        columnVectors(i).appendBoolean(values(i).asInstanceOf[Boolean])
+      case _ =>
+        throw new SparkException("Unexpected parquet type name")
+    }
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down Aggregates (Max/Min/Count) result using the statistics
+   * information from parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the Aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType.PrimitiveTypeName], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val typesBuilder = ArrayBuilder.make[PrimitiveType.PrimitiveTypeName]
+    val valuesBuilder = ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns()
+        agg match {
+          case max: Max =>
+            index = dataSchema.fieldNames.toList.indexOf(max.column.fieldNames.head)
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (currentMax != None &&
+              (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0)) {
+              value = currentMax
+            }
+          case min: Min =>
+            index = dataSchema.fieldNames.toList.indexOf(min.column.fieldNames.head)
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (currentMin != None &&
+              (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0)) {
+              value = currentMin
+            }
+          case count: Count =>
+            rowCount += block.getRowCount
+            var isPartitionCol = false;
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        typesBuilder += PrimitiveType.PrimitiveTypeName.INT64
+      } else {
+        valuesBuilder += value
+        typesBuilder += fields.get(index).asPrimitiveType.getPrimitiveTypeName
+      }
+    }
+    (typesBuilder.result(), valuesBuilder.result())
+  }
+
+  /**
+   * get the Max or Min value for ith column in the current block

Review comment:
       Done

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
##########
@@ -43,10 +44,14 @@ case class ParquetScan(
     readPartitionSchema: StructType,
     pushedFilters: Array[Filter],
     options: CaseInsensitiveStringMap,
+    pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
   override def isSplitable(path: Path): Boolean = true
 
+  override def readSchema(): StructType =
+    if (pushedAggregate.nonEmpty) readDataSchema else super.readSchema()

Review comment:
       yes
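
       In other words (column names here are hypothetical), with max(value) and count(*) pushed down, readSchema() is just the aggregate result schema produced by the scan builder rather than the file schema plus partition columns, roughly:

import org.apache.spark.sql.types._

object PushedAggSchemaShape {
  // COUNT results come back as INT64/LongType per getPushedDownAggResult, while MAX
  // keeps the column's own type (IntegerType for an INT32 column in this sketch).
  val aggSchema: StructType = StructType(Seq(
    StructField("max(value)", IntegerType),
    StructField("count(*)", LongType)))
}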

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
##########
@@ -80,8 +87,74 @@ case class ParquetScanBuilder(
   // All filters that can be converted to Parquet are pushed down.
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
+  override def pushAggregation(aggregation: Aggregation): Boolean = {
+
+    def getStructFieldForCol(col: FieldReference): StructField = {
+      schema.fields(schema.fieldNames.toList.indexOf(col.fieldNames.head))
+    }
+
+    def isPartitionCol(col: FieldReference) = {
+      if (readPartitionSchema().fields.map(PartitioningUtils
+        .getColName(_, sparkSession.sessionState.conf.caseSensitiveAnalysis))
+        .toSet.contains(col.fieldNames.head)) {
+        true
+      } else {
+        false
+      }
+    }
+
+    if (!sparkSession.sessionState.conf.parquetAggregatePushDown ||
+      aggregation.groupByColumns.nonEmpty || filters.length > 0) {
+      return false
+    }

Review comment:
       Done
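
       For completeness, a hedged sketch of what the guard above means in practice (path and column names are placeholders): only whole-table MAX/MIN/COUNT with no pushed data filter and no GROUP BY is eligible; everything else falls back to the normal scan plus partial aggregation.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, max}

object PushAggregationShapes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.parquet.aggregatePushdown", "true")
      .getOrCreate()
    val df = spark.read.parquet("/tmp/events")

    // Eligible shape: plain aggregates over the whole table.
    df.agg(max("value"), count("value")).explain()

    // Not eligible per the quoted guard: a data filter is present.
    df.filter("value > 0").agg(max("value")).explain()

    // Not eligible either: GROUP BY columns are present.
    df.groupBy("category").agg(max("value")).explain()

    spark.stop()
  }
}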




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
