wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1655520091
Adding the generated code for the inner SMJ (sort-merge join) for better understanding.
```java
/* 036 */ private boolean smj_findNextJoinRows_0(
/* 037 */ scala.collection.Iterator streamedIter,
/* 038 */ scala.collection.Iterator bufferedIter) {
/* 039 */ smj_streamedRow_0 = null;
/* 040 */ int comp = 0;
/* 041 */ while (smj_streamedRow_0 == null) {
/* 042 */ if (!streamedIter.hasNext()) return false; // 2.1.2 Window 1 will
// read one group of data and the first row of the next group (named X), then
// return the first row of the first group.
/* 043 */ smj_streamedRow_0 = (InternalRow) streamedIter.next();
/* 044 */ int smj_value_0 = smj_streamedRow_0.getInt(0);
/* 045 */ if (false) {
/* 046 */ smj_streamedRow_0 = null;
/* 047 */ continue;
/* 048 */
/* 049 */ }
/* 050 */ if (!smj_matches_0.isEmpty()) {
/* 051 */ comp = 0;
/* 052 */ if (comp == 0) {
/* 053 */ comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 <
smj_value_3 ? -1 : 0);
/* 054 */ }
/* 055 */
/* 056 */ if (comp == 0) {
/* 057 */ return true;
/* 058 */ }
/* 059 */ smj_matches_0.clear();
/* 060 */ }
/* 061 */
/* 062 */ do {
/* 063 */ if (smj_bufferedRow_0 == null) {
/* 064 */ if (!bufferedIter.hasNext()) {
/* 065 */ smj_value_3 = smj_value_0;
/* 066 */ return !smj_matches_0.isEmpty(); // 2.2.1 Sort 2 and Window 2 are
// empty, so there are no matched rows and SMJ will finish.
/* 067 */ }
/* 068 */ smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
/* 069 */ int smj_value_1 = smj_bufferedRow_0.getInt(0);
/* 070 */ if (false) {
/* 071 */ smj_bufferedRow_0 = null;
/* 072 */ continue;
/* 073 */ }
/* 074 */ smj_value_2 = smj_value_1;
/* 075 */ }
/* 076 */
/* 077 */ comp = 0;
/* 078 */ if (comp == 0) {
/* 079 */ comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 <
smj_value_2 ? -1 : 0);
/* 080 */ }
/* 081 */
/* 082 */ if (comp > 0) {
/* 083 */ smj_bufferedRow_0 = null;
/* 084 */ } else if (comp < 0) {
/* 085 */ if (!smj_matches_0.isEmpty()) {
/* 086 */ smj_value_3 = smj_value_0;
/* 087 */ return true;
/* 088 */ } else {
/* 089 */ smj_streamedRow_0 = null;
/* 090 */ }
/* 091 */ } else {
/* 092 */ smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
/* 093 */ smj_bufferedRow_0 = null;
/* 094 */ }
/* 095 */ } while (smj_streamedRow_0 != null);
/* 096 */ }
/* 097 */ return false; // unreachable
/* 098 */ }
/* 099 */
/* 100 */ protected void processNext() throws java.io.IOException {
/* 101 */ if (!wholestagecodegen_initJoin_0) {
/* 102 */ wholestagecodegen_initJoin_0 = true;
/* 113 */
/* 114 */ while (smj_findNextJoinRows_0(smj_streamedInput_0,
smj_bufferedInput_0)) {
/* 115 */ int smj_value_4 = -1;
/* 116 */ int smj_value_5 = -1;
/* 117 */ smj_value_4 = smj_streamedRow_0.getInt(0);
/* 118 */ smj_value_5 = smj_streamedRow_0.getInt(1);
/* 119 */ scala.collection.Iterator<UnsafeRow> smj_iterator_0 =
smj_matches_0.generateIterator();
/* 120 */
/* 121 */ while (smj_iterator_0.hasNext()) {
/* 122 */ InternalRow smj_bufferedRow_1 = (InternalRow)
smj_iterator_0.next();
// Append output rows
/* 137 */ append((smj_mutableStateArray_0[0].getRow()).copy());
/* 138 */
/* 139 */ }
/* 140 */ if (shouldStop()) return;
/* 141 */ }
/* 142 */ ((org.apache.spark.sql.execution.joins.SortMergeJoinExec)
references[1] /* plan */).cleanupResources(); // 2.3 SMJ calls
// earlyCleanupResources() to free off-heap memory.
/* 143 */ }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]