juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251818998
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +107,11 @@ private[sql] class SparkConnectClient(
.setSessionId(sessionId)
.setClientType(userAgent)
.build()
- stub.executePlan(request)
+ retry {
+ val result = stub.executePlan(request)
+ result.hasNext // moves evaluation of BlockingResponseStream to
SparkConnectClient
Review Comment:
could you elaborate on why this is needed? I believe this `.hasNext` can
block for quite a while until the first response comes back on the stream.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
}
}
}
+
+ private[client] def retryException(e: Throwable): Boolean = {
+ if (e.isInstanceOf[StatusRuntimeException]) {
+ e.asInstanceOf[StatusRuntimeException].getStatus().getCode() ==
Status.Code.UNAVAILABLE
+ } else {
+ false
+ }
+ }
+
+ /**
+ * [[RetryParameters]] configure the retry mechanism in
[[SparkConnectClient]]
+ *
+ * @param max_retries
+ * Maximum number of retries.
+ * @param initial_backoff
+ * Start value of the exponential backoff (ms).
+ * @param max_backoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoff_multiplier
+ * Multiplicative base of the exponential backoff.
+ * @param should_retry
+ * Function that determines whether a retry is to be performed in the
event of an error.
+ */
+ private[client] case class RetryParameters(
+ max_retries: Int = 15,
Review Comment:
use camelCase instead of snake_case for parameters.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
}
}
}
+
+ private[client] def retryException(e: Throwable): Boolean = {
+ if (e.isInstanceOf[StatusRuntimeException]) {
+ e.asInstanceOf[StatusRuntimeException].getStatus().getCode() ==
Status.Code.UNAVAILABLE
+ } else {
+ false
+ }
+ }
+
+ /**
+ * [[RetryParameters]] configure the retry mechanism in
[[SparkConnectClient]]
+ *
+ * @param max_retries
+ * Maximum number of retries.
+ * @param initial_backoff
+ * Start value of the exponential backoff (ms).
+ * @param max_backoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoff_multiplier
+ * Multiplicative base of the exponential backoff.
+ * @param should_retry
+ * Function that determines whether a retry is to be performed in the
event of an error.
Review Comment:
specifying this configuration and defining which exceptions should be
retries is a followup?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]