Hi Matei!
On Oct 4, 2013, at 12:03 PM, Matei Zaharia wrote:
> Hi Paul,
>
> Just FYI, I'm not sure Akka was designed to pass ActorSystems across closures
> the way you're doing.
There may be some confusion here: I'm most certainly not closing over an
ActorSystem!
> Also, there's a bit of a misunderstanding about closures on RDDs. Consider
> this change you made to ActorWordCount:
>
> lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
> x => x foreach { x1 => con ! x1 }
> }
I believe Prashant made that change to see whether he could reproduce my issue.
He was able to, but not, I think, for the reasons either of us expected; more
on that in a moment.
> You say that using "con" in the foreach on RDD x won't work, but that makes a
> lot of sense -- foreach on RDDs could be running on a completely different
> node!
As I would hope and expect!
> Akka doesn't support passing ActorSystems across nodes
This is the part that's confusing me: no one is asking it to.
> and in the best case
Which I would instead call the expected case... :-)
> what might've happened with this code is that the remote node would send a
> message to the "con" object in your main program, which would then send stuff
> over the network.
Exactly. What I close over is an ActorRef, not an ActorSystem. ActorRefs are
deliberately Serializable, and, when Akka's Serialization infrastructure is
used to do the serialization and deserialization, ActorRefs sent to different
nodes work exactly as you describe, assuming the Akka configuration includes
the RemoteActorRefProvider, which Spark's, of course, does.
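To make the distinction concrete, here is a minimal sketch (names are mine, not from the actual code) of the pattern in question: the closure captures only an ActorRef, never the ActorSystem that created it.

```scala
import akka.actor.ActorRef

// The function returned below captures only `con`, an ActorRef.
// ActorRef is declared java.io.Serializable by design; ActorSystem is
// not Serializable, and is never referenced here. When the closure is
// deserialized via Akka's serialization on a node whose ActorSystem
// uses the RemoteActorRefProvider, `con` becomes a remote ActorRef and
// `!` routes each message back to the originating node.
def sink(con: ActorRef): ((String, Int)) => Unit =
  pair => con ! pair
```

Nothing non-serializable appears in the closure, which is why, in my experience, this pattern works wherever Akka's own serialization machinery handles the round trip.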
> Basically, you have to be really aware of when an operation is running on a
> distributed collection versus locally in the main program. RDD.foreach, map,
> etc all run code on remote nodes of the cluster. DStream.foreach is an
> exception (and I'm kind of surprised it's called foreach now, because it used
> to be called foreachRDD) in that it lets you run stuff locally on the RDD
> object.
While your point is well taken, I can assure you that I'm well aware of this,
and in fact, for this use case, the replication, lineage, exactly-once
semantics, and recovery features of Spark Streaming are precisely what
motivates its use.
> The fact that Akka passes ActorRefs around makes this really confusing,
> unfortunately, because those can be passed across nodes, and it looks like
> they can fail in weird ways if they're not for a local ActorSystem.
I'm frankly confused by this, for basically two reasons. One is that I've been
working with Akka's remote actor support for a few years now, and have never
before run into any such issues: ActorRefs serialize and deserialize fine,
becoming remote ActorRefs on the destination node, and messages sent to them
are routed back to the originating node, exactly as you would expect and as
documented at
<http://doc.akka.io/docs/akka/2.0.5/scala/serialization.html#Serializing_ActorRefs>
and
<http://doc.akka.io/docs/akka/2.0.5/general/addressing.html#What_is_an_Actor_Reference_>
(bullets 2 and 4). The second reason is that actorStream is a documented,
shipping feature of Spark Streaming, and the documentation makes no mention of
limitations with respect to stream processes closing over ActorRefs—as, again
frankly, it should not, given that I'm constructing those ActorRefs using
Spark's ActorSystem, which is configured to use the RemoteActorRefProvider.
In short, I am doing everything according to Hoyle. I'm afraid I have to stick
to my guns here: Spark Streaming's failure to deserialize these ActorRefs
correctly is a serious defect, the shape of which is probably very similar to
what's described at
<https://groups.google.com/forum/#!topic/akka-user/umVoYFmXogI>. That is, I'm
guessing that Spark is doing "manual serialization" rather than "letting Akka
do the serialization," and for Spark's architecture this makes perfect sense.
It just leaves exactly the gap Prashant and I are describing: there's a bit
more work to do in order to correctly serialize and deserialize ActorRefs, but
what that work consists of is documented—even in the message of the exception
that's thrown!
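For concreteness, the extra work looks roughly like the following. This is a hedged sketch, not Spark's actual code: the method name is mine, and while the exception message cites akka.serialization.Serialization.currentSystem, the exact static type it expects (ActorSystem vs. ExtendedActorSystem) and its location have shifted across Akka versions, so check against the version in use.

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream}
import akka.actor.ActorSystem
import akka.serialization.Serialization

// Hypothetical sketch: deserialize `bytes` (a Java-serialized closure
// that may contain ActorRefs) with an ActorSystem in scope, so that
// each embedded ActorRef's readResolve can resolve it into a working
// remote reference instead of throwing "Trying to deserialize a
// serialized ActorRef without an ActorSystem in scope".
def deserializeClosure(bytes: Array[Byte], system: ActorSystem): AnyRef =
  Serialization.currentSystem.withValue(system) {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }
```

The point is simply that the framework doing "manual" deserialization, not the user, is the one in a position to supply the ActorSystem.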
> I'd really recommend looking into another way to send these messages that
> fails in a more obvious way if you are trying to send them from a remote
> machine. Could be as simple as having a non-Serializable object that handles
> the sending and that you just call into.
Thank you for the suggestion, but no. If we need to find an alternative to
Spark Streaming for this project, that will be a shame, but I need to be clear:
this is not about flaky Akka remoting behavior, or about serializing
ActorSystems (which aren't Serializable anyway) rather than ActorRefs. It's a
known consequence of serializing ActorRefs "manually" instead of letting Akka's
serialization do it, as happens when you simply send messages. Again, perfectly
understandable, and far from intuitively obvious as a potential problem. But
there it is, and since we're still in the proof-of-concept phase, it would be
helpful to have some indication of how seriously we can expect this issue to be
taken. I will do my best this weekend to craft an appropriate pull request with
a proposed solution, if at all possible.
> Matei
Best regards,
Paul