[ 
https://issues.apache.org/jira/browse/SPARK-40932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bobby Wang updated SPARK-40932:
-------------------------------
    Description: 
While working on an internal project that has not been open-sourced, I found a 
bug: the messages returned by BarrierTaskContext.allGather may be overwritten by 
subsequent barrier API calls, which means the user can't get the correct 
allGather messages.

 

This issue can be easily reproduced with the following unit test.

 

 
{code:java}
// Assumes a Spark test suite that provides `sc` and imports scala.util.Random.
test("SPARK-XXX, messages of allGather should not be overridden " +
  "by the following barrier APIs") {
  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
  sc.setLogLevel("INFO")
  val rdd = sc.makeRDD(1 to 10, 2)
  val rdd2 = rdd.barrier().mapPartitions { it =>
    val context = BarrierTaskContext.get()
    // Sleep for a random time before global sync.
    Thread.sleep(Random.nextInt(1000))
    // Pass the partitionId in as this task's message
    val message: String = context.partitionId().toString
    val messages: Array[String] = context.allGather(message)
    // A second global sync, issued after allGather has returned
    context.barrier()
    Iterator.single(messages.toList)
  }
  val messages = rdd2.collect()
  // All the task partitionIds are shared across all tasks
  assert(messages.length === 2)
  messages.foreach(m => println("------- " + m))
  assert(messages.forall(_ == List("0", "1")))
} {code}
 

 

Before the final assertion, assert(messages.forall(_ == List("0", "1"))), 
fails, the test prints:

 
{code:java}
------- List(, )
------- List(, ) {code}
 

 

As you can see, the gathered messages are empty: they have been overwritten by 
the subsequent context.barrier() call.

 

Below is the Spark log:

 

_22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)_
_22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)_
_22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 0._
_22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current 
progress: 1/2._
_22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current 
progress: 2/2._
_22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished 
successfully._
_22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current 
progress: 1/2._
_22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 1._
_22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current 
progress: 2/2._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished 
successfully._
_22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes 
result sent to driver_
_22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip 
current round of resource offers for barrier stage 0 because the barrier 
taskSet requires 2 slots, while the total number of available slots is 1._
_22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 
0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)_
_22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes 
result sent to driver_

 

After debugging, I found that the 
[messages object|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102]
 (Array[String]) returned to BarrierTaskContext is the same object as the 
[original messages|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107]
 array in BarrierCoordinator, so later writes to that array also change what 
allGather returned.
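
The aliasing described above can be illustrated without Spark. The following is 
only a minimal sketch under stated assumptions: ToyCoordinator, AliasingDemo and 
copyOnReply are hypothetical names, not Spark APIs, and the barrier() round is 
assumed to submit empty messages, which matches the output printed by the test. 
It is also an assumption that the reply is passed by reference rather than 
serialized when running with a local master.

```scala
// Hypothetical stand-in for a coordinator that keeps one mutable buffer
// of per-task messages and reuses it across sync rounds.
// None of these names are Spark APIs.
final class ToyCoordinator(numTasks: Int, copyOnReply: Boolean) {
  private val buffer: Array[String] = Array.fill(numTasks)("")

  /** Runs one sync round: records each task's message, then returns the reply. */
  def sync(perTaskMessages: Seq[String]): Array[String] = {
    require(perTaskMessages.length == numTasks, "one message per task expected")
    perTaskMessages.zipWithIndex.foreach { case (msg, taskId) => buffer(taskId) = msg }
    // Replying with `buffer` itself shares the reference with the caller;
    // replying with a clone hands out an independent snapshot.
    if (copyOnReply) buffer.clone() else buffer
  }
}

object AliasingDemo {
  /** Simulates allGather("0", "1") followed by a barrier round with empty messages. */
  def gatherThenBarrier(copyOnReply: Boolean): List[String] = {
    val coordinator = new ToyCoordinator(numTasks = 2, copyOnReply = copyOnReply)
    val gathered = coordinator.sync(Seq("0", "1")) // plays the role of allGather
    coordinator.sync(Seq("", ""))                  // plays the role of barrier()
    gathered.toList
  }

  def main(args: Array[String]): Unit = {
    println(gatherThenBarrier(copyOnReply = false)) // prints List(, )
    println(gatherThenBarrier(copyOnReply = true))  // prints List(0, 1)
  }
}
```

With the shared reference, the second round overwrites the array the caller is 
still holding. Replying with a defensive copy is one possible way to avoid that, 
though not necessarily the approach the PR will take.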

 

I will file a PR for this issue.


> Barrier: messages for allGather will be overridden by the following barrier 
> APIs
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-40932
>                 URL: https://issues.apache.org/jira/browse/SPARK-40932
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0, 3.3.1
>            Reporter: Bobby Wang
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
