cloud-fan commented on code in PR #56061: URL: https://github.com/apache/spark/pull/56061#discussion_r3289376355
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Shared helpers for the stateful-operator nullability fix. The fix has three + * independent components, all gated by + * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the + * offset log so existing queries keep their pre-fix behavior on restart): + * + * - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction + * site in each stateful physical exec. 
Review Comment: Component (a) is described as applying "at every state-schema construction site in each stateful physical exec," but several execs are missing the explicit widening:

- `FlatMapGroupsWithStateExec.validateAndMaybeEvolveStateSchema` (`FlatMapGroupsWithStateExec.scala` ~L203): `groupingAttributes.toStructType` is registered un-widened, and the two `StateStore.get` / `mapPartitionsWithStateStore` calls in `doExecute` (~L247-263) open state stores with the un-widened key schema.
- `FlatMapGroupsInPandasWithStateExec` inherits the same base, so it has the same gap.
- `TransformWithStateExec`: `getColFamilySchemas`'s `defaultSchema` (~L143-145), `validateAndMaybeEvolveStateSchema` (via `validateAndWriteStateSchema` at ~L380), and the `StateStore.get` / `mapPartitionsWithStateStore` calls (~L406-417, ~L428-435) all use `keyExpressions.toStructType` / `keyEncoder.schema` un-widened.
- `TransformWithStateInPySparkExec`: same pattern.

Grouping attributes are input-derived and subject to the same nullability drift the rest of the fix is preventing. Component (c) may incidentally widen the references via the logical-plan rewrite, but letting component (a) skip these execs makes the design's defense-in-depth claim false and leaves a real gap if (c) misses for any reason (rule excluded, unresolved subplan, etc.). Either add `widenStateSchema(...)` at these sites for consistency with `StateStoreSaveExec` / `BaseStreamingDeduplicateExec` / `StreamingSymmetricHashJoinExec`, or tighten the wording here to state which execs are intentionally exempt and why.
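To illustrate why the widening has to be applied at every site that registers or validates a key schema, here is a minimal sketch built on a hypothetical, simplified schema model. `DType`, `Field`, `StructT`, `compatible`, and `restartPasses` are illustrative stand-ins, not Spark's `StructType` or `StateSchemaCompatibilityChecker`; `compatible` approximates only the strict rule discussed in this review (a new schema may not be nullable where the stored one is non-nullable). The check itself stays strict; it passes only because both inputs are widened first.

```scala
// Hypothetical, simplified schema model for illustration; not Spark's types.
object StateSchemaWideningSketch {
  sealed trait DType
  case object IntT extends DType
  final case class ArrayT(element: DType, containsNull: Boolean) extends DType
  final case class StructT(fields: List[Field]) extends DType
  final case class Field(name: String, dataType: DType, nullable: Boolean)

  // Flip every nullability flag to true, recursing into nested types
  // (the idea behind DataType#asNullable).
  def asNullable(dt: DType): DType = dt match {
    case s: StructT         => widenStruct(s)
    case ArrayT(element, _) => ArrayT(asNullable(element), containsNull = true)
    case other              => other
  }

  def widenStruct(s: StructT): StructT =
    StructT(s.fields.map(f => f.copy(dataType = asNullable(f.dataType), nullable = true)))

  // Stand-in for widenStateSchema: returns the schema unchanged when the conf is off.
  def widenStateSchema(enabled: Boolean)(schema: StructT): StructT =
    if (enabled) widenStruct(schema) else schema

  // Simplified strict check: the proposed schema may not be nullable
  // anywhere the stored schema is non-nullable.
  def compatible(stored: DType, proposed: DType): Boolean = (stored, proposed) match {
    case (StructT(sfs), StructT(pfs)) =>
      sfs.length == pfs.length && sfs.zip(pfs).forall { case (sf, pf) =>
        (sf.nullable || !pf.nullable) && compatible(sf.dataType, pf.dataType)
      }
    case (ArrayT(se, sNull), ArrayT(pe, pNull)) =>
      (sNull || !pNull) && compatible(se, pe)
    case (s, p) => s == p
  }

  // Grouping-key schema before and after nullability drift.
  val keyRun1 = StructT(List(Field("key", IntT, nullable = false)))
  val keyRun2 = StructT(List(Field("key", IntT, nullable = true)))

  // One construction site: register run 1's key schema, validate run 2's on restart.
  def restartPasses(enabled: Boolean): Boolean =
    compatible(widenStateSchema(enabled)(keyRun1), widenStateSchema(enabled)(keyRun2))
}
```

In this model, a site that skips `widenStateSchema` behaves like `restartPasses(enabled = false)`: the stored non-nullable key rejects the drifted nullable key. Once that same site widens both schemas, the unchanged strict check accepts them.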
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -3403,6 +3403,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT = + buildConf("spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled") + .internal() + .doc("When true, every streaming stateful operator reports its output schema with " + + "nullable=true on all columns (including nested struct fields, array elements, and " + + "map values), the state schema is widened at every construction site, and the state " + + "schema compatibility checker ignores nullability for stateful operator schemas. " +

Review Comment: This claim is not accurate. `StateSchemaCompatibilityChecker` is not changed by this PR and does not ignore nullability: it still calls `DataType.equalsIgnoreNameAndCompatibleNullability`, which rejects nullable→non-nullable narrowing. What actually happens is that this conf causes the schemas passed *to* the checker (both stored and new) to be widened beforehand, so the existing strict check trivially passes regardless of input nullability. Suggested rewording:

```suggestion
"schema is widened at every construction site, so the existing state schema " +
"compatibility check trivially passes regardless of input nullability. " +
```

########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Shared helpers for the stateful-operator nullability fix. The fix has three + * independent components, all gated by + * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the + * offset log so existing queries keep their pre-fix behavior on restart): + * + * - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction + * site in each stateful physical exec. + * - (b) `widenOutputForStatefulOp`: a per-op `output` override on every stateful logical + * and physical operator, used by the operator's `output` definition. + * - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in this file): a + * custom optimizer rule that widens `AttributeReference`s inside stateful ops' + * internal expressions and propagates upward to ancestor expressions. 
+ */ +object WidenStatefulOpNullability { + + def isEnabled: Boolean = + SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) + + /** + * Recursively widens an attribute to be fully nullable: outer `nullable = true` plus + * every nested `StructField.nullable`, `ArrayType.containsNull`, and + * `MapType.valueContainsNull` flipped to `true` via + * [[org.apache.spark.sql.types.DataType#asNullable]]. + */ + def deepWidenAttribute(a: Attribute): Attribute = a match { + case ref: AttributeReference => + AttributeReference( + ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)( + ref.exprId, ref.qualifier) + case other => other.withNullability(true) + } + + /** + * Component (a): widens a state schema to fully nullable. Stateful physical execs apply + * this at every `validateAndMaybeEvolveStateSchema(...)` call site and every + * `mapPartitionsWith*StateStore(...)` call site. When the conf is off, returns the + * schema unchanged. + */ + def widenStateSchema(schema: StructType): StructType = + if (isEnabled) schema.asNullable else schema + + /** + * Component (b): wraps a stateful operator's `output` to be fully nullable. The caller + * is responsible for only calling this from within an `output` definition on a stateful + * operator; gating is handled here via [[isEnabled]]. + */ + def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] = + if (isEnabled) base.map(deepWidenAttribute) else base +} + +/** + * Component (c) of the stateful-operator nullability fix: a custom optimizer rule that + * widens `AttributeReference`s inside streaming-stateful operators' internal expressions + * and propagates the widening upward to ancestor operators' expressions. + * + * The rule does NOT introduce any new logical or physical node. It is purely an + * attribute-rewrite pass: + * + * 1. 
At a stateful operator: rewrite every `AttributeReference` inside the operator's + * internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever + * the attribute's `exprId` matches one in the operator's own (already widened via + * component (b)) `output`. + * + * 2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is + * in `children.flatMap(_.output)` (already widened thanks to component (b)). + * + * '''Scope.''' The walk only fires on nodes whose subtree contains a stateful operator. + * + * '''Ordering constraint.''' This rule must run AFTER every `UpdateAttributeNullability` + * invocation in both the main optimizer and AQE. + * + * '''Idempotence.''' [[WidenStatefulOpNullability#deepWidenAttribute]] is idempotent. + */ +object WidenStatefulOperatorAttributeNullability extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!conf.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) || + !plan.containsStatefulOperator) { + return plan + } + plan.resolveOperatorsUp { + case p if !p.resolved => p + case p: LeafNode => p + case p if !p.containsStatefulOperator => p + case p => + val widenableExprIds: Set[ExprId] = (p.output ++ p.children.flatMap(_.output)) Review Comment: Because `widenableExprIds` always pulls from both `p.output` and *all* of `p.children.flatMap(_.output)`, when an operator has a mix of stateful and non-stateful children (e.g. a non-stream-stream `Join` above a streaming aggregate on one side and a batch source on the other), references to the non-stateful sibling's attributes are also deep-widened. The docstring above implies this happens only against attributes "already widened thanks to component (b)" — but the non-stateful sibling's attributes are not. The widening is always correctness-safe (nullable is a valid weakening), so this is a docs / over-widening concern, not a bug. 
Worth either restricting to children whose subtrees contain a stateful operator (`p.children.filter(_.containsStatefulOperator).flatMap(_.output)`), or acknowledging the over-widening in the comment so future readers don't expect the narrower behavior the docstring promises. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala: ########## @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability Review Comment: Import out of order — `org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability` should sit alongside the other `org.apache.spark.sql.catalyst.*` imports near the top of the `org.apache.spark.*` block, not after `org.apache.spark.sql.streaming._`. Same issue in `TransformWithStateInPySparkExec.scala`, `FlatMapGroupsWithStateExec.scala`, `FlatMapGroupsInPandasWithStateExec.scala`, `StreamingSymmetricHashJoinExec.scala`, and `streamingLimits.scala`. 
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala: ########## @@ -307,174 +443,48 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema, ignoreValueSchema = false) } +} - private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = { - applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3") - } +class StateSchemaCompatibilityCheckerWithNullabilityWideningDisabledSuite + extends StateSchemaCompatibilityCheckerTestMixin { - private def applyNewSchemaToNestedFieldInValue(newNestedSchema: StructType): StructType = { - applyNewSchemaToNestedField(valueSchema, newNestedSchema, "value3") + override protected def sparkConf: org.apache.spark.SparkConf = { + super.sparkConf.set(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key, "false") }

Review Comment: The only thing this suite changes versus the parent is setting `STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT=false`. But `StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema` doesn't read that conf: the schemas it receives are exactly what the test passes in, and no production widening helper is invoked from these tests. The four `storing nullable column into non-nullable column ...` tests and the two `changing the name of nested field ...` tests therefore pass identically with the conf at either value. The conf separation is cosmetic. Two follow-ups:

1. The `changing the name of nested field ...` pair is unrelated to nullability; there's no reason for these to live in a "NullabilityWideningDisabled" suite. Move them back to the main suite.
2. For the four nullability tests, the intent appears to be "these are no longer reachable in production with widening on," but the unit tests don't simulate that path; they exercise the checker in isolation.
Either consolidate them back into the main suite (they still validate the unchanged checker behavior, which is worth keeping), or rework the setup so the conf actually has an observable effect (e.g. call through the production widening helpers in the test). ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * Shared helpers for the stateful-operator nullability fix. 
The fix has three + * independent components, all gated by + * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the + * offset log so existing queries keep their pre-fix behavior on restart): + * + * - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction + * site in each stateful physical exec. + * - (b) `widenOutputForStatefulOp`: a per-op `output` override on every stateful logical + * and physical operator, used by the operator's `output` definition. + * - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in this file): a + * custom optimizer rule that widens `AttributeReference`s inside stateful ops' + * internal expressions and propagates upward to ancestor expressions. + */ +object WidenStatefulOpNullability { + + def isEnabled: Boolean = + SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) + + /** + * Recursively widens an attribute to be fully nullable: outer `nullable = true` plus + * every nested `StructField.nullable`, `ArrayType.containsNull`, and + * `MapType.valueContainsNull` flipped to `true` via + * [[org.apache.spark.sql.types.DataType#asNullable]]. + */ + def deepWidenAttribute(a: Attribute): Attribute = a match { + case ref: AttributeReference => + AttributeReference( + ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)( + ref.exprId, ref.qualifier) + case other => other.withNullability(true) + } + + /** + * Component (a): widens a state schema to fully nullable. Stateful physical execs apply + * this at every `validateAndMaybeEvolveStateSchema(...)` call site and every + * `mapPartitionsWith*StateStore(...)` call site. When the conf is off, returns the + * schema unchanged. + */ + def widenStateSchema(schema: StructType): StructType = + if (isEnabled) schema.asNullable else schema + + /** + * Component (b): wraps a stateful operator's `output` to be fully nullable. 
The caller + * is responsible for only calling this from within an `output` definition on a stateful + * operator; gating is handled here via [[isEnabled]]. + */ + def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] = + if (isEnabled) base.map(deepWidenAttribute) else base +} + +/** + * Component (c) of the stateful-operator nullability fix: a custom optimizer rule that + * widens `AttributeReference`s inside streaming-stateful operators' internal expressions + * and propagates the widening upward to ancestor operators' expressions. + * + * The rule does NOT introduce any new logical or physical node. It is purely an + * attribute-rewrite pass: + * + * 1. At a stateful operator: rewrite every `AttributeReference` inside the operator's + * internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever + * the attribute's `exprId` matches one in the operator's own (already widened via + * component (b)) `output`. + * + * 2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is + * in `children.flatMap(_.output)` (already widened thanks to component (b)).

Review Comment: These two bullets describe a split: (1) "at a stateful operator" matches against "the operator's own ... `output`", and (2) "at non-stateful ancestor operators" matches against `children.flatMap(_.output)`. But the implementation in `apply` uses the same union `(p.output ++ p.children.flatMap(_.output))` for every node it visits, with no branch on `isStateful`. Either rewrite this section to describe the actual uniform behavior, or change the code to take different exprId sources for the two cases (the more conservative version would also help with the over-widening concern raised in the comment on `widenableExprIds`).
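A minimal sketch of the difference between the uniform exprId union and the split the docstring describes, using a hypothetical mini plan model (`Node`, `uniformIds`, and `splitIds` are illustrative names, exprIds are plain `Long`s, and nothing here is Catalyst's `LogicalPlan` or rewrites any expressions). The split variant also restricts non-stateful nodes to children whose subtree contains a stateful operator, which is the conservative option mentioned above.

```scala
// Hypothetical mini logical plan for illustration only; not Catalyst's LogicalPlan.
object ExprIdSourceSketch {
  final case class Node(
      name: String,
      output: Set[Long], // exprIds of this node's output attributes
      isStateful: Boolean,
      children: List[Node] = Nil) {
    def containsStatefulOperator: Boolean =
      isStateful || children.exists(_.containsStatefulOperator)
  }

  // Mirrors the current rule: the same union at every visited node.
  def uniformIds(p: Node): Set[Long] =
    p.output ++ p.children.flatMap(_.output)

  // The split from the docstring: a stateful op matches its own (widened) output;
  // a non-stateful ancestor matches only outputs of children with a stateful subtree.
  def splitIds(p: Node): Set[Long] =
    if (p.isStateful) p.output
    else p.children.filter(_.containsStatefulOperator).flatMap(_.output).toSet

  // Join(streaming aggregate, batch source): exprId 3 comes from the batch side.
  val agg   = Node("Aggregate", Set(1L, 2L), isStateful = true)
  val batch = Node("LocalRelation", Set(3L), isStateful = false)
  val join  = Node("Join", Set(1L, 2L, 3L), isStateful = false, children = List(agg, batch))
}
```

Under the uniform union, the batch sibling's exprId 3 lands in the Join's widenable set; under the split it does not, while the aggregate's own output is matched either way.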
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStatefulOperatorNullabilityDriftSuite.scala: ########## @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkUnsupportedOperationException +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability +import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} + +/** + * Regression suite for stateful-operator nullability drift. + * + * Driver: `PropagateEmptyRelation` drops empty `Union` branches without a streaming + * guard, so the surviving branch's per-column nullability becomes the Union's + * nullability and propagates into a stateful operator above -- across microbatches or + * restarts. 
+ * + * Coverage: + * - New-query (default conf): originally-failing scenarios now complete cleanly. + * - Existing-query (conf forced false): pre-fix behavior preserved verbatim. + * - Helper invariant: `WidenStatefulOpNullability.deepWidenAttribute` recurses into + * nested types. + */ +class StreamingStatefulOperatorNullabilityDriftSuite extends StreamTest { + + import testImplicits._ + + private def buildTwoSources(): (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame) = { + val inputA = MemoryStream[Int] + val inputB = MemoryStream[Int] + + val dfA = inputA.toDF().select($"value".as("key")) + val dfB = inputB.toDF() + .select(when($"value" > Int.MinValue, $"value") + .otherwise(lit(null).cast("int")) + .as("key")) + + (inputA, inputB, dfA, dfB) + } + + private def buildTwoSourcesWithWatermark() + : (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame) = { + val inputA = MemoryStream[Int] + val inputB = MemoryStream[Int] + + val dfA = inputA.toDF() + .select($"value".as("key"), + current_timestamp().cast("timestamp").as("ts")) + .withWatermark("ts", "1 minute") + val dfB = inputB.toDF() + .select(when($"value" > Int.MinValue, $"value") + .otherwise(lit(null).cast("int")).as("key"), + current_timestamp().cast("timestamp").as("ts")) + .withWatermark("ts", "1 minute") + + (inputA, inputB, dfA, dfB) + } + + private def runUnionBranchDropRestart( + buildSources: () => (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame), + buildQuery: (DataFrame, DataFrame) => DataFrame, + outputMode: OutputMode, + nullableToNonNullable: Boolean): Unit = { + withTempDir { checkpointDir => + val checkpointPath = checkpointDir.getAbsolutePath + + val (inputA, inputB, dfA, dfB) = buildSources() + val q = buildQuery(dfA, dfB) + + if (nullableToNonNullable) { + testStream(q, outputMode)( + StartStream(checkpointLocation = checkpointPath), + MultiAddData(inputA, 1, 2, 3)(inputB, 4, 5), + ProcessAllAvailable(), + StopStream + ) + } else { + testStream(q, 
outputMode)( + StartStream(checkpointLocation = checkpointPath), + AddData(inputA, 1, 2, 3), + ProcessAllAvailable(), + StopStream + ) + } + + assertJournaledStateSchemaAllNullable(checkpointPath) + + if (nullableToNonNullable) { + testStream(q, outputMode)( + StartStream(checkpointLocation = checkpointPath), + AddData(inputA, 6), + ProcessAllAvailable() + ) + } else { + testStream(q, outputMode)( + StartStream(checkpointLocation = checkpointPath), + MultiAddData(inputA, 6)(inputB, 7), + ProcessAllAvailable() + ) + } + } + } + + private def assertJournaledStateSchemaAllNullable(checkpointPath: String): Unit = { + val schemaFilePath = new Path(checkpointPath, + s"state/0/${StateStore.PARTITION_ID_TO_CHECK_SCHEMA}/_metadata/schema") + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = CheckpointFileManager.create(schemaFilePath, hadoopConf) + val inStream = fm.open(schemaFilePath) + try { + val schemas = StateSchemaCompatibilityChecker.readSchemaFile(inStream) + assert(schemas.nonEmpty, "expected at least one persisted state column family schema") + schemas.foreach { s => + assertSchemaAllNullable(s.keySchema, s"key schema for col family ${s.colFamilyName}") + assertSchemaAllNullable(s.valueSchema, s"value schema for col family ${s.colFamilyName}") + } + } finally inStream.close() + } + + private def assertSchemaAllNullable(schema: StructType, label: String): Unit = { + schema.fields.foreach { f => + assert(f.nullable, s"$label: field ${f.name} should be nullable") + assertDataTypeAllNullable(f.dataType, s"$label.${f.name}") + } + } + + private def assertDataTypeAllNullable(dataType: DataType, label: String): Unit = dataType match { + case s: StructType => assertSchemaAllNullable(s, label) + case ArrayType(elementType, containsNull) => + assert(containsNull, s"$label: array element should be nullable") + assertDataTypeAllNullable(elementType, s"$label[]") + case MapType(keyType, valueType, valueContainsNull) => + assert(valueContainsNull, s"$label: map 
value should be nullable") + assertDataTypeAllNullable(keyType, s"$label.key") + assertDataTypeAllNullable(valueType, s"$label.value") + case _ => + } + + test("streaming aggregate: non-nullable -> nullable widening remains restart-compatible") { + runUnionBranchDropRestart( + buildSources = () => buildTwoSources(), + buildQuery = (dfA, dfB) => dfA.union(dfB).groupBy($"key").count(), + outputMode = OutputMode.Update(), + nullableToNonNullable = false) + } + + test("streaming aggregate: nullable -> non-nullable narrowing remains restart-compatible") { + runUnionBranchDropRestart( + buildSources = () => buildTwoSources(), + buildQuery = (dfA, dfB) => dfA.union(dfB).groupBy($"key").count(), + outputMode = OutputMode.Update(), + nullableToNonNullable = true) + } + + test("streaming dropDuplicates: non-nullable -> nullable widening remains restart-compatible") { + runUnionBranchDropRestart( + buildSources = () => buildTwoSources(), + buildQuery = (dfA, dfB) => dfA.union(dfB).dropDuplicates(Seq("key")), + outputMode = OutputMode.Append(), + nullableToNonNullable = false) + } + + test("streaming dropDuplicatesWithinWatermark: " + + "non-nullable -> nullable widening remains restart-compatible") { + runUnionBranchDropRestart( + buildSources = () => buildTwoSourcesWithWatermark(), + buildQuery = (dfA, dfB) => dfA.union(dfB).dropDuplicatesWithinWatermark(Seq("key")), + outputMode = OutputMode.Append(), + nullableToNonNullable = false) + } + + test("streaming aggregate (Complete mode): no codegen NPE on state-restored null " + + "struct grouping key after fix") { + import org.apache.spark.sql.functions.struct + + def mkQuery(inNullableK: MemoryStream[Int], inNonNullK: MemoryStream[Int]): DataFrame = { + val dfNullable = inNullableK.toDF() + .select( + when($"value" > 0, struct($"value".as("v"))) + .otherwise(lit(null).cast("struct<v:int>")) + .as("key"), + lit(1).as("metric")) + + val dfNonNull = inNonNullK.toDF() + .select( + struct($"value".as("v")).as("key"), + 
lit(1).as("metric")) + + dfNullable.union(dfNonNull) + .groupBy($"key") + .agg(sum($"metric").as("c")) + .select($"key.v".as("v"), $"c") + } + + withTempDir { checkpointDir => + withSQLConf( + SQLConf.STATE_SCHEMA_CHECK_ENABLED.key -> "false", + SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key -> "false", + SQLConf.SHUFFLE_PARTITIONS.key -> "1") { + val inNullable = MemoryStream[Int] + val inNonNull = MemoryStream[Int] + val q = mkQuery(inNullable, inNonNull) + testStream(q, OutputMode.Complete())( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + AddData(inNullable, 0), + ProcessAllAvailable(), + StopStream + ) + + testStream(q, OutputMode.Complete())( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + AddData(inNonNull, 1), + ProcessAllAvailable() + ) + } + } + } + + test("streaming aggregate: with widening forced off (existing-query path), " + + "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE still triggers on restart") { + withTempDir { checkpointDir => + withSQLConf( + SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "false") { + val (inputA, inputB, dfA, dfB) = buildTwoSources() + val aggregated = dfA.union(dfB).groupBy($"key").count() + testStream(aggregated, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + AddData(inputA, 1, 2, 3), + ProcessAllAvailable(), + StopStream + ) + + inputA.addData(4) + inputB.addData(5) + + val ex = intercept[SparkUnsupportedOperationException] { + testStream(aggregated, OutputMode.Update())( + StartStream(checkpointLocation = checkpointDir.getAbsolutePath), + ProcessAllAvailable() + ) + } + + checkError( + ex, + condition = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE.NULLABILITY_CHANGED", + parameters = Map( + "changedFields" -> ".*", + "storedKeySchema" -> ".*", + "newKeySchema" -> ".*"), + matchPVals = true + ) + } + } + } + + test("rule skips non-stateful nodes whose subtree has no stateful operator") { + import 
org.apache.spark.sql.catalyst.analysis.WidenStatefulOperatorAttributeNullability + import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression} + import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, Project} + import org.apache.spark.sql.types.IntegerType + + withSQLConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true") {

Review Comment: `STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true"` is redundant, since `true` is the default. Either drop the `withSQLConf` wrapper, or change it to `"false"` and assert that the rule is a no-op (which would be a useful additional case).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
