Hi, 从代码上看,并没有发现什么问题,给一些排查建议: 1)作业在这期间有 failover 过吗?Flink 写入 Kafka 目前默认不是端到端exactly once的。 2) “重复几十条”是怎么判断出来的呢? 会不会 kafka 源中的数据有重复呢?
Best, Jark On Sun, 13 Oct 2019 at 12:55, 曾耀武 <[email protected]> wrote: > 大家好, > > > 请教大家一个问题, 我在使用flink 1.6 版本去用异步的方式 > 访问外部系统的时候发现下游的数据,本来应该只有一条的,结果被重复几十条。 > > > 我的异步Function 片段如下。 > > > private static class SimpleAsyncFunction extends RichAsyncFunction<String, > String> { > private static final long serialVersionUID = 2098635244857937717L; > > private transient ExecutorService executorService; > private transient Client client ; > > private final long shutdownWaitTS=1000; > > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > executorService = new ThreadPoolExecutor(10,30,60,TimeUnit.SECONDS, > new LinkedBlockingQueue<>(1000), > new ThreadPoolExecutor.CallerRunsPolicy()); > client= new Client(); > } > > @Override > public void close() throws Exception { > super.close(); > ExecutorUtils.gracefulShutdown(shutdownWaitTS, > TimeUnit.MILLISECONDS, executorService); > } > > @Override > public void asyncInvoke(final String jsonStr, final > ResultFuture<String> resultFuture) { > result = client.predict(jsonStr); > resultFuture.complete(Collections.singletonList(result));}} > ------------------------------ > dag构建部分为: > AsyncFunction<String, String> nlpMoaAsyncFunction = new > SimpleAsyncFunction(); > > DataStream<String> source = env.addSource(flinkKafkaConsumer010); > > DataStream<String> nlpResult = AsyncDataStream.unorderedWait( > source, > nlpMoaAsyncFunction, > timeout, > TimeUnit.MILLISECONDS, > 30); > > FlinkKafkaProducer010<String> kafkaProducer = new > FlinkKafkaProducer010<String>( > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC, > new SimpleStringSchema(), > producerProp > ); > > nlpResult.addSink(kafkaProducer); > > -------------------------整个逻辑其实很简单,程序运行在yarn 上10个taskmanager ,每个tm 一个slot。 > 有没有哪个碰到类似的问题,还是是bug 或者是我的实现有问题?
