GitHub user rednaxelafx opened a pull request:
https://github.com/apache/spark/pull/23032
[SPARK-XXXXX][SQL][MINOR] Reduce the number of unused UnsafeRowWriters
created in whole-stage codegen
## What changes were proposed in this pull request?
Reduce the number of unused `UnsafeRowWriter`s created in whole-stage
generated code.
They come from `CodegenSupport.consume()` calling `prepareRowVar()`, which
uses `GenerateUnsafeProjection.createCode()` and registers an
`UnsafeRowWriter` mutable state, regardless of whether the downstream
(parent) operator will actually use the `rowVar`.
Even when the downstream `doConsume` function doesn't use the `rowVar`
(i.e. doesn't include `row.code` as part of this operator's codegen template),
the registered `UnsafeRowWriter` stays there, which bloats the init function
of the generated code.
This PR doesn't fix the root issue, but makes it slightly less painful:
when the `doConsume` function is split out, `prepareRowVar()` is called
twice, so there are twice as many unused `UnsafeRowWriter`s. This PR simply
moves the original call to `prepareRowVar()` down into the `doConsume`
split/no-split branches, so each path registers it only once and we're back
to just 1x the pain.
Fixing the root issue would require a way for `CodegenSupport` operators to
indicate whether they're going to use the `rowVar`. That's a much more
elaborate change, so I'd like to make this minor fix first.
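The shape of the change can be sketched with a toy model (plain Java; `ToyCodegenContext`, `writersBefore`, and `writersAfter` are illustrative names invented for this sketch, not Spark's actual `CodegenContext` API): before, `prepareRowVar()` runs unconditionally and then again on the split path; after, it runs once inside whichever branch is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the codegen context; hypothetical, not Spark's real API.
class ToyCodegenContext {
    final List<String> mutableStates = new ArrayList<>();

    // Stands in for prepareRowVar() -> GenerateUnsafeProjection.createCode(),
    // which registers one UnsafeRowWriter mutable state per call.
    String prepareRowVar() {
        String name = "rowWriter_" + mutableStates.size();
        mutableStates.add(name);
        return name;
    }
}

public class ConsumeSketch {
    // Before: rowVar is prepared up front, then prepared again on the
    // split path, so a split doConsume registers two writers.
    static int writersBefore(ToyCodegenContext ctx, boolean splitConsume) {
        String rowVar = ctx.prepareRowVar();   // unconditional registration
        if (splitConsume) {
            rowVar = ctx.prepareRowVar();      // split path prepares it again
        }
        return ctx.mutableStates.size();
    }

    // After this PR: the call is moved down into the split/no-split
    // branches, so each path registers exactly one writer.
    static int writersAfter(ToyCodegenContext ctx, boolean splitConsume) {
        String rowVar;
        if (splitConsume) {
            rowVar = ctx.prepareRowVar();      // one registration, split path
        } else {
            rowVar = ctx.prepareRowVar();      // one registration, inline path
        }
        return ctx.mutableStates.size();
    }

    public static void main(String[] args) {
        System.out.println(writersBefore(new ToyCodegenContext(), true)); // 2
        System.out.println(writersAfter(new ToyCodegenContext(), true));  // 1
    }
}
```

Either way the writer is still registered whether or not the parent uses `rowVar`, which is why this is a mitigation rather than the root-cause fix.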
For example, for this query: `spark.range(10).as[Long].map(x => x + 1).map(x => x * x + 2)`
**Before** (in Spark 2.4.0):
```java
/* 023 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 024 */     partitionIndex = index;
/* 025 */     this.inputs = inputs;
/* 026 */
/* 027 */     range_taskContext_0 = TaskContext.get();
/* 028 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 029 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 033 */     range_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 034 */     range_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 035 */     range_mutableStateArray_0[6] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 036 */     range_mutableStateArray_0[7] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 037 */     range_mutableStateArray_0[8] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 038 */
/* 039 */   }
```
9 `UnsafeRowWriter`s created, 1 actually used.
**After**:
```java
/* 023 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 024 */     partitionIndex = index;
/* 025 */     this.inputs = inputs;
/* 026 */
/* 027 */     range_taskContext_0 = TaskContext.get();
/* 028 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 029 */     deserializetoobject_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     deserializetoobject_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     deserializetoobject_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */     deserializetoobject_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 033 */     deserializetoobject_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 034 */
/* 035 */   }
```
5 `UnsafeRowWriter`s created, 1 actually used.
## How was this patch tested?
Existing tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rednaxelafx/apache-spark wscg-reduce-rowwriter
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/23032.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #23032
----
commit e48ec9da9977846213044c5f6d093202ed0efd60
Author: Kris Mok <kris.mok@...>
Date: 2018-11-14T09:22:11Z
Reduce the number of unused UnsafeRowWriters created in whole-stage codegen
----