This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 99db66afaf9 [SPARK-42561][CONNECT] Add temp view API to Dataset
99db66afaf9 is described below

commit 99db66afaf98043e4dd5c20ccea4fcde0b43946e
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Sat Feb 25 14:04:26 2023 -0400

    [SPARK-42561][CONNECT] Add temp view API to Dataset
    
    ### What changes were proposed in this pull request?
    
    Add temp view API to Dataset
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT
    
    Closes #40167 from amaliujia/add_temp_view_api.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 710e7de08723a67eac86f8f802fcfcf70ef5039c)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 94 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  6 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 11 +++
 3 files changed, 111 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 83ed7bdc071..b8481cbe158 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1906,6 +1906,100 @@ class Dataset[T] private[sql] (val sparkSession: 
SparkSession, private[sql] val
     }
   }
 
+  /**
+   * Registers this Dataset as a temporary table using the given name. The 
lifetime of this
+   * temporary table is tied to the [[SparkSession]] that was used to create 
this Dataset.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  @deprecated("Use createOrReplaceTempView(viewName) instead.", "3.4.0")
+  def registerTempTable(tableName: String): Unit = {
+    createOrReplaceTempView(tableName)
+  }
+
+  /**
+   * Creates a local temporary view using the given name. The lifetime of this 
temporary view is
+   * tied to the [[SparkSession]] that was used to create this Dataset.
+   *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of 
the session that
+   * created it, i.e. it will be automatically dropped when the session 
terminates. It's not tied
+   * to any databases, i.e. we can't use `db1.view1` to reference a local 
temporary view.
+   *
+   * @throws AnalysisException
+   *   if the view name is invalid or already exists
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]
+  def createTempView(viewName: String): Unit = {
+    buildAndExecuteTempView(viewName, replace = false, global = false)
+  }
+
+  /**
+   * Creates a local temporary view using the given name. The lifetime of this 
temporary view is
+   * tied to the [[SparkSession]] that was used to create this Dataset.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def createOrReplaceTempView(viewName: String): Unit = {
+    buildAndExecuteTempView(viewName, replace = true, global = false)
+  }
+
+  /**
+   * Creates a global temporary view using the given name. The lifetime of 
this temporary view is
+   * tied to this Spark application.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of 
the Spark
+   * application,
+   * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system
+   * preserved database `global_temp`, and we must use the qualified name to 
refer a global temp
+   * view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @throws AnalysisException
+   *   if the view name is invalid or already exists
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]
+  def createGlobalTempView(viewName: String): Unit = {
+    buildAndExecuteTempView(viewName, replace = false, global = true)
+  }
+
+  /**
+   * Creates or replaces a global temporary view using the given name. The 
lifetime of this
+   * temporary view is tied to this Spark application.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of 
the Spark
+   * application,
+   * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system
+   * preserved database `global_temp`, and we must use the qualified name to 
refer a global temp
+   * view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def createOrReplaceGlobalTempView(viewName: String): Unit = {
+    buildAndExecuteTempView(viewName, replace = true, global = true)
+  }
+
+  private def buildAndExecuteTempView(
+      viewName: String,
+      replace: Boolean,
+      global: Boolean): Unit = {
+    val command = session.newCommand { builder =>
+      builder.getCreateDataframeViewBuilder
+        .setInput(plan.getRoot)
+        .setName(viewName)
+        .setIsGlobal(global)
+        .setReplace(replace)
+    }
+    session.execute(command)
+  }
+
   /**
    * Returns a new Dataset with a column dropped. This is a no-op if schema 
doesn't contain column
    * name.
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a63c23e6bf1..53cd3955232 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -192,6 +192,12 @@ class SparkSession(
     new Dataset[T](this, plan)
   }
 
+  private[sql] def newCommand[T](f: proto.Command.Builder => Unit): 
proto.Command = {
+    val builder = proto.Command.newBuilder()
+    f(builder)
+    builder.build()
+  }
+
   private[sql] def analyze(
       plan: proto.Plan,
       mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse =
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 7077d8c94e8..23a5d5f5e9e 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -406,4 +406,15 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     val joined = left.join(right, left("id") === 
right("id")).select(left("id"), right("a"))
     assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
   }
+
+  test("test temp view") {
+    spark.range(100).createTempView("test1")
+    assert(spark.sql("SELECT * FROM test1").count() == 100)
+    spark.range(1000).createOrReplaceTempView("test1")
+    assert(spark.sql("SELECT * FROM test1").count() == 1000)
+    spark.range(100).createGlobalTempView("view1")
+    assert(spark.sql("SELECT * FROM global_temp.view1").count() == 100)
+    spark.range(1000).createOrReplaceGlobalTempView("view1")
+    assert(spark.sql("SELECT * FROM global_temp.view1").count() == 1000)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to