Re: flink-1.11 模拟背压

2020-08-03 文章 shizk233
source算子好像是没有In指标的,只有Out指标;
默认source和map算子会operator chain成一个task,你disable一下operator
chain,把map算子作为单独的task,就能在map算子上观察到In和Out了。
背压的话,建议看一下isBackpressured这个指标,我记得是operator级别的,可以看到各个算子的状态。

kcz <573693...@qq.com> 于2020年8月4日周二 上午12:41写道:

> 嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。
>
>
>
>
>
> -- 原始邮件 --
> 发件人: shizk233  发送时间: 2020年8月3日 23:03
> 收件人: user-zh@flink.apache.org  主题: 回复:flink-1.11 模拟背压
>
>
>
> source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗
>
> kcz <573693...@qq.com 于2020年8月3日周一 下午7:29写道:
>
>  我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
>  public static void main(String[] args) throws Exception{
> 
>  StreamExecutionEnvironment env =
>  StreamExecutionEnvironment.getExecutionEnvironment();
>  env.enableCheckpointing(2000L,
> CheckpointingMode.EXACTLY_ONCE);
>  env.setStateBackend(new MemoryStateBackend());
>  env.setParallelism(4);
>  Properties properties = getLocal();
>  properties.setProperty("group.id","test");
>  FlinkKafkaConsumer new
>  FlinkKafkaConsumer<gt;("testOrderTopic", new
> SimpleStringSchema(),
>  properties);
>  DataStream 
> .addSource(consumer);
>  stream.map(new MapFunction Tuple2  @Override
>  public
> Tuple2 
> Thread.sleep(1000*60*60*60);
> 
> return new Tuple2(1,1);
>  }
>  }).keyBy(0).sum(0);
>  stream.print();
>  //stream.map();
>  env.execute();
> 
>  }


回复:flink-1.11 模拟背压

2020-08-03 文章 kcz
嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。





-- 原始邮件 --
发件人: shizk233 

Re: flink-1.11 模拟背压

2020-08-03 文章 shizk233
source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗

kcz <573693...@qq.com> 于2020年8月3日周一 下午7:29写道:

> 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
> public static void main(String[] args) throws Exception{
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new MemoryStateBackend());
> env.setParallelism(4);
> Properties properties = getLocal();
> properties.setProperty("group.id","test");
> FlinkKafkaConsumer FlinkKafkaConsumer<("testOrderTopic", new SimpleStringSchema(),
> properties);
> DataStream .addSource(consumer);
> stream.map(new MapFunction @Override
> public Tuple2 Thread.sleep(1000*60*60*60);
> return new Tuple2(1,1);
> }
> }).keyBy(0).sum(0);
> stream.print();
> //stream.map();
> env.execute();
>
> }


flink-1.11 模拟背压

2020-08-03 文章 kcz
我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new MemoryStateBackend());
env.setParallelism(4);
Properties properties = getLocal();
properties.setProperty("group.id","test");
FlinkKafkaConsumer