Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21623#discussion_r199078944
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
---
@@ -270,6 +273,36 @@ private[parquet] class ParquetFilters(pushDownDate:
Boolean) {
case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)
+ case sources.StringStartsWith(name, prefix) if pushDownStartWith &&
canMakeFilterOn(name) =>
+ Option(prefix).map { v =>
+ FilterApi.userDefined(binaryColumn(name),
+ new UserDefinedPredicate[Binary] with Serializable {
+ private val strToBinary =
Binary.fromReusedByteArray(v.getBytes)
+ private val size = strToBinary.length
+
+ override def canDrop(statistics: Statistics[Binary]):
Boolean = {
+ val comparator =
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+ val max = statistics.getMax
+ val min = statistics.getMin
+ comparator.compare(max.slice(0, math.min(size,
max.length)), strToBinary) < 0 ||
+ comparator.compare(min.slice(0, math.min(size,
min.length)), strToBinary) > 0
+ }
+
+ override def inverseCanDrop(statistics: Statistics[Binary]):
Boolean = {
+ val comparator =
PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
+ val max = statistics.getMax
+ val min = statistics.getMin
+ comparator.compare(max.slice(0, math.min(size,
max.length)), strToBinary) == 0 &&
+ comparator.compare(min.slice(0, math.min(size,
min.length)), strToBinary) == 0
+ }
+
+ override def keep(value: Binary): Boolean = {
+ UTF8String.fromBytes(value.getBytes).startsWith(UTF8String.fromString(v))
--- End diff --
Should `UTF8String.fromString(v)` be replaced with `UTF8String.fromBytes(strToBinary.getBytes)` here?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]