HeartSaVioR opened a new pull request, #56061:
URL: https://github.com/apache/spark/pull/56061
### What changes were proposed in this pull request?
Introduce a three-component fix for stateful-operator nullability drift,
gated by `spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled`
(pinned per-query via the offset log):
- (a) `WidenStatefulOpNullability.widenStateSchema`: every stateful physical
exec widens its state key/value schema to fully nullable at construction.
- (b) `WidenStatefulOpNullability.widenOutputForStatefulOp`: every stateful
logical and physical operator widens its declared `output` to fully nullable.
- (c) `WidenStatefulOperatorAttributeNullability`: an optimizer rule that
widens `AttributeReference`s inside stateful ops' internal expressions and
propagates upward through ancestor expressions.
With this fix, the state schema becomes "fully" nullable (top-level columns,
nested columns, and collection types) regardless of the input schema, and the
output schema of the stateful operator becomes "fully" nullable as well.
Changing the output schema is necessary because, even if the input schema is
non-nullable, state can produce null values, so the output can contain nulls.
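
The "fully nullable" widening described above can be sketched as a recursive traversal over a schema. This is a minimal, self-contained illustration under stated assumptions, not the implementation in this PR: the `DType` ADT is a hypothetical stand-in for Spark's `DataType` hierarchy, and `Widen.widen` is an illustrative name.

```scala
// Hypothetical stand-in for Spark's DataType hierarchy; all names are illustrative.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
final case class Field(name: String, dataType: DType, nullable: Boolean)
final case class StructT(fields: List[Field]) extends DType
final case class ArrayT(element: DType, containsNull: Boolean) extends DType
final case class MapT(key: DType, value: DType, valueContainsNull: Boolean) extends DType

object Widen {
  // Recursively set every nullability flag to true: top-level and nested
  // struct fields, array elements, and map values.
  def widen(dt: DType): DType = dt match {
    case StructT(fields) =>
      StructT(fields.map(f => f.copy(dataType = widen(f.dataType), nullable = true)))
    case ArrayT(elem, _) => ArrayT(widen(elem), containsNull = true)
    case MapT(k, v, _)   => MapT(widen(k), widen(v), valueContainsNull = true)
    case atomic          => atomic // atomic types carry no nullability flag of their own
  }
}
```

In this model, widening is idempotent: applying `Widen.widen` to an already widened schema returns an equal schema, which is why the state schema no longer depends on whatever nullability the input happened to have.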
### Why are the changes needed?
This is a long-standing issue between the streaming engine and the query
optimizer. A streaming query is meant to be long-running and in many cases
spans multiple Spark versions. Also, the logical plan is not always the same
across batches (e.g. there are multiple stream sources and one of them has no
new data at batch N). This exposes the streaming query to changes in analyzer
and optimizer behavior.
The state schema of a stateful operator is mostly determined by the operator's
input schema, and nullability is no exception. If the input schema has a
nullable column, the state schema has a nullable column, and likewise for a
non-nullable column.
One of the query optimizer's optimizations is to flip nullability, say, from
nullable to non-nullable where appropriate. This can happen directly or
indirectly, and the most problematic case is when the optimization is applied
"selectively" (in some batches or runs but not in others).
One simple example is the elimination of Union branches: in a streaming query
that combines multiple streams with Union, one stream may be non-empty at
batch N while another is empty. In that case, `PropagateEmptyRelation` can
drop empty `Union` branches, causing a per-column nullability flip that
propagates into a stateful operator's state schema across microbatches or
restarts. This causes either `STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE` on restart
or a codegen NPE when state-restored rows carry nulls in columns declared
non-nullable.
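
The drift can be illustrated with a toy model. The sketch below relies on the fact that a union's output column is nullable if it is nullable in any child, and otherwise simplifies heavily: schemas are flat lists of columns, and `Col`, `UnionDrift.unionOutput`, and `UnionDrift.widen` are hypothetical names, not Spark APIs.

```scala
// Toy model: a column is a name plus a nullability flag.
final case class Col(name: String, nullable: Boolean)

object UnionDrift {
  // A union's output column is nullable if it is nullable in any child.
  // Assumes every child has the same number of columns, in the same order.
  def unionOutput(children: List[List[Col]]): List[Col] =
    children.transpose.map(cs => Col(cs.head.name, cs.exists(_.nullable)))

  // Widening reduced to flat columns: every column becomes nullable.
  def widen(schema: List[Col]): List[Col] = schema.map(_.copy(nullable = true))
}
```

With branches `a = List(Col("key", nullable = false))` and `b = List(Col("key", nullable = true))`, `unionOutput(List(a, b))` yields a nullable `key`, while `unionOutput(List(a))` (branch `b` dropped as empty) yields a non-nullable `key`. The two schemas differ, which is the flip described above. After `widen`, both are equal, so what reaches the stateful operator no longer depends on which branches survived optimization.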
### Does this PR introduce _any_ user-facing change?
No user-visible behavior change for new queries (all stateful operator
outputs become nullable, which is semantically correct). Existing queries keep
their original behavior via the offset log gate.
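
One way to picture the offset log gate is as a resolution step at query start. The sketch below is an assumption-laden illustration, not the PR's code: `NullabilityGate.effective` and its parameters are hypothetical, and it assumes that a checkpoint written before this change has no recorded value and is therefore treated as disabled.

```scala
object NullabilityGate {
  // Hypothetical resolution of the flag for one query run.
  //   pinned:       value recorded in the checkpoint's offset log, if any
  //   isNewQuery:   true when the query starts without a prior checkpoint
  //   sessionValue: value of the session config when the query starts
  def effective(pinned: Option[Boolean], isNewQuery: Boolean, sessionValue: Boolean): Boolean =
    // A pinned value always wins; otherwise only a brand-new query may pick up
    // the session config. An existing checkpoint without an entry stays
    // disabled (an assumption of this sketch).
    pinned.getOrElse(if (isNewQuery) sessionValue else false)
}
```

Under these assumptions, changing the session config after a query has started does not alter that query's state schema on restart, which is what keeps existing checkpoints readable.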
### How was this patch tested?
New `StreamingStatefulOperatorNullabilityDriftSuite` covering:
- New-query path: Union-branch-drop restart scenarios for aggregate,
dropDuplicates, dropDuplicatesWithinWatermark.
- Codegen NPE regression with struct grouping keys.
- Existing-query path: widening forced off still triggers schema mismatch.
- Rule-level: scope check (non-stateful subtrees skipped).
- Helper-level: `deepWidenAttribute` recursion into nested types.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Generated-by: Claude 4.7 Opus