Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r141934196
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -233,16 +234,54 @@ object UnsupportedOperationChecker {
                     throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
                   }
     
    -            case LeftOuter | LeftSemi | LeftAnti =>
    +            case LeftSemi | LeftAnti =>
                   if (right.isStreaming) {
    -                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
    -                    "on the right is not supported")
    +                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
    +                    "on the right are not supported")
                   }
     
    -            case RightOuter =>
    -              if (left.isStreaming) {
    -                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
    -                    "not supported")
    +            // We support left and right outer streaming joins only in the stream+stream case.
    +            case LeftOuter | RightOuter =>
    +              if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
    +                throwError("Left outer join with a streaming DataFrame/Dataset " +
    +                  "on the right and non-streaming on the left is not supported")
    +              }
    +              if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
    +                throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
    +                    "non-streaming on the right not supported")
    +              }
    +
    +              if (left.isStreaming && right.isStreaming) {
    +                val watermarkInJoinKeys = subPlan match {
    +                  case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
    +                    val keySet = AttributeSet(leftKeys ++ rightKeys)
    +                    (leftKeys ++ rightKeys).exists {
    +                      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
    +                      case _ => false
    +                    }
    +                  case _ => false
    +                }
    +
    +                // Check if the nullable side has a watermark, and there's a range condition which
    +                // implies a state value watermark on the first side.
    +                val hasValidWatermarkRange = joinType match {
    +                  case LeftOuter =>
    +                    // We provide a dummy watermark value 0 - we just want to check if the
    +                    // watermark predicate can be constructed.
    +                    StreamingJoinHelper.getStateValueWatermark(
    --- End diff --
    
    This needs to be refactored. The function should return an optional
    expression to calculate the state watermark. It will return an expression
    only if there are one or more time range conditions of the right type.
    
    This refactoring shouldn't be too hard. In the helper method, instead of
    replacing `attributeWithWatermark` with the value of `eventTimeWatermark`
    and evaluating the equation, it should just create the expression
    `attributeWithWatermark + C`. This is the expression to calculate
    stateWatermark. Note that there can be multiple conditions with the
    necessary directionality, so it can be a list of expressions:
    `attributeWithWatermark + C1`, `attributeWithWatermark + C2`, `...C3`, etc.
    
    In the unsupported operation checker, it should just check whether such
    expressions have been extracted. And in the planner, a separate method
    should take these expressions and evaluate them.
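
    To make the suggested split concrete, here is a rough sketch. It uses a
    toy expression model rather than Catalyst's classes, and every name in it
    (`Expr`, `Attr`, `Const`, `Add`, `StateWatermarkSketch` and its methods)
    is hypothetical, not the existing `StreamingJoinHelper` API. The offsets
    `C1`, `C2`, ... are assumed to have already been extracted from the join
    condition.

```scala
// Toy model for illustration only; these are NOT Catalyst's expression classes.
sealed trait Expr
case class Attr(name: String) extends Expr           // stands in for attributeWithWatermark
case class Const(value: Long) extends Expr           // a constant offset C (e.g. milliseconds)
case class Add(left: Expr, right: Expr) extends Expr

object StateWatermarkSketch {
  // Helper: build `attributeWithWatermark + C` for every usable range condition.
  // Returns None when no condition with the necessary directionality was found.
  def stateWatermarkExprs(attributeWithWatermark: Attr, offsets: Seq[Long]): Option[Seq[Expr]] =
    if (offsets.isEmpty) None
    else Some(offsets.map(c => Add(attributeWithWatermark, Const(c))))

  // Checker side: only asks whether any expression was extracted; no dummy watermark needed.
  def hasValidWatermarkRange(exprs: Option[Seq[Expr]]): Boolean =
    exprs.exists(_.nonEmpty)

  // Planner side: substitute the real event time watermark and evaluate one expression.
  def eval(expr: Expr, eventTimeWatermark: Long): Long = expr match {
    case Attr(_)   => eventTimeWatermark
    case Const(v)  => v
    case Add(l, r) => eval(l, eventTimeWatermark) + eval(r, eventTimeWatermark)
  }

  // Planner side: evaluate every candidate; how to combine them is left to the caller.
  def evaluateAll(exprs: Seq[Expr], eventTimeWatermark: Long): Seq[Long] =
    exprs.map(eval(_, eventTimeWatermark))
}
```

    With this shape the checker would call only `hasValidWatermarkRange`,
    while the planner would call `evaluateAll` once the actual watermark is
    known.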
    
    And a different function can take these conditions, split them up
    again, and evaluate



---

