Not sure if I got your requirements right, but would this work?

KeyedStream<String, String> ks1 = ds1.keyBy(value -> value);
KeyedStream<Tuple2<String, V>, String> ks2 = ds2
    .flatMap(/* split T into key-value pairs */)
    .keyBy(kv -> kv.f0);

ks1.connect(ks2).flatMap(new X());

X is a CoFlatMapFunction that inserts elements from ks2 into a key-value
state member (and removes them again when needed). Elements from ks1 are
matched against that state.
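
For concreteness, a minimal sketch of what X could look like, assuming V is a
String and that a null value in a ks2 record means the entry should be removed
(the state name, output type, and the delete-on-null convention are just my
assumptions):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class X extends RichCoFlatMapFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

  private transient ValueState<String> ds2Value;

  @Override
  public void open(Configuration parameters) {
    // keyed state: one value per key, sharded across the parallel instances
    ds2Value = getRuntimeContext().getState(
        new ValueStateDescriptor<>("ds2-value", String.class, null));
  }

  @Override
  public void flatMap1(String element, Collector<Tuple2<String, String>> out) throws Exception {
    // element from ks1: match it against the state stored for its key
    String v = ds2Value.value();
    if (v != null) {
      out.collect(Tuple2.of(element, v));
    }
  }

  @Override
  public void flatMap2(Tuple2<String, String> kv, Collector<Tuple2<String, String>> out) throws Exception {
    // element from ks2: insert the value, or evict the entry by updating to null
    ds2Value.update(kv.f1);
  }
}

Because both inputs are keyed on the same String key, the lookup in flatMap1
automatically sees the value that flatMap2 stored for that key.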

Best, Fabian

2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

> Hi Fabian,
>
>      First of all, thanks for all your prompt responses. With regards to
> 2) Multiple look-ups, I have to clarify what I mean by that...
>
>      DS1<String> elementKeyStream = stream1.map(...); // this maps each
> streaming element to a string value...
>      DS2<T> = stream2.xxx(); // where stream2 is a Kafka source stream, as
> you proposed... xxx() would be my function which splits the string and
> generates key1:<value1>, key2:<value2>, key3:<value3>, ..., keyN:<valueN>
>
>      Now,
>         I wish to match elementKeyStream against look-ups within (key1,
> key2, ..., keyN), where key1, key2, ..., keyN and their respective values
> should be available across the cluster...
>
> Thanks a million !
> CVP
>
>
>
>
> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> That depends.
>> 1) Growing/Shrinking: This should work. New entries can always be
>> inserted. In order to remove entries from the k-v-state you have to set the
>> value to null. Note that you need an explicit delete-value record to
>> trigger the eviction.
>> 2) Multiple lookups: This only works if all lookups are independent of
>> each other. You can partition DS1 on only a single key, so the other keys
>> might be located on different shards. A workaround might be to duplicate
>> the S1 events, once for each key they need to look up (sketched below).
>> However, you might then need to collect the results for the same S1 event
>> after the join. If that does not work for you, the only thing that comes
>> to my mind is to broadcast the state and keep a full local copy in each
>> operator.
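>>
>> A rough sketch of that duplication workaround, assuming each S1 event can
>> enumerate the keys it needs to look up (extractLookupKeys is a hypothetical
>> helper, not something Flink provides):
>>
>> // fan out: emit one copy of the S1 event per lookup key
>> DataStream<Tuple2<String, String>> fannedOut = ds1
>>     .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
>>       @Override
>>       public void flatMap(String event, Collector<Tuple2<String, String>> out) {
>>         for (String key : extractLookupKeys(event)) {  // hypothetical helper
>>           out.collect(Tuple2.of(key, event));
>>         }
>>       }
>>     });
>> // then key both streams on the lookup key, join them as above, and
>> // re-group the partial results per original S1 event if needed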
>>
>> Let me add one more thing regarding the upcoming rescaling feature. If
>> this is interesting for you, rescaling will also work (maybe not in the
>> first version) for broadcasted state, i.e. state which is the same on all
>> parallel operator instances.
>>
>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>
>>> I'm understanding this better with your explanation...
>>> With this use case, each element in DS1 has to be looked up against a
>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>> number of keys... will the key-value shard work in this case?
>>>
>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Operator state is always local in Flink. However, with key-value state,
>>>> you can have something which behaves somewhat like a distributed
>>>> hashmap, because each operator holds a different shard/partition of the
>>>> hashtable.
>>>>
>>>> If you have to do only a single key lookup for each element of DS1, you
>>>> should think about partitioning both streams (keyBy) and writing the state
>>>> into Flink's key-value state [1].
>>>>
>>>> This will have several benefits:
>>>> 1) State does not need to be replicated
>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>> reside on disk, so you are not bound to the memory of the JVM (see the
>>>> short sketch below).
>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>> 4) It will only be possible to rescale jobs with key-value state (this
>>>> feature is currently under development).
>>>>
>>>> If using the key-value state is possible, I'd go for that.
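>>>>
>>>> Regarding 2), a short sketch of switching to the RocksDB backend (the
>>>> checkpoint URI is just a placeholder; you also need the
>>>> flink-statebackend-rocksdb dependency):
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> // note: the constructor may throw IOException
>>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>>>> env.enableCheckpointing(10000); // e.g. checkpoint every 10 seconds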
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>
>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> certainly, that's what I thought as well...
>>>>> The output of DataStream2 could be in the 1000s and there are state
>>>>> updates...
>>>>> Reading this topic from the other job, job1, is okay.
>>>>> However, assuming that we maintain this state in a collection and
>>>>> update it by reading from the topic, will this collection be replicated
>>>>> across the cluster within job1?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the other
>>>>>> job an option?
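>>>>>>
>>>>>> Just to illustrate, a rough sketch with the Kafka 0.9 connector (the
>>>>>> topic name, bootstrap servers, and group id are placeholders, and it
>>>>>> assumes DataStream2 can be serialized as strings):
>>>>>>
>>>>>> // Job2 (producer side): write DataStream2 to a topic
>>>>>> Properties producerProps = new Properties();
>>>>>> producerProps.setProperty("bootstrap.servers", "localhost:9092");
>>>>>> dataStream2.addSink(new FlinkKafkaProducer09<>(
>>>>>>     "ds2-updates", new SimpleStringSchema(), producerProps));
>>>>>>
>>>>>> // Job1 (consumer side): read the same topic as a regular DataStream
>>>>>> Properties consumerProps = new Properties();
>>>>>> consumerProps.setProperty("bootstrap.servers", "localhost:9092");
>>>>>> consumerProps.setProperty("group.id", "job1");
>>>>>> DataStream<String> ds2Updates = env.addSource(new FlinkKafkaConsumer09<>(
>>>>>>     "ds2-updates", new SimpleStringSchema(), consumerProps));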
>>>>>>
>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>>>> chakravarth...@gmail.com>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>>     Thanks for your response. These DataStreams
>>>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different Flink
>>>>>>> applications running within the same cluster.
>>>>>>>     DataStream2 (from Job2) applies transformations and updates a
>>>>>>> 'cache' on which Job1 needs to work.
>>>>>>>     Our intention is not to use an external key/value store, as we
>>>>>>> are trying to keep the cache local within the cluster.
>>>>>>>     Is there a way?
>>>>>>>
>>>>>>> Best Regards
>>>>>>> CVP
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Flink does not provide shared state.
>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such that
>>>>>>>> each parallel operator instance has its own local copy of the state.
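>>>>>>>>
>>>>>>>> For example, a rough sketch, assuming DataStream2 produces key/value
>>>>>>>> tuples and the map fits into memory on every instance:
>>>>>>>>
>>>>>>>> // every parallel instance receives all DS2 elements and keeps
>>>>>>>> // its own local copy of the map
>>>>>>>> dataStream1
>>>>>>>>     .connect(dataStream2.broadcast())
>>>>>>>>     .flatMap(new CoFlatMapFunction<String, Tuple2<String, String>, String>() {
>>>>>>>>       private final Map<String, String> localCopy = new HashMap<>();
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public void flatMap1(String event, Collector<String> out) {
>>>>>>>>         // look up the DS1 event against the local copy
>>>>>>>>         if (localCopy.containsKey(event)) {
>>>>>>>>           out.collect(event);
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public void flatMap2(Tuple2<String, String> kv, Collector<String> out) {
>>>>>>>>         localCopy.put(kv.f0, kv.f1); // update the local copy
>>>>>>>>       }
>>>>>>>>     });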
>>>>>>>>
>>>>>>>> If that does not work for you because the state is too large, and if
>>>>>>>> it is possible to partition the state (and both streams), you can
>>>>>>>> also use keyBy instead of broadcast.
>>>>>>>>
>>>>>>>> Finally, you can use an external system like a key-value store or an
>>>>>>>> in-memory store such as Apache Ignite to hold your distributed
>>>>>>>> collection.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Team,
>>>>>>>>>
>>>>>>>>>      Can someone help me here? Appreciate any response !
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> Varaga
>>>>>>>>>
>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>>>>>> chakravarth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>>     I'm working on a Flink Streaming application. The data is
>>>>>>>>>> ingested through Kafka connectors. The payload volume is roughly
>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this DataStream1.
>>>>>>>>>> This application also uses another DataStream, call it DataStream2,
>>>>>>>>>> which consumes events off a Kafka topic. The elements of DataStream2
>>>>>>>>>> go through a certain transformation that finally updates a HashMap
>>>>>>>>>> (a java.util collection). The Flink application should share this
>>>>>>>>>> HashMap across the Flink cluster so that the DataStream1 application
>>>>>>>>>> could check the state of the values in this collection. Is there a
>>>>>>>>>> way to do this in Flink?
>>>>>>>>>>
>>>>>>>>>>     I don't see any shared collection available within the cluster.
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
