Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-14 文章 LakeShen
维保 Join 理论上不会改变流的模式,我理解原来你的流是什么,就是什么。

Best,
LakeShen

WeiXubin <18925434...@163.com> 于2021年6月10日周四 下午5:46写道:

> 感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal
> Joins
> 三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为
> INSERT INTO ON DUPLICATE KEY UPDATE  的执行语句, 并不是我所期望的纯 append 模式
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Task Container 被Kill, Managed memory使用情况查看

2021-06-14 文章 LakeShen
Hi Jason,

可以把 rocksdb statebackend 相关监控指标开启,然后结合指标看看。
具体参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#rocksdb-native-metrics
.

Best,
LakeShen

Jason Lee  于2021年6月11日周五 上午11:19写道:

>
>
> 各位社区的伙伴大家好
>
>
> 目前我们在使用Flink SQL 开发任务过程中遇到一个问题,有比较大状态的任务在运行一段时间后Task
> Container会由于使用申请内存过多被Yarn集群Kill掉。
>
>
> 针对这个问题我们任务可能是在Checkpoint时候状态过大引起的,因此我们调整了State
> ttl,也是增量Checkpoint,之后还是会出现类似情况,我们只能通过增加并发和内存来保证任务运行,但是这回造成了很大的资源浪费,因为平时查看任务的堆内存使用并不多,所以我们在考虑是不是Managed
> memory不足导致的,因为Managed memory 负责RocksDB, 我们想确定一下是不是Managed memory不足导致的任务异常。
>
>
> 但是现在通过Flink Web UI界面查看不到Managed memory的使用情况,所以请教一下社区小伙伴有没有好的方式查看Managed
> memory的使用情况,或者有没有遇到类Tm container 被kill的情况有没有好的解决方法,感谢大家,希望一起交流
>
>
> Best,
> Jason
> | |
> Jason Lee1781
> |
> |
> jasonlee1...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: sink端处理数据很慢

2021-06-09 文章 LakeShen
看下最近是不是流量变大了,以及看下 Sink 的外部存储的集群压力是不是很大。

Best,
LakeShen

田磊  于2021年6月10日周四 上午11:36写道:

> 好的,谢谢,我看看。
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年06月10日 10:50,Lin Li 写道:
> 你好,如果之前一直运行正常,建议检查下 sink 节点慢的原因(io 瓶颈、异常/ 节点 gc 之类的),前面的 map
> 节点应该是被反压导致停滞,可以通过 backpressure tab 确认下
>
> 田磊  于2021年6月9日周三 下午10:39写道:
>
> >
> >
> 提交任务后,通过flink的webui界面看,中间的map算子处理速度很快,13万条数据已经处理。但是sink端只处理了几千条数据,这个时候map端的处理也停滞了,不知道什么原因。map并行度8,sink并行度1。之前也是这样的并行度,并没有出现类似的情况。
> >
> >
> > | |
> > totorobabyfans
> > |
> > |
> > 邮箱:totorobabyf...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>


Re: 【问题分析】Fink任务无限反压

2021-06-07 文章 LakeShen
你可以先结合你的任务逻辑,以及 Flink  Web UI 反压监控,看看到底是什么地方引起反压。
一般 source 反压,是下游的算子一直反压到 source。可以看看那个算子引起

Best,
LakeShen


yidan zhao  于2021年6月8日周二 上午10:28写道:

> 该任务有时候正常,偶尔反压。
> 最近观察发现,反压时,kafkaSouce节点100%反压到停滞,后续算子什么也收不到,任务整体停滞。
>
> 这类错误遇到过很多次了,目前我生产中flink有个很大问题就是这些稳定性,压力大不是需要时间去追赶,而是压力一大就整体处于停滞状态。
>


Re: 如何获取flink sql的血缘关系?

2021-06-07 文章 LakeShen
一种方法就是借助 Flink SQL Parser,解析你的 SQL,然后获取到不同的 SQL node,
然后每个 SQL Node 都有对应的类型,以及 connector 后面的 with 参数,你需要自己在
写代码判定一下即可。本质是通过解析 SQL,来获取血缘关系。

Best,
LakeShen

casel.chen  于2021年6月8日周二 上午12:05写道:

> 如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!


Re: Flink 1.11.2 SQL消费kafka写Hive报错

2021-05-31 文章 LakeShen
Hi Jacob,

Maybe you miss the kafka connector dependency in your pom,
you could refer to this url :
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
LakeShen

Jacob <17691150...@163.com> 于2021年6月1日周二 上午9:54写道:

> Dear All,
>
> 我在使用Flink SQL消费kafka并写入hive时报错,代码和报错信息如下
>
> 其中,Flink 版本为1.11.2,kafka connector使用的是flink-sql-connector-kafka。
> 搜索了很久,依然没有解决,相同的sql语句在flink sql clients可以执行,但在Java代码中,提交到hadoop集群后就报错如下:
>
> 请指教
>
> *Java Code*
>
> TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS
> user_behavior_kafka_table");
> tableResult.print();
> TableResult tableResult2 = tableEnvironment.executeSql(
> "CREATE TABLE user_behavior_kafka_table
> (\r\n" +
> "   `user_id` STRING,\r\n" +
> "   `item_id` STRING\r\n" +
> " ) WITH (\r\n" +
> "   'connector' = 'kafka',\r\n" +
> "   'topic' = 'TestTopic',\r\n" +
> "   'properties.bootstrap.servers' =
> 'localhost:9092',\r\n" +
> "   'properties.group.id' =
> 'consumerTest',\r\n" +
> "   'scan.startup.mode' =
> 'earliest-offset',\r\n" +
> "   'format' = 'json'\r\n" +
> ")");
> tableResult2.print();
>
>
> // 数据写入
> tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnvironment.executeSql(
> "INSERT INTO user_behavior_hive_table SELECT user_id,
> item_id FROM user_behavior_kafka_table");
>
>
> *POM File*
>
> 
> org.apache.flink
> flink-json
> ${flink.version}
> 
>
> 
> org.apache.flink
> flink-streaming-java_2.11
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-clients_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
>
> flink-table-api-java-bridge_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> provided
> 
>
> 
> org.apache.flink
>
> flink-sql-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
>
> 
> org.apache.flink
> flink-shaded-hadoop-2-uber
> 2.7.5-10.0
> provided
> 
>
> 
> org.apache.flink
> flink-connector-hive_2.11
> ${flink.version}
> provided
> 
>
> 
> org.apache.hive
> hive-exec
> ${hive.version}
> provided
> 
>
>
> *Error Messge*
>
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a
> connector using option ''connector'='kafka''.
> at
>
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81)
> ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.table.planner.plan.schema

Re: 回复:Flink sql的state ttl设置

2021-05-31 文章 LakeShen
或许你可以参考这个:
[image: image.png]

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/

Best,
LakeShen

chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道:

> 想问下state ttl能针对单表设置吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: rocksdb状态后端最多保留checkpoints问题

2021-05-31 文章 LakeShen
在增量 checkpoint 下,你可以简单理解状态几乎都存在 checkpoint 目录中的 shared 目录,
所以即使清理 checkpoint,也只是先将这次 checkpoint 引用的相关文件句柄的引用数减1,
只有一个文件没有 checkpoint 引用它时,才会真正删除该文件。

Best,
LakeShen.

刘建刚  于2021年5月28日周五 下午7:03写道:

> 增量快照的原理是sst文件共享,系统会自动帮助你管理sst文件的引用,类似java的引用,并不会因为一个快照删除了就会把实际的数据删除掉。
> 也就不会发生你说的情况
>
> tison  于2021年5月28日周五 上午1:47写道:
>
> > rocksdb 增量 checkpoint 不是你这么理解的,总的不会恢复不了。原因可以参考下面的材料
> >
> > -
> >
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > 官方 blog 介绍
> > - https://www.bilibili.com/video/BV1db411e7x2 施博士的介绍,大概 24 分钟开始讲
> >
> > Best,
> > tison.
> >
> >
> > casel.chen  于2021年5月27日周四 下午11:35写道:
> >
> > > 作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
> > >
> >
> state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?
> >
>


Re: 流与流 left join

2021-05-31 文章 LakeShen
Hi,或许 Flink SQL  interval join 能够满足你的需求。

Best,
LakeShen.

Shuo Cheng  于2021年5月31日周一 下午12:10写道:

> state ttl 只能是全局算子维度, table.exec.state.ttl
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 求教:动态字段的处理

2021-05-31 文章 LakeShen
看下你的 Flink 版本是多少,如果是高版本的话,社区有提供 DataStream 的 HBase Sink。

Best,
LakeShen.

Zorro  于2021年5月31日周一 下午2:41写道:

> 由于你的DDL是变化的,无法提前预知所有字段,所以首先可以确定的是这个场景无法使用Flink SQL解决。
>
> 如果使用DataStream解决的话是可行的,唯一可能存在的问题就是目前社区没有提供DataStream的HBase sink。
> 如果你需要在DataStream中使用HBase sink的话,可能需要你自定义一个HBase sink或者基于社区的HBase SQL
> connector做一些更改。不过这些更改是需要在Java代码层面的。
>
> 至于其他的处理逻辑可以用pyFlink很方便的改写。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?

2021-05-26 文章 LakeShen
Hi,
 集群重启,具体是指什么重启呢,这个能在描述详细一点吗?

Best,
LakeShen

datayangl  于2021年5月26日周三 上午9:43写道:

> FixedDelaStrategy 默认是从最近一个ck
> 恢复,其他的策略可以看官网。如果你是想问怎么实现的,不建议在邮件列表里问实现原理的问题。可以google找相关文章、相关flip 或者
> 直接debug源码。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于savepoint恢复问题咨询

2021-05-26 文章 LakeShen
看下你的 flink 命令对不对,然后去 Flink Web UI  Checkpoint 界面,看下是否从 Savepoint 恢复(下面有个
restore path).
之后再看下你的窗口时间类型用的是什么。

Best,
LakeShen

王春浩  于2021年5月27日周四 上午9:26写道:

> hi, 社区
> ​
> 版本flink 1.7
> ​
>
> 我正在尝试从保存点(或检查点)还原flink作业,该作业的工作是从kafka读取->执行30分钟的窗口聚合(只是AggregationFunction,就像一个计数器)->下沉到kafka。
> ​
> 我使用rocksdb和启用检查点。
> ​
> 现在我尝试手动触发一个保存点。 每个汇总的期望值是30(1个数据/每分钟)。 但是,当我从保存点还原时(flink运行-d -s
> {savepoint的url}),聚合值不是30(小于30,取决于我取消flink作业并还原的时间)。 但是当作业正常运行时,它将达到30。
> ​
> 我不知道为什么有些数据似乎会丢失?
> ​
> 日志显示``No restore state for FlinkKafkaConsumer''
> ​
> ​​
> ​
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>
> -Original Message-
> From: "王春浩"
> To: ;
> Cc:
> Sent: 2021/5/26周三 17:03 (GMT+08:00)
> Subject: inquire about restore from savepoint
>
> Hi Community,
> ​
> version flink 1.7
> im trying to make a flink job restore from a savepoint(or checkpoint),
> what the job do is reading from kafka -> do a 30-minutes-window
> aggregation(just AggregationFunction, acts like a counter) -> sink to kafka.
> i use rocksdb and enabled checkpoint.
> now i try to trigger a savepoint manually. the expected value of each
> aggregated one is 30(1 data/per minute). but when i restore from a
> savepoint(flink run -d -s {savepoint's url}), the aggregated value is not
> 30(less than 30, depends on the time i cancel flink job and restore). but
> when the job run normally, it gets 30.
> i don't know why could some data seems to be lost?
> and a log shows "No restore state for FlinkKafkaConsumer"​
> ​
> ​
> ​
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>


Re: 关于 flinksql 维表的问题

2021-05-23 文章 LakeShen
实现肯定可以实现,不过这个周期性加载 Mysql 并更新的 Cache 的功能,可能需要你自己定制化开发下。

Best,
LakeShen

WeiXubin <18925434...@163.com> 于2021年5月22日周六 下午4:09写道:

> 我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前
> FlinkSQL 可以实现吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
Hi fanrui,

thank you so much!

Best,
LakeShen


范瑞 <836961...@qq.com> 于2021年4月1日周四 下午7:36写道:

> Hi Lake:
>
>
> 目前的 Flink 版本应该都是不支持的,也就是说换了 StateBackend 不能正常恢复。Flink 1.13
> 做了这个事情,具体参考:FLIP41 和 FLINK-20976
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>
>
> https://issues.apache.org/jira/browse/FLINK-2097
>
>
> Best,
> fanrui
>
> ---原始邮件---
> 发件人: "LakeShen" 发送时间: 2021年4月1日(周四) 晚上7:16
> 收件人: "user-zh" 主题: FS StateBackend 到 RocksDB StateBackend 状态恢复问题
>
>
> Hi 社区,
>  如果实时任务状态后端之前是 FS StateBackend
> ,然后任务停止后,换成 RocksDB StateBackend
> 做恢复,作业状态能恢复吗?
>
> Best,
> LakeShen


Re: FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
确定了 不能

LakeShen  于2021年4月1日周四 下午7:15写道:

> Hi 社区,
>如果实时任务状态后端之前是 FS StateBackend ,然后任务停止后,换成 RocksDB StateBackend
> 做恢复,作业状态能恢复吗?
>
> Best,
> LakeShen
>


FS StateBackend 到 RocksDB StateBackend 状态恢复问题

2021-04-01 文章 LakeShen
Hi 社区,
   如果实时任务状态后端之前是 FS StateBackend ,然后任务停止后,换成 RocksDB StateBackend
做恢复,作业状态能恢复吗?

Best,
LakeShen


Re: 关于flinksql 与维表mysql的关联问题

2021-02-24 文章 LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
使用方会有很大帮助的。
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen

小屁孩 <932460...@qq.com> 于2020年6月8日周一 上午9:28写道:

> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
>
>
>
>
> --原始邮件--
> 发件人:"Px New"<15701181132mr@gmail.com;
> 发送时间:2020年6月7日(星期天) 晚上7:02
> 收件人:"user-zh"
> 主题:Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
>
> 1048262223 <1048262...@qq.com于2020年6月7日 周日下午3:57写道:
>
>  Hi
> 
> 
>  可以使用open + broadcast的方式解决~
> 
> 
>  Best,
>  Yichao Yang
> 
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Px New"<15701181132mr@gmail.comgt;;
>  发送时间:nbsp;2020年6月6日(星期六) 上午9:50
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 关于flinksql 与维表mysql的关联问题
> 
> 
> 
>  Hi ,我有一个相关操作的一疑问.
>  疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> 
>  Michael Ran  
>  gt; 放到open 方法里面可以吗?
>  gt; 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.comgt; 写道:
>  gt; gt;dear:amp;nbsp; amp;nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
>  关于mysql更新的问题
>  gt;
> 
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>  gt;


Re: Flink 维表延迟join

2021-02-24 文章 LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。
当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到
HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey
的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10
分钟后在进行关联,目前我们考虑借助 Timer 来实现的,
社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen


郑斌斌  于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks


Re: Flink 维表延迟join

2021-02-24 文章 LakeShen
Hi,

Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li  于2020年8月27日周四 上午10:08写道:

> Hi,
>
> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>
> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>
> 郑斌斌  于2020年8月27日周四 上午9:23写道:
>
> > 小伙伴们:
> >
> >
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
> >
> > Thanks
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink 1.12 On Yarn 作业提交失败问题

2021-02-23 文章 LakeShen
这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

> 同提交作业到On Yarn集群,客户端的错误也是
>
>
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1610671284452_0243 failed
> 10 times due to AM Container for appattempt_1610671284452_0243_10
> exited with  exitCode: 1
> Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
> container-launch.
> Container id: container_e48_1610671284452_0243_10_01
> Exit code: 1
>
>
> [2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
> Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
>
> [2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
> Error file: prelaunch.err.
> Last 4096 bytes of prelaunch.err :
>
>
> Yarn那边的日志显示:Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
>
>
> 不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题
>
>
> | |
> 凌战
> |
> |
> m18340872...@163.com
> |
> 签名由网易邮箱大师定制
> 在2021年2月23日 18:46,LakeShen 写道:
> Hi 社区,
>
> 最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute sql
>
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)
>
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>
> at
>
> com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)
>
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
>
> at
> java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>
> at
> com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)
>
> ... 11 more
>
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
>
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
>
> at
>
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
>
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
>
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>
> at
>
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
>
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)
>
> ... 22 more
>
> Caused by:
> org.apache.flink.yarn.YarnClusterDescr

Flink 1.12 On Yarn 作业提交失败问题

2021-02-23 文章 LakeShen
Hi 社区,

  最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2
times due to AM Container for appattempt_1613992328588_4441_02 exited
with  exitCode: 1
Diagnostics: Exception from container-launch.
Container id: container_xxx
Exit code: 1
Stack trace: ExitCodeException exitCode=1:

at org.apache.hadoop.util.Shell.runCommand(Shell.java:575)

at org.apache.hadoop.util.Shell.run(Shell.java:478)

at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766)

at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)

at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)

at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)


  相关信息如下:
  1. 我的 Flink 作业中没有 Hadoop 相关的依赖
  2. 提交作业的机器,以及 Hadoop 集群每台机器都有 HADOOP_CLASSPATH 环境变量
  3. Flink 作业提交到 Yarn 后,状态之后从 Accepted 到 FAILED 状态。

  希望有人帮我解惑,感谢

  Best,
  LakeShen


Re: Flink实时统计 结果波动时大时小

2021-02-18 文章 LakeShen
Hi flink2021,
  你看下的聚合逻辑是不是在一个可撤回流上面进行聚合的呢,如果是的话,可以添加一下 mini batch 聚合优化参数,具体可以参考[1].
  [1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html

Best,
LakeShen


Robin Zhang  于2021年2月18日周四 下午2:44写道:

> Hi,flink2021
>首先看看业务场景,是否存在订单数据减少的情况,如果没有,就是逻辑或者代码有问题
>
> Best,
> Robin
>
>
> flink2021 wrote
> > 我的数据源是kafka
> >
> 统计订单数据结果写入到mysql,发现在数据有积压的过程中,统计结果会忽大忽小?有人遇到过相关的问题没有呢?需要调整那些设置呢?(数据链路又点复杂,state
> > 使用rockdb报错,没有设置过期时间)
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.12 SQL 语法,是否完全兼容 Flink 1.10 的 SQL 语法

2021-02-01 文章 LakeShen
Hello 社区,

最近开始考虑整理 Flink 1.10 升级到 1.12 的整体收益,想问下, Flink 1.12 SQL 语法是否完全兼容 1.10
版本的 SQL 语法,我的理解应该是兼容的.

Best,
LakeShen


Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-28 文章 LakeShen
如果是窗口类聚合,可以尝试一下自定义窗口 Trigger

Best,
LakeShen

林影  于2021年1月28日周四 下午5:46写道:

> Hi, Jessica.J.Wang
> 开源flink看起来没这个功能哈,文档翻了一遍没找到
>
> Jessica.J.Wang  于2021年1月28日周四 下午5:25写道:
>
> > 你使用的是什么窗口呢,是 Tumble或者Hop吗,如果没数据 但是想提前输出结果可以用 emit
> >
> >
> https://help.aliyun.com/document_detail/98951.html?spm=5176.11065259.1996646101.searchclickresult.513d1037M5VADa
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: SQL作业的提交方式

2021-01-07 文章 LakeShen
我这边是底层其实有个 Flink Jar 任务,然后将 Flink SQL 代码以及作业相关配置参数,当做参数一起传入到底层
的 Flink Jar 中去,当然,现在也有很多其他的方式能够实现,也可以参考楼上的链接。

Best,
LakeShen

Peihui He  于2021年1月8日周五 上午9:21写道:

> 可以尝试下zeppelin 0.9
> http://zeppelin.apache.org/
>
>
> jiangjiguang719  于2021年1月7日周四 下午8:34写道:
>
> > 目前我司的SQL作业的提交 还是使用的 Jark 老师的 flink-sql-submit 项目,想问下:
> > 1、有没有更好的SQL作业的提交方式?
> > 2、既然flink1.12 已经实现批流一体,要实现即席查询怎么提交SQL呢?
> > 3、SQL Client Gateway 社区大概啥时候发布?能够生产可用?
>


Re: flink cpu 利用率

2021-01-05 文章 LakeShen
看下 Flink 任务运行,是否是其他机器上的资源先达到瓶颈,而不是 CPU,比如 IO,同时看下你的 flatmap 处理单条记录的时间。
同时也参考上面同学的,是否存在反压,如果 flatmap 逻辑比较复杂,也有这个可能。

Best,
LakeShen

赵一旦  于2021年1月5日周二 下午9:13写道:

>
> 可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。
>
>
>
> housezhang  于2021年1月5日周二 下午5:44写道:
>
> > 有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: TUMBLE函数不支持 回撤流

2020-11-03 文章 LakeShen
Hi 夜思流年梦,

看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
如果是 retract ,应该就不能再上面进行窗口计算了。

Best,
LakeShen

史 正超  于2020年11月3日周二 下午6:34写道:

> canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> UPDATE DELETE, 相关代码如下:
>
> @Override
> public ChangelogMode getChangelogMode() {
>return ChangelogMode.newBuilder()
>   .addContainedKind(RowKind.INSERT)
>   .addContainedKind(RowKind.UPDATE_BEFORE)
>   .addContainedKind(RowKind.UPDATE_AFTER)
>   .addContainedKind(RowKind.DELETE)
>   .build();
> }
>
> 所以在window里消费带有update和delete的数据现在应该是不支持的。
> 
> 发件人: 夜思流年梦 
> 发送时间: 2020年11月3日 9:46
> 收件人: user-zh@flink.apache.org 
> 主题: TUMBLE函数不支持 回撤流
>
>
>
>
> 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
>
>
>
>
>
>
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> '-MM-dd')  then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 报错:
>  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> 发现把kafka建表语句改成 json格式就可以
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
>  'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XX',
>   'topic' = 'ODS_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'canal-json');
>
>
> 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XXX',
>   'topic' = 'DWD_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <17626017...@163.com> 写道:
> >Hi,
> >能贴一下完整的sql吗,数据源是CDC的数据吗?
> >
> >> 2020年10月30日 下午2:48,夜思流年梦  写道:
> >>
> >> 开发者你好:
> >> 现有此场景:
> >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from 
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> 报错:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> 发现把kafka建表语句改成 json格式就可以
> >>
> >>
> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>


Re: Re: Re:flink任务挂掉后自动重启

2020-11-03 文章 LakeShen
Hi bradyMk,

整体上有两种方法:
   1. 任务被 Kill 掉后,拉起时,从checkpoint 恢复,这个就需要知道任务结束之前,最新一次的 checkpoint
信息,然后从这开始恢复。
   Flink 任务 checkpoint 的路径是 checkpoint 根路径 + job_id 组成的路径,所以你可以从这个目录找到
chk-xx 最新的 checkpoint ,然后进行恢复即可。
   当然,还有其他方法知道任务被kill 前,最新一次的 checkpoint 信息。你选择一种适合你的方式就行,然后做成自动化的,降低运维成本。

   2. 任务被 kill 掉后,从 Savepoint 恢复,可以定时做一次 savepoint
,不过这种方法由于状态和当前时间有间隙,可能会导致数据重放,从而下游可能会有消息重复。

  建议选择 1 来实现。

Best,
LakeShen



hailongwang <18868816...@163.com> 于2020年11月3日周二 下午7:21写道:

> Hi bradyMk,
>
>
> 在 on yarn 的模式下,如果某个container 被kill 了,是会重新拉起的。
> 至于整个 job 被kill 了,这种情况应该是自己手动显示的去停止把?
> 最于重启的话,重启次数可以设置个非常大的数字(~无限重启),但是一旦 job 一直这么重启,我个人任务就算重新拉起也是没用的把?
>   这个时候应该结合平台的告警策略来进行人工干预了。
> Best,
> Hailong Wang
>
>
>
>
> 在 2020-11-03 09:32:50,"bradyMk"  写道:
> >您好,你说的这个策略是失败重启策略,但是如果job在某些情况被kill掉,或者重启超过重启次数,任务也会退出;我主要是针对这种情况重启的
> >
> >
> >
> >-
> >Best Wishes
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink SQL 任务乱码问题

2020-09-03 文章 LakeShen
Hi 社区,

   我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下:

 select xxx, case when a = 'a' then '你好'  when a = 'b' then '你好呀' end as va
from xxx ;

  然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。

目前有什么比较好的解决方法吗。

Best,
LakeShen


Re: flink checkpoint导致反压严重

2020-08-25 文章 LakeShen
Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁  于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy()
>   .timeWindow()
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>


Re: Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 文章 LakeShen
嗯嗯,Congxian,感谢你的回复,我通过 Maven Shaded 解决问题。

Congxian Qiu  于2020年7月16日周四 下午8:19写道:

> Hi
>
> 你的图挂了,如果单纯想解决 jar 包冲突的问题,那么 maven shade plugin[1] 或许对你有用
>
> [1]
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
> Best,
> Congxian
>
>
> LakeShen  于2020年7月16日周四 下午6:03写道:
>
> > Hi 社区,
> >
> > 我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.
> >
> > 现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:
> > 
> > org.apache.calcite.avatica
> > avatica-core
> > ${avatica.version}
> > 
> >
> > 但是这个依赖其实在 flink-table 模块中,也有这个依赖:
> > [image: image.png]
> >
> > 由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
> > 包中,我在任务启动的时候,就会报:
> > Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> > org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。
> >
> > 按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
> > 目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。
> >
> > 请问怎么解决这个问题呢,非常期待你的回复。
> >
> > Best,
> > LakeShen
> >
> >
> >
>


Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 文章 LakeShen
Hi 社区,

我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.

现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:

org.apache.calcite.avatica
avatica-core
${avatica.version}


但是这个依赖其实在 flink-table 模块中,也有这个依赖:
[image: image.png]

由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
包中,我在任务启动的时候,就会报:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。

按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。

请问怎么解决这个问题呢,非常期待你的回复。

Best,
LakeShen


Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 LakeShen
你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。

LakeShen  于2020年7月10日周五 上午10:08写道:

> Hi,
>
> 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
>
> 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
>
> Best,
> LakeShen
>
> m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:
>
>> hi:zhisheng:
>>
>> 这是TM日志,在这之前没有任何错误日志,
>>
>> 代码逻辑很简单:
>> SingleOutputStreamOperator>
>> sourceStream = env.addSource(source)
>> .setParallelism(2)
>> .uid("DataProcessSource")
>> .flatMap(new DataConvertFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataCovert")
>> .keyBy(new KeySelectorFunction())
>> .process(new DataCleanFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataProcess");
>>
>> AsyncDataStream.orderedWait(
>> sourceStream,
>> new AsyncDataCleanFunction(),
>> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
>> TimeUnit.MILLISECONDS,
>> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
>> ).uid("DataProcessAsync")
>> .setParallelism(2)
>> .addSink(sink)
>> .uid("DataProcessSinkKafka")
>> .setParallelism(2);
>>
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'gps.kafka.sasl' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.ext.dirs' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.class.version' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka version: 2.2.0
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka commitId: 05fcfde8f69b0349
>> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
>> http pool init,maxTotal:18,maxPerRoute:6
>> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
>> [Producer clientId=producer-1] Error while fetching metadata with
>> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
>> 2020-07-09 19:33:38,974 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - [Producer
>> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
>> async wait operator -> Sink: Unnamed (1/2)
>> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>> to send data to Kafka:
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
>> at
>> org.apache.flink.streaming.api

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 文章 LakeShen
Hi,

从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

Best,
LakeShen

m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:

> hi:zhisheng:
>
> 这是TM日志,在这之前没有任何错误日志,
>
> 代码逻辑很简单:
> SingleOutputStreamOperator>
> sourceStream = env.addSource(source)
> .setParallelism(2)
> .uid("DataProcessSource")
> .flatMap(new DataConvertFunction())
> .setParallelism(2)
> .uid("DataProcessDataCovert")
> .keyBy(new KeySelectorFunction())
> .process(new DataCleanFunction())
> .setParallelism(2)
> .uid("DataProcessDataProcess");
>
> AsyncDataStream.orderedWait(
> sourceStream,
> new AsyncDataCleanFunction(),
> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
> TimeUnit.MILLISECONDS,
> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
> ).uid("DataProcessAsync")
> .setParallelism(2)
> .addSink(sink)
> .uid("DataProcessSinkKafka")
> .setParallelism(2);
>
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'gps.kafka.sasl' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'java.ext.dirs' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'java.class.version' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser -
> Kafka version: 2.2.0
> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser -
> Kafka commitId: 05fcfde8f69b0349
> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
> http pool init,maxTotal:18,maxPerRoute:6
> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
> [Producer clientId=producer-1] Error while fetching metadata with
> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
> 2020-07-09 19:33:38,974 INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
> async wait operator -> Sink: Unnamed (1/2)
> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka:
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> at
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:336)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProc

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 LakeShen
Hi Peihui,

如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:

{
"a":"b",
"c":{
"d":"e",
"g":"f"
}
},

那么在 kafka table source 可以使用 row 来定义:

create table xxx (
a varchar,
c row
)

如果 还存在嵌套,可以继续再使用 Row 来定义。

Best,
LakeShen

Peihui He  于2020年7月10日周五 上午9:12写道:

> Hello:
>
> 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
>
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
>
>
> Best wishes.
>


Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 LakeShen
Hi air23,

  > flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
 你可以为你的程序设置默认的并发度,代码或者命令行参数,配置文件都可以。

>  flink sql 通过ddl写入kafka怎么自定义分区呢?
kafka sink 自定义分区器:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#kafka-connector
,
将 'connector.sink-partitioner'设置为 'custom', 然后设置 '
connector.sink-partitioner-class'. Best,
LakeShen

shizk233  于2020年7月2日周四 下午7:46写道:

> Hi air23,
>
> sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。
> 你可以试试流转表,可以做到细粒度的控制。
>
> Best,
> shizk233
>
> air23  于2020年7月2日周四 下午6:40写道:
>
> > hi
> > 就是我用
> >flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
> >flink sql 通过ddl写入kafka怎么自定义分区呢?
> >
> >
> > 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。
> >
> >
> >
> >
> >
> >
>


Re: 作业因为异常restart后,频繁OOM

2020-06-30 文章 LakeShen
我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。

我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per
job 模式,堆外内存默认没有限制~。

我的解决方法增加了一个参数:taskmanager.memory.off-heap: true.

目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。

Best,
LakeShen

SmileSmile  于2020年6月30日周二 下午11:19写道:

>
> 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用?
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月30日 23:00,GuoSmileSmil 写道:
> hi all,
>
>
>
> 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。
>
>
> 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
> kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。
>
>
> 如果单纯heap的状态后台,作业restart不会出现这样的问题。
>
>
> 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job
> restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。
>
>
> 请问大家是否有什么好的提议或者解决方法?
>
>
> 其中一次系统内核日志如下:
>
>
> Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit
> 28672000kB, failcnt 11225
> Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice:
> cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
> B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
> 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB
> swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB
> active_file:0KB unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
> cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB
> inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss
> nr_ptes swapents oom_score_adj name
> Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531
>  40  -998 pause
> Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421
>  70  -998 bash
> Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316
>  145000  -998 java
> Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196
>  60  -998 tail
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill
> process 26824 (Window(Tumbling) score 4 or sacrifice child
> Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java)
> total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB
>
>
>
>
>
>
> Looking forward to your reply and help.
>
> Best


Re: mysql sink connection timeout

2020-06-30 文章 LakeShen
Hi shizk233,

可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681。

这个就是长时间没有数据,导致 connection 断开问题。

Best,
LakeShen

shizk233  于2020年6月30日周二 下午1:34写道:

> Hi Zhong Tang,
>
>我查看了该jira有关的重连pr,https://github.com/apache/flink/pull/8429
> ,但该pr主要通过重连机制来规避网络问题导致的连接失效,
>  但由于我的业务场景数据比较稀疏,遭遇wait timeout连接失效是一个常见的情况,有最大次数限制的重连机制并不是很适合。
>
> 主要的需求其实是connection的持久保活。
>
> Thanks,
> Xuhui Mao
>
> Zhonghan Tang <13122260...@163.com> 于2020年6月30日周二 下午12:05写道:
>
> > 可以参考这个jira
> > https://issues.apache.org/jira/browse/FLINK-12494
> > 1. Throw execption and let flink runtime handle it;
> > 2. Handle it in OutputFormat;
> >
> >
> > | |
> > Zhonghan Tang
> > |
> > |
> > 13122260...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > On 06/30/2020 11:53,shizk233 wrote:
> > Hi All,
> > 最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait
> > timeout限制(默认的8小时)导致连接失效。
> > 即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。
> >
> > 版本信息:
> > flink 1.10.1
> > mysql server 5.6.47
> > mysql Connector/J 5.1.49
> >
> > 请问:
> > 1.flink的jdbc
> connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?
> > 2.连接失效后是否有可能丢数(因为源码没找到存储record的state field)?
> > 3.在当前版本下,连接失效有什么比较好的解决方案吗?
> >
> > Thanks,
> > Xuhui Mao
> >
> > 异常信息:
> > 2020-06-24 22:39:46,923 ERROR
> > org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC
> > executeBatch error, retry times = 1
> > java.sql.SQLException: Could not retrieve transaction read-only status
> from
> > server
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
> > at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
> > at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
> > at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
> > at
> >
> >
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
> > at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
> > at
> > org.apache.flink.api.java.io
> > .jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
> > at
> > org.apache.flink.api.java.io
> > .jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
> > at
> > org.apache.flink.api.java.io
> >
> .jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
> > Communications link failure
> >
> > The last packet successfully received from the server was 10,384,059
> > milliseconds ago.  The last packet sent successfully to the server was
> > 10,384,063 milliseconds ago.
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at
> >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
> > at
> com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
> > at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
> > at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
> > at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
> > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
> > at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
> > at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
> > at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
> > ... 13 more
> > Caused by: java.net.SocketException: Connection reset
> > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
> > at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> > at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> > at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> > at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
> > ... 19 more
> >
>


Re: Re: flink 高可用问题

2020-06-29 文章 LakeShen
Hi, Tony,

看了一下,你运行在 k8s 上面的任务,job id 是,那如果有多个任务,jobid
如果都是 
的话,如果都是在相同的ZK根目录,不同 k8s 任务 在 zk 上面的信息可能会有影响。目前我们这边是每个k8s 任务,在不同的 zk 路径下面。

第二点的话,你的任务是否能够正常运行起来?还是说任务正常运行起来,只是 checkpoint 有问题。

目前 k8s 上,JobManager 的高可用我们也借助 ZK,由于 JobManager 是一个 K8s Job,所以配置 K8s Job
的 restartPolicy 为 OnFailure。

这只是我的一些经验,仅供参考。

Best,
LakeShen

zhisheng  于2020年6月30日周二 上午8:51写道:

> hi,Tony
>
> 你可以把 Checkpoint 间隔时间稍微设置大一些,看起来像是作业启动的时候 Task 还没 Running,就开始执行 Checkpoint
> 了,而 Checkpoint 是要求所有的 Task 是处于 Running 状态的,所以就会丢弃掉那次
> Checkpoint,BT,就算有这个异常应该问题也不大,只要后面你的作业全启动成功了的话,则 Checkpoint 还是会成功的。
>
> Best!
>
> zhisheng
>
> Tony  于2020年6月29日周一 下午8:16写道:
>
> >
> >
> 你好,我的flink运行环境是在k8s中,我先是打开了checkpoint功能,那样是可以用的,task失败了数据还可以恢复,但job失败了就不行了,所以我又配置flink的高可用,在job的yaml文件里设置了动态属性("-Dhigh-availability=zookeeper"),这样job启动时就出现那种警告,功能也不好用了。但如果配置在flink-config文件里的话就可以,不知道为什么?而我就是想用那个动态属性的方式,谢谢大神指点。
> >
> >
> >
> >
> >
> > --
> > 发自我的网易邮箱手机智能版
> > 
> >
> >
> > - Original Message -
> > From: tison 
> > To: user-zh 
> > Sent: Mon, 22 Jun 2020 15:08:04 +0800
> > Subject: Re: flink 高可用问题
> >
> > 你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk
> > 间隔又小,就这样了。
> >
> > 如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来
> >
> > Best,
> > tison.
> >
> >
> > Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:
> >
> > > Hi
> > >
> > >
> > > 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
> > >
> > >
> > > Best,
> > > Yichao Yang
> > >
> > >
> > >
> > >
> > > --原始邮件--
> > > 发件人:"Tony" > > 发送时间:2020年6月22日(星期一) 上午10:54
> > > 收件人:"user-zh" > >
> > > 主题:flink 高可用问题
> > >
> > >
> > >
> > > 你好。
> > >
> > >
> > > 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> > > high-availability:zookeeper
> > > high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> > > high-availability.zookeeper.path.root:/flink
> > > high-availability.cluster-id:/cluster_one
> > > highavailability.storageDir:hdfs://master:9000/flink/ha
> > >
> > >
> > > 我的flink和zookeeper都是在K8s的容器中
> > > job启动出现如下问题:麻烦帮忙看一下,谢谢。
> > > 2020-06-22 02:47:43,884 INFO
> > >
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > > - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print
> to
> > > Std. Out, Filter -Query Map - Unwind - Custom Map -
> > filter
> > > - Data Transformation - Filter) (1/1) of job
> > >  is not in state RUNNING but SCHEDULED
> > > instead. Aborting checkpoint.
> >
>


Re: flink sql能否显示地创造一列null行

2020-06-29 文章 LakeShen
或者补齐一个非 Null ,但是又不影响业务逻辑的数值

Jingsong Li  于2020年6月30日周二 上午9:58写道:

> Hi,
>
> 我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select CAST(null AS VARCHAR);
> 你试试。
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:40 AM seeksst  wrote:
>
> > Hi,
> >
> >
> >   按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。
> >   显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
> > case when 1 = 2 then 1 end as 字段
> >   1永远不可能等于2,又没有else分支,所以结果是会返回null.
> >
> >
> > 原始邮件
> > 发件人:naisili yuanyuanlong1...@gmail.com
> > 收件人:user-zhuser...@flink.apache.org
> > 发送时间:2020年6月30日(周二) 09:31
> > 主题:flink sql能否显示地创造一列null行
> >
> >
> > 由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL
> > FROM ()
>
>
>
> --
> Best, Jingsong Lee
>


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 文章 LakeShen
哈哈,学习了一波

Jingsong Li  于2020年6月30日周二 上午9:59写道:

> > 用Row 和 Tuple 性能上会有差别吗?
>
> 理论上有细微的差别,
> 但是,基本上性能瓶颈不会在这里。。所以你应该感受不到
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 8:51 AM zhisheng  wrote:
>
> > 可以测试一下
> >
> > Tianwang Li  于2020年6月29日周一 下午8:13写道:
> >
> > > >
> > > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > > >
> > > 用Row 和 Tuple 性能上会有差别吗?
> > >
> > > Jark Wu  于2020年6月19日周五 下午3:47写道:
> > >
> > > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > > >
> > > >
> > > > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> > > >
> > > > > 感谢你的回答,请问可否举一个参照例子?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > > > > wangweigu...@stevegame.cn> 写道:
> > > > > >
> > > > > >   多个值组合在一起,当一个复合值使用!
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >发件人: 魏旭斌
> > > > > >发送时间: 2020-06-19 15:01
> > > > > >收件人: user-zh
> > > > > >主题: 关于拓展 Tuple元组的问题
> > > > > >目前Flink 提供了Tuple1 ~
> > Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > > > > 请问有什么解决的方案? 谢谢
> > > > >
> > > >
> > >
> > >
> > > --
> > > **
> > >  tivanli
> > > **
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: 高可用集群

2020-06-28 文章 LakeShen
Hi 李军,

目前我们在 Yarn 上面的话,用的是 Flink On Yarn Per Job 模式,在 K8s 上面的话,就是 Standalone per
Job 模式。

Best,
LakeShen

刘佳炜  于2020年6月28日周日 下午5:14写道:

> 如果你公司用hadoop的话就是YARN StandAlone一般都是单机测试练习的
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 李军  发送时间: 2020年6月28日 17:11
> 收件人: user-zh  主题: 回复:高可用集群
>
>
>
> 
> 请教下,各位大佬们生产环境使用的是哪种集群配置
>  1.
> Standalone 集群
>  2. Yarn
> 集群
> 
>
>  理由是什么,不知道怎么选择
>
>
> 2020-6-28
> | |
> 李军
> |
> |
> hold_li...@163.com
> |
> 签名由网易邮箱大师定制


Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-27 文章 LakeShen
Hi Tianwang Li,

偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。

Best,
LakeShen

zhisheng  于2020年6月28日周日 上午10:27写道:

> hi, Tianwang Li
>
> 看到有三个图片挂了,可以试着把图片上传到第三方的图床,然后贴个链接过来,另外:
>
> > 任务经常会出现反压(特别是在窗口输出的时候)
>
> 这个检查一下窗口下游算子的情况,比如是不是窗口输出的数据过多,而 sink 的并发还和之前的保持一致,导致处理速度跟不上,从而导致的反压。
>
>
> > 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)
>
> 这种也可以看看是不是 HDFS 有时候压力比较大导致的出现毛刺现象
>
> 另外建议补一下 UI 上 chekcpoint 相关的截图和日志信息,这样才能更好的定位问题。
>
>
> Best !
> zhisheng
>
>
> Tianwang Li  于2020年6月28日周日 上午10:17写道:
>
> > 关于Flink checkpoint偶尔会比较长时间的问题。
> >
> > *环境与背景:*
> > 版本:flink1.10.0
> > 数据量:每秒约10万左右的记录,数据源是kafka
> > 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
> > 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
> >
> > *问题:*
> > 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
> > source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting
> checkpoint消耗时间比较长。
> >
> > checkpoint情况大致如下:
> >
> > [image: image.png]
> > [image: image.png]
> > [image: image.png]
> >
> > 2020-06-24 21:09:53,369 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> > checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
> >
> > 2020-06-24 21:09:58,327 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> >
> > 2020-06-24 21:09:59,266 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> > 111/114/424 MB (used/committed/max)]
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> >
> > 2020-06-24 21:10:08,346 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> >
> > 2020-06-24 21:10:09,286 DEBUG
> > org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> > 111/114/424 MB (used/committed/max)]
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> >
> >
> > 省略
> >
> >
> > 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > Used Memory: 583911424
> >
> > 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> >
> > 2020-06-24 21:55:39,876 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> >   - Garbage collector stats: [PS Sc

Re: 为什么 flink checkpoint Checkpoint Duration (Async) 阶段耗时很慢

2020-06-27 文章 LakeShen
Hi 张立志,

一般 Checkpoint 超时,可以先看看你的任务中,是否存在反压,比如 Sink 阶段,又或者是某个地方有 flatMap操作导致。

然后看下自己任务中,是否存在热点问题等。如果一切都是正常的话,可以尝试使用 RocksDB 的增量 Checkpoint ,具体参考[1]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details

Best,
LakeShen

张立志  于2020年6月28日周日 上午9:52写道:

> flink 版本1.8
> 部署集群yarn
>
>
> 配置代码:
> StreamExecutionEnvironment.stateBackend(new
> FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
> 业务代码相对比较简单,内存占用较大
> 超过10分钟后开始报错,state 大概在1.5G时,开始耗时开始变长
>
>
>
>
>
>


Re: flink1.9 on yarn

2020-06-27 文章 LakeShen
Hi guanyq,

你为什么希望 app id 不变呢?

Best,
LakeShen

guanyq  于2020年6月28日周日 上午9:10写道:

> 问题1
>
> ./bin/flink run -m
> yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
>
> 当yarn application -kill application_1567067657620_0254后,
>
> 在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
>
> 问题2
>
> ./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
>
>


Re: flink1.9 on yarn 运行二个多月之后出现错误

2020-06-23 文章 LakeShen
Hi guanyq,

从日志中,我看到 TaskManager 所在机器的本地存储几乎快用完了。

看下是否因为 TaskManager 所在机器的存储不够导致

Best,
LakeShen

xueaohui_...@163.com  于2020年6月20日周六 上午9:57写道:

> 不知道有没有yarn上面的详细日志。
>
> hdfs是否有权限问题
>
>
>
> xueaohui_...@163.com
>
> 发件人: guanyq
> 发送时间: 2020-06-20 08:48
> 收件人: user-zh
> 主题: flink1.9 on yarn 运行二个多月之后出现错误
> 附件为错误日志。哪位大佬帮忙分析下。
>
>
>
>


Re: Flink DataStream

2020-06-23 文章 LakeShen
Hi xuhaiLong,

看你的依赖,应该用的 old planner,你要使用 blink planner 才能使用row_number 函数。要使用
flink-table-planner-blink_2.11
具体文档参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/#table-program-dependencies

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午8:14写道:

> "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
> "org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",
>  看下粘贴的 sbt 依赖
> On 6/23/2020 20:06,Jark Wu wrote:
> 图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。
>
> On Tue, 23 Jun 2020 at 19:59, xuhaiLong  wrote:
>
> 使用的是1.10.1,在 table api 无法使用ROW_NUMBER
> On 6/23/2020 19:52,Jark Wu  wrote:
>
> Hi xuhaiLong,
>
> 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old
> planner 呢?
>
> Best,
> Jark
>
> On Tue, 23 Jun 2020 at 19:44, LakeShen  wrote:
>
> Hi xuhaiLong,
>
> 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。
>
> Best,
> LakeShen
>
> xuhaiLong  于2020年6月23日周二 下午7:18写道:
>
> Hi
>
> 请教一个问题
>
>
> 我需要对一个类似这样的数据进行计算获取用户 categoryId
> | userId | articleID | categoryId | score |
> | 01 | A | 1 | 10 |
> | 01 | B | 1 | 20 |
> | 01 | C | 2 | 30 |
>
>
>
>
> 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合
> 再通过状态做TopN排序,有没有其他更好的方案来实现?
>
>
> 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API
>
> 还有其他方法实现吗?
>
>
>
>
>
> 感谢!!
>
>
>
>
>
>
>


Re: flink任务失败重启时, flink last checkpoint 失败但任务仍然正常重启,导致 state 重启前后不一致

2020-06-23 文章 LakeShen
Hi ,

正如 Congxian 所说,当 Flink 任务容错恢复重启时,会从上一次成功的 Checkpoint 进行恢复。

所以你所说的 last checkpoint 失败,具体是什么含义呢?

Best,
LakeShen

Congxian Qiu  于2020年6月22日周一 下午8:23写道:

> hi
>
> 这里说的 state 不一致是什么意思呢?checkpoint 恢复保证全局的 state 被重置到之前某个成功的 checkpoint。
>
> Best,
> Congxian
>
>
> 莫失莫忘  于2020年6月22日周一 下午8:09写道:
>
> > 如题,可以要求flink失败重启时 必须正常从checkpoint恢复,否则就重启失败吗?
>


Re: Flink DataStream

2020-06-23 文章 LakeShen
Hi xuhaiLong,

看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午7:18写道:

> Hi
>
> 请教一个问题
>
>
> 我需要对一个类似这样的数据进行计算获取用户 categoryId
> | userId | articleID | categoryId | score |
> | 01 | A | 1 | 10 |
> | 01 | B | 1 | 20 |
> | 01 | C | 2 | 30 |
>
>
>
>
> 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合
> 再通过状态做TopN排序,有没有其他更好的方案来实现?
>
>
> 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗?
>
>
>
>
> 感谢!!
>
>
>
>


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-11 文章 LakeShen
Hi ZheFu,

可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
的数据是否都已经 Sink 到了 kafka.

也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。

具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。

Best,
LakeShen

Congxian Qiu  于2020年6月11日周四 上午9:50写道:

> Hi
>
> 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> java.lang.IllegalStateException: Pending record count must be zero at this
> point: 5”,需要看一下为什么会走到这里
>
> Best,
> Congxian
>
>
> 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
>
> >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> >
> > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > >
> > > 补充一下,在TaskManager发现了如下错误日志:
> > >
> > > 2020-06-10 12:44:40,688 ERROR
> > > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > > during disposal of stream operator.
> > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> > to
> > > send data to Kafka: Pending record count must be zero at this point: 5
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > at
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.IllegalStateException: Pending record count must
> be
> > > zero at this point: 5
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > ... 8 more
> > >
> > > 希望得到帮助,感谢!
> > >
> > >
> > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > >
> > >> Hi all,
> > >>
> > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > Field_Filter
> > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> > >>
> > >>
> > >>
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > >>
> > >> 部分报错信息如下:
> > >> 2020-06-10 12:02:49,083 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Triggering
> > >> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> > >> 2020-06-10 12:04:47,898 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Decline
> > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > >> c41f4811262db1c4c270b136571c8201 at
> > >> container_e27_1591466310139_21670_01_06 @
> > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > >> 2020-06-10 12:04:47,899 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Discarding
> > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > Source_Map
> > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
> ->
> > Map
>

Re: flink 任务并发顺序

2020-06-08 文章 LakeShen
Hi,

Flink 任务运行时,本身就是一个 DAG 图,从 Source 出发,到 Sink 结束。

所以你所说的顺序,完全是按照你的业务逻辑来的。

Best,
LakeShen

Caizhi Weng  于2020年6月8日周一 下午5:41写道:

> Hi,
>
> 不能设置这两个算子的执行顺序。流作业很可能是不会结束的,如果非要设定顺序的话可能另一个算子永远都不会执行。
>
> 不过如果已知是有限流的话,可以把这两个算子分成两个 job 提交,拿到 job client 以后用 job client 等待一个 job
> 的结束,再提交另一个 job。
>
> 小学生 <201782...@qq.com> 于2020年6月8日周一 下午5:29写道:
>
> > 各位大佬好,有一个问题需要咨询下:
> > 当前我在一个flink执行文件中,对同一源流数据有不同的算子操作,例如:
> > 1.table1.insert(‘sink_table’)
> > 2.sink_table.insert(‘sink_table1’)
> >
> >
> 也就是说第二个算子依赖于第一个算子产生的结果,所以想咨询下,在flink中可设置流数据算子的执行顺序吗(当前我理解是flink会并发处理两个算子,先后性保证不了)?
>


Re: flink 1.9 关于回撤流的问题

2020-06-03 文章 LakeShen
Hi,

RetractStream 目前是无法输出到 kafka 的,因为 kafka 是 Append 模式。

不过你应该可以定义一个时间窗口 T ,滚动窗口的时间就是 T,然后聚合一次,输出到 kafka,后面都使用这个 kafka topic。

Best,
LakeShen

star <3149768...@qq.com> 于2020年6月3日周三 下午4:31写道:

> 感谢两位的回复,
> 转成回撤流的这个流其实是一张轻度汇总表,
> 例如,select year,month,day,province,sub_name,sum(amount),count(*) as cou
> from mytable group by year,month,day,province,sub_name;
>
>
> 后面有几十张实时报表依赖这个流 再进行汇总 计算;
> 我现在是把这个轻度汇总表转成了回撤流输出到了kafka里面,如果后面这几十张报表能够消费这个topic并转成table,就可以做后面到运算了。
>
>
> 不知道能不能转成这样到table?
>
>
>
>
>
>
> --原始邮件--
> 发件人:"godfrey he" 发送时间:2020年6月3日(星期三) 下午3:40
> 收件人:"user-zh"
> 主题:Re: flink 1.9 关于回撤流的问题
>
>
>
> hi star,
> Flink 1.11 开始已经支持 table source 读取 retract 消息,update 消息。
> 目前支持 Debezium format,Canal format [1],其他的情况目前需要自己实现。
>
>
> Best,
> Godfrey
>
> [1]
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat
>
>
> 1048262223 <1048262...@qq.com 于2020年6月3日周三 下午2:59写道:
>
>  Hi
>  Flink 中RetractStream
> 
> 是必须要sink支持update的,kafka消息队列本身是不支持update的,所以基于sink为kafka的程序是不能做RetractStream的。
> 
> 
>  Best,
>  Yichao Yang
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"star"<3149768...@qq.comgt;;
>  发送时间:nbsp;2020年6月3日(星期三) 下午2:47
>  收件人:nbsp;"user-zh@flink.apache.org" gt;;
> 
>  主题:nbsp;flink 1.9 关于回撤流的问题
> 
> 
> 
>  大家好,
> 
> 
> 
> 
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
>  问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
> 
> 
> 
> 
>  谢谢


Re: Re:回复:flink1.9,如何实时查看kafka消费的挤压量

2020-06-03 文章 LakeShen
或者可以通过 Kafka-Manager 来查看

guanyq  于2020年6月3日周三 下午4:45写道:

>
>
>
> 找到了,原生就有的committedOffsets-currentOffsets
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
> Connectors
> Kafka Connectors
> | Scope | Metrics | User Variables | Description | Type |
> | Operator | commitsSucceeded | n/a | The total number of successful
> offset commits to Kafka, if offset committing is turned on and
> checkpointing is enabled. | Counter |
> | Operator | commitsFailed | n/a | The total number of offset commit
> failures to Kafka, if offset committing is turned on and checkpointing is
> enabled. Note that committing offsets back to Kafka is only a means to
> expose consumer progress, so a commit failure does not affect the integrity
> of Flink's checkpointed partition offsets. | Counter |
> | Operator | committedOffsets | topic, partition | The last successfully
> committed offsets to Kafka, for each partition. A particular partition's
> metric can be specified by topic name and partition id. | Gauge |
> | Operator | currentOffsets | topic, partition | The consumer's current
> read offset, for each partition. A particular partition's metric can be
> specified by topic name and partition id. | Gauge |
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:02:24,"guanyq"  写道:
> >kafka挤压量的metrics的demo有么,或者参考资料
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-06-03 14:31:56,"1530130567" <1530130...@qq.com> 写道:
> >>Hi:
> >>  可以考虑用prometheus采集kafka的metrics,在grafana上展示
> >>
> >>
> >>
> >>
> >>--原始邮件--
> >>发件人: "Zhonghan Tang"<13122260...@163.com;
> >>发送时间: 2020年6月3日(星期三) 下午2:29
> >>收件人: "user-zh" >>抄送: "user-zh" >>主题: 回复:flink1.9,如何实时查看kafka消费的挤压量
> >>
> >>
> >>
> >>一般是kafka自带的查看消费组的命令工具可以看
> >>./kafka-consumer-groups.sh --describe --group test-consumer-group
> --bootstrap-server
> >>
> >>
> >>| |
> >>Zhonghan Tang
> >>|
> >>|
> >>13122260...@163.com
> >>|
> >>签名由网易邮箱大师定制
> >>
> >>
> >>在2020年06月3日 14:10,guanyq >>请加个问题
> >>
> >>1.消费kafka时,是如何实时查看kafka topic的挤压量的?
>


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 文章 LakeShen
Hi Benchao,

TableEnvironment 和 StreamTableEnvironment 具体有什么差异吗,我看StreamTableEnvironment
继承了 TableEnvironment。

这块我不是很了解,有什么文档介绍吗,感谢。

Best,
LakeShen

Benchao Li  于2020年5月28日周四 下午5:52写道:

> Hi zhisheng,
>
> 这块内容在FLIP-84[1]里面做了大量的重构和优化,而且大部分工作在1.11已经完成了,你们可以看下。
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:45写道:
>
> > Hi,
> >
> >
> StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:35
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > Hi,
> >
> > 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
> >
> >
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:27写道:
> >
> > > Hi, Benchao:
> > >
> > >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 17:05
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午5:02写道:
> > >
> > > > Hi, Benchao:
> > > >
> 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > > >
> > > >
> > > >
> > > >
> > > > Best,
> > > > Junbao Zhang
> > > > 
> > > > 发件人: Benchao Li 
> > > > 发送时间: 2020年5月28日 15:59
> > > > 收件人: user-zh 
> > > > 主题: Re: 疑问:flink sql
> > > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > > >
> > > >
> > > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > > >
> > > > wind.fly@outlook.com  于2020年5月28日周四
> > > > 下午3:14写道:
> > > >
> > > > > Hi,all:
> > > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > > >
> > > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > > >
> > > > > 其中a是kafka表,connector属性为:
> > > > > 'connector.properties.group.id' = 'testGroup',
> > > > > 'connector.startup-mode' = 'group-offsets'
> > > > >
> > > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL 嵌套 nested Json 解析

2020-05-26 文章 LakeShen
Hi,

关于 Json 的解析,当你的 Json 里面的一个字段一个镶嵌类型的话,可以将其定义为一个 row,row 里面还可以定义 row 字段。

注意 row 里面的字段名称要和原始json 里面的字段一致。

Best,
LakeShen

claylin <1012539...@qq.com> 于2020年5月26日周二 上午10:17写道:

> 嗯 谢谢 我试下看下
>
>
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年5月26日(星期二) 上午10:09
> 收件人:"user-zh"
> 主题:Re: Flink SQL 嵌套 nested Json 解析
>
>
>
> 嗯,直接在DDL里面用WATERMARK语法来定义就可以,你这个定义我感觉是没有问题的。
>
> claylin <1012539...@qq.com 于2020年5月26日周二 上午10:07写道:
>
>  这个问题我想问下,通过row解析出来的每条记录的event time怎么定义,是直接在每条记录里面定义吗
> 
> 
>  create table my_source (
>  nbsp; database varchar,
>  nbsp; maxwell_ts bigint,
>  nbsp; table varchar,
>  nbsp; data row<
>  nbsp;nbsp;nbsp; transaction_sn varchar,
>  nbsp;nbsp;nbsp; parent_id int,
>  nbsp;nbsp;nbsp; user_id int,
>  nbsp;nbsp;nbsp; amount int,
>  nbsp;nbsp;nbsp; reference_id varchar,
>  nbsp;nbsp;nbsp; status int,
>  nbsp;nbsp;nbsp; transaction_type int,
>  nbsp;nbsp;nbsp; merchant_id int,
>  nbsp;nbsp;nbsp; update_time int,
>  nbsp;nbsp;nbsp; create_time int
>  nbsp; nbsp; ts AS CAST(FROM_UNIXTIME(create_time) AS
>  TIMESTAMP(3)),nbsp; // 定义事件时间
>  nbsp; nbsp; WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
>  nbsp; gt;
>  ) with (
>  nbsp;nbsp;nbsp; ...
>  )
> 
> 
>  这样可以行吗
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Benchao Li"  发送时间:nbsp;2020年5月26日(星期二) 上午9:55
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: Flink SQL 嵌套 nested Json 解析
> 
> 
> 
>  Hi,
> 
>  你可以尝试一下直接用DDL来定义source和format。比如你的数据的话,大概的DDL 类似于下面这样子:
>  create table my_source (
>  nbsp; database varchar,
>  nbsp; maxwell_ts bigint,
>  nbsp; table varchar,
>  nbsp; data row<
>  nbsp;nbsp;nbsp; transaction_sn varchar,
>  nbsp;nbsp;nbsp; parent_id int,
>  nbsp;nbsp;nbsp; user_id int,
>  nbsp;nbsp;nbsp; amount int,
>  nbsp;nbsp;nbsp; reference_id varchar,
>  nbsp;nbsp;nbsp; status int,
>  nbsp;nbsp;nbsp; transaction_type int,
>  nbsp;nbsp;nbsp; merchant_id int,
>  nbsp;nbsp;nbsp; update_time int,
>  nbsp;nbsp;nbsp; create_time int
>  nbsp; gt;
>  ) with (
>  nbsp;nbsp;nbsp; ...
>  )
> 
>  macia kk  
>  gt; Flink version: 1.10
>  gt;
>  gt; Json:
>  gt;
>  gt; {
>  gt;nbsp;nbsp;nbsp;nbsp; "database":"main_db",
>  gt;nbsp;nbsp;nbsp;nbsp;
> "maxwell_ts":1590416550358000,
>  gt;nbsp;nbsp;nbsp;nbsp;
> "table":"transaction_tab",
>  gt;nbsp;nbsp;nbsp;nbsp; "data":{
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "transaction_sn":"",
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "parent_id":0,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "user_id":333,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "amount":555,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "reference_id":"666",
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "status":3,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "transaction_type":3,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "merchant_id":2,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "update_time":1590416550,
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "create_time":1590416550
>  gt;nbsp;nbsp;nbsp;nbsp; }}
>  gt;
>  gt;
>  gt; 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
>  gt;
>  gt;
>  gt; macia kk   gt;
>  gt; gt; Flink version: 1.10
>  gt; gt;
>  gt; gt; Json:
>  gt; gt; ```j
>  gt; gt; {
>  gt; gt;nbsp;nbsp;nbsp;nbsp;
> "database":"main_db",
>  gt; gt;nbsp;nbsp;nbsp;nbsp;
> "maxwell_ts":1590416550358000,
>  gt; gt;nbsp;nbsp;nbsp;nbsp;
> "table":"transaction_tab",
>  gt; gt;nbsp;nbsp;nbsp;nbsp; "data":{
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "transaction_sn":"",
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "parent_id":0,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "user_id":333,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "amount":555,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "reference_id":"666",
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "status":3,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "transaction_type":3,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "merchant_id":2,
>  gt;
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "update_time&quo

Re: 使用滚动窗口的 Flink SQL State 一直增加

2020-05-26 文章 LakeShen
Hi,

看下是否存在热点问题,我看你根据 server,reason 这两个字段来进行 group by

Best,
LakeShen

Benchao Li  于2020年5月26日周二 下午6:50写道:

> Hi,
>
> 看起来你的写法应该没有太大问题。可能有两个问题需要确认一下:
> 1. 你的watermark生成的正确吗?也就是说window的结果有正常输出么?如果watermark延迟很高,是会导致有多个window同时存在的
> 2. 你是怎么判断state上升呢?通过checkpoint看出来的?还是看到heap一直上升?
>
> 瓜牛  于2020年5月26日周二 下午6:07写道:
>
> > hi,大家好!
> >
> > 现象:在用纯 Flink SQL 来运行滚动窗口的 job 时,state 的大小一直在增加
> >
> > SQL:source 和 sink 都是 kafka 表,使用事件时间和滚动窗口,每5分钟根据 server,reason 分组,统计条数和
> > role_id 的去重数
> >
> > 疑惑:理论上使用滚动窗口的话旧窗口应该会被清除,state 的大小应该是稳定维持在一定大小(source数据量平稳),但 state
> > 大小却一直是增加的,是 SQL 写得有问题吗?
> >
> > 麻烦大家帮我看一下
> >
> > 谢谢!
> >
> > 
> >
> > CREATE TABLE source_kafka (
> >   dtime string,
> >   wm as cast(dtime as TIMESTAMP(3)),
> >   server string,
> >   reason string,
> >   role_id string,
> >   WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'xxx',
> >   'connector.properties.bootstrap.servers' = 'xxx',
> >   'connector.properties.zookeeper.connect' = 'xxx',
> >   'connector.properties.group.id' = 'xxx',
> >   'format.type' = 'json',
> > )
> > -
> >
> > CREATE TABLE sink_kafka (
> >   window_time string,
> >   server string,
> >   reason string,
> >   role_id_distinct_cnt BIGINT,
> >   log_cnt BIGINT
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'xxx',
> >   'connector.properties.bootstrap.servers' = 'xxx',
> >   'connector.properties.zookeeper.connect' = 'xxx',
> >   'format.type' = 'json'
> > )
> > -
> >
> > INSERT INTO sink_kafka
> > SELECT
> >  DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), '-MM-dd
> HH:mm:ss')
> > AS window_time,
> >  server,
> >  reason,
> >  COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
> >  COUNT(1) AS log_cnt
> > FROM source_kafka
> > GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大

2020-05-17 文章 LakeShen
Hi,

你可以描述一下你的 Flink 版本,具体空闲状态保留时间的含义,请参考一下[1]:

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen



claylin <1012539...@qq.com> 于2020年5月17日周日 下午10:24写道:

> 过期时间是10-15分钟,按理说我是按照每分钟作为key分组的,应该很快就会过期,kafka数据流量的话每秒2-5M
>
>
> --原始邮件--
> 发件人:"刘大龙" 发送时间:2020年5月17日(星期天) 晚上10:14
> 收件人:"user-zh"
> 主题:Re: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
>
>
>
> Hi,
>  你的状态过期时间设置的是多久?对于普通的group by
> agg算子,目前使用的是定时器方式实现过期状态的清理,精确的过期时间是状态最后更新时间加上你设置的最小idle,如果状态在一直更新,是不会过期的;另外你的Kafka中的数据量有多大?比如每秒大概有多少条数据?你可以试试把过期时间设置的短一点,观察一下状态是否能比较稳定的不增大
>
>
>  -原始邮件-
>  发件人: claylin <1012539...@qq.com
>  发送时间: 2020-05-17 17:41:13 (星期日)
>  收件人: user-zh   抄送:
>  主题: 回复: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
>  链接这里nbsp;
> https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> 
> <https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4>;
>
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"tison"  发送时间:nbsp;2020年5月17日(星期天) 下午5:34
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 使用sql时候,设置了idle过期时间,但是状态还是一直变大
> 
> 
> 
>  考虑把 SQL 贴成 gist 链接?
> 
>  Best,
>  tison.
> 
> 
>  claylin <1012539...@qq.comgt; 于2020年5月17日周日 下午5:32写道:
> 
>  gt; sql作业定义如下,也通过TableConfig设置了最大和最小idle
>  gt;
> time,但是运行很长时间,查看sst的目录flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9下,状态还是在一直变打,导致作业线程读写state很耗时间,最后作业处于一直反压状态,求大佬支招CREATE
>  gt; TABLE yy_yapmnetwork_original
> (nbsp;nbsp;nbsp;nbsp; happenAt
> BIGINT,nbsp;nbsp;nbsp;nbsp; uid BIGINT,
>  gt;nbsp; appId
> STRING,nbsp;nbsp;nbsp;nbsp; deviceId
> STRING,nbsp;nbsp;nbsp;nbsp; appVer
> STRING,nbsp;nbsp;nbsp;nbsp; dnsDur BIGINT,
>  gt;nbsp;nbsp;nbsp; useGlb
> INT,nbsp;nbsp;nbsp;nbsp; hitCache
> INT,nbsp;nbsp;nbsp;nbsp; requestSize
> DOUBLE,nbsp;nbsp;nbsp;nbsp; responseSize
>  gt; DOUBLE,nbsp;nbsp;nbsp;nbsp; totalDur
> BIGINT,nbsp;nbsp;nbsp;nbsp; url
> STRING,nbsp;nbsp;nbsp;nbsp; statusCode INT,
>  gt;nbsp; prototype
> STRING,nbsp;nbsp;nbsp;nbsp; netType
> STRING,nbsp;nbsp;nbsp;nbsp; traceId
> STRING,nbsp;nbsp;nbsp;nbsp; ts AS
>  gt; CAST(FROM_UNIXTIME(happenAt/1000) AS
> TIMESTAMP(3)),nbsp;nbsp;nbsp;nbsp; WATERMARK FOR ts AS
>  gt; ts - INTERVAL '20' SECOND )with ( 'connector.type' = 'kafka',
>  gt; 'connector.version' = 'universal', 'connector.topic' =
> 'yapm_metrics',
>  gt; 'connector.properties.zookeeper.connect' = 'localhost:2181',
>  gt; 'connector.properties.bootstrap.servers' = '
> kafkawx007-core001.yy.com:8101
>  gt; ,kafkawx007-core002.yy.com:8101,
> kafkawx007-core003.yy.com:8101', '
>  gt; connector.properties.group.id' =
> 'interface_success_rate_consumer',
>  gt; 'connector.startup-mode' = 'latest-offset', 'format.type' =
> 'json' );
>  gt; create table request_latency_tbl
> (nbsp;nbsp;nbsp;nbsp; app_id
> string,nbsp;nbsp;nbsp;nbsp; app_ver string,
>  gt;nbsp;nbsp;nbsp; net_type
> string,nbsp;nbsp;nbsp;nbsp; prototype
> string,nbsp;nbsp;nbsp;nbsp; url
> string,nbsp;nbsp;nbsp;nbsp; status_code
>  gt; int,nbsp;nbsp;nbsp;nbsp; w_start
> string,nbsp;nbsp;nbsp;nbsp; success_cnt
> BIGINT,nbsp;nbsp;nbsp;nbsp; failure_cnt BIGINT,
>  gt;nbsp; total_cnt BIGINT ) with( 'connector.type' =
> 'jdbc', 'connector.url' =
>  gt;
> 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=trueamp;amp;characterEncoding=utf-8amp;amp;zeroDateTimeBehavior=convertToNullamp;amp;autoReconnect=true',
>  gt; 'connector.table' = 'request_latency_statistics',
> 'connector.username' =
>  gt; 'yapm_metrics', 'connector.password' = '1234456',
>  gt; 'connector.write.flush.max-rows' = '1000',
> 'connector.write.flush.interval'
>  gt; = '5s', 'connector.write.max-retries' = '2' ); create view
>  gt; request_1minutes_latencynbsp;
> asnbsp;nbsp;nbsp;nbsp; select appId, appVer, netType,
> prototype,
>  gt; url, statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm') w_start,
>  gt;nbsp; count(distinct traceId) filter (where statusCode
> in (200)) as successCnt,
>  gt;nbsp;nbsp;nbsp; count(distinct traceId) filter
> (where statusCode not in (200)) as
>  gt; failureCnt,nbsp;nbsp;nbsp;nbsp;
> count(distinct traceId) as
> total_cntnbsp;nbsp;nbsp;nbsp; from
>  gt; yy_yapmnetwork_original group by appId, appVer, netType,
> prototype, url,
>  gt; statusCode, DATE_FORMAT(ts, '-MM-dd HH:mm'); insert into
>  gt; request_latency_tblnbsp;nbsp;nbsp;nbsp;
> select * fromnbsp; request_1minutes_latency;
>
>
> --
> 刘大龙
>
> 浙江大学 控制系 智能系统与控制研究所 工控新楼217
> 地址:浙江省杭州市浙大路38号浙江大学玉泉校区
> Tel:18867547281


Re: save point容灾方案咨询

2020-05-14 文章 LakeShen
Hi ,

你可以把你的场景在描述的详细一些。

Best,
LakeShen

请叫我雷锋 <854194...@qq.com> 于2020年5月14日周四 下午9:42写道:

> 各位大佬好,请问有啥好的save point容灾方案嘛?
>
>
>
> 发自我的iPhone


可撤回流是否可以进行双流 Join?

2020-05-08 文章 LakeShen
Hi 各位,

最近有业务需求,需要用到双流 Join (Interval Join),如果两个流中,其中一个流式可撤回流(Retract),或者两个流都是
Retract 流,那么他们还能进行双流 Join 吗?

目前我知道如果两个流是 Append 流的话,肯定可以双流 Join,但是其中一个流式 Retract 流,就不知道还能不能 Join 了。

期望你的回复。

Best,
LakeShen


Re: flink-1.10 on yarn日志输出问题

2020-05-08 文章 LakeShen
Yarn 日志的话,直接根据 任务的 Application ID ,去 Yarn 的 Web UI 上面看吧。

Best,
LakeShen

guaishushu1...@163.com  于2020年5月8日周五 下午3:43写道:

> 日志全部输出到.err日志里面了,flink-web也看不到日志有人知道吗?
>
> --
> guaishushu1...@163.com
>


Re: flink on kubernetes 作业卡主现象咨询

2020-05-07 文章 LakeShen
Hi ,

你可以看下你的内存配置情况,看看是不是内存配置太小,导致 networkd bufffers 不够。

具体文档参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

Best,
LakeShen

a511955993  于2020年5月7日周四 下午9:54写道:

> hi, all
>
>
> 集群信息:
> flink版本是1.10,部署在kubernetes上,kubernetes版本为1.17.4,docker版本为19.03,
> cni使用的是weave。
>
>
> 现象:
> 作业运行的时候,偶发会出现operation卡住,下游收不到数据,水位线无法更新,反压上游,作业在一段时间会被kill掉的情况。
>
>
> 通过jstack出来的堆栈信息片段如下:
>
>
> "Map (152/200)" #155 prio=5 os_prio=0 tid=0x7f67a4076800 nid=0x31f
> waiting on condition [0x7f66b04ed000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x000608f3c600> (a
> java.util.concurrent.CompletableFuture$Signaller)
> at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
> at org.apache.flink.runtime.io
> .network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
> at org.apache.flink.runtime.io
> .network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
> at org.apache.flink.runtime.io
> .network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at org.apache.flink.runtime.io
> .network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
> at org.apache.flink.runtime.io
> .network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at org.apache.flink.runtime.io
> .network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>
>
>
>
> 有怀疑过是虚拟化网络问题,增加了如下参数,不见效:
> taskmanager.network.request-backoff.max: 30
> akka.ask.timeout: 120s
> akka.watch.heartbeat.interval: 10s
>
>
> 尝试过调整buffer数量,不见效:
> taskmanager.network.memory.floating-buffers-per-gate: 16
> taskmanager.network.memory.buffers-per-channel: 6
>
>
>
>
> 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,非常感谢。
>
> Looking forward to your reply and help.
>
> Best
>
>
>
>


What is the RocksDB local directory in flink checkpointing?

2020-05-05 文章 LakeShen
Hi community,

Now I have a question about flink checkpoint local directory , our flink
version is 1.6, job mode is

flink on yarn per job . I saw the flink source code , and I find the flink
checkpoint local directory is

/tmp when you didn't config the "state.backend.rocksdb.localdir". But I go
into the /tmp dir ,I

couldn't find the flink checkpoint state local directory.

What is the RocksDB local directory in flink checkpointing?  I am looking
forward to your reply.

Best,
LakeShen


Re: Committing offsets to Kafka takes longer than the checkpoint interval.

2020-04-28 文章 LakeShen
Hi 首维,

你用的 Flink 版本是多少呢,然后你的 Checkpoint interval 设置的时间是多少,这两个信息提供一下。

Best,
LakeShen



刘首维  于2020年4月28日周二 下午6:28写道:

> Hi all,
>
>
>
> 今天发现有一个作业日志中连续打印下面这个报警
>
> "Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint
> offsets are available. This does not compromise Flink's checkpoint
> integrity."
>
>
> 导致作业卡住无法继续消费Kafka topic
>
>
> 请问这个场景如何排查比较好
>


Re: flink背压问题

2020-04-28 文章 LakeShen
Hi 阿华,

数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。

可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。

Best,
LakeShen


阿华田  于2020年4月29日周三 上午9:21写道:

> 好的 感谢大佬
>
>
>
> | |
> 王志华
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年04月29日 09:08,zhisheng 写道:
> hi,
>
> 数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度
> 10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。
>
> 1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样);
>
> 2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的?
>
> 3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况
>
> Best !
>
> zhisheng
>
> 阿华田  于2020年4月28日周二 上午9:37写道:
>
> 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?
>
>
> | |
> 王志华
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制
>
>
>


Re: Flink on k8s ,设置 taskmanager.heap.mb 对于 jvm 启动堆大小不生效

2020-04-24 文章 LakeShen
好的 Xintong,我测试一波,非常感谢你的帮助 

Xintong Song  于2020年4月24日周五 上午11:48写道:

> 抱歉,我刚刚说的是 docker-compose.yaml 是只用 docker 不用 kubernetes 的情况。
>
> 对于 kubernetes,如果你是按照官方文档[1]推荐的方法部署 flink 的,那么直接把这个参数加在
> taskmanager-deployment.yaml 的 args 处应该就可以了。
>
> > args:
>
> - taskmanager
>
> *- Dtaskmanager.heap.size=2000m*
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html
>
>
>
> On Fri, Apr 24, 2020 at 11:10 AM LakeShen 
> wrote:
>
> > Hi Xintong,
> >
> > 非常感谢你的回复。想再请教一个问题,什么地方会使用到 docker-compose.yaml  呢。
> >
> > 我目前使用一种绕开的方式解决这个问题,就是在 DockerFile 打镜像的时候,先把 conf 目录拷贝出来(这个里面的 TaskManger
> > 内存动态传入的)
> > 然后在 config.sh 中,强行设置了 FLINK-CONF-DIR . 但是我觉得你的方式更优雅一些。
> >
> > 所以想问一下 什么地方会使用到 docker-compose.yaml呢 。
> >
> > Best,
> > LakeShen
> >
> > Xintong Song  于2020年4月24日周五 上午10:49写道:
> >
> > > 应该没有其他地方去写 flink-conf.yaml,能把具体用来打镜像、动态写配置的命令或者脚本发一下吗?
> > >
> > > 另外你这个问题还有一种解决方案,是 taskmanager.heap.mb 通过 -D 参数传给 taskmanager.sh。可以在
> > > docker-compose.yaml 中 taskmanager command 处追加
> -Dtaskmanager.heap.mb=2000m
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Apr 23, 2020 at 5:59 PM LakeShen 
> > > wrote:
> > >
> > > > Hi 社区,
> > > >
> > > > 最近我在弄 Flink on k8s,使用的 Flink 版本为 Flink 1.6。作业模式为 standalone per job
> 模式。
> > > >
> > > > 我在创建启动 jobmanager 的时候,设置的 taskmanager.heap.mb 为 2000 mb,虽然在 flink web
> > ui
> > > > 上面看到的 jobmanager  的配置, taskmanager.heap.mb 的确是 2000mb,在我启动
> taskmanager
> > > > deployment 的时候,我登录到 其中一个 pod 上看,发现 taskmanager 启动的 -xms 和 -xmx 都是
> > 922mb。
> > > >
> > > > 我将 taskmanager.heap.mb 设置为 1000 mb,停止我的作业,重启,同样,登录到taskmanager 其中一个
> > > > pod,-xms 和 -xmx 都是 922mb,也就是说 设置的taskmanager.heap.mb 没有对 taskmanager
> > 启动的
> > > > jvm 堆没有生效。
> > > >
> > > > 我看了源码,flink on k8s ,standalone per job 模式,taskmanager 会使用
> > taskmanager.sh
> > > > 来启动。在 taskmanager.sh 中,taskmanager heap mb 是根据镜像中的,flink dist
> 目录下面,conf
> > > > 目录中的 flink-conf.yaml 里面的配置来启动。
> > > >
> > > > 我现在在打镜像的时候,也会把flink-dist 目录打进去,同样把 taskmanager.heap.mb动态传入到
> > > > flink-conf.yaml中,但是最终我在启动我的作业的时候,登录到 taskmanager 的一个 pod 上面查看,发现其
> > > > flink-conf.yaml 里面, taskmanager.heap.mb 始终是 1024.
> > > >
> > > > 是不是在什么地方,把 taskmanager.heap.mb 写死到了 flink-conf.yaml 中呢?
> > > >
> > > >
> > > > Best,
> > > > LakeShen
> > > >
> > >
> >
>


Re: Flink on k8s ,设置 taskmanager.heap.mb 对于 jvm 启动堆大小不生效

2020-04-23 文章 LakeShen
Hi Xintong,

非常感谢你的回复。想再请教一个问题,什么地方会使用到 docker-compose.yaml  呢。

我目前使用一种绕开的方式解决这个问题,就是在 DockerFile 打镜像的时候,先把 conf 目录拷贝出来(这个里面的 TaskManger
内存动态传入的)
然后在 config.sh 中,强行设置了 FLINK-CONF-DIR . 但是我觉得你的方式更优雅一些。

所以想问一下 什么地方会使用到 docker-compose.yaml呢 。

Best,
LakeShen

Xintong Song  于2020年4月24日周五 上午10:49写道:

> 应该没有其他地方去写 flink-conf.yaml,能把具体用来打镜像、动态写配置的命令或者脚本发一下吗?
>
> 另外你这个问题还有一种解决方案,是 taskmanager.heap.mb 通过 -D 参数传给 taskmanager.sh。可以在
> docker-compose.yaml 中 taskmanager command 处追加 -Dtaskmanager.heap.mb=2000m
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 23, 2020 at 5:59 PM LakeShen 
> wrote:
>
> > Hi 社区,
> >
> > 最近我在弄 Flink on k8s,使用的 Flink 版本为 Flink 1.6。作业模式为 standalone per job 模式。
> >
> > 我在创建启动 jobmanager 的时候,设置的 taskmanager.heap.mb 为 2000 mb,虽然在 flink web ui
> > 上面看到的 jobmanager  的配置, taskmanager.heap.mb 的确是 2000mb,在我启动 taskmanager
> > deployment 的时候,我登录到 其中一个 pod 上看,发现 taskmanager 启动的 -xms 和 -xmx 都是 922mb。
> >
> > 我将 taskmanager.heap.mb 设置为 1000 mb,停止我的作业,重启,同样,登录到taskmanager 其中一个
> > pod,-xms 和 -xmx 都是 922mb,也就是说 设置的taskmanager.heap.mb 没有对 taskmanager 启动的
> > jvm 堆没有生效。
> >
> > 我看了源码,flink on k8s ,standalone per job 模式,taskmanager 会使用 taskmanager.sh
> > 来启动。在 taskmanager.sh 中,taskmanager heap mb 是根据镜像中的,flink dist 目录下面,conf
> > 目录中的 flink-conf.yaml 里面的配置来启动。
> >
> > 我现在在打镜像的时候,也会把flink-dist 目录打进去,同样把 taskmanager.heap.mb动态传入到
> > flink-conf.yaml中,但是最终我在启动我的作业的时候,登录到 taskmanager 的一个 pod 上面查看,发现其
> > flink-conf.yaml 里面, taskmanager.heap.mb 始终是 1024.
> >
> > 是不是在什么地方,把 taskmanager.heap.mb 写死到了 flink-conf.yaml 中呢?
> >
> >
> > Best,
> > LakeShen
> >
>


Flink on k8s ,设置 taskmanager.heap.mb 对于 jvm 启动堆大小不生效

2020-04-23 文章 LakeShen
Hi 社区,

最近我在弄 Flink on k8s,使用的 Flink 版本为 Flink 1.6。作业模式为 standalone per job 模式。

我在创建启动 jobmanager 的时候,设置的 taskmanager.heap.mb 为 2000 mb,虽然在 flink web ui
上面看到的 jobmanager  的配置, taskmanager.heap.mb 的确是 2000mb,在我启动 taskmanager
deployment 的时候,我登录到 其中一个 pod 上看,发现 taskmanager 启动的 -xms 和 -xmx 都是 922mb。

我将 taskmanager.heap.mb 设置为 1000 mb,停止我的作业,重启,同样,登录到taskmanager 其中一个
pod,-xms 和 -xmx 都是 922mb,也就是说 设置的taskmanager.heap.mb 没有对 taskmanager 启动的
jvm 堆没有生效。

我看了源码,flink on k8s ,standalone per job 模式,taskmanager 会使用 taskmanager.sh
来启动。在 taskmanager.sh 中,taskmanager heap mb 是根据镜像中的,flink dist 目录下面,conf
目录中的 flink-conf.yaml 里面的配置来启动。

我现在在打镜像的时候,也会把flink-dist 目录打进去,同样把 taskmanager.heap.mb动态传入到
flink-conf.yaml中,但是最终我在启动我的作业的时候,登录到 taskmanager 的一个 pod 上面查看,发现其
flink-conf.yaml 里面, taskmanager.heap.mb 始终是 1024.

是不是在什么地方,把 taskmanager.heap.mb 写死到了 flink-conf.yaml 中呢?


Best,
LakeShen


Re: 关于状态TTL

2020-04-21 文章 LakeShen
社区版的 Planner 针对 Key 状态的清理,使用的 Timer 来进行清理。
1.9.1 Blink planner 最底层状态清理 还是使用的 StateTTLConfig 来进行清理(不是
Background),所以存在部分状态后面没读,
状态没有清理的情况

Benchao Li  于2020年4月21日周二 下午11:15写道:

> 我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
>
> 酷酷的浑蛋  于2020年4月21日周二 下午10:37写道:
>
> > hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> > java.lang.RuntimeException: Error while getting state
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:221)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:205)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.StateMigrationException: The new state
> > serializer cannot be incompatible.
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> > at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> > at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> > ... 10 more
> >
> >
> >
> >
> > 在2020年4月17日 15:27,酷酷的浑蛋 写道:
> > 好的,非常感谢您,我去按照您说的代码改下,非常感谢
> >
> >
> >
> >
> > 在2020年4月17日 15:17,Benchao Li 写道:
> > 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午3:09写道:
> >
> > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
> > tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> > Time.minutes(6));这种方式设置ttl
> >
> >
> >
> >
> > 在2020年4月17日 14:54,Benchao Li 写道:
> > 嗯,blink planner跟legacy planner是有一些实现上的差异。
> > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
> >
> > static StateTtlConfig createTtlConfig(long retentionTime, boolean
> > stateCleaningEnabled) {
> > if (stateCleaningEnabled) {
> > checkArgument(retentionTime > 0);
> > return StateTtlConfig
> > .newBuilder(Time.milliseconds(retentionTime))
> > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > .cleanupInBackground() // added this line
> > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > // changed this line
> > .build();
> > } else {
> > return StateTtlConfig.DISABLED;
> > }
> > }
> >
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午2:47写道:
> >
> > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
> >
> >
> >
> >
> > 在2020年4月17日 14:16,Benchao Li 写道:
> > 这是两个问题,
> >
> > - 状态只访问一次,可能不会清理。
> >
> >
> >
> >
> >
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> > - 状态已经过期了,但是会被使用到。
> > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16581
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午2:06写道:
> >
> > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
> >
> >
> >
> >
> > 在2020年4月17日 13:07,Benchao Li 写道:
> > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> > 所以这个问题现在是不能完全避免了。
> > 我已经建了一个jira[1]来跟踪和改进这一点。
> >
> > [1] 

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-15 文章 LakeShen
Hi tao wang,

你可以在你的 flink-conf.yaml 里面配置 Checkpoint 的目录,就像楼上 Yangze 所说

state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/

Best,
LakeShen

Yangze Guo  于2020年4月15日周三 下午2:44写道:

> checkpoint的目录设置key为state.checkpoints.dir
>
> 你可以这样设置
> state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints/
>
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 1:45 PM tao wang  wrote:
> >
> > 现在有个场景, 集群上上了一些比较重要的任务,为了提高这些任务的性能,为它提供了一个专有的hdfs 集群用来存储checkpoint 。
> >
> > 但是现在不知道怎么配置才能让支持任务打到外部的hdfs集群。
> >
> > 谢谢!!
>


Re: flink反压问题求助

2020-04-12 文章 LakeShen
Hi Junzhong ,

图片没有显示,能否把图片重新上传一下。

Best,
LakeShen

Junzhong Qin  于2020年4月11日周六 上午10:38写道:

> 在跑Flink任务时,遇到了operator反压问题,任务执行图如下,source(读Kafka),
> KeyBy(抽取数据字段供keyBy操作使用),Parser(业务处理逻辑),Sink(写Kafka),除了KeyBy->Parser使用hash(KeyBy操作)链接,其他都使用RESCALE链接。(并发度仅供参考,这个是解决问题后的并发度,最初的并发度为
> 500->1000->3000->500)
> [image: image.png]
> 相关metric
> [image: image.png]
> [image: image.png]
> 为了解决反压问题做的处理:
> 1. 增大Parse并发,KeyByOpe.buffers.outPoolUsage 上升速率有减缓,多次加并发依然没有解决
> 2. 优化Parse逻辑,减少CPU使用,效果不明显
> 3. 将Parse里的一些数据过滤逻辑移到KeyBy operator里面,效果不明显
> 最后猜测可能是KeyBy operator并发大和Parse链接hash操作占用NetWork资源过多导致反压,于是减少KeBy
> operator的并发度,发现解决问题。但是想请教一下这个操作解决这个问题的具体原因。
>
> 谢谢!
>


Re: 回复:fink新增计算逻辑时kafka从头开始追平消费记录

2020-04-07 文章 LakeShen
Hi 苟刚,

Flink 任务中,如果开启 Checkpoint 的话,会在每次Checkpoint
完成后,提交偏移量。如果没有开启的话,就是根据自动提交来提交偏移量,默认是开启的,间隔是 5 s.
至于你说每次都是重头开始的,我个人的想法是不是在代码中设置了从最早开始消费,也就是 你使用到了这个方法:setStartFromEarliest[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

Best,
LakeShen

gang.gou  于2020年4月7日周二 下午4:17写道:

> 好的,我试一下,有结果了同步大家,谢谢!
>
> 在 2020/4/7 下午3:52,“Evan” 163@flink.apache.org 代表 chengyanan1...@foxmail.com> 写入:
>
> 之前的代码好像乱码了,我设置了一下,重新发一下,建议你
> 在获取consumer之后,再设置一下consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka
> Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6
>
>
>
> /**
>   * @param env
>   * @param topic
>   * @param time 订阅的时间
>   * @return
>   * @throws IllegalAccessException
>   */
>   public static DataStreamSource buildSource(StreamExecutionEnvironment env, String topic, Long time) throws
> IllegalAccessException {
> ParameterTool parameterTool =
> (ParameterTool) env.getConfig().getGlobalJobParameters();
> Properties props =
> buildKafkaProps(parameterTool);
>
> FlinkKafkaConsumer011 FlinkKafkaConsumer011<(
> topic,
> new
> MetricSchema(),
> props);
>
>
>consumer.setStartFromLatest();
>
>
> consumer.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor   @Override
>   public long
> extractAscendingTimestamp(XlogStreamBasicBean element) {
> if (element ==
> null || element.getTimestamp() == null) {
>  
> return System.currentTimeMillis();
> }
> return
> element.getTimestamp() - 1;
>   }
> });
> return env.addSource(consumer);
>   }
>
>
> }
>
>
>
>
>
> --原始邮件--
> 发件人:"苟刚" 发送时间:2020年4月7日(星期二) 中午11:27
> 收件人:"user-zh"
> 主题:fink新增计算逻辑时kafka从头开始追平消费记录
>
>
>
> Hello,
>
>  
> 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>  我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>
>
> flink版本:1.6.3
>
> 部分代码如下:
>
> public static void main(String[] args) throws Exception {
> final ParameterTool parameterTool =
> ExecutionEnvUtil.createParameterTool(args);
>  StreamExecutionEnvironment env =
> ExecutionEnvUtil.prepare(parameterTool);
>
>  DataStreamSource KafkaTools.buildSource(env);
> // 处理timing数据
> processTimingData(parameterTool, data);
> // 处理front error数据
> processFrontErrorData(parameterTool, data);
> // 处理img error数据
> processImgLoadErrorData(parameterTool, data);
>  env.execute("xlog compute");
> }
>
>
>
>
> kafka的连接参数配置:
> public static Properties buildKafkaProps(ParameterTool parameterTool) {
>  Properties props = parameterTool.getProperties();
>  props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
> DEFAULT_KAFKA_BROKERS));
>  props.put("zookeeper.connect",
> parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
> DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>  props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
> DEFAULT_KAFKA_GROUP_ID));
>  props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>  props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>  props.put("auto.offset.reset", "latest");
> return props;
> }
>
>
>
>
>
>
>
> --
>
> Best Wishes
>  Galen.K
>
>
>


Re: Flink 读取 Kafka 多个 Partition 问题,

2020-04-02 文章 LakeShen
Hi Qijun,

看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。

Best,
LakeShen

Qijun Feng  于2020年4月2日周四 下午5:44写道:

> Dear All,
>
> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>  现在改成了所有地址,也换了 group.id
>
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> 10.216.77.170:9092,10.216.77.188:9092");
> properties.setProperty("group.id", "behavior-logs-aggregator");
>
> FlinkKafkaConsumer010 kafkaConsumer010 =
>new FlinkKafkaConsumer010("behavior-logs_dev", new
> BehaviorLogDeserializationSchema(), properties);
> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>
> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2
> 的,
>
> 2020-04-02 14:54:58,532 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Consumer subtask 0 creating fetcher with offsets
> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>
>
> 是哪里有问题吗?
>
>


Re: 从savepoint不能恢复问题

2020-04-02 文章 LakeShen
Hi ,

这种情况可能是你改变的 Flink SQL 的拓扑结构,导致部分算子的 uid 发生变化,然后在从状态恢复的时候,没有找到算子的状态。
所以在开发 SQL 任务的时候,一般更改 SQL 代码时,不要改变其拓扑结构,SQL 任务上线后,就不要在轻意改了。

Best,
LakeShen

酷酷的浑蛋  于2020年4月2日周四 下午6:31写道:

> 关键我的程序是flink-sql,其它的算子基本都设置过uid了,flink-sql可以设置uid吗,或者说sql中的自动分配的uid怎么查找呢
>
>
> | |
> apache22
> |
> |
> apach...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年4月2日 18:22,Yangze Guo 写道:
> 如果没有显示指定的话,operator id将是一个随机生成的值[1].
> 当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator
> id字面量。[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq
>
> Best,
> Yangze Guo
>
> On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋  wrote:
>
>
>
> Failed to rollback to checkpoint/savepoint
> hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint
> state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program,
> because the operator is not available in the new program. If you want to
> allow to skip this, you can set the --allowNonRestoredState option on the
> CLI.
>
>
> 像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 去找是我程序中的哪个算子不能恢复呢?
> | |
> apache22
> |
> |
> apach...@163.com
> |
> 签名由网易邮箱大师定制
>


Question about the flink 1.6 memory config

2020-03-31 文章 LakeShen
Hi community,

Now I am optimizing the flink 1.6 task memory configuration. I see the
source code, at first, the flink task config the cut-off memory, cut-off
memory = Math.max(600,containerized.heap-cutoff-ratio  * TaskManager
Memory), containerized.heap-cutoff-ratio default value is 0.25. For
example, if TaskManager Memory is 4G, cut-off memory is 1 G.

However, I set the taskmanager's gc.log, I find the  metaspace only used 60
MB. I personally feel that the memory configuration of cut-off is a little
too large. Can this cut-off memory configuration be reduced, like making
the containerized.heap-cutoff-ratio be 0.15.
Is there any problem for this config?

I am looking forward to your reply.

Best wishes,
LakeShen


Re: Flink 1.10: 关于 RocksDBStateBackend In background 状态清理机制的问题

2020-03-29 文章 LakeShen
嗯嗯,非常感谢你的回答,Congxian Qiu  。

Congxian Qiu  于2020年3月28日周六 上午11:39写道:

> Hi
>
> 这个地方我理解是,每次处理一定数量的 StateEntry 之后,会获取当前的 timestamp 然后在 RocksDB 的 compaction
> 时对所有的 StateEntry 进行 filter。
> > Calling of TTL filter during compaction slows it down.
>
> Best,
> Congxian
>
>
> LakeShen  于2020年3月26日周四 下午8:55写道:
>
> > Hi 社区的小伙伴,
> >
> > 我现在有个关于 Flink 1.10 RocksDBStateBackend 状态清理机制的问题,在 1.10中,RocksDB 默认使用 in
> > background 方式进行状态清理,使用 compaction filter 方式。正如官方文档所说:
> >
> > > RocksDB compaction filter will query current timestamp, used to check
> > > expiration, from Flink every time after processing certain number of
> > state
> > > entries.
> >
> >
> > 现在有个疑问,RocksDB 在处理一定数量的 State Entrys 就会进行 compaction filter,那么这个
> compaction
> > filter 是针对这一定数量 State Entrys ,然后检查他们是否过期吗?
> > 还是说,会针对一个 Task 当前所有的状态文件,统一进行 Compaction filter,在合并时,检查每个 entry,过期的状态 Key
> > 就过滤删除掉。
> >
> > 这个地方我没有弄明白,非常期待你的回复。
> >
> > Best wishes,
> > 沈磊
> >
>


Re: Re: flinksql如何控制结果输出的频率

2020-03-29 文章 LakeShen
哈哈,学习了,Benchao,

Benchao Li  于2020年3月28日周六 下午11:26写道:

> Hi,
>
> 这个输出是retract的是by design的,你可以自己改造下sink,来输出你想要的结果。
> fast
> emit是按照处理时间来提前输出的。比如某个key下面来了第一条数据之后,开始设置一个固定周期的定时,如果下个周期聚合结果有发生变化,则输出。
>
> flink小猪 <18579099...@163.com> 于2020年3月28日周六 下午8:25写道:
>
> >
> >
> >
> > 感谢您的回复,我试了一下,的确通过您说的这种方式,可以得到一个retract流的数据。换一个场景
> > 我需要每小时计算当天的交易额(例如两点到了,我应该输出从0点到2点的总交易额)我想获得如下结果:
> > 2020-03-28T01:00 100
> > 2020-03-28T02:00 280
> > 
> > 2020-03-28T23:00 18000
> > 2020-03-28T00:00 19520
> > 2020-03-29T01:00 120
> > 2020-03-29T01:00 230
> > 我应该获得是一个不断append的数据流,而不是retract数据流。
> > 并且设置提前发射的事件,flink应该是选取的处理时间而不是事件时间?
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-03-27 15:23:39,"Benchao Li"  写道:
> > >Hi,
> > >
> > >对于第二个场景,可以尝试一下fast emit:
> > >table.exec.emit.early-fire.enabled = true
> > >table.exec.emit.early-fire.delay = 5min
> > >
> > >PS:
> > >1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> > >2. window加了emit之后,会由原来输出append结果变成输出retract结果
> > >
> > >Jingsong Li  于2020年3月27日周五 下午2:51写道:
> > >
> > >> Hi,
> > >>
> > >> For #1:
> > >> 创建级联的两级window:
> > >> - 1分钟窗口
> > >> - 5分钟窗口,计算只是保存数据,发送明细数据结果
> > >>
> > >> Best,
> > >> Jingsong Lee
> > >>
> > >
> > >
> > >--
> > >
> > >Benchao Li
> > >School of Electronics Engineering and Computer Science, Peking
> University
> > >Tel:+86-15650713730
> > >Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 LakeShen
统一对 Flink 项目源码进行编译打包,你会在 flink-dist 这个模块下面的 target 目录下面看到相关 Flink
命令行的一些东西,同时在lib 包下面,
会有一些 Flink Jar 包

Best wishes,
沈磊

godfrey he  于2020年3月26日周四 下午8:51写道:

> 目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala
> 2.11 的包,例如  flink-table-blink_*2.11*-1.10.0.jar。
> 可以通过  -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12*
> -1.10.0.jar  这样的。
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年3月26日周四
> 下午6:34写道:
>
> >
> > flink-table-uber-blink 下
> >  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
> >
> > 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> > Sender: Kurt Young
> > Send Time: 2020-03-26 18:15
> > Receiver: user-zh
> > cc: jihongchao
> > Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
> > flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > >
> > > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> > > 这个 jar 是从哪里 build 出来的呢?
> > >
> > > 我 clone github 上的源代码,mvn clean package
> > > 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> > > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
> > >  flink-table-blink_2.12-1.10.0.jar  是对应的
> > > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
> > >
> > > 谢谢,
> > > 王磊
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > >
> >
>


Flink 1.10: 关于 RocksDBStateBackend In background 状态清理机制的问题

2020-03-26 文章 LakeShen
Hi 社区的小伙伴,

我现在有个关于 Flink 1.10 RocksDBStateBackend 状态清理机制的问题,在 1.10中,RocksDB 默认使用 in
background 方式进行状态清理,使用 compaction filter 方式。正如官方文档所说:

> RocksDB compaction filter will query current timestamp, used to check
> expiration, from Flink every time after processing certain number of state
> entries.


现在有个疑问,RocksDB 在处理一定数量的 State Entrys 就会进行 compaction filter,那么这个 compaction
filter 是针对这一定数量 State Entrys ,然后检查他们是否过期吗?
还是说,会针对一个 Task 当前所有的状态文件,统一进行 Compaction filter,在合并时,检查每个 entry,过期的状态 Key
就过滤删除掉。

这个地方我没有弄明白,非常期待你的回复。

Best wishes,
沈磊


Re: Flink1.10执行sql超出内存限制被yarn杀掉

2020-03-23 文章 LakeShen
Hi farron ,

能否在详细描述一下你的 SQL 的逻辑



faaron zheng  于2020年3月23日周一 下午10:12写道:

>
> 大家好,我在用flink1.10执行sql时,当数据比较大的时候,3T左右,100多亿条数据,在执行hash和sort的时候经常超出内存限制,被yarn杀掉,我的tm给了40g内存,每个有10个slot,每个slot3g内存。我也试过给更大的内存,但是没什么效果。不知道这是什么原因?
>
>
>
>


Re: Re: Flink SQL中关于chk删除和保留的疑问

2020-03-20 文章 LakeShen
Hi amenhub,

是的,yarn kill 我个人不建议。当真的出现这种假死的情况,任务每次cancel的时候,cancel 执行完 , 可以在判断一下 Yarn
上任务的状态,如果没有停止,加个重试机制。

yarn kill 是终极保底方案。记得在使用 cancel 的 rest api 时,因为这个请求返回
202状态码,表示服务端接收到请求,但并不一定开始执行。这个地方需要注意一下。

由于触发 Savepoint 可能失败,stop with savepoint 或者 cancel with savepoint
有可能失败,这个地方也需要注意一下。

当然,如果你有其他好的方式,也欢迎一起讨论。

Best wishes,
沈磊

amen...@163.com  于2020年3月20日周五 下午4:33写道:

> hi 沈磊,
>
> 感谢耐心及专业的解惑~
>
> 如你所说,yarn kill任务不会自动清理保存在hdfs上的chk,是不是可以理解为根本不建议以yarn
> kill方式停止任务,类似于暴力停止的感觉,如果是这样的话,以stop或cancel方式停止任务时,如果出现在yarn
> ui上监控到的application假死,这种情况下岂不是需要再做一套监控,以监测二者状态是否同步。不过我不太了解在SQL环境中停止任务时,是如何与yarn进行交互反馈的,仍需努力学习,不过感激不尽,谢谢
>
> Best,
> amenhub
>
>
>
> amen...@163.com
>
> 发件人: LakeShen
> 发送时间: 2020-03-20 16:18
> 收件人: user-zh
> 主题: Re: Flink SQL中关于chk删除和保留的疑问
> Hi amenhub,
>
> 我这边针对你这三个问题回答一下:
>
> 1. 官网描述的[when program is cancelled],这个cancelled是指除故障导致停止之外的一切任务停止方式吗?
> 这里的 canceled 主要是针对你使用 cancel 或者 cancel with savepoint 来停止作业,你可以通过命令行或者
> REST API 来调用。
>
> 2.如果是,在flink on yarn模式下,停止任务时采取 yarn application -kill [yarnAppId]
> 形式停止flink任务,chk似乎没有自动删除?
> 是的,如果你直接 yarn kill 你的 Flink 任务,Flink 任务的 Checkpoint 文件不会进行清理,还是会保留在 HDFS
> 上面。
>
> 3. 假如不能删除,是否需要像trigger savepoint那样采用 stop 命令进行优雅的停止,chk才会自动删除?
> 目前 FLIP-34 建议用户如果需要停止作业的时候,触发 Savepoint,可以使用 stop 命令来停止。如果你不需要触发
> Savepoint,也可以使用 cancel. 具体信息可以参考下列文档:
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html#stop
>
> Best wishes,
> 沈磊
>
> amen...@163.com  于2020年3月20日周五 下午3:35写道:
>
> > hi, everyone
> >
> > 官网描述当flink program is cancelled,此时的flink默认会删除chk。
> >
> > 我的疑问是:
> > 1.官网描述的[when program is cancelled],这个cancelled是指除故障导致停止之外的一切任务停止方式吗?
> > 2.如果是,在flink on yarn模式下,停止任务时采取 yarn application -kill [yarnAppId]
> > 形式停止flink任务,chk似乎没有自动删除?
> > 3.假如不能删除,是否需要像trigger savepoint那样采用 stop 命令进行优雅的停止,chk才会自动删除?
> > 注:我的flink程序通过SQL提交,以on yarn模式运行。
> > Best
> > amenhub
> >
> >
> >
> > amen...@163.com
> >
>


Re: Flink SQL中关于chk删除和保留的疑问

2020-03-20 文章 LakeShen
Hi amenhub,

我这边针对你这三个问题回答一下:

1. 官网描述的[when program is cancelled],这个cancelled是指除故障导致停止之外的一切任务停止方式吗?
 这里的 canceled 主要是针对你使用 cancel 或者 cancel with savepoint 来停止作业,你可以通过命令行或者
REST API 来调用。

2.如果是,在flink on yarn模式下,停止任务时采取 yarn application -kill [yarnAppId]
形式停止flink任务,chk似乎没有自动删除?
 是的,如果你直接 yarn kill 你的 Flink 任务,Flink 任务的 Checkpoint 文件不会进行清理,还是会保留在 HDFS
上面。

3. 假如不能删除,是否需要像trigger savepoint那样采用 stop 命令进行优雅的停止,chk才会自动删除?
 目前 FLIP-34 建议用户如果需要停止作业的时候,触发 Savepoint,可以使用 stop 命令来停止。如果你不需要触发
Savepoint,也可以使用 cancel. 具体信息可以参考下列文档:

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html#stop

Best wishes,
沈磊

amen...@163.com  于2020年3月20日周五 下午3:35写道:

> hi, everyone
>
> 官网描述当flink program is cancelled,此时的flink默认会删除chk。
>
> 我的疑问是:
> 1.官网描述的[when program is cancelled],这个cancelled是指除故障导致停止之外的一切任务停止方式吗?
> 2.如果是,在flink on yarn模式下,停止任务时采取 yarn application -kill [yarnAppId]
> 形式停止flink任务,chk似乎没有自动删除?
> 3.假如不能删除,是否需要像trigger savepoint那样采用 stop 命令进行优雅的停止,chk才会自动删除?
> 注:我的flink程序通过SQL提交,以on yarn模式运行。
> Best
> amenhub
>
>
>
> amen...@163.com
>


Re: flink sql 去重算法

2020-03-19 文章 LakeShen
Hi zhisheng,

我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。
比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置),
总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。

对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。
Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用
Compaction Filter 算法来清理。

第二个就是使用增量 Checkpoint 方式吧。

Best wishes,
LakeShen



lucas.wu  于2020年3月20日周五 上午11:50写道:

> 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。
>
>
> 原始邮件
> 发件人:zhishengzhisheng2...@gmail.com
> 收件人:user-zhuser...@flink.apache.org
> 发送时间:2020年3月20日(周五) 11:44
> 主题:Re: flink sql 去重算法
>
>
> hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state
> 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS
> 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration ">
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li libenc...@gmail.com 于2020年3月20日周五
> 上午9:50写道:  Hi hiliuxg,   count distinct 用的MapVIew来做的去重:
> 在batch场景下,MapView的底层实现就是HashMap;
> 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。
>  hiliuxg 736742...@qq.com 于2020年3月19日周四 下午11:31写道:hi all:   请问flink
> sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ?
>  还是简单通过java的set容器去重的呢? --   Benchao Li  School of Electronics
> Engineering and Computer Science, Peking University  Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


The question about the FLIP-45

2020-03-19 文章 LakeShen
Hi community,

Now I am reading the FLIP-45 Reinforce Job Stop Semantic, I have three
questions about it :
1. What the command to use to stop the Flink task, stop or cancel?

2. If use stop command to stop filnk task , but I see the flink source code
, the stop command we can set the savepoint dir , if we didn't set it , the
default savepoint dir will use . Both the target Savepoint  Dir or default
savepoint dir are null , the flink will throw the exception. But in FLIP-45
, If retained checkpoint is enabled, we should always do a checkpoint when
stopping job. I can't find this code.

Thanks to your reply.

Best regards,
LakeShen


Flink Weekly | 每周社区动态更新 - 2020/03/18

2020-03-17 文章 LakeShen
大家好,本文为 Flink Weekly 的第九期,由沈磊(LakeShen)整理,主要内容包括:近期社区开发进展,邮件问题答疑以及 Flink
中文社区相关技术博客的分享。

社区开发进展
[Table API & SQL] Jingsong Li 发起 FLIP-115 的讨论,主要在 Flink Table 支持 FileSystem
Connector,FLIP-115 主要内容包括:
1. 在 Flink Table 中支持 FileSystem Table Factory,同时支持csv/parquet/orc/json/avro
格式。

2. 支持在流应用或者 Flink On Hive 中数据输出。

更多信息请参考:

[1
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-115-Filesystem-connector-in-Table-td33625.html

[RunTime / Configuration]  Andrey 发起 FLIP-116 统一的 JobManager 的内存配置的讨论,在
FLIP-49中,我们针对 TaskManager进行统一的内存管理和配置,在 Flink 1.10 版本中 release 该功能。为了让
JobManager 的内存模型和配置保持对齐,同时针对用户代码 native non-direct memory 的使用,在FLIP-116
中都进行了详细说明。

更多信息请参考:

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP+116%3A+Unified+Memory+Configuration+for+Job+Managers

[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
[Connectors / HBase] Flavio 发起了 FLIP-117 HBase Catalog的讨论,该 FLIP 主要讨论
HBaseCatalog 的实现。

更多信息参考:

[6]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-117%3A+HBase+catalog

[7]https://issues.apache.org/jira/browse/FLINK-16575
Yu Li 发起了 Releasing Flink 1.10.1 的相关讨论。

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html

为了让大家了解到 Flink  1.11 的相关特性,Zhijiang 发起了 Flink 1.11
特性的讨论,大家有什么想法或者期待可以在下面邮件中进行相关回复。

[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-of-Apache-Flink-1-11-td38724.html#a38793





用户问题
shravan 社区提问:当 k8s 集群突然 down 掉时,如何优雅的通过 stop with savepoint 停止作业,Vijay
进行了解答。

[10]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stop-job-with-savepoint-during-graceful-shutdown-on-a-k8s-cluster-td33626.html
Alexander 使用 Flink 1.10 ,对于 Mesos 容器内存配置方面遇到一些问题,Yangze Guo进行了详细解答。[11]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-10-container-memory-configuration-with-Mesos-td33594.htmlwanglei2
询问了在  Flink SQL 任务中,如何设置状态后端,以及在 SQL Client 中,维表 Join 任务没有运行问题。Jingsong Li
和 Zhenghua Gao 分别进行了详细的回答。

[12]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-set-stateBackEnd-in-flink-sql-program-td33590.html

[13]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/dimention-table-join-not-work-under-sql-client-fink-1-10-0-td33616.html
Yuval 提问了关于 Flink 如何从增量的 Checkpoint 状态回复的一些问题,Andrey 进行了详细的解答。

[14]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Restoring-state-from-an-incremental-RocksDB-checkpoint-td33630.html

Eyal 遇到了一些 Flink On Yarn 方面的日志打印配置问题,社区同学进行了解答。

[15]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Setting-app-Flink-logger-td33537.html
Flavio 在社区提问了关于 Alink 和 Flink ML 的问题,感兴趣的可以看一下。

[16]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Alink-and-Flink-ML-td2.html
LakeShen 询问了关于从 Checkppoint 状态文件恢复,是否能够改变算子的并发的问题。对于 Checkpoint
的状态文件,当任务从其开始恢复时,可以调整算法的并发度,只要不要修改算子最大的并发度就行。

[17]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cancel-the-flink-task-and-restore-from-checkpoint-can-I-change-the-flink-operator-s-parallelism-td33613.html
karl 在社区提问了关于 Flink Session 窗口的状态 TTL 问题。

[18]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Session-Windows-State-TTL-td33349.html



活动/博客文章/其他

SQL 开发任务超 50%,滴滴实时计算的演进和优化

[19]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486354=1=02a55f72b950cac757fafd79e10b39a3=fd3b85d0ca4c0cc68c558aff88c4f207dbd55239c931b339f5cc0de6681923a20542f035609b=1=1=_sharetime=1584422047859_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

Flink 如何支持特征工程、在线学习、在线预测等 AI 场景?

[20]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486319=1=d095fd58dd0a447f41b616279de1983e=fd3b852dca4c0c3be3f9a7cc0b750f57ab17bda7079d2da52d54c3f0ea9e84e42a848ed90167=1=1=_sharetime=1584422074948_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

一行配置作业性能提升53%!Flink SQL 性能之旅

[21]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486278=1=9b90e3dff21d7e973b9e40777381d032=fd3b8504ca4c0c121efd09bf3823b4f856529ab2911f09035ddffbd08fcf8d6369b28b46642e=1=1=_sharetime=1584422086156_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

有赞实时任务优化:Flink Checkpoint 异常解析与应用实践

[22]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486257=1=22484a15929de46cf5e8a68aadf50875=fd3b8573ca4c0c650c34deb00c12e5130af471739364a95a81b9091690b09eb8094b49bb79bf=1=1=_sharetime=1584422097105_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect


Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 文章 LakeShen
Hi community ,

I see the flink RocksDBStateBackend state cleanup,now the code like this :

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();



> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.


What's the meaning of  1000 entries? 1000 different key ?

Thanks to your reply.

Best regards,
LakeShen


Re: How to change the flink web-ui jobServer?

2020-03-14 文章 LakeShen
Ok, thanks! Arvid

Arvid Heise  于2020年3月10日周二 下午4:14写道:

> Hi LakeShen,
>
> you can change the port with
>
> conf.setInteger(RestOptions.PORT, 8082);
>
> or if want to be on the safe side specify a range
>
> conf.setString(RestOptions.BIND_PORT, "8081-8099");
>
>
> On Mon, Mar 9, 2020 at 10:47 AM LakeShen 
> wrote:
>
>> Hi community,
>>now I am moving the flink job to k8s,and I plan to use the ingress
>> to show the flink web ui  , the problem is that fink job server aren't
>> correct, so I want to change the flink web-ui jobserver ,I don't find the
>> any method  to change it ,are there some method to do that?
>>Thanks to your reply.
>>
>> Best wishes,
>> LakeShen
>>
>


Re: ddl

2020-03-13 文章 LakeShen
Hi 志华,
 你可以完全自己扩展 Flink SQL DDL 语法的功能,用来支持你们公司自己的实时数据源,或者 Sink
等等,具体实现,请参考楼上 jinhai
的链接

Best wishes,
沈磊

jinhai wang  于2020年3月13日周五 下午7:22写道:

> Page on “User-defined Sources & Sinks”. For flink 1.10:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
> >
>
> Best Regards
>
> jinhai...@gmail.com
>
> > 2020年3月13日 下午7:17,王志华  写道:
> >
> > 目前FLINK中对于DDL这块,它都只能什么类型的技术作为源头表或者SINK 表呢,我也网上也仅仅看到了ddl mysql sink、ddl
> hbase sink等。还有其他类型的支持吗?如果不支持的话,是否flink开放了相关的接口,可以提供对其他类型技术的ddl语法支持呢?比如想做一个
> ddl kudu sink之类的
> >
> >
> > | |
> > 王志华
> > |
> > |
> > a15733178...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
>
>


Re: Flink 1.10 StopWithSavepoint only suit for the sources that implement the StoppableFunction interface?

2020-03-12 文章 LakeShen
Thanks a lot!, tison

tison  于2020年3月12日周四 下午5:56写道:

> The StoppableFunction is gone.
>
> See also https://issues.apache.org/jira/browse/FLINK-11889
>
> Best,
> tison.
>
>
> LakeShen  于2020年3月12日周四 下午5:44写道:
>
>> Hi community,
>> now  I am seeing the FLIP-45 , as I see the stop command only
>> suit for the sources that implement the StoppableFunction interface.
>> So I have a question is that if I use StopWithSavepoint command
>> to stop my flink task , just like './flink stop -p xxx ...', this
>> command only suit for the sources that implement the StoppableFunction
>> interface, is it correct?
>> Thanks to your reply.
>>
>> Best wishes,
>> LakeShen
>>
>


Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-12 文章 LakeShen
Hi community,
  I have a question is that I cancel the flink task and retain the
checkpoint dir, then restore from the checkpoint dir ,can I change the
flink operator's parallelism,in my thoughts, I think I can't change the
flink operator's parallelism,but I am not sure.
 Thanks to your reply.

Best wishes,
LakeShen


Flink 1.10 StopWithSavepoint only suit for the sources that implement the StoppableFunction interface?

2020-03-12 文章 LakeShen
Hi community,
now  I am seeing the FLIP-45 , as I see the stop command only suit
for the sources that implement the StoppableFunction interface.
So I have a question is that if I use StopWithSavepoint command to
stop my flink task , just like './flink stop -p xxx ...', this command only
suit for the sources that implement the StoppableFunction interface, is it
correct?
Thanks to your reply.

Best wishes,
LakeShen


How to change the flink web-ui jobServer?

2020-03-09 文章 LakeShen
Hi community,
   now I am moving the flink job to k8s,and I plan to use the ingress
to show the flink web ui  , the problem is that fink job server aren't
correct, so I want to change the flink web-ui jobserver ,I don't find the
any method  to change it ,are there some method to do that?
   Thanks to your reply.

Best wishes,
LakeShen


Re: Flink Web UI display nothing in k8s when use ingress

2020-03-03 文章 LakeShen
In my thought , I think I should config the correct flink jobserver for
flink task

LakeShen  于2020年3月4日周三 下午2:07写道:

> Hi community,
> now we plan to move all flink tasks to k8s cluster. For one flink
> task , we want to see this flink task web ui . First , we create the k8s
> Service to expose 8081 port of jobmanager, then we use ingress controller
> so that we can see it outside.But the flink web like this :
>
> [image: image.png]
>
> The flink web ui images and other info not display , what can I do to
> display flink web info ?
> Thanks to your replay.
>


Flink Web UI display nothing in k8s when use ingress

2020-03-03 文章 LakeShen
Hi community,
now we plan to move all flink tasks to k8s cluster. For one flink
task , we want to see this flink task web ui . First , we create the k8s
Service to expose 8081 port of jobmanager, then we use ingress controller
so that we can see it outside.But the flink web like this :

[image: image.png]

The flink web ui images and other info not display , what can I do to
display flink web info ?
Thanks to your replay.


Flink 1.6, increment Checkpoint, the shared dir stored the last year checkpoint state

2020-01-19 文章 LakeShen
Hi community,
now I have a flink sql job, and I set the flink sql sate retention
time, there are three dir in flink checkpoint dir :
1. chk -xx dir
2. shared dir
3. taskowned dir

I find the shared dir store the last year checkpoint state,the only reason
I thought is that the latest
checkpoint retain reference of last year checkpoint state file.
Are there any other reason to lead this? Or is it a bug?

Thanks to your replay.

Best wishes,
Lake Shen


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 LakeShen
另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢?

LakeShen  于2020年1月19日周日 下午4:30写道:

> 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
> 1. chk-id 的目录
> 2. shared 目录,其中状态非常大
> 3. taskowned
>
> 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
> chk-id目录。
> 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
> 占用的存储不大。下面是相关文件目录的大小:
> 1.3 Mhdfs:xxx/chk-94794
> 1.1 Thdfs:xxx/shared
> 0hdfs:xxx/taskowned
> 如果有什么理解错误,请指出,非常感谢。
>
> 祝好,
> 沈磊
>
> Yun Tang  于2020年1月19日周日 下午4:11写道:
>
>> Hi
>>
>> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
>> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
>> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
>> 开头的目录。
>>
>> 祝好
>> 唐云
>>
>> 
>> From: LakeShen 
>> Sent: Sunday, January 19, 2020 15:42
>> To: user-zh@flink.apache.org 
>> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>>
>> LakeShen  于2020年1月19日周日 下午3:30写道:
>>
>> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
>> > Savepoint 类似,如果不清理,就永久保留。
>> > 非常感谢
>> >
>> >
>> > Yun Tang  于2020年1月19日周日 下午2:06写道:
>> >
>> >> Hi
>> >>
>> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> >>
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>> >>
>> >> 希望这些解释能解答你的困惑
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> >> [2]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>> >>
>> >> 祝好
>> >> 唐云
>> >>
>> >>
>> >>
>> >>
>> >> 
>> >> From: LakeShen 
>> >> Sent: Friday, January 17, 2020 16:28
>> >> To: user-zh@flink.apache.org 
>> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>> >>
>> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
>> state.checkpoints.num-retained的数值时,会对之前的完成的
>> >> Checkpoint 状态做清理。
>> >>
>> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> >> Checkpoint 超时失败。
>> >>
>> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
>> state.checkpoints.num-retained又为1,完成的
>> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>> >>
>> >> 希望有大佬能帮我解惑,非常感谢
>> >>
>> >
>>
>


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 LakeShen
非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
1. chk-id 的目录
2. shared 目录,其中状态非常大
3. taskowned

我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
chk-id目录。
如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
占用的存储不大。下面是相关文件目录的大小:
1.3 Mhdfs:xxx/chk-94794
1.1 Thdfs:xxx/shared
0hdfs:xxx/taskowned
如果有什么理解错误,请指出,非常感谢。

祝好,
沈磊

Yun Tang  于2020年1月19日周日 下午4:11写道:

> Hi
>
> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
> 开头的目录。
>
> 祝好
> 唐云
>
> 
> From: LakeShen 
> Sent: Sunday, January 19, 2020 15:42
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>
> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>
> LakeShen  于2020年1月19日周日 下午3:30写道:
>
> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> > Savepoint 类似,如果不清理,就永久保留。
> > 非常感谢
> >
> >
> > Yun Tang  于2020年1月19日周日 下午2:06写道:
> >
> >> Hi
> >>
> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
> >>
> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
> >>
> >> 希望这些解释能解答你的困惑
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
> >> [2]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
> >>
> >> 祝好
> >> 唐云
> >>
> >>
> >>
> >>
> >> 
> >> From: LakeShen 
> >> Sent: Friday, January 17, 2020 16:28
> >> To: user-zh@flink.apache.org 
> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
> >>
> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
> state.checkpoints.num-retained的数值时,会对之前的完成的
> >> Checkpoint 状态做清理。
> >>
> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
> >> Checkpoint 超时失败。
> >>
> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
> state.checkpoints.num-retained又为1,完成的
> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
> >>
> >> 希望有大佬能帮我解惑,非常感谢
> >>
> >
>


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-18 文章 LakeShen
Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
Savepoint 类似,如果不清理,就永久保留。
非常感谢


Yun Tang  于2020年1月19日周日 下午2:06写道:

> Hi
>
> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
> 因此,加载的checkpoint被赋予了savepoint的property [2]。
> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>
> 希望这些解释能解答你的困惑
>
> [1]
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
> [2]
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>
> 祝好
> 唐云
>
>
>
>
> 
> From: LakeShen 
> Sent: Friday, January 17, 2020 16:28
> To: user-zh@flink.apache.org 
> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>
> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
> Checkpoint 状态做清理。
>
> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
> Checkpoint 超时失败。
>
> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>
> 希望有大佬能帮我解惑,非常感谢
>


Re: 回复: 关于Flink SQL DISTINCT问题

2020-01-18 文章 LakeShen
是否可以使用 空闲状态 Retention Time 来设置

JingsongLee  于2019年9月4日周三 下午6:12写道:

>  一般是按时间(比如天)来group by,state配置了超时过期的时间。
> 基本的去重方式就是靠state(比如RocksDbState)。
>  有mini-batch来减少 对state的访问。
>
> 如果有倾斜,那是解倾斜问题的话题了。
>
> Best,
> Jingsong Lee
>
>
> --
> From:lvwenyuan 
> Send Time:2019年9月4日(星期三) 15:11
> To:user-zh 
> Subject:Re:回复: 关于Flink SQL DISTINCT问题
>
> 对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式
> 在 2019-09-04 14:38:29,"athlon...@gmail.com"  写道:
> >在窗口内去重吧,不可能无限保留去重数据的
> >
> >
> >
> >athlon...@gmail.com
> >
> >发件人: lvwenyuan
> >发送时间: 2019-09-04 14:28
> >收件人: user-zh
> >主题: 关于Flink SQL DISTINCT问题
> >各位大佬好:
> >   我想问下,关于flink sql的实时去重,就是count(distinct user_id)
> 。就是Flink内部是如何做到实时去重,如果对于数据量比较大的时候实时去重,是否会有性能问题。用的Blink Planner
>


Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-17 文章 LakeShen
大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
Checkpoint 状态做清理。

当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
Checkpoint 超时失败。

但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?

希望有大佬能帮我解惑,非常感谢


Frequently checkpoint failure, could make the flink sql state not clear?

2020-01-16 文章 LakeShen
Hi community, now I am using Flink sql , and I set the retention time, As I
all know is that Flink will set the timer for per key to clear their state,
if Flink task always checkpoint failure, are the  key state cleared by
timer?
Thanks to your replay.


  1   2   >