GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/13596

    [SPARK-15870][SQL] DataFrame can't execute after uncacheTable.

    ## What changes were proposed in this pull request?
    
    If a cached `DataFrame` is executed more than once and `uncacheTable` is then called, as in the following:
    
    ```
        val selectStar = sql("SELECT * FROM testData WHERE key = 1")
        selectStar.createOrReplaceTempView("selectStar")
    
        spark.catalog.cacheTable("selectStar")
        checkAnswer(
          selectStar,
          Seq(Row(1, "1")))
    
        spark.catalog.uncacheTable("selectStar")
        checkAnswer(
          selectStar,
          Seq(Row(1, "1")))
    ```
    
    then the uncached `DataFrame` can no longer execute; it fails with a `Task not serializable` exception such as:
    
    ```
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    ...
    Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    ...
    ```
    
    Note that a `DataFrame` uncached with `DataFrame.unpersist()` still works; only one uncached with `spark.catalog.uncacheTable` fails.
    
    This PR reverts the part of cf38fe0 that unregisters the `batchStats` accumulator. It does not need to be unregistered here, because `ContextCleaner` will do so after the accumulator is collected by GC.
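    
    To illustrate why unregistering breaks later executions, here is a minimal sketch of the mechanism as I read it from the stack trace above. It does not use Spark: `ToyAccumulator`, `ToyPlan` and `UncacheSketch` are hypothetical stand-ins, and only the `writeReplace` hook and the error message are taken from the trace. The assumption is that the plan keeps a reference to the accumulator, so once the accumulator has been unregistered the same plan can no longer be serialized.
    
    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical stand-in for an accumulator; this is NOT Spark's AccumulatorV2.
    class ToyAccumulator extends Serializable {
      private var registered = false

      def register(): Unit = { registered = true }
      def unregister(): Unit = { registered = false }

      // Java serialization invokes writeReplace (when present) before writing
      // the object. Throwing here mimics the guard seen in the "Caused by" part
      // of the trace.
      protected def writeReplace(): AnyRef = {
        if (!registered) {
          throw new UnsupportedOperationException(
            "Accumulator must be registered before send to executor")
        }
        this
      }
    }

    // Hypothetical stand-in for a plan node that holds on to the accumulator.
    class ToyPlan(val batchStats: ToyAccumulator) extends Serializable

    object UncacheSketch {
      // Returns None if the object serializes, or the error message if the
      // accumulator's guard refuses.
      def trySerialize(obj: AnyRef): Option[String] = {
        // In-memory stream only, so there is no OS resource to release.
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          out.writeObject(obj)
          None
        } catch {
          case e: UnsupportedOperationException => Some(e.getMessage)
        }
      }

      def main(args: Array[String]): Unit = {
        val acc = new ToyAccumulator
        acc.register()
        val plan = new ToyPlan(acc)

        // While the accumulator is registered, the plan serializes.
        println(trySerialize(plan))

        // Assumed analogue of what the reverted code did on uncacheTable.
        acc.unregister()

        // The same plan object now fails, because it still references the
        // unregistered accumulator.
        println(trySerialize(plan))
      }
    }
    ```
    
    In Spark the failure surfaces as `Task not serializable` because `ClosureCleaner.ensureSerializable` is the frame that hits it, as the trace shows. Leaving the accumulator registered and letting `ContextCleaner` clean it up later avoids that state.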
    
    ## How was this patch tested?
    
    Added a test that checks a DataFrame can still execute after `uncacheTable`, and ran the other existing tests.
    The test that checks whether the accumulator was cleared is marked `ignore`, because it would be flaky.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-15870

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13596.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13596
    
----
commit 379b1dc90978fd2e3465b3cc240033943dbefd4c
Author: Takuya UESHIN <[email protected]>
Date:   2016-06-10T08:43:37Z

    Add a test to check if DataFrame can execute after uncacheTable.

commit e844a7ea0995e0be17aa96a4381e9bae90b75c76
Author: Takuya UESHIN <[email protected]>
Date:   2016-06-10T08:46:00Z

    Revert a part of cf38fe0 not to unregister batchStats accumulator.

commit 56082d99f63594ca838ebf22131695f4458238e4
Author: Takuya UESHIN <[email protected]>
Date:   2016-06-10T08:52:56Z

    Ignore a flaky test.

----

