Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Hi,

On Tue, Feb 24, 2015 at 4:34 AM, Tathagata Das wrote:
> There are different kinds of checkpointing going on. updateStateByKey
> requires RDD checkpointing, which can be enabled only by calling
> sparkContext.setCheckpointDir. But that does not enable Spark Streaming
> driver checkpoints, which are necessary for recovering from driver
> failures. Those are enabled only by streamingContext.checkpoint(...),
> which internally calls sparkContext.setCheckpointDir and also enables
> other things.

I see, thank you very much! I'm happy to see I will not have to rewrite
the entire application :-)

Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
There are different kinds of checkpointing going on. updateStateByKey
requires RDD checkpointing, which can be enabled only by calling
sparkContext.setCheckpointDir. But that does not enable Spark Streaming
driver checkpoints, which are necessary for recovering from driver
failures. Those are enabled only by streamingContext.checkpoint(...),
which internally calls sparkContext.setCheckpointDir and also enables
other things.

TD

On Mon, Feb 23, 2015 at 1:28 AM, Tobias Pfeiffer wrote:
> Sean,
>
> thanks for your message!
>
> On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen wrote:
>> What I haven't investigated is whether you can enable checkpointing
>> for the state in updateStateByKey separately from this mechanism,
>> which is exactly your question. What happens if you set a checkpoint
>> dir, but do *not* use StreamingContext.getOrCreate, but *do* call
>> DStream.checkpoint?
>
> I didn't even use StreamingContext.getOrCreate(); just calling
> streamingContext.checkpoint(...) blew everything up. Well, "blew up" in
> the sense that actor.OneForOneStrategy will print the stack trace of the
> java.io.NotSerializableException every couple of seconds and "something"
> is not going right with execution (I think).
>
> BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
> sufficient for updateStateByKey! Looking at what
> streamingContext.checkpoint() does, I don't get why ;-) and I am not
> sure that this is a robust solution, but in fact it seems to work!
>
> Thanks a lot,
> Tobias
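[Editor's note: the distinction TD draws between the two kinds of checkpointing can be sketched as follows. This fragment is illustrative only and not from the thread; it assumes Spark Streaming 1.x APIs, and the checkpoint directory paths are placeholders.]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointKinds")
val ssc  = new StreamingContext(conf, Seconds(10))

// Option 1: RDD checkpointing only. This is enough for updateStateByKey,
// but it does NOT make the driver recoverable after a crash.
ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")

// Option 2: full Spark Streaming checkpointing. This calls
// sparkContext.setCheckpointDir internally AND periodically persists the
// DStream graph (all registered output operations, including foreachRDD
// closures) for driver recovery, which is why those closures must be
// serializable once it is enabled.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")
```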
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Sean,

thanks for your message!

On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen wrote:
> What I haven't investigated is whether you can enable checkpointing
> for the state in updateStateByKey separately from this mechanism,
> which is exactly your question. What happens if you set a checkpoint
> dir, but do *not* use StreamingContext.getOrCreate, but *do* call
> DStream.checkpoint?

I didn't even use StreamingContext.getOrCreate(); just calling
streamingContext.checkpoint(...) blew everything up. Well, "blew up" in
the sense that actor.OneForOneStrategy will print the stack trace of the
java.io.NotSerializableException every couple of seconds and "something"
is not going right with execution (I think).

BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
sufficient for updateStateByKey! Looking at what
streamingContext.checkpoint() does, I don't get why ;-) and I am not sure
that this is a robust solution, but in fact it seems to work!

Thanks a lot,
Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
I reached a similar conclusion about checkpointing. It requires your
entire computation to be serializable, even all of the 'local' bits.
Which makes sense.

In my case I do not use checkpointing, and it is fine to restart the
driver in the case of failure and not try to recover its state.

What I haven't investigated is whether you can enable checkpointing
for the state in updateStateByKey separately from this mechanism,
which is exactly your question. What happens if you set a checkpoint
dir, but do *not* use StreamingContext.getOrCreate, but *do* call
DStream.checkpoint?

On Mon, Feb 23, 2015 at 8:47 AM, Tobias Pfeiffer wrote:
> Hi,
>
> On Wed, Jan 28, 2015 at 7:01 PM, Sean Owen wrote:
>> PS see https://issues.apache.org/jira/browse/SPARK-4196 for an example.
>
> OK, I am only realizing now the huge effect this has on my project.
> Even the simplest
>
>     stream.foreachRDD( ... logger.info(...) ... )
>
> pieces need to be rewritten when checkpointing is enabled, which is not
> so good. However, it should be possible, I think.
>
> But I think this makes it impossible to use Spark SQL together with
> Spark Streaming, as every
>
>     stream.foreachRDD( ... sqlc.sql("SELECT ...") ...)
>
> will fail with
>
>     java.io.NotSerializableException:
>     scala.util.parsing.combinator.Parsers$$anon$3
>
> etc.
>
> Actually I don't want checkpointing/failure recovery, I only want to use
> updateStateByKey. Is there maybe any workaround to get updateStateByKey
> working while keeping my foreachRDD functions as they are?
>
> Thanks
> Tobias

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
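[Editor's note: Sean's question, a checkpoint directory set but no StreamingContext.getOrCreate, corresponds roughly to the sketch below, which matches what Tobias later reports as working. The input source, port, and state-update function are placeholder assumptions, not taken from the thread.]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("StateOnly"), Seconds(10))

// RDD checkpoint dir only; streamingContext.checkpoint(...) is NOT called
// and getOrCreate is not used, so the DStream graph (including foreachRDD
// closures) is never serialized for driver recovery.
ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")

val counts = ssc.socketTextStream("localhost", 9999)   // placeholder source
  .map(word => (word, 1))
  .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) =>
    Some(state.getOrElse(0) + values.sum))             // running count per key

counts.print()
ssc.start()
ssc.awaitTermination()
```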
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Aha, you’re right, I did a wrong comparison; the reason might be only for
checkpointing :).

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:39 AM
To: Shao, Saisai
Cc: user
Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
> Also this `foreachFunc` is more like an action function of RDD, thinking
> of rdd.foreach(func), in which `func` needs to be serializable. So maybe
> I think your way of using it is not a normal way :).

Yeah, I totally understand why func in rdd.foreach(func) must be
serializable (because it's sent to the executors), but I didn't get why a
function that's not shipped around must be serializable, too. The
explanations made sense, though :-)

Thanks
Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai wrote:
> Also this `foreachFunc` is more like an action function of RDD, thinking
> of rdd.foreach(func), in which `func` needs to be serializable. So maybe
> I think your way of using it is not a normal way :).

Yeah, I totally understand why func in rdd.foreach(func) must be
serializable (because it's sent to the executors), but I didn't get why a
function that's not shipped around must be serializable, too. The
explanations made sense, though :-)

Thanks
Tobias
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Hey Tobias,

I think one consideration is the checkpointing of DStreams, which
guarantees driver fault tolerance. Also this `foreachFunc` is more like an
action function of RDD, thinking of rdd.foreach(func), in which `func`
needs to be serializable. So maybe I think your way of using it is not a
normal way :).

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:16 AM
To: user
Subject: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

I want to do something like

    dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and
runs completely within the driver. However, this fails with a
NotSerializableException because $outer is not serializable etc. The
DStream code says:

    def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
    }

To be honest, I don't understand the comment. Why must that function be
serializable even when there is no RDD action involved?

Thanks
Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
I believe this is needed for driver recovery in Spark Streaming. If your
Spark driver program crashes, Spark Streaming can recover the application
by reading the set of DStreams and output operations from a checkpoint file
(see
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing).
But to do that, it needs to remember all the operations you're running
periodically, including those in foreachRDD.

Matei

> On Jan 27, 2015, at 6:15 PM, Tobias Pfeiffer wrote:
>
> Hi,
>
> I want to do something like
>
>     dstream.foreachRDD(rdd => if (someCondition) ssc.stop())
>
> so in particular the function does not touch any element in the RDD and
> runs completely within the driver. However, this fails with a
> NotSerializableException because $outer is not serializable etc. The
> DStream code says:
>
>     def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
>       // because the DStream is reachable from the outer object here, and because
>       // DStreams can't be serialized with closures, we can't proactively check
>       // it for serializability and so we pass the optional false to SparkContext.clean
>       new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
>     }
>
> To be honest, I don't understand the comment. Why must that function be
> serializable even when there is no RDD action involved?
>
> Thanks
> Tobias
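[Editor's note: the driver-recovery mechanism Matei describes is exercised through StreamingContext.getOrCreate. A minimal sketch follows; the checkpoint directory and application name are placeholders.]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoints"  // placeholder

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(
    new SparkConf().setAppName("Recoverable"), Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define DStreams and output operations here; they are serialized
  // into the checkpoint file, hence the serializability requirement on
  // foreachRDD functions.
  ssc
}

// On a clean start this invokes createContext(); after a driver crash it
// instead reconstructs the context, and all registered operations, from
// the checkpoint file.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```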
Why must the dstream.foreachRDD(...) parameter be serializable?
Hi,

I want to do something like

    dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and
runs completely within the driver. However, this fails with a
NotSerializableException because $outer is not serializable etc. The
DStream code says:

    def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
    }

To be honest, I don't understand the comment. Why must that function be
serializable even when there is no RDD action involved?

Thanks
Tobias
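[Editor's note: one commonly suggested way to keep such a closure serializable, when the enclosing instance is what drags in the non-serializable $outer, is to avoid capturing the raw StreamingContext directly, for example via a @transient field on a serializable wrapper. The sketch below is hypothetical and not confirmed anywhere in this thread; note that after an actual recovery from a checkpoint the @transient field would be null, so this only helps when driver recovery is not in fact used.]

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical wrapper: the StreamingContext field is @transient, so it is
// omitted when the instance is serialized along with the closure below.
class StreamMonitor(@transient private val ssc: StreamingContext)
    extends Serializable {

  def install(dstream: DStream[String], someCondition: () => Boolean): Unit = {
    dstream.foreachRDD { rdd =>
      // Runs on the driver; only the serializable parts of StreamMonitor
      // travel through SparkContext.clean / checkpoint serialization.
      if (someCondition()) ssc.stop()
    }
  }
}
```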