This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new b2966d762721 [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server b2966d762721 is described below commit b2966d7627216845d6a1c3854077a02c6d4e84c5 Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Fri Aug 4 12:05:19 2023 +0900 [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server ### What changes were proposed in this pull request? If the ExecutePlan never reached the server, a ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, the client now retries by sending the ExecutePlan again. ### Why are the changes needed? This solves an edge case of reattachable execution where the initial execution never reached the server. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Testing these failures is difficult and would require a special testing setup. Closes #42282 from juliuszsompolski/SPARK-44624-fix. 
Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 52437bc73695e392bee60fbb340b6de4324b25d8) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../ExecutePlanResponseReattachableIterator.scala | 43 +++++++++++++++++----- .../sql/connect/client/GrpcRetryHandler.scala | 10 ++++- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index fc07deaa081f..41648c3c1004 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -20,7 +20,8 @@ import java.util.UUID import scala.util.control.NonFatal -import io.grpc.ManagedChannel +import io.grpc.{ManagedChannel, StatusRuntimeException} +import io.grpc.protobuf.StatusProto import io.grpc.stub.StreamObserver import org.apache.spark.connect.proto @@ -38,15 +39,12 @@ import org.apache.spark.internal.Logging * Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with * ReattachExecute request. ReattachExecute request is provided the responseId of last returned * ExecutePlanResponse on the iterator to return a new iterator from server that continues after - * that. + * that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with + * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan. * * In reattachable execute the server does buffer some responses in case the client needs to * backtrack. 
To let server release this buffer sooner, this iterator asynchronously sends * ReleaseExecute RPCs that instruct the server to release responses that it already processed. - * - * Note: If the initial ExecutePlan did not even reach the server and execution didn't start, the - * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole - * operation. */ class ExecutePlanResponseReattachableIterator( request: proto.ExecutePlanRequest, @@ -113,7 +111,7 @@ class ExecutePlanResponseReattachableIterator( // on retry, the iterator is borked, so we need a new one iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) } - iterator.next() + callIter(_.next()) } // Record last returned response, to know where to restart in case of reattach. @@ -146,7 +144,7 @@ class ExecutePlanResponseReattachableIterator( // on retry, the iterator is borked, so we need a new one iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) } - var hasNext = iterator.hasNext() + var hasNext = callIter(_.hasNext()) // Graceful reattach: // If iterator ended, but there was no ResultComplete, it means that there is more, // and we need to reattach. @@ -154,7 +152,7 @@ class ExecutePlanResponseReattachableIterator( do { iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) assert(!resultComplete) // shouldn't change... - hasNext = iterator.hasNext() + hasNext = callIter(_.hasNext()) // It's possible that the new iterator will be empty, so we need to loop to get another. // Eventually, there will be a non empty iterator, because there is always a // ResultComplete inserted by the server at the end of the stream. @@ -197,6 +195,33 @@ class ExecutePlanResponseReattachableIterator( } } + /** + * Call next() or hasNext() on the iterator. If this fails with this operationId not existing on + * the server, this means that the initial ExecutePlan request didn't even reach the server. 
In + * that case, attempt to start again with ExecutePlan. + * + * Called inside retry block, so retryable failure will get handled upstream. + */ + private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = { + try { + iterFun(iterator) + } catch { + case ex: StatusRuntimeException + if StatusProto + .fromThrowable(ex) + .getMessage + .contains("INVALID_HANDLE.OPERATION_NOT_FOUND") => + if (lastReturnedResponseId.isDefined) { + throw new IllegalStateException( + "OPERATION_NOT_FOUND on the server but responses were already received from it.", + ex) + } + // Try a new ExecutePlan, and throw upstream for retry. + iterator = rawBlockingStub.executePlan(initialRequest) + throw new GrpcRetryHandler.RetryException + } + } + /** * Create result callback to the asynchronouse ReleaseExecute. The client does not block on * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala index ef446399f167..47ff975b2675 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala @@ -164,7 +164,9 @@ private[client] object GrpcRetryHandler extends Logging { try { return fn } catch { - case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries => + case NonFatal(e) + if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException]) + && currentRetryNum < retryPolicy.maxRetries => logWarning( s"Non fatal error during RPC execution: $e, " + s"retrying (currentRetryNum=$currentRetryNum)") @@ -209,4 +211,10 @@ private[client] object GrpcRetryHandler extends Logging { maxBackoff: FiniteDuration = FiniteDuration(1, "min"), 
backoffMultiplier: Double = 4.0, canRetry: Throwable => Boolean = retryException) {} + + /** + * An exception that can be thrown upstream when inside retry and which will be retryable + * regardless of policy. + */ + class RetryException extends Throwable } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org