Repository: spark
Updated Branches:
  refs/heads/master e6e9031d7 -> 21fcac164


[SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate pushdown

## What changes were proposed in this pull request?

Add a JDBC option `pushDownPredicate` (default `true`) to allow or disallow
predicate push-down in the JDBC data source.
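
As a rough usage sketch (not part of this patch; the URL, table name, and the active SparkSession `spark` below are hypothetical placeholders), the option is passed like any other JDBC read option:

```scala
// Sketch: disable predicate push-down for a single JDBC read.
// The "url" and "dbtable" values are made-up examples.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/sales")
  .option("dbtable", "public.orders")
  .option("pushDownPredicate", "false")  // new option; defaults to "true"
  .load()
  .filter("order_id = 1")  // with the option off, this filter is evaluated by Spark, not by the database
```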

## How was this patch tested?

Add a test in `JDBCSuite`
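
The test relies on plan-inspection helpers (`checkPushdown`/`checkNotPushdown`). As a standalone sketch of the same idea (assuming only a `DataFrame` built as above; this snippet is illustrative, not part of the patch), disabling push-down should leave a Spark-side `FilterExec` above the JDBC scan:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{FilterExec, WholeStageCodegenExec}

// Mirrors JDBCSuite's checkNotPushdown assertion: when push-down is disabled,
// the filter is not handed to the JDBC source, so the whole-stage-codegen
// node should still contain a FilterExec.
def assertFilterStaysInSpark(df: DataFrame): Unit = {
  val plan = df.queryExecution.executedPlan
  assert(plan.isInstanceOf[WholeStageCodegenExec])
  assert(plan.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec])
}
```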

Author: maryannxue <maryann...@apache.org>

Closes #21875 from maryannxue/spark-24288.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21fcac16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21fcac16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21fcac16

Branch: refs/heads/master
Commit: 21fcac1645bf01c453ddd4cb64c566895e66ea4f
Parents: e6e9031
Author: maryannxue <maryann...@apache.org>
Authored: Thu Jul 26 23:47:32 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Jul 26 23:47:32 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  7 +++
 .../datasources/jdbc/JDBCOptions.scala          |  4 ++
 .../datasources/jdbc/JDBCRelation.scala         |  6 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 66 ++++++++++++++------
 4 files changed, 63 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index e815e5b..4b013c6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1435,6 +1435,13 @@ the following case-insensitive options:
      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
+
+  <tr>
+    <td><code>pushDownPredicate</code></td>
+    <td>
+     The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
+    </td>
+  </tr>
 </table>
 
 <div class="codetabs">

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 574aed4..d80efce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -183,6 +183,9 @@ class JDBCOptions(
     }
   // An option to execute custom SQL before fetching data from the remote DB
   val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
+
+  // An option to allow/disallow pushing down predicate into JDBC data source
+  val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
 }
 
 class JdbcOptionsInWrite(
@@ -234,4 +237,5 @@ object JDBCOptions {
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
+  val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 97e2d25..4f78f59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -172,7 +172,11 @@ private[sql] case class JDBCRelation(
 
   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    if (jdbcOptions.pushDownPredicate) {
+      filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    } else {
+      filters
+    }
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 09facb9..0edbd3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -261,21 +261,32 @@ class JDBCSuite extends QueryTest
       s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but 
got:`$jdbcRelations`")
   }
 
+  private def checkPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is removed in a physical plan and
+    // the plan only has PhysicalRDD to scan JDBCRelation.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+    assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+    df
+  }
+
+  private def checkNotPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+    // cannot compile given predicates.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    df
+  }
+
   test("SELECT *") {
     assert(sql("SELECT * FROM foobar").collect().size === 3)
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    def checkPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is removed in a physical plan and
-      // the plan only has PhysicalRDD to scan JDBCRelation.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-      assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
-      df
-    }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
@@ -308,15 +319,6 @@ class JDBCSuite extends QueryTest
       "WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
     assert(df2.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
 
-    def checkNotPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
-      // cannot compile given predicates.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
-      df
-    }
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }
@@ -1375,4 +1377,30 @@ class JDBCSuite extends QueryTest
       Row("fred", 1) :: Nil)
 
   }
+
+  test("SPARK-24288: Enable preventing predicate pushdown") {
+    val table = "test.people"
+
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbTable", table)
+      .option("pushDownPredicate", false)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    checkAnswer(
+      checkNotPushdown(df),
+      Row("fred", 1) :: Nil)
+
+    // pushDownPredicate option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW predicateOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$urlWithUserAndPass', dbTable '$table', pushDownPredicate 'false')
+       """.stripMargin.replaceAll("\n", " "))
+    checkAnswer(
+      checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
+      Row("fred", 1) :: Nil)
+  }
 }

