Then you should be using a processing-time window; in your case, TumblingProcessingTimeWindows.

See https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/ for more info
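In your job that means changing the window line to `.window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))`. Processing-time windows fire from the machine's wall clock, so no watermark strategy is needed. To show what "count whatever I have in the last 5 minutes" means for a tumbling processing-time window, here is a small plain-Java simulation. It is not Flink code, and the class and method names are hypothetical. Each record goes into the window that contains its arrival time, computed with the same start formula as Flink's `TimeWindow.getWindowStartWithOffset` with a zero offset. Note that the windows are aligned to the epoch (for example 10:00 to 10:05, then 10:05 to 10:10), not to the moment the job starts.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java simulation (NOT Flink code; names are hypothetical) of tumbling
 * processing-time windows: each record is bucketed by the wall-clock time at which it arrives.
 */
public class ProcessingTimeWindowSketch {

    // Same start computation as Flink's TimeWindow.getWindowStartWithOffset with offset = 0.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis + windowSizeMillis) % windowSizeMillis;
    }

    // Counts records per tumbling window; map keys are window start times in epoch millis.
    static Map<Long, Integer> countPerWindow(List<Long> arrivalTimesMillis, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : arrivalTimesMillis) {
            counts.merge(windowStart(t, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // Hypothetical arrival times: three records in the first 5-minute window, one in the next.
        List<Long> arrivals = List.of(1_000L, 60_000L, 299_999L, 300_000L);
        System.out.println(countPerWindow(arrivals, fiveMinutes));
    }
}
```

Running `main` prints `{0=3, 300000=1}`: the record arriving at exactly 300000 ms opens the second window, because each window covers `[start, start + size)`.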

On 31.01.22 23:13, John Smith wrote:
Hi Dario, I don't care about event time; I just want a tumbling window over processing time, i.e. count whatever I have in the last 5 minutes.

On Mon, 31 Jan 2022 at 17:09, Dario Heinisch <dario.heini...@gmail.com> wrote:

    Hi John

    This is because you are using event time
    (TumblingEventTimeWindows) but you do not have an event-time
    watermark strategy.
    This is also why I opened
    https://issues.apache.org/jira/browse/FLINK-24623: I feel that
    Flink should throw an exception on startup in that case.

    Take a look at the documentation, which should have everything:

    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/

    > In order to work with event time, Flink needs to know the events
    > timestamps, meaning each element in the stream needs to have its
    > event timestamp assigned. This is usually done by
    > accessing/extracting the timestamp from some field in the
    > element by using a TimestampAssigner.
    >
    > Timestamp assignment goes hand-in-hand with generating
    > watermarks, which tell the system about progress in event time.
    > You can configure this by specifying a WatermarkGenerator.
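To make the failure mode concrete, here is a plain-Java simulation. It is not Flink code, and the class and method names are hypothetical. It mimics two rules from Flink's built-in classes: an event-time window `[start, end)` fires once the watermark reaches `end - 1`, and a bounded-out-of-orderness generator emits "max timestamp seen minus the bound minus 1". With `WatermarkStrategy.noWatermarks()` the watermark never advances, so the window never fires and neither does your `apply()`. In the job itself, the fix would be something along the lines of `WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((event, ts) -> event.getTimestamp())`, where `getTimestamp()` is a hypothetical accessor for whatever field holds your event time.

```java
import java.util.List;

/**
 * Plain-Java simulation (NOT Flink code; names are hypothetical) of why an
 * event-time window only fires once a watermark passes its end.
 */
public class EventTimeFiringSketch {

    // Mirrors the rule in Flink's BoundedOutOfOrdernessWatermarks: max timestamp seen - bound - 1.
    static long boundedOutOfOrdernessWatermark(List<Long> timestamps, long maxOutOfOrdernessMillis) {
        // Start low enough that an empty input yields Long.MIN_VALUE without underflow.
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
        for (long ts : timestamps) {
            maxSeen = Math.max(maxSeen, ts);
        }
        return maxSeen - maxOutOfOrdernessMillis - 1;
    }

    // An event-time window [start, end) fires when the watermark reaches end - 1 (its max timestamp).
    static boolean windowFires(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 300_000L; // hypothetical window [0, 5 min)
        List<Long> eventTimestamps = List.of(10_000L, 200_000L, 320_000L);

        // Like WatermarkStrategy.noWatermarks(): the watermark never advances.
        long noWatermark = Long.MIN_VALUE;
        System.out.println("noWatermarks fires: " + windowFires(windowEnd, noWatermark));

        // Bounded out-of-orderness of 10 seconds (hypothetical value).
        long wm = boundedOutOfOrdernessWatermark(eventTimestamps, 10_000L);
        System.out.println("watermark = " + wm + ", fires: " + windowFires(windowEnd, wm));
    }
}
```

Running `main` prints `noWatermarks fires: false` followed by `watermark = 309999, fires: true`.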

    Best regards,

    Dario

    On 31.01.22 22:28, John Smith wrote:
    Hi, I have the following job. I'm expecting
    System.out.println(key.toString()); to at least print something,
    but nothing prints.

    - .flatMap: Fires and prints my debug message once, as expected.
    - .keyBy: Also fires, but prints my debug message twice.
    - .apply: Doesn't seem to fire; its debug statement never
    prints. I'm expecting it to print the key from the keyBy above.

    DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
                WatermarkStrategy.noWatermarks(), "Kafka Source")
            .uid(kafkaTopic).name(kafkaTopic)
            .setParallelism(1)
            .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
            .uid("map-json-logs").name("map-json-logs");

    slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
            .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
            .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
                @Override
                public void apply(Tuple3<String, String, String> key, TimeWindow window,
                                  Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
                    // This should print.
                    System.out.println(key.toString());
                    // Do nothing for now
                }
            })
            .uid("process").name("process");
