This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 6cd45ab801c [SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements 6cd45ab801c is described below commit 6cd45ab801c5e39a05dd9ff760c67148bb067fa0 Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Wed Aug 2 12:49:11 2023 +0900 [SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements ### What changes were proposed in this pull request? Improve some comments about iterator retries. ### Why are the changes needed? Improve comments based on followup questions. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No code changes, only comment changes. Closes #42281 from juliuszsompolski/SPARK-44624-comment-only. Lead-authored-by: Juliusz Sompolski <ju...@databricks.com> Co-authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit c1c4a79ff1728dca0c1536b944c10d282eb13f9f) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../client/ExecutePlanResponseReattachableIterator.scala | 11 +++++++++-- .../apache/spark/sql/connect/client/GrpcRetryHandler.scala | 3 +++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index c6f75928a3a..00787b8f94d 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -40,8 +40,13 @@ import org.apache.spark.internal.Logging * ExecutePlanResponse on the iterator to return a new iterator from server that continues after * that. * - * Since in reattachable execute the server does buffer some responses in case the client needs to - * backtrack + * In reattachable execute the server does buffer some responses in case the client needs to + * backtrack. To let server release this buffer sooner, this iterator asynchronously sends + * ReleaseExecute RPCs that instruct the server to release responses that it already processed. + * + * Note: If the initial ExecutePlan did not even reach the server and execution didn't start, the + * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole + * operation. */ class ExecutePlanResponseReattachableIterator( request: proto.ExecutePlanRequest, @@ -86,6 +91,8 @@ class ExecutePlanResponseReattachableIterator( private var responseComplete: Boolean = false // Initial iterator comes from ExecutePlan request. + // Note: This is not retried, because no error would ever be thrown here, and GRPC will only + // throw error on first iterator.hasNext() or iterator.next() private var iterator: java.util.Iterator[proto.ExecutePlanResponse] = rawBlockingStub.executePlan(initialRequest) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala index 16352bb90b5..ef446399f16 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala @@ -99,6 +99,9 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler extends StreamObserver[U] { private var opened = false // only retries on first call + + // Note: This is not retried, because no error would ever be thrown here, and GRPC will only + // throw error on first iterator.hasNext() or iterator.next() private var streamObserver = call(request) override def onNext(v: U): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org