flink sql join 内存占用以及数据延迟问题咨询

2021-04-12 文章 董建
最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子 https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B 感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下: flink connector cdc 直接对接订单表,物流表,商品表表的binlog 1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在? 2、假如是全量join , 这

Re: Flink-SQL合并多条记录到Map中

2021-04-09 文章 RL_LEE
我使用UDAF的方式解决了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink-SQL合并多条记录到Map中

2021-04-07 文章 RL_LEE
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中 在写SQL时,我尝试将多条计算结果合并保存至Map中: create table to_redis( biz_name STRING, mchnt_id STRING, zb_value MAP ) WITH ( 'connector' = 'redis', 'redis-mode' = 'single', 'host' = '172.30.251.225', 'port' = '10006', 'password' = 'xxx',

Flink-SQL将多条查询结果合并保存至Map中

2021-04-07 文章 RL_LEE
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中 在写SQL时,我尝试将多条计算结果合并保存至Map中: create table to_redis( biz_name STRING, mchnt_id STRING, zb_value MAP ) WITH ( 'connector' = 'redis', 'redis-mode' = 'single', 'host' = '172.30.251.225', 'port' = '10006', 'password' =

Re: flink-sql 客户端采用execution.target: yarn-per-job 模式,如何指定提交的队列??

2021-04-02 文章 HunterXHunter
flink run -yD yarn.application.queue=x -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-sql 客户端采用execution.target: yarn-per-job 模式,如何指定提交的队列??

2021-04-02 文章 JasonLee
hi 可以通过在 flink-conf.yaml 配置文件中添加 yarn.application.queue 参数来设置 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql count distonct 优化

2021-03-30 文章 Robin Zhang
Hi,guomuhua `The number of inputs accumulated by local aggregation every time is based on mini-batch interval. It means local-global aggregation depends on mini-batch optimization is enabled ` ,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.

Re: flink sql count distonct 优化

2021-03-30 文章 Robin Zhang
Hi,Jark 我理解疑问中的sql是一个普通的agg操作,只不过分组的键是时间字段,不知道您说的 `我看你的作业里面是window agg` ,这个怎么理解 Best, Robin Jark wrote >> 如果不是window agg,开启参数后flink会自动打散是吧 > 是的 > >> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗? > 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。 > > Best, > Jark > > [1]: >

flink-sql ??????????execution.target: yarn-per-job ????????????????????????????

2021-03-29 文章 ????
1??yarn-session?? 2??yarn-perjob?? -- ?? Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e66_1616483562588_2358_01_02() timed out. at

Re: flink sql count distonct 优化

2021-03-26 文章 Jark Wu
> 如果不是window agg,开启参数后flink会自动打散是吧 是的 > 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗? 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation On Fri,

Re: flink sql count distonct 优化

2021-03-25 文章 guomuhua
Jark wrote > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window > agg支持这个参数了。可以期待下。 > > Best, > Jark > > On Wed, 24 Mar 2021 at 19:29, Robin Zhang > vincent2015qdlg@ > > wrote: > >> Hi,guomuhua >> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。 >> >> Best, >> Robin >> >> >>

Re: flink sql count distonct 优化

2021-03-25 文章 Jark Wu
我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window agg支持这个参数了。可以期待下。 Best, Jark On Wed, 24 Mar 2021 at 19:29, Robin Zhang wrote: > Hi,guomuhua > 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。 > > Best, > Robin > > > guomuhua wrote > > 在SQL中,如果开启了 local-global 参数:set > >

Re: flink sql count distonct 优化

2021-03-24 文章 Robin Zhang
Hi,guomuhua 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。 Best, Robin guomuhua wrote > 在SQL中,如果开启了 local-global 参数:set > table.optimizer.agg-phase-strategy=TWO_PHASE; > 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true; > set >

flink sql 写hive并行度设置问题

2021-03-23 文章 ggc
Hi, 请问: env.setParallelism(8); source = select * from table1, Table filterTable = source.filter(x-> x>10).limit(1); try (CloseableIterator rows = filterTable.execute().collect()) { while (rows.hasNext()) { Row r = rows.next(); String a = r.getField(1).toString();

flink sql 并行度问题

2021-03-23 文章 ggc
Hi, 请问: env.setParallelism(8); source = select * from table1, Table filterTable = source.filter(x-> x>10); try (CloseableIterator rows = filterTable.execute().collect()) { while (rows.hasNext()) { Row r = rows.next(); String a = r.getField(1).toString();

flink sql count distonct 优化

2021-03-23 文章 guomuhua
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE; 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true; set table.optimizer.distinct-agg.split.bucket-num=1024; 还需要对应的将SQL改写为两段式吗? 例如: 原SQL: SELECT day,

Re: Flink SQL JDBC connector不能checkpoint

2021-03-18 文章 Gengshen Zhao
//ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins best, amenhub 发件人: Gengshen Zhao 发送时间: 2021-03-18 16:26 收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> 主题: Flink SQL JDBC connector不能checkpoint Flink开发者们,你们好: 我在使用flink开发过程中遇到一个问题,在使用jdbc做

Re: Flink SQL JDBC connector不能checkpoint

2021-03-18 文章 Gengshen Zhao
//ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins best, amenhub 发件人: Gengshen Zhao 发送时间: 2021-03-18 16:26 收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> 主题: Flink SQL JDBC connector不能checkpoint Flink开发者们,你们好: 我在使用flink开发过程中遇到一个问题,在使用jdbc做

Re: Flink SQL JDBC connector不能checkpoint

2021-03-18 文章 amenhub
hi, 请问使用的Flink版本是什么呢?猜测你应该是写成普通的join方式了,可参考 [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins best, amenhub 发件人: Gengshen Zhao 发送时间: 2021-03-18 16:26 收件人: user-zh@flink.apache.org 主题: Flink SQL JDBC connector不能checkpoint Flink开发者们,你们好: 我

Flink SQL JDBC connector不能checkpoint

2021-03-18 文章 Gengshen Zhao
Flink开发者们,你们好: 我在使用flink开发过程中遇到一个问题,在使用jdbc做维度表关联时,该算子很快就finished了,从而导致无法正常的checkoint(我看源码中checkpoint前会检查所有算子状态必须为running),请问目前有什么参数可以使jdbc不finished或者在算子finished后依然可以checkpoint么?如果没有,那对这种情况的支持是否列入flink未来版本的开发计划中? 期待你们的回信 祝各位工作顺利,谢谢 赵庚申 赵庚申 Phone:15383463958

Flink SQL JDBC connector不能checkpoint

2021-03-18 文章 Gengshen Zhao
Flink开发者们,你们好: 我在使用flink开发过程中遇到一个问题,在使用jdbc做维度表关联时,该算子很快就finished了,从而导致无法正常的checkoint(我看源码中checkpoint前会检查所有算子状态必须为running),请问目前有什么参数可以使jdbc不finished或者在算子finished后依然可以checkpoint么?如果没有,那对这种情况的支持是否列入flink未来版本的开发计划中? 期待你们的回信 祝各位工作顺利,谢谢 赵庚申 Phone:15383463958

回复:【flink sql group by 时间窗口】

2021-03-17 文章 guoyb
这个问题换种写法解决了,从MySQL数据库表里取时间戳字段再转timestamp,可以实现滚动窗口,没报错。 从MySQL表里直接取datetime类型,jdbc表flink设置timestamp类型,会报错,直接取source的时间类型字段是不是转换有点问题。 ---原始邮件--- 发件人: "guoyb"<861277...@qq.com 发送时间: 2021年3月17日(周三) 下午5:43 收件人: "user-zh"

Re: Flink sql 实现全局row_number()分组排序

2021-03-17 文章 Kurt Young
直接 SQL Top-N 即可: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n Best, Kurt On Tue, Mar 16, 2021 at 3:40 PM Tian Hengyu wrote: > 咋么有人啊~~~ > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

【flink sql group by 时间窗口】

2021-03-17 文章 guoyb
版本1.12.1 请问,支持事件时间吗?应该设置为哪种时间类型。 as_of_time time group by xx , tumble( as_of_time, interval "5" second) sql client报错, window aggregate can only be defined over a time attribute column, but time(0) encountered

Re: flink sql 的 count(distinct )问题

2021-03-16 文章 Benchao Li
Hi, 你可以理解为用的是MapState来保存的状态。 op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道: > 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态 -- Best, Benchao Li

Re: Flink sql 实现全局row_number()分组排序

2021-03-16 文章 Tian Hengyu
咋么有人啊~~~ -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql ?? count(distinct )????

2021-03-16 文章 op
??flinksqlcount (distinct??state??

Flink sql 实现全局row_number()分组排序

2021-03-14 文章 Tian Hengyu
在做实时数仓的时候,有需求要使用flink sql实现全局的row_number(),请教下各位有啥方案吗? 目前想的是,将流进行row number处理后存储到hbase中,然后每次处理流数据都和hbase进行关联,row_number处理后将最新结果存入hbase中,即通过对hbase的实时读写实现全局row_number(). 请问以上方法可行不,,实时读hbase关联,然后在写入最新数据到hbase,效率会有问题吗,这样能满足实时的需求吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql滑动窗口TOPN,输出结果有问题

2021-03-13 文章 me
Flink版本flink1.11 从第二个窗口后的结果就出现了,更新数据和废弃数据的,现在的输出是table转datastream然后filter为true的结果, 但是我想实现的TOP2 每个窗口只输出俩条数据,现在filter为true的结果是>2条的,求问怎么才能正常输出我想要的TOP2的数据? 执行SQL:select * from ( select *,ROW_NUMBER() OVER (PARTITION BY window_end ORDER BY counter DESC) as row_num from ( select world,count(world)

Flink sql中插入null值失败

2021-03-11 文章 Jimmy Zhang
Flink sql中如何插入null值,有人了解吗?目前,insert 语句values中直接写null在zeppelin上报错了。 | Best, Jimmy | Signature is customized by Netease Mail Master

Re: Re: flink sql如何从远程加载jar包中的udf

2021-03-11 文章 Jeff Zhang
Zeppelin 支持加载UDF jar的,可以参考下面的代码,不过架构上可能与你们的原有架构会有所差别 https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469 chenxyz 于2021年3月12日周五 上午9:42写道: >

Re:Re: flink sql如何从远程加载jar包中的udf

2021-03-11 文章 chenxyz
目前这种方法不可行,在公司的平台化系统里提交flink任务,自己能掌控的只有代码这块。 在 2021-03-11 16:39:24,"silence" 写道: >启动时通过-C加到classpath里试试 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/

回复:flink sql如何从远程加载jar包中的udf

2021-03-11 文章 邓从宝
请user-zh 不要再发邮件了 -- 发件人:silence 发送时间:2021年3月11日(星期四) 16:39 收件人:user-zh 主 题:Re: flink sql如何从远程加载jar包中的udf 启动时通过-C加到classpath里试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql如何从远程加载jar包中的udf

2021-03-11 文章 silence
启动时通过-C加到classpath里试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql如何从远程加载jar包中的udf

2021-03-10 文章 chenxyz
1.10应该是registerFunction吧,当前jar包中没有这个类(这个类在远程jar包中),这种方法没办法实例化TableFunction。 > 2021年3月11日 上午11:21,HunterXHunter <1356469...@qq.com> 写道: > > 通过 createTemporarySystemFunction 试试看呢 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql如何从远程加载jar包中的udf

2021-03-10 文章 HunterXHunter
通过 createTemporarySystemFunction 试试看呢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql如何从远程加载jar包中的udf

2021-03-10 文章 chenxyz
我们将开发的udf放在远程服务器,需要动态地加载jar包。Flink版本1.10,代码如下 public static void main(String[] args) throws Exception { StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSet = EnvironmentSettings .newInstance()

flink sql sink多数据源问题

2021-03-10 文章 casel.chen
请教一下flink sql多条数据sink用 statement set 语句时, 1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗? 2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb & polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb & polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?

Re:Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-09 文章 Michael Ran
1.两套逻辑结果,只能定时任务做check2.同一套逻辑,就要具体分析了,只要不是一个人、一套代码逻辑出来的,都有可能出问题 在 2021-03-09 12:51:50,"Smile" 写道: >对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。 >有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-08 文章 Smile
对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。 有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

回复:【flink sql-client 读写 Kerberos认证的hive】

2021-03-08 文章 guoyb
好的,谢谢! ---原始邮件--- 发件人: "Rui Li"https://issues.apache.org/jira/browse/FLINK-20913 有关了,这个issue是1.12.2修复的,可以升级一下试试。 On Mon, Mar 8, 2021 at 2:15 PM guoyb <861277...@qq.com wrote: 您好! hive.metastore.sasl.enabled 是true 启动sql client的时候,可以正常读取到认证信息,并读取metastore的表名。 读和写,认证就失败了。 ---原始邮件---

Re: 【flink sql-client 读写 Kerberos认证的hive】

2021-03-08 文章 Rui Li
; > > > ---原始邮件--- > 发件人: "Rui Li" 发送时间: 2021年3月8日(周一) 中午12:12 > 收件人: "user-zh" 主题: Re: 【flink sql-client 读写 Kerberos认证的hive】 > > > Hi, > > > 从你发的stacktrace来看,走到了set_ugi方法说明client认为server没有开启kerberos。确认一下你HiveCatalog这边指定的hive-site.xml是否配置

Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-08 文章 jindy_liu
恩,这里有个问题就是,假设我们以离线结果为基准去对比,但离线结果一般天级或小时级,但实时部分可能是秒级的,两个结果在连线环境做比较,也不好去看这个结果有差异的时候,到底实时计算部分有没有问题! 有很多种原因可能会导致这个结果不准确。。。比如flink sql的bug或都流式消息丢失了等等! -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-08 文章 Smile
你好, 实时和离线对数的问题确实也比较难,没有很完美的解决方案。 一般可以考虑把实时产出结果也落离线表,然后对两张离线表做对比,离线 Join 上然后跑具体对比逻辑即可。 Smile jindy_liu wrote > 有没有大佬有思路可以参考下? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Sent from: http://apache-flink.147419.n8.nabble.com/

回复:【flink sql-client 读写 Kerberos认证的hive】

2021-03-07 文章 guoyb
您好! hive.metastore.sasl.enabled 是true 启动sql client的时候,可以正常读取到认证信息,并读取metastore的表名。 读和写,认证就失败了。 ---原始邮件--- 发件人: "Rui Li"

Re: 【flink sql-client 读写 Kerberos认证的hive】

2021-03-07 文章 Rui Li
7, 2021 at 5:49 PM 861277...@qq.com <861277...@qq.com> wrote: > >> 环境: >> flink1.12.1 >> hive2.1.0 >> CDH6.2.0 >> >> >> 【问题描述】 >> 在没开启Kerberos认证时,可以正常读写hive表 >> >> 开启Kerberos认证后, >> 启动时可以正常读取到hive metastor

Re: 【flink sql-client 读写 Kerberos认证的hive】

2021-03-07 文章 Rui Li
【问题描述】 > 在没开启Kerberos认证时,可以正常读写hive表 > > 开启Kerberos认证后, > 启动时可以正常读取到hive metastore的元数据信息,读写不了表。 > > > 【sql-client.sh embedded】 > Flink SQL show tables; > dimension_table > dimension_table1 > test > > > Flink SQL

flink sql 这种实时计算结果如何与离线计算的结果做数据比对?

2021-03-07 文章 jindy_liu
有没有大佬有思路可以参考下? -- Sent from: http://apache-flink.147419.n8.nabble.com/

【flink sql-client 读写 Kerberos认证的hive】

2021-03-07 文章 861277...@qq.com
环境: flink1.12.1 hive2.1.0 CDH6.2.0 【问题描述】 在没开启Kerberos认证时,可以正常读写hive表 开启Kerberos认证后, 启动时可以正常读取到hive metastore的元数据信息,读写不了表。 【sql-client.sh embedded】 Flink SQL show tables; dimension_table dimension_table1 test Flink SQL select * from test; [ERROR] Could not execute SQL statement. Reason

?????? flink sql??????????????io??????????

2021-03-05 文章 ????
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join flink sql?? temporal-table join io casel.chen

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
Hi Jark. 对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Qishang
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 学到了,感谢。 Jark Wu 于2021年3月4日周四 下午11:11写道: > 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize > 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json > 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true >

Re: flink sql中如何使用异步io关联维表?

2021-03-04 文章 Leonard Xu
目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1] 另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2] 祝好 [1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories

Re: flink sql中如何使用异步io关联维表?

2021-03-04 文章 peibin wang
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join flink sql的 temporal-table join 应该都是通过异步io来关联维表的 casel.chen 于2021年3月3日周三 下午10:54写道: > flink sql中如何使用异步io关联维表?官网文档有介绍么?

Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-04 文章 Jark Wu
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 forward。 Best, Jark [1]:

Flink??????????????????????sql????????????

2021-03-04 文章 ????
flink??sql??sql? example: tEnv.registerDataStream("tableName", dataStream, "id, name, age ,time"); Table result = tEnv.sqlQuery("SQL" ); ??SQL??

flink sql没有jar包如何恢复

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌

flink sql没有jar包如何恢复?

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌

远程提交flink sql设置了checkpoint,flink sql没有jar包如何恢复呢?

2021-03-04 文章 huayuan
如题 官方的恢复是flink run -s path xxx.jar 那么flink sql没有jar包如何恢复呢 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); Configuration configuration = bsTableEnv.getConfig().getConfiguration(); configuration.setString("execution.savepoint.path","xxx")貌

Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问

2021-03-03 文章 Qishang
Hi 社区。 Flink 1.12.1 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 forword 的ETL没有作用。 insert into table_a select id,udf(a),b,c from table_b; 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? 2. 这个可以改变默认

Re: flink sql中如何使用异步io关联维表?

2021-03-03 文章 HunterXHunter
定义一个 sourcetable -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql中如何使用异步io关联维表?

2021-03-03 文章 casel.chen
flink sql中如何使用异步io关联维表?官网文档有介绍么?

Flink-SQL-Connector扩展问题

2021-02-28 文章 guaishushu1...@163.com
在将旧版本升级至1.12版本中,需要支持proctime和eventime时发现 DefinedProctimeAttribute该方法已过期,但是查看官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E4%BD%BF%E7%94%A8-tablesource-%E5%AE%9A%E4%B9%89-1 实例仍然使用 DefinedProctimeAttribute该方法 且并没有说明替换方法?

Re:Re: Re: Flink SQL 应用情况请教

2021-02-27 文章 邮件帮助中心
gt; > >发件人: yinghua...@163.com >发送时间: 2021-02-27 14:23 >收件人: user-zh >主题: Re: Flink SQL 应用情况请教 >这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No > Data,sink显示的是No Watermark >我的SQL语句如下: >CREATE TABLE t_stock_match_p_1( > id

Re: Re: Flink SQL 应用情况请教

2021-02-27 文章 xg...@126.com
1503,61,15811,1614405166858 1504,61,15813,1614405333871 1505,61,15814,1614405544862 1506,61,15814,1614405673863 就这几条数据,并行度设置为1 发件人: yinghua...@163.com 发送时间: 2021-02-27 14:23 收件人: user-zh 主题: Re: Flink SQL 应用情况请教 这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其

Re: Flink SQL 应用情况请教

2021-02-26 文章 yinghua...@163.com
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No Data,sink显示的是No Watermark 我的SQL语句如下: CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'-MM-dd HH:mm:ss')),

Re: Flink SQL 应用情况请教

2021-02-26 文章 占英华
不是指标显示问题,是数据一直没写到mysql中,也没啥错误日志,然后今天早上我把任务重启了下,数据就全部写入到mysql中了 > 在 2021年2月26日,15:02,Smile 写道: > > 你好, > > 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 > numRecordsOut,看是哪个算子开始有输入没输出的。 > 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 > > GroupWindowAggregate(groupBy=[stkcode],

Re: Flink sql 1.12写入hive报metastore失败

2021-02-26 文章 will he
我也遇到类似的问题了, 求问楼主最后怎么解决的. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink sql 1.12写入hive报metastore失败

2021-02-26 文章 will he
我也遇到相同的问题了, 区别在于我是有一个springboot的项目提交的sql, 1.11.3上是好的, 换成1.12.1之后就不行了.sql-client本身可以执行, 但是我自己在springboot里面就提交不了sql了. 报的错是一样的, 求问楼主最后怎么解决的, 我以为应该是包有冲突, 但是具体是哪个jar包有冲突我还说不上来. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 应用情况请教

2021-02-25 文章 Smile
你好, 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和 numRecordsOut,看是哪个算子开始有输入没输出的。 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。 GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$, matchtime, 6)], properties=[w$start, w$end, w$rowtime, w$proctime],

Flink SQL 应用情况请教

2021-02-25 文章 yinghua...@163.com
我们在使用Flink SQL 统计一分钟内各个股票的交易量,SQL代码如下: CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime TIMESTAMP, WATERMARK FOR matchtime as matchtime ) WITH ( 'connector' = 'kafka-0.10', 'topic' = 'stock_match_p_1', 'scan.startup.mode' = 'latest-offset

Re: flink sql 并发数问题

2021-02-25 文章 Smile
Hi Jeff, 对于 SQL,现在只能设置整个 SQL 的并发,不能单独提高某个算子的并发。 不过可以考虑把消费 Kafka 的部分用 DataStream 来实现,然后再把 DataStream 转成 Table 去跑 SQL。这样消费 Kafka 的并发和 SQL 的并发就可以分开来设置了。 还有一个想法是如果你的 Kafka Source 到 UDF 之间有 hash (比如 Group By)之类的重分发的逻辑,是否可以忽略 Kafka

Flink SQL UDF 如何自定义Metrics

2021-02-24 文章 xingoo
HI, 如题,想要在Flink SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql tumble window 时区问题

2021-02-24 文章 xuhaiLong
hi all: 使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。 使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。 代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/

flink sql 并发数问题

2021-02-24 文章 Jeff
hi all, 用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python http服务,速度不快,有没有方法单独提高这一块的并发数呢?

Re: Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 Rui Li
是的,hive表必须存在HiveCatalog里才能正常读写 On Tue, Feb 23, 2021 at 10:14 AM yinghua...@163.com wrote: > > Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行? > > > > yinghua...@163.com > > 发件人: Rui Li > 发送时间: 2021-02-23 10:05 > 收件人: user-zh >

Re: Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 yinghua...@163.com
Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行? yinghua...@163.com 发件人: Rui Li 发送时间: 2021-02-23 10:05 收件人: user-zh 主题: Re: Re: Flink SQL 写入Hive问题请教 你好, 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > 我增加调试

Re: Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 Rui Li
你好, 用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么? On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote: > 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert > into时创建Hive表时提示没有连接器的配置 > Table options are: 'is_generic'='false' > 'partition.time-extractor.timestamp-pattern'='$dt $hr' >

Re: Re:Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 eriendeng
在hive catalog下创建kafka source表会在hive metastore中创建一张仅包含元数据的表,hive不可查,flink任务中可以识别并当成hive表读,然后只需要在hive dialect下正常读出写入即可。 参考 https://my.oschina.net/u/2828172/blog/4415970 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 邮件帮助中心
我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert into时创建Hive表时提示没有连接器的配置 Table options are: 'is_generic'='false' 'partition.time-extractor.timestamp-pattern'='$dt $hr' 'sink.partition-commit.delay'='0S' 'sink.partition-commit.policy.kind'='metastore,success-file'

flink sql 写入clickhouse性能优化

2021-02-22 文章 kandy.wang
flink-jdbc写入clickhouse 在flink 1.12版本 tps只能到35W左右的tps,各位,有什么可以性能调优?

Re: Flink SQL 写入Hive问题请教

2021-02-22 文章 eriendeng
你这没有把dialect set成hive吧,走到了else分支。default dialect是需要指定connector的,参考文档的kafka到hive代码 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL 写入Hive问题请教

2021-02-21 文章 yinghua...@163.com
我们在开发一个Flink SQL 框架,在从kafka读取数据加工写入到Hive时一直不成功,sql脚本如下: CREATE TABLE hive_table_from_kafka ( collect_time STRING, content1 STRING, content2 STRING ) PARTITIONED BY ( dt STRING,hr STRING ) TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr', 'sink.partition

Flink SQL并发度问题

2021-02-20 文章 guaishushu1...@163.com
这几天研究了flink table 转化为stream node 的源码,发现是某个算子的并发度取决于上一个算子的并发度。 但是在实际测试过程中发现使用window aggregate 语句时候 该算子的并发度和上游的source不一致 和我cli 命令配置的并发度一致 这是为什么呢? guaishushu1...@163.com

flink sql source kafka sink 到 mysql 遇主健冲突出现append现象

2021-02-19 文章 Yu Wang
MP, primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'username' = 'x' 'pass

Re: Flink SQL temporal table join with Hive 报错

2021-02-19 文章 Leonard Xu
> > 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key > > 这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' = > 'all',但是还是因为没有 primary Key,所以无法 run。 > > 现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join. 第二种情况,hive表不是streaming读的,相当于是一张静态表,每次都是加载最新的全量,所以配置如下参数即可

Flink SQL时间序列化问题

2021-02-19 文章 guaishushu1...@163.com
Flink-1.12.0 SQL定义timestamp(3)格式出现时间解析问题 CREATE TABLE user_log1 ( user_id string, ts TIMESTAMP(3), proc_time as PROCTIME()) WITH ( Caused by: java.io.IOException: Failed to deserialize JSON '{"user_id":"1188","ts":"2021-02-19T17:52:20.921Z"}'. at

Re: Flink SQL temporal table join with Hive 报错

2021-02-10 文章 macia kk
Hi, Leonard 我们的业务变得越来越复杂,所以现在需要 Join Hive 维表的情况非常普遍。现在维表分三种情况 一,维表没有分区,没有 primary key 这时候 `'streaming-source.partition.include' = 'latest',因为没有 parition,所以 latest 应该加载的就是全部的数据。 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key 这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' =

Re: Flink SQL temporal table join with Hive 报错

2021-02-09 文章 Leonard Xu
Hi, macia > 在 2021年2月9日,10:40,macia kk 写道: > > SELECT *FROM >( >SELECT tt.* >FROM >input_tabe_01 tt >FULL OUTER JOIN input_tabe_02 mt >ON (mt.transaction_sn = tt.reference_id) >and tt.create_time >= mt.create_time + INTERVAL

Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 macia kk
SELECT *FROM ( SELECT tt.* FROM input_tabe_01 tt FULL OUTER JOIN input_tabe_02 mt ON (mt.transaction_sn = tt.reference_id) and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES and tt.create_time <=

Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 Rui Li
Hi, 那join的语句是怎么写的呢? On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote: > 图就是哪个报错 > > 建表语句如下,表示公共表,我也没有改的权限. > > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT > 'country', `currency` string COMMENT 'currency', `exchange_rate` > decimal(25,10) COMMENT 'exchange rate') >

Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
图就是哪个报错 建表语句如下,表示公共表,我也没有改的权限. CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT 'country', `currency` string COMMENT 'currency', `exchange_rate` decimal(25,10) COMMENT 'exchange rate') PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd') ROW FORMAT SERDE

Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 Rui Li
你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么? On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote: > Currently the join key in Temporal Table Join can not be empty. > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 > > [image: image.png] > -- Best regards! Rui Li

Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
Currently the join key in Temporal Table Join can not be empty. 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 [image: image.png]

Re: Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 文章 Leonard Xu
Hi > 在 2021年2月5日,09:47,macia kk 写道: > > the `latest` only works` when the > streaming hive source table used as temporal table. 只能用在temporal(时态)表中,时态表只能在 temporal join(也就是我们常说的维表join) 中使用 祝好

Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 文章 macia kk
the streaming hive source table used as temporal table. By default the option is `all`. 报错 Flink SQL> SELECT * FROM exrate_table /*+ OPTIONS('streaming-source.enable'='true','streaming-source.partition.include' = 'latest') */; [ERROR] Could not execute SQL statement. Rea

Re: flink sql

2021-02-04 文章 HunterXHunter
我做了。。 添加了一个sql语法类似 "select " + "msg," + "count(1) cnt" + " from test" + " where msg = 'hello' " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + " EMIT \n" + " WITH

flink1.10??flink-sql,pyflink????hdfs??sink??????????append????????????????????????

2021-02-02 文章 ??????
?? ??hdfs??sink?? ?? CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'= 'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode' = 'append')

Flink SQL关于'connector' = 'filesystem‘的问题求助!

2021-02-02 文章 yinghua...@163.com
今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误 Caused by: org.apache.flink.table.api.TableException: Could not load service provider for factories. at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346

flink sql

2021-02-01 文章 ???????L
hi, ?? ??1.12flink sql ??datastream?,

<    2   3   4   5   6   7   8   9   10   11   >