jiangxb1987 commented on a change in pull request #34428:
URL: https://github.com/apache/spark/pull/34428#discussion_r742430558



##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -782,13 +784,23 @@ private[deploy] class Worker(
 
           case Failure(t) =>
            val failures = executorStateSyncFailureAttempts.getOrElse(fullId, 0) + 1
-            if (failures < 5) {
+            if (failures < executorStateSyncMaxAttempts) {
               logError(s"Failed to send $newState to Master $masterRef, " +
-                s"will retry ($failures/5).", t)
+                s"will retry ($failures/$executorStateSyncMaxAttempts).", t)
               executorStateSyncFailureAttempts(fullId) = failures
+              // If the failure is not caused by TimeoutException, wait for a while before retry in
+              // case the connection is temporarily unavailable.
+              if (!t.isInstanceOf[TimeoutException]) {
+                try {
+                  Thread.sleep(defaultAskTimeout)

Review comment:
       Thanks @mridulm for raising the concern! The idea here is to make the wait time between two RPC sync attempts similar across failure modes. When an attempt fails due to a `TimeoutException`, it has already waited for the interval defined by `defaultAskTimeout`. When an attempt fails due to a bad network connection, it fails immediately, so we insert an extra wait interval equal to `defaultAskTimeout` here. I agree the wait times would not be strictly identical, but we just want to make sure the worker waits for a reasonable amount of time before starting another RPC sync attempt.
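
       To make the intent concrete, here is a minimal sketch of the retry spacing described above (not the actual Worker code; the names `maxAttempts`, `askTimeoutMs`, and `attempt` are illustrative only):

```scala
import java.util.concurrent.TimeoutException

// Minimal sketch of the retry-spacing idea. `maxAttempts`, `askTimeoutMs`,
// and `attempt` are hypothetical names, not the actual Spark identifiers.
object RetrySpacingSketch {
  def syncWithRetry(maxAttempts: Int, askTimeoutMs: Long)(attempt: () => Unit): Unit = {
    var failures = 0
    var done = false
    while (!done && failures < maxAttempts) {
      try {
        attempt() // an RPC ask that itself blocks for up to ~askTimeoutMs
        done = true
      } catch {
        case _: TimeoutException =>
          // The ask already waited ~askTimeoutMs before failing, so retrying
          // immediately still spaces attempts by about one timeout interval.
          failures += 1
        case _: Exception =>
          // A broken connection fails immediately, so sleep for the same
          // interval to keep the gap between attempts roughly uniform.
          failures += 1
          Thread.sleep(askTimeoutMs)
      }
    }
  }
}
```

       Either way, consecutive attempts end up roughly one ask-timeout apart, which is the property the comment above is aiming for.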
