GitHub user heary-cao opened a pull request:

    https://github.com/apache/spark/pull/20820

    [SPARK-23676][SQL] Support left join codegen in SortMergeJoinExec

    ## What changes were proposed in this pull request?
    
    This PR generates Java code that evaluates a LeftOuter join directly in
    `SortMergeJoinExec`, instead of going through the iterator-based path.
    Evaluating the join in generated code improves runtime performance.
    
    `JoinBenchmark` result: **1.3x**
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
    Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
    left sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    left merge join wholestage=off                 2439 / 2575          0.9        1163.0       1.0X
    left merge join wholestage=on                  1890 / 1904          1.1         901.1       1.3X
    ```
    `JoinBenchmark` program
    ```
        val N = 2 << 20
        runBenchmark("left sort merge join", N) {
          val df1 = sparkSession.range(N)
            .selectExpr(s"(id * 15485863) % ${N*10} as k1")
          val df2 = sparkSession.range(N)
            .selectExpr(s"(id * 15485867) % ${N*10} as k2")
          val df = df1.join(df2, col("k1") === col("k2"), "left")
          assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
          df.count()
        }
    ```
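    The `wholestage=off` / `wholestage=on` rows come from running the same
    query with whole-stage code generation disabled and enabled. A minimal
    standalone sketch of that comparison (assuming an interactive
    `SparkSession` named `spark`; illustrative only, not a replacement for
    the benchmark harness):
    ```
    // Sketch: run the same left join with whole-stage codegen off and on.
    import org.apache.spark.sql.functions.col

    val N = 2 << 20
    val df1 = spark.range(N).selectExpr(s"(id * 15485863) % ${N*10} as k1")
    val df2 = spark.range(N).selectExpr(s"(id * 15485867) % ${N*10} as k2")
    val df = df1.join(df2, col("k1") === col("k2"), "left")

    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    df.count()  // iterator-based path (wholestage=off)

    spark.conf.set("spark.sql.codegen.wholeStage", "true")
    df.count()  // generated-code path added by this PR (wholestage=on)
    ```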
    code example
    ```
    val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
    val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
    df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect
    ```
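    The dump below can be reproduced with Spark's codegen debugging helper
    (a sketch, assuming a `SparkSession` named `spark`):
    ```
    // Sketch: print the generated Java source for each codegen stage.
    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.functions.col

    val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
    val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
    df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left")
      .debugCodegen()  // prints numbered sources like the listing below
    ```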
    Generated code
    ```
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage5(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=5
    /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private scala.collection.Iterator smj_leftInput;
    /* 010 */   private scala.collection.Iterator smj_rightInput;
    /* 011 */   private InternalRow smj_leftRow;
    /* 012 */   private InternalRow smj_rightRow;
    /* 013 */   private long smj_value2;
    /* 014 */   private org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
    /* 015 */   private long smj_value3;
    /* 016 */   private long smj_value4;
    /* 017 */   private long smj_value5;
    /* 018 */   private long smj_value6;
    /* 019 */   private boolean smj_isNull2;
    /* 020 */   private long smj_value7;
    /* 021 */   private boolean smj_isNull3;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] smj_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
    /* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
    /* 024 */   private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
    /* 025 */
    /* 026 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
    /* 027 */     this.references = references;
    /* 028 */   }
    /* 029 */
    /* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 031 */     partitionIndex = index;
    /* 032 */     this.inputs = inputs;
    /* 033 */     smj_leftInput = inputs[0];
    /* 034 */     smj_rightInput = inputs[1];
    /* 035 */
    /* 036 */     smj_matches = new org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 2147483647);
    /* 037 */     smj_mutableStateArray[0] = new UnsafeRow(4);
    /* 038 */     smj_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0], 0);
    /* 039 */     smj_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0], 4);
    /* 040 */
    /* 041 */   }
    /* 042 */
    /* 043 */   private void writeJoinRows() throws java.io.IOException {
    /* 044 */     smj_mutableStateArray2[0].zeroOutNullBytes();
    /* 045 */
    /* 046 */     smj_mutableStateArray2[0].write(0, smj_value4);
    /* 047 */
    /* 048 */     smj_mutableStateArray2[0].write(1, smj_value5);
    /* 049 */
    /* 050 */     if (smj_isNull2) {
    /* 051 */       smj_mutableStateArray2[0].setNullAt(2);
    /* 052 */     } else {
    /* 053 */       smj_mutableStateArray2[0].write(2, smj_value6);
    /* 054 */     }
    /* 055 */
    /* 056 */     if (smj_isNull3) {
    /* 057 */       smj_mutableStateArray2[0].setNullAt(3);
    /* 058 */     } else {
    /* 059 */       smj_mutableStateArray2[0].write(3, smj_value7);
    /* 060 */     }
    /* 061 */     append(smj_mutableStateArray[0].copy());
    /* 062 */
    /* 063 */   }
    /* 064 */
    /* 065 */   private boolean findNextJoinRows(
    /* 066 */     scala.collection.Iterator leftIter,
    /* 067 */     scala.collection.Iterator rightIter) {
    /* 068 */     smj_leftRow = null;
    /* 069 */     int comp = 0;
    /* 070 */     while (smj_leftRow == null) {
    /* 071 */       if (!leftIter.hasNext()) return false;
    /* 072 */       smj_leftRow = (InternalRow) leftIter.next();
    /* 073 */
    /* 074 */       long smj_value = smj_leftRow.getLong(0);
    /* 075 */       if (false) {
    /* 076 */         if (!smj_matches.isEmpty()) {
    /* 077 */           smj_matches.clear();
    /* 078 */         }
    /* 079 */         return true;
    /* 080 */       }
    /* 081 */       if (!smj_matches.isEmpty()) {
    /* 082 */         comp = 0;
    /* 083 */         if (comp == 0) {
    /* 084 */           comp = (smj_value > smj_value3 ? 1 : smj_value < smj_value3 ? -1 : 0);
    /* 085 */         }
    /* 086 */
    /* 087 */         if (comp == 0) {
    /* 088 */           return true;
    /* 089 */         }
    /* 090 */         smj_matches.clear();
    /* 091 */       }
    /* 092 */
    /* 093 */       do {
    /* 094 */         if (smj_rightRow == null) {
    /* 095 */           if (!rightIter.hasNext()) {
    /* 096 */             smj_value3 = smj_value;
    /* 097 */             return true;
    /* 098 */           }
    /* 099 */           smj_rightRow = (InternalRow) rightIter.next();
    /* 100 */
    /* 101 */           long smj_value1 = smj_rightRow.getLong(0);
    /* 102 */           if (false) {
    /* 103 */             smj_rightRow = null;
    /* 104 */             continue;
    /* 105 */           }
    /* 106 */           smj_value2 = smj_value1;
    /* 107 */         }
    /* 108 */
    /* 109 */         comp = 0;
    /* 110 */         if (comp == 0) {
    /* 111 */           comp = (smj_value > smj_value2 ? 1 : smj_value < smj_value2 ? -1 : 0);
    /* 112 */         }
    /* 113 */
    /* 114 */         if (comp > 0) {
    /* 115 */           smj_rightRow = null;
    /* 116 */         } else if (comp < 0) {
    /* 117 */           if (!smj_matches.isEmpty()) {
    /* 118 */             smj_value3 = smj_value;
    /* 119 */           }
    /* 120 */           return true;
    /* 121 */         } else {
    /* 122 */           smj_matches.add((UnsafeRow) smj_rightRow);
    /* 123 */           smj_rightRow = null;
    /* 124 */         }
    /* 125 */       } while (smj_leftRow != null);
    /* 126 */     }
    /* 127 */     return false; // unreachable
    /* 128 */   }
    /* 129 */
    /* 130 */   protected void processNext() throws java.io.IOException {
    /* 131 */     while (findNextJoinRows(smj_leftInput, smj_rightInput)) {
    /* 132 */       boolean smj_loaded = false;
    /* 133 */       smj_value4 = smj_leftRow.getLong(0);
    /* 134 */       smj_value5 = smj_leftRow.getLong(1);
    /* 135 */       scala.collection.Iterator<UnsafeRow> smj_iterator = smj_matches.generateIterator();
    /* 136 */       while (smj_iterator.hasNext()) {
    /* 137 */         InternalRow smj_rightRow1 = (InternalRow) smj_iterator.next();
    /* 138 */         smj_isNull3 = smj_rightRow1.isNullAt(1);
    /* 139 */         smj_value7 = smj_rightRow1.getLong(1);
    /* 140 */         boolean smj_isNull4 = true;
    /* 141 */         boolean smj_value8 = false;
    /* 142 */
    /* 143 */         if (!smj_isNull3) {
    /* 144 */           smj_isNull4 = false; // resultCode could change nullability.
    /* 145 */           smj_value8 = smj_value5 < smj_value7;
    /* 146 */
    /* 147 */         }
    /* 148 */         if (smj_isNull4 || !smj_value8) continue;
    /* 149 */         smj_isNull2 = smj_rightRow1.isNullAt(0);
    /* 150 */         smj_value6 = smj_rightRow1.getLong(0);
    /* 151 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
    /* 152 */         smj_loaded = true;
    /* 153 */         writeJoinRows();
    /* 154 */       }
    /* 155 */       if (!smj_loaded) {
    /* 156 */         smj_isNull2 = true;
    /* 157 */         smj_isNull3 = true;
    /* 158 */         writeJoinRows();
    /* 159 */       }
    /* 160 */       if (shouldStop()) return;
    /* 161 */     }
    /* 162 */   }
    /* 163 */
    /* 164 */ }
    ```
    
    ## How was this patch tested?
    
    Added test cases to `LeftJoinSuite` and `WholeStageCodegenSuite`.
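
    For illustration, a plan-level assertion in the style of
    `WholeStageCodegenSuite` might look like the following sketch
    (a hypothetical test body, assuming a `SparkSession` named `spark`;
    not necessarily the exact tests added):
    ```
    // Sketch: assert that a left outer sort merge join is planned inside a
    // whole-stage codegen subtree. Broadcast joins are disabled so that
    // sort merge join is chosen.
    import org.apache.spark.sql.execution.WholeStageCodegenExec
    import org.apache.spark.sql.execution.joins.SortMergeJoinExec
    import org.apache.spark.sql.functions.col

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    val df1 = spark.range(100).selectExpr("id as k1")
    val df2 = spark.range(100).selectExpr("id as k2")
    val joined = df1.join(df2, col("k1") === col("k2"), "left")
    val plan = joined.queryExecution.executedPlan
    assert(plan.find(p =>
      p.isInstanceOf[WholeStageCodegenExec] &&
        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortMergeJoinExec]
    ).isDefined)
    ```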


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark LeftJoinCodegen

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20820
    
----
commit cec653c713157a2084ef8c880e0482db9e77491d
Author: caoxuewen <cao.xuewen@...>
Date:   2018-03-14T07:43:56Z

    Support left join codegen in SortMergeJoinExec

----

