cloud-fan commented on code in PR #56061:
URL: https://github.com/apache/spark/pull/56061#discussion_r3289376355


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Shared helpers for the stateful-operator nullability fix. The fix has three
+ * independent components, all gated by
+ * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the
+ * offset log so existing queries keep their pre-fix behavior on restart):
+ *
+ *   - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction
+ *         site in each stateful physical exec.

Review Comment:
   Component (a) is described as applying "at every state-schema construction 
site in each stateful physical exec," but several execs are missing the 
explicit widening:
   
   - `FlatMapGroupsWithStateExec.validateAndMaybeEvolveStateSchema` 
(`FlatMapGroupsWithStateExec.scala` ~L203): `groupingAttributes.toStructType` 
is registered un-widened; and the two `StateStore.get` / 
`mapPartitionsWithStateStore` calls in `doExecute` (~L247-263) open state 
stores with the un-widened key schema.
   - `FlatMapGroupsInPandasWithStateExec` inherits the same base, so it has the 
same gap.
   - `TransformWithStateExec`: `getColFamilySchemas`'s `defaultSchema` 
(~L143-145), `validateAndMaybeEvolveStateSchema` (via 
`validateAndWriteStateSchema` at ~L380), and the `StateStore.get` / 
`mapPartitionsWithStateStore` calls (~L406-417, ~L428-435) all use 
`keyExpressions.toStructType` / `keyEncoder.schema` un-widened.
   - `TransformWithStateInPySparkExec`: same pattern.
   
   Grouping attributes are input-derived and subject to the same nullability 
drift the rest of the fix is preventing. Component (c) may incidentally widen 
the references via the logical-plan rewrite, but having component (a) skip 
these execs makes the defense-in-depth claim of the design false and leaves a 
real gap if (c) misses for any reason (rule excluded, unresolved subplan, 
etc.). Either add `widenStateSchema(...)` at these sites for consistency with 
`StateStoreSaveExec` / `BaseStreamingDeduplicateExec` / 
`StreamingSymmetricHashJoinExec`, or tighten the wording here to describe which 
execs are intentionally exempt and why.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3403,6 +3403,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT =
+    buildConf("spark.sql.streaming.statefulOperator.alwaysNullableOutput.enabled")
+      .internal()
+      .doc("When true, every streaming stateful operator reports its output schema with " +
+        "nullable=true on all columns (including nested struct fields, array elements, and " +
+        "map values), the state schema is widened at every construction site, and the state " +
+        "schema compatibility checker ignores nullability for stateful operator schemas. " +

Review Comment:
   This claim is not accurate: `StateSchemaCompatibilityChecker` is not changed 
by this PR and does not ignore nullability — it still calls 
`DataType.equalsIgnoreNameAndCompatibleNullability` which rejects 
nullable→non-nullable narrowing. What actually happens is that this conf causes 
the schemas passed *to* the checker (both stored and new) to be widened 
beforehand, so the existing strict check trivially passes regardless of input 
nullability. Suggested rewording:
   
   ```suggestion
           "schema is widened at every construction site, so the existing state schema " +
           "compatibility check trivially passes regardless of input nullability. " +
   ```
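
   To make the distinction concrete, here is a minimal sketch using a toy type model. None of these names are Spark's: `DType`, `Field`, `StructT`, `compatible`, and `widen` are hypothetical stand-ins for the real schema classes, the strict checker, and `asNullable`. It only illustrates that a check which rejects nullable-to-non-nullable narrowing still passes once both sides are widened first.

   ```scala
   // Toy model only: hypothetical stand-ins, not Spark's DataType hierarchy.
   object StrictCheckSketch {
     sealed trait DType
     case object IntT extends DType
     final case class Field(name: String, dataType: DType, nullable: Boolean)
     final case class StructT(fields: List[Field]) extends DType

     // Strict rule (names ignored): a stored field is compatible with a new
     // field only if the new field is nullable or the stored one is non-nullable.
     def compatible(stored: DType, next: DType): Boolean = (stored, next) match {
       case (StructT(a), StructT(b)) =>
         a.length == b.length && a.zip(b).forall { case (x, y) =>
           (y.nullable || !x.nullable) && compatible(x.dataType, y.dataType)
         }
       case (x, y) => x == y
     }

     // Deep widening: every field at every nesting depth becomes nullable.
     def widen(t: DType): DType = t match {
       case StructT(fs) =>
         StructT(fs.map(f => f.copy(dataType = widen(f.dataType), nullable = true)))
       case other => other
     }

     // Stored schema has a nullable key; the new schema narrows it.
     val storedSchema: DType = StructT(List(Field("key", IntT, nullable = true)))
     val newSchema: DType = StructT(List(Field("key", IntT, nullable = false)))
   }
   ```

   In this model, `compatible(storedSchema, newSchema)` is rejected, while `compatible(widen(storedSchema), widen(newSchema))` passes without the check itself changing.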



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala:
##########
@@ -0,0 +1,129 @@
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Shared helpers for the stateful-operator nullability fix. The fix has three
+ * independent components, all gated by
+ * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the
+ * offset log so existing queries keep their pre-fix behavior on restart):
+ *
+ *   - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction
+ *         site in each stateful physical exec.
+ *   - (b) `widenOutputForStatefulOp`: a per-op `output` override on every stateful logical
+ *         and physical operator, used by the operator's `output` definition.
+ *   - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in this file): a
+ *         custom optimizer rule that widens `AttributeReference`s inside stateful ops'
+ *         internal expressions and propagates upward to ancestor expressions.
+ */
+object WidenStatefulOpNullability {
+
+  def isEnabled: Boolean =
+    SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT)
+
+  /**
+   * Recursively widens an attribute to be fully nullable: outer `nullable = true` plus
+   * every nested `StructField.nullable`, `ArrayType.containsNull`, and
+   * `MapType.valueContainsNull` flipped to `true` via
+   * [[org.apache.spark.sql.types.DataType#asNullable]].
+   */
+  def deepWidenAttribute(a: Attribute): Attribute = a match {
+    case ref: AttributeReference =>
+      AttributeReference(
+        ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)(
+        ref.exprId, ref.qualifier)
+    case other => other.withNullability(true)
+  }
+
+  /**
+   * Component (a): widens a state schema to fully nullable. Stateful physical execs apply
+   * this at every `validateAndMaybeEvolveStateSchema(...)` call site and every
+   * `mapPartitionsWith*StateStore(...)` call site. When the conf is off, returns the
+   * schema unchanged.
+   */
+  def widenStateSchema(schema: StructType): StructType =
+    if (isEnabled) schema.asNullable else schema
+
+  /**
+   * Component (b): wraps a stateful operator's `output` to be fully nullable. The caller
+   * is responsible for only calling this from within an `output` definition on a stateful
+   * operator; gating is handled here via [[isEnabled]].
+   */
+  def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] =
+    if (isEnabled) base.map(deepWidenAttribute) else base
+}
+
+/**
+ * Component (c) of the stateful-operator nullability fix: a custom optimizer rule that
+ * widens `AttributeReference`s inside streaming-stateful operators' internal expressions
+ * and propagates the widening upward to ancestor operators' expressions.
+ *
+ * The rule does NOT introduce any new logical or physical node. It is purely an
+ * attribute-rewrite pass:
+ *
+ *   1. At a stateful operator: rewrite every `AttributeReference` inside the operator's
+ *      internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever
+ *      the attribute's `exprId` matches one in the operator's own (already widened via
+ *      component (b)) `output`.
+ *
+ *   2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is
+ *      in `children.flatMap(_.output)` (already widened thanks to component (b)).
+ *
+ * '''Scope.''' The walk only fires on nodes whose subtree contains a stateful operator.
+ *
+ * '''Ordering constraint.''' This rule must run AFTER every `UpdateAttributeNullability`
+ * invocation in both the main optimizer and AQE.
+ *
+ * '''Idempotence.''' [[WidenStatefulOpNullability#deepWidenAttribute]] is idempotent.
+ */
+object WidenStatefulOperatorAttributeNullability extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT) ||
+        !plan.containsStatefulOperator) {
+      return plan
+    }
+    plan.resolveOperatorsUp {
+      case p if !p.resolved => p
+      case p: LeafNode => p
+      case p if !p.containsStatefulOperator => p
+      case p =>
+        val widenableExprIds: Set[ExprId] = (p.output ++ p.children.flatMap(_.output))

Review Comment:
   Because `widenableExprIds` always pulls from both `p.output` and *all* of 
`p.children.flatMap(_.output)`, when an operator has a mix of stateful and 
non-stateful children (e.g. a non-stream-stream `Join` above a streaming 
aggregate on one side and a batch source on the other), references to the 
non-stateful sibling's attributes are also deep-widened. The docstring above 
implies this happens only against attributes "already widened thanks to 
component (b)" — but the non-stateful sibling's attributes are not. The 
widening is always correctness-safe (nullable is a valid weakening), so this is 
a docs / over-widening concern, not a bug. Worth either restricting to children 
whose subtrees contain a stateful operator 
(`p.children.filter(_.containsStatefulOperator).flatMap(_.output)`), or 
acknowledging the over-widening in the comment so future readers don't expect 
the narrower behavior the docstring promises.
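
   As a rough illustration of the difference between the two exprId sources, here is a sketch on a toy plan tree. `Node`, `allChildIds`, and `statefulChildIds` are hypothetical names, and integers stand in for `ExprId`s; this is not Spark's `LogicalPlan`.

   ```scala
   // Toy model only: hypothetical stand-in for a logical plan node.
   object WidenableIdsSketch {
     final case class Node(
         name: String,
         output: Set[Int],          // exprIds this node produces
         stateful: Boolean = false,
         children: List[Node] = Nil) {
       def containsStatefulOperator: Boolean =
         stateful || children.exists(_.containsStatefulOperator)
     }

     // Mirrors the current union: own output plus the output of every child.
     def allChildIds(p: Node): Set[Int] =
       p.output ++ p.children.flatMap(_.output)

     // Narrower alternative: only children whose subtree holds a stateful operator.
     def statefulChildIds(p: Node): Set[Int] =
       p.output ++ p.children.filter(_.containsStatefulOperator).flatMap(_.output)

     val streamingAgg = Node("agg", Set(1, 2), stateful = true)
     val batchSource = Node("batch", Set(3))
     // A semi-join-like parent: its own output exposes only the left side.
     val semiJoin = Node("semiJoin", Set(1, 2), children = List(streamingAgg, batchSource))
   }
   ```

   Here `allChildIds(semiJoin)` includes the batch side's id `3`, while `statefulChildIds(semiJoin)` does not. The sketch deliberately uses a parent whose own output excludes the batch side; for a parent whose `output` already contains the sibling's attributes, the `p.output` term would still contribute them.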



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala:
##########
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability

Review Comment:
   Import out of order — 
`org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability` should sit 
alongside the other `org.apache.spark.sql.catalyst.*` imports near the top of 
the `org.apache.spark.*` block, not after `org.apache.spark.sql.streaming._`. 
Same issue in `TransformWithStateInPySparkExec.scala`, 
`FlatMapGroupsWithStateExec.scala`, `FlatMapGroupsInPandasWithStateExec.scala`, 
`StreamingSymmetricHashJoinExec.scala`, and `streamingLimits.scala`.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########
@@ -307,174 +443,48 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema,
       ignoreValueSchema = false)
   }
+}
 
-  private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
-    applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
-  }
+class StateSchemaCompatibilityCheckerWithNullabilityWideningDisabledSuite
+    extends StateSchemaCompatibilityCheckerTestMixin {
 
-  private def applyNewSchemaToNestedFieldInValue(newNestedSchema: StructType): StructType = {
-    applyNewSchemaToNestedField(valueSchema, newNestedSchema, "value3")
+  override protected def sparkConf: org.apache.spark.SparkConf = {
+    super.sparkConf.set(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key, "false")
   }

Review Comment:
   The only thing this suite changes vs the parent is setting 
`STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT=false`. But 
`StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema` doesn't 
read that conf — the schemas it receives are exactly what the test passes in, 
and no production widening helper is invoked from these tests. The four 
`storing nullable column into non-nullable column ...` tests and the two 
`changing the name of nested field ...` tests therefore pass identically with 
the conf at either value. The conf separation is cosmetic.
   
   Two follow-ups:
   1. The `changing the name of nested field ...` pair is unrelated to 
nullability — there's no reason for these to live in a 
"NullabilityWideningDisabled" suite. Move them back to the main suite.
   2. For the four nullability tests, the intent appears to be "these are no 
longer reachable in production with widening on," but the unit tests don't 
simulate that path — they exercise the checker in isolation. Either consolidate 
them back into the main suite (they still validate the unchanged checker 
behavior, which is worth keeping), or rework the setup so the conf actually has 
an observable effect (e.g. call through the production widening helpers in the 
test).
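
   A minimal sketch of why the flag is unobservable in the isolated setup, and what routing through a helper changes. Everything here is a hypothetical toy: `check` stands in for the unchanged strict checker, and `widenStateSchema` takes the flag as an explicit parameter rather than reading a conf.

   ```scala
   // Toy model only: hypothetical stand-ins, not the Spark test suite or checker.
   object ConfGatedWideningSketch {
     final case class Field(name: String, nullable: Boolean)
     type Schema = List[Field]

     // Strict checker stand-in: it never looks at the flag.
     def check(stored: Schema, next: Schema): Boolean =
       stored.length == next.length &&
         stored.zip(next).forall { case (s, n) => n.nullable || !s.nullable }

     // Conf-gated helper stand-in: widens only when the flag is on.
     def widenStateSchema(schema: Schema, enabled: Boolean): Schema =
       if (enabled) schema.map(_.copy(nullable = true)) else schema

     // Path A: checker in isolation. The flag is accepted but cannot matter.
     def isolated(stored: Schema, next: Schema, enabled: Boolean): Boolean =
       check(stored, next)

     // Path B: schemas go through the helper before reaching the checker.
     def viaHelper(stored: Schema, next: Schema, enabled: Boolean): Boolean =
       check(widenStateSchema(stored, enabled), widenStateSchema(next, enabled))

     val stored: Schema = List(Field("key", nullable = true))
     val narrowed: Schema = List(Field("key", nullable = false))
   }
   ```

   In this model, `isolated` rejects the narrowed schema at either flag value, whereas `viaHelper` accepts it with the flag on and rejects it with the flag off, which is the kind of observable difference a conf-specific suite would need.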



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WidenStatefulOperatorAttributeNullability.scala:
##########
@@ -0,0 +1,129 @@
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Shared helpers for the stateful-operator nullability fix. The fix has three
+ * independent components, all gated by
+ * [[SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT]] (pinned per-query via the
+ * offset log so existing queries keep their pre-fix behavior on restart):
+ *
+ *   - (a) `widenStateSchema`: explicit `asNullable` at every state-schema construction
+ *         site in each stateful physical exec.
+ *   - (b) `widenOutputForStatefulOp`: a per-op `output` override on every stateful logical
+ *         and physical operator, used by the operator's `output` definition.
+ *   - (c) [[WidenStatefulOperatorAttributeNullability]] (defined below in this file): a
+ *         custom optimizer rule that widens `AttributeReference`s inside stateful ops'
+ *         internal expressions and propagates upward to ancestor expressions.
+ */
+object WidenStatefulOpNullability {
+
+  def isEnabled: Boolean =
+    SQLConf.get.getConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT)
+
+  /**
+   * Recursively widens an attribute to be fully nullable: outer `nullable = true` plus
+   * every nested `StructField.nullable`, `ArrayType.containsNull`, and
+   * `MapType.valueContainsNull` flipped to `true` via
+   * [[org.apache.spark.sql.types.DataType#asNullable]].
+   */
+  def deepWidenAttribute(a: Attribute): Attribute = a match {
+    case ref: AttributeReference =>
+      AttributeReference(
+        ref.name, ref.dataType.asNullable, nullable = true, ref.metadata)(
+        ref.exprId, ref.qualifier)
+    case other => other.withNullability(true)
+  }
+
+  /**
+   * Component (a): widens a state schema to fully nullable. Stateful physical execs apply
+   * this at every `validateAndMaybeEvolveStateSchema(...)` call site and every
+   * `mapPartitionsWith*StateStore(...)` call site. When the conf is off, returns the
+   * schema unchanged.
+   */
+  def widenStateSchema(schema: StructType): StructType =
+    if (isEnabled) schema.asNullable else schema
+
+  /**
+   * Component (b): wraps a stateful operator's `output` to be fully nullable. The caller
+   * is responsible for only calling this from within an `output` definition on a stateful
+   * operator; gating is handled here via [[isEnabled]].
+   */
+  def widenOutputForStatefulOp(base: Seq[Attribute]): Seq[Attribute] =
+    if (isEnabled) base.map(deepWidenAttribute) else base
+}
+
+/**
+ * Component (c) of the stateful-operator nullability fix: a custom optimizer rule that
+ * widens `AttributeReference`s inside streaming-stateful operators' internal expressions
+ * and propagates the widening upward to ancestor operators' expressions.
+ *
+ * The rule does NOT introduce any new logical or physical node. It is purely an
+ * attribute-rewrite pass:
+ *
+ *   1. At a stateful operator: rewrite every `AttributeReference` inside the operator's
+ *      internal expressions via [[WidenStatefulOpNullability#deepWidenAttribute]] whenever
+ *      the attribute's `exprId` matches one in the operator's own (already widened via
+ *      component (b)) `output`.
+ *
+ *   2. At non-stateful ancestor operators: rewrite `AttributeReference`s whose `exprId` is
+ *      in `children.flatMap(_.output)` (already widened thanks to component (b)).

Review Comment:
   These two bullets describe a split: (1) "at a stateful operator" matches 
against "the operator's own ... `output`", and (2) "at non-stateful ancestor 
operators" matches against `children.flatMap(_.output)`. But the implementation 
below uses the same union `(p.output ++ p.children.flatMap(_.output))` for 
every node it visits, with no branch on `isStateful`. Either rewrite this 
section to describe the actual uniform behavior, or change the code to take 
different exprId sources for the two cases (the more conservative version would 
also help with the over-widening concern raised in the comment on 
`widenableExprIds`).



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStatefulOperatorNullabilityDriftSuite.scala:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.analysis.WidenStatefulOpNullability
+import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+
+/**
+ * Regression suite for stateful-operator nullability drift.
+ *
+ * Driver: `PropagateEmptyRelation` drops empty `Union` branches without a streaming
+ * guard, so the surviving branch's per-column nullability becomes the Union's
+ * nullability and propagates into a stateful operator above -- across microbatches or
+ * restarts.
+ *
+ * Coverage:
+ *   - New-query (default conf): originally-failing scenarios now complete cleanly.
+ *   - Existing-query (conf forced false): pre-fix behavior preserved verbatim.
+ *   - Helper invariant: `WidenStatefulOpNullability.deepWidenAttribute` recurses into
+ *     nested types.
+ */
+class StreamingStatefulOperatorNullabilityDriftSuite extends StreamTest {
+
+  import testImplicits._
+
+  private def buildTwoSources(): (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame) = {
+    val inputA = MemoryStream[Int]
+    val inputB = MemoryStream[Int]
+
+    val dfA = inputA.toDF().select($"value".as("key"))
+    val dfB = inputB.toDF()
+      .select(when($"value" > Int.MinValue, $"value")
+        .otherwise(lit(null).cast("int"))
+        .as("key"))
+
+    (inputA, inputB, dfA, dfB)
+  }
+
+  private def buildTwoSourcesWithWatermark()
+      : (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame) = {
+    val inputA = MemoryStream[Int]
+    val inputB = MemoryStream[Int]
+
+    val dfA = inputA.toDF()
+      .select($"value".as("key"),
+        current_timestamp().cast("timestamp").as("ts"))
+      .withWatermark("ts", "1 minute")
+    val dfB = inputB.toDF()
+      .select(when($"value" > Int.MinValue, $"value")
+        .otherwise(lit(null).cast("int")).as("key"),
+        current_timestamp().cast("timestamp").as("ts"))
+      .withWatermark("ts", "1 minute")
+
+    (inputA, inputB, dfA, dfB)
+  }
+
+  private def runUnionBranchDropRestart(
+      buildSources: () => (MemoryStream[Int], MemoryStream[Int], DataFrame, DataFrame),
+      buildQuery: (DataFrame, DataFrame) => DataFrame,
+      outputMode: OutputMode,
+      nullableToNonNullable: Boolean): Unit = {
+    withTempDir { checkpointDir =>
+      val checkpointPath = checkpointDir.getAbsolutePath
+
+      val (inputA, inputB, dfA, dfB) = buildSources()
+      val q = buildQuery(dfA, dfB)
+
+      if (nullableToNonNullable) {
+        testStream(q, outputMode)(
+          StartStream(checkpointLocation = checkpointPath),
+          MultiAddData(inputA, 1, 2, 3)(inputB, 4, 5),
+          ProcessAllAvailable(),
+          StopStream
+        )
+      } else {
+        testStream(q, outputMode)(
+          StartStream(checkpointLocation = checkpointPath),
+          AddData(inputA, 1, 2, 3),
+          ProcessAllAvailable(),
+          StopStream
+        )
+      }
+
+      assertJournaledStateSchemaAllNullable(checkpointPath)
+
+      if (nullableToNonNullable) {
+        testStream(q, outputMode)(
+          StartStream(checkpointLocation = checkpointPath),
+          AddData(inputA, 6),
+          ProcessAllAvailable()
+        )
+      } else {
+        testStream(q, outputMode)(
+          StartStream(checkpointLocation = checkpointPath),
+          MultiAddData(inputA, 6)(inputB, 7),
+          ProcessAllAvailable()
+        )
+      }
+    }
+  }
+
+  private def assertJournaledStateSchemaAllNullable(checkpointPath: String): Unit = {
+    val schemaFilePath = new Path(checkpointPath,
+      s"state/0/${StateStore.PARTITION_ID_TO_CHECK_SCHEMA}/_metadata/schema")
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fm = CheckpointFileManager.create(schemaFilePath, hadoopConf)
+    val inStream = fm.open(schemaFilePath)
+    try {
+      val schemas = StateSchemaCompatibilityChecker.readSchemaFile(inStream)
+      assert(schemas.nonEmpty, "expected at least one persisted state column family schema")
+      schemas.foreach { s =>
+        assertSchemaAllNullable(s.keySchema, s"key schema for col family ${s.colFamilyName}")
+        assertSchemaAllNullable(s.valueSchema, s"value schema for col family ${s.colFamilyName}")
+      }
+    } finally inStream.close()
+  }
+
+  private def assertSchemaAllNullable(schema: StructType, label: String): Unit = {
+    schema.fields.foreach { f =>
+      assert(f.nullable, s"$label: field ${f.name} should be nullable")
+      assertDataTypeAllNullable(f.dataType, s"$label.${f.name}")
+    }
+  }
+
+  private def assertDataTypeAllNullable(dataType: DataType, label: String): Unit = dataType match {
+    case s: StructType => assertSchemaAllNullable(s, label)
+    case ArrayType(elementType, containsNull) =>
+      assert(containsNull, s"$label: array element should be nullable")
+      assertDataTypeAllNullable(elementType, s"$label[]")
+    case MapType(keyType, valueType, valueContainsNull) =>
+      assert(valueContainsNull, s"$label: map value should be nullable")
+      assertDataTypeAllNullable(keyType, s"$label.key")
+      assertDataTypeAllNullable(valueType, s"$label.value")
+    case _ =>
+  }
+
+  test("streaming aggregate: non-nullable -> nullable widening remains restart-compatible") {
+    runUnionBranchDropRestart(
+      buildSources = () => buildTwoSources(),
+      buildQuery = (dfA, dfB) => dfA.union(dfB).groupBy($"key").count(),
+      outputMode = OutputMode.Update(),
+      nullableToNonNullable = false)
+  }
+
+  test("streaming aggregate: nullable -> non-nullable narrowing remains restart-compatible") {
+    runUnionBranchDropRestart(
+      buildSources = () => buildTwoSources(),
+      buildQuery = (dfA, dfB) => dfA.union(dfB).groupBy($"key").count(),
+      outputMode = OutputMode.Update(),
+      nullableToNonNullable = true)
+  }
+
+  test("streaming dropDuplicates: non-nullable -> nullable widening remains restart-compatible") {
+    runUnionBranchDropRestart(
+      buildSources = () => buildTwoSources(),
+      buildQuery = (dfA, dfB) => dfA.union(dfB).dropDuplicates(Seq("key")),
+      outputMode = OutputMode.Append(),
+      nullableToNonNullable = false)
+  }
+
+  test("streaming dropDuplicatesWithinWatermark: " +
+    "non-nullable -> nullable widening remains restart-compatible") {
+    runUnionBranchDropRestart(
+      buildSources = () => buildTwoSourcesWithWatermark(),
+      buildQuery = (dfA, dfB) => dfA.union(dfB).dropDuplicatesWithinWatermark(Seq("key")),
+      outputMode = OutputMode.Append(),
+      nullableToNonNullable = false)
+  }
+
+  test("streaming aggregate (Complete mode): no codegen NPE on state-restored null " +
+    "struct grouping key after fix") {
+    import org.apache.spark.sql.functions.struct
+
+    def mkQuery(inNullableK: MemoryStream[Int], inNonNullK: MemoryStream[Int]): DataFrame = {
+      val dfNullable = inNullableK.toDF()
+        .select(
+          when($"value" > 0, struct($"value".as("v")))
+            .otherwise(lit(null).cast("struct<v:int>"))
+            .as("key"),
+          lit(1).as("metric"))
+
+      val dfNonNull = inNonNullK.toDF()
+        .select(
+          struct($"value".as("v")).as("key"),
+          lit(1).as("metric"))
+
+      dfNullable.union(dfNonNull)
+        .groupBy($"key")
+        .agg(sum($"metric").as("c"))
+        .select($"key.v".as("v"), $"c")
+    }
+
+    withTempDir { checkpointDir =>
+      withSQLConf(
+        SQLConf.STATE_SCHEMA_CHECK_ENABLED.key -> "false",
+        SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED.key -> "false",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+        val inNullable = MemoryStream[Int]
+        val inNonNull = MemoryStream[Int]
+        val q = mkQuery(inNullable, inNonNull)
+        testStream(q, OutputMode.Complete())(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inNullable, 0),
+          ProcessAllAvailable(),
+          StopStream
+        )
+
+        testStream(q, OutputMode.Complete())(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inNonNull, 1),
+          ProcessAllAvailable()
+        )
+      }
+    }
+  }
+
+  test("streaming aggregate: with widening forced off (existing-query path), " +
+    "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE still triggers on restart") {
+    withTempDir { checkpointDir =>
+      withSQLConf(
+        SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "false") {
+        val (inputA, inputB, dfA, dfB) = buildTwoSources()
+        val aggregated = dfA.union(dfB).groupBy($"key").count()
+        testStream(aggregated, OutputMode.Update())(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputA, 1, 2, 3),
+          ProcessAllAvailable(),
+          StopStream
+        )
+
+        inputA.addData(4)
+        inputB.addData(5)
+
+        val ex = intercept[SparkUnsupportedOperationException] {
+          testStream(aggregated, OutputMode.Update())(
+            StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+            ProcessAllAvailable()
+          )
+        }
+
+        checkError(
+          ex,
+          condition = "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE.NULLABILITY_CHANGED",
+          parameters = Map(
+            "changedFields" -> ".*",
+            "storedKeySchema" -> ".*",
+            "newKeySchema" -> ".*"),
+          matchPVals = true
+        )
+      }
+    }
+  }
+
+  test("rule skips non-stateful nodes whose subtree has no stateful operator") {
+    import org.apache.spark.sql.catalyst.analysis.WidenStatefulOperatorAttributeNullability
+    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
+    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, Project}
+    import org.apache.spark.sql.types.IntegerType
+
+    withSQLConf(SQLConf.STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true") {

Review Comment:
   `STATEFUL_OPERATOR_ALWAYS_NULLABLE_OUTPUT.key -> "true"` is redundant — 
`true` is the default. Either drop the `withSQLConf` wrapper, or change to 
`"false"` and assert the rule no-ops (which would be a useful additional case).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

