Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r141985815
--- Diff:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
---
@@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends
SparkFunSuite {
batchStreamSupported = false,
streamBatchSupported = false)
- // Left outer joins: *-stream not allowed
+ // Left outer joins: *-stream not allowed with default condition
testBinaryOperationInStreamingPlan(
"left outer join",
_.join(_, joinType = LeftOuter),
- streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left outer/semi/anti joins")
+ streamStreamSupported = false,
+ expectedMsg = "outer join")
+
+ // Left outer joins: stream-stream allowed with join on watermark
attribute
+ assertSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and join on watermark
attribute key",
+ streamRelation.join(streamRelation, joinType = LeftOuter,
+ condition = Some(attributeWithWatermark === attributeWithWatermark)),
+ OutputMode.Append())
+
+ // Left outer joins: stream-stream allowed with range condition yielding
state value watermark
+ assertSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and state value
watermark", {
+ val firstRelationWithWatermark = new
TestStreamingRelation(attributeWithWatermark)
+ val secondAttribute = AttributeReference("b",
IntegerType)().withMetadata(watermarkMetadata)
+ firstRelationWithWatermark.join(
+ new TestStreamingRelation(secondAttribute),
+ joinType = LeftOuter,
+ condition = Some(secondAttribute < attributeWithWatermark + 10 &&
--- End diff --
i think conditions on both sides should not be required. For LeftOuter, we
only need `leftAttribute > rightAttributeWithWatermark + c`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]