Not sure if I got your requirements right, but would this work?

    KeyedStream<String, String> ks1 = ds1.keyBy("*");
    KeyedStream<Tuple2<String, V>, String> ks2 =
        ds2.flatMap(/* split T into k-v pairs */).keyBy(0);

    ks1.connect(ks2).flatMap(X)

X is a CoFlatMapFunction that inserts elements from ks2 into a key-value
state member (and removes them again on delete records). Elements from ks1
are matched against that state.
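To make this concrete, here is a rough, untested sketch of X (it assumes
V is String and treats a null value from ks2 as a delete marker; the class
and state names are made up):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Both inputs are keyed on the same key, so the state entry accessed in
    // flatMap1/flatMap2 always belongs to the current key.
    public class X extends RichCoFlatMapFunction<String, Tuple2<String, String>, String> {

        private transient ValueState<String> kvState;

        @Override
        public void open(Configuration conf) {
            kvState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ks2-values", String.class, null));
        }

        @Override
        public void flatMap1(String element, Collector<String> out) throws Exception {
            String value = kvState.value(); // value from ks2 for the current key
            if (value != null) {
                out.collect(element + "," + value); // match found
            }
        }

        @Override
        public void flatMap2(Tuple2<String, String> kv, Collector<String> out) throws Exception {
            kvState.update(kv.f1); // insert/overwrite; updating to null evicts the entry
        }
    }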
Best, Fabian

2016-09-08 20:24 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:

> Hi Fabian,
>
> First of all, thanks for all your prompt responses. With regards to 2)
> multiple lookups, I have to clarify what I mean by that...
>
> DS1<String> elementKeyStream = stream1.map(String<>); // maps each of
> the streaming elements into a string value...
> DS2<T> = stream2.xxx(); // where stream2 is a kafka source stream, as
> you proposed... xxx() should be my function() which splits the string
> and generates key1:<value1>, key2:<value2>, key3:<value3>, ..., keyN:<valueN>
>
> Now, I wish to map elementKeyStream with lookups within (key1, key2,
> ..., keyN), where key1, key2, ..., keyN and their respective values
> should be available across the cluster...
>
> Thanks a million!
> CVP
>
> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> That depends.
>> 1) Growing/shrinking: This should work. New entries can always be
>> inserted. In order to remove entries from the k-v state, you have to
>> set the value to null. Note that you need an explicit delete-value
>> record to trigger the eviction.
>> 2) Multiple lookups: This only works if all lookups are independent
>> of each other. You can partition DS1 on only a single key, and the
>> other keys might be located on different shards. A workaround might be
>> to duplicate S1 events for each key that they need to look up. However,
>> you might need to collect events from the same S1 event after the join.
>> If that does not work for you, the only thing that comes to my mind is
>> to broadcast the state and keep a full local copy in each operator.
>>
>> Let me add one more thing regarding the upcoming rescaling feature. If
>> this is interesting for you, rescaling will also work (maybe not in the
>> first version) for broadcasted state, i.e., state which is the same on
>> all parallel operator instances.
>>
>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>
>>> I'm understanding this better with your explanation.
>>> With this use case, each element in DS1 has to look up against a
>>> 'bunch of keys' from DS2, and DS2 could shrink/expand in terms of the
>>> no. of keys... will the key-value shard work in this case?
>>>
>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Operator state is always local in Flink. However, with key-value
>>>> state you can have something which behaves kind of similar to a
>>>> distributed hashmap, because each operator holds a different
>>>> shard/partition of the hashtable.
>>>>
>>>> If you have to do only a single key lookup for each element of DS1,
>>>> you should think about partitioning both streams (keyBy) and writing
>>>> the state into Flink's key-value state [1].
>>>>
>>>> This will have several benefits:
>>>> 1) State does not need to be replicated.
>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>> reside on disk, so you are not bound to the memory of the JVM (see
>>>> the snippet below).
>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>> 4) It will only be possible to rescale jobs with key-value state
>>>> (this feature is currently under development).
>>>>
>>>> If using the key-value state is possible, I'd go for that.
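>>>>
>>>> To pick up point 2: switching to the RocksDB backend is essentially a
>>>> one-liner in the job setup. A rough sketch (the checkpoint URI is just
>>>> a placeholder, and you need the flink-statebackend-rocksdb dependency
>>>> on the classpath):
>>>>
>>>>     import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>>     public class JobSetup {
>>>>         public static void main(String[] args) throws Exception {
>>>>             StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>             // k-v state now lives in RocksDB and can spill to disk;
>>>>             // checkpoints are written to the given URI (placeholder)
>>>>             env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>>>>             // ... define the keyed streams, connect them, etc.
>>>>             env.execute("keyed lookup job");
>>>>         }
>>>>     }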
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html
>>>>
>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>
>>>>> Certainly, that's what I thought as well...
>>>>> The output of DataStream2 could be in the 1000s, and there are
>>>>> state updates... Reading this topic from the other job, job1, is
>>>>> okay. However, assuming that we maintain this state in a collection,
>>>>> and update the state (by reading from the topic) in this collection,
>>>>> will this be replicated across the cluster within this job1?
>>>>>
>>>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Is writing DataStream2 to a Kafka topic and reading it from the
>>>>>> other job an option?
>>>>>>
>>>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> Thanks for your response. These DataStreams (Job1-DataStream1 &
>>>>>>> Job2-DataStream2) are from different Flink applications running
>>>>>>> within the same cluster.
>>>>>>> DataStream2 (from Job2) applies transformations and updates a
>>>>>>> 'cache' which Job1 needs to work on.
>>>>>>> Our intention is to not use an external key/value store, as we
>>>>>>> are trying to localize the cache within the cluster.
>>>>>>> Is there a way?
>>>>>>>
>>>>>>> Best Regards
>>>>>>> CVP
>>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Flink does not provide shared state.
>>>>>>>> However, you can broadcast a stream to a CoFlatMapFunction, such
>>>>>>>> that each operator has its own local copy of the state (see the
>>>>>>>> sketch below).
>>>>>>>>
>>>>>>>> If that does not work for you because the state is too large, and
>>>>>>>> if it is possible to partition the state (and both streams), you
>>>>>>>> can also use keyBy instead of broadcast.
>>>>>>>>
>>>>>>>> Finally, you can use an external system like a key-value store or
>>>>>>>> an in-memory store like Apache Ignite to hold your distributed
>>>>>>>> collection.
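>>>>>>>>
>>>>>>>> A minimal, untested sketch of the broadcast variant (it assumes
>>>>>>>> the second stream delivers (key, value) updates as Tuple2 and
>>>>>>>> uses a null value as a delete marker; all names are made up):
>>>>>>>>
>>>>>>>>     import java.util.HashMap;
>>>>>>>>     import java.util.Map;
>>>>>>>>     import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>>     import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
>>>>>>>>     import org.apache.flink.util.Collector;
>>>>>>>>
>>>>>>>>     // Because ds2 is broadcast, every parallel instance sees all
>>>>>>>>     // updates and keeps its own full local copy of the map.
>>>>>>>>     public class BroadcastJoin
>>>>>>>>             implements CoFlatMapFunction<String, Tuple2<String, String>, String> {
>>>>>>>>
>>>>>>>>         private final Map<String, String> cache = new HashMap<>();
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void flatMap1(String element, Collector<String> out) {
>>>>>>>>             String value = cache.get(element); // local lookup
>>>>>>>>             if (value != null) {
>>>>>>>>                 out.collect(element + "," + value);
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void flatMap2(Tuple2<String, String> update, Collector<String> out) {
>>>>>>>>             if (update.f1 == null) {
>>>>>>>>                 cache.remove(update.f0); // delete marker
>>>>>>>>             } else {
>>>>>>>>                 cache.put(update.f0, update.f1);
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     // wired up as: ds1.connect(ds2.broadcast()).flatMap(new BroadcastJoin())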
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Team,
>>>>>>>>>
>>>>>>>>> Can someone help me here? Appreciate any response!
>>>>>>>>>
>>>>>>>>> Best Regards
>>>>>>>>> Varaga
>>>>>>>>>
>>>>>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <chakravarth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>> I'm working on a Flink streaming application. The data is
>>>>>>>>>> injected through Kafka connectors. The payload volume is roughly
>>>>>>>>>> 100K/sec. The event payload is a string. Let's call this
>>>>>>>>>> DataStream1. The application also uses another DataStream, call
>>>>>>>>>> it DataStream2, which consumes events off a kafka topic. The
>>>>>>>>>> elements of DataStream2 go through a transformation that finally
>>>>>>>>>> updates a HashMap (a java.util collection). The Flink application
>>>>>>>>>> should share this HashMap across the Flink cluster so that the
>>>>>>>>>> DataStream1 application can check the state of the values in this
>>>>>>>>>> collection. Is there a way to do this in Flink?
>>>>>>>>>>
>>>>>>>>>> I don't see any shared collection used within the cluster?
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>> CVP