Hi All,
I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I
have understood that the gap is computed dynamically by a function on each
element. What I should be able to obtain is a Flink application that can
automatically manage the windows based on the frequency of the data. (if I have
understood correctly)
But I'm wondering if there is any parameter to adjust the computation to do
more windows or less windows considering the same data.
I have my event that provide "millis" of which I would like to pass to the
function but I don't understand how, for the moment I'm trying with the code
below but no luck.. Can you please give me some help? Thanks!
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_1", new
EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((event, timestamp) -> { return
event.get_Time();
});
DataStream<Event> stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));
DataStream<Event> Data = stream
.keyBy((Event ride) -> ride.CorrID)
.window(EventTimeSessionWindows.withDynamicGap((event)->{
return event.get_Time();}));
Where from the load of the message which i receive from Kafka i convert the
date time in millis.
public long get_Time() {
long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
this.millis = tn;
return millis;
}
public void set_a_t_rt(String a_t_rt) {
this.a_t_rt = a_t_rt;
}