[ https://issues.apache.org/jira/browse/SPARK-27614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-27614:
---------------------------------
    Description: 
Most of the tasks in the stage have completed, but a few individual tasks show an abnormally long duration and make no progress at all.

 

The corresponding executor hits a connection timeout, and its thread stack shows it hanging in ShuffleBlockFetcherIterator.next.
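
As a diagnostic aid: one way to confirm that an executor thread is stuck in this method is to dump the executor JVM's stack on its host with standard JDK tooling (the pid below is a placeholder):

{code}
# Dump the executor JVM's threads and show the frames around the fetch iterator.
jstack <executor-pid> | grep -A 20 "ShuffleBlockFetcherIterator"
{code}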

 

The corresponding code is as follows:

{code}
    while (!isZombie && result == null) {
      val startFetchWait = System.nanoTime()
      result = results.take()
      val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
      shuffleMetrics.incFetchWaitTime(fetchWaitTime)
{code}

LinkedBlockingQueue.take() blocks indefinitely when no result ever arrives, which matches the observed hang. We can use poll() with a timeout instead, so a stuck fetch fails with an error rather than hanging the task forever; the modified code follows the short illustration below.
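
To make the take()/poll() difference concrete, here is a minimal, self-contained sketch in plain Scala; it uses only java.util.concurrent, and the queue and messages are illustrative, not from the patch:

{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object PollVsTake {
  def main(args: Array[String]): Unit = {
    val results = new LinkedBlockingQueue[String]()

    // poll(timeout, unit) returns null once the timeout elapses,
    // so the caller can detect a stuck fetch and abort.
    val r = results.poll(2, TimeUnit.SECONDS)
    if (r == null) {
      println("timed out waiting for a result; fail fast instead of hanging")
    }

    // results.take() here would block this thread forever,
    // because nothing is ever offered to the queue.
  }
}
{code}

The modified code is as follows: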

{code}
  currentResult =
    if (!this.blockManager.conf.getBoolean("spark.shuffle.fetch.timeout.enable", true)) {
      // Flag disabled: keep the old, potentially unbounded blocking take().
      results.take()
    } else {
      logInfo("spark.shuffle.fetch.timeout.enable=true, polling with a timeout.")
      val GB = 1L << 30
      val MB = 1L << 20
      // Scale the timeout with the amount of shuffle data currently in flight.
      val (waitTime, unit) =
        if (bytesInFlight >= 2 * GB) (2L, TimeUnit.HOURS)
        else if (bytesInFlight >= GB) (1L, TimeUnit.HOURS)
        else if (bytesInFlight >= 512 * MB) (45L, TimeUnit.MINUTES)
        else if (bytesInFlight >= 200 * MB) (30L, TimeUnit.MINUTES)
        else if (bytesInFlight >= 100 * MB) (20L, TimeUnit.MINUTES)
        else if (bytesInFlight >= 10 * MB) (15L, TimeUnit.MINUTES)
        else (10L, TimeUnit.MINUTES)
      val r = results.poll(waitTime, unit)
      if (r == null) {
        val msg = s"Waited $waitTime ${unit.toString.toLowerCase} for a shuffle block, giving up!"
        logError(msg)
        throw new SparkException(msg)
      }
      r
    }
{code}
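
If this patch were adopted, the timed behavior would be on by default, since the proposed spark.shuffle.fetch.timeout.enable flag defaults to true. A job could opt back into the old blocking behavior at submit time; note this flag is the one proposed above, not an existing Spark configuration:

{code}
spark-submit --conf spark.shuffle.fetch.timeout.enable=false ...
{code}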



> Executor shuffle fetch hang
> ---------------------------
>
>                 Key: SPARK-27614
>                 URL: https://issues.apache.org/jira/browse/SPARK-27614
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1, 2.4.0
>            Reporter: weDataSphere
>            Priority: Major
>


