Hi Matei!

On Oct 4, 2013, at 1:53 PM, Matei Zaharia wrote:

> Ah, I see, I thought calling "!" on an ActorRef required an implicit 
> ActorSystem to be in scope, but maybe that's not so.

You'd think so, wouldn't you? But the situation is stranger than that: an 
ActorRef deliberately abstracts away the machinery that resolves the recipient 
of a message, including any relevant ActorSystem(s), since an ActorRef can even 
point to an Actor in a different ActorSystem.

> It seems like the actual issue is that ActorRef can't be sent around with 
> just Java Serialization, but rather need to be sent as an Akka message.

That's my understanding from the e-mail thread and docs I linked to as well.

> Specifically, when they are deserialized, this needs to be done in a thread 
> with an Akka-internal ThreadLocal variable set to point to an actual 
> ActorSystem.

Yep. That's the specific content of the message from the thrown exception, too.
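For what it's worth, the underlying mechanism is just Scala's 
scala.util.DynamicVariable (which is what Akka's thread-local is). A toy sketch 
of the pattern, where FakeSystem is obviously my own stand-in and not an Akka 
type:

```scala
import scala.util.DynamicVariable

// Hypothetical stand-in for Akka's ExtendedActorSystem, to show the mechanism only.
case class FakeSystem(name: String)

// Akka's JavaSerializer.currentSystem is a DynamicVariable much like this one.
object CurrentSystem extends DynamicVariable[FakeSystem](null)

// An ActorRef's readResolve-style hook consults the variable on the current thread.
def deserializeActorRef(): String = {
  val sys = CurrentSystem.value
  if (sys == null)
    throw new IllegalStateException(
      "Trying to deserialize a serialized ActorRef without an ActorSystem in scope.")
  s"actor-ref-resolved-by-${sys.name}"
}

// deserializeActorRef()  // throws: no system bound on this thread

// Works: the binding is visible to everything in the dynamic scope of the thunk.
CurrentSystem.withValue(FakeSystem("worker")) {
  deserializeActorRef()  // resolves against FakeSystem("worker")
}
```

The point being that the binding has to be established on the thread that 
actually calls readObject(), which clients have no hook into.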

> Anyway, there's an important reason why we serialize closures using standard 
> Java Serialization instead of Akka's send, which is that not all our task 
> launch methods go through Akka. When running on Mesos for example, Mesos has 
> its own API for sending tasks to nodes. (More generally, we also want to save 
> the closure *before* the user might modify the state it closes over, so we 
> can get deterministic re-execution.)

I had intuited that this must be the case; it's what I was alluding to when I 
said it was "understandable" that Spark Streaming would "manually" serialize 
tasks.

> If there was a way to correctly set the thread-local ActorSystem variable 
> that Akka depends on so that ActorRefs get deserialized, that would be a 
> great solution.

<http://doc.akka.io/api/akka/2.0.5/#akka.serialization.JavaSerializer$> :-)

So my request to the Spark Streaming team is precisely that this be included in 
your deserialization code.
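Concretely, I'm imagining something like the following, where the method name 
and the `system`/`taskBytes` parameters are of course my own placeholders for 
whatever your deserialization path actually has in hand:

```scala
import java.io.{ByteArrayInputStream, ObjectInputStream}
import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.serialization.JavaSerializer

// Sketch: wrap the existing Java deserialization of a task closure so that any
// ActorRefs buried inside it can resolve themselves against the worker's
// ActorSystem while readObject() runs on this thread.
def deserializeTask(system: ActorSystem, taskBytes: Array[Byte]): AnyRef =
  JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
    val in = new ObjectInputStream(new ByteArrayInputStream(taskBytes))
    try in.readObject() finally in.close()
  }
```

Since currentSystem is a DynamicVariable, the binding is scoped to the thread 
for the duration of the block and needs no cleanup afterward.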

> Otherwise I'd recommend passing the address as a string URL 
> (akka://host:port/whatever) instead of an ActorRef, and creating an ActorRef 
> on the remote node from it. Hopefully one of these works.

This workaround does not work for me, presumably because I'm using third-party 
code (spray-io in this case) that participates in my closure and includes 
ActorRefs that it manages. It seems clear enough to me that this will be an 
issue in the general case: "don't close over ActorRefs" is not realistic advice 
when Spark Streaming clients use third-party code and control neither the 
deserialization code nor the construction of the thread that does the 
deserialization (otherwise we could simply wrap our client code in 
JavaSerializer.currentSystem.withValue(...) { ... }, but that also does not 
work).

> 
> Anyway, thanks for bringing up this issue -- it's a confusing one and we 
> should have a recommended solution for it.

Thank you for your patience in listening to my explanation. Unfortunately, the 
only feasible solution I see is a minor point release of Spark Streaming that 
includes the JavaSerializer.currentSystem.withValue() invocation, again because 
you control both the deserialization implementation and the construction of the 
thread doing the deserialization.

> Matei

Best regards,
Paul
