This worked when I ran my test code locally, but nothing reaches my sink
when I run it on YARN (previously, when I simply echoed all sums to my
sink, it worked).

Here's what my code looks like:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09<MirrorMessageRequest> consumer = new FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString<List<Tuple3<String, String, Integer>>>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(),
                properties));
        env.execute(TASK_NAME);

Note that CountRequests produces Tuple3<String, String, Integer>, TopK is an 
AllWindowFunction that produces List<Tuple3<String, String, Integer>>, and 
ToString is a MapFunction that is just a wrapper on Object#toString().

Anything obvious that I'm doing wrong?
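For context, the selection step inside TopK is essentially the following (a simplified, Flink-free sketch of just the top-K logic; the real class implements Flink's AllWindowFunction and works on Tuple3<String, String, Integer>, but here the tuples are modeled as plain Map.Entry pairs for readability):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Flink-free sketch of the per-window top-K selection. The real TopK is an
// AllWindowFunction over Tuple3<String, String, Integer>; here a count record
// is modeled as a Map.Entry<String, Integer> (key -> count).
public class TopKSketch {

    // Return the k entries with the highest counts, highest first.
    static List<Map.Entry<String, Integer>> topK(List<Map.Entry<String, Integer>> counts, int k) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts);
        sorted.sort(Comparator
                .comparingInt((Map.Entry<String, Integer> e) -> e.getValue())
                .reversed());
        return sorted.subList(0, Math.min(k, sorted.size()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<>("a", 3));
        counts.add(new SimpleEntry<>("b", 7));
        counts.add(new SimpleEntry<>("c", 5));

        List<Map.Entry<String, Integer>> top2 = topK(counts, 2);
        System.out.println(top2.get(0).getKey()); // b
        System.out.println(top2.get(1).getKey()); // c
    }
}
```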
________________________________
> From: aljos...@apache.org 
> Date: Fri, 1 Apr 2016 09:41:12 +0000 
> Subject: Re: Multiple operations on a WindowedStream 
> To: user@flink.apache.org 
> 
> Hi, 
> if you are using ingestion-time (or event-time) as your stream time 
> characteristic, i.e.: 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or 
> TimeCharacteristic.EventTime 
> 
> you can apply several window transforms one after another and they will 
> use the same "time window" because they work on the element 
> timestamps. What you can then do is have a window that does the 
> aggregation and then another one (that has to be global) to select the 
> top elements: 
> 
> result = input 
>     .keyBy(<some key>) 
>     .timeWindow(Time.minutes(1), Time.seconds(5)) 
>     .sum(2) 
>     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding 
>     // window here, because the above will output a new window every 5 seconds 
>     .apply(<my custom window function>) 
> 
> I hope this helps. 
> 
> Cheers, 
> Aljoscha 
> 
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan 
> <balaji.rajagopa...@olacabs.com> wrote: 
> I had a similar use case and ended up writing the aggregation logic in 
> the apply function; I could not find a better solution. 
> 
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala 
> <kana...@hotmail.com> wrote: 
> Hi, 
> 
> I would like to write something that does something like a word count, 
> and then emits only the 10 highest counts for that window. Logically, I 
> would want to do something like: 
> 
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)) 
>     .sum(2) 
>     .apply(getTopK(10)) 
> 
> However, the window context is lost after I do the sum aggregation. Is 
> there a straightforward way to express this logic in Flink 1.0? One way 
> I can think of is to have a complex function in apply() that has state, 
> but I would like to know if there is something a little cleaner than 
> that. 
> 
> Thanks, 
> Kanak 
> 