JoshRosen commented on issue #25745: [SPARK-29033][SQL][WIP] Always use UnsafeRow-based version of CreateNamedStruct

URL: https://github.com/apache/spark/pull/25745#issuecomment-530006567

As a high-level illustration of why I think this _might_ improve performance, compare the following before-and-after codegen excerpts, produced by running this in `spark-shell`:

```scala
import org.apache.spark.sql.execution.debug._

spark.sql("select struct(id, id) from range(100)").debugCodegen
```

**Before:**

```java
/* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */     partitionIndex = index;
/* 023 */     this.inputs = inputs;
/* 024 */
/* 025 */     range_taskContext_0 = TaskContext.get();
/* 026 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 030 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_mutableStateArray_0[2], 2);
/* 031 */
/* 032 */   }
/* 033 */
/* 034 */   private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 035 */     Object[] project_values_0 = new Object[2];
/* 036 */
/* 037 */     if (false) {
/* 038 */       project_values_0[0] = null;
/* 039 */     } else {
/* 040 */       project_values_0[0] = project_expr_0_0;
/* 041 */     }
/* 042 */
/* 043 */     if (false) {
/* 044 */       project_values_0[1] = null;
/* 045 */     } else {
/* 046 */       project_values_0[1] = project_expr_0_0;
/* 047 */     }
/* 048 */
/* 049 */     final InternalRow project_value_1 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(project_values_0);
/* 050 */     project_values_0 = null;
/* 051 */     range_mutableStateArray_0[2].reset();
/* 052 */
/* 053 */     final InternalRow project_tmpInput_0 = project_value_1;
/* 054 */     if (project_tmpInput_0 instanceof UnsafeRow) {
/* 055 */       range_mutableStateArray_0[2].write(0, (UnsafeRow) project_tmpInput_0);
/* 056 */     } else {
/* 057 */       // Remember the current cursor so that we can calculate how many bytes are
/* 058 */       // written later.
/* 059 */       final int project_previousCursor_0 = range_mutableStateArray_0[2].cursor();
/* 060 */
/* 061 */       range_mutableStateArray_0[3].resetRowWriter();
/* 062 */
/* 063 */       range_mutableStateArray_0[3].write(0, (project_tmpInput_0.getLong(0)));
/* 064 */
/* 065 */       range_mutableStateArray_0[3].write(1, (project_tmpInput_0.getLong(1)));
/* 066 */
/* 067 */       range_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_0);
/* 068 */     }
/* 069 */     append((range_mutableStateArray_0[2].getRow()));
/* 070 */
/* 071 */   }
```

**After:**

```java
/* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */     partitionIndex = index;
/* 023 */     this.inputs = inputs;
/* 024 */
/* 025 */     range_taskContext_0 = TaskContext.get();
/* 026 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 030 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 031 */     range_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_mutableStateArray_0[3], 2);
/* 032 */
/* 033 */   }
/* 034 */
/* 035 */   private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 036 */     range_mutableStateArray_0[2].reset();
/* 037 */
/* 038 */     range_mutableStateArray_0[2].write(0, project_expr_0_0);
/* 039 */
/* 040 */     range_mutableStateArray_0[2].write(1, project_expr_0_0);
/* 041 */     range_mutableStateArray_0[3].reset();
/* 042 */
/* 043 */     final InternalRow project_tmpInput_0 = (range_mutableStateArray_0[2].getRow());
/* 044 */     if (project_tmpInput_0 instanceof UnsafeRow) {
/* 045 */       range_mutableStateArray_0[3].write(0, (UnsafeRow) project_tmpInput_0);
/* 046 */     } else {
/* 047 */       // Remember the current cursor so that we can calculate how many bytes are
/* 048 */       // written later.
/* 049 */       final int project_previousCursor_0 = range_mutableStateArray_0[3].cursor();
/* 050 */
/* 051 */       range_mutableStateArray_0[4].resetRowWriter();
/* 052 */
/* 053 */       range_mutableStateArray_0[4].write(0, (project_tmpInput_0.getLong(0)));
/* 054 */
/* 055 */       range_mutableStateArray_0[4].write(1, (project_tmpInput_0.getLong(1)));
/* 056 */
/* 057 */       range_mutableStateArray_0[3].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_0);
/* 058 */     }
/* 059 */     append((range_mutableStateArray_0[3].getRow()));
/* 060 */
/* 061 */   }
```
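The key difference is visible in `project_doConsume_0`: before, the struct is materialized as a `GenericInternalRow` over a freshly allocated `Object[]` (boxing both `long` fields), so the `instanceof UnsafeRow` check falls through to the field-by-field copy branch; after, the fields are written directly into a dedicated `UnsafeRowWriter`, so the check takes the single `write(0, (UnsafeRow) ...)` fast path with no per-row allocation or boxing.

To get a rough feel for the runtime impact, something like the following unscientific timing could be run in `spark-shell` against both builds. This is only a sketch: numbers will vary, `spark.time` merely wraps the block with wall-clock timing, and a real comparison should go through a proper benchmark harness:

```scala
// Rough sketch: time the same struct projection over a much larger range.
// foreach(_ => ()) consumes every row, so the struct construction cannot be
// pruned away by the optimizer the way it would be under a bare count().
spark.time {
  spark.sql("select struct(id, id) from range(100000000)").foreach(_ => ())
}
```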
