HeartSaVioR edited a comment on issue #23576: [SPARK-26655] [SS] Support 
multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-524686985
 
 
   I revisited this and thought about it a bit more, and I feel that the watermark and output modes Spark provides are different from those of other frameworks.
   
   Append mode is tricky if you are coming from other frameworks. In Append mode, Spark tries to ensure there's only one output for each key, taking the "delay threshold" into consideration as well. AFAIK, Flink emits another output if a late-but-allowed tuple arrives after the watermark, updating the previous output, hence dealing with "upsert" is necessary. (Not sure about Beam, but I guess Flink follows the Beam model so I would expect similar behavior.) In Spark, "upsert" is not yet defined for DSv2, and hence UPDATE mode will be disabled for Spark 3. (#23859)
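   
   For reference, here is a minimal sketch (not from this PR; the rate source and column names are assumptions) of the single-aggregation, append-mode query shape this discussion builds on:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, window}
   
   val spark = SparkSession.builder.appName("append-mode-sketch").getOrCreate()
   
   // Rate source provides (timestamp, value); any source with an event-time column works.
   val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
   
   // In append mode, each windowed count is emitted exactly once: only after the
   // watermark (max event time seen - 1 hour delay threshold) passes the window end.
   val counts = events
     .withWatermark("timestamp", "1 hour")
     .groupBy(window(col("timestamp"), "10 minutes"))
     .count()
   
   counts.writeStream.outputMode("append").format("console").start()
   ```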
   
   Suppose there's a stateful operator OP1 running batch B2, with a watermark defined before OP1 and the delay threshold set to 1hr. The range of outputs OP1 can emit in B2 is the following:
   
   `WM(OP1B1) - delay threshold` <= outputs < `WM(OP1B2) - delay threshold`
   
   i.e., the outputs which were not evicted (emitted) in the previous batch but which match the eviction (emission) condition for this batch.
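   
   To make that concrete with assumed numbers: with the delay threshold at 1hr, if WM(OP1B1) = 10:00 and WM(OP1B2) = 11:00, then in B2 OP1 emits the outputs whose event time falls in [09:00, 10:00): they were not yet evictable in B1 (event time >= 10:00 - 1hr) but become evictable in B2 (event time < 11:00 - 1hr).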
   
   If we have OP2 with OP1 as its upstream, it will receive outputs in the range above, and to avoid dropping any intermediate outputs, either 1) OP2 should inherit WM(OP1B1) as WM(OP2B2) and also have an equal or bigger delay threshold, or 2) OP2 should define WM(OP2B2) as `WM(OP1B1) - delay threshold`.
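   
   For illustration, this is the shape of the OP1 -> OP2 chain under discussion (reusing `events` from the sketch above; Spark currently rejects multiple streaming aggregations, so this only shows the query shape, not something runnable today):
   
   ```scala
   import org.apache.spark.sql.functions.{col, window}
   
   // OP1: windowed count, watermark defined upstream of it.
   val op1 = events
     .withWatermark("timestamp", "1 hour")
     .groupBy(window(col("timestamp"), "10 minutes"))
     .count()
   
   // OP2: aggregates OP1's output. The open questions are which event-time
   // column OP2 should use (e.g. window.start) and what WM(OP2B2) should be.
   val op2 = op1
     .groupBy(window(col("window.start"), "1 hour"))
     .count()
   ```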
   
   I think Spark needs to make some changes before introducing such advanced features.
   
   I think the main issue with Spark Structured Streaming here is being "flexible" about watermarks, flexible enough to let end users mess up their query easily. I assume other frameworks have a special field for "event time" and prevent modifying it, but in Spark it's just the same as any other column and open to modification. If event time is modified, it's no longer in line with the watermark, and the result becomes nondeterministic. Same for `withWatermark`: end users can call `withWatermark` between OP1 and OP2, and then everything is up to the end users - what would WM(OP2) be? - and Spark can't help there.
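   
   As a sketch of what that flexibility allows (column names as in the sketches above), nothing stops user code from redefining the event-time column after the watermark has been declared on it:
   
   ```scala
   import org.apache.spark.sql.functions.{col, expr}
   
   // The watermark was declared on "timestamp", but the column is then rewritten;
   // the values carried in the column no longer match the values the watermark
   // was derived from, so eviction/emission becomes nondeterministic.
   val skewed = events
     .withWatermark("timestamp", "1 hour")
     .withColumn("timestamp", col("timestamp") + expr("INTERVAL 2 HOURS"))
   ```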
   
   Similarly, which is the event time column for stream-stream joined output, where an event time column is defined per input? I'm not seeing a clear definition of this.
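   
   For example (following the ad impression/click pattern from the Spark docs, with assumed names), the joined output below carries two event-time columns, and there's no defined answer as to which one a downstream stateful operator should treat as the event time:
   
   ```scala
   import org.apache.spark.sql.functions.expr
   
   val impressions = spark.readStream.format("rate").load()
     .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
     .withWatermark("impressionTime", "2 hours")
   
   val clicks = spark.readStream.format("rate").load()
     .selectExpr("value AS clickAdId", "timestamp AS clickTime")
     .withWatermark("clickTime", "3 hours")
   
   // Joined rows carry both impressionTime and clickTime.
   val joined = impressions.join(
     clicks,
     expr("clickAdId = impressionAdId AND " +
          "clickTime >= impressionTime AND " +
          "clickTime <= impressionTime + interval 1 hour"))
   ```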
   
   I'd be in favor of letting the streaming engine manage event time and watermark once the value of event time is defined, and restricting end users from modifying event time (one-time update). To achieve this, each row should have a meta-column for "event time", and once it's defined, further updates should be done only from the Spark side: each stateful operator needs to decide the event time of its output according to its watermark. (e.g. for windowed aggregation, "window.start" should be used as "event time" and it shouldn't be changed.) That's the only way Spark could ensure event time and watermark stay in sync across multiple stateful operations.
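   
   A purely hypothetical sketch of what that could look like - none of these APIs exist in Spark; `assignEventTime` and `eventTime()` are made-up names just to illustrate the idea:
   
   ```scala
   // Hypothetical: event time lives in an engine-managed meta-column which user
   // code can assign exactly once and never overwrite afterwards.
   val df = events
     .assignEventTime(col("timestamp"), delayThreshold = "1 hour") // hypothetical one-time assignment
     .groupBy(window(eventTime(), "10 minutes"))                   // hypothetical accessor for the meta-column
     .count()
   // After the aggregation, the engine itself would rewrite the meta-column to
   // window.start, keeping event time and watermark in sync downstream.
   ```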
