[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-03-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17064





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103594974
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
--- End diff --

The function is called twice, so it is actually meant to refresh the cache on the first call. Since I will change the test to what you suggested in https://github.com/apache/spark/pull/17064#discussion_r103530085, we can get rid of this confusion.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103594748
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
+    }
+
+    withTempDir { dir =>
+      val path = dir.getPath()
+      spark.range(100).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 100).count == 88)
+
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 1000).count == 988)
--- End diff --

Ok. Looks simpler and more explicit.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103530085
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
+    }
+
+    withTempDir { dir =>
+      val path = dir.getPath()
+      spark.range(100).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 100).count == 88)
+
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      assert(f(path, spark, 1000).count == 988)
--- End diff --

we can make this test more explicit
```
  spark.range(10).write.mode("overwrite").parquet(path)
  spark.read.parquet(path).cache()
  spark.read.parquet(path).filter($"id" > 4).cache()
  assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)

  spark.range(20).write.mode("overwrite").parquet(path)
  spark.catalog.refreshByPath(path)
  assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
```





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103528104
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
+    }
+
+    withTempDir { dir =>
+      val path = dir.getPath()
--- End diff --

we usually call `dir.getCanonicalPath`
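
For example, a minimal sketch of the suggested change (same identifiers as in the quoted diff):

```
withTempDir { dir =>
  // getCanonicalPath resolves symlinks and relative segments, so the path
  // used when caching matches the one later passed to refreshByPath.
  val path = dir.getCanonicalPath
  spark.range(100).write.mode("overwrite").parquet(path)
}
```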





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103527113
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
+      val data = spark.read.parquet(path)
+      assert(data.count == dataCount)
+      val df = data.filter("id > 10")
+      df.cache
+      assert(df.count == dataCount - 11)
+      val df1 = df.filter("id > 11")
+      df1.cache
+      assert(df1.count == dataCount - 12)
+      df1
--- End diff --

I don't get it, so we call `refreshByPath` before caching the query? 
Shouldn't we test the opposite order?





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-28 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103526741
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -634,4 +634,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       assert(getNumInMemoryRelations(cachedPlan2) == 4)
     }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified path") {
+    def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
+      spark.catalog.refreshByPath(path)
--- End diff --

we can put `spark.range(dataCount).write.mode("overwrite").parquet(path)` 
at the beginning of this method and name it `testRefreshByPath` instead of `f`
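
A minimal sketch of the suggested shape (the body is the one from the quoted diff, with the write moved inside the helper):

```
def testRefreshByPath(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
  // Write fresh data first, then verify refreshByPath picks it up.
  spark.range(dataCount).write.mode("overwrite").parquet(path)
  spark.catalog.refreshByPath(path)
  val data = spark.read.parquet(path)
  assert(data.count == dataCount)
  val df = data.filter("id > 10")
  df.cache
  assert(df.count == dataCount - 11)
  val df1 = df.filter("id > 11")
  df1.cache
  assert(df1.count == dataCount - 12)
  df1
}
```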





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103393968
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

The problem can be shown clearly with an example code snippet:

```
val t = new scala.collection.mutable.ArrayBuffer[Int]

t += 1
t += 2

t.foreach {
  case i if i > 0 =>
    println(s"i = $i")
    val index = t.indexWhere(_ == i)
    if (index >= 0) {
      t.remove(index)
    }
    println(s"t: $t")
    t += (i + 2)
    println(s"t: $t")
}
```

Output:

```
i = 1                 // first iteration: we get the first element, "1"
t: ArrayBuffer(2)     // "1" has been removed from the buffer
t: ArrayBuffer(2, 3)  // new element "3" has been appended
i = 3                 // next iteration: element "2" is wrongly skipped
t: ArrayBuffer(2)     // "3" has been removed from the buffer
t: ArrayBuffer(2, 5)  // new element "5" has been appended
```



[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103390013
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

After `filter`, we iterate over a different collection than `cachedData`, so there is no problem adding or deleting elements in `cachedData`.
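
Conceptually, the fixed code follows this pattern (a minimal sketch based on the quoted diff, not necessarily the exact final code):

```
// `filter` materializes a new collection holding only the matching entries,
// so the `foreach` below iterates over that snapshot while `cachedData`
// itself can safely be mutated.
cachedData.filter { data =>
  data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined
}.foreach { data =>
  val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
  if (dataIndex >= 0) {
    data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
    cachedData.remove(dataIndex)
  }
  sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
}
```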





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103351494
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

can we use a java collection so that we can remove elements while iterating?
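
For reference, Java collections support removal through their iterator while iterating (a minimal standalone sketch, not code from this PR):

```
val buf = new java.util.LinkedList[Int]()
buf.add(1); buf.add(2); buf.add(3)

val it = buf.iterator()
while (it.hasNext) {
  if (it.next() % 2 == 0) {
    // Safe: removal goes through the iterator rather than the collection,
    // so the iteration state stays consistent.
    it.remove()
  }
}
// buf now contains [1, 3]
```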





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103349345
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

But we are still modifying it during iteration, after the `filter`. Can you be more specific about what the problem is?





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103348444
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

This kind of collection can't be modified while iterating over it. Some elements are skipped if we delete/add elements during iteration.





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17064#discussion_r103277557
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
       (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
     }
 
-    cachedData.foreach {
-      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
-        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
-        if (dataIndex >= 0) {
-          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
-          cachedData.remove(dataIndex)
-        }
-        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
-      case _ => // Do Nothing
+    cachedData.filter {
--- End diff --

Why doesn't the previous one work?





[GitHub] spark pull request #17064: [SPARK-19736][SQL] refreshByPath should clear all...

2017-02-24 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/17064

[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path

## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all DataFrames (if any) that contain the given data source path.

However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans 
with the specified path. It causes some strange behaviors reported in 
SPARK-15678.
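
A minimal illustration of the stale-cache behavior (a sketch assuming a parquet `path`; adapted from the suggested test in the review discussion above):

```
spark.range(10).write.mode("overwrite").parquet(path)
spark.read.parquet(path).cache()
spark.read.parquet(path).filter("id > 4").cache()
assert(spark.read.parquet(path).filter("id > 4").count() == 5)

// Overwrite the data, then refresh by path. Before this fix, only some of
// the cached plans for `path` were cleared, so stale results could remain
// cached; after the fix, all of them are cleared and re-read.
spark.range(20).write.mode("overwrite").parquet(path)
spark.catalog.refreshByPath(path)
assert(spark.read.parquet(path).filter("id > 4").count() == 15)
```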

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 fix-refreshByPath

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17064.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17064


commit dd6d8ca1c091d00bfc29363ebc0d518b12927325
Author: Liang-Chi Hsieh 
Date:   2017-02-25T05:58:40Z

refreshByPath should clear all cached plans with the specified path.



