Re: flinksql: group by fields are missing after optimization

2024-05-20 Thread Lincoln Lee
Hi,

You can try 1.17 or a newer version; this issue was fixed in Flink 1.17.0 [1].
Doing this remove optimization in batch mode is consistent with the semantics, but in streaming the field cannot simply be pruned.
The documentation on the determinism of the related time functions [2] has also been updated accordingly.

[1] https://issues.apache.org/jira/browse/FLINK-30006
[2]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e5%86%85%e7%bd%ae%e5%87%bd%e6%95%b0%e7%9a%84%e7%a1%ae%e5%ae%9a%e6%80%a7


Best,
Lincoln Lee


℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Monday, May 20, 2024 at 22:07:

> We are currently on Flink 1.16. Although that issue was merged into calcite-1.22.0, the change was later modified again by a newer PR (
> https://github.com/apache/calcite/pull/1735/files).
> So this problem still exists in the current Flink.
>
>
>
>
> ------ Original Message ------
> From: "user-zh" <libenc...@apache.org>
> Sent: Monday, May 20, 2024 12:51 PM
> To: "user-zh"
> Subject: Re: flinksql: group by fields are missing after optimization
>
>
>
> The calcite issue [1] you referenced was already fixed in calcite-1.22.0, and Flink
> has been using that calcite version since 1.11.
>
> So which Flink version are you on? This feels like it may be a different problem. If you can reproduce it on the current latest release, 1.19, please open an
> issue to report the bug.
>
> PS:
> I double-checked the behavior I mentioned above: the distinction should only be made at the code generation stage, so the optimizer cannot recognize it; therefore even in batch mode the optimized plan should still contain the dt field.
>
> [1] https://issues.apache.org/jira/browse/CALCITE-3531
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Monday, May 20, 2024 at 11:06:
> >
> > Hi, this is a streaming job. I traced the code: cast(CURRENT_DATE as string) is recognized as a constant. This has already been fixed in calcite,
> > https://github.com/apache/calcite/pull/1602/files, but the calcite version referenced by
> > Flink does not include that fix. I have worked around it with a custom UDF.
> 
> 
> 
> 
> > ------ Original Message ------
> >
> > From: "user-zh"
> > Sent: Monday, May 20, 2024 10:32 AM
> > To: "user-zh"
> > Subject: Re: flinksql: group by fields are missing after optimization
> 
> 
> 
> > It looks like "dt = cast(CURRENT_DATE as string)" caused dt to be inferred as a
> > constant, so it was optimized away.
> >
> > Folding CURRENT_DATE into a constant should only behave this way in batch mode. Is this SQL running in batch mode?
> 
> > ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> wrote on Sunday, May 19, 2024 at 01:01:
> > >
> > > create view tmp_view as
> > > SELECT
> > >     dt,    -- 2
> > >     uid,   -- 0
> > >     uname, -- 1
> > >     uage   -- 3
> > > from
> > >     kafkaTable
> > > where dt = cast(CURRENT_DATE as string);
> > >
> > > insert into printSinkTable
> > > select
> > >     dt, uid, uname, sum(uage)
> > > from tmp_view
> > > group by
> > >     dt,
> > >     uid,
> > >     uname;
> > >
> > > The SQL is simple: it first filters on dt = current_date, then aggregates (sums) by the three fields dt, uid and uname.
> > > However, after optimization the generated physical plan is as follows:
> > > == Optimized Execution Plan ==
> > > Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, uid, uname, EXPR$3])
> > > +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
> > >    +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, SUM(uage) AS EXPR$3])
> > >       +- Exchange(distribution=[hash[uid, uname]])
> > >          +- Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
> > >             +- TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], fields=[uid, uname, dt, uage])
> > >
> > > How can I make the aggregation sum by all three fields dt, uid and uname in this case? Thanks.
> >
> > --
> >
> > Best,
> > Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li


Re: flink version stable

2024-03-30 Thread Lincoln Lee
Hi Thierry,

The Flink connectors have been separated from the main Flink repository [1]
and now use separate repositories and release processes [2].
For example, https://github.com/apache/flink-connector-kafka hosts the Kafka
connector, and its latest release is v3.1.0 [3].
You can follow new releases of specific connectors on the mailing list.

[1] https://lists.apache.org/thread/7qr8jc053y8xpygcwbhlqq4r7c7fj1p3
[2]
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
[3] https://lists.apache.org/thread/sz7f4o1orh96zgjjztcp5gh85l3ks26x


Best,
Lincoln Lee


Fokou Toukam, Thierry wrote on Friday, March 29, 2024 at 21:41:

> I’m asking because I am seeing that the latest version doesn’t have all
> libraries, such as the Kafka connector
>
> *Thierry FOKOU* | *IT M.A.Sc student*
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
> --
> *From:* Junrui Lee 
> *Sent:* Friday, March 29, 2024 5:11:03 AM
> *To:* user 
> *Subject:* Re: flink version stable
>
> Hi,
>
> The latest stable version of Flink is 1.19.0.
>
>
>
>
> Fokou Toukam, Thierry wrote on Friday, March 29, 2024 at 16:25:
>
> Hi, just want to know which version of flink is stable?
>
> *Thierry FOKOU* | *IT M.A.Sc Student*
>
> Département de génie logiciel et TI
>
> École de technologie supérieure  |  Université du Québec
>
> 1100, rue Notre-Dame Ouest
>
> Montréal (Québec)  H3C 1K3
>
> Tél +1 (438) 336-9007
>
>
>


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Lincoln Lee
Congrats, thanks for the great work!


Best,
Lincoln Lee


Peter Huang wrote on Wednesday, March 20, 2024 at 22:48:

> Congratulations
>
>
> Best Regards
> Peter Huang
>
> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang  wrote:
>
>>
>> Congratulations
>>
>>
>>
>> Best,
>> Huajie Wang
>>
>>
>>
>> Leonard Xu wrote on Wednesday, March 20, 2024 at 21:36:
>>
>>> Hi devs and users,
>>>
>>> We are thrilled to announce that the donation of Flink CDC as a
>>> sub-project of Apache Flink has completed. We invite you to explore the new
>>> resources available:
>>>
>>> - GitHub Repository: https://github.com/apache/flink-cdc
>>> - Flink CDC Documentation:
>>> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>>>
>>> After the Flink community accepted this donation [1], we have completed the
>>> software copyright signing, code repo migration, code cleanup, website
>>> migration, CI migration, GitHub issues migration, etc.
>>> Here I am particularly grateful to Hang Ruan, Zhongqaing Gong, Qingsheng
>>> Ren, Jiabao Sun, LvYanquan, loserwang1024 and other contributors for their
>>> contributions and help during this process!
>>>
>>>
>>> For all previous contributors: The contribution process has changed slightly
>>> to align with the main Flink project. To report bugs or suggest new
>>> features, please open tickets in
>>> Apache Jira (https://issues.apache.org/jira). Note that we will no
>>> longer accept GitHub issues for these purposes.
>>>
>>>
>>> Welcome to explore the new repository and documentation. Your feedback
>>> and contributions are invaluable as we continue to improve Flink CDC.
>>>
>>> Thanks everyone for your support and happy exploring Flink CDC!
>>>
>>> Best,
>>> Leonard
>>> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>>>
>>>


[ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Lincoln Lee
The Apache Flink community is very happy to announce the release of Apache
Flink 1.19.0, which is the first release for the Apache Flink 1.19 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this release:
https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282

We would like to thank all contributors of the Apache Flink community who
made this release possible!


Best,
Yun, Jing, Martijn and Lincoln


Re: [DISCUSS][FLINK-32993] Datagen connector handles length-constrained fields according to the schema definition by default

2023-11-21 Thread Lincoln Lee
Thanks Yubin and Jane for the discussion!

+1 to fix this bug. Although it's usually used as a test source, it's
important to provide the correct behavior for users.

For an invalid field length configured by users, I think it's better to
raise an error instead of silently using the default value.

take Jane's example above:
1. For fixed-length data types, we should not accept a separate WITH option that
overrides the length semantics defined in the schema.
2. For variable-length data types, both DDLs look ok since STRING is
equal to VARCHAR(2147483647) and the user-defined length does not exceed the
definition,
but the following one is invalid:
CREATE TABLE t1 (
   f0 VARCHAR(128)
) WITH ('connector' = 'datagen', 'fields.f0.length' = '256');

Another thing we may also take into consideration (not a bug, but relevant)
is supporting variable-length semantics for varchar: since the length 128
in varchar(128) is just a maximum length, we could extend datagen to generate
variable-length values (maybe behind a new option to enable it, e.g.,
'fields.f0.var-len' = 'true'). Of course, this is a new feature that is not
part of this problem.

Best,
Lincoln Lee


Jane Chan wrote on Tuesday, November 21, 2023 at 21:07:

> Hi Yubin,
>
> Thanks for driving this discussion. Perhaps a specific example can better
> illustrate the current issue.
>
> Considering the following DDL, f0 will always be generated with a default
> char length of 100, regardless of char(5), because the connector option
> 'fields.f0.length' is not specified [1].
>
>> CREATE TABLE foo (
>>f0 CHAR(5)
>> ) WITH ('connector' = 'datagen');
>>
>
> Since it's often the case for a fixed-length type to specify the length
> explicitly in the DDL, the current design can be confusing for users to some
> extent.
>
> However, for the proposed changes, it would be preferable to provide
> specific details on how to handle the "not be user-defined" scenario. For
> example, should it be ignored or should an exception be thrown?
>
> To be more specific,
> 1. For fixed-length data types, what happens for the following two DDLs
>
>> CREATE TABLE foo (
>>f0 CHAR(5)
>> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>>
>> CREATE TABLE bar (
>>f0 CHAR(5)
>> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '1');
>>
>
> 2. For variable-length data types, what happens for the following two DDLs
>
>> CREATE TABLE meow (
>>f0 VARCHAR(20)
>> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>>
>> CREATE TABLE purr (
>>f0 STRING
>> ) WITH ('connector' = 'datagen', 'fields.f0.length' = '10');
>>
>
> Best,
> Jane
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/datagen/#fields-length
>
>
> On Mon, Nov 20, 2023 at 8:46 PM 李宇彬  wrote:
>
>> Hi everyone,
>>
>>
>> Currently, the Datagen connector generates data that doesn't match the
>> schema definition
>> when dealing with fixed-length and variable-length fields. It defaults to
>> a unified length of 100
>> and requires manual configuration by the user. This violates the
>> correctness of schema constraints
>> and hampers ease of use.
>>
>>
>> Jane Chan and I have discussed offline and I will summarize our
>> discussion below.
>>
>>
>> To enhance the datagen connector to automatically generate data that
>> conforms to the schema
>> definition without additional manual configuration, we propose handling
>> the following data types
>> appropriately [1]:
>>   1. For fixed-length data types (char, binary), the length should be
>> defined by the schema definition
>>  and not be user-defined.
>>   2. For variable-length data types (varchar, varbinary), the length
>> should be defined by the schema
>>   definition, but allow for user-defined lengths that are smaller
>> than the schema definition.
>>
>>
>>
>> Looking forward to your feedback :)
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-32993
>>
>>
>> Best,
>> Yubin
>>
>>


[SUMMARY] Flink 1.19 Release Sync 11/14/2023

2023-11-15 Thread Lincoln Lee
Hi devs and users,

Yesterday was the first release sync of Flink 1.19; I’d like to share the
summary:

- Sync meeting
We switched back to Google Meet because Zoom has account limitations in some
regions, and Google Meet remains available when the creator is not online.
The meeting will happen every 2 weeks and switch to weekly after the
feature freeze.

- Feature freezing date
Jan 26, 2024

- Features & issues tracking
The community has collected many features on the 1.19 wiki page [1], and contributors
are encouraged to keep the page continuously updated; there is also a large
number of Jira issues [2].
Please be aware that, for all `@Public` APIs that are intended to be
changed / removed in release 2.0, the deprecation work should be completed
in 1.19.
Another important thing is that since a lot of the work in 1.19 is also
related to the 2.0 release, tagging related issues with the '2.0-related' tag
will make it easier for the 2.0 release managers to track progress.

- Daily work divisions
In general, every release manager will be working on all daily issues. For
some major tasks, in order to make sure there will at least always be
someone to take care of them, they have been assigned to specific release
managers[1]. If you need support in each of these areas, please don't
hesitate to contact us.

- Blockers
  - FLINK-31449 Remove DeclarativeSlotManager related logic @Xintong will
track it
  - FLINK-33531 Nightly Python fails @Dian Fu will look at this
  - FLINK-18356 flink-table-planner Exit code 137 on ci pipeline @Matthias
pr reviewing

- Retrospective of 1.18 release
Thanks for the efforts from previous release managers and also several
valuable thoughts and suggestions:
  - The release process now has a Jira template, which will make the work
easier for the new release managers; the overall steps will still be
documented on the wiki page and continuously updated in the next releases.
We'll also be looking at automation to continue to streamline releases.
  - 1.18 experienced relatively long release testing. We found that finding
volunteers to join the testing after the RC is ready can mean a long wait, so in
1.19 we will try to find volunteers earlier (we added a new column,
volunteers for testing, on the wiki page [1]), and before release testing,
have the feature developers describe the detailed testing steps, so that
subsequent testing can go faster.
  - The documentation build and flink-docker CI have been migrated to
GHA (GitHub Actions). There's still a lot of work to be done to migrate the
CI pipeline from Azure to GHA [3], and you are welcome to join in for our goal of
improving the experience of our contributors!

The next release sync will be on November 28th, 2023.

Google Meet: https://meet.google.com/vcx-arzs-trv

[1] https://cwiki.apache.org/confluence/display/FLINK/1.19+Release
[2] https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=592
[3] https://issues.apache.org/jira/browse/FLINK-27075

Best regards,
Yun, Jing, Martijn and Lincoln


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Lincoln Lee
Thanks for the great work! Congrats all!

Best,
Lincoln Lee


Jing Ge wrote on Friday, October 27, 2023 at 00:16:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.18.0, which is the first release for the Apache Flink 1.18 series.
>
> Apache Flink® is an open-source unified stream and batch data processing
> framework for distributed, high-performing, always-available, and accurate
> data applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this release:
>
> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Best regards,
> Konstantin, Qingsheng, Sergey, and Jing
>


Re: [DISCUSS][FLINK-31788][FLINK-33015] Add back Support emitUpdateWithRetract for TableAggregateFunction

2023-09-07 Thread Lincoln Lee
Thanks to Jane for following up on this issue!  +1 for adding it back first.

For the deprecation, considering that users aren't usually motivated to
upgrade across major versions (1.14, from two years ago, isn't that old,
which may be part of the reason we haven't received more feedback), I'd recommend holding
off on removing `TableAggregateFunction` until we have a replacement for
it,
e.g., the user-defined operator Jark mentioned, or something else.

Best,
Lincoln Lee


Jark Wu wrote on Thursday, September 7, 2023 at 21:30:

> +1 to fix it first.
>
> I also agree to deprecate it if there are few people using it,
> but this should be another discussion thread within dev+user ML.
>
> In the future, we are planning to introduce user-defined-operator
> based on the TVF functionality which I think can fully subsume
> the UDTAG, cc @Timo Walther .
>
> Best,
> Jark
>
> On Thu, 7 Sept 2023 at 11:44, Jane Chan  wrote:
>
> > Hi devs,
> >
> > Recently, we noticed an issue regarding a feature regression related to
> > Table API. `org.apache.flink.table.functions.TableAggregateFunction`
> > provides an API `emitUpdateWithRetract` [1] to cope with updated values,
> > but it's not being called in the code generator. As a result, even if
> users
> > override this method, it does not work as intended.
> >
> > This issue has been present since version 1.15 (when the old planner was
> > deprecated), but surprisingly, only two users have raised concerns about
> it
> > [2][3].
> >
> > So, I would like to initiate a discussion to bring it back. Of course, if
> > few users use it, we can also consider deprecating it.
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#retraction-example
> > [2] https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp
> > [3] https://www.mail-archive.com/user-zh@flink.apache.org/msg15230.html
> >
> > Best,
> > Jane
> >
>


Re: AsyncDataStream: Retries keep executing after timeout

2022-12-21 Thread Lincoln Lee
Hi Yoni,

Sorry for the late response! I checked the issue and it is indeed a bug. I
have created a ticket (https://issues.apache.org/jira/browse/FLINK-30477)
and opened a PR to fix it; the reproducing cases were added
in AsyncWaitOperatorTest#testProcessingTimeWithTimeoutFunctionUnorderedWithRetry
& testProcessingTimeWithTimeoutFunctionOrderedWithRetry. You can wait for
the fix to be merged, or try the patch if it is urgently needed.

Thanks again for reporting this!

Best,
Lincoln Lee


Yoni Gibbs wrote on Thursday, December 15, 2022 at 18:10:

> Hi Lincoln,
>
> Thanks very much for the reply! The issue seems to occur both in local
> development in the IDE, and when running in a Flink cluster. Below is the
> full Java code to replicate the issue. I generated an empty project
> following the instructions at
> https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/datastream/,
> namely I ran this:
>
> $ mvn archetype:generate \
>     -DarchetypeGroupId=org.apache.flink \
>     -DarchetypeArtifactId=flink-walkthrough-datastream-java \
>     -DarchetypeVersion=1.16.0 \
>     -DgroupId=timeoutretry \
>     -DartifactId=timeoutretry \
>     -Dversion=0.1 \
>     -Dpackage=timeoutretry \
>     -DinteractiveMode=false
>
>
> I then deleted the two generated files and created one called
> TimeoutRetry.java, with the code below. Here I've created a dummy source
> that simply emits one value, for simplicity. (Note that I first came
> across the issue when working with a Kinesis source, but in order to rule
> Kinesis out of the equation I created the dummy source instead.) Then I
> added an async function which I've hard-coded to wait 500ms then fail.
>
> package timeoutretry;
>
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
> import org.apache.flink.streaming.util.retryable.RetryPredicates;
>
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
>
> public class TimeoutRetry {
>     private static void log(String message) {
>         System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_TIME) + " :: " + message);
>     }
>
>     public static class SingleValueSource extends RichSourceFunction<String> {
>         private volatile boolean cancelled = false;
>         private volatile boolean alreadySentValue = false;
>
>         @Override
>         public void run(SourceContext<String> ctx) {
>             while (!cancelled) {
>                 synchronized (ctx.getCheckpointLock()) {
>                     if (!alreadySentValue) {
>                         ctx.collect("foo");
>                         alreadySentValue = true;
>                     }
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             cancelled = true;
>         }
>     }
>
>     public static class ExampleRichAsyncFunction extends RichAsyncFunction<String, String> {
>
>         @Override
>         public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>             log("Received " + input);
>
>             // resultFuture.completeExceptionally(new Exception("Dummy error"));
>             // The line above gets expected output:
>             //   09:39:50.668851 :: Received foo
>             //   09:39:52.671624 :: Received foo
>             //   09:39:54.671417 :: Timed out handling foo
>
>             CompletableFuture.runAsync(() -> {
>                 try {
>                     Thread.sleep(500);
>                 } catch (InterruptedException e) {
>                     throw new RuntimeException(e);
>                 }
>                 resultFuture.completeExceptionally(new Exception("Dummy error"));
>             });
>             // The block above gets unexpected output:
>             //   09:57:01.574928 :: Received foo
>             //   09:57:04.084659 :: Received foo
>             //   09:57:05.581016 :: Timed out handling foo
>             //   09:57:06.590309 :: Received foo
>             //   09:57:09.099132 :: Received foo
>             //   09:57:11.605754 :: Received foo
>             //   09:57:14.114028 :: Received

Re: AsyncDataStream: Retries keep executing after timeout

2022-12-14 Thread Lincoln Lee
hi,
   Is this case running like an IT case locally, or as a streaming job running
on a cluster? If it's the former, one thing I can think of is local testing
using a bounded data source (with only a few test records) that ends its input very
quickly and then triggers the endOfInput logic of AsyncWaitOperator, that is,
it finishes all in-flight delayed retry items immediately (asyncInvoke will
be called as the last attempt before the operator exits and as the final
result, regardless of whether it has timed out or not); this may be one
more attempt than when the job does not end during normal running.
   For a long-running job, the retry will start from scratch when the job
recovers from a restart (regardless of how many times it had been retried
before); this may also result in more attempts and a longer retry time for
those elements.
   If you can provide more information about the test, maybe we can further
clarify what the problem is.

Best,
Lincoln Lee


Yoni Gibbs wrote on Tuesday, December 13, 2022 at 23:46:

> Hi,
>
> I've got a Kinesis consumer which reacts to each record by doing some
> async work using an implementation of RichAsyncFunction. I'm adding a
> retry strategy. After x failed attempts I want this to time out and give up
> by returning no data (i.e. not be treated as a failure).
>
> Here is a cut down version of my code, which works as expected (in Kotlin,
> I hope that's OK - can supply Java translation if required):
>
> val targetStream = AsyncDataStream
>     .unorderedWaitWithRetry(
>         inputStream,
>         object : RichAsyncFunction<String, String>() {
>             override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>                 println("Received input: $input")
>                 resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>             }
>
>             override fun timeout(input: String, resultFuture: ResultFuture<String>) {
>                 println("Timeout")
>                 resultFuture.complete(listOf())
>             }
>         },
>         4,
>         TimeUnit.SECONDS,
>         100,
>         AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
>             .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>             .build()
>     )
>
> This will time out after 4 seconds, and the retry strategy is set to retry
> every two seconds. If I run that I get the output I expect, namely:
>
> Received input: foo
> Received input: foo
> Timeout
>
> Importantly, I see that asyncInvoke is only called twice, because by the
> time the third invocation is due to occur, the timeout has already kicked
> in and marked this record as handled.
>
> However the above is clearly unrealistic as it calls
> resultFuture.completeExceptionally immediately rather than asynchronously
> after some work as taken place. So now I replace the asyncInvoke 
> implementation
> above with the following:
>
> override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>     println("Received input: $input")
>     CompletableFuture.supplyAsync {
>         Thread.sleep(500)
>         resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>     }
> }
>
> Now I get output which I don't expect, which shows that after the timeout,
> asyncInvoke continues to be called a few more times.
>
> That seems wrong to me: shouldn't it stop being called because timeout has
> already been invoked and it called resultFuture.complete()?
>
> I might well just be misunderstanding something here.
>
> Thanks in advance,
>
> Yoni.
>


Re: Flink 1.16 lookup join retry strategy question

2022-12-07 Thread Lincoln Lee
If the record still hasn't been joined when the retries end, it is treated as if the current record does not exist: with an inner join the row is filtered out, and with a left join the dimension fields are filled with null.
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%e5%bc%80%e5%90%af%e7%bc%93%e5%ad%98%e5%af%b9%e9%87%8d%e8%af%95%e7%9a%84%e5%bd%b1%e5%93%8d
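For reference, a minimal sketch of enabling the delayed retry through the LOOKUP hint described in the linked docs (the table and column names here are hypothetical):

SELECT /*+ LOOKUP('table'='dim', 'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
  o.order_id, d.user_name
FROM orders AS o
LEFT JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.user_id;

If all attempts miss, the row above is emitted with null dimension fields (left join), or dropped (inner join), as described.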


Best,
Lincoln Lee


casel.chen wrote on Wednesday, December 7, 2022 at 23:40:

> We have scenarios where the dimension table data arrives later than the fact stream data. When using the Flink 1.16 lookup
> join retry strategy, what happens if the record still hasn't been joined after the retry attempts are exhausted? Will the fields to be joined all be null?


Re: How to extend Flink SQL to implement delayed invocation?

2022-12-06 Thread Lincoln Lee
In a two-stream join scenario you can consider adding a time attribute to the join condition, turning it into an interval join, which achieves a certain degree of time alignment (
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/joins/#interval-joins
)
Also, if a lookup join driven by a single side works for your case and not all data needs to wait, you can try the delayed retry of lookup join:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup
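
A minimal interval join sketch (stream and column names are hypothetical); each row from A is only joined with rows from B whose event time falls within the given interval:

SELECT *
FROM A JOIN B
  ON A.id = B.id
  AND A.event_time BETWEEN B.event_time - INTERVAL '10' MINUTE AND B.event_time + INTERVAL '10' MINUTE;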

Best,
Lincoln Lee


casel.chen wrote on Wednesday, December 7, 2022 at 11:52:

> Can anyone help answer this?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2022-11-26 11:20:34, "casel.chen" wrote:
> >In a two-stream join scenario, the data from stream A should not be joined with stream B immediately after it arrives, but only after a configured period of time. How can this be
> implemented with Flink SQL? If it is not currently supported, how should Flink SQL be extended?
>


Re: flinksql dimension table join

2022-10-27 Thread Lincoln Lee
Hi,
   Flink's lookup join currently does not support preprocessing the dimension table, and it requires an equality join condition on the original fields of the dimension table (because the lookup has to be done with concrete field values).
In your example it should work as long as the t4 field is not computed, e.g. udf(t1.telephone_no) = t4.de_mobile
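
A minimal sketch of that shape, reusing the tables from your mail; note it joins the base employee table directly (no preprocessing view) and keeps the dimension-table column untouched in the condition (treat it as illustrative only):

SELECT t1.*, t4.userid, t4.name
FROM ygp_dwd_catalog.flink_dwd.xxx AS t1
LEFT JOIN ygp_dwd_catalog.flink_dwd.employee FOR SYSTEM_TIME AS OF t1.proc_time AS t4
  ON udf(t1.telephone_no) = t4.mobile;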

Best,
Lincoln Lee


Fei Han wrote on Thursday, October 27, 2022 at 12:12:

> Hi everyone! A few questions:
>  1. When doing a dimension table join in Flink SQL, can the dimension table first be defined as a temporary table with Flink SQL, and then the temporary table be used for the lookup join?
> For example, the temporary table:
> WITH employee_tmp AS(
>  select
>  userid as userid,
>  name as name,
>  mobile as de_mobile
>  from ygp_dwd_catalog.flink_dwd.employee
> )
>  select
>  *
>  from ( select
>  *
>  from ygp_dwd_catalog.flink_dwd.xxx ) t1
>  left join employee_tmp FOR SYSTEM_TIME AS OF .
> Is the above correct? When testing, the result was null.
> 2. Can the join condition after FOR SYSTEM_TIME AS OF be written as follows:
> FOR SYSTEM_TIME AS OF t1.proc_time as t4 on udf(t1.telephone_no) =
> udf(t4.de_mobile)
> The above reports the following error:
>  Temporal table join requires an equality condition on fields of table.
> The key point is that the UDF runs fine in both Hive and Presto, yet Flink SQL reports this equality-condition requirement.
>


Re: Interval join fails after Deduplicate in Flink 1.15

2022-10-17 Thread Lincoln Lee
Hi,
  Event-time based first-row deduplication may produce update messages, so it fails because interval join does not yet support consuming update inputs.
A feasible approach for now is to do the first-row deduplication based on proctime (which will not emit update messages).
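
A minimal sketch of that idea based on your DDL; only the added proc_time column and the ORDER BY clause differ from your original:

CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

create TEMPORARY view A as
select id, name, event_time from (
  select id, name, event_time,
  row_number() over(partition by id, name, event_time order by proc_time asc) as rn
  from source
)
where rn = 1;

With keep-first-row on proctime, view A produces insert-only rows, so the downstream interval join should accept it.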

Best,
Lincoln Lee


余列冰 wrote on Saturday, October 15, 2022 at 09:46:

> Hi!
>
> I ran into a problem doing an interval join after Deduplicate. The Flink version I'm using is 1.15.
>
> I want to use Flink's interval join for a two-stream join, and my first table needs deduplication. Below is my example code.
> ```sql
> CREATE TEMPORARY TABLE `source` (
>   id INT,
>   name STRING,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
>
> CREATE TEMPORARY TABLE B (
>   id INT,
>   `start` INT,
>   `end` INT,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> create TEMPORARY view A as
> select id, name, event_time from (
>   select id, name, event_time,
>   row_number() over(partition by id, name, event_time order by event_time
> asc) as rn
>   from source
> )
> where rn = 1;
>
> SELECT *
> FROM A, B
> WHERE
> A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND
> A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND
> B.event_time + INTERVAL '10' SECOND;
> ```
>
> For deduplication I chose to keep the first row, so view A should only produce insert rows, but running the SQL above gives the following error.
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[id, name, event_time],
> order=[ROWTIME])
> ```
>
> How can I perform an interval join after using Deduplicate?
>
>
> > -----Original Message-----
> > From: LB 
> > Sent: 2022-10-15 09:39:31 (Saturday)
> > To: user-zh 
> > Cc:
> > Subject: Interval join fails after Deduplicate in Flink 1.15
> >
> > Sorry, the previous email had formatting problems; please refer to this one.
>


Re: Mail from 潘明文

2022-06-24 Thread Lincoln Lee
Hi,
   Images pasted directly into the mail cannot be displayed properly; could you send the text instead?

Best,
Lincoln Lee


潘明文 wrote on Friday, June 24, 2022 at 16:36:

> Hi, can the two sinks below be processed concurrently, or do they have to run serially, i.e. the second sink can only run after the first sink has finished?
>
>


Re: Task for lookup join does not recover properly?

2022-06-24 Thread Lincoln Lee
Hi,
   Is the MySQL dimension table you are using the built-in Flink SQL JDBC connector? If so, its internal cache is only a read cache and
is not persisted; it becomes invalid when the job restarts or when the configured cache eviction condition is reached.
   If it is a dimension table you developed yourself, it is recommended to add data-loading logs so you can confirm whether anything goes wrong during failover.
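
For reference, a minimal sketch of a JDBC dimension table with the read-cache options mentioned above (connection details and names are hypothetical):

CREATE TABLE dim_mysql (
  id INT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'dim_table',
  'username' = '...',
  'password' = '...',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min',
  'lookup.max-retries' = '3'
);

The cache only holds rows that have already been looked up; it is not a preloaded full copy of the table, and it is rebuilt lazily after a restart.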

Best,
Lincoln Lee


Xuchao wrote on Friday, June 24, 2022 at 17:15:

> Hello!
> I have run into some problems using Flink.
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> I have a job that first does a two-stream join and then a lookup join against a MySQL dimension table, with incremental checkpoints enabled.
> After a brief sqlserver-cdc failure, the job failed and recovered automatically, but the task for the lookup join no longer outputs any data.
> On inspection, the amount of dimension table data loaded is 0, i.e. the full dimension table was not loaded once when the job recovered.
>
> What could be the problem here, and how should it be solved?
>
> Looking forward to your reply!
> best wishes!
>
> Attached log:
> 2022-06-24 14:55:45,950 ERROR
> com.ververica.cdc.debezium.internal.Handover [] - Reporting
> error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
> cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
> circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
> (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id])
> -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, carflow]], fields=[id, plate_license, site_id,
> create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license,
> site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time,
> flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time],
> watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched
> from RUNNI

Re: Re: Checkpoint problems when using join + aggregation

2022-06-21 Thread Lincoln Lee
Hi,
   I double-checked: for the CDC source, the task is currently still kept after the full snapshot phase ends and does not finish. The finished task here should be the one you mentioned:
"With lookup join + an external MySQL dimension table, the dimension table data was fully loaded once when the job started, and the corresponding task state then became finished."

Best,
Lincoln Lee


amber_...@qq.com.INVALID wrote on Tuesday, June 21, 2022 at 14:35:

> Thank you very much! Your suggestion was very helpful.
> I added the execution.checkpointing.checkpoints-after-tasks-finish.enabled configuration in my code, and it solved the problem perfectly.
> I used lookup join + an external MySQL dimension table; the dimension table data was fully loaded once when the job started, and the corresponding task state then became finished.
> 我使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。
>
> best wishes!
>
>
> amber_...@qq.com
>
> From: Lincoln Lee
> Sent: 2022-06-21 11:18
> To: user-zh
> Subject: Re: Re: Checkpoint problems when using join + aggregation
> Hi,
>   From the description, since a CDC source is used (presumably full snapshot first, then incremental sync), the corresponding task reaches the finished
> state when the snapshot phase completes. In version 1.14 the corresponding configuration option
> `execution.checkpointing.checkpoints-after-tasks-finish.enabled` is disabled by default
> (it is enabled by default in 1.15+); you can enable it, or upgrade to 1.15 and observe again.
>
> > because Some tasks of the job have already finished and checkpointing
> with finished tasks is not enabled
>
> Best,
> Lincoln Lee
>
>
> amber_...@qq.com.INVALID wrote on Tuesday, June 21, 2022 at 10:27:
>
> > Thanks!
> > No backpressure occurred, but I found some exceptions in the log, as follows:
> > Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> > because Some tasks of the job have already finished and checkpointing with
> > finished tasks is not enabled. Failure reason: Not all required tasks are
> > currently running.
> >
> > From the web UI I can see that some tasks are indeed in the finished state.
> >
> > Could it be because I used both lookup join and regular join when joining multiple dimension tables?
> > 是否因为我关联多张维表的时候,同时使用了lookup join和普通join呢?
> > --
> > amber_...@qq.com
> >
> >
> > *From:* Shengkai Fang 
> > *Sent:* 2022-06-21 09:53
> > *To:* user-zh 
> > *Subject:* Re: Checkpoint problems when using join + aggregation
> > hi.
> >
> > In this situation, it's best to check whether backpressure is occurring, and also look in the logs for relevant exceptions.
> >
> > Best,
> > Shengkai
> >
> > amber_...@qq.com.INVALID wrote on Tuesday, June 21, 2022 at 09:43:
> >
> > > Hello!
> > > I'm using flink 1.14.4 and sqlserver-cdc-2.2.1, submitting jobs in yarn-per-job mode.
> > > When I submit a plain data synchronization job, everything works fine.
> > > When I submit a JOIN + aggregation job, checkpointing does not work: there are no checkpoint records at all, and Task Managed
> > > Memory usage is always 100%.
> > > Below is my checkpoint configuration:
> > >
> > >
> > > I tried increasing Task Managed memory, but the usage is always 100%.
> > > When I disable incremental checkpoints, nothing changes.
> > > When I switch the state backend to hashmap, Managed Memory usage returns to normal, but checkpointing still does not work.
> > >
> > > Looking forward to your reply.
> > > Best!
> > > --
> > > amber_...@qq.com
> > >
> >
> >
>


Re: Re: Checkpoint problems when using join + aggregation

2022-06-20 Thread Lincoln Lee
Hi,
  From the description, since a CDC source is used (presumably full snapshot first, then incremental sync), the corresponding task reaches the finished
state when the snapshot phase completes. In version 1.14 the corresponding configuration option
`execution.checkpointing.checkpoints-after-tasks-finish.enabled` is disabled by default
(it is enabled by default in 1.15+); you can enable it, or upgrade to 1.15 and observe again.

> because Some tasks of the job have already finished and checkpointing
with finished tasks is not enabled
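
A minimal sketch of enabling it (SQL client syntax shown here; the same key can also be set in flink-conf.yaml or on the job configuration):

SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';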

Best,
Lincoln Lee


amber_...@qq.com.INVALID wrote on Tuesday, June 21, 2022 at 10:27:

> Thanks!
> No backpressure occurred, but I found some exceptions in the log, as follows:
> Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> because Some tasks of the job have already finished and checkpointing with
> finished tasks is not enabled. Failure reason: Not all required tasks are
> currently running.
>
> From the web UI I can see that some tasks are indeed in the finished state.
>
> Could it be because I used both lookup join and regular join when joining multiple dimension tables?
> --
> amber_...@qq.com
>
>
> *From:* Shengkai Fang 
> *Sent:* 2022-06-21 09:53
> *To:* user-zh 
> *Subject:* Re: Checkpoint problems when using join + aggregation
> hi.
>
> In this situation, it's best to check whether backpressure is occurring, and also look in the logs for relevant exceptions.
>
> Best,
> Shengkai
>
> amber_...@qq.com.INVALID wrote on Tuesday, June 21, 2022 at 09:43:
>
> > Hello!
> > I'm using flink 1.14.4 and sqlserver-cdc-2.2.1, submitting jobs in yarn-per-job mode.
> > When I submit a plain data synchronization job, everything works fine.
> > When I submit a JOIN + aggregation job, checkpointing does not work: there are no checkpoint records at all, and Task Managed
> > Memory usage is always 100%.
> > Below is my checkpoint configuration:
> >
> >
> > I tried increasing Task Managed memory, but the usage is always 100%.
> > When I disable incremental checkpoints, nothing changes.
> > When I switch the state backend to hashmap, Managed Memory usage returns to normal, but checkpointing still does not work.
> >
> > Looking forward to your reply.
> > Best!
> > --
> > amber_...@qq.com
> >
>
>


Re: Question about how Flink SQL aggregate functions are implemented

2022-06-01 Thread Lincoln Lee
For the Flink SQL implementation you can refer to the code in the flink-table planner module.
At the DataStream level, you can implement your own logic based on KeyedProcessFunction, e.g.
datastream.keyBy(...).process(keyedProcessFunction)...

Best,
Lincoln Lee


hdxg1101300...@163.com wrote on Wednesday, June 1, 2022 at 15:49:

> Hello:
>    While using Flink SQL recently, a question suddenly came up (currently on flink 1.12.4).
>    For example, a SQL statement like this:
>  select
>     dim,
>     count(*) as pv,
>     sum(price) as sum_price,
>     max(price) as max_price,
>     min(price) as min_price,
>     -- compute the uv count
>     count(distinct user_id) as uv,
>     UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
> from source_table
> group by
>     dim,
>     tumble(row_time, interval '1' minute);
> It does the aggregation over a specified window size and dimensions, and can flexibly specify aggregate functions such as count(*), sum(price), max(price), and so on.
> If I use the DataStream API for this kind of aggregation, how do I implement multiple aggregate computations? The current API's aggregate method,
> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction),
> takes one aggregate function and one window function; how can flexible combinations be achieved? Or how does Flink SQL implement this? I'd like to understand that part of the code but don't know where to start. Hope someone can point me in the right direction.
> Thanks!
> 谢谢!
>
>
> hdxg1101300...@163.com
>


Re: SQL 64K size limit bug in 1.13.5

2022-05-26 Thread Lincoln Lee
Did compilation hit the 'grows beyond 64 KB' exception?
You can consider using version 1.14 or above, which introduced automatic code splitting:
https://issues.apache.org/jira/browse/FLINK-23007

Best,
Lincoln Lee


godfrey he wrote on Thursday, May 26, 2022 at 23:29:

> Just to confirm: is it that the SQL text exceeds 64K? What is the exact exception?
>
> Yun Tang wrote on Thursday, May 26, 2022 at 10:06:
> >
> > Hi
> >
> > Please use English when sending emails to the dev community. Also, for usage questions, it's recommended to send them to the user-zh channel; I have already forwarded this to the relevant mailing list for you.
> >
> >
> > Best
> > Yun Tang
> > 
> > From: Lose control ./ <286296...@qq.com.INVALID>
> > Sent: Tuesday, May 24, 2022 9:15
> > To: dev 
> > Subject: SQL 64K size limit bug in 1.13.5
> >
> > Could anyone tell me how to change the 64K SQL size limit in version 1.13.5? Thanks.
>


Re: Windowed aggregation over a retract stream

2021-06-10 Thread Lincoln Lee
你好,
  The current tumble window does not yet support an update (retract) stream as input; you can implement this with group by ts combined with state TTL.
'table.exec.state.ttl' is a job-level setting; SQL does not yet support finer-grained state TTL settings. If you have concrete ideas, you can start a
discussion on the flink dev mailing list or file a Jira issue; contributions to the community are welcome!
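
A minimal sketch of that approach (table and column names are hypothetical, and the TTL can equivalently be set via TableConfig if your client does not support quoted SET): derive a 5-minute bucket from ts, aggregate with a plain GROUP BY so the retract input is accepted, and bound the state with the job-level TTL:

SET 'table.exec.state.ttl' = '1 h';

SELECT
  win_start,
  SUM(amount) AS sum_amount
FROM (
  SELECT
    amount,
    FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(ts AS STRING)) - MOD(UNIX_TIMESTAMP(CAST(ts AS STRING)), 300)) AS win_start
  FROM cdc_source
) t
GROUP BY win_start;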

lincoln lee


casel.chen wrote on Saturday, June 5, 2021 at 14:24:

> The upstream is a retract stream obtained by consuming binlog CDC. I now want to use Flink SQL to compute the sum over this retract stream every 5 minutes. Is it true that the regular tumble
> window cannot be used, and it can only be done with group by ts combined with state TTL?
> Also, can Flink SQL's state TTL only be set globally? Can it be controlled at a finer granularity by adding it in a SQL hint?


Re: flinksql TTL not taking effect

2021-06-09 Thread Lincoln Lee
Hello,
For SQL jobs you can try setting the job parameter "table.exec.state.ttl" and observe the effect.
In addition, enabling "state.backend.incremental" can also reduce the checkpoint size.
Parameter reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#checkpoints-and-state-backends

lincoln lee


chenchencc <1353637...@qq.com> wrote on Tuesday, June 8, 2021 at 15:11:

> Version: 1.12.2
> sql:
> SELECT id, name, message, ts
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rowNum
>   FROM persons_message_table_kafka
> )
> WHERE rowNum = 1
> TTL setting: tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(3));
>
> Problem: the checkpoint size keeps growing in production. It started at 90 MB and then grows by about 20 MB per day, but the source data has not grown much.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>