juliuszsompolski commented on code in PR #43664:
URL: https://github.com/apache/spark/pull/43664#discussion_r1382963356


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -920,6 +958,12 @@ message FetchErrorDetailsResponse {
 
   // A list of errors.
   repeated Error errors = 2;
+
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 3;
+
+  string session_id = 4;

Review Comment:
   nit: place it at the beginning of the response, like in the others (I would put it even above the nested msgs)
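
   Applied, the suggestion only reorders the declarations; since the tag numbers stay the same, the wire format is unaffected. A possible layout (sketch, not the final diff):

   ```proto
   message FetchErrorDetailsResponse {
     // Server-side generated idempotency key that the client can use to assert
     // that the server side session has not changed.
     string server_side_session_id = 3;

     string session_id = 4;

     // ... nested messages (Error, etc.) ...

     // A list of errors.
     repeated Error errors = 2;
   }
   ```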



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -803,9 +836,13 @@ message ReleaseSessionRequest {
   optional string client_type = 3;
 }
 
+// Next ID: 3
 message ReleaseSessionResponse {
   // Session id of the session on which the release executed.
   string session_id = 1;
+  // Server-side generated idempotency key that the client can use to assert that the server side
+  // session has not changed.
+  string server_side_session_id = 2;

Review Comment:
   In this special case, should it be optional, because it will not be set when the session doesn't exist?
   Will your wrapper handle that?
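
   If the field really is unset for a nonexistent session, proto3's explicit presence could make that observable to the client (sketch only, not the PR's final shape):

   ```proto
   // Next ID: 3
   message ReleaseSessionResponse {
     // Session id of the session on which the release executed.
     string session_id = 1;
     // Server-side generated idempotency key; left unset when the session
     // did not exist on the server.
     optional string server_side_session_id = 2;
   }
   ```

   With proto3 `optional`, the generated code gains a presence check (e.g. `hasServerSideSessionId()` in Java), so the wrapper can distinguish "not set" from an empty string.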



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible for verifying the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  protected def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore this for now.

Review Comment:
   Maybe log a warning.
   
   Better not to put "for now" in a comment unless there's an actionable followup (when do we plan to do something about it?).
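
   A hypothetical, simplified sketch of the suggested behavior (not the PR's code as written): validate the server-side session ID on each response and log a warning when the response type carries no such field, instead of skipping the check silently. The real client would use protobuf descriptors and Spark's logging infrastructure; the `Response` stand-in below is illustrative only.

   ```scala
   object ResponseValidatorSketch {
     // Stand-in for a generated protobuf response; None models a message type
     // without a server_side_session_id field.
     final case class Response(serverSideSessionId: Option[String])

     private var seenSessionId: Option[String] = None

     def verifyResponse(fn: => Response): Response = {
       val response = fn
       response.serverSideSessionId match {
         case None =>
           // Suggested change: surface the anomaly instead of ignoring it.
           System.err.println(
             "WARN: response carries no server_side_session_id; skipping session check")
         case Some(id) =>
           seenSessionId match {
             case Some(expected) if expected != id =>
               throw new IllegalStateException(
                 s"Server side session has changed: expected $expected, got $id")
             case _ =>
               seenSessionId = Some(id)
           }
       }
       response
     }
   }
   ```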



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -97,6 +97,10 @@ class ExecutePlanResponseReattachableIterator(
   private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
     Some(rawBlockingStub.executePlan(initialRequest))
 
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server.
+  private var serverSideSessionId: Option[String] = None

Review Comment:
   Technically, a restarted server will have no information about the operation, so trying to reattach will have failed with INVALID_HANDLE.OPERATION_NOT_FOUND.
   But it doesn't hurt.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible for verifying the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {

Review Comment:
   nit: move it to its own file
   
   Also, it's not very different in what it does from RetryHandler and GrpcExceptionConverter. Why make this one a superclass, while the others are members?
   Why not make it a member ResponseValidator object and have responseValidator.wrapIterator and responseValidator.validateResponse instead of a superclass?
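
   A hypothetical sketch of the member-object alternative suggested here: the stub composes a ResponseValidator and delegates to it, rather than inheriting verifyResponse from a superclass. All names (`ResponseValidator`, `extractId`, `wrapIterator`) are illustrative, not the PR's final API.

   ```scala
   class ResponseValidator {
     private var seenSessionId: Option[String] = None

     // extractId abstracts over reading server_side_session_id from a response.
     def verifyResponse[T](extractId: T => Option[String])(fn: => T): T = {
       val response = fn
       extractId(response).foreach { id =>
         seenSessionId match {
           case Some(expected) if expected != id =>
             throw new IllegalStateException(
               s"Server side session has changed: expected $expected, got $id")
           case _ => seenSessionId = Some(id)
         }
       }
       response
     }

     // A streamed result can be validated element by element the same way.
     def wrapIterator[T](extractId: T => Option[String])(
         it: Iterator[T]): Iterator[T] =
       it.map(resp => verifyResponse(extractId)(resp))
   }
   ```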



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client
 
 import scala.jdk.CollectionConverters._
 
+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto._
 
+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible for verifying the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None

Review Comment:
   This will be different in the blocking and non-blocking stub, technically possibly leading to the two operating with different ids. Should there be a common stub-state object for that?
   
   If you make it a ResponseValidator object, it can share this state there.
   
   Why not in general have the CustomSparkConnectCommonStub object be a container that holds instances of ResponseValidator, RetryHandler and GrpcExceptionConverter that can be shared between the blocking and non-blocking stubs?
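
   A hypothetical sketch of the shared-state idea raised here: one holder owns the session-ID check (and could likewise own the retry handler and exception converter), and both stubs are built from it, so they can never disagree about the server-side session ID. Names are illustrative only.

   ```scala
   class SharedSessionIdCheck {
     private var seenSessionId: Option[String] = None

     def check(id: String): Unit = synchronized {
       seenSessionId match {
         case Some(expected) if expected != id =>
           throw new IllegalStateException(
             s"Server side session has changed: expected $expected, got $id")
         case _ => seenSessionId = Some(id)
       }
     }
   }

   class CommonStubState {
     val sessionIdCheck = new SharedSessionIdCheck
     // val retryHandler = ...        // could be shared the same way
     // val exceptionConverter = ...  // likewise
   }

   // Both stubs would then receive the same CommonStubState instance, e.g.:
   //   val state = new CommonStubState
   //   new CustomSparkConnectBlockingStub(channel, state)
   //   new CustomSparkConnectStub(channel, state)
   ```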



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

