Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tobias Pfeiffer
Hi,

On Tue, Feb 24, 2015 at 4:34 AM, Tathagata Das 
wrote:

> There are different kinds of checkpointing going on. updateStateByKey
> requires RDD checkpointing, which can be enabled only by calling
> sparkContext.setCheckpointDir. But that does not enable Spark Streaming
> driver checkpoints, which are necessary for recovering from driver
> failures. Those are enabled only by streamingContext.checkpoint(...), which
> internally calls sparkContext.setCheckpointDir and also enables other
> things.
>

I see, thank you very much! I'm happy to see I will not have to rewrite the
entire application :-)

Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tathagata Das
There are different kinds of checkpointing going on. updateStateByKey
requires RDD checkpointing, which can be enabled only by calling
sparkContext.setCheckpointDir. But that does not enable Spark Streaming
driver checkpoints, which are necessary for recovering from driver
failures. Those are enabled only by streamingContext.checkpoint(...), which
internally calls sparkContext.setCheckpointDir and also enables other
things.
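
A minimal sketch of the distinction (app name, directories, and batch
interval are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("checkpoint-demo")
  val sc   = new SparkContext(conf)
  val ssc  = new StreamingContext(sc, Seconds(1))

  // RDD checkpointing only: enough for updateStateByKey state, but no
  // driver-recovery metadata is written
  sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")

  // Streaming checkpointing: sets the RDD checkpoint directory internally
  // *and* enables periodic driver metadata checkpoints (these are what
  // force the DStream graph, including foreachRDD closures, to serialize)
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")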

TD

On Mon, Feb 23, 2015 at 1:28 AM, Tobias Pfeiffer  wrote:

> Sean,
>
> thanks for your message!
>
> On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen  wrote:
>>
>> What I haven't investigated is whether you can enable checkpointing
>> for the state in updateStateByKey separately from this mechanism,
>> which is exactly your question. What happens if you set a checkpoint
>> dir, but do *not* use StreamingContext.getOrCreate, but *do* call
>> DStream.checkpoint?
>>
>
> I didn't even use StreamingContext.getOrCreate(); just calling
> streamingContext.checkpoint(...) blew everything up. Well, "blew up" in the
> sense that actor.OneForOneStrategy prints the stack trace of a
> java.io.NotSerializableException every couple of seconds, and "something"
> goes wrong with execution (I think).
>
> BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
> sufficient for updateStateByKey! Looking at what streamingContext.checkpoint()
> does, I don't see why ;-) and I am not sure this is a robust solution, but it
> does seem to work!
>
> Thanks a lot,
> Tobias
>
>


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tobias Pfeiffer
Sean,

thanks for your message!

On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen  wrote:
>
> What I haven't investigated is whether you can enable checkpointing
> for the state in updateStateByKey separately from this mechanism,
> which is exactly your question. What happens if you set a checkpoint
> dir, but do *not* use StreamingContext.getOrCreate, but *do* call
> DStream.checkpoint?
>

I didn't even use StreamingContext.getOrCreate(); just calling
streamingContext.checkpoint(...) blew everything up. Well, "blew up" in the
sense that actor.OneForOneStrategy prints the stack trace of a
java.io.NotSerializableException every couple of seconds, and "something"
goes wrong with execution (I think).

BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
sufficient for updateStateByKey! Looking at what streamingContext.checkpoint()
does, I don't see why ;-) and I am not sure this is a robust solution, but it
does seem to work!
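
For the record, the minimal setup that appears to work, sketched (assumes a
StreamingContext `ssc` and a DStream[String] `words`; the path is a
placeholder):

  import org.apache.spark.streaming.StreamingContext._  // pair DStream ops

  ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")
  // note: no call to ssc.checkpoint(...) anywhere

  val counts = words.map(w => (w, 1L)).updateStateByKey[Long] {
    (newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum)
  }

  // with only setCheckpointDir, the DStream graph is never written to a
  // checkpoint file, so this closure does not have to be serializable
  counts.foreachRDD(rdd => println("distinct keys: " + rdd.count()))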

Thanks a lot,
Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Sean Owen
I reached a similar conclusion about checkpointing. It requires your
entire computation to be serializable, even all of the 'local' bits,
which makes sense. In my case I do not use checkpointing, and it is
fine to restart the driver on failure without trying to recover its
state.

What I haven't investigated is whether you can enable checkpointing
for the state in updateStateByKey separately from this mechanism,
which is exactly your question. What happens if you set a checkpoint
dir, but do *not* use StreamingContext.getOrCreate, but *do* call
DStream.checkpoint?
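
Concretely, the experiment might look like this (a sketch only; `ssc`,
`events`, `updateFunc`, the interval, and the path are all placeholders):

  ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")

  val state = events.map(e => (e, 1L)).updateStateByKey(updateFunc)
  state.checkpoint(Seconds(10))   // explicit DStream.checkpoint call

  // start the context directly, deliberately without
  // StreamingContext.getOrCreate
  ssc.start()
  ssc.awaitTermination()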

On Mon, Feb 23, 2015 at 8:47 AM, Tobias Pfeiffer  wrote:
> Hi,
>
> On Wed, Jan 28, 2015 at 7:01 PM, Sean Owen  wrote:
>>
>> PS see https://issues.apache.org/jira/browse/SPARK-4196 for an example.
>
>
> OK, I am only now realizing the huge effect this has on my project. Even the
> simplest
>   stream.foreachRDD( ... logger.info(...) ... )
> pieces need to be rewritten when checkpointing is enabled, which is not so
> good. However, it should be possible, I think.
>
> But, I think this makes it impossible to use SparkSQL together with Spark
> Streaming, as every
>   stream.foreachRDD( ... sqlc.sql("SELECT ...") ...)
> will fail with
>   java.io.NotSerializableException:
> scala.util.parsing.combinator.Parsers$$anon$3
> etc.
>
> Actually I don't want checkpointing/failure recovery; I only want to use
> updateStateByKey. Is there maybe any workaround to get updateStateByKey
> working while keeping my foreachRDD functions as they are? (One candidate
> pattern is sketched right after this quoted message.)
>
> Thanks
> Tobias
>
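
One possible workaround for the SQLContext case in the quoted message,
sketched (the singleton helper below is hypothetical, not something verified
on this thread, and `stream` is the DStream from the quoted message):
re-obtain the non-serializable object inside foreachRDD instead of capturing
it, so the checkpointed closure only references things that serialize.

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext

  // hypothetical helper: lazily (re)creates the SQLContext on the driver,
  // so nothing non-serializable is captured by the checkpointed closure
  object SQLContextSingleton {
    @transient private var instance: SQLContext = _
    def getInstance(sc: SparkContext): SQLContext = synchronized {
      if (instance == null) instance = new SQLContext(sc)
      instance
    }
  }

  stream.foreachRDD { rdd =>
    val sqlc = SQLContextSingleton.getInstance(rdd.sparkContext)
    sqlc.sql("SELECT ...")   // placeholder query from the quoted message
  }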




RE: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Shao, Saisai
Aha, you’re right, I made a wrong comparison; the reason might only be
checkpointing :).

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:39 AM
To: Shao, Saisai
Cc: user
Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai  wrote:
Also, this `foreachFunc` is more like an action function on an RDD; think of
rdd.foreach(func), in which `func` needs to be serializable. So maybe your
way of using it is not the normal way :).

Yeah I totally understand why func in rdd.foreach(func) must be serializable 
(because it's sent to the executors), but I didn't get why a function that's 
not shipped around must be serializable, too.

The explanations made sense, though :-)

Thanks
Tobias




Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Tobias Pfeiffer
Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai 
wrote:
>
> Also, this `foreachFunc` is more like an action function on an RDD; think
> of rdd.foreach(func), in which `func` needs to be serializable. So maybe
> your way of using it is not the normal way :).
>

Yeah I totally understand why func in rdd.foreach(func) must be
serializable (because it's sent to the executors), but I didn't get why a
function that's not shipped around must be serializable, too.

The explanations made sense, though :-)

Thanks
Tobias


RE: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Shao, Saisai
Hey Tobias,

I think one consideration is checkpointing of the DStream, which guarantees
driver fault tolerance.

Also, this `foreachFunc` is more like an action function on an RDD; think of
rdd.foreach(func), in which `func` needs to be serializable. So maybe your
way of using it is not the normal way :).
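
For contrast, a toy sketch of the rdd.foreach(func) case, where the
requirement is obvious because the closure is shipped to the executors (all
names below are made up; assumes an existing SparkContext `sc`):

  class Tracker { var seen = 0L }   // deliberately not Serializable

  val tracker = new Tracker
  val rdd = sc.parallelize(1 to 100)

  // fails on the driver with "Task not serializable": the closure captures
  // `tracker`, and foreach has to ship the closure to the executors
  rdd.foreach(x => tracker.seen += 1)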

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:16 AM
To: user
Subject: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and runs 
completely within the driver. However, this fails with a 
NotSerializableException because $outer is not serializable etc. The DStream 
code says:

  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

To be honest, I don't understand the comment. Why must that function be 
serializable even when there is no RDD action involved?

Thanks
Tobias


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Matei Zaharia
I believe this is needed for driver recovery in Spark Streaming. If your Spark 
driver program crashes, Spark Streaming can recover the application by reading 
the set of DStreams and output operations from a checkpoint file (see 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing).
 But to do that, it needs to remember all the operations you're running 
periodically, including those in foreachRDD.
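
That recovery path is driven by StreamingContext.getOrCreate; a sketch of the
usual setup (app name and checkpoint path are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///tmp/streaming-checkpoints"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recoverable-app")
    val ssc  = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // define all DStreams and output operations (including foreachRDD)
    // here; they are written into the checkpoint and replayed on recovery
    ssc
  }

  // first run: calls createContext(); after a driver crash: rebuilds the
  // context, and every operation registered in it, from the checkpoint
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()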

Matei

> On Jan 27, 2015, at 6:15 PM, Tobias Pfeiffer  wrote:
> 
> Hi,
> 
> I want to do something like
> 
> dstream.foreachRDD(rdd => if (someCondition) ssc.stop())
> 
> so in particular the function does not touch any element in the RDD and runs 
> completely within the driver. However, this fails with a 
> NotSerializableException because $outer is not serializable etc. The DStream 
> code says:
> 
>   def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
>     // because the DStream is reachable from the outer object here, and because
>     // DStreams can't be serialized with closures, we can't proactively check
>     // it for serializability and so we pass the optional false to SparkContext.clean
>     new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
>   }
> 
> To be honest, I don't understand the comment. Why must that function be 
> serializable even when there is no RDD action involved?
> 
> Thanks
> Tobias





Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Tobias Pfeiffer
Hi,

I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and
runs completely within the driver. However, this fails with a
NotSerializableException because $outer is not serializable etc. The
DStream code says:

  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

To be honest, I don't understand the comment. Why must that function be
serializable even when there is no RDD action involved?

Thanks
Tobias