This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 81a7f8f184c [SPARK-45449][SQL] Cache Invalidation Issue with JDBC Table
81a7f8f184c is described below

commit 81a7f8f184cd597208fcad72130354288a0c9f79
Author: liangyongyuan <liangyongy...@xiaomi.com>
AuthorDate: Tue Oct 10 14:40:33 2023 +0800

    [SPARK-45449][SQL] Cache Invalidation Issue with JDBC Table
    
    ### What changes were proposed in this pull request?
    Add an equals method to `JDBCOptions` that considers two instances equal if 
their `JDBCOptions.parameters` are the same.
    
    ### Why are the changes needed?
    We have identified a cache invalidation issue when caching JDBC tables in 
Spark SQL. The cached table is unexpectedly invalidated when queried, leading 
to a re-read from the JDBC table instead of retrieving data from the cache.
    Example SQL:
    
    ```
    CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
    SELECT * FROM cache_t;
    ```
    Expected Behavior:
    The expectation is that querying the cached table (cache_t) should retrieve 
the result from the cache without re-evaluating the execution plan.
    
    Actual Behavior:
    However, the cache is invalidated, and the content is re-read from the JDBC 
table.
    
    Root Cause:
    The issue lies in the `CachedData` class, where the comparison involves 
`JDBCTable`. The `JDBCTable` is a case class:
    
    `case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: 
JDBCOptions)`
    The comparison of non-case class components, such as `jdbcOptions`, 
involves pointer comparison. This leads to unnecessary cache invalidation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43258 from lyy-pineapple/spark-git-cache.
    
    Authored-by: liangyongyuan <liangyongy...@xiaomi.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit d073f2d3e2f67a4b612e020a583e23dc1fa63aab)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/datasources/jdbc/JDBCOptions.scala      |  8 ++++++++
 .../datasources/v2/jdbc/JDBCTableCatalogSuite.scala       | 15 +++++++++++++++
 2 files changed, 23 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 268a65b81ff..57651684070 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -239,6 +239,14 @@ class JDBCOptions(
       .get(JDBC_PREFER_TIMESTAMP_NTZ)
       .map(_.toBoolean)
       .getOrElse(SQLConf.get.timestampType == TimestampNTZType)
+
+  override def hashCode: Int = this.parameters.hashCode()
+
+  override def equals(other: Any): Boolean = other match {
+    case otherOption: JDBCOptions =>
+      otherOption.parameters.equals(this.parameters)
+    case _ => false
+  }
 }
 
 class JdbcOptionsInWrite(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 6b85911dca7..eed64b873c4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, 
Row}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -512,4 +513,18 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       assert(t.schema === replaced)
     }
   }
+
+  test("SPARK-45449: Cache Invalidation Issue with JDBC Table") {
+    withTable("h2.test.cache_t") {
+      withConnection { conn =>
+        conn.prepareStatement(
+          """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT 
NULL,
+            |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate()
+      }
+      sql("INSERT OVERWRITE h2.test.cache_t SELECT 1 AS id, 'a' AS name")
+      sql("CACHE TABLE t1 SELECT id, name FROM h2.test.cache_t")
+      val plan = sql("select * from t1").queryExecution.sparkPlan
+      assert(plan.isInstanceOf[InMemoryTableScanExec])
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to