???? sorry
?????????????????????? ????????????????????????????????????
????????????????
??????task??
private static class MoaTask implements Callable<String> {
private String jsonStr;
private JudgeClient client;
public MoaTask(String json, JudgeClient client) {
this.jsonStr = json;
this.client= client;
System.out.println("===create new task :" + json);
}
@Override
public String call() throws Exception {
JSONObject jsonObject = JSON.parseObject(jsonStr);
String business = jsonObject.getString("business");
System.out.println("====processing asr data: " + jsonStr);
String result = this.client.predict(jsonObject,"test");
return result;
}
}??????????private static class SimpleAsyncFunction extends
RichAsyncFunction<String, String> {
private static final long serialVersionUID = 2098635244857937717L;
private transient ExecutorService executorService;
private transient JudgeClient moaJudgeClient ;
private final long shutdownWaitTS=1000;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = new ThreadPoolExecutor(10,30,60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
moaJudgeClient = new JudgeClient();
}
@Override
public void close() throws Exception {
super.close();
ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS,
executorService);
}
@Override
public void asyncInvoke(final String jsonStr, final ResultFuture<String>
resultFuture) {
Future<String> future = executorService.submit(new
MoaTask(jsonStr,moaJudgeClient));
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try{
return future.get();
}catch (Exception e){
return null;
}
}
}).thenAccept((result)->{
resultFuture.complete(Collections.singleton(result));
});
}
}????????AsyncFunction<String, String> nlpMoaAsyncFunction = new
SimpleAsyncFunction();
DataStream<String> source = env.addSource(flinkKafkaConsumer010);
DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
source,
nlpMoaAsyncFunction,
timeout,
TimeUnit.MILLISECONDS,
30).setParallelism(2);
FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<String>(
Constant.topic2,
new SimpleStringSchema(),
producerProp
);
nlpResult.addSink(kafkaProducer);
??????????????????
------------------ ???????? ------------------
??????: "Kurt Young"<[email protected]>;
????????: 2019??10??14??(??????) ????6:14
??????: "user-zh"<[email protected]>;
????: Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????
??????????????????????????????????
ps????????????
Best,
Kurt
On Mon, Oct 14, 2019 at 12:11 PM ?????? <[email protected]> wrote:
>
> ??????????failover
> ??????????????????????????????????????????????exactly-once????
> ????????????????????????????????????????????????new task ????????????new
> ??????????????????????????
> ??????????????slot??????
>
> ??????????
> ??????????????????????kafka ????????????????yarn ?????????? TM??????TM
> ????slot
>
> ??????????TM ?????????????????? ??????????????????[0]
> ??????????????????????????????????json??
>
> ===create new task :{"business":"test","src":"test","words":"????????????????
> [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data:
> {"business":"test","src":"test","words":"???????????????? [0] 2019-10-14
> 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=????,
> words=???????????????? [0] 2019-10-14 17:57:47, status=1,
> hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"????????????????
> [0] 2019-10-14 17:57:47","sid":"test"}
> ====processing asr data:
> {"business":"test","src":"test","words":"???????????????? [0] 2019-10-14
> 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=????,
> words=???????????????? [0] 2019-10-14 17:57:47, status=1,
> hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> ????????????????flink ???????????????? ??????????????????????????????????????
> ??????????kafka ????????1????
> ??????????????????????????????????
>
> ????????????????????????????????????????????????100????????????????????
> ??????????????????100????????????????????????????
> ????????????????????????????
>
>
>
>
>
> ------------------ ???????? ------------------
> *??????:* "Jark Wu"<[email protected]>;
> *????????:* 2019??10??14??(??????) ????11:23
> *??????:* "user-zh"<[email protected]>;
> *????:* Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????
>
> Hi,
>
> ????????????????????????????????????????????????
> 1???????????????? failover ??????Flink ???? Kafka ??????????????????exactly
> once????
> 2?? ?????????????????????????????????? ?????? kafka ????????????????????
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, ?????? <[email protected]> wrote:
>
> > ????????
> >
> >
> > ?????????????????? ????????flink 1.6 ??????????????????
> > ????????????????????????????????????????????????????????????????????????
> >
> >
> > ????????Function ??????????
> >
> >
> > private static class SimpleAsyncFunction extends
> RichAsyncFunction<String,
> > String> {
> > private static final long serialVersionUID = 2098635244857937717L;
> >
> > private transient ExecutorService executorService;
> > private transient Client client ;
> >
> > private final long shutdownWaitTS=1000;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> >
> > executorService = new
> ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
> > new LinkedBlockingQueue<>(1000),
> > new ThreadPoolExecutor.CallerRunsPolicy());
> > client= new Client();
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> > TimeUnit.MILLISECONDS, executorService);
> > }
> >
> > @Override
> > public void asyncInvoke(final String jsonStr, final
> > ResultFuture<String> resultFuture) {
> > result = client.predict(jsonStr);
> > resultFuture.complete(Collections.singletonList(result));}}
> > ------------------------------
> > dag????????????
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> > SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> > source,
> > nlpMoaAsyncFunction,
> > timeout,
> > TimeUnit.MILLISECONDS,
> > 30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> > FlinkKafkaProducer010<String>(
> > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> > new SimpleStringSchema(),
> > producerProp
> > );
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > -------------------------??????????????????????????????yarn
> > ??10??taskmanager ,????tm
> ????slot??
> > ????????????????????????????????bug ??????????????????????
>