HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-457354283
 
 
   It seems I need to add the query along with the edge case. (I just updated 
the description.) 
   
   > the left side of the self-join is supposed to evict records 5 seconds 
behind the watermark, but it seems to be incorrectly waiting 10 second instead.
   
   No, the left side waited 5 seconds behind the watermark, whereas the right 
side did not wait behind the watermark at all. The join condition is not 
`ts1 = ts2` but `ts1 <= ts2 <= ts1 + interval 5 seconds`. In this case I 
believe it is "known behavior" that the left side is expected to wait up to 
5 seconds behind the watermark, so that it can still match rows such as 
`ts2 = ts1 + 4 seconds` (equivalently, `ts1 = ts2 - 4 seconds`).
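   
   To make the asymmetry concrete, here is a minimal sketch of the reasoning 
above. It is a simplified model, not Spark's implementation: times are plain 
integer seconds, the function names are hypothetical, and it assumes every row 
that arrives in the future has an event time strictly greater than the 
watermark.

```python
# Simplified model of the argument above; NOT Spark's actual eviction code.
# Assumption: any row arriving in the future has event time > watermark.
JOIN_RANGE = 5  # from the condition: ts1 <= ts2 <= ts1 + interval 5 seconds


def left_row_can_still_match(ts1, watermark):
    # A left row matches right rows with ts2 in [ts1, ts1 + JOIN_RANGE].
    # A future ts2 (> watermark) can fall in that window only if the
    # window's upper end lies beyond the watermark.
    return ts1 + JOIN_RANGE > watermark


def right_row_can_still_match(ts2, watermark):
    # A right row matches left rows with ts1 in [ts2 - JOIN_RANGE, ts2].
    # A future ts1 (> watermark) can fall in that window only if ts2
    # itself lies beyond the watermark.
    return ts2 > watermark


watermark = 100
print(left_row_can_still_match(96, watermark))    # True: ts2 = 101 would match
print(left_row_can_still_match(95, watermark))    # False: window ends at 100
print(right_row_can_still_match(100, watermark))  # False: needs ts1 <= 100
print(right_row_can_still_match(101, watermark))  # True: ts1 = 101 would match
```

   Under this model the left side has to keep rows for up to 5 seconds behind 
the watermark, while the right side does not need to keep anything behind it.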
   
   Here's the physical plan from one batch while running a query similar to 
the UT in spark-shell:
   
   ```
   == Physical Plan ==
   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@758c98d2
   +- StreamingSymmetricHashJoin [fooId#4L], [barId#8L], LeftOuter, condition = [ leftOnly = null, rightOnly = null, both = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)), full = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)) ], state info [ checkpoint = file:/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/temporary-455ca2eb-f4d4-44af-a8f5-5a90d30b520c/state, runId = fdc8a843-021f-4029-8be4-83f480780c43, opId = 0, ver = 2, numPartitions = 200], 1548219494841, state cleanup [ left value predicate: (fooTime#5-T5000ms <= 1548219489840000), right value predicate: (barTime#9-T5000ms <= 1548219494840000) ]
      :- Exchange hashpartitioning(fooId#4L, 200)
      :  +- EventTimeWatermark fooTime#5: timestamp, interval 5 seconds
      :     +- *(1) Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
      :        +- *(1) Project [timestamp#0, value#1L]
      :           +- *(1) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
      +- Exchange hashpartitioning(barId#8L, 200)
         +- *(3) Filter isnotnull(barTime#9-T5000ms)
            +- EventTimeWatermark barTime#9: timestamp, interval 5 seconds
               +- *(2) Project [value#1L AS barId#8L, timestamp#0 AS barTime#9]
                  +- *(2) Filter (isnotnull(value#1L) && ((value#1L % 2) = 0))
                     +- *(2) Project [timestamp#0, value#1L]
                        +- *(2) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
   ```
   
   Cropped to just the join information we care about:
   
   ```
   1548219494841, 
   state cleanup [ 
     left value predicate: (fooTime#5-T5000ms <= 1548219489840000), 
     right value predicate: (barTime#9-T5000ms <= 1548219494840000) 
   ]
   ```
   
   event time watermark = '1548219494841' (2019/01/23 13:58:14.841 GMT+09:00)
   left predicate       = '1548219489840000' (2019/01/23 13:58:09.840 GMT+09:00)
   right predicate      = '1548219494840000' (2019/01/23 13:58:14.840 GMT+09:00)
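   
   The conversion above can be reproduced with a short script. This is only a 
sketch for checking the arithmetic: it assumes the watermark in the plan is in 
epoch milliseconds and the two predicate values are in epoch microseconds 
(inferred from their digit counts and the dates above), and the helper name 
`fmt_us` is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # GMT+09:00, as used in the comment
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Values copied from the physical plan.
watermark_ms = 1548219494841            # assumed to be epoch milliseconds
left_predicate_us = 1548219489840000    # assumed to be epoch microseconds
right_predicate_us = 1548219494840000   # assumed to be epoch microseconds


def fmt_us(epoch_us):
    # Use integer microseconds to avoid floating-point rounding.
    dt = (EPOCH + timedelta(microseconds=epoch_us)).astimezone(KST)
    return dt.strftime("%Y/%m/%d %H:%M:%S.") + f"{dt.microsecond // 1000:03d}"


watermark_us = watermark_ms * 1000
left_gap_ms = (watermark_us - left_predicate_us) // 1000
right_gap_ms = (watermark_us - right_predicate_us) // 1000

print(fmt_us(watermark_us))        # 2019/01/23 13:58:14.841
print(fmt_us(left_predicate_us))   # 2019/01/23 13:58:09.840
print(fmt_us(right_predicate_us))  # 2019/01/23 13:58:14.840
print(left_gap_ms, right_gap_ms)   # 5001 1
```

   So the left predicate sits about 5 seconds behind the watermark, while the 
right predicate sits essentially at the watermark.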
   
   > If L1 isn't evicted, that means a new row L1' should still be able to 
match with R1, and therefore R1 can't be evicted either.
   
   I think the only reason to delay state eviction behind the watermark is to 
wait for new rows, so other conditions (like previously joined rows) should 
not matter. So we may need to focus on the watermark itself.
   
   Suppose `L1.ts = R1.ts = L1'.ts` and R1 is evicted. This means 
`watermark > R1.ts (= L1'.ts)`, so L1' will be dropped because of the 
watermark without being joined. Once the watermark passes, a right-side row 
no longer has a chance to match a new left-side row via the join condition 
(whereas a left-side row still has a chance to match a new right-side row), 
so evicting R1 looks correct.
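   
   As a concrete instance of this scenario, the sketch below plugs in small 
numbers. The values `t = 100`, `watermark = 102` and `future_right_ts = 103` 
are made up for illustration, times are integer seconds, and the checks only 
restate the inequalities from this comment rather than Spark's code.

```python
# Hypothetical numbers for the L1 / R1 / L1' scenario (integer seconds).
JOIN_RANGE = 5       # ts1 <= ts2 <= ts1 + interval 5 seconds
t = 100              # L1.ts = R1.ts = L1'.ts
watermark = 102      # R1 has been evicted, so watermark > R1.ts

# L1' arrives with event time t, which is behind the watermark,
# so it is dropped before it can be joined with anything.
l1_prime_dropped = t < watermark

# R1 could only match a left row with ts1 <= R1.ts = t. Every such row
# is behind the watermark, so nothing new can match R1 any more.
r1_can_match_new_left = t >= watermark

# L1 can still match a right row that is NOT behind the watermark,
# as long as that row falls inside [t, t + JOIN_RANGE].
future_right_ts = 103
l1_can_match_new_right = (
    future_right_ts > watermark and t <= future_right_ts <= t + JOIN_RANGE
)

print(l1_prime_dropped)        # True
print(r1_can_match_new_left)   # False
print(l1_can_match_new_right)  # True
```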
   
   Please let me know if I'm missing something here.
