非常开心能够帮助到你~ 刘建刚 <[email protected]> 于2020年4月15日周三 下午3:57写道:
> 感谢 Benchao,问题应解决了! > > 2020年4月15日 下午3:38,Benchao Li <[email protected]> 写道: > > Hi 建刚, > > 现在Emit的原理是这样子的: > - *当某个key*下面来了第一条数据的时候,注册一个emit delay之后的*处理时间定时器*; > - 当定时器到了的时候, > - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, > - 如果有变化,就发送-[old], +[new] 两条结果到下游; > - 如果是*没有变化,则不做任何处理*; > - 再次注册一个新的emit delay之后的处理时间定时器。 > > 你可以根据这个原理,再对照下你的数据,看看是否符合预期。 > > 刘建刚 <[email protected]> 于2020年4月15日周三 下午3:32写道: > >> >> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码: >> >> public class EarlyEmitter { >> public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setParallelism(1); >> >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance().useBlink >> >> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, >> settings); >> >> tEnv.getConfig().getConfiguration().setBoolean( >> "table.exec.emit.early-fire.enabled", true); >> tEnv.getConfig().getConfiguration().setString( >> "table.exec.emit.early-fire.delay", "1000 ms"); >> >> Table table = tEnv.fromDataStream( >> env.addSource(new SourceData()), "generate_time, name, city, id, >> event_time.proctime"); >> tEnv.createTemporaryView("person", table); >> >> String emit = >> "SELECT name, COUNT(DISTINCT id)" + >> "FROM person " + >> "GROUP BY TUMBLE(event_time, interval '10' second), name"; >> >> Table result = tEnv.sqlQuery(emit); >> tEnv.toRetractStream(result, Row.class).print(); >> >> env.execute("IncrementalGrouping"); >> } >> >> private static final class SourceData implements >> SourceFunction<Tuple4<Long, String, String, Long>> { >> @Override >> public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws >> Exception { >> while (true) { >> long time = System.currentTimeMillis(); >> ctx.collect(Tuple4.of(time, "flink", "bj", 1L)); >> } >> } >> >> @Override >> public void cancel() { >> >> } >> } >> } >> >> >> >> >> >> 2020年3月27日 下午3:23,Benchao Li <[email protected]> 写道: >> >> Hi, >> >> 对于第二个场景,可以尝试一下fast emit: >> table.exec.emit.early-fire.enabled = true >> table.exec.emit.early-fire.delay = 5min >> >> PS: >> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature >> 2. window加了emit之后,会由原来输出append结果变成输出retract结果 >> >> Jingsong Li <[email protected]> 于2020年3月27日周五 下午2:51写道: >> >> Hi, >> >> For #1: >> 创建级联的两级window: >> - 1分钟窗口 >> - 5分钟窗口,计算只是保存数据,发送明细数据结果 >> >> Best, >> Jingsong Lee >> >> >> >> -- >> >> Benchao Li >> School of Electronics Engineering and Computer Science, Peking University >> Tel:+86-15650713730 >> Email: [email protected] <[email protected]>; [email protected] >> >> >> > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [email protected]; [email protected] > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [email protected]; [email protected]
