doki23 commented on PR #45181: URL: https://github.com/apache/spark/pull/45181#issuecomment-1980094365
> I don't think it fixes the issue completely, and there are some problems with the solution. I believe a proper solution is in the following comment: [#45181 (comment)](https://github.com/apache/spark/pull/45181#issuecomment-1969241145)
>
> Also, many more tests are needed.

Based on this solution, I think we should define `queryExecution` like this:

```scala
def queryExecution: QueryExecution = {
  val cacheManager = queryExec.sparkSession.sharedState.cacheManager
  val plan = queryExec.logical
  plan.find {
    case _: IgnoreCachedData => false
    case currentFragment => cacheManager.lookupCachedData(currentFragment).isDefined
  } match {
    // If we can't find a cached plan, we directly return queryExec.
    case None =>
      queryPersisted = None
      queryExec
    // We found that a child plan is cached, so we make sure that queryPersisted
    // is consistent with the cached plan.
    case Some(cachedPlan) =>
      queryPersisted match {
        case None =>
          val qe = new QueryExecution(queryExec.sparkSession, plan)
          queryPersisted = Some((cachedPlan, qe))
          qe
        case Some((prevCachedPlan, prevCachedQe)) =>
          if (prevCachedPlan.sameResult(cachedPlan)) {
            prevCachedQe
          } else {
            // The cached fragment changed: refresh queryPersisted.
            val qe = new QueryExecution(queryExec.sparkSession, plan)
            queryPersisted = Some((cachedPlan, qe))
            qe
          }
      }
  }
}
```

When the dataset or a child dataset is persisted or unpersisted, we get the newest consistent result.
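To make the intended behavior concrete, here is a minimal sketch of the persist/unpersist scenario the proposal targets. This is an illustration only, not code from the PR: `CacheConsistencyDemo`, the `child`/`parent` datasets, and the local-mode session are all made up for the example, and it assumes a Spark runtime on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo: a parent dataset built on a child dataset. With the
// proposed queryExecution, the parent's plan should reflect the child's
// current cache state after each persist()/unpersist() call.
object CacheConsistencyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-consistency-demo")
      .getOrCreate()
    import spark.implicits._

    val child = Seq(1, 2, 3).toDF("id")
    val parent = child.filter($"id" > 1)

    child.persist()
    child.count() // materialize the cache
    // The executed plan should now contain an InMemoryRelation for the
    // cached child fragment.
    println(parent.queryExecution.executedPlan)

    child.unpersist()
    // After unpersist, the cached fragment should no longer appear, i.e.
    // queryExecution must not return a stale cached plan.
    println(parent.queryExecution.executedPlan)

    spark.stop()
  }
}
```

The key point is that `queryExecution` is re-derived from the current state of the `CacheManager` on each access, rather than being computed once and memoized unconditionally.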
