HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows URL: https://github.com/apache/spark/pull/23634#issuecomment-457354283

Seems like I need to add the query along with the edge case. (Just updated the description.)

> the left side of the self-join is supposed to evict records 5 seconds behind the watermark, but it seems to be incorrectly waiting 10 seconds instead.

No, the left side waited 5 seconds behind the watermark, whereas the right side didn't wait behind the watermark at all. The join condition is not `ts1 = ts2` but `ts1 <= ts2 <= ts1 + interval 5 seconds`, so I guess this is "known behavior": the left side is expected to wait up to 5 extra seconds behind the watermark so it can still match rows like `ts2 = ts1 + 4 seconds` (equivalently, `ts1 = ts2 - 4 seconds`).

Here's a physical plan from one of the batches while running a query similar to the UT in spark-shell:

```
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@758c98d2
+- StreamingSymmetricHashJoin [fooId#4L], [barId#8L], LeftOuter, condition = [ leftOnly = null, rightOnly = null, both = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)), full = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)) ], state info [ checkpoint = file:/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/temporary-455ca2eb-f4d4-44af-a8f5-5a90d30b520c/state, runId = fdc8a843-021f-4029-8be4-83f480780c43, opId = 0, ver = 2, numPartitions = 200], 1548219494841, state cleanup [ left value predicate: (fooTime#5-T5000ms <= 1548219489840000), right value predicate: (barTime#9-T5000ms <= 1548219494840000) ]
   :- Exchange hashpartitioning(fooId#4L, 200)
   :  +- EventTimeWatermark fooTime#5: timestamp, interval 5 seconds
   :     +- *(1) Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
   :        +- *(1) Project [timestamp#0, value#1L]
   :           +- *(1) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
   +- Exchange hashpartitioning(barId#8L, 200)
      +- *(3) Filter isnotnull(barTime#9-T5000ms)
         +- EventTimeWatermark barTime#9: timestamp, interval 5 seconds
            +- *(2) Project [value#1L AS barId#8L, timestamp#0 AS barTime#9]
               +- *(2) Filter (isnotnull(value#1L) && ((value#1L % 2) = 0))
                  +- *(2) Project [timestamp#0, value#1L]
                     +- *(2) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
```

Cropped to just the join information we care about:

```
1548219494841, state cleanup [ left value predicate: (fooTime#5-T5000ms <= 1548219489840000), right value predicate: (barTime#9-T5000ms <= 1548219494840000) ]
```

- event time watermark = `1548219494841` (2019/01/23 13:58:14.841 GMT+09:00)
- left predicate = `1548219489840000` (2019/01/23 13:58:09.840 GMT+09:00)
- right predicate = `1548219494840000` (2019/01/23 13:58:14.840 GMT+09:00)

> If L1 isn't evicted, that means a new row L1' should still be able to match with R1, and therefore R1 can't be evicted either.

I think state is retained behind the watermark only in order to wait for new rows; other conditions (like previously joined rows) should not matter. So we may need to focus on the watermark itself. Suppose `L1.ts = R1.ts = L1'.ts` and R1 is evicted: that implies `watermark > R1.ts (= L1'.ts)`, so L1' would be dropped as late, without joining, due to the watermark. Once the watermark passes, the right side no longer has any chance to match against the left side via the join condition (whereas the left side still has a chance to match against the right side), so it looks correct to evict R1. Please let me know if I'm missing something here.
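As a footnote on the three numbers above, here is a tiny arithmetic sketch (plain Python, not Spark code) showing how the cleanup predicates relate to the watermark. The exact 1 ms offset is my assumption about how the strict `<` watermark comparison is rendered as the `<=` shown in the plan:

```python
# Recomputing the state-cleanup predicates from the plan above.
# Assumption: the "- 1 ms" offset turns a strict "<" watermark
# comparison into the "<=" printed in the predicate.

WATERMARK_MS = 1548219494841   # event time watermark from the plan
JOIN_INTERVAL_MS = 5 * 1000    # ts1 <= ts2 <= ts1 + interval 5 seconds

# Right side: no future left row behind the watermark can still match it,
# so it is evicted right at the watermark.
right_predicate_us = (WATERMARK_MS - 1) * 1000

# Left side: a row with ts1 can still match a future ts2 up to
# ts1 + 5 seconds, so it must linger 5 extra seconds behind the watermark.
left_predicate_us = (WATERMARK_MS - JOIN_INTERVAL_MS - 1) * 1000

print(left_predicate_us)   # 1548219489840000, the left value predicate
print(right_predicate_us)  # 1548219494840000, the right value predicate
```

This reproduces exactly the two thresholds in the cropped join information, which is why only the left side appears to "wait behind" the watermark.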
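The eviction argument about L1, R1, and L1' can also be checked with a toy model (plain Python; the names come from the discussion above, and the drop-late-rows rule is standard watermark semantics, not Spark's actual implementation):

```python
# Toy model of the argument: once R1 is evicted, the watermark is already
# past R1.ts, so any new left row L1' with the same event time is dropped
# as late before it could ever reach the join. Evicting R1 loses nothing.

def is_dropped_as_late(event_time, watermark):
    """A row strictly behind the watermark is discarded by the operator."""
    return event_time < watermark

r1_ts = 100        # hypothetical event time for R1
watermark = 101    # R1 was evicted, which implies watermark > R1.ts

l1_prime_ts = r1_ts  # L1'.ts = R1.ts, per the scenario in the comment
print(is_dropped_as_late(l1_prime_ts, watermark))  # True: L1' never joins
```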
