PHOENIX-4966 Implement unhandledFilters in PhoenixRelation so that Spark only evaluates filters when required


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a694638f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a694638f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a694638f

Branch: refs/heads/4.x-cdh5.15
Commit: a694638fa8b7a4c7bd1a0b3b2b8874830f7760e8
Parents: fb1e8f7
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Thu Oct 11 23:46:48 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 22:50:43 2018 +0100

----------------------------------------------------------------------
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala   | 14 +++++++-------
 .../org/apache/phoenix/spark/PhoenixRelation.scala  | 16 ++++++++++++----
 2 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
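
For context, a PrunedFilteredScan relation can tell Spark which predicates it cannot
evaluate itself by overriding BaseRelation.unhandledFilters; Spark then plans a
post-scan Filter node only for those, instead of re-checking every predicate that was
already pushed down. The sketch below is schematic only (it is not the Phoenix code,
and the split helper is hypothetical), but it shows the shape of the contract this
commit implements:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
    import org.apache.spark.sql.types.StructType

    class ExampleRelation(val sqlContext: SQLContext, val schema: StructType)
      extends BaseRelation with PrunedFilteredScan {

      // Hypothetical helper: (WHERE clause for the filters we can push, filters we cannot)
      private def split(filters: Array[Filter]): (String, Array[Filter]) = ???

      // Spark adds a post-scan Filter node only for the filters returned here
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
        split(filters)._2

      // Spark still hands every translatable filter to buildScan; push the ones we can
      override def buildScan(requiredColumns: Array[String],
                             filters: Array[Filter]): RDD[Row] = {
        val (whereClause, _) = split(filters)
        ??? // build an RDD that applies whereClause server-side
      }
    }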


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a694638f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b8e44fe..4e11acc 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -285,13 +285,13 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     // Make sure we got the right value back
     assert(res.first().getLong(0) == 1L)
 
-    /*
-      NOTE: There doesn't appear to be any way of verifying from the Spark query planner that
-      filtering is being pushed down and done server-side. However, since PhoenixRelation
-      implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE
-      predicates are being passed along to us, which we then forward it to Phoenix.
-      TODO: investigate further to find a way to verify server-side pushdown
-     */
+    val plan = res.queryExecution.sparkPlan
+    // filters should be pushed into phoenix relation
+    assert(plan.toString.contains("PushedFilters: [IsNotNull(COL1), IsNotNull(ID), " +
+      "EqualTo(COL1,test_row_1), EqualTo(ID,1)]"))
+    // spark should not run the filters again on the rows returned by Phoenix
+    assert(!plan.toString.contains("Filter (((isnotnull(COL1#8) && isnotnull(ID#7L)) " +
+      "&& (COL1#8 = test_row_1)) && (ID#7L = 1))"))
   }
 
   test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {

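The new assertions inspect the physical plan as a string. Roughly the same check can be
done by hand against a Phoenix-backed DataFrame; the table name, columns and zkUrl below
are placeholders, and the exact plan text varies with the Spark version:

    // Assumes an existing SQLContext `sqlContext`, a ZooKeeper quorum at localhost:2181,
    // and a Phoenix table TABLE1(ID BIGINT PRIMARY KEY, COL1 VARCHAR) -- all placeholders.
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .option("table", "TABLE1")
      .option("zkUrl", "localhost:2181")
      .load()
      .filter("COL1 = 'test_row_1' AND ID = 1")

    // The scan node's "PushedFilters: [...]" lists what reached PhoenixRelation;
    // a separate Filter node above the scan would mean Spark is re-evaluating the predicates.
    println(df.queryExecution.sparkPlan)
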
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a694638f/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index d2eac8c..38bf29a 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -36,11 +36,12 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
     but this prevents having to load the whole table into Spark first.
   */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val (pushedFilters, unhandledFilters) = buildFilter(filters)
     new PhoenixRDD(
       sqlContext.sparkContext,
       tableName,
       requiredColumns,
-      Some(buildFilter(filters)),
+      Some(pushedFilters),
       Some(zkUrl),
       new Configuration(),
       dateAsTimestamp
@@ -62,12 +63,13 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
 
   // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
   // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
-  private def buildFilter(filters: Array[Filter]): String = {
+  private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = {
     if (filters.isEmpty) {
-      return ""
+      return ("", Array[Filter]())
     }
 
     val filter = new StringBuilder("")
+    var unsupportedFilters = Array[Filter]()
     var i = 0
 
     filters.foreach(f => {
@@ -92,12 +94,18 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
         case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
         case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
         case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
+        case _ => unsupportedFilters = unsupportedFilters :+ f
       }
 
       i = i + 1
     })
 
-    filter.toString()
+    (filter.toString(), unsupportedFilters)
+  }
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val (pushedFilters, unhandledFilters) = buildFilter(filters)
+    unhandledFilters
   }
 
   // Helper function to escape column key to work with SQL queries

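One detail worth noting in buildFilter: appending to an immutable Array with ":+"
returns a new array, so the result has to be reassigned (as the hunk above does) or
a mutable buffer used; otherwise the unhandled filters are silently dropped and
unhandledFilters always reports an empty array. A minimal sketch of the same
accumulation with ArrayBuffer follows; the function name and the EqualTo-only
translation are illustrative, not the Phoenix code:

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.sql.sources.{EqualTo, Filter}

    def splitFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = {
      val supported = ArrayBuffer.empty[Filter]
      val unsupported = ArrayBuffer.empty[Filter]
      filters.foreach {
        case f @ EqualTo(_, _) => supported += f   // example of a filter we can translate
        case f                 => unsupported += f // everything else goes back to Spark
      }
      (supported.toArray, unsupported.toArray)
    }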