Hi John

This is because you are using event time (TumblingEventTimeWindows) but you do not have an event-time watermark strategy. With WatermarkStrategy.noWatermarks(), no watermarks are ever emitted, so the event-time window never fires and your apply() is never called. It is also why I opened https://issues.apache.org/jira/browse/FLINK-24623: I feel Flink should throw an exception on startup in that case.

Take a look at the documentation at: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
which should have everything.

> In order to work with event time, Flink needs to know the events timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element by using a TimestampAssigner.
>
> Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about progress in event time. You can configure this by specifying a WatermarkGenerator.
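
To make the quoted mechanism concrete, here is a minimal plain-Java sketch of why an event-time window needs watermarks. It is not Flink code and uses no Flink classes: the class and method names (EventTimeWindowSketch, firedWindows, windowStart) are hypothetical, and it assumes a bounded-out-of-orderness style watermark (largest timestamp seen, minus the allowed lateness, minus 1 ms) that is updated on every element, which is a simplification of what Flink does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java illustration only; this is NOT Flink code and all names are hypothetical.
class EventTimeWindowSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /**
     * Feeds event timestamps through a tumbling event-time window and returns
     * the start times of the windows that fired, in firing order.
     * emitWatermarks = false mimics a "no watermarks" setup: the watermark
     * stays at Long.MIN_VALUE, so no window can ever fire.
     */
    static List<Long> firedWindows(long[] timestampsMs, long sizeMs,
                                   long maxOutOfOrdernessMs, boolean emitWatermarks) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> buffered element count
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestampsMs) {
            long start = windowStart(ts, sizeMs);
            if (start + sizeMs - 1 <= watermark) {
                continue; // late element: its window has already fired, so drop it
            }
            open.merge(start, 1, Integer::sum);

            if (emitWatermarks) {
                maxTimestamp = Math.max(maxTimestamp, ts);
                watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
            }
            // A window fires once the watermark reaches its last timestamp (start + size - 1).
            while (!open.isEmpty() && open.firstKey() + sizeMs - 1 <= watermark) {
                fired.add(open.pollFirstEntry().getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {5_000L, 30_000L, 65_000L, 130_000L};
        System.out.println(firedWindows(events, minute, 0L, false)); // prints []
        System.out.println(firedWindows(events, minute, 0L, true));  // prints [0, 60000]
    }
}
```

In the actual job, the corresponding change would be to replace WatermarkStrategy.noWatermarks() with a real strategy, for example WatermarkStrategy.forBoundedOutOfOrderness(Duration) combined with withTimestampAssigner(...), either in fromSource or via assignTimestampsAndWatermarks after the flatMap. Which of the two fits depends on where the event timestamp lives, which the original message does not say.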

Best regards,

Dario

On 31.01.22 22:28, John Smith wrote:
Hi, I have the following job. I'm expecting System.out.println(key.toString()); to at least print, but nothing prints.

- .flatMap: Fires and prints my debug message once, as expected.
- .keyBy: Also fires, but prints my debug message twice.
- .apply: Doesn't seem to fire. The debug statement never prints. I'm expecting it to print the key from the keyBy above.

DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid(kafkaTopic).name(kafkaTopic)
        .setParallelism(1)
        .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
        .uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
        .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
        .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple3<String, String, String> key, TimeWindow window, Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
                // This should print.
                System.out.println(key.toString());
                // Do nothing for now
            }
        })
        .uid("process").name("process");
