HeartSaVioR commented on a change in pull request #28707:
URL: https://github.com/apache/spark/pull/28707#discussion_r434226399



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
##########
@@ -94,6 +110,28 @@ abstract class StreamingAggregationStateManagerBaseImpl(
     // discard and don't convert values to avoid computation
     store.getRange(None, None).map(_.key)
   }
+
+  override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = {
+    if (checkFormat && SQLConf.get.getConf(
+        SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) {
+      if (schema.fields.length != row.numFields) {

Review comment:
       This method exposes implementation details of UnsafeRow directly. Could we please give UnsafeRow such a check method instead? UnsafeRow itself is aware of data types, so the check method can receive the list of data types and do the assertion on its own.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1545,6 +1545,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_STATE_FORMAT_CHECK_ENABLED =

Review comment:
       This name is misleading; we're only detecting this case in streaming aggregation.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
##########
@@ -77,13 +82,24 @@ object StreamingAggregationStateManager extends Logging {
   }
 }
 
+/**
+ * An exception thrown when an invalid UnsafeRow is detected.
+ */
+class InvalidUnsafeRowException
+  extends SparkException("The UnsafeRow format is invalid. This may happen when using the old " +
+    "version or broken checkpoint file. To resolve this problem, you can try to restart the " +

Review comment:
       I'm not sure I understand the possible root causes and the proposed solutions. The problem arises either because the schema is incompatible (probably due to a change in the query, or in the underlying aggregation function) or because the row is corrupted, and none of the solutions described here can resolve either case.

   "Old version" here is ambiguous, because "version" has another meaning in this context, the state format version, which is not expected to introduce such an incompatible format issue. Have you seen such a case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
