If group.id changes, can the Kafka consumer of a Flink job started from a savepoint still pick up the start position saved in state?

2019-10-14 Post by justskinny
Hi all, according to the documentation, if a job is restored from a checkpoint or savepoint, the Kafka consumer uses the start
position stored in state.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
For a job restored from a savepoint, if the group.id has changed, can the Kafka consumer still pick up the start position saved
in state?
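
For reference, a minimal sketch of the start-position settings described on the linked page (assuming the Flink 1.9 FlinkKafkaConsumer API; the topic name, group.id, and properties below are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "my-new-group");            // placeholder

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// Per the linked page, these settings only take effect when the job starts
// without restored state; on restore from a checkpoint/savepoint, the
// offsets stored in state are used instead.
consumer.setStartFromGroupOffsets(); // default: committed offsets of group.id
// consumer.setStartFromEarliest();
// consumer.setStartFromLatest();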

Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause heavy duplication downstream

2019-10-14 Post by 曾耀武
Sorry about that.
Since the image doesn't come through, here is the code pasted as text.

The task:
private static class MoaTask implements Callable<String> {

    private String jsonStr;
    private JudgeClient client;

    public MoaTask(String json, JudgeClient client) {
        this.jsonStr = json;
        this.client = client;
        System.out.println("===create new task :" + json);
    }

    @Override
    public String call() throws Exception {
        JSONObject jsonObject = JSON.parseObject(jsonStr);
        String business = jsonObject.getString("business");
        System.out.println("processing asr data: " + jsonStr);
        String result = this.client.predict(jsonObject, "test");
        return result;
    }
}

The async function:

private static class SimpleAsyncFunction extends RichAsyncFunction<String, String> {
    private static final long serialVersionUID = 2098635244857937717L;
    private transient ExecutorService executorService;
    private transient JudgeClient moaJudgeClient;
    private final long shutdownWaitTS = 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Bounded pool; CallerRunsPolicy makes the submitting thread run the
        // task itself once the queue of 1000 is full.
        executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        moaJudgeClient = new JudgeClient();
    }

    @Override
    public void close() throws Exception {
        super.close();
        ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS,
                executorService);
    }

    @Override
    public void asyncInvoke(final String jsonStr, final ResultFuture<String> resultFuture) {
        Future<String> future = executorService.submit(new MoaTask(jsonStr, moaJudgeClient));
        // Blocks a common-pool thread on the executor's future, then
        // completes the Flink result future.
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return future.get();
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((result) -> {
            resultFuture.complete(Collections.singleton(result));
        });
    }
}

The DAG construction part:

AsyncFunction<String, String> nlpMoaAsyncFunction = new SimpleAsyncFunction();

DataStream<String> source = env.addSource(flinkKafkaConsumer010);

DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
        source,
        nlpMoaAsyncFunction,
        timeout,
        TimeUnit.MILLISECONDS,
        30).setParallelism(2);

FlinkKafkaProducer010<String> kafkaProducer = new FlinkKafkaProducer010<>(
        Constant.topic2,
        new SimpleStringSchema(),
        producerProp
);

nlpResult.addSink(kafkaProducer);
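
As an aside on the snippet above (a sketch, assuming the same MoaTask, executorService, and resultFuture as in the code): the supplyAsync wrapper parks a ForkJoin common-pool thread in future.get() just to wait on the executor's future. The task can instead be run on the executor directly, so only one pool is involved:

// Sketch: run MoaTask on the same executor and complete the Flink result
// future from the callback, avoiding the extra blocking get().
CompletableFuture
        .supplyAsync(() -> {
            try {
                return new MoaTask(jsonStr, moaJudgeClient).call();
            } catch (Exception e) {
                return null;
            }
        }, executorService)
        .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));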


Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause heavy duplication downstream

2019-10-14 Post by Kurt Young
My friend, please keep your test data civil...
ps: the image doesn't come through

Best,
Kurt


On Mon, Oct 14, 2019 at 12:11 PM 曾耀武 <513797...@qq.com> wrote:

>
> The job did not fail over, and it runs fine. But doesn't the official documentation say that is guaranteed exactly-once?
> I changed the code slightly and still hit the same duplication problem; the task seems to get new'ed repeatedly. I don't know why,
> and it happens inside the same slot.
>
> The async function:
> Then while running it, I pushed one record into Kafka; on YARN there are two TMs, one slot per TM.
>
> Then one TM printed the following log. My log entries are numbered ([0] here); one input is one JSON record that needs the external check.
>
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 
> 2019-10-14 17:57:47","sid":"test"}
> processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 
> [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 
> [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
> ===create new task :{"business":"test","src":"test","words":"操你妈的傻逼卧槽 [0] 
> 2019-10-14 17:57:47","sid":"test"}
> processing asr data: {"business":"test","src":"test","words":"操你妈的傻逼卧槽 
> [0] 2019-10-14 17:57:47","sid":"test"}
> === the moa resonse is: {result={s_id=userid, hit_details=操你, words=操你妈的傻逼卧槽 
> [0] 2019-10-14 17:57:47, status=1, hit_logic=basePoliticsWords}, em=OK, ec=0}
>
>
>
> Sometimes, after these records are processed in Flink, the log prints twice, as if they were processed twice, but only one record may be written to the downstream Kafka.
> In theory one input should map to one output.
>
> On one million test records, the upstream definitely sent only 1,000,000, with increasing sequence numbers. Downstream, after dedup there are 1,000,000, but without dedup there are over a million, maybe
> tens of thousands extra. Rather strange.
>
>
>
>
>
> -- Original message --
> *From:* "Jark Wu";
> *Sent:* Monday, Oct 14, 2019, 11:23 AM
> *To:* "user-zh";
> *Subject:* Re: flink 1.6.1 RichAsyncFunction async requests to an external system cause heavy duplication downstream
>
> Hi,
>
> From the code alone I don't see anything wrong; some troubleshooting suggestions:
> 1) Did the job fail over during that period? Flink's write to Kafka is currently not end-to-end exactly-once by default (a sketch follows at the end of this message).
> 2) How was "duplicated by dozens of records" determined? Could the data in the Kafka source itself contain duplicates?
>
> Best,
> Jark
>
> On Sun, 13 Oct 2019 at 12:55, 曾耀武 <513797...@qq.com> wrote:
>
> > Hi all,
> >
> >
> > A question for the list: I am using Flink 1.6 with asynchronous
> > access to an external system, and found that downstream data that should contain only one record is duplicated dozens of times.
> >
> >
> > My async Function snippet is as follows.
> >
> >
> > private static class SimpleAsyncFunction extends
> > RichAsyncFunction<String, String> {
> > private static final long serialVersionUID = 2098635244857937717L;
> >
> > private transient ExecutorService executorService;
> > private  transient Client client ;
> >
> > private final long shutdownWaitTS=1000;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> >
> > executorService = new ThreadPoolExecutor(10, 30, 60, TimeUnit.SECONDS,
> > new LinkedBlockingQueue<>(1000),
> > new ThreadPoolExecutor.CallerRunsPolicy());
> >  client= new Client();
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > ExecutorUtils.gracefulShutdown(shutdownWaitTS,
> > TimeUnit.MILLISECONDS, executorService);
> > }
> >
> > @Override
> > public void asyncInvoke(final String jsonStr, final
> > ResultFuture<String> resultFuture) {
> >     // predict() runs synchronously here, so asyncInvoke blocks on the call
> >     String result = client.predict(jsonStr);
> >     resultFuture.complete(Collections.singletonList(result));
> > }
> > }
> > --
> > The DAG construction part:
> > AsyncFunction<String, String> nlpMoaAsyncFunction = new
> > SimpleAsyncFunction();
> >
> > DataStream<String> source = env.addSource(flinkKafkaConsumer010);
> >
> > DataStream<String> nlpResult = AsyncDataStream.unorderedWait(
> > source,
> > nlpMoaAsyncFunction,
> > timeout,
> > TimeUnit.MILLISECONDS,
> > 30);
> >
> > FlinkKafkaProducer010<String> kafkaProducer = new
> > FlinkKafkaProducer010<>(
> > ImmutableConstant.NLP_RESULT_KAFKA_TOPIC,
> > new SimpleStringSchema(),
> > producerProp
> > );
> >
> > nlpResult.addSink(kafkaProducer);
> >
> > - The whole logic is actually very simple; the program runs on YARN with 10 TaskManagers, one slot
> > per TM.
> > Has anyone run into a similar problem? Is this a bug, or is there a problem in my implementation?
>
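
A hedged sketch of Jark's point 1): end-to-end exactly-once into Kafka needs the transactional producer of the Kafka 0.11+ connector (FlinkKafkaProducer011, from flink-connector-kafka-0.11), not the 0.10 producer used in this thread; the topic name below is a placeholder, and producerProp/nlpResult come from the code above:

// EXACTLY_ONCE uses Kafka transactions, so records replayed after a failure
// are fenced out at the sink instead of showing up as duplicates downstream.
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "nlp-result-topic", // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProp,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

nlpResult.addSink(producer);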

