sunchao commented on a change in pull request #33639:
URL: https://github.com/apache/spark/pull/33639#discussion_r699537018



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)

Review comment:
       should we get batch size from config here?
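       For reference, a minimal sketch of what that could look like, assuming the capacity comes from `SQLConf` (and noting that the batch only ever holds the single aggregate row, so a capacity of 1 would also be enough):
   ```scala
   import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
   import org.apache.spark.sql.internal.SQLConf

   // Sketch: take the capacity from the vectorized reader batch size config
   // (spark.sql.parquet.columnarReaderBatchSize) instead of the hard-coded 4 * 1024.
   val capacity = SQLConf.get.parquetVectorizedReaderBatchSize
   val columnVectors = if (offHeap) {
     OffHeapColumnVector.allocateColumns(capacity, aggSchema)
   } else {
     OnHeapColumnVector.allocateColumns(capacity, aggSchema)
   }
   ```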

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)

Review comment:
       nit: we should use the current parquet type name instead of the list
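       A small sketch of what that could look like (binding the matched name so only it, rather than the whole list, ends up in the message):
   ```scala
   // Sketch: report the single unexpected type name instead of the full list
   case (typeName, _) =>
     throw new SparkException("Unexpected parquet type name: " + typeName)
   ```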

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()

Review comment:
       nit: remove empty parens

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val primitiveTypeBuilder = ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns()
+        agg match {
+          case max: Max =>
+            val colName = max.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "max(" + colName + ")"
+            val currentMax = getCurrentBlockMaxOrMin(blockMetaData, index, true)
+            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
+              value = currentMax
+            }
+          case min: Min =>
+            val colName = min.column.fieldNames.head
+            index = dataSchema.fieldNames.toList.indexOf(colName)
+            schemaName = "min(" + colName + ")"
+            val currentMin = getCurrentBlockMaxOrMin(blockMetaData, index, false)
+            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
+              value = currentMin
+            }
+          case count: Count =>
+            schemaName = "count(" + count.column.fieldNames.head + ")"
+            rowCount += block.getRowCount
+            var isPartitionCol = false
+            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
+              .toSet.contains(count.column().fieldNames.head)) {
+              isPartitionCol = true
+            }
+            isCount = true
+            if(!isPartitionCol) {
+              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
+              // Count(*) includes the null values, but Count (colName) doesn't.
+              rowCount -= getNumNulls(blockMetaData, index)
+            }
+          case _: CountStar =>
+            schemaName = "count(*)"
+            rowCount += block.getRowCount
+            isCount = true
+          case _ =>
+        }
+      }
+      if (isCount) {
+        valuesBuilder += rowCount
+        primitiveTypeBuilder += Types.required(PrimitiveTypeName.INT64).named(schemaName);
+      } else {
+        valuesBuilder += value
+        if (fields.get(index).asPrimitiveType().getPrimitiveTypeName

Review comment:
       I think `PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY` doesn't necessarily mean the logical type is decimal. We can perhaps use the following:
   ```scala
           valuesBuilder += value
           val field = fields.get(index)
           primitiveTypeBuilder += Types.required(field.asPrimitiveType().getPrimitiveTypeName)
               .as(field.getLogicalTypeAnnotation)
               .length(field.asPrimitiveType().getTypeLength)
               .named(schemaName)
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val primitiveTypeBuilder = ArrayBuilder.make[PrimitiveType]

Review comment:
       nit: ArrayBuilder -> mutable.ArrayBuilder
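       For example (sketch, just the qualified form of the same builders):
   ```scala
   import scala.collection.mutable

   import org.apache.parquet.schema.PrimitiveType

   // Same builders as in the patch, referenced through the mutable package
   val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
   val valuesBuilder = mutable.ArrayBuilder.make[Any]
   ```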

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].

Review comment:
       nit: update the comment here; it still says `Array[PrimitiveType.PrimitiveTypeName]` while the method now returns `(Array[PrimitiveType], Array[Any])`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,

Review comment:
       we may need to pass this and `int96RebaseModeInRead` to `createAggInternalRowFromFooter`
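       A sketch of what the threaded-through signature could look like (the new parameter names are assumptions, following the existing style):
   ```scala
   // Sketch: forward both rebase modes so the row converter no longer hard-codes CORRECTED
   private[sql] def createAggInternalRowFromFooter(
       footer: ParquetMetadata,
       dataSchema: StructType,
       partitionSchema: StructType,
       aggregation: Aggregation,
       aggSchema: StructType,
       datetimeRebaseModeInRead: String,
       int96RebaseModeInRead: String,
       isCaseSensitive: Boolean): InternalRow = {
     // same body as before, but the two modes are passed on to ParquetRowConverter
   }
   ```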

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,
+      isCaseSensitive: Boolean): ColumnarBatch = {
+    val row = createAggInternalRowFromFooter(
+      footer,
+      dataSchema,
+      partitionSchema,
+      aggregation,
+      aggSchema,
+      isCaseSensitive)
+    val converter = new RowToColumnConverter(aggSchema)
+    val columnVectors = if (offHeap) {
+      OffHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    } else {
+      OnHeapColumnVector.allocateColumns(4 * 1024, aggSchema)
+    }
+    converter.convert(row, columnVectors.toArray)
+    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+  }
+
+  /**
+   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
+   * information from Parquet footer file.
+   *
+   * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and Array[Any].
+   *         The first element is the PrimitiveTypeName of the aggregate column,
+   *         and the second element is the aggregated value.
+   */
+  private[sql] def getPushedDownAggResult(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      isCaseSensitive: Boolean)
+  : (Array[PrimitiveType], Array[Any]) = {
+    val footerFileMetaData = footer.getFileMetaData
+    val fields = footerFileMetaData.getSchema.getFields
+    val blocks = footer.getBlocks()
+    val primitiveTypeBuilder = ArrayBuilder.make[PrimitiveType]
+    val valuesBuilder = ArrayBuilder.make[Any]
+
+    aggregation.aggregateExpressions().foreach { agg =>
+      var value: Any = None
+      var rowCount = 0L
+      var isCount = false
+      var index = 0
+      var schemaName = ""
+      blocks.forEach { block =>
+        val blockMetaData = block.getColumns()

Review comment:
       nit: remove empty parens

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +144,209 @@ object ParquetUtils {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
+
+  /**
+   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
+   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the partial aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
+   *
+   * @return Aggregate results in the format of InternalRow
+   */
+  private[sql] def createAggInternalRowFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      isCaseSensitive: Boolean): InternalRow = {
+    val (primitiveType, values) =
+      getPushedDownAggResult(footer, dataSchema, partitionSchema, aggregation, isCaseSensitive)
+
+    val builder = Types.buildMessage()
+    primitiveType.foreach(t => builder.addField(t))
+    val parquetSchema = builder.named("root")
+
+    val schemaConverter = new ParquetToSparkSchemaConverter
+    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
+      None, LegacyBehaviorPolicy.CORRECTED, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
+    val primitiveTypeName = primitiveType.map(_.getPrimitiveTypeName)
+    primitiveTypeName.zipWithIndex.foreach {
+      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+        val v = values(i).asInstanceOf[Boolean]
+        converter.getConverter(i).asPrimitiveConverter().addBoolean(v)
+      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+        val v = values(i).asInstanceOf[Integer]
+        converter.getConverter(i).asPrimitiveConverter().addInt(v)
+      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+        val v = values(i).asInstanceOf[Long]
+        converter.getConverter(i).asPrimitiveConverter().addLong(v)
+      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+        val v = values(i).asInstanceOf[Float]
+        converter.getConverter(i).asPrimitiveConverter().addFloat(v)
+      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+        val v = values(i).asInstanceOf[Double]
+        converter.getConverter(i).asPrimitiveConverter().addDouble(v)
+      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+        val v = values(i).asInstanceOf[Binary]
+        converter.getConverter(i).asPrimitiveConverter().addBinary(v)
+      case _ =>
+        throw new SparkException("Unexpected parquet type name: " + primitiveTypeName)
+    }
+    converter.currentRecord
+  }
+
+  /**
+   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
+   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
+   * to read data from Parquet and aggregate at Spark layer. Instead we want
+   * to get the aggregates (Max/Min/Count) result using the statistics information
+   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
+   *
+   * @return Aggregate results in the format of ColumnarBatch
+   */
+  private[sql] def createAggColumnarBatchFromFooter(
+      footer: ParquetMetadata,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      aggregation: Aggregation,
+      aggSchema: StructType,
+      offHeap: Boolean,
+      datetimeRebaseModeInRead: String,

Review comment:
       Hmm, I think `ParquetRowConverter` takes these 2 as input parameters? Right now we are passing `LegacyBehaviorPolicy.CORRECTED` for both of them.
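       For example (sketch, assuming `LegacyBehaviorPolicy` is an `Enumeration` so the string configs can be mapped with `withName`, and keeping the same constructor argument order as in the patch):
   ```scala
   // Sketch: map the string configs to LegacyBehaviorPolicy values and pass them through,
   // instead of hard-coding CORRECTED for both.
   val datetimeRebaseMode = LegacyBehaviorPolicy.withName(datetimeRebaseModeInRead)
   val int96RebaseMode = LegacyBehaviorPolicy.withName(int96RebaseModeInRead)
   val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
     None, datetimeRebaseMode, int96RebaseMode, NoopUpdater)
   ```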




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


