neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513523534
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,18 +198,23 @@ object StreamingSymmetricHashJoinHelper extends Logging {
val joinKeyOrdinalForWatermark: Option[Int] =
findJoinKeyOrdinalForWatermark(
leftKeys, rightKeys)
+ // Returns a predicate that drops data less than the state watermark.
+ // oneSideInputAttributes are the attributes to base the state watermark
off of, while
+ // otherSideInputAttributes are the attributes on which the watermark is
defined.
def getOneSideStateWatermarkPredicate(
oneSideInputAttributes: Seq[Attribute],
oneSideJoinKeys: Seq[Expression],
otherSideInputAttributes: Seq[Attribute]):
Option[JoinStateWatermarkPredicate] = {
- val isWatermarkDefinedOnInput =
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+ val watermarkAttribute =
otherSideInputAttributes.find(_.metadata.contains(delayKey))
+ val isWatermarkDefinedOnInput = watermarkAttribute.isDefined
val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 in the
StreamingSymmetricHashJoinExec docs
val keyExprWithWatermark = BoundReference(
joinKeyOrdinalForWatermark.get,
oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
- oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+ oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable
+ )
Review Comment:
```suggestion
oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]