Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21696#discussion_r199980993
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
---
@@ -19,166 +19,186 @@ package
org.apache.spark.sql.execution.datasources.parquet
import java.sql.Date
+import scala.collection.JavaConverters._
+
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema._
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* Some utility functions to convert Spark data source filters to Parquet
filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean,
pushDownStartWith: Boolean) {
+ case class ParquetSchemaType(
+ originalType: OriginalType,
+ primitiveTypeName: PrimitiveType.PrimitiveTypeName,
+ decimalMetadata: DecimalMetadata)
+
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}
- private val makeEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case BooleanType =>
+ private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ // BooleanType
+ case ParquetSchemaType(null, BOOLEAN, null) =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n),
v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ // IntegerType
+ case ParquetSchemaType(null, INT32, null) =>
(n: String, v: Any) => FilterApi.eq(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ // LongType
+ case ParquetSchemaType(null, INT64, null) =>
(n: String, v: Any) => FilterApi.eq(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ // FloatType
+ case ParquetSchemaType(null, FLOAT, null) =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ // DoubleType
+ case ParquetSchemaType(null, DOUBLE, null) =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+ // StringType
// Binary.fromString and Binary.fromByteArray don't accept null values
- case StringType =>
+ case ParquetSchemaType(UTF8, BINARY, null) =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s =>
Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ // BinaryType
+ case ParquetSchemaType(null, BINARY, null) =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ // DateType
+ case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeNotEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case BooleanType =>
+ private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any)
=> FilterPredicate] = {
+ case ParquetSchemaType(null, BOOLEAN, null) =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n),
v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ case ParquetSchemaType(null, INT32, null) =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetSchemaType(null, INT64, null) =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetSchemaType(null, FLOAT, null) =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetSchemaType(null, DOUBLE, null) =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetSchemaType(UTF8, BINARY, null) =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s =>
Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ case ParquetSchemaType(null, BINARY, null) =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeLt: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case IntegerType =>
+ private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ case ParquetSchemaType(null, INT32, null) =>
(n: String, v: Any) => FilterApi.lt(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetSchemaType(null, INT64, null) =>
(n: String, v: Any) => FilterApi.lt(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetSchemaType(null, FLOAT, null) =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetSchemaType(null, DOUBLE, null) =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
- (n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
- (n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
+ case ParquetSchemaType(UTF8, BINARY, null) =>
+ (n: String, v: Any) => FilterApi.lt(
+ binaryColumn(n),
+ Option(v).map(s =>
Binary.fromString(s.asInstanceOf[String])).orNull)
--- End diff --
Why did this introduce `Option` to handle `null` passed as `v`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]