This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f13743de04e [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
f13743de04e is described below

commit f13743de04e430e59c4eaeca464447608bd32b1d
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Sep 7 10:48:28 2023 +0900

    [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error

    ### What changes were proposed in this pull request?

    Make INVALID_CURSOR.DISCONNECTED a retriable error.

    ### Why are the changes needed?

    This error can happen if two RPCs are racing to reattach to the query, and the client is still using the losing one. SPARK-44833 was a bug that exposed such a situation. That was fixed, but to be more robust, we can make this error retryable.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Tests will be added in https://github.com/apache/spark/pull/42560

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42818 from juliuszsompolski/SPARK-44835.
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/connect/client/GrpcRetryHandler.scala      | 17 +++++++++++-
 python/pyspark/sql/connect/client/core.py          | 31 +++++++++++++++++++---
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index a6841e7f118..8791530607c 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
    */
   private[client] def retryException(e: Throwable): Boolean = {
     e match {
-      case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
       case _ => false
     }
   }
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 1e439b8c0f6..e8d598bd0fe 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -585,11 +585,36 @@ class SparkConnectClient(object):

     @classmethod
     def retry_exception(cls, e: Exception) -> bool:
-        if isinstance(e, grpc.RpcError):
-            return e.code() == grpc.StatusCode.UNAVAILABLE
-        else:
+        """
+        Helper function that is used to identify if an exception thrown by the server
+        can be retried or not.
+
+        Parameters
+        ----------
+        e : Exception
+            The GRPC error as received from the server. Typed as Exception, because other exception
+            thrown during client processing can be passed here as well.
+
+        Returns
+        -------
+        True if the exception can be retried, False otherwise.
+
+        """
+        if not isinstance(e, grpc.RpcError):
             return False

+        if e.code() in [grpc.StatusCode.INTERNAL]:
+            msg = str(e)
+
+            # This error happens if another RPC preempts this RPC.
+            if "INVALID_CURSOR.DISCONNECTED" in msg:
+                return True
+
+        if e.code() == grpc.StatusCode.UNAVAILABLE:
+            return True
+
+        return False
+
     def __init__(
         self,
         connection: Union[str, ChannelBuilder],

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org