If group.id changes, can a Flink job started from a savepoint still use the Kafka consumer start positions saved in state?
Hi all, according to the documentation, when a job is restored from a checkpoint or savepoint, the Kafka consumer uses the start positions stored in state. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-consumers-start-position-configuration For a job restored from a savepoint, if group.id has changed, can the Kafka consumer still obtain the start positions saved in state?
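For intuition, here is a conceptual sketch in plain Java (not Flink API; `TopicPartition` and the map below are illustrative stand-ins): the consumer's restored state maps each (topic, partition) pair to an offset, and group.id is not part of that key, so changing group.id does not affect the restored start positions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RestoredOffsets {
    // Illustrative stand-in for the state key: (topic, partition) only --
    // group.id does not appear anywhere in it.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition p = (TopicPartition) o;
            return partition == p.partition && topic.equals(p.topic);
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    public static void main(String[] args) {
        // Offsets as they would be restored from a savepoint.
        Map<TopicPartition, Long> restored = new HashMap<>();
        restored.put(new TopicPartition("events", 0), 4242L);

        // A job restarted with a different group.id still looks up its
        // start position by (topic, partition) alone.
        long startPosition = restored.get(new TopicPartition("events", 0));
        System.out.println("restored start position: " + startPosition);
    }
}
```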
Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause massive duplicate records downstream
Sorry, resending. The full task code (the screenshot did not come through) is:

private static class MoaTask implements Callable<String> {
    private String jsonStr;
    private JudgeClient client;

    public MoaTask(String json, JudgeClient client) {
        this.jsonStr = json;
        this.client = client;
        System.out.println("===create new task :" + json);
    }

    @Override
    public String call() throws Exception {
        JSONObject jsonObject = JSON.parseObject(jsonStr);
        String business = jsonObject.getString("business");
        System.out.println("processing asr data: " + jsonStr);
        String result = this.client.predict(jsonObject, "test");
        return result;
    }
}

The async function:

private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;

    private transient ExecutorService executorService;
    private transient JudgeClient moaJudgeClient;
    private final long shutdownWaitTS = 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        moaJudgeClient = new JudgeClient();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
    }

    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
        Future<String> future = executorService.submit(new MoaTask(jsonStr, moaJudgeClient));
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get();
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((result) -> {
            resultFuture.complete(Collections.singleton(result));
        });
    }
}

The DAG wiring:

AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();

DataStream<String> source = env.addSource(flinkKafkaConsumer010);

DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30).setParallelism(2);

FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<>(
        Constant.topic2,
        new SimpleStringSchema(),
        producerProp);

nlpResult.addSink(kafkaProducer);

------------------ Original ------------------
From: "Kurt Young";
Date: Mon, Oct 14, 2019, 6:14 PM
To: "user-zh";
Subject: Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause massive duplicate records downstream
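One design note on the resent code: `CompletableFuture.supplyAsync(...)` without an explicit executor runs `future.get()` on the JDK's common ForkJoinPool, so every in-flight element parks an extra thread just to wait on the pool task. A common alternative is to have the worker complete a `CompletableFuture` directly, so no second thread blocks. A minimal pure-JDK sketch (the `predict` stand-in is hypothetical, not the real JudgeClient):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DirectCompletion {
    // Hypothetical stand-in for the external JudgeClient call.
    static String predict(String json) { return "ok:" + json; }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Instead of submit(...) plus supplyAsync(future::get), the worker
        // completes the CompletableFuture itself -- no blocked waiter thread.
        CompletableFuture<String> result = new CompletableFuture<>();
        pool.execute(() -> {
            try {
                result.complete(predict("{\"sid\":\"test\"}"));
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });

        // In the Flink function this is where one would chain
        // result.thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
        System.out.println(result.get(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```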
Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause massive duplicate records downstream
Please keep your test data civil, friend... PS: the image doesn't come through. Best, Kurt On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <513797...@qq.com> wrote: >
> The job has had no failover; execution is normal. But doesn't the official documentation say that guarantees exactly-once?
> I changed the code slightly and still see the same duplication; the task seems to be constructed ("new"-ed) multiple times, and I don't know why. It happens within the same slot.
>
> The async function:
> While running, I sent one record to Kafka; two TMs were started on YARN, one slot per TM.
>
> One TM then printed the following log. My log lines carry a sequence number like [0]; each input is one JSON record needing the external check.
>
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0]
> 2019-10-14 17:57:47","sid":"test"}
> processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽
> [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽
> [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0]
> 2019-10-14 17:57:47","sid":"test"}
> processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽
> [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽
> [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> Sometimes, after Flink processes these records, the log prints twice, as if each record were processed twice, yet only one copy may reach the downstream Kafka.
> The expected behavior is one output per input.
>
> With one million test records, the upstream definitely sent exactly 1,000,000, with increasing sequence numbers. Downstream there are 1,000,000 after deduplication, but over a million without it, possibly
> tens of thousands extra. Quite strange.
>
>
>
>
>
> ------------------ Original ------------------
> *From:* "Jark Wu";
> *Date:* Mon, Oct 14, 2019, 11:23 AM
> *To:* "user-zh";
> *Subject:* Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause massive duplicate records downstream
>
> Hi,
>
> From the code, I don't see anything wrong. Some troubleshooting suggestions:
> 1) Did the job fail over during this period? Flink's Kafka sink is currently not end-to-end exactly-once
> by default.
> 2) How was "duplicated dozens of times" determined? Could the data in the Kafka source itself contain duplicates?
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, 曾耀武 <513797...@qq.com> wrote:
>
> > Hi all,
> >
> > A question: using Flink 1.6 to access an external system asynchronously,
> > I find that downstream records that should appear exactly once are duplicated dozens of times.
> >
> > My async function fragment is as follows.
> >
> > private static class SimpleAsyncFunction extends
> >         RichAsyncFunction<String, String> {
> >     private static final long serialVersionUID = 2098635244857937717L;
> >
> >     private transient ExecutorService executorService;
> >     private transient Client client;
> >
> >     private final long shutdownWaitTS = 1000;
> >
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         super.open(parameters);
> >
> >         executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
> >                 new LinkedBlockingQueue<>(1000),
> >                 new ThreadPoolExecutor.CallerRunsPolicy());
> >         client = new Client();
> >     }
> >
> >     @Override
> >     public void close() throws Exception {
> >         super.close();
> >         ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> >                 TimeUnit.MILLISECONDS, executorService);
> >     }
> >
> >     @Override
> >     public void asyncInvoke(final String jsonStr, final
> >             ResultFuture<String> resultFuture) {
> >         String result = client.predict(jsonStr);  // note: this call blocks the invoking thread
> >         resultFuture.complete(Collections.singletonList(result));
> >     }
> > }
> > --
> > The DAG is built as:
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> >         SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> >         source,
> >         nlpMoaAsyncFunction,
> >         timeout,
> >         TimeUnit.MILLISECONDS,
> >         30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> >         FlinkKafkaProducer010<>(
> >                 ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> >                 new SimpleStringSchema(),
> >                 producerProp);
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > The whole pipeline is quite simple; the program runs on YARN with 10 TaskManagers, one slot per TM.
> > Has anyone hit a similar problem? Is this a bug, or is my implementation wrong?
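Jark's first suggestion, that the Kafka sink is not end-to-end exactly-once by default, can be illustrated with a small pure-JDK simulation (all names here are illustrative, not Flink API): after a failure, the source replays from the last checkpointed position, so records emitted after that checkpoint reach the sink twice even though each input exists exactly once upstream.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class AtLeastOnceReplay {
    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();  // what the downstream topic would see
        int checkpointedOffset = 0;

        // First attempt: a checkpoint completes covering offsets 0..2, then
        // the job fails after records 3 and 4 were already sent to the sink.
        for (int offset = 0; offset < 5; offset++) {
            sink.add(offset);
            if (offset == 2) checkpointedOffset = offset + 1;
        }

        // Restart: replay from the checkpointed offset. Records 3 and 4 are
        // emitted again -> duplicates in the sink (at-least-once delivery).
        for (int offset = checkpointedOffset; offset < 5; offset++) {
            sink.add(offset);
        }

        int distinct = new HashSet<>(sink).size();
        System.out.println("sink records: " + sink.size() + ", distinct: " + distinct);
    }
}
```

This mirrors the reported symptom: counting with deduplication matches the upstream total, while the raw downstream count is higher.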