??????Flink????Operator????????????Metrics
web-ui??metrics -- -- ??: "user-zh"
flink keyby??????????????????
hello everyone?? ??keyby?? ??flink streamAPI?? 10 ??flinkidkeyby id ??keyby ??:3:1?? ??
Re: Re: Flink SQL 1.12 批量数据导入,如果加速性能
Hi: 就是最简单的 定义一个Source table 一个Sink table 相同的Schema , 比如 insert into tableB select * from tableA ; 执行启8个并行度的话, 会有个7个并行度是Finish 状态 只有一个 在串行的导入数据, 其中schema 例如: CREATE TABLE tableA ( columnOne STRING, columnTwo BIGINT, PRIMARY KEY (`columnTwo `) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'table-name', 'username' = 'xxx', 'password' = 'xxx', 'driver' = 'com.mysql.jdbc.Driver' ); CREATE TABLE tableB ( columnOne STRING, columnTwo BIGINT, PRIMARY KEY (`columnTwo `) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'table-name', 'username' = 'xxx', 'password' = 'xxx', 'driver' = 'com.mysql.jdbc.Driver' ); --- Best, WuKong 发件人: Caizhi Weng 发送时间: 2021-10-26 12:43 收件人: flink中文邮件组 主题: Re: Flink SQL 1.12 批量数据导入,如果加速性能 Hi! 我通过 Flink SQL 无论怎么加大并行度, 都是单并行度导入 你是如何加大并行度的?除 source 外其他节点也是单并行度吗,还是说可以成功加大?能否分享一下你的 SQL 便于解答? WuKong 于2021年10月26日周二 上午11:36写道: > Hi: > 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端 > 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案 > 可以基于SQL进行大批量数据导入,数十亿量级。 > > > > --- > Best, > WuKong >
Flink没有Operator级别的数据量Metrics
Flink只能看到Task级别的流入流出数据量,而没有Operator级别的。这个是出于性能考量吗?未来会加入一个开关,可以看到Operator级别的,方便debug吗?
Re: How to execute multi SQL in one job
Hi, you can do like this: ```java val statementSet = tableEnv.createStatementSet() val insertSqlBuffer = ListBuffer.empty[String] val calciteParser = new CalciteParser(SqlUtil.getSqlParserConfig(tableEnv.getConfig)) sqlArr .foreach(item => { println(item) val itemNode = calciteParser.parse(item) itemNode match { case sqlSet: SqlSet => { configuration.setString(sqlSet.getKeyString, sqlSet.getValueString) } case _: RichSqlInsert => insertSqlBuffer += item case _ => { println(item) val itemResult = tableEnv.executeSql(item) itemResult.print() } } }) // execute batch inserts if (insertSqlBuffer.size > 0) { insertSqlBuffer.foreach(item => { println("insert sql: " + item) statementSet.addInsertSql(item) }) val explain = statementSet.explain() println(explain) statementSet.execute() } ``` > On Oct 26, 2021, at 11:27, 刘建刚 wrote: > > I have multi batch SQL commands separated by semicolon(;). The SQL commands > need to be executed in order(In other cases, the SQL command may share > sources or sinks). I want to execute them in one job. When I use > tableEnv.executeSql(multiSQL), it will throw errors. How can I execute them > in one job? Thanks.
Re: Flink SQL 1.12 批量数据导入,如果加速性能
Hi! 我通过 Flink SQL 无论怎么加大并行度, 都是单并行度导入 你是如何加大并行度的?除 source 外其他节点也是单并行度吗,还是说可以成功加大?能否分享一下你的 SQL 便于解答? WuKong 于2021年10月26日周二 上午11:36写道: > Hi: > 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端 > 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案 > 可以基于SQL进行大批量数据导入,数十亿量级。 > > > > --- > Best, > WuKong >
Re: How to execute multi SQL in one job
Thanks very much. Forgive me for the simple question. I have found the doc in the latest code. My inner code is too old... Jake 于2021年10月26日周二 上午11:39写道: > > Hi, you can do like this: > > ```java > > val statementSet = tableEnv.createStatementSet() > val insertSqlBuffer = ListBuffer.empty[String] > > > val calciteParser = new CalciteParser(SqlUtil.getSqlParserConfig > (tableEnv.getConfig)) > sqlArr > .foreach(item => { > println(item) > val itemNode = calciteParser.parse(item) > > itemNode match { > case sqlSet: SqlSet => { > configuration.setString(sqlSet.getKeyString, sqlSet.getValueString) > } > case _: RichSqlInsert => insertSqlBuffer += item > case _ => { > println(item) > val itemResult = tableEnv.executeSql(item) > itemResult.print() > } > } > }) > > // execute batch inserts > if (insertSqlBuffer.size > 0) { > insertSqlBuffer.foreach(item => { > println("insert sql: " + item) > statementSet.addInsertSql(item) > }) > val explain = statementSet.explain() > println(explain) > statementSet.execute() > } > > > ``` > > > On Oct 26, 2021, at 11:27, 刘建刚 wrote: > > I have multi batch SQL commands separated by semicolon(;). The SQL > commands need to be executed in order(In other cases, the SQL command may > share sources or sinks). I want to execute them in one job. When I > use tableEnv.executeSql(multiSQL), it will throw errors. How can I execute > them in one job? Thanks. > > >
Flink SQL 1.12 批量数据导入,如果加速性能
Hi: 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案 可以基于SQL进行大批量数据导入,数十亿量级。 --- Best, WuKong
How to execute multi SQL in one job
I have multi batch SQL commands separated by semicolon(;). The SQL commands need to be executed in order(In other cases, the SQL command may share sources or sinks). I want to execute them in one job. When I use tableEnv.executeSql(multiSQL), it will throw errors. How can I execute them in one job? Thanks.
Re: flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢
Hi! 你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData 是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。 扯 于2021年10月26日周二 上午9:49写道: > > 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下: > > clickhouse建表语句如下: > CREATE TABLE test_local.tzling_tb3( > uuid String, > product String, > platform String, > batchId String, > id String, > account String, > customerId String, > reportName String, > dt String, > campaign String, > adGroup String, > generalField String, > currency String, > impressions String, > cost String, > clicks String, > conversions String, > createDateTime String, > createTime BIGINT, > key String, > pdate String > )engine = MergeTree PARTITION BY pdate order by createTime; > 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。 > > processData.addSink(new MSKUpsertClickHouseSink()); > 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: > 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。 > > 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢? >
Re: flink写mysql问题
Hi! 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢 JdbcDynamicTableSink 不包含具体 sink function 的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的 jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。 写 mysql 的 qps 只能到几百,反压严重 jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval 就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。 算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成 checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint 也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。 [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows a <806040...@qq.com.invalid> 于2021年10月26日周二 上午9:49写道: > > 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢 > > 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗
flink??mysql????
flink??mysqlsinkJdbcDynamicTableSink??checkpoint??1.??flush??mysql?? 2.??mysql??qps??sum??1000??/s
flink1.12.1 ????kafka????????????clickhouse????????upsert??????
flink??clickHouse??flink1.12.1kafkaupsertclickhouse?? clickhouse?? CREATE TABLE test_local.tzling_tb3( uuid String, product String, platform String, batchId String, id String, account String, customerId String, reportName String, dt String, campaign String, adGroup String, generalField String, currency String, impressions String, cost String, clicks String, conversions String, createDateTime String, createTime BIGINT, key String, pdate String )engine = MergeTree PARTITION BY pdate order by createTime; ??uuid updateappend?? processData.addSink(new MSKUpsertClickHouseSink()); MSKUpsertClickHouseSink.javaclickhouse??sink ??uuid??append??uuid??append??,??1??clickhouseuuid??
Re: flink yarn-per-job???? ynm????????????
-- Original -- From: "Yang Wang";
回复:Re: 在开启checkpoint后如何设置offset的自动提交以方便监控
currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets 在 2021-10-25 10:31:12,"Caizhi Weng" 写道: >Hi! > >这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过 >metrics 读取,见 [1]。 > >[1] >https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors > >杨浩 于2021年10月25日周一 上午10:20写道: > >> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度
Re: flink yarn-per-job模式 ynm命令不起作用
-t参数需要搭配-D一起来使用,而不是使用-y来引导 例如:-t yarn-per-job -Dyarn.application.name=flink-test Best, Yang Lawulu 于2021年10月25日周一 上午11:41写道: > 例如: > bin/flink run -ynm flink-test -t yarn-per-job --detached > ./examples/streaming/TopSpeedWindowing.jar > > > 在yarn ui上面看name还是 Flink per-job cluster
关于flink客户端脚本在Yarn-session模式下无法加载第三方依赖的问题
HI ALL: 背景:我使用flink客户端脚本提交作业到Yarn集群,通过-C参数指定udf依赖包的URL,yarn.ship-files参数指定了依赖包目录,在yarn-per-job模式下作业可正常运行得到正确结果,但在yarn-session模式下,却报ClassNotFoundException,查看taskmanager.log,依赖包确实没有加载到classpath。 两种提交命令分别如下: 1、flink-1.13.2/bin/flink run -t yarn-per-job -Dyarn.ship-files=/home/chenqizhu/myflink/usrlib -C file:///home/chenqizhu/myflink/usrlib/flink-sql-udf-1.0-SNAPSHOT.jar flink-test-1.0-SNAPSHOT.jar 2、flink-1.13.2/bin/flink run -t yarn-session -Dyarn.application.id=application_1607512101880_345557 -Dyarn.ship-files=/home/chenqizhu/myflink/usrlib -C file:///home/chenqizhu/myflink/usrlib/flink-sql-udf-1.0-SNAPSHOT.jar flink-test-1.0-SNAPSHOT.jar 问题:为什么在yarn-session模式下报错,per-job模式却可正常运行 如果以上命令不适用于Yarn-session模式,请问我用yarn-session模式提交作业时,在不把udf依赖打进作业jar的情况下,该如何加载udf依赖包? 客户端版本:1.13.2 taskmanager.log报错信息: ClassLoader info: URLClassLoader: Classnot resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569) at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Causedby: java.lang.ClassNotFoundException: com.cqz.component.flink.sql.udf.SumFunction at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) at java.lang.Class.forName0(NativeMethod) at java.lang.Class.forName(Class.java:348) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ... 10 more