Re: 分组查询时,select的字段是否一定要都在group by中吗?

2020-11-30 文章 Leonard Xu
Hi, bulterman

你的分析是对的,group by pk的query是可以优化到筛选全部列的,这可以是个优化点,只是flink 现在还没有做, 和 Flink pk的 NOT 
ENFORCED 并没有关系,NOT NEOFRCED是说Flink不持有数据,不像数据库持有数据可以在读取时做校验。 
个人感觉这是个小的优化点,如果很急需可以考虑在社区开个issue.


祝好,
Leonard Xu
 

> 在 2020年12月1日,13:40,bulterman <15618338...@163.com> 写道:
> 
> Hi ALL,
>我用Flink SQL 建了一张表,主键也设置了,执行形如"select * from test_table group by 主键 " 
> 会报Expression 'XXX' is not being group的错误,通常来说按主键group by的话不是可以确定唯一性的吗? 
> 难道是因为建表语句中flink的主键约束模式只支持 NOT ENFROCED吗?  这里有点不太明白



Re:Re: flink sql cdc sum 结果出现NULL

2020-11-30 文章 kandy.wang









@Jianzhi Zhang

嗯,是这个原因,感谢 回复。 就是decimal的精度问题




在 2020-12-01 13:24:23,"Jianzhi Zhang"  写道:
>是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现
>
>> 2020年11月19日 下午10:41,kandy.wang  写道:
>> 
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price`  DECIMAL(10, 5)
>>   PRIMARY KEY ( `id` ),
>>   unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> 
>> 
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT ,
>>   `leaving_price`  DECIMAL(10, 5),
>>PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>  'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>> 
>> 
>> --binlog 2mysql
>> 
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> 
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> 
>> FROM hive.database.table
>> 
>> group by v_spu_id;
>> 
>> 
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> 
>> 
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>> 
>> 
>> 
>> 
>> 


Re: 关于flink cdc sql转出Stream流问题

2020-11-30 文章 jsqf
可以使用这种方式:
DataStream dstream = tableEnv.toAppendStream(sourceTable,
RowData.class);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


分组查询时,select的字段是否一定要都在group by中吗?

2020-11-30 文章 bulterman
Hi ALL,
我用Flink SQL 建了一张表,主键也设置了,执行形如"select * from test_table group by 主键 " 
会报Expression 'XXX' is not being group的错误,通常来说按主键group by的话不是可以确定唯一性的吗? 
难道是因为建表语句中flink的主键约束模式只支持 NOT ENFROCED吗?  这里有点不太明白

Re: flink sql cdc sum 结果出现NULL

2020-11-30 文章 Jianzhi Zhang
是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现

> 2020年11月19日 下午10:41,kandy.wang  写道:
> 
> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>   `id` INT UNSIGNED AUTO_INCREMENT,
>   `spu_id` BIGINT NOT NULL,
>   `leaving_price`  DECIMAL(10, 5)
>   PRIMARY KEY ( `id` ),
>   unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> 
> 
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>   `spu_id` BIGINT ,
>   `leaving_price`  DECIMAL(10, 5),
>PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://...',
>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>   'username' = '...',
>   'password' = '..'
> );
> 
> 
> --binlog 2mysql
> 
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> 
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> 
> FROM hive.database.table
> 
> group by v_spu_id;
> 
> 
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> 
> 
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 
> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
> 
> 
> 
> 
> 



使用flink-sql解析debezium采集的mysql timestamp字段报错

2020-11-30 文章 王羽凡
flink-sql-client执行建表:

CREATE TABLE source_xxx (
 id INT,
 ctime TIMESTAMP
) WITH (
 'connector' = 'kafka',
 'topic' = 'xxx',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'debezium-json',
 'scan.startup.mode' = 'earliest-offset',
 'debezium-json.schema-include' = 'false',
 'debezium-json.ignore-parse-errors' = 'false'
);

查询:
select * from source_xxx;
[ERROR] Could not execute SQL statement. Reason:
java.time.format.DateTimeParseException: Text '2018-07-10T23:47:35Z' could
not be parsed at index 10

mysql源表中ctime字段为timestamp类型,增加'debezium-json.timestamp-format.standard' =
'ISO-8601'配置依然报错,结尾多了个Z。
想咨询一下,这块儿是flink-sql和debezium采集的timestamp格式不兼容么?还是我debezium的配置,或者使用的flink-sql类型有问题?


Re: 使用 StreamingFileSink后 checkpoint状态中的数据如何hive读取

2020-11-30 文章 admin
hi,
你需要使用oncheckpoint的policy,这样在每次Checkpoint时会滚动文件

> 2020年11月30日 下午4:57,liliang <904716...@qq.com> 写道:
> 
> 本人使用的StreamingFileSink将数据按照行保存到hdfs中
>  StreamingFileSink streamingFileSink = StreamingFileSink.
>forRowFormat(new Path(path), new
> SimpleStringEncoder("UTF-8"))
>.withBucketAssigner(bucketAssigner)
>.withRollingPolicy(
>DefaultRollingPolicy.builder()
> 
> .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
> 
> .withInactivityInterval(TimeUnit.MINUTES.toMillis(30)) 
>.withMaxPartSize(1024 * 1024 * 1024)
>.build())
>.withOutputFileConfig(
>OutputFileConfig.builder()
>.withPartSuffix(partSuffix)
>.build()
>)
>.build();
> 配置如上,checkpoint的配置是10分钟一次,现在有个疑惑想要问下,现在hdfs上文件只是在半个小时都是未完成状态,
> 如 .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446
> 这种的处于
> inprocress状态,但是我这checkpoint是10分钟一次,如果我的任务在29分钟挂了,那么hdfs上这个文件就肯定不是FINISHED状态,那么那20分钟的数据我这应该怎么处理.
> 我这现在按照默认的处理中,hive对于inprogress的数据是直接过滤掉的,我这把文件改成正常的名称是能读取到
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 退订

2020-11-30 文章 Xingbo Huang
Hi,

退订请发邮件到  user-zh-unsubscr...@flink.apache.org

详细的可以参考 [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingb

elvis  于2020年12月1日周二 上午9:42写道:

> 退订


Re: Flink检查点失败,原因写着是job failed,但我的job明明好好的。

2020-11-30 文章 赵一旦
并没有,restored为0。

熊云昆  于2020年12月1日周二 上午8:44写道:

> job没有失败重启过吗?感觉是重启过吧
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-30 22:23:54,"赵一旦"  写道:
> >如题。
> >
> >Checkpoint Detail:
> >Path: - Discarded: - Failure Message: The job has failed.
> >
> >
> >如上,请问一般啥情况呢这是。
>


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
不需要,设置用户名和密码就行

Best
zhisheng

HunterXHunter <1356469...@qq.com> 于2020年12月1日周二 上午9:46写道:

> 你说的是es的 xpack 认证吗,需要你载入certificate文件是吗
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


????

2020-11-30 文章 elvis


Re: flink-1.11.2 job启动不起来,

2020-11-30 文章 zhisheng
hi,正超

建议把作业的日志发一下?

Best
zhisheng

神奇哥哥 <759341...@qq.com> 于2020年12月1日周二 上午9:38写道:

> 你好,此问题我也遇到。目前已解决。
> 解决办法:
> 查看你pom文件中是否引入了hadoop相关依赖,Flink 1.11需要把hadoop相关依赖注释掉。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 HunterXHunter
你说的是es的 xpack 认证吗,需要你载入certificate文件是吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
1.12 支持了,参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/elasticsearch.html#username

Kyle Zhang  于2020年12月1日周二 上午9:35写道:

> Hi,你说的是这个问题么
>
> https://issues.apache.org/jira/browse/FLINK-16788
>
> On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com  wrote:
>
> > 看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗?
> > 除了用api之外。
> >
> > 感谢!
> >
> >
> >
> > cljb...@163.com
> >
>


Re: flink-1.11.2 job启动不起来,

2020-11-30 文章 神奇哥哥
注意是不是hadoop-client包冲突,hbase-server中也依赖了此包,需要排除。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.11.2提交到yarn一直处于CREATED中

2020-11-30 文章 神奇哥哥
注意是不是hadoop-client包冲突,hbase-server中也依赖了此包,需要排除。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.11.2提交到yarn一直处于CREATED中

2020-11-30 文章 神奇哥哥
你好,此问题我也遇到。目前已解决。
解决办法:
查看你pom文件中是否引入了hadoop相关依赖,Flink 1.11需要把hadoop相关依赖注释掉。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.11.2 job启动不起来,

2020-11-30 文章 神奇哥哥
你好,此问题我也遇到。目前已解决。
解决办法:
查看你pom文件中是否引入了hadoop相关依赖,Flink 1.11需要把hadoop相关依赖注释掉。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql client 报错java.net.NoRouteToHostException: 没有到主机的路由

2020-11-30 文章 奚焘
本人刚学习flink ,下载解压了flink,启动./sql-client.sh embedded ,输入SELECT 'Hello World';报错
Flink SQL> SELECT 'Hello World';
[ERROR] Could not execute SQL statement. Reason:
java.net.NoRouteToHostException: 没有到主机的路由



--
Sent from: http://apache-flink.147419.n8.nabble.com/


摄像头视频流采集

2020-11-30 文章 Xia(Nate) Qu
请教各位:

我们想做多个监控摄像头的视频流采集平台,摄像头的数量大概有1000-5000个,摄像头的流数据直接发到采集平台,之后平台可以将数据写到Hadoop或者用于机器学习消费,不知道flink是不是适合这样的场景呢?谢谢


屈夏


Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 Kyle Zhang
Hi,你说的是这个问题么

https://issues.apache.org/jira/browse/FLINK-16788

On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com  wrote:

> 看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗?
> 除了用api之外。
>
> 感谢!
>
>
>
> cljb...@163.com
>


Re:Flink检查点失败,原因写着是job failed,但我的job明明好好的。

2020-11-30 文章 熊云昆
job没有失败重启过吗?感觉是重启过吧

















在 2020-11-30 22:23:54,"赵一旦"  写道:
>如题。
>
>Checkpoint Detail:
>Path: - Discarded: - Failure Message: The job has failed.
>
>
>如上,请问一般啥情况呢这是。


flink cdc 如何保证group agg结果正确性

2020-11-30 文章 kandy.wang
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as 
leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
FROM  XX.XX.XX
group by v_spu_id;


XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。
怎么做初始化,这个初始化,是把数据初始化到state里么? 现在通过对数发现,数据量对不上。

flink cdc 如何保证group agg结果正确性

2020-11-30 文章 kandy.wang
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as 
leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
FROM  XX.XX.XX
group by v_spu_id;


XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。
怎么做初始化,这个初始化,是把数据初始化到state里么? 现在通过对数发现,数据量对不上。

Re: flink任务运行不久后报netty错误

2020-11-30 文章 赵一旦
希望有人回答下这个问题,比较奇怪,也不是很好排查原因。

赵一旦  于2020年11月27日周五 下午9:25写道:

> 如下报错:
> 19:59:56.128 [Flink Netty Client (8009) Thread 6] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exce
> ption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.214 [Flink Netty Client (8009) Thread 13] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.254 [Flink Netty Client (8009) Thread 15] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.262 [Flink Netty Client (8009) Thread 17] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.270 [Flink Netty Client (8009) Thread 19] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.279 [Flink Netty Client (8009) Thread 21] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.310 [Flink Netty Client (8009) Thread 23] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.320 [Flink Netty Client (8009) Thread 20] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.325 [Flink Netty Client (8009) Thread 22] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> epoll_wait(..) failed: Operation now in progress
> 19:59:56.339 [Flink Netty Client (8009) Thread 25] WARN
>  org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
> Unexpected exc
> eption in the selector loop.
>
>
> 不清楚啥情况。
>


flink taskmanager netty报错。

2020-11-30 文章 赵一旦
如下图,报错报的是关于selector loop中错误。希望有大神帮忙分析下可能原因。
22:23:17.045 [Flink Netty Client (2007) Thread 28] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.045 [Flink Netty Client (2007) Thread 15] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.045 [Flink Netty Client (2007) Thread 16] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.045 [Flink Netty Server (2007) Thread 19] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Resource temporarily unavailable
22:23:17.204 [Flink Netty Client (2007) Thread 13] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.222 [Flink Netty Client (2007) Thread 27] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.257 [Flink Netty Client (2007) Thread 18] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.257 [Flink Netty Client (2007) Thread 21] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
22:23:17.257 [Flink Netty Server (2007) Thread 20] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Resource temporarily unavailable


Flink检查点失败,原因写着是job failed,但我的job明明好好的。

2020-11-30 文章 赵一旦
如题。

Checkpoint Detail:
Path: - Discarded: - Failure Message: The job has failed.


如上,请问一般啥情况呢这是。


flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 cljb...@163.com
看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗?
除了用api之外。

感谢!



cljb...@163.com


Re: Re: Re:回复:带有状态的算子保存checkpoint失败

2020-11-30 文章 Congxian Qiu
checkpoint 失败了可以看看 是超时了,还是有 task snapshot 失败了,可以从 JM log
中来发现。超时的话,可以看下是数据量大需要时间久,还是 timeout 啥的设置太短;异常的话可以从对应的 tm log 看下为啥 snapshot
失败了

Best,
Congxian


王默  于2020年11月27日周五 下午11:43写道:

> checkpoint失败是在web页面上发现的,您看下截图https://imgchr.com/i/Dr3PNn
> 看taskmanager日志确实没有超时,也没有其他异常
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-27 21:39:50,"赵一旦"  写道:
> >失败原因也不写,怎么个不能保存。。。超时?还是啥。
> >
> >魏积乾  于2020年11月27日周五 下午7:08写道:
> >
> >> flink-csv-1.11.2.jar
> >> flink-dist_2.11-1.11.2.jar
> >> flink-json-1.11.2.jar
> >> flink-shaded-zookeeper-3.4.14.jar
> >> flink-table_2.11-1.11.2.jar
> >> flink-table-blink_2.11-1.11.2.jar
> >> log4j-1.2-api-2.12.1.jar
> >> log4j-api-2.12.1.jar
> >> log4j-core-2.12.1.jar
> >> log4j-slf4j-impl-2.12.1.jar
> >> flink-metrics-prometheus_2.12-1.11.2.jar
> >>
> >> 按时间排了个序,这是最新的包。
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: 王默  >> 发送时间: 2020年11月27日 18:41
> >> 收件人: user-zh  harry...@foxmail.com
> >> 
> >> 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-27 17:34:39,"魏积乾"  >>
> 我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
> >> 希望对你有帮助发自我的iPhone  
> --
> >> 原始邮件 -- 发件人: 王默  >> 发送时间: 2020年11月27日 17:22 收件人: user-zh  gt;
> >> 主题: 回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >> 
>


flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2020-11-30 文章 jindy_liu
flink 版本: 1.11.2

*
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]*


完整信息:

2020-11-13 11:07:04
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:383)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:388)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:309)
at
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[64_40108_0_1]: version
conflict, required seqNo [95958], primary term [1]. current document has
seqNo [99881] and primary term [1]]]
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:407)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:138)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1581)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1663)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:590)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:333)
at
org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:327)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at
org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at

Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 zilong xiao
原来如此,我觉得是一个不错的想法,但是其实对用户来说,最好除了写SQL之外,其他事情都不要做是最好


Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 silence
你好:

这个原因最开始已经说明了,main jar就是将传入的sql参数进行解析封装,而sql里用到的udf、connector之类的类型希望可以做到动态指定
一方面可以做到灵活的依赖控制,减少main jar的大小
另一方吧可以减少不同connector和udf,或不同版本connector和udf的依赖冲突的可能性

ps:假如平台有数十种connector和数百个udf都打到一个fast jar里想想都觉得不太优雅吧



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 zilong xiao
Hi silence,

想问下为什么一定要submit参数呢?我理解如果是做平台的话,用户如果有多个jar依赖,为什么不把这些jar统一打包到任务主jar里呢?,平台可以提供一些公共依赖,比如flink,hadoop等

silence  于2020年11月30日周一 下午5:20写道:

> 看了很多同学回复yarn的解决方案
>
> 我这再补充一下:
> 还是希望可以提供更通用的submit参数来解决此问题,
> 包括提交到standalone集群时可以额外指定本地依赖jar
>
> 有没有cli相关的同学可以跟进下建议
> 谢谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教一下目前flink submit能不能指定额外的依赖jar

2020-11-30 文章 silence
看了很多同学回复yarn的解决方案

我这再补充一下:
还是希望可以提供更通用的submit参数来解决此问题,
包括提交到standalone集群时可以额外指定本地依赖jar

有没有cli相关的同学可以跟进下建议
谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL导致Prometheus内存暴涨

2020-11-30 文章 Luna Wong
我看了源码了。operator name截断了。但是task name没截断。task name是那些operator name拼起来的
所以特别长。现在我只是魔改源码临时截断了一下,咱还是在issue里讨论吧

Jark Wu  于2020年11月26日周四 下午8:53写道:
>
> IIRC, runtime will truncate the operator name to max 80 characters, see
> `TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH`.
> You can search the log if there are "The operator name {} exceeded the {}
> characters length limit and was truncated.".
>
> On Thu, 26 Nov 2020 at 18:18, hailongwang <18868816...@163.com> wrote:
>
> >
> >
> >
> > Hi,
> >  是的,个人觉得可以提供一个配置项来控制 task Name。
> >  完整的 task name 有助于排查问题等,简短的 task name 有助于在生产环境中 metric
> > 的采集,可以极大较少发送的网络开销,存储空间等。
> >  已建立个了 issue :https://issues.apache.org/jira/browse/FLINK-20375
> >
> >
> > Best,
> > Hailong
> >
> > 在 2020-11-24 14:19:40,"Luna Wong"  写道:
> > >FlinkSQL 生成的Metrics数据 task_name名字超长,导致Prometheus查询的时候内存暴涨,生产环境接受不了。
> > >下面只是一个最简单的例子,复杂的SQL生成的task_name长达9000字节。这会导致Prometheus内存暴涨,我该怎么办。
> > >
> >
> > >task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
> >


使用 StreamingFileSink后 checkpoint状态中的数据如何hive读取

2020-11-30 文章 liliang
本人使用的StreamingFileSink将数据按照行保存到hdfs中
  StreamingFileSink streamingFileSink = StreamingFileSink.
forRowFormat(new Path(path), new
SimpleStringEncoder("UTF-8"))
.withBucketAssigner(bucketAssigner)
.withRollingPolicy(
DefaultRollingPolicy.builder()
   
.withRolloverInterval(TimeUnit.HOURS.toMillis(1))
   
.withInactivityInterval(TimeUnit.MINUTES.toMillis(30)) 
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartSuffix(partSuffix)
.build()
)
.build();
配置如上,checkpoint的配置是10分钟一次,现在有个疑惑想要问下,现在hdfs上文件只是在半个小时都是未完成状态,
如 .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446
这种的处于
inprocress状态,但是我这checkpoint是10分钟一次,如果我的任务在29分钟挂了,那么hdfs上这个文件就肯定不是FINISHED状态,那么那20分钟的数据我这应该怎么处理.
我这现在按照默认的处理中,hive对于inprogress的数据是直接过滤掉的,我这把文件改成正常的名称是能读取到




--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于flink cdc sql转出Stream流问题

2020-11-30 文章 yujianbo
代码采用sql方式接入mysql cdc数据然后转出Stream流, 写入kudu,但是不知道怎么去获取
row里面的主键字段是哪一个和字段名称和类型等?
或者toRetractStream可以指定其他的class???

下面是代码
==
tableEnv.executeSql(createTableSql);
Table table = tableEnv.sqlQuery(querySql);
DataStream> dataStream =
tableEnv.toRetractStream(table, Row.class);
dataStream.print().setParallelism(1);
==



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink on native k8s deploy issue

2020-11-30 文章 Yang Wang
如果你是用的ClusterIP的暴露方式,那任务提交只能在K8s内进行的
因为外部环境无法解析到K8s内部的service(也就是tuiwen-flink-rest.flink)

你可以在K8s集群内起一个Pod来充当Flink client,然后在Pod内进行任务提交


Best,
Yang

吴松  于2020年11月24日周二 下午4:23写道:

> 不好意思,这个报错应该是内存的问题。 我想说的是一下的报错。
>
>
>
>
>
>
> 2020-11-24 16:19:33,569 ERROR
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient [] - A
> Kubernetes exception occurred.
> java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or service
> not known
> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> ~[?:1.8.0_252]
> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
> ~[?:1.8.0_252]
> at
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getByName(InetAddress.java:1077)
> ~[?:1.8.0_252]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
> 2020-11-24 16:19:33,606 ERROR
> org.apache.flink.kubernetes.cli.KubernetesSessionCli  
>  [] - Error while running the Flink session.
> java.lang.RuntimeException:
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not
> create the RestClusterClient.
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188)
> [flink-dist_2.12-1.11.2.jar:1.11.2]
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
> Could not create the RestClusterClient.
> ... 6 more
> Caused by: java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or
> service not known
> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> ~[?:1.8.0_252]
> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
> ~[?:1.8.0_252]
> at
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
> ~[?:1.8.0_252]
> at java.net.InetAddress.getByName(InetAddress.java:1077)
> ~[?:1.8.0_252]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> ... 5 more
>
>
> 
> The program finished with the following exception:
>
>
> java.lang.RuntimeException:
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not
> create the RestClusterClient.
> at
> 

Re: flink-json 函数用法

2020-11-30 文章 Yan,Yunpeng(DXM,PB)
Hi:
那我再看看json相关的信息,然后刚才测试的时候发现这个另一个问题
select ENCODE('ISO-8859-1', F_sp_withdraw_user_name) from t_sp_user_info 
where F_sp_withdraw_user_name is not null;

 Exception in thread "main" 
org.apache.flink.table.client.SqlClientException: Unexpected exception. This is 
a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'BINARY(1)' (conversion class: [B) to type information. Only 
data types that originated from type information fully support a reverse 
conversion.
at 
org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
   使用flnk-sql的ENCODE 进行转码的时候显示如下问题ENCODE(F_sp_withdraw_user_name,'ISO-8859-1') 
也是一样的
   
 
在 2020/11/30 15:53,“Benchao Li” 写入:

Hi,

目前Flink SQL应该还没有正式支持json函数吧,上面的报错信息看起来也是符合预期的,说的是目前还找不到这个函数。

相关信息可以参考:https://issues.apache.org/jira/browse/FLINK-9477


Yan,Yunpeng(DXM,PB)  于2020年11月30日周一 下午2:18写道:

> Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from income_fee
> > ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: No match found for
> function signature JSON_OBJECT(, , )
>
> Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from sp_income_fee
> > where enabled = 1
> > group by id;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: No match found for
> function signature JSON_OBJECT(, , )
>
> Flink SQL> select JSON_ARRAYAGG(product_type)
> > from income_fee
> > where f_enabled = 1;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Unsupported Function:
> 'JSON_ARRAYAGG_ABSENT_ON_NULL'
>
> 闫云鹏
> DXM 支付业务部
> 地址:北京市海淀区西北旺东路度小满金融总部
> 邮编:100085
> 手机:13693668213
> 邮箱:yanyunp...@duxiaoman.com
>
> 度小满金融
> 精于科技 值得信赖
>
>
>
> 在 2020/11/30 11:05,“caozhen” 写入:
>
> 可以把使用方法和 报错信息 发下嘛?
>
>
>
>
> Yan,Yunpeng(DXM,PB) wrote
> > Hi:
> >   尝试使用flink-sql将聚合结果json展示的时候发现flink是支持JSON_OBJECTAGG, JSON_ARRAY,
> > JSON_OBJECT 等这种函数的(使用的默认的blink),
> 但是总是报错函数的用法不对,有相关资料来介绍这些函数的使用方法的吗?或者示例
> >
> > 闫云鹏
> > DXM 支付业务部
> > 地址:北京市海淀区西北旺东路度小满金融总部
> > 邮编:100085
> > 手机:13693668213
> > 邮箱:
>
> > yanyunpeng@
>
> > mailto:
>
> > yanyunpeng@
>
> > 
> >
> > 度小满金融
> >
> > 精于科技 值得信赖
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 

Best,
Benchao Li