GitHub user rednaxelafx opened a pull request:

    https://github.com/apache/spark/pull/23032

    [SPARK-XXXXX][SQL][MINOR] Reduce the number of unused UnsafeRowWriters created in whole-stage codegen

    ## What changes were proposed in this pull request?
    
    Reduce the number of unused `UnsafeRowWriter`s created in whole-stage generated code.
    They come from `CodegenSupport.consume()` calling `prepareRowVar()`, which uses `GenerateUnsafeProjection.createCode()` and registers an `UnsafeRowWriter` mutable state, regardless of whether the downstream (parent) operator will actually use the `rowVar`.
    Even when the downstream `doConsume` function doesn't use the `rowVar` (i.e. doesn't include `row.code` as part of this operator's codegen template), the registered `UnsafeRowWriter` stays, bloating the `init` function of the generated code. See the sketch below.
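    For context, a condensed paraphrase of the relevant path in `CodegenSupport` (a heavily simplified fragment of the Spark 2.4 sources, not a verbatim quote):
    ```scala
    // Simplified sketch of CodegenSupport.prepareRowVar() (Spark 2.4).
    // When no pre-built row is available, createCode() emits an UnsafeRow
    // projection and, as a side effect, registers an UnsafeRowWriter mutable
    // state on the CodegenContext -- whether or not the parent operator ever
    // consumes the resulting rowVar.
    private def prepareRowVar(
        ctx: CodegenContext,
        row: String,
        colVars: Seq[ExprCode]): ExprCode = {
      if (row != null) {
        ExprCode.forNonNullValue(JavaCode.variable(row, classOf[UnsafeRow]))
      } else {
        val colExprs = output.zipWithIndex.map { case (attr, i) =>
          BoundReference(i, attr.dataType, attr.nullable)
        }
        // This call adds "new UnsafeRowWriter(...)" to the generated init().
        GenerateUnsafeProjection.createCode(ctx, colExprs)
      }
    }
    ```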
    
    This PR doesn't fix the root issue, but makes it slightly less painful: when the `doConsume` function is split out into a separate method, `prepareRowVar()` is called twice, so it's double the pain of unused `UnsafeRowWriter`s. This PR simply moves the original `prepareRowVar()` call down into the `doConsume` split/no-split branch, so that each path calls it only once and we're back to just 1x the pain, as sketched below.
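    Roughly (a paraphrase, not the exact diff; `splitConsumeFunc` stands in for the actual split condition in `consume()`):
    ```scala
    // Before: rowVar is always materialized up front, registering one
    // UnsafeRowWriter; the split path then builds rowVar again inside
    // constructDoConsumeFunction(), registering a second, unused one.
    val rowVar = prepareRowVar(ctx, row, inputVars)
    if (splitConsumeFunc) {
      constructDoConsumeFunction(ctx, inputVars, row) // calls prepareRowVar() again
    } else {
      parent.doConsume(ctx, inputVars, rowVar)
    }

    // After: each branch calls prepareRowVar() exactly once.
    if (splitConsumeFunc) {
      constructDoConsumeFunction(ctx, inputVars, row) // its internal call is the only one
    } else {
      val rowVar = prepareRowVar(ctx, row, inputVars)
      parent.doConsume(ctx, inputVars, rowVar)
    }
    ```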
    
    To fix the root issue, we'd need a way for `CodegenSupport` operators to indicate whether they're going to use the `rowVar`. That's a much more elaborate change, so I'd like to make this minor fix first.
    
    e.g. for this query: `spark.range(10).as[Long].map(x => x + 1).map(x => x * 
x + 2)`
    **Before** (in Spark 2.4.0):
    ```java
    /* 023 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 024 */     partitionIndex = index;
    /* 025 */     this.inputs = inputs;
    /* 026 */
    /* 027 */     range_taskContext_0 = TaskContext.get();
    /* 028 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
    /* 029 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 030 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 031 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 033 */     range_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 034 */     range_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 035 */     range_mutableStateArray_0[6] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 036 */     range_mutableStateArray_0[7] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 037 */     range_mutableStateArray_0[8] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 038 */
    /* 039 */   }
    ```
    9 `UnsafeRowWriter`s created, 1 actually used.
    
    **After**:
    ```java
    /* 023 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 024 */     partitionIndex = index;
    /* 025 */     this.inputs = inputs;
    /* 026 */
    /* 027 */     range_taskContext_0 = TaskContext.get();
    /* 028 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
    /* 029 */     deserializetoobject_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 030 */     deserializetoobject_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 031 */     deserializetoobject_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 032 */     deserializetoobject_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 033 */     deserializetoobject_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 034 */
    /* 035 */   }
    ```
    5 `UnsafeRowWriter`s created, 1 actually used.
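    For reference, the dumps above can be reproduced with Spark's codegen debugging helper (the PR doesn't state the exact command used, so this is just one way to get them):
    ```scala
    import org.apache.spark.sql.execution.debug._
    import spark.implicits._  // already in scope in spark-shell

    val ds = spark.range(10).as[Long].map(x => x + 1).map(x => x * x + 2)
    ds.debugCodegen()  // prints the whole-stage generated Java, including init()
    ```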
    
    ## How was this patch tested?
    
    Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rednaxelafx/apache-spark wscg-reduce-rowwriter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23032
    
----
commit e48ec9da9977846213044c5f6d093202ed0efd60
Author: Kris Mok <kris.mok@...>
Date:   2018-11-14T09:22:11Z

    Reduce the number of unused UnsafeRowWriters created in whole-stage codegen

----

