Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-15 Thread via GitHub


dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1526282544


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  @volatile private var queryPersisted: Option[(Array[Boolean], 
QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {

Review Comment:
   This method should probably have `@DeveloperApi @Unstable`, and remove 
`@DeveloperApi` annotation from `queryUnpersisted` above.



##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  @volatile private var queryPersisted: Option[(Array[Boolean], 
QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   nit: `cacheStatesSign.forall(_ == false)` is a bit more readable, and I 
think it'll make the comment unnecessary



##
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##
@@ -369,6 +375,20 @@ class QueryExecution(
 Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, 
message)
   }
 
+  /**
+   * This method performs a pre-order traversal and return a boolean Array
+   * representing whether some nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   How about using `BitSet` for persistence state representation?
   It'll be easier to work with and it's more efficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-12 Thread via GitHub


dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521457462


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   > I don't think making queryExecution wrapped by AtomicReference means it's 
thread-safe. For example, we unpersist one of it's children in another thread, 
and at meanwhile we call ds.count(), the cache consistency may be incorrect.
   
   Yes, consistency of results cannot be guaranteed when persistence state 
changes concurrently in different threads, but this is not what I was pointing 
to. Thread safety is a basic concept, not related to business logic: when we 
change a `var` in one thread, other threads might not see the updated 
reference. In order to avoid it the reference needs to be marked `volatile`. In 
the example above I used AtomicReference's `set` for simplicity, but it might 
make sense to implement it using `compareAndSet` to get additional guarantees.
   
   > Using 2 queryExecution variables may help reduce count of analysis.
   
   I doubt that the additional complexity worth it. It's not a big deal... 
Let's see what reviewers think.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-12 Thread via GitHub


doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think making queryExecution wrapped by `AtomicReference` means it's 
thread-safe. For example, we unpersist one of it's children in another thread, 
and at meanwhile we call `ds.count()`, the cache consistency may be incorrect. 
   
   Use 2 queryExecution variables may help reduce count of analysis.



##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think making queryExecution wrapped by `AtomicReference` means it's 
thread-safe. For example, we unpersist one of it's children in another thread, 
and at meanwhile we call `ds.count()`, the cache consistency may be incorrect. 
   
   Using 2 queryExecution variables may help reduce count of analysis.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-12 Thread via GitHub


doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think making queryExecution wrapped by `AtomicReference` means it's 
thread-safe. For example, we unpersist one of it's children in another thread, 
and at meanwhile we call `ds.count()`, the cache consistency may be incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-12 Thread via GitHub


doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think that make queryExecution wrapped by `AtomicReference` means 
it's thread-safe. For example, we unpersist one of it's children in another 
thread, and at meanwhile we call `ds.count()`, the cache consistency may be 
incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-08 Thread via GitHub


dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1517869525


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+@DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
 @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+// If all children aren't cached, directly return the queryUnpersisted
+if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   1. It doesn't look like it's necessary to distinguish between `persisted` 
and `unpersisted` anymore. If we wanted we could have a cache `Map[State, 
QueryExecution]` for different states, but I think it'd add unjustified 
complexity.
   2. We cannot use `var` - it's not thread-safe.
   
   ```scala
   class Dataset[T] private[sql](
   @Unstable @transient val queryExecutionRef: 
AtomicReference[(Array[Boolean], QueryExecution)],
   @DeveloperApi @Unstable @transient val encoder: Encoder[T])
 extends Serializable {
   
 @DeveloperApi
 def queryExecution: QueryExecution = {
   val (state, queryExecution) = queryExecutionRef.get()
   val newState = queryExecution.computeCacheStateSignature()
   
   if (state.sameElements(newState)) queryExecution
   else {
 val newQueryExecution = new QueryExecution(queryExecution)
 queryExecutionRef.set((newState, newQueryExecution))
 newQueryExecution
   }
 }
   
 ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-07 Thread via GitHub


doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1984850287

   > All children have to be considered for changes of their persistence state. 
Currently it only checks the fist found child. For clarity there is a test 
which fails: [doki23#1](https://github.com/doki23/spark/pull/1)
   
   So, we need a cache state signature for queryExecution


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-07 Thread via GitHub


dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1983260338

   All children have to be considered for changes of their persistence state. 
Currently it only checks the fist found child.
   For clarity there is a test which fails: 
https://github.com/doki23/spark/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org