感谢 Benchao,问题应解决了!

> 2020年4月15日 下午3:38,Benchao Li <libenc...@gmail.com> 写道:
> 
> Hi 建刚,
> 
> 现在Emit的原理是这样子的:
> - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器;
> - 当定时器到了的时候,
>   - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
>      - 如果有变化,就发送-[old], +[new] 两条结果到下游;
>      - 如果是没有变化,则不做任何处理;
>   - 再次注册一个新的emit delay之后的处理时间定时器。
> 
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
> 
> 刘建刚 <liujiangangp...@gmail.com <mailto:liujiangangp...@gmail.com>> 
> 于2020年4月15日周三 下午3:32写道:
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
> 
> public class EarlyEmitter {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
> 
>       EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlink
>       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
> 
>       
> tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
>  true);
>       
> tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
>  "1000 ms");
> 
>       Table table = tEnv.fromDataStream(
>             env.addSource(new SourceData()), "generate_time, name, city, id, 
> event_time.proctime");
>       tEnv.createTemporaryView("person", table);
> 
>       String emit =
>             "SELECT name, COUNT(DISTINCT id)" +
>                   "FROM person " +
>                   "GROUP BY TUMBLE(event_time, interval '10' second), name";
> 
>       Table result = tEnv.sqlQuery(emit);
>       tEnv.toRetractStream(result, Row.class).print();
> 
>       env.execute("IncrementalGrouping");
>    }
> 
>    private static final class SourceData implements 
> SourceFunction<Tuple4<Long, String, String, Long>> {
>       @Override
>       public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) 
> throws Exception {
>          while (true) {
>             long time = System.currentTimeMillis();
>             ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
>          }
>       }
> 
>       @Override
>       public void cancel() {
> 
>       }
>    }
> }
> 
> 
> 
> 
>> 2020年3月27日 下午3:23,Benchao Li <libenc...@gmail.com 
>> <mailto:libenc...@gmail.com>> 写道:
>> 
>> Hi,
>> 
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>> 
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>> 
>> Jingsong Li <jingsongl...@gmail.com <mailto:jingsongl...@gmail.com>> 
>> 于2020年3月27日周五 下午2:51写道:
>> 
>>> Hi,
>>> 
>>> For #1:
>>> 创建级联的两级window:
>>> - 1分钟窗口
>>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>> 
>>> Best,
>>> Jingsong Lee
>>> 
>> 
>> 
>> -- 
>> 
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; 
>> libenc...@pku.edu.cn <mailto:libenc...@pku.edu.cn>
> 
> 
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; libenc...@pku.edu.cn 
> <mailto:libenc...@pku.edu.cn>

回复