您好:
请教一个问题,
例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。
我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。
经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
每个分组是否有属于自己的时间窗口。
数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。
请问时间窗口可以实现类似数量窗口这样的逻辑吗。
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataSource = env.socketTextStream("localhost",
7788);
SingleOutputStreamOperator<OrderItem> map = dataSource.map(new
MapFunction<String, OrderItem>() {
@Override
public OrderItem map(String s) throws Exception {
String[] split = s.split(",");
return new OrderItem(split[0].trim(),
Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
}
});
// 时间窗口测试代码
SingleOutputStreamOperator<OrderItem> warter =
map.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) ->
event.getTimeStamp()));
SingleOutputStreamOperator<String> timeWindow = warter.keyBy(data ->
data.getOrderId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<OrderItem, String, String,
TimeWindow>() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
@Override
public void process(String key,
ProcessWindowFunction<OrderItem,
String, String, TimeWindow>.Context context,
Iterable<OrderItem> iterable,
Collector<String> collector) throws
Exception {
Iterator<OrderItem> iterator = iterable.iterator();
StringBuilder sb = new StringBuilder();
sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new
Date(context.window().getStart())) + "\t\n");
while (iterator.hasNext()) {
OrderItem next = iterator.next();
sb.append(next + "\t\n");
}
sb.append("窗口结束时间:" + sdf.format(new
Date(context.window().getEnd())));
collector.collect(sb.toString());
}
});
timeWindow.print();
env.execute();
}
}