Re: 关于状态TTL

2020-04-16 文章 Benchao Li
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  于2020年4月17日周五 下午12:51写道:

>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: 关于状态TTL

2020-04-16 文章 酷酷的浑蛋




我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 如何合并 binlog stream 和 table stream?

2020-04-16 文章 刘宇宝
总结下折腾的后续



  1.  用 InputSelectable 后 checkpointing 是不行的了,强行 checkpoint 会死锁,后续也许能用还没实现的  
unaligned checkpointing 绕过去,但还没实现完。。。
  2.  Flink 的 side input 支持也没做完。。。
  3.  如果一个 job 里某个流发消息结束关闭了,然后 flink 就不能 checkpoint 了。。。
  4.  Flink的流批统一还在进行中。。。



目前转向 Beam继续折腾,这东西支持 side input,流批统一一开始就支持了,现在在积极加 go、python 支持,也在开始搞 sql 支持了, 
Flink 把基础功能搞好点,高层 API用 Beam 算了,不过 Beam也在换高层 API,大家都在折腾️



已经用 Beam 初步达到了想要的效果,还有些细节问题待研究。







On 2020/4/10, 10:25 PM, "刘宇宝"  wrote:



抄了下 
https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestSequentialReadingStreamOperator.java
 , 可以达到串行的效果了:



   DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));

   DataStream snapshotStream = 
env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);

   DataStream tableStream = snapshotStream.connect(binlogstream);

   tableStream.transform(“Concat”, new TypeHint<….>(){},   new 
SequentialReadingStreamOperator<>());



但是如果打开 checkpointing,那么 flink 会报错:java.lang.UnsupportedOperationException: 
Checkpointing is currently not supported for operators that implement 
InputSelectable



跪了,跪了。。。





On 2020/4/7, 4:45 PM, "刘宇宝"  wrote:



你这是个新思路,分成两个 job,但是感觉不太值当,或许这里是 Flink目前 
API或者说编程模型很受限的地方,我只是源头数据来自两个地方,要合并下两个数据源,所有下游处理都是一样的。如果按照 actor 的松散模式,我是可以在两个 
SourceActor 之间协调的,一个 SourceActor 发完后,通知另一个 SourceActor 再发,或者启动一个新的 
SourceActor,大家都往同一个下游 actor 发消息。



user@flink 里那个哥们提到 InputSelectable,我还没看明白怎么能用到 DataStream上,似乎它只实现在 
StreamOperator 上:  
https://github.com/apache/flink/search?p=2=InputSelectable_q=InputSelectable



我目前想到一个笨方法,实现一个 SourceFunction,把 FlinkKafkaConsumerBase 和 
JDBCInputFormat 包到一起,这样可以先把 JDBCInputFormat 数据发完了,再发 FlinkKafkaConsumerBase。 
但是这样做只能单并发,多并发的话需要一个分布式的 barrier,flink 没有内置支持,感觉不是个优美的解决方案。



非常感谢你的解答!







On 2020/4/7, 4:29 PM, "Jark Wu"  wrote:



如果你的作业没有 state,那么 全量和增量部分,可以分成两个独立的作业。

如果你的作业有 state,主要就是设计 state 复用的问题。两个作业的拓扑结构需要确保一致(为了复用 
savepoint),一个作业的

source operator 是 jdbc,另一个 source operator 是 kafka。

当运行完 jdbc 作业后,对作业触发一次 savepoint,然后用这个 savepoint 恢复 kafka 作业,可以从 
earliest

开始读取(假设作业支持幂等)。



这里的一个问题,主要是如何对 jdbc 作业触发 savepoint,因为 jdbc InputFormat 目前是

bounded,所以读完后整个作业就结束了,就无法进行 savepoint。

所以这里可能需要自己修改下源码,让 jdbc source 永远不要结束,但通过日志或者 metric 
或其他方式通知外界数据已经读完(可以开始触发

savepoint)。



希望这些可以帮助到你。



Best.

Jark





On Tue, 7 Apr 2020 at 16:14, 刘宇宝  wrote:



> 我看了下 streamsql 那个 bootstrap 一段,没看懂怎么处理,我现在已经有 mysql table,算是 
materialized

> view 了,也有一份

> Kafka 里的 binlog 数据,问题的麻烦在于怎么「先消费 jdbc table, 消费完后,再切换到 kafka 上」,从 
flink

> 文档来看,一旦

> 我把 jdbc table 的流和 kafka 的流加入 env 里,env.execute() 
之后,两个流就同时往下游发数据了——我期望的是

> jdbc table

> 的流发完了,才开始发 kafka 的流。

>

> 谢谢!

>

> On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:

>

> Hi,

>

> 你这里的合并是用join 来做么? 这样的话,会比较耗性能。

>

> 一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求 binlog 
是幂等操作的,因为会有多处理一部分的

> binlog,没法做到 精确地切换到 kafka offset 上。

>

> 另外你也可以参考下 StreamSQL 的 bootstrap 的做法:

> https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar

>

> Best,

> Jark

>

>

> On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:

>

> > 大家好,

> >

> > 我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 
mysql_server.test.tableA

> 有一个

> > topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:

> >

> >

> >   1.  先连接上 Kafka 开始消费 topic 
“mysql_server.test.tableA”,确保连接成功,记为

> > binlog-stream,但是要暂停消费 Kafka;

> >   2.  用 JDBCInputFormat 读取 test.tableA 到一个 DataStream 里,记为

> table-stream;

> >   3.  合并两个 streams,消费完 table-stream 后再开始消费 
binlog-stream,这样可以确保

> binlog 是

> > *后*  应用到某个快照表上。

> >

> > 问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态

> > startBinlog,初始值为 false:

> >

> >   binlog-stream -> waitOperator   ->   sinkOperator

> >   table-stream -> notifyOperator -> sinkOperator

> >

> > 两个流被合并输出到 sinkOperator,waitOperator() 会 while 
loop阻塞式的检查全局状态, 等

> > table-stream 消费完(不知道怎么判断消费完了。。。),  notifyOperator 修改全局状态,这样

> binlog-stream

> > 就能被继续消费了。

> >

> > 但由于 kafka consumer 如果长期阻塞不 ack 

Re: 关于flink run -m yarn提交失败。flink1.9

2020-04-16 文章 Weihua Hu
Hi, dlguanyq

Deployment took more than 60 seconds.
——
这个日志表示已经将 Application 提交到 Yarn 了,但是 AM 一直没有启动,这一步和perjob模式或者 yarn-session 
模式关系不大。
可以用 -yd 多试几次,还是不能成功的话,需要检查下 yarn 的日志

Best
Weihua Hu

> 2020年4月15日 09:06,guanyq  写道:
> 
> 使用的是perjob模式提交作业,没有使用yarn-seesion。为什么perjob模式提交有这个-yd参数会有问题,还是没太懂。
> 在 2020-04-15 08:52:11,"tison"  写道:
>> -yd 参数影响的是你是否使用 perjob 模式提交作业,简单地说
>> 
>> with -yd 以 perjob 模式提交作业,即启动一个新集群
>> without -yd 提交到一个现有的 Flink on YARN 集群
>> 
>> 哪个是你的需求呢?有没有实现用 yarn-session 启动 Flink on YARN 集群呢?
>> 
>> Best,
>> tison.
>> 
>> 
>> guanyq  于2020年4月15日周三 上午8:46写道:
>> 
>>> 提交失败,本人测试与-yd参数有关系,这个参数去掉就可以提交了。但是不知道 -yd这个参数影响了什么?
>>> At 2020-04-14 15:31:00, "guanyq"  wrote:
 提交失败,yarn资源也还有很多,为什么会提交失败呢?
 
 提交脚本
 ./bin/flink run -m yarn-cluster \
 -ynm TestDataProcess \
 -yd \
 -yn 2 \
 -ytm 1024 \
 -yjm 1024 \
 -c com.data.processing.unconditionalacceptance.TestDataProcess \
 ./tasks/UnconditionalAcceptanceDataProcess.jar \
 
 
 yarn资源
 Apps Submitted Apps PendingApps RunningApps Completed  Containers
>>> Running  Memory Used Memory TotalMemory Reserved VCores Used
>>> VCores TotalVCores Reserved Active NodesDecommissioned Nodes
>>> Lost Nodes  Unhealthy Nodes Rebooted Nodes
 2390   12  227 173 334 GB  1.42 TB 0 B 173
>>> 288 0   9   0   0   0   0
 
 
 
 2020-04-14 15:14:19,002 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:19,253 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:19,504 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:19,755 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:20,006 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:20,257 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:20,508 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:20,759 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:21,011 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:21,262 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:21,513 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:21,764 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:22,015 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:22,265 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:22,517 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 15:14:22,768 INFO
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
>>> took more than 60 seconds. Please check if the requested resources are
>>> available in the YARN cluster
 2020-04-14 

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 taowang
是的,是生效了的。没见到别人这样用过,在我的场景里这个操作是实现了我需要的逻辑,但会不会引起其他问题,我暂时还没有发现。


 原始邮件 
发件人: lec ssmi
收件人: flink-user-cn
发送时间: 2020年4月17日(周五) 09:25
主题: Re: 为消息分配时间戳但不想重新分配水印


请问,你对DataStream重新声明时间列和水印,生效吗? taowang  于2020年4月16日周四 
下午10:49写道: > 嗯嗯,还是十分感谢。 > 
那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。 > > > 打扰各位了,祝好!~ > > > 
原始邮件 > 发件人: tison > 收件人: 
user-zh > 发送时间: 2020年4月16日(周四) 22:39 > 主题: Re: 
为消息分配时间戳但不想重新分配水印 > > > 正在载入邮件原文…

Re: 关于flink检查点

2020-04-16 文章 Lee Sysuke
一点个人看法:
一般业务场景下,大家都需要对流任务的错误范围有个比较确定性的认知。比如设置固定的5min周期,就可以比较确定流处理即使failover,误差也能控制在五分钟内。
但如果是自适应的间隔,负载越高周期越长,但实际failover在高负载下出现的概率应该远大于低负载,这样的设置实用价值可能就并不太大了。

half coke  于2020年4月15日周三 下午4:15写道:

> 是的,根据任务负载的变化自动调整checkpoint的间隔,或者可以通过用户写的逻辑调整检查点。
> 刚开始学习flink,想请教一下。
>
> Congxian Qiu  于2020年4月15日周三 下午12:33写道:
>
> > hi
> >
> > 你说的间隔自适应是指什么呢?是指做 checkpoint 的间隔自动调整吗?
> >
> > Best,
> > Congxian
> >
> >
> > half coke  于2020年4月15日周三 下午12:24写道:
> >
> > > 请问下为什么flink没有支持自适应检查点间隔呢?是出于什么样的考虑吗?
> > >
> >
>


Re: flink cep 匹配一段时间类A,B,C事件发生

2020-04-16 文章 Peihui He
是的,这个想法好,谢谢

Dian Fu  于2020年4月16日周四 上午9:59写道:

> 类似于这样?
>
> AA follow by BB follow by CC
>
> AA定义成A or B or C
> BB定义成(A or B or C)and BB.type != AA.type
> CC定义成(A or B or C)and CC.type != AA.type and CC.type != BB.type
>
> > 在 2020年4月16日,上午8:40,Peihui He  写道:
> >
> > hello,all
> >
> >我这个边需要匹配一段时间内A,B,C事件同时发生,但是不要求A,B,C事件的顺序,flink cep有什么好的方式不?
> >
> > 有个方案是 定义多个模式组,每个模式组是A,B,C事件的一次排列组合,但是这样比较麻烦,如果事件个数多的话,需要写太多组合。
> >
> >
> > best wish
>
>


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 lec ssmi
请问,你对DataStream重新声明时间列和水印,生效吗?

taowang  于2020年4月16日周四 下午10:49写道:

> 嗯嗯,还是十分感谢。
> 那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。
>
>
> 打扰各位了,祝好!~
>
>
>  原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 22:39
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 正在载入邮件原文…


Re: 请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 Yu Li
1.10.1还剩余最后一个blocker [1],解决之后将创建Release Candidate并启动投票,预计还需要1-2周时间,感谢关注。

Best Regards,
Yu

[1] https://issues.apache.org/jira/browse/FLINK-16662


On Thu, 16 Apr 2020 at 17:24, godfrey he  wrote:

> 目前社区已经在讨论 release-1.10.1 RC [1] 的发布
>
> [1]
>
> http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E
>
> Best,
> Godfrey
>
> Benchao Li  于2020年4月16日周四 下午3:06写道:
>
> > Hi,
> > Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~
> >
> > samuel@ubtrobot.com  于2020年4月16日周四
> 下午3:04写道:
> >
> > >
> > >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>


WechatIMG169

2020-04-16 文章 阿华田
在代码中给算子添加了名称,但是flink的ui上还是显示原始的名称,这种情况大佬们遇见过吗? 


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 taowang
嗯嗯,还是十分感谢。
那我暂时就把这个问题先搁置了,至于issue和我们自己的实现,我自己有时间的时候就去提一个,并添加自己的实现。


打扰各位了,祝好!~


 原始邮件 
发件人: tison
收件人: user-zh
发送时间: 2020年4月16日(周四) 22:39
主题: Re: 为消息分配时间戳但不想重新分配水印


正在载入邮件原文…

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
从语义上说,已经有产生 Watermark 的逻辑了,如果 forward 此前的 watermark
在其他一些用户场景下或许也不合适。从另一个角度考虑你也可以把 watermark 带在 element
上,实现 AssignerWithPunctuatedWatermarks 的 Watermark
checkAndGetNextWatermark(T lastElement, long extractedTimestamp); 方法时从
element 取出来

Best,
tison.


tison  于2020年4月16日周四 下午10:36写道:

> 喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑
>
> 参考 assignTimestampsAndWatermarks
> 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
> 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法
>
> 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
> https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可
>
> Best,
> tison.
>
>
> taowang  于2020年4月16日周四 下午10:12写道:
>
>> 感谢回复,但是很抱歉我试了一下发现不可以。
>> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
>> null`时下游算子拿到的水印都显示为`No
>> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
>> 看了这两个接口文档,不太理解这里的`no new watermark will be
>> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
>> watermark`?)。
>> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>>
>>
>> 感谢帮助!
>> ```
>> public interface AssignerWithPeriodicWatermarks extends
>> TimestampAssigner {
>>
>>  /**
>>  * Returns the current watermark. This method is periodically called by
>> the
>>  * system to retrieve the current watermark. The method may return {@code
>> null} to
>>  * indicate that no new Watermark is available.
>>  *
>>  * The returned watermark will be emitted only if it is non-null and
>> its timestamp
>>  * is larger than that of the previously emitted watermark (to preserve
>> the contract of
>>  * ascending watermarks). If the current watermark is still
>>  * identical to the previous one, no progress in event time has happened
>> since
>>  * the previous call to this method. If a null value is returned, or the
>> timestamp
>>  * of the returned watermark is smaller than that of the last emitted
>> one, then no
>>  * new watermark will be generated.
>>  *
>>  * The interval in which this method is called and Watermarks are
>> generated
>>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>>  *
>>  * @see org.apache.flink.streaming.api.watermark.Watermark
>>  * @see ExecutionConfig#getAutoWatermarkInterval()
>>  *
>>  * @return {@code Null}, if no watermark should be emitted, or the next
>> watermark to emit.
>>  */
>>  @Nullable
>>  Watermark getCurrentWatermark();
>> }
>> ```
>>
>>
>>  原始邮件
>> 发件人: tison
>> 收件人: user-zh
>> 发送时间: 2020年4月16日(周四) 20:33
>> 主题: Re: 为消息分配时间戳但不想重新分配水印
>>
>>
>> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
>> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
>> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
>> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
>> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
>> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
>> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
>> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
>> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!
>
>


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑

参考 assignTimestampsAndWatermarks
的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
方法,应该可以实现。DataStream 方面调用更基础的 transform 方法

如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可

Best,
tison.


taowang  于2020年4月16日周四 下午10:12写道:

> 感谢回复,但是很抱歉我试了一下发现不可以。
> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
> null`时下游算子拿到的水印都显示为`No
> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
> 看了这两个接口文档,不太理解这里的`no new watermark will be
> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
> watermark`?)。
> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>
>
> 感谢帮助!
> ```
> public interface AssignerWithPeriodicWatermarks extends
> TimestampAssigner {
>
>  /**
>  * Returns the current watermark. This method is periodically called by the
>  * system to retrieve the current watermark. The method may return {@code
> null} to
>  * indicate that no new Watermark is available.
>  *
>  * The returned watermark will be emitted only if it is non-null and
> its timestamp
>  * is larger than that of the previously emitted watermark (to preserve
> the contract of
>  * ascending watermarks). If the current watermark is still
>  * identical to the previous one, no progress in event time has happened
> since
>  * the previous call to this method. If a null value is returned, or the
> timestamp
>  * of the returned watermark is smaller than that of the last emitted one,
> then no
>  * new watermark will be generated.
>  *
>  * The interval in which this method is called and Watermarks are
> generated
>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>  *
>  * @see org.apache.flink.streaming.api.watermark.Watermark
>  * @see ExecutionConfig#getAutoWatermarkInterval()
>  *
>  * @return {@code Null}, if no watermark should be emitted, or the next
> watermark to emit.
>  */
>  @Nullable
>  Watermark getCurrentWatermark();
> }
> ```
>
>
>  原始邮件
> 发件人: tison
> 收件人: user-zh
> 发送时间: 2020年4月16日(周四) 20:33
> 主题: Re: 为消息分配时间戳但不想重新分配水印
>
>
> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!


Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 taowang
感谢回复,但是很抱歉我试了一下发现不可以。
无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
 null`时下游算子拿到的水印都显示为`No 
Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
看了这两个接口文档,不太理解这里的`no new watermark will be 
generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no watermark`?)。
@tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。


感谢帮助!
```
public interface AssignerWithPeriodicWatermarks extends TimestampAssigner 
{

 /**
 * Returns the current watermark. This method is periodically called by the
 * system to retrieve the current watermark. The method may return {@code null} 
to
 * indicate that no new Watermark is available.
 *
 * The returned watermark will be emitted only if it is non-null and its 
timestamp
 * is larger than that of the previously emitted watermark (to preserve the 
contract of
 * ascending watermarks). If the current watermark is still
 * identical to the previous one, no progress in event time has happened since
 * the previous call to this method. If a null value is returned, or the 
timestamp
 * of the returned watermark is smaller than that of the last emitted one, then 
no
 * new watermark will be generated.
 *
 * The interval in which this method is called and Watermarks are generated
 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 * @see ExecutionConfig#getAutoWatermarkInterval()
 *
 * @return {@code Null}, if no watermark should be emitted, or the next 
watermark to emit.
 */
 @Nullable
 Watermark getCurrentWatermark();
}
```


 原始邮件 
发件人: tison
收件人: user-zh
发送时间: 2020年4月16日(周四) 20:33
主题: Re: 为消息分配时间戳但不想重新分配水印


在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用 
AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang 
 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink > 
stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 > 
为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink > 
stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
 > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. > 
在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
 > > > 我现在只能使用`assignTimestampsAndWatermarks` > 
去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!

Re: 为消息分配时间戳但不想重新分配水印

2020-04-16 文章 tison
在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark
的。另外语义上使用 AssignerWithPunctuatedWatermarks 会更合适一点。

Best,
tison.


taowang  于2020年4月16日周四 下午5:13写道:

> Hello,大家好:
> 在flink
> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。
> 为了实现这个功能,我想有两种方法:
> 1. 在算子输出后面重新为消息分配水印:看到flink
> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
> `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。
> 2.
> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>
>
> 我现在只能使用`assignTimestampsAndWatermarks`
> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗?
> 感谢解答!


回复:jdbc connector写入数据到mysql数据不一致的问题

2020-04-16 文章 111
Hi,
业余选手建议:
1 修改groupby规则,使之与数据库主键匹配,保证幂等
2 外套一层查询,绕开upsert


Best,
xinghalo


在2020年04月16日 17:46,wldd 写道:
场景:从hive读取数据计算之后写入到mysql


demo sql:
insert into data_hotel_day
select order_date,play_date,company_code,company_name,company_region,device,
cast(coalesce(sum(current_amt),0) as decimal(38,2)) current_amt,
cast(coalesce(sum(order_amt),0) as decimal(38,2)) order_amt,
coalesce(sum(room_cnt),0) room_cnt,
cast(coalesce(sum(refund_amt),0) as decimal(38,2)) refund_amt,
coalesce(sum(budget_room_cnt),0) budget_room_cnt
from db.table where plate_type='hotel'
group by order_date,play_date,company_code,company_name,company_region,device;


问题:由于jdbc connector在group by语句之后默认使用upsert sink,
但是upsert sink会从查询语句提取唯一建,通常把group by后面的字段组合作为唯一建,
因为我的场景中group by后面的字段组合并不是唯一的,这样就会造成写入到mysql
和实际查询的数据不一致,请问有什么解决办法,或者替代方案么

jdbc connector写入数据到mysql数据不一致的问题

2020-04-16 文章 wldd
场景:从hive读取数据计算之后写入到mysql


demo sql:
insert into data_hotel_day
select order_date,play_date,company_code,company_name,company_region,device,
cast(coalesce(sum(current_amt),0) as decimal(38,2)) current_amt,
cast(coalesce(sum(order_amt),0) as decimal(38,2)) order_amt,
coalesce(sum(room_cnt),0) room_cnt,
cast(coalesce(sum(refund_amt),0) as decimal(38,2)) refund_amt,
coalesce(sum(budget_room_cnt),0) budget_room_cnt
from db.table where plate_type='hotel'
group by order_date,play_date,company_code,company_name,company_region,device;


问题:由于jdbc connector在group by语句之后默认使用upsert sink,
但是upsert sink会从查询语句提取唯一建,通常把group by后面的字段组合作为唯一建,
因为我的场景中group by后面的字段组合并不是唯一的,这样就会造成写入到mysql
和实际查询的数据不一致,请问有什么解决办法,或者替代方案么

回复:订阅

2020-04-16 文章 李朋
好的,非常感谢



---原始邮件---
发件人: "Dian Fu"

Re: 订阅

2020-04-16 文章 Dian Fu
需要发邮件到:user-zh-subscr...@flink.apache.org 
来订阅user-zh

> 在 2020年4月16日,下午5:30,Dian Fu  写道:
> 
> 需要发邮件到:user-zh-subscr...@flink.apache.org来订阅user-zh
> 
>> 在 2020年4月16日,下午5:28,李朋 <1134415...@qq.com> 写道:
>> 
>> 订阅
> 



Re: 订阅

2020-04-16 文章 Dian Fu
需要发邮件到:user-zh-subscr...@flink.apache.org来订阅user-zh

> 在 2020年4月16日,下午5:28,李朋 <1134415...@qq.com> 写道:
> 
> 订阅



Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-16 文章 taowang
hello, 唐云:
看到你说从incremental checkpoint恢复后,要重新执行一次savepoint才可以。我自己试了一下,job从incremental 
checkpoint恢复之后并成功打出第一个checkpoint之后,就手动删掉了checkpoints/目录下除了新的jobid目录之外的所有的文件夹,新的job并没有失败,看起来一切正常,执行一次savepoint也没有失败。
  
是不是说,没有必要一定是savepoint,checkpoint也可以。就像job从savepoint恢复之后,当第一个checkpoint成功之后,这个savepoint就也可以删掉了。
 还是有什么场景我没有测试到?


 原始邮件 
发件人: Yun Tang
收件人: user-zh
发送时间: 2020年4月16日(周四) 17:13
主题: Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?


Hi 如果旧作业开启了incremental 
checkpoint,并从那边进行恢复的话,需要注意的是旧的checkpoint目录下的文件是不能删除的,这个是incremental 
checkpoint语义导致的,如果想要切割掉对旧目录的依赖,需要执行一次savepoint,并启动新作业从savepoint进行恢复。 祝好 唐云 
 From: zhisheng  Sent: 
Thursday, April 16, 2020 16:37 To: user-zh  Subject: 
Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs? > 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复 
这种是不是可以先在旧集群对作业取消的时候做一次完整的 savepoint(地址填新集群的 HDFS 地址),然后在新集群的时候从 savepoint 启动 
Yangze Guo  于2020年4月15日周三 下午3:52写道: > 1. 
flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量 > 2. 
我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复 > > Best, > Yangze Guo > 
> On Wed, Apr 15, 2020 at 3:38 PM tao wang  wrote: > > > > 
多谢回复, 还有几个问题请教: > > 1、外部集群的hdfs-site, core-site 这些怎么配置? > > 2、另外一个角度, 
如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复。 > > > > Yangze Guo 
 于2020年4月15日周三 下午2:44写道: > >> > >> 
checkpoint的目录设置key为state.checkpoints.dir > >> > >> 你可以这样设置 > >> 
state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/ > >> > >> > >> 
Best, > >> Yangze Guo > >> > >> On Wed, Apr 15, 2020 at 1:45 PM tao wang 
 wrote: > >> > > >> > 现在有个场景, 
集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。 > >> > > >> > 
但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。 > >> > > >> > 谢谢!! >

订阅

2020-04-16 文章 李朋
订阅

Re: 请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 godfrey he
目前社区已经在讨论 release-1.10.1 RC [1] 的发布

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E

Best,
Godfrey

Benchao Li  于2020年4月16日周四 下午3:06写道:

> Hi,
> Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~
>
> samuel@ubtrobot.com  于2020年4月16日周四 下午3:04写道:
>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-16 文章 Yun Tang
Hi

如果旧作业开启了incremental 
checkpoint,并从那边进行恢复的话,需要注意的是旧的checkpoint目录下的文件是不能删除的,这个是incremental 
checkpoint语义导致的,如果想要切割掉对旧目录的依赖,需要执行一次savepoint,并启动新作业从savepoint进行恢复。

祝好
唐云

From: zhisheng 
Sent: Thursday, April 16, 2020 16:37
To: user-zh 
Subject: Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

> 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复

这种是不是可以先在旧集群对作业取消的时候做一次完整的 savepoint(地址填新集群的 HDFS 地址),然后在新集群的时候从 savepoint
启动

Yangze Guo  于2020年4月15日周三 下午3:52写道:

> 1. flink会去读YARN_CONF_DIR or HADOOP_CONF_DIR这两个环境变量
> 2. 我理解这和你flink运行的集群是解耦的,只要你dir的路径不变,就会从那个dir找checkpoint恢复
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 3:38 PM tao wang  wrote:
> >
> > 多谢回复, 还有几个问题请教:
> > 1、外部集群的hdfs-site, core-site 这些怎么配置?
> > 2、另外一个角度, 如果我把任务迁移到另外一个集群,如何让它从老的集群的checkpoint 恢复。
> >
> > Yangze Guo  于2020年4月15日周三 下午2:44写道:
> >>
> >> checkpoint的目录设置key为state.checkpoints.dir
> >>
> >> 你可以这样设置
> >> state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/
> >>
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
> >> >
> >> > 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
> >> >
> >> > 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
> >> >
> >> > 谢谢!!
>


为消息分配时间戳但不想重新分配水印

2020-04-16 文章 taowang
Hello,大家好:
在flink stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。
为了实现这个功能,我想有两种方法:
1. 在算子输出后面重新为消息分配水印:看到flink 
stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
 `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。
2. 
在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。


我现在只能使用`assignTimestampsAndWatermarks` 
去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗?
感谢解答!

(无主题)

2020-04-16 文章 李朋


Re: flink java.util.concurrent.TimeoutException

2020-04-16 文章 zhisheng
检查一下这个 TM 的 GC 次数和时间吧

Yangze Guo  于2020年4月15日周三 下午3:03写道:

> 日志上看是Taskmanager心跳超时了,如果tm还在,是不是网络问题呢?尝试把heartbeat.timeout调大一些试试?
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:40 AM 欧阳苗  wrote:
> >
> >
> job运行了两天就挂了,然后抛出如下异常,但是taskManager没有挂,其他的job还能正常在上面跑,请问这个问题是什么原因导致的,有什么好的解决办法吗
> >
> >
> > 2020-04-13 06:20:31.379 ERROR 1 --- [ent-IO-thread-3]
> org.apache.flink.runtime.rest.RestClient.parseResponse:393 : Received
> response was neither of the expected type ([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody])
> nor an error.
> Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"2d2a0b4efc8c3d973e2e9490b7b3b2f1","application-status":"FAILED","accumulator-results":{},"net-runtime":217272900,"failure-cause":{"class":"java.util.concurrent.TimeoutException","stack-trace":"java.util.concurrent.TimeoutException:
> Heartbeat of TaskManager with id 0a4ea651244982ef4b4b7092d18de776 timed
> out.\n\tat
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)\n\tat
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)\n\tat
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)\n\tat
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)\n\tat
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
> 

Re: FlinkSQL构建流式应用checkpoint设置

2020-04-16 文章 zhisheng
也就是说这种 sql cli 作业启动后如果 kill 掉的时候,再次重启的话是不能够从 savepoint 或者 chekcpoint 恢复是吗?

godfrey he  于2020年4月15日周三 下午4:32写道:

> Hi Even,
>
> 1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism
> 和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set
> execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink
> planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1]
> 另外 SQL CLI 还不支持 checkpoint 的设置。
> 2. 目前 SQL CLI 默认是 in-memory catalog,在每个SQL CLI的独立进程中,不会共享。如果SQL
> CLI挂掉,in-memory catalog 也会消失。你可以配置你的catalog为 hive catalog [1], 这样你创建的表会持久化到
> hive catalog 中,多个SQL CLI使用同一个hive catalog,可以达到你说期望的共享。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files
>
> Best,
> Godfrey
>
> Even <452232...@qq.com> 于2020年4月15日周三 下午3:35写道:
>
> > Hi!
> > 请教两个问题:
> > 1、 Flink SQL CLI 纯文本方式构建一个流式应用,在DDL语句中如何设置checkpoint和并行度这些参数?
> > 2、 Flink SQL CLI
> >
> 纯文本方式构建的流式应用创建的那些表,我在另外一个CLI中是无法找到这些table的,这是为什么?如果任务挂掉了,应该怎么重启,还是必须重新再构建?
>


Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题

2020-04-16 文章 Arnold Zai
如果选择ingressTime就应该默认结果不一致了,即使看上去是一致的:
1.无法保证每个流的网络IO一致(那就默认为绝对不一致), 这样数据ingress time生产就会不一致。
2.系统资源导致ingress time不一致
因此不太建议依赖ingress 一致。 `这一点golang/map的处理,比较赞成的, 既然不确保有序那就主动打乱它`

或许可以自己维护一个计数器,在source中添加自定义event-time.来模拟ingress time(根据eps计算下counter增长策略)

xue...@outlook.com  于2020年4月16日周四 上午10:57写道:

> 双流join涉及的问题我罗列完整一下:
> 前提:
> 假设有两个流
> 其中一个流aStream非常庞大,基于时间单位是秒,源源不断的生产数据
>
> 另外一个流bStream也是很大,是不太可能基于内存做维度数据全局缓冲或者LRU淘汰,因为aStream使用bStream足够分散和随机,基于时间单位是天或者月,会持续不断的变化,部分数据或者长期不变
> 影响流的因子:
>
> 1、 系统集群资源,主要是内存
>
> 2、流速
>
> 3、不同流数据变化的时间单位不一致
>
> 4、同一流内数据变化的时间单位不一致
> 目标:
> 因两个流的数据都原样的保留下来,重算时,要保持每次运算结果是一致的
>
> 对于操作bStream.join(aStream).windows().apply()
> 如果是基于eventTime的问题是
> 对于aStream可以按照每个时间窗口处理数据,合适的随着时间的流式划分窗口处理,
>
> 但对于bStream,因每个部分的数据的有效时间范围不同,bStream的数据是长期驻留在state,还是超过window就被淘汰,如果是被淘汰,那么计算结果肯定有问题,即aStream中的数据肯定从业务上可以匹配到bStream中的数据
>
> 如果是基于ingressTime的问题是
>
> aStream和bStream都受系统运行环境的影响,但如果有办法对于aStream在任何一个window中的数据都能匹配到bStream的数据,肯定没有问题
>
> 那么剩下的关键问题就是:
>
> A、对于aStream在windows中的数据如何一定匹配到bStream中的数据
>
> B、对于bStream中的数据每条数据的可用时间范围是变化的,如何保持更新
>
> 发送自 Windows 10 版邮件应用
>
> 发件人: Benchao Li
> 发送时间: 2020年4月16日 10:21
> 收件人: user-zh
> 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
>
> 我感觉双流join如果要保证结果是一致的,需要用事件时间,而不是处理时间或者是摄入时间。
> 如果可能,建议尝试下基于事件时间的双流join。
>
> xue...@outlook.com  于2020年4月16日周四 上午9:15写道:
>
> > 双流join对数据和环境的要求很严格,这样适用的业务场景就会有限。没有办法保证环境和数据一致是严格符合的
> >
> > 发送自 Windows 10 版邮件应用
> >
> > 发件人: tison
> > 发送时间: 2020年4月15日 22:26
> > 收件人: user-zh
> > 主题: Re: 双流Join 基于IngressTime计算后在按照Keyby sum聚集后每次计算结果不一致的问题
> >
> > FYI
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamp_extractors.html
> >
> > IngestionTime 的时间基准是进入 Flink 系统即 Source 算子生产数据时附上的时间戳,你后面有
> > window/join,这个时间的间隔不是确定性的,大数据量下因为 GC 等原因有不同延迟从而导致 Window 划分不一致是正常的。即使使用
> > EventTime,在 Watermark
> > 不够可靠的情况下也有可能不一致,足够可靠又可能受到滞后数据的影响损失性能并占用内存。可以看看上面文档的相关内容。
> >
> > Best,
> > tison.
> >
> >
> > tison  于2020年4月15日周三 下午10:18写道:
> >
> > > IngestionTime 多次运行结果不一样很正常啊,试试 event time?
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > xuefli  于2020年4月15日周三 下午10:10写道:
> > >
> > >> 遇到一个非常头痛的问题
> > >>
> > >> Flink1.10的集群,用hdfs做backend
> > >>
> > >> 一个流aStream准备了10亿的数据,另外一个流bStream百万
> > >> 如果如下操作
> > >>
> > >> 我遇到一个问题 双流Join
> >
> 带windows,使用IngressTime,在一个流的数据限制在几十万级别时,每次重算结果一致。但数据量换成10亿级别,另外一个流不便。在同样的情况,多次运行,每次运行结果不一样,我抽样一个特定的数据的结果每次不同
> > 。 aStream.join(bStream) -->windows-->apply(flatMap)得到cStream后
> > 再对cStream进行keyBy-->timeWindow-->sum.
> > >> 我用fromcollection也是没有问题,但数据量很大时,就结果不对,每次运行的结果都有差异。
> > >> 用dataStream.fromcollection这样的更小的数据也是没有问题,每次重算
> > >> 但数据量很大时,就会这样。
> > >>
> > >>
> > >> 每次计算的结果不一样,这个对业务系统挑战巨大
> > >>
> > >>
> > >> 发送自 Windows 10 版邮件应用
> > >>
> > >>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


回复: 关于状态TTL

2020-04-16 文章 酷酷的浑蛋
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: 请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 Benchao Li
Hi,
Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~

samuel@ubtrobot.com  于2020年4月16日周四 下午3:04写道:

>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 samuel....@ubtrobot.com