c21 commented on a change in pull request #30347:
URL: https://github.com/apache/spark/pull/30347#discussion_r521876356



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -384,239 +384,166 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append
   )
 
-  // Inner joins: Multiple stream-stream joins supported only in append mode
-  testBinaryOperationInStreamingPlan(
-    "single inner join in append mode",
-    _.join(_, joinType = Inner),
-    outputMode = Append,
-    streamStreamSupported = true)
-
-  testBinaryOperationInStreamingPlan(
-    "multiple inner joins in append mode",
-    (x: LogicalPlan, y: LogicalPlan) => {
-      x.join(y, joinType = Inner).join(streamRelation, joinType = Inner)
-    },
-    outputMode = Append,
-    streamStreamSupported = true)
-
-  testBinaryOperationInStreamingPlan(
-    "inner join in update mode",
-    _.join(_, joinType = Inner),
-    outputMode = Update,
-    streamStreamSupported = false,
-    expectedMsg = "is not supported in Update output mode")
-
-  // Full outer joins: only batch-batch is allowed
-  testBinaryOperationInStreamingPlan(
-    "full outer join",
-    _.join(_, joinType = FullOuter),
-    streamStreamSupported = false,
-    batchStreamSupported = false,
-    streamBatchSupported = false)
+  // stream-stream join
+  {
+    // Inner joins: Multiple stream-stream joins supported only in append mode
+    testBinaryOperationInStreamingPlan(
+      "single inner join in append mode",
+      _.join(_, joinType = Inner),
+      outputMode = Append)
+
+    testBinaryOperationInStreamingPlan(
+      "multiple inner joins in append mode",
+      (x: LogicalPlan, y: LogicalPlan) => {
+        x.join(y, joinType = Inner).join(streamRelation, joinType = Inner)
+      },
+      outputMode = Append)
+
+    testBinaryOperationInStreamingPlan(
+      "inner join in update mode",
+      _.join(_, joinType = Inner),
+      outputMode = Update,
+      streamStreamSupported = false,
+      expectedMsg = "is not supported in Update output mode")
+
+    // Full outer joins: only batch-batch is allowed
+    testBinaryOperationInStreamingPlan(
+      "full outer join",
+      _.join(_, joinType = FullOuter),
+      streamStreamSupported = false,
+      batchStreamSupported = false,
+      streamBatchSupported = false)
+
+    // Left outer, left semi, left anti join: *-stream not allowed
+    Seq((LeftOuter, "LeftOuter join"), (LeftSemi, "LeftSemi join"), (LeftAnti, 
"Left anti join"))
+      .foreach { case (joinType, name) =>
+        testBinaryOperationInStreamingPlan(
+          name,
+          _.join(_, joinType = joinType),
+          batchStreamSupported = false,
+          streamStreamSupported = false,
+          expectedMsg = name)
+      }
 
-  // Left outer joins: *-stream not allowed
-  testBinaryOperationInStreamingPlan(
-    "left outer join",
-    _.join(_, joinType = LeftOuter),
-    batchStreamSupported = false,
-    streamStreamSupported = false,
-    expectedMsg = "outer join")
+    // Right outer joins: stream-* not allowed
+    testBinaryOperationInStreamingPlan(
+      "right outer join",
+      _.join(_, joinType = RightOuter),
+      streamBatchSupported = false,
+      streamStreamSupported = false,
+      expectedMsg = "outer join")
+
+    // Left outer, right outer, left semi joins
+    Seq(LeftOuter, RightOuter, LeftSemi).foreach { joinType =>
+      // Update mode not allowed
+      assertNotSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and update mode",
+        streamRelation.join(streamRelation, joinType = joinType,
+          condition = Some(attribute === attribute)),
+        OutputMode.Update(),
+        Seq("is not supported in Update output mode"))
 
-  // Left outer joins: update and complete mode not allowed
-  assertNotSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and update mode",
-    streamRelation.join(streamRelation, joinType = LeftOuter,
-      condition = Some(attribute === attribute)),
-    OutputMode.Update(),
-    Seq("is not supported in Update output mode"))
-  assertNotSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and complete mode",
-    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType 
= LeftOuter,
-      condition = Some(attribute === attribute))),
-    OutputMode.Complete(),
-    Seq("is not supported in Complete output mode"))
-
-  // Left outer joins: stream-stream allowed with join on watermark attribute
-  // Note that the attribute need not be watermarked on both sides.
-  assertSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and join on attribute with 
left watermark",
-    streamRelation.join(streamRelation, joinType = LeftOuter,
-      condition = Some(attributeWithWatermark === attribute)),
-    OutputMode.Append())
-  assertSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and join on attribute with 
right watermark",
-    streamRelation.join(streamRelation, joinType = LeftOuter,
-      condition = Some(attribute === attributeWithWatermark)),
-    OutputMode.Append())
-  assertNotSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and join on non-watermarked 
attribute",
-    streamRelation.join(streamRelation, joinType = LeftOuter,
-      condition = Some(attribute === attribute)),
-    OutputMode.Append(),
-    Seq("watermark in the join keys"))
+      // Complete mode not allowed
+      assertNotSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and complete mode",
+        Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, 
joinType = joinType,
+          condition = Some(attribute === attribute))),
+        OutputMode.Complete(),
+        Seq("is not supported in Complete output mode"))
+
+      // Stream-stream allowed with join on watermark attribute
+      // Note that the attribute need not be watermarked on both sides.
+      assertSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and join on attribute 
with left watermark",
+        streamRelation.join(streamRelation, joinType = joinType,
+          condition = Some(attributeWithWatermark === attribute)),
+        OutputMode.Append())
+      assertSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and join on attribute 
with right watermark",
+        streamRelation.join(streamRelation, joinType = joinType,
+          condition = Some(attribute === attributeWithWatermark)),
+        OutputMode.Append())
+      assertNotSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and join on 
non-watermarked attribute",
+        streamRelation.join(streamRelation, joinType = joinType,
+          condition = Some(attribute === attribute)),
+        OutputMode.Append(),
+        Seq("without a watermark in the join keys"))
 
-  // Left outer joins: stream-stream allowed with range condition yielding 
state value watermark
-  assertSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and state value watermark", 
{
-      val leftRelation = streamRelation
-      val rightTimeWithWatermark =
+      val timeWithWatermark =
         AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata)
-      val rightRelation = new TestStreamingRelation(rightTimeWithWatermark)
-      leftRelation.join(
-        rightRelation,
-        joinType = LeftOuter,
-        condition = Some(attribute > rightTimeWithWatermark + 10))
-    },
-    OutputMode.Append())
-
-  // Left outer joins: stream-stream not allowed with insufficient range 
condition
-  assertNotSupportedInStreamingPlan(
-    s"left outer join with stream-stream relations and state value watermark", 
{
-      val leftRelation = streamRelation
-      val rightTimeWithWatermark =
-        AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata)
-      val rightRelation = new TestStreamingRelation(rightTimeWithWatermark)
-      leftRelation.join(
-        rightRelation,
-        joinType = LeftOuter,
-        condition = Some(attribute < rightTimeWithWatermark + 10))
-    },
-    OutputMode.Append(),
-    Seq("appropriate range condition"))
-
-  // Left semi joins: stream-* not allowed
-  testBinaryOperationInStreamingPlan(
-    "left semi join",
-    _.join(_, joinType = LeftSemi),
-    streamStreamSupported = false,
-    batchStreamSupported = false,
-    expectedMsg = "LeftSemi join")
+      val relationWithWatermark = new TestStreamingRelation(timeWithWatermark)
+      val (leftRelation, rightRelation) =
+        if (joinType == RightOuter) {
+          (relationWithWatermark, streamRelation)
+        } else {
+          (streamRelation, relationWithWatermark)
+        }
 
-  // Left semi joins: update and complete mode not allowed
-  assertNotSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and update mode",
-    streamRelation.join(streamRelation, joinType = LeftSemi,
-      condition = Some(attribute === attribute)),
-    OutputMode.Update(),
-    Seq("is not supported in Update output mode"))
-  assertNotSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and complete mode",
-    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType 
= LeftSemi,
-      condition = Some(attribute === attribute))),
-    OutputMode.Complete(),
-    Seq("is not supported in Complete output mode"))
-
-  // Left semi joins: stream-stream allowed with join on watermark attribute
-  // Note that the attribute need not be watermarked on both sides.
-  assertSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and join on attribute with 
left watermark",
-    streamRelation.join(streamRelation, joinType = LeftSemi,
-      condition = Some(attributeWithWatermark === attribute)),
-    OutputMode.Append())
-  assertSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and join on attribute with 
right watermark",
-    streamRelation.join(streamRelation, joinType = LeftSemi,
-      condition = Some(attribute === attributeWithWatermark)),
-    OutputMode.Append())
-  assertNotSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and join on non-watermarked 
attribute",
-    streamRelation.join(streamRelation, joinType = LeftSemi,
-      condition = Some(attribute === attribute)),
-    OutputMode.Append(),
-    Seq("without a watermark in the join keys"))
+      // stream-stream allowed with range condition yielding state value 
watermark
+      assertSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and state value 
watermark",
+        leftRelation.join(rightRelation, joinType = joinType,
+          condition = Some(attribute > timeWithWatermark + 10)),
+        OutputMode.Append())
 
-  // Left semi joins: stream-stream allowed with range condition yielding 
state value watermark
-  assertSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and state value watermark", {
-      val leftRelation = streamRelation
-      val rightTimeWithWatermark =
-        AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata)
-      val rightRelation = new TestStreamingRelation(rightTimeWithWatermark)
-      leftRelation.join(
-        rightRelation,
-        joinType = LeftSemi,
-        condition = Some(attribute > rightTimeWithWatermark + 10))
-    },
-    OutputMode.Append())
-
-  // Left semi joins: stream-stream not allowed with insufficient range 
condition
-  assertNotSupportedInStreamingPlan(
-    "left semi join with stream-stream relations and state value watermark", {
-      val leftRelation = streamRelation
-      val rightTimeWithWatermark =
-        AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata)
-      val rightRelation = new TestStreamingRelation(rightTimeWithWatermark)
-      leftRelation.join(
-        rightRelation,
-        joinType = LeftSemi,
-        condition = Some(attribute < rightTimeWithWatermark + 10))
-    },
-    OutputMode.Append(),
-    Seq("appropriate range condition"))
-
-  // Left anti joins: stream-* not allowed
-  testBinaryOperationInStreamingPlan(
-    "left anti join",
-    _.join(_, joinType = LeftAnti),
-    streamStreamSupported = false,
-    batchStreamSupported = false,
-    expectedMsg = "Left anti join")
+      // stream-stream not allowed with insufficient range condition
+      assertNotSupportedInStreamingPlan(
+        s"$joinType join with stream-stream relations and state value 
watermark",
+        leftRelation.join(rightRelation, joinType = joinType,
+          condition = Some(attribute < timeWithWatermark + 10)),
+        OutputMode.Append(),
+        Seq("is not supported without a watermark in the join keys, or a 
watermark on " +
+          "the nullable side and an appropriate range condition"))
+    }
 
-  // Right outer joins: stream-* not allowed
-  testBinaryOperationInStreamingPlan(
-    "right outer join",
-    _.join(_, joinType = RightOuter),
-    streamBatchSupported = false,
-    streamStreamSupported = false,
-    expectedMsg = "outer join")
+    // stream-stream inner join doesn't emit late rows, whereas outer joins 
could
+    Seq((Inner, false), (LeftOuter, true), (RightOuter, true)).map {

Review comment:
       Note: this test comes from the existing suite on master:
 https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L744




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to