Hi Paul,

Just FYI, I'm not sure Akka was designed to pass ActorSystems across closures 
the way you're doing. Also, there's a bit of a misunderstanding about closures 
on RDDs. Consider this change you made to ActorWordCount:

    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).foreach {
      x => x foreach { x1 => con ! x1 }
    }

You say that using "con" in the foreach on RDD x won't work, but that makes a 
lot of sense -- foreach on RDDs could be running on a completely different 
node! Akka doesn't support passing ActorSystems across nodes, and in the best 
case, what might've happened with this code is that the remote node would send 
a message to the "con" object in your main program, which would then send stuff 
over the network. Basically, you have to be really aware of when an operation 
is running on a distributed collection versus locally in the main program. 
RDD.foreach, map, etc. all run code on remote nodes of the cluster. 
DStream.foreach is an exception (and I'm kind of surprised it's called foreach 
now, because it used to be called foreachRDD) in that it lets you run stuff 
locally on the RDD object.
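
To make that distinction concrete, here is a rough, untested sketch of the same 
pipeline with the send moved out of the RDD closure. It assumes the 0.8-era 
package layout (org.apache.spark.streaming.DStream) and that the per-batch 
counts are small enough to collect() into the main program. The object and 
method names are made up for illustration.

```scala
import akka.actor.ActorRef
import org.apache.spark.streaming.DStream            // assumed 0.8-era location of DStream
import org.apache.spark.streaming.StreamingContext._ // implicits for reduceByKey on DStreams

// Hypothetical wrapper, only here so the sketch compiles on its own.
object LocalSendSketch {
  def sendCounts(lines: DStream[String], con: ActorRef): Unit = {
    val counts = lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
    counts.foreach { rdd =>
      // DStream.foreach hands us the RDD in the main program, so this block
      // runs locally; only collect() launches work on the cluster.
      val local: Array[(String, Int)] = rdd.collect()
      // Plain Array.foreach: "con" is used in the same JVM that created it.
      local.foreach(pair => con ! pair)
    }
  }
}
```

The inner foreach here is Array.foreach rather than RDD.foreach, so nothing 
that touches "con" is ever shipped to a worker.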

The fact that Akka passes ActorRefs around makes this really confusing, 
unfortunately, because those can be passed across nodes, and it looks like they 
can fail in weird ways if they're not for a local ActorSystem. I'd really 
recommend looking into another way to send these messages that fails in a more 
obvious way if you are trying to send them from a remote machine. Could be as 
simple as having a non-Serializable object that handles the sending and that 
you just call into.
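
Here is a minimal sketch of that last idea, using plain Java serialization as a 
stand-in for what happens when a closure gets shipped to another node. 
LocalSender and canShip are names I made up; in your code the wrapper would 
hold the ActorRef rather than a plain function.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical wrapper: deliberately NOT Serializable, so any closure that
// captures it cannot be shipped to another node.
final class LocalSender(deliver: Any => Unit) {
  def send(msg: Any): Unit = deliver(msg)
}

object LocalSenderDemo {
  // Stand-in for shipping a closure: try to Java-serialize it.
  def canShip(closure: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(closure) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val sender = new LocalSender(msg => println("sent: " + msg))

    // Used locally, the wrapper just forwards the message.
    sender.send(("hello", 1))

    // A closure that captures the wrapper fails at serialization time,
    // which is a much louder failure than a misbehaving remote ActorRef.
    val remoteStyle: Any => Unit = msg => sender.send(msg)
    println("closure over LocalSender can ship: " + canShip(remoteStyle))
  }
}
```

With something like this, accidentally using the sender inside RDD.foreach 
should blow up when the task is serialized, instead of quietly doing the wrong 
thing on a remote machine.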

Matei


On Oct 4, 2013, at 8:27 AM, Paul Snively <psniv...@icloud.com> wrote:

> Hi Prashant!
> 
>> That's very interesting. So this appears to be a general issue with 
>> ActorStream, then?
>> 
>> With all DStream's I suppose. 
>  
> OK, so that seems like a bug independent of the issue around having no 
> ActorSystem in scope for runJob(), then.
> 
>> I have not spent enough time on spray-io yet, but if it is implemented as 
>> an ActorExtension, then using actorStream is the best fit. For example, I 
>> have used akka-camel/akka-zeromq to receive streams from a variety of 
>> endpoints. I am not suggesting you should use it, although it has SSL 
>> support etc. That was the original motivation while designing it. But if it 
>> can somehow be used in a different and more useful way, I'd encourage it. 
> 
>  I'm reluctant to add another abstraction layer here, since the motivation 
> for using spray-io in the first place is its extreme scalability, and we 
> mostly want to rely on Spark Streaming here for managing availability and 
> recovery. To me, it seems like we've identified two obvious issues: one 
> revolving around foreach ... foreach ... over a DStream not working 
> correctly, the other revolving around runJob() not having an ActorSystem in 
> scope. While I understand that there is much other work to do, both of these 
> seem, first of all, rather serious, and secondly, as if they should have 
> relatively easy fixes. Of course, I'm open to being corrected in this belief. 
> :-)
> 
>> I'd like to understand your ultimate intention in a bit more detail, and 
>> the architecture from a higher level. Probably I am hinting at a redesign.
>  
> That's certainly a valid option!
> 
> Without discussing things I'm not at liberty to, all I can really say is that 
> I'm trying to build a real-time streaming server that takes a connection from 
> a client that expects to be able to then send arbitrary bytes to the server. 
> On the server, I want a DStream per socket connection to process the incoming 
> data. There's other stuff, such as this needing to support TLS mutual 
> authentication, but I'll get there when I have data flowing. :-)
> 
> It's really quite simple, or at least it should be, and again, I believe it 
> will be, when the bugs we've identified are fixed.
> 
>> In your case, the code is still complicated enough that it is hard to spot 
>> where exactly we are leaking an ActorRef: in the test case?
>  
> If it helps, the only ActorRef in my code now is in the SpraySslClient, i.e. 
> nowhere that's being serialized.
> 
>> or in our Handler. 
>  
> I have to respectfully suggest it is indeed this. OTOH, is it really a 
> "leak?" I am, in fact, closing over objects that, sooner or later, construct 
> ActorRefs. The whole point of my task is to do something with data coming 
> from an ActorRef that represents a TCP connection, then send the result back 
> to that same ActorRef. I guess I'm having a little trouble understanding how 
> many actorStream use cases there are that wouldn't involve ultimately closing 
> over at least one ActorRef. :-)
> 
>> It is also possible to start multiple actorStream receivers in advance and 
>> then use their supervisor to start more actors as receivers for you, for 
>> example on each connection (if that makes any sense, take a look at it). 
>  
> I'm afraid I didn't quite follow that, but it sounds interesting!
> 
> At this point, though, I have to suggest that the path forward, simply for 
> Spark Streaming to work correctly out of the box, is clear. I have created an 
> issue against the missing ActorSystem in runJob() in JIRA, but it sounds like 
> we need another one for the foreach ... foreach ... issue. I'm afraid I don't 
> know exactly how to characterize that issue. Could I possibly impose upon you 
> to create that ticket?
> 
> Thanks again for digging into this!
> Paul
> 
