??????????failover 
??????????????????????????????????????????????exactly-once????
????????????????????????????????????????????????new task ????????????new 
??????????????????????????
??????????????slot??????



??????????

??????????????????????kafka ????????????????yarn ??????????  TM??????TM ????slot


??????????TM ?????????????????? ??????????????????[0]  
??????????????????????????????????json??


===create new task :{"business":"test","src":"test","words":"???????????????? 
[0] 2019-10-14 17:57:47","sid":"test"} ====processing asr data: 
{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14 
17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid, 
hit_details=????, words=???????????????? [0] 2019-10-14 17:57:47, status=1, 
hit_logic=basePoliticsWords}, em=OK, ec=0} ===create new task 
:{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14 
17:57:47","sid":"test"} ====processing asr data: 
{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14 
17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid, 
hit_details=????, words=???????????????? [0] 2019-10-14 17:57:47, status=1, 
hit_logic=basePoliticsWords}, em=OK, ec=0}




????????????????flink ???????????????? ?????????????????????????????????????? 
??????????kafka ????????1????
??????????????????????????????????


????????????????????????????????????????????????100???????????????????? 
??????????????????100????????????????????????????
????????????????????????????










------------------ ???????? ------------------
??????: "Jark Wu"<[email protected]>;
????????: 2019??10??14??(??????) ????11:23
??????: "user-zh"<[email protected]>;

????: Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????



Hi,

????????????????????????????????????????????????
1???????????????? failover ??????Flink ???? Kafka ??????????????????exactly 
once????
2?? ?????????????????????????????????? ?????? kafka ????????????????????

Best,
Jark

On Sun, 13 Oct 2019 at 12:55, ?????? <[email protected]> wrote:

> ????????
>
>
> ?????????????????? ????????flink 1.6 ??????????????????
> ????????????????????????????????????????????????????????????????????????
>
>
> ????????Function ??????????
>
>
> private static class SimpleAsyncFunction extends RichAsyncFunction<String,
> String> {
>     private static final long serialVersionUID = 2098635244857937717L;
>
>     private transient ExecutorService executorService;
>     private  transient Client client ;
>
>     private final long shutdownWaitTS=1000;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
>                 new LinkedBlockingQueue<>(1000),
>                 new ThreadPoolExecutor.CallerRunsPolicy());
>          client= new Client();
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> TimeUnit.MILLISECONDS, executorService);
>     }
>
>     @Override
>     public void asyncInvoke(final String jsonStr, final
> ResultFuture<String> resultFuture) {
>           result = client.predict(jsonStr);
> resultFuture.complete(Collections.singletonList(result));}}
> ------------------------------
> dag????????????
> AsyncFunction<String, String> nlpMoaAsyncFunction = new
> SimpleAsyncFunction();
>
> DataStream<String> source = env.addSource(flinkKafkaConsumer010);
>
> DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
>         source,
>         nlpMoaAsyncFunction,
>         timeout,
>         TimeUnit.MILLISECONDS,
>         30);
>
> FlinkKafkaProducer010<String> kafkaProducer = new
> FlinkKafkaProducer010<String>(
>         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
>         new SimpleStringSchema(),
>         producerProp
> );
>
> nlpResult.addSink(kafkaProducer);
>
> -------------------------??????????????????????????????yarn ??10??taskmanager 
> ,????tm ????slot??
> ????????????????????????????????bug ??????????????????????

回复