???? sorry
?????????????????????? ????????????????????????????????????

????????????????


??????task??
private static class MoaTask implements Callable<String> {

    private String jsonStr;
    private JudgeClient client;

    public MoaTask(String json, JudgeClient client) {
        this.jsonStr = json;
        this.client= client;
        System.out.println("===create new task :" + json);
    }
    @Override
    public String call() throws Exception {

        JSONObject jsonObject = JSON.parseObject(jsonStr);
        String business = jsonObject.getString("business");
        System.out.println("====processing asr data: " + jsonStr);
        String result = this.client.predict(jsonObject,"test");
        return result;
    }
}??????????private static class SimpleAsyncFunction extends 
RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;
    private transient ExecutorService executorService;
    private  transient JudgeClient moaJudgeClient ;
    private final long shutdownWaitTS=1000;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = new ThreadPoolExecutor(10,30,60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        moaJudgeClient = new JudgeClient();
    }
    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, 
executorService);
    }
    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> 
resultFuture) {
        Future<String> future = executorService.submit(new 
MoaTask(jsonStr,moaJudgeClient));
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try{
                    return future.get();

                }catch (Exception e){
                    return null;
                }
            }
        }).thenAccept((result)->{
            resultFuture.complete(Collections.singleton(result));
        });
    }
}????????AsyncFunction<String, String> nlpMoaAsyncFunction = new 
SimpleAsyncFunction();
DataStream<String> source = env.addSource(flinkKafkaConsumer010);
DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30).setParallelism(2);
FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
        Constant.topic2,
        new SimpleStringSchema(),
        producerProp
);
nlpResult.addSink(kafkaProducer);


??????????????????






------------------ ???????? ------------------
??????: "Kurt Young"<[email protected]>;
????????: 2019??10??14??(??????) ????6:14
??????: "user-zh"<[email protected]>;

????: Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????



??????????????????????????????????
ps????????????

Best,
Kurt


On Mon, Oct 14, 2019 at 12:11 PM ?????? <[email protected]> wrote:

>
> ??????????failover 
> ??????????????????????????????????????????????exactly-once????
> ????????????????????????????????????????????????new task ????????????new 
> ??????????????????????????
> ??????????????slot??????
>
> ??????????
> ??????????????????????kafka ????????????????yarn ??????????  TM??????TM 
> ????slot
>
> ??????????TM ?????????????????? ??????????????????[0]  
> ??????????????????????????????????json??
>
> ===create new task :{"business":"test","src":"test","words":"???????????????? 
> [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: 
> {"business":"test","src":"test","words":"???????????????? [0] 2019-10-14 
> 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=????, 
> words=???????????????? [0] 2019-10-14 17:57:47, status=1, 
> hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"???????????????? 
> [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data: 
> {"business":"test","src":"test","words":"???????????????? [0] 2019-10-14 
> 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=????, 
> words=???????????????? [0] 2019-10-14 17:57:47, status=1, 
> hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> ????????????????flink ???????????????? ?????????????????????????????????????? 
> ??????????kafka ????????1????
> ??????????????????????????????????
>
> ????????????????????????????????????????????????100???????????????????? 
> ??????????????????100????????????????????????????
> ????????????????????????????
>
>
>
>
>
> ------------------ ???????? ------------------
> *??????:* "Jark Wu"<[email protected]>;
> *????????:* 2019??10??14??(??????) ????11:23
> *??????:* "user-zh"<[email protected]>;
> *????:* Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????
>
> Hi,
>
> ????????????????????????????????????????????????
> 1???????????????? failover ??????Flink ???? Kafka ??????????????????exactly 
> once????
> 2?? ?????????????????????????????????? ?????? kafka ????????????????????
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, ?????? <[email protected]> wrote:
>
> > ????????
> >
> >
> > ?????????????????? ????????flink 1.6 ??????????????????
> > ????????????????????????????????????????????????????????????????????????
> >
> >
> > ????????Function ??????????
> >
> >
> > private static class SimpleAsyncFunction extends
> RichAsyncFunction<String,
> > String> {
> >     private static final long serialVersionUID = 2098635244857937717L;
> >
> >     private transient ExecutorService executorService;
> >     private  transient Client client ;
> >
> >     private final long shutdownWaitTS=1000;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >
> >         executorService = new
> ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
> >                 new LinkedBlockingQueue<>(1000),
> >                 new ThreadPoolExecutor.CallerRunsPolicy());
> >          client= new Client();
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> > TimeUnit.MILLISECONDS, executorService);
> >     }
> >
> >     @Override
> >     public void asyncInvoke(final String jsonStr, final
> > ResultFuture<String> resultFuture) {
> >           result = client.predict(jsonStr);
> > resultFuture.complete(Collections.singletonList(result));}}
> > ------------------------------
> > dag????????????
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> > SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> >         source,
> >         nlpMoaAsyncFunction,
> >         timeout,
> >         TimeUnit.MILLISECONDS,
> >         30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> > FlinkKafkaProducer010<String>(
> >         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> >         new SimpleStringSchema(),
> >         producerProp
> > );
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > -------------------------??????????????????????????????yarn 
> > ??10??taskmanager ,????tm
> ????slot??
> > ????????????????????????????????bug ??????????????????????
>

回复