Ah, I see. I thought calling "!" on an ActorRef required an implicit ActorSystem to be in scope, but maybe that's not so. It seems like the actual issue is that ActorRefs can't be sent around with just Java Serialization; they need to be sent as Akka messages. Specifically, they have to be deserialized in a thread where an Akka-internal thread-local variable is set to point to an actual ActorSystem.
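To make the thread-local requirement concrete, here is a minimal sketch that uses only the JDK and the Scala standard library, not Akka itself. `FakeSystem`, `FakeRef`, `Context.currentSystem`, and `Demo` are hypothetical names invented for illustration. The only thing carried over from the discussion is the assumption that a deserialized reference has to find its system in a thread-scoped variable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.DynamicVariable

// Hypothetical stand-in for an ActorSystem (not an Akka class).
final class FakeSystem(val name: String)

object Context {
  // Thread-scoped slot; null means no system is available on this thread.
  val currentSystem = new DynamicVariable[FakeSystem](null)
}

// Hypothetical stand-in for an ActorRef: only the path is written to the stream.
final class FakeRef(val path: String, @transient val boundTo: FakeSystem) extends Serializable {
  // Java serialization calls this hook after the object has been read back.
  def readResolve(): AnyRef = {
    val sys = Context.currentSystem.value
    if (sys == null)
      throw new IllegalStateException("no system in scope while deserializing " + path)
    new FakeRef(path, sys) // rebind the reference to the system on this node
  }
}

object Demo {
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    try out.writeObject(obj) finally out.close()
    buf.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): FakeRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[FakeRef] finally in.close()
  }

  // Plain Java deserialization with no context set: readResolve throws.
  def failsWithoutContext(bytes: Array[Byte]): Boolean =
    try { fromBytes(bytes); false } catch { case _: IllegalStateException => true }

  // Same bytes, read inside a scope where the thread-scoped system is set.
  def readWithContext(bytes: Array[Byte], sys: FakeSystem): FakeRef =
    Context.currentSystem.withValue(sys) { fromBytes(bytes) }
}
```

If task deserialization were wrapped in the equivalent of `readWithContext`, using the executor's real ActorSystem, closed-over ActorRefs should come back usable. I believe the corresponding variable in Akka 2.0.x is `akka.serialization.Serialization.currentSystem`, but that should be checked against the Akka version in use.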
Anyway, there's an important reason why we serialize closures using standard Java Serialization instead of Akka's send: not all of our task launch methods go through Akka. When running on Mesos, for example, Mesos has its own API for sending tasks to nodes. (More generally, we also want to save the closure *before* the user might modify the state it closes over, so we can get deterministic re-execution.) If there were a way to correctly set the thread-local ActorSystem variable that Akka depends on, so that ActorRefs get deserialized properly, that would be a great solution. Otherwise I'd recommend passing the address as a string URL (akka://system@host:port/user/whatever) instead of an ActorRef, and creating an ActorRef on the remote node from it. Hopefully one of these works.

Anyway, thanks for bringing up this issue -- it's a confusing one and we should have a recommended solution for it.

Matei

On Oct 4, 2013, at 1:13 PM, Paul Snively wrote:

> Hi Matei!
>
> On Oct 4, 2013, at 12:03 PM, Matei Zaharia wrote:
>
>> Hi Paul,
>>
>> Just FYI, I'm not sure Akka was designed to pass ActorSystems across
>> closures the way you're doing.
>
> There may be some confusion here: I'm most certainly not closing over an
> ActorSystem!
>
>> Also, there's a bit of a misunderstanding about closures on RDDs. Consider
>> this change you made to ActorWordCount:
>>
>> lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
>>   x => x foreach { x1 => con ! x1 }
>> }
>
> I believe Prashant made that change to see if he could reproduce my issue. He
> was able to, but not, I think, for reasons that either you or I expect, about
> which more in a moment.
>
>> You say that using "con" in the foreach on RDD x won't work, but that makes
>> a lot of sense -- foreach on RDDs could be running on a completely different
>> node!
>
> As I would hope and expect!
>
>> Akka doesn't support passing ActorSystems across nodes
>
> This is the part that's confusing me: no one is asking it to.
>
>> and in the best case
>
> Which I would instead call the expected case... :-)
>
>> what might've happened with this code is that the remote node would send a
>> message to the "con" object in your main program, which would then send
>> stuff over the network.
>
> Exactly. What I close over is an ActorRef, not an ActorSystem. ActorRefs are
> deliberately Serializable, and, when Akka's Serialization infrastructure is
> used to do the serialization and deserialization, ActorRefs sent to different
> nodes work exactly as you describe, assuming the Akka configuration includes
> the RemoteActorRefProvider, which Spark's, of course, does.
>
>> Basically, you have to be really aware of when an operation is running on a
>> distributed collection versus locally in the main program. RDD.foreach, map,
>> etc all run code on remote nodes of the cluster. DStream.foreach is an
>> exception (and I'm kind of surprised it's called foreach now, because it
>> used to be called foreachRDD) in that it lets you run stuff locally on the
>> RDD object.
>
> While your point is well taken, I can assure you that I'm well aware of this,
> and in fact, for this use case, the replication, lineage, exactly-once
> semantics, and recovery features of Spark Streaming are precisely what
> motivates its use.
>
>> The fact that Akka passes ActorRefs around makes this really confusing,
>> unfortunately, because those can be passed across nodes, and it looks like
>> they can fail in weird ways if they're not for a local ActorSystem.
>
> I'm frankly confused by this, for basically two reasons. One is that I've
> been working with Akka's remote actor support for a few years now, and have
> never before run into any such issues: ActorRefs serialize and deserialize
> fine, becoming remote ActorRefs on the destination node, and messages sent to
> them are sent to the originating node, exactly as you would expect and is
> documented at
> <http://doc.akka.io/docs/akka/2.0.5/scala/serialization.html#Serializing_ActorRefs>
> and
> <http://doc.akka.io/docs/akka/2.0.5/general/addressing.html#What_is_an_Actor_Reference_>
> bullets 2 and 4. The second reason is that actorStream is a documented,
> shipping feature of Spark Streaming, and the documentation makes no mention
> of limitations with respect to stream processes closing over ActorRefs -- as,
> again frankly, it should not, given that I'm constructing those ActorRefs
> using Spark's ActorSystem, which is configured to use the
> RemoteActorRefProvider.
>
> In short, I am doing everything according to Hoyle. I'm afraid I have to
> stick to my guns here: Spark Streaming's failure to deserialize these
> ActorRefs correctly is a serious defect, the shape of which is probably very
> similar to what's described at
> <https://groups.google.com/forum/#!topic/akka-user/umVoYFmXogI>. That is, I'm
> guessing that Spark is doing "manual serialization" rather than "letting Akka
> do the serialization," and for Spark's architecture this makes perfect sense.
> It just leaves exactly the gap Prashant and I are describing: there's a bit
> more work to do in order to correctly serialize and deserialize ActorRefs,
> but what that work consists of is documented -- even in the message of the
> exception that's thrown!
>
>> I'd really recommend looking into another way to send these messages that
>> fails in a more obvious way if you are trying to send them from a remote
>> machine. Could be as simple as having a non-Serializable object that handles
>> the sending and that you just call into.
>
> Thank you for the suggestion, but no. If we need to find an alternative to
> Spark Streaming for this project, that will be a shame, but I need to be
> clear: this is not about flaky Akka remoting behavior or serializing
> ActorSystems (which aren't Serializable anyway) vs. ActorRefs. It's a known
> consequence of doing serialization of ActorRefs "manually" vs. with Akka's
> serializer via simply sending messages. Again, perfectly understandable and
> far from intuitively obvious as a potential problem. But there it is, and it
> would be helpful, as we're still in proof-of-concept phase, to have some
> information as to how seriously we should expect this issue to be taken. I
> will do my best this weekend to craft an appropriate pull request with a
> proposed solution if at all possible.
>
>> Matei
>
> Best regards,
> Paul
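
For reference, the string-URL fallback suggested at the top of this message can be sketched roughly as follows, again with the standard library only. `Node`, `SendTo`, `Shipping`, and the URL used with them are hypothetical and chosen for illustration. `Node` stands in for whatever ActorSystem is available on the worker, and the point is only that a task closing over a plain String survives standard Java serialization without any special context.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical per-node registry standing in for the ActorSystem on a worker.
object Node {
  private val endpoints = mutable.Map.empty[String, String => Unit]

  def register(url: String, handler: String => Unit): Unit =
    synchronized { endpoints(url) = handler }

  def lookup(url: String): String => Unit =
    synchronized { endpoints(url) }
}

// The task carries only a String, so it needs no context to deserialize.
// The live endpoint is resolved on whichever node runs apply().
final case class SendTo(url: String) {
  def apply(msg: String): Unit = Node.lookup(url)(msg)
}

object Shipping {
  // Round trip through standard Java serialization, as a task launcher might do.
  def roundTrip(task: SendTo): SendTo = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    try out.writeObject(task) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    try in.readObject().asInstanceOf[SendTo] finally in.close()
  }
}
```

With real Akka 2.0.x, the lookup step would presumably be a call to `actorFor` with the URL on the worker's ActorSystem. How to obtain that ActorSystem inside a Spark task is left open here.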
