Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6205#discussion_r32350737
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -182,3 +184,114 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}
+
+
+/**
+ * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
+ */
+private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+ extends TimeoutException(message) { initCause(cause) }
+
+
+/**
+ * Associates a timeout with a description so that a when a TimeoutException occurs, additional
+ * context about the timeout can be amended to the exception message.
+ * @param timeout timeout duration in seconds
+ * @param description description to be displayed in a timeout exception
+ */
+private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
+
+ /** Get the timeout duration */
+ def duration: FiniteDuration = timeout
+
+ /** Get the message associated with this timeout */
+ def message: String = description
+
+ /** Amends the standard message of TimeoutException to include the description */
+ private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
+ new RpcTimeoutException(te.getMessage() + " " + description, te)
+ }
+
+ /**
+ * PartialFunction to match a TimeoutException and add the timeout description to the message
+ *
+ * @note This can be used in the recover callback of a Future to add to a TimeoutException
+ * Example:
+ * val timeout = new RpcTimeout(5 millis, "short timeout")
+ * Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
+ */
+ def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+ // The exception has already been converted to a RpcTimeoutException so just raise it
+ case rte: RpcTimeoutException => throw rte
+ // Any other TimeoutException get converted to a RpcTimeoutException with modified message
+ case te: TimeoutException => throw createRpcTimeoutException(te)
+ }
+
+ /**
+ * Wait for the completed result and return it. If the result is not available within this
+ * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
+ * @param awaitable the `Awaitable` to be awaited
+ * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+ * is still not ready
+ */
+ def awaitResult[T](awaitable: Awaitable[T]): T = {
+ try {
+ Await.result(awaitable, duration)
+ } catch addMessageIfTimeout
+ }
+}
+
+
+object RpcTimeout {
+
+ private[this] val messagePrefix = "This timeout is controlled by "
--- End diff --
I don't think this is related to whether we get the time as seconds or
millis. I'm thinking that we still leave the `FiniteDuration` in the
constructor, but change the second constructor param to be just the name of
the conf, not the full description, e.g.:
```diff
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5149cf1..e0de397 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -197,19 +197,16 @@ private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
* Associates a timeout with a description so that a when a TimeoutException occurs, additional
* context about the timeout can be amended to the exception message.
* @param timeout timeout duration in seconds
- * @param description description to be displayed in a timeout exception
+ * @param conf the configuration parameter that controls this timeout
*/
-private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
+private[spark] class RpcTimeout(timeout: FiniteDuration, val conf: String) {
/** Get the timeout duration */
def duration: FiniteDuration = timeout
- /** Get the message associated with this timeout */
- def message: String = description
-
/** Amends the standard message of TimeoutException to include the description */
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
- new RpcTimeoutException(te.getMessage() + " " + description, te)
+ new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + conf, te)
}
```
(and corresponding changes to the `apply` methods.)
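To make the suggestion concrete, here is a minimal standalone sketch of how the class might behave with a conf name as its second parameter. It is not the actual Spark code: the `private[spark]`/`private[rpc]` modifiers are dropped so it compiles outside Spark, and `RpcTimeoutDemo`, `timedOutMessage`, and the conf key `spark.example.timeout` are hypothetical names used only for illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Awaitable, Promise}
import scala.concurrent.duration._

// Simplified stand-ins for the classes in the diff (no Spark access modifiers).
class RpcTimeoutException(message: String, cause: TimeoutException)
  extends TimeoutException(message) { initCause(cause) }

// The second parameter is only the name of the conf, as proposed above.
class RpcTimeout(timeout: FiniteDuration, val conf: String) {

  def duration: FiniteDuration = timeout

  // The fixed message prefix now lives in one place instead of in every caller.
  private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException =
    new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + conf, te)

  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
    // Already converted, so rethrow unchanged.
    case rte: RpcTimeoutException => throw rte
    // Any other TimeoutException gets the conf name appended.
    case te: TimeoutException => throw createRpcTimeoutException(te)
  }

  def awaitResult[T](awaitable: Awaitable[T]): T =
    try Await.result(awaitable, duration) catch addMessageIfTimeout
}

object RpcTimeoutDemo {
  // Hypothetical conf key, not a real Spark setting.
  val timeout = new RpcTimeout(50.millis, "spark.example.timeout")

  /** Awaits a future that never completes and returns the exception message. */
  def timedOutMessage(): String = {
    val never = Promise[Int]().future
    try { timeout.awaitResult(never); "no timeout" }
    catch { case e: RpcTimeoutException => e.getMessage }
  }

  def main(args: Array[String]): Unit = println(timedOutMessage())
}
```

With this shape, callers pass only the conf name, and the message returned by `timedOutMessage()` ends with the name of the setting that controls the timeout.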
I do kind of wish the values were read with `conf.getTimeAsMs` to allow
finer-grained resolution, but (a) I'd be nervous about changing any default and
(b) in any case that should be a separate issue.
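As a rough illustration of the resolution point, using only `scala.concurrent.duration` and not Spark's own config parsing: a value with a sub-second part loses that part once it is expressed in whole seconds. The object name `TimeoutResolutionDemo` and the 1500 ms value are assumptions chosen for the example.

```scala
import scala.concurrent.duration._

object TimeoutResolutionDemo {
  // A hypothetical configured timeout with a sub-second component.
  val configured: FiniteDuration = 1500.millis

  // Expressed in whole seconds, the fractional part is truncated.
  val asSeconds: FiniteDuration = configured.toSeconds.seconds

  // Expressed in milliseconds, the full value is preserved.
  val asMillis: FiniteDuration = configured.toMillis.millis

  def main(args: Array[String]): Unit =
    println(s"seconds resolution: $asSeconds, millisecond resolution: $asMillis")
}
```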