各位大神,你们好:
最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束
窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束
时发送这个数据,应该如何处理?万分感激
// 这里配置了kafka的信息,并进行数据流的输入
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010<RfidRawData> kafkaSource = new
FlinkKafkaConsumer010<>("rfid-input-topic",
new RfidRawDataSchema(), props);
kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());
DataStream<RfidRawData> dataStream = env.addSource(kafkaSource);
// 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写
的是窗口融合的方法
DataStream<RfidRawData> outputStream = dataStream.keyBy("uniqueId")
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new
RfidReduceFunction());
//通过kafka数据流的输出
outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new
RfidRawDataSchema(), props));
try {
env.execute("Flink add data source");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}