Re: [PR] [SPARK-46992] Fix cache consistency [spark]
dtarima commented on code in PR #45181: URL: https://github.com/apache/spark/pull/45181#discussion_r1526282544

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:

```diff
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {

+  @volatile private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
```

Review Comment: This method should probably have `@DeveloperApi @Unstable`, and the `@DeveloperApi` annotation should be removed from `queryUnpersisted` above.

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:

```diff
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {
```

Review Comment: nit: `cacheStatesSign.forall(_ == false)` is a bit more readable, and I think it'll make the comment unnecessary.

## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:

```diff
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }

+  /**
+   * This method performs a pre-order traversal and return a boolean Array
+   * representing whether some nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {
```

Review Comment: How about using `BitSet` for the persistence state representation? It'll be easier to work with, and it's more efficient.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
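The `BitSet` suggestion can be sketched on a toy tree. This is a minimal sketch, not Spark code: `Node` and its `persisted` flag are hypothetical stand-ins for a logical plan node and the cache-manager lookup the real method would perform.

```scala
import scala.collection.immutable.BitSet

// Hypothetical stand-in for a logical plan node; `persisted` stands in for
// a lookup in the cache manager.
final case class Node(persisted: Boolean, children: Seq[Node])

// Pre-order traversal producing a BitSet of the indices of persisted nodes,
// instead of an Array[Boolean] with one flag per node.
def computeCacheStateSignature(root: Node): BitSet = {
  var index = -1
  def visit(n: Node): BitSet = {
    index += 1
    val self = if (n.persisted) BitSet(index) else BitSet.empty
    n.children.foldLeft(self)((acc, c) => acc | visit(c))
  }
  visit(root)
}
```

With this representation, signatures compare with plain `==` (no `sameElements` needed), and the "nothing is cached" check becomes `signature.isEmpty` instead of `forall(_ == false)`.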
Re: [PR] [SPARK-46992] Fix cache consistency [spark]
dtarima commented on code in PR #45181: URL: https://github.com/apache/spark/pull/45181#discussion_r1521457462

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:

```diff
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {

+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {
```

Review Comment:

> I don't think making queryExecution wrapped by `AtomicReference` means it's thread-safe. For example, if we unpersist one of its children in another thread and meanwhile call `ds.count()`, the cache consistency may be incorrect.

Yes, consistency of results cannot be guaranteed when the persistence state changes concurrently in different threads, but that is not what I was pointing to. Thread safety is a basic concept, not related to the business logic: when we change a `var` in one thread, other threads might not see the updated reference. To avoid that, the reference needs to be marked `volatile`. In the example above I used AtomicReference's `set` for simplicity, but it might make sense to implement it using `compareAndSet` to get additional guarantees.

> Using 2 queryExecution variables may help reduce the count of analysis.

I doubt that the additional complexity is worth it. It's not a big deal, though... Let's see what the reviewers think.
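The `compareAndSet` variant mentioned above might look like the following sketch. This is not Spark API: `Memo`, the state type `S`, and the `analyze` function are hypothetical stand-ins for the cache-state signature and for re-running analysis to build a new QueryExecution.

```scala
import java.util.concurrent.atomic.AtomicReference

// Minimal sketch of memoization guarded by compareAndSet; `analyze` stands
// in for re-running analysis when the observed state has changed.
final class Memo[S, Q](initialState: S, initialValue: Q, analyze: S => Q) {
  private val ref = new AtomicReference[(S, Q)]((initialState, initialValue))

  // Return the memoized value while the observed state is unchanged;
  // otherwise analyze once and publish the new pair. compareAndSet makes
  // racing threads agree on a single published reference.
  @annotation.tailrec
  def get(currentState: S): Q = {
    val old = ref.get()
    val (state, value) = old
    if (state == currentState) value
    else {
      val fresh = analyze(currentState)
      if (ref.compareAndSet(old, (currentState, fresh))) fresh
      else get(currentState) // lost the race: retry against the winner's value
    }
  }
}
```

Under a race two threads may both run `analyze`, but only one pair is ever published. Note that the `state == currentState` comparison relies on structural equality, so a value type like `BitSet` fits better here than `Array[Boolean]`, which would need `sameElements`.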
Re: [PR] [SPARK-46992] Fix cache consistency [spark]
doki23 commented on code in PR #45181: URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:

```diff
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {

+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {
```

Review Comment: I don't think wrapping queryExecution in an `AtomicReference` means it's thread-safe. For example, if we unpersist one of its children in another thread and meanwhile call `ds.count()`, the cache consistency may be incorrect. Using 2 queryExecution variables may help reduce the count of analysis.
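The two-variable idea from the diff above (an immutable `queryUnpersisted` plus a memoized `queryPersisted` keyed by the cache-state signature) can be sketched as follows. `Exec`, `currentSignature`, and `reanalyze` are hypothetical stand-ins for `QueryExecution`, the cache-state lookup, and re-analysis; this is a sketch of the approach under discussion, not the PR's actual code.

```scala
final class DatasetLike[Exec](
    queryUnpersisted: Exec,
    currentSignature: () => Array[Boolean], // cache state of the plan's nodes
    reanalyze: Exec => Exec) {

  // Memoized persisted variant, keyed by the signature it was built for.
  // @volatile so other threads see the updated reference (see the
  // thread-safety discussion); it does not make concurrent updates atomic.
  @volatile private var queryPersisted: Option[(Array[Boolean], Exec)] = None

  def queryExecution: Exec = {
    val sign = currentSignature()
    if (sign.forall(_ == false)) queryUnpersisted // nothing is cached
    else queryPersisted match {
      case Some((s, q)) if s.sameElements(sign) => q // signature unchanged
      case _ =>
        val q = reanalyze(queryUnpersisted) // cache state changed: re-analyze
        queryPersisted = Some((sign, q))
        q
    }
  }
}
```

This keeps repeated `queryExecution` calls from re-running analysis while the persistence state is stable, which is the "reduce count of analysis" benefit mentioned above.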
Re: [PR] [SPARK-46992] Fix cache consistency [spark]
dtarima commented on code in PR #45181: URL: https://github.com/apache/spark/pull/45181#discussion_r1517869525

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:

```diff
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {

+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {
```

Review Comment:

1. It doesn't look like it's necessary to distinguish between `persisted` and `unpersisted` anymore. If we wanted, we could have a cache `Map[State, QueryExecution]` for the different states, but I think it'd add unjustified complexity.
2. We cannot use `var` - it's not thread-safe.

```scala
class Dataset[T] private[sql](
    @Unstable @transient val queryExecutionRef: AtomicReference[(Array[Boolean], QueryExecution)],
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  @DeveloperApi
  def queryExecution: QueryExecution = {
    val (state, queryExecution) = queryExecutionRef.get()
    val newState = queryExecution.computeCacheStateSignature()
    if (state.sameElements(newState)) queryExecution
    else {
      val newQueryExecution = new QueryExecution(queryExecution)
      queryExecutionRef.set((newState, newQueryExecution))
      newQueryExecution
    }
  }
  ...
```
Re: [PR] [SPARK-46992] Fix cache consistency [spark]
doki23 commented on PR #45181: URL: https://github.com/apache/spark/pull/45181#issuecomment-1984850287

> All children have to be considered for changes of their persistence state. Currently it only checks the first found child. For clarity there is a test which fails: [doki23#1](https://github.com/doki23/spark/pull/1)

So we need a cache state signature for the queryExecution.
Re: [PR] [SPARK-46992] Fix cache consistency [spark]
dtarima commented on PR #45181: URL: https://github.com/apache/spark/pull/45181#issuecomment-1983260338

All children have to be considered for changes of their persistence state. Currently it only checks the first found child. For clarity, there is a test which fails: https://github.com/doki23/spark/pull/1
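The failure mode described above can be illustrated with a toy plan: a check that stops at the first cached node found misses a persistence change in a later child, while a full pre-order signature detects it. `Plan`, `signature`, and `firstCached` are hypothetical stand-ins, not the PR's code.

```scala
// Toy plan type; the `cached` set stands in for the cache manager's contents.
final case class Plan(name: String, children: Seq[Plan])

// Full pre-order signature: one flag per node.
def signature(p: Plan, cached: Set[String]): List[Boolean] =
  cached.contains(p.name) :: p.children.toList.flatMap(signature(_, cached))

// A first-found-cached-node check, roughly the behavior criticized above.
def firstCached(p: Plan, cached: Set[String]): Option[String] =
  if (cached.contains(p.name)) Some(p.name)
  else p.children.view.flatMap(firstCached(_, cached)).headOption

val plan = Plan("join", Seq(Plan("a", Nil), Plan("b", Nil)))
val before = Set("a", "b")
val after = Set("a") // "b" was unpersisted

// firstCached returns Some("a") both before and after, missing the change;
// the full signature goes from [false, true, true] to [false, true, false].
```

This is exactly why the discussion converges on computing a signature over all nodes rather than short-circuiting at the first cached child.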