This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d17e9a1fb [SPARK-44833][CONNECT] Fix sending Reattach too fast after Execute
e4d17e9a1fb is described below

commit e4d17e9a1fb64454a6a007171837d159633e91fb
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Wed Sep 6 14:21:47 2023 +0900

    [SPARK-44833][CONNECT] Fix sending Reattach too fast after Execute
    
    ### What changes were proposed in this pull request?
    
    Redo the retry logic so that getting a new iterator via ReattachExecute no longer depends on a "firstTry" flag. Instead, the iterator is unset whenever a new one is needed, and "callIter" lazily creates a replacement.
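
    In essence, the iterator is now held as an Option that "callIter" repopulates lazily. A minimal sketch of the pattern, condensed from the Scala diff below (class members shown out of context; imports and the OPERATION_NOT_FOUND branch omitted):

    ```scala
    // Condensed from ExecutePlanResponseReattachableIterator (see diff below).
    private var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
      Some(rawBlockingStub.executePlan(initialRequest))

    private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
      try {
        // Reattach lazily: only when a response is actually consumed, and only
        // if the previous iterator was unset after a failure.
        if (iter.isEmpty) {
          iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))
        }
        iterFun(iter.get)
      } catch {
        case NonFatal(e) =>
          iter = None // drop the broken iterator; the next retry creates a fresh one
          throw e
      }
    }
    ```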
    
    ### Why are the changes needed?
    
    After an "INVALID_HANDLE.OPERATION_NOT_FOUND" error, client would realize 
that the failure in ReattachExecute was because the initial ExecutePlan didn't 
reach the server. It would then call another ExecutePlan, and it will throw a 
RetryException to let the retry logic handle retrying. However, the retry logic 
would then immediately send a ReattachExecute, and the client will want to use 
the iterator of the reattach.
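
    For illustration, the shape of that handling inside "callIter", condensed from the diff below (isOperationNotFound is a hypothetical stand-in for the actual check, which inspects the gRPC StatusProto of the exception):

    ```scala
    // Condensed from callIter's catch block in the diff below.
    case ex: StatusRuntimeException if isOperationNotFound(ex) => // hypothetical guard
      // The initial ExecutePlan never reached the server: issue a new one and
      // throw RetryException so the surrounding retry logic takes over.
      iter = Some(rawBlockingStub.executePlan(initialRequest))
      throw new GrpcRetryHandler.RetryException
    ```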
    
    On the server, however, the ExecutePlan and ReattachExecute could race with each other:
    * ExecutePlan didn't reach executeHolder.runGrpcResponseSender(responseSender) in SparkConnectExecutePlanHandler yet.
    * ReattachExecute races around and reaches executeHolder.runGrpcResponseSender(responseSender) in SparkConnectReattachExecuteHandler first.
    * When ExecutePlan reaches executeHolder.runGrpcResponseSender(responseSender) and executionObserver.attachConsumer(this) is called in the ExecuteGrpcResponseSender of ExecutePlan, it kicks out the ExecuteGrpcResponseSender of ReattachExecute.
    
    So even though ReattachExecute came later, it gets interrupted by the earlier ExecutePlan and finishes with an INVALID_CURSOR.DISCONNECTED error.
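
    Conceptually, attaching a consumer on the server behaves like an atomic swap of the active sender. A minimal illustrative model (not the actual Spark server code; all names here are assumptions):

    ```scala
    import java.util.concurrent.atomic.AtomicReference

    // Illustrative model only: attaching a new sender atomically replaces the
    // previous one, which then observes it was kicked out and fails its stream
    // with INVALID_CURSOR.DISCONNECTED.
    class ExecutionObserverModel[Sender] {
      private val current = new AtomicReference[Option[Sender]](None)

      // Returns the previously attached sender (the one kicked out), e.g.
      // ReattachExecute's sender when ExecutePlan attaches afterwards.
      def attachConsumer(sender: Sender): Option[Sender] =
        current.getAndSet(Some(sender))
    }
    ```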
    
    After this change, such a race between ExecutePlan and ReattachExecute can still happen, but the client should no longer send these requests in such quick succession.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Integration testing.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42806 from juliuszsompolski/SPARK-44833.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../ExecutePlanResponseReattachableIterator.scala  | 33 ++++++++--------------
 python/pyspark/sql/connect/client/reattach.py      | 31 ++++++++++----------
 2 files changed, 28 insertions(+), 36 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index aeb452faecf..9bf7de33da8 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -91,8 +91,8 @@ class ExecutePlanResponseReattachableIterator(
   // Initial iterator comes from ExecutePlan request.
   // Note: This is not retried, because no error would ever be thrown here, and GRPC will only
   // throw error on first iter.hasNext() or iter.next()
-  private var iter: java.util.Iterator[proto.ExecutePlanResponse] =
-    rawBlockingStub.executePlan(initialRequest)
+  private var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
+    Some(rawBlockingStub.executePlan(initialRequest))
 
   override def next(): proto.ExecutePlanResponse = synchronized {
     // hasNext will trigger reattach in case the stream completed without resultComplete
@@ -102,15 +102,7 @@ class ExecutePlanResponseReattachableIterator(
 
     try {
       // Get next response, possibly triggering reattach in case of stream error.
-      var firstTry = true
       val ret = retry {
-        if (firstTry) {
-          // on first try, we use the existing iter.
-          firstTry = false
-        } else {
-          // on retry, the iter is borked, so we need a new one
-          iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
-        }
         callIter(_.next())
       }
 
@@ -134,23 +126,15 @@ class ExecutePlanResponseReattachableIterator(
       // After response complete response
       return false
     }
-    var firstTry = true
     try {
       retry {
-        if (firstTry) {
-          // on first try, we use the existing iter.
-          firstTry = false
-        } else {
-          // on retry, the iter is borked, so we need a new one
-          iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
-        }
         var hasNext = callIter(_.hasNext())
         // Graceful reattach:
         // If iter ended, but there was no ResultComplete, it means that there is more,
         // and we need to reattach.
         if (!hasNext && !resultComplete) {
           do {
-            iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+            iter = None // unset iterator for new ReattachExecute to be called in callIter
             assert(!resultComplete) // shouldn't change...
             hasNext = callIter(_.hasNext())
             // It's possible that the new iter will be empty, so we need to loop to get another.
@@ -208,7 +192,10 @@ class ExecutePlanResponseReattachableIterator(
    */
   private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
     try {
-      iterFun(iter)
+      if (iter.isEmpty) {
+        iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))
+      }
+      iterFun(iter.get)
     } catch {
       case ex: StatusRuntimeException
           if Option(StatusProto.fromThrowable(ex))
@@ -219,8 +206,12 @@ class ExecutePlanResponseReattachableIterator(
             ex)
         }
         // Try a new ExecutePlan, and throw upstream for retry.
-        iter = rawBlockingStub.executePlan(initialRequest)
+        iter = Some(rawBlockingStub.executePlan(initialRequest))
         throw new GrpcRetryHandler.RetryException
+      case NonFatal(e) =>
+        // Remove the iterator, so that a new one will be created after retry.
+        iter = None
+        throw e
     }
   }
 
diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index c6b1beaa121..d3765fb6696 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -131,15 +131,6 @@ class ExecutePlanResponseReattachableIterator(Generator):
                     can_retry=SparkConnectClient.retry_exception, **self._retry_policy
                 ):
                     with attempt:
-                        # on first try, we use the existing iterator.
-                        if not attempt.is_first_try():
-                            # on retry, the iterator is borked, so we need a new one
-                            self._iterator = iter(
-                                self._stub.ReattachExecute(
-                                    self._create_reattach_execute_request(), metadata=self._metadata
-                                )
-                            )
-
                         if self._current is None:
                             try:
                                 self._current = self._call_iter(lambda: next(self._iterator))
@@ -154,12 +145,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
                         # arrive, we keep reattaching.
                         if not self._result_complete and not has_next:
                             while not has_next:
-                                self._iterator = iter(
-                                    self._stub.ReattachExecute(
-                                        self._create_reattach_execute_request(),
-                                        metadata=self._metadata,
-                                    )
-                                )
+                                # unset iterator for new ReattachExecute to be called in _call_iter
+                                self._iterator = None
                                 # shouldn't change
                                 assert not self._result_complete
                                 try:
@@ -238,6 +225,14 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
         Called inside retry block, so retryable failure will get handled upstream.
         """
+        if self._iterator is None:
+            # we get a new iterator with ReattachExecute if it was unset.
+            self._iterator = iter(
+                self._stub.ReattachExecute(
                    self._create_reattach_execute_request(), metadata=self._metadata
+                )
+            )
+
         try:
             return iter_fun()
         except grpc.RpcError as e:
@@ -255,7 +250,13 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 )
                 raise RetryException()
             else:
+                # Remove the iterator, so that a new one will be created after retry.
+                self._iterator = None
                 raise e
+        except Exception as e:
+            # Remove the iterator, so that a new one will be created after retry.
+            self._iterator = None
+            raise e
 
     def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest:
         reattach = pb2.ReattachExecuteRequest(

