[GitHub] [spark] HeartSaVioR edited a comment on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-05 Thread GitBox


HeartSaVioR edited a comment on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639873447


   Sorry, my comment was edited so you may have missed the content, but this is also a case of pointing out "pinpointing" - do you think your approach works with other state store providers as well? The root cause isn't bound to the implementation of the state store provider, yet this patch only addresses the HDFS state store provider.
   
   I guess you're trying to find a way to do it less frequently - only the first time the state is loaded from the file - which is optimal. While I think it could even be done without binding to the state store provider implementation if we really need it (check only once when the provider instance is created), have we measured the actual overhead? If the overhead turns out to be trivial then it doesn't matter that we run the validation check for each batch. It sounds sub-optimal, but the overhead would be trivial.
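   
   For illustration, a minimal sketch of the "check only once when the provider instance is created" idea - the `ValidateOnce` trait and the `validateFormat` hook are hypothetical names for this sketch, not part of the actual StateStoreProvider API:
   
   ```
 // Hypothetical stackable trait: mixed into a concrete provider
 // (e.g. new HDFSBackedStateStoreProvider with ValidateOnce), it runs the
 // format check exactly once per provider instance, no matter how many
 // batches call getStore().
 trait ValidateOnce extends StateStoreProvider {
   // Lazily evaluated at most once per instance.
   private lazy val validatedOnce: Unit = validateFormat()

   // Hypothetical hook: the concrete provider implements the actual row check.
   protected def validateFormat(): Unit

   abstract override def getStore(version: Long): StateStore = {
     validatedOnce // forces the one-time check on first store access
     super.getStore(version)
   }
 }
   ```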





[GitHub] [spark] HeartSaVioR edited a comment on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-04 Thread GitBox


HeartSaVioR edited a comment on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639239790


   My alternative of wrapping the state store is something like the below:
   
   ```
 class RowValidatingStateStore(
     underlying: StateStore,
     keyType: Seq[DataType],
     valueType: Seq[DataType]) extends StateStore {
   private var isValidated = false

   override def get(key: UnsafeRow): UnsafeRow = {
     val value = underlying.get(key)
     if (!isValidated) {
       validateRow(value, valueType)
       isValidated = true
     }
     value
   }

   override def id: StateStoreId = underlying.id
   override def version: Long = underlying.version
   override def put(key: UnsafeRow, value: UnsafeRow): Unit = underlying.put(key, value)
   override def remove(key: UnsafeRow): Unit = underlying.remove(key)
   override def commit(): Long = underlying.commit()
   override def abort(): Unit = underlying.abort()
   override def iterator(): Iterator[UnsafeRowPair] = underlying.iterator()
   override def metrics: StateStoreMetrics = underlying.metrics
   override def hasCommitted: Boolean = underlying.hasCommitted

   private def validateRow(row: UnsafeRow, rowDataType: Seq[DataType]): Unit = {
     // TODO: call util method with row and data type to validate - note
     // that it can only check with the value schema
   }
 }

 def get(...): StateStore = {
   require(version >= 0)
   val storeProvider = loadedProviders.synchronized {
     ...
   }
   // TODO: add an if statement to decide whether to wrap the state store or not
   new RowValidatingStateStore(
     storeProvider.getStore(version),
     keySchema.map(_.dataType),
     valueSchema.map(_.dataType))
 }
   ```
   
   The example code only checks in the get operation, which is insufficient for checking the "key" row in state. That said, the iterator approach still provides more possibility for validation, though the validation of the unsafe row itself doesn't have enough coverage to catch various incompatibility issues (we should definitely have other guards as well), so it's sort of OK to only cover the value side. A sketch of covering the key side via the iterator follows below.
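   
   As a hedged sketch (not part of this PR), the wrapper above could validate both key and value on the first iterator() call, reusing the same `isValidated` flag and `validateRow` helper:
   
   ```
 // Hypothetical extension of the wrapper above: validate both the key and
 // the value row on the first iterator() call, covering the "key" side too.
 override def iterator(): Iterator[UnsafeRowPair] = {
   val iter = underlying.iterator()
   if (!isValidated && iter.hasNext) {
     val pair = iter.next()
     validateRow(pair.key, keyType)     // key side, which get() cannot cover
     validateRow(pair.value, valueType)
     isValidated = true
     // Hand back the already-consumed pair before the rest; it is consumed
     // before the underlying iterator advances, so row reuse is safe here.
     Iterator.single(pair) ++ iter
   } else {
     iter
   }
 }
   ```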






[GitHub] [spark] HeartSaVioR edited a comment on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-04 Thread GitBox


HeartSaVioR edited a comment on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-639200645


   > @HeartSaVioR After taking a further look. Instead of dealing with the 
iterator, how about adding the invalidation for all state store operations in 
StateStoreProvider? Since we can get the key/value row during load map. WDYT?
   
   It would be nice to see the proposed change as code, to avoid misunderstanding, like I proposed in the previous comment. (Anything is OK - a commit in your fork or a text comment.) I'll try out my alternative (wrapping the StateStore) and show the code change. Thanks!
   
   EDIT: Please deal with the interface whenever possible - there are different implementations of state store providers, and we should avoid sticking to a specific implementation.






[GitHub] [spark] HeartSaVioR edited a comment on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-03 Thread GitBox


HeartSaVioR edited a comment on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-638520229


   Will this be included in Spark 3.0.0? If it is meant to unblock SPARK-28067 so that one can be included in Spark 3.0.0, then it's OK to consider this first, but if this is planned for Spark 3.1 then I'm not sure about the priority - are all of you aware that the PR for SPARK-27237 was submitted more than a year ago and is still being deferred?
   
   I still don't get why the proposal restricts its usage to streaming aggregation, whereas the mechanism is a validation of the UnsafeRow which can be applied to all stateful operations. Let's not pinpoint only the problem we've just seen.
   
   Also, from my side the overhead of the validation logic looks trivial compared to the work stateful operators perform - we don't validate all rows, we don't even sample; we just check the first one. Unless there's a chance of introducing a show-stopper bug in the validation logic (so that we'd need to provide a way to disable it), I don't see the need for a new configuration.






[GitHub] [spark] HeartSaVioR edited a comment on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-02 Thread GitBox


HeartSaVioR edited a comment on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-637926400


   And personally I'd rather do the check in StateStore, with the additional overhead of reading "a" row up front, to achieve the same in all stateful operations.
   
   ```
 /** Get or create a store associated with the id. */
 def get(
     storeProviderId: StateStoreProviderId,
     keySchema: StructType,
     valueSchema: StructType,
     indexOrdinal: Option[Int],
     version: Long,
     storeConf: StateStoreConf,
     hadoopConf: Configuration): StateStore = {
   require(version >= 0)
   val storeProvider = loadedProviders.synchronized {
     startMaintenanceIfNeeded()
     val provider = loadedProviders.getOrElseUpdate(
       storeProviderId,
       StateStoreProvider.createAndInit(
         storeProviderId.storeId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf)
     )
     reportActiveStoreInstance(storeProviderId)
     provider
   }
   val store = storeProvider.getStore(version)
   val iter = store.iterator()
   if (iter.nonEmpty) {
     val rowPair = iter.next()
     val key = rowPair.key
     val value = rowPair.value
     // TODO: validate key with key schema
     // TODO: validate value with value schema
   }
   store
 }
   ```
   
   For streaming aggregations this initializes "two" state stores, so the overhead goes to "two" rows, but I don't think the overhead matters much.
   
   If we're really concerned about the overhead of creating an additional "iterator", or about doing the validation at an early phase (where the state store might end up never being accessed), just have a StateStore wrapper around `store` and do the same - only validate once on the first "get". Either way, we never need to restrict the functionality to streaming aggregation.
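   
   As a rough illustration of what the TODOs above could check, here is a hypothetical structural validation of an UnsafeRow against its expected schema - an assumption of mine, not the validation this PR implements (though `calculateBitSetWidthInBytes`, `numFields`, and `getSizeInBytes` do exist on UnsafeRow):
   
   ```
 // Hypothetical helper: a cheap structural sanity check of an UnsafeRow.
 def validateRow(row: UnsafeRow, schema: StructType): Unit = {
   if (row != null) {
     // A valid row buffer must at least hold the null-tracking bitset plus
     // one 8-byte word per field.
     val minSize = UnsafeRow.calculateBitSetWidthInBytes(schema.length) + 8 * schema.length
     if (row.numFields != schema.length || row.getSizeInBytes < minSize) {
       throw new IllegalStateException(
         s"Unexpected UnsafeRow format: numFields=${row.numFields}, " +
         s"sizeInBytes=${row.getSizeInBytes}, expected schema=$schema")
     }
   }
 }
   ```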



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


