Flink与Yarn的状态一致性问题

2020-11-11 文章 amen...@163.com
hi everyone, 最近在使用Flink-1.11.1 On Yarn Per Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn application仍处于运行状态 疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢 best, amenhub

Re: Flink sql查询NULL值错误

2020-11-11 文章 Danny Chan
是的 Flink SQL 现在还不支持隐式类型,需要手动设置 NULL 的类型 SQL 才能通过编译。 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午8:52写道: > 感谢大佬!!! > > > 在 2020年11月10日,下午8:22,hailongwang <18868816...@163.com> 写道: > > > > Hi, > > > > > > 需要将 null cast 成某个具体的值,比如: > > if(type=1,2,cast(null as int)) > > > > > > Best, > > Hailong > >

Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-11 文章 Danny Chan
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 Lei Wang 于2020年11月11日周三 下午2:03写道: > 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > > 比如 > robot1 2020-11-11 12:00:00 msginfo > 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00

Re: 关于filesystem connector的一点疑问

2020-11-11 文章 Jingsong Li
Hi admin, 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作) On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote: > 补充一下不用partition time trigger的原因,partition > time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的 > > > 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道: > > > > Hi ,kandy > >

Re: Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
source组件使用的是 kafka connector , 使用的JdbcRowDataLookupFunction 作为维表查询. 报错信息是下面这种: 2020-11-12 00:10:00.153 ERROR JdbcRowDataLookupFunction.java:170 JDBC executeBatch error, retry times = 1 com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet

Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
补充一下不用partition time trigger的原因,partition time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的 > 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道: > > Hi ,kandy > 我没有基于partition time 提交分区,我是基于默认的process > time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区 > >> 2020年11月12日

Re: Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
souce是kafka,使用JdbcRowDataLookupFunction作为维表.异常信息是这样的,看了下日志,这种异常基本上每10多分钟就会有一次. 2020-11-12 01:00:09.028 ERROR JdbcRowDataLookupFunction.java:170 JDBC executeBatch error, retry times = 1 com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet

Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
sink.partition-commit.trigger process-timeString Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires

Re: 关于filesystem connector的一点疑问

2020-11-11 文章 admin
Hi ,kandy 我没有基于partition time 提交分区,我是基于默认的process time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区 > 2020年11月12日 下午12:46,kandy.wang 写道: > > hi: > 按照我的理解,partition time提交分区,是会在current watermark > partition time + commit > delay 时机触发分区提交,得看你的sink.partition-commit.delay

Re: Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 Xintong Song
Flink on Yarn 废除了 `-n` 参数后,不再支持指定固定数量的 TM。Flink 会根据作业的并行度,按需向 Yarn 申请资源。所以你说的没错,session 模式下提交新的 job 时 flink 会向 yarn 申请更多的资源。 如果想要限制 session 使用的总资源、可以接受资源不足时后提交的 job 可能无法运行需要等待的话,可以配置 `slotmanager.number-of-slots.max`(默认是 Integer.MAX_VALUE)来限制总的 slot 数量。 如果不想 job 运行结束后 tm 很快被释放、下次提交作业又需要等待 tm

Re: slot数量与并行度的大小关系

2020-11-11 文章 Xintong Song
你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的? 流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3] 中的几种方式更改作业的并行度。另外,也可以通过配置 `taskmanager.numberOfTaskSlots` 来增加 flink 集群的 slot 数量。 Thank you~ Xintong Song On Wed, Nov 11, 2020 at 7:54 PM Shawn Huang wrote: > Hi, > > Flink

Re:关于filesystem connector的一点疑问

2020-11-11 文章 kandy.wang
hi: 按照我的理解,partition time提交分区,是会在current watermark > partition time + commit delay 时机触发分区提交,得看你的sink.partition-commit.delay 设置的多久,如果超过之后,应当默认是会丢弃的吧。 https://cloud.tencent.com/developer/article/1707182 这个连接可以看一下 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道: >Hi,all >Flink

关于filesystem connector的一点疑问

2020-11-11 文章 admin
Hi,all Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交 现在有这样的场景: 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? 有大佬知道吗,有实际验证过吗 感谢 附上简单sql: CREATE TABLE kafka ( a STRING, b STRING, c BIGINT,

Re:flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 hailongwang
Hi xiexinyuan341, 我理解这边有 2 个问题: 1. “偶尔会出现连接超时”,这个的话有具体的堆栈吗。如果是因为长时间没有数据的查询导致 connection invalid 话,这个在1.12,1.11.3 中应该是解决了[1]. 2. 你的 source 是什么组件呢?程序抛异常的话,自动重启或者手动重启话,如果是 “最少一次” 语义的话,应该还是会 join 上 sink 到下游的;或者可以开启 checkpoint,保证 flink 内部的 “精确一次”。 [1]

flink 1.11.1 使用mysql作为维表进行temporal join时,偶尔会报超时异常导致数据更新失败,请教解决方案

2020-11-11 文章 xiexinyuan341
在flink 1.11.1 使用mysql作为维表进行temporal join时,大部分时间都能正常join,偶尔会出现mysql连接超时的情况,此时程序直接抛出异常,这条数据就不能被正确的更新到sink 表里面,请问对于这个情况有解决方案吗?

Re:Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 kingdomad
flink on yarn使用第一种方式yarn session,先创建一个yarn session,然后再提交job到这个session中。 您的意思是这个session所申请的资源会根据我后续提交的job的并发度去动态地无限地扩展? 如果我提交了一个并发度为10的job a到这个session,那这个session申请10个slot对应的资源, 我再提交一个并发度为10的job b到这个session,这个session再申请10个slot对应的资源? job需要多少并发度,session通通都向yarn去申请吗? -- kingdomad

Re:Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang
会多一个 outputConversion 类型转换算子 如果是 DataStream 转 Table API,会多一个 inputConversion 类型转换算子 在 2020-11-11 20:25:31,"Luna Wong" 写道: >Table API 转 DataStream为啥会出现性能损耗 > >hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道: >> >> 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema`

Re: Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 Luna Wong
Table API 转 DataStream为啥会出现性能损耗 hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道: > > 我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了? > 而不是再实现一个 Connector。 > > > > > 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道: > >明白了,多谢。 > > > >是 Canal-Json 格式的 Kafka

ElasticsearchApiCallBridge相关类构造函数问题

2020-11-11 文章 Luna Wong
为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。 我还想继承Elasticsearch6ApiCallBridge类。在new RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。 不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?

Re: UDTAGG在SQL中可以使用么,语法是什么

2020-11-11 文章 Jark Wu
目前 SQL 中不支持 UDTAF。 你使用 UDTAF 的场景是什么呢? On Wed, 11 Nov 2020 at 20:27, Shuai Xia wrote: > Hi,像TableAggregateFunction可以在SQL中使用么? > >

UDTAGG在SQL中可以使用么,语法是什么

2020-11-11 文章 Shuai Xia
Hi,像TableAggregateFunction可以在SQL中使用么?

Re: slot数量与并行度的大小关系

2020-11-11 文章 Shawn Huang
Hi, Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。 如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。 目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html [2]

Re:Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang
我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了? 而不是再实现一个 Connector。 在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道: >明白了,多谢。 > >是 Canal-Json 格式的 Kafka Connector. > >我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table >api 对接 flink。 >

Re: Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 LittleFall
明白了,多谢。 是 Canal-Json 格式的 Kafka Connector. 我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table api 对接 flink。 现在是因为考虑到 Stream Api 能力比 Table Api 能力要强,所以在评估是否需要再实现一个 Stream Connector. -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql hbase维表关联性能上不去

2020-11-11 文章 kandy.wang
看了一下hbase的维表关联主要是通过org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction 实现的,测试了一下性能tps只有大概3-4w, 经加本地cache之后性能仍然没有提升。 分析了一下flink ui LookupJoin 是与kafka source的算子 chain 在一起了,这样整个算子的并行度就受限于kafka分区的并行度。 1.想问一下这块的 hbase connector开发,是否有做过connector的性能测试。

Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 jiangjiguang719
根据 -p 最大并行度 和-ys 每个TM的slot个数来计算 在 2020-11-11 17:14:41,"kingdomad" 写道: >我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量? > > > > > > > > > > > > > >-- > >kingdomad >

Re:1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 hailongwang
根据你 Job 的并发和指定的 TM 的规格来计算出 TM 的数量。 在 2020-11-11 16:14:41,"kingdomad" 写道: >我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量? > > > > > > > > > > > > > >-- > >kingdomad >

Re:Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 hailongwang
可以的,将 table 转换成 datastream,但是会多一层转换的性能消耗。 方便说下哪个 Connector 有现成的 Table Connector 可以满足需求,但是 Datastream Connector不满足需求呢,具体是什么功能呢 在 2020-11-11 16:08:08,"LittleFall" <1578166...@qq.com> 写道: >非常感谢你的回复! > >问下另一个问题,现在有这样一个场景: > >1. table api 的计算无法满足一些需求,需要使用 stream api 进行计算; >2. 有现成可用的 table api

1.11版本的yarn-session如何指定taskmanager数量

2020-11-11 文章 kingdomad
我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量? -- kingdomad

回复: Flink cdc mysql 字段是datetime类型时0000-00-00 00:00:00会被flink转成1970-01-01T00:00

2020-11-11 文章 史 正超
在flink sql 中用STRING表示datetime,这样的话后续的可操作性会比较大些。 发件人: 丁浩浩 <18579099...@163.com> 发送时间: 2020年11月11日 6:37 收件人: user-zh@flink.apache.org 主题: Flink cdc mysql 字段是datetime类型时-00-00 00:00:00会被flink转成1970-01-01T00:00 当我mysql字段时datetime并且字段值是-00-00

Re:Flink sql cdc 锁超时

2020-11-11 文章 hailongwang
有更完整的堆栈不? 在 2020-11-11 10:28:02,"丁浩浩" <18579099...@163.com> 写道: >当我使用flink cdc 对多张表进行关联查询时其中的一张表总是会有锁超时的情况,导致任务无法正常启动, >请问这种情况应该如何处理? >org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded; >try restarting transaction Error code: 1205; SQLSTATE: 40001. > at >

Re: Re:请问有哪些功能,是 stream api 可以做到,而 table api 无法做到的呢?

2020-11-11 文章 LittleFall
非常感谢你的回复! 问下另一个问题,现在有这样一个场景: 1. table api 的计算无法满足一些需求,需要使用 stream api 进行计算; 2. 有现成可用的 table api connector; 3. 没有现成可用的 stream api connector,需要进行一段时间的开发适配工作。 那么是否存在一种方法,使用 table api connector 输入输出数据,但是使用 stream api 进行计算?

Re:怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-11 文章 hailongwang
Hi Lei, 我理解这篇文章少介绍了 keyby 的逻辑。 可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间, 同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。 Best, hailong 在 2020-11-11 12:54:22,"Lei Wang" 写道: >有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > >比如

[ANNOUNCE] Apache Flink Stateful Functions 2.2.1 released

2020-11-11 文章 Tzu-Li (Gordon) Tai
The Apache Flink community released the first bugfix release of the Stateful Functions (StateFun) 2.2 series, version 2.2.1. This release fixes a critical bug that causes restoring a Stateful Functions cluster from snapshots (checkpoints or savepoints) to fail under certain conditions. *We

Re: 有没有办法把flink Row类转换为Object

2020-11-11 文章 Jark Wu
你要的是这个功能么? 1.11 上已经支持了。 CloseableIterator result = Table#execute#collect() On Wed, 11 Nov 2020 at 15:23, ZT.Ren <18668118...@163.com> wrote: > 基于flink做二次开发中,需要将flink SQL执行结果打印到会话中,会话中执行结果打印流程固定打印List类型数据。 > 大部分查询引擎(比如presto)都会在ResultSet中提供getObject方法,flink中如何实现?

Re: flink 1.11.1 使用sql ,使用hbase作为维表进行temporal join时无法获取数据

2020-11-11 文章 鱼子酱
重新搞了一下,找到原因了 1、没有配置hbase的host Reading reply sessionid:0x3000484bfd0001d, packet:: clientPath:null serverPath:null finish 2、protobuf-java版本过高 java.lang.NoClassDefFoundError: com/google/protobuf/LiteralByteString 这个2个问题一直没发现,是因为报错的信息只有debug级别里面有,我之前的日志级别是info,希望后续如果可能的话,把相关的报错的信息提高级别就更好识别啦 --

Re: 回复:flink1.11 读取kafka avro格式数据发序列化失败

2020-11-11 文章 Jark Wu
我估计你是用的 confluent schema registry 的 avro。 可以使用下在 master 分支提供的 avro-confluent format [1]。 需要自己 build 下源码。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/avro-confluent.html On Wed, 11 Nov 2020 at 14:20, 奔跑的小飞袁 wrote: > 这是我尝试输出的message长度 >

Re: Flink cdc mysql 字段是datetime类型时0000-00-00 00:00:00会被flink转成1970-01-01T00:00

2020-11-11 文章 Jark Wu
写个 UDF 再把 1970-01-01T00:00 转回去? On Wed, 11 Nov 2020 at 14:38, 丁浩浩 <18579099...@163.com> wrote: > 当我mysql字段时datetime并且字段值是-00-00 > 00:00:00时,会被转成1970-01-01T00:00,如果我应该如何操作才能保证跟原数值保持一致? > 输出的结果: > 2> (true,1,zhangsan,18,1970-01-01T00:00) > 3> (true,2,lisi,20,2020-11-11T14:17:46) > 4>