Hi again, and again sorry for the late response.
Regarding your first question: You can use a Key Selector Function [1]. Regarding your second question: If I understand your requirement correctly, this is already happening in my gist. By taking the union of both streams the local and away max are taken over both streams. The coFlatMap holds the currentAwayMax and currentAwayLocal and evaluates your condition if either of the maximum values changes. Does this help? Cheers, Konstantin [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-keys On 02.06.2016 23:20, iñaki williams wrote: > Hi again! Thanks for your tips and gists, those are being really > helpful. However, I probably didn't express my idea properly and it has > been a litle misunderstood. I have been thinking about how to do this > during these days and I will try to put a concrete example of what I > want and if my way is the correct one. I have made a new diagram with > "real" examples. > > > First question, as you can notice, "Match name" is quite similar but it > is not always the same. Name from DataStream 1 could be "Rafa Nadal" > while in the 2nd DataStream the name of the match could be "R. Nadal", > so is there any way to rewrite the .keyby() method in order to use a > library that compares Strings and match it according to similarities and > not because of the exact name?. > > Second question, In the case that I could key those tennis matches, when > I am doing the CoFlatMap and having these two matches for example: > > DataStream1 <"Rafa Nadal - Roger Federer", 1.90, 2.10> > DataStream2 <"Rafa Nadal - Roger Federer", 2.20, 1.80> > > I would like to take the biggest values from both fields, in this case > it should be: 2.20 and 2.10, being the final result as: > <"Rafa Nadal - Roger Federer", 2.20, 2.10>. > > I don't know if I am mistaken but, I think I could use the valueState to > save those values and compare it? > > Thanks for your time! :) > Very Grateful. > > > 2016-05-29 17:32 GMT+02:00 Konstantin Knauf > <konstantin.kn...@tngtech.com <mailto:konstantin.kn...@tngtech.com>>: > > Hi again, > > from your diagram I have put together a gist, which I think does the > job. I haven't had the time to test it though :( > > https://gist.github.com/knaufk/d1312503b99ee51554a70c9a22abe7e5 > > If you have any questions, let me know. It sometimes just takes a while > until I answer ;) > > Cheers, > > Konstantin > > On 28.05.2016 13:49, iñaki williams wrote: > > Hello again! :) > > > > I have been checking the solution that you proprosed and but I don't > > really get how the KeyValueState helps on it. Could you please explain > > it a little bmore? > > > > I have drawn a diagram to make what I want clear, notice that the middle > > table doesn't need to be a table, it is just what I want and I don't > > have enough knowledge on Flink to know how to do it. > > > > > > Thanks for your time! > > > > > > > > 2016-05-26 20:33 GMT+02:00 Konstantin Knauf > > <konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>>>: > > > > Hi, > > > > interesting use case, you are looking for sure bets, I guess ;) > > > > Well, I think, what you want to then is probably to use a > > ConnectedStream, which you keyBy the "name" of both streams. > > > > The you can use CoFlatMap for comparison. You can use a > KeyValueState zu > > save prices. In each map you can then check if you have a price for > this > > name already saved from the other stream and if not save the price. > The > > challenge will be to clean up state. > > > > Let me know, if this works out. > > > > Cheers, > > > > Konstantin > > > > On 26.05.2016 20 <tel:26.05.2016%2020> <tel:26.05.2016%2020>:01, > iñaki > williams wrote: > > > Hi! > > > > > > I will explain it with more details: > > > > > > I am comparing real time sport odds from two different betting > Webpages. > > > > > > Assuming that I get just one java object (in reality I should get > a List > > > of in-play matches), for each DataStream and assuming that the > name is > > > the same of course, what I want to do is compare both price > attributes > > > in "real time", I am only interested on the currently price, not > the > > > previous one. Example: > > > > > > What is the price for the Event 1 from website "X" RIGHT NOW? > > > > > > JavaObjectX.price > > > > > > What is the price for the Event 1 from website "Y" RIGHT NOW? > > > > > > JavaObjectY.price > > > > > > > > > Compare both attributes > > > Get a result depending on that comparison > > > > > > My java object doesn't have a timestamp, but I think I should use > it right? > > > > > > > > > Thanks! > > > > > > > > > > > > > > > > > > > > > > > > 2016-05-26 19:48 GMT+02:00 Konstantin Knauf > > > <konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>> > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>>>>: > > > > > > Hi, > > > > > > let me first check, if I understand your requirements > correctly. I > > > assume you want to compare attribute price for objects with > the same > > > name only, right? > > > > > > Further, I assume the objects are some kind of offer/bid with > a > > > timestamp? > > > > > > I think the solution heavily depends on how the records, > which should be > > > compared relate in time. So basically, if an object arrives > from one > > > source, which time window of objects from the other stream > should be > > > considered for comparison? > > > > > > Cheers, > > > > > > Konstantin > > > > > > On 26.05.2016 18 <tel:26.05.2016%2018> > <tel:26.05.2016%2018> <tel:26.05.2016%2018>:55, iñaki > > williams wrote: > > > > Hi! > > > > > > > > I am working on something quite similar to the stockPrice > example that > > > > is posted on the webpage > > > > > (https://flink.apache.org/news/2015/02/09/streaming-example.html) > > > > > > > > I am extracting some data from 2 different webpages and I > > > represent the > > > > result using a java object. The diagram could be something > like this: > > > > > > > > DataStream1--------JavaObject(name, price) --\ > > > > > \ > > > > > [ > > > > how to compare result?] > > > > > / > > > > DataStream2--------JavaObject(name, price) --/ > > > > > > > > > > > > What I want to do is to get the attribute price from both > data objects > > > > and compare it between each other / make some math > operations. For > > > > example, if the first JavaObject.price is bigger than the > second > > > > JavaObject.price, then show a message. > > > > > > > > > > > > Which is the (best) way of doing this? I am new using Flink > and I am > > > > quite lost :) > > > > > > > > > > > > Thanks! > > > > > > -- > > > Konstantin Knauf * konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>> > > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>>> * +49-174-3413182 > <tel:%2B49-174-3413182> > > <tel:%2B49-174-3413182> > > > <tel:%2B49-174-3413182> > > > TNG Technology Consulting GmbH, Betastr. 13a, 85774 > Unterföhring > > > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert > > Dahlke > > > Sitz: Unterföhring * Amtsgericht München * HRB 135082 > > > > > > > > > > -- > > Konstantin Knauf * konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> > > <mailto:konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com>> * +49-174-3413182 > <tel:%2B49-174-3413182> > > <tel:%2B49-174-3413182> > > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert > Dahlke > > Sitz: Unterföhring * Amtsgericht München * HRB 135082 > > > > > > -- > Konstantin Knauf * konstantin.kn...@tngtech.com > <mailto:konstantin.kn...@tngtech.com> * +49-174-3413182 > <tel:%2B49-174-3413182> > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082 > > -- Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082