juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1382963356
##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -920,6 +958,12 @@ message FetchErrorDetailsResponse {
// A list of errors.
repeated Error errors = 2;
+
+ // Server-side generated idempotency key that the client can use to assert that the server side
+ // session has not changed.
+ string server_side_session_id = 3;
+
+ string session_id = 4;
Review Comment:
nit: place it at the beginning of the response, like in the others (I would even put it above the nested msgs)
##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
optional string client_type = 3;
}
+// Next ID: 3
message ReleaseSessionResponse {
// Session id of the session on which the release executed.
string session_id = 1;
+ // Server-side generated idempotency key that the client can use to assert that the server side
+ // session has not changed.
+ string server_side_session_id = 2;
Review Comment:
in this special case, should it be optional, since it will not be set when the session doesn't exist?
Will your wrapper handle that?
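A hedged sketch of what "handling that" on the client could look like: in proto3, a non-`optional` string field that was never set reads back as the empty string, so a wrapper can map `""` to "absent". The case class below is a hypothetical stand-in for the generated `ReleaseSessionResponse`; the helper name is illustrative, not from the PR.

```scala
// Hypothetical stand-in for the generated ReleaseSessionResponse message.
case class ReleaseSessionResp(sessionId: String, serverSideSessionId: String)

// Treat proto3's default empty string as "field not set" for this response,
// e.g. when the released session did not exist on the server.
def extractServerSideSessionId(resp: ReleaseSessionResp): Option[String] =
  Option(resp.serverSideSessionId).filter(_.nonEmpty)
```

A wrapper using this would skip session-id validation when the result is `None` instead of comparing against the empty string.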
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
import scala.jdk.CollectionConverters._
+import com.google.protobuf.GeneratedMessageV3
import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto._
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+ // Server side session ID, used to detect if the server side session changed. This is set upon
+ // receiving the first response from the server. This value is used only for executions that
+ // do not use server-side streaming.
+ private var serverSideSessionId: Option[String] = None
+
+ protected def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+ val response = fn
+ val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+ // If the field does not exist, we ignore this for now.
Review Comment:
Maybe log a warning here.
Better not to put "for now" in the comment unless there's an actionable follow-up (when do we plan to do something about it?).
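A minimal sketch of the reviewer's suggestion (warn instead of silently ignoring a missing field), using a plain `Map` as a simplified stand-in for the protobuf descriptor lookup (`findFieldByName` on the real message). All names here are illustrative:

```scala
// Simplified stand-in for the stub-side validator: `fields` plays the role
// of protobuf reflection over the response message.
final class SessionFieldValidator {
  private var serverSideSessionId: Option[String] = None

  def validate(fields: Map[String, String]): Unit =
    fields.get("server_side_session_id") match {
      case None =>
        // Reviewer's suggestion: surface the missing field with a warning.
        System.err.println("warning: response has no server_side_session_id field")
      case Some(id) =>
        serverSideSessionId match {
          case Some(known) if known != id =>
            // The server-side session changed under us; fail loudly.
            throw new IllegalStateException(s"server-side session changed: $known -> $id")
          case _ =>
            serverSideSessionId = Some(id)
        }
    }
}
```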
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -97,6 +97,10 @@ class ExecutePlanResponseReattachableIterator(
private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
Some(rawBlockingStub.executePlan(initialRequest))
+ // Server side session ID, used to detect if the server side session changed. This is set upon
+ // receiving the first response from the server.
+ private var serverSideSessionId: Option[String] = None
Review Comment:
technically, a restarted server will have no information about the operation, so trying to reattach would have failed with INVALID_HANDLE.OPERATION_NOT_FOUND.
But it doesn't hurt.
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
import scala.jdk.CollectionConverters._
+import com.google.protobuf.GeneratedMessageV3
import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto._
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
Review Comment:
nit: move it to its own file.
Also, what it does is not very different from RetryHandler and GrpcExceptionConverter. Why make this one a superclass, while the others are members?
Why not make it a member ResponseValidator object and have responseValidator.wrapIterator and responseValidator.validateResponse instead of a superclass?
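A hedged sketch of the composition the reviewer proposes: the stub holds a `ResponseValidator` member (like RetryHandler and GrpcExceptionConverter) rather than extending a common superclass. The types and the tuple-shaped "response" are illustrative stand-ins, not the PR's actual API:

```scala
// Member-object variant: validation state lives in the validator, not in a
// shared superclass.
final class ResponseValidator {
  private var serverSideSessionId: Option[String] = None

  // Validate one response, given a way to read its server-side session id.
  def validateResponse[T](resp: T)(sessionIdOf: T => String): T = {
    val id = sessionIdOf(resp)
    serverSideSessionId match {
      case Some(known) if known != id =>
        throw new IllegalStateException(s"server-side session changed: $known -> $id")
      case _ =>
        serverSideSessionId = Some(id)
        resp
    }
  }
}

// The stub composes the validator instead of inheriting from it.
// Hypothetical response shape: (serverSideSessionId, payload).
class BlockingStubSketch(validator: ResponseValidator) {
  def call(resp: (String, String)): String =
    validator.validateResponse(resp)(_._1)._2
}
```

The composition also makes it easy to hand the same validator instance to several stubs, which the superclass approach cannot do.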
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
import scala.jdk.CollectionConverters._
+import com.google.protobuf.GeneratedMessageV3
import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto._
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+ // Server side session ID, used to detect if the server side session changed. This is set upon
+ // receiving the first response from the server. This value is used only for executions that
+ // do not use server-side streaming.
+ private var serverSideSessionId: Option[String] = None
Review Comment:
this will be different in the blocking and non-blocking stub, technically possibly leading to the two operating with different ids. Should there be a common stub-state object for that?
If you make it a ResponseValidator object, it could share this state there.
Why not, in general, have the CustomSparkConnectCommonStub object be a container that holds instances of ResponseValidator, RetryHandler and GrpcExceptionConverter that can be shared between the blocking and non-blocking stubs?
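The shared-state idea can be sketched as follows: one state object held by both the blocking and the non-blocking stub, so they always agree on the observed server-side session id. Every name here is a hypothetical illustration, not code from the PR; access is synchronized because the two stubs may run on different threads.

```scala
// Shared holder for the server-side session id observed so far.
final class StubState {
  private var sessionId: Option[String] = None

  // Returns true if `id` matches the known id, or is the first one observed.
  def observe(id: String): Boolean = synchronized {
    sessionId match {
      case Some(known) => known == id
      case None        => sessionId = Some(id); true
    }
  }
}

// Both stubs validate against the same StubState instance.
class SharedBlockingStub(state: StubState) {
  def onResponse(id: String): Boolean = state.observe(id)
}
class SharedAsyncStub(state: StubState) {
  def onResponse(id: String): Boolean = state.observe(id)
}
```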
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]