Hi, 可以尝试下 1.17 或更新的版本, 这个问题在 flink 1.17.0 中已修复[1]。 批处理中做这个 remove 优化是符合语义的,而在流中不能直接裁剪, 对于相关时间函数的说明文档[2]中也进行了更新
[1] https://issues.apache.org/jira/browse/FLINK-30006 [2] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e5%86%85%e7%bd%ae%e5%87%bd%e6%95%b0%e7%9a%84%e7%a1%ae%e5%ae%9a%e6%80%a7 Best, Lincoln Lee ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 22:07写道: > 当前用的是 flink 1.16 版本,这个issue虽然合并到了 calcite-1.22.0 中,但是在之后一段时间内,又被新的 pr ( > https://github.com/apache/calcite/pull/1735/files)合并了。 > 所以,当前flink中是仍然存在这个问题。 > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > libenc...@apache.org>; > 发送时间: 2024年5月20日(星期一) 中午12:51 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: flinksql 经过优化后,group by字段少了 > > > > 你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11 > 版本开始就已经用的是这个 calcite 版本了。 > > 所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个 > issue 来报一个 bug。 > > PS: > 上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。 > > [1] https://issues.apache.org/jira/browse/CALCITE-3531 > > ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道: > > > > 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite > 中修复了,https://github.com/apache/calcite/pull/1602/files > > <https://github.com/apache/calcite/pull/1602/files>>; 但是,flink 中引用的 > calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。 > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人: > "user-zh" > <libenc...@apache.org&gt;; > > 发送时间:&nbsp;2024年5月20日(星期一) 上午10:32 > > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; > > > > 主题:&nbsp;Re: flinksql 经过优化后,group by字段少了 > > > > > > > > 看起来像是因为 "dt = cast(CURRENT_DATE&nbsp; as string)" 推导 dt > 这个字段是个常量,进而被优化掉了。 > > > > 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛? > > > > ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid&gt; 于2024年5月19日周日 01:01写道: > > &gt; > > &gt; create view tmp_view as > > &gt; SELECT > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt, -- 2 > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; uid, -- 0 > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; uname, -- 1 > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; uage -- 3 > > &gt; from > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; kafkaTable > > &gt; where dt = cast(CURRENT_DATE&nbsp; as string); > > &gt; > > &gt; insert into printSinkTable > > &gt; select > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt, uid, uname, > sum(uage) > > &gt; from tmp_view > > &gt; group by > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt, > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; uid, > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; uname; > > &gt; > > &gt; > > &gt; > > &gt; sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname > 三个字段进行聚合求和操作。 > > &gt; 但是,经过优化后,生成的 物理结构如下: > > &gt; == Optimized Execution Plan == > > &gt; > Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, > uid, uname, EXPR$3]) > > &gt; +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, > uname, EXPR$3]) > > &gt; &amp;nbsp; &amp;nbsp;+- GroupAggregate(groupBy=[uid, > uname], select=[uid, uname, SUM(uage) AS EXPR$3]) > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; +- > Exchange(distribution=[hash[uid, uname]]) > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > &amp;nbsp;+- Calc(select=[uid, uname, uage], where=[(dt = > CAST(CURRENT_DATE()))]) > > &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; > &amp;nbsp; &amp;nbsp; +- TableSourceScan(table=[[default_catalog, > default_database, kafkaTable]], fields=[uid, uname, dt, uage]) > > &gt; > > &gt; > > &gt; > > &gt; 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢 > > > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benchao Li