Repository: spark
Updated Branches:
  refs/heads/master 9a75c1829 -> a78a90464


[SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite

## What changes were proposed in this pull request?

test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite 
doesn't not work because:

The UDF is executed and test count incremented when "df.cache()" is called and 
the subsequent "df.collect()" has no effect on the test result.

This PR fixes the test and adds another test for caching UDF results.
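
For illustration, here is a minimal standalone sketch (not the committed test; `UdfCacheCheck` and the accumulator name are made up) of counting UDF evaluations with a Spark accumulator, whose updates are propagated back to the driver even when the UDF runs in executor tasks, unlike a captured local var, which is only updated reliably when everything runs in a single JVM:

```scala
// Minimal standalone sketch (not the committed test): count UDF evaluations
// with a Spark accumulator instead of a captured local var.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfCacheCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("udf-cache-check")
      .getOrCreate()
    import spark.implicits._

    val evalCount = spark.sparkContext.longAccumulator("udfEvals")
    val countingUDF = udf((s: String) => { evalCount.add(1); "result" })

    val df = Seq(("test", 1)).toDF("s", "i").select(countingUDF($"s"))
    df.cache()
    df.count() // materializes the cache, running the UDF once per row
    val afterCache = evalCount.sum

    df.collect() // should be served from the cache without rerunning the UDF
    assert(evalCount.sum == afterCache, "cached DataFrame re-evaluated the UDF")

    spark.stop()
  }
}
```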

## How was this patch tested?

Add new tests.

Author: Li Jin <ice.xell...@gmail.com>

Closes #21531 from icexelloss/fix-cache-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a78a9046
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a78a9046
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a78a9046

Branch: refs/heads/master
Commit: a78a9046413255756653f70165520efd486fb493
Parents: 9a75c18
Author: Li Jin <ice.xell...@gmail.com>
Authored: Tue Jun 19 10:42:08 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Jun 19 10:42:08 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/CachedTableSuite.scala | 19 ----------
 .../apache/spark/sql/DatasetCacheSuite.scala    | 38 +++++++++++++++++++-
 2 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 81b7e18..6982c22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -83,25 +83,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }.sum
   }
 
-  test("withColumn doesn't invalidate cached dataframe") {
-    var evalCount = 0
-    val myUDF = udf((x: String) => { evalCount += 1; "result" })
-    val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
-    df.cache()
-
-    df.collect()
-    assert(evalCount === 1)
-
-    df.collect()
-    assert(evalCount === 1)
-
-    val df2 = df.withColumn("newColumn", lit(1))
-    df2.collect()
-
-    // We should not reevaluate the cached dataframe
-    assert(evalCount === 1)
-  }
-
   test("cache temp table") {
     withTempView("tempTable") {
       testData.select('key).createOrReplaceTempView("tempTable")

http://git-wip-us.apache.org/repos/asf/spark/blob/a78a9046/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index e0561ee..82a93f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql
 
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.storage.StorageLevel
 
 
-class DatasetCacheSuite extends QueryTest with SharedSQLContext {
+class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits {
   import testImplicits._
 
   test("get storage level") {
@@ -96,4 +99,37 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
     agged.unpersist()
     assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
   }
+
+  test("persist and then withColumn") {
+    val df = Seq(("test", 1)).toDF("s", "i")
+    val df2 = df.withColumn("newColumn", lit(1))
+
+    df.cache()
+    assertCached(df)
+    assertCached(df2)
+
+    df.count()
+    assertCached(df2)
+
+    df.unpersist()
+    assert(df.storageLevel == StorageLevel.NONE)
+  }
+
+  test("cache UDF result correctly") {
+    val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
+    val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+    val df2 = df.agg(sum(df("b")))
+
+    df.cache()
+    df.count()
+    assertCached(df2)
+
+    // udf has been evaluated during caching, and thus should not be re-evaluated here
+    failAfter(5 seconds) {
+      df2.collect()
+    }
+
+    df.unpersist()
+    assert(df.storageLevel == StorageLevel.NONE)
+  }
 }
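
For reference, the timing-based assertion in the new test comes from ScalaTest's `TimeLimits` trait. Below is a standalone sketch of that pattern, assuming ScalaTest 3.0.x (the version Spark depended on at the time); the suite name and time values are illustrative:

```scala
// Standalone sketch of the failAfter pattern: the block must complete
// within the given span or the test fails with a timeout exception.
import org.scalatest.FunSuite
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.SpanSugar._

class TimeLimitExampleSuite extends FunSuite with TimeLimits {
  // failAfter needs an implicit Signaler to interrupt a timed-out test thread.
  implicit val signaler: Signaler = ThreadSignaler

  test("cached work finishes well under the limit") {
    failAfter(5.seconds) {
      Thread.sleep(100) // stand-in for reading a cached result
    }
  }
}
```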

