Re: Flink sql retract to append

2024-04-30 文章 Zijun Zhao
v/table/sql/queries/deduplication/ > > 1.11版本不知道是不是支持 > > > > From: 焦童 > > Date: 2024-04-30 11:25 > > To: user-zh > > Subject: Flink sql retract to append > > Hello , > > 我使用Flink 1.11 版本 sql 进行数据去重(通过 group by > 形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream > 中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位 > >

Re: Flink sql retract to append

2024-04-30 文章 焦童
谢谢你的建议 但是top-1也会产生回撤信息 > 2024年4月30日 15:27,ha.fen...@aisino.com 写道: > > 可以参考这个 > https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/deduplication/ > 1.11版本不知道是不是支持 > > From: 焦童 > Date: 2024-04-30 11:25 > To: user-zh > Subjec

Flink sql retract to append

2024-04-29 文章 焦童
Hello , 我使用Flink 1.11 版本 sql 进行数据去重(通过 group by 形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream 中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位

Re:Re: flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 iasiuide
你好,我们用的是1.13.2和1.15.4版本的,看了下flink ui,这两种版本针对下面sql片段的lookup执行计划中的关联维表条件是一样的 在 2024-03-08 11:08:51,"Yu Chen" 写道: >Hi iasiuide, >方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc >connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。 > >[1] https://issues.apache.org/jira/browse/FLINK-33365 > >祝好~ >

Re: flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 Yu Chen
Hi iasiuide, 方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。 [1] https://issues.apache.org/jira/browse/FLINK-33365 祝好~ > 2024年3月8日 11:02,iasiuide 写道: > > > > > 图片可能加载不出来,下面是图片中的sql片段 > .. > END AS trans_type, > >

flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 iasiuide
下面的sql片段中 ods_ymfz_prod_sys_divide_order 为kafka source表 dim_ymfz_prod_sys_trans_log 为mysql为表 dim_ptfz_ymfz_merchant_info 为mysql为表 flink web ui界面的执行计划片段如下: [1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS

Re: flink sql作业如何统计端到端延迟

2024-03-04 文章 Shawn Huang
Flink有一个端到端延迟的指标,可以参考以下文档[1],看看是否有帮助。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/ops/metrics/#end-to-end-latency-tracking Best, Shawn Huang casel.chen 于2024年2月21日周三 15:31写道: > flink sql作业从kafka消费mysql过来的canal > json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?m

Re:Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-21 文章 Xuyang
;context.currentWatermark(); >>... >>} >> >> >> >> >> >> >> >>-- >> >>Best! >>Xuyang >> >> >> >> >> >>在 2024-02-20 19:38:44,"Feng Jin" 写道: >&g

flink sql作业如何统计端到端延迟

2024-02-20 文章 casel.chen
flink sql作业从kafka消费mysql过来的canal json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。 doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time - update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法? 查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批

Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
st! >Xuyang > > > > > >在 2024-02-20 19:38:44,"Feng Jin" 写道: >>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime. >> >>Best, >>Feng >> >>On Tue, Feb 20, 2024 at 4:35 PM casel.chen wrote: >> >>&g

Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Xuyang
g Jin" 写道: >我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime. > >Best, >Feng > >On Tue, Feb 20, 2024 at 4:35 PM casel.chen wrote: > >> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark? >> >> >> public cl

Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Feng Jin
我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime. Best, Feng On Tue, Feb 20, 2024 at 4:35 PM casel.chen wrote: > 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark? > > > public class XxxSinkFunction extends RichSinkFunction implements > Check

flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 casel.chen
请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark? public class XxxSinkFunction extends RichSinkFunction implements CheckpointedFunction, CheckpointListener { @Override public synchronized void invoke(RowData rowData, Context context) throws IOException

RE: Flink SQL Windowing TVFs

2023-12-28 文章 Jiabao Sun
Hi, 在 1.14.0 版本中,CUMULATE 函数是需要用在GROUP BY聚合场景下的[1]。 部署到生产的 SQL 是否包含了 GROUP BY 表达式? 本地测试的Flink版本是不是1.14.0? Best, Jiabao [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote: > Hi,

Flink SQL作业配置'table.exec.sink.upsert-materialize'参数会影响TIMESTAMP类型精度?

2023-11-30 文章 casel.chen
线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3) 类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd HH:mm:ss.SSS`格式, 然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么? -U[2023-11-29T21:11:02.327, 2023-11-29

Re:回复: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen
>发件人: casel.chen >发送时间: 2023-11-22 20:54 >收件人: user-zh@flink.apache.org >主题: flink sql如何实现json字符数据解析? >输入: > >{ > > "uuid":"", > > "body_data": > "[{\"fild1\":1"1231","fild2\":1"2341&

Re:Re: flink sql如何实现json字符数据解析?

2023-11-29 文章 casel.chen
2023-11-23 15:10:00,"jinzhuguang" 写道: >Flink >SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。 >比如: > >SourceT: ( > uuid String, > body_data ARRAY> >) > >SinkT ( > result ARRAY String, body_data.fild2 Str

Re: 关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 jinzhuguang
感谢大佬,我找到了。 所以说SQL类的内建函数实际上使用的是calcite的能力,而flink自己的内建函数是在table api中使用 > 2023年11月24日 17:07,Xuyang 写道: > > Hi, > 关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS > NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL > > > > > -- > >

Re:关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 Xuyang
Hi, 关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL -- Best! Xuyang 在 2023-11-24 15:15:04,"jinzhuguang" 写道: >flink 1.18.0 > > >例如我写下一条SQL: > select * from KafkaTable where

关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-23 文章 jinzhuguang
flink 1.18.0 例如我写下一条SQL: select * from KafkaTable where id is not null; IS NOT NULL应该属于系统内建函数,于是我找到相关代码: public static final BuiltInFunctionDefinition IS_NOT_NULL = BuiltInFunctionDefinition.newBuilder() .name("isNotNull") .kind(SCALAR)

Re: flink sql如何实现json字符数据解析?

2023-11-22 文章 jinzhuguang
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。 比如: SourceT: ( uuid String, body_data ARRAY> ) SinkT ( result ARRAY> ) Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as body_data.fild1, body_data[1].

flink sql如何实现json字符数据解析?

2023-11-22 文章 casel.chen
输入: { "uuid":"", "body_data": "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]" } 输出: [ { "uuid": "", "body_data: null, "body_data.fild1": "123”, "body_data.fild2": "234" }, { "uuid": "", "body_data": null, "body_data.fild1":

flink sql支持批量lookup join

2023-11-21 文章 casel.chen
一行数据带了三个待lookup查询的key,分别是key1,key2和key3 id key1 key2 key3 想实现批量lookup查询返回一行数据 id value1 value2 value3 查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示 id key1 key2 key3 先将多列转成多行 id key1 id key2 id key3 分别进行lookup join后得到 id value1 id value2 id value3 最后多行转多列返回一行数据 id

Re: flink sql作业如何支持配置流?

2023-11-20 文章 Yu Chen
gt; > > 在 2023-11-20 19:24:47,"casel.chen" 写道: >> 我有一个flink >> sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。 >> >> >> create table customer_conf_tbl ( >> customer_id STRING >> ) with ( >> 'conn

flink sql作业如何支持配置流?

2023-11-20 文章 casel.chen
我有一个flink sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。 create table customer_conf_tbl ( customer_id STRING ) with ( 'connector' = 'apollo', '其他属性' ); select * from biz_table where customer_id in (select string_split(customer_id, ',') from

Flink sql 1.17.1 字段类型 DECIMAL(10, 0) 无法执行sql

2023-11-14 文章 刘聪聪
Flink 1.17.1 遇到 DECIMAL(10, 0)类型字段,直接无法运行,我用强转都不行,还是报数组越界,去除 DECIMAL(10, 0)类型字段,sql运行都正常。

Re:Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-30 文章 casel.chen
谢谢解答,我查了一下目前有两种CatalogStore实现,一个是基于内存的,另一个是基于文件系统的。 请问要如何配置基于文件系统的CatalogStore?这个文件可以在对象存储上吗?flink sql client要如何使用这个CatalogStore? 谢谢! 在 2023-10-30 10:28:34,"Xuyang" 写道: >Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1]. >我的理解是倒不是说“引入Ca

Re: flink sql如何处理脏数据问题?

2023-10-29 文章 ying lin
还有一种做法就是使用datastream,datastream支持sideoutput,但 flink sql不支持,不过有一种迂回的做法就是flinksql -> datastream -> flink sql,可以查一下官网资料,flinksql和datastream可以互相转换。 Xuyang 于2023年10月30日周一 10:17写道: > Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。 > > > > > > > &

Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 Xuyang
> > > > > > > >在 2023-10-20 17:03:46,"李宇彬" 写道: >>Hi Feng >> >> >>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。 >>| | >> 回复的原邮件

Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 casel.chen
alog,管理起来很麻烦,有这个特性会好很多。 >| | > 回复的原邮件 >| 发件人 | Feng Jin | >| 发送日期 | 2023年10月20日 13:18 | >| 收件人 | | >| 主题 | Re: flink sql不支持show create catalog 吗? | >hi casel > > >从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。 > > >Best, >Feng > &

flink sql如何处理脏数据问题?

2023-10-28 文章 casel.chen
场景:使用flink sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka topic或者写入一个文件便于事后审查。这个目前有办法做到吗?

回复: flink sql不支持show create catalog 吗?

2023-10-20 文章 李宇彬
Hi Feng 我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。 | | 回复的原邮件 | 发件人 | Feng Jin | | 发送日期 | 2023年10月20日 13:18 | | 收件人 | | | 主题 | Re: flink sql不支持show create catalog 吗? | hi casel 从 1.18 开始,引入

Re: flink sql不支持show create catalog 吗?

2023-10-19 文章 Feng Jin
hi casel 从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。 Best, Feng On Fri, Oct 20, 2023 at 11:55 AM casel.chen wrote: > 之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink > sql不支持show create catalog 。 > 而据我所知doris是支持show create c

flink sql不支持show create catalog 吗?

2023-10-19 文章 casel.chen
之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink sql不支持show create catalog 。 而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?

Re: Flink SQL的状态清理

2023-10-17 文章 Jane Chan
:01 PM 小昌同学 wrote: > 你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。 > > > | | > 小昌同学 > | > | > ccc0606fight...@163.com > | > 回复的原邮件 > | 发件人 | Jane Chan | > | 发送日期 | 2023年9月25日 11:24 | > | 收件人 | | > | 主题 | Re: Flink SQL的状态清理 |

Re: 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
感谢大佬!!! > 2023年10月13日 10:44,tanjialiang 写道: > > Hi, > 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922 > > > best wishes, > tanjialiang. > > > 回复的原邮件 > | 发件人 | jinzhuguang | > | 发送日期 | 2023年10月13日 10:39 | > | 收件人 | u

回复:关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 tanjialiang
Hi, 这个问题已经在1.17.0修复,详细可以看https://issues.apache.org/jira/browse/FLINK-30922 best wishes, tanjialiang. 回复的原邮件 | 发件人 | jinzhuguang | | 发送日期 | 2023年10月13日 10:39 | | 收件人 | user-zh | | 主题 | 关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。 | 首先,我的Flink版本为1.16.0 为了方便理解,我以Kafka作为案例来描述

关于Flink SQL无法对带metadata列的表在执行insert into时,指定具体的列名的问题。

2023-10-12 文章 jinzhuguang
'='earliest-offset', 'properties.bootstrap.servers'='localhost:9092', 'format'='json', 'json.ignore-parse-errors' = 'true' ); 正常情况: Flink SQL> insert into kafka_sink select user_id,name,`timestamp` from orders; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement

回复: Flink SQL的状态清理

2023-10-09 文章 小昌同学
你好,老师,我也是这样设置的,我这边是flink sql client,但是我去flink web ui界面并没有看到这个配置生效。 | | 小昌同学 | | ccc0606fight...@163.com | 回复的原邮件 | 发件人 | Jane Chan | | 发送日期 | 2023年9月25日 11:24 | | 收件人 | | | 主题 | Re: Flink SQL的状态清理 | Hi, 可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1] [1] https

Re: Flink SQL的状态清理

2023-09-24 文章 Jane Chan
set("table.exec.state.ttl", "86400 s") > > > > > | | > faronzz > | > | > faro...@163.com > | > > > 回复的原邮件 > | 发件人 | 小昌同学 | > | 发送日期 | 2023年09月21日 17:06 | > | 收件人 | user-zh | > | 主题 | Flink SQL的状态清理 | > > > 各位

回复:Flink SQL的状态清理

2023-09-21 文章 faronzz
试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s") | | faronzz | | faro...@163.com | 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2023年09月21日 17:06 | | 收件人 | user-zh | | 主题 | Flink SQL的状态清理 | 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置s

Flink SQL的状态清理

2023-09-21 文章 小昌同学
各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛 | | 小昌同学 | | ccc0606fight...@163.com |

Re: flink sql语句转成底层处理函数

2023-08-28 文章 Feng Jin
-zh@flink.apache.org | > | 抄送至 | | > | 主题 | Re: flink sql语句转成底层处理函数 | > 如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划 > > [1] > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/ > > On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com

回复:flink sql语句转成底层处理函数

2023-08-27 文章 海风
嗯,执行计划确实可以看到一些信息,只是还想知道是否还有比较好的方式能看具体有哪些底层函数以及状态,从而更方便去分析性能相关问题的 回复的原邮件 | 发件人 | Shammon FY | | 日期 | 2023年08月28日 12:05 | | 收件人 | user-zh@flink.apache.org | | 抄送至 | | | 主题 | Re: flink sql语句转成底层处理函数 | 如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划 [1] https://nightlies.apache.org/flink

Re: flink sql语句转成底层处理函数

2023-08-27 文章 Shammon FY
如果想看一个sql被转换后包含哪些具体执行步骤,可以通过explain语法[1]查看执行计划 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/ On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote: > 请教下,是否可以去查询一个flink > sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢? > > >

flink sql语句转成底层处理函数

2023-08-27 文章 海风
请教下,是否可以去查询一个flink sql提交运行后,flink给它转成的底层处理函数到底是什么样的,假如涉及状态计算,flink给这个sql定义的状态变量是哪些呢?

Re: flink1.17.1版本 flink sql多表关联优化

2023-08-24 文章 xiaohui zhang
这种join写法会随时更新里面每一个字段,最终产出结果的业务含义是什么呢? 如果是取每个vehicle_code对应的最新统计指标值,是否可以用支持partial update的存储,用多个单独的sql直接写入目前就可以了 周先明 于2023年8月4日周五 11:01写道: > Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化 > > guanyq 于2023年8月3日周四 15:59写道: > > > 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutio

Re: flink sql作业状态跨存储系统迁移问题

2023-08-18 文章 Tianwang Li
> [3] > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#state-savepoints-dir > > On Sat, Jul 29, 2023 at 11:09 AM casel.chen wrote: > > > 我们要将当前在Hadoop Yarn上运行的flink > > sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。 >

Re: flink1.17.1版本 flink sql多表关联优化

2023-08-03 文章 周先明
Regular Join 默认把数据都存储在State中,通常会结合TTL来进行优化 guanyq 于2023年8月3日周四 15:59写道: > 请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式 > > select > date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time, > b.vehicle_code, >

flink1.17.1版本 flink sql多表关联优化

2023-08-03 文章 guanyq
请问下多个表关联,这种flink sql如何优化呢,直接关联优点跑不动RuntimeExecutionMode.STREAMING 模式 select date_format(a.create_time, '-MM-dd HH:mm:ss') as create_time, b.vehicle_code, a.item_name, a.item_value, c.item_value as vehicle_score, d.current_fault

Re: flink sql作业状态跨存储系统迁移问题

2023-08-02 文章 Hangxiang Yu
oop Yarn上运行的flink > sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。 > 又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。 > > > 查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink > sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state > pro

flink sql作业状态跨存储系统迁移问题

2023-07-28 文章 casel.chen
我们要将当前在Hadoop Yarn上运行的flink sql作业迁移到K8S上,状态存储介质要从HDFS更换到对象存储,以便作业能够从之前保存点恢复,升级对用户无感。 又因为flink作业状态文件内容中包含有绝对路径,所以不能通过物理直接复制文件的办法实现。 查了一下官网flink state processor api目前读取状态需要传参uid和flink状态类型,但问题是flink sql作业的uid是自动生成的,状态类型我们也无法得知,请问有没有遍历目录下保存的所有状态并将其另存到另一个文件系统目录下的API ? 感觉state processor api更适合

flink sql 传参数问题

2023-07-12 文章 1
Hello: 请教2个问题。 1、flink 使用sql-client.sh -f xx.sql 怎么传递参数修改sql里面的文件。比如MySQL,Kafka的连接地址。 2、flink sql消费Kafka 设置group-offset,group.id之前没提交过,会直接报错。怎么设置成没提交过从earliest消费等等。 感谢大家

Re: flink on native k8s里如何使用flink sql gateway

2023-07-05 文章 Shammon FY
Hi, 我们的做法是启动Flink集群后,在其他节点(pod或者独立启动)启动Sql-Gateway,通过Flink的地址远程连接Flink集群,这样Sql-Gateway的部署和Flink集群完全分开 Best, Shammon FY On Tue, Jul 4, 2023 at 10:52 AM chaojianok wrote: > 大家好,请教个问题。 > > 用native kubernetes方式在k8s集群上部署好了flink,现在需要在这个flink集群里使用flink sql > gateway,大家有什么好的方案吗? > 目前的

Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-15 文章 daniel sun
退订 On Thu, Jun 15, 2023 at 7:23 PM im huzi wrote: > 退订 > > On Tue, Jun 13, 2023 at 08:51 casel.chen wrote: > > > 线上跑了200多个flink > > > sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。 > > flink > > > sql作业的指标

Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-15 文章 im huzi
退订 On Tue, Jun 13, 2023 at 08:51 casel.chen wrote: > 线上跑了200多个flink > sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。 > flink > sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称, > 请问这个问题有什么好的办法解决吗?

Re: Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-14 文章 Feng Jin
配置参数之后, task name 也会简化. Best, Feng On Wed, Jun 14, 2023 at 11:23 AM casel.chen wrote: > > > > > > > > > > > > > 谢谢,除了operator name,我看了flink sql作业生成的task name也很长,目前有办法可以简化下吗?例如 > > > flink_taskmanager_job_task_operator_fetch_total{j

Re:Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-13 文章 casel.chen
谢谢,除了operator name,我看了flink sql作业生成的task name也很长,目前有办法可以简化下吗?例如 flink_taskmanager_job_task_operator_fetch_total{job_id="4c24ce399f369ba2b7ae5ce51ec034d3",task_id="5c4ca2fea30dcf09bf3ee40c495fe808",task_attempt_id="5110227bf582bd21ecf6102625fadc16",ho

Re: flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-12 文章 Feng Jin
/rest_api/#jobmanager-metrics Best, Feng On Tue, Jun 13, 2023 at 8:51 AM casel.chen wrote: > 线上跑了200多个flink > sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。 > flink > sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长

flink sql作业指标名称过长把prometheus内存打爆问题

2023-06-12 文章 casel.chen
线上跑了200多个flink sql作业,接了prometheus指标(prometheus定期来获取作业指标)监控后没跑一会儿就将prometheus内存打爆(开了64GB内存),查了一下是因为指标名称过长导致的。 flink sql作业的指标名称一般是作业名称+算子名称组成的,而算子名称是由sql内容拼出来的,在select字段比较多或sql较复杂的情况下容易生成过长的名称, 请问这个问题有什么好的办法解决吗?

Flink SQL对同一kafka source进行多sink操作时会报javax.management.InstanceAlreadyExistsException异常

2023-06-02 文章 Jeff
sql示例: create table kafka_source() with ('connector'='kafka'); insert into sink_table1 select * from kafka_source; insert into sink_table2 select * from kafka_source; 报错内容如下: javax.management.InstanceAlreadyExistsException:

Re: 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 文章 Shammon FY
Hi 可以将天级时间和其他需要聚合的字段组成key,使用聚合算子,默认会每条数据完成计算后实时输出结果 Best, Shammon FY On Fri, May 26, 2023 at 3:44 PM casel.chen wrote: > 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

2023-05-26 文章 casel.chen
用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?

Re: 使用flink sql创建版本视图无法正常使用

2023-05-17 文章 Shammon FY
Hi, 你邮件里的图片无法显示,也没办法看到具体的错误信息 Best, Shammon FY On Thu, May 18, 2023 at 10:15 AM arkey w wrote: > flink版本:1.14.5 > 在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例( Versioned Tables | Apache Flink >

使用flink sql创建版本视图无法正常使用

2023-05-17 文章 arkey w
flink版本:1.14.5 在项目使用版本表时,准备使用版本视图,但创建后无法正常使用。后根据官网提供的示例( Versioned Tables | Apache Flink )进行验证也同样无法使用,创建sql如下: 创建事实表: [image: image.png] 创建版本视图: [image: image.png] [image: image.png] Temporal

flink sql case when 中文数据写入doris出现乱码

2023-05-17 文章 casel.chen
使用flink sql写mysql表数据到doris表,发现case when语句判断交易类型使用了中文,写入后在doris查出是乱码,而mysql其他中文字段写入是正确的,想问一下这个sql中出现的乱码问题要解决?

回复:Flink SQL CEP如何处理双(多)流输入?

2023-05-12 文章 CloudFunny
双流join? 回复的原邮件 | 发件人 | casel.chen | | 发送日期 | 2023年05月12日 11:52 | | 收件人 | user-zh@flink.apache.org | | 主题 | Flink SQL CEP如何处理双(多)流输入? | 请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!

Flink SQL CEP如何处理双(多)流输入?

2023-05-11 文章 casel.chen
请问Flink SQL CEP只能处理单流输入吗?网上看到的例子都是在同一个输入流中进行CEP处理,有没有双(多)输入流下使用CEP处理的例子?谢谢!

Re: 使用Flink SQL如何实现支付对帐超时告警?

2023-05-10 文章 Hongshun Wang
uot;first") > .subtype(E1.class) > .where(...) > .followedBy("second") > .subtype(E2.class) > .where(...) > > 如果使用Flink SQL,可以直接使用双流Join+窗口实现 > > Best, > Shammon FY > > > > > On Wed, May 10, 2023 at 2:24 AM casel.chen wro

Re: 使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 文章 Shammon FY
Hi 如果使用CEP,可以将两个流合并成一个流,然后通过subtype根据不同的事件类型来匹配,定义CEP的Pattern,例如以下这种 DataStream s1 = ...; DataStream s2 = ...; DataStream s = s1.union(s1)...; Pattern = Pattern.begin("first") .subtype(E1.class) .where(...) .followedBy("second") .subtype(E2.class) .where(..

使用Flink SQL如何实现支付对帐超时告警?

2023-05-09 文章 casel.chen
需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现? 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。

flink sql canal json格式侧输出parse error记录问题

2023-05-06 文章 casel.chen
线上使用flink sql消费kafka topic canal json格式数据,发现有一些数据中有的时间字段值为-00-00 00:00:00无法被解析,于是加了'canal-json.ignore-parse-errors = true' 参数,作业是能够正常运行了,但同时我们也希望知道哪些数据解析失败以便发给上游业务系统去自查。想问一下除了ignore外,有办法将这些parse error数据输出到另外一个kafka topic吗?谢谢!

Re: flink sql的codegen导致metaspace OOM疑问

2023-03-29 文章 Shammon FY
Hi 自增id可以为同一个作业的多个codegen类生成唯一类名 一般metaspace可以通过fullgc释放,你可以查看你的集群metaspace大小,是否触发了了fullgc Best, Shammon FY On Wednesday, March 29, 2023, tanjialiang wrote: > Hi all, >我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink > sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,Job

flink sql的codegen导致metaspace OOM疑问

2023-03-29 文章 tanjialiang
Hi all, 我有一个通过flink kubernetes operator定时提交到同一个session作业(底层是将flink sql转JobGraph的逻辑下推到了JobManager执行),当他跑了一段时间后,JobManager报了metaspace OOM. 经过排查后发现是flink sql codegen生成的代码类有一个自增ID,这些类在使用完后不会释放。 疑问: 1. flink sql codegen做这样的一个自增ID有什么特殊意义吗? 2. java中通过类加载器加载的类有什么办法可以释放?

找到多个default类型的ExecutorFactory导致提交flink sql作业失败

2023-03-28 文章 casel.chen
我的实时作业项目想解析sql获取到TableIdentifier做sql血缘,使用的版本是flink 1.15.2,同时引入了 flink-table-planner_2.12 和 flink-table-planner-loader 依赖,debug时发现 TableEnvironmentImpl create(EnvironmentSettings settings) 方法会调用 FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class,

flink sql upsert mysql问题

2023-03-27 文章 小昌同学
你好,我这边使用flink sql实现四条流的关联,后续实现case when的逻辑,并且将数据插入到mysql,但是从结果数据来看,数据存在部分丢失,代码我粘贴再后面,麻烦各位老师指导,下面是sql【create function get_json_value as 'com.nesc.flink.udf.GetJsonValue'; set 'table.exec.sink.not-null-enforcer'='drop'; 测试环境 CREATE TABLE dm_cust_oact_prog_ri ( cust_id STRING COMMENT '客户

Re: 项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动报解析配置异常

2023-03-25 文章 Leonard Xu
flink-sql-connector-xx 都是uber jar, 不应该在项目中直接uber jar,你在项目中应该引入 flink-connector-xx 依赖并自己管理。 Best, Leonard > On Mar 25, 2023, at 3:25 PM, casel.chen wrote: > > 项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar > 后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFacto

项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动报解析配置异常

2023-03-25 文章 casel.chen
项目中引入 flink-sql-connector-oracle-cdc-2.3.0.jar 后启动过程中报如下异常,查了一下该jar下有oracle.xml.jaxp.JXDocumentBuilderFactory类,有什么办法解决么? ERROR StatusLogger Caught javax.xml.parsers.ParserConfigurationException setting feature http://xml.org/sax/features/external-general-entities to false

Re: flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 Weihua Hu
casel.chen wrote: > 使用prometheus监控flink > sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink > sql作业这种operator name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。 > 请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。 > 要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!

flink sql作业监控指标operator name和task name超长导致prometheus OOM问题

2023-03-24 文章 casel.chen
使用prometheus监控flink sql作业,发现没一会儿工夫就将prometheus内存(30GB)占满了,查了一下是因为作业指标名称过长导致的,像flink sql作业这种operator name和task name默认是根据sql内容拼装的,一旦sql出现的列名很多就会导致指标名称过长。 请问这种情况Flink社区有什么建议?prometheus抓取的时候能够过滤掉吗?只保留operator_id和task_id。 要是自己想将现有拼装名称修改成哈希值的话应该改哪个类呢?谢谢!

Re: Flink-Sql Watermarkers问题

2023-03-15 文章 ying lin
Flink SQL 现在只能在create table 语句中指定watermark,另外一种迂回的做法,就是参考一下Flink SQL 把Tabe转成流,然后在流上做清洗后再指定watermark

回复: Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
好的感谢,我关注下 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月13日 18:49 | | 收件人 | | | 主题 | Re: Flink-Sql Watermarkers问题 | Hi 目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下 https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark

Re: Flink-Sql Watermarkers问题

2023-03-13 文章 Shammon FY
Hi 目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下 https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL Best, Shammon.FY On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote: > hi, > 我在使用Flink-Sql 1.14版本时能否不在cre

Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
hi, 我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线 | | 吴先生 | | 15951914...@163.com |

flink sql多条cdc数据流实时regular join如何减少作业状态?

2023-03-11 文章 casel.chen
当前flink实时作业接的kafka canal json格式的cdc数据,mysql表会有新增和更新数据,但不会有物理删除。 如果直接多条cdc数据流实时关联会导致作业状态很大,请教: 1. 有没有什么办法可以减少作业状态? 2. cdc格式的retract流可以加去重变成append流吗? 3. 使用append流多流关联是不是能减少作业状态?

Re: flink sql

2023-03-03 文章 小昌同学
sonLee Replied Message | From | 小昌同学 | | Date | 03/3/2023 15:50 | | To | user-zh | | Subject | flink sql | 各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |

flink sql

2023-03-02 文章 小昌同学
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |

Re: flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-02 文章 Shengkai Fang
听上去像是数据乱序了。可以看看这个文档对应的解决下[1] [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/ Best, Shengkai casel.chen 于2023年3月1日周三 16:18写道: > flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 > > 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生

Re: flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 Shengkai Fang
hi. 手动使用 join 将多个流拼接起来? Best, Shengkai casel.chen 于2023年3月2日周四 21:01写道: > flink sql jdbc connector是否支持多流拼接? > 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。 > 每条流更新大宽表的一部分字段。

flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 casel.chen
flink sql jdbc connector是否支持多流拼接? 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。 每条流更新大宽表的一部分字段。

Re: Re: Flink SQL 如何优化以及处理反压

2023-03-01 文章 Guojun Li
于2023年1月31日周二 17:22写道: > > > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 > >> > >> > >> 发件人: lxk > >> 发送时间: 2023年1月31日 15:16 > >> 收件人: user-zh@flink.apache.org > >> 主题: Flink SQL 如何优化以及处理反压 > >> > >> Flink

flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-01 文章 casel.chen
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。 请问: 1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生? 2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗? [1] https://nightlies.apache.org/flink/flink-docs

Re: 使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-25 文章 Jane Chan
"mobile number", "63f73b3f2e77497da91286fc": "telephone number" }, "after": null, "source": {...}, "op": "d", "ts_ms": 1677342340042, "transaction": null } flink sql Flink SQL> insert into `电话` s

Re: 在计算Window Top-N时,Flink SQL 时间语义不生效

2023-02-24 文章 Shuo Cheng
更乱了哦...可以尝试加个附件或推到 github, 贴个链接 On Fri, Feb 24, 2023 at 4:59 PM wei_yuze wrote: > >

在计算Window Top-N时,Flink SQL 时间语义不生效

2023-02-24 文章 wei_yuze

使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-23 文章 陈佳豪
-建表语法如下 String kafka = "CREATE TABLE `电话` " + "(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` VARCHAR(255), " + " PRIMARY KEY (`rowID`) NOT ENFORCED ) " + " WITH " + "('connector' = 'jdbc', " + " 'driver' = 'com.mysql.cj.jdbc.Driver', " + " 'url' =

Re: Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 Shuo Cheng
>> Hi > >> >> > >> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作 > >> >> > >> >> Best, > >> >> Shammon > >> >> > >> >> > >> >> On Sun, Feb 19, 2023 at 1:43 PM

Re:Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 casel.chen
>> > Hi, >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重 >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert >> into >> >> > >> >> > >> >> > Thanks >> >> > >

Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Shuo Cheng
先执行一个 group by 主键,然后再执行insert > into > >> > > >> > > >> > Thanks > >> > > >> > > >> > > >> > 在 2023-02-17 15:56:51,"casel.chen" 写道: > >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular i

Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Weihua Hu
nnector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重 > >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert > into > >> > > >> > > >> > Thanks > >> > > >> > > >> > > >> > 在 2023-02-17 1

  1   2   3   4   5   6   7   8   9   10   >