Hi,

从代码上看,并没有发现什么问题,给一些排查建议:
1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。
2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?

Best,
Jark

On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[email protected]> wrote:

> 大家好,
>
>
> 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式
> 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。
>
>
> 我的异步Function 片段如下。
>
>
> private static class SimpleAsyncFunction extends RichAsyncFunction<String,
> String> {
>     private static final long serialVersionUID = 2098635244857937717L;
>
>     private transient ExecutorService executorService;
>     private  transient Client client ;
>
>     private final long shutdownWaitTS=1000;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>
>         executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS,
>                 new LinkedBlockingQueue<>(1000),
>                 new ThreadPoolExecutor.CallerRunsPolicy());
>          client= new Client();
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> TimeUnit.MILLISECONDS, executorService);
>     }
>
>     @Override
>     public void asyncInvoke(final String jsonStr, final
> ResultFuture<String> resultFuture) {
>           result = client.predict(jsonStr);
> resultFuture.complete(Collections.singletonList(result));}}
> ------------------------------
> dag构建部分为:
> AsyncFunction<String, String> nlpMoaAsyncFunction = new
> SimpleAsyncFunction();
>
> DataStream<String> source = env.addSource(flinkKafkaConsumer010);
>
> DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
>         source,
>         nlpMoaAsyncFunction,
>         timeout,
>         TimeUnit.MILLISECONDS,
>         30);
>
> FlinkKafkaProducer010<String> kafkaProducer = new
> FlinkKafkaProducer010<String>(
>         ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
>         new SimpleStringSchema(),
>         producerProp
> );
>
> nlpResult.addSink(kafkaProducer);
>
> -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。
> 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?

回复