????????
?????????????????? ????????flink 1.6 ??????????????????
????????????????????????????????????????????????????????????????????????
????????Function ??????????
/**
 * Async I/O function that sends each incoming JSON string to {@code Client#predict}
 * and emits the single returned value as the result for that record.
 *
 * <p>The call to {@code predict} is executed on a thread pool owned by this function
 * instance, so {@link #asyncInvoke} returns immediately instead of blocking the
 * thread that invokes it. The pool and the client are created in {@link #open} and
 * are {@code transient}, i.e. they are not part of the serialized function.
 *
 * <p>NOTE(review): this assumes {@code Client#predict} may be called concurrently
 * from several pool threads on the same {@code Client} instance — confirm.
 */
private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;

    /** Pool on which the {@code predict} calls are run; created in {@link #open}. */
    private transient ExecutorService executorService;

    /** Client used to compute the result for a record; created in {@link #open}. */
    private transient Client client;

    /** Time, in milliseconds, granted to the pool for a graceful shutdown in {@link #close}. */
    private final long shutdownWaitTS = 1000;

    /**
     * Creates the worker pool and the client.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass or the client initialisation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Bounded queue + CallerRunsPolicy: when the queue is full the submitting
        // thread runs the task itself rather than the task being rejected.
        // Threads beyond the 10 core threads are only started once the queue is full.
        executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        client = new Client();
    }

    /**
     * Closes the superclass and then shuts the worker pool down, waiting at most
     * {@code shutdownWaitTS} milliseconds.
     *
     * @throws Exception if the superclass fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // open() may have failed before the pool was created.
            if (executorService != null) {
                ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS,
                        executorService);
            }
        }
    }

    /**
     * Schedules the prediction for one record on the worker pool and completes
     * {@code resultFuture} from the pool thread.
     *
     * @param jsonStr      the input record
     * @param resultFuture future completed with a single-element collection holding the
     *                     prediction, or completed exceptionally if {@code predict} throws
     */
    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
        executorService.execute(() -> {
            try {
                final String result = client.predict(jsonStr);
                resultFuture.complete(Collections.singletonList(result));
            } catch (Exception e) {
                // Report the failure instead of leaving the future uncompleted.
                resultFuture.completeExceptionally(e);
            }
        });
    }
}
------------------------------
dag????????????
// Source: records read from Kafka through the configured consumer.
DataStream<String> source = env.addSource(flinkKafkaConsumer010);

// Async stage: each record is passed to SimpleAsyncFunction. Results are emitted
// without preserving input order; the last argument (30) is the operator capacity.
AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();
DataStream<String> nlpResult =
        AsyncDataStream.unorderedWait(source, nlpMoaAsyncFunction, timeout, TimeUnit.MILLISECONDS, 30);

// Sink: write every result string to the NLP result topic.
FlinkKafkaProducer010<String> kafkaProducer =
        new FlinkKafkaProducer010<>(
                ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, new SimpleStringSchema(), producerProp);
nlpResult.addSink(kafkaProducer);
-------------------------??????????????????????????????yarn ??10??taskmanager
,????tm ????slot?? ????????????????????????????????bug ??????????????????????