GitHub user willb opened a pull request:

    https://github.com/apache/spark/pull/189

    SPARK-729:  Closures not always serialized at capture time

    [SPARK-729](https://spark-project.atlassian.net/browse/SPARK-729) concerns 
when the free variables of closures passed to transformations are captured.  
Currently, a closure can observe its environment as it stands when the closure 
is serialized rather than when it is created.  There are a few possible 
approaches to solving this problem, and this PR discusses some of them.  The 
approach I took has the advantage of being simple, obviously correct, and 
minimally invasive, but it preserves something that has been bothering me about 
Spark's closure handling, so I'd like to discuss an alternative and get some 
feedback on whether or not it is worth pursuing.
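    To make the problem concrete, here is a minimal, Spark-free sketch. It assumes Scala 2.12 or later and uses plain Java serialization as a stand-in for Spark's closure serializer; `LateCaptureDemo`, `roundTrip`, and `LoaderAwareInputStream` are hypothetical names for illustration. A closure that captures a local `var` actually captures a mutable reference cell, so if the variable changes between creating the closure and serializing it, the serialized copy sees the new value.

    ```scala
    import java.io._

    object LateCaptureDemo {
      // ObjectInputStream that resolves classes (including a lambda's
      // capturing class) through an explicit class loader.
      private class LoaderAwareInputStream(in: InputStream, loader: ClassLoader)
          extends ObjectInputStream(in) {
        override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
          try Class.forName(desc.getName, false, loader)
          catch { case _: ClassNotFoundException => super.resolveClass(desc) }
      }

      // Serialize and immediately deserialize an object; this stands in for
      // shipping a closure to an executor when a job is submitted.
      def roundTrip[T <: AnyRef](obj: T): T = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        try out.writeObject(obj) finally out.close()
        val in = new LoaderAwareInputStream(
          new ByteArrayInputStream(bytes.toByteArray), getClass.getClassLoader)
        try in.readObject().asInstanceOf[T] finally in.close()
      }

      def lateShipResult(): Int = {
        var factor = 2                      // captured as a scala.runtime.IntRef
        val scale: Int => Int = x => x * factor
        factor = 3                          // mutated after the closure is created
        val shipped = roundTrip(scale)      // serialized only now, "at job time"
        shipped(10)                         // sees factor == 3, not 2
      }

      def main(args: Array[String]): Unit =
        println(lateShipResult())          // prints 30
    }
    ```

    Because serialization happens only after `factor` is reassigned, the shipped copy computes `10 * 3` rather than the `10 * 2` a reader of the defining code might expect.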
    
    ## What I did
    
    The basic approach I took depends on the work I did for #143, and so this 
PR is based atop that.  Specifically, #143 modifies `ClosureCleaner.clean` to 
preemptively determine whether or not closures are serializable immediately 
upon closure cleaning (rather than waiting for a job involving that closure to 
be scheduled).  Thus exceptions for non-serializable closures are thrown at the 
line that defines the closure rather than where the closure is used.
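    The fail-fast idea can be sketched without Spark, assuming Java serialization as the closure serializer. `ProactiveCheck`, `checkSerializable`, and `ClosureNotSerializableException` are hypothetical names (per the commit log, the real change throws a `SparkException` from `ClosureCleaner.clean`); the point is only that serialization is attempted when the closure is handed over, so the error surfaces at that call site.

    ```scala
    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    // Hypothetical exception type standing in for SparkException.
    class ClosureNotSerializableException(msg: String, cause: Throwable)
        extends RuntimeException(msg, cause)

    object ProactiveCheck {
      // Attempt to serialize `closure` immediately. On success, return the bytes
      // (which a caller could keep and reuse); on failure, throw right away.
      def checkSerializable(closure: AnyRef): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        try {
          out.writeObject(closure)
          out.flush()
          buffer.toByteArray
        } catch {
          case e: NotSerializableException =>
            throw new ClosureNotSerializableException(
              s"Closure is not serializable: ${e.getMessage}", e)
        } finally out.close()
      }

      // Deliberately does not implement java.io.Serializable.
      class Unshippable { val n = 1 }

      def acceptsSerializable(): Boolean = {
        val inc: Int => Int = _ + 1              // captures nothing
        checkSerializable(inc).nonEmpty
      }

      def rejectsUnserializable(): Boolean = {
        val handle = new Unshippable
        val bad: Int => Int = x => x + handle.n  // captures a non-serializable object
        try { checkSerializable(bad); false }
        catch { case _: ClosureNotSerializableException => true }
      }

      def main(args: Array[String]): Unit = {
        println(acceptsSerializable())   // true
        println(rejectsUnserializable()) // true
      }
    }
    ```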
    
    Since the easiest way to determine whether or not a closure is serializable 
is to attempt to serialize it, the code in #143 creates a serialized closure as 
part of `ClosureCleaner.clean`.  `clean` currently modifies its argument in 
place, but the method in `SparkContext` that wraps it returns a value (a 
reference to the modified-in-place argument).  This branch modifies 
`ClosureCleaner.clean` so that it returns a value:  if it is cleaning a 
serializable closure, it returns the result of deserializing its serialized 
argument, so the returned closure has an environment captured at cleaning time. 
`SparkContext.clean` then returns the result of `ClosureCleaner.clean` rather 
than a reference to its modified-in-place argument.
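    A minimal sketch of the serialize-then-deserialize step, again assuming Scala 2.12 or later and Java serialization in place of Spark's closure serializer. `CaptureAtCleanTime.clean` is a hypothetical stand-in for the modified `ClosureCleaner.clean`, not its actual code. The returned copy owns a snapshot of the captured environment, so later mutations of the original variable do not leak into it.

    ```scala
    import java.io._

    object CaptureAtCleanTime {
      // Resolve classes (including a lambda's capturing class) via a given loader.
      private class LoaderAwareInputStream(in: InputStream, loader: ClassLoader)
          extends ObjectInputStream(in) {
        override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
          try Class.forName(desc.getName, false, loader)
          catch { case _: ClassNotFoundException => super.resolveClass(desc) }
      }

      // Hypothetical clean: serialize the closure and return the deserialized
      // copy, whose environment is a snapshot taken now rather than at job time.
      def clean[F <: AnyRef](closure: F): F = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        try out.writeObject(closure) finally out.close()
        val in = new LoaderAwareInputStream(
          new ByteArrayInputStream(buffer.toByteArray), getClass.getClassLoader)
        try in.readObject().asInstanceOf[F] finally in.close()
      }

      // Returns (original closure's result, cleaned copy's result) after the
      // captured variable has been mutated.
      def demo(): (Int, Int) = {
        var factor = 2
        val scale: Int => Int = x => x * factor
        val cleaned = clean(scale)   // environment captured here, factor == 2
        factor = 3                   // later mutation, e.g. before an action runs
        (scale(10), cleaned(10))
      }

      def main(args: Array[String]): Unit =
        println(demo())             // prints (30,20)
    }
    ```

    The original closure still sees the mutation (30), while the cleaned copy keeps the value it had at cleaning time (20).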
    
    I've added tests for this behavior (777a1bc).  The pull request as it 
stands, given the changes in #143, is nearly trivial.  There is some overhead 
from deserializing the closure, but it is minimal and the benefit of obvious 
operational correctness (vs. a more sophisticated but harder-to-validate 
transformation in `ClosureCleaner`) seems pretty important.  I think this is a 
fine way to solve this problem, but it's not perfect.
    
    ## What we might want to do
    
    The thing that has been bothering me about Spark's handling of closures is 
that it seems like we should be able to statically ensure that cleaning and 
serialization happen exactly once for a given closure.  If we serialize a 
closure in order to determine whether or not it is serializable, we should be 
able to hang on to the generated byte buffer and use it instead of 
re-serializing the closure later.  By replacing closures with instances of a 
sum type that encodes whether or not a closure has been cleaned or serialized, 
we could handle clean, to-be-cleaned, and serialized closures separately with 
case matches.  Here's a somewhat-concrete sketch (taken from my git stash) of 
what this might look like:
    
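    ```scala
    package org.apache.spark.util

    import java.nio.ByteBuffer
    import scala.language.implicitConversions

    // Records how far a closure has progressed through cleaning and serialization.
    sealed abstract class ClosureBox[T] { def func: T }

    // Not yet cleaned.
    final case class RawClosure[T](func: T) extends ClosureBox[T]

    // Cleaned, but not serialized.
    final case class CleanedClosure[T](func: T) extends ClosureBox[T]

    // Cleaned, together with a serialized copy.
    final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T]

    object ClosureBoxImplicits {
      // Lets call sites keep passing bare functions where a ClosureBox is expected.
      implicit def closureBoxFromFunc[T <: AnyRef](fun: T): ClosureBox[T] = RawClosure(fun)
    }
    ```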
    
    With these types declared, we'd be able to change `ClosureCleaner.clean` to 
take a `ClosureBox[T=>U]` (possibly generated by implicit conversion) and 
return a `ClosureBox[T=>U]` (either a `CleanedClosure[T=>U]` or a 
`SerializedClosure[T=>U]`, depending on whether or not serializability-checking 
was enabled) instead of a `T=>U`.  A case match could thus short-circuit 
cleaning or serializing closures that had already been cleaned or serialized 
(both in `ClosureCleaner` and in the closure serializer).  
Cleaned-and-serialized closures would be represented by a boxed tuple of the 
original closure and a serialized copy (complete with an environment quiesced 
at transformation time).  Additional implicit conversions could convert from 
`ClosureBox` instances to the underlying function type where appropriate.  
Tracking this sort of state in the type system seems like the right thing to do 
to me.
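    The short-circuiting case match might look roughly like the following. This is a hypothetical sketch, not code from the branch: `BoxedCleaner`, `cleanPass`, `bytesFor`, and the counters are invented for illustration, the real cleaning pass is replaced by a placeholder, and Java serialization stands in for the closure serializer.

    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // The ClosureBox hierarchy from the sketch above.
    sealed abstract class ClosureBox[T] { def func: T }
    final case class RawClosure[T](func: T) extends ClosureBox[T]
    final case class CleanedClosure[T](func: T) extends ClosureBox[T]
    final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T]

    object BoxedCleaner {
      // Counters that record how often real work happens (illustration only).
      var cleanings = 0
      var serializations = 0

      // Placeholder for the actual cleaning pass; here it only counts calls.
      private def cleanPass(func: AnyRef): Unit = cleanings += 1

      private def serialize(func: AnyRef): ByteBuffer = {
        serializations += 1
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        try out.writeObject(func) finally out.close()
        ByteBuffer.wrap(buffer.toByteArray)
      }

      // Clean (and, if requested, serialize) a closure, skipping any stage
      // that the box says has already happened.
      def clean[F <: AnyRef](box: ClosureBox[F], checkSerializable: Boolean): ClosureBox[F] =
        box match {
          case s @ SerializedClosure(_, _) => s
          case c @ CleanedClosure(f) =>
            if (checkSerializable) SerializedClosure(f, serialize(f)) else c
          case RawClosure(f) =>
            cleanPass(f)
            if (checkSerializable) SerializedClosure(f, serialize(f)) else CleanedClosure(f)
        }

      // A closure serializer could reuse cached bytes instead of re-serializing.
      def bytesFor[F <: AnyRef](box: ClosureBox[F]): ByteBuffer = box match {
        case SerializedClosure(_, buf) => buf.duplicate()
        case other => serialize(clean(other, checkSerializable = false).func)
      }

      def main(args: Array[String]): Unit = {
        val inc: Int => Int = _ + 1
        val once = clean(RawClosure(inc), checkSerializable = true)
        val twice = clean(once, checkSerializable = true)   // short-circuits
        bytesFor(twice)                                      // reuses cached bytes
        println((twice eq once, cleanings, serializations))  // prints (true,1,1)
      }
    }
    ```

    Because `SerializedClosure` carries its bytes, the closure serializer can hand out `buf.duplicate()` rather than serializing a second time, which is the once-only discipline described above.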
    
    ### Why we might not want to do that
    
    _It's pretty invasive._  Every function type used by every `RDD` subclass 
would have to change to reflect that they expected a `ClosureBox[T=>U]` instead 
of a `T=>U`.  This obscures what's going on and is not a little ugly.  Although 
I really like the idea of using the type system to enforce the 
clean-or-serialize-once discipline, it might not be worth adding another layer
of types (even if we could hide some of the extra boilerplate with judicious 
application of implicit conversions).
    
    _It statically guarantees a property whose absence is unlikely to cause any 
serious problems as it stands._  It appears that all closures are currently 
dynamically cleaned once and it's not obvious that repeated closure-cleaning is 
likely to be a problem in the future.  Furthermore, serializing closures is 
relatively cheap, so doing it once to check for serializability and once again to
actually ship them across the wire doesn't seem like a big deal.
    
    Taken together, these seem like a high price to pay for statically 
guaranteeing that closures are operated upon only once.
    
    ## Other possibilities
    
    I felt like the serialize-and-deserialize approach was best due to its 
obvious simplicity.  But it would be possible to do a more sophisticated 
transformation within `ClosureCleaner.clean`.  It might also be possible for 
`clean` to modify its argument so that whether or not a given closure
had been cleaned would be apparent upon inspection; this would buy us some of 
the operational benefits of the `ClosureBox` approach but not the static 
cleanliness.
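    One way to make "already cleaned" apparent upon inspection without changing any types is sketched here, under a loud assumption: rather than modifying the closure object itself (which compiler-generated function classes don't readily allow), this substitutes a side table that weakly records closures that have been cleaned. It relies on closures using default reference equality, and `CleanedRegistry`, `isCleaned`, and `passes` are hypothetical names.

    ```scala
    import java.util.{Collections, WeakHashMap}

    object CleanedRegistry {
      // Weakly held set of closures already cleaned; an entry disappears once
      // its closure is garbage-collected. Membership uses equals/hashCode,
      // which for compiler-generated function objects is reference identity.
      private val cleaned: java.util.Set[AnyRef] =
        Collections.synchronizedSet(
          Collections.newSetFromMap(new WeakHashMap[AnyRef, java.lang.Boolean]()))

      var passes = 0 // counts actual cleaning passes (illustration only, not thread-safe)

      def isCleaned(closure: AnyRef): Boolean = cleaned.contains(closure)

      // Hypothetical clean: does the work only the first time it sees a closure.
      def clean[F <: AnyRef](closure: F): F = {
        if (cleaned.add(closure)) {
          passes += 1 // the real cleaning pass would run here
        }
        closure
      }

      def main(args: Array[String]): Unit = {
        val double: Int => Int = _ * 2
        println(isCleaned(double))            // false
        clean(double)
        clean(double)
        println((isCleaned(double), passes))  // prints (true,1)
      }
    }
    ```

    This buys the dynamic check (skip work for a closure seen before) but none of the static guarantees of the `ClosureBox` approach.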
    
    I'm interested in any feedback or discussion on whether or not the problems 
with the type-based approach indeed outweigh its advantages, as well as on 
other approaches to this issue and to closure handling in general.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/willb/spark spark-729

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #189
    
----
commit 21b4b063372bfbdd289ff770d3d38cb4453e7ca6
Author: William Benton <[email protected]>
Date:   2014-03-13T02:56:32Z

    Test cases for SPARK-897.
    
    Tests to make sure that passing an unserializable closure
    to a transformation fails fast.

commit d8df3dbbed3e1c9e16a2c2002c04c2d2e6a0b2e2
Author: William Benton <[email protected]>
Date:   2014-03-13T19:40:42Z

    Adds proactive closure-serializability checking
    
    ClosureCleaner.clean now checks to ensure that its closure argument
    is serializable by default and throws a SparkException with the
    underlying NotSerializableException in the detail message otherwise.
    As a result, transformation invocations with unserializable closures
    will fail at their call sites rather than when they actually execute.
    
    ClosureCleaner.clean now takes a second boolean argument; pass false
    to disable serializability-checking behavior at call sites where this
    behavior isn't desired.

commit d5947b33f8e22ee498473abd59795d4f15a7b198
Author: William Benton <[email protected]>
Date:   2014-03-14T16:40:56Z

    Ensure assertions in Graph.apply are asserted.
    
    The Graph.apply test in GraphSuite had some assertions in a closure in
    a graph transformation. This caused two problems:
    
    1. because assert() was called, test classes were reachable from the
    closures, which made them not serializable, and
    
    2. (more importantly) these assertions never actually executed, since
    they occurred within a lazy map()
    
    This commit simply changes the Graph.apply test to collect the graph
    triplets so that it can assert about each triplet from a map method.

commit 4ecf84100e22224aed204c3c4251c6ab20ff8bf6
Author: William Benton <[email protected]>
Date:   2014-03-14T17:33:33Z

    Make proactive serializability checking optional.
    
    SparkContext.clean uses ClosureCleaner's proactive serializability
    checking by default.  This commit adds an overloaded clean method
    to SparkContext that allows clients to specify that serializability
    checking should not occur as part of closure cleaning.

commit d6e8dd6469ef24ee0631d7c5bb424498715c59f5
Author: William Benton <[email protected]>
Date:   2014-03-14T17:34:42Z

    Don't check serializability of DStream transforms.
    
    Since the DStream is reachable from within these closures, they aren't
    checkable by the straightforward technique of passing them to the
    closure serializer.

commit 9c5cf90996bd7f9b8afbfa05e539cf68e2e48ee4
Author: William Benton <[email protected]>
Date:   2014-03-18T01:12:46Z

    Remove closure-serializability test in DAGScheduler
    
    Since all closures that will be marshaled via the DAGScheduler will
    already be checked to ensure serializability at transformation or
    action invocation by ClosureCleaner.clean, there is no need to check
    these for serializability again when jobs are submitted.

commit b276f0ed2fc619edbb3a48617bed8de8f84beb6b
Author: William Benton <[email protected]>
Date:   2014-03-18T14:55:57Z

    Added tests for variable capture in closures
    
    The two tests added to ClosureCleanerSuite ensure that variable values
    are captured at RDD definition time, not at job-execution time.

commit c73efa8ae9a83b4d62ae37a763a6454d51d3d359
Author: William Benton <[email protected]>
Date:   2014-03-20T15:48:17Z

    Predictable closure environment capture
    
    The environments of serializable closures are now captured as
    part of closure cleaning. Since we already proactively check most
    closures for serializability, ClosureCleaner.clean now returns
    the result of deserializing the serialized version of the cleaned
    closure.

----

