Please find attached Flink_checkpoint_time.png in relation to this issue.

On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
> Hi Aljoscha & Fabian,
>
> Finally I got this working. Thanks for your help. In terms of persisting the state (for S2), I tried checkpointing every 10 seconds using an FsStateBackend. What I notice is that the checkpoint duration is almost 2 minutes in many cases, while in the other cases it frequently varies from 100 ms to 1.5 minutes.
>
> The pseudocode is as below:
>
> KeyedStream<String, String> ks1 = ds1.keyBy("*");
> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>
> ks1.connect(ks2).flatMap(X);
> // X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state.
>
> // ks1 is streaming about 100K events/sec from a Kafka topic
> // ks2 is streaming about 1 event every 10 minutes... Precisely when the 1st event is consumed from this stream, the checkpoint takes 2 minutes straightaway.
>
> The version of Flink is 1.1.2
>
> Best Regards
> CVP
>
> On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> you don't need the BlockedEventState class, you should be able to just do this:
>>
>> private transient ValueState<BlockedRoadInfo> blockedRoads;
>> ............
>> @Override
>> public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>>     final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>         new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>             TypeInformation.of(BlockedRoadInfo.class), null);
>>     blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>> }
>>
>> }
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I'm coding to check if your proposal works and hit an issue with a ClassCastException.
>>>
>>>
>>> // Here is my Value that has state information... an implementation of my value state...
>>> // where the key is a Double value... on connected stream ks2
>>>
>>> public class BlockedEventState implements ValueState<BlockedRoadInfo> {
>>>
>>>     public BlockedRoadInfo blockedRoad;
>>>
>>>     @Override
>>>     public void clear() {
>>>         blockedRoad = null;
>>>     }
>>>
>>>     @Override
>>>     public BlockedRoadInfo value() throws IOException {
>>>         return blockedRoad;
>>>     }
>>>
>>>     @Override
>>>     public void update(final BlockedRoadInfo value) throws IOException {
>>>         blockedRoad = value;
>>>     }
>>> }
>>>
>>> // BlockedRoadInfo class...
>>> public class BlockedRoadInfo {
>>>     long maxLink;
>>>     long minLink;
>>>     double blockedEventId;
>>>     ....setters & ... getters
>>> }
>>>
>>> /// new RichCoFlatMapFunction() {
>>>
>>>     private transient BlockedEventState blockedRoads;
>>>     ............
>>>     @Override
>>>     public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>             new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>                 TypeInformation.of(BlockedRoadInfo.class), null);
>>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
>>>     }
>>>
>>> }
>>>
>>>
>>> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>>
>>> I have tried to set the state backend to both MemState and FsState... streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>
>>>
>>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Not sure if I got your requirements right, but would this work?
>>>>
>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>
>>>> ks1.connect(ks2).flatMap(X)
>>>>
>>>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> First of all, thanks for all your prompt responses. With regard to 2) Multiple lookups, I have to clarify what I mean by that...
>>>>>
>>>>> DS1<String> elementKeyStream = stream1.map(String<>); // this maps each of the streaming elements into a string-mapped value...
>>>>> DS2<T> = stream2.xxx(); // where stream2 is a Kafka source stream, as you proposed. xxx() should be my function() which splits the string and generates key1:<value1>, key2:<value2>, key3:<value3> ....keyN:<value4>
>>>>>
>>>>> Now, I wish to map elementKeyStream with lookups within (key1, key2 ... keyN), where key1, key2 ... keyN and their respective values should be available across the cluster...
>>>>>
>>>>> Thanks a million!
>>>>> CVP
>>>>>
>>>>>
>>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> That depends.
>>>>>> 1) Growing/Shrinking: This should work. New entries can always be inserted. In order to remove entries from the k-v-state you have to set the value to null. Note that you need an explicit delete-value record to trigger the eviction.
>>>>>> 2) Multiple lookups: This only works if all lookups are independent of each other. You can partition DS1 only on a single key, and the other keys might be located on different shards. A workaround might be to duplicate S1 events for each key that they need to look up.
>>>>>> However, you might need to collect events from the same S1 event after the join. If that does not work for you, the only thing that comes to my mind is to broadcast the state and keep a full local copy in each operator.
>>>>>>
>>>>>> Let me add one more thing regarding the upcoming rescaling feature. If this is interesting for you, rescaling will also work (maybe not in the first version) for broadcasted state, i.e. state which is the same on all parallel operator instances.
>>>>>>
>>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>
>>>>>>> I'm understanding this better with your explanation.
>>>>>>> With this use case, each element in DS1 has to be looked up against a 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the number of keys... Will the key-value shard work in this case?
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Operator state is always local in Flink. However, with key-value state, you can have something which behaves kind of similar to a distributed hashmap, because each operator holds a different shard/partition of the hashtable.
>>>>>>>>
>>>>>>>> If you have to do only a single key lookup for each element of DS1, you should think about partitioning both streams (keyBy) and writing the state into Flink's key-value state [1].
>>>>>>>>
>>>>>>>> This will have several benefits:
>>>>>>>> 1) State does not need to be replicated
>>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can reside on disk. You are not bound to the memory of the JVM.
>>>>>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>>>>>> 4) It will only be possible to rescale jobs with key-value state (this feature is currently under development).
>>>>>>>>
>>>>>>>> If using the key-value state is possible, I'd go for that.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>>
>>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> certainly, what I thought as well...
>>>>>>>>> The output of DataStream2 could be in 1000s and there are state updates...
>>>>>>>>> reading this topic from the other job, job1, is okie.
>>>>>>>>> However, assuming that we maintain this state into a collection, and updating the state (by reading from the topic) in this collection, will this be replicated across the cluster within this job1 ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the other job an option?
>>>>>>>>>>
>>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your response. Apparently these DataStream (Job1-DataStream1 & Job2-DataStream2) are from different flink applications running within the same cluster.
>>>>>>>>>>> DataStream2 (from Job2) applies transformations and updates a 'cache' on which (Job1) needs to work on.
>>>>>>>>>>> Our intention is to not use the external key/value store as we are trying to localize the cache within the cluster.
>>>>>>>>>>> Is there a way?
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> CVP
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such that each operator has its own local copy of the state.
>>>>>>>>>>>>
>>>>>>>>>>>> If that does not work for you because the state is too large, and if it is possible to partition the state (and both streams), you can also use keyBy instead of broadcast.
>>>>>>>>>>>>
>>>>>>>>>>>> Finally, you can use an external system like a key-value store or an in-memory store like Apache Ignite to hold your distributed collection.
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can someone help me here? Appreciate any response!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>> Varaga
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm working on a Flink Streaming application. The data is injected through Kafka connectors. The payload volume is roughly 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>>>>>>>> This application also uses another DataStream, call it DataStream2 (it consumes events off a Kafka topic). The elements of DataStream2 go through a certain transformation that finally updates a HashMap (or another java.util collection).
>>>>>>>>>>>>>> Apparently the Flink application should share this HashMap across the Flink cluster so that the DataStream1 application could check the state of the values in this collection. Is there a way to do this in Flink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't see any shared collection used within the cluster?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>> CVP
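
For readers following the thread, the matching logic Fabian describes for X (elements from ks2 are inserted into or removed from key-value state, a null value acts as the delete record, and elements from ks1 are matched against that state) can be modelled without Flink. The following is only a plain-Java sketch of that logic, not the code used in the job: the class name KeyedMatchSketch, the methods onControl and onData, and the sample key are hypothetical, and the HashMap merely stands in for Flink's keyed state, which the runtime would scope to the current key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of the two-input matching logic; no Flink dependency.
 * Hypothetical names throughout. In a real job the map below would be
 * keyed ValueState obtained from the runtime context.
 */
final class KeyedMatchSketch {
    // Stand-in for key-value state: one entry per key of the slow stream (ks2).
    private final Map<String, String> state = new HashMap<>();

    /** Slow-stream side (ks2): a null value deletes the key, anything else upserts it. */
    void onControl(String key, String value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    /** Fast-stream side (ks1): emit "key=value" only when the key currently has state. */
    List<String> onData(String key) {
        List<String> out = new ArrayList<>();
        String value = state.get(key);
        if (value != null) {
            out.add(key + "=" + value);
        }
        return out;
    }
}
```

Because each key's state is independent here, the same logic works when the streams are partitioned with keyBy, which is the property the single-key-lookup approach in the thread relies on.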