wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1655520091

   Adding the inner SMJ generated code for better understanding.
   
   ```java
   /* 036 */   private boolean smj_findNextJoinRows_0(
   /* 037 */     scala.collection.Iterator streamedIter,
   /* 038 */     scala.collection.Iterator bufferedIter) {
   /* 039 */     smj_streamedRow_0 = null;
   /* 040 */     int comp = 0;
   /* 041 */     while (smj_streamedRow_0 == null) {
   /* 042 */       if (!streamedIter.hasNext()) return false;  // 2.1.2 Window 1 will read one group of data and the first row of the next group (named X), and return the first row of the first group.
   /* 043 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
   /* 044 */       int smj_value_0 = smj_streamedRow_0.getInt(0);
   /* 045 */       if (false) {
   /* 046 */         smj_streamedRow_0 = null;
   /* 047 */         continue;
   /* 048 */
   /* 049 */       }
   /* 050 */       if (!smj_matches_0.isEmpty()) {
   /* 051 */         comp = 0;
   /* 052 */         if (comp == 0) {
   /* 053 */           comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < 
smj_value_3 ? -1 : 0);
   /* 054 */         }
   /* 055 */
   /* 056 */         if (comp == 0) {
   /* 057 */           return true;
   /* 058 */         }
   /* 059 */         smj_matches_0.clear();
   /* 060 */       }
   /* 061 */
   /* 062 */       do {
   /* 063 */         if (smj_bufferedRow_0 == null) {
   /* 064 */           if (!bufferedIter.hasNext()) {
   /* 065 */             smj_value_3 = smj_value_0;
   /* 066 */             return !smj_matches_0.isEmpty();  // 2.2.1 Sort 2 and Window 2 are empty, there are no matched rows, and SMJ will finish.
   /* 067 */           }
   /* 068 */           smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
   /* 069 */           int smj_value_1 = smj_bufferedRow_0.getInt(0);
   /* 070 */           if (false) {
   /* 071 */             smj_bufferedRow_0 = null;
   /* 072 */             continue;
   /* 073 */           }
   /* 074 */           smj_value_2 = smj_value_1;
   /* 075 */         }
   /* 076 */
   /* 077 */         comp = 0;
   /* 078 */         if (comp == 0) {
   /* 079 */           comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < 
smj_value_2 ? -1 : 0);
   /* 080 */         }
   /* 081 */
   /* 082 */         if (comp > 0) {
   /* 083 */           smj_bufferedRow_0 = null;
   /* 084 */         } else if (comp < 0) {
   /* 085 */           if (!smj_matches_0.isEmpty()) {
   /* 086 */             smj_value_3 = smj_value_0;
   /* 087 */             return true;
   /* 088 */           } else {
   /* 089 */             smj_streamedRow_0 = null;
   /* 090 */           }
   /* 091 */         } else {
   /* 092 */           smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
   /* 093 */           smj_bufferedRow_0 = null;
   /* 094 */         }
   /* 095 */       } while (smj_streamedRow_0 != null);
   /* 096 */     }
   /* 097 */     return false; // unreachable
   /* 098 */   }
   /* 099 */
   /* 100 */   protected void processNext() throws java.io.IOException {
   /* 101 */     if (!wholestagecodegen_initJoin_0) {
   /* 102 */       wholestagecodegen_initJoin_0 = true;
   /* 113 */
   /* 114 */     while (smj_findNextJoinRows_0(smj_streamedInput_0, 
smj_bufferedInput_0)) {
   /* 115 */       int smj_value_4 = -1;
   /* 116 */       int smj_value_5 = -1;
   /* 117 */       smj_value_4 = smj_streamedRow_0.getInt(0);
   /* 118 */       smj_value_5 = smj_streamedRow_0.getInt(1);
   /* 119 */       scala.collection.Iterator<UnsafeRow> smj_iterator_0 = 
smj_matches_0.generateIterator();
   /* 120 */
   /* 121 */       while (smj_iterator_0.hasNext()) {
   /* 122 */         InternalRow smj_bufferedRow_1 = (InternalRow) 
smj_iterator_0.next();
   //   Append output rows
   /* 137 */         append((smj_mutableStateArray_0[0].getRow()).copy());
   /* 138 */
   /* 139 */       }
   /* 140 */       if (shouldStop()) return;
   /* 141 */     }
   /* 142 */     ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) 
references[1] /* plan */).cleanupResources();  // 2.3 SMJ calls earlyCleanupResources() to free off-heap memory.
   /* 143 */   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to