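Hi John,
This is because you are using event time (TumblingEventTimeWindows) but
you do not have an event-time watermark strategy. Without watermarks,
event time never advances, so the windows never fire and your apply()
is never called.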
It is also why I opened
https://issues.apache.org/jira/browse/FLINK-24623: I feel like Flink
should be throwing an exception on startup in that case.
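To make the mechanism concrete, here is a minimal plain-Java sketch. It is not Flink code and not how Flink is implemented internally; WatermarkSketch and firedWindows are names I made up for illustration. It only models the rule that a tumbling event-time window fires once the watermark reaches the window's last timestamp (end - 1), and that with no watermarks nothing ever fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of event-time tumbling windows.
class WatermarkSketch {

    // Returns the start timestamps of the windows that fire while the
    // given events are processed in order.
    static List<Long> firedWindows(long[] eventTimestamps, long windowSizeMs,
                                   boolean emitWatermarks, long maxOutOfOrdernessMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;               // event time has not advanced yet
        long maxTimestamp = Long.MIN_VALUE;

        for (long ts : eventTimestamps) {
            long start = ts - Math.floorMod(ts, windowSizeMs);
            if (start + windowSizeMs - 1 <= watermark) {
                continue; // late element: its window has already fired
            }
            open.merge(start, 1, Integer::sum);

            if (emitWatermarks) {
                // Bounded-out-of-orderness style watermark.
                maxTimestamp = Math.max(maxTimestamp, ts);
                watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
            }
            // Fire every open window whose last timestamp the watermark has passed.
            while (!open.isEmpty() && open.firstKey() + windowSizeMs - 1 <= watermark) {
                fired.add(open.pollFirstEntry().getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {0L, 30_000L, 61_000L, 125_000L};
        System.out.println(firedWindows(events, 60_000L, true, 0L));  // prints [0, 60000]
        System.out.println(firedWindows(events, 60_000L, false, 0L)); // prints []
    }
}
```

The second call corresponds to your job: elements are assigned to windows, but with no watermark the firing condition is never met.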
Take a look at the documentation at:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
which should have everything.
> In order to work with event time, Flink needs to know the events
> timestamps, meaning each element in the stream needs to have its event
> timestamp assigned. This is usually done by accessing/extracting the
> timestamp from some field in the element by using a TimestampAssigner.
>
> Timestamp assignment goes hand-in-hand with generating watermarks,
> which tell the system about progress in event time. You can configure
> this by specifying a WatermarkGenerator.
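As a rough sketch of what that could look like with the DataStream API: the job below builds a bounded-out-of-orderness strategy with a timestamp assigner and attaches it before the window. The MyEvent class here is only a hypothetical stand-in for yours (I am assuming a field holding the event time in epoch milliseconds), the 20-second bound is an arbitrary value, and fromElements replaces your Kafka source so the example needs nothing but Flink on the classpath.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkedWindowJob {

    // Hypothetical stand-in for MyEvent (public fields + no-arg constructor).
    public static class MyEvent {
        public String key;
        public long timestampMillis; // assumed: event time in epoch milliseconds

        public MyEvent() {}

        public MyEvent(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }

        @Override
        public String toString() {
            return key + "@" + timestampMillis;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tolerate events arriving up to 20 seconds out of order (arbitrary bound)
        // and read the event timestamp from a field of the element.
        WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis);

        DataStream<MyEvent> events = env
                .fromElements(
                        new MyEvent("a", 0L),
                        new MyEvent("a", 30_000L),
                        new MyEvent("a", 90_000L))
                .assignTimestampsAndWatermarks(strategy);

        events.keyBy(e -> e.key)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce((a, b) -> b) // keep the last element seen in each window
                .print();

        env.execute("watermarked-window-sketch");
    }
}
```

In your job you could either pass such a strategy to env.fromSource(...) in place of WatermarkStrategy.noWatermarks(), or call assignTimestampsAndWatermarks(...) on the stream after your flatMap. Which is appropriate depends on where your event timestamp lives.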
Best regards,
Dario
On 31.01.22 22:28, John Smith wrote:
Hi I have the following job... I'm expecting the
System.out.println(key.toString()); to at least print, but nothing
prints.
- .flatMap: Fires and prints my debug message once, as expected.
- .keyBy: Also fires, but prints my debug message twice.
- .apply: Doesn't seem to fire. The debug statement doesn't seem to
print. I'm expecting it to print the key from above keyBy.
DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
        WatermarkStrategy.noWatermarks(), "Kafka Source")
    .uid(kafkaTopic).name(kafkaTopic)
    .setParallelism(1)
    .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
    .uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
    .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
    .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
        @Override
        public void apply(Tuple3<String, String, String> key, TimeWindow window,
                Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
            // This should print.
            System.out.println(key.toString());
            // Do nothing for now
        }
    })
    .uid("process").name("process");