GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/13909

    [SPARK-16213][SQL] Reduce runtime overhead of a program that creates a 
primitive array in DataFrame

    ## What changes were proposed in this pull request?
    
    This PR reduces the runtime overhead of a program that creates a primitive 
array in a DataFrame. Generated code performs a boxing operation in an assignment 
from InternalRow to an ```Object[]``` temporary array (at Lines 040 and 048 in 
the generated code before applying this PR). If we know that the type of the array 
elements is primitive, we apply the following optimizations:
    
    1. Eliminate a pair of ```isNullAt()``` and a null assignment
    2. Allocate a primitive array instead of ```Object[]``` (eliminate boxing 
operations)
    3. Call ```GenericArrayData.allocate(project_values)``` to avoid 
[boxing](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala#L31)
 in the constructor of ```GenericArrayData``` (applicable once 
https://github.com/apache/spark/pull/13758 is merged)
    
    An example program
    ```
    val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
    df.selectExpr("Array(value + 1.1d, value + 2.2d)").show
    ```
    
    
    Generated code before applying this PR
    ```java
    /* 018 */   public void init(int index, scala.collection.Iterator inputs[]) 
{
    /* 019 */     partitionIndex = index;
    /* 020 */     inputadapter_input = inputs[0];
    /* 021 */     this.project_values = null;
    /* 022 */     project_result = new UnsafeRow(1);
    /* 023 */     this.project_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 
32);
    /* 024 */     this.project_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
 1);
    /* 025 */     this.project_arrayWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
    /* 026 */   }
    /* 027 */
    /* 028 */   protected void processNext() throws java.io.IOException {
    /* 029 */     while (inputadapter_input.hasNext()) {
    /* 030 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
    /* 032 */
    /* 033 */       final boolean project_isNull = false;
    /* 034 */       this.project_values = new Object[2];
    /* 035 */       double project_value7 = -1.0;
    /* 036 */       project_value7 = inputadapter_value + 1.1D;
    /* 037 */       if (false) {
    /* 038 */         project_values[0] = null;
    /* 039 */       } else {
    /* 040 */         project_values[0] = project_value7;
    /* 041 */       }
    /* 042 */
    /* 043 */       double project_value10 = -1.0;
    /* 044 */       project_value10 = inputadapter_value + 2.2D;
    /* 045 */       if (false) {
    /* 046 */         project_values[1] = null;
    /* 047 */       } else {
    /* 048 */         project_values[1] = project_value10;
    /* 049 */       }
    /* 050 */
    /* 051 */       /* final ArrayData project_value = 
org.apache.spark.sql.catalyst.util.GenericArrayData.allocate(project_values); */
    /* 052 */       final ArrayData project_value = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
    /* 053 */       this.project_values = null;
    /* 054 */       project_holder.reset();
    /* 055 */
    /* 056 */       project_rowWriter.zeroOutNullBytes();
    /* 057 */
    /* 058 */       if (project_isNull) {
    /* 059 */         project_rowWriter.setNullAt(0);
    /* 060 */       } else {
    /* 061 */         // Remember the current cursor so that we can calculate 
how many bytes are
    /* 062 */         // written later.
    /* 063 */         final int project_tmpCursor = project_holder.cursor;
    /* 064 */
    /* 065 */         if (project_value instanceof UnsafeArrayData) {
    /* 066 */           final int project_sizeInBytes = ((UnsafeArrayData) 
project_value).getSizeInBytes();
    /* 067 */           // grow the global buffer before writing data.
    /* 068 */           project_holder.grow(project_sizeInBytes);
    /* 069 */           ((UnsafeArrayData) 
project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
    /* 070 */           project_holder.cursor += project_sizeInBytes;
    /* 071 */
    /* 072 */         } else {
    /* 073 */           final int project_numElements = 
project_value.numElements();
    /* 074 */           project_arrayWriter.initialize(project_holder, 
project_numElements, 8);
    /* 075 */
    /* 076 */           for (int project_index = 0; project_index < 
project_numElements; project_index++) {
    /* 077 */             if (project_value.isNullAt(project_index)) {
    /* 078 */               project_arrayWriter.setNullAt(project_index);
    /* 079 */             } else {
    /* 080 */               final double project_element = 
project_value.getDouble(project_index);
    /* 081 */               project_arrayWriter.write(project_index, 
project_element);
    /* 082 */             }
    /* 083 */           }
    /* 084 */         }
    /* 085 */
    /* 086 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, 
project_holder.cursor - project_tmpCursor);
    /* 087 */         project_rowWriter.alignToWords(project_holder.cursor - 
project_tmpCursor);
    /* 088 */       }
    /* 089 */       project_result.setTotalSize(project_holder.totalSize());
    /* 090 */       append(project_result);
    /* 091 */       if (shouldStop()) return;
    /* 092 */     }
    /* 093 */   }
    ```
    
    Generated code after applying this PR
    ```java
    /* 018 */   public void init(int index, scala.collection.Iterator inputs[]) 
{
    /* 019 */     partitionIndex = index;
    /* 020 */     inputadapter_input = inputs[0];
    /* 021 */     this.project_values = new double[2];
    /* 022 */     project_result = new UnsafeRow(1);
    /* 023 */     this.project_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 
32);
    /* 024 */     this.project_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder,
 1);
    /* 025 */     this.project_arrayWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
    /* 026 */   }
    /* 027 */
    /* 028 */   protected void processNext() throws java.io.IOException {
    /* 029 */     while (inputadapter_input.hasNext()) {
    /* 030 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
    /* 032 */
    /* 033 */       double project_value1 = -1.0;
    /* 034 */       project_value1 = inputadapter_value + 1.1D;
    /* 035 */       project_values[0] = project_value1;
    /* 036 */       double project_value4 = -1.0;
    /* 037 */       project_value4 = inputadapter_value + 2.2D;
    /* 038 */       project_values[1] = project_value4;
    /* 039 */       /* final ArrayData project_value = 
org.apache.spark.sql.catalyst.util.GenericArrayData.allocate(project_values); */
    /* 040 */       final ArrayData project_value = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
    /* 041 */       project_holder.reset();
    /* 042 */
    /* 043 */       // Remember the current cursor so that we can calculate how 
many bytes are
    /* 044 */       // written later.
    /* 045 */       final int project_tmpCursor = project_holder.cursor;
    /* 046 */
    /* 047 */       if (project_value instanceof UnsafeArrayData) {
    /* 048 */         final int project_sizeInBytes = ((UnsafeArrayData) 
project_value).getSizeInBytes();
    /* 049 */         // grow the global buffer before writing data.
    /* 050 */         project_holder.grow(project_sizeInBytes);
    /* 051 */         ((UnsafeArrayData) 
project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
    /* 052 */         project_holder.cursor += project_sizeInBytes;
    /* 053 */
    /* 054 */       } else {
    /* 055 */         final int project_numElements = 
project_value.numElements();
    /* 056 */         project_arrayWriter.initialize(project_holder, 
project_numElements, 8);
    /* 057 */
    /* 058 */         for (int project_index = 0; project_index < 
project_numElements; project_index++) {
    /* 059 */           if (project_value.isNullAt(project_index)) {
    /* 060 */             project_arrayWriter.setNullAt(project_index);
    /* 061 */           } else {
    /* 062 */             final double project_element = 
project_value.getDouble(project_index);
    /* 063 */             project_arrayWriter.write(project_index, 
project_element);
    /* 064 */           }
    /* 065 */         }
    /* 066 */       }
    /* 067 */
    /* 068 */       project_rowWriter.setOffsetAndSize(0, project_tmpCursor, 
project_holder.cursor - project_tmpCursor);
    /* 069 */       project_rowWriter.alignToWords(project_holder.cursor - 
project_tmpCursor);
    /* 070 */       project_result.setTotalSize(project_holder.totalSize());
    /* 071 */       append(project_result);
    /* 072 */       if (shouldStop()) return;
    /* 073 */     }
    /* 074 */   }
    ```
    
    ## How was this patch tested?
    
    Added unit tests into ```DataFrameComplexTypeSuite```
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-16213

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13909
    
----
commit 37e4ce2d09b8233fdefc615296155a3ec5cb6eb6
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-06-26T04:02:49Z

    remove unboxing operations when an array is primitive type array

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to