??????Flink????Operator????????????Metrics

2021-10-25 文章 xiazhl
web-ui??metrics




--  --
??: 
   "user-zh"



flink keyby??????????????????

2021-10-25 文章 xiazhl
hello everyone??               
      ??keyby??    
  


      ??flink 
streamAPI?? 10
              
??flinkidkeyby 
id


      ??keyby  
??:3:1?? ??

Re: Re: Flink SQL 1.12 批量数据导入,如果加速性能

2021-10-25 文章 WuKong
Hi:
 就是最简单的 定义一个Source table 一个Sink table 相同的Schema , 比如 insert into tableB select 
* from tableA ; 执行启8个并行度的话, 会有个7个并行度是Finish 状态 只有一个 在串行的导入数据, 其中schema 例如:


CREATE TABLE tableA  (
columnOne STRING,
columnTwo BIGINT,
PRIMARY KEY (`columnTwo `) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'table-name',
  'username' = 'xxx',
  'password' = 'xxx',
  'driver' = 'com.mysql.jdbc.Driver'
);


CREATE TABLE tableB  (
columnOne STRING,
columnTwo BIGINT,
PRIMARY KEY (`columnTwo `) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'table-name',
  'username' = 'xxx',
  'password' = 'xxx',
  'driver' = 'com.mysql.jdbc.Driver'
);



---
Best,
WuKong
 
发件人: Caizhi Weng
发送时间: 2021-10-26 12:43
收件人: flink中文邮件组
主题: Re: Flink SQL 1.12 批量数据导入,如果加速性能
Hi!
 
我通过 Flink SQL 无论怎么加大并行度, 都是单并行度导入
 
 
你是如何加大并行度的?除 source 外其他节点也是单并行度吗,还是说可以成功加大?能否分享一下你的 SQL 便于解答?
 
WuKong  于2021年10月26日周二 上午11:36写道:
 
> Hi:
> 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端
> 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案
> 可以基于SQL进行大批量数据导入,数十亿量级。
>
>
>
> ---
> Best,
> WuKong
>


Flink没有Operator级别的数据量Metrics

2021-10-25 文章 Ada Luna
Flink只能看到Task级别的流入流出数据量,而没有Operator级别的。这个是出于性能考量吗?未来会加入一个开关,可以看到Operator级别的,方便debug吗?


Re: How to execute multi SQL in one job

2021-10-25 文章 Jake

Hi, you can do like this: 

```java
val statementSet = tableEnv.createStatementSet()
val insertSqlBuffer = ListBuffer.empty[String]

val calciteParser = new 
CalciteParser(SqlUtil.getSqlParserConfig(tableEnv.getConfig))
sqlArr
.foreach(item => {
println(item)
val itemNode = calciteParser.parse(item)

itemNode match {
case sqlSet: SqlSet => {
configuration.setString(sqlSet.getKeyString, 
sqlSet.getValueString)
}
case _: RichSqlInsert => insertSqlBuffer += item
case _ => {
println(item)
val itemResult = tableEnv.executeSql(item)
itemResult.print()
}
}
})

// execute batch inserts
if (insertSqlBuffer.size > 0) {
insertSqlBuffer.foreach(item => {
println("insert sql: " + item)
statementSet.addInsertSql(item)
})
val explain = statementSet.explain()
println(explain)
statementSet.execute()
}

```


> On Oct 26, 2021, at 11:27, 刘建刚  wrote:
> 
> I have multi batch SQL commands separated by semicolon(;). The SQL commands 
> need to be executed in order(In other cases, the SQL command may share 
> sources or sinks). I want to execute them in one job. When I use 
> tableEnv.executeSql(multiSQL), it will throw errors.  How can I execute them 
> in one job? Thanks.



Re: Flink SQL 1.12 批量数据导入,如果加速性能

2021-10-25 文章 Caizhi Weng
Hi!

我通过 Flink SQL 无论怎么加大并行度, 都是单并行度导入


你是如何加大并行度的?除 source 外其他节点也是单并行度吗,还是说可以成功加大?能否分享一下你的 SQL 便于解答?

WuKong  于2021年10月26日周二 上午11:36写道:

> Hi:
> 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端
> 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案
> 可以基于SQL进行大批量数据导入,数十亿量级。
>
>
>
> ---
> Best,
> WuKong
>


Re: How to execute multi SQL in one job

2021-10-25 文章 刘建刚
Thanks very much. Forgive me for the simple question. I have found the doc
in the latest code. My inner code is too old...

Jake  于2021年10月26日周二 上午11:39写道:

>
> Hi, you can do like this:
>
> ```java
>
> val statementSet = tableEnv.createStatementSet()
> val insertSqlBuffer = ListBuffer.empty[String]
>
>
> val calciteParser = new CalciteParser(SqlUtil.getSqlParserConfig
> (tableEnv.getConfig))
> sqlArr
> .foreach(item => {
> println(item)
> val itemNode = calciteParser.parse(item)
>
> itemNode match {
> case sqlSet: SqlSet => {
> configuration.setString(sqlSet.getKeyString, sqlSet.getValueString)
> }
> case _: RichSqlInsert => insertSqlBuffer += item
> case _ => {
> println(item)
> val itemResult = tableEnv.executeSql(item)
> itemResult.print()
> }
> }
> })
>
> // execute batch inserts
> if (insertSqlBuffer.size > 0) {
> insertSqlBuffer.foreach(item => {
> println("insert sql: " + item)
> statementSet.addInsertSql(item)
> })
> val explain = statementSet.explain()
> println(explain)
> statementSet.execute()
> }
>
>
> ```
>
>
> On Oct 26, 2021, at 11:27, 刘建刚  wrote:
>
> I have multi batch SQL commands separated by semicolon(;). The SQL
> commands need to be executed in order(In other cases, the SQL command may
> share sources or sinks). I want to execute them in one job. When I
> use tableEnv.executeSql(multiSQL), it will throw errors.  How can I execute
> them in one job? Thanks.
>
>
>


Flink SQL 1.12 批量数据导入,如果加速性能

2021-10-25 文章 WuKong
Hi:
我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端 也是一张MSYQL 表, 
我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案 
可以基于SQL进行大批量数据导入,数十亿量级。



---
Best,
WuKong


How to execute multi SQL in one job

2021-10-25 文章 刘建刚
I have multi batch SQL commands separated by semicolon(;). The SQL commands
need to be executed in order(In other cases, the SQL command may share
sources or sinks). I want to execute them in one job. When I
use tableEnv.executeSql(multiSQL), it will throw errors.  How can I execute
them in one job? Thanks.


Re: flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢

2021-10-25 文章 Caizhi Weng
Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

扯  于2021年10月26日周二 上午9:49写道:

>
> 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
> clickhouse建表语句如下:
> CREATE TABLE test_local.tzling_tb3(
> uuid String,
> product String,
> platform String,
> batchId String,
> id String,
> account String,
> customerId String,
> reportName String,
> dt String,
> campaign String,
> adGroup String,
> generalField String,
> currency String,
> impressions String,
> cost String,
> clicks String,
> conversions String,
> createDateTime String,
> createTime BIGINT,
> key String,
> pdate String
> )engine = MergeTree PARTITION BY pdate order by createTime;
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
> processData.addSink(new MSKUpsertClickHouseSink());
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>


Re: flink写mysql问题

2021-10-25 文章 Caizhi Weng
Hi!

在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢


JdbcDynamicTableSink 不包含具体 sink function
的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。

写 mysql 的 qps 只能到几百,反压严重


jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。

算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成


checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows

a <806040...@qq.com.invalid> 于2021年10月26日周二 上午9:49写道:

>
> 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
>
> 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗


flink??mysql????

2021-10-25 文章 a
flink??mysqlsinkJdbcDynamicTableSink??checkpoint??1.??flush??mysql??
2.??mysql??qps??sum??1000??/s

flink1.12.1 ????kafka????????????clickhouse????????upsert??????

2021-10-25 文章 ??
      
flink??clickHouse??flink1.12.1kafkaupsertclickhouse??


clickhouse??
CREATE TABLE test_local.tzling_tb3(
    uuid String,
    product String,
    platform String,
batchId String,
id String,
account String,
customerId String,
reportName String,
dt String,
campaign String,
adGroup String,
generalField String,
currency String,
impressions String,
cost String,
clicks String,
conversions String,
createDateTime String,
createTime BIGINT,
key String,
pdate String
)engine = MergeTree PARTITION BY pdate order by createTime;


??uuid updateappend??


processData.addSink(new MSKUpsertClickHouseSink());
MSKUpsertClickHouseSink.javaclickhouse??sink 
??uuid??append??uuid??append??,??1??clickhouseuuid??




Re: flink yarn-per-job???? ynm????????????

2021-10-25 文章 Lawulu



-- Original --
From:  "Yang Wang";

回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-25 文章 杨浩
currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
在 2021-10-25 10:31:12,"Caizhi Weng"  写道:
>Hi!
>
>这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
>metrics 读取,见 [1]。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
>
>杨浩  于2021年10月25日周一 上午10:20写道:
>
>> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


Re: flink yarn-per-job模式 ynm命令不起作用

2021-10-25 文章 Yang Wang
-t参数需要搭配-D一起来使用,而不是使用-y来引导

例如:-t yarn-per-job -Dyarn.application.name=flink-test

Best,
Yang

Lawulu  于2021年10月25日周一 上午11:41写道:

> 例如:
> bin/flink run -ynm flink-test -t yarn-per-job  --detached
> ./examples/streaming/TopSpeedWindowing.jar
>
>
> 在yarn ui上面看name还是 Flink per-job cluster


关于flink客户端脚本在Yarn-session模式下无法加载第三方依赖的问题

2021-10-25 文章 陈启竹
HI ALL:
   
背景:我使用flink客户端脚本提交作业到Yarn集群,通过-C参数指定udf依赖包的URL,yarn.ship-files参数指定了依赖包目录,在yarn-per-job模式下作业可正常运行得到正确结果,但在yarn-session模式下,却报ClassNotFoundException,查看taskmanager.log,依赖包确实没有加载到classpath。
两种提交命令分别如下:
1、flink-1.13.2/bin/flink run -t yarn-per-job 
-Dyarn.ship-files=/home/chenqizhu/myflink/usrlib -C 
file:///home/chenqizhu/myflink/usrlib/flink-sql-udf-1.0-SNAPSHOT.jar 
flink-test-1.0-SNAPSHOT.jar
2、flink-1.13.2/bin/flink run -t yarn-session 
-Dyarn.application.id=application_1607512101880_345557 
-Dyarn.ship-files=/home/chenqizhu/myflink/usrlib -C 
file:///home/chenqizhu/myflink/usrlib/flink-sql-udf-1.0-SNAPSHOT.jar 
flink-test-1.0-SNAPSHOT.jar
   
   问题:为什么在yarn-session模式下报错,per-job模式却可正常运行
   
如果以上命令不适用于Yarn-session模式,请问我用yarn-session模式提交作业时,在不把udf依赖打进作业jar的情况下,该如何加载udf依赖包?


客户端版本:1.13.2
taskmanager.log报错信息:
ClassLoader info: URLClassLoader:
Classnot resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Causedby: java.lang.ClassNotFoundException: 
com.cqz.component.flink.sql.udf.SumFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(NativeMethod)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
... 10 more