???????? ????????



------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<wangwangdaxian...@gmail.com&gt;;
????????:&nbsp;2020??8??4??(??????) ????1:13
??????:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: flink-1.11 ????????



source??????????????In??????,????Out??????
????source??map??????operator chain??????task????disable????operator
chain,??map??????????????task????????map????????????In??Out????
????????????????????isBackpressured??????????????????operator????????????????????????????????

kcz <573693...@qq.com&gt; ??2020??8??4?????? ????12:41??????

&gt; ???? yeah??ui??????????????????????????souce????????????????map 
sleep???????????????????????? ??????????????100w????????
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ ???????? ------------------
&gt; ??????: shizk233 <wangwangdaxian...@gmail.com&amp;gt;
&gt; ????????: 2020??8??3?? 23:03
&gt; ??????: user-zh@flink.apache.org <user-zh@flink.apache.org&amp;gt;
&gt; ????: ??????flink-1.11 ????????
&gt;
&gt;
&gt;
&gt; source??????????????????????????????web ui??task??numOfRecordsIn??
&gt;
&gt; kcz <573693...@qq.com&amp;gt; ??2020??8??3?????? ????7:29??????
&gt;
&gt; &amp;gt; 
????????????????????????????kafka??????100w??????????source??????????????????????????????????????????
&gt; &amp;gt; public static void main(String[] args) throws Exception{
&gt; &amp;gt;
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
StreamExecutionEnvironment env =
&gt; &amp;gt; StreamExecutionEnvironment.getExecutionEnvironment();
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
env.enableCheckpointing(2000L,
&gt; CheckpointingMode.EXACTLY_ONCE);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; env.setStateBackend(new 
MemoryStateBackend());
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; env.setParallelism(4);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; Properties properties = 
getLocal();
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
properties.setProperty("group.id","test");
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
FlinkKafkaConsumer<String&amp;amp;gt; consumer =
&gt; new
&gt; &amp;gt; FlinkKafkaConsumer<&amp;amp;gt;("testOrderTopic", new
&gt; SimpleStringSchema(),
&gt; &amp;gt; properties);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
DataStream<String&amp;amp;gt; stream = env
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; .addSource(consumer);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; stream.map(new 
MapFunction<String,
&gt; Tuple2<Integer,Integer&amp;amp;gt;&amp;amp;gt;() {
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 @Override
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 public
&gt; Tuple2<Integer,Integer&amp;amp;gt; map(String s) throws Exception {
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; Thread.sleep(1000*60*60*60);
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; return new Tuple2(1,1);
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 }
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; }).keyBy(0).sum(0);
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; stream.print();
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; //stream.map();
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; env.execute();
&gt; &amp;gt;
&gt; &amp;gt; }

回复