juliuszsompolski commented on code in PR #42274:
URL: https://github.com/apache/spark/pull/42274#discussion_r1281269604


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -151,6 +153,82 @@ class ExecutePlanResponseReattachableIterator(
     }
   }
 
+  /**
+   * Get a new iterator to the execution by using ReattachExecute.
+   * However, if this fails with this operationId not existing on the server, this means that
+   * the initial ExecutePlan request didn't even reach the server. In that case, attempt to start
+   * again with ExecutePlan.
+   *
+   * Called inside retry block, so retryable failure will get handled upstream.
+   *
+   * Note: From empirical observation, even if one would expect an immediate error from the GRPC,
+   * one is only thrown from the first iterator.next() or iterator.hasNext() call.
+   * However, in case this is a GRPC quirk that cannot be relied upon, check the error here.
+   */
+  private def reattach(): java.util.Iterator[proto.ExecutePlanResponse] = {
+    try {
+      rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+    } catch {
+      case ex: StatusRuntimeException if StatusProto.fromThrowable(ex).getMessage
+        .contains("INVALID_HANDLE.OPERATION_NOT_FOUND") =>
+      if (lastReturnedResponseId.isDefined) {
+        throw new IllegalStateException(
+          "OPERATION_NOT_FOUND on the server but responses were already received from it.", ex)
+      }
+      // We use the helper that checks for OPERATION_ALREADY_EXISTS out of an abundance of
+      // caution, in case some earlier ExecutePlan that seemed lost actually reached the server.
+      execute()
+    }
+  }
+
+  /**
+   * Start the execution by using ExecutePlan.
+   * However, if this fails with this operationId already existing on the server, it means that
+   * a previous try has in fact reached the server. In that case, try to reattach to the execution
+   * instead.
+   *
+   * Note: From empirical observation, even if one would expect an immediate error from the GRPC,
+   * one is only thrown from the first iterator.next() or iterator.hasNext() call.
+   * However, in case this is a GRPC quirk that cannot be relied upon, check the error here.
+   */
+  private def execute(): java.util.Iterator[proto.ExecutePlanResponse] = {
+    try {
+      rawBlockingStub.executePlan(initialRequest)
+    } catch {
+      case ex: StatusRuntimeException
+      if StatusProto.fromThrowable(ex).getMessage
+        .contains("INVALID_HANDLE.OPERATION_ALREADY_EXISTS") =>
+      // we just checked that OPERATION_ALREADY_EXISTS, so we don't need to use the helper that
+      // would check if OPERATION_NOT_FOUND.
+      rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+    }
+  }
+
+  /**
+   * Call next() or hasNext() on the iterator.
+   * If this fails with this operationId not existing on the server, this means that
+   * the initial ExecutePlan request didn't even reach the server. In that case, attempt to start
+   * again with ExecutePlan.
+   *
+   * Called inside retry block, so retryable failure will get handled upstream.
+   */
+  private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
+    try {
+      iterFun(iterator)
+    } catch {
+      case ex: StatusRuntimeException
+        if StatusProto.fromThrowable(ex).getMessage
+        .contains("INVALID_HANDLE.OPERATION_NOT_FOUND") =>

Review Comment:
   One might ask: what if the operation ran on the server but was already removed and evicted
   because the client didn't get to it in time?
   I'm taking this into account in https://issues.apache.org/jira/browse/SPARK-44625 with a
   graveyard of tombstones for such evicted operations.
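   The tombstone idea in the comment above can be sketched roughly as follows. This is a minimal, hypothetical model and not the SPARK-44625 implementation: `OperationRegistry`, `Lookup`, and `onReattach` are illustrative names, and capping the graveyard with an insertion-ordered set is an assumption about how tombstones might be bounded. The point it shows is that, with tombstones, a lookup can tell "ran and was evicted" apart from "never reached the server". Only the second case should fall back to a fresh ExecutePlan, as `reattach()` does in the diff.

```scala
import scala.collection.mutable

// Hypothetical result of looking up an operationId on the server.
sealed trait Lookup
case object Active extends Lookup    // operation is still registered
case object Evicted extends Lookup   // operation ran and was removed; a tombstone remains
case object NeverSeen extends Lookup // no trace at all, e.g. ExecutePlan never arrived

/** Hypothetical registry keeping a bounded "graveyard" of tombstones for evicted
  * operations, so a late reattach can be told apart from an operation that never
  * reached the server. */
final class OperationRegistry(maxTombstones: Int) {
  require(maxTombstones > 0, "maxTombstones must be positive")

  private val active = mutable.Set.empty[String]
  // Insertion-ordered, so the oldest tombstone is the one dropped when over capacity.
  private val graveyard = mutable.LinkedHashSet.empty[String]

  def start(opId: String): Unit = active += opId

  // Remove an active operation and leave a tombstone behind.
  def evict(opId: String): Unit =
    if (active.remove(opId)) {
      graveyard += opId
      if (graveyard.size > maxTombstones) graveyard -= graveyard.head
    }

  def lookup(opId: String): Lookup =
    if (active.contains(opId)) Active
    else if (graveyard.contains(opId)) Evicted
    else NeverSeen
}

// Hypothetical client-side decision mirroring reattach(): re-run ExecutePlan only
// when the server has no trace of the operation and no responses were received yet.
def onReattach(lookup: Lookup, receivedAny: Boolean): String = lookup match {
  case Active                   => "reattach"
  case Evicted                  => "fail: operation already ran and was evicted"
  case NeverSeen if receivedAny => "fail: responses received but operation not found"
  case NeverSeen                => "re-execute"
}
```

   Because the graveyard is bounded, an operation evicted long enough ago eventually looks like `NeverSeen` again; the capacity trades server memory against how late a reattach can still be diagnosed correctly.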



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.