GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/13704

    [SPARK-15985][SQL] Reduce runtime overhead of a program that reads a primitive array in Dataset

    ## What changes were proposed in this pull request?
    
    This PR reduces the runtime overhead of a program that reads a primitive
    array in a Dataset. The generated code copies array elements from the Dataset
    into a temporary array. If we know that both the source and destination types
    are primitive arrays, we apply one of the following optimizations:
    
    1. Eliminate the allocation of ```Object[]``` and call the
    ```ArrayData.to<Type>Array()``` method if the source and destination types
    are the same
    2. Eliminate the pair of an ```isNullAt()``` check and a ```null```
    assignment, and allocate a primitive array instead of ```Object[]```, if the
    source and destination types are different
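    
    The two cases above can be sketched in plain Java, outside of Spark. This is
    only an illustration under stated assumptions, not the code that this PR
    generates: the class and method names (```PrimitiveArrayCopy```,
    ```copyBoxed```, ```copySameType```, ```copyWithCast```) are hypothetical, and
    plain Java arrays stand in for ```ArrayData```.
    
    ```java
    import java.util.Arrays;

    // Hypothetical illustration; plain arrays stand in for Spark's ArrayData.
    final class PrimitiveArrayCopy {

        // Generic path: allocate an Object[], check each element for null,
        // and box every non-null primitive value.
        static Object[] copyBoxed(double[] src, boolean[] isNull) {
            final Object[] values = new Object[src.length];
            for (int j = 0; j < src.length; j++) {
                if (isNull[j]) {
                    values[j] = null;
                } else {
                    values[j] = src[j]; // autoboxes to java.lang.Double
                }
            }
            return values;
        }

        // Case 1: source and destination element types are the same,
        // so a single bulk copy replaces the per-element loop.
        static double[] copySameType(double[] src) {
            return Arrays.copyOf(src, src.length);
        }

        // Case 2: element types differ (here int to double). A loop is still
        // needed for the conversion, but the destination is a primitive array
        // and there is no per-element null check or null assignment.
        static double[] copyWithCast(int[] src) {
            final double[] values = new double[src.length];
            for (int j = 0; j < src.length; j++) {
                values[j] = (double) src[j];
            }
            return values;
        }

        public static void main(String[] args) {
            System.out.println(Arrays.toString(copySameType(new double[] {1.0, 2.0, 3.0})));
            System.out.println(Arrays.toString(copyWithCast(new int[] {4, 5, 6})));
        }
    }
    ```
    
    In this sketch the primitive paths avoid one boxed object per element, which
    is the kind of overhead the two optimizations target.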
    
    
    An example program
    ```
    val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS()
    ds.map(p => {
         var s = 0.0
         for (i <- 0 to 2) { s += p(i) }
         s
       }).show
    ```
    
    Generated code before applying this PR
    ```
    protected void processNext() throws java.io.IOException {
      while (inputadapter_input.hasNext()) {
        InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
        boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
        ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));

        boolean deserializetoobject_isNull1 = inputadapter_isNull;
        ArrayData deserializetoobject_value1 = null;
        if (!inputadapter_isNull) {
          final int deserializetoobject_n = inputadapter_value.numElements();
          final Object[] deserializetoobject_values = new Object[deserializetoobject_n];
          for (int deserializetoobject_j = 0; deserializetoobject_j < deserializetoobject_n; deserializetoobject_j ++) {
            if (inputadapter_value.isNullAt(deserializetoobject_j)) {
              deserializetoobject_values[deserializetoobject_j] = null;
            } else {
              boolean deserializetoobject_feNull = false;
              double deserializetoobject_fePrim = inputadapter_value.getDouble(deserializetoobject_j);

              boolean deserializetoobject_teNull = deserializetoobject_feNull;
              double deserializetoobject_tePrim = -1.0;
              if (!deserializetoobject_feNull) {
                deserializetoobject_tePrim = deserializetoobject_fePrim;
              }

              if (deserializetoobject_teNull) {
                deserializetoobject_values[deserializetoobject_j] = null;
              } else {
                deserializetoobject_values[deserializetoobject_j] = deserializetoobject_tePrim;
              }
            }
          }
          deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_values);

        }

        boolean deserializetoobject_isNull = deserializetoobject_isNull1;
        final double[] deserializetoobject_value = deserializetoobject_isNull ? null : (double[]) deserializetoobject_value1.toDoubleArray();
        deserializetoobject_isNull = deserializetoobject_value == null;

        Object mapelements_obj = ((Expression) references[0]).eval(null);
        scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;

        boolean mapelements_isNull = false || deserializetoobject_isNull;
        final double mapelements_value = mapelements_isNull ? -1.0 : (Double) mapelements_value1.apply(deserializetoobject_value);

        serializefromobject_rowWriter.zeroOutNullBytes();

        if (mapelements_isNull) {
          serializefromobject_rowWriter.setNullAt(0);
        } else {
          serializefromobject_rowWriter.write(0, mapelements_value);
        }
        append(serializefromobject_result);
        if (shouldStop()) return;
      }
    }
    ```
    
    Generated code after applying this PR
    ```
    protected void processNext() throws java.io.IOException {
      while (inputadapter_input.hasNext()) {
        InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
        boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
        ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));

        boolean deserializetoobject_isNull1 = inputadapter_isNull;
        ArrayData deserializetoobject_value1 = null;
        if (!inputadapter_isNull) {
          final double[] deserializetoobject_values = inputadapter_value.toDoubleArray();
          deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_values);

        }

        boolean deserializetoobject_isNull = deserializetoobject_isNull1;
        final double[] deserializetoobject_value = deserializetoobject_isNull ? null : (double[]) deserializetoobject_value1.toDoubleArray();
        deserializetoobject_isNull = deserializetoobject_value == null;

        Object mapelements_obj = ((Expression) references[0]).eval(null);
        scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;

        boolean mapelements_isNull = false || deserializetoobject_isNull;
        final double mapelements_value = mapelements_isNull ? -1.0 : (Double) mapelements_value1.apply(deserializetoobject_value);

        serializefromobject_rowWriter.zeroOutNullBytes();

        if (mapelements_isNull) {
          serializefromobject_rowWriter.setNullAt(0);
        } else {
          serializefromobject_rowWriter.write(0, mapelements_value);
        }
        append(serializefromobject_result);
        if (shouldStop()) return;
      }
    }
    ```
    
    
    ## How was this patch tested?
    
    Tested by existing Dataset unit tests
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-15985

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13704.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13704
    
----
commit 17f17d60794c1f0ab81e21ec2484742a7610f06d
Author: Kazuaki Ishizaki <[email protected]>
Date:   2016-06-16T07:46:27Z

    optimize to read primitive array elements in Dataset

----

