感谢 Benchao,问题应解决了!
> 2020年4月15日 下午3:38,Benchao Li <libenc...@gmail.com> 写道: > > Hi 建刚, > > 现在Emit的原理是这样子的: > - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器; > - 当定时器到了的时候, > - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, > - 如果有变化,就发送-[old], +[new] 两条结果到下游; > - 如果是没有变化,则不做任何处理; > - 再次注册一个新的emit delay之后的处理时间定时器。 > > 你可以根据这个原理,再对照下你的数据,看看是否符合预期。 > > 刘建刚 <liujiangangp...@gmail.com <mailto:liujiangangp...@gmail.com>> > 于2020年4月15日周三 下午3:32写道: > 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码: > > public class EarlyEmitter { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlink > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, > settings); > > > tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", > true); > > tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", > "1000 ms"); > > Table table = tEnv.fromDataStream( > env.addSource(new SourceData()), "generate_time, name, city, id, > event_time.proctime"); > tEnv.createTemporaryView("person", table); > > String emit = > "SELECT name, COUNT(DISTINCT id)" + > "FROM person " + > "GROUP BY TUMBLE(event_time, interval '10' second), name"; > > Table result = tEnv.sqlQuery(emit); > tEnv.toRetractStream(result, Row.class).print(); > > env.execute("IncrementalGrouping"); > } > > private static final class SourceData implements > SourceFunction<Tuple4<Long, String, String, Long>> { > @Override > public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) > throws Exception { > while (true) { > long time = System.currentTimeMillis(); > ctx.collect(Tuple4.of(time, "flink", "bj", 1L)); > } > } > > @Override > public void cancel() { > > } > } > } > > > > >> 2020年3月27日 下午3:23,Benchao Li <libenc...@gmail.com >> <mailto:libenc...@gmail.com>> 写道: >> >> Hi, >> >> 对于第二个场景,可以尝试一下fast emit: >> table.exec.emit.early-fire.enabled = true >> table.exec.emit.early-fire.delay = 5min >> >> PS: >> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature >> 2. window加了emit之后,会由原来输出append结果变成输出retract结果 >> >> Jingsong Li <jingsongl...@gmail.com <mailto:jingsongl...@gmail.com>> >> 于2020年3月27日周五 下午2:51写道: >> >>> Hi, >>> >>> For #1: >>> 创建级联的两级window: >>> - 1分钟窗口 >>> - 5分钟窗口,计算只是保存数据,发送明细数据结果 >>> >>> Best, >>> Jingsong Lee >>> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; >> libenc...@pku.edu.cn <mailto:libenc...@pku.edu.cn> > > > > -- > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; libenc...@pku.edu.cn > <mailto:libenc...@pku.edu.cn>