Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-06 文章 JasonLee
hi 那你只需要设置从 latest-offset 开始消费,并且禁用 checkpoint 就行了,至于重启的次数,可以通过 metrics 中的 numRestarts 去获取. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-06 文章 Yun Tang
hi, 本质上来说,你的做法有点hack其实不推荐,如果非要这么做的话,你还可以通过 numRestarts [1] 的指标来看重启了多少次。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#availability 祝好 唐云 From: yidan zhao Sent: Friday, June 4, 2021 11:52 To: user-zh Subject: Re: 关于flink

flink sql cdc作数据同步作业数太多

2021-06-06 文章 casel.chen
flink sql cdc作数据同步,因为是基于库+表级别的,表数量太多导致作业数太多。请问能否用flink sql cdc基于库级别同步?这样作业数量会少很多。

flink sql调整算子并行度的方法有哪些?

2021-06-05 文章 casel.chen
flink sql调整算子并行度的方法有哪些?通过 sql hint 可以调整吗?

Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 yidan zhao
那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。 JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道: > > hi > > sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的. > > > > - > Best Wishes > JasonLee > -- > Sent from:

Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 JasonLee
hi sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 yidan zhao
如题,按照官方文档的kafka source部分,有如下配置说明。 scan.startup.mode : optionalgroup-offsetsStringStartup mode for Kafka consumer, valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details. 其中Reading

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
好的非常感谢,我拿几个任务测试一波,看看性能能不能接受! Hi, 没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。 总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1]

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 Yun Tang
-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size 祝好 唐云 From: yujianbo <15205029...@163.com> Sent: Wednesday, June 2, 2021 15:29 To: user-zh@flink.apache.org Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源

flink sql作业表定义部分字段问题

2021-06-02 文章 casel.chen
有一个flink sql mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢? 21/06/02 18:54:22 [Source: TableSourceScan(table

怎么避免flink sql cdc作业重启后重新从头开始消费binlog?

2021-06-02 文章 casel.chen
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' = 'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key error,有什么办法可以避免吗? CREATE TABLE `mysql_source` ( `id` STRING, `acct_id` STRING, `acct_name` STRING, `acct_type` STRING, `acct_bal` STRING, PRIMARY KEY (`id`) NOT ENFORCED

Re: Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 yinghua...@163.com
yinghua...@163.com 发件人: WeiXubin 发送时间: 2021-06-02 17:44 收件人: user-zh 主题: Re: Flink SQL 1.11.3问题请教 不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL

Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 WeiXubin
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL 编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到 sink。 Row row = new Row(arity); collect(row);

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 yujianbo
/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99 祝好 唐云 From: yujianbo <[hidden email]> Sent: Tuesday, June 1, 2021 10:51 To: [hidden email] <[hidden email]> Subject: Re: Flink Sql 的/checkpoint/

flink sql cli 模式下,flink-conf.yaml 配置checkpoint无法生效

2021-06-02 文章 guozhi mang
: {execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, yarn.application.id=xxx, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/opt/xxx/flink-1.13.0/opt/flink-sql-client_2.11-1.13.0.jar], high-availability.cluster-id=application_1620482572059_3697, pipeline.classpaths

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-01 文章 HunterXHunter
那会一直增大下去吗,我跑了4天,ckp一直变大,没有稳定的迹象。是不是我需要调整compaction的配置 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-01 文章 Yun Tang
生产中关闭增量checkpoint,主要原因是对于大规模作业来说,全量checkpoint一方面会对底层DFS来说每次需要上传的数据量变大,另一方面,也会增长单次checkpoint的 e2e duration,有checkpoint超时失败的风险。 祝好 唐云 From: HunterXHunter <1356469...@qq.com> Sent: Tuesday, June 1, 2021 11:44 To: user-zh@flink.apache.org Subject: Re: Flink

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL 1.11.3问题请教

2021-05-31 文章 yinghua...@163.com
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据? yinghua...@163.com

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
感谢大佬的回复!以后优先现在邮箱这边讨论发现问题再去提个issue! 我的 idleStateRetention确实是设置3600秒,我先进行测试看看。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc 采集mysql binlog 可以保留before,after的字段吗

2021-05-31 文章 董建
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗? 按照官方的例子,定义表结构后,是最新的字段值? 能否同时保留before和after?

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 Yun Tang
/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99 祝好 唐云 From: yujianbo <15205029...@163.com> Sent: Tuesday, June 1, 2021 10:51 To: user-zh@flink.apache.org Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制? 没有

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗,这样治标不治本,那我大状态任务会有很多问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
没有更好的方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 HunterXHunter
关闭 增量checkpoint -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
有没有大佬帮忙看看 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:Flink sql的state ttl设置

2021-05-31 文章 LakeShen
或许你可以参考这个: [image: image.png] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/ Best, LakeShen chenchencc <1353637...@qq.com> 于2021年5月28日周五 下午4:30写道: > 想问下state ttl能针对单表设置吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 yujianbo
一、环境: 1、版本:1.12.0 2、flink sql 3、已经设置了setIdleStateRetention 为1小时 4、状态后端是rocksDB, 增量模式 5、源数据没有数据激增情况,任务已经跑了两天 二、详情 具体sql见第三大点,就是普通的group by统计的 sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。 我sql的groupby维度有加一个具体的分钟

Re: 回复:Flink sql的state ttl设置

2021-05-28 文章 chenchencc
想问下state ttl能针对单表设置吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:Re: flink sql cdc并行度问题

2021-05-28 文章 Zorro
如果你是自己实现MongoDB sink的话,你描述的方法看起来是可行的,不过这种实现方式相对比较复杂。 sql keyby可以考虑使用flink提供的 Deduplication 功能。这样的话MongoDB sink就可以开多个并行度,而不用考虑不同key的顺序问题了 -- Sent from:

如何根据flink sql解析出作业的血缘关系?

2021-05-27 文章 casel.chen
如何根据flink sql解析出作业的血缘关系?找到类似这样的血缘关系:source table A --> lookup table B --> sink table C

Re:Re: flink sql cdc并行度问题

2021-05-27 文章 casel.chen
我的作业是用flink sql消费mysql cdc binlog并实时同步到mongodb。如果只开一个并行度的话,mongodb的写入速度可能追不上mysql的写入。所以我需要在sink端开大并行度。 我不清楚用sql怎么写keyBy,是不是要group by pk呢?我原来的想法是在MongoDBSinkFunction中开一个线程池,每个线程对应下游sink的一个并行度,每个线程带一个queue,MongoDBSinkFunction根据数据PK往对应的queue发数据,每个消费者线程从自己的queue pull数据再进行批量插入。不知道这样可不可行

Flink sql 执行计划

2021-05-27 文章 流弊
想问下有啥方式能看到flink table的状态保存时间范围吗?

Re: flink sql cdc并行度问题

2021-05-26 文章 Zorro
mysql-cdc connector只能设置一个并行度,主要可能有这些原因: 1. mysql binlog本质上是一个文件,多个并行度消费需要避免重复 2. 多个并行度消费难以保证顺序 sink可以设为多个并行度,但是顺序不一定,如果需要主键相同的记录发到同一个sink线程可以先做一个keyby,并且保证keyby并行度与sink并行度相同,这样基本上能够保证数据forward传输,不过也不能100%保证有序。 如果需要保证有序还是建议sink并行度为1 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc并行度问题

2021-05-24 文章 casel.chen
flink sql作业:消费mysql binlog将数据同步到 mongodb 问题: 1. mysql-cdc connector只能设置成一个并行度吗? 2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?

Re: flink sql支持Common Table Expression (CTE)吗?

2021-05-23 文章 Jingsong Li
支持。 如果只是在单个sql中复用expression,和temporary view基本一样,区别不大。 在某些优化路径上不同,一般没有实质影响。 Best, Jingsong On Fri, May 21, 2021 at 11:32 PM casel.chen wrote: > flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view > xxx 来实现?CTE和temporary view的区别是什么? > 例如 > > >

flink sql支持Common Table Expression (CTE)吗?

2021-05-21 文章 casel.chen
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as (

flink sql支持创建临时函数吗?

2021-05-21 文章 casel.chen
如下 CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS ( CASE WHEN tenure < 1 THEN "analyst" WHEN tenure BETWEEN 1 and 3 THEN "associate" WHEN tenure BETWEEN 3 and 5 THEN "senior" WHEN tenure > 5 THEN "vp" ELSE "n/a" END ); SELECT name ,

flink sql运行在阿里云k8s用oss作为checkpoint存储介质出错

2021-05-21 文章 casel.chen
flink sql运行在阿里云k8s用oss作为checkpoint存储介质,在作业启动过程中出错,请问这个NoSuchKey是指什么?flink在获取checkpoint作restore吗? 2021-05-21 10:56:10,278 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_source]], fields=[id, acct_seq_id

Re: flink 1.13.0 ,使用flink sql 链接数据库是否支持多模式,即表名为schema.name

2021-05-19 文章 Shengkai Fang
请问是要用正则表达式匹配数据库中的table吗?‘org.users’ 是一个正则表达式吗? Best, Shengkai Asahi Lee <978466...@qq.com> 于2021年5月19日周三 下午2:01写道: > hi! >flink jdbc 是否有考虑支持表基于模式查询?如下 table-name写法: > CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status > BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH

flink 1.13.0 ??????flink sql ??????????????????????????????????schema.name

2021-05-19 文章 Asahi Lee
hi! flink jdbc ?? table-name?? CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ('connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' =

Re: flink sql源表定义字段列表问题

2021-05-17 文章 HunterXHunter
不需要提供全部字段 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql写mysql中文乱码问题

2021-05-17 文章 casel.chen
我的flink sql作业如下 SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM( mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT

flink sql源表定义字段列表问题

2021-05-17 文章 casel.chen
采用flink sql定义源表时,哪些connector支持提供部分字段,哪些connector必须要提供全量字段呢? 这边常用的connector主要有kafka, jdbc, clickhouse和mysql-cdc。 cdc是不是必须要提供对应表的全量字段呢?如果上游mysql业务表新增了字段,flink sql作业会不会出错? kafka表定义是否支持部分字段?

Flink SQL CodeGenException

2021-05-15 文章 sherlock c
Flink version: 1.12.0 在使用 Flink 执行 Flink SQL 流表 join 维表, 运行报错(流表SQL 和维表SQL单独运行都没有问题), 错误堆栈信息如下: Exception in thread "main" java.lang.RuntimeException: org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common type of GeneratedExpression(field$18,isNull$17,,S

Re:回复:flink sql写hbase问题

2021-05-13 文章 酷酷的浑蛋
不是,原因找到了,是函数多次嵌套导致,flink原始类型是not null,不能转换为string,这个报错信息真的是蛋疼,让人迷惑 在 2021-05-13 10:09:49,"allanqinjy" 写道: >光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。 > > >| | >allanqinjy >| >| >allanqi...@163.com >| >签名由网易邮箱大师定制 > > >在2021年05月12日 19:23,酷酷的浑蛋 写道:

flink sql怎样将change log stream转换成append log stream?

2021-05-13 文章 casel.chen
flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

回复:flink sql写hbase问题

2021-05-12 文章 allanqinjy
光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。 | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制 在2021年05月12日 19:23,酷酷的浑蛋 写道: Mismatch of function's argument data type 'STRING NOT NULL' and actual argument type 'STRING'.sql有些长,大概就是在执行 insert hbase sql时

flink sql如何修改执行计划?

2021-05-12 文章 casel.chen
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink sql写hbase问题

2021-05-12 文章 酷酷的浑蛋
Mismatch of function's argument data type 'STRING NOT NULL' and actual argument type 'STRING'.sql有些长,大概就是在执行 insert hbase sql时 报了上面的错误,请问这种错误是什么原因?

回复: Flink sql 自定义udaf函数 出现 No match found for function signature count_uadf()

2021-05-07 文章 JackJia
您好,能否把解决的思路介绍一下? 祝好 在2020年12月18日 10:38,丁浩浩<18579099...@163.com> 写道: 问题我自己已经解决。 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道: flink版本:1.11.1 udaf函数代码来自于阿里云官网文档 以下是代码 public class TestSql { public static void main(String[] args) throws Exception {

Flink SQL问题请教:Flink SQL中支持在一个TableEnvionment中多个DML语句提交时共用一个Yarn任务来运行吗?

2021-04-30 文章 yinghua...@163.com
yinghua...@163.com

flink sql 1.12 minibatch??????

2021-04-28 文章 op
flink sql 1.12 minibatch?? val config = tConfig.getConfiguration() config.setString("table.exec.mini-batch.enabled", "true") // mini-batch is enabled config.setString("table.exec.mini-batch.allow-latency", "true") config.set

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-27 文章 Shengkai Fang
hi, gen. 近期内应该就会发布,应该是五一左右就会发布1.13的版本。 Best, Shengkai gen 于2021年4月27日周二 下午8:57写道: > hi, Shengkai > 非常感谢你的解答, 解决了困扰我几天的问题。 > 按照你的建议 ,我使用 今天(2021-4-27) 主干版本,运行正常,发现确实是已经修复的。 > 我之前使用的版本是 1.12.2。 > > > 目前最新的release版本是1.12.2 ,应该还没有包含这个修复。不知道你是否了解 1.13的发布计划。 > > > > -- > Sent from:

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-27 文章 gen
hi, Shengkai 非常感谢你的解答, 解决了困扰我几天的问题。 按照你的建议 ,我使用 今天(2021-4-27) 主干版本,运行正常,发现确实是已经修复的。 我之前使用的版本是 1.12.2。 目前最新的release版本是1.12.2 ,应该还没有包含这个修复。不知道你是否了解 1.13的发布计划。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 文章 Shengkai Fang
Hi gen 我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。 Best, Shengkai Shengkai Fang 于2021年4月27日周二 上午11:46写道: > 请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1] > > [1] https://github.com/apache/flink/pull/15548 > > gen 于2021年4月27日周二 上午9:40写道: > >> Hi, all >> >> 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 >> >>

Re: flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 文章 Shengkai Fang
请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1] [1] https://github.com/apache/flink/pull/15548 gen 于2021年4月27日周二 上午9:40写道: > Hi, all > > 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 > > tEnv.executeSql( > """ > | SELECT t.* FROM ( > | SELECT EvtParser(request) as t FROM parsed_nginx_log >

Re: DataStreamAPI 与flink sql疑问

2021-04-26 文章 Shengkai Fang
Flink支持将DataStream 转换成一个 Table,然后通过API进行操作。如果想跟SQL相结合,可以将Table注册成一个 temporary view。 Best, Shengkai HunterXHunter <1356469...@qq.com> 于2021年4月27日周二 上午9:46写道: > 你试过吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 文章 gen
Hi, all 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin) 自定义函数 EvtParser @DataTypeHint("ROW") def eval(line: String) = {...} 详细代码 class

flink sql 使用自定义函数 ,返回嵌套行, 查询报错 scala.MatchError

2021-04-26 文章 gen
目前无法通过t.* 将嵌套的字段查询出来。 val schema = tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin).getTableSchema 其中自定义函数 EvtParser 定义如下。 @DataTypeHint("ROW") def eval(line: String) = {

Re: DataStreamAPI 与flink sql疑问

2021-04-26 文章 HunterXHunter
你试过吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 使用cdc 同步数据到ES7,报错 Detail: 无法为包含1073741350字节的字符串缓冲区扩大525个更多字节

2021-04-26 文章 william
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0

DataStreamAPI 与flink sql疑问

2021-04-26 文章 张锴
flink版本使用的是1.12.2.。请问如果在Dstream 上用一些Operater,比如map ,flatmap,process等,可以在其重写的方法中使用tableEnv.sqlQuery("xxx") tableEnv.createTemporaryView(),这种sql吗,能这样结合吗?

Re: Flink SQL Metrics中Kafka Offset请教

2021-04-25 文章 占英华
非常感谢! > 在 2021年4月25日,19:19,JasonLee <17610775...@163.com> 写道: > > hi > > currentOffsets: 表示的是每个分区的使用者当前读偏移量 , committedOffsets: > 表示的是最后一次成功提交到Kafka的偏移量 ,因为 kafka 的 partition 的 offset 是从 0开始的 所以 > committedOffsets 会比 currentOffsets 大 1 > > > > - > Best Wishes > JasonLee > -- > Sent

Re: Flink SQL Metrics中Kafka Offset请教

2021-04-25 文章 JasonLee
hi currentOffsets: 表示的是每个分区的使用者当前读偏移量 , committedOffsets: 表示的是最后一次成功提交到Kafka的偏移量 ,因为 kafka 的 partition 的 offset 是从 0开始的 所以 committedOffsets 会比 currentOffsets 大 1 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 使用cdc 同步postgresql的数据到ES,报错: org.postgresql.util.PSQLException: 错误: 内存用尽

2021-04-25 文章 william
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-postgres-cdc-1.2.0.jar:1.2.0

Flink SQL Metrics中Kafka Offset请教

2021-04-25 文章 邮件帮助中心
Flink SQL任务提交后,从JobManager监控指标中发现kafka的offset有2个指标信息,currentOffsets和committedOffsets,当Kafka无新增数据,程序运行一段时间后,发现指标仪表盘上显示 currentOffsets:2897 committedOffsets:2898 这2个值没有变化(应该是数据已经消费完毕了),现在的疑惑是:怎么这2个offset的值还不一致?committedOffsets表示已经提交和保存state中的offset吗?currentOffsets表示啥含义?烦请指教下,多谢!

Re: Re:回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
我这边有使用jdbc table属性加了本地缓存 尝试把cache size设置为400/2/4,然后重启,消费kafka速度都是需要慢慢上涨 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:?????? flink sql????kafka join??????????????????????

2021-04-22 文章 Michael Ran
:2021??4??22??(??) 10:50 >??:"user-zh" >:Re: flink sqlkafka join?? > > > >??SQLparse json??join >SQL??join70s=3.8k???

??????flink sql cdc????kafka????????????????????

2021-04-22 文章 ????
flink-cdcSourceRecord??SourceRecord??topic?? ??Debezium mysql-conectorkafka-connectortopic?? ?? ??+??+topic??

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
Cache设置大小为2w,超时时间为2h 实际上整个表大小为3w左右,考虑到整个表实际只有十几兆。我会尝试cache size设置为4w,保证整个表都能装进cache里。看会不会好一点 但是我查到现在怀疑跟savepoint有关: - 如果我设置kafka offset=earliest,不带savepoint重启,flink job启动消费时,lag有5000w左右,但是1分钟内就能达到约7k/s的消费速度。如下图,job在14:31启动,前面的速度特别大是因为offset重置,但是在14:33已经达到7.5k的消费速度

Re: 回复: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-22 文章 Xi Shen
读JDBC table是有缓存的,看了源码,是用Guava cache实现 文档上说,整个Task Manager进程共享使用一个Cache,所以应该和广播的效果是一样的?所以应该不是查询TiDB导致的性能问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复:flink sql cdc发到kafka消息表名信息缺失问题

2021-04-22 文章 casel.chen
debezium不就好了,也就是flink-cdc为什么集成debezium的原因,更新前后都是一个完整的record -- 原始邮件 -- 发件人: "user-zh" ; 发送时间: 2021年4月22日(星期四) 上午9:41 收件人: "user-zh@flink.apache.org"; 主题: flink sql cdc发到kafka消息表名信息缺失问题 最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal s

?????? flink sql????kafka join??????????????????????

2021-04-21 文章 ????
Tidb??Tidb??TiDBstructured-streaming?? ?? ---- ??:

??????flink sql cdc????kafka????????????????????

2021-04-21 文章 ????
??flink??debeziumcanal??kafka, canalafter ??debeziumflink-cdc??debezium??record

Re: flink sql消费kafka join普通表为何会性能爬坡?

2021-04-21 文章 Xi Shen
为了测试到底是因为SQL里的parse json导致性能瓶颈,还是因为join维表 我在SQL中去掉join维表之后重启,发现只需要70s即可达到消费速度=3.8k,整个因为重启导致的积压被压缩到3分钟 所以应该是维表JOIN的问题 现在连的数据库是TiDB,连接串属性为 useUnicode=true=UTF-8=Asia/Shanghai=true -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql cdc发到kafka消息表名信息缺失问题

2021-04-21 文章 casel.chen
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table 这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢? CREATE TABLE

Re:回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 casel.chen
nal-json就好了。 > > >--原始邮件-- >发件人: > "user-zh" >

flink sql消费kafka join普通表为何会性能爬坡?

2021-04-21 文章 Vincent Dong
大家好, flink sql消费kafka join普通表是会性能爬坡吗? 背景是flink 1.12.0 使用flink sql在yarn per-job发布,消费kafka topic=trades,然后join 数据库里的维表 shop_meta 现在发现每次重启flink sql job,或上游突然增加大量写入时,flink sql的消费速度总是慢慢增加上来,这样就会造成上游积压,等flink sql消费速度上来之后才能慢慢把积压消费完毕。 更多的信息: trades是avro格式,大概有10个字段,但其中有一个字段full_info是一个大json,我这边写了处理

?????? flink sql ?????? mysql cdc ?? canal json ????????kafka????

2021-04-21 文章 ????
??flink-cdc,??kafka,format canal-json ---- ??: "user-zh"

Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 Qishang
Hi casel. flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。 https://github.com/ververica/flink-cdc-connectors/blob/master/README.md casel.chen 于2021年4月20日周二 下午6:18写道: > 目标是用flink作业实现类似canal server的功能 > > > CREATE TABLE `binlog_table` ( > > `id` INT, > >

回复:如何将flink sql 查询语句的count值取出

2021-04-20 文章 guoyb
可以看看这个demo https://github.com/bingoguo93/flink-1.12-sql-demo/blob/main/src/main/java/org/example/mysql/tableQueryMysql.java ---原始邮件--- 发件人: "张锴"

如何将flink sql 查询语句的count值取出

2021-04-20 文章 张锴
我使用的flink版本1.12.2。 有个问题请教一下,如何在flink sql 查询语句中将count值取出。 先举个例子: val total: Int = hiveContext.sql("select count(*) from a").collect()(0)(0).toString.toInt 可以把count值拿出来,那如果用flink sql去做的话 怎样取出查询后的结果呢。 1、是否flink sql可以这么做? 2、如果可以应该怎么写呢?

如何将flink sql 查询语句的count值取出

2021-04-20 文章 张锴
flink版本1.12.2。 有个问题请教一下,如何在flink sql 查询语句中将count值取出 例如:tableEnv.sqlQuery("select count(*) from a") 将这个count值取出并返回。

flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-20 文章 casel.chen
目标是用flink作业实现类似canal server的功能 CREATE TABLE `binlog_table` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter`

Re: flink-sql-connector-elasticsearch6_2.11_1.10.0 与 flink-connector-elasticsearch6_2.11_1.10.0 并存问题

2021-04-20 文章 Leonard Xu
Hi 如果只是sql作业,使用flink-sql-connector-elasticsearch6_2.11_1.10.0 就可以了,如果纯datastream作业使用flink-connector-elasticsearch6_2.11_1.10.0 就可以了 如果两个包都要使用,有两个思路 1. 你自己自己打个包,把上面两个包的依赖放在一起。 2. 和1类似,shade掉flink-connector-elasticsearch6_2.11_1.10.0 我没实际打过,你可以动手试下。 祝好 > 在 2021年4月20日,14:13,william <

Re: flink-sql-connector-elasticsearch6_2.11_1.10.0 与 flink-connector-elasticsearch6_2.11_1.10.0 并存问题

2021-04-20 文章 william
你好,我也遇到了同样的问题,请问你们是怎么解决的,谢谢! -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于Flink SQL中Interval Join使用时watermark的疑惑

2021-04-15 文章 HunterXHunter
问题一: Interval Join doesn't support consuming update and delete changes是因为输入A或者B是一个更新流 问题二:interval join使用buffer来缓存A和B的数据,没有放在state里,只有在watermark超过下边界会触发onEventtime清理 buffer。 延迟问题:没有类似statettl的配置,interval join不受statettl控制清除状态 乱序问题:如果 B的数据时间小于 watermark则匹配不到,一切是跟watermmark相关 以上个人看源码理解的。希望有帮助 --

Re: 关于Flink SQL中Interval Join使用时watermark的疑惑

2021-04-15 文章 HunterXHunter
1: Interval Join doesn't support consuming update and delete changes 是因为A或B是一个update stream 2: Interval Join 的临时数据是放在buffer中,当wtm超过边界时间就会清理掉 buffer也就join不到了。所以 statettl无法控制A流的缓存数据。 延迟问题:所以如果wtm不更新,A流的数据不会被清理因为不受statettl控制 乱序问题:如果B流的旧时间小于 watermark就join不上 以上是个人理解、、 -- Sent from:

Re: flink sql 写hdfs问题

2021-04-15 文章 JasonLee
hi 你需要添加下面两个参数: 'csv.line-delimiter'='', 'csv.disable-quote-character'='true' - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 写hdfs问题

2021-04-15 文章 酷酷的浑蛋
flink.version=1.12.0 Create table t1( a varchar )with ( connector=kafka format=json ) Create table t2( a varchar )with ( connector=filesystem format=csv ) SQL: Insert into t2 select a from t1 发送: {"a":[{"a1":1,"a2":2}]} Hdfs的结果为:"[{""a1"":1,""a2"":2}]”

flink sql 写hdfs问题

2021-04-15 文章 酷酷的浑蛋
flink.version=1.12.0 Create table t1( a varchar )with ( connector=kafka format=json ) Create table t2( a varchar )with ( connector=filesystem format=csv ) SQL: Insert into t2 select a from t1 发送: {"a":[{"a1":1,"a2":2}]} Hdfs的结果为:"[{""a1"":1,""a2"":2}]”

flink sql写filesystem问题

2021-04-15 文章 酷酷的浑蛋
Create table t1( a varchar )with ( connector=kafka format=json ) Create table t2( a varchar )with ( connector=filesystem format=csv ) SQL: Insert into t2 select a from t1 发送: {"a":[{"a1":1,"a2":2}]} Hdfs的结果为:"[{""a1"":1,""a2"":2}]” 问题:为什么一个双引号变成了2个双引号?

Re: 提交flink-sql 出现无法部署到yarn集群

2021-04-14 文章 Rui Li
可以按照log里提示的container ID去找对应的container日志来看看 On Wed, Apr 14, 2021 at 8:00 PM 张锴 wrote: > 在用flink-sql的方式连接hive时 ,出现以下错误: > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Could

Re: 提交flink-sql 出现无法部署到yarn集群

2021-04-14 文章 Jeff Zhang
看 yarn app log 张锴 于2021年4月14日周三 下午8:00写道: > 在用flink-sql的方式连接hive时 ,出现以下错误: > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Could not deploy Y

提交flink-sql 出现无法部署到yarn集群

2021-04-14 文章 张锴
在用flink-sql的方式连接hive时 ,出现以下错误: The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java

关于Flink SQL中Interval Join使用时watermark的疑惑

2021-04-14 文章 xuty
在Flink SQL定义了两张kafka表(A表和B表),类型为debezium-json,然后要进行区间JOIN,SQL类似这样: select * from A left join B on A.id = B.id and B.dt BETWEEN A.dt and A.dt + INTERVAL '30' SECOND 第一个问题是:想要在A和B表中显示定义watermark(dt字段即event_time)来解决可能出现的乱序问题,但是报错了,不太明白这个报错,是否是flink sql中目前还不支持Interval Join中定义watermark? Interval

Re: flink sql 客户端连接hive 3.1.0出现connection refused异常

2021-04-13 文章 Rui Li
你好, 可以看一下SQL client的日志,里面应该有更详细的堆栈信息。 On Wed, Apr 14, 2021 at 10:42 AM 张锴 wrote: > flink版本1.12.2 ,在交互式界面执行flink sql 连接hive操作时,出现连接拒绝异常,内容如下: > Flink SQL> use catalog myhive; > > Flink SQL> show tables; > dim_kcl_customer_source_1h_all > mytest >

flink sql 客户端连接hive 3.1.0出现connection refused异常

2021-04-13 文章 张锴
flink版本1.12.2 ,在交互式界面执行flink sql 连接hive操作时,出现连接拒绝异常,内容如下: Flink SQL> use catalog myhive; Flink SQL> show tables; dim_kcl_customer_source_1h_all mytest Flink SQL> select * from dim_kcl_customer_source_1h_all limit 10; 2021-04-14 10:22:24,451 WARN org.apache.hadoop.hive.conf

Re:回复:flink sql join 内存占用以及数据延迟问题咨询

2021-04-12 文章 董建
nectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B 感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下: flink connector cdc 直接对接订单表,物流表,商品表表的binlog 1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在? 2、假如是全量join , 这些数据是全部保存在内存中吗?如果业务表的数据很大或者每天的增量很大,flink使用这种方式,内存是否有瓶颈? 3、如果是具有窗口属性的join,假如流1join流2,如果流

<    1   2   3   4   5   6   7   8   9   10   >