maropu commented on a change in pull request #32049:
URL: https://github.com/apache/spark/pull/32049#discussion_r637728851
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
##########
@@ -37,7 +37,8 @@ class SparkOptimizer(
override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
// TODO: move SchemaPruning into catalyst
- SchemaPruning :: V2ScanRelationPushDown :: V2Writes :: PruneFileSourcePartitions :: Nil
+ SchemaPruning :: V2ScanRelationPushDown :: V2Writes ::
+ PruneFileSourcePartitions :: Nil
Review comment:
nit: unnecessary change?
##########
File path:
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.Aggregation;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to
+ * push down aggregates to the data source.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface SupportsPushDownAggregates extends ScanBuilder {
+
+ /**
+ * Pushes down Aggregation to datasource.
+ * The Aggregation can be pushed down only if all the Aggregate Functions can
+ * be pushed down.
+ */
+ void pushAggregation(Aggregation aggregation);
+
+ /**
+ * Returns the aggregation that are pushed to the data source via
+ * {@link #pushAggregation(Aggregation aggregation)}.
+ */
+ Aggregation pushedAggregation();
+
+ /**
+ * Returns the schema of the pushed down aggregates
+ */
+ StructType getPushDownAggSchema();
+
+ /**
+ * Indicate if the data source only supports global aggregated push down
+ */
+ boolean supportsGlobalAggregatePushDownOnly();
Review comment:
Do we need the two flags, `supportsGlobalAggregatePushDownOnly` and
`supportsPushDownAggregateWithFilter`? It seems okay to just push down
aggregates as much as possible in `SupportsPushDownAggregates.pushAggregation`.
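A minimal sketch of that alternative, assuming the two capability flags are dropped: `pushAggregation` itself reports whether the data source accepted the aggregation, so Spark can keep its own `Aggregate` node whenever the source declines. This is written as a Scala trait for brevity (the PR's interface is Java), and the `Option` return of `pushedAggregation` is part of the hypothetical change, not the PR's API.
```scala
import org.apache.spark.sql.sources.Aggregation

// Hypothetical variant of SupportsPushDownAggregates: no capability flags,
// the source accepts or rejects the aggregation in pushAggregation itself.
trait SupportsPushDownAggregatesSketch {
  /**
   * Tries to push the aggregation down. Returns true if the source can fully
   * evaluate it; returns false so Spark keeps its own Aggregate node.
   */
  def pushAggregation(aggregation: Aggregation): Boolean

  /** The aggregation accepted by the last successful pushAggregation call, if any. */
  def pushedAggregation(): Option[Aggregation]
}
```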
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,292 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to calculate the partial Aggregates (Max/Min/Count) result using the
statistics information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def aggResultToSparkInternalRows(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): InternalRow = {
+ val mutableRow = new SpecificInternalRow(dataSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ parquetTypes.zipWithIndex.map {
Review comment:
`.map` -> `.foreach`
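For context on the nit, a tiny self-contained illustration (not the PR's code, `buf` and the values are made up): when the body only mutates state and the produced collection is discarded, `.foreach` states the intent and avoids allocating an unused result.
```scala
// The body only performs side effects, so the collection `.map` would build
// is never used; `.foreach` makes that explicit.
val values = Seq(1, 2, 3)
val buf = new Array[Int](values.length)
values.zipWithIndex.foreach { case (v, i) => buf(i) = v * 2 }
```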
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -17,19 +17,130 @@
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.read.{Scan, V1Scan}
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates, V1Scan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.{AggregateFunc, Aggregation}
import org.apache.spark.sql.types.StructType
-object V2ScanRelationPushDown extends Rule[LogicalPlan] {
+object V2ScanRelationPushDown extends Rule[LogicalPlan] with AliasHelper {
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+ case aggNode@Aggregate(groupingExpressions, resultExpressions, child) =>
Review comment:
nit: `aggNode@Aggregate` => `aggNode @ Aggregate`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -673,6 +677,25 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}
+ protected[sql] def translateAggregate(
+ aggregates: AggregateExpression,
+ pushableColumn: PushableColumnBase): Option[Seq[AggregateFunc]] = {
+ aggregates.aggregateFunction match {
+ case min@aggregate.Min(pushableColumn(name)) =>
Review comment:
Does a nested column case work fine? It seems there is no test for
nested column cases, though.
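If it is unclear whether nested columns are handled, a quick check along these lines might help. This is a hypothetical sketch, not one of the PR's tests: the path and column names are made up, it assumes a `spark` session (e.g. in `spark-shell`), and the exact marker to look for in the plan depends on how the PR reports pushed aggregates.
```scala
// Write a Parquet table with a nested struct column, then see whether
// max() over the nested field still returns correct results and whether
// the scan node reports the aggregate as pushed down.
import org.apache.spark.sql.functions.max

spark.range(10)
  .selectExpr("named_struct('x', id) AS s")
  .write.mode("overwrite").parquet("/tmp/nested_agg_pushdown")

val df = spark.read.parquet("/tmp/nested_agg_pushdown").agg(max("s.x"))
df.explain()   // inspect whether a pushed-down max(s.x) shows up in the plan
assert(df.head.getLong(0) == 9L)
```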
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,292 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to calculate the partial Aggregates (Max/Min/Count) result using the
statistics information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def aggResultToSparkInternalRows(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): InternalRow = {
+ val mutableRow = new SpecificInternalRow(dataSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+ mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+ mutableRow.update(i,
dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Integer].toLong,
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Long], d.precision,
d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT64")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT96, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case TimestampType =>
+ val int96RebaseFunc =
DataSourceUtils.creteTimestampRebaseFuncInRead(
+ int96RebaseMode, "Parquet INT96")
+ val julianMicros =
+
ParquetRowConverter.binaryToSQLTimestamp(values(i).asInstanceOf[Binary])
+ val gregorianMicros = int96RebaseFunc(julianMicros)
+ val adjTime =
+ convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _,
ZoneOffset.UTC))
+ .getOrElse(gregorianMicros)
+ mutableRow.setLong(i, adjTime)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT96")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ mutableRow.setDouble(i, values(i).asInstanceOf[Double])
Review comment:
Is the part above related to this PR (to support Min/Max/Count)? If not,
how about removing the unnecessary parts to simplify the PR?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -673,6 +677,25 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}
+ protected[sql] def translateAggregate(
+ aggregates: AggregateExpression,
+ pushableColumn: PushableColumnBase): Option[Seq[AggregateFunc]] = {
+ aggregates.aggregateFunction match {
+ case min@aggregate.Min(pushableColumn(name)) =>
Review comment:
nit: `min@aggregate` => `min @ aggregate`
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
##########
@@ -673,6 +677,25 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}
+ protected[sql] def translateAggregate(
+ aggregates: AggregateExpression,
+ pushableColumn: PushableColumnBase): Option[Seq[AggregateFunc]] = {
+ aggregates.aggregateFunction match {
+ case min@aggregate.Min(pushableColumn(name)) =>
+ Some(Seq(Min(name, min.dataType)))
+ case max@aggregate.Max(pushableColumn(name)) =>
Review comment:
ditto
##########
File path:
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownAggregates.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.Aggregation;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to
+ * push down aggregates to the data source.
+ *
+ * @since 3.2.0
+ */
+@Evolving
+public interface SupportsPushDownAggregates extends ScanBuilder {
+
+ /**
+ * Pushes down Aggregation to datasource.
+ * The Aggregation can be pushed down only if all the Aggregate Functions can
+ * be pushed down.
+ */
+ void pushAggregation(Aggregation aggregation);
+
+ /**
+ * Returns the aggregation that are pushed to the data source via
+ * {@link #pushAggregation(Aggregation aggregation)}.
+ */
+ Aggregation pushedAggregation();
+
+ /**
+ * Returns the schema of the pushed down aggregates
+ */
+ StructType getPushDownAggSchema();
Review comment:
Does this schema depend on the data source implementation?
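One way to read the question: if the result schema follows mechanically from the pushed `Aggregation` (min/max keep the column's type, count is a long), Spark could derive it itself rather than ask every data source. A rough sketch under that assumption, reusing the PR's `sources` classes; the helper name is made up and the `Count` field arity follows the pattern quoted later in this thread.
```scala
import org.apache.spark.sql.sources.{AggregateFunc, Count, Max, Min}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: derive the pushed-aggregate schema from the aggregate
// functions themselves instead of a per-source getPushDownAggSchema.
def aggSchemaFor(funcs: Seq[AggregateFunc]): StructType =
  StructType(funcs.map {
    case Min(col, dataType) => StructField(s"min($col)", dataType)
    case Max(col, dataType) => StructField(s"max($col)", dataType)
    case Count(col, _, _)   => StructField(s"count($col)", LongType)
  })
```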
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,292 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to calculate the partial Aggregates (Max/Min/Count) result using the
statistics information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def aggResultToSparkInternalRows(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): InternalRow = {
+ val mutableRow = new SpecificInternalRow(dataSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+ mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+ mutableRow.update(i,
dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Integer].toLong,
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Long], d.precision,
d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT64")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT96, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case TimestampType =>
+ val int96RebaseFunc =
DataSourceUtils.creteTimestampRebaseFuncInRead(
+ int96RebaseMode, "Parquet INT96")
+ val julianMicros =
+
ParquetRowConverter.binaryToSQLTimestamp(values(i).asInstanceOf[Binary])
+ val gregorianMicros = int96RebaseFunc(julianMicros)
+ val adjTime =
+ convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _,
ZoneOffset.UTC))
+ .getOrElse(gregorianMicros)
+ mutableRow.setLong(i, adjTime)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT96")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+ case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+ mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+ case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case StringType =>
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
+ case BinaryType =>
+ mutableRow.update(i, bytes)
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
Binary")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
FIXED_LEN_BYTE_ARRAY")
+ }
+ case _ =>
+ throw new IllegalArgumentException("Unexpected parquet type name")
+ }
+ mutableRow
+ }
+
+ /**
+ * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the
case of
+ * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need
buildColumnarReader
+ * to read data from parquet and aggregate at spark layer. Instead we want
+ * to calculate the Aggregates (Max/Min/Count) result using the statistics
information
+ * from parquet footer file, and then construct a ColumnarBatch from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of ColumnarBatch
+ */
+ private[sql] def aggResultToSparkColumnarBatch(
Review comment:
nit: `aggResultToSparkColumnarBatch` -> `createColumnarBatchFromAggResult`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +147,328 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ private[sql] def aggResultToSparkInternalRows(
Review comment:
nit: `aggResultToSparkInternalRows` => `createInternalRowFromAggResult`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,292 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to calculate the partial Aggregates (Max/Min/Count) result using the
statistics information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def aggResultToSparkInternalRows(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): InternalRow = {
+ val mutableRow = new SpecificInternalRow(dataSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+ mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+ mutableRow.update(i,
dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Integer].toLong,
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Long], d.precision,
d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT64")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT96, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case TimestampType =>
+ val int96RebaseFunc =
DataSourceUtils.creteTimestampRebaseFuncInRead(
+ int96RebaseMode, "Parquet INT96")
+ val julianMicros =
+
ParquetRowConverter.binaryToSQLTimestamp(values(i).asInstanceOf[Binary])
+ val gregorianMicros = int96RebaseFunc(julianMicros)
+ val adjTime =
+ convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _,
ZoneOffset.UTC))
+ .getOrElse(gregorianMicros)
+ mutableRow.setLong(i, adjTime)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT96")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+ case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+ mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+ case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case StringType =>
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
+ case BinaryType =>
+ mutableRow.update(i, bytes)
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
Binary")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
FIXED_LEN_BYTE_ARRAY")
+ }
+ case _ =>
+ throw new IllegalArgumentException("Unexpected parquet type name")
+ }
+ mutableRow
+ }
+
+ /**
+ * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the
case of
+ * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need
buildColumnarReader
+ * to read data from parquet and aggregate at spark layer. Instead we want
+ * to calculate the Aggregates (Max/Min/Count) result using the statistics
information
+ * from parquet footer file, and then construct a ColumnarBatch from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of ColumnarBatch
+ */
+ private[sql] def aggResultToSparkColumnarBatch(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ offHeap: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): ColumnarBatch = {
+ val capacity = 4 * 1024
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ val columnVectors = if (offHeap) {
+ OffHeapColumnVector.allocateColumns(capacity, dataSchema)
+ } else {
+ OnHeapColumnVector.allocateColumns(capacity, dataSchema)
+ }
+
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ columnVectors(i).appendByte(values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+
columnVectors(i).appendShort(values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ columnVectors(i).appendInt(values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+
columnVectors(i).appendInt(dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
Review comment:
Can a user see this exception? If so, how about a `SparkException` or a
`RuntimeException` from `QueryExecutionErrors` instead?
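A hedged sketch of that suggestion: wrap the failure in a `SparkException` built by a central error factory. In Spark that factory would be `QueryExecutionErrors`; the object and method below are purely hypothetical and defined locally so the snippet stands alone.
```scala
import org.apache.spark.SparkException

// Hypothetical error factory mirroring the QueryExecutionErrors style;
// the method name and message are illustrative only.
object AggregatePushDownErrors {
  def unexpectedParquetTypeError(parquetType: String, catalystType: String): SparkException =
    new SparkException(
      s"Unexpected Parquet primitive type $parquetType for Catalyst type " +
        s"$catalystType while reading a pushed-down aggregate from footer statistics")
}

// At the failing match arm, something like:
//   case other => throw AggregatePushDownErrors.unexpectedParquetTypeError("INT32", other.toString)
```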
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/sources/aggregates.scala
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.types.DataType
+
+// Aggregate Functions in SQL statement.
+// e.g. SELECT COUNT(EmployeeID), Max(salary), deptID FROM dept GROUP BY deptID
+// aggregateExpressions are (COUNT(EmployeeID), Max(salary)), groupByColumns
are (deptID)
+case class Aggregation(aggregateExpressions: Seq[Seq[AggregateFunc]],
Review comment:
How about simply creating `case class Avg(Sum, Count) extends AggregateFunc` for it?
We could then change `aggregateExpressions` into `Seq[AggregateFunc]`.
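A minimal self-contained sketch of that idea; the field lists are simplified relative to the PR, and `Sum`/`Avg` are the hypothetical additions. Once `Avg` is its own node wrapping a sum and a count, `Aggregation.aggregateExpressions` can be a flat `Seq[AggregateFunc]`.
```scala
import org.apache.spark.sql.types.{DataType, DoubleType, LongType}

abstract class AggregateFunc
case class Min(column: String, dataType: DataType) extends AggregateFunc
case class Max(column: String, dataType: DataType) extends AggregateFunc
case class Count(column: String, dataType: DataType) extends AggregateFunc  // simplified field list
case class Sum(column: String, dataType: DataType) extends AggregateFunc    // hypothetical
case class Avg(sum: Sum, count: Count) extends AggregateFunc                // hypothetical

// Aggregation can then hold a flat sequence instead of Seq[Seq[AggregateFunc]].
case class Aggregation(
    aggregateExpressions: Seq[AggregateFunc],
    groupByColumns: Seq[String])

// e.g. SELECT AVG(salary), deptID FROM dept GROUP BY deptID
val pushed = Aggregation(
  Seq(Avg(Sum("salary", DoubleType), Count("salary", LongType))),
  Seq("deptID"))
```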
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
##########
@@ -80,43 +90,108 @@ case class ParquetPartitionReaderFactory(
private val datetimeRebaseModeInRead =
parquetOptions.datetimeRebaseModeInRead
private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ private def getFooter(file: PartitionedFile): ParquetMetadata = {
+ val conf = broadcastedConf.value.value
+
+ val filePath = new Path(new URI(file.filePath))
+
+ if (aggregation.aggregateExpressions.isEmpty) {
+ ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
+ } else {
+ ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
+ }
+ }
+
+ // Define isCreatedByParquetMr as function to avoid unnecessary parquet
footer reads.
+ private def isCreatedByParquetMr(file: PartitionedFile): Boolean =
+ getFooter(file).getFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+ private def convertTz(isCreatedByParquetMr: Boolean): Option[ZoneId] =
+ if (timestampConversion && !isCreatedByParquetMr) {
+ Some(DateTimeUtils
+
.getZoneId(broadcastedConf.value.value.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+ } else {
+ None
+ }
+
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
}
override def buildReader(file: PartitionedFile):
PartitionReader[InternalRow] = {
- val reader = if (enableVectorizedReader) {
- createVectorizedReader(file)
+ val fileReader = if (aggregation.aggregateExpressions.isEmpty) {
+
+ val reader = if (enableVectorizedReader) {
+ createVectorizedReader(file)
+ } else {
+ createRowBaseReader(file)
+ }
+
+ new PartitionReader[InternalRow] {
+ override def next(): Boolean = reader.nextKeyValue()
+
+ override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+
+ override def close(): Unit = reader.close()
+ }
} else {
- createRowBaseReader(file)
- }
+ new PartitionReader[InternalRow] {
+ var count = 0
- val fileReader = new PartitionReader[InternalRow] {
- override def next(): Boolean = reader.nextKeyValue()
+ override def next(): Boolean = if (count == 0) true else false
- override def get(): InternalRow =
reader.getCurrentValue.asInstanceOf[InternalRow]
+ override def get(): InternalRow = {
+ count += 1
+ val footer = getFooter(file)
+ val (parquetTypes, values) =
+ ParquetUtils.getPushedDownAggResult(footer, dataSchema,
aggregation)
+ ParquetUtils.aggResultToSparkInternalRows(footer, parquetTypes,
values, aggSchema,
+ datetimeRebaseModeInRead, int96RebaseModeInRead,
convertTz(isCreatedByParquetMr(file)))
Review comment:
Why do we need to call the two methods here to generate an internal row? Could we write it like this instead?
```
val footer = getFooter(file)
ParquetUtils.createInternalRowFromAggResult(footer, dataSchema, aggregation)
}
```
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/sources/aggregates.scala
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.types.DataType
+
+// Aggregate Functions in SQL statement.
+// e.g. SELECT COUNT(EmployeeID), Max(salary), deptID FROM dept GROUP BY deptID
+// aggregateExpressions are (COUNT(EmployeeID), Max(salary)), groupByColumns
are (deptID)
+case class Aggregation(aggregateExpressions: Seq[Seq[AggregateFunc]],
+ groupByColumns: Seq[String])
+
+abstract class AggregateFunc
+
+case class Min(column: String, dataType: DataType) extends AggregateFunc
+case class Max(column: String, dataType: DataType) extends AggregateFunc
Review comment:
But, with the current implementation, we cannot support `sum(a + b)` in the
future, can we?
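To make the concern concrete, here is a purely hypothetical, standalone shape (none of these names are in the PR) where the aggregate input is a small expression tree instead of a column name, which is what `sum(a + b)` would need:
```scala
// A column-name-based AggregateFunc cannot describe sum(a + b);
// an expression-tree-based one can.
sealed trait PushedExpr
case class ColumnRef(name: String) extends PushedExpr
case class Add(left: PushedExpr, right: PushedExpr) extends PushedExpr

sealed trait PushedAggregateFunc
case class SumExpr(child: PushedExpr) extends PushedAggregateFunc

// sum(a + b)
val sumOfAPlusB = SumExpr(Add(ColumnRef("a"), ColumnRef("b")))
```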
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -127,4 +145,292 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+
+ /**
+ * When the partial Aggregates (Max/Min/Count) are pushed down to parquet,
we don't need to
+ * createRowBaseReader to read data from parquet and aggregate at spark
layer. Instead we want
+ * to calculate the partial Aggregates (Max/Min/Count) result using the
statistics information
+ * from parquet footer file, and then construct an InternalRow from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of InternalRow
+ */
+ private[sql] def aggResultToSparkInternalRows(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): InternalRow = {
+ val mutableRow = new SpecificInternalRow(dataSchema.fields.map(x =>
x.dataType))
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ mutableRow.setByte(i, values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+ mutableRow.setShort(i, values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ mutableRow.setInt(i, values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+ mutableRow.update(i,
dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Integer].toLong,
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case d: DecimalType =>
+ val decimal = Decimal(values(i).asInstanceOf[Long], d.precision,
d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT64")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT96, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ mutableRow.setLong(i, values(i).asInstanceOf[Long])
+ case TimestampType =>
+ val int96RebaseFunc =
DataSourceUtils.creteTimestampRebaseFuncInRead(
+ int96RebaseMode, "Parquet INT96")
+ val julianMicros =
+
ParquetRowConverter.binaryToSQLTimestamp(values(i).asInstanceOf[Binary])
+ val gregorianMicros = int96RebaseFunc(julianMicros)
+ val adjTime =
+ convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _,
ZoneOffset.UTC))
+ .getOrElse(gregorianMicros)
+ mutableRow.setLong(i, adjTime)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT96")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ mutableRow.setFloat(i, values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ mutableRow.setDouble(i, values(i).asInstanceOf[Double])
+ case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+ mutableRow.setBoolean(i, values(i).asInstanceOf[Boolean])
+ case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case StringType =>
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
+ case BinaryType =>
+ mutableRow.update(i, bytes)
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
Binary")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ dataSchema.fields(i).dataType match {
+ case d: DecimalType =>
+ val decimal =
+ Decimal(new BigDecimal(new BigInteger(bytes), d.scale),
d.precision, d.scale)
+ mutableRow.setDecimal(i, decimal, d.precision)
+ case _ => throw new IllegalArgumentException("Unexpected type for
FIXED_LEN_BYTE_ARRAY")
+ }
+ case _ =>
+ throw new IllegalArgumentException("Unexpected parquet type name")
+ }
+ mutableRow
+ }
+
+ /**
+ * When the Aggregates (Max/Min/Count) are pushed down to parquet, in the
case of
+ * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need
buildColumnarReader
+ * to read data from parquet and aggregate at spark layer. Instead we want
+ * to calculate the Aggregates (Max/Min/Count) result using the statistics
information
+ * from parquet footer file, and then construct a ColumnarBatch from these
Aggregate results.
+ *
+ * @return Aggregate results in the format of ColumnarBatch
+ */
+ private[sql] def aggResultToSparkColumnarBatch(
+ footer: ParquetMetadata,
+ parquetTypes: Seq[PrimitiveType.PrimitiveTypeName],
+ values: Seq[Any],
+ dataSchema: StructType,
+ offHeap: Boolean,
+ datetimeRebaseModeInRead: String,
+ int96RebaseModeInRead: String,
+ convertTz: Option[ZoneId]): ColumnarBatch = {
+ val capacity = 4 * 1024
+ val footerFileMetaData = footer.getFileMetaData
+ val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ val columnVectors = if (offHeap) {
+ OffHeapColumnVector.allocateColumns(capacity, dataSchema)
+ } else {
+ OnHeapColumnVector.allocateColumns(capacity, dataSchema)
+ }
+
+ parquetTypes.zipWithIndex.map {
+ case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
+ dataSchema.fields(i).dataType match {
+ case ByteType =>
+ columnVectors(i).appendByte(values(i).asInstanceOf[Integer].toByte)
+ case ShortType =>
+
columnVectors(i).appendShort(values(i).asInstanceOf[Integer].toShort)
+ case IntegerType =>
+ columnVectors(i).appendInt(values(i).asInstanceOf[Integer])
+ case DateType =>
+ val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+ datetimeRebaseMode, "Parquet")
+
columnVectors(i).appendInt(dateRebaseFunc(values(i).asInstanceOf[Integer]))
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT32")
+ }
+ case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
+ columnVectors(i).appendLong(values(i).asInstanceOf[Long])
+ case (PrimitiveType.PrimitiveTypeName.INT96, i) =>
+ dataSchema.fields(i).dataType match {
+ case LongType =>
+ columnVectors(i).appendLong(values(i).asInstanceOf[Long])
+ case TimestampType =>
+ val int96RebaseFunc =
DataSourceUtils.creteTimestampRebaseFuncInRead(
+ int96RebaseMode, "Parquet INT96")
+ val julianMicros =
+
ParquetRowConverter.binaryToSQLTimestamp(values(i).asInstanceOf[Binary])
+ val gregorianMicros = int96RebaseFunc(julianMicros)
+ val adjTime =
+ convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _,
ZoneOffset.UTC))
+ .getOrElse(gregorianMicros)
+ columnVectors(i).appendLong(adjTime)
+ case _ => throw new IllegalArgumentException("Unexpected type for
INT96")
+ }
+ case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
+ columnVectors(i).appendFloat(values(i).asInstanceOf[Float])
+ case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
+ columnVectors(i).appendDouble(values(i).asInstanceOf[Double])
+ case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+ case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
+ val bytes = values(i).asInstanceOf[Binary].getBytes
+ columnVectors(i).putByteArray(0, bytes, 0, bytes.length)
+ case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
+ columnVectors(i).appendBoolean(values(i).asInstanceOf[Boolean])
+ case _ =>
+ throw new IllegalArgumentException("Unexpected parquet type name")
+ }
+ new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
+ }
+
+ /**
+ * Calculate the pushed down Aggregates (Max/Min/Count) result using the
statistics
+ * information from parquet footer file.
+ *
+ * @return A tuple of `Array[PrimitiveType.PrimitiveTypeName]` and
Array[Any].
+ * The first element is the PrimitiveTypeName of the Aggregate
column,
+ * and the second element is the aggregated value.
+ */
+ private[sql] def getPushedDownAggResult(
+ footer: ParquetMetadata,
+ dataSchema: StructType,
+ aggregation: Aggregation)
+ : (Array[PrimitiveType.PrimitiveTypeName], Array[Any]) = {
+ val footerFileMetaData = footer.getFileMetaData
+ val fields = footerFileMetaData.getSchema.getFields
+ val blocks = footer.getBlocks()
+ val typesBuilder = ArrayBuilder.make[PrimitiveType.PrimitiveTypeName]
+ val valuesBuilder = ArrayBuilder.make[Any]
+
+ for (i <- 0 until aggregation.aggregateExpressions.size) {
+ var value: Any = None
+ var rowCount = 0L
+ var isCount = false
+ var index = 0
+ blocks.forEach { block =>
+ val blockMetaData = block.getColumns()
+ aggregation.aggregateExpressions(i) match {
+ case Seq(Max(col, _)) =>
+ index = dataSchema.fieldNames.toList.indexOf(col)
+ val currentMax = getCurrentBlockMaxOrMin(footer, blockMetaData,
index, true)
+ if (currentMax != None &&
+ (value == None ||
currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0)) {
+ value = currentMax
+ }
+
+ case Seq(Min(col, _)) =>
+ index = dataSchema.fieldNames.toList.indexOf(col)
+ val currentMin = getCurrentBlockMaxOrMin(footer, blockMetaData,
index, false)
+ if (currentMin != None &&
+ (value == None ||
currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0)) {
+ value = currentMin
+ }
+
+ case Seq(Count(col, _, _)) =>
+ index = dataSchema.fieldNames.toList.indexOf(col)
+ rowCount += block.getRowCount
+ if (!col.equals("1")) { // "1" is for count(*)
+ rowCount -= getNumNulls(footer, blockMetaData, index)
+ }
+ isCount = true
+
+ case _ =>
+ }
+ }
+ if (isCount) {
+ valuesBuilder += rowCount
+ typesBuilder += PrimitiveType.PrimitiveTypeName.INT96
+ } else {
+ valuesBuilder += value
+ typesBuilder += fields.get(index).asPrimitiveType.getPrimitiveTypeName
+ }
+ }
+ (typesBuilder.result(), valuesBuilder.result())
+ }
+
+ /**
+ * get the Max or Min value for ith column in the current block
+ *
+ * @return the Max or Min value
+ */
+ private def getCurrentBlockMaxOrMin(
+ footer: ParquetMetadata,
+ columnChunkMetaData: util.List[ColumnChunkMetaData],
+ i: Int,
+ isMax: Boolean): Any = {
+ val parquetType = footer.getFileMetaData.getSchema.getType(i)
+ if (!parquetType.isPrimitive) {
+ throw new IllegalArgumentException("Unsupported type : " +
parquetType.toString)
+ }
+ val statistics = columnChunkMetaData.get(i).getStatistics()
+ if (!statistics.hasNonNullValue) {
+ throw new UnsupportedOperationException("No min/max found for parquet
file, Set SQLConf" +
+ " PARQUET_AGGREGATE_PUSHDOWN_ENABLED to false and execute again")
Review comment:
Can we avoid this exception at runtime? It looks inconvenient. For
example, could we give up pushing down aggregates in
`SupportsPushDownAggregates.pushAggregation` when we hit this unsupported case?
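A rough sketch of that fallback, under the assumption that the footer is available when deciding whether to push down: check that every row group has usable min/max statistics for the referenced column and decline the pushdown otherwise, instead of throwing during execution. The helper is hypothetical, not part of the PR; the Parquet metadata calls are the same ones used in the quoted code.
```scala
import org.apache.parquet.hadoop.metadata.ParquetMetadata

// Hypothetical pre-check: true only if every row group carries non-null
// min/max statistics for the column, so a pushed-down Min/Max can be
// answered from the footer alone.
def footerStatsUsableForMinMax(footer: ParquetMetadata, columnIndex: Int): Boolean = {
  val blocks = footer.getBlocks
  (0 until blocks.size).forall { b =>
    val stats = blocks.get(b).getColumns.get(columnIndex).getStatistics
    stats != null && stats.hasNonNullValue
  }
}

// If this returns false, the ScanBuilder could simply not accept the
// aggregation in pushAggregation, and Spark would aggregate as usual.
```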