我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗 public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE); env.setStateBackend(new MemoryStateBackend()); env.setParallelism(4); Properties properties = getLocal(); properties.setProperty("group.id","test"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("testOrderTopic", new SimpleStringSchema(), properties); DataStream<String> stream = env .addSource(consumer); stream.map(new MapFunction<String, Tuple2<Integer,Integer>>() { @Override public Tuple2<Integer,Integer> map(String s) throws Exception { Thread.sleep(1000*60*60*60); return new Tuple2(1,1); } }).keyBy(0).sum(0); stream.print(); //stream.map(); env.execute(); }