??????????????chain??????????????????????????????????????????????????????????????????????????????????


????????????????????????????????????????????????????????????taskmanager??????slot??????


jobmanager 2G,????taskmanager4g??????taskmanager 4??slot??????????4??8G
??????????????timewindow??????????????????????????????????????????????????????


???? ??????????4??5????


????????????????


????????


// ??????
Constant.PARALLELISM = 8


DataStream<List<OrderInfo>> thisData = lastData.keyBy(new 
MyKeySelector()).timeWindow(Time.milliseconds(5000))
                .process(new MyWindowFunction())
                .setParallelism(Constant.PARALLELISM);


thisData.addSink(new SinkToDb()).setParallelism(Constant.PARALLELISM);


??????KeySelector


public class MyKeySelector implements KeySelector<OrderInfo, String> {


    @Override
    public String getKey(OrderInfo value) throws Exception {


        String uniqueId = value.getBusiCode() + value.getOrderNo();


        int mod = Math.abs(uniqueId.hashCode() % Constant.PARALLELISM);


        return String.valueOf(mod);
    }
}




??????ProcessWindowFunction


public class MyWindowFunction extends ProcessWindowFunction<OrderInfo, 
List<OrderInfo>, String, TimeWindow> {


    @Override
    public void process(String key, Context context, Iterable<OrderInfo> 
elements, Collector<List<OrderInfo>> out) throws Exception {
        ArrayList<OrderInfo> OrderInfoArrayList = Lists.newArrayList(elements);
        out.collect(OrderInfoArrayList);
    }
}







------------------ ???????? ------------------
??????: "Shi Quan"<[email protected]>;
????????: 2019??4??23??(??????) ????10:02
??????: "[email protected]"<[email protected]>;

????: RE: flink??????????????



????????????????????????????????????????????????

????????????????????????????????????????????????????????????????????????



??????????????????

  1.  
????????????????????????????operator??????????????????????????????????????????????????????????????????????????????????????????????????chain????????????????????????????????
  2.  ??kafka??????????you are 
right????????source??????????????????????????????????
  3.  kafka source????????????????????????????????????????????????????????
  4.  & 5 ??????????????????????????????





Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10



________________________________
From: 1900 <[email protected]>
Sent: Tuesday, April 23, 2019 9:47:08 AM
To: user-zh
Subject: flink??????????????

????flink????????????????1.7.2??hadoop??????2.8.5,????flink on yarn 
ha?????????????????? run a job on yarn


??????????????????kafka??????????????window??????????????????5??????????????????list??????????????????????????db


1.??????????????????????????????????????
2.????kafka????????????????????????????topic????????????????topic??8??????????????????8????
 ????????????????????????
3.??????????????????????kafka??????????????????????????????????????????????window????????????????????
????????????kafka??????????????????????4????????????????????kafka??????????????????????????kafka??????????1????
??????????????????????????????????????????kafka????????????????????????????????????
4.????????????????????key????????????????????ID??????hash??????????????????????????
??????????????????kafka????????????????????????????????????
??????DB??????????????????????????????????????????????????????????????????????????????kafka??????????????????????????
5.????????????????key????????????????????????????????????????????????slot????????????????????????8??????????????????6????????????????????
??2????????????????????????????????????????????????????????????????

回复