dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513104440
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
*/
@Stable
class Dataset[T] private[sql](
- @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+ @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,
Review Comment:
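It's not thread-safe: once `queryExecution` is a mutable `var`, one thread can reassign it (e.g. in `persist`) while another thread is reading it, and without synchronization the reader may still see the old value.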
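A minimal sketch of one common way to make such a reassignment safe, assuming the goal is simply "swap the plan atomically and make the new value visible to all threads". `Plan`, `Handle` and `ThreadSafetyDemo` are hypothetical stand-ins, not Spark classes; `java.util.concurrent.atomic.AtomicReference` is the only real API used.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a per-Dataset plan; NOT Spark's QueryExecution.
final case class Plan(id: Int)

// Holds the current plan in an AtomicReference instead of a plain `var`.
// A plain `var` gives no cross-thread visibility guarantee, and a
// read-then-write such as `qe = rebuild(qe)` is not atomic.
final class Handle(initial: Plan) {
  private val ref = new AtomicReference[Plan](initial)

  def current: Plan = ref.get()

  // Atomically replaces the plan. `f` may be retried under contention,
  // so it should be side-effect free.
  def rebuild(f: Plan => Plan): Plan =
    ref.updateAndGet(p => f(p))
}

object ThreadSafetyDemo {
  // 8 threads each apply 1000 rebuilds; no update is lost.
  def run(): Int = {
    val h = new Handle(Plan(0))
    val threads = (1 to 8).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => h.rebuild(p => Plan(p.id + 1))))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    h.current.id
  }

  def main(args: Array[String]): Unit = println(run()) // 8000
}
```

Atomicity alone does not settle the design question raised below (other `Dataset` instances sharing the same plan), so this only addresses the visibility and lost-update part.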
##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala:
##########
@@ -134,6 +134,19 @@ class DatasetCacheSuite extends QueryTest
assert(df.storageLevel == StorageLevel.NONE)
}
+ test("SPARK-46992 collect before persisting") {
+ val ds = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
+ // this check will call ds.collect() first
+ assert(Array(Row(1), Row(4)).sameElements(ds.collect()))
+ // and then cache it
+ ds.cache()
+ // Make sure, the Dataset is indeed cached.
+ assertCached(ds)
+ // Make sure the result of count() is consistent with collect()
Review Comment:
There is no need for these comments; they just restate what the code is doing.
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
*/
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+ queryExecution = new QueryExecution(
+ queryExecution.sparkSession, queryExecution.logical, queryExecution.tracker)
Review Comment:
There is also a `mode: CommandExecutionMode.Value` argument. I don't know exactly what it means, but I'm pretty sure it would need to be passed through too.
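A toy illustration of why a dropped argument matters, assuming (as the three-argument call in the diff suggests) that `mode` has a default value: rebuilding from only some fields silently resets `mode` to that default instead of keeping the original. `Mode`, `Tracker`, `QE` and `RebuildDemo` are hypothetical stand-ins, not Spark's `QueryExecution` or `CommandExecutionMode`.

```scala
// Hypothetical stand-ins; NOT Spark's classes.
object Mode extends Enumeration {
  val All, Skip = Value
}

final class Tracker

final class QE(
    val session: String,
    val logical: String,
    val tracker: Tracker = new Tracker,
    val mode: Mode.Value = Mode.All)

object RebuildDemo {
  // Mirrors the pattern in the diff: `mode` is not passed, so the default is used.
  def rebuildDroppingMode(old: QE): QE =
    new QE(old.session, old.logical, old.tracker)

  // Passes every constructor argument through explicitly.
  def rebuildKeepingMode(old: QE): QE =
    new QE(old.session, old.logical, old.tracker, old.mode)

  def main(args: Array[String]): Unit = {
    val old = new QE("session", "plan", mode = Mode.Skip)
    println(rebuildDroppingMode(old).mode == Mode.Skip) // false: reset to the default
    println(rebuildKeepingMode(old).mode == Mode.Skip)  // true: original mode kept
  }
}
```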
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
*/
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+ queryExecution = new QueryExecution(
Review Comment:
1. Persisting in `cacheManager` is global: it affects all `Dataset` instances, not only the one `persist` was called on.
2. Calling `unpersist` on any of those `Dataset` instances would require updating `queryExecution` again on every instance with the same `logicalPlan`.
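Both points can be seen in a small toy model, assuming a process-wide cache keyed by logical plan and a per-instance flag that is only updated when that instance's own `persist`/`unpersist` is called (standing in for reassigning `queryExecution`). `LogicalPlan`, `ToyCacheManager`, `ToyDataset` and `CacheDemo` are hypothetical; this is not Spark's `CacheManager` or `Dataset`.

```scala
import scala.collection.mutable

// Hypothetical toy model; NOT Spark's CacheManager or Dataset.
final case class LogicalPlan(description: String)

// Global cache shared by every ToyDataset, keyed by plan.
object ToyCacheManager {
  private val cached = mutable.Set.empty[LogicalPlan]
  def cache(p: LogicalPlan): Unit = synchronized { cached += p }
  def uncache(p: LogicalPlan): Unit = synchronized { cached -= p }
  def isCached(p: LogicalPlan): Boolean = synchronized { cached.contains(p) }
}

final class ToyDataset(val logical: LogicalPlan) {
  // Per-instance snapshot, only refreshed by this instance's own calls.
  private var snapshot: Boolean = ToyCacheManager.isCached(logical)

  def persist(): this.type = { ToyCacheManager.cache(logical); snapshot = true; this }
  def unpersist(): this.type = { ToyCacheManager.uncache(logical); snapshot = false; this }

  def snapshotSaysCached: Boolean = snapshot
  // Consulting the shared cache on every use always reflects the global state.
  def liveSaysCached: Boolean = ToyCacheManager.isCached(logical)
}

object CacheDemo {
  def run(): (Boolean, Boolean, Boolean, Boolean) = {
    val plan = LogicalPlan("range(1, 5)")
    val a = new ToyDataset(plan)
    val b = new ToyDataset(plan) // same logical plan, different instance
    a.persist()
    val bSnapAfterPersist = b.snapshotSaysCached   // stale: plan IS cached
    val bLiveAfterPersist = b.liveSaysCached
    b.persist()
    a.unpersist()
    val bSnapAfterUnpersist = b.snapshotSaysCached // stale: plan is NOT cached
    val bLiveAfterUnpersist = b.liveSaysCached
    (bSnapAfterPersist, bLiveAfterPersist, bSnapAfterUnpersist, bLiveAfterUnpersist)
  }
}
```

`CacheDemo.run()` returns `(false, true, true, false)`: the per-instance snapshot on `b` is wrong after both `a.persist()` and `a.unpersist()`, while the live lookup is right both times.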