Github user mpetruska commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19671#discussion_r163297550
  
    --- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1322,33 +1322,55 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     
       test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are 
working") {
         val tryAgainMsg = "test_spark_20640_try_again"
    +    val timingoutExecutor = "timingoutExecutor"
    +    val tryAgainExecutor = "tryAgainExecutor"
    +    val succeedingExecutor = "succeedingExecutor"
    +
         // a server which delays response 50ms and must try twice for success.
         def newShuffleServer(port: Int): (TransportServer, Int) = {
    -      val attempts = new mutable.HashMap[String, Int]()
    +      val failure = new Exception(tryAgainMsg)
    +      val success = ByteBuffer.wrap(new Array[Byte](0))
    +
    +      var secondExecutorFailedOnce = false
    +      var thirdExecutorFailedOnce = false
    +
           val handler = new NoOpRpcHandler {
             override def receive(
                 client: TransportClient,
                 message: ByteBuffer,
                 callback: RpcResponseCallback): Unit = {
               val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
               msgObj match {
    -            case exec: RegisterExecutor =>
    -              Thread.sleep(50)
    -              val attempt = attempts.getOrElse(exec.execId, 0) + 1
    -              attempts(exec.execId) = attempt
    -              if (attempt < 2) {
    -                callback.onFailure(new Exception(tryAgainMsg))
    -                return
    -              }
    -              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
    +
    +            case exec: RegisterExecutor if exec.execId == 
timingoutExecutor =>
    +              () // No reply to generate client-side timeout
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == tryAgainExecutor && 
!secondExecutorFailedOnce =>
    +              secondExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == tryAgainExecutor 
=>
    +              callback.onSuccess(success)
    +
    +            case exec: RegisterExecutor
    +              if exec.execId == succeedingExecutor && 
!thirdExecutorFailedOnce =>
    +              thirdExecutorFailedOnce = true
    +              callback.onFailure(failure)
    +
    +            case exec: RegisterExecutor if exec.execId == 
succeedingExecutor =>
    +              callback.onSuccess(success)
    +
               }
             }
           }
     
    -      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 
numUsableCores = 0)
    +      val transConf: TransportConf =
    --- End diff --
    
    Not really, although IntelliJ Idea recommends adding the type annotation. 
Do you think I should delete it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to