Please find attached Flink_checkpoint_time.png, which relates to this issue.

On Thu, Sep 22, 2016 at 3:38 PM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Hi Aljoscha & Fabian,
>
>     Finally I got this working. Thanks for your help. In terms of persisting
> the state (for S2), I tried checkpointing every 10 seconds using an
> FsStateBackend. What I notice is that the checkpoint duration is almost
> 2 minutes in many cases, while in other cases it frequently varies from
> 100 ms to 1.5 minutes.
>
>     The pseudocode is as below:
>
>      KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>      KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into
> k-v pairs).keyBy(0);
>
>      ks1.connect(ks2).flatMap(X);
>      //X is a CoFlatMapFunction that inserts and removes elements from ks2
> into a key-value state member. Elements from ks1 are matched against that
> state.
>
>      //ks1 is streaming about 100K events/sec from kafka topic
>      //ks2 is streaming about 1 event every 10 minutes... Precisely when
> the 1st event is consumed from this stream, checkpoint takes 2 minutes
> straightaway.
>
>     The version of flink is 1.1.2
>
>  Best Regards
> CVP
>
> On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> you don't need the BlockedEventState class, you should be able to just do
>> this:
>>
>> private transient ValueState<BlockedRoadInfo> blockedRoads;
>>          ............
>>     @Override
>>     public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>         blockedRoads = getRuntimeContext().getState(blockedStateDesc);
>>     }
>>
>>   }
>>
>> Cheers,
>> Aljoscha
>>
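The ClassCastException reported in the quoted message below can be reproduced without Flink. The following is a minimal plain-Java sketch, assuming hypothetical stand-in types (SimpleValueState, BackendValueState, UserValueState, and getState() are illustrations, not Flink classes). It shows why a state handle should be held through its interface: the runtime decides which implementation class it hands out, so a cast to a user-written implementation fails.

```java
// Plain-Java illustration only; none of these types are Flink classes.
interface SimpleValueState<T> {
    T value();
    void update(T value);
    void clear();
}

// Stand-in for the backend-specific implementation that a runtime hands out.
final class BackendValueState<T> implements SimpleValueState<T> {
    private T current;
    @Override public T value() { return current; }
    @Override public void update(T value) { current = value; }
    @Override public void clear() { current = null; }
}

// Stand-in for a user-written implementation of the same interface.
final class UserValueState<T> implements SimpleValueState<T> {
    private T current;
    @Override public T value() { return current; }
    @Override public void update(T value) { current = value; }
    @Override public void clear() { current = null; }
}

final class StateCastDemo {
    // Hypothetical stand-in for asking the runtime for a state handle.
    static <T> SimpleValueState<T> getState() {
        return new BackendValueState<>();
    }

    // Returns true if casting the runtime-provided state to the user class fails.
    static boolean castToUserClassFails() {
        SimpleValueState<Long> state = getState();
        try {
            UserValueState<Long> mine = (UserValueState<Long>) state;
            return mine == null; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails: " + castToUserClassFails());
        // Using the handle through the interface works with any implementation.
        SimpleValueState<Long> state = getState();
        state.update(42L);
        System.out.println("value via interface: " + state.value());
    }
}
```

Declaring the field with the interface type, as in the reply above, avoids the cast entirely.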
>>
>> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>>     I'm coding to check whether your proposal works and have hit a
>>> ClassCastException.
>>>
>>>
>>>     // Here is my Value that has state information.....an implementation
>>> of my value state... where the key is a Double value... on connected stream
>>> ks2
>>>
>>>     public class BlockedEventState implements
>>> ValueState<BlockedRoadInfo> {
>>>
>>>     public BlockedRoadInfo blockedRoad;
>>>
>>>     @Override
>>>     public void clear() {
>>>         blockedRoad = null;
>>>
>>>     }
>>>
>>>     @Override
>>>     public BlockedRoadInfo value() throws IOException {
>>>         return blockedRoad;
>>>     }
>>>
>>>     @Override
>>>     public void update(final BlockedRoadInfo value) throws IOException {
>>>         blockedRoad = value;
>>>     }
>>> }
>>>
>>>        //BlockedRoadInfo class...
>>>         public class BlockedRoadInfo {
>>>             long maxLink;
>>>             long minLink;
>>>             double blockedEventId;
>>>     ....setters & ... getters
>>> }
>>>
>>> /// new RichCoFlatMapFunction() {
>>>
>>> private transient BlockedEventState blockedRoads;
>>>          ............
>>>       @Override
>>>     public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>>>         final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>>>                 new ValueStateDescriptor<BlockedRoadInfo>("BlockedEventStates",
>>>                         TypeInformation.of(BlockedRoadInfo.class), null);
>>>         blockedRoads = (BlockedEventState) getRuntimeContext().getState(blockedStateDesc); // FAILS HERE WITH CLASSCAST
>>>
>>>     }
>>>
>>>   }
>>>
>>>
>>>
>>>
>>> Caused by: java.lang.ClassCastException:
>>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>>
>>>
>>>
>>> I have tried setting the state backend to both MemState and FsState:
>>> streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>
>>>
>>>
>>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Not sure if I got your requirements right, but would this work?
>>>>
>>>> KeyedStream<String, String> ks1 = ds1.keyBy("*") ;
>>>> KeyedStream<Tuple2<String, V>, String> ks2 = ds2.flatMap(split T into
>>>> k-v pairs).keyBy(0);
>>>>
>>>> ks1.connect(ks2).flatMap(X)
>>>>
>>>> X is a CoFlatMapFunction that inserts and removes elements from ks2
>>>> into a key-value state member. Elements from ks1 are matched against that
>>>> state.
>>>>
>>>> Best, Fabian
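The proposal above can be sketched outside Flink to show the control flow. This is a plain-Java simulation under stated assumptions: KeyedMatcher, onControl, and onEvent are hypothetical names, and a HashMap stands in for Flink's key-value state. In a real job the two methods would correspond to the two inputs of a CoFlatMapFunction on connected keyed streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the connected-stream pattern; not the Flink API.
final class KeyedMatcher {
    // Stand-in for key-value state: at most one value per key.
    private final Map<String, String> stateByKey = new HashMap<>();

    // Handles an element from the low-volume stream (ks2).
    // A null value is treated as an explicit delete record for that key.
    void onControl(String key, String value) {
        if (value == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, value);
        }
    }

    // Handles an element from the high-volume stream (ks1).
    // Emits one result if state exists for the key, otherwise nothing.
    List<String> onEvent(String key) {
        List<String> out = new ArrayList<>();
        String value = stateByKey.get(key);
        if (value != null) {
            out.add(key + " matched " + value);
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedMatcher matcher = new KeyedMatcher();
        System.out.println(matcher.onEvent("road-1"));   // no state yet
        matcher.onControl("road-1", "blocked");
        System.out.println(matcher.onEvent("road-1"));   // state present
        matcher.onControl("road-1", null);               // delete record
        System.out.println(matcher.onEvent("road-1"));   // state removed
    }
}
```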
>>>>
>>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>>      First of all, thanks for all your prompt responses. With regard
>>>>> to 2) Multiple lookups, I have to clarify what I mean by that...
>>>>>
>>>>>      DS1<String> elementKeyStream  = stream1.map(String<>); this maps
>>>>> each of the streaming elements into string mapped value...
>>>>>      DS2<T>  = stream2.xxx(); // where stream2 is a kafka source
>>>>> stream, as you proposed.. xxx() should be my function() which splits the
>>>>> string and generates key1:<value1>, key2:<value2>, key3:<value3>
>>>>> ....keyN:<value4>
>>>>>
>>>>>      Now,
>>>>>         I wish to map elementKeyStream with lookups within (key1,
>>>>> key2...keyN), where key1, key2...keyN and their respective values should be
>>>>> available across the cluster...
>>>>>
>>>>> Thanks a million !
>>>>> CVP
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That depends.
>>>>>> 1) Growing/Shrinking: This should work. New entries can always be
>>>>>> inserted. In order to remove entries from the k-v-state you have to set
>>>>>> the value to null. Note that you need an explicit delete-value record to
>>>>>> trigger the eviction.
>>>>>> 2) Multiple lookups: This only works if all lookups are independent
>>>>>> of each other. You can partition DS1 only on a single key, and the
>>>>>> other keys might be located on different shards. A workaround might be
>>>>>> to duplicate S1 events for each key that they need to look up. However,
>>>>>> you might need to collect events from the same S1 event after the join.
>>>>>> If that does not work for you, the only thing that comes to my mind is
>>>>>> to broadcast the state and keep a full local copy in each operator.
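The workaround in point 2 (duplicate each S1 event once per key, then collect the results that belong to the same event) can be sketched as follows. This is a plain-Java illustration under assumptions: FanOutLookup, Lookup, fanOut, and collect are hypothetical names, and a single map stands in for state that would really be spread over several shards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the "duplicate per key, then regroup" workaround; not the Flink API.
final class FanOutLookup {
    // One copy of an event, tagged with the single key it should be looked up under.
    static final class Lookup {
        final String eventId;
        final String key;

        Lookup(String eventId, String key) {
            this.eventId = eventId;
            this.key = key;
        }
    }

    // Duplicates an event once per key it needs to look up.
    static List<Lookup> fanOut(String eventId, List<String> keys) {
        List<Lookup> copies = new ArrayList<>();
        for (String key : keys) {
            copies.add(new Lookup(eventId, key));
        }
        return copies;
    }

    // Performs each single-key lookup and regroups the hits by originating event id.
    static Map<String, List<String>> collect(List<Lookup> lookups, Map<String, String> state) {
        Map<String, List<String>> byEvent = new TreeMap<>();
        for (Lookup lookup : lookups) {
            String value = state.get(lookup.key);
            if (value != null) {
                byEvent.computeIfAbsent(lookup.eventId, id -> new ArrayList<>())
                       .add(lookup.key + "=" + value);
            }
        }
        return byEvent;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        state.put("k1", "a");
        state.put("k3", "c");
        List<Lookup> copies = fanOut("e1", Arrays.asList("k1", "k2", "k3"));
        System.out.println(collect(copies, state));
    }
}
```

The regrouping step is the part the message warns about: once copies are routed by different keys, their results arrive separately and have to be joined again on the event id.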
>>>>>>
>>>>>> Let me add one more thing regarding the upcoming rescaling feature.
>>>>>> If this is interesting for you, rescaling will also work (maybe not in
>>>>>> the first version) for broadcasted state, i.e. state which is the same
>>>>>> on all parallel operator instances.
>>>>>>
>>>>>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <
>>>>>> chakravarth...@gmail.com>:
>>>>>>
>>>>>>> I understand this better with your explanation.
>>>>>>> In this use case, each element in DS1 has to be looked up against a
>>>>>>> 'bunch of keys' from DS2, and DS2 could shrink or expand in terms of the
>>>>>>> number of keys. Will the key-value shard work in this case?
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Operator state is always local in Flink. However, with key-value
>>>>>>>> state, you can have something which behaves kind of similar to a
>>>>>>>> distributed hashmap, because each operator holds a different
>>>>>>>> shard/partition of the hashtable.
>>>>>>>>
>>>>>>>> If you have to do only a single key lookup for each element of DS1,
>>>>>>>> you should think about partitioning both streams (keyBy) and writing
>>>>>>>> the state into Flink's key-value state [1].
>>>>>>>>
>>>>>>>> This will have several benefits:
>>>>>>>> 1) State does not need to be replicated
>>>>>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>>>>>> reside on disk. You are not bound to the memory of the JVM.
>>>>>>>> 3) Flink takes care of the look-up. No need to have your own
>>>>>>>> hashmap.
>>>>>>>> 4) It will only be possible to rescale jobs with key-value state
>>>>>>>> (this feature is currently under development).
>>>>>>>>
>>>>>>>> If using the key-value state is possible, I'd go for that.
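The "distributed hashmap" behaviour described above can be illustrated with a small plain-Java sketch. The assumptions are loud ones: ShardedMap is a hypothetical class, and the hash-modulo routing in shardFor is a simplification chosen for the example, not Flink's actual partitioner. The point is only that each key lives in exactly one shard, so nothing is replicated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of hash-partitioned state; not the Flink API.
final class ShardedMap {
    // One map per simulated parallel operator instance.
    private final List<Map<String, String>> shards = new ArrayList<>();

    ShardedMap(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Simplified routing: the same key always maps to the same shard index.
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    void put(String key, String value) {
        shards.get(shardFor(key)).put(key, value);
    }

    String get(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    // Total entries over all shards; equals the number of distinct keys stored.
    int totalEntries() {
        int total = 0;
        for (Map<String, String> shard : shards) {
            total += shard.size();
        }
        return total;
    }

    public static void main(String[] args) {
        ShardedMap map = new ShardedMap(4);
        for (int i = 0; i < 10; i++) {
            map.put("key-" + i, "v" + i);
        }
        System.out.println("key-3 lives in shard " + map.shardFor("key-3"));
        System.out.println("total entries: " + map.totalEntries());
    }
}
```

Because events and state for a key are routed the same way, a lookup only ever touches the local shard, which is why both streams need to be keyed.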
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>>>>>
>>>>>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Certainly, that is what I thought as well.
>>>>>>>>> The output of DataStream2 could be in the 1000s and there are state
>>>>>>>>> updates.
>>>>>>>>> Reading this topic from the other job, job1, is OK.
>>>>>>>>> However, assuming that we maintain this state in a collection and
>>>>>>>>> update the collection by reading from the topic, will it be
>>>>>>>>> replicated across the cluster within job1?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>>>>>> other job an option?
>>>>>>>>>>
>>>>>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>
>>>>>>>>>>>     Thanks for your response. These DataStreams
>>>>>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>>>>>> applications running within the same cluster.
>>>>>>>>>>>     DataStream2 (from Job2) applies transformations and updates
>>>>>>>>>>> a 'cache' that Job1 needs to work on.
>>>>>>>>>>>     Our intention is to not use an external key/value store, as
>>>>>>>>>>> we are trying to localize the cache within the cluster.
>>>>>>>>>>>     Is there a way?
>>>>>>>>>>>
>>>>>>>>>>> Best Regards
>>>>>>>>>>> CVP
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Flink does not provide shared state.
>>>>>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>>>>>> that each operator has its own local copy of the state.
>>>>>>>>>>>>
>>>>>>>>>>>> If that does not work for you because the state is too large
>>>>>>>>>>>> and if it is possible to partition the state (and both streams),
>>>>>>>>>>>> you can also use keyBy instead of broadcast.
>>>>>>>>>>>>
>>>>>>>>>>>> Finally, you can use an external system like a key-value store
>>>>>>>>>>>> or an in-memory store like Apache Ignite to hold your distributed
>>>>>>>>>>>> collection.
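The first option above (broadcast, so that every operator keeps its own full copy) trades memory for simplicity. A minimal plain-Java sketch of that trade-off follows, assuming a hypothetical BroadcastCopies class in which each simulated parallel instance owns a private map; it is not the Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of broadcast state: every instance holds a full local copy.
final class BroadcastCopies {
    // One private map per simulated parallel operator instance.
    private final List<Map<String, String>> copies = new ArrayList<>();

    BroadcastCopies(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    // Delivers the same update to every instance.
    void broadcast(String key, String value) {
        for (Map<String, String> copy : copies) {
            copy.put(key, value);
        }
    }

    // Any instance can answer any key from its own local copy.
    String lookup(int instance, String key) {
        return copies.get(instance).get(key);
    }

    // Total stored entries grows with parallelism times the number of keys.
    int totalEntries() {
        int total = 0;
        for (Map<String, String> copy : copies) {
            total += copy.size();
        }
        return total;
    }

    public static void main(String[] args) {
        BroadcastCopies state = new BroadcastCopies(3);
        state.broadcast("k1", "a");
        state.broadcast("k2", "b");
        System.out.println("instance 2 sees k1 = " + state.lookup(2, "k1"));
        System.out.println("total entries stored: " + state.totalEntries());
    }
}
```

With keyBy, by contrast, each key would be stored once in total, which is why the message suggests it when the state is too large to copy to every instance.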
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>
>>>>>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>> Varaga
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Team,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>>>>>>>> This application also uses another DataStream, call it
>>>>>>>>>>>>>> DataStream2, which consumes events off a Kafka topic. The elements
>>>>>>>>>>>>>> of DataStream2 go through a transformation that finally updates a
>>>>>>>>>>>>>> HashMap (a java.util collection). The Flink application should
>>>>>>>>>>>>>> share this HashMap across the Flink cluster so that the DataStream1
>>>>>>>>>>>>>> application can check the state of the values in this collection.
>>>>>>>>>>>>>> Is there a way to do this in Flink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     I don't see any shared collection available within the cluster.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>> CVP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>