????????

?????????????????? ????????flink 1.6 ??????????????????
????????????????????????????????????????????????????????????????????????


????????Function ??????????


private static class SimpleAsyncFunction extends RichAsyncFunction<String, 
String> {
    private static final long serialVersionUID = 2098635244857937717L;

    private transient ExecutorService executorService;
    private  transient Client client ;

    private final long shutdownWaitTS=1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
         client= new Client();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, 
executorService);
    }

    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> 
resultFuture) {
          result = client.predict(jsonStr);          
resultFuture.complete(Collections.singletonList(result));}}
------------------------------
dag????????????
AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();

DataStream<String> source = env.addSource(flinkKafkaConsumer010);

DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30);

FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
        ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
        new SimpleStringSchema(),
        producerProp
);

nlpResult.addSink(kafkaProducer);

-------------------------??????????????????????????????yarn ??10??taskmanager 
,????tm ????slot?? ????????????????????????????????bug ??????????????????????

回复