Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-21 Thread Ted Yu
I searched for checkpoint-related methods in the various Listener classes but
haven't found any.

Analyzing the DAG is tedious and fragile, since its shape may change in future
Spark releases.
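
If the goal is just a guard in an integration test, one alternative to parsing
the DAG is to walk the RDD lineage from inside foreachRDD and fail once it gets
too deep. A rough sketch (untested; stateStream stands in for your state
DStream, and the threshold is arbitrary):

import org.apache.spark.rdd.RDD

// Depth of the RDD dependency graph. With checkpointing in effect this
// should stay bounded; without it, updateStateByKey lineage grows by
// one level per batch.
def lineageDepth(rdd: RDD[_]): Int =
  if (rdd.dependencies.isEmpty) 1
  else 1 + rdd.dependencies.map(d => lineageDepth(d.rdd)).max

stateStream.foreachRDD { rdd =>
  val depth = lineageDepth(rdd)
  // Threshold is arbitrary -- pick something well above the steady-state
  // depth you observe when checkpointing is working.
  assert(depth < 100, s"lineage depth $depth suggests state is not being checkpointed")
}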

Cheers

On Thu, Jan 21, 2016 at 8:25 AM, Brian London 
wrote:

> Thanks. It looks like extending my batch duration to 7 seconds is a
> work-around.  I'd like to build a check for the lack of checkpointing in
> our integration tests.  Is there a way to parse the DAG at runtime?


Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-21 Thread Brian London
Thanks. It looks like extending my batch duration to 7 seconds is a
work-around.  I'd like to build a check for the lack of checkpointing in
our integration tests.  Is there a way to parse the DAG at runtime?
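
A related knob, in case it helps anyone else reading: DStream.checkpoint lets
you set the checkpoint interval explicitly rather than relying on the default
derived from the batch duration. A sketch (the interval is just an example;
stateStream stands in for the state DStream):

import org.apache.spark.streaming.Seconds

// Explicit checkpoint interval on the state DStream. The streaming docs
// suggest roughly 5-10x the batch interval; this also requires that a
// checkpoint directory has been set on the StreamingContext.
stateStream.checkpoint(Seconds(70))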

On Wed, Jan 20, 2016 at 2:01 PM Ted Yu  wrote:

> This is related:
>
> SPARK-6847
>
> FYI


Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-20 Thread Shixiong(Ryan) Zhu
Could you share your log?


updateStateByKey not persisting in Spark 1.5.1

2016-01-20 Thread Brian London
I'm running a streaming job that has two calls to updateStateByKey.  When
run in standalone mode, both calls to updateStateByKey behave as expected.
When run on a cluster, however, it appears that the first call is not being
checkpointed, as shown in this DAG image:

http://i.imgur.com/zmQ8O2z.png

The middle column continues to grow one level deeper every batch until I
get a stack overflow error.  I'm guessing it's a problem of the state RDDs not
being persisted, but I can't imagine why they wouldn't be.  I thought
updateStateByKey was supposed to just handle that for you internally.
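
For context, the shape of the job is roughly the following (a trimmed sketch
with a stand-in source and update function, not our actual code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("two-state-calls")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///tmp/checkpoints") // a checkpoint directory is set

// Stand-ins: the real job uses a different source and richer state.
val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (values, state) => Some(values.sum + state.getOrElse(0))

// First call: the one whose lineage keeps growing on the cluster.
val firstState = events.updateStateByKey(updateFunc)
// Second call: this one appears to checkpoint as expected.
val secondState = firstState.updateStateByKey(updateFunc)

secondState.print()
ssc.start()
ssc.awaitTermination()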

Any ideas?

I'll post stack trace excerpts of the stack overflow below, in case anyone
is interested:

Job aborted due to stage failure: Task 7 in stage 195811.0 failed 4 times,
most recent failure: Lost task 7.3 in stage 195811.0 (TID 213529,
ip-10-168-177-216.ec2.internal): java.lang.StackOverflowError at
java.lang.Exception.<init>(Exception.java:102) at
java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
at
java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
...

And, from the recursive deserialization frames (each $colon$colon is a Scala
List cons cell, consistent with an ever-growing serialized lineage):

scala.collection.immutable.$colon$colon in readObject at line 362
scala.collection.immutable.$colon$colon in readObject at line 366
scala.collection.immutable.$colon$colon in readObject at line 362
scala.collection.immutable.$colon$colon in readObject at line 362
scala.collection.immutable.$colon$colon in readObject at line 366
scala.collection.immutable.$colon$colon in readObject at line 362
scala.collection.immutable.$colon$colon in readObject at line 362
...


Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-20 Thread Ted Yu
This is related:

SPARK-6847 (a stack overflow involving updateStateByKey together with
checkpointing)

FYI
