HeartSaVioR commented on code in PR #49751:
URL: https://github.com/apache/spark/pull/49751#discussion_r1944051166


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -150,7 +150,7 @@ case class TransformWithStateExec(
       Some(NoPrefixKeyStateEncoderSpec(keySchema)))
 
     val columnFamilySchemas = getDriverProcessorHandle()
-      .getColumnFamilySchemas(setNullableFields) ++
+      .getColumnFamilySchemas(false) ++

Review Comment:
   Are we going to ignore the param and just pass true/false "differently" 
between Scala and Python? What's the rationale?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -150,7 +150,7 @@ case class TransformWithStateExec(
       Some(NoPrefixKeyStateEncoderSpec(keySchema)))
 
     val columnFamilySchemas = getDriverProcessorHandle()
-      .getColumnFamilySchemas(setNullableFields) ++
+      .getColumnFamilySchemas(false) ++

Review Comment:
   Also nit: use a named parameter for the boolean param.
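
   To illustrate the nit, here is a minimal sketch of the two call-site styles. `NamedBooleanExample` and the string return value are hypothetical stand-ins; only the parameter name `shouldCheckNullable` comes from the diff in this PR.

```scala
// Hypothetical stand-in for the real method; only the call-site style matters here.
object NamedBooleanExample {
  def getColumnFamilySchemas(shouldCheckNullable: Boolean): String =
    if (shouldCheckNullable) "checked" else "unchecked"

  // Positional: the reader has to look up what `false` means.
  val positional: String = getColumnFamilySchemas(false)

  // Named: the intent is visible at the call site.
  val named: String = getColumnFamilySchemas(shouldCheckNullable = false)
}
```

   Both calls behave the same; the named form just documents itself.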



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4669,6 +4669,13 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : {
+    "message" : [
+      "If schema evolution is enabled, all the fields in the schema for column 
family <columnFamilyName> must be nullable.",

Review Comment:
   Shall we be very specific about the error class and the message? 
   
   * Does it only happen when schema evolution is enabled, or is it bound to 
Avro encoding? 
   * Is it only bound to TWS? Note that this will be a breaking change for 
other stateful operators.
   
   This generic error class and message give the confusing impression that the 
restriction applies to all stateful operators and all encoding types.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -117,7 +117,7 @@ case class TransformWithStateInPandasExec(
   override def getColFamilySchemas(
       setNullableFields: Boolean
   ): Map[String, StateStoreColFamilySchema] = {
-    driverProcessorHandle.getColumnFamilySchemas(setNullableFields)
+    driverProcessorHandle.getColumnFamilySchemas(true)

Review Comment:
   nit: use a named param for the boolean so it's easy to see what it means.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1815,7 +1815,6 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
           TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
         withTempDir { checkpointDir =>
           // When Avro is used, we want to set the StructFields to nullable
-          val shouldBeNullable = usingAvroEncoding()

Review Comment:
   nit: please also remove the out-of-sync comment.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4669,6 +4669,13 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_SCHEMA_MUST_BE_NULLABLE" : {
+    "message" : [
+      "If schema evolution is enabled, all the fields in the schema for column 
family <columnFamilyName> must be nullable.",

Review Comment:
   I'd suggest not generalizing the error. I think it's better to scope it to 
TWS. Let's not bind this to a general state store error class.
   
   In the future, we may want to migrate other stateful operators to Avro. For 
those operators, it is really hard for users to ensure nullability 
of the schema, so it's highly likely to be on us to fix this rather than 
disallowing it and forcing users to ensure nullability.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:
##########
@@ -117,7 +117,7 @@ case class TransformWithStateInPandasExec(
   override def getColFamilySchemas(
       setNullableFields: Boolean
   ): Map[String, StateStoreColFamilySchema] = {
-    driverProcessorHandle.getColumnFamilySchemas(setNullableFields)
+    driverProcessorHandle.getColumnFamilySchemas(true)

Review Comment:
   Also, since we do not use the parameter, is the parameter needed at all? Why 
does this method have the param `setNullableFields` while 
DriverStatefulProcessorHandleImpl.getColumnFamilySchemas has the param 
`shouldCheckNullable`?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1815,7 +1815,6 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
           TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
         withTempDir { checkpointDir =>
           // When Avro is used, we want to set the StructFields to nullable
-          val shouldBeNullable = usingAvroEncoding()

Review Comment:
   Btw, this worries me: disallowing non-nullable columns impacts not only 
Avro but UnsafeRow as well. Do we have tests that check we do not 
disallow non-nullable columns for the UnsafeRow encoding?
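
   As a rough sketch of the behavior matrix such a test could pin down: the encoding names and `acceptsSchema` below are hypothetical, and the policy encoded here (only Avro requires nullable fields) is my assumption about the intent, not something the PR confirms.

```scala
object EncodingNullabilityCheck {
  // Hypothetical encoding markers; the real config values may differ.
  sealed trait Encoding
  case object AvroEncoding extends Encoding
  case object UnsafeRowEncoding extends Encoding

  // Assumed policy: only the Avro path rejects non-nullable fields.
  def acceptsSchema(encoding: Encoding, allFieldsNullable: Boolean): Boolean =
    encoding match {
      case AvroEncoding      => allFieldsNullable
      case UnsafeRowEncoding => true
    }
}
```

   A real test would run the query under each encoding and assert that the UnsafeRow case still accepts a non-nullable column.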



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -364,17 +364,20 @@ class DriverStatefulProcessorHandleImpl(timeMode: 
TimeMode, keyExprEnc: Expressi
   }
 
   def getColumnFamilySchemas(
-      setNullableFields: Boolean
+      shouldCheckNullable: Boolean
   ): Map[String, StateStoreColFamilySchema] = {
     val schemas = columnFamilySchemas.toMap
-    if (setNullableFields) {
-      schemas.map { case (colFamilyName, stateStoreColFamilySchema) =>
-        colFamilyName -> stateStoreColFamilySchema.copy(
-          valueSchema = stateStoreColFamilySchema.valueSchema.toNullable
-        )
+    schemas.map { case (colFamilyName, schema) =>
+      // assert that each field is nullable if schema evolution is enabled
+      schema.valueSchema.fields.foreach { field =>
+        if (!field.nullable && shouldCheckNullable) {
+          throw StateStoreErrors.stateStoreSchemaMustBeNullable(
+            schema.colFamilyName, schema.valueSchema.toString())
+        }
       }
-    } else {
-      schemas
+      colFamilyName -> schema.copy(

Review Comment:
   Is it intentional to change columns to be nullable regardless of 
`shouldCheckNullable` param?
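
   To make the question concrete, here is a minimal sketch that keeps the check and the conversion as two separate steps, so each can be controlled independently. `Field`, `ColFamilySchema`, and `NullabilitySketch` are simplified, hypothetical stand-ins for the Spark types, and the exception type is only a placeholder for the real error.

```scala
// Hypothetical, simplified stand-ins for StructField and StateStoreColFamilySchema.
final case class Field(name: String, nullable: Boolean)
final case class ColFamilySchema(colFamilyName: String, valueFields: Seq[Field])

object NullabilitySketch {
  // Validation only: fail on the first non-nullable field, if the check is requested.
  def validate(schema: ColFamilySchema, shouldCheckNullable: Boolean): Unit =
    if (shouldCheckNullable) {
      schema.valueFields.find(!_.nullable).foreach { f =>
        throw new IllegalArgumentException(
          s"Field ${f.name} in column family ${schema.colFamilyName} must be nullable")
      }
    }

  // Conversion only: return a copy with every field marked nullable.
  def toNullable(schema: ColFamilySchema): ColFamilySchema =
    schema.copy(valueFields = schema.valueFields.map(_.copy(nullable = true)))
}
```

   With this split, a caller that passes `shouldCheckNullable = false` does not get its schema silently rewritten unless it also calls `toNullable`.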



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -1470,6 +1470,39 @@ def check_exception(error):
                     check_exception=check_exception,
                 )
 
+    def test_not_nullable_fails(self):

Review Comment:
   Why not have an identical test in Scala as well? I don't see a new test 
verifying the error.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1826,15 +1825,15 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
           val schema0 = StateStoreColFamilySchema(
             "countState", 0,
             keySchema, 0,
-            new StructType().add("value", LongType, nullable = 
shouldBeNullable),

Review Comment:
   nit: Looks like there are also other columns which rely on the 
default value `nullable = true`. Let's be consistent: either remove all explicit 
values or add an explicit nullable value for all columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

