嗯嗯 yeah。ui上看不到数据进来,应该会进souce算子的把,我只有map sleep了。可是也没有看到背压。 我不断产生数据100w以上了。
------------------ 原始邮件 ------------------ 发件人: shizk233 <wangwangdaxian...@gmail.com> 发送时间: 2020年8月3日 23:03 收件人: user-zh@flink.apache.org <user-zh@flink.apache.org> 主题: 回复:flink-1.11 模拟背压 source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗 kcz <573693...@qq.com> 于2020年8月3日周一 下午7:29写道: > 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); > env.setStateBackend(new MemoryStateBackend()); > env.setParallelism(4); > Properties properties = getLocal(); > properties.setProperty("group.id","test"); > FlinkKafkaConsumer<String&gt; consumer = new > FlinkKafkaConsumer<&gt;("testOrderTopic", new SimpleStringSchema(), > properties); > DataStream<String&gt; stream = env > .addSource(consumer); > stream.map(new MapFunction<String, Tuple2<Integer,Integer&gt;&gt;() { > @Override > public Tuple2<Integer,Integer&gt; map(String s) throws Exception { > Thread.sleep(1000*60*60*60); > return new Tuple2(1,1); > } > }).keyBy(0).sum(0); > stream.print(); > //stream.map(); > env.execute(); > > }