aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204826928


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SplitUpdateAsDeleteAndInsertExec.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
BindReferences, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsertExec(
+    deleteOutput: Seq[Expression],
+    insertOutput: Seq[Expression],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+
+  /**
+   * Attributes of `output` that are absent from `inputSet`, i.e. the ones this node
+   * introduces itself instead of receiving them from its input.
+   */
+  @transient override lazy val producedAttributes: AttributeSet =
+    AttributeSet(output.filter(attr => !inputSet.contains(attr)))
+
+  /**
+   * One-line description of this node: its name followed by the output attribute list,
+   * rendered via `truncatedString` and bounded by `maxFields`.
+   */
+  override def simpleString(maxFields: Int): String = {
+    val renderedOutput = truncatedString(output, "[", ", ", "]", maxFields)
+    s"SplitUpdateAsDeleteAndInsertExec$renderedOutput"
+  }
+
+  /**
+   * Row-based execution path: runs the child plan and feeds each of its partitions
+   * through `processPartition`.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {
+    val childRdd = child.execute()
+    childRdd.mapPartitions(processPartition)
+  }
+
+  /**
+   * Returns a copy of this node whose `child` is `newChild`; the DELETE/INSERT projections
+   * and `output` are carried over unchanged.
+   */
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+
+  /**
+   * Input attributes that must be evaluated before this node in generated code.
+   *
+   * Only attributes referenced at least twice across the INSERT and DELETE projections
+   * qualify; an attribute referenced once is left for lazy evaluation where it is consumed.
+   */
+  override def usedInputs: AttributeSet = {
+    val referencedIds = (insertOutput ++ deleteOutput).flatMap { expr =>
+      expr.collect { case attr: Attribute => attr.exprId }
+    }
+    val repeatedIds = referencedIds.groupBy(identity).collect {
+      case (exprId, occurrences) if occurrences.size > 1 => exprId
+    }.toSet
+    references.filter(attr => repeatedIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   Here is what a sample output looks like.
   
   ```
   Generated code:
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=1
   /* 006 */ final class GeneratedIteratorForCodegenStage1 extends 
org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private scala.collection.Iterator inputadapter_input_0;
   /* 010 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
project_mutableStateArray_0 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
   /* 011 */
   /* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
   /* 013 */     this.references = references;
   /* 014 */   }
   /* 015 */
   /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 017 */     partitionIndex = index;
   /* 018 */     this.inputs = inputs;
   /* 019 */     inputadapter_input_0 = inputs[0];
   /* 020 */     project_mutableStateArray_0[0] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(7, 64);
   /* 021 */     project_mutableStateArray_0[1] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[0],
 0);
   /* 022 */     project_mutableStateArray_0[2] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 023 */     project_mutableStateArray_0[3] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[2],
 0);
   /* 024 */     project_mutableStateArray_0[4] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 025 */     project_mutableStateArray_0[5] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[4],
 0);
   /* 026 */
   /* 027 */   }
   /* 028 */
   /* 029 */   protected void processNext() throws java.io.IOException {
   /* 030 */     while ( inputadapter_input_0.hasNext()) {
   /* 031 */       InternalRow inputadapter_row_0 = (InternalRow) 
inputadapter_input_0.next();
   /* 032 */
   /* 033 */       // common sub-expressions
   /* 034 */
   /* 035 */       boolean inputadapter_isNull_1 = 
inputadapter_row_0.isNullAt(1);
   /* 036 */       int inputadapter_value_1 = inputadapter_isNull_1 ?
   /* 037 */       -1 : (inputadapter_row_0.getInt(1));
   /* 038 */
   /* 039 */       // generate DELETE record
   /* 040 */
   /* 041 */       UTF8String inputadapter_value_3 = 
inputadapter_row_0.getUTF8String(3);
   /* 042 */       long inputadapter_value_4 = inputadapter_row_0.getLong(4);
   /* 043 */       int inputadapter_value_5 = inputadapter_row_0.getInt(5);
   /* 044 */       boolean inputadapter_isNull_6 = 
inputadapter_row_0.isNullAt(6);
   /* 045 */       InternalRow inputadapter_value_6 = inputadapter_isNull_6 ?
   /* 046 */       null : (inputadapter_row_0.getStruct(6, 0));
   /* 047 */       project_mutableStateArray_0[2].reset();
   /* 048 */
   /* 049 */       project_mutableStateArray_0[2].zeroOutNullBytes();
   /* 050 */
   /* 051 */       project_mutableStateArray_0[2].write(0, 1);
   /* 052 */
   /* 053 */       if (true) {
   /* 054 */         project_mutableStateArray_0[2].setNullAt(1);
   /* 055 */       } else {
   /* 056 */         project_mutableStateArray_0[2].write(1, -1);
   /* 057 */       }
   /* 058 */
   /* 059 */       if (true) {
   /* 060 */         project_mutableStateArray_0[2].setNullAt(2);
   /* 061 */       } else {
   /* 062 */         project_mutableStateArray_0[2].write(2, -1);
   /* 063 */       }
   /* 064 */
   /* 065 */       if (true) {
   /* 066 */         project_mutableStateArray_0[2].setNullAt(3);
   /* 067 */       } else {
   /* 068 */         project_mutableStateArray_0[2].write(3, -1);
   /* 069 */       }
   /* 070 */
   /* 071 */       if (false) {
   /* 072 */         project_mutableStateArray_0[2].setNullAt(4);
   /* 073 */       } else {
   /* 074 */         project_mutableStateArray_0[2].write(4, 
inputadapter_value_3);
   /* 075 */       }
   /* 076 */
   /* 077 */       if (false) {
   /* 078 */         project_mutableStateArray_0[2].setNullAt(5);
   /* 079 */       } else {
   /* 080 */         project_mutableStateArray_0[2].write(5, 
inputadapter_value_4);
   /* 081 */       }
   /* 082 */
   /* 083 */       if (false) {
   /* 084 */         project_mutableStateArray_0[2].setNullAt(6);
   /* 085 */       } else {
   /* 086 */         project_mutableStateArray_0[2].write(6, 
inputadapter_value_5);
   /* 087 */       }
   /* 088 */
   /* 089 */       if (inputadapter_isNull_6) {
   /* 090 */         project_mutableStateArray_0[2].setNullAt(7);
   /* 091 */       } else {
   /* 092 */         final InternalRow updaterows_tmpInput_0 = 
inputadapter_value_6;
   /* 093 */         if (updaterows_tmpInput_0 instanceof UnsafeRow) {
   /* 094 */           project_mutableStateArray_0[2].write(7, (UnsafeRow) 
updaterows_tmpInput_0);
   /* 095 */         } else {
   /* 096 */           // Remember the current cursor so that we can calculate 
how many bytes are
   /* 097 */           // written later.
   /* 098 */           final int updaterows_previousCursor_0 = 
project_mutableStateArray_0[2].cursor();
   /* 099 */
   /* 100 */           project_mutableStateArray_0[3].resetRowWriter();
   /* 101 */
   /* 102 */           
project_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(7, 
updaterows_previousCursor_0);
   /* 103 */         }
   /* 104 */       }
   /* 105 */       append((project_mutableStateArray_0[2].getRow()));
   /* 106 */
   /* 107 */       // generate INSERT records
   /* 108 */
   /* 109 */       boolean updaterows_isNull_8 = true;
   /* 110 */       int updaterows_value_8 = -1;
   /* 111 */
   /* 112 */       if (!inputadapter_isNull_1) {
   /* 113 */         updaterows_isNull_8 = false; // resultCode could change 
nullability.
   /* 114 */
   /* 115 */         updaterows_value_8 = inputadapter_value_1 - 11;
   /* 116 */
   /* 117 */       }
   /* 118 */
   /* 119 */       boolean inputadapter_isNull_0 = 
inputadapter_row_0.isNullAt(0);
   /* 120 */       int inputadapter_value_0 = inputadapter_isNull_0 ?
   /* 121 */       -1 : (inputadapter_row_0.getInt(0));
   /* 122 */       project_mutableStateArray_0[4].reset();
   /* 123 */
   /* 124 */       project_mutableStateArray_0[4].zeroOutNullBytes();
   /* 125 */
   /* 126 */       project_mutableStateArray_0[4].write(0, 3);
   /* 127 */
   /* 128 */       if (inputadapter_isNull_0) {
   /* 129 */         project_mutableStateArray_0[4].setNullAt(1);
   /* 130 */       } else {
   /* 131 */         project_mutableStateArray_0[4].write(1, 
inputadapter_value_0);
   /* 132 */       }
   /* 133 */
   /* 134 */       if (updaterows_isNull_8) {
   /* 135 */         project_mutableStateArray_0[4].setNullAt(2);
   /* 136 */       } else {
   /* 137 */         project_mutableStateArray_0[4].write(2, 
updaterows_value_8);
   /* 138 */       }
   /* 139 */
   /* 140 */       if (updaterows_isNull_8) {
   /* 141 */         project_mutableStateArray_0[4].setNullAt(3);
   /* 142 */       } else {
   /* 143 */         project_mutableStateArray_0[4].write(3, 
updaterows_value_8);
   /* 144 */       }
   /* 145 */
   /* 146 */       if (true) {
   /* 147 */         project_mutableStateArray_0[4].setNullAt(4);
   /* 148 */       } else {
   /* 149 */         project_mutableStateArray_0[4].write(4, 
((UTF8String)null));
   /* 150 */       }
   /* 151 */
   /* 152 */       if (true) {
   /* 153 */         project_mutableStateArray_0[4].setNullAt(5);
   /* 154 */       } else {
   /* 155 */         project_mutableStateArray_0[4].write(5, -1L);
   /* 156 */       }
   /* 157 */
   /* 158 */       if (true) {
   /* 159 */         project_mutableStateArray_0[4].setNullAt(6);
   /* 160 */       } else {
   /* 161 */         project_mutableStateArray_0[4].write(6, -1);
   /* 162 */       }
   /* 163 */
   /* 164 */       if (true) {
   /* 165 */         project_mutableStateArray_0[4].setNullAt(7);
   /* 166 */       } else {
   /* 167 */         final InternalRow wholestagecodegen_tmpInput_0 = 
((InternalRow)null);
   /* 168 */         if (wholestagecodegen_tmpInput_0 instanceof UnsafeRow) {
   /* 169 */           project_mutableStateArray_0[4].write(7, (UnsafeRow) 
wholestagecodegen_tmpInput_0);
   /* 170 */         } else {
   /* 171 */           // Remember the current cursor so that we can calculate 
how many bytes are
   /* 172 */           // written later.
   /* 173 */           final int wholestagecodegen_previousCursor_0 = 
project_mutableStateArray_0[4].cursor();
   /* 174 */
   /* 175 */           project_mutableStateArray_0[5].resetRowWriter();
   /* 176 */
   /* 177 */           
project_mutableStateArray_0[4].setOffsetAndSizeFromPreviousCursor(7, 
wholestagecodegen_previousCursor_0);
   /* 178 */         }
   /* 179 */       }
   /* 180 */       append((project_mutableStateArray_0[4].getRow()));
   /* 181 */       if (shouldStop()) return;
   /* 182 */     }
   /* 183 */   }
   /* 184 */
   /* 185 */ }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to