HeartSaVioR opened a new pull request #26173: [SPARK-29503][SQL] Copy result 
row from RowWriter in GenerateUnsafeProjection when the expression is 
lambdaFunction in MapObject
URL: https://github.com/apache/spark/pull/26173
 
 
   ### What changes were proposed in this pull request?
   
   There's a case where MapObjects has a lambda function which creates a nested 
struct - unsafe data inside a safe data struct. In this case, MapObjects doesn't copy 
the row returned from the lambda function (as the outermost data type is a safe data 
struct), which misses copying the nested unsafe data.
   
   Ideally, MapObjects should check the type of the lambda function recursively and 
call `copy()` on all nested unsafe data, but given that the case is not easy to 
encounter, we may just handle only the discovered case (in the JIRA issue) as a 
compromise solution, via copying the result row from RowWriter in 
GenerateUnsafeProjection when the expression comes from the lambda function of 
MapObjects.
   
   Before the patch:
   
   ```
   /* 105 */   private ArrayData MapObjects_0(InternalRow i) {
   /* 106 */     boolean isNull_1 = i.isNullAt(0);
   /* 107 */     ArrayData value_1 = isNull_1 ?
   /* 108 */     null : (i.getArray(0));
   /* 109 */     ArrayData value_0 = null;
   /* 110 */
   /* 111 */     if (!isNull_1) {
   /* 112 */
   /* 113 */       int dataLength_0 = value_1.numElements();
   /* 114 */
   /* 115 */       ArrayData[] convertedArray_0 = null;
   /* 116 */       convertedArray_0 = new ArrayData[dataLength_0];
   /* 117 */
   /* 118 */
   /* 119 */       int loopIndex_0 = 0;
   /* 120 */
   /* 121 */       while (loopIndex_0 < dataLength_0) {
   /* 122 */         value_MapObject_lambda_variable_1 = (int) 
(value_1.getInt(loopIndex_0));
   /* 123 */         isNull_MapObject_lambda_variable_1 = 
value_1.isNullAt(loopIndex_0);
   /* 124 */
   /* 125 */         ArrayData arrayData_0 = ArrayData.allocateArrayData(
   /* 126 */           -1, 1L, " createArray failed.");
   /* 127 */
   /* 128 */         mutableStateArray_0[0].reset();
   /* 129 */
   /* 130 */
   /* 131 */         mutableStateArray_0[0].zeroOutNullBytes();
   /* 132 */
   /* 133 */
   /* 134 */         if (isNull_MapObject_lambda_variable_1) {
   /* 135 */           mutableStateArray_0[0].setNullAt(0);
   /* 136 */         } else {
   /* 137 */           mutableStateArray_0[0].write(0, 
value_MapObject_lambda_variable_1);
   /* 138 */         }
   /* 139 */         arrayData_0.update(0, (mutableStateArray_0[0].getRow()));
   /* 140 */         if (false) {
   /* 141 */           convertedArray_0[loopIndex_0] = null;
   /* 142 */         } else {
   /* 143 */           convertedArray_0[loopIndex_0] = arrayData_0 instanceof 
UnsafeArrayData? arrayData_0.copy() : arrayData_0;
   /* 144 */         }
   /* 145 */
   /* 146 */         loopIndex_0 += 1;
   /* 147 */       }
   /* 148 */
   /* 149 */       value_0 = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_0);
   /* 150 */     }
   /* 151 */     globalIsNull_0 = isNull_1;
   /* 152 */     return value_0;
   /* 153 */   }
   ```
   
   After the patch:
   
   ```
   /* 105 */   private ArrayData MapObjects_0(InternalRow i) {
   /* 106 */     boolean isNull_1 = i.isNullAt(0);
   /* 107 */     ArrayData value_1 = isNull_1 ?
   /* 108 */     null : (i.getArray(0));
   /* 109 */     ArrayData value_0 = null;
   /* 110 */
   /* 111 */     if (!isNull_1) {
   /* 112 */
   /* 113 */       int dataLength_0 = value_1.numElements();
   /* 114 */
   /* 115 */       ArrayData[] convertedArray_0 = null;
   /* 116 */       convertedArray_0 = new ArrayData[dataLength_0];
   /* 117 */
   /* 118 */
   /* 119 */       int loopIndex_0 = 0;
   /* 120 */
   /* 121 */       while (loopIndex_0 < dataLength_0) {
   /* 122 */         value_MapObject_lambda_variable_1 = (int) 
(value_1.getInt(loopIndex_0));
   /* 123 */         isNull_MapObject_lambda_variable_1 = 
value_1.isNullAt(loopIndex_0);
   /* 124 */
   /* 125 */         ArrayData arrayData_0 = ArrayData.allocateArrayData(
   /* 126 */           -1, 1L, " createArray failed.");
   /* 127 */
   /* 128 */         mutableStateArray_0[0].reset();
   /* 129 */
   /* 130 */
   /* 131 */         mutableStateArray_0[0].zeroOutNullBytes();
   /* 132 */
   /* 133 */
   /* 134 */         if (isNull_MapObject_lambda_variable_1) {
   /* 135 */           mutableStateArray_0[0].setNullAt(0);
   /* 136 */         } else {
   /* 137 */           mutableStateArray_0[0].write(0, 
value_MapObject_lambda_variable_1);
   /* 138 */         }
   /* 139 */         arrayData_0.update(0, 
(mutableStateArray_0[0].getRow().copy()));
   /* 140 */         if (false) {
   /* 141 */           convertedArray_0[loopIndex_0] = null;
   /* 142 */         } else {
   /* 143 */           convertedArray_0[loopIndex_0] = arrayData_0 instanceof 
UnsafeArrayData? arrayData_0.copy() : arrayData_0;
   /* 144 */         }
   /* 145 */
   /* 146 */         loopIndex_0 += 1;
   /* 147 */       }
   /* 148 */
   /* 149 */       value_0 = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_0);
   /* 150 */     }
   /* 151 */     globalIsNull_0 = isNull_1;
   /* 152 */     return value_0;
   /* 153 */   }
   ```
   
   ### Why are the changes needed?
   
   This patch fixes the bug described above.
   
   ### Does this PR introduce any user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   A unit test was added which fails on the master branch and passes with this PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to