[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-639873447

Sorry, my comment was edited, so you may have missed the content, but it also points out a sort of "pinpoint" concern: do you think your approach works with other state store providers as well? The root cause isn't bound to any particular state store provider implementation, but this patch only addresses the HDFS state store provider.

I guess you're trying to run the check as infrequently as possible (the first time the state is loaded from the file), which is optimal. While I think it could even be done without binding to the state store provider implementation if we really need that, have we measured the actual overhead? If the overhead turns out to be trivial, then it won't matter that we run the validation check for each batch. That sounds sub-optimal, but the overhead would be trivial.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
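As a side note on the "have we measured the actual overhead?" question, a crude probe could look like the sketch below. This is only an illustration, not part of the patch; `validateFirstRow` is a hypothetical stand-in for the actual validation call, and a proper measurement would use a benchmark harness such as JMH rather than wall-clock timing.

```
// Crude overhead probe; `validateFirstRow` is a hypothetical stand-in
// for whatever validation call the patch ends up adding.
object OverheadProbe {
  def timeUs[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body  // run the measured work exactly once
    println(s"$label took ${(System.nanoTime() - start) / 1000} us")
    result
  }
}

// usage sketch:
// OverheadProbe.timeUs("row validation") { validateFirstRow(store) }
```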
[GitHub] [spark] HeartSaVioR commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-639239790

My alternative of wrapping the state store is something like below:

```
class RowValidatingStateStore(
    underlying: StateStore,
    keyType: Seq[DataType],
    valueType: Seq[DataType]) extends StateStore {

  private var isValidated = false

  override def get(key: UnsafeRow): UnsafeRow = {
    val value = underlying.get(key)
    if (!isValidated) {
      validateRow(value)
      isValidated = true
    }
    value
  }

  override def id: StateStoreId = underlying.id
  override def version: Long = underlying.version
  override def put(key: UnsafeRow, value: UnsafeRow): Unit = underlying.put(key, value)
  override def remove(key: UnsafeRow): Unit = underlying.remove(key)
  override def commit(): Long = underlying.commit()
  override def abort(): Unit = underlying.abort()
  override def iterator(): Iterator[UnsafeRowPair] = underlying.iterator()
  override def metrics: StateStoreMetrics = underlying.metrics
  override def hasCommitted: Boolean = underlying.hasCommitted

  private def validateRow(row: UnsafeRow): Unit = {
    // TODO: call util method with row and schema to validate
  }
}

def get(...): StateStore = {
  require(version >= 0)
  val storeProvider = loadedProviders.synchronized {
    ...
  }
  // TODO: add if statement to see whether it should wrap the state store or not
  new RowValidatingStateStore(storeProvider.getStore(version, keySchema, valueSchema))
}
```

The example code only checks the get operation, which is insufficient to check the "key" row in state. That said, the iterator approach still offers more opportunity for validation, though validating the unsafe row by itself doesn't have enough coverage of the various incompatibility issues (we should definitely have other guards as well), so it's sort of OK to only cover the value side.
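For the `validateRow` TODO in the sketch above, one possible starting point is a purely structural check before any per-field decoding. The object and method names below are hypothetical; the layout constants follow UnsafeRow's documented format (a null bitset padded to 8-byte words, then a fixed-length region of 8 bytes per field, then the variable-length region):

```
// Hypothetical structural check mirroring UnsafeRow's layout:
// [null bitset, padded to 8-byte words][8 bytes per field][variable-length region]
object RowFormatCheck {
  // the null bitset occupies one 64-bit word per 64 fields
  def bitsetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // smallest backing-buffer size a valid row with `numFields` fields can have
  def minSizeInBytes(numFields: Int): Int = bitsetWidthInBytes(numFields) + numFields * 8

  // a row smaller than the minimum cannot possibly match the schema
  def looksStructurallyValid(sizeInBytes: Int, numFields: Int): Boolean =
    sizeInBytes >= minSizeInBytes(numFields)
}
```

A row whose backing buffer is smaller than the minimum for the schema's field count can be rejected immediately; deeper checks (e.g. that the offset/size words of variable-length fields point inside the buffer) could be layered on top.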
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-639200645

> @HeartSaVioR After taking a further look. Instead of dealing with the iterator, how about adding the invalidation for all state store operations in StateStoreProvider? Since we can get the key/value row during load map. WDYT?

It would be nice to see the proposed change as code to avoid misunderstanding, like I proposed in a previous comment (anything works, including a commit in your fork or a text comment). I'll try out my alternative (wrapping the state store) and show the code change. Thanks!
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-638644781

> What are other stateful operations that use unsafe row? I think we can apply the check everywhere.

The state store itself stores UnsafeRow, so this applies everywhere in stateful operations. I'd propose doing it like https://github.com/apache/spark/pull/28707#issuecomment-637926400 instead of fixing it everywhere.
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-638522162

And I think SPARK-27237 doesn't require the sort of "future-proofing" that comes with risk: it doesn't touch the existing parts of the checkpoint, and simply puts the schema information into a new file. If we find a better way to pack the schema information into the checkpoint, we can simply discard/ignore the file, or craft logic to migrate smoothly. There is no risk in rolling it back in the future.
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-638520229

Will this be included in Spark 3.0.0? If this is to unblock SPARK-28067 for inclusion in Spark 3.0.0, then it's OK to consider this first, but if this is planned for Spark 3.1 then I'm not sure about the priority: are all of you aware that the PR for SPARK-27237 was submitted more than a year ago, and is still being deferred?

I still don't get why the proposal restricts its usage to streaming aggregation, whereas the mechanism is validation of the UnsafeRow, which can be applied to all stateful operations. Let's not pinpoint only the problem we've just seen.

Also, from my side, the overhead of the validation logic looks trivial compared to the operations stateful operators will take: we don't validate all rows, we don't even sample, we just check the first one. Unless there's a chance of shipping a show-stopper bug in the validation logic (so that we need a way to disable the validation), I'm not seeing the need for a new configuration.
HeartSaVioR commented on pull request #28707: URL: https://github.com/apache/spark/pull/28707#issuecomment-637926400

And personally, I'd rather do the check in StateStore, with the additional overhead of reading "a" row up front, to achieve the same thing across all stateful operations.

```
/** Get or create a store associated with the id. */
def get(
    storeProviderId: StateStoreProviderId,
    keySchema: StructType,
    valueSchema: StructType,
    indexOrdinal: Option[Int],
    version: Long,
    storeConf: StateStoreConf,
    hadoopConf: Configuration): StateStore = {
  require(version >= 0)
  val storeProvider = loadedProviders.synchronized {
    startMaintenanceIfNeeded()
    val provider = loadedProviders.getOrElseUpdate(
      storeProviderId,
      StateStoreProvider.createAndInit(
        storeProviderId.storeId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf)
    )
    reportActiveStoreInstance(storeProviderId)
    provider
  }
  val store = storeProvider.getStore(version)
  val iter = store.iterator()
  if (iter.nonEmpty) {
    val rowPair = iter.next()
    val key = rowPair.key
    val value = rowPair.value
    // TODO: validate key with key schema
    // TODO: validate value with value schema
  }
  store
}
```

For streaming aggregations it initializes "two" state stores, so the overhead goes up to "two" rows, but I don't think the overhead matters much.