GitHub user rednaxelafx opened a pull request:

    https://github.com/apache/spark/pull/20224

    [SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec

    ## What changes were proposed in this pull request?
    
    **Proposal**
    
    Add a per-query ID to the codegen stages represented by `WholeStageCodegenExec` operators. This ID will be used in
    - the explain output of the physical plan, and in
    - the generated class name.
    
    Specifically, this ID is stable within a query, counting up from 1 in depth-first post-order over all the `WholeStageCodegenExec` operators inserted into a plan.
    The ID value 0 is reserved for "free-floating" `WholeStageCodegenExec` objects, which may have been created for one-off purposes, e.g. for fallback handling when whole-stage codegen fails and only a subset of the child operators is codegen'd (as seen in `org.apache.spark.sql.execution.FileSourceScanExec#doExecute`).
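    
    As an illustration of the numbering scheme only (hypothetical names, not this PR's actual implementation), the ID assignment can be thought of as a depth-first post-order walk over the physical plan:
    ```scala
    // Sketch only: number stages bottom-up so a stage's inputs get their IDs
    // before the stage itself. The counter starts at 1; 0 stays reserved for
    // free-floating stages that this walk never visits.
    sealed trait Plan { def children: Seq[Plan] }
    case class CodegenStage(children: Seq[Plan], var stageId: Int = 0) extends Plan
    case class NonCodegen(children: Seq[Plan]) extends Plan

    def assignIds(root: Plan): Unit = {
      var counter = 0
      def visit(p: Plan): Unit = {
        p.children.foreach(visit)          // depth-first: recurse into children first
        p match {
          case s: CodegenStage => counter += 1; s.stageId = counter // post-order numbering
          case _ =>
        }
      }
      visit(root)
    }
    ```
    On the example plan below, such a walk numbers the left subtree's stages 1 through 3, the right subtree's stages 4 and 5, and the join stage 6, matching the "after" explain output.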
    
    Example: for the following query:
    ```scala
    scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
    
    scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
    df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]
    
    scala> val df2 = spark.range(5)
    df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    
    scala> val query = df1.join(df2, 'z === 'id)
    query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
    ```
    
    The explain output before the change is:
    ```scala
    scala> query.explain
    == Physical Plan ==
    *SortMergeJoin [z#9L], [id#13L], Inner
    :- *Sort [z#9L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(z#9L, 200)
    :     +- *Project [(x#3L + 1) AS z#9L, y#4L]
    :        +- *Sort [x#3L ASC NULLS FIRST], true, 0
    :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
    :              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
    :                 +- *Range (0, 10, step=1, splits=8)
    +- *Sort [id#13L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id#13L, 200)
          +- *Range (0, 5, step=1, splits=8)
    ```
    Note how codegen'd operators are annotated with a `"*"` prefix. The `SortMergeJoin` operator and its direct `Sort` children are adjacent and all annotated with `"*"`, so it's hard to tell that they're actually in separate codegen stages.
    
    After this change, it becomes:
    ```scala
    scala> query.explain
    == Physical Plan ==
    *(6) SortMergeJoin [z#9L], [id#13L], Inner
    :- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(z#9L, 200)
    :     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
    :        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
    :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
    :              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
    :                 +- *(1) Range (0, 10, step=1, splits=8)
    +- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id#13L, 200)
          +- *(4) Range (0, 5, step=1, splits=8)
    ```
    Note that the annotated prefix becomes `"*(id) "`. The `SortMergeJoin` operator and its direct `Sort` children now carry different codegen stage IDs, so it's clear that they're in separate codegen stages.
    
    The ID also shows up in the name of the generated class, as a suffix of the form `GeneratedClass$GeneratedIterator$id`.
    
    For example, note how `GeneratedClass$GeneratedIterator$3` and `GeneratedClass$GeneratedIterator$6` in the following stack trace correspond to the IDs shown in the explain output above:
    ```
    "Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 
nid=NA runnable
      java.lang.Thread.State: RUNNABLE
          at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.sort_addToSorter$(generated.java:32)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.processNext(generated.java:41)
          at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.findNextInnerJoinRows$(generated.java:42)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.processNext(generated.java:101)
          at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
          at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
          at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
          at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
          at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
          at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
          at org.apache.spark.scheduler.Task.run(Task.scala:109)
          at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:748)
    ```
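    
    As an aside, since the ID is a plain numeric suffix on the generated class name, a diagnostic tool could recover it from a stack frame mechanically. A minimal hypothetical helper (illustration only, not part of this PR):
    ```scala
    // Hypothetical helper: pull the codegen stage ID out of a generated
    // class name such as "...GeneratedClass$GeneratedIterator$6".
    def codegenStageId(frameClassName: String): Option[Int] =
      """\$GeneratedIterator\$(\d+)$""".r
        .findFirstMatchIn(frameClassName)
        .map(_.group(1).toInt)
    ```
    For the trace above, it returns `Some(3)` and `Some(6)` for the two generated frames, mapping back to `*(3) Sort` and `*(6) SortMergeJoin` in the explain output.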
    
    **Rationale**
    
    Right now, Spark SQL's codegen lacks the means to differentiate between a couple of things:
    
    1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; it only delineates codegen units.
    There can be adjacent physical operators that are both codegen'd but sit in separate codegen stages. Some of this is due to hacky implementation details, such as the case of `SortMergeJoin` and its `Sort` inputs -- they're hard-coded to be split into separate stages even though both are codegen'd.
    When printing the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star (`'*'`), with no way to figure out whether they're in the same stage.
    
    2. Performance/error diagnosis
    The generated code has class/method names that are hard to differentiate between queries, or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of the query it came from.
    By introducing a per-query codegen stage ID, we'd at least be able to tell which codegen stage (and in turn, which group of physical operators) a profile tick or an exception came from.
    
    This proposal uses a per-query ID because it's stable within a query: multiple runs of the same query will see the same resulting IDs. This benefits understandability for users, and it also plays well with the codegen cache in Spark SQL, which uses the generated source code as its key.
    
    The downside of per-query IDs, as opposed to a per-session or globally incrementing ID, is of course that we can't tell different query runs apart with this ID alone. But for now I believe this is a good enough tradeoff.
    
    ## How was this patch tested?
    
    Existing tests. This PR does not involve any runtime behavior changes other than some name changes.
    The SQL query test suites that compare explain outputs have been updated to ignore the newly added `codegenStageId`.
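    
    For reference, the normalization amounts to something like this sketch (the suites' actual change may differ):
    ```scala
    // Sketch only: strip the per-query stage ID from the "*(id) " prefix so
    // explain outputs compare stably, reducing them to the old "*" form.
    def normalizeExplain(plan: String): String =
      plan.replaceAll("""\*\(\d+\) """, "*")
    ```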

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rednaxelafx/apache-spark wsc-codegenstageid

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20224.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20224
    
----
commit 18a38603f5d3938b5cc967fbdb06ff54bd912fed
Author: Kris Mok <kris.mok@...>
Date:   2018-01-10T23:11:00Z

    Add a per-query codegenStageId to WholeStageCodegenExec

----

