GitHub user willb opened a pull request:
https://github.com/apache/spark/pull/189
SPARK-729: Closures not always serialized at capture time
[SPARK-729](https://spark-project.atlassian.net/browse/SPARK-729) concerns
when free variables in closure arguments to transformations are captured.
Currently, it is possible for closures to get the environment in which they are
serialized (not the environment in which they are created). There are a few
possible approaches to solving this problem and this PR will discuss some of
them. The approach I took has the advantage of being simple, obviously
correct, and minimally-invasive, but it preserves something that has been
bothering me about Spark's closure handling, so I'd like to discuss an
alternative and get some feedback on whether or not it is worth pursuing.
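To make the capture-timing problem concrete, here is a minimal, Spark-free sketch. It stands in for the closure serializer with plain Java serialization (`roundTrip` is a hypothetical helper, not Spark code): because the free variable is read when the closure is *serialized*, a mutation between closure creation and serialization is visible in the shipped copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CaptureTiming {
  // Clone a value via Java serialization, standing in for the closure serializer.
  def roundTrip[T <: AnyRef](v: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(v)
    oos.flush()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    var factor = 2                       // free variable captured by the closure
    val f: Int => Int = (x: Int) => x * factor
    factor = 10                          // mutated after the closure was created
    val g = roundTrip(f)                 // environment is captured at *serialization* time
    println(g(3))                        // prints 30, not 6
  }
}
```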
## What I did
The basic approach I took depends on the work I did for #143, and so this
PR is based atop that. Specifically: #143 modifies `ClosureCleaner.clean` to
preemptively determine whether or not closures are serializable immediately
upon closure cleaning (rather than waiting for a job involving that closure to
be scheduled). Thus non-serializable closure exceptions will be triggered at
the line defining the closure rather than where the closure is used.
Since the easiest way to determine whether or not a closure is serializable
is to attempt to serialize it, the code in #143 is creating a serialized
closure as part of `ClosureCleaner.clean`. `clean` currently modifies its
argument in place, and the method in `SparkContext` that wraps it returns a
value (a reference to the modified-in-place argument). This branch modifies
`ClosureCleaner.clean` so that it returns a value: if it is cleaning a
serializable closure, it returns the result of deserializing its serialized
argument; therefore it is returning a closure with an environment captured at
cleaning time. `SparkContext.clean` then returns the result of
`ClosureCleaner.clean`, rather than a reference to its modified-in-place
argument.
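The shape of that change can be sketched as follows. This is a simplified stand-in, not the actual `ClosureCleaner` code: `cloneViaSerialization` substitutes plain Java serialization for Spark's closure serializer, and the transitive-reference cleaning is elided.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CleanSketch {
  // Hypothetical stand-in for the closure serializer: clone via Java serialization.
  private def cloneViaSerialization[F <: AnyRef](closure: F): F = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(closure) // throws NotSerializableException for bad closures
    oos.flush()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[F]
  }

  // Shape of the change: clean() returns the deserialized copy, whose
  // environment is frozen at cleaning time, instead of mutating in place.
  def clean[F <: AnyRef](closure: F, checkSerializable: Boolean = true): F = {
    // ... transitive-reference cleaning elided ...
    if (checkSerializable) cloneViaSerialization(closure) // also proves serializability
    else closure
  }
}
```

The serializability check and the environment capture thus come from the same single serialization pass.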
I've added tests for this behavior (777a1bc). The pull request as it
stands, given the changes in #143, is nearly trivial. There is some overhead
from deserializing the closure, but it is minimal and the benefit of obvious
operational correctness (vs. a more sophisticated but harder-to-validate
transformation in `ClosureCleaner`) seems pretty important. I think this is a
fine way to solve this problem, but it's not perfect.
## What we might want to do
The thing that has been bothering me about Spark's handling of closures is
that it seems like we should be able to statically ensure that cleaning and
serialization happen exactly once for a given closure. If we serialize a
closure in order to determine whether or not it is serializable, we should be
able to hang on to the generated byte buffer and use it instead of
re-serializing the closure later. By replacing closures with instances of a
sum type that encodes whether or not a closure has been cleaned or serialized,
we could handle clean, to-be-cleaned, and serialized closures separately with
case matches. Here's a somewhat-concrete sketch (taken from my git stash) of
what this might look like:
```scala
package org.apache.spark.util

import java.nio.ByteBuffer

import scala.reflect.ClassManifest

sealed abstract class ClosureBox[T] { def func: T }

final case class RawClosure[T](func: T) extends ClosureBox[T] {}
final case class CleanedClosure[T](func: T) extends ClosureBox[T] {}
final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T] {}

object ClosureBoxImplicits {
  implicit def closureBoxFromFunc[T <: AnyRef](fun: T) = new RawClosure[T](fun)
}
```
With these types declared, we'd be able to change `ClosureCleaner.clean` to
take a `ClosureBox[T=>U]` (possibly generated by implicit conversion) and
return a `ClosureBox[T=>U]` (either a `CleanedClosure[T=>U]` or a
`SerializedClosure[T=>U]`, depending on whether or not serializability-checking
was enabled) instead of a `T=>U`. A case match could thus short-circuit
cleaning or serializing closures that had already been cleaned or serialized
(both in `ClosureCleaner` and in the closure serializer).
Cleaned-and-serialized closures would be represented by a boxed tuple of the
original closure and a serialized copy (complete with an environment quiesced
at transformation time). Additional implicit conversions could convert from
`ClosureBox` instances to the underlying function type where appropriate.
Tracking this sort of state in the type system seems like the right thing to do
to me.
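The short-circuiting case match described above might look something like this. It restates the `ClosureBox` types from the sketch and uses a hypothetical `doClean` as a stand-in for `ClosureCleaner`'s real work:

```scala
import java.nio.ByteBuffer

sealed abstract class ClosureBox[T] { def func: T }
final case class RawClosure[T](func: T) extends ClosureBox[T]
final case class CleanedClosure[T](func: T) extends ClosureBox[T]
final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T]

object CleanDispatch {
  // Hypothetical dispatch: clean exactly once, skip work already done.
  def clean[T <: AnyRef](boxed: ClosureBox[T]): ClosureBox[T] = boxed match {
    case RawClosure(f)               => CleanedClosure(doClean(f)) // clean once
    case c @ CleanedClosure(_)       => c                          // already cleaned
    case s @ SerializedClosure(_, _) => s                          // already serialized
  }

  private def doClean[T](f: T): T = f // stand-in for ClosureCleaner's real work
}
```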
### Why we might not want to do that
_It's pretty invasive._ Every function type used by every `RDD` subclass
would have to change to reflect that they expected a `ClosureBox[T=>U]` instead
of a `T=>U`. This obscures what's going on and is not a little ugly. Although
I really like the idea of using the type system to enforce the
clean-or-serialize once discipline, it might not be worth adding another layer
of types (even if we could hide some of the extra boilerplate with judicious
application of implicit conversions).
_It statically guarantees a property whose absence is unlikely to cause any
serious problems as it stands._ It appears that all closures are currently
dynamically cleaned once and it's not obvious that repeated closure-cleaning is
likely to be a problem in the future. Furthermore, serializing closures is
relatively cheap, so doing it once to check for serialization and once again to
actually ship them across the wire doesn't seem like a big deal.
Taken together, these seem like a high price to pay for statically
guaranteeing that closures are operated upon only once.
## Other possibilities
I felt like the serialize-and-deserialize approach was best due to its
obvious simplicity. But it would be possible to do a more sophisticated
transformation within `ClosureCleaner.clean`. It might also be possible for
`clean` to modify its argument in a way so that whether or not a given closure
had been cleaned would be apparent upon inspection; this would buy us some of
the operational benefits of the `ClosureBox` approach but not the static
cleanliness.
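One hypothetical way to make cleaned-ness apparent upon inspection, without touching the function types, would be a weak side table keyed on closure identity (a sketch under that assumption, not anything in this branch):

```scala
import java.util.{Collections, WeakHashMap}

object CleanedRegistry {
  // Hypothetical: record cleaned closures in a weak map so clean() can skip
  // work (and the serializer can skip re-checking) on repeated invocations.
  // Entries disappear once the closure itself is garbage-collected.
  private val cleaned =
    Collections.synchronizedMap(new WeakHashMap[AnyRef, java.lang.Boolean]())

  def markCleaned(closure: AnyRef): Unit = cleaned.put(closure, true)
  def isCleaned(closure: AnyRef): Boolean = cleaned.containsKey(closure)
}
```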
I'm interested in any feedback or discussion on whether or not the problems
with the type-based approach indeed outweigh its advantages, as well as on
other approaches to this issue and to closure handling in general.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/willb/spark spark-729
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/189.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #189
----
commit 21b4b063372bfbdd289ff770d3d38cb4453e7ca6
Author: William Benton <[email protected]>
Date: 2014-03-13T02:56:32Z
Test cases for SPARK-897.
Tests to make sure that passing an unserializable closure
to a transformation fails fast.
commit d8df3dbbed3e1c9e16a2c2002c04c2d2e6a0b2e2
Author: William Benton <[email protected]>
Date: 2014-03-13T19:40:42Z
Adds proactive closure-serializability checking
ClosureCleaner.clean now checks to ensure that its closure argument
is serializable by default and throws a SparkException with the
underlying NotSerializableException in the detail message otherwise.
As a result, transformation invocations with unserializable closures
will fail at their call sites rather than when they actually execute.
ClosureCleaner.clean now takes a second boolean argument; pass false
to disable serializability-checking behavior at call sites where this
behavior isn't desired.
commit d5947b33f8e22ee498473abd59795d4f15a7b198
Author: William Benton <[email protected]>
Date: 2014-03-14T16:40:56Z
Ensure assertions in Graph.apply are asserted.
The Graph.apply test in GraphSuite had some assertions in a closure in
a graph transformation. This caused two problems:
1. because assert() was called, test classes were reachable from the
closures, which made them not serializable, and
2. (more importantly) these assertions never actually executed, since
they occurred within a lazy map()
This commit simply changes the Graph.apply test to collect the graph
triplets so it can assert about each triplet from a map method.
commit 4ecf84100e22224aed204c3c4251c6ab20ff8bf6
Author: William Benton <[email protected]>
Date: 2014-03-14T17:33:33Z
Make proactive serializability checking optional.
SparkContext.clean uses ClosureCleaner's proactive serializability
checking by default. This commit adds an overloaded clean method
to SparkContext that allows clients to specify that serializability
checking should not occur as part of closure cleaning.
commit d6e8dd6469ef24ee0631d7c5bb424498715c59f5
Author: William Benton <[email protected]>
Date: 2014-03-14T17:34:42Z
Don't check serializability of DStream transforms.
Since the DStream is reachable from within these closures, they aren't
checkable by the straightforward technique of passing them to the
closure serializer.
commit 9c5cf90996bd7f9b8afbfa05e539cf68e2e48ee4
Author: William Benton <[email protected]>
Date: 2014-03-18T01:12:46Z
Remove closure-serializability test in DAGScheduler
Since all closures that will be marshaled via the DAGScheduler will
already be checked to ensure serializability at transformation or
action invocation by ClosureCleaner.clean, there is no need to check
these for serializability again when jobs are submitted.
commit b276f0ed2fc619edbb3a48617bed8de8f84beb6b
Author: William Benton <[email protected]>
Date: 2014-03-18T14:55:57Z
Added tests for variable capture in closures
The two tests added to ClosureCleanerSuite ensure that variable values
are captured at RDD definition time, not at job-execution time.
commit c73efa8ae9a83b4d62ae37a763a6454d51d3d359
Author: William Benton <[email protected]>
Date: 2014-03-20T15:48:17Z
Predictable closure environment capture
The environments of serializable closures are now captured as
part of closure cleaning. Since we already proactively check most
closures for serializability, ClosureCleaner.clean now returns
the result of deserializing the serialized version of the cleaned
closure.
----