Re: flink-1.11 模拟背压
source算子好像是没有In指标的,只有Out指标; 默认source和map算子会operator chain成一个task,你disable一下operator chain,把map算子作为单独的task,就能在map算子上观察到In和Out了。 背压的话,建议看一下isBackpressured这个指标,我记得是operator级别的,可以看到各个算子的状态。 kcz <573693...@qq.com> 于2020年8月4日周二 上午12:41写道: > 嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。 > > > > > > -- 原始邮件 -- > 发件人: shizk233 发送时间: 2020年8月3日 23:03 > 收件人: user-zh@flink.apache.org 主题: 回复:flink-1.11 模拟背压 > > > > source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 > > kcz <573693...@qq.com 于2020年8月3日周一 下午7:29写道: > > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(2000L, > CheckpointingMode.EXACTLY_ONCE); > env.setStateBackend(new MemoryStateBackend()); > env.setParallelism(4); > Properties properties = getLocal(); > properties.setProperty("group.id","test"); > FlinkKafkaConsumer new > FlinkKafkaConsumer<gt;("testOrderTopic", new > SimpleStringSchema(), > properties); > DataStream > .addSource(consumer); > stream.map(new MapFunction Tuple2 @Override > public > Tuple2 > Thread.sleep(1000*60*60*60); > > return new Tuple2(1,1); > } > }).keyBy(0).sum(0); > stream.print(); > //stream.map(); > env.execute(); > > }
回复:flink-1.11 模拟背压
嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。 -- 原始邮件 -- 发件人: shizk233
Re: flink-1.11 模拟背压
source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 kcz <573693...@qq.com> 于2020年8月3日周一 下午7:29写道: > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); > env.setStateBackend(new MemoryStateBackend()); > env.setParallelism(4); > Properties properties = getLocal(); > properties.setProperty("group.id","test"); > FlinkKafkaConsumer FlinkKafkaConsumer<("testOrderTopic", new SimpleStringSchema(), > properties); > DataStream .addSource(consumer); > stream.map(new MapFunction @Override > public Tuple2 Thread.sleep(1000*60*60*60); > return new Tuple2(1,1); > } > }).keyBy(0).sum(0); > stream.print(); > //stream.map(); > env.execute(); > > }
flink-1.11 模拟背压
我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); env.setStateBackend(new MemoryStateBackend()); env.setParallelism(4); Properties properties = getLocal(); properties.setProperty("group.id","test"); FlinkKafkaConsumer