Understood! I have created a WindowedStream and now it is working. Thanks!
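
For the archive, here is a minimal plain-Java sketch (no Flink dependency) of the difference between the two kinds of aggregate discussed in this thread. Everything in it is an assumption made for illustration: the class `RunningVsWindowedMax`, the `Odds` type (a stand-in for the `Informacion` POJO), and both method names are hypothetical and not part of the Flink API. The "window" is assumed to cover all six sample records, and the records are assumed to arrive in the order src, then src2.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RunningVsWindowedMax {

    // Hypothetical stand-in for the Informacion POJO used in the thread.
    static final class Odds {
        final String matchName;
        final double localOdd;
        final double awayOdd;

        Odds(String matchName, double localOdd, double awayOdd) {
            this.matchName = matchName;
            this.localOdd = localOdd;
            this.awayOdd = awayOdd;
        }

        @Override
        public String toString() {
            return "Odds [matchName=" + matchName + ", localOdd=" + localOdd
                    + ", awayOdd=" + awayOdd + "]";
        }
    }

    // The six sample records from the thread, src first, then src2 (assumed order).
    static List<Odds> sampleInput() {
        List<Odds> input = new ArrayList<>();
        input.add(new Odds("Match1", 1.10, 3.22));
        input.add(new Odds("Match2", 2.11, 1.10));
        input.add(new Odds("Match3", 4.10, 1.05));
        input.add(new Odds("Match1", 1.80, 2.20));
        input.add(new Odds("Match2", 3.10, 1.15));
        input.add(new Odds("Match3", 2.12, 1.25));
        return input;
    }

    // Running aggregate: emits one record per input record, namely the
    // record with the largest localOdd seen so far for that key.
    static List<Odds> runningMax(List<Odds> input) {
        Map<String, Odds> best = new LinkedHashMap<>();
        List<Odds> out = new ArrayList<>();
        for (Odds o : input) {
            Odds current = best.get(o.matchName);
            if (current == null || o.localOdd > current.localOdd) {
                best.put(o.matchName, o);
            }
            out.add(best.get(o.matchName));
        }
        return out;
    }

    // Windowed aggregate: consumes every record in the window first,
    // then emits exactly one record per key.
    static List<Odds> windowedMax(List<Odds> window) {
        Map<String, Odds> best = new LinkedHashMap<>();
        for (Odds o : window) {
            Odds current = best.get(o.matchName);
            if (current == null || o.localOdd > current.localOdd) {
                best.put(o.matchName, o);
            }
        }
        return new ArrayList<>(best.values());
    }

    public static void main(String[] args) {
        List<Odds> input = sampleInput();
        System.out.println("Running aggregate:");
        runningMax(input).forEach(System.out::println);
        System.out.println("Windowed aggregate:");
        windowedMax(input).forEach(System.out::println);
    }
}
```

With this input, the running version emits six records and repeats the Match3 record with localOdd 4.1, because the later Match3 record does not beat it. The windowed version emits three records, one per match. In Flink itself the window would be time or count based rather than "all input".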

On Thursday, June 9, 2016, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> you are computing a running aggregate, i.e., you're getting one output
> record for each input record, and the output record is the record with the
> largest value observed so far.
> If the record with the largest value is the first one, that record is sent
> out a second time. This is what happened with Match3 in your example.
>
> There are two ways to compute aggregates on streams: 1) a running
> aggregate, as you just did, or 2) a windowed aggregate.
> For a windowed aggregate, you need to specify a window. The window
> can be time or count based.
> The following blog post should be a good introduction to Flink's window
> support [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> 2016-06-09 14:36 GMT+02:00 iñaki williams <juanramall...@gmail.com>:
>
>> Hi again!
>>
>> I am working with two DataStreams, and I want to get the maximum value
>> from each pair of them, for example:
>>
>> //Informacion (matchName, LocalOdd, AwayOdd)
>>
>> Informacion info1= new Informacion("Match1", 1.10, 3.22);
>> Informacion info2= new Informacion("Match2", 2.11, 1.10);
>> Informacion info3= new Informacion("Match3", 4.10, 1.05);
>>
>> Informacion info11= new Informacion("Match1", 1.80, 2.20);
>> Informacion info22= new Informacion("Match2", 3.10, 1.15);
>> Informacion info33= new Informacion("Match3", 2.12, 1.25);
>>
>> DataStream<Informacion> src = see.fromElements(info1,info2,info3);
>> DataStream<Informacion> src2 = see.fromElements(info11,info22,info33);
>> DataStream<Informacion> src3= src.union(src2);
>>
>> DataStream<Informacion> maxLocal =
>>     src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");
>>
>> maxLocal.print();
>>
>> Let's suppose that those are tennis matches with their names and their
>> bet odds, and that the names of the matches are the same on both streams,
>> I mean Match1=Match1, Match2=Match2 ... (Imagine that the Match1 name is
>> "Rafa Nadal - Roger Federer").
>>
>> I want to get the maximum localOdd from matches with the same name. The
>> result of my code is:
>>
>> 1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
>> 1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
>> 1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
>> 1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
>> 4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
>> 4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>>
>> It seems like it is taking the biggest value from all the matches and not
>> by keyed matches.
>>
>> I am looking for this:
>>
>> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
>> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
>>
>> How can I get it?
>>
>> Thanks in advance