[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21143


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184601974
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
 )
 
   case filterSupport: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding translated data source
-// filters. If a predicate is not in this map, it means it cannot be pushed down.
-val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
+val translateResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
+  (DataSourceStrategy.translateFilter(p), p)
+}
 
 // Catalyst predicate expressions that cannot be converted to data source filters.
-val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+val nonConvertiblePredicates = translateResults.collect {
--- End diff --

Avoid using both `predicates` and `filters` as variable names in the same function. This just confuses the reader.


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184601574
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
 )
 
   case filterSupport: SupportsPushDownFilters =>
-// A map from original Catalyst expressions to corresponding 
translated data source
-// filters. If a predicate is not in this map, it means it cannot 
be pushed down.
-val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
-  DataSourceStrategy.translateFilter(p).map(f => p -> f)
-}.toMap
+val translateResults: Seq[(Option[Filter], Expression)] = 
filters.map { p =>
+  (DataSourceStrategy.translateFilter(p), p)
+}
 
 // Catalyst predicate expressions that cannot be converted to data 
source filters.
-val nonConvertiblePredicates = 
filters.filterNot(translatedMap.contains)
+val nonConvertiblePredicates = translateResults.collect {
+  case (None, p) => p
+}
 
-// Data source filters that cannot be pushed down. An unhandled 
filter means
-// the data source cannot guarantee the rows returned can pass the 
filter.
+// A map from translated data source filters to original catalyst 
predicate expressions.
+val translatedMap: Map[Filter, Expression] = 
translateResults.collect {
+  case (Some(f), p) => (f, p)
+}.toMap
+
+// Data source filters that need to be evaluated again after 
scanning. which means
+// the data source cannot guarantee the rows returned can pass 
these filters.
 // As a result we must return it so Spark can plan an extra filter 
operator.
-val unhandledFilters = 
filterSupport.pushFilters(translatedMap.values.toArray).toSet
-val (unhandledPredicates, pushedPredicates) = 
translatedMap.partition { case (_, f) =>
-  unhandledFilters.contains(f)
-}
+val afterScanPredicates =
+  
filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
+// The filters which are marked as pushed to this data source
+val pushedPredicates = 
filterSupport.pushedFilters().map(translatedMap)
--- End diff --

Add logInfo messages for nonConvertiblePredicates, afterScanPredicates, and pushedPredicates.
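
A minimal sketch of what such logging could look like (assuming the enclosing object mixes in Spark's `Logging` trait; the exact message wording is illustrative, and the variable names are taken from the diff above):
```
// Hypothetical log statements; adjust the wording as needed.
logInfo(s"Predicates that cannot be translated to data source filters: " +
  s"${nonConvertiblePredicates.mkString(", ")}")
logInfo(s"Predicates to be re-evaluated after the scan: " +
  s"${afterScanPredicates.mkString(", ")}")
logInfo(s"Predicates pushed down to the data source: " +
  s"${pushedPredicates.mkString(", ")}")
```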


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184600798
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
 )
--- End diff --

The indentation between lines 221 and 223 looks weird. Could you correct it as well?


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184600589
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
 
   lazy val (
--- End diff --

uh... Then, we should mention it in the comment too. For example, 
```
// afterScanFilters: predicates that need to be evaluated after the scan.
// pushedFilters: predicates that will be pushed down and only evaluated in the underlying data sources.
// Note: afterScanFilters and pushedFilters can overlap. For example, the parquet row group filter
```


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184599222
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
 
   lazy val (
--- End diff --

They can overlap (think about the parquet row group filter), so I'd remove the `only` in the above comment.


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184598059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
 
   lazy val (
--- End diff --

// afterScanFilters: predicates that need to be evaluated after the scan.
// pushedFilters: predicates that will be pushed down and only evaluated in the underlying data sources.


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-24 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r183925902
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -237,11 +239,16 @@ object DataSourceV2Relation {
 // the data source cannot guarantee the rows returned can pass the filter.
 // As a result we must return it so Spark can plan an extra filter operator.
 val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+val unhandledPredicates = translatedMap.filter { case (_, f) =>
   unhandledFilters.contains(f)
-}
-
-(nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+}.keys
+// The filters which are marked as pushed to this data source
+val pushedFilters = filterSupport.pushedFilters()
+val pushedPredicates = translatedMap.filter { case (_, f) =>
+  pushedFilters.contains(f)
+}.keys.toSeq
+
+(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
--- End diff --

Or:
```
val translatedResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
  (DataSourceStrategy.translateFilter(p), p)
}

// A map from translated data source filters back to the original Catalyst
// expressions. If a predicate is not in this map, it means it cannot be pushed down.
val translatedMap: Map[Filter, Expression] = translatedResults.collect {
  case (f, p) if f.isDefined => (f.get, p)
}.toMap

// Catalyst predicate expressions that cannot be converted to data source filters.
val nonConvertiblePredicates = translatedResults.collect {
  case (f, p) if f.isEmpty => p
}

// Data source filters that cannot be pushed down. An unhandled filter means
// the data source cannot guarantee the rows returned can pass the filter.
// As a result we must return it so Spark can plan an extra filter operator.
val unhandledPredicates =
  filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
// The filters which are marked as pushed to this data source
val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)

(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
```


---




[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters

2018-04-24 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r183924792
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -237,11 +239,16 @@ object DataSourceV2Relation {
 // the data source cannot guarantee the rows returned can pass the filter.
 // As a result we must return it so Spark can plan an extra filter operator.
 val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+val unhandledPredicates = translatedMap.filter { case (_, f) =>
   unhandledFilters.contains(f)
-}
-
-(nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+}.keys
+// The filters which are marked as pushed to this data source
+val pushedFilters = filterSupport.pushedFilters()
+val pushedPredicates = translatedMap.filter { case (_, f) =>
+  pushedFilters.contains(f)
+}.keys.toSeq
+
+(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
--- End diff --

maybe we can reverse the map and make it simpler:
```
val translatedMap: Map[Filter, Expression] = filters.flatMap { p =>
  DataSourceStrategy.translateFilter(p).map(f => f -> p)
}.toMap

// Catalyst predicate expressions that cannot be converted to data source filters.
val convertiblePredicates = translatedMap.values.toSet
val nonConvertiblePredicates = filters.filterNot(convertiblePredicates.contains)

// Data source filters that cannot be pushed down. An unhandled filter means
// the data source cannot guarantee the rows returned can pass the filter.
// As a result we must return it so Spark can plan an extra filter operator.
val unhandledPredicates =
  filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
// The filters which are marked as pushed to this data source
val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)

(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
```


---
