Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r141986708
--- Diff:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
---
@@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends
SparkFunSuite {
batchStreamSupported = false,
streamBatchSupported = false)
- // Left outer joins: *-stream not allowed
+ // Left outer joins: *-stream not allowed with default condition
testBinaryOperationInStreamingPlan(
"left outer join",
_.join(_, joinType = LeftOuter),
- streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left outer/semi/anti joins")
+ streamStreamSupported = false,
+ expectedMsg = "outer join")
+
+ // Left outer joins: stream-stream allowed with join on watermark
attribute
+ assertSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and join on watermark
attribute key",
+ streamRelation.join(streamRelation, joinType = LeftOuter,
+ condition = Some(attributeWithWatermark === attributeWithWatermark)),
+ OutputMode.Append())
+
+ // Left outer joins: stream-stream allowed with range condition yielding
state value watermark
+ assertSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and state value
watermark", {
+ val firstRelationWithWatermark = new
TestStreamingRelation(attributeWithWatermark)
+ val secondAttribute = AttributeReference("b",
IntegerType)().withMetadata(watermarkMetadata)
+ firstRelationWithWatermark.join(
+ new TestStreamingRelation(secondAttribute),
+ joinType = LeftOuter,
+ condition = Some(secondAttribute < attributeWithWatermark + 10 &&
+ secondAttribute > attributeWithWatermark - 10))
+ },
+ OutputMode.Append())
+
+ // Left outer joins: stream-stream not allowed with insufficient range
condition
+ assertNotSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and state value
watermark", {
+ val firstRelationWithWatermark = new
TestStreamingRelation(attributeWithWatermark)
+ val secondAttribute = AttributeReference("b",
IntegerType)().withMetadata(watermarkMetadata)
+ firstRelationWithWatermark.join(
+ new TestStreamingRelation(secondAttribute),
+ joinType = LeftOuter,
+ condition = Some(secondAttribute > attributeWithWatermark - 10))
+ },
+ OutputMode.Append(),
+ Seq("appropriate range condition"))
// Left semi joins: stream-* not allowed
testBinaryOperationInStreamingPlan(
"left semi join",
_.join(_, joinType = LeftSemi),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left outer/semi/anti joins")
+ expectedMsg = "left semi/anti joins")
// Left anti joins: stream-* not allowed
testBinaryOperationInStreamingPlan(
"left anti join",
_.join(_, joinType = LeftAnti),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left outer/semi/anti joins")
+ expectedMsg = "left semi/anti joins")
- // Right outer joins: stream-* not allowed
+ // Right outer joins: stream-* not allowed with default condition
testBinaryOperationInStreamingPlan(
"right outer join",
_.join(_, joinType = RightOuter),
+ streamBatchSupported = false,
streamStreamSupported = false,
- streamBatchSupported = false)
+ expectedMsg = "outer join")
+
+ // Right outer joins: stream-stream allowed with join on watermark
attribute
+ assertSupportedInStreamingPlan(
+ s"right outer join with stream-stream relations and join on watermark
attribute key",
+ streamRelation.join(streamRelation, joinType = RightOuter,
+ condition = Some(attributeWithWatermark === attributeWithWatermark)),
--- End diff --
Similar comment to the one I made for the left outer join.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]