Re:RE: lock up表过滤条件下推导致的bug
CompiledPlan plan = env.compilePlanSql("insert into out_console " + " select r.apply_id from t_purch_apply_sent_route r " + " left join t_purch_apply_sent_route_goods FOR SYSTEM_TIME AS OF r.pt as t " + "ON t.apply_id = r.apply_id and t.isdel = r.isdel" + " where r.apply_id = 61558439941351 and t.route_goods_id is not null and t.is_change = 2 " ); 在 2023-12-25 20:46:36,"Jiabao Sun" 写道: >Hi, > >邮件中的图片没显示出来,麻烦把 SQL 贴出来一下。 > >Best, >Jiabao > > >On 2023/12/25 12:22:41 杨光跃 wrote: >> 我的sql如下: >> 、 >> >> >> t_purch_apply_sent_route 是通过flink cdc创建的 >> t_purch_apply_sent_route_goods 是普通的jdbc >> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据 >> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推 >> 这应该算是bug吧,或者要满足我的预期,该怎么写sql? >> >> >> >>
lock up表过滤条件下推导致的bug
我的sql如下: 、 t_purch_apply_sent_route 是通过flink cdc创建的 t_purch_apply_sent_route_goods 是普通的jdbc 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推 这应该算是bug吧,或者要满足我的预期,该怎么写sql?
退订
退订 | | 杨光跃 | | yangguangyuem...@163.com |
flink 版本视图不触发水印导致流阻塞的问题
select a.card as card,a.cust as cust, b.city as city ,cast(a.ts as TIMESTAMP) ts,c.city from case3_TOPIC_A a left join cust_data FOR SYSTEM_TIME AS OF a.ts as b on a.cust = b.cust left join view_case3_TOPIC_B FOR SYSTEM_TIME AS OF a.ts as c on a.cust = c.cust; view_case3_TOPIC_B 是一个版本视图,现在的问题是如果view_case3_TOPIC_B 的数据不更新, case3_TOPIC_A 即使有新数据,也不往外输出。 怎么才能做到 case3_TOPIC_A 有数据就会立马触发呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
普通表join版本表,怎么得到append表
大佬们,请教个问题, insert into sink_2 select a.`time`,c.cust,b.mobile from case2_TOPIC_A a left join card_data b on a.card = b.card left join view_new_card_info c on a.card = c.card; case2_TOPIC_A 是一个普通表,view_new_card_info 是维表, 我要插入的 sink_2 其实应该是一个apped表。 为什么提交的时候要求 please declare primary key for sink table when query contains update/delete record. 我这个只需要追加就可以了吧,该怎么处理呢? | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
回复:flink sql 空闲数据源场景如何配置
大佬,再请教个问题, insert into sink_2 select a.`time`,c.cust,b.mobile from case2_TOPIC_A a left join card_data b on a.card = b.card left join view_new_card_info c on a.card = c.card; case2_TOPIC_A 是一个普通表,view_new_card_info 是维表, 我要插入的 sink_2 其实应该是一个apped表。 为什么提交的时候要求 please declare primary key for sink table when query contains update/delete record. 我这个只需要追加就可以了吧,该怎么处理呢? | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月30日 15:44,杨光跃 写道: 收到了,谢谢。 在sql-client里面执行 : set table.exec.source.idle-timeout = 10s; | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月30日 15:36,silence 写道: 可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
回复:flink sql 空闲数据源场景如何配置
收到了,谢谢。 在sql-client里面执行 : set table.exec.source.idle-timeout = 10s; | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月30日 15:36,silence 写道: 可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
flink sql 空闲数据源场景如何配置
在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制
回复:flinksql写入hive问题
写入hive在读取,我试了下是可以的。。。 第一步: CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/admin/hive/conf' ); 第二部 USE CATALOG myhive; 第三步 select * from hive_table; 猜测可能的问题,我们本地部署设置的slot都是1,你可能是在跑着写入任务,没有资源跑读取任务? 你可以设置把写入任务停了,或者设置方言问 : SET table.sql-dialect=hive; 然后在查询试试。 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 18:00,Geoff nie 写道: 非常感谢!我是在sql-client上提交的,修改配置文件已经成功提交了。hive表下分区文件名如下: part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2 还有两个问题请教下: 1.我通过如下创建了kafka流表,通过flink-sql查 kafka_table 是有数据的, 但是hdfs上却无文件,为什么呢 。 2.hive_table如上已经成功写入数据了,但是为啥flink-sql及hive却读取不到hive表数据呢,SELECT * FROM hive_table WHERE dt='2021-06-21' and hr='18'; SET table.sql-dialect=default; CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH ( 'connector'='kafka', 'topic'='t_kafka_03', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092', 'properties.group.id' = 'testGroup10', 'format'='json' ); 烦请帮忙看下。感谢感谢。 在 2021-06-24 16:12:35,"杨光跃" 写道: 检查点,checkpoint ,如果是jar包发布,直接在代码里写就可以。 如果用的sql-client提交sql ,可以在配置文件: sql-client-defaults.yaml 中加入如下配置: configuration: execution.checkpointing.interval: 1000 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 16:09,Geoff nie 写道: 非常感谢答复,不过我仔细考虑了下也没想明白,能具体说下在哪里配置参数吗。感谢! 在 2021-06-24 14:47:24,"杨光跃" 写道: 分区的提交需要开启checkpoint,要配置下 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 14:44,Geoff nie 写道: 您好!我也遇到这个问题了,跟以下问题类似,请问,这个问题解决了吗?非常感谢。 在 2021-02-14 10:43:33,"潘永克" <13311533...@163.com> 写道: 转发邮件信息 发件人:"潘永克" <13311533...@163.com> 发送日期:2021-02-11 11:12:39 收件人:d...@flink.apache.org 主题:flinksql写入hive问题 咨询一个flink问题。flinsql,能写入数据到hive表。但是hive表中的数据,都是基于 ".partinprogress"类似的文件。flink1.12.0 基于cdh6.2.0编译的,hive版本是2.1.1、hadoop-3.0.0. 问题截图如下: 创建hive表 SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 插入数据 INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; 文件始终不落地,一直都是 ".part-inprogress。。。"。文件。
回复:flinksql写入hive问题
检查点,checkpoint ,如果是jar包发布,直接在代码里写就可以。 如果用的sql-client提交sql ,可以在配置文件: sql-client-defaults.yaml 中加入如下配置: configuration: execution.checkpointing.interval: 1000 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 16:09,Geoff nie 写道: 非常感谢答复,不过我仔细考虑了下也没想明白,能具体说下在哪里配置参数吗。感谢! 在 2021-06-24 14:47:24,"杨光跃" 写道: 分区的提交需要开启checkpoint,要配置下 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 14:44,Geoff nie 写道: 您好!我也遇到这个问题了,跟以下问题类似,请问,这个问题解决了吗?非常感谢。 在 2021-02-14 10:43:33,"潘永克" <13311533...@163.com> 写道: 转发邮件信息 发件人:"潘永克" <13311533...@163.com> 发送日期:2021-02-11 11:12:39 收件人:d...@flink.apache.org 主题:flinksql写入hive问题 咨询一个flink问题。flinsql,能写入数据到hive表。但是hive表中的数据,都是基于 ".partinprogress"类似的文件。flink1.12.0 基于cdh6.2.0编译的,hive版本是2.1.1、hadoop-3.0.0. 问题截图如下: 创建hive表 SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 插入数据 INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; 文件始终不落地,一直都是 ".part-inprogress。。。"。文件。
回复:flinksql写入hive问题
分区的提交需要开启checkpoint,要配置下 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 14:44,Geoff nie 写道: 您好!我也遇到这个问题了,跟以下问题类似,请问,这个问题解决了吗?非常感谢。 在 2021-02-14 10:43:33,"潘永克" <13311533...@163.com> 写道: 转发邮件信息 发件人:"潘永克" <13311533...@163.com> 发送日期:2021-02-11 11:12:39 收件人:d...@flink.apache.org 主题:flinksql写入hive问题 咨询一个flink问题。flinsql,能写入数据到hive表。但是hive表中的数据,都是基于 ".partinprogress"类似的文件。flink1.12.0 基于cdh6.2.0编译的,hive版本是2.1.1、hadoop-3.0.0. 问题截图如下: 创建hive表 SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 插入数据 INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; 文件始终不落地,一直都是 ".part-inprogress。。。"。文件。
回复:flink 1.12如何实现window topN功能
应该是这样吧 1. 第一步以主键group by 以及分时间窗口 SELECT 主键, TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, FROM source_event group by TUMBLE(ts, INTERVAL '10' SECOND), 主键 2. 根据上一步的结果取top5 select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY 处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 10:58,casel.chen 写道: 你指的是TopN吗?https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n 但我想知道window topN写法,跟这个还不一样。 我的需求是: cdc场景同一个主键数据变更频繁,我想定义一个5秒处理时间窗口,在该窗口内取同一主键最新变更记录。用flink sql 1.12如何实现? 在 2021-06-23 10:18:01,"杨光跃" 写道: Apache Flink 1.12 Documentation: Queries | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 10:09,casel.chen 写道: 请不要截图哦 在 2021-06-23 09:47:46,"杨光跃" 写道: 1.12也支持的 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 09:45,casel.chen 写道: 官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
回复:flink 1.12如何实现window topN功能
Apache Flink 1.12 Documentation: Queries | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 10:09,casel.chen 写道: 请不要截图哦 在 2021-06-23 09:47:46,"杨光跃" 写道: 1.12也支持的 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 09:45,casel.chen 写道: 官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
回复:flink 1.12如何实现window topN功能
1.12也支持的 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 09:45,casel.chen 写道: 官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?