Hey Abhi,

Could you post how you use mapWithState? By default, it should checkpoint
its state every 10 batches.
However, there is a known issue that prevents mapWithState from
checkpointing in some special cases:
https://issues.apache.org/jira/browse/SPARK-6847
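As a rough model of that default (plain Java with no Spark dependency; CheckpointSchedule and its methods are hypothetical names): the state stream's checkpoint interval is assumed to be 10 × the batch interval, and a batch is checkpointed when its time minus the stream's start time is a multiple of that interval. With 1-minute batches that means batches 10, 20, and so on, i.e. every 10 minutes. The interval can be changed by calling checkpoint(Duration) on the stream that mapWithState returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how often mapWithState's state is checkpointed.
// Assumption: interval = batch interval * 10 unless overridden, and batch k
// (at zeroTime + k * batch) is checkpointed when k * batch is a multiple of it.
class CheckpointSchedule {
    static final int DEFAULT_MULTIPLIER = 10;

    // Default state checkpoint interval in milliseconds.
    static long defaultIntervalMs(long batchMs) {
        return batchMs * DEFAULT_MULTIPLIER;
    }

    // 1-based indices among the first `batches` batches whose state
    // would be checkpointed with the given interval.
    static List<Integer> checkpointedBatches(long batchMs, long intervalMs, int batches) {
        List<Integer> result = new ArrayList<>();
        for (int k = 1; k <= batches; k++) {
            long sinceStart = k * batchMs; // batchTime - zeroTime
            if (sinceStart % intervalMs == 0) {
                result.add(k);
            }
        }
        return result;
    }
}
```

For example, defaultIntervalMs(60_000) gives 600_000 ms, and checkpointedBatches(60_000, 600_000, 25) gives [10, 20].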

On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> Any insights on this one?
>
>
> Thanks!!!
> Abhi
>
> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> I am now trying to use mapWithState in the following way, based on some
>> example code. But looking at the DAG, it does not seem to checkpoint the
>> state, and when the application is restarted from the checkpoint, it
>> re-partitions all of the previous batches' data from Kafka.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>       @Override
>>       public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>           State<MyClass> state) {
>>         MyClass nullObj = new MyClass();
>>         nullObj.setImprLog(null);
>>         nullObj.setNotifyLog(null);
>>         MyClass current = one.or(nullObj);
>>
>>         if (current != null && current.getImprLog() != null
>>             && current.getMyClassType() == 1) {
>>           return new Tuple2<>(key, null);
>>         } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>           MyClass oldState = (state.exists() ? state.get() : nullObj);
>>           if (oldState != null && oldState.getNotifyLog() != null) {
>>             oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>             return new Tuple2<>(key, oldState);
>>           } else {
>>             return new Tuple2<>(key, null);
>>           }
>>         } else {
>>           return new Tuple2<>(key, null);
>>         }
>>       }
>>     };
>>
>>
>> Please suggest whether this is the proper way, or whether I am doing
>> something wrong.
>>
>>
>> Thanks!!
>> Abhi
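One thing stands out in the function above: it never calls state.update(...), so nothing is ever stored in the state, and on a type-3 record it checks oldState.getNotifyLog() where the stored impression log seems to be meant. A minimal sketch of the intended pattern follows, in plain Java so it runs without Spark. SimpleState is a hypothetical stand-in for Spark's State<S> (mirroring its exists/get/update/remove methods), java.util.Optional stands in for the Guava Optional used by the 1.6 Java API, and Log/Event are simplified, hypothetical versions of MyClass. Type 1 is treated as an impression and type 3 as a notification, as in the original code.

```java
import java.util.Optional;

// Hypothetical stand-in for Spark's State<S>: same method names,
// but just an in-memory holder.
class SimpleState<S> {
    private S value;
    boolean exists() { return value != null; }
    S get() { return value; }
    void update(S newValue) { value = newValue; }
    void remove() { value = null; }
}

// Hypothetical, simplified versions of the MyClass fields used above.
class Log {
    long crtd;
    Log(long crtd) { this.crtd = crtd; }
}

class Event {
    final int type; // 1 = impression, 3 = notification (as in the original)
    final Log imprLog;
    final Log notifyLog;
    Event(int type, Log imprLog, Log notifyLog) {
        this.type = type;
        this.imprLog = imprLog;
        this.notifyLog = notifyLog;
    }
}

class PairingFunction {
    // Same shape as Function3<String, Optional<Event>, State<Event>, ...>.call:
    // returns the merged event when a pair completes, otherwise null.
    static Event call(String key, Optional<Event> value, SimpleState<Event> state) {
        if (!value.isPresent()) {
            return null; // e.g. invoked for a timing-out key with no new data
        }
        Event current = value.get();
        if (current.type == 1 && current.imprLog != null) {
            state.update(current); // remember the impression until its notification arrives
            return null;
        }
        if (current.type == 3 && current.notifyLog != null
                && state.exists() && state.get().imprLog != null) {
            Event stored = state.get();
            stored.imprLog.crtd = current.notifyLog.crtd; // copy the field from the later record
            state.remove(); // pair complete: drop the key from the state
            return stored;
        }
        return null;
    }
}
```

The important part is the state.update(...) on the first record and state.remove() once the pair is complete; with real Spark types the same calls exist on State<MyClass>.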
>>
>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> If you don't want to upgrade, your only option will be updateStateByKey
>>> then.
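For Spark 1.4, a rough sketch of how updateStateByKey could cover the same pairing: in the 1.4 Java API the update function receives the batch's new values for a key as a List plus the previous state as a Guava Optional, and returning an absent Optional removes the key. Unlike mapWithState it is invoked for every tracked key on every batch, and it has no built-in timeout, so the 2-hour expiry has to be done by hand, for example by keeping the arrival time in the state. The code below is a plain-Java simulation, not Spark code: Pending, Rec, UpdateFunction and the explicit nowMs parameter are hypothetical, and java.util.Optional stands in for Guava's.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical state: the first record's fields, its arrival time,
// and a flag marking a completed pair.
class Pending {
    final int a;
    final String b;
    final long firstSeenMs;
    final boolean completed;
    Pending(int a, String b, long firstSeenMs, boolean completed) {
        this.a = a; this.b = b; this.firstSeenMs = firstSeenMs; this.completed = completed;
    }
}

// Hypothetical record type matching the (int a, String b) example.
class Rec {
    final int a;
    final String b;
    Rec(int a, String b) { this.a = a; this.b = b; }
}

class UpdateFunction {
    static final long TIMEOUT_MS = 2 * 60 * 60 * 1000L; // 2 hours

    // (new values, old state) -> new state, with the batch time passed in
    // explicitly (hypothetical). Returning Optional.empty() drops the key.
    static Optional<Pending> update(List<Rec> newValues, Optional<Pending> old, long nowMs) {
        // A completed pair was already emitted in the previous batch: start over.
        Pending state = old.filter(p -> !p.completed).orElse(null);
        for (Rec r : newValues) {
            if (state == null) {
                state = new Pending(r.a, r.b, nowMs, false); // first occurrence
            } else if (!state.completed) {
                // second occurrence: keep the first a, take the later b
                state = new Pending(state.a, r.b, state.firstSeenMs, true);
            }
        }
        if (state != null && !state.completed && nowMs - state.firstSeenMs >= TIMEOUT_MS) {
            return Optional.empty(); // no partner within 2 hours: reject
        }
        return Optional.ofNullable(state);
    }
}
```

Completed pairs stay in the state for exactly one batch, so filtering on completed before writing to the database picks them up, and the next batch drops them.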
>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> mapWithState supports checkpoint.
>>>>
>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>>>>   KryoSerializer
>>>>
>>>> which is in the upcoming 1.6.1.
>>>>
>>>> Cheers
>>>>
>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Does mapWithState checkpoint the data?
>>>>>
>>>>> When my application goes down and is restarted from the checkpoint, will
>>>>> mapWithState need to recompute the previous batches' data?
>>>>>
>>>>> Also, to use mapWithState I will need to upgrade my application, as I
>>>>> am using version 1.4.0 and mapWithState isn't supported there. Is there any
>>>>> other workaround?
>>>>>
>>>>> Cheers!!
>>>>> Abhi
>>>>>
>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>> sebastian....@gmail.com> wrote:
>>>>>
>>>>>> Looks like mapWithState could help you?
>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have a use case like the following in my production environment, where I
>>>>>>> am listening to Kafka with a slideInterval of 1 min and a windowLength of 2
>>>>>>> hours.
>>>>>>>
>>>>>>> I have a JavaPairDStream in which each key arrives a second time with a
>>>>>>> different value, which might appear in the same batch or in a later
>>>>>>> batch.
>>>>>>>
>>>>>>> When the key appears the second time, I need to update a field in the value
>>>>>>> of the earlier record with a field from the later one. Keys whose matching
>>>>>>> second record does not arrive should be rejected after 2 hours.
>>>>>>>
>>>>>>> At the end of each second I need to output the result to an external
>>>>>>> database.
>>>>>>>
>>>>>>> For example:
>>>>>>>
>>>>>>> Suppose valueX is an object of MyClass with fields int a and String b.
>>>>>>> At t=1 sec I am getting:
>>>>>>> key0, value0(0, "prev0")
>>>>>>> key1, value1(1, "prev1")
>>>>>>> key2, value2(2, "prev2")
>>>>>>> key2, value3(3, "next2")
>>>>>>>
>>>>>>> Output to database after 1 sec:
>>>>>>> key2, newValue(2, "next2")
>>>>>>>
>>>>>>> At t=2 sec I am getting:
>>>>>>> key3, value4(4, "prev3")
>>>>>>> key1, value5(5, "next1")
>>>>>>>
>>>>>>> Output to database after 2 sec:
>>>>>>> key1, newValue(1, "next1")
>>>>>>>
>>>>>>> At t=3 sec I am getting:
>>>>>>> key4, value6(6, "prev4")
>>>>>>> key3, value7(7, "next3")
>>>>>>> key5, value8(8, "prev5")
>>>>>>> key5, value9(9, "next5")
>>>>>>> key0, value10(10, "next0")
>>>>>>>
>>>>>>> Output to database after 3 sec:
>>>>>>> key0, newValue(0, "next0")
>>>>>>> key3, newValue(4, "next3")
>>>>>>> key5, newValue(8, "next5")
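The pairing rule in this example (keep the first record's a, take the later record's b) can be checked with a small plain-Java simulation of what a mapWithState function would do per record: store the first record for a key, and on the second one emit the merged value and drop the key. This is a sketch, not Spark code; PairingSimulation and its methods are hypothetical names, and the 2-hour expiry (which mapWithState could handle via StateSpec.timeout) is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of the pairing rule; not Spark code.
class PairingSimulation {
    // One incoming record: the key plus the example's fields int a, String b.
    static final class Rec {
        final String key;
        final int a;
        final String b;
        Rec(String key, int a, String b) { this.key = key; this.a = a; this.b = b; }
    }

    // Per-key state, as a mapWithState function would keep it:
    // the first record of a key that has not been matched yet.
    private final Map<String, Rec> pending = new HashMap<>();

    // Processes one batch in arrival order and returns the rows that would
    // be written to the database, formatted as key=(a,b).
    List<String> processBatch(List<Rec> batch) {
        List<String> output = new ArrayList<>();
        for (Rec r : batch) {
            Rec first = pending.remove(r.key);
            if (first == null) {
                pending.put(r.key, r); // first occurrence: keep it in the state
            } else {
                // second occurrence: keep the first a, take the later b, drop the state
                output.add(r.key + "=(" + first.a + "," + r.b + ")");
            }
        }
        return output;
    }

    // Number of keys still waiting for their second record.
    int pendingCount() { return pending.size(); }
}
```

Fed the three batches above, processBatch returns key2=(2,next2), then key1=(1,next1), then key3=(4,next3), key5=(8,next5), key0=(0,next0) in arrival order, leaving only key4 pending.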
>>>>>>>
>>>>>>>
>>>>>>> Please suggest how this can be achieved.
>>>>>>>
>>>>>>>
>>>>>>> Thanks a lot!!!!
>>>>>>> Abhi
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>
>
