This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 04558fc90fe [SPARK-44833][CONNECT] Fix sending Reattach too fast after Execute 04558fc90fe is described below commit 04558fc90fe2df2d2791c98c5f7b15ee26e250eb Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Wed Sep 6 14:21:47 2023 +0900 [SPARK-44833][CONNECT] Fix sending Reattach too fast after Execute ### What changes were proposed in this pull request? Redo the retry logic, so that getting a new iterator via ReattachExecute does not depend on "firstTry", but there is logic in "callIter" with unsetting the iterator when a new one is needed. ### Why are the changes needed? After an "INVALID_HANDLE.OPERATION_NOT_FOUND" error, client would realize that the failure in ReattachExecute was because the initial ExecutePlan didn't reach the server. It would then call another ExecutePlan, and it will throw a RetryException to let the retry logic handle retrying. However, the retry logic would then immediately send a ReattachExecute, and the client will want to use the iterator of the reattach. However, on the server the ExecutePlan and ReattachExecute could race with each other: * ExecutePlan didn't reach executeHolder.runGrpcResponseSender(responseSender) in SparkConnectExecutePlanHandler yet. * ReattachExecute races around and reaches executeHolder.runGrpcResponseSender(responseSender) in SparkConnectReattachExecuteHandler first. * When ExecutePlan reaches executeHolder.runGrpcResponseSender(responseSender), and executionObserver.attachConsumer(this) is called in ExecuteGrpcResponseSender of ExecutePlan, it will kick out the ExecuteGrpcResponseSender of ReattachExecute. So even though ReattachExecute came later, it will get interrupted by the earlier ExecutePlan and finish with an INVALID_CURSOR.DISCONNECTED error. After this change, such a race between ExecutePlan / ReattachExecute can still happen, but the client should no longer send these requests in such quick succession. 
### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Integration testing. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42806 from juliuszsompolski/SPARK-44833. Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit e4d17e9a1fb64454a6a007171837d159633e91fb) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../ExecutePlanResponseReattachableIterator.scala | 33 ++++++++-------------- python/pyspark/sql/connect/client/reattach.py | 31 ++++++++++---------- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index aeb452faecf..9bf7de33da8 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -91,8 +91,8 @@ class ExecutePlanResponseReattachableIterator( // Initial iterator comes from ExecutePlan request. 
// Note: This is not retried, because no error would ever be thrown here, and GRPC will only // throw error on first iter.hasNext() or iter.next() - private var iter: java.util.Iterator[proto.ExecutePlanResponse] = - rawBlockingStub.executePlan(initialRequest) + private var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] = + Some(rawBlockingStub.executePlan(initialRequest)) override def next(): proto.ExecutePlanResponse = synchronized { // hasNext will trigger reattach in case the stream completed without resultComplete @@ -102,15 +102,7 @@ class ExecutePlanResponseReattachableIterator( try { // Get next response, possibly triggering reattach in case of stream error. - var firstTry = true val ret = retry { - if (firstTry) { - // on first try, we use the existing iter. - firstTry = false - } else { - // on retry, the iter is borked, so we need a new one - iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) - } callIter(_.next()) } @@ -134,23 +126,15 @@ class ExecutePlanResponseReattachableIterator( // After response complete response return false } - var firstTry = true try { retry { - if (firstTry) { - // on first try, we use the existing iter. - firstTry = false - } else { - // on retry, the iter is borked, so we need a new one - iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) - } var hasNext = callIter(_.hasNext()) // Graceful reattach: // If iter ended, but there was no ResultComplete, it means that there is more, // and we need to reattach. if (!hasNext && !resultComplete) { do { - iter = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + iter = None // unset iterator for new ReattachExecute to be called in _call_iter assert(!resultComplete) // shouldn't change... hasNext = callIter(_.hasNext()) // It's possible that the new iter will be empty, so we need to loop to get another. 
@@ -208,7 +192,10 @@ class ExecutePlanResponseReattachableIterator( */ private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = { try { - iterFun(iter) + if (iter.isEmpty) { + iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest())) + } + iterFun(iter.get) } catch { case ex: StatusRuntimeException if Option(StatusProto.fromThrowable(ex)) @@ -219,8 +206,12 @@ class ExecutePlanResponseReattachableIterator( ex) } // Try a new ExecutePlan, and throw upstream for retry. - iter = rawBlockingStub.executePlan(initialRequest) + iter = Some(rawBlockingStub.executePlan(initialRequest)) throw new GrpcRetryHandler.RetryException + case NonFatal(e) => + // Remove the iterator, so that a new one will be created after retry. + iter = None + throw e } } diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py index c6b1beaa121..d3765fb6696 100644 --- a/python/pyspark/sql/connect/client/reattach.py +++ b/python/pyspark/sql/connect/client/reattach.py @@ -131,15 +131,6 @@ class ExecutePlanResponseReattachableIterator(Generator): can_retry=SparkConnectClient.retry_exception, **self._retry_policy ): with attempt: - # on first try, we use the existing iterator. - if not attempt.is_first_try(): - # on retry, the iterator is borked, so we need a new one - self._iterator = iter( - self._stub.ReattachExecute( - self._create_reattach_execute_request(), metadata=self._metadata - ) - ) - if self._current is None: try: self._current = self._call_iter(lambda: next(self._iterator)) @@ -154,12 +145,8 @@ class ExecutePlanResponseReattachableIterator(Generator): # arrive, we keep reattaching. 
if not self._result_complete and not has_next: while not has_next: - self._iterator = iter( - self._stub.ReattachExecute( - self._create_reattach_execute_request(), - metadata=self._metadata, - ) - ) + # unset iterator for new ReattachExecute to be called in _call_iter + self._iterator = None # shouldn't change assert not self._result_complete try: @@ -238,6 +225,14 @@ class ExecutePlanResponseReattachableIterator(Generator): Called inside retry block, so retryable failure will get handled upstream. """ + if self._iterator is None: + # we get a new iterator with ReattachExecute if it was unset. + self._iterator = iter( + self._stub.ReattachExecute( + self._create_reattach_execute_request(), metadata=self._metadata + ) + ) + try: return iter_fun() except grpc.RpcError as e: @@ -255,7 +250,13 @@ class ExecutePlanResponseReattachableIterator(Generator): ) raise RetryException() else: + # Remove the iterator, so that a new one will be created after retry. + self._iterator = None raise e + except Exception as e: + # Remove the iterator, so that a new one will be created after retry. + self._iterator = None + raise e def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest: reattach = pb2.ReattachExecuteRequest( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org