HeartSaVioR commented on a change in pull request #28707:
URL: https://github.com/apache/spark/pull/28707#discussion_r434226399
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala ##########
@@ -94,6 +110,28 @@ abstract class StreamingAggregationStateManagerBaseImpl(
     // discard and don't convert values to avoid computation
     store.getRange(None, None).map(_.key)
   }
+
+  override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = {
+    if (checkFormat && SQLConf.get.getConf(
+        SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) {
+      if (schema.fields.length != row.numFields) {

Review comment:
This method exposes implementation details of UnsafeRow directly. Could we please let UnsafeRow have such a check method instead? UnsafeRow itself is aware of data types, so the check method can receive the list of data types and perform the assertion on its own.

########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ##########
@@ -1545,6 +1545,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_STATE_FORMAT_CHECK_ENABLED =

Review comment:
This name is misleading: we only detect this case for streaming aggregation.

########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala ##########
@@ -77,13 +82,24 @@ object StreamingAggregationStateManager extends Logging {
   }
 }
 
+/**
+ * An exception thrown when an invalid UnsafeRow is detected.
+ */
+class InvalidUnsafeRowException
+  extends SparkException("The UnsafeRow format is invalid. This may happen when using the old " +
+    "version or broken checkpoint file. To resolve this problem, you can try to restart the " +

Review comment:
I'm not sure I understand the possible root causes and the proposed solutions. The problem comes either from an incompatible schema (probably due to a change in the query, or a change in the underlying aggregation function) or from a corrupted row, and none of the solutions described here can resolve either of those.
"Old version" is also ambiguous here, because "version" already has another meaning in this context, the state format version, which is not expected to introduce this kind of incompatible format issue. Have you actually seen such a case?
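The first review comment suggests moving the format check into UnsafeRow itself: the row receives the expected data types and validates its own layout, so callers never touch bitsets or offsets. The Scala sketch below illustrates that shape under loudly stated assumptions: `ToyUnsafeRow`, `FieldType`, `IntField`, `StringField`, `isValidFor` and `of` are hypothetical names, not Spark APIs, and the byte layout is a simplified model of UnsafeRow (a null bitset, one 8-byte slot per field, and `(offset << 32) | size` words for variable-length fields).

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Hypothetical stand-ins for Spark's DataType hierarchy (assumption, not Spark's API).
sealed trait FieldType { def isFixedLength: Boolean }
case object IntField extends FieldType { val isFixedLength = true }
case object StringField extends FieldType { val isFixedLength = false }

// Hypothetical toy row mirroring a simplified UnsafeRow layout: a null bitset
// (one 8-byte word per 64 fields), one 8-byte slot per field, then a
// variable-length region. Variable-length slots hold (offset << 32) | size,
// with the offset measured from the start of the row.
final class ToyUnsafeRow(val numFields: Int, private val bytes: Array[Byte]) {
  private val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  private val bitSetWidth = ((numFields + 63) / 64) * 8
  private val fixedRegionEnd = bitSetWidth + 8 * numFields

  private def isNullAt(i: Int): Boolean =
    (buf.getLong((i >> 6) * 8) & (1L << (i & 63))) != 0

  // The row validates itself against the expected field types, so the caller
  // only supplies the types instead of comparing field counts by hand.
  def isValidFor(types: Seq[FieldType]): Boolean =
    types.length == numFields &&
      bytes.length >= fixedRegionEnd &&
      types.indices.forall { i =>
        isNullAt(i) || types(i).isFixedLength || {
          val slot = buf.getLong(bitSetWidth + 8 * i)
          val offset = (slot >>> 32).toInt // upper 32 bits
          val size = slot.toInt            // lower 32 bits
          offset >= fixedRegionEnd && size >= 0 && offset.toLong + size <= bytes.length
        }
      }
}

object ToyUnsafeRow {
  // Hypothetical helper: encodes an (Int, String) pair in the toy layout,
  // padding the string bytes to a multiple of 8.
  def of(n: Int, s: String): ToyUnsafeRow = {
    val str = s.getBytes(StandardCharsets.UTF_8)
    val fixedRegionEnd = 8 + 8 * 2 // one bitset word + two field slots
    val paddedLength = ((str.length + 7) / 8) * 8
    val bytes = new Array[Byte](fixedRegionEnd + paddedLength)
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(8, n.toLong)                                    // field 0: inline value
    buf.putLong(16, (fixedRegionEnd.toLong << 32) | str.length) // field 1: offset | size
    System.arraycopy(str, 0, bytes, fixedRegionEnd, str.length)
    new ToyUnsafeRow(2, bytes)
  }
}
```

With the check owned by the row, a caller such as `unsafeRowFormatValidation` would only pass the expected types, e.g. `ToyUnsafeRow.of(42, "hi").isValidFor(Seq(IntField, StringField))`, rather than comparing `schema.fields.length` with `row.numFields` itself. Reading the same row as `Seq(StringField, StringField)` fails, because the inline value in slot 0 does not decode to an offset inside the variable-length region.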