Hi Matei!

On Oct 4, 2013, at 12:03 PM, Matei Zaharia wrote:

> Hi Paul,
> 
> Just FYI, I'm not sure Akka was designed to pass ActorSystems across closures 
> the way you're doing.

There may be some confusion here: I'm most certainly not closing over an 
ActorSystem!

> Also, there's a bit of a misunderstanding about closures on RDDs. Consider 
> this change you made to ActorWordCount:
> 
>  lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
>       x => x foreach { x1 => con ! x1 }
>     }

I believe Prashant made that change to see if he could reproduce my issue. He 
was able to, but not, I think, for the reasons either of us expected, about 
which more in a moment.

> You say that using "con" in the foreach on RDD x won't work, but that makes a 
> lot of sense -- foreach on RDDs could be running on a completely different 
> node!

As I would hope and expect!

> Akka doesn't support passing ActorSystems across nodes

This is the part that's confusing me: no one is asking it to.

> and in the best case

Which I would instead call the expected case... :-)

> what might've happened with this code is that the remote node would send a 
> message to the "con" object in your main program, which would then send stuff 
> over the network.

Exactly. What I close over is an ActorRef, not an ActorSystem. ActorRefs are 
deliberately Serializable, and, when Akka's Serialization infrastructure is 
used to do the serialization and deserialization, ActorRefs sent to different 
nodes work exactly as you describe, assuming the Akka configuration includes 
the RemoteActorRefProvider, which Spark's, of course, does.
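For the record, the relevant configuration looks roughly like this in Akka 
2.0.x terms (the hostname and port below are placeholders for illustration, 
not Spark's actual settings):

```hocon
akka {
  actor {
    # Without this, ActorRefs deserialize as purely local references.
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    netty {
      hostname = "127.0.0.1"  # placeholder
      port = 2552             # placeholder
    }
  }
}
```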

> Basically, you have to be really aware of when an operation is running on a 
> distributed collection versus locally in the main program. RDD.foreach, map, 
> etc all run code on remote nodes of the cluster. DStream.foreach is an 
> exception (and I'm kind of surprised it's called foreach now, because it used 
> to be called foreachRDD) in that it lets you run stuff locally on the RDD 
> object.

While your point is well taken, I can assure you that I'm well aware of this, 
and in fact, for this use case, the replication, lineage, exactly-once 
semantics, and recovery features of Spark Streaming are precisely what 
motivates its use.

> The fact that Akka passes ActorRefs around makes this really confusing, 
> unfortunately, because those can be passed across nodes, and it looks like 
> they can fail in weird ways if they're not for a local ActorSystem.

I'm frankly confused by this, for basically two reasons. One is that I've been 
working with Akka's remote actor support for a few years now, and have never 
before run into any such issues: ActorRefs serialize and deserialize fine, 
becoming remote ActorRefs on the destination node, and messages sent to them 
are routed back to the originating node, exactly as you would expect and as 
documented at 
<http://doc.akka.io/docs/akka/2.0.5/scala/serialization.html#Serializing_ActorRefs>
 and 
<http://doc.akka.io/docs/akka/2.0.5/general/addressing.html#What_is_an_Actor_Reference_>
 (bullets 2 and 4). The second reason is that actorStream is a documented, 
shipping feature of Spark Streaming, and the documentation makes no mention of 
limitations with respect to stream processes closing over ActorRefs—as, again 
frankly, it should not, given that I'm constructing those ActorRefs using 
Spark's ActorSystem, which is configured to use the RemoteActorRefProvider.

In short, I am doing everything according to Hoyle. I'm afraid I have to stick 
to my guns here: Spark Streaming's failure to deserialize these ActorRefs 
correctly is a serious defect, the shape of which is probably very similar to 
what's described at 
<https://groups.google.com/forum/#!topic/akka-user/umVoYFmXogI>. That is, I'm 
guessing that Spark is doing "manual serialization" rather than "letting Akka 
do the serialization," and for Spark's architecture this makes perfect sense. 
It just leaves exactly the gap Prashant and I are describing: there's a bit 
more work to do in order to correctly serialize and deserialize ActorRefs, but 
what that work consists of is documented—even in the message of the exception 
that's thrown!
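To make the distinction concrete, here is a minimal, self-contained Scala 
sketch of the mechanism. Everything in it (Address, ActorPath, 
FakeSerialization) is a hypothetical stand-in, not Akka's actual API; the 
point is only how a context value set by the serialization framework changes 
what a serialized ActorRef path looks like:

```scala
import scala.util.DynamicVariable

// Hypothetical stand-ins for Akka's Address and ActorPath, purely to
// illustrate the mechanism -- not Akka's real classes.
case class Address(host: String, port: Int)
case class ActorPath(name: String) {
  def toStringWithAddress(a: Address): String =
    s"akka://system@${a.host}:${a.port}/user/$name"
  override def toString: String = s"akka://system/user/$name"
}

// Akka's Serialization extension keeps the current transport address in a
// DynamicVariable that is set only while Akka itself drives serialization.
object FakeSerialization {
  val currentTransportAddress = new DynamicVariable[Address](null)
}

// The documented pattern: if the transport address is in scope, emit a
// fully-qualified remote path; otherwise only a local path survives.
def serializeRef(path: ActorPath): String =
  FakeSerialization.currentTransportAddress.value match {
    case null    => path.toString                     // "manual" serialization
    case address => path.toStringWithAddress(address) // Akka-driven serialization
  }

val path = ActorPath("con")

// Manual serialization, as a generic closure serializer would do it:
val local = serializeRef(path)

// Akka-driven serialization, with the transport address in scope:
val remote =
  FakeSerialization.currentTransportAddress.withValue(Address("driver", 2552)) {
    serializeRef(path)
  }

println(local)  // akka://system/user/con -- unresolvable from another node
println(remote) // akka://system@driver:2552/user/con
```

That, in miniature, is the gap: serialize the ref outside Akka's machinery and 
you get a path with no remote address, which is exactly why it fails to 
resolve when deserialized on another node.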

> I'd really recommend looking into another way to send these messages that 
> fails in a more obvious way if you are trying to send them from a remote 
> machine. Could be as simple as having a non-Serializable object that handles 
> the sending and that you just call into.

Thank you for the suggestion, but no. If we need to find an alternative to 
Spark Streaming for this project, that will be a shame, but I need to be clear: 
this is not about flaky Akka remoting behavior, nor about serializing 
ActorSystems (which aren't Serializable anyway) rather than ActorRefs. It's a 
known consequence of serializing ActorRefs "manually" rather than letting 
Akka's serializer handle them, as it does when they're simply sent in 
messages. Again, perfectly understandable, and far from obvious as a potential 
problem in advance. But there it is, and since we're still in the 
proof-of-concept phase, it would be helpful to have some indication of how 
seriously we should expect this issue to be taken. I will do my best this 
weekend to craft a pull request with a proposed solution.

> Matei

Best regards,
Paul

