Re: Flink SQL writes to Hive much more slowly than to HDFS

2020-11-25 Post by admin
To add: my Flink version is 1.11.1.
I searched the mailing list and found a performance issue (https://issues.apache.org/jira/browse/FLINK-19121), and I am not sure whether it is related.

> On Nov 26, 2020, at 11:49, admin <17626017...@163.com> wrote:
> 
> Hi all,
> Two jobs read from the same Kafka topic; one writes to HDFS and the other to Hive, both with minute-level partitions and parallelism 200. After running for a while, the Hive job falls far behind the HDFS one, and in the HDFS path backing the Hive table the files inside a single partition can span as much as two hours. Has anyone run into this?
> The corresponding DDLs are attached.
> hive:
> CREATE EXTERNAL TABLE hive_table (
>log_timestamp BIGINT,
>ip STRING,
>`raw` STRING
> ) PARTITIONED BY (`day` STRING, `hour` STRING,`minute` STRING) STORED AS 
> PARQUET
> TBLPROPERTIES (
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file',
>'sink.partition-commit.success-file.name' = '_SUCCESS'
> );
> 
> Hdfs:
> 
> CREATE TABLE hdfs_table (
>log_timestamp BIGINT,
>ip STRING,
>`raw` STRING,
>`day` STRING, `hour` STRING,`minute` STRING
> ) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
>'connector'='filesystem',
>'path'='hdfs://xx/test.db/hdfs_table',
>'format'='parquet',
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file',
>'sink.partition-commit.success-file.name' = '_SUCCESS'
> );
> 
> 
> Comparison of the actual HDFS files:
> 
> -rw-r--r--   3 hadoop hadoop    1514862 2020-11-26 09:26 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824
> -rw-r--r--   3 hadoop hadoop   10798011 2020-11-26 09:34 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830
> -rw-r--r--   3 hadoop hadoop    4002618 2020-11-26 09:35 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831
> -rw-r--r--   3 hadoop hadoop    8057522 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844
> -rw-r--r--   3 hadoop hadoop    6675744 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845
> -rw-r--r--   3 hadoop hadoop    4062571 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844
> -rw-r--r--   3 hadoop hadoop   10247973 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845
> -rw-r--r--   3 hadoop hadoop     483029 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846
> -rw-r--r--   3 hadoop hadoop    9440221 2020-11-26 09:16 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816
> -rw-r--r--   3 hadoop hadoop    5346956 2020-11-26 09:17 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817
> -rw-r--r--   3 hadoop hadoop    4940718 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844
> -rw-r--r--   3 hadoop hadoop    9687410 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845
> -rw-r--r--   3 hadoop hadoop      51998 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846
> -rw-r--r--   3 hadoop hadoop       3518 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833
> -rw-r--r--   3 hadoop hadoop   13801987 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834
> -rw-r--r--   3 hadoop hadoop     963288 2020-11-26 09:40 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835
> -rw-r--r--   3 hadoop hadoop    6036601 2020-11-26 09:27 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825
> -rw-r--r--   3 hadoop hadoop    8864235 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826
> -rw-r--r--   3 hadoop hadoop   10865872 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833
> -rw-r--r--   3 hadoop hadoop    4031077 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-834
> -rw-r--r--   3 hadoop hadoop     228350 2020-11-26 09:09 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-811
> 

A Flink SQL job that deploys successfully in per-job mode fails in application mode: submitted to YARN it dies within 2 seconds, and no logs can be read

2020-11-25 Post by shimin huang
```
14:56:44.536 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
while running the command.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn Application Cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
[hadoop-common-3.0.0-cdh6.3.2.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.2.jar:1.11.2]
```

This is the error the deployment reports; looking at the YARN log with yarn logs -applicationId xxx shows nothing relevant either.
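
For context (not something stated in the thread): in Flink 1.11, application mode on YARN is normally submitted with bin/flink run-application -t yarn-application <job jar>. A cluster that dies within a couple of seconds often never gets far enough to write a JobManager log, so the diagnostics printed by yarn application -status <applicationId> are sometimes the only remaining clue when yarn logs returns nothing.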


Re: How to resolve stalls in Flink CDC caused by packets not being sent

2020-11-25 Post by yujianbo
Thanks, Jark!
Adjusting the MySQL connection parameters last time fixed the timeout, but when syncing this table the job still gets stuck near the end of the snapshot phase and reports a connection error. Where should I look?

1. Environment:
1. Version: 1.11.2
2. Flink CDC syncing from MySQL to Kudu with the DataStream API
3. *The table has 34 million rows and always gets stuck at around 33.4 million. I have already replaced the Kudu sink with a plain print sink and still get exactly the same error.*
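
For context, and as an assumption about this setup rather than something confirmed in the thread: a "last packet received 16 ms ago, last packet sent 335,794 ms ago" failure during a long snapshot SELECT means the client spent roughly 336 seconds (about 5.6 minutes) reading the result set without sending anything, which usually points at a MySQL-side or intermediate idle timeout (for example wait_timeout, net_write_timeout, or a proxy timeout) closing the connection in the meantime; comparing those settings against that gap is a common first check before digging into the connector itself.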


The log is as follows:
==
2020-11-26 14:00:15,293 ERROR *io.debezium.connector.mysql.SnapshotReader*  

[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `uchome`.`forums_post_12`': *Communications link failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
*org.apache.kafka.connect.errors.ConnectException: Communications link
failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago. Error code: 0; SQLSTATE: 08S01.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[flinkcdc4mysql2kudu.jar:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
*Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException:
Communications link failure*

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
at
com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:813)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[flinkcdc4mysql2kudu.jar:?]
... 3 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications
link failure

The last packet successfully received from the server was 16 milliseconds
ago.  The last packet sent successfully to the server was 335,794
milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
~[?:1.8.0_231]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_231]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_231]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_231]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.clearInputStream(NativeProtocol.java:837)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:652)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:986)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:921)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1165)
~[flinkcdc4mysql2kudu.jar:?]
at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:801)
~[flinkcdc4mysql2kudu.jar:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:747)
~[flinkcdc4mysql2kudu.jar:?]
... 3 more
Caused by: java.io.IOException: Socket is closed
at
com.mysql.cj.protocol.AbstractSocketConnection.getMysqlInput(AbstractSocketConnection.java:72)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.clearInputStream(NativeProtocol.java:833)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:652)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:986)
~[flinkcdc4mysql2kudu.jar:?]
at
com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:921)
~[flinkcdc4mysql2kudu.jar:?]
at 

Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by 冯草纸
Hi
DEBUG??mysql







Can DataSet SQL not be used inside a map in a DataStream?

2020-11-25 Post by 马里奥
The map keeps emitting List[Bean] values, each one independent, and I want to run complex OLAP-style SQL over each List[Bean]. My previous approach was to insert the List[Bean] into a database and run the SQL there. Now I would like, inside a DataStream map, to turn the List[Bean] into a DataSet, register that DataSet as a table and run SQL on it, but when I tried it this turned out not to be allowed.

Is there any other way to implement this directly with Flink?
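
No answer appears in this thread, but the usual alternative is to register the stream itself as a table and let the SQL run continuously over it, instead of building a DataSet per element. The sketch below is not from the thread; the field names, sample values, and query are invented for illustration, and it assumes Flink 1.11 with the Blink planner.

```
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Stand-in for the Bean stream: (name, amount) pairs instead of a per-element List[Bean].
        Table beans = tEnv.fromDataStream(
                env.fromElements(Tuple2.of("a", 10L), Tuple2.of("a", 5L), Tuple2.of("b", 7L)),
                $("name"), $("amount"));
        tEnv.createTemporaryView("beans", beans);

        // The OLAP-style query then runs continuously on the stream rather than once per List.
        Table result = tEnv.sqlQuery("SELECT `name`, SUM(amount) AS total FROM beans GROUP BY `name`");
        tEnv.toRetractStream(result, Row.class).print();

        env.execute("stream sql sketch");
    }
}
```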



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2: data never makes it into HBase

2020-11-25 Post by simon
Hi all:
I am testing writing to HBase from Flink and no data ever lands in HBase, and there is no error either. My environment is as follows:
HBase version 1.1.2 with HDP 2.6.5
Hadoop version 2.7.3
Flink version 1.11.2
HBase sink:
create table f_volteics_gm_all_h_hbase (
`msisdn` string,
`INFO` ROW,
PRIMARY KEY (`msisdn`) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4.3',
 'table-name' = 'f_volteics_gm_all_h',
 'zookeeper.quorum' = '192.168.6.200:2181',
 'sink.buffer-flush.max-size'='0',
 'sink.buffer-flush.max-rows'='0'
)

The processing logic reads from Kafka and writes to HBase, as follows:

I packaged the program as a jar and ran it on YARN. There were no errors, but no data ever arrives in HBase. On the monitoring page, bytes sent stays at 0, as shown below:
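
Worth double-checking, although this is an assumption on my part and not something from the thread: in the 1.11 HBase connector, setting 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' to '0' disables those two flush triggers, so buffered puts would then only reach HBase when the flush interval fires or a checkpoint completes; verifying what 'sink.buffer-flush.interval' resolves to in this job, and whether checkpointing is enabled, seems like a useful first step.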


Sent from Mail for Windows 10



Re: Best practice for setting uid

2020-11-25 Post by flink小猪
You probably don't need to set it on most of these.


18579099920
Email: 18579099...@163.com

(Signature generated by NetEase Mail Master)

On 2020-11-26 12:09, 范瑞 wrote:
Stateless operators do not need a uid.


Best
fanrui



---Original message---
From: "kingdomad"

Re: Best practice for setting uid

2020-11-25 Post by 范瑞
Stateless operators do not need a uid.


Best
fanrui



---Original message---
From: "kingdomad"

Best practice for setting uid

2020-11-25 Post by kingdomad
Could someone advise: when using savepoints, is it best to give every operator a unique uid?
The problem is that my program has a lot of operators. Do I really need to set a uid on each one?
The code becomes quite verbose, and naming every uid is a headache in itself. For example:
val stream = env.addSource(...).uid(...)
.filter(...).uid(...)
.map(...).uid(...)
.map(...).uid(...)
.map(...).uid(...)
.filter(...).uid(...)
.map(...).uid(...)
.filter(...).uid(...)
.map(...).uid(...)
.assignTimestampsAndWatermarks(...).uid(...)


What is the recommended way to handle this?
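
A possible middle ground, sketched below (not from the thread; the operator names and the counting function are invented): follow the replies above and pin a uid only where state is involved, such as the source and the keyed, stateful operator, while leaving plain stateless maps and filters without one.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UidSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a", "c")   // stand-in for the real source; a Kafka source would also get a uid
                .uid("demo-source")            // sources hold offsets/state, so pin their uid
                .filter(s -> !s.isEmpty())     // stateless: no uid needed for savepoint compatibility
                .map(String::toUpperCase)      // stateless: no uid needed
                .keyBy(s -> s)
                .process(new CountPerKey())    // stateful: pin the uid so state survives job-graph changes
                .uid("demo-count-per-key")
                .print();

        env.execute("uid sketch");
    }

    /** Counts how many times each key has been seen, using keyed ValueState. */
    public static class CountPerKey extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = count.value();
            long next = current == null ? 1L : current + 1L;
            count.update(next);
            out.collect(value + " seen " + next + " times");
        }
    }
}
```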




--

kingdomad



Flink SQL writes to Hive much more slowly than to HDFS

2020-11-25 Post by admin
Hi all,
Two jobs read from the same Kafka topic; one writes to HDFS and the other to Hive, both with minute-level partitions and parallelism 200. After running for a while, the Hive job falls far behind the HDFS one, and in the HDFS path backing the Hive table the files inside a single partition can span as much as two hours. Has anyone run into this?
The corresponding DDLs are attached.
hive:
CREATE EXTERNAL TABLE hive_table (
log_timestamp BIGINT,
ip STRING,
`raw` STRING
) PARTITIONED BY (`day` STRING, `hour` STRING,`minute` STRING) STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.success-file.name' = '_SUCCESS'
);

Hdfs:

CREATE TABLE hdfs_table (
log_timestamp BIGINT,
ip STRING,
`raw` STRING,
`day` STRING, `hour` STRING,`minute` STRING
) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
'connector'='filesystem',
'path'='hdfs://xx/test.db/hdfs_table',
'format'='parquet',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.success-file.name' = '_SUCCESS'
);
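
Some context that may help when comparing the two sinks, though it is not from the thread: with Parquet, part files are only finalized when a checkpoint completes, and in Flink 1.11 the filesystem and Hive streaming sinks also expose rolling and partition-commit options that control how quickly files roll and partitions are committed. The sketch below only shows where such options go; the table name, path, and values are placeholders rather than recommendations, and for the Hive table the same keys would go into TBLPROPERTIES, as the DDL above already does for 'sink.partition-commit.policy.kind'.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FsSinkOptionsSketch {
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(),
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Option keys as documented for the Flink 1.11 filesystem connector; names, path, and
        // values below are placeholders, not recommendations.
        tableEnv.executeSql(
                "CREATE TABLE hdfs_table_tuned (" +
                "  log_timestamp BIGINT, ip STRING, `raw` STRING," +
                "  `day` STRING, `hour` STRING, `minute` STRING" +
                ") PARTITIONED BY (`day`, `hour`, `minute`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs://xx/test.db/hdfs_table_tuned'," +
                "  'format' = 'parquet'," +
                "  'parquet.compression' = 'SNAPPY'," +
                "  'sink.rolling-policy.rollover-interval' = '1 min'," + // cap how long one part file stays open
                "  'sink.rolling-policy.check-interval' = '1 min'," +    // how often the rollover condition is checked
                "  'sink.partition-commit.trigger' = 'process-time'," +  // or 'partition-time' together with a watermark
                "  'sink.partition-commit.delay' = '1 min'," +           // wait this long before committing a partition
                "  'sink.partition-commit.policy.kind' = 'success-file'" +
                ")");
        // A real job would then run: INSERT INTO hdfs_table_tuned SELECT ... FROM the Kafka source table.
    }
}
```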


Comparison of the actual HDFS files:

-rw-r--r--   3 hadoop hadoop    1514862 2020-11-26 09:26 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824
-rw-r--r--   3 hadoop hadoop   10798011 2020-11-26 09:34 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830
-rw-r--r--   3 hadoop hadoop    4002618 2020-11-26 09:35 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831
-rw-r--r--   3 hadoop hadoop    8057522 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844
-rw-r--r--   3 hadoop hadoop    6675744 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845
-rw-r--r--   3 hadoop hadoop    4062571 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844
-rw-r--r--   3 hadoop hadoop   10247973 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845
-rw-r--r--   3 hadoop hadoop     483029 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846
-rw-r--r--   3 hadoop hadoop    9440221 2020-11-26 09:16 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816
-rw-r--r--   3 hadoop hadoop    5346956 2020-11-26 09:17 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817
-rw-r--r--   3 hadoop hadoop    4940718 2020-11-26 09:51 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844
-rw-r--r--   3 hadoop hadoop    9687410 2020-11-26 09:52 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845
-rw-r--r--   3 hadoop hadoop      51998 2020-11-26 09:53 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846
-rw-r--r--   3 hadoop hadoop       3518 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833
-rw-r--r--   3 hadoop hadoop   13801987 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834
-rw-r--r--   3 hadoop hadoop     963288 2020-11-26 09:40 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835
-rw-r--r--   3 hadoop hadoop    6036601 2020-11-26 09:27 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825
-rw-r--r--   3 hadoop hadoop    8864235 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826
-rw-r--r--   3 hadoop hadoop   10865872 2020-11-26 09:37 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833
-rw-r--r--   3 hadoop hadoop    4031077 2020-11-26 09:39 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-834
-rw-r--r--   3 hadoop hadoop     228350 2020-11-26 09:09 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-811
-rw-r--r--   3 hadoop hadoop   14661395 2020-11-26 09:11 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-812
-rw-r--r--   3 hadoop hadoop    5451995 2020-11-26 09:29 hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-826
-rw-r--r--   3 hadoop hadoop    9149301 2020-11-26 09:30

Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by Kyle Zhang
I wrote it wrong above; the table has to be converted to a stream before it can be printed.

On Thu, Nov 26, 2020 at 11:46 AM Kyle Zhang  wrote:

> When executeSql is called, the output should go into another table, for example the print table, for anything to be printed.
> Or print it with bsTableEnv.sqlQuery("select * from meson_budget_data").print();
>
> On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu  wrote:
>
>> Hi
>>
>> Once executeSql is called the job is already running; there is no need to also call the execute method below. Since you have checkpointing configured, double-check that the MySQL parameters are correct.
>>
>> > On Nov 25, 2020 at 18:42, 冯草纸 wrote:
>> >
>> > env.execute("sql test");
>> > // bsTableEnv.execute("sql test");
>>
>>
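
A hedged sketch of the conversion Kyle describes, assuming Flink 1.11 with the Blink planner; a datagen table stands in for the JDBC table from the thread so that the example runs without a MySQL instance.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PrintSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // A datagen stand-in for the thread's JDBC table, so the sketch runs without MySQL.
        tEnv.executeSql(
                "CREATE TABLE meson_budget_data (ID BIGINT, ClosePrice DOUBLE) WITH (" +
                " 'connector' = 'datagen', 'rows-per-second' = '1')");

        // Convert the query result to a DataStream, print it, then start the job with execute().
        Table result = tEnv.sqlQuery("select * from meson_budget_data");
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("print sketch");
    }
}
```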


Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by Kyle Zhang
When executeSql is called, the output should go into another table, for example the print table, for anything to be printed.
Or print it with bsTableEnv.sqlQuery("select * from meson_budget_data").print();

On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu  wrote:

> Hi
>
> Once executeSql is called the job is already running; there is no need to also call the execute method below. Since you have checkpointing configured, double-check that the MySQL parameters are correct.
>
> > On Nov 25, 2020 at 18:42, 冯草纸 wrote:
> >
> > env.execute("sql test");
> > // bsTableEnv.execute("sql test");
>
>


Re: How to resolve stalls in Flink CDC caused by packets not being sent

2020-11-25 Post by Jark Wu
That is not really a problem; there will not be that many jobs, and MySQL can handle that amount of load.

The wording in the docs was not accurate: leaving the server id unconfigured does not hammer the MySQL connections. I have updated the doc.
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
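
To make that concrete, a sketch that is not from the thread: a SQL CDC table that pins its own server id through the mysql-cdc connector's 'server-id' option. The connection details, schema, and id value are placeholders, and the flink-cdc-connectors dependency is assumed to be on the classpath.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ServerIdSketch {
    public static void main(String[] args) {
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(),
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder connection details; each job would pick its own 'server-id' value.
        tEnv.executeSql(
                "CREATE TABLE forums_post (id BIGINT, title STRING, PRIMARY KEY (id) NOT ENFORCED) WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'mysql.example.com'," +
                " 'port' = '3306'," +
                " 'username' = 'flink'," +
                " 'password' = 'secret'," +
                " 'database-name' = 'uchome'," +
                " 'table-name' = 'forums_post_12'," +
                " 'server-id' = '5401')");
        // The table can then be queried or written to a sink like any other source table.
    }
}
```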



On Thu, 26 Nov 2020 at 09:53, yujianbo <15205029...@163.com> wrote:

> Thanks for the answer, Jark. One more question: the community's mysql cdc wiki says that using many different server ids to connect to the MySQL server can cause spikes in MySQL CPU and connections. If our CDC SQL jobs each use a different server id to pull different tables, should we also keep the number of such CDC jobs small?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by Leonard Xu
Hi

Once executeSql is called the job is already running; there is no need to also call the execute method below. Since you have checkpointing configured, double-check that the MySQL parameters are correct.

> On Nov 25, 2020 at 18:42, 冯草纸 wrote:
> 
> env.execute("sql test");
> // bsTableEnv.execute("sql test");



Re: How to resolve stalls in Flink CDC caused by packets not being sent

2020-11-25 Post by yujianbo
Thanks for the answer, Jark. One more question: the community's mysql cdc wiki says that using many different server ids to connect to the MySQL server can cause spikes in MySQL CPU and connections. If our CDC SQL jobs each use a different server id to pull different tables, should we also keep the number of such CDC jobs small?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to resolve stalls in Flink CDC caused by packets not being sent

2020-11-25 Post by Jark Wu
1. By default it is random, so duplicates are possible.
2, 3. There is a problem: the server id is at the database level.

On Wed, 25 Nov 2020 at 17:41, yujianbo <15205029...@163.com> wrote:

> Mainly to parse a custom schema so that the sink side can emit to downstream systems.
> I would like to ask a question:
>
> https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
> I read the link above about setting a different server id for each job. I see that SQL can specify different server ids, which leaves me with three questions:
> 1. Do different DataStream jobs end up with the same server id?
> 2. Is it fine for different DataStream jobs to sync different tables of the same database?
> 3. Is it a problem for different DataStream jobs to sync the same table of the same database?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by 冯草纸
Hi,
Here is my code, including the execute call at the end:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, settings);

bsTableEnv.getConfig().getConfiguration().set(
        ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
bsTableEnv.getConfig().getConfiguration().set(
        ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

String ddl = "CREATE TABLE meson_budget_data (\n" +
        " ID BIGINT,\n" +
        " Gangtise_Code STRING,\n" +
        " InnerCode INT,\n" +
        " Tradingday DATE, \n" +
        " ClosePrice DOUBLE,\n" +
        " Close_3day DOUBLE,\n" +
        " Close_5day DOUBLE,\n" +
        " Close_10day DOUBLE,\n" +
        " Close_20day DOUBLE,\n" +
        " Close_60day DOUBLE,\n" +
        " Close_120day DOUBLE,\n" +
        " Close_250day DOUBLE,\n" +
        " PRIMARY KEY (ID) NOT ENFORCED\n" +
        " ) WITH(\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://gangtisedb.mysql.database.chinacloudapi.cn_3306',\n" +
        "  'table-name' = 'gangtise_budget.meson_budget_data',\n" +
//      "  'username' = 'root',\n" +
//      "  'password' = '123456',\n" +
        "  'driver' = 'com.mysql.jdbc.Driver'\n" +
        " )";

bsTableEnv.executeSql(ddl);
TableResult mtr = bsTableEnv.executeSql("select * from meson_budget_data");
mtr.print();

env.execute("sql test");
// bsTableEnv.execute("sql test");




Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by 冯草纸
mysql






Re: Re: flink 1.11.2 error when interval joining a rowtime stream with a proctime stream

2020-11-25 Post by hailongwang
The other table can be defined like this:
 String rTable = "CREATE TABLE r_table (  " +
" r_a INT,  " +
" r_b string,  " +
" r_pt AS now(), " +
 "WATERMARK FOR r_pt AS r_pt" +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.r_a.min'='1',  " +
" 'fields.r_a.max'='5',  " +
" 'fields.r_b.length'='5'  " +
")";


Best,
Hailong




On 2020-11-25 19:05:04, "Asahi Lee" <978466...@qq.com> wrote:
>Hello!
>   How do I then join two streams that have different time attributes? Or how should such a requirement be handled?
>
>
>
>
>--Original message--
>From: "user-zh" <18868816...@163.com>
>Sent: 2020-11-25 (Wednesday) 7:31 PM
>To: "user-zh"
>Subject: Re: flink 1.11.2 error when interval joining a rowtime stream with a proctime stream
>
>
>
>Hi,
> Because the time attributes of your two streams differ, it is not recognized as an interval join.
> It then matches a regular join, and since the join condition contains a time attribute, you get this error.
> An interval join requires both streams to have the same kind of time attribute, so you need to use the same time attribute on the two streams.
>
>
>Best,
>Hailong
>
>On 2020-11-25 16:23:27, "Asahi Lee" <978466...@qq.com> wrote:
>Hello! I get an error when interval joining an event-time stream with a processing-time stream. I am using Flink 1.11.2; my sample program is below. Why does it say this is a regular join rather than an interval join, and how can I fix it?
>Joining on l_table.l_rt = r_table.r_pt runs successfully, but l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND fails.
>package join;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
>    public static void main(String[] args) {
>        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>
>        String lTable = "CREATE TABLE l_table (  " +
>                " l_a INT,  " +
>                " l_b string,  " +
>                " l_rt AS localtimestamp,  " +
>                " WATERMARK FOR l_rt AS l_rt  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.l_a.min'='1',  " +
>                " 'fields.l_a.max'='5',  " +
>                " 'fields.l_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(lTable);
>
>        String rTable = "CREATE TABLE r_table (  " +
>                " r_a INT,  " +
>                " r_b string,  " +
>                " r_pt AS proctime()  " +
>                ") WITH (  " +
>                " 'connector' = 'datagen',  " +
>                " 'rows-per-second'='5',  " +
>                " 'fields.r_a.min'='1',  " +
>                " 'fields.r_a.max'='5',  " +
>                " 'fields.r_b.length'='5'  " +
>                ")";
>        bsTableEnv.executeSql(rTable);
>
>        String printTable = "CREATE TABLE print (" +
>                "  l_a INT,  " +
>                "  l_b string,  " +
>                "  l_rt timestamp(3),  " +
>                "  r_a INT,  " +
>                "  r_b string,  " +
>                "  r_pt timestamp(3)  " +
>                ") WITH (  " +
>                " 'connector' = 'print' " +
>                ") ";
>        bsTableEnv.executeSql(printTable);
>
>        // runs successfully
>        // Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
>        // fails with: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
>        bsTableEnv.executeSql("insert into print select * from " + joinTable);
>    }
>}


Flink on native K8s has no TaskManagers

2020-11-25 Post by jiangjiguang719
Flink version: 1.11
K8s version: 1.18
Flink on native K8s, configured according to the official docs; I can see the JobManager UI.
But there are no TaskManagers (TM count 0, slots 0), and submitting a job just hangs.


Role configuration:
[root@node20 rbac]# cat rbac-role.yaml 
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: di-flink-dev
  name: flink-admin
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["create","delete","get", "watch", "list"]


Role binding:
[root@node20 rbac]# cat rbac-serviceaccount.yaml 
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-admin-bind
  namespace: di-flink-dev
subjects:
- kind: Group
  name: system:serviceaccounts:di-flink-dev
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: flink-admin
  apiGroup: rbac.authorization.k8s.io


Session start command:
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-cluster-test01 
-Dkubernetes.namespace=di-flink-dev 
-Dkubernetes.rest-service.exposed.type=NodePort 
-Dtaskmanager.memory.process.size=1028m -Dkubernetes.taskmanager.cpu=2 
-Dtaskmanager.numberOfTaskSlots=2 -Dresourcemanager.taskmanager-timeout=360


Job submission command:
./bin/flink run -d -t kubernetes-session 
-Dkubernetes.cluster-id=my-cluster-test01 -Dkubernetes.namespace=di-flink-dev 
examples/streaming/WindowJoin.jar
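
For context (not from the thread): with the native Kubernetes session deployment, TaskManager pods are only created on demand once a job needs slots, so seeing 0 TaskManagers and 0 slots before any submission is expected. When the submission then hangs, running kubectl get pods -n di-flink-dev and kubectl describe pod on whatever TaskManager pod appears (or fails to appear) is a quick way to see whether pod creation is being rejected, for example by RBAC or an image pull problem.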




Re: flink 1.11.2 error when interval joining a rowtime stream with a proctime stream

2020-11-25 Post by Asahi Lee
Hello!
How do I then join two streams that have different time attributes? Or how should such a requirement be handled?




----Original message----
From: "user-zh" <18868816...@163.com>
Sent: 2020-11-25 (Wednesday) 7:31 PM
To: "user-zh"

Re: Unsubscribe

2020-11-25 Post by Xingbo Huang
Hi,

To unsubscribe, send an email to  user-zh-unsubscr...@flink.apache.org

See [1] for details.

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo


Xev Orm wrote on Wed, Nov 25, 2020 at 12:26 PM:

> -help
>
> Xev Orm wrote on Wed, Nov 25, 2020 at 12:25 PM:
>
> > Unsubscribe
> >
>


Request for recommendations on Flink memory settings

2020-11-25 Post by 赵一旦
The current TaskManager settings are as follows:

taskmanager.memory.flink.size:  71680m
taskmanager.memory.framework.heap.size: 512m
taskmanager.memory.framework.off-heap.size: 512m
taskmanager.memory.task.off-heap.size:  17920m
taskmanager.memory.managed.size:512m
taskmanager.memory.jvm-metaspace.size:  512m


taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1024mb
taskmanager.memory.network.max: 1536mb
taskmanager.memory.segment-size: 128kb


The container size is 100 GB.

So network memory ends up at 1.5 GB.

A streaming job does not need managed memory, so I set 512 MB as a token amount.

Does this look reasonable?
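
For reference, and assuming the Flink 1.11 memory model in which taskmanager.memory.flink.size is the sum of framework heap, task heap, framework off-heap, task off-heap, network, and managed memory: network here resolves to min(max(0.1 * 71680m, 1024m), 1536m) = 1536m, so the implied task heap is roughly 71680 - 512 - 512 - 17920 - 1536 - 512 = 50688m.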


Re: flink 1.11.2 error when interval joining a rowtime stream with a proctime stream

2020-11-25 Post by hailongwang
Hi,
   Because the time attributes of your two streams differ, it is not recognized as an interval join.
   It then matches a regular join, and since the join condition contains a time attribute, you get this error.
   An interval join requires both streams to have the same kind of time attribute, so you need to use the same time attribute on the two streams.


Best,
Hailong

On 2020-11-25 16:23:27, "Asahi Lee" <978466...@qq.com> wrote:
>Hello! I get an error when interval joining an event-time stream with a processing-time stream. I am using Flink 1.11.2; my sample program is below. Why does it say this is a regular join rather than an interval join, and how can I fix it?
>Joining on l_table.l_rt = r_table.r_pt runs successfully, but l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND fails.
>package join;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class Test1 {
>
>public static void main(String[] args) {
>StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
>String lTable = "CREATE TABLE l_table (  " +
>" l_a INT,  " +
>" l_b string,  " +
>" l_rt AS localtimestamp,  " +
>" WATERMARK FOR l_rt AS l_rt  " +
>") WITH (  " +
>" 'connector' = 'datagen',  " +
>" 'rows-per-second'='5',  " +
>" 'fields.l_a.min'='1',  " +
>" 'fields.l_a.max'='5',  " +
>" 'fields.l_b.length'='5'  " +
>")";
>bsTableEnv.executeSql(lTable);
>
>String rTable = "CREATE TABLE r_table (  " +
>" r_a INT,  " +
>" r_b string,  " +
>" r_pt AS proctime()  " +
>") WITH (  " +
>" 'connector' = 'datagen',  " +
>" 'rows-per-second'='5',  " +
>" 'fields.r_a.min'='1',  " +
>" 'fields.r_a.max'='5',  " +
>" 'fields.r_b.length'='5'  " +
>")";
>bsTableEnv.executeSql(rTable);
>
>String printTable = "CREATE TABLE print (" +
>"  l_a INT,  " +
>"  l_b string,  " +
>"  l_rt timestamp(3),  " +
>"  r_a INT,  " +
>"  r_b string,  " +
>"  r_pt timestamp(3)  " +
>") WITH (  " +
>" 'connector' = 'print' " +
>") ";
>
>bsTableEnv.executeSql(printTable);
>
>// runs successfully
>//Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
>r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");
>
>// fails with: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>Table joinTable = bsTableEnv.sqlQuery("select * from l_table join 
> r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - 
> INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");
>
>bsTableEnv.executeSql("insert into print select * from " + joinTable);
>
>}
>
>}


Re: flink sql connects to MySQL but outputs no data

2020-11-25 Post by caozhen




Could it be that this line of code is missing: tableEnv.execute("test");



AlfredFeng wrote
> Hi All,
>   I am using the flink-jdbc-connector in IDEA to connect to MySQL.
> After creating the table I run env.executeSql("select * from my_table").print(), and only the header row is printed with no data. What could be the reason?
> The Flink version is 1.11.2.





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to resolve stalls in Flink CDC caused by packets not being sent

2020-11-25 Post by yujianbo
Mainly to parse a custom schema so that the sink side can emit to downstream systems.
I would like to ask a question:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
I read the link above about setting a different server id for each job. I see that SQL can specify different server ids, which leaves me with three questions:
1. Do different DataStream jobs end up with the same server id?
2. Is it fine for different DataStream jobs to sync different tables of the same database?
3. Is it a problem for different DataStream jobs to sync the same table of the same database?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql connects to MySQL but outputs no data

2020-11-25 Post by 冯草纸
Hi All,
  I am using the flink-jdbc-connector in IDEA to connect to MySQL. After creating the table I run env.executeSql("select * from my_table").print(), and only the header row is printed with no data. What could be the reason? The Flink version is 1.11.2.

flink 1.11.2 error when interval joining a rowtime stream with a proctime stream

2020-11-25 Post by Asahi Lee
Hello! I get an error when interval joining an event-time stream with a processing-time stream. I am using Flink 1.11.2; my sample program is below. Why does it say this is a regular join rather than an interval join, and how can I fix it?
Joining on l_table.l_rt = r_table.r_pt runs successfully, but l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND fails.
package join;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test1 {

public static void main(String[] args) {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

String lTable = "CREATE TABLE l_table (  " +
" l_a INT,  " +
" l_b string,  " +
" l_rt AS localtimestamp,  " +
" WATERMARK FOR l_rt AS l_rt  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.l_a.min'='1',  " +
" 'fields.l_a.max'='5',  " +
" 'fields.l_b.length'='5'  " +
")";
bsTableEnv.executeSql(lTable);

String rTable = "CREATE TABLE r_table (  " +
" r_a INT,  " +
" r_b string,  " +
" r_pt AS proctime()  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.r_a.min'='1',  " +
" 'fields.r_a.max'='5',  " +
" 'fields.r_b.length'='5'  " +
")";
bsTableEnv.executeSql(rTable);

String printTable = "CREATE TABLE print (" +
"  l_a INT,  " +
"  l_b string,  " +
"  l_rt timestamp(3),  " +
"  r_a INT,  " +
"  r_b string,  " +
"  r_pt timestamp(3)  " +
") WITH (  " +
" 'connector' = 'print' " +
") ";

bsTableEnv.executeSql(printTable);

// runs successfully
//Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");

// fails with: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");

bsTableEnv.executeSql("insert into print select * from " + joinTable);

}

}