GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/13911
[SPARK-16215][SQL] Reduce runtime overhead of a program that writes a
primitive array in DataFrame/Dataset
## What changes were proposed in this pull request?
This PR optimizes the generated projection code for a primitive type array.
Although a primitive type array requires no per-element null checks and
occupies a contiguous data region, the current generated code performs a null
check and a copy for each element (lines 075-082 of the generated code before
applying this PR). This PR makes the following changes:
1. Eliminate null checks for each array element
2. Perform a bulk data copy by using ```Platform.copyMemory``` (see the sketch after this list)
3. Eliminate primitive array allocation in ```GenericArrayData``` when
https://github.com/apache/spark/pull/13758 is merged
4. Eliminate setting sparse index for ```UnsafeArrayData``` when
https://github.com/apache/spark/pull/13680 is merged
These changes are implemented in the helper method
```UnsafeArrayWriter.writePrimitive<PrimitiveType>Array()``` (called at line 075
of the generated code after applying this PR).
For now, changes 3 and 4 are not enabled, but the code is ready.
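For illustration, here is a minimal sketch of the bulk-copy idea in change 2,
written against Spark's ```org.apache.spark.unsafe.Platform``` utility. The
helper name and signature here are hypothetical; the actual helper added by
this PR operates on the row writer's internal buffer and may differ in detail.
```scala
// Minimal sketch of the bulk-copy idea, not the exact helper in this PR.
// Assumes org.apache.spark.unsafe.Platform is on the classpath.
import org.apache.spark.unsafe.Platform

object BulkCopySketch {
  // Copy a whole double[] into a byte buffer with a single Platform.copyMemory
  // call instead of a per-element loop with null checks. Returns the new cursor.
  def writeDoubleArrayBulk(values: Array[Double], buffer: Array[Byte], cursor: Int): Int = {
    val sizeInBytes = values.length * 8
    Platform.copyMemory(
      values, Platform.DOUBLE_ARRAY_OFFSET,          // source: start of the double[] data
      buffer, Platform.BYTE_ARRAY_OFFSET + cursor,   // destination: byte[] at the cursor
      sizeInBytes)
    cursor + sizeInBytes
  }
}
```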
An example program
```
val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
df.selectExpr("Array(value + 1.1d, value + 2.2d)").collect
```
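For reference, the generated code shown below can be inspected with the codegen
debug helper, assuming a Spark 2.x shell where the
```org.apache.spark.sql.execution.debug``` implicits are available:
```scala
// Print the whole-stage-codegen Java source for this query (Spark 2.x).
import org.apache.spark.sql.execution.debug._

df.selectExpr("Array(value + 1.1d, value + 2.2d)").debugCodegen()
```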
Generated code before applying this PR
```java
/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 1.1D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 2.2D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 076 */             if (project_value.isNullAt(project_index)) {
/* 077 */               project_arrayWriter.setNullAt(project_index);
/* 078 */             } else {
/* 079 */               final double project_element = project_value.getDouble(project_index);
/* 080 */               project_arrayWriter.write(project_index, project_element);
/* 081 */             }
/* 082 */           }
/* 083 */
/* 084 */         }
/* 085 */
/* 086 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 087 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 088 */       }
/* 089 */       project_result.setTotalSize(project_holder.totalSize());
/* 090 */       append(project_result);
/* 091 */       if (shouldStop()) return;
/* 092 */     }
/* 093 */   }
/* 094 */ }
```
Generated code after applying this PR
```java
/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 1.1D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 2.2D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           project_arrayWriter.writePrimitiveDoubleArray(project_value);
/* 076 */         }
/* 077 */
/* 078 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 079 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 080 */       }
/* 081 */       project_result.setTotalSize(project_holder.totalSize());
/* 082 */       append(project_result);
/* 083 */       if (shouldStop()) return;
/* 084 */     }
/* 085 */   }
```
## How was this patch tested?
Added tests to ```DataFrameComplexTypeSuite```.
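A test along the described lines might look roughly like the following sketch;
the exact tests added by this PR may assert differently.
```scala
// Hypothetical sketch of a test in DataFrameComplexTypeSuite; the actual tests
// in this PR may differ. Assumes the suite's SparkContext and toDF implicits.
test("write a primitive double array in a projection") {
  val df = sparkContext.parallelize(Seq(0.0d), 1).toDF
  val result = df.selectExpr("Array(value + 1.1d, value + 2.2d)").collect()
  assert(result(0).getSeq[Double](0) === Seq(1.1d, 2.2d))
}
```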
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kiszk/spark SPARK-16215
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/13911.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #13911
----
commit 31399cd3273fabe6f33c6f2de5060be116df5d98
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-06-26T05:10:17Z
pass information on containsNull in ArrayType
commit d95706a916361e21fdfc32c602d7008d148dfdc2
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-06-26T05:54:15Z
generate helper call instead of for-loop
commit b1f6289d99445980f35a7e80127fb129517280d5
Author: Kazuaki Ishizaki <[email protected]>
Date: 2016-06-26T05:54:37Z
add test suite
----