This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 2bce5314da3 [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id 2bce5314da3 is described below commit 2bce5314da32c1fac7a6025449cb786c9b5c233b Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Mon Mar 6 22:11:18 2023 -0400 [SPARK-42688][CONNECT] Rename Connect proto Request client_id to session_id ### What changes were proposed in this pull request? Rename Connect proto requests `client_id` to `session_id`. On the one hand, when I read `client_id` I was confused about what it is used for, even after reading the proto documentation. On the other hand, the client side already uses `session_id`: https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L51 https://github.com/apache/spark/blob/9bf174f9722e34f13bfaede5e59f989bf2a511e9/python/pyspark/sql/connect/client.py#L522 ### Why are the changes needed? Code readability ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT Closes #40309 from amaliujia/update_client_id. 
Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit dfdc4a1d65e69ed88d43e17bd7325e9f8416c8e6) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../sql/connect/client/SparkConnectClient.scala | 6 +- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 4 +- .../connect/client/SparkConnectClientSuite.scala | 14 +- .../src/main/protobuf/spark/connect/base.proto | 41 +++-- .../sql/connect/planner/SparkConnectPlanner.scala | 10 +- .../service/SparkConnectAnalyzeHandler.scala | 4 +- .../service/SparkConnectConfigHandler.scala | 4 +- .../service/SparkConnectStreamHandler.scala | 24 +-- .../connect/planner/SparkConnectServiceSuite.scala | 2 +- python/pyspark/sql/connect/client.py | 22 +-- python/pyspark/sql/connect/proto/base_pb2.py | 196 ++++++++++----------- python/pyspark/sql/connect/proto/base_pb2.pyi | 98 ++++++----- python/pyspark/sql/tests/connect/test_client.py | 2 +- 13 files changed, 223 insertions(+), 204 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 05aa191a4dd..348fc94bb89 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -63,7 +63,7 @@ private[sql] class SparkConnectClient( .newBuilder() .setPlan(plan) .setUserContext(userContext) - .setClientId(sessionId) + .setSessionId(sessionId) .setClientType(userAgent) .build() stub.executePlan(request) @@ -78,7 +78,7 @@ private[sql] class SparkConnectClient( val request = proto.ConfigRequest .newBuilder() .setOperation(operation) - .setClientId(sessionId) + .setSessionId(sessionId) .setClientType(userAgent) .setUserContext(userContext) .build() @@ -157,7 
+157,7 @@ private[sql] class SparkConnectClient( private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = { val request = builder .setUserContext(userContext) - .setClientId(sessionId) + .setSessionId(sessionId) .setClientType(userAgent) .build() analyze(request) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 11e28f538e8..94bc22ef77d 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -612,8 +612,8 @@ class ClientE2ETestSuite extends RemoteSparkSession { } test("SparkSession newSession") { - val oldId = spark.sql("SELECT 1").analyze.getClientId - val newId = spark.newSession().sql("SELECT 1").analyze.getClientId + val oldId = spark.sql("SELECT 1").analyze.getSessionId + val newId = spark.newSession().sql("SELECT 1").analyze.getSessionId assert(oldId != newId) } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala index dcb13589206..bc600e5a071 100755 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala @@ -75,11 +75,11 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach { client = clientBuilder(server.getPort) val request = AnalyzePlanRequest .newBuilder() - .setClientId("abc123") + .setSessionId("abc123") .build() val response = client.analyze(request) - assert(response.getClientId === "abc123") + assert(response.getSessionId === "abc123") } 
test("Test connection") { @@ -99,7 +99,7 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach { .connectionString(s"sc://localhost:${server.getPort}/;use_ssl=true") .build() - val request = AnalyzePlanRequest.newBuilder().setClientId("abc123").build() + val request = AnalyzePlanRequest.newBuilder().setSessionId("abc123").build() // Failed the ssl handshake as the dummy server does not have any server credentials installed. assertThrows[StatusRuntimeException] { @@ -201,11 +201,11 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer request: ExecutePlanRequest, responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { // Reply with a dummy response using the same client ID - val requestClientId = request.getClientId + val requestSessionId = request.getSessionId inputPlan = request.getPlan val response = ExecutePlanResponse .newBuilder() - .setClientId(requestClientId) + .setSessionId(requestSessionId) .build() responseObserver.onNext(response) responseObserver.onCompleted() @@ -215,7 +215,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer request: AnalyzePlanRequest, responseObserver: StreamObserver[AnalyzePlanResponse]): Unit = { // Reply with a dummy response using the same client ID - val requestClientId = request.getClientId + val requestSessionId = request.getSessionId request.getAnalyzeCase match { case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA => inputPlan = request.getSchema.getPlan @@ -233,7 +233,7 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer } val response = AnalyzePlanResponse .newBuilder() - .setClientId(requestClientId) + .setSessionId(requestSessionId) .build() responseObserver.onNext(response) responseObserver.onCompleted() diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 2252d91c9ff..1a9c437f0ec 100644 
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -58,9 +58,10 @@ message UserContext { message AnalyzePlanRequest { // (Required) // - // The client_id is set by the client to be able to collate streaming responses from - // different queries. - string client_id = 1; + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + string session_id = 1; // (Required) User context UserContext user_context = 2; @@ -161,7 +162,7 @@ message AnalyzePlanRequest { // Response to performing analysis of the query. Contains relevant metadata to be able to // reason about the performance. message AnalyzePlanResponse { - string client_id = 1; + string session_id = 1; oneof result { Schema schema = 2; @@ -217,11 +218,15 @@ message AnalyzePlanResponse { message ExecutePlanRequest { // (Required) // - // The client_id is set by the client to be able to collate streaming responses from - // different queries. - string client_id = 1; + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + string session_id = 1; // (Required) User context + // + // user_context.user_id and session+id both identify a unique remote spark session on the + // server side. UserContext user_context = 2; // (Required) The logical plan to be executed / analyzed. @@ -234,9 +239,9 @@ message ExecutePlanRequest { } // The response of a query, can be one or more for each request. Responses belonging to the -// same input query, carry the same `client_id`. +// same input query, carry the same `session_id`. 
message ExecutePlanResponse { - string client_id = 1; + string session_id = 1; // Union type for the different response messages. oneof response_type { @@ -304,9 +309,10 @@ message KeyValue { message ConfigRequest { // (Required) // - // The client_id is set by the client to be able to collate streaming responses from - // different queries. - string client_id = 1; + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + string session_id = 1; // (Required) User context UserContext user_context = 2; @@ -369,7 +375,7 @@ message ConfigRequest { // Response to the config request. message ConfigResponse { - string client_id = 1; + string session_id = 1; // (Optional) The result key-value pairs. // @@ -386,9 +392,12 @@ message ConfigResponse { // Request to transfer client-local artifacts. message AddArtifactsRequest { - // The client_id is set by the client to be able to collate streaming responses from - // different queries. - string client_id = 1; + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. 
+ string session_id = 1; // User context UserContext user_context = 2; diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 60fb94e8098..8ca004d520c 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1459,7 +1459,7 @@ class SparkConnectPlanner(val session: SparkSession) { def process( command: proto.Command, - clientId: String, + sessionId: String, responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { command.getCommandTypeCase match { case proto.Command.CommandTypeCase.REGISTER_FUNCTION => @@ -1473,14 +1473,14 @@ class SparkConnectPlanner(val session: SparkSession) { case proto.Command.CommandTypeCase.EXTENSION => handleCommandPlugin(command.getExtension) case proto.Command.CommandTypeCase.SQL_COMMAND => - handleSqlCommand(command.getSqlCommand, clientId, responseObserver) + handleSqlCommand(command.getSqlCommand, sessionId, responseObserver) case _ => throw new UnsupportedOperationException(s"$command not supported.") } } def handleSqlCommand( getSqlCommand: SqlCommand, - clientId: String, + sessionId: String, responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { // Eagerly execute commands of the provided SQL string. 
val df = session.sql(getSqlCommand.getSql, getSqlCommand.getArgsMap) @@ -1537,12 +1537,12 @@ class SparkConnectPlanner(val session: SparkSession) { responseObserver.onNext( ExecutePlanResponse .newBuilder() - .setClientId(clientId) + .setSessionId(sessionId) .setSqlCommandResult(result) .build()) // Send Metrics - SparkConnectStreamHandler.sendMetricsToResponse(clientId, df) + SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df) } private def handleRegisterUserDefinedFunction( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala index e3d4da66a08..9520ec8015f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala @@ -35,7 +35,7 @@ private[connect] class SparkConnectAnalyzeHandler( def handle(request: proto.AnalyzePlanRequest): Unit = { val session = SparkConnectService - .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) + .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) .session session.withActive { val response = process(request, session) @@ -155,7 +155,7 @@ private[connect] class SparkConnectAnalyzeHandler( case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!") } - builder.setClientId(request.getClientId) + builder.setSessionId(request.getSessionId) builder.build() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala index 84f625222a8..38fd88297f3 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala @@ -32,7 +32,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes def handle(request: proto.ConfigRequest): Unit = { val session = SparkConnectService - .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) + .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) .session val builder = request.getOperation.getOpTypeCase match { @@ -53,7 +53,7 @@ class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigRes case _ => throw new UnsupportedOperationException(s"${request.getOperation} not supported.") } - builder.setClientId(request.getClientId) + builder.setSessionId(request.getSessionId) responseObserver.onNext(builder.build()) responseObserver.onCompleted() } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala index 15d5a981ae8..0dd1741f099 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala @@ -44,7 +44,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp def handle(v: ExecutePlanRequest): Unit = { val session = SparkConnectService - .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getClientId) + .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId) .session session.withActive { v.getPlan.getOpTypeCase match { @@ -60,12 +60,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp // 
Extract the plan from the request and convert it to a logical plan val planner = new SparkConnectPlanner(session) val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot)) - processAsArrowBatches(request.getClientId, dataframe, responseObserver) + processAsArrowBatches(request.getSessionId, dataframe, responseObserver) responseObserver.onNext( - SparkConnectStreamHandler.sendMetricsToResponse(request.getClientId, dataframe)) + SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe)) if (dataframe.queryExecution.observedMetrics.nonEmpty) { responseObserver.onNext( - SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getClientId, dataframe)) + SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe)) } responseObserver.onCompleted() } @@ -73,7 +73,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp private def handleCommand(session: SparkSession, request: ExecutePlanRequest): Unit = { val command = request.getPlan.getCommand val planner = new SparkConnectPlanner(session) - planner.process(command, request.getClientId, responseObserver) + planner.process(command, request.getSessionId, responseObserver) responseObserver.onCompleted() } } @@ -96,7 +96,7 @@ object SparkConnectStreamHandler { } def processAsArrowBatches( - clientId: String, + sessionId: String, dataframe: DataFrame, responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { val spark = dataframe.sparkSession @@ -173,7 +173,7 @@ object SparkConnectStreamHandler { } partition.foreach { case (bytes, count) => - val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId) + val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId) val batch = proto.ExecutePlanResponse.ArrowBatch .newBuilder() .setRowCount(count) @@ -191,7 +191,7 @@ object SparkConnectStreamHandler { // Make sure at least 1 batch will be sent. 
if (numSent == 0) { val bytes = ArrowConverters.createEmptyArrowBatch(schema, timeZoneId) - val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId) + val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId) val batch = proto.ExecutePlanResponse.ArrowBatch .newBuilder() .setRowCount(0L) @@ -203,17 +203,17 @@ object SparkConnectStreamHandler { } } - def sendMetricsToResponse(clientId: String, rows: DataFrame): ExecutePlanResponse = { + def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = { // Send a last batch with the metrics ExecutePlanResponse .newBuilder() - .setClientId(clientId) + .setSessionId(sessionId) .setMetrics(MetricGenerator.buildMetrics(rows.queryExecution.executedPlan)) .build() } def sendObservedMetricsToResponse( - clientId: String, + sessionId: String, dataframe: DataFrame): ExecutePlanResponse = { val observedMetrics = dataframe.queryExecution.observedMetrics.map { case (name, row) => val cols = (0 until row.length).map(i => toConnectProtoValue(row(i))) @@ -226,7 +226,7 @@ object SparkConnectStreamHandler { // Prepare a response with the observed metrics. 
ExecutePlanResponse .newBuilder() - .setClientId(clientId) + .setSessionId(sessionId) .addAllObservedMetrics(observedMetrics.asJava) .build() } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index 2885d0035bc..e2aecaaea86 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -221,7 +221,7 @@ class SparkConnectServiceSuite extends SharedSparkSession { .newBuilder() .setPlan(plan) .setUserContext(context) - .setClientId("session") + .setSessionId("session") .build() // The observer is executed inside this thread. So diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index 2594640aa3e..8c85f17bb5f 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -712,7 +712,7 @@ class SparkConnectClient(object): def _execute_plan_request_with_metadata(self) -> pb2.ExecutePlanRequest: req = pb2.ExecutePlanRequest() - req.client_id = self._session_id + req.session_id = self._session_id req.client_type = self._builder.userAgent if self._user_id: req.user_context.user_id = self._user_id @@ -720,7 +720,7 @@ class SparkConnectClient(object): def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest: req = pb2.AnalyzePlanRequest() - req.client_id = self._session_id + req.session_id = self._session_id req.client_type = self._builder.userAgent if self._user_id: req.user_context.user_id = self._user_id @@ -791,10 +791,10 @@ class SparkConnectClient(object): ): with attempt: resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata()) - if resp.client_id != self._session_id: + if resp.session_id != self._session_id: raise 
SparkConnectException( "Received incorrect session identifier for request:" - f"{resp.client_id} != {self._session_id}" + f"{resp.session_id} != {self._session_id}" ) return AnalyzeResult.fromProto(resp) raise SparkConnectException("Invalid state during retry exception handling.") @@ -818,10 +818,10 @@ class SparkConnectClient(object): ): with attempt: for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()): - if b.client_id != self._session_id: + if b.session_id != self._session_id: raise SparkConnectException( "Received incorrect session identifier for request: " - f"{b.client_id} != {self._session_id}" + f"{b.session_id} != {self._session_id}" ) except grpc.RpcError as rpc_error: self._handle_error(rpc_error) @@ -842,10 +842,10 @@ class SparkConnectClient(object): with attempt: batches = [] for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()): - if b.client_id != self._session_id: + if b.session_id != self._session_id: raise SparkConnectException( "Received incorrect session identifier for request: " - f"{b.client_id} != {self._session_id}" + f"{b.session_id} != {self._session_id}" ) if b.metrics is not None: logger.debug("Received metric batch.") @@ -878,7 +878,7 @@ class SparkConnectClient(object): def _config_request_with_metadata(self) -> pb2.ConfigRequest: req = pb2.ConfigRequest() - req.client_id = self._session_id + req.session_id = self._session_id req.client_type = self._builder.userAgent if self._user_id: req.user_context.user_id = self._user_id @@ -905,10 +905,10 @@ class SparkConnectClient(object): ): with attempt: resp = self._stub.Config(req, metadata=self._builder.metadata()) - if resp.client_id != self._session_id: + if resp.session_id != self._session_id: raise SparkConnectException( "Received incorrect session identifier for request:" - f"{resp.client_id} != {self._session_id}" + f"{resp.session_id} != {self._session_id}" ) return ConfigResult.fromProto(resp) raise SparkConnectException("Invalid state during 
retry exception handling.") diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index 6d41ce28c7c..c67e58b44cd 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...] + b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...] 
) @@ -620,101 +620,101 @@ if _descriptor._USE_C_DESCRIPTORS == False: _USERCONTEXT._serialized_start = 309 _USERCONTEXT._serialized_end = 431 _ANALYZEPLANREQUEST._serialized_start = 434 - _ANALYZEPLANREQUEST._serialized_end = 2089 - _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1295 - _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1344 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1347 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1662 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1490 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1662 - _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1664 - _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1717 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1719 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1769 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1771 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1825 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1827 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1880 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1882 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1896 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1898 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1939 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1941 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2062 - _ANALYZEPLANRESPONSE._serialized_start = 2092 - _ANALYZEPLANRESPONSE._serialized_end = 3294 - _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2862 - _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2919 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2921 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2969 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2971 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3016 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3018 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3054 - _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3056 - 
_ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3104 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3106 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3140 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3142 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3182 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3184 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3243 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3245 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3284 - _EXECUTEPLANREQUEST._serialized_start = 3297 - _EXECUTEPLANREQUEST._serialized_end = 3504 - _EXECUTEPLANRESPONSE._serialized_start = 3507 - _EXECUTEPLANRESPONSE._serialized_end = 4731 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3962 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4033 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4035 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4096 - _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4099 - _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4616 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4194 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4526 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4403 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4526 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4528 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4616 - _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4618 - _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4714 - _KEYVALUE._serialized_start = 4733 - _KEYVALUE._serialized_end = 4798 - _CONFIGREQUEST._serialized_start = 4801 - _CONFIGREQUEST._serialized_end = 5827 - _CONFIGREQUEST_OPERATION._serialized_start = 5019 - _CONFIGREQUEST_OPERATION._serialized_end = 5517 - _CONFIGREQUEST_SET._serialized_start = 5519 - _CONFIGREQUEST_SET._serialized_end = 5571 - 
_CONFIGREQUEST_GET._serialized_start = 5573 - _CONFIGREQUEST_GET._serialized_end = 5598 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5600 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5663 - _CONFIGREQUEST_GETOPTION._serialized_start = 5665 - _CONFIGREQUEST_GETOPTION._serialized_end = 5696 - _CONFIGREQUEST_GETALL._serialized_start = 5698 - _CONFIGREQUEST_GETALL._serialized_end = 5746 - _CONFIGREQUEST_UNSET._serialized_start = 5748 - _CONFIGREQUEST_UNSET._serialized_end = 5775 - _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5777 - _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5811 - _CONFIGRESPONSE._serialized_start = 5829 - _CONFIGRESPONSE._serialized_end = 5949 - _ADDARTIFACTSREQUEST._serialized_start = 5952 - _ADDARTIFACTSREQUEST._serialized_end = 6767 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6299 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6352 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6354 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6465 - _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6467 - _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6560 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6563 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6756 - _ADDARTIFACTSRESPONSE._serialized_start = 6770 - _ADDARTIFACTSRESPONSE._serialized_end = 6958 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6877 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6958 - _SPARKCONNECTSERVICE._serialized_start = 6961 - _SPARKCONNECTSERVICE._serialized_end = 7326 + _ANALYZEPLANREQUEST._serialized_end = 2091 + _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1297 + _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1346 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1349 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1664 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1492 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1664 + 
_ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1666 + _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1719 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1721 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1771 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1773 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1827 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1829 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1882 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1884 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1898 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1900 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1941 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1943 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2064 + _ANALYZEPLANRESPONSE._serialized_start = 2094 + _ANALYZEPLANRESPONSE._serialized_end = 3298 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2866 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2923 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2925 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2973 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2975 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3020 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3022 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3058 + _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3060 + _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3108 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3110 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3144 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3146 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3186 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3188 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3247 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3249 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3288 + _EXECUTEPLANREQUEST._serialized_start = 3301 + 
_EXECUTEPLANREQUEST._serialized_end = 3510 + _EXECUTEPLANRESPONSE._serialized_start = 3513 + _EXECUTEPLANRESPONSE._serialized_end = 4739 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3970 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4041 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4043 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4104 + _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4107 + _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4624 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4202 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4534 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4411 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4534 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4536 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4624 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4626 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4722 + _KEYVALUE._serialized_start = 4741 + _KEYVALUE._serialized_end = 4806 + _CONFIGREQUEST._serialized_start = 4809 + _CONFIGREQUEST._serialized_end = 5837 + _CONFIGREQUEST_OPERATION._serialized_start = 5029 + _CONFIGREQUEST_OPERATION._serialized_end = 5527 + _CONFIGREQUEST_SET._serialized_start = 5529 + _CONFIGREQUEST_SET._serialized_end = 5581 + _CONFIGREQUEST_GET._serialized_start = 5583 + _CONFIGREQUEST_GET._serialized_end = 5608 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5610 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5673 + _CONFIGREQUEST_GETOPTION._serialized_start = 5675 + _CONFIGREQUEST_GETOPTION._serialized_end = 5706 + _CONFIGREQUEST_GETALL._serialized_start = 5708 + _CONFIGREQUEST_GETALL._serialized_end = 5756 + _CONFIGREQUEST_UNSET._serialized_start = 5758 + _CONFIGREQUEST_UNSET._serialized_end = 5785 + _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5787 + 
_CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5821 + _CONFIGRESPONSE._serialized_start = 5839 + _CONFIGRESPONSE._serialized_end = 5961 + _ADDARTIFACTSREQUEST._serialized_start = 5964 + _ADDARTIFACTSREQUEST._serialized_end = 6781 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6313 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6366 + _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6368 + _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6479 + _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6481 + _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6574 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6577 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6770 + _ADDARTIFACTSRESPONSE._serialized_start = 6784 + _ADDARTIFACTSRESPONSE._serialized_end = 6972 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6891 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6972 + _SPARKCONNECTSERVICE._serialized_start = 6975 + _SPARKCONNECTSERVICE._serialized_end = 7340 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index 2e9a877b658..e87194f31aa 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -350,7 +350,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): ], ) -> None: ... - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int @@ -362,11 +362,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message): SPARK_VERSION_FIELD_NUMBER: builtins.int DDL_PARSE_FIELD_NUMBER: builtins.int SAME_SEMANTICS_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str """(Required) - The client_id is set by the client to be able to collate streaming responses from - different queries. 
+ The session_id specifies a spark session for a user id (which is specified + by user_context.user_id). The session_id is set by the client to be able to + collate streaming responses from different queries within the dedicated session. """ @property def user_context(self) -> global___UserContext: @@ -397,7 +398,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., user_context: global___UserContext | None = ..., client_type: builtins.str | None = ..., schema: global___AnalyzePlanRequest.Schema | None = ..., @@ -448,8 +449,6 @@ class AnalyzePlanRequest(google.protobuf.message.Message): b"_client_type", "analyze", b"analyze", - "client_id", - b"client_id", "client_type", b"client_type", "ddl_parse", @@ -466,6 +465,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "session_id", + b"session_id", "spark_version", b"spark_version", "tree_string", @@ -638,7 +639,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["result", b"result"] ) -> None: ... - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int EXPLAIN_FIELD_NUMBER: builtins.int TREE_STRING_FIELD_NUMBER: builtins.int @@ -648,7 +649,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): SPARK_VERSION_FIELD_NUMBER: builtins.int DDL_PARSE_FIELD_NUMBER: builtins.int SAME_SEMANTICS_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str @property def schema(self) -> global___AnalyzePlanResponse.Schema: ... 
@property @@ -670,7 +671,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., schema: global___AnalyzePlanResponse.Schema | None = ..., explain: global___AnalyzePlanResponse.Explain | None = ..., tree_string: global___AnalyzePlanResponse.TreeString | None = ..., @@ -709,8 +710,6 @@ class AnalyzePlanResponse(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ - "client_id", - b"client_id", "ddl_parse", b"ddl_parse", "explain", @@ -727,6 +726,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "session_id", + b"session_id", "spark_version", b"spark_version", "tree_string", @@ -754,19 +755,24 @@ class ExecutePlanRequest(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int PLAN_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str """(Required) - The client_id is set by the client to be able to collate streaming responses from - different queries. + The session_id specifies a spark session for a user id (which is specified + by user_context.user_id). The session_id is set by the client to be able to + collate streaming responses from different queries within the dedicated session. """ @property def user_context(self) -> global___UserContext: - """(Required) User context""" + """(Required) User context + + user_context.user_id and session_id both identify a unique remote spark session on the + server side.
+ """ @property def plan(self) -> global___Plan: """(Required) The logical plan to be executed / analyzed.""" @@ -778,7 +784,7 @@ class ExecutePlanRequest(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., user_context: global___UserContext | None = ..., plan: global___Plan | None = ..., client_type: builtins.str | None = ..., @@ -801,12 +807,12 @@ class ExecutePlanRequest(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "_client_type", b"_client_type", - "client_id", - b"client_id", "client_type", b"client_type", "plan", b"plan", + "session_id", + b"session_id", "user_context", b"user_context", ], @@ -819,7 +825,7 @@ global___ExecutePlanRequest = ExecutePlanRequest class ExecutePlanResponse(google.protobuf.message.Message): """The response of a query, can be one or more for each request. Responses belonging to the - same input query, carry the same `client_id`. + same input query, carry the same `session_id`. """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -995,13 +1001,13 @@ class ExecutePlanResponse(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["name", b"name", "values", b"values"] ) -> None: ... - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int ARROW_BATCH_FIELD_NUMBER: builtins.int SQL_COMMAND_RESULT_FIELD_NUMBER: builtins.int EXTENSION_FIELD_NUMBER: builtins.int METRICS_FIELD_NUMBER: builtins.int OBSERVED_METRICS_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str @property def arrow_batch(self) -> global___ExecutePlanResponse.ArrowBatch: ... 
@property @@ -1025,7 +1031,7 @@ class ExecutePlanResponse(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., arrow_batch: global___ExecutePlanResponse.ArrowBatch | None = ..., sql_command_result: global___ExecutePlanResponse.SqlCommandResult | None = ..., extension: google.protobuf.any_pb2.Any | None = ..., @@ -1053,8 +1059,6 @@ class ExecutePlanResponse(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "arrow_batch", b"arrow_batch", - "client_id", - b"client_id", "extension", b"extension", "metrics", @@ -1063,6 +1067,8 @@ class ExecutePlanResponse(google.protobuf.message.Message): b"observed_metrics", "response_type", b"response_type", + "session_id", + b"session_id", "sql_command_result", b"sql_command_result", ], @@ -1310,15 +1316,16 @@ class ConfigRequest(google.protobuf.message.Message): ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["keys", b"keys"]) -> None: ... - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int OPERATION_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str """(Required) - The client_id is set by the client to be able to collate streaming responses from - different queries. + The session_id specifies a spark session for a user id (which is specified + by user_context.user_id). The session_id is set by the client to be able to + collate streaming responses from different queries within the dedicated session. 
""" @property def user_context(self) -> global___UserContext: @@ -1334,7 +1341,7 @@ class ConfigRequest(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., user_context: global___UserContext | None = ..., operation: global___ConfigRequest.Operation | None = ..., client_type: builtins.str | None = ..., @@ -1357,12 +1364,12 @@ class ConfigRequest(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "_client_type", b"_client_type", - "client_id", - b"client_id", "client_type", b"client_type", "operation", b"operation", + "session_id", + b"session_id", "user_context", b"user_context", ], @@ -1378,10 +1385,10 @@ class ConfigResponse(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int PAIRS_FIELD_NUMBER: builtins.int WARNINGS_FIELD_NUMBER: builtins.int - client_id: builtins.str + session_id: builtins.str @property def pairs( self, @@ -1402,14 +1409,14 @@ class ConfigResponse(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., pairs: collections.abc.Iterable[global___KeyValue] | None = ..., warnings: collections.abc.Iterable[builtins.str] | None = ..., ) -> None: ... def ClearField( self, field_name: typing_extensions.Literal[ - "client_id", b"client_id", "pairs", b"pairs", "warnings", b"warnings" + "pairs", b"pairs", "session_id", b"session_id", "warnings", b"warnings" ], ) -> None: ... @@ -1546,14 +1553,17 @@ class AddArtifactsRequest(google.protobuf.message.Message): ], ) -> None: ... 
- CLIENT_ID_FIELD_NUMBER: builtins.int + SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int BATCH_FIELD_NUMBER: builtins.int BEGIN_CHUNK_FIELD_NUMBER: builtins.int CHUNK_FIELD_NUMBER: builtins.int - client_id: builtins.str - """The client_id is set by the client to be able to collate streaming responses from - different queries. + session_id: builtins.str + """(Required) + + The session_id specifies a spark session for a user id (which is specified + by user_context.user_id). The session_id is set by the client to be able to + collate streaming responses from different queries within the dedicated session. """ @property def user_context(self) -> global___UserContext: @@ -1574,7 +1584,7 @@ class AddArtifactsRequest(google.protobuf.message.Message): def __init__( self, *, - client_id: builtins.str = ..., + session_id: builtins.str = ..., user_context: global___UserContext | None = ..., batch: global___AddArtifactsRequest.Batch | None = ..., begin_chunk: global___AddArtifactsRequest.BeginChunkedArtifact | None = ..., @@ -1604,10 +1614,10 @@ class AddArtifactsRequest(google.protobuf.message.Message): b"begin_chunk", "chunk", b"chunk", - "client_id", - b"client_id", "payload", b"payload", + "session_id", + b"session_id", "user_context", b"user_context", ], diff --git a/python/pyspark/sql/tests/connect/test_client.py b/python/pyspark/sql/tests/connect/test_client.py index 84281a6764f..6131e146363 100644 --- a/python/pyspark/sql/tests/connect/test_client.py +++ b/python/pyspark/sql/tests/connect/test_client.py @@ -64,7 +64,7 @@ class MockService: def ExecutePlan(self, req: proto.ExecutePlanRequest, metadata): self.req = req resp = proto.ExecutePlanResponse() - resp.client_id = self._session_id + resp.session_id = self._session_id pdf = pd.DataFrame(data={"col1": [1, 2]}) schema = pa.Schema.from_pandas(pdf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For 
additional commands, e-mail: commits-h...@spark.apache.org