你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11
版本开始就已经用的是这个 calcite 版本了。

所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个
issue 来报一个 bug。

PS: 
上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。

[1] https://issues.apache.org/jira/browse/CALCITE-3531

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道:
>
> 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite 
> 中修复了,https://github.com/apache/calcite/pull/1602/files
> 但是,flink 中引用的 calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                          
>                                               "user-zh"                       
>                                                              
> <libenc...@apache.org&gt;;
> 发送时间:&nbsp;2024年5月20日(星期一) 上午10:32
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: flinksql 经过优化后,group by字段少了
>
>
>
> 看起来像是因为 "dt = cast(CURRENT_DATE&nbsp; as string)" 推导 dt 这个字段是个常量,进而被优化掉了。
>
> 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid&gt; 于2024年5月19日周日 01:01写道:
> &gt;
> &gt; create view tmp_view as
> &gt; SELECT
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt, -- 2
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; uid, -- 0
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; uname, -- 1
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; uage -- 3
> &gt; from
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; kafkaTable
> &gt; where dt = cast(CURRENT_DATE&nbsp; as string);
> &gt;
> &gt; insert into printSinkTable
> &gt; select
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt, uid, uname, sum(uage)
> &gt; from tmp_view
> &gt; group by
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; dt,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; uid,
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; uname;
> &gt;
> &gt;
> &gt;
> &gt; sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。
> &gt; 但是,经过优化后,生成的 物理结构如下:
> &gt; == Optimized Execution Plan ==
> &gt; Sink(table=[default_catalog.default_database.printSinkTable], 
> fields=[dt, uid, uname, EXPR$3])
> &gt; +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
> &gt; &amp;nbsp; &amp;nbsp;+- GroupAggregate(groupBy=[uid, uname], 
> select=[uid, uname, SUM(uage) AS EXPR$3])
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; +- Exchange(distribution=[hash[uid, 
> uname]])
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;+- 
> Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; +- 
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], 
> fields=[uid, uname, dt, uage])
> &gt;
> &gt;
> &gt;
> &gt; 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li

回复