This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 81a7f8f184c [SPARK-45449][SQL] Cache Invalidation Issue with JDBC Table 81a7f8f184c is described below commit 81a7f8f184cd597208fcad72130354288a0c9f79 Author: liangyongyuan <liangyongy...@xiaomi.com> AuthorDate: Tue Oct 10 14:40:33 2023 +0800 [SPARK-45449][SQL] Cache Invalidation Issue with JDBC Table ### What changes were proposed in this pull request? Add an equals method to `JDBCOptions` that considers two instances equal if their `JDBCOptions.parameters` are the same. ### Why are the changes needed? We have identified a cache invalidation issue when caching JDBC tables in Spark SQL. The cached table is unexpectedly invalidated when queried, leading to a re-read from the JDBC table instead of retrieving data from the cache. Example SQL: ``` CACHE TABLE cache_t SELECT * FROM mysql.test.test1; SELECT * FROM cache_t; ``` Expected Behavior: The expectation is that querying the cached table (cache_t) should retrieve the result from the cache without re-evaluating the execution plan. Actual Behavior: However, the cache is invalidated, and the content is re-read from the JDBC table. Root Cause: The issue lies in the `CachedData` class, where the comparison involves `JDBCTable`. The `JDBCTable` is a case class: `case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)` The comparison of non-case class components, such as `jdbcOptions`, involves reference comparison. This leads to unnecessary cache invalidation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #43258 from lyy-pineapple/spark-git-cache. 
Authored-by: liangyongyuan <liangyongy...@xiaomi.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit d073f2d3e2f67a4b612e020a583e23dc1fa63aab) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 8 ++++++++ .../datasources/v2/jdbc/JDBCTableCatalogSuite.scala | 15 +++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 268a65b81ff..57651684070 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -239,6 +239,14 @@ class JDBCOptions( .get(JDBC_PREFER_TIMESTAMP_NTZ) .map(_.toBoolean) .getOrElse(SQLConf.get.timestampType == TimestampNTZType) + + override def hashCode: Int = this.parameters.hashCode() + + override def equals(other: Any): Boolean = other match { + case otherOption: JDBCOptions => + otherOption.parameters.equals(this.parameters) + case _ => false + } } class JdbcOptionsInWrite( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 6b85911dca7..eed64b873c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException import 
org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -512,4 +513,18 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { assert(t.schema === replaced) } } + + test("SPARK-45449: Cache Invalidation Issue with JDBC Table") { + withTable("h2.test.cache_t") { + withConnection { conn => + conn.prepareStatement( + """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT NULL, + |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate() + } + sql("INSERT OVERWRITE h2.test.cache_t SELECT 1 AS id, 'a' AS name") + sql("CACHE TABLE t1 SELECT id, name FROM h2.test.cache_t") + val plan = sql("select * from t1").queryExecution.sparkPlan + assert(plan.isInstanceOf[InMemoryTableScanExec]) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org