Re: When grouping, must every selected field also appear in GROUP BY?
Hi bulterman,

Your analysis is right: a query that groups by the primary key could be optimized to allow selecting all columns. That is a valid optimization point, just one Flink has not implemented yet, and it has nothing to do with Flink's NOT ENFORCED primary-key mode. NOT ENFORCED means Flink does not hold the data itself, so unlike a database that holds its data, it cannot validate the constraint on read. Personally I see this as a minor optimization; if you need it urgently, consider opening an issue with the community.

Best,
Leonard Xu

> On Dec 1, 2020, at 13:40, bulterman <15618338...@163.com> wrote:
>
> Hi ALL,
> I created a table with Flink SQL and set a primary key. Running a query like "select * from test_table group by <primary key>" fails with an "Expression 'XXX' is not being grouped" error. Shouldn't grouping by the primary key already guarantee uniqueness?
> Is it because the primary-key constraint in Flink DDL only supports NOT ENFORCED mode? I don't quite understand this part.
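Until the planner learns this optimization, the usual workaround is to wrap each non-key column in an aggregate such as MAX(...). A small Python sketch (toy data, hypothetical column names, not Flink code) of why that is always safe when the grouping key is unique:

```python
from collections import defaultdict

# (pk, name) rows where pk is unique -- a toy stand-in for test_table
rows = [(1, "a"), (2, "b"), (3, "c")]

groups = defaultdict(list)
for pk, name in rows:
    groups[pk].append(name)

# Grouping on a unique key puts exactly one row into every group,
# so any aggregate (MAX, MIN, ...) just returns that row's value.
assert all(len(names) == 1 for names in groups.values())
resolved = {pk: max(names) for pk, names in groups.items()}
print(resolved)  # {1: 'a', 2: 'b', 3: 'c'}
```

In Flink SQL terms, that corresponds to `SELECT pk, MAX(col) FROM test_table GROUP BY pk`, which the planner accepts today.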
Re: Re: flink sql cdc sum results come out NULL
@Jianzhi Zhang Yes, that was the cause — thanks for the reply. It was a decimal precision problem.

At 2020-12-01 13:24:23, "Jianzhi Zhang" wrote:
> Could your decimal column be too narrow, so the computed result overflows its precision and comes out as NULL?
>
>> On Nov 19, 2020, at 22:41, kandy.wang wrote:
>>
>> -- MySQL table
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price` DECIMAL(10, 5),
>>   PRIMARY KEY ( `id` ),
>>   UNIQUE KEY idx_spu_id (spu_id)
>> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>> -- Flink table
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT,
>>   `leaving_price` DECIMAL(10, 5),
>>   PRIMARY KEY (`spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>>
>> -- binlog to MySQL
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> SELECT v_spu_id as spu_id, sum(leaving_num*price) as leaving_price
>> FROM hive.database.table
>> group by v_spu_id;
>>
>> hive.database.table points at our company's own Kafka, which implements the four Canal-CDC-style message types.
>>
>> The problem: many rows in the MySQL result table have leaving_price = NULL, even though the leaving_num and price fields can never be NULL in the data, so I don't see why the result could be NULL.
>> Any good ideas for troubleshooting this?
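For reference, DECIMAL(10, 5) leaves only 10 - 5 = 5 digits for the integer part, so any per-key sum of leaving_num*price reaching 100000 no longer fits the column. A hedged Python sketch of that fit check (just the arithmetic, not Flink or MySQL code):

```python
from decimal import Decimal

def fits_decimal(value, precision=10, scale=5):
    """Check whether value fits in a DECIMAL(precision, scale) column."""
    # round to the column's scale, then check the integer-digit budget
    d = Decimal(str(value)).quantize(Decimal(1).scaleb(-scale))
    return abs(d) < Decimal(10) ** (precision - scale)

print(fits_decimal("12345.12345"))   # True:  5 integer digits fit
print(fits_decimal("99999.99999"))   # True:  largest representable value
print(fits_decimal("123456.0"))      # False: 6 integer digits > 10 - 5
```

Widening the column (e.g. DECIMAL(18, 5)) on both the MySQL and Flink sides avoids the overflow.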
Re: Converting a flink cdc SQL table to a DataStream
You can do it this way:

DataStream<RowData> dstream = tableEnv.toAppendStream(sourceTable, RowData.class);

--
Sent from: http://apache-flink.147419.n8.nabble.com/
When grouping, must every selected field also appear in GROUP BY?
Hi ALL,
I created a table with Flink SQL and set a primary key. Running a query like "select * from test_table group by <primary key>" fails with an "Expression 'XXX' is not being grouped" error. Shouldn't grouping by the primary key already guarantee uniqueness?
Is it because the primary-key constraint in Flink DDL only supports NOT ENFORCED mode? I don't quite understand this part.
Re: flink sql cdc sum results come out NULL
Could your decimal column be too narrow, so the computed result overflows its precision and comes out as NULL?

> On Nov 19, 2020, at 22:41, kandy.wang wrote:
>
> -- MySQL table
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>   `id` INT UNSIGNED AUTO_INCREMENT,
>   `spu_id` BIGINT NOT NULL,
>   `leaving_price` DECIMAL(10, 5),
>   PRIMARY KEY ( `id` ),
>   UNIQUE KEY idx_spu_id (spu_id)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> -- Flink table
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>   `spu_id` BIGINT,
>   `leaving_price` DECIMAL(10, 5),
>   PRIMARY KEY (`spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://...',
>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>   'username' = '...',
>   'password' = '..'
> );
>
> -- binlog to MySQL
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> SELECT v_spu_id as spu_id, sum(leaving_num*price) as leaving_price
> FROM hive.database.table
> group by v_spu_id;
>
> hive.database.table points at our company's own Kafka, which implements the four Canal-CDC-style message types.
>
> The problem: many rows in the MySQL result table have leaving_price = NULL, even though the leaving_num and price fields can never be NULL in the data, so I don't see why the result could be NULL.
> Any good ideas for troubleshooting this?
Using flink-sql to parse a MySQL timestamp field captured by Debezium fails
DDL executed in flink-sql-client:

CREATE TABLE source_xxx (
  id INT,
  ctime TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'debezium-json.schema-include' = 'false',
  'debezium-json.ignore-parse-errors' = 'false'
);

Query:

select * from source_xxx;
[ERROR] Could not execute SQL statement. Reason:
java.time.format.DateTimeParseException: Text '2018-07-10T23:47:35Z' could not be parsed at index 10

The ctime column in the MySQL source table is of type timestamp. Adding the 'debezium-json.timestamp-format.standard' = 'ISO-8601' option still fails; the value has a trailing Z.
I'd like to ask: is this an incompatibility between flink-sql and the timestamp format Debezium produces, or is something wrong with my Debezium configuration or the flink-sql type I chose?
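The trailing Z marks the value as a UTC instant, while a TIMESTAMP (without time zone) parser expects a plain local datetime, so matching stops at the zone designator. A small Python illustration of the same mismatch (this is not Flink's parser, just the two pattern styles side by side):

```python
from datetime import datetime

text = "2018-07-10T23:47:35Z"

# A local-datetime pattern has nothing to match the zone designator,
# so parsing the zoned string fails.
try:
    datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    raise AssertionError("unexpectedly parsed")
except ValueError as exc:
    print("parse failed:", exc)

# The value is a UTC instant; an offset-aware pattern handles it
# (Python >= 3.7 accepts 'Z' for %z).
ts = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")
print(ts.isoformat())  # 2018-07-10T23:47:35+00:00
```

The analogous fix on the Flink side is to treat the field as a zoned/UTC instant rather than a plain TIMESTAMP, or convert it before ingestion.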
Re: How can Hive read data that is still in checkpoint state when using StreamingFileSink
hi,
You need the on-checkpoint rolling policy, which rolls the file on every checkpoint.

> On Nov 30, 2020, at 16:57, liliang <904716...@qq.com> wrote:
>
> I use StreamingFileSink to save data to HDFS in row format:
>
> StreamingFileSink<String> streamingFileSink = StreamingFileSink
>     .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
>     .withBucketAssigner(bucketAssigner)
>     .withRollingPolicy(
>         DefaultRollingPolicy.builder()
>             .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
>             .withInactivityInterval(TimeUnit.MINUTES.toMillis(30))
>             .withMaxPartSize(1024 * 1024 * 1024)
>             .build())
>     .withOutputFileConfig(
>         OutputFileConfig.builder()
>             .withPartSuffix(partSuffix)
>             .build())
>     .build();
>
> With this configuration and checkpoints every 10 minutes, files on HDFS stay unfinished for the whole half hour, e.g. .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446 stays in-progress. Since my checkpoints run every 10 minutes, if the job dies at minute 29 that file will never reach FINISHED state — what should I do with those 20 minutes of data?
> With the defaults, Hive simply filters out in-progress files; if I rename the file to a normal name it can be read.
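To see why renaming "fixes" it: Hive's input formats treat files whose names start with a dot or underscore as hidden and skip them. A sketch of that filter in plain Python (not Hive code); with the on-checkpoint rolling policy, part files lose the in-progress prefix at each checkpoint, so at most one checkpoint interval of data stays hidden:

```python
# Hive (like most Hadoop input formats) silently skips files whose names
# start with '.' or '_', which is why in-progress part files are invisible
# to queries until they are rolled to their final names.
def visible_to_hive(name: str) -> bool:
    return not name.startswith((".", "_"))

files = [
    "part-0-1",
    ".part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446",
    "_SUCCESS",
]
visible = [f for f in files if visible_to_hive(f)]
print(visible)  # ['part-0-1']
```

Renaming an in-progress file by hand makes it visible but may expose data that was never committed by a checkpoint, which is why rolling on checkpoint is the safer route.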
Re: Unsubscribe
Hi,
To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org; see [1] for details.

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingb

elvis wrote on Tue, Dec 1, 2020 at 09:42:
> Unsubscribe
Re: Flink checkpoint failed with reason "job failed", but my job is perfectly fine.
No, it hasn't — restored is 0.

熊云昆 wrote on Tue, Dec 1, 2020 at 08:44:
> Hasn't the job failed and restarted? It feels like it has.
>
> At 2020-11-30 22:23:54, "赵一旦" wrote:
> >As the subject says.
> >
> >Checkpoint Detail:
> >Path: - Discarded: - Failure Message: The job has failed.
> >
> >As above — what usually causes this?
Re: flink sql es sink: username/password authentication not supported
No need — just set the username and password.

Best
zhisheng

HunterXHunter <1356469...@qq.com> wrote on Tue, Dec 1, 2020 at 09:46:
> Do you mean ES X-Pack authentication, where you have to load a certificate file?
Re: flink-1.11.2 job won't start
hi 正超,
Could you post the job's logs?

Best
zhisheng

神奇哥哥 <759341...@qq.com> wrote on Tue, Dec 1, 2020 at 09:38:
> Hello, I ran into this problem too and have solved it.
> Solution: check whether your pom pulls in Hadoop dependencies; with Flink 1.11 the Hadoop dependencies need to be commented out.
Re: flink sql es sink: username/password authentication not supported
Do you mean ES X-Pack authentication, where you have to load a certificate file?
Re: flink sql es sink: username/password authentication not supported
1.12 supports it; see
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/elasticsearch.html#username

Kyle Zhang wrote on Tue, Dec 1, 2020 at 09:35:
> Hi, do you mean this issue:
> https://issues.apache.org/jira/browse/FLINK-16788
>
> On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com wrote:
> > I checked the official docs — SQL writes to ES do not yet support username/password authentication. Is there a simple workaround, other than using the API?
> >
> > Thanks!
> >
> > cljb...@163.com
Re: flink-1.11.2 job won't start
Check whether the hadoop-client jar is conflicting; hbase-server also depends on it, and it needs to be excluded.
Re: flink-1.11.2 submitted to yarn stays stuck in CREATED
Check whether the hadoop-client jar is conflicting; hbase-server also depends on it, and it needs to be excluded.
Re: flink-1.11.2 submitted to yarn stays stuck in CREATED
Hello, I ran into this problem too and have solved it.
Solution: check whether your pom pulls in Hadoop dependencies; with Flink 1.11 the Hadoop dependencies need to be commented out.
Re: flink-1.11.2 job won't start
Hello, I ran into this problem too and have solved it.
Solution: check whether your pom pulls in Hadoop dependencies; with Flink 1.11 the Hadoop dependencies need to be commented out.
flink sql client error java.net.NoRouteToHostException: No route to host
I'm just starting to learn Flink. I downloaded and unpacked Flink, started ./sql-client.sh embedded, and entered SELECT 'Hello World'; it fails:

Flink SQL> SELECT 'Hello World';
[ERROR] Could not execute SQL statement. Reason:
java.net.NoRouteToHostException: 没有到主机的路由 (No route to host)
Capturing video streams from cameras
A question for the group: we want to build a platform that collects video streams from surveillance cameras — roughly 1000 to 5000 of them. The cameras send their streams directly to the platform, and the platform then writes the data to Hadoop or serves it to machine-learning consumers. Is Flink a good fit for this scenario? Thanks.

屈夏
Re: flink sql es sink: username/password authentication not supported
Hi, do you mean this issue:
https://issues.apache.org/jira/browse/FLINK-16788

On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com wrote:
> I checked the official docs — SQL writes to ES do not yet support username/password authentication. Is there a simple workaround, other than using the API?
>
> Thanks!
>
> cljb...@163.com
Re: Flink checkpoint failed with reason "job failed", but my job is perfectly fine.
Hasn't the job failed and restarted? It feels like it has.

At 2020-11-30 22:23:54, "赵一旦" wrote:
>As the subject says.
>
>Checkpoint Detail:
>Path: - Discarded: - Failure Message: The job has failed.
>
>As above — what usually causes this?
How does flink cdc guarantee correct group agg results
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,
       sum(leaving_num*vipshop_price) as leaving_price,
       DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
FROM XX.XX.XX
group by v_spu_id;

XX.XX.XX consumes our company's CDC feed through a custom cdc format. The CDC data sits in Kafka with only 7 days of retention, and consumption is purely incremental — how do we guarantee a correct result?
How is the initialization done? Does initializing mean loading the data into state?
Reconciling the numbers right now, the counts don't match.
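The core issue: a changelog SUM is only correct if the consumer sees every change for a key from its initial insert onward. Starting mid-stream (as a 7-day-retention topic forces after the first week) means retractions arrive for inserts that were never counted. A toy Python sketch of a retract-style sum (simplified op codes, hypothetical data, not Flink internals):

```python
# Change events are (op, key, value): '+I' insert, '-U'/'+U' retract old /
# add new value, '-D' delete. A sum folds them by adding or subtracting.
def retract_sum(events):
    total = {}
    for op, key, val in events:
        if op in ("+I", "+U"):
            total[key] = total.get(key, 0) + val
        elif op in ("-U", "-D"):
            total[key] = total.get(key, 0) - val
    return total

full = [("+I", "spu", 10), ("-U", "spu", 10), ("+U", "spu", 7)]
print(retract_sum(full))      # {'spu': 7}   correct, saw the full history
print(retract_sum(full[1:]))  # {'spu': -3}  drifts: missed the first insert
```

That is why an initial snapshot (bootstrapping the current table state into Flink state before switching to incremental changes) is generally needed for correct aggregates over a truncated changelog.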
How does flink cdc guarantee correct group agg results
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,
       sum(leaving_num*vipshop_price) as leaving_price,
       DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
FROM XX.XX.XX
group by v_spu_id;

XX.XX.XX consumes our company's CDC feed through a custom cdc format. The CDC data sits in Kafka with only 7 days of retention, and consumption is purely incremental — how do we guarantee a correct result?
How is the initialization done? Does initializing mean loading the data into state?
Reconciling the numbers right now, the counts don't match.
Re: flink job reports netty errors shortly after starting
Hoping someone can answer this — it's odd, and the cause is not easy to track down.

赵一旦 wrote on Fri, Nov 27, 2020 at 21:25:
> Errors like the following:
>
> 19:59:56.128 [Flink Netty Client (8009) Thread 6] WARN org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop - Unexpected exception in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: epoll_wait(..) failed: Operation now in progress
> 19:59:56.214 [Flink Netty Client (8009) Thread 13] WARN org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop - Unexpected exception in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: epoll_wait(..) failed: Operation now in progress
> [the identical WARN repeats for Flink Netty Client threads 15, 17, 19, 21, 23, 20, 22, and 25]
>
> No idea what's going on.
flink taskmanager netty errors
The errors below are all about the selector loop; hoping someone can help analyze the possible causes.

22:23:17.045 [Flink Netty Client (2007) Thread 28] WARN org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop - Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: epoll_wait(..) failed: Operation now in progress
22:23:17.045 [Flink Netty Server (2007) Thread 19] WARN org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop - Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: epoll_wait(..) failed: Resource temporarily unavailable
[the identical Client WARN ("Operation now in progress") repeats for threads 15, 16, 13, 27, 18, and 21, and the Server WARN ("Resource temporarily unavailable") for thread 20]
Flink checkpoint failed with reason "job failed", but my job is perfectly fine.
As the subject says.

Checkpoint Detail:
Path: - Discarded: - Failure Message: The job has failed.

As above — what usually causes this?
flink sql es sink: username/password authentication not supported
I checked the official docs — SQL writes to ES do not yet support username/password authentication. Is there a simple workaround, other than using the API?

Thanks!

cljb...@163.com
Re: Re: Re: Re: Stateful operator fails to save checkpoints
When a checkpoint fails, check whether it timed out or some task's snapshot failed; you can find that in the JM log. If it timed out, see whether the data volume simply needs more time or the timeout is set too short; if there was an exception, check the corresponding TM log for why the snapshot failed.

Best,
Congxian

王默 wrote on Fri, Nov 27, 2020 at 23:43:
> I spotted the checkpoint failure on the web UI; see the screenshot: https://imgchr.com/i/Dr3PNn
> The taskmanager log really shows no timeout, and no other exceptions either.
>
> At 2020-11-27 21:39:50, "赵一旦" wrote:
> >It doesn't even print a failure reason — it "can't save" how? A timeout? Or what?
> >
> >魏积乾 wrote on Fri, Nov 27, 2020 at 19:08:
> >> flink-csv-1.11.2.jar
> >> flink-dist_2.11-1.11.2.jar
> >> flink-json-1.11.2.jar
> >> flink-shaded-zookeeper-3.4.14.jar
> >> flink-table_2.11-1.11.2.jar
> >> flink-table-blink_2.11-1.11.2.jar
> >> log4j-1.2-api-2.12.1.jar
> >> log4j-api-2.12.1.jar
> >> log4j-core-2.12.1.jar
> >> log4j-slf4j-impl-2.12.1.jar
> >> flink-metrics-prometheus_2.12-1.11.2.jar
> >>
> >> Sorted by time, these are the newest jars.
> >>
> >> Original message from 王默, Nov 27, 18:41, to user-zh:
> >> Could you tell me exactly which jar was not upgraded? Or is there some jar that needs to be copied from opt to lib?
> >>
> >> At 2020-11-27 17:34:39, 魏积乾 wrote:
> >> I just ran into this too, upgrading from 1.10: one job could save checkpoints while another kept failing. I checked the jars under lib, found some had not been upgraded, replaced them, and also changed state.checkpoints.dir in the config. Hope that helps.
After flink sql cdc computations, the elasticsearch connector with multiple parallel sink tasks intermittently fails! The data volume is large and we have not reproduced it reliably yet. Has anyone else hit a similar problem?
Flink version: 1.11.2

* Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[64_40108_0_1]: version conflict, required seqNo [95958], primary term [1]. current document has seqNo [99881] and primary term [1]]]*

Full message:

2020-11-13 11:07:04
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at
java.lang.Thread.run(Thread.java:745) Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[64_40108_0_1]: version conflict, required seqNo [95958], primary term [1]. current document has seqNo [99881] and primary term [1]]] at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:407) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:138) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1581) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1663) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:590) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:333) at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:327) at org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181) at 
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448) at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) at
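The exception is Elasticsearch's optimistic concurrency control: an update carrying a stale sequence number is rejected because another writer bumped the document in between. With the SQL connector each primary key should be routed to a single sink subtask, so seeing this usually suggests two writers (parallel subtasks or separate jobs) touch the same document id. A minimal Python sketch of the check itself (a hypothetical Doc class, not the ES client):

```python
# Each successful write bumps seq_no; a writer must present the seq_no it
# last saw. A stale seq_no means someone else wrote in between -> conflict.
class Doc:
    def __init__(self):
        self.seq_no = 0
        self.value = None

    def update(self, value, if_seq_no):
        if if_seq_no != self.seq_no:
            raise RuntimeError(
                f"version conflict: required seqNo [{if_seq_no}], "
                f"current document has seqNo [{self.seq_no}]")
        self.value = value
        self.seq_no += 1

doc = Doc()
doc.update("a", if_seq_no=0)       # writer 1 succeeds
try:
    doc.update("b", if_seq_no=0)   # writer 2 still holds the old seqNo
except RuntimeError as exc:
    print(exc)
```

If two subtasks can receive changes for the same document id (e.g. the key used for routing differs from the document id), such conflicts become possible under load.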
Re: Can flink submit currently specify extra dependency jars?
I see — I think it's a good idea, though from the user's perspective, the best experience is really having to write nothing but the SQL.
Re: Can flink submit currently specify extra dependency jars?
Hello:
I explained the reason at the very beginning: the main jar only parses and wraps the SQL passed in as a parameter, and we want the UDF and connector classes the SQL uses to be specified dynamically.
On one hand this gives flexible dependency control and keeps the main jar small;
on the other hand it reduces the chance of dependency conflicts between different connectors and UDFs, or different versions of them.
PS: if a platform packed dozens of connectors and hundreds of UDFs into a single fat jar, you'd agree that's hardly elegant.
Re: Can flink submit currently specify extra dependency jars?
Hi silence,
May I ask why it has to be a submit parameter? My understanding is, if you're building a platform and a user's job has several jar dependencies, why not package those jars into the job's main jar? The platform can provide the common dependencies such as flink, hadoop, and so on.

silence wrote on Mon, Nov 30, 2020 at 17:20:
> Many people have replied with YARN-based solutions.
>
> Let me add: I'd still like a more general submit parameter to solve this,
> including being able to specify extra local dependency jars when submitting to a standalone cluster.
>
> Could someone working on the CLI follow up on this suggestion?
> Thanks
Re: Can flink submit currently specify extra dependency jars?
Many people have replied with YARN-based solutions.

Let me add: I'd still like a more general submit parameter to solve this,
including being able to specify extra local dependency jars when submitting to a standalone cluster.

Could someone working on the CLI follow up on this suggestion?
Thanks
Re: FlinkSQL blows up Prometheus memory
I've read the source. Operator names are truncated, but task names are not — and a task name is all of its operator names concatenated, which is why it gets so long. For now I've hacked the source to truncate it myself; let's continue the discussion in the issue.

Jark Wu wrote on Thu, Nov 26, 2020 at 20:53:
>
> IIRC, runtime will truncate the operator name to max 80 characters, see
> `TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH`.
> You can search the log if there are "The operator name {} exceeded the {}
> characters length limit and was truncated.".
>
> On Thu, 26 Nov 2020 at 18:18, hailongwang <18868816...@163.com> wrote:
> > Hi,
> > Yes — I think we could add a configuration option to control the task name.
> > The full task name helps with troubleshooting, while a short task name helps with metric collection in production, greatly reducing the network overhead, storage space, and so on.
> > I've opened an issue: https://issues.apache.org/jira/browse/FLINK-20375
> >
> > Best,
> > Hailong
> >
> > At 2020-11-24 14:19:40, "Luna Wong" wrote:
> > >The task_name in the metrics FlinkSQL generates is extremely long, which makes Prometheus memory blow up at query time — unacceptable in production.
> > >The one below is just the simplest example; complex SQL produces task_names up to 9000 bytes. This blows up Prometheus memory — what should I do?
> > >
> > >task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
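A sketch of the cap being discussed (per the quoted message, Flink truncates *operator* names to 80 characters in TaskMetricGroup; the task name, being the concatenation of all chained operator names, gets no such cap — the numbers here are illustrative):

```python
# Truncate a metric label the way the operator-name cap works; without an
# equivalent cap on task names, the concatenated label grows unbounded.
MAX_OPERATOR_NAME_LEN = 80

def truncate(name: str, limit: int = MAX_OPERATOR_NAME_LEN) -> str:
    return name if len(name) <= limit else name[:limit]

task_name = "Source:_wuren_foo_ods_foo" + "_SinkConversionToRow" * 40
print(len(task_name))            # 825
print(len(truncate(task_name)))  # 80
```

Capping label values like this trades readability for bounded Prometheus label cardinality and memory; the linked FLINK-20375 proposes making that trade-off configurable.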
How can Hive read data that is still in checkpoint state when using StreamingFileSink
I use StreamingFileSink to save data to HDFS in row format:

StreamingFileSink<String> streamingFileSink = StreamingFileSink
    .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(bucketAssigner)
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(30))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .withOutputFileConfig(
        OutputFileConfig.builder()
            .withPartSuffix(partSuffix)
            .build())
    .build();

With this configuration and checkpoints every 10 minutes, files on HDFS stay unfinished for the whole half hour, e.g. .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446 stays in-progress. Since my checkpoints run every 10 minutes, if the job dies at minute 29 that file will never reach FINISHED state — what should I do with those 20 minutes of data?
With the defaults, Hive simply filters out in-progress files; if I rename the file to a normal name it can be read.
Converting a flink cdc SQL table to a DataStream
The code ingests MySQL CDC data via SQL and converts it to a DataStream to write to Kudu, but how do I find out which field in the Row is the primary key, along with the field names and types? Or can toRetractStream target some other class???
The code is below:
==
tableEnv.executeSql(createTableSql);
Table table = tableEnv.sqlQuery(querySql);
DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
dataStream.print().setParallelism(1);
==
Re: flink on native k8s deploy issue
If you exposed the REST service as ClusterIP, job submission can only happen inside the K8s cluster, because the environment outside cannot resolve the in-cluster service (that is, tuiwen-flink-rest.flink).
You can start a Pod inside the K8s cluster to act as the Flink client and submit jobs from within that Pod.

Best,
Yang

吴松 wrote on Tue, Nov 24, 2020 at 16:23:
> Sorry, that earlier error was a memory problem. What I meant to post is the error below:
>
> 2020-11-24 16:19:33,569 ERROR org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient [] - A Kubernetes exception occurred.
> java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or service not known
> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_252]
> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) ~[?:1.8.0_252]
> at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName0(InetAddress.java:1277) ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1193) ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1127) ~[?:1.8.0_252]
> at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]
> at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
> at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188) > [flink-dist_2.12-1.11.2.jar:1.11.2] > 2020-11-24 16:19:33,606 ERROR > org.apache.flink.kubernetes.cli.KubernetesSessionCli > [] - Error while running the Flink session. > java.lang.RuntimeException: > org.apache.flink.client.deployment.ClusterRetrieveException: Could not > create the RestClusterClient. > at > org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188) > [flink-dist_2.12-1.11.2.jar:1.11.2] > Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: > Could not create the RestClusterClient. > ... 
6 more > Caused by: java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or > service not known > at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > ~[?:1.8.0_252] > at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) > ~[?:1.8.0_252] > at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) > ~[?:1.8.0_252] > at java.net.InetAddress.getAllByName0(InetAddress.java:1277) > ~[?:1.8.0_252] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_252] > at java.net.InetAddress.getAllByName(InetAddress.java:1127) > ~[?:1.8.0_252] > at java.net.InetAddress.getByName(InetAddress.java:1077) > ~[?:1.8.0_252] > at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > ... 5 more > > > > The program finished with the following exception: > > > java.lang.RuntimeException: > org.apache.flink.client.deployment.ClusterRetrieveException: Could not > create the RestClusterClient. > at >
Re: flink-json function usage
Hi:
Then I'll dig further into the JSON material. While testing just now I also found another problem:

select ENCODE('ISO-8859-1', F_sp_withdraw_user_name) from t_sp_user_info where F_sp_withdraw_user_name is not null;

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported conversion from data type 'BINARY(1)' (conversion class: [B) to type information. Only data types that originated from type information fully support a reverse conversion.
at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)

Using flink-sql's ENCODE for the conversion shows the problem above; ENCODE(F_sp_withdraw_user_name, 'ISO-8859-1') behaves the same.

On 2020/11/30 15:53, "Benchao Li" wrote:
> Hi,
> Flink SQL doesn't officially support JSON functions yet, and the errors above look as expected — they say the function cannot be found.
> For details see: https://issues.apache.org/jira/browse/FLINK-9477
>
> Yan,Yunpeng(DXM,PB) wrote on Mon, Nov 30, 2020 at 14:18:
> > Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from income_fee;
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON_OBJECT(, , )
> >
> > Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from sp_income_fee
> > where enabled = 1
> > group by id;
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON_OBJECT(, , )
> >
> > Flink SQL> select JSON_ARRAYAGG(product_type)
> > from income_fee
> > where f_enabled = 1;
> > [ERROR] Could not execute SQL statement. Reason:
> > org.apache.flink.table.api.TableException: Unsupported Function: 'JSON_ARRAYAGG_ABSENT_ON_NULL'
> >
> > 闫云鹏
> > DXM, Payment Business Unit
> > yanyunp...@duxiaoman.com
> >
> > On 2020/11/30 11:05, "caozhen" wrote:
> > Could you post how you are calling them, along with the error messages?
> >
> > > Yan,Yunpeng(DXM,PB) wrote:
> > > Hi:
> > > While trying to present aggregation results as JSON with flink-sql, I found that Flink does list functions such as JSON_OBJECTAGG, JSON_ARRAY, and JSON_OBJECT (using the default blink planner), but they always error out saying the usage is wrong. Is there any material describing how to use these functions, or examples?

--

Best,
Benchao Li