嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。





------------------ 原始邮件 ------------------
发件人: shizk233 <wangwangdaxian...@gmail.com&gt;
发送时间: 2020年8月3日 23:03
收件人: user-zh@flink.apache.org <user-zh@flink.apache.org&gt;
主题: 回复:flink-1.11 模拟背压



source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗

kcz <573693...@qq.com&gt; 于2020年8月3日周一 下午7:29写道:

&gt; 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
&gt; public static void main(String[] args) throws Exception{
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env =
&gt; StreamExecutionEnvironment.getExecutionEnvironment();
&gt;&nbsp;&nbsp;&nbsp;&nbsp; env.enableCheckpointing(2000L, 
CheckpointingMode.EXACTLY_ONCE);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setStateBackend(new MemoryStateBackend());
&gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(4);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties = getLocal();
&gt;&nbsp;&nbsp;&nbsp;&nbsp; properties.setProperty("group.id","test");
&gt;&nbsp;&nbsp;&nbsp;&nbsp; FlinkKafkaConsumer<String&amp;gt; consumer = new
&gt; FlinkKafkaConsumer<&amp;gt;("testOrderTopic", new SimpleStringSchema(),
&gt; properties);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream = env
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
.addSource(consumer);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.map(new MapFunction<String, 
Tuple2<Integer,Integer&amp;gt;&amp;gt;() {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public 
Tuple2<Integer,Integer&amp;gt; map(String s) throws Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Thread.sleep(1000*60*60*60);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
return new Tuple2(1,1);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }).keyBy(0).sum(0);
&gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.print();
&gt;&nbsp;&nbsp;&nbsp;&nbsp; //stream.map();
&gt;&nbsp;&nbsp;&nbsp;&nbsp; env.execute();
&gt;
&gt; }

回复