This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 99db66afaf9 [SPARK-42561][CONNECT] Add temp view API to Dataset 99db66afaf9 is described below commit 99db66afaf98043e4dd5c20ccea4fcde0b43946e Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Sat Feb 25 14:04:26 2023 -0400 [SPARK-42561][CONNECT] Add temp view API to Dataset ### What changes were proposed in this pull request? Add temp view API to Dataset ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40167 from amaliujia/add_temp_view_api. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit 710e7de08723a67eac86f8f802fcfcf70ef5039c) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 94 ++++++++++++++++++++++ .../scala/org/apache/spark/sql/SparkSession.scala | 6 ++ .../org/apache/spark/sql/ClientE2ETestSuite.scala | 11 +++ 3 files changed, 111 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 83ed7bdc071..b8481cbe158 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1906,6 +1906,100 @@ class Dataset[T] private[sql] (val sparkSession: SparkSession, private[sql] val } } + /** + * Registers this Dataset as a temporary table using the given name. The lifetime of this + * temporary table is tied to the [[SparkSession]] that was used to create this Dataset. 
+ * + * @group basic + * @since 3.4.0 + */ + @deprecated("Use createOrReplaceTempView(viewName) instead.", "3.4.0") + def registerTempTable(tableName: String): Unit = { + createOrReplaceTempView(tableName) + } + + /** + * Creates a local temporary view using the given name. The lifetime of this temporary view is + * tied to the [[SparkSession]] that was used to create this Dataset. + * + * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that + * created it, i.e. it will be automatically dropped when the session terminates. It's not tied + * to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. + * + * @throws AnalysisException + * if the view name is invalid or already exists + * + * @group basic + * @since 3.4.0 + */ + @throws[AnalysisException] + def createTempView(viewName: String): Unit = { + buildAndExecuteTempView(viewName, replace = false, global = false) + } + + /** + * Creates a local temporary view using the given name. The lifetime of this temporary view is + * tied to the [[SparkSession]] that was used to create this Dataset. + * + * @group basic + * @since 3.4.0 + */ + def createOrReplaceTempView(viewName: String): Unit = { + buildAndExecuteTempView(viewName, replace = true, global = false) + } + + /** + * Creates a global temporary view using the given name. The lifetime of this temporary view is + * tied to this Spark application. + * + * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark + * application, + * i.e. it will be automatically dropped when the application terminates. It's tied to a system + * preserved database `global_temp`, and we must use the qualified name to refer a global temp + * view, e.g. `SELECT * FROM global_temp.view1`. 
+ * + * @throws AnalysisException + * if the view name is invalid or already exists + * + * @group basic + * @since 3.4.0 + */ + @throws[AnalysisException] + def createGlobalTempView(viewName: String): Unit = { + buildAndExecuteTempView(viewName, replace = false, global = true) + } + + /** + * Creates or replaces a global temporary view using the given name. The lifetime of this + * temporary view is tied to this Spark application. + * + * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark + * application, + * i.e. it will be automatically dropped when the application terminates. It's tied to a system + * preserved database `global_temp`, and we must use the qualified name to refer a global temp + * view, e.g. `SELECT * FROM global_temp.view1`. + * + * @group basic + * @since 3.4.0 + */ + def createOrReplaceGlobalTempView(viewName: String): Unit = { + buildAndExecuteTempView(viewName, replace = true, global = true) + } + + private def buildAndExecuteTempView( + viewName: String, + replace: Boolean, + global: Boolean): Unit = { + val command = session.newCommand { builder => + builder.getCreateDataframeViewBuilder + .setInput(plan.getRoot) + .setName(viewName) + .setIsGlobal(global) + .setReplace(replace) + } + session.execute(command) + } + /** * Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column * name. 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index a63c23e6bf1..53cd3955232 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -192,6 +192,12 @@ class SparkSession( new Dataset[T](this, plan) } + private[sql] def newCommand[T](f: proto.Command.Builder => Unit): proto.Command = { + val builder = proto.Command.newBuilder() + f(builder) + builder.build() + } + private[sql] def analyze( plan: proto.Plan, mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse = diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 7077d8c94e8..23a5d5f5e9e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -406,4 +406,15 @@ class ClientE2ETestSuite extends RemoteSparkSession { val joined = left.join(right, left("id") === right("id")).select(left("id"), right("a")) assert(joined.schema.catalogString === "struct<id:bigint,a:double>") } + + test("test temp view") { + spark.range(100).createTempView("test1") + assert(spark.sql("SELECT * FROM test1").count() == 100) + spark.range(1000).createOrReplaceTempView("test1") + assert(spark.sql("SELECT * FROM test1").count() == 1000) + spark.range(100).createGlobalTempView("view1") + assert(spark.sql("SELECT * FROM global_temp.view1").count() == 100) + spark.range(1000).createOrReplaceGlobalTempView("view1") + assert(spark.sql("SELECT * FROM global_temp.view1").count() == 1000) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org