HeartSaVioR commented on issue #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#issuecomment-524686985

I revisited this and thought about it briefly, and I feel that the watermark and output modes Spark provides differ from those of other frameworks. Append mode is tricky if you are familiar with other frameworks: in append mode, Spark tries to ensure there is only one output per key, with the delay threshold taken into consideration as well. AFAIK, Flink emits another output when a late-but-allowed tuple arrives after the watermark, updating the previous result, so handling "upsert" is necessary. (Not sure about Beam, but I guess Flink follows the Beam model, so I would expect similar behavior.) In Spark, "upsert" is not yet defined for DSv2, and hence UPDATE mode will be disabled for Spark 3. (#23859)

Suppose there is a stateful operator OP1 at batch B2, with a watermark defined before OP1 and a delay threshold of 1 hour. The range of outputs OP1 can emit in B2 is:

`WM(OP1B1) - delay threshold` <= outputs < `WM(OP1B2) - delay threshold`

that is, outputs which were not evicted (emitted) in the previous batch but match the eviction (emission) condition for this batch (see the first sketch below). If we have OP2 with OP1 as its upstream, OP2 will receive outputs in the range above, and to avoid dropping any intermediate outputs, either 1) OP2 should inherit WM(OP1B1) as WM(OP2B2) and also have an equal or bigger delay threshold, or 2) OP2 should define WM(OP2B2) as `WM(OP1B1) - delay threshold`.

I think Spark needs to make some changes before introducing advanced features. The main issue of Spark Structured Streaming is being "flexible" about the watermark, flexible enough to let end users mess up their query easily. I assume other frameworks have a special field for "event time" and prevent modifying it, but in Spark it is just the same as any other column and open for modification. If the event time is modified, it is no longer in line with the watermark and the result becomes non-deterministic. The same goes for `withWatermark`: end users can call `withWatermark` between OP1 and OP2, and then everything is up to them (what would WM(OP2) be?) and Spark can't help there (see the second sketch below). Similarly, which column is the event-time column of a stream-stream joined output, where each input defines its own event-time column? I'm not seeing a clear definition of this (see the third sketch below).

I'd be in favor of letting the streaming engine manage event time and watermark once the value of event time is defined, and restricting end users from modifying event time (one-time assignment). To achieve this, each row should have an "event time" meta-column, and once it is defined, further updates should only be done from the Spark side: each stateful operator needs to decide the event time of its output according to its watermark (e.g. for windowed aggregation, `window.start` should be used as the "event time" and it shouldn't be changed). That's the only way Spark could ensure event time and watermark stay in sync across multiple stateful operations (the last sketch below illustrates this as a hypothetical API).
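Below is a minimal Scala sketch of the emission-range arithmetic above. All watermark values are made up for illustration; `wmOp1B1` and `wmOp1B2` stand for WM(OP1B1) and WM(OP1B2) in epoch milliseconds.

```scala
// Hypothetical watermark values for OP1 at batches B1 and B2 (epoch millis).
val delayThresholdMs = 60L * 60 * 1000            // delay threshold: 1 hour
val wmOp1B1 = 10L * 60 * 60 * 1000                // WM(OP1B1): 10:00 (made up)
val wmOp1B2 = 11L * 60 * 60 * 1000                // WM(OP1B2): 11:00 (made up)

// A row with event time `t` is emitted (evicted) in B2 iff it was not yet
// evictable in B1 but matches the eviction condition in B2:
def emittedInB2(t: Long): Boolean =
  t >= wmOp1B1 - delayThresholdMs && t < wmOp1B2 - delayThresholdMs

assert(emittedInB2(9L * 60 * 60 * 1000))          // 09:00 is in [09:00, 10:00)
assert(!emittedInB2(10L * 60 * 60 * 1000))        // 10:00 is not yet evictable in B2
```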
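The second sketch shows the `withWatermark` flexibility problem using the real API (source and column names are arbitrary). Note that Spark currently rejects this plan when the query starts, since chained streaming aggregations are unsupported, but the API itself places no constraint on re-declaring event time and watermark between operators:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("wm-sketch").getOrCreate()
import spark.implicits._

val input = spark.readStream.format("rate").load()  // any streaming source
  .withColumn("eventTime", $"timestamp")

val op1 = input
  .withWatermark("eventTime", "1 hour")
  .groupBy(window($"eventTime", "10 minutes"))
  .count()

// Nothing stops the user from deriving a *new* "event time" from OP1's
// output and re-declaring a watermark on it; Spark has no way to check
// whether this is consistent with the watermark OP1 used for eviction.
val op2 = op1
  .withColumn("eventTime", $"window.start")  // user-chosen, unchecked
  .withWatermark("eventTime", "5 minutes")   // so what is WM(OP2) now?
  .groupBy(window($"eventTime", "1 hour"))
  .count()
```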
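The third sketch shows the stream-stream join ambiguity, continuing the session above. Each input declares its own event-time column and watermark (names follow the ad-monetization example from the Structured Streaming guide), and the joined output carries both, so "the" event-time column of the output is undefined:

```scala
val impressions = spark.readStream.format("rate").load()
  .select($"value".as("impressionAdId"), $"timestamp".as("impressionTime"))
  .withWatermark("impressionTime", "2 hours")

val clicks = spark.readStream.format("rate").load()
  .select($"value".as("clickAdId"), $"timestamp".as("clickTime"))
  .withWatermark("clickTime", "3 hours")

// The joined rows carry BOTH impressionTime and clickTime. If this output
// feeds another stateful operator, which column should drive its watermark?
val joined = impressions.join(
  clicks,
  expr("""clickAdId = impressionAdId AND
          clickTime >= impressionTime AND
          clickTime <= impressionTime + interval 1 hour"""))
```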
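Finally, a purely hypothetical sketch of the meta-column proposal; neither `assignEventTime` nor the `__event_time__` column exists in Spark, and the stub below only tags the column, whereas a real engine-side implementation would own it and reject later modification:

```scala
import org.apache.spark.sql.{Column, DataFrame}

implicit class EventTimeOps(df: DataFrame) {
  // Hypothetical: assign the engine-managed event-time meta-column, once.
  def assignEventTime(c: Column, delayThreshold: String): DataFrame =
    df.withColumn("__event_time__", c)  // stub; a real engine would also track the watermark
}

val base = spark.readStream.format("rate").load()
  .assignEventTime($"timestamp", delayThreshold = "1 hour")

val agg = base
  .groupBy(window($"__event_time__", "10 minutes"))
  .count()
// Under the proposal, the engine would set the output's event time to
// window.start and reject any user attempt to overwrite it downstream.
```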
