??????????failover
??????????????????????????????????????????????exactly-once????
????????????????????????????????????????????????new task ????????????new
??????????????????????????
??????????????slot??????
??????????
??????????????????????kafka ????????????????yarn ?????????? TM??????TM ????slot
??????????TM ?????????????????? ??????????????????[0]
??????????????????????????????????json??
===create new task :{"business":"test","src":"test","words":"????????????????
[0] 2019-10-14 17:57:47","sid":"test"} ====processing asr data:
{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14
17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid,
hit_details=????, words=???????????????? [0] 2019-10-14 17:57:47, status=1,
hit_logic=basePoliticsWords}, em=OK, ec=0} ===create new task
:{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14
17:57:47","sid":"test"} ====processing asr data:
{"business":"test","src":"test","words":"???????????????? [0] 2019-10-14
17:57:47","sid":"test"} === the moa resonse is: {result={s_id=userid,
hit_details=????, words=???????????????? [0] 2019-10-14 17:57:47, status=1,
hit_logic=basePoliticsWords}, em=OK, ec=0}
????????????????flink ???????????????? ??????????????????????????????????????
??????????kafka ????????1????
??????????????????????????????????
????????????????????????????????????????????????100????????????????????
??????????????????100????????????????????????????
????????????????????????????
------------------ ???????? ------------------
??????: "Jark Wu"<[email protected]>;
????????: 2019??10??14??(??????) ????11:23
??????: "user-zh"<[email protected]>;
????: Re: flink 1.6.1 RichAsyncFunction ????????????????????????????????????
Hi,
????????????????????????????????????????????????
1???????????????? failover ??????Flink ???? Kafka ??????????????????exactly
once????
2?? ?????????????????????????????????? ?????? kafka ????????????????????
Best,
Jark
On Sun, 13 Oct 2019 at 12:55, ?????? <[email protected]> wrote:
> ????????
>
>
> ?????????????????? ????????flink 1.6 ??????????????????
> ????????????????????????????????????????????????????????????????????????
>
>
> ????????Function ??????????
>
>
> private static class SimpleAsyncFunction extends RichAsyncFunction<String,
> String> {
> private static final long serialVersionUID = 2098635244857937717L;
>
> private transient ExecutorService executorService;
> private transient Client client ;
>
> private final long shutdownWaitTS=1000;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
>
> executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
> new LinkedBlockingQueue<>(1000),
> new ThreadPoolExecutor.CallerRunsPolicy());
> client= new Client();
> }
>
> @Override
> public void close() throws Exception {
> super.close();
> ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> TimeUnit.MILLISECONDS, executorService);
> }
>
> @Override
> public void asyncInvoke(final String jsonStr, final
> ResultFuture<String> resultFuture) {
> result = client.predict(jsonStr);
> resultFuture.complete(Collections.singletonList(result));}}
> ------------------------------
> dag????????????
> AsyncFunction<String, String> nlpMoaAsyncFunction = new
> SimpleAsyncFunction();
>
> DataStream<String> source = env.addSource(flinkKafkaConsumer010);
>
> DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> source,
> nlpMoaAsyncFunction,
> timeout,
> TimeUnit.MILLISECONDS,
> 30);
>
> FlinkKafkaProducer010<String> kafkaProducer = new
> FlinkKafkaProducer010<String>(
> ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> new SimpleStringSchema(),
> producerProp
> );
>
> nlpResult.addSink(kafkaProducer);
>
> -------------------------??????????????????????????????yarn ??10??taskmanager
> ,????tm ????slot??
> ????????????????????????????????bug ??????????????????????