I searched for checkpoint-related methods in the various Listener classes
but haven't found any.

Analyzing the DAG is tedious and fragile since the DAG may change in future
Spark releases.
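
If a test-time check is still wanted, one option that avoids parsing the
rendered DAG is to measure how deep the lineage is after each batch and fail
once it passes a threshold. Below is a minimal, Spark-free sketch of that
idea. `FakeRDD`, `lineage_depth` and `MAX_DEPTH` are hypothetical names made
up for illustration. In a real job the `parents` callable would presumably
wrap the Scala API's `RDD.dependencies` (each `Dependency` exposes its parent
via `.rdd`), and a checkpointed RDD should report a short lineage there; that
mapping is an assumption, not something tested against a cluster.

```python
class FakeRDD:
    """Hypothetical stand-in for an RDD; not part of Spark."""

    def __init__(self, *parents):
        self.parents = list(parents)


def lineage_depth(root, parents):
    """Count the generations of ancestors reachable from `root`.

    `parents` is a callable returning the direct parents of a node.
    The walk is breadth-first and iterative, so a very deep lineage
    cannot overflow the stack of the check itself.
    """
    seen = {id(root)}
    frontier = [root]
    depth = 0
    while frontier:
        next_frontier = []
        for node in frontier:
            for parent in parents(node):
                if id(parent) not in seen:
                    seen.add(id(parent))
                    next_frontier.append(parent)
        if next_frontier:
            depth += 1
        frontier = next_frontier
    return depth


if __name__ == "__main__":
    MAX_DEPTH = 100  # arbitrary threshold chosen for this sketch

    # Simulate ten batches, each adding one level to the lineage.
    rdd = FakeRDD()
    for _ in range(10):
        rdd = FakeRDD(rdd)

    assert lineage_depth(rdd, lambda n: n.parents) <= MAX_DEPTH
```

In an integration test the same assertion would run once per batch, so a
lineage that grows by one level every batch trips the check long before the
stack overflows.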

Cheers

On Thu, Jan 21, 2016 at 8:25 AM, Brian London <brianmlon...@gmail.com>
wrote:

> Thanks. It looks like extending my batch duration to 7 seconds is a
> work-around.  I'd like to build a check for the lack of checkpointing in
> our integration tests.  Is there a way to parse the DAG at runtime?
>
> On Wed, Jan 20, 2016 at 2:01 PM Ted Yu <yuzhih...@gmail.com> wrote:
>
>> This is related:
>>
>> SPARK-6847
>>
>> FYI
>>
>> On Wed, Jan 20, 2016 at 7:55 AM, Brian London <brianmlon...@gmail.com>
>> wrote:
>>
>>> I'm running a streaming job that has two calls to updateStateByKey.
>>> When run in standalone mode both calls to updateStateByKey behave as
>>> expected.  When run on a cluster, however, it appears that the first call
>>> is not being checkpointed as shown in this DAG image:
>>>
>>> http://i.imgur.com/zmQ8O2z.png
>>>
>>> The middle column continues to grow one level deeper every batch until I
>>> get a stack overflow error.  I'm guessing it's a problem of the stateRDD
>>> not being persisted, but I can't imagine why it wouldn't be.  I thought
>>> updateStateByKey was supposed to just handle that for you internally.
>>>
>>> Any ideas?
>>>
>>> In case anyone is interested, here are excerpts of the stack trace from
>>> the stack overflow:
>>>
>>> Job aborted due to stage failure: Task 7 in stage 195811.0 failed 4
>>> times, most recent failure: Lost task 7.3 in stage 195811.0 (TID 213529,
>>> ip-10-168-177-216.ec2.internal): java.lang.StackOverflowError
>>>   at java.lang.Exception.<init>(Exception.java:102)
>>>   at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
>>>   at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
>>>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   ...
>>>
>>> And
>>>
>>> scala.collection.immutable.$colon$colon in readObject at line 362
>>> scala.collection.immutable.$colon$colon in readObject at line 366
>>> scala.collection.immutable.$colon$colon in readObject at line 362
>>> scala.collection.immutable.$colon$colon in readObject at line 362
>>> scala.collection.immutable.$colon$colon in readObject at line 366
>>> scala.collection.immutable.$colon$colon in readObject at line 362
>>> scala.collection.immutable.$colon$colon in readObject at line 362
>>> ...
>>>
>>>
