Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/22313#discussion_r214744306
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
@@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
* known to be convertible.
*/
private[orc] object OrcFilters extends Logging {
+ case class FilterWithTypeMap(filter: Filter, typeMap: Map[String,
DataType])
+
+ private lazy val cacheExpireTimeout =
+
org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
+
+ private lazy val searchArgumentCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
+ .build(
+ new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
+ override def load(typeMapAndFilter: FilterWithTypeMap):
Option[Builder] = {
+ buildSearchArgument(
+ typeMapAndFilter.typeMap, typeMapAndFilter.filter,
SearchArgumentFactory.newBuilder())
+ }
+ })
+
+ private def getOrBuildSearchArgumentWithNewBuilder(
+ dataTypeMap: Map[String, DataType],
+ expression: Filter): Option[Builder] = {
+ // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled.
+ if (cacheExpireTimeout > 0) {
+ searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap))
+ } else {
+ buildSearchArgument(dataTypeMap, expression,
SearchArgumentFactory.newBuilder())
+ }
+ }
+
def createFilter(schema: StructType, filters: Array[Filter]):
Option[SearchArgument] = {
val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
// First, tries to convert each filter individually to see whether
it's convertible, and then
// collect all convertible ones to build the final `SearchArgument`.
val convertibleFilters = for {
filter <- filters
- _ <- buildSearchArgument(dataTypeMap, filter,
SearchArgumentFactory.newBuilder())
+ _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter)
} yield filter
for {
// Combines all convertible filters using `And` to produce a single
conjunction
- conjunction <- convertibleFilters.reduceOption(And)
+ conjunction <- convertibleFilters.reduceOption { (x, y) =>
+ val newFilter = org.apache.spark.sql.sources.And(x, y)
+ if (cacheExpireTimeout > 0) {
+ // Build in a bottom-up manner
+ getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter)
+ }
--- End diff --
Final conjunction? All sub-function results will be cached in the end.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]