Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6205#discussion_r31830196
--- Diff: core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala ---
@@ -182,3 +185,130 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}
+
+
+/**
+ * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
+ */
+private[rpc] class RpcTimeoutException(message: String)
+ extends TimeoutException(message)
+
+
+/**
+ * Associates a timeout with a description so that a when a
TimeoutException occurs, additional
+ * context about the timeout can be amended to the exception message.
+ * @param timeout timeout duration in seconds
+ * @param description description to be displayed in a timeout exception
+ */
+private[spark] class RpcTimeout(timeout: FiniteDuration, description:
String) {
+
+ /** Get the timeout duration */
+ def duration: FiniteDuration = timeout
+
+ /** Get the message associated with this timeout */
+ def message: String = description
+
+ /** Amends the standard message of TimeoutException to include the
description */
+ def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException
= {
+ new RpcTimeoutException(te.getMessage() + " " + description)
+ }
+
+ /**
+ * Add a callback to the given Future so that if it completes as failed
with a TimeoutException
+ * then the timeout description is added to the message
+ */
+ def addMessageIfTimeout[T](future: Future[T]): Future[T] = {
+ future.recover {
+ // Add a warning message if Future is passed to
addMessageIfTimeoutTest more than once
+ case rte: RpcTimeoutException => throw new
RpcTimeoutException(rte.getMessage() +
+ " (Future has multiple calls to
RpcTimeout.addMessageIfTimeoutTest)")
+ // Any other TimeoutException get converted to a RpcTimeoutException
with modified message
+ case te: TimeoutException => throw createRpcTimeoutException(te)
+ }(ThreadUtils.sameThread)
+ }
+
+ /** Applies the duration to create future before calling
addMessageIfTimeout*/
+ def addMessageIfTimeout[T](f: FiniteDuration => Future[T]): Future[T] = {
+ addMessageIfTimeout(f(duration))
+ }
+
+ /**
+ * Waits for a completed result to catch and amend a TimeoutException
message
+ * @param awaitable the `Awaitable` to be awaited
+ * @throws RpcTimeoutException if after waiting for the specified time
`awaitable`
+ * is still not ready
+ */
+ def awaitResult[T](awaitable: Awaitable[T]): T = {
+ try {
+ Await.result(awaitable, duration)
+ }
+ catch {
+ // The exception has already been converted to a RpcTimeoutException
so just raise it
+ case rte: RpcTimeoutException => throw rte
+ // Any other TimeoutException get converted to a RpcTimeoutException
with modified message
+ case te: TimeoutException => throw createRpcTimeoutException(te)
+ }
+ }
+}
+
+
+/**
+ * Create an RpcTimeout using a configuration property that controls the
timeout duration so when
+ * a TimeoutException is thrown, the property key will be indicated in the
message.
+ */
+object RpcTimeout {
+
+ private[this] val messagePrefix = "This timeout is controlled by "
+
+ /**
+ * Lookup the timeout property in the configuration and create
+ * a RpcTimeout with the property key in the description.
+ * @param conf configuration properties containing the timeout
+ * @param timeoutProp property key for the timeout in seconds
+ * @throws NoSuchElementException if property is not set
+ */
+ def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
+ val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
+ new RpcTimeout(timeout, messagePrefix + timeoutProp)
+ }
+
+ /**
+ * Lookup the timeout property in the configuration and create
+ * a RpcTimeout with the property key in the description.
+ * Uses the given default value if property is not set
+ * @param conf configuration properties containing the timeout
+ * @param timeoutProp property key for the timeout in seconds
+ * @param defaultValue default timeout value in seconds if property not
found
+ */
+ def apply(conf: SparkConf, timeoutProp: String, defaultValue: String):
RpcTimeout = {
+ val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue)
seconds }
+ new RpcTimeout(timeout, messagePrefix + timeoutProp)
+ }
+
+ /**
+ * Lookup prioritized list of timeout properties in the configuration
+ * and create a RpcTimeout with the first set property key in the
+ * description.
+ * Uses the given default value if property is not set
+ * @param conf configuration properties containing the timeout
+ * @param timeoutPropList prioritized list of property keys for the
timeout in seconds
+ * @param defaultValue default timeout value in seconds if no properties
found
+ */
+ def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue:
String): RpcTimeout = {
+ require(timeoutPropList.nonEmpty)
+
+ // Find the first set property or use the default value with the first
property
+ val itr = timeoutPropList.iterator
+ var foundProp = None: Option[(String, String)]
+ while (itr.hasNext && foundProp.isEmpty){
+ val propKey = itr.next()
+ conf.getOption(propKey) match {
+ case Some(prop) => foundProp = Some(propKey,prop)
+ case None =>
--- End diff --
this could be `conf.getOption(propKey).foreach { prop => foundProp =
Some(propKey, prop) }`. Its fine as is, though
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]