Bobby Wang created SPARK-40932:
----------------------------------

             Summary: Barrier: messages for allGather will be overridden by the 
following barrier APIs
                 Key: SPARK-40932
                 URL: https://issues.apache.org/jira/browse/SPARK-40932
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.1, 3.3.0
            Reporter: Bobby Wang


While working on an internal project that has not been open-sourced, I found
that the messages returned by BarrierTaskContext.allGather may be overwritten by
subsequent barrier API calls, so the user cannot get the correct allGather
messages.

 

The issue can be reproduced easily with the following unit test.

 

 
{code:java}
test("SPARK-XXX, messages of allGather should not be overridden " +
  "by the following barrier APIs") {

  sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
  sc.setLogLevel("INFO")
  val rdd = sc.makeRDD(1 to 10, 2)
  val rdd2 = rdd.barrier().mapPartitions { it =>
    val context = BarrierTaskContext.get()
    // Sleep for a random time before global sync.
    Thread.sleep(Random.nextInt(1000))
    // Pass partitionId message in
    val message: String = context.partitionId().toString
    val messages: Array[String] = context.allGather(message)
    context.barrier()
    Iterator.single(messages.toList)
  }
  val messages = rdd2.collect()
  // All the task partitionIds are shared across all tasks
  assert(messages.length === 2)
  messages.foreach(m => println("------- " + m))
  assert(messages.forall(_ == List("0", "1")))
} {code}
 

 

Before the final assertion, assert(messages.forall(_ == List("0", "1"))),
throws, the test prints:

 
{code:java}
------- List(, )
------- List(, ) {code}
 

 

As you can see, the gathered messages are empty: they have been overwritten by
the subsequent context.barrier() call.

 

Below is the Spark log:

 

_22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)_
_22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)_
_22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 0._
_22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current 
progress: 1/2._
_22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 0._
_22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current 
progress: 2/2._
_22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished 
successfully._
_22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current 
progress: 1/2._
_22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 1._
_22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the 
global sync, current barrier epoch is 1._
_22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current 
barrier epoch for Stage 0 (Attempt 0) is 1._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current 
progress: 2/2._
_22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier 
sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished 
successfully._
_22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 
(TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes 
result sent to driver_
_22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip 
current round of resource offers for barrier stage 0 because the barrier 
taskSet requires 2 slots, while the total number of available slots is 1._
_22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 
0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)_
_22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global 
sync successfully, waited for 1 seconds, current barrier epoch is 2._
_22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 
(TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes 
result sent to driver_

 

After debugging, I found that the [messages object|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102]
(Array[String]) returned to BarrierTaskContext is the same object as the [original
messages array|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107]
in BarrierCoordinator, so a later barrier call overwrites it in place.
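
The aliasing described above can be illustrated with a toy model that does not use Spark at all. This is only a sketch under assumptions: ToyCoordinator, syncShared and syncCopied are hypothetical names invented for illustration, not Spark code. It also assumes, as the printed output suggests, that context.barrier() submits an empty message for each task. Returning the internal buffer lets the next round overwrite the caller's result, while returning a copy does not.

```scala
// Toy model of the aliasing problem. ToyCoordinator is hypothetical, not Spark code.
final class ToyCoordinator(numTasks: Int) {
  // Internal buffer that is reused for every sync round.
  private val buffer: Array[String] = Array.fill(numTasks)("")

  // Records one message per task, then returns the internal buffer itself.
  def syncShared(msgs: Seq[String]): Array[String] = {
    msgs.zipWithIndex.foreach { case (m, i) => buffer(i) = m }
    buffer
  }

  // Same as syncShared, but returns a defensive copy of the buffer.
  def syncCopied(msgs: Seq[String]): Array[String] = syncShared(msgs).clone()
}

object AliasingDemo {
  def main(args: Array[String]): Unit = {
    val shared = new ToyCoordinator(2)
    val gathered = shared.syncShared(Seq("0", "1")) // plays the role of allGather
    shared.syncShared(Seq("", ""))                  // plays the role of barrier()
    println(gathered.toList)                        // overwritten: List(, )

    val copied = new ToyCoordinator(2)
    val safe = copied.syncCopied(Seq("0", "1"))
    copied.syncShared(Seq("", ""))
    println(safe.toList)                            // preserved: List(0, 1)
  }
}
```

In this model the first result matches the empty lists printed by the failing test, and the copied result keeps the original values. Whether copying is the right fix inside Spark itself is left to the PR.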

 

I will file a PR for this issue.


