HeartSaVioR opened a new pull request, #39931:
URL: https://github.com/apache/spark/pull/39931
### What changes were proposed in this pull request?
This PR proposes to introduce watermark propagation among operators via
simulation, which enables the workload of "stream-stream time interval join
followed by stateful operator".
As of now, Spark assumes that every stateful operator has the same input
watermark and output watermark, which is insufficient for stream-stream time
interval join. Depending on the join criteria (e.g.
`leftTime BETWEEN rightTime - INTERVAL 30 seconds AND rightTime + INTERVAL 40 seconds`),
the join can emit output whose event time lags behind the global watermark. To
address this, the join operator should be able to produce a "delayed" watermark
for the downstream operator. In other words, Spark has to "propagate" the
watermark among operators, flowing from the leaf node(s) to the root (going
downstream).
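
To make the "delayed" watermark concrete, here is a minimal arithmetic sketch for the example condition above. It is not Spark's implementation: the helper name `delayed_watermark_ms` is hypothetical, and it assumes that both inputs share one global watermark and that the join's output watermark is the smaller of the two per-side state eviction thresholds.

```python
def delayed_watermark_ms(input_watermark_ms: int,
                         lower_bound_ms: int,
                         upper_bound_ms: int) -> int:
    """Hypothetical helper for the join condition
    leftTime BETWEEN rightTime - lower_bound AND rightTime + upper_bound.
    """
    # Left rows at or below (watermark - lower_bound) cannot match
    # right rows that arrive after the watermark.
    left_state_threshold = input_watermark_ms - lower_bound_ms
    # Right rows at or below (watermark - upper_bound) cannot match
    # left rows that arrive after the watermark.
    right_state_threshold = input_watermark_ms - upper_bound_ms
    # Assumption: buffered rows can still be emitted until they are evicted,
    # so the smaller threshold bounds the event time of future output.
    return min(left_state_threshold, right_state_threshold)


global_watermark_ms = 100_000
delayed = delayed_watermark_ms(global_watermark_ms, 30_000, 40_000)
print(delayed)  # 60000
```

Under these assumptions, the downstream operator would see a watermark 40 seconds behind the global one.
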
This PR introduces a new interface `WatermarkPropagator`, which simulates
watermark propagation through the plan, starting from the global watermark.
There are three implementations of this interface:
1. `NoOpWatermarkPropagator`: Does nothing. This is used to initialize a dummy
`IncrementalExecution`.
2. `UseSingleWatermarkPropagator`: Uses a single global watermark for late
events and eviction. This is used in compatibility mode
(`spark.sql.streaming.statefulOperator.allowMultiple` set to `false`).
3. `PropagateWatermarkSimulator`: Simulates propagation of the watermark among
operators.
The simulation algorithm used in `PropagateWatermarkSimulator` traverses the
physical plan tree in post-order (children first) to calculate the (input
watermark, output watermark) pair for every node. For each node, the following
logic is applied:
- The input watermark for a node is `min(input watermarks from all children)`.
  - Children providing no input watermark (`DEFAULT_WATERMARK_MS`) are
    excluded.
  - If there is no valid input watermark from children, input watermark =
    `DEFAULT_WATERMARK_MS`.
- The output watermark for a node is decided as follows:
  - watermark nodes: the origin watermark value (global watermark).
  - stateless nodes: same as the input watermark.
  - stateful nodes: the return value of `op.produceWatermark(input watermark)`.
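
The rules above can be sketched as a small standalone simulation. This is an illustration under stated assumptions, not the code in this PR: the `Node` class, the `simulate` function, and the `kind` strings are hypothetical, `DEFAULT_WATERMARK_MS` is assumed to be `0`, and the per-operator `produce_watermark` callables (a 40 second delay for the join, identity for the aggregation) are made up for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple

DEFAULT_WATERMARK_MS = 0  # assumed sentinel meaning "no watermark"


@dataclass
class Node:
    name: str
    kind: str  # "watermark", "stateless", or "stateful"
    children: List["Node"] = field(default_factory=list)
    # Only used by stateful nodes: input watermark -> output watermark.
    produce_watermark: Optional[Callable[[int], int]] = None


def simulate(root: Node, global_watermark_ms: int) -> Dict[str, Tuple[int, int]]:
    """Return {node name: (input watermark, output watermark)}."""
    result: Dict[str, Tuple[int, int]] = {}

    def visit(node: Node) -> int:
        # Post-order: resolve every child before the node itself.
        child_outputs = [visit(child) for child in node.children]
        # Children that provide no watermark are excluded from the min.
        valid = [w for w in child_outputs if w != DEFAULT_WATERMARK_MS]
        input_wm = min(valid) if valid else DEFAULT_WATERMARK_MS
        if node.kind == "watermark":
            output_wm = global_watermark_ms
        elif node.kind == "stateful":
            output_wm = node.produce_watermark(input_wm)
        else:  # stateless nodes pass the input watermark through
            output_wm = input_wm
        result[node.name] = (input_wm, output_wm)
        return output_wm

    visit(root)
    return result


left = Node("left_wm", "watermark", [Node("left_src", "stateless")])
right = Node("right_wm", "watermark", [Node("right_src", "stateless")])
join = Node("join", "stateful", [left, right], lambda wm: wm - 40_000)
agg = Node("agg", "stateful", [join], lambda wm: wm)

result = simulate(agg, global_watermark_ms=100_000)
print(result["join"])  # (100000, 60000)
print(result["agg"])   # (60000, 60000)
```

In this toy plan the aggregation receives the join's delayed watermark as its input watermark, rather than the global watermark.
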
Once the algorithm has traversed the physical plan tree, the association
between each stateful operator and its input watermark is constructed. The
association is cached after calculation and reused across microbatches until
Spark determines that it is no longer needed.
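
One way to picture the caching is a plain memoization table. The sketch below is purely illustrative: the class `InputWatermarkCache`, its methods, and the choice to key entries by batch ID are assumptions made for the example, not the data structure used in this PR.

```python
from typing import Callable, Dict


class InputWatermarkCache:
    """Hypothetical cache of {operator id: input watermark}, keyed by batch ID."""

    def __init__(self, simulate: Callable[[int], Dict[int, int]]):
        # `simulate` stands in for the plan traversal; it is run at most
        # once per batch ID while that batch's entry is cached.
        self._simulate = simulate
        self._by_batch: Dict[int, Dict[int, int]] = {}

    def input_watermark(self, batch_id: int, operator_id: int) -> int:
        if batch_id not in self._by_batch:
            self._by_batch[batch_id] = self._simulate(batch_id)
        return self._by_batch[batch_id][operator_id]

    def purge(self, oldest_batch_to_keep: int) -> None:
        # Drop associations for batches that are no longer needed.
        stale = [b for b in self._by_batch if b < oldest_batch_to_keep]
        for batch_id in stale:
            del self._by_batch[batch_id]


calls = []


def fake_simulation(batch_id: int) -> Dict[int, int]:
    # Made-up watermark values; records each call to show the cache effect.
    calls.append(batch_id)
    return {0: 60_000 + batch_id, 1: 100_000 + batch_id}


cache = InputWatermarkCache(fake_simulation)
first = cache.input_watermark(batch_id=5, operator_id=0)
second = cache.input_watermark(batch_id=5, operator_id=1)
print(first, second, calls)  # 60005 100005 [5]
```

The second lookup reuses the cached association, so the simulation runs only once for batch 5; calling `purge` with a later batch ID would discard that entry.
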
As the reference to `op.produceWatermark()` above suggests, this PR also adds a
new method `produceWatermark` to `StateStoreWriter`, which requires each
stateful operator to calculate its output watermark from the given input
watermark. In most cases, this is the same as the state eviction criteria, as
most stateful operators produce output in two different ways:
1. without buffering (event time > input watermark)
2. with buffering (state)
State eviction is decided by comparing event time against a "certain threshold
of timestamp", which denotes a lower bound on the event time values of the
output (the output watermark). Since most stateful operators construct the
state eviction predicate from the watermark in the batch planning phase, they
can produce an output watermark once Spark provides an input watermark.
For a walkthrough, please refer to the code comment on the test case
`stream-stream time interval left outer join -> aggregation, append mode`.
This PR also makes several additional decisions that introduce backward
incompatibility.
1. Re-definition of watermark will be disallowed.
Technically, each watermark node could track its own watermark value and
`PropagateWatermarkSimulator` could propagate these values correctly (multiple
origins). While this may help faster streams to progress more quickly (as the
watermarks would not all need to follow the slowest one until a join/union), it
raises more complicated questions from a UX perspective, as all UX about
watermark is based on the global watermark. This seems harder to address, hence
this PR proposes to retain the global watermark as it is.
Since we want to produce the watermark from a single origin value, redefinition
of watermark does not make sense. Consider stream-stream time interval join
followed by another watermark node. Which is the right output watermark for
that watermark node: the delayed watermark, or the global watermark?
2. Stateful operators will not allow multiple event time columns to be
defined in the input DataFrame.
The output of stream-stream join may have two event time columns, which makes
late record filtering and eviction ambiguous. Currently the first event time
column to appear is picked for late record filtering and eviction, which is NOT
correct. After this PR, Spark will throw an exception.
**TODO: Apply this to the code change.**
Turning off the flag `spark.sql.streaming.statefulOperator.allowMultiple`
will restore the old behavior for the above.
### Why are the changes needed?
Stream-stream time interval join followed by a stateful operator is not
supported yet.
### Does this PR introduce _any_ user-facing change?
Yes, here is a list of user-facing changes (some are backward incompatible,
though a compatibility flag is provided):
- Stream-stream time interval join followed by a stateful operator will be
allowed.
- Re-definition of watermark will be disallowed.
- Stateful operators will not allow multiple event time columns to be defined
in the input DataFrame.
### How was this patch tested?
New & modified test cases.