您好:
请教一个问题,
例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。

我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。

经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
每个分组是否有属于自己的时间窗口。


数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。

请问时间窗口可以实现类似数量窗口这样的逻辑吗。

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataSource = env.socketTextStream("localhost", 
7788);
        SingleOutputStreamOperator<OrderItem> map = dataSource.map(new 
MapFunction<String, OrderItem>() {
            @Override
            public OrderItem map(String s) throws Exception {
                String[] split = s.split(",");
                return new OrderItem(split[0].trim(), 
Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
            }
        });

        // 时间窗口测试代码
        SingleOutputStreamOperator<OrderItem> warter = 
map.assignTimestampsAndWatermarks(
                
WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((event, timestamp) -> 
event.getTimeStamp()));
        SingleOutputStreamOperator<String> timeWindow = warter.keyBy(data -> 
data.getOrderId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<OrderItem, String, String, 
TimeWindow>() {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");

                    @Override
                    public void process(String key,
                                        ProcessWindowFunction<OrderItem, 
String, String, TimeWindow>.Context context,
                                        Iterable<OrderItem> iterable,
                                        Collector<String> collector) throws 
Exception {
                        Iterator<OrderItem> iterator = iterable.iterator();
                        StringBuilder sb = new StringBuilder();
                        sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new 
Date(context.window().getStart())) + "\t\n");
                        while (iterator.hasNext()) {
                            OrderItem next = iterator.next();
                            sb.append(next + "\t\n");
                        }
                        sb.append("窗口结束时间:" + sdf.format(new 
Date(context.window().getEnd())));
                        collector.collect(sb.toString());
                    }
                });

        timeWindow.print();
        env.execute();

    }
}

回复