GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/17568

    [SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with 
primitive array

    ## What changes were proposed in this pull request?
    
    
    This PR eliminates unnecessary data conversion, which is introduced by 
SPARK-19716, for Dataset with primitive array in the generated Java code.
    When we run the following example program, now we get the Java code 
"Without this PR". In this code, lines 56-82 are unnecessary since the 
primitive array in ArrayData can be converted into Java primitive array by 
using ``toDoubleArray()`` method. ``GenericArrayData`` is not required.
    
    ```scala
    val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
    ds.count
    ds.map(e => e).show
    ```
    
    Without this PR
    ```java
    /* 050 */   protected void processNext() throws java.io.IOException {
    /* 051 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 052 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 053 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 054 */       ArrayData inputadapter_value = inputadapter_isNull ? null : 
(inputadapter_row.getArray(0));
    /* 055 */
    /* 056 */       ArrayData deserializetoobject_value1 = null;
    /* 057 */
    /* 058 */       if (!inputadapter_isNull) {
    /* 059 */         int deserializetoobject_dataLength = 
inputadapter_value.numElements();
    /* 060 */
    /* 061 */         Double[] deserializetoobject_convertedArray = null;
    /* 062 */         deserializetoobject_convertedArray = new 
Double[deserializetoobject_dataLength];
    /* 063 */
    /* 064 */         int deserializetoobject_loopIndex = 0;
    /* 065 */         while (deserializetoobject_loopIndex < 
deserializetoobject_dataLength) {
    /* 066 */           MapObjects_loopValue2 = (double) 
(inputadapter_value.getDouble(deserializetoobject_loopIndex));
    /* 067 */           MapObjects_loopIsNull2 = 
inputadapter_value.isNullAt(deserializetoobject_loopIndex);
    /* 068 */
    /* 069 */           if (MapObjects_loopIsNull2) {
    /* 070 */             throw new RuntimeException(((java.lang.String) 
references[0]));
    /* 071 */           }
    /* 072 */           if (false) {
    /* 073 */             
deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
    /* 074 */           } else {
    /* 075 */             
deserializetoobject_convertedArray[deserializetoobject_loopIndex] = 
MapObjects_loopValue2;
    /* 076 */           }
    /* 077 */
    /* 078 */           deserializetoobject_loopIndex += 1;
    /* 079 */         }
    /* 080 */
    /* 081 */         deserializetoobject_value1 = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray);
    /* 082 */       }
    /* 083 */       boolean deserializetoobject_isNull = true;
    /* 084 */       double[] deserializetoobject_value = null;
    /* 085 */       if (!inputadapter_isNull) {
    /* 086 */         deserializetoobject_isNull = false;
    /* 087 */         if (!deserializetoobject_isNull) {
    /* 088 */           Object deserializetoobject_funcResult = null;
    /* 089 */           deserializetoobject_funcResult = 
deserializetoobject_value1.toDoubleArray();
    /* 090 */           if (deserializetoobject_funcResult == null) {
    /* 091 */             deserializetoobject_isNull = true;
    /* 092 */           } else {
    /* 093 */             deserializetoobject_value = (double[]) 
deserializetoobject_funcResult;
    /* 094 */           }
    /* 095 */
    /* 096 */         }
    /* 097 */         deserializetoobject_isNull = deserializetoobject_value == 
null;
    /* 098 */       }
    /* 099 */
    /* 100 */       boolean mapelements_isNull = true;
    /* 101 */       double[] mapelements_value = null;
    /* 102 */       if (!false) {
    /* 103 */         mapelements_resultIsNull = false;
    /* 104 */
    /* 105 */         if (!mapelements_resultIsNull) {
    /* 106 */           mapelements_resultIsNull = deserializetoobject_isNull;
    /* 107 */           mapelements_argValue = deserializetoobject_value;
    /* 108 */         }
    /* 109 */
    /* 110 */         mapelements_isNull = mapelements_resultIsNull;
    /* 111 */         if (!mapelements_isNull) {
    /* 112 */           Object mapelements_funcResult = null;
    /* 113 */           mapelements_funcResult = ((scala.Function1) 
references[1]).apply(mapelements_argValue);
    /* 114 */           if (mapelements_funcResult == null) {
    /* 115 */             mapelements_isNull = true;
    /* 116 */           } else {
    /* 117 */             mapelements_value = (double[]) mapelements_funcResult;
    /* 118 */           }
    /* 119 */
    /* 120 */         }
    /* 121 */         mapelements_isNull = mapelements_value == null;
    /* 122 */       }
    /* 123 */
    /* 124 */       serializefromobject_resultIsNull = false;
    /* 125 */
    /* 126 */       if (!serializefromobject_resultIsNull) {
    /* 127 */         serializefromobject_resultIsNull = mapelements_isNull;
    /* 128 */         serializefromobject_argValue = mapelements_value;
    /* 129 */       }
    /* 130 */
    /* 131 */       boolean serializefromobject_isNull = 
serializefromobject_resultIsNull;
    /* 132 */       final ArrayData serializefromobject_value = 
serializefromobject_resultIsNull ? null : 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
    /* 133 */       serializefromobject_isNull = serializefromobject_value == 
null;
    /* 134 */       serializefromobject_holder.reset();
    /* 135 */
    /* 136 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 137 */
    /* 138 */       if (serializefromobject_isNull) {
    /* 139 */         serializefromobject_rowWriter.setNullAt(0);
    /* 140 */       } else {
    /* 141 */         // Remember the current cursor so that we can calculate 
how many bytes are
    /* 142 */         // written later.
    /* 143 */         final int serializefromobject_tmpCursor = 
serializefromobject_holder.cursor;
    /* 144 */
    /* 145 */         if (serializefromobject_value instanceof UnsafeArrayData) 
{
    /* 146 */           final int serializefromobject_sizeInBytes = 
((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
    /* 147 */           // grow the global buffer before writing data.
    /* 148 */           
serializefromobject_holder.grow(serializefromobject_sizeInBytes);
    /* 149 */           ((UnsafeArrayData) 
serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, 
serializefromobject_holder.cursor);
    /* 150 */           serializefromobject_holder.cursor += 
serializefromobject_sizeInBytes;
    /* 151 */
    /* 152 */         } else {
    /* 153 */           final int serializefromobject_numElements = 
serializefromobject_value.numElements();
    /* 154 */           
serializefromobject_arrayWriter.initialize(serializefromobject_holder, 
serializefromobject_numElements, 8);
    /* 155 */
    /* 156 */           for (int serializefromobject_index = 0; 
serializefromobject_index < serializefromobject_numElements; 
serializefromobject_index++) {
    /* 157 */             if 
(serializefromobject_value.isNullAt(serializefromobject_index)) {
    /* 158 */               
serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
    /* 159 */             } else {
    /* 160 */               final double serializefromobject_element = 
serializefromobject_value.getDouble(serializefromobject_index);
    /* 161 */               
serializefromobject_arrayWriter.write(serializefromobject_index, 
serializefromobject_element);
    /* 162 */             }
    /* 163 */           }
    /* 164 */         }
    /* 165 */
    /* 166 */         serializefromobject_rowWriter.setOffsetAndSize(0, 
serializefromobject_tmpCursor, serializefromobject_holder.cursor - 
serializefromobject_tmpCursor);
    /* 167 */       }
    /* 168 */       
serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
    /* 169 */       append(serializefromobject_result);
    /* 170 */       if (shouldStop()) return;
    /* 171 */     }
    /* 172 */   }
    ```
    
    With this PR (eliminated lines 56-82 in the above code)
    ```java
    /* 047 */   protected void processNext() throws java.io.IOException {
    /* 048 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 049 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 050 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
    /* 051 */       ArrayData inputadapter_value = inputadapter_isNull ? null : 
(inputadapter_row.getArray(0));
    /* 052 */
    /* 053 */       boolean deserializetoobject_isNull = true;
    /* 054 */       double[] deserializetoobject_value = null;
    /* 055 */       if (!inputadapter_isNull) {
    /* 056 */         deserializetoobject_isNull = false;
    /* 057 */         if (!deserializetoobject_isNull) {
    /* 058 */           Object deserializetoobject_funcResult = null;
    /* 059 */           deserializetoobject_funcResult = 
inputadapter_value.toDoubleArray();
    /* 060 */           if (deserializetoobject_funcResult == null) {
    /* 061 */             deserializetoobject_isNull = true;
    /* 062 */           } else {
    /* 063 */             deserializetoobject_value = (double[]) 
deserializetoobject_funcResult;
    /* 064 */           }
    /* 065 */
    /* 066 */         }
    /* 067 */         deserializetoobject_isNull = deserializetoobject_value == 
null;
    /* 068 */       }
    /* 069 */
    /* 070 */       boolean mapelements_isNull = true;
    /* 071 */       double[] mapelements_value = null;
    /* 072 */       if (!false) {
    /* 073 */         mapelements_resultIsNull = false;
    /* 074 */
    /* 075 */         if (!mapelements_resultIsNull) {
    /* 076 */           mapelements_resultIsNull = deserializetoobject_isNull;
    /* 077 */           mapelements_argValue = deserializetoobject_value;
    /* 078 */         }
    /* 079 */
    /* 080 */         mapelements_isNull = mapelements_resultIsNull;
    /* 081 */         if (!mapelements_isNull) {
    /* 082 */           Object mapelements_funcResult = null;
    /* 083 */           mapelements_funcResult = ((scala.Function1) 
references[0]).apply(mapelements_argValue);
    /* 084 */           if (mapelements_funcResult == null) {
    /* 085 */             mapelements_isNull = true;
    /* 086 */           } else {
    /* 087 */             mapelements_value = (double[]) mapelements_funcResult;
    /* 088 */           }
    /* 089 */
    /* 090 */         }
    /* 091 */         mapelements_isNull = mapelements_value == null;
    /* 092 */       }
    /* 093 */
    /* 094 */       serializefromobject_resultIsNull = false;
    /* 095 */
    /* 096 */       if (!serializefromobject_resultIsNull) {
    /* 097 */         serializefromobject_resultIsNull = mapelements_isNull;
    /* 098 */         serializefromobject_argValue = mapelements_value;
    /* 099 */       }
    /* 100 */
    /* 101 */       boolean serializefromobject_isNull = 
serializefromobject_resultIsNull;
    /* 102 */       final ArrayData serializefromobject_value = 
serializefromobject_resultIsNull ? null : 
org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
    /* 103 */       serializefromobject_isNull = serializefromobject_value == 
null;
    /* 104 */       serializefromobject_holder.reset();
    /* 105 */
    /* 106 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 107 */
    /* 108 */       if (serializefromobject_isNull) {
    /* 109 */         serializefromobject_rowWriter.setNullAt(0);
    /* 110 */       } else {
    /* 111 */         // Remember the current cursor so that we can calculate 
how many bytes are
    /* 112 */         // written later.
    /* 113 */         final int serializefromobject_tmpCursor = 
serializefromobject_holder.cursor;
    /* 114 */
    /* 115 */         if (serializefromobject_value instanceof UnsafeArrayData) 
{
    /* 116 */           final int serializefromobject_sizeInBytes = 
((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
    /* 117 */           // grow the global buffer before writing data.
    /* 118 */           
serializefromobject_holder.grow(serializefromobject_sizeInBytes);
    /* 119 */           ((UnsafeArrayData) 
serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, 
serializefromobject_holder.cursor);
    /* 120 */           serializefromobject_holder.cursor += 
serializefromobject_sizeInBytes;
    /* 121 */
    /* 122 */         } else {
    /* 123 */           final int serializefromobject_numElements = 
serializefromobject_value.numElements();
    /* 124 */           
serializefromobject_arrayWriter.initialize(serializefromobject_holder, 
serializefromobject_numElements, 8);
    /* 125 */
    /* 126 */           for (int serializefromobject_index = 0; 
serializefromobject_index < serializefromobject_numElements; 
serializefromobject_index++) {
    /* 127 */             if 
(serializefromobject_value.isNullAt(serializefromobject_index)) {
    /* 128 */               
serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
    /* 129 */             } else {
    /* 130 */               final double serializefromobject_element = 
serializefromobject_value.getDouble(serializefromobject_index);
    /* 131 */               
serializefromobject_arrayWriter.write(serializefromobject_index, 
serializefromobject_element);
    /* 132 */             }
    /* 133 */           }
    /* 134 */         }
    /* 135 */
    /* 136 */         serializefromobject_rowWriter.setOffsetAndSize(0, 
serializefromobject_tmpCursor, serializefromobject_holder.cursor - 
serializefromobject_tmpCursor);
    /* 137 */       }
    /* 138 */       
serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
    /* 139 */       append(serializefromobject_result);
    /* 140 */       if (shouldStop()) return;
    /* 141 */     }
    /* 142 */   }
    ```
    
    ## How was this patch tested?
    
    Existing test suites

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-20254

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17568
    
----
commit 6a5fa5abb8ae73eaf2866630af070e0301660149
Author: Kazuaki Ishizaki <[email protected]>
Date:   2017-04-07T17:01:39Z

    initial commit

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to