[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r201190701

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.sql.Date

+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {

+  private case class ParquetSchemaType(
+      originalType: OriginalType,
+      primitiveTypeName: PrimitiveTypeName,
+      decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
   }

-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

     // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    case ParquetBinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    case ParquetDateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }

-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) =>
```
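The core of the refactor is that the `ParquetSchemaType` constants above are matched against the *file's* schema rather than Spark's `DataType`s. A minimal sketch of how such a lookup could be derived from a Parquet `MessageType`, assuming it lives inside `ParquetFilters` and that every top-level field is primitive (`nameToParquetType` is an illustrative name, not necessarily what the PR merged):

```scala
import scala.collection.JavaConverters.asScalaBufferConverter
import org.apache.parquet.schema.MessageType

// Sketch only: map each top-level field of the file schema to the
// ParquetSchemaType key that makeEq/makeNotEq pattern-match on.
// Assumes primitive top-level fields; asPrimitiveType() would fail
// for nested group types.
private def nameToParquetType(schema: MessageType): Map[String, ParquetSchemaType] =
  schema.getFields.asScala.map { field =>
    field.getName -> ParquetSchemaType(
      field.getOriginalType,                        // e.g. UTF8, DATE, or null
      field.asPrimitiveType().getPrimitiveTypeName, // e.g. INT32, BINARY
      field.asPrimitiveType().getDecimalMetadata)   // non-null only for decimals
  }.toMap
```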
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r201069421

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

(quotes the same `@@ -19,187 +19,200 @@` hunk shown above)
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21696
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r200011264

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

(quotes the same `@@ -19,187 +19,200 @@` hunk shown above)
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r19002

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
@@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.sql.Date

+import scala.collection.JavaConverters._
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {

+  case class ParquetSchemaType(
+      originalType: OriginalType,
+      primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+      decimalMetadata: DecimalMetadata)
+
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
   }

-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    // BooleanType
+    case ParquetSchemaType(null, BOOLEAN, null) =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    // IntegerType
+    case ParquetSchemaType(null, INT32, null) =>
       (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    // LongType
+    case ParquetSchemaType(null, INT64, null) =>
       (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    // FloatType
+    case ParquetSchemaType(null, FLOAT, null) =>
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    // DoubleType
+    case ParquetSchemaType(null, DOUBLE, null) =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

+    // StringType
     // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
+    case ParquetSchemaType(UTF8, BINARY, null) =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    // BinaryType
+    case ParquetSchemaType(null, BINARY, null) =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    // DateType
+    case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }

-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetSchemaType(null, BOOLEAN, null) =>
       (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetSchemaType(null, INT32, null) =>
       (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetSchemaType(null, INT64, null) =>
       (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetSchemaType(null, FLOAT, null) =>
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetSchemaType(null, DOUBLE, null) =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
-    case StringType =>
+    case ParquetSchemaType(UTF8, BINARY, null) =>
       (n: String, v: Any) => FilterApi.notEq(
```
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r11214

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

(quotes the same `@@ -19,166 +19,186 @@` hunk shown above)
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r11109

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
-    case IntegerType =>
+    // IntegerType
+    case ParquetSchemaType(null, INT32, null) =>
```

--- End diff --

The safest way is to look at both the file's type and Spark's type, and deal with any type mismatch. We can do that later, since it's an existing problem. Currently Spark tries its best to guarantee that the types match (except for missing/extra columns). The only case I can think of that may break this assumption is when the Parquet files have conflicting schemas and Spark reads them using a user-specified schema (so that schema inference can be skipped) that doesn't match all of the Parquet files.
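As a hedged, minimal reproduction of the conflicting-schema case described above (all paths and values are illustrative, and a running `SparkSession` named `spark` is assumed):

```scala
// Two batches of Parquet files in one directory, with conflicting
// physical types for column `a`:
spark.range(10).selectExpr("CAST(id AS INT) AS a").write.parquet("/tmp/t")    // a: INT32
spark.range(10).selectExpr("CAST(id AS LONG) AS a")
  .write.mode("append").parquet("/tmp/t")                                     // a: INT64

// A user-specified schema skips inference, so it can match at most one of
// the two physical types; building filters from each file's footer lets
// pushdown agree with what is actually stored in that file.
import org.apache.spark.sql.types.{LongType, StructField, StructType}
val df = spark.read.schema(StructType(Seq(StructField("a", LongType)))).parquet("/tmp/t")
df.filter("a = 5").show()
```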
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980993

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

(quotes the same `@@ -19,166 +19,186 @@` hunk shown above)
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980897

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    // BooleanType
```

--- End diff --

The other partial functions don't have these comments. Is that on purpose? Maybe these should be constants instead, to make the code more readable and consistent?
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199980632

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
-    case IntegerType =>
+    // IntegerType
+    case ParquetSchemaType(null, INT32, null) =>
```

--- End diff --

Before, it was a valid assumption that the value's type matched the `DataType`. Now that this is the file's type, that might not be the case. For example, byte and short are stored as INT32. This should cast to `Number` and then convert to the file's type.

I would also do this for INT64 columns, in case the schema has evolved and a column that was INT32 is now INT64. The converters (used to materialize records) don't currently support this, but it would be reasonable for them to support it eventually.
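A minimal sketch of the widening suggested above, assuming the `makeEq` structure from the diff (this is illustrative, not the code the PR actually merged):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Sketch: cast the pushed value to java.lang.Number first, then convert it to
// the file's physical type, so byte/short values (stored as INT32) and columns
// that have since been widened still produce a predicate of the right width.
val makeIntEq: (String, Any) => FilterPredicate = (n: String, v: Any) =>
  FilterApi.eq(
    intColumn(n),
    Option(v).map(num => Integer.valueOf(num.asInstanceOf[Number].intValue)).orNull)
```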
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199979463

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

```diff
@@ -379,14 +366,29 @@ class ParquetFileFormat
         null)

       val sharedConf = broadcastedHadoopConf.value.value
+
+      val fileMetaData =
+        ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData
```

--- End diff --

We should *always* read the footer and adjust the filters to match. In our version we use the following to handle situations where the column names have changed case or where Hive has returned a different case:

```scala
// Try to push down filters when filter push-down is enabled.
val pushed = if (pushdownEnabled) {
  // read the file schema to create Parquet filters that match case
  val fileReader = ParquetFileReader.open(conf, fileSplit.getPath)
  val fileSchema = try {
    new ParquetToSparkSchemaConverter(conf).convert(
      ParquetReadSupport.clipParquetSchema(
        fileReader.getFileMetaData.getSchema, requiredSchema))
  } finally {
    fileReader.close()
  }

  filters
    // Collects all converted Parquet filter predicates. Notice that not all predicates can
    // be converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
    // `flatMap` is used here.
    .flatMap(ParquetFilters.createFilter(fileSchema, _))
    .reduceOption(FilterApi.and)
} else {
  None
}
```

That's pretty much the same thing as here. In addition, if columns can be renamed within a Parquet file then you need to push filters for the names used in the file's schema. I don't think that's a problem here because I don't know of a way to rename columns in a Spark Parquet table. (Iceberg uses field IDs to do this.)
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199686314

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

```diff
+      val fileMetaData =
+        ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData
```

--- End diff --

Yes, I think so. It should be avoided. `isCreatedByParquetMr` was intentionally a function in order to avoid it by short-circuiting.
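A hedged sketch of one way to get that short-circuiting, assuming the surrounding reader context from the diff (`enableParquetFilterPushDown`, `parquetFilters`, and `filters` are illustrative names, not necessarily what the PR uses):

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader

// Make the footer read lazy so it only happens on code paths that need it;
// when pushdown is disabled, the footer is never touched here.
lazy val footerFileMetaData =
  ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData

val pushed = if (enableParquetFilterPushDown) {
  filters
    .flatMap(parquetFilters.createFilter(footerFileMetaData.getSchema, _))
    .reduceOption(FilterApi.and)
} else {
  None
}
```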
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199678338

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

```diff
+      val fileMetaData =
+        ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS).getFileMetaData
```

--- End diff --

Will we read the footer again in the parquet reader?
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21696#discussion_r199672805

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---

```diff
-    case BooleanType =>
+    // BooleanType
+    case ParquetSchemaType(null, BOOLEAN, null) =>
```

--- End diff --

Type-mapping reference: https://github.com/apache/spark/blob/21a7bfd5c324e6c82152229f1394f26afeae771c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L338-L560
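For quick reference, a distilled view of the mappings from that converter that matter for the cases in this diff (decimal and timestamp variants omitted; this is a summary, not code from the PR):

```scala
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.OriginalType.{DATE, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

// Spark type name -> (OriginalType annotation, Parquet physical type).
val sparkToParquet: Map[String, (OriginalType, PrimitiveTypeName)] = Map(
  "BooleanType" -> (null, BOOLEAN),
  "IntegerType" -> (null, INT32),
  "LongType"    -> (null, INT64),
  "FloatType"   -> (null, FLOAT),
  "DoubleType"  -> (null, DOUBLE),
  "StringType"  -> (UTF8, BINARY),
  "BinaryType"  -> (null, BINARY),
  "DateType"    -> (DATE, INT32))
```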
[GitHub] spark pull request #21696: [SPARK-24716][SQL] Refactor ParquetFilters
GitHub user wangyum opened a pull request:

https://github.com/apache/spark/pull/21696

[SPARK-24716][SQL] Refactor ParquetFilters

## What changes were proposed in this pull request?

Replace the DataFrame schema with the Parquet file schema when creating `ParquetFilters`. More details will be added later.

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangyum/spark SPARK-24716

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21696.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21696

commit 10d408fd3fe429d5529755b5abebac86b22b6d55
Author: Yuming Wang
Date: 2018-07-02T09:36:46Z

    Refactor ParquetFilters
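In practical terms, the change makes filter creation take the file's `MessageType` (read from the footer) instead of the Spark `StructType`. A hedged sketch of the call-site difference, where `parquetFilters`, `footerFileMetaData`, and `filters` are illustrative names drawn from the discussion above:

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.schema.MessageType

// Before: predicates were built against the DataFrame (Spark) schema:
//   filters.flatMap(parquetFilters.createFilter(dataSchema, _))

// After: predicates are built against each file's own Parquet schema, so
// pushdown agrees with what is physically stored in that file.
val fileSchema: MessageType = footerFileMetaData.getSchema
val pushed = filters
  .flatMap(parquetFilters.createFilter(fileSchema, _))
  .reduceOption(FilterApi.and)
```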