leanken commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r459822735
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -454,6 +491,48 @@ case class BroadcastHashJoinExec(
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
+ // fast stop if isOriginalInputEmpty = true, should accept all rows in streamedSide
+ if (broadcastRelation.value.isOriginalInputEmpty) {
+ return s"""
+ |// Anti Join isOriginalInputEmpty(true) accept all
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ }
+
+ if (isNullAwareAntiJoin) {
+ if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) {
+ return s"""
+ |// NAAJ
Review comment:
@agrawaldevesh
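However, if there is an aggregate after the BHJ, the `break` we added here would be wrong. In the generated code below, the join's consume logic is emitted into a separate method (`bhj_doConsume_0`). There, `break;` (line 057) has no enclosing loop, because the `while` loop lives in `agg_doAggregateWithoutKey_0`. I think this fast stop is not that easy to implement.

How about I mark this issue as "Resolve Later"? I will explore the code further and resolve it in the next PR, "multi column support". Is that OK with you?
```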
/* 035 */ private void agg_doAggregateWithoutKey_0() throws
java.io.IOException {
/* 036 */ // initialize aggregation buffer
/* 037 */ agg_bufIsNull_0 = false;
/* 038 */ agg_bufValue_0 = 0L;
/* 039 */
/* 040 */ while ( localtablescan_input_0.hasNext()) {
/* 041 */ InternalRow localtablescan_row_0 = (InternalRow)
localtablescan_input_0.next();
/* 042 */ ((org.apache.spark.sql.execution.metric.SQLMetric)
references[0] /* numOutputRows */).add(1);
/* 043 */ boolean localtablescan_isNull_0 =
localtablescan_row_0.isNullAt(0);
/* 044 */ int localtablescan_value_0 = localtablescan_isNull_0 ?
/* 045 */ -1 : (localtablescan_row_0.getInt(0));
/* 046 */
/* 047 */ bhj_doConsume_0(localtablescan_value_0,
localtablescan_isNull_0);
/* 048 */ // shouldStop check is eliminated
/* 049 */ }
/* 050 */
/* 051 */ }
/* 052 */
/* 053 */ private void bhj_doConsume_0(int bhj_expr_0_0, boolean
bhj_exprIsNull_0_0) throws java.io.IOException {
/* 054 */ // NAAJ
/* 055 */ // isOriginalInputEmpty(false)
allNullColumnKeyExistsInOriginalInput(true)
/* 056 */ // reject all
/* 057 */ break; !!!!!!!!!!!
/* 058 */
/* 059 */ }
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]