Vitaliy Savkin created SPARK-46198:
--------------------------------------

             Summary: Unexpected Shuffle Read when using cached DataFrame
                 Key: SPARK-46198
                 URL: https://issues.apache.org/jira/browse/SPARK-46198
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: Vitaliy Savkin
         Attachments: shuffle.png

When a computation is based on a cached DataFrame, I expect to see no Shuffle 
Reads, but they happen under certain circumstances.

*Reproduction*
{code:scala}
    val ctx: SQLContext = ??? // init context
    val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"

    def populateAndRead(tag: String): DataFrame = {
      val path = s"$root/numbers_$tag"

      // Populate once; mode("ignore") skips the write when the data already exists.
      import ctx.implicits._
      import org.apache.spark.sql.functions.lit
      (0 to 10 * 1000 * 1000)
        .toDF("id")
        .withColumn(tag, lit(tag.toUpperCase))
        .repartition(100)
        .write
        .option("header", "true")
        .mode("ignore")
        .csv(path)

      ctx.read.option("header", "true").csv(path)
        .withColumnRenamed("id", tag + "_id")
    }

    val dfa = populateAndRead("a1")
    val dfb = populateAndRead("b1")
    val res =
      dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
        .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
        .cache()

    println(res.count())
    res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
{code}
Relevant configs:
{code:scala}
spark.executor.instances=10
spark.executor.cores=7
spark.executor.memory=40g
spark.executor.memoryOverhead=5g

spark.shuffle.service.enabled=true
spark.sql.adaptive.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1
{code}
The physical plan confirms that the cache is used:
{code:scala}
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (27)
+- Coalesce (26)
   +- InMemoryTableScan (1)
         +- InMemoryRelation (2)
               +- Union (25)
                  :- * SortMergeJoin Inner (13)
                  :  :- * Sort (7)
                  :  :  +- Exchange (6)
                  :  :     +- * Project (5)
                  :  :        +- * Filter (4)
                  :  :           +- Scan csv  (3)
                  :  +- * Sort (12)
                  :     +- Exchange (11)
                  :        +- * Project (10)
                  :           +- * Filter (9)
                  :              +- Scan csv  (8)
                  +- * SortMergeJoin Inner (24)
                     :- * Sort (18)
                     :  +- Exchange (17)
                     :     +- * Project (16)
                     :        +- * Filter (15)
                     :           +- Scan csv  (14)
                     +- * Sort (23)
                        +- Exchange (22)
                           +- * Project (21)
                              +- * Filter (20)
                                 +- Scan csv  (19)
{code}
But when running on YARN, the CSV-write job performs shuffle reads.

!image-2023-12-01-09-27-39-463.png!

*Additional info*
 - I was unable to reproduce it with local Spark.
 - If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join 
conditions are changed to just {{"id"}}, the issue disappears!
 - This behaviour is stable; it is not caused by failed instances.
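
For reference, the non-reproducing variant from the second bullet can be sketched as follows (a minimal, self-contained sketch with toy data instead of the S3 CSVs; the issue itself only appeared on YARN):

```scala
import org.apache.spark.sql.SparkSession

object JoinOnSharedColumn {
  def main(args: Array[String]): Unit = {
    // Local master only to make the sketch self-contained; the report ran on YARN.
    val spark = SparkSession.builder.master("local[*]").appName("join-on-id").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the CSV inputs; both frames keep the shared "id" column.
    val dfa = Seq((1, "A1"), (2, "A1")).toDF("id", "a1")
    val dfb = Seq((1, "B1"), (3, "B1")).toDF("id", "b1")

    // Joining on the column name (Seq("id")) instead of on renamed columns:
    // with this shape, no extra shuffle reads were observed.
    val res =
      dfa.join(dfb, Seq("id"))
        .unionByName(dfa.join(dfb, Seq("id")))
        .cache()

    println(res.count())
    spark.stop()
  }
}
```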

*Production impact*

Without the cache, saving data in production takes much longer (30 seconds vs. 
18 seconds). To avoid the shuffle reads, we added a {{repartition}} step before 
{{cache}} as a workaround, which further reduced the time from 18 seconds to 10.
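
The workaround can be sketched like this (a minimal sketch with toy data; the partition count and output path are illustrative, not the production values):

```scala
import org.apache.spark.sql.SparkSession

object RepartitionBeforeCache {
  def main(args: Array[String]): Unit = {
    // Local master only to make the sketch self-contained; the report ran on YARN.
    val spark = SparkSession.builder.master("local[*]").appName("repartition-workaround").getOrCreate()
    import spark.implicits._

    val dfa = Seq((1, "A1"), (2, "A1")).toDF("a1_id", "a1")
    val dfb = Seq((1, "B1"), (3, "B1")).toDF("b1_id", "b1")

    // Repartitioning right before cache pins the cached partitioning, so the
    // subsequent write reads the cached blocks without extra shuffle reads.
    val res =
      dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
        .repartition(8) // illustrative; tune to the data size
        .cache()

    println(res.count())
    res.coalesce(1).write.mode("overwrite").csv("/tmp/numbers") // illustrative path
    spark.stop()
  }
}
```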



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
