Hey Abhi,

Could you post how you use mapWithState? By default, it should do checkpointing every 10 batches. However, there is a known issue that prevents mapWithState from checkpointing in some special cases: https://issues.apache.org/jira/browse/SPARK-6847

On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:

> Any insights on this one?
>
> Thanks !!!
> Abhi
>
> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> I am now trying to use mapWithState in the following way, based on some
>> example code. But, looking at the DAG, it does not seem to checkpoint
>> the state, and when restarting the application from checkpoint, it
>> re-partitions all the previous batches' data from kafka.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>   @Override
>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>     MyClass nullObj = new MyClass();
>>     nullObj.setImprLog(null);
>>     nullObj.setNotifyLog(null);
>>     MyClass current = one.or(nullObj);
>>
>>     if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>>       return new Tuple2<>(key, null);
>>     }
>>     else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>         return new Tuple2<>(key, oldState);
>>       }
>>       else {
>>         return new Tuple2<>(key, null);
>>       }
>>     }
>>     else {
>>       return new Tuple2<>(key, null);
>>     }
>>   }
>> };
>>
>> Please suggest if this is the proper way or am I doing something wrong.
>>
>> Thanks !!
>> Abhi
>>
>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> If you don't want to update, your only option will be updateStateByKey
>>> then.
>>>
>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> mapWithState supports checkpoint.
>>>>
>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>> SPARK-12591 NullPointerException using checkpointed mapWithState with
>>>> KryoSerializer
>>>>
>>>> which is in the upcoming 1.6.1
>>>>
>>>> Cheers
>>>>
>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> Does mapWithState checkpoint the data?
>>>>>
>>>>> When my application goes down and is restarted from checkpoint, will
>>>>> mapWithState need to recompute the previous batches' data?
>>>>>
>>>>> Also, to use mapWithState I will need to upgrade my application, as I
>>>>> am using version 1.4.0 and mapWithState isn't supported there. Is there
>>>>> any other workaround?
>>>>>
>>>>> Cheers!!
>>>>> Abhi
>>>>>
>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>> sebastian....@gmail.com> wrote:
>>>>>
>>>>>> Looks like mapWithState could help you?
>>>>>>
>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have a use case as follows in my production environment, where I
>>>>>>> am listening from kafka with slideInterval of 1 min and windowLength
>>>>>>> of 2 hours.
>>>>>>>
>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>> key but with a different value, which might appear in the same batch
>>>>>>> or some next batch.
>>>>>>>
>>>>>>> When the key appears the second time I need to update a field in the
>>>>>>> value of the previous key with a field in the later key. The keys for
>>>>>>> which the combination keys do not come should be rejected after 2 hours.
>>>>>>>
>>>>>>> At the end of each second I need to output the result to an external
>>>>>>> database.
>>>>>>>
>>>>>>> For example:
>>>>>>>
>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b
>>>>>>>
>>>>>>> At t=1 sec I am getting
>>>>>>> key0,value0(0,"prev0")
>>>>>>> key1,value1(1,"prev1")
>>>>>>> key2,value2(2,"prev2")
>>>>>>> key2,value3(3,"next2")
>>>>>>>
>>>>>>> Output to database after 1 sec
>>>>>>> key2,newValue(2,"next2")
>>>>>>>
>>>>>>> At t=2 sec getting
>>>>>>> key3,value4(4,"prev3")
>>>>>>> key1,value5(5,"next1")
>>>>>>>
>>>>>>> Output to database after 2 sec
>>>>>>> key1,newValue(1,"next1")
>>>>>>>
>>>>>>> At t=3 sec
>>>>>>> key4,value6(6,"prev4")
>>>>>>> key3,value7(7,"next3")
>>>>>>> key5,value5(8,"prev5")
>>>>>>> key5,value5(9,"next5")
>>>>>>> key0,value0(10,"next0")
>>>>>>>
>>>>>>> Output to database after 3 sec
>>>>>>> key0,newValue(0,"next0")
>>>>>>> key3,newValue(4,"next3")
>>>>>>> key5,newValue(8,"next5")
>>>>>>>
>>>>>>> Please suggest how this can be achieved.
>>>>>>>
>>>>>>> Thanks a lot !!!!
>>>>>>> Abhi
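
For reference, the pairing behaviour in the t=1..3 example from the original question can be sketched in plain Java, without Spark. This is only an illustration under stated assumptions, not the way mapWithState is implemented: `PairingSketch`, `Value`, `onRecord` and `expire` are hypothetical names, the state for a key is assumed to be cleared once its pair has been emitted, and the 2 hour rejection is modelled as an explicit `expire` call. Note that the quoted mappingFunc never calls state.update(...), whereas the sketch stores the first occurrence of each key explicitly.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, Spark-free model of the "pair two records per key" logic.
class PairingSketch {

    // Stand-in for MyClass from the question: fields int a, String b.
    static final class Value {
        final int a;
        final String b;

        Value(int a, String b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "(" + a + ",\"" + b + "\")";
        }
    }

    // First occurrence of a key, remembered together with its arrival time.
    private static final class Pending {
        final Value value;
        final long seenAtMillis;

        Pending(Value value, long seenAtMillis) {
            this.value = value;
            this.seenAtMillis = seenAtMillis;
        }
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final long timeoutMillis;

    PairingSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the merged value when this is the second record for the key,
    // or null when it is the first one (which is stored and not emitted).
    Value onRecord(String key, Value incoming, long nowMillis) {
        Pending first = pending.remove(key);
        if (first == null) {
            pending.put(key, new Pending(incoming, nowMillis));
            return null;
        }
        // Keep field a of the earlier record, take field b from the later one.
        return new Value(first.value.a, incoming.b);
    }

    // Drops unmatched keys that have waited at least timeoutMillis.
    // Returns how many keys were dropped.
    int expire(long nowMillis) {
        int dropped = 0;
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().seenAtMillis >= timeoutMillis) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch(2L * 60 * 60 * 1000);
        sketch.onRecord("key2", new Value(2, "prev2"), 1000L);
        Value merged = sketch.onRecord("key2", new Value(3, "next2"), 1000L);
        System.out.println("key2," + merged);
    }
}
```

In a mapWithState mapping function the same two branches would roughly correspond to storing the first value in the state for the key and, on the second record, emitting the merged value and clearing the state. The 2 hour rejection would more likely be handled by a timeout on the state specification than by a manual `expire` pass.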