[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17064

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103594974

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(getNumInMemoryRelations(cachedPlan2) == 4)
   }
 }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
```

End diff --

The function is called twice, so it is actually meant to refresh the cache in the first call. Since I will change the test to what you suggested in https://github.com/apache/spark/pull/17064#discussion_r103530085, we can get rid of this confusion.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103594748

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (same test, anchored at):

```
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 1000).count == 988)
```

End diff --

Ok. Looks simpler and more explicit.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103530085

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (anchored at):

```
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 1000).count == 988)
```

End diff --

we can make this test more explicit

```
spark.range(10).write.mode("overwrite").parquet(path)
spark.read.parquet(path).cache()
spark.read.parquet(path).filter($"id" > 4).cache()
assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)

spark.range(20).write.mode("overwrite").parquet(path)
spark.catalog.refreshByPath(path)
assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
```
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103528104

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (anchored at):

```
+    withTempDir { dir =>
+      val path = dir.getPath()
```

End diff --

we usually call `dir.getCanonicalPath`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103527113

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (anchored at the end of the helper function):

```
+      assert(df1.count == dataCount - 12)
+      df1
```

End diff --

I don't get it, so we call `refreshByPath` before caching the query? Shouldn't we test the opposite order?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103526741

Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (anchored at):

```
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
```

End diff --

we can put `spark.range(dataCount).write.mode("overwrite").parquet(path)` at the beginning of this method and name it `testRefreshByPath` instead of `f`
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103393968

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

```
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
```

End diff --

The problem can be shown clearly with an example code snippet:

```scala
val t = new scala.collection.mutable.ArrayBuffer[Int]
t += 1
t += 2
t.foreach {
  case i if i > 0 =>
    println(s"i = $i")
    val index = t.indexWhere(_ == i)
    if (index >= 0) {
      t.remove(index)
    }
    println(s"t: $t")
    t += (i + 2)
    println(s"t: $t")
}
```

Output:

```
i = 1                 // The first iteration, we get the first element "1"
t: ArrayBuffer(2)     // "1" has been removed from the array
t: ArrayBuffer(2, 3)  // New element "3" has been inserted
i = 3                 // In the next iteration, element "2" is wrongly skipped
t: ArrayBuffer(2)     // "3" has been removed from the array
t: ArrayBuffer(2, 5)
```
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103390013

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala (anchored at):

```
+    cachedData.filter {
```

End diff --

After `filter`, we iterate over a different collection than `cachedData`, so there is no problem with adding/deleting elements in `cachedData`.
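The shape of this fix can be illustrated with a hypothetical Java sketch (plain integers stand in for cached plan entries; the names `refreshAll` and `needRefresh` are made up for illustration, not Spark's): filtering first materializes a separate list of matches, so the subsequent remove/re-add on the original list never mutates the collection being iterated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SnapshotIterationDemo {
    // Mimics the patch's shape: collect the matching entries into a *new*
    // list first, then freely remove/re-add on the original list while
    // iterating over the snapshot.
    static List<Integer> refreshAll(List<Integer> cachedData) {
        List<Integer> needRefresh = cachedData.stream()
                .filter(i -> i > 0)               // like cachedData.filter { ... }
                .collect(Collectors.toList());

        for (int i : needRefresh) {               // iterating the snapshot, not cachedData
            cachedData.remove(Integer.valueOf(i)); // "unpersist" the stale entry
            cachedData.add(i + 2);                 // "re-cache" the refreshed entry
        }
        return cachedData;
    }

    public static void main(String[] args) {
        // Every element is visited, unlike the buggy foreach that skipped "2".
        System.out.println(refreshAll(new ArrayList<>(List.of(1, 2)))); // prints [3, 4]
    }
}
```

Compare with the ArrayBuffer example above: iterating the snapshot visits both elements, so no entry escapes the refresh.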
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103351494

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala (anchored at):

```
+    cachedData.filter {
```

End diff --

can we use a java collection so that we can remove elements while iterating?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103349345

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala (anchored at):

```
+    cachedData.filter {
```

End diff --

but we are still modifying it during iteration, after the `filter`. can you be more specific about what the problem is?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103348444

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala (anchored at):

```
+    cachedData.filter {
```

End diff --

This kind of collection can't be modified while iterating. Some elements are not iterated over if we delete/add elements.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17064#discussion_r103277557

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala (anchored at):

```
+    cachedData.filter {
```

End diff --

why doesn't the previous one work?
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/17064

[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path

## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any) that contain the given data source path. However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 fix-refreshByPath

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17064.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17064

commit dd6d8ca1c091d00bfc29363ebc0d518b12927325
Author: Liang-Chi Hsieh
Date: 2017-02-25T05:58:40Z

    refreshByPath should clear all cached plans with the specified path.