[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-05 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639873447


   Sorry, my comment was edited, so you may have missed the content, but it also comes back to the "pinpointing" concern - do you think your approach works with other state store providers as well? The root cause isn't bound to any particular state store provider implementation, but this patch only addresses the HDFS state store provider.
   
   I guess you're trying to find a way to run the validation less frequently - only the first time the state is loaded from the file - which is optimal. While I think that could even be done without binding to a state store provider implementation if we really need it, have we measured the actual overhead? If the overhead turns out to be trivial, then it won't matter that we run the validation check for each batch. That sounds sub-optimal, but the overhead would be trivial.
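   
   (A minimal sketch of how that overhead could be measured - my assumption, not part of this patch; `validate` stands in for whatever validation util we end up with:)
   
   ```
   def timeValidation(validate: () => Unit, iterations: Int = 10000): Double = {
     // warm up first so JIT compilation doesn't dominate the measurement
     (1 to iterations).foreach(_ => validate())
     val start = System.nanoTime()
     (1 to iterations).foreach(_ => validate())
     // average milliseconds per validation call, to compare against batch durations
     (System.nanoTime() - start).toDouble / iterations / 1e6
   }
   ```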




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-04 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639239790


   My alternative, wrapping the state store, is something like below:
   
   ```
   class RowValidatingStateStore(
       underlying: StateStore,
       keySchema: StructType,
       valueSchema: StructType) extends StateStore {
     private var isValidated = false

     override def get(key: UnsafeRow): UnsafeRow = {
       val value = underlying.get(key)
       if (!isValidated) {
         validateRow(value)
         isValidated = true
       }
       value
     }

     override def id: StateStoreId = underlying.id
     override def version: Long = underlying.version
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = underlying.put(key, value)
     override def remove(key: UnsafeRow): Unit = underlying.remove(key)
     override def commit(): Long = underlying.commit()
     override def abort(): Unit = underlying.abort()
     override def iterator(): Iterator[UnsafeRowPair] = underlying.iterator()
     override def metrics: StateStoreMetrics = underlying.metrics
     override def hasCommitted: Boolean = underlying.hasCommitted

     private def validateRow(row: UnsafeRow): Unit = {
       // TODO: call util method with row and schema to validate
     }
   }

   def get(...): StateStore = {
     require(version >= 0)
     val storeProvider = loadedProviders.synchronized {
       ...
     }
     // TODO: add an if statement to decide whether to wrap the state store or not
     new RowValidatingStateStore(storeProvider.getStore(version), keySchema, valueSchema)
   }
   ```
   
   The example code only checks the get operation, which is insufficient to validate the "key" row in the state. That said, the iterator approach still provides more opportunity for validation, though the validation of the UnsafeRow itself doesn't have enough coverage to catch the various incompatibility issues (we definitely need other guards as well), so it's somewhat OK to only cover the value side.
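   
   If we did want key coverage without per-row cost, the same wrapper could hook iterator() as well - a rough sketch, reusing the hypothetical validateRow from the TODO above:
   
   ```
   // Sketch only: validate both key and value of the first pair the iterator
   // yields, which covers the key side that get() alone cannot see.
   override def iterator(): Iterator[UnsafeRowPair] = {
     val iter = underlying.iterator()
     new Iterator[UnsafeRowPair] {
       override def hasNext: Boolean = iter.hasNext
       override def next(): UnsafeRowPair = {
         val pair = iter.next()
         if (!isValidated) {
           validateRow(pair.key)
           validateRow(pair.value)
           isValidated = true
         }
         pair
       }
     }
   }
   ```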




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-04 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639200645


   > @HeartSaVioR After taking a further look. Instead of dealing with the 
iterator, how about adding the invalidation for all state store operations in 
StateStoreProvider? Since we can get the key/value row during load map. WDYT?
   
   It would be nice to see the proposed change as code, to avoid misunderstanding, like I did in my previous comment (anything is OK - a commit in your fork or a text comment). I'll try out my alternative (wrapping the StateStore) and show the code change. Thanks!




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-03 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-638644781


   > What are other stateful operations that use unsafe row? I think we can 
apply the check everywhere.
   
   The state store itself stores UnsafeRow, hence the check applies everywhere in stateful operations. I'd propose doing it like https://github.com/apache/spark/pull/28707#issuecomment-637926400 instead of fixing every place individually.




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-03 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-638522162


   And I think SPARK-27237 doesn't require the sort of "future-proofing" that carries risk - it doesn't touch the existing parts of the checkpoint and simply puts the schema information into a new file. If we find a better way to pack the schema information into the checkpoint, we can simply discard/ignore the file or craft logic to migrate smoothly. There's no risk in rolling back in the future.
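   
   (For illustration only, a rough sketch of what "put the schema information into a new file" could look like - the file name and layout here are my assumptions, not what SPARK-27237 actually does:)
   
   ```
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path
   import org.apache.spark.sql.types.{DataType, StructType}

   object StateSchemaFile {
     // Writes the schema into a new file; nothing existing in the checkpoint is touched.
     def write(
         checkpointDir: String,
         keySchema: StructType,
         valueSchema: StructType,
         hadoopConf: Configuration): Unit = {
       val path = new Path(checkpointDir, "_state_schema")  // hypothetical file name
       val fs = path.getFileSystem(hadoopConf)
       val out = fs.create(path, true)
       try out.write(s"${keySchema.json}\n${valueSchema.json}".getBytes("UTF-8"))
       finally out.close()
     }

     // Absence of the file just means an old checkpoint - safe to ignore, which is
     // why rolling back (or discarding the file later) carries no risk.
     def read(
         checkpointDir: String,
         hadoopConf: Configuration): Option[(StructType, StructType)] = {
       val path = new Path(checkpointDir, "_state_schema")
       val fs = path.getFileSystem(hadoopConf)
       if (!fs.exists(path)) return None
       val in = fs.open(path)
       try {
         val lines = scala.io.Source.fromInputStream(in).getLines().toList
         Some((DataType.fromJson(lines(0)).asInstanceOf[StructType],
           DataType.fromJson(lines(1)).asInstanceOf[StructType]))
       } finally in.close()
     }
   }
   ```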




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-03 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-638520229


   Will this be included in Spark 3.0.0? If this is meant to unblock SPARK-28067 for inclusion in Spark 3.0.0, then it's OK to consider this first, but if it's planned for Spark 3.1, then I'm not sure about the priority - are you all aware that the PR for SPARK-27237 was submitted more than a year ago and is still being deferred?
   
   I still don't get why the proposal restricts its usage to streaming aggregation, whereas the mechanism - validation of the UnsafeRow - can be applied to all stateful operations. Let's not pinpoint only the problem we've just seen.
   
   Also, from my side, the overhead of the validation logic looks trivial compared to the work stateful operators do - we don't validate all rows, we don't even sample; we validate just the first one. Unless there's a chance of a show-stopper bug in the validation logic (which would require a way to disable the validation), I don't see the need for a new configuration.




[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-02 Thread GitBox


HeartSaVioR commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-637926400


   And personally, I'd rather do the check in StateStore, with the additional overhead of reading "a" row up front, to achieve the same in all stateful operations.
   
   ```
   /** Get or create a store associated with the id. */
   def get(
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
       valueSchema: StructType,
       indexOrdinal: Option[Int],
       version: Long,
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {
     require(version >= 0)
     val storeProvider = loadedProviders.synchronized {
       startMaintenanceIfNeeded()
       val provider = loadedProviders.getOrElseUpdate(
         storeProviderId,
         StateStoreProvider.createAndInit(
           storeProviderId.storeId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf)
       )
       reportActiveStoreInstance(storeProviderId)
       provider
     }
     val store = storeProvider.getStore(version)
     val iter = store.iterator()
     if (iter.nonEmpty) {
       val rowPair = iter.next()
       val key = rowPair.key
       val value = rowPair.value
       // TODO: validate key with key schema
       // TODO: validate value with value schema
     }
     store
   }
   ```
   
   For streaming aggregations it initializes "two" state stores, so the overhead goes up to "two" rows, but I don't think the overhead matters much.
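   
   To make the TODO above concrete, the validation util could start with structural checks on the UnsafeRow layout - a sketch of one possible shape, not necessarily what this PR should end up with:
   
   ```
   import org.apache.spark.sql.catalyst.expressions.UnsafeRow
   import org.apache.spark.sql.types.StructType

   // Sketch: reject rows whose backing bytes can't possibly match the expected schema.
   def validateRow(row: UnsafeRow, schema: StructType): Unit = {
     if (row == null) return
     if (row.numFields != schema.length) {
       throw new IllegalStateException(
         s"State row has ${row.numFields} fields, schema expects ${schema.length}")
     }
     // UnsafeRow layout: null bitset followed by 8 bytes per field, then the
     // variable-length region - total size can never be below the fixed-length part.
     val fixedLen = UnsafeRow.calculateBitSetWidthInBytes(schema.length) + schema.length * 8
     if (row.getSizeInBytes < fixedLen) {
       throw new IllegalStateException(
         s"State row has ${row.getSizeInBytes} bytes, expected at least $fixedLen")
     }
   }
   ```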
   
   


