Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r140933568
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -233,16 +234,46 @@ object UnsupportedOperationChecker {
                     throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
                   }
     
    -            case LeftOuter | LeftSemi | LeftAnti =>
    +            case LeftSemi | LeftAnti =>
                   if (right.isStreaming) {
    -                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
    -                    "on the right is not supported")
    +                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
    +                    "on the right are not supported")
                   }
     
    -            case RightOuter =>
    -              if (left.isStreaming) {
    -                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
    -                    "not supported")
    +            // We support left and right outer streaming joins only in the stream+stream case.
    +            case LeftOuter | RightOuter =>
    +              if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
    +                throwError("Left outer join with a streaming DataFrame/Dataset " +
    +                  "on the right and non-streaming on the left is not supported")
    +              }
    +              if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
    +                throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
    +                    "non-streaming on the right not supported")
    +              }
    +
    +              if (left.isStreaming && right.isStreaming) {
    +                val watermarkInJoinKeys = subPlan match {
    +                  case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
    +                    val keySet = AttributeSet(leftKeys ++ rightKeys)
    +                    (leftKeys ++ rightKeys).exists {
    +                      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
    +                      case _ => false
    +                    }
    +                  case _ => false
    +                }
    +
    +                val oppositeSideHasWatermark = joinType match {
    +                  case LeftOuter =>
    +                    right.output.find(_.metadata.contains(EventTimeWatermark.delayKey)).isDefined
    +                  case RightOuter =>
    --- End diff --
    
    This condition is not sufficient. State can be dropped from the left side (in a left outer join) only when both of these conditions are satisfied:
    1. `right.output` has a watermark.
    2. The join condition has a time range condition such that the left side's state value watermark can be defined from the right side's event-time watermark.
    
    `StreamingSymmetricHashJoinHelper.getOneSideStateWatermarkPredicate` already checks for (2), so you have to invoke it to find out whether the conditions are sufficient for defining state value watermarks. However, that helper is in the SQL project, not catalyst. So you may have to move that helper class into catalyst, use it from `UnsupportedOperationChecker`, and rename it to whatever makes sense.
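
A minimal sketch of the two-condition rule above, in Scala. It is a hypothetical, simplified model, not Spark code: `LeftOuterJoinStateCheck`, `JoinCondition`, and `leftLowerBoundLagMs` are illustrative names, and the join condition is assumed to reduce to an optional lower bound of the form `leftTime >= rightTime - lagMs`. In Spark itself, the time-range analysis for condition (2) is what `StreamingSymmetricHashJoinHelper.getOneSideStateWatermarkPredicate` does.

```scala
// Hypothetical, simplified model of the two conditions; none of these
// names are Spark APIs.
object LeftOuterJoinStateCheck {

  /** Assumed shape of the join condition: optionally a lower bound
    * `leftTime >= rightTime - lagMs` relating the two event-time columns. */
  final case class JoinCondition(leftLowerBoundLagMs: Option[Long])

  /** Left-side state value watermark in ms, if one can be defined.
    * Assuming right rows older than the right watermark are ignored as late,
    * a future right row has rightTime >= rightWatermarkMs, so it can only
    * match left rows with leftTime >= rightWatermarkMs - lagMs. Older left
    * rows can be emitted with nulls and dropped from state. */
  def leftStateWatermark(
      rightWatermarkMs: Option[Long], // condition 1: right side has a watermark
      cond: JoinCondition             // condition 2: time-range join condition
  ): Option[Long] =
    for {
      wm  <- rightWatermarkMs
      lag <- cond.leftLowerBoundLagMs
    } yield wm - lag

  /** A left outer stream-stream join is allowed only if left state is bounded. */
  def isLeftOuterJoinSupported(
      rightWatermarkMs: Option[Long],
      cond: JoinCondition): Boolean =
    leftStateWatermark(rightWatermarkMs, cond).isDefined
}
```

For example, with a right-side watermark of 10000 ms and `leftTime >= rightTime - 2000`, the left state watermark is 8000 ms. If either the right watermark or the lower bound is missing, no left state watermark exists, and the left outer join would have to be rejected.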

