HeartSaVioR commented on code in PR #49751:
URL: https://github.com/apache/spark/pull/49751#discussion_r1945987644
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -363,18 +363,36 @@ class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: Expressi
     addTimerColFamily()
   }
+  /**
+   * This method returns all column family schemas, and checks and enforces nullability
+   * if need be. The nullability check and set is only set to true when Avro is enabled.
+   * @param shouldCheckNullable Whether we need to check the nullability. This is set to
+   *                            true when using Python, as this is the only avenue through
+   *                            which users can set nullability
+   * @param shouldSetNullable Whether we need to set the fields as nullable. This is set to
+   *                          true when using Scala, as case classes are set to non-nullable by default.
Review Comment:
> case classes are set to non-nullable by default.
I'm actually surprised, and it sounds like a bug to me. (Sorry, you had to
handle Python and Scala differently due to this. My bad.)
What if you set `null` on any of the fields in a case class? Will it work, and
if it does, how?
If this is indeed a bug and we can fix it, then we can simplify things a lot.
I'm OK if you want to defer this, but we definitely need a follow-up ticket
for it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -149,8 +149,11 @@ case class TransformWithStateExec(
       0, keyExpressions.toStructType, 0, DUMMY_VALUE_ROW_SCHEMA,
       Some(NoPrefixKeyStateEncoderSpec(keySchema)))
+    // For Scala, the user can't explicitly set nullability on schema, so there is
Review Comment:
As I mentioned in the other comment, it is not impossible to set nullability
on the encoder (although I tend to agree most users won't). Let's not make
this conditional.
Also, this concerns me: if we are very confident that users would never be
able to set a column to be nullable, why do we need to change the schema,
given that we all know it has to be nullable? What are we worried about if we
just do the same as we do for Python?
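As a concrete illustration of the point that nullability can be set on an
encoder, here is a minimal sketch (assuming Spark 3.5+, where `Encoders.row`
is available; the schema and field names are made up):
```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// A user-supplied Row encoder where nullability is explicit rather than
// derived from a case class. Both fields are deliberately non-nullable here.
val explicitSchema = StructType(Seq(
  StructField("count", LongType, nullable = false),
  StructField("name", StringType, nullable = false)))
val rowEncoder = Encoders.row(explicitSchema)
```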
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala:
##########
@@ -145,6 +145,12 @@ object StateStoreErrors {
     new StateStoreValueSchemaNotCompatible(storedValueSchema, newValueSchema)
   }
+  def twsSchemaMustBeNullable(
Review Comment:
I think TWS deserves its own error collection class, but I agree this is out
of scope. Let's make a follow-up.
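For that follow-up, a dedicated collection could look something like the
sketch below. The object and helper names are hypothetical; only the error
class name comes from this PR, and the parameter names are guesses since the
diff is truncated. It assumes the `SparkUnsupportedOperationException(errorClass,
messageParameters)` constructor pattern used elsewhere in StateStoreErrors.scala:
```scala
import org.apache.spark.SparkUnsupportedOperationException

// Hypothetical follow-up: group transformWithState (TWS) errors in their own
// collection object, mirroring the structure of StateStoreErrors.
object TransformWithStateErrors {
  def schemaMustBeNullable(columnFamilyName: String, schema: String): Throwable = {
    new SparkUnsupportedOperationException(
      errorClass = "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE",
      messageParameters = Map(
        "columnFamilyName" -> columnFamilyName,
        "schema" -> schema))
  }
}
```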
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5072,6 +5072,14 @@
     ],
     "sqlState" : "42601"
   },
+  "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : {
+    "message" : [
+      "If schema evolution is enabled, all the fields in the schema for column family <columnFamilyName> must be nullable",
Review Comment:
Which do we think is easier to understand: "using Avro" or "schema evolution
is enabled"?
I foresee us moving toward using Avro for all stateful operators (unless
there is an outstanding regression), and once we make Avro the default, this
message will be confusing to consume, because users won't have done anything
for schema evolution. IMO it is "indirect" information, and they would
probably try to figure out how to disable schema evolution instead, without
knowing that Avro and schema evolution are coupled.
cc. @anishshri-db to hear his voice.
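For context on the coupling mentioned above, here is a minimal sketch of how
a user opts into Avro state encoding (which is what implicitly enables schema
evolution). Treat the exact conf key and value as assumptions about this
branch rather than settled API:
```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the user only flips the state store encoding format to
// Avro; schema evolution comes along implicitly, which is why an error
// message phrased around "schema evolution" can be confusing to them.
val spark = SparkSession.builder()
  .appName("tws-avro")
  .master("local[*]")
  .config("spark.sql.streaming.stateStore.encodingFormat", "avro")
  .getOrCreate()
```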
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]