HeartSaVioR opened a new pull request #26173: [SPARK-29503][SQL] Copy result row from RowWriter in GenerateUnsafeProjection when the expression is lambdaFunction in MapObject
URL: https://github.com/apache/spark/pull/26173

### What changes were proposed in this pull request?

There is a case where MapObjects has a lambda function which creates a nested struct: unsafe data inside a safe data structure. In this case, MapObjects doesn't copy the row returned from the lambda function (since the outermost data type is a safe data structure), so the nested unsafe data is never copied. Ideally, MapObjects should check the type of the lambda function recursively and call `copy()` on all nested unsafe data, but given that the case is not easy to encounter, we may handle only the discovered case (from the JIRA issue) as a compromise: copy the result row from the RowWriter in GenerateUnsafeProjection when the expression comes from the lambda function of MapObjects.
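As a rough, Spark-independent sketch of the failure mode described above (the names and shapes here are illustrative only, not taken from the PR): when a mutable row buffer is reused across loop iterations and the result stores a reference instead of a copy, every element ends up reflecting the last write.

```scala
// Illustrative only: a reused mutable buffer standing in for the UnsafeRowWriter's row.
final class MutableRow(var value: Int)

// Without a copy, every array slot holds the same reference, so earlier
// elements are clobbered by later writes -- the symptom of the bug above.
def withoutCopy(input: Seq[Int]): Array[MutableRow] = {
  val buffer = new MutableRow(0)
  input.map { i => buffer.value = i; buffer }.toArray
}

// With a per-element copy (analogous to getRow().copy() in the fixed codegen),
// each element keeps its own data.
def withCopy(input: Seq[Int]): Array[MutableRow] = {
  val buffer = new MutableRow(0)
  input.map { i => buffer.value = i; new MutableRow(buffer.value) }.toArray
}

// withoutCopy(Seq(1, 2, 3)).map(_.value)  // => Array(3, 3, 3)
// withCopy(Seq(1, 2, 3)).map(_.value)     // => Array(1, 2, 3)
```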
Before the patch:

```
/* 105 */   private ArrayData MapObjects_0(InternalRow i) {
/* 106 */     boolean isNull_1 = i.isNullAt(0);
/* 107 */     ArrayData value_1 = isNull_1 ?
/* 108 */     null : (i.getArray(0));
/* 109 */     ArrayData value_0 = null;
/* 110 */
/* 111 */     if (!isNull_1) {
/* 112 */
/* 113 */       int dataLength_0 = value_1.numElements();
/* 114 */
/* 115 */       ArrayData[] convertedArray_0 = null;
/* 116 */       convertedArray_0 = new ArrayData[dataLength_0];
/* 117 */
/* 118 */
/* 119 */       int loopIndex_0 = 0;
/* 120 */
/* 121 */       while (loopIndex_0 < dataLength_0) {
/* 122 */         value_MapObject_lambda_variable_1 = (int) (value_1.getInt(loopIndex_0));
/* 123 */         isNull_MapObject_lambda_variable_1 = value_1.isNullAt(loopIndex_0);
/* 124 */
/* 125 */         ArrayData arrayData_0 = ArrayData.allocateArrayData(
/* 126 */           -1, 1L, " createArray failed.");
/* 127 */
/* 128 */         mutableStateArray_0[0].reset();
/* 129 */
/* 130 */
/* 131 */         mutableStateArray_0[0].zeroOutNullBytes();
/* 132 */
/* 133 */
/* 134 */         if (isNull_MapObject_lambda_variable_1) {
/* 135 */           mutableStateArray_0[0].setNullAt(0);
/* 136 */         } else {
/* 137 */           mutableStateArray_0[0].write(0, value_MapObject_lambda_variable_1);
/* 138 */         }
/* 139 */         arrayData_0.update(0, (mutableStateArray_0[0].getRow()));
/* 140 */         if (false) {
/* 141 */           convertedArray_0[loopIndex_0] = null;
/* 142 */         } else {
/* 143 */           convertedArray_0[loopIndex_0] = arrayData_0 instanceof UnsafeArrayData? arrayData_0.copy() : arrayData_0;
/* 144 */         }
/* 145 */
/* 146 */         loopIndex_0 += 1;
/* 147 */       }
/* 148 */
/* 149 */       value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_0);
/* 150 */     }
/* 151 */     globalIsNull_0 = isNull_1;
/* 152 */     return value_0;
/* 153 */   }
```

After the patch:

```
/* 105 */   private ArrayData MapObjects_0(InternalRow i) {
/* 106 */     boolean isNull_1 = i.isNullAt(0);
/* 107 */     ArrayData value_1 = isNull_1 ?
/* 108 */     null : (i.getArray(0));
/* 109 */     ArrayData value_0 = null;
/* 110 */
/* 111 */     if (!isNull_1) {
/* 112 */
/* 113 */       int dataLength_0 = value_1.numElements();
/* 114 */
/* 115 */       ArrayData[] convertedArray_0 = null;
/* 116 */       convertedArray_0 = new ArrayData[dataLength_0];
/* 117 */
/* 118 */
/* 119 */       int loopIndex_0 = 0;
/* 120 */
/* 121 */       while (loopIndex_0 < dataLength_0) {
/* 122 */         value_MapObject_lambda_variable_1 = (int) (value_1.getInt(loopIndex_0));
/* 123 */         isNull_MapObject_lambda_variable_1 = value_1.isNullAt(loopIndex_0);
/* 124 */
/* 125 */         ArrayData arrayData_0 = ArrayData.allocateArrayData(
/* 126 */           -1, 1L, " createArray failed.");
/* 127 */
/* 128 */         mutableStateArray_0[0].reset();
/* 129 */
/* 130 */
/* 131 */         mutableStateArray_0[0].zeroOutNullBytes();
/* 132 */
/* 133 */
/* 134 */         if (isNull_MapObject_lambda_variable_1) {
/* 135 */           mutableStateArray_0[0].setNullAt(0);
/* 136 */         } else {
/* 137 */           mutableStateArray_0[0].write(0, value_MapObject_lambda_variable_1);
/* 138 */         }
/* 139 */         arrayData_0.update(0, (mutableStateArray_0[0].getRow().copy()));
/* 140 */         if (false) {
/* 141 */           convertedArray_0[loopIndex_0] = null;
/* 142 */         } else {
/* 143 */           convertedArray_0[loopIndex_0] = arrayData_0 instanceof UnsafeArrayData? arrayData_0.copy() : arrayData_0;
/* 144 */         }
/* 145 */
/* 146 */         loopIndex_0 += 1;
/* 147 */       }
/* 148 */
/* 149 */       value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_0);
/* 150 */     }
/* 151 */     globalIsNull_0 = isNull_1;
/* 152 */     return value_0;
/* 153 */   }
```

The only difference is at `/* 139 */`: the row obtained from the RowWriter is now copied (`getRow().copy()`) before being stored into `arrayData_0`.

### Why are the changes needed?

This patch fixes the bug described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

A unit test is added which fails on the master branch and passes with this patch.
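A minimal sketch of the kind of end-to-end regression check this implies, assuming a nested collection whose elements go through a MapObjects lambda that produces a struct. This is not the actual test added in the PR, and `Inner` is a hypothetical case class; the exact shape that triggers the bug is described in the JIRA issue.

```scala
import spark.implicits._   // assumes an active SparkSession named `spark`

case class Inner(i: Int)

// Each Int is mapped through a lambda that builds a nested struct; with the
// bug, elements could all end up reflecting the last written value.
val ds = Seq(Seq(1, 2, 3)).toDS()
val result = ds.map(xs => xs.map(x => Seq(Inner(x)))).collect()

assert(result.toSeq == Seq(Seq(Seq(Inner(1)), Seq(Inner(2)), Seq(Inner(3)))))
```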
