This worked when I ran my test code locally, but nothing reaches my sink when I run it on YARN (previously, when I just echoed all sums to my sink, it worked).
Here's what my code looks like:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
        INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
    env.enableCheckpointing(5000);

    // this (or event time) is required in order to do the double-windowing below
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    DataStream<String> stream = env
        .addSource(consumer)
        .flatMap(new CountRequests())
        .keyBy(0, 1)
        .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
        .sum(2)
        .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
        .apply(new TopK(20))
        .map(new ToString<List<Tuple3<String, String, Integer>>>());
    stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(), properties));
    env.execute(TASK_NAME);

Note that CountRequests produces Tuple3<String, String, Integer>, TopK is an AllWindowFunction that produces List<Tuple3<String, String, Integer>>, and ToString is a MapFunction that is just a wrapper on Object#toString().

Anything obvious that I'm doing wrong?

________________________________
> From: aljos...@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion-time (or event-time) as your stream time
> characteristic, i.e.:
>
>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime
>
> you can apply several window transforms after another and they will
> apply the same "time window" because they work on the element
> timestamps. What you can then do is have a window that does the
> aggregation and then another one (that has to be global) to select the
> top elements:
>
>   result = input
>     .keyBy(<some key>)
>     .timeWindow(Time.minutes(1), Time.seconds(5))
>     .sum(2)
>     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding window here, because the above will output a new window every 5 seconds
>     .apply(<my custom window function>)
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> <balaji.rajagopa...@olacabs.com> wrote:
> I had a similar use case and ended writing the aggregation logic in the
> apply function, could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
> <kana...@hotmail.com> wrote:
> Hi,
>
> I would like to write something that does something like a word count,
> and then emits only the 10 highest counts for that window. Logically, I
> would want to do something like:
>
>   stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way
> I can think of is to have a complex function in apply() that has state,
> but I would like to know if there is something a little cleaner than
> that.
>
> Thanks,
> Kanak
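
For reference, the selection step that a "top K" window function performs on the per-key sums can be sketched in plain Java, independent of Flink. This is only an illustration under assumptions: the class name TopKSketch and the method topK are hypothetical, Map.Entry<String, Integer> stands in for the Tuple3<String, String, Integer> records in the thread, and it is not the TopK class from the job above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper: keeps the k largest counts seen in one window's worth of input.
public class TopKSketch {

    /** Returns the k entries with the highest counts, largest first. */
    public static List<Map.Entry<String, Integer>> topK(
            Iterable<Map.Entry<String, Integer>> counts, int k) {
        if (k <= 0) {
            return new ArrayList<>();
        }
        // Min-heap ordered by count: the smallest retained count sits at the head.
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(
                k, Comparator.comparingInt((Map.Entry<String, Integer> e) -> e.getValue()));
        for (Map.Entry<String, Integer> entry : counts) {
            heap.add(entry);
            if (heap.size() > k) {
                heap.poll(); // evict the current smallest so at most k entries remain
            }
        }
        // Copy out of the heap and sort in descending order of count.
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<String, Integer>("a", 3));
        counts.add(new SimpleEntry<String, Integer>("b", 9));
        counts.add(new SimpleEntry<String, Integer>("c", 5));
        System.out.println(topK(counts, 2));
    }
}
```

Keeping a bounded min-heap means the function holds at most k entries while iterating over the window contents, which is one reasonable choice when the number of keys per window is much larger than k.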