GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r141934201
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -233,16 +234,54 @@ object UnsupportedOperationChecker {
                     throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
                   }
     
    -            case LeftOuter | LeftSemi | LeftAnti =>
    +            case LeftSemi | LeftAnti =>
                   if (right.isStreaming) {
    -                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
    -                    "on the right is not supported")
    +                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
    +                    "on the right are not supported")
                   }
     
    -            case RightOuter =>
    -              if (left.isStreaming) {
    -                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
    -                    "not supported")
    +            // We support left and right outer streaming joins only in the stream+stream case.
    +            case LeftOuter | RightOuter =>
    +              if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
    +                throwError("Left outer join with a streaming DataFrame/Dataset " +
    +                  "on the right and non-streaming on the left is not supported")
    +              }
    +              if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
    +                throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
    +                    "non-streaming on the right not supported")
    +              }
    +
    +              if (left.isStreaming && right.isStreaming) {
    +                val watermarkInJoinKeys = subPlan match {
    +                  case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
    +                    val keySet = AttributeSet(leftKeys ++ rightKeys)
    +                    (leftKeys ++ rightKeys).exists {
    +                      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
    +                      case _ => false
    +                    }
    +                  case _ => false
    +                }
    +
    +                // Check if the nullable side has a watermark, and there's a range condition which
    +                // implies a state value watermark on the first side.
    +                val hasValidWatermarkRange = joinType match {
    +                  case LeftOuter =>
    --- End diff --
    
    With so many `case LeftOuter` branches and `if (joinType == LeftOuter ...)` checks,
    it might be better to split the top-level case on join type into two distinct
    cases, `case LeftOuter` and `case RightOuter`, instead of the current
    `case LeftOuter | RightOuter`.
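
A rough sketch of what that split could look like. This is only an illustration under simplifying assumptions: `JoinType`, `Plan`, `OuterJoinCheckSketch`, and `checkOuterJoin` are hypothetical stand-ins rather than the real Catalyst classes, errors are returned as an `Option[String]` instead of being raised via `throwError`, the right-outer message wording is approximate, and the stream+stream watermark checks are left out.

```scala
// Simplified stand-ins for illustration only; these are NOT the Catalyst types.
sealed trait JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType

// Hypothetical minimal plan: only the streaming flag matters for this check.
final case class Plan(isStreaming: Boolean)

object OuterJoinCheckSketch {
  // Returns Some(message) when the combination is unsupported, None otherwise.
  // Each join type gets its own top-level case, so no branch needs to
  // re-test `joinType` inside its body.
  def checkOuterJoin(joinType: JoinType, left: Plan, right: Plan): Option[String] =
    joinType match {
      case LeftOuter =>
        if (!left.isStreaming && right.isStreaming) {
          Some("Left outer join with a streaming DataFrame/Dataset " +
            "on the right and non-streaming on the left is not supported")
        } else {
          // The left-outer stream+stream watermark check would go here.
          None
        }

      case RightOuter =>
        if (left.isStreaming && !right.isStreaming) {
          Some("Right outer join with a streaming DataFrame/Dataset on the left and " +
            "non-streaming on the right is not supported")
        } else {
          // The right-outer stream+stream watermark check would go here.
          None
        }
    }
}
```

With this shape, the later `joinType match { case LeftOuter => ... }` for the watermark range could presumably be folded into the corresponding branch instead of matching on the join type a second time.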

