Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21696#discussion_r201069421
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
---
@@ -19,187 +19,200 @@ package
org.apache.spark.sql.execution.datasources.parquet
import java.sql.Date
+import scala.collection.JavaConverters.asScalaBufferConverter
+
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType,
OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* Some utility function to convert Spark data source filters to Parquet
filters.
*/
private[parquet] class ParquetFilters(pushDownDate: Boolean,
pushDownStartWith: Boolean) {
+ private case class ParquetSchemaType(
+ originalType: OriginalType,
+ primitiveTypeName: PrimitiveTypeName,
+ decimalMetadata: DecimalMetadata)
+
+ private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+ private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+ private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+ private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+ private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+ private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+ private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+ private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
private def dateToDays(date: Date): SQLDate = {
DateTimeUtils.fromJavaDate(date)
}
- private val makeEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case BooleanType =>
+ private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n),
v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.eq(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.eq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
// Binary.fromString and Binary.fromByteArray don't accept null values
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s =>
Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeNotEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case BooleanType =>
+ private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any)
=> FilterPredicate] = {
+ case ParquetBooleanType =>
(n: String, v: Any) => FilterApi.notEq(booleanColumn(n),
v.asInstanceOf[java.lang.Boolean])
- case IntegerType =>
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.notEq(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.notEq(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.notEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s =>
Binary.fromString(s.asInstanceOf[String])).orNull)
- case BinaryType =>
+ case ParquetBinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- case DateType if pushDownDate =>
+ case ParquetDateType if pushDownDate =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
}
- private val makeLt: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case IntegerType =>
+ private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ case ParquetIntegerType =>
(n: String, v: Any) => FilterApi.lt(intColumn(n),
v.asInstanceOf[Integer])
- case LongType =>
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.lt(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.lt(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.lt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.lt(
- intColumn(n),
- Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.lt(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeLtEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.ltEq(intColumn(n),
v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any)
=> FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.ltEq(intColumn(n),
v.asInstanceOf[Integer])
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.ltEq(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.ltEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.ltEq(
- intColumn(n),
- Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.ltEq(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeGt: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gt(intColumn(n),
v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) =>
FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.gt(intColumn(n),
v.asInstanceOf[Integer])
+ case ParquetLongType =>
(n: String, v: Any) => FilterApi.gt(longColumn(n),
v.asInstanceOf[java.lang.Long])
- case FloatType =>
+ case ParquetFloatType =>
(n: String, v: Any) => FilterApi.gt(floatColumn(n),
v.asInstanceOf[java.lang.Float])
- case DoubleType =>
+ case ParquetDoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
- case StringType =>
+ case ParquetStringType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
- case BinaryType =>
+ FilterApi.gt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
+ case ParquetBinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
- case DateType if pushDownDate =>
- (n: String, v: Any) => FilterApi.gt(
- intColumn(n),
- Option(v).map(date =>
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+ case ParquetDateType if pushDownDate =>
+ (n: String, v: Any) =>
+ FilterApi.gt(intColumn(n),
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
}
- private val makeGtEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
- case IntegerType =>
- (n: String, v: Any) => FilterApi.gtEq(intColumn(n),
v.asInstanceOf[java.lang.Integer])
- case LongType =>
+ private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any)
=> FilterPredicate] = {
+ case ParquetIntegerType =>
+ (n: String, v: Any) => FilterApi.gtEq(intColumn(n),
v.asInstanceOf[Integer])
--- End diff --
We have a problem here. We map Spark byte/short to Parquet int, so the `v`
here can be `java.lang.Byte` and we may have a cast exception. We should do
`v.asInstanceOf[java.lang.Number].intValue`. @wangyum can you fix it and add a
test? Thanks!
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]