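Hi! In the example from my previous mail (quoted below), the KeySelector runs twice for every record: once before the window operator, to decide which parallel instance receives the record, and once inside the window operator. Because that selector keeps a counter, the two calls can return different keys for the same record. In that sense, the version below, which computes the key once in a map() and then keys on that field, is a better way to do it.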
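To make the double evaluation concrete, here is a small framework-free Java simulation that does not use Flink at all. It assumes two parallel window instances, routes records with a simplified `key % parallelism` rule (Flink hashes the key instead), and gives the upstream step and each receiving instance their own copy of the counting selector. `KeyOnceDemo`, `CountingSelector` and the helper methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyOnceDemo {

    // Hypothetical stand-in for the counting KeySelector from the workaround:
    // every call advances the counter, so the result depends on call history.
    static final class CountingSelector {
        private int key;

        int getKey(String record) {
            if (++key >= 24) {
                key = 0;
            }
            return key;
        }
    }

    // Assumed number of parallel window operator instances.
    static final int PARALLELISM = 2;

    // Simplified routing for illustration; Flink hashes the key instead.
    static int route(int key) {
        return key % PARALLELISM;
    }

    // Selector evaluated twice: an upstream copy picks the target instance, then
    // that instance's own copy computes the key again. Counts records whose keys differ.
    static int mismatchesWhenKeyedTwice(List<String> records) {
        CountingSelector upstream = new CountingSelector();
        CountingSelector[] downstream = new CountingSelector[PARALLELISM];
        for (int i = 0; i < PARALLELISM; i++) {
            downstream[i] = new CountingSelector();
        }
        int mismatches = 0;
        for (String record : records) {
            int routingKey = upstream.getKey(record);
            int windowKey = downstream[route(routingKey)].getKey(record);
            if (routingKey != windowKey) {
                mismatches++;
            }
        }
        return mismatches;
    }

    // Key computed once and carried with the record (as in the map() + keyBy(0) version);
    // routing and windowing both read the stored value.
    static int mismatchesWhenKeyedOnce(List<String> records) {
        CountingSelector assigner = new CountingSelector();
        int mismatches = 0;
        for (String record : records) {
            int storedKey = assigner.getKey(record); // the only call per record
            int routingKey = storedKey;
            int windowKey = storedKey;
            if (routingKey != windowKey) {
                mismatches++;
            }
        }
        return mismatches;
    }

    static List<String> sampleRecords(int n) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add("record-" + i);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = sampleRecords(6);
        System.out.println("keyed twice: " + mismatchesWhenKeyedTwice(records)); // keyed twice: 5
        System.out.println("keyed once: " + mismatchesWhenKeyedOnce(records));   // keyed once: 0
    }
}
```

With six records, keying twice leaves five of them with a window key that differs from the key used to route them, while computing the key once and carrying it with the record keeps both uses consistent.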
You can also create windows that fire after 50 elements or after at most 100 ms, whichever comes first, by writing your own trigger. Have a look at the count trigger: you can augment it by registering a processing-time callback 100 ms out that also fires the window.
https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

The better version of the "random key" program (the key is computed once, in the map(), and stored in the tuple):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.windowing.time.Time;

    stream
        .map(new MapFunction<SocialData, Tuple2<Integer, SocialData>>() {
            private int key;

            @Override
            public Tuple2<Integer, SocialData> map(SocialData data) {
                // rotate through keys 0..23; this runs exactly once per record
                if (++key >= 24) {
                    key = 0;
                }
                return new Tuple2<>(key, data);
            }
        })
        .keyBy(0) // key on tuple field 0, the precomputed Integer
        .timeWindow(Time.milliseconds(100))
        .apply(...)

Greetings,
Stephan

On Wed, Aug 10, 2016 at 3:54 PM, Andrew Ge Wu <andrew.ge...@eniro.com> wrote:

> Hi Stephan
>
> Thanks for the explanation! We will stick to 1.0.3 to keep our code clean.
> In the workaround case, how is the key selector instantiated? One instance per window operator?
> By the way, is there a way to create a hybrid window of count and time, like 50 items *or* a maximum processing time of 100 ms?
>
> Thanks!
>
> Andrew
>
> On 10 Aug 2016, at 15:33, Stephan Ewen <se...@apache.org> wrote:
>
> Hi Andrew!
>
> Here is the reason for what is happening with your job:
>
> You have relied on an undocumented and unofficial corner-case behavior of Flink 1.0.0, namely a parallel windowAll().
> Initially, windowAll() was supposed to be non-parallel, but the system did not prevent users from setting a parallelism on it.
>
> In Flink 1.0.0 it just happened that a parallel windowAll() behaved like a "window over stream partition".
> In Flink 1.1.0, the parallel windowAll() really sends all data to one of the parallel operators, and the others are idle. Admittedly, Flink 1.1.0 should simply not allow setting a parallelism on windowAll(); we will fix that.
>
> What we need to figure out now is how to have an adequate replacement for the "window over stream partition" use case.
> I think we need to add an explicit "windowPartition()" function for that case.
>
> Until then, you could stay on Flink 1.0.3, or you can try using a "keyBy().window()" operator instead of "windowAll()", with an incrementing number % 24 as the key (not perfectly balanced, but a temporary workaround):
>
>     stream
>         .keyBy(new KeySelector<SocialData, Integer>() {
>             private int key;
>
>             @Override
>             public Integer getKey(SocialData data) {
>                 if (++key >= 24) {
>                     key = 0;
>                 }
>                 return key;
>             }
>         })
>         .timeWindow(Time.milliseconds(100))
>         .apply(...)
>
> Sorry for the inconvenience!
>
> Greetings,
> Stephan
>
> On Wed, Aug 10, 2016 at 1:15 PM, Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>
>> Hi Aljoscha
>>
>> We are not setting a state backend explicitly; recovery and the state backend point to a file path.
>> See the attached JSON file.
>>
>> Confidentiality Notice: This e-mail transmission may contain confidential or legally privileged information that is intended only for the individual or entity named in the e-mail address. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or reliance upon the contents of this e-mail is strictly prohibited and may be unlawful. If you have received this e-mail in error, please notify the sender immediately by return e-mail and delete all copies of this message.
>>
>> Thanks for the help.
>>
>> Best regards
>>
>> Andrew
>>
>> On 10 Aug 2016, at 11:38, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Oh, are you by any chance specifying a custom state backend for your job?
>> For example, RocksDBStateBackend.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 10 Aug 2016 at 11:17 Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> could you maybe send us the output of "env.getExecutionPlan()"? This would help us better understand which operators are used exactly. (You can of course remove any security-sensitive stuff.)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 9 Aug 2016 at 15:30 Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>>>
>>>> Oh, sorry, I missed that part. No, I'm not setting it explicitly.
>>>>
>>>> On 09 Aug 2016, at 15:29, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> Hi,
>>>> are you setting a StreamTimeCharacteristic, i.e. env.setStreamTimeCharacteristic()?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Tue, 9 Aug 2016 at 14:52 Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>>>>
>>>>> Hi Aljoscha
>>>>>
>>>>> Plan attached. There are split streams and union operations around, but here is how the windows are created:
>>>>>
>>>>> Let me know if I'm doing something out of the ordinary here.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Andrew
>>>>>
>>>>> On 09 Aug 2016, at 14:18, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>> Hi,
>>>>> could you maybe post how exactly you specify the window? Also, did you set a "stream time characteristic", for example EventTime?
>>>>>
>>>>> That could help us pinpoint the problem.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Tue, 9 Aug 2016 at 12:42 Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>>>>>
>>>>>> I rolled back to 1.0.3.
>>>>>>
>>>>>> If I understand this correctly, the peak when the topology starts is because it is trying to fill all the buffers, but I cannot see that in 1.1.0.
>>>>>>
>>>>>> On 09 Aug 2016, at 12:10, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>
>>>>>> Which source are you using?
>>>>>>
>>>>>> On Tue, Aug 9, 2016 at 11:50 AM, Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>>>>>>
>>>>>>> Hi Robert
>>>>>>>
>>>>>>> Thanks for the quick reply, I guess I'm one of the early birds.
>>>>>>> Yes, it is much slower, and I'm not sure why. I copied slaves, masters, log4j.properties and flink-conf.yaml directly from 1.0.3.
>>>>>>> I have parallelism 1 on my sources. I can increase that to achieve the same speed, but I'm interested to know why that is.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>> On 09 Aug 2016, at 11:47, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> here is the release announcement, with a list of all changes:
>>>>>>> http://flink.apache.org/news/2016/08/08/release-1.1.0.html,
>>>>>>> http://flink.apache.org/blog/release_1.1.0-changelog.html
>>>>>>>
>>>>>>> What does the chart say? Are the results different? Is Flink faster or slower now?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Robert
>>>>>>>
>>>>>>> On Tue, Aug 9, 2016 at 11:32 AM, Andrew Ge Wu <andrew.ge...@eniro.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We found out there is a new stable version released, 1.1.0, but we cannot find any release notes.
>>>>>>>> Does anyone know where to find them?
>>>>>>>>
>>>>>>>> We are experiencing some change in behavior; I'm not sure if it is related.
>>>>>>>>
>>>>>>>> <PastedGraphic-1.png>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Andrew
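The hybrid window asked about in this thread (fire after 50 elements or after at most 100 ms, whichever comes first, built by augmenting a count trigger with a time callback) comes down to the decision logic below. This is a framework-free Java sketch, not Flink's Trigger API. `CountOrTimeoutTrigger`, `onElement`, `onTimer` and `simulate` are hypothetical names. Time is passed in explicitly as milliseconds, each firing emits and resets the current batch (fire-and-purge semantics, an assumption), and the timeout is only checked when an element arrives and once at the end. In an actual Flink trigger the timeout would instead be registered as a processing-time timer through the trigger context.

```java
import java.util.ArrayList;
import java.util.List;

public class CountOrTimeoutDemo {

    // Hypothetical, framework-free model of "fire on maxCount elements or after timeoutMs".
    static final class CountOrTimeoutTrigger {
        private final int maxCount;
        private final long timeoutMs;
        private int count = 0;       // elements in the current batch
        private long deadline = -1;  // when the current batch times out; -1 if the batch is empty

        CountOrTimeoutTrigger(int maxCount, long timeoutMs) {
            this.maxCount = maxCount;
            this.timeoutMs = timeoutMs;
        }

        // Called per element. Returns the batch size if the count limit fires, else 0.
        int onElement(long nowMs) {
            if (count == 0) {
                deadline = nowMs + timeoutMs; // first element of a batch starts the timeout
            }
            count++;
            return count >= maxCount ? fire() : 0;
        }

        // Called as time passes. Returns the batch size if the timeout fires, else 0.
        int onTimer(long nowMs) {
            return (count > 0 && nowMs >= deadline) ? fire() : 0;
        }

        // Emits and resets the current batch.
        private int fire() {
            int fired = count;
            count = 0;
            deadline = -1;
            return fired;
        }
    }

    // Feeds ascending arrival times through the trigger, checking the timeout before each
    // arrival and once more at endMs. Returns the sizes of the batches that fired, in order.
    static List<Integer> simulate(int maxCount, long timeoutMs, long[] arrivalsMs, long endMs) {
        CountOrTimeoutTrigger trigger = new CountOrTimeoutTrigger(maxCount, timeoutMs);
        List<Integer> batches = new ArrayList<>();
        for (long t : arrivalsMs) {
            addIfFired(batches, trigger.onTimer(t));
            addIfFired(batches, trigger.onElement(t));
        }
        addIfFired(batches, trigger.onTimer(endMs));
        return batches;
    }

    private static void addIfFired(List<Integer> batches, int size) {
        if (size > 0) {
            batches.add(size);
        }
    }

    public static void main(String[] args) {
        // 120 elements, one per millisecond: two full batches by count, then one by timeout.
        long[] dense = new long[120];
        for (int i = 0; i < dense.length; i++) {
            dense[i] = i;
        }
        System.out.println(simulate(50, 100, dense, 300)); // [50, 50, 20]

        // Sparse elements: the 100 ms timeout fires long before 50 elements arrive.
        System.out.println(simulate(50, 100, new long[] {0, 30, 60, 90, 150}, 400)); // [4, 1]
    }
}
```

Starting the 100 ms timeout at the first element of each batch is a design choice of this sketch; with a real timer-driven trigger it bounds how long any element waits before its window fires.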