This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 00b7ca30740 [SPARK-42667][CONNECT] Spark Connect: newSession API 00b7ca30740 is described below commit 00b7ca3074094822b3b5b3da1b292c6d25dca220 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Fri Mar 3 21:45:18 2023 -0400 [SPARK-42667][CONNECT] Spark Connect: newSession API ### What changes were proposed in this pull request? This PR proposes an implementation of the newSession API. The idea is that we reuse the user context (e.g. user_id), gRPC channel, etc., but differentiate each Spark Remote Session by a client id, which is generated randomly. This idea has the following benefits: 1. reusing the gRPC channel avoids opening too many connections to the server. 2. Each user can have multiple remote sessions, differentiated by client ids (or named session ids on the server side). ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40272 from amaliujia/new_session. 
Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- .../org/apache/spark/sql/connect/client/SparkConnectClient.scala | 4 ++++ .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 6 ++++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index a8a88d63b1a..2b032b7cc8a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -344,7 +344,7 @@ class SparkSession private[sql] ( // scalastyle:on def newSession(): SparkSession = { - throw new UnsupportedOperationException("newSession is not supported") + SparkSession.builder().client(client.copy()).build() } private def range( diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 599aab441de..8828a4a87e6 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -149,6 +149,10 @@ private[sql] class SparkConnectClient( analyze(request) } + def copy(): SparkConnectClient = { + new SparkConnectClient(userContext, channel, userAgent) + } + /** * Add a single artifact to the client session. 
* diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index ffbf3cee025..a3f1de55892 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -586,6 +586,12 @@ class ClientE2ETestSuite extends RemoteSparkSession { list.asScala.map(kv => Row(kv.key, kv.value)), session.createDataFrame(list.asScala.toSeq)) } + + test("SparkSession newSession") { + val oldId = spark.sql("SELECT 1").analyze.getClientId + val newId = spark.newSession().sql("SELECT 1").analyze.getClientId + assert(oldId != newId) + } } private[sql] case class MyType(id: Long, a: Double, b: Double) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org