This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e8bc176e6fd1 [SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP 
WITH TIME ZONE
e8bc176e6fd1 is described below

commit e8bc176e6fd145bab4cde6bf38931a7ad4c7eecd
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Mar 12 07:33:24 2024 -0700

    [SPARK-47342][SQL] Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE
    
    ### What changes were proposed in this pull request?
    
    This PR supports TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE when the
`preferTimestampNTZ` option is set to true by users.
    
    ### Why are the changes needed?
    
    improve DB2 connector
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `preferTimestampNTZ` now works for DB2 TIMESTAMP WITH TIME ZONE.
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45471 from yaooqinn/SPARK-47342.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala    | 14 ++++++++++++++
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala   | 10 ++++++++--
 .../main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala  |  2 +-
 .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala     |  7 +++++++
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala  | 13 +++++--------
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala   |  9 +++++++--
 6 files changed, 42 insertions(+), 13 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index cedb33d491fb..14776047cec4 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
+import java.time.LocalDateTime
 import java.util.Properties
 
 import org.scalatest.time.SpanSugar._
@@ -224,4 +225,17 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationSuite {
 
     assert(actual === expected)
   }
+
+  test("SPARK-47342:gi Support TimestampNTZ for DB2 TIMESTAMP WITH TIME ZONE") 
{
+    // The test only covers TIMESTAMP WITHOUT TIME ZONE so far. We should also cover
+    // TIMESTAMP WITH TIME ZONE, but I have not figured out how to mock a TSTZ value.
+    withDefaultTimeZone(UTC) {
+      val df = spark.read.format("jdbc")
+        .option("url", jdbcUrl)
+        .option("preferTimestampNTZ", "true")
+        .option("query", "select ts from dates")
+        .load()
+      checkAnswer(df, Row(LocalDateTime.of(2009, 2, 13, 23, 31, 30)))
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index a7bbb832a839..27c032471b57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -212,8 +212,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case java.sql.Types.SQLXML => StringType
     case java.sql.Types.STRUCT => StringType
     case java.sql.Types.TIME => TimestampType
-    case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType
-    case java.sql.Types.TIMESTAMP => TimestampType
+    case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ)
     case java.sql.Types.TINYINT => IntegerType
     case java.sql.Types.VARBINARY => BinaryType
     case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType
@@ -229,6 +228,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
       throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName)
   }
 
+  /**
+   * Return TimestampNTZType if isTimestampNTZ is true; otherwise TimestampType.
+   */
+  def getTimestampType(isTimestampNTZ: Boolean): DataType = {
+    if (isTimestampNTZ) TimestampNTZType else TimestampType
+  }
+
   /**
    * Returns the schema if the table already exists in the JDBC database.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 62c31b1c4c5d..ff3e74eae205 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -91,7 +91,7 @@ private object DB2Dialect extends JdbcDialect {
       typeName match {
         case "DECFLOAT" => Option(DecimalType(38, 18))
         case "XML" => Option(StringType)
-        case t if (t.startsWith("TIMESTAMP")) => Option(TimestampType) // 
TIMESTAMP WITH TIMEZONE
+        case t if t.startsWith("TIMESTAMP") => 
Option(getTimestampType(md.build()))
         case _ => None
       }
     case _ => None
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 6621282647d4..6d67a0d91eae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -744,6 +744,13 @@ abstract class JdbcDialect extends Serializable with 
Logging {
   def getFullyQualifiedQuotedTableName(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
   }
+
+  /**
+   * Return TimestampType/TimestampNTZType based on the metadata.
+   */
+  protected final def getTimestampType(md: Metadata): DataType = {
+    JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ"))
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d19b5ba3e0eb..6852f1f69984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -64,18 +64,15 @@ private object PostgresDialect extends JdbcDialect with 
SQLConfHelper {
     } else if ("text".equalsIgnoreCase(typeName)) {
       Some(StringType) // sqlType is  Types.VARCHAR
     } else if (sqlType == Types.ARRAY) {
-      val scale = md.build().getLong("scale").toInt
-      val isTimestampNTZ = md.build().getBoolean("isTimestampNTZ")
       // postgres array type names start with underscore
-      toCatalystType(typeName.drop(1), size, scale, 
isTimestampNTZ).map(ArrayType(_))
+      toCatalystType(typeName.drop(1), size, md.build()).map(ArrayType(_))
     } else None
   }
 
   private def toCatalystType(
       typeName: String,
       precision: Int,
-      scale: Int,
-      isTimestampNTZ: Boolean): Option[DataType] = typeName match {
+      metadata: Metadata): Option[DataType] = typeName match {
     case "bool" => Some(BooleanType)
     case "bit" => Some(BinaryType)
     case "int2" => Some(ShortType)
@@ -91,10 +88,10 @@ private object PostgresDialect extends JdbcDialect with 
SQLConfHelper {
          "interval" | "pg_snapshot" =>
       Some(StringType)
     case "bytea" => Some(BinaryType)
-    case "timestamp" | "timestamptz" | "time" | "timetz" =>
-      Some(if (isTimestampNTZ) TimestampNTZType else TimestampType)
+    case "timestamp" | "timestamptz" | "time" | "timetz" => 
Some(getTimestampType(metadata))
     case "date" => Some(DateType)
-    case "numeric" | "decimal" if precision > 0 => 
Some(DecimalType.bounded(precision, scale))
+    case "numeric" | "decimal" if precision > 0 =>
+      Some(DecimalType.bounded(precision, metadata.getLong("scale").toInt))
     case "numeric" | "decimal" =>
       // SPARK-26538: handle numeric without explicit precision and scale.
       Some(DecimalType.SYSTEM_DEFAULT)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b8ca70e0b175..47d60abd1dd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -903,6 +903,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
   test("DB2Dialect type mapping") {
     val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
+    val metadata = new MetadataBuilder().putBoolean("isTimestampNTZ", false)
+
     
assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == 
"CLOB")
     
assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == 
"CHAR(1)")
     assert(db2Dialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get 
== "SMALLINT")
@@ -912,8 +914,11 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "DECFLOAT", 1, 
null) ==
       Option(DecimalType(38, 18)))
     assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "XML", 1, null) == 
Option(StringType))
-    assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "TIMESTAMP WITH 
TIME ZONE", 1, null) ==
-      Option(TimestampType))
+    assert(db2Dialect.getCatalystType(
+      java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) === 
Option(TimestampType))
+    metadata.putBoolean("isTimestampNTZ", true)
+    assert(db2Dialect.getCatalystType(
+      java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, metadata) === 
Option(TimestampNTZType))
   }
 
   test("MySQLDialect catalyst type mapping") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to