Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Jark Wu
Congratulations and welcome!

Best,
Jark

On Thu, 21 Mar 2024 at 10:35, Rui Fan <1996fan...@gmail.com> wrote:

> Congratulations!
>
> Best,
> Rui
>
> On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan  wrote:
>
> > Congratulations!
> >
> > Best,
> > Hang
> >
> > Lincoln Lee  wrote on Thu, Mar 21, 2024 at 09:54:
> >
> >>
> >> Congrats, thanks for the great work!
> >>
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >>> Peter Huang  wrote on Wed, Mar 20, 2024 at 22:48:
> >>
> >>> Congratulations
> >>>
> >>>
> >>> Best Regards
> >>> Peter Huang
> >>>
> >>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang 
> wrote:
> >>>
> 
>  Congratulations
> 
> 
> 
>  Best,
>  Huajie Wang
> 
> 
> 
>  Leonard Xu  wrote on Wed, Mar 20, 2024 at 21:36:
> 
> > Hi devs and users,
> >
> > We are thrilled to announce that the donation of Flink CDC as a
> > sub-project of Apache Flink has completed. We invite you to explore
> the new
> > resources available:
> >
> > - GitHub Repository: https://github.com/apache/flink-cdc
> > - Flink CDC Documentation:
> > https://nightlies.apache.org/flink/flink-cdc-docs-stable
> >
> > After the Flink community accepted this donation [1], we have completed
> > software copyright signing, code repo migration, code cleanup, website
> > migration, CI migration, GitHub issues migration, etc.
> > Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
> > Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
> contributors
> > for their contributions and help during this process!
> >
> >
> > For all previous contributors: The contribution process has slightly
> > changed to align with the main Flink project. To report bugs or suggest
> > new features, please open tickets on Apache Jira
> > (https://issues.apache.org/jira). Note that we will no longer accept
> > GitHub issues for these purposes.
> >
> >
> > Welcome to explore the new repository and documentation. Your
> feedback
> > and contributions are invaluable as we continue to improve Flink CDC.
> >
> > Thanks everyone for your support and happy exploring Flink CDC!
> >
> > Best,
> > Leonard
> > [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
> >
> >
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Jark Wu
Congrats! Thanks Lincoln, Jing, Yun and Martijn for driving this release.
Thanks to all who were involved in this release!

Best,
Jark


On Mon, 18 Mar 2024 at 16:31, Rui Fan <1996fan...@gmail.com> wrote:

> Congratulations, thanks for the great work!
>
> Best,
> Rui
>
> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
> wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink 1.19.0, which is the first release for the Apache Flink 1.19
> series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements
> > for this release:
> >
> >
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> >
> > Best,
> > Yun, Jing, Martijn and Lincoln
> >
>


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Jark Wu
Congratulations, and thanks to the release managers and everyone who has
contributed!

Best,
Jark

On Fri, 27 Oct 2023 at 12:25, Hang Ruan  wrote:

> Congratulations!
>
> Best,
> Hang
>
> Samrat Deb  wrote on Fri, Oct 27, 2023 at 11:50:
>
> > Congratulations on the great release
> >
> > Bests,
> > Samrat
> >
> > On Fri, 27 Oct 2023 at 7:59 AM, Yangze Guo  wrote:
> >
> > > Great work! Congratulations to everyone involved!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Fri, Oct 27, 2023 at 10:23 AM Qingsheng Ren 
> wrote:
> > > >
> > > > Congratulations and big THANK YOU to everyone helping with this
> > release!
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Fri, Oct 27, 2023 at 10:18 AM Benchao Li 
> > > wrote:
> > > >>
> > > >> Great work, thanks everyone involved!
> > > >>
> > > >> Rui Fan <1996fan...@gmail.com> wrote on Fri, Oct 27, 2023 at 10:16:
> > > >> >
> > > >> > Thanks for the great work!
> > > >> >
> > > >> > Best,
> > > >> > Rui
> > > >> >
> > > >> > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam 
> > > wrote:
> > > >> >
> > > >> > > Finally! Thanks to all!
> > > >> > >
> > > >> > > Best,
> > > >> > > Paul Lam
> > > >> > >
> > > >> > > > On Oct 27, 2023, at 03:58, Alexander Fedulov <
> > alexander.fedu...@gmail.com>
> > > wrote:
> > > >> > > >
> > > >> > > > Great work, thanks everyone!
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Alexander
> > > >> > > >
> > > >> > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser <
> > > martijnvis...@apache.org>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Thank you all who have contributed!
> > > >> > > >>
> > > >> > > >> Op do 26 okt 2023 om 18:41 schreef Feng Jin <
> > > jinfeng1...@gmail.com>
> > > >> > > >>
> > > >> > > >>> Thanks for the great work! Congratulations
> > > >> > > >>>
> > > >> > > >>>
> > > >> > > >>> Best,
> > > >> > > >>> Feng Jin
> > > >> > > >>>
> > > >> > > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu <
> > xbjt...@gmail.com>
> > > wrote:
> > > >> > > >>>
> > > >> > >  Congratulations, Well done!
> > > >> > > 
> > > >> > >  Best,
> > > >> > >  Leonard
> > > >> > > 
> > > >> > >  On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee <
> > > lincoln.8...@gmail.com>
> > > >> > >  wrote:
> > > >> > > 
> > > >> > > > Thanks for the great work! Congrats all!
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Lincoln Lee
> > > >> > > >
> > > >> > > >
> > > >> > > > Jing Ge  wrote on Fri, Oct 27, 2023 at
> > 00:16:
> > > >> > > >
> > > >> > > >> The Apache Flink community is very happy to announce the
> > > release of
> > > >> > > > Apache
> > > >> > > >> Flink 1.18.0, which is the first release for the Apache
> > > Flink 1.18
> > > >> > > > series.
> > > >> > > >>
> > > >> > > >> Apache Flink® is an open-source unified stream and batch
> > data
> > > >> > >  processing
> > > >> > > >> framework for distributed, high-performing,
> > > always-available, and
> > > >> > > > accurate
> > > >> > > >> data applications.
> > > >> > > >>
> > > >> > > >> The release is available for download at:
> > > >> > > >> https://flink.apache.org/downloads.html
> > > >> > > >>
> > > >> > > >> Please check out the release blog post for an overview of
> > the
> > > >> > > > improvements
> > > >> > > >> for this release:
> > > >> > > >>
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > >
> >
> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > > >> > > >>
> > > >> > > >> The full release notes are available in Jira:
> > > >> > > >>
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
> > > >> > > >>
> > > >> > > >> We would like to thank all contributors of the Apache
> Flink
> > > >> > > >> community
> > > >> > >  who
> > > >> > > >> made this release possible!
> > > >> > > >>
> > > >> > > >> Best regards,
> > > >> > > >> Konstantin, Qingsheng, Sergey, and Jing
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > > >> > >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Best,
> > > >> Benchao Li
> > >
> >
>


Re: [DISCUSS][FLINK-31788][FLINK-33015] Add back Support emitUpdateWithRetract for TableAggregateFunction

2023-09-07 Thread Jark Wu
+1 to fix it first.

I also agree to deprecate it if there are few people using it,
but this should be another discussion thread within dev+user ML.

In the future, we are planning to introduce user-defined operators
based on the TVF functionality, which I think can fully subsume
the UDTAG, cc @Timo Walther.

Best,
Jark

On Thu, 7 Sept 2023 at 11:44, Jane Chan  wrote:

> Hi devs,
>
> Recently, we noticed an issue regarding a feature regression related to
> Table API. `org.apache.flink.table.functions.TableAggregateFunction`
> provides an API `emitUpdateWithRetract` [1] to cope with updated values,
> but it's not being called in the code generator. As a result, even if users
> override this method, it does not work as intended.
>
> This issue has been present since version 1.15 (when the old planner was
> deprecated), but surprisingly, only two users have raised concerns about it
> [2][3].
>
> So, I would like to initiate a discussion to bring it back. Of course, if
> few users use it, we can also consider deprecating it.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#retraction-example
> [2] https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp
> [3] https://www.mail-archive.com/user-zh@flink.apache.org/msg15230.html
>
> Best,
> Jane
>
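
For readers unfamiliar with the API under discussion, below is a minimal
sketch of a table aggregate function that overrides emitUpdateWithRetract,
shaped after the retraction example in the documentation linked as [1] above.
The class, field, and accumulator names are illustrative assumptions. The
regression discussed here means the planner's code generator never invokes
this method, so a function written this way does not emit incremental
updates as intended.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;

// Emits the Top-2 values seen so far as (value, rank) pairs. The accumulator also
// remembers what was emitted last time, so stale rows can be retracted before the
// updated rows are collected.
public class Top2WithRetract
        extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetract.Top2Acc> {

    public static class Top2Acc {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
        // Values emitted in the previous call to emitUpdateWithRetract.
        public Integer oldFirst = Integer.MIN_VALUE;
        public Integer oldSecond = Integer.MIN_VALUE;
    }

    @Override
    public Top2Acc createAccumulator() {
        return new Top2Acc();
    }

    // Called for every input row (contract-based method resolved by the planner).
    public void accumulate(Top2Acc acc, Integer value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // Incremental emission: retract only the ranks whose value actually changed.
    public void emitUpdateWithRetract(
            Top2Acc acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (!acc.second.equals(acc.oldSecond)) {
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}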


Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Jark Wu
Congrats everyone!

Best,
Jark

> On Jul 3, 2023, at 22:37, Yuval Itzchakov  wrote:
> 
> Congrats team!
> 
> On Mon, Jul 3, 2023, 17:28 Jing Ge via user  > wrote:
>> Congratulations!
>> 
>> Best regards,
>> Jing
>> 
>> 
>> On Mon, Jul 3, 2023 at 3:21 PM yuxia > > wrote:
>>> Congratulations!
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> From: "Pushpa Ramakrishnan" >> >
>>> To: "Xintong Song" mailto:tonysong...@gmail.com>>
>>> Cc: "dev" mailto:d...@flink.apache.org>>, "User" 
>>> mailto:user@flink.apache.org>>
>>> Sent: Monday, July 3, 2023, 8:36:30 PM
>>> Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
>>> 
>>> Congratulations 🥳
>>> 
>>> On 03-Jul-2023, at 3:30 PM, Xintong Song >> > wrote:
>>> 
>>> 
>>> Dear Community,
>>> 
>>> I'm pleased to share this good news with everyone. As some of you may have 
>>> already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
>>> 
>>> "Apache Flink greatly expanded the use of stream data-processing." -- 
>>> SIGMOD Awards Committee
>>> 
>>> SIGMOD is one of the most influential data management research conferences 
>>> in the world. The Systems Award is awarded to an individual or set of 
>>> individuals to recognize the development of a software or hardware system 
>>> whose technical contributions have had significant impact on the theory or 
>>> practice of large-scale data management systems. Winning the award 
>>> indicates the high recognition from academia of Flink's technological 
>>> advancement and industry influence.
>>> 
>>> As an open-source project, Flink wouldn't have come this far without the 
>>> wide, active and supportive community behind it. Kudos to all of us who 
>>> helped make this happen, including the over 1,400 contributors and many 
>>> others who contributed in ways beyond code.
>>> 
>>> Best,
>>> Xintong (on behalf of the Flink PMC)
>>> 
>>> [1] https://sigmod.org/2023-sigmod-systems-award/
>>> 



Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-06-01 Thread Jark Wu
+1, I think this can make the grammar clearer.
Please remember to add a release note once the issue is finished.

Best,
Jark

On Thu, 1 Jun 2023 at 11:28, yuxia  wrote:

> Hi, Jingsong. It's hard to provide an option, given that we also want to
> decouple the Hive dialect from the Flink planner.
> If we still need this fallback behavior, HiveParser will still depend on
> `ParserImpl` provided by flink-table-planner.
> But to minimize the impact on users and be more user-friendly, the error
> message will remind users that they may use SET table.sql-dialect = default
> to switch to Flink's default dialect when HiveParser fails to parse the SQL.
>
> Best regards,
> Yuxia
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Jingsong Li" 
> To: "Rui Li" 
> Cc: "dev" , "yuxia" ,
> "User" 
> Sent: Tuesday, May 30, 2023, 3:21:56 PM
> Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default
> dialect
>
> +1, the fallback looks weird now, it is outdated.
>
> But, it is good to provide an option. I don't know if there are some
> users who depend on this fallback.
>
> Best,
> Jingsong
>
> On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
> >
> > +1, the fallback was just intended as a temporary workaround to run
> catalog/module related statements with hive dialect.
> >
> > On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
> >>
> >> Big +1 on this, thanks yuxia for driving this!
> >>
> >> yuxia  wrote on Mon, May 29, 2023 at 14:55:
> >>
> >> > Hi, community.
> >> >
> >> > I want to start a discussion about making the Hive dialect no longer
> >> > fall back to Flink's default dialect.
> >> >
> >> > Currently, when the HiveParser fails to parse the SQL in Hive dialect,
> >> > it'll fall back to Flink's default parser[1] to handle Flink-specific
> >> > statements like "CREATE CATALOG xx with (xx);".
> >> >
> >> > As I'm involved with the Hive dialect and have had some communication
> >> > with community users who use the Hive dialect recently, I'm thinking of
> >> > throwing an exception directly instead of falling back to Flink's default
> >> > dialect when failing to parse the SQL in Hive dialect.
> >> >
> >> > Here're some reasons:
> >> >
> >> > First of all, it hides some errors with the Hive dialect. For example,
> >> > we found we couldn't use the Hive dialect with the Flink SQL client
> >> > anymore in the release validation phase[2]; eventually we found that a
> >> > modification in the Flink SQL client caused it, but our test case
> >> > couldn't catch it earlier, because although HiveParser failed to parse
> >> > it, it fell back to the default parser and the test case passed
> >> > successfully.
> >> >
> >> > Second, conceptually, the Hive dialect should have nothing to do with
> >> > Flink's default dialect. They are two totally different dialects. If we
> >> > do need a dialect mixing the Hive dialect and the default dialect, maybe
> >> > we need to propose a new hybrid dialect and announce the hybrid behavior
> >> > to users.
> >> > Also, the fallback behavior has confused some users; this comes from
> >> > questions I have been asked by community users. Throwing an exception
> >> > directly when failing to parse a SQL statement in Hive dialect will be
> >> > more intuitive.
> >> >
> >> > Last but not least, it's important to decouple Hive from the Flink
> >> > planner[3] before we can externalize the Hive connector[4]. If we still
> >> > fall back to Flink's default dialect, then we will need to depend on
> >> > `ParserImpl` in the Flink planner, which will block us from removing the
> >> > provided dependency of the Hive dialect as well as externalizing the
> >> > Hive connector.
> >> >
> >> > Although we have never announced the fallback behavior, some users may
> >> > implicitly depend on this behavior in their SQL jobs. So, I hereby open
> >> > the discussion about abandoning the fallback behavior to make the Hive
> >> > dialect clear and isolated.
> >> > Please remember it won't break Hive syntax, but syntax specific to Flink
> >> > may fail afterwards. For the failed SQL, you can use `SET
> >> > table.sql-dialect=default;` to switch to the Flink dialect.
> >> > If there are some Flink-specific statements that we find should be
> >> > included in the Hive dialect for ease of use, I think we can still add
> >> > them as special cases to the Hive dialect.
> >> >
> >> > Looking forward to your feedback. I'd love to hear the feedback from the
> >> > community to decide on the next steps.
> >> >
> >> > [1]:
> >> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
> >> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
> >> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
> >> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
> >> >
> >> >
> >> >
> >> > Best regards,
> >> > Yuxia
> >> >
> >>
> >>
> >> --
> >>
> >> Best,
> >> Benchao Li
> >
> >
> >
> > --
> > Best regards!
> > Rui Li
>
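
For reference, here is a minimal sketch of what the dialect switch mentioned
above looks like from the Table API side; in the SQL client the equivalent is
the SET table.sql-dialect statement quoted in the thread. The table name and
connector options are made-up illustrations, and actually running Hive-dialect
statements additionally requires the Hive connector/dialect dependencies (and
typically a HiveCatalog), which this sketch omits.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class DialectSwitchSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Flink-specific DDL such as "CREATE TABLE ... WITH (...)" is parsed by the
        // default dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE TABLE orders (id INT, item STRING) "
                        + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // Switch to the Hive dialect for Hive-style statements. With the proposed
        // change, a statement that only the default dialect understands would fail
        // here with an error suggesting "SET table.sql-dialect=default", instead of
        // silently falling back to the default parser.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("SELECT id, item FROM orders").print();
    }
}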


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-13 Thread Jark Wu
t;>>>> > > The discussion in this PR [1] shows some details and could be
>>>>>> helpful to understand the original motivation of the renaming. We do 
>>>>>> have a
>>>>>> test case for guarding metrics but unfortunaly the case was also modified
>>>>>> so the defense was broken.
>>>>>> > >
>>>>>> > > I think the reason why both the developer and the reviewer forgot
>>>>>> to trigger an discussion and gave a green pass on the change is that
>>>>>> metrics are quite “trivial” to be noticed as public APIs. As mentioned by
>>>>>> Martijn I couldn’t find a place noting that metrics are public APIs and
>>>>>> should be treated carefully while contributing and reviewing.
>>>>>> > >
>>>>>> > > IMHO three actions could be made to prevent this kind of changes
>>>>>> in the future:
>>>>>> > >
>>>>>> > > a. Add test case for metrics (which we already have in
>>>>>> SinkMetricsITCase)
>>>>>> > > b. We emphasize that any public-interface breaking changes should
>>>>>> be proposed by a FLIP or discussed in mailing list, and should be listed 
>>>>>> in
>>>>>> the release note.
>>>>>> > > c. We remind contributors and reviewers about what should be
>>>>>> considered as public API, and include metric names in it.
>>>>>> > >
>>>>>> > > For b and c these two pages [2][3] might be proper places.
>>>>>> > >
>>>>>> > > About the patch to revert this, it looks like we have a consensus
>>>>>> on 1.16. As of 1.15 I think it’s worthy to trigger a minor version. I
>>>>>> didn’t see complaints about this for now so it should be OK to save the
>>>>>> situation asap. I’m with Xintong’s idea to treat numXXXSend as an alias 
>>>>>> of
>>>>>> numXXXOut considering there could possibly some users have already 
>>>>>> adapted
>>>>>> their system to the new naming, and have another internal metric for
>>>>>> reflecting number of outgoing committable batches (actually the
>>>>>> numRecordsIn of sink committer operator should be carrying this info
>>>>>> already).
>>>>>> > >
>>>>>> > > [1] https://github.com/apache/flink/pull/18825
>>>>>> > > [2] https://flink.apache.org/contributing/contribute-code.html
>>>>>> > > [3] https://flink.apache.org/contributing/reviewing-prs.html
>>>>>> > >
>>>>>> > > Best,
>>>>>> > > Qingsheng
>>>>>> > > On Oct 10, 2022, 17:40 +0800, Xintong Song ,
>>>>>> wrote:
>>>>>> > >
>>>>>> > > +1 for reverting these changes in Flink 1.16.
>>>>>> > >
>>>>>> > > For 1.15.3, can we make these metrics available via both names
>>>>>> (numXXXOut and numXXXSend)? In this way we don't break it for those who
>>>>>> already migrated to 1.15 and numXXXSend. That means we still need to 
>>>>>> change
>>>>>> SinkWriterOperator to use another metric name in 1.15.3, which IIUC is
>>>>>> internal to Flink sink.
>>>>>> > >
>>>>>> > > I'm overall +1 to change numXXXOut back to its original
>>>>>> semantics. AFAIK (from meetup / flink-forward questionaires), most users 
>>>>>> do
>>>>>> not migrate to a new Flink release immediately, until the next 1-2 major
>>>>>> releases are out.
>>>>>> > >
>>>>>> > > Best,
>>>>>> > >
>>>>>> > > Xintong
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Mon, Oct 10, 2022 at 5:26 PM Martijn Visser <
>>>>>> martijnvis...@apache.org> wrote:
>>>>>> > >>
>>>>>> > >> Hi Qingsheng,
>>>>>> > >>
>>>>>> > >> Do you have any idea what has happened in the process here? Do
>>>>>> we know why
>>>>>> > >> they were changed? I 

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Jark Wu
Thanks for discovering this problem, Qingsheng!

I'm also +1 for reverting the breaking changes.

IIUC, currently, the behavior of "numXXXOut" metrics of the new and old
sink is inconsistent.
We have to break one of them to have consistent behavior. Sink V2 is an
evolving API which is just introduced in 1.15.
I think it makes sense to break the unstable API instead of the stable API
which many connectors and users depend on.

Best,
Jark



On Mon, 10 Oct 2022 at 11:36, Jingsong Li  wrote:

> Thanks for driving, Qingsheng.
>
> +1 for reverting sink metric name.
>
> We often forget that metric is also one of the important APIs.
>
> +1 for releasing 1.15.3 to fix this.
>
> Best,
> Jingsong
>
> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  wrote:
> >
> > Thanks for raising the discussion, Qingsheng,
> >
> > +1 on reverting the breaking changes.
> >
> > In addition, we might want to release a 1.15.3 to fix this and update
> the previous release docs with this known issue, so that users can upgrade
> to 1.15.3 when they hit it. It would also be good to add some backwards
> compatibility tests on metrics to avoid unintended breaking changes like
> this in the future.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren  wrote:
> >>
> >> Hi devs and users,
> >>
> >> I’d like to start a discussion about reverting a breaking change about
> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> >>
> >> TL;DR
> >>
> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are replaced
> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric names
> are public APIs, this is a breaking change to end users and not backward
> compatible. Also unfortunately this breaking change was not discussed in
> the mailing list before.
> >>
> >> Background
> >>
> >> As defined previously in FLIP-33 (the FLIP page has been changed so
> please refer to the old version [3] ), metric “numRecordsOut” is used for
> reporting the total number of output records since the sink started (number
> of records written to the external system), and similarly for
> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
> “numRecordsOutError”. Most sinks are following this naming and definition.
> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
> could be used by the output of SinkWriterOperator for reporting number of
> Committables delivered to SinkCommitterOperator. In order to resolve the
> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics with
> “numXXXSend”.
> >>
> >> Necessity of reverting this change
> >>
> >> - Metric names are actually public API, as end users need to configure
> metric collecting and alerting system with metric names. Users have to
> reset all configurations related to affected metrics.
> >> - This could also affect custom and external sinks not maintained by
> Flink, which might have implemented with numXXXOut metrics.
> >> - The number of records sent to external system is way more important
> than the number of Committables sent to SinkCommitterOperator, as the
> latter one is just an internal implementation of sink. We could have a new
> metric name for the latter one instead.
> >> - We could avoid splitting the project by version (like “plz use
> numXXXOut before 1.15 and use numXXXSend after”) if we revert it ASAP,
> considering 1.16 is still not released yet.
> >>
> >> As a consequence, I’d like to hear from devs and users about your
> opinion on changing these metrics back to “numXXXOut”.
> >>
> >> Looking forward to your reply!
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26126
> >> [2] https://issues.apache.org/jira/browse/FLINK-26492
> >> [3] FLIP-33, version 18:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
> >>
> >> Best,
> >> Qingsheng
>
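
To make the alias idea above concrete, here is a minimal sketch built on the
generic MetricGroup.counter API (the class name and the surrounding sink-writer
wiring are illustrative assumptions, not Flink's internal implementation): a
single Counter instance is registered under both the FLIP-33 name and the 1.15
name, so dashboards keyed to either name keep working.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Registers one counter under both "numRecordsOut" (FLIP-33 name) and
// "numRecordsSend" (name introduced in 1.15), so both metric names report
// the number of records written to the external system.
public final class RecordsOutMetrics {

    private final Counter recordsWritten;

    public RecordsOutMetrics(MetricGroup sinkWriterMetricGroup) {
        // Create the primary counter under the original FLIP-33 name.
        this.recordsWritten = sinkWriterMetricGroup.counter("numRecordsOut");
        // Register the very same Counter instance under the 1.15 alias.
        sinkWriterMetricGroup.counter("numRecordsSend", this.recordsWritten);
    }

    // Call once per record successfully written to the external system.
    public void recordWritten() {
        recordsWritten.inc();
    }
}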


Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jark Wu
Thanks Xintong for making this possible!

Cheers,
Jark Wu

On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:

> Hi everyone,
>
> I'm very happy to announce that the Apache Flink community has created a
> dedicated Slack workspace [1]. Welcome to join us on Slack.
>
> ## Join the Slack workspace
>
> You can join the Slack workspace by either of the following two ways:
> 1. Click the invitation link posted on the project website [2].
> 2. Ask anyone who already joined the Slack workspace to invite you.
>
> We recommend 2), if available. Due to Slack limitations, the invitation
> link in 1) expires and needs manual updates after every 100 invites. If it
> is expired, please reach out to the dev / user mailing lists.
>
> ## Community rules
>
> When using the community Slack workspace, please follow these community
> rules:
> * *Be respectful* - This is the most important rule!
> * All important decisions and conclusions *must be reflected back to the
> mailing lists*. "If it didn’t happen on a mailing list, it didn’t happen."
> - The Apache Mottos [3]
> * Use *Slack threads* to keep parallel conversations from overwhelming a
> channel.
> * Please *do not direct message* people for troubleshooting, Jira assignment
> and PR reviews. These should be picked up voluntarily.
>
>
> ## Maintenance
>
>
> Committers can refer to this wiki page [4] for information needed for
> maintaining the Slack workspace.
>
>
> Thanks Jark, Martijn and Robert for helping to set up the Slack workspace.
>
>
> Best,
>
> Xintong
>
>
> [1] https://apache-flink.slack.com/
>
> [2] https://flink.apache.org/community.html#slack
>
> [3] http://theapacheway.com/on-list/
>
> [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-12 Thread Jark Wu
>> > platform where the responses can be searched and shared.
> > >>>>>>>> >> > >
> > >>>>>>>> >> > > It is currently the case that good questions on stack
> > >>>>>>>> overflow
> > >>>>>>>> >> > frequently go unanswered because no one with the necessary
> > >>>>>>>> expertise
> > >>>>>>>> >> takes
> > >>>>>>>> >> > the time to respond. If the Flink community has the
> > collective
> > >>>>>>>> energy
> > >>>>>>>> >> to do
> > >>>>>>>> >> > more user outreach, more involvement on stack overflow
> would
> > >>>>>>>> be a good
> > >>>>>>>> >> > place to start. Adding slack as another way for users to
> > >>>>>>>> request help
> > >>>>>>>> >> from
> > >>>>>>>> >> > those who are already actively providing support on the
> > >>>>>>>> existing
> > >>>>>>>> >> > communication channels might just lead to burnout.
> > >>>>>>>> >> > >
> > >>>>>>>> >> > > On the other hand, there are rather rare, but very
> > >>>>>>>> interesting cases
> > >>>>>>>> >> > where considerable back and forth is needed to figure out
> > >>>>>>>> what's going
> > >>>>>>>> >> on.
> > >>>>>>>> >> > This can happen, for example, when the requirements are
> > >>>>>>>> unusual, or
> > >>>>>>>> >> when a
> > >>>>>>>> >> > difficult to diagnose bug is involved. In these
> > circumstances,
> > >>>>>>>> something
> > >>>>>>>> >> > like slack is much better suited than email or stack
> > overflow.
> > >>>>>>>> >> > >
> > >>>>>>>> >> > > David
> > >>>>>>>> >> > >
> > >>>>>>>> >> > > On Fri, May 6, 2022 at 3:04 PM Becket Qin <
> > >>>>>>>> becket@gmail.com>
> > >>>>>>>> >> wrote:
> > >>>>>>>> >> > >>
> > >>>>>>>> >> > >> Thanks for the proposal, Xintong.
> > >>>>>>>> >> > >>
> > >>>>>>>> >> > >> While I share the same concerns as those mentioned in
> the
> > >>>>>>>> previous
> > >>>>>>>> >> > discussion thread, admittedly there are benefits of having
> a
> > >>>>>>>> slack
> > >>>>>>>> >> channel
> > >>>>>>>> >> > as a supplementary way to discuss Flink. The fact that this
> > >>>>>>>> topic is
> > >>>>>>>> >> raised
> > >>>>>>>> >> > once a while indicates lasting interests.
> > >>>>>>>> >> > >>
> > >>>>>>>> >> > >> Personally I am open to having such a slack channel.
> > >>>>>>>> Although it has
> > >>>>>>>> >> > drawbacks, it serves a different purpose. I'd imagine that
> > for
> > >>>>>>>> people
> > >>>>>>>> >> who
> > >>>>>>>> >> > prefer instant messaging, in absence of the slack channel,
> a
> > >>>>>>>> lot of
> > >>>>>>>> >> > discussions might just take place offline today, which
> leaves
> > >>>>>>>> no public
> > >>>>>>>> >> > record at all.
> > >>>>>>>> >> > >>
> > >>>>>>>> >> > >> One step further, if the channel is maintained by the
> > Flink
> > >>>>>>>> PMC, some
> > >

Re: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-10 Thread Jark Wu
Hi Francesco,

Yes. The Hive syntax is a syntax plugin provided by Hive connector.

> But right now I don't think It's a good idea adding new features on top,
as it will create only more maintenance burden both for Hive developers and
for table developers.

We are not adding new Hive features, but fixing compatibility or behavior
bugs, and almost all of them
are just related to the Hive connector code and have nothing to do with the
table planner.

I agree we should investigate how, and how much work it takes, to decouple
the Hive connector from the planner ASAP.
We will come up with a Google doc soon. But AFAIK, this may not be a huge
amount of work and should not conflict with the bugfix work.

Best,
Jark

On Thu, 10 Mar 2022 at 17:03, Francesco Guardiani 
wrote:

> > We still need some work to make the Hive dialect purely rely on public
> > APIs, and the Hive connector should be decoupled from the table planner.
>
> From the table perspective, I think this is the big pain point at the
> moment. First of all, when we talk about the Hive syntax, we're really
> talking about the Hive connector, as my understanding is that without the
> Hive connector in the classpath you can't use the Hive syntax [1].
>
> The Hive connector is heavily relying on internals [2], and this is an
> important struggle for the table project, as it sometimes impedes and slows
> down development of new features and creates a huge maintenance burden for
> table developers [3]. The planner itself has some classes specific to Hive
> [4], making the codebase of the planner more complex than it already is.
> Some of these are just legacy, others exists because there are some
> abstractions missing in the table planner side, but those just need some
> work.
>
> So I agree with Jark, when the two Hive modules (connector-hive and
> sql-parser-hive) reach a point where they don't depend at all on
> flink-table-planner, like every other connector (except for testing of
> course), we should be good to move them in a separate repo and continue
> committing to them. But right now I don't think It's a good idea adding new
> features on top, as it will create only more maintenance burden both for
> Hive developers and for table developers.
>
> My concern with this plan is: how much realistic is to fix all the planner
> internal leaks in the existing Hive connector/parser? To me this seems like
> a huge task, including a non trivial amount of work to stabilize and design
> new entry points in Table API.
>
> [1] HiveParser
> <https://github.com/apache/flink/blob/a5847e3871ffb9515af9c754bd10c42611976c82/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java>
> [2] HiveParserCalcitePlanner
> <https://github.com/apache/flink/blob/6628237f72d818baec094a2426c236480ee33380/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java>
> [3] Just talking about code coupling, not even mentioning problems like
> dependencies and security updates
> [4] HiveAggSqlFunction
> <https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java>
>
> On Thu, Mar 10, 2022 at 9:05 AM Martijn Visser 
> wrote:
>
>> Thank you Yuxia for volunteering, that's really much appreciated. It
>> would be great if you can create an umbrella ticket for that.
>>
>> It would be great to get some insights from currently Flink and Hive
>> users which versions are being used.
>> @Jark I would indeed deprecate the old Hive versions in Flink 1.15 and
>> then drop them in Flink 1.16. That would also remove some tech debt and
>> make it less work with regards to externalizing connectors.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, 10 Mar 2022 at 07:39, Jark Wu  wrote:
>>
>>> Thanks Martijn for the reply and summary.
>>>
>>> I totally agree with your plan and thank Yuxia for volunteering the Hive
>>> tech debt issue.
>>> I think we can create an umbrella issue for this and target version
>>> 1.16. We can discuss
>>> details and create subtasks there.
>>>
>>> Regarding dropping old Hive versions, I'm also fine with that. But I
>>> would like to investigate
>>> some Hive users first to see whether it's acceptable at this point. My
>>> first thought was we
>>> can deprecate the old Hive versions in 1.15, and we can discuss dropping
>>> it in 1.16 or 1.17.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Thu, 10 Mar 2022 at 14:19, 罗宇侠(莫辞) 
>>> wrote:
>>

Re: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-09 Thread Jark Wu
Thanks Martijn for the reply and summary.

I totally agree with your plan and thank Yuxia for volunteering the Hive
tech debt issue.
I think we can create an umbrella issue for this and target version 1.16.
We can discuss
details and create subtasks there.

Regarding dropping old Hive versions, I'm also fine with that. But I would
like to investigate
some Hive users first to see whether it's acceptable at this point. My
first thought was we
can deprecate the old Hive versions in 1.15, and we can discuss dropping it
in 1.16 or 1.17.

Best,
Jark


On Thu, 10 Mar 2022 at 14:19, 罗宇侠(莫辞) 
wrote:

> Thanks Martijn for your insights.
>
> About the tech debt/maintenance with regards to Hive query syntax, I
> would like to chip in, and I expect it can be resolved for Flink 1.16.
>
> Best regards,
>
> Yuxia
> ​
>
> -- Original Message --
> *From:* Martijn Visser 
> *Sent:* Thu Mar 10 04:03:34 2022
> *To:* User 
> *Subject:* Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax
>
>> (Forwarding this also to the User mailing list as I made a typo when
>> replying to this email thread)
>>
>> -- Forwarded message -
>> From: Martijn Visser 
>> Date: Wed, 9 Mar 2022 at 20:57
>> Subject: Re: [DISCUSS] Flink's supported APIs and Hive query syntax
>> To: dev , Francesco Guardiani <
>> france...@ververica.com>, Timo Walther , <
>> us...@flink.apache.org>
>>
>>
>> Hi everyone,
>>
>> Thank you all very much for your input. From my perspective, I consider
>> batch as a special case of streaming. So with Flink SQL, we can support
>> both batch and streaming use cases and I think we should use Flink SQL as
>> our target.
>>
>> To reply on some of the comments:
>>
>> @Jing on your remark:
>> > Since Flink has a clear vision of unified batch and stream processing,
>> supporting batch jobs will be one of the critical core features to help us
>> reach the vision and let Flink have an even bigger impact in the industry.
>>
>> I fully agree with that statement. I do think that having Hive syntax
>> support doesn't help in that unified batch and stream processing. We're
>> making it easier for batch users to run their Hive batch jobs on Flink, but
>> that doesn't fit the "unified" part since it's focussed on batch, while
>> Flink SQL focusses on batch and streaming. I would have rather invested
>> time in making batch improvements to Flink and Flink SQL vs investing in
>> Hive syntax support. I do understand from the given replies that Hive
>> syntax support is valuable for those that are already running batch
>> processing and would like to run these queries on Flink. I do think that's
>> limited to mostly Chinese companies at the moment.
>>
>> @Jark I think you've provided great input and are spot on with:
>> > Regarding the maintenance concern you raised, I think that's a good
>> point and they are in the plan. The Hive dialect has already been a plugin
>> and option now, and the implementation is located in hive-connector module.
>> We still need some work to make the Hive dialect purely rely on public
>> APIs, and the Hive connector should be decopule with table planner. At that
>> time, we can move the whole Hive connector into a separate repository (I
>> guess this is also in the externalize connectors plan).
>>
>> I'm looping in Francesco and Timo who can elaborate more in depth on the
>> current maintenance issues. I think we need to have a proper plan on how
>> this tech debt/maintenance can be addressed and to get commitment that this
>> will be resolved in Flink 1.16, since we indeed need to move out all
>> previously agreed connectors before Flink 1.16 is released.
>>
>> > From my perspective, Hive is still widely used and there exists many
>> running Hive SQL jobs, so why not to provide users a better experience to
>> help them migrate Hive jobs to Flink? Also, it doesn't conflict with Flink
>> SQL as it's just a dialect option.
>>
>> I do think there is a conflict with Flink SQL; you can't use both of them
>> at the same time, so you don't have access to all features in Flink. That
>> increases feature sparsity and user friction. It also puts a bigger burden
>> on the Flink community, because having both options available means more
>> maintenance work. For example, an upgrade of Calcite is more impactful. The
>> Flink codebase is already rather large and CI build times are already too
>> long. More code means more risk of bugs. If a user at some point wants to
>> change his Hive batch job to a streaming Flink SQL job, there's still
>> migration work for the user, it just needs to happen at a later stage.
>>
>> @Jingsong I think you have a good argument that migrating SQL for Batch
>> ETL is indeed an expensive effort.
>>
>> Last but not least, there was no one who has yet commented on the
>> supported Hive versions and security issues. I've reached out to the Hive
>> community and from the info I've received so far is that only Hive 3.1.x
>> and Hive 2.3.x are still supported. The older Hive 

Re: Re: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-08 Thread Jark Wu
Hi Martijn,

Thanks for starting this discussion. I think it's great
for the community to to reach a consensus on the roadmap
of Hive query syntax.

I agree that the Hive project is not actively developed nowadays.
However, Hive still occupies the majority of the batch market
and the Hive ecosystem is even more active now. For example,
the Apache Kyuubi[1] is a new project that is a JDBC server
which is compatible with HiveServer2. And the Apache Iceberg
and Apache Hudi are mainly using Hive Metastore as the table catalog.
Spark SQL is 99% compatible with Hive SQL. We have to admit
that Hive is the open-source de facto standard for batch processing.

As far as I can see, almost all the companies (including ByteDance,
Kuaishou, NetEase, etc.) in China are using Hive SQL for batch
processing, even if the underlying engine is Spark.
I don't know how the batch users can migrate to Flink if Flink
doesn't provide the Hive compatibility. IMO, in the short term,
Hive syntax compatibility is the ticket for us to have a seat
in the batch processing. In the long term, we can drop it and
focus on Flink SQL itself both for batch and stream processing.

Regarding the maintenance concern you raised, I think that's a good
point and they are in the plan. The Hive dialect has already been
a plugin and option now, and the implementation is located in
hive-connector module. We still need some work to make the Hive
dialect purely rely on public APIs, and the Hive connector should be
decoupled from the table planner. At that time, we can move the whole Hive
connector into a separate repository (I guess this is also in the
externalize connectors plan).

What do you think?

Best,
Jark

[1]:
https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
[2]: https://iceberg.apache.org/docs/latest/spark-configuration/
[3]: https://hudi.apache.org/docs/next/syncing_metastore/

On Tue, 8 Mar 2022 at 11:46, Mang Zhang  wrote:

> Hi Martijn,
>
> Thanks for driving this discussion.
>
> +1 on efforts on more Hive/Spark syntax compatibility. The Hive/Spark
> syntax is the most popular in batch computing. Within our company, many
> users have the desire to use Flink to realize the integration of streaming
> and batch, and some users have been running it in production for months. We
> have integrated Flink with our internal remote shuffle service; Flink saves
> users a lot of development and maintenance costs, and user feedback is very
> good. Enriching Flink's ecology provides users with more choices, so I think
> pluggable support for Hive/Spark dialects is very necessary. We need better
> designs for future multi-source fusion.
>
>
>
>
>
>
>
> Best regards,
>
> Mang Zhang
>
>
>
>
>
> At 2022-03-07 20:52:42, "Jing Zhang"  wrote:
> >Hi Martijn,
> >
> >Thanks for driving this discussion.
> >
> >+1 on efforts on more hive syntax compatibility.
> >
> >With the efforts on batch processing in recent versions (1.10~1.15), many
> >users have run batch processing jobs based on Flink.
> >In our team, we are trying to migrate most of the existing online batch
> >jobs from Hive/Spark to Flink. We hope this migration does not require
> >users to modify their SQL.
> >Although Hive is not as popular as it used to be, Hive SQL is still alive
> >because many users still use Hive SQL to run Spark jobs.
> >Therefore, compatibility with more Hive syntax is critical to this
> >migration work.
> >
> >Best,
> >Jing Zhang
> >
> >
> >
> >Martijn Visser  wrote on Mon, Mar 7, 2022 at 19:23:
> >
> >> Hi everyone,
> >>
> >> Flink currently has 4 APIs with multiple language support which can be
> used
> >> to develop applications:
> >>
> >> * DataStream API, both Java and Scala
> >> * Table API, both Java and Scala
> >> * Flink SQL, both in Flink query syntax and Hive query syntax
> (partially)
> >> * Python API
> >>
> >> Since FLIP-152 [1] the Flink SQL support has been extended to also
> support
> >> the Hive query syntax. There is now a follow-up FLINK-26360 [2] to
> address
> >> more syntax compatibility issues.
> >>
> >> I would like to open a discussion on Flink directly supporting the Hive
> >> query syntax. I have some concerns if having a 100% Hive query syntax is
> >> indeed something that we should aim for in Flink.
> >>
> >> I can understand that having Hive query syntax support in Flink could
> help
> >> users due to interoperability and being able to migrate. However:
> >>
> >> - Adding full Hive query syntax support will mean that we go from 6
> fully
> >> supported API/language combinations to 7. I think we are currently
> already
> >> struggling with maintaining the existing combinations, let another one
> >> more.
> >> - Apache Hive is/appears to be a project that's not that actively
> developed
> >> anymore. The last release was made in January 2021. Its popularity is
> >> rapidly declining in Europe and the United States, also due to Hadoop
> becoming
> >> less popular.
> >> - Related to the previous topic, other software like Snowflake,
> >> 

Re: flinkCDC2.1.1

2022-01-06 Thread Jark Wu
Flink CDC issues can be reported to
https://github.com/ververica/flink-cdc-connectors/issues

On Thu, 30 Dec 2021 at 14:08, Liu Join  wrote:

> When using Flink CDC 2.1.1 to read MySQL data, an error occurs after a while.
> Image hosting link: error screenshot
>
>
>
> Sent from Mail for Windows
>
>
>


Re: Mailing list archive is not accessible

2022-01-06 Thread Jark Wu
The Nabble service is down; use this address instead: https://lists.apache.org/list.html?d...@flink.apache.org

On Fri, 31 Dec 2021 at 18:29, Ada Wong  wrote:

> I wanted to see the discussion from back then, but this link is not accessible.
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html
>


Re: Flink MySQL CDC sync reports unrecognized field

2022-01-06 Thread Jark Wu
This error log should be unrelated; it is an error from the REST client, not from the normal data processing flow.

mysql-cdc does not contain any Jackson JSON parsing code.

On Wed, 5 Jan 2022 at 17:09, Fei Han 
wrote:

>
> @all:
> When syncing data with Flink MySQL CDC, an unrecognized field is reported. What could be the cause? Could it be an unrecognized keyword? The error log is as follows:
>
>  httpResponseStatus=200 OK}
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
> Unrecognized field "status" (class
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as
> ignorable (one known property: "errors"])
>  at [Source: UNKNOWN; line: -1, column: -1] (through reference chain:
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1686)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1635)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1390)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:483)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_211]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
>


Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Jark Wu
Hi Konstantin,

> the connectors need to be adopted and require at least one release per
Flink minor release.
However, this will make connector releases slower, e.g. maintaining
features across multiple branches and releasing multiple branches.
I thought the main purpose of having an external connector repository was
to enable "faster releases of connectors"?


From the perspective of CDC connector maintainers, the biggest advantage of
maintaining it outside of the Flink project is that:
1) we can have a more flexible and faster release cycle
2) we can be more liberal with committership for connector maintainers
which can also attract more committers to help the release.

Personally, I think maintaining one connector repository under the ASF may
not have the above benefits.

Best,
Jark

On Wed, 20 Oct 2021 at 15:14, Konstantin Knauf  wrote:

> Hi everyone,
>
> regarding the stability of the APIs. I think everyone agrees that
> connector APIs which are stable across minor versions (1.13->1.14) are the
> mid-term goal. But:
>
> a) These APIs are still quite young, and we shouldn't make them @Public
> prematurely either.
>
> b) Isn't this *mostly* orthogonal to where the connector code lives? Yes,
> as long as there are breaking changes, the connectors need to be adopted
> and require at least one release per Flink minor release.
> Documentation-wise this can be addressed via a compatibility matrix for
> each connector as Arvid suggested. IMO we shouldn't block this effort on
> the stability of the APIs.
>
> Cheers,
>
> Konstantin
>
>
>
> On Wed, Oct 20, 2021 at 8:56 AM Jark Wu  wrote:
>
>> Hi,
>>
>> I think Thomas raised very good questions and would like to know your
>> opinions if we want to move connectors out of flink in this version.
>>
>> (1) is the connector API already stable?
>> > Separate releases would only make sense if the core Flink surface is
>> > fairly stable though. As evident from Iceberg (and also Beam), that's
>> > not the case currently. We should probably focus on addressing the
>> > stability first, before splitting code. A success criteria could be
>> > that we are able to build Iceberg and Beam against multiple Flink
>> > versions w/o the need to change code. The goal would be that no
>> > connector breaks when we make changes to Flink core. Until that's the
>> > case, code separation creates a setup where 1+1 or N+1 repositories
>> > need to move lock step.
>>
>> From another discussion thread [1], connector API is far from stable.
>> Currently, it's hard to build connectors against multiple Flink versions.
>> There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
>>  maybe also in the future versions,  because Table related APIs are still
>> @PublicEvolving and new Sink API is still @Experimental.
>>
>>
>> (2) Flink testability without connectors.
>> > Flink w/o Kafka connector (and few others) isn't
>> > viable. Testability of Flink was already brought up, can we really
>> > certify a Flink core release without Kafka connector? Maybe those
>> > connectors that are used in Flink e2e tests to validate functionality
>> > of core Flink should not be broken out?
>>
>> This is a very good question. How can we guarantee the new Source and Sink
>> API are stable with only test implementation?
>>
>>
>> Best,
>> Jark
>>
>>
>>
>>
>>
>> On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler 
>> wrote:
>>
>> > Could you clarify what release cadence you're thinking of? There's quite
>> > a big range that fits "more frequent than Flink" (per-commit, daily,
>> > weekly, bi-weekly, monthly, even bi-monthly).
>> >
>> > On 19/10/2021 14:15, Martijn Visser wrote:
>> > > Hi all,
>> > >
>> > > I think it would be a huge benefit if we can achieve more frequent
>> > releases
>> > > of connectors, which are not bound to the release cycle of Flink
>> itself.
>> > I
>> > > agree that in order to get there, we need to have stable interfaces
>> which
>> > > are trustworthy and reliable, so they can be safely used by those
>> > > connectors. I do think that work still needs to be done on those
>> > > interfaces, but I am confident that we can get there from a Flink
>> > > perspective.
>> > >
>> > > I am worried that we would not be able to achieve those frequent
>> releases
>> > > of connectors if we are putting these connectors under the Apache
>> > umbre

Re: [DISCUSS] Creating an external connector repository

2021-10-20 Thread Jark Wu
Hi,

I think Thomas raised very good questions and would like to know your
opinions if we want to move connectors out of flink in this version.

(1) is the connector API already stable?
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criteria could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move lock step.

From another discussion thread [1], connector API is far from stable.
Currently, it's hard to build connectors against multiple Flink versions.
There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
 maybe also in the future versions,  because Table related APIs are still
@PublicEvolving and new Sink API is still @Experimental.


(2) Flink testability without connectors.
> Flink w/o Kafka connector (and few others) isn't
> viable. Testability of Flink was already brought up, can we really
> certify a Flink core release without Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?

This is a very good question. How can we guarantee the new Source and Sink
API are stable with only test implementation?


Best,
Jark





On Tue, 19 Oct 2021 at 23:56, Chesnay Schepler  wrote:

> Could you clarify what release cadence you're thinking of? There's quite
> a big range that fits "more frequent than Flink" (per-commit, daily,
> weekly, bi-weekly, monthly, even bi-monthly).
>
> On 19/10/2021 14:15, Martijn Visser wrote:
> > Hi all,
> >
> > I think it would be a huge benefit if we can achieve more frequent
> releases
> > of connectors, which are not bound to the release cycle of Flink itself.
> I
> > agree that in order to get there, we need to have stable interfaces which
> > are trustworthy and reliable, so they can be safely used by those
> > connectors. I do think that work still needs to be done on those
> > interfaces, but I am confident that we can get there from a Flink
> > perspective.
> >
> > I am worried that we would not be able to achieve those frequent releases
> > of connectors if we are putting these connectors under the Apache
> umbrella,
> > because that means that for each connector release we have to follow the
> > Apache release creation process. This requires a lot of manual steps and
> > prohibits automation and I think it would be hard to scale out frequent
> > releases of connectors. I'm curious how others think this challenge could
> > be solved.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:
> >
> >> Thanks for initiating this discussion.
> >>
> >> There are definitely a few things that are not optimal with our
> >> current management of connectors. I would not necessarily characterize
> >> it as a "mess" though. As the points raised so far show, it isn't easy
> >> to find a solution that balances competing requirements and leads to a
> >> net improvement.
> >>
> >> It would be great if we can find a setup that allows for connectors to
> >> be released independently of core Flink and that each connector can be
> >> released separately. Flink already has separate releases
> >> (flink-shaded), so that by itself isn't a new thing. Per-connector
> >> releases would need to allow for more frequent releases (without the
> >> baggage that a full Flink release comes with).
> >>
> >> Separate releases would only make sense if the core Flink surface is
> >> fairly stable though. As evident from Iceberg (and also Beam), that's
> >> not the case currently. We should probably focus on addressing the
> >> stability first, before splitting code. A success criteria could be
> >> that we are able to build Iceberg and Beam against multiple Flink
> >> versions w/o the need to change code. The goal would be that no
> >> connector breaks when we make changes to Flink core. Until that's the
> >> case, code separation creates a setup where 1+1 or N+1 repositories
> >> need to move lock step.
> >>
> >> Regarding some connectors being more important for Flink than others:
> >> That's a fact. Flink w/o Kafka connector (and few others) isn't
> >> viable. Testability of Flink was already brought up, can we really
> >> certify a Flink core release without Kafka connector? Maybe those
> >> connectors that are used in Flink e2e tests to validate functionality
> >> of core Flink should not be broken out?
> >>
> >> Finally, I think that the connectors that move into separate repos
> >> should remain part of the Apache Flink project. Larger organizations
> >> tend to approve the use of and contribution to open 

Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service

2021-09-06 Thread Jark Wu
Thanks Leonard,

I have seen many users complaining that the Flink mailing list doesn't
work (they were using Nabble).
I think this information would be very helpful.

Best,
Jark

On Mon, 6 Sept 2021 at 16:39, Leonard Xu  wrote:

> Hi, all
>
> The mailing list archive service Nabble Archive was broken at the end of
> June, the Flink community has migrated the mailing lists archives[1] to
> Apache Archive service by commit[2], you can refer [3] to know more mailing
> lists archives of Flink.
>
> Apache Archive service is maintained by ASF thus the stability is
> guaranteed, it’s a web-based mail archive service which allows you to
> browse, search, interact, subscribe, unsubscribe, etc. with mailing lists.
>
> Apache Archive service shows mails of the last month by default, you can
> specify the date range to browse, search the history mails.
>
>
> Hope it would be helpful.
>
> Best,
> Leonard
>
> [1] The Flink mailing lists in Apache archive service
> dev mailing list archives:
> https://lists.apache.org/list.html?d...@flink.apache.org
> user mailing list archives :
> https://lists.apache.org/list.html?u...@flink.apache.org
> user-zh mailing list archives :
> https://lists.apache.org/list.html?user-zh@flink.apache.org
> [2]
> https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
> [3] https://flink.apache.org/community.html#mailing-lists
>


Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread Jark Wu
Sorry, I made a typo above. I mean I prefer proposal (1) that
only needs to set `table.exec.emit.allow-lateness` to handle late events.
`table.exec.emit.late-fire.delay` can be optional, defaulting to 0s.
`table.exec.state.ttl` will not affect window state anymore, so window state
is still cleaned accurately by watermark.

We don't need to expose `table.exec.emit.late-fire.enabled` on docs and
can remove it in the next version.
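
For reference, today the full set looks roughly like this (the option names are
the existing experimental ones; the values are just examples):

table.exec.emit.allow-lateness : 1 h     -- keep window state for 1 hour after the window fires
table.exec.emit.late-fire.delay : 0 s    -- optional, fire immediately for each late record (default)
table.exec.emit.late-fire.enabled : true -- would be set automatically under proposal (1)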

Best,
Jark

On Thu, 1 Jul 2021 at 21:20, Jark Wu  wrote:

> Thanks Jing for bringing up this topic,
>
> The emit strategy configs are annotated as Experimental and are not publicly
> documented.
> However, I see this is a very useful feature which many users are looking
> for.
> I have posted these configs for many questions like "how to handle late
> events in SQL".
> Thus, I think it's time to make the configuration public and explicitly
> document it. In the long
> term, we would like to propose an EMIT syntax for SQL, but until then we
> can get more
> valuable feedback from users when they are using the configs.
>
> Regarding the exposed configuration, I prefer proposal (2).
> But it would be better not to expose `table.exec.emit.late-fire.enabled`
> on docs and we can
> remove it in the next version.
>
> Best,
> Jark
>
>
> On Tue, 29 Jun 2021 at 11:09, JING ZHANG  wrote:
>
>> When WindowAggregate works upon Changelog which contains update messages,
>> UPDATE BEFORE message may be dropped as a late message. [1]
>>
>> In order to handle late UB message, user needs to set *all* the
>> following 3 parameters:
>>
>> (1) enable late fire by setting
>>
>> table.exec.emit.late-fire.enabled : true
>>
>> (2) set per record emit behavior for late records by setting
>>
>> table.exec.emit.late-fire.delay : 0 s
>>
>> (3) keep window state for extra time after window is fired by setting
>>
>> table.exec.emit.allow-lateness : 1 h    // or table.exec.state.ttl: 1h
>>
>>
>> The solution has two disadvantages:
>>
>> (1) Users may not realize that UB messages may be dropped as a late
>> event, so they will not set related parameters.
>>
>> (2) When users look for a solution to solve the dropped UB messages
>> problem, the current solution is a bit inconvenient for them because they
>> need to set all 3 parameters. Besides, some configurations have overlapping
>> functionality.
>>
>>
>> Now there are two proposals to simplify the 3 parameters a little.
>>
>> (1) Users only need to set table.exec.emit.allow-lateness (just like the
>> behavior on DataStream, where users only set allow-lateness); the framework could
>> automatically set `table.exec.emit.late-fire.enabled` to true and set
>> `table.exec.emit.late-fire.delay` to 0s.
>>
>> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
>> and `table.exec.emit.late-fire.enabled`.
>>
>>
>> (2) Users need to set `table.exec.emit.late-fire.enabled` to true and set
>> `table.exec.state.ttl`; the framework could automatically set
>> `table.exec.emit.late-fire.delay` to 0s.
>>
>> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
>> and `table.exec.emit.allow-lateness `.
>>
>>
>> Please let me know what you think about the issue.
>>
>> Thank you.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22781
>>
>>
>> Best regards,
>> JING ZHANG
>>
>>
>>
>>


Re: [DISCUSS] Better user experience in the WindowAggregate upon Changelog (contains update message)

2021-07-01 Thread Jark Wu
Thanks Jing for bringing up this topic,

The emit strategy configs are annotated as Experimental and are not publicly
documented.
However, I see this is a very useful feature which many users are looking
for.
I have posted these configs for many questions like "how to handle late
events in SQL".
Thus, I think it's time to make the configuration public and explicitly
document it. In the long
term, we would like to propose an EMIT syntax for SQL, but until then we
can get more
valuable feedback from users when they are using the configs.

Regarding the exposed configuration, I prefer proposal (2).
But it would be better not to expose `table.exec.emit.late-fire.enabled` on
docs and we can
remove it in the next version.

Best,
Jark


On Tue, 29 Jun 2021 at 11:09, JING ZHANG  wrote:

> When WindowAggregate works upon Changelog which contains update messages,
> UPDATE BEFORE message may be dropped as a late message. [1]
>
> In order to handle late UB message, user needs to set *all* the following
> 3 parameters:
>
> (1) enable late fire by setting
>
> table.exec.emit.late-fire.enabled : true
>
> (2) set per record emit behavior for late records by setting
>
> table.exec.emit.late-fire.delay : 0 s
>
> (3) keep window state for extra time after window is fired by setting
>
> table.exec.emit.allow-lateness : 1 h    // or table.exec.state.ttl: 1h
>
>
> The solution has two disadvantages:
>
> (1) Users may not realize that UB messages may be dropped as a late event,
> so they will not set related parameters.
>
> (2) When users look for a solution to solve the dropped UB messages
> problem, the current solution is a bit inconvenient for them because they
> need to set all 3 parameters. Besides, some configurations have overlapping
> functionality.
>
>
> Now there are two proposals to simplify the 3 parameters a little.
>
> (1) Users only need to set table.exec.emit.allow-lateness (just like the
> behavior on DataStream, where users only set allow-lateness); the framework could
> automatically set `table.exec.emit.late-fire.enabled` to true and set
> `table.exec.emit.late-fire.delay` to 0s.
>
> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> and `table.exec.emit.late-fire.enabled`.
>
>
> (2) Users need to set `table.exec.emit.late-fire.enabled` to true and set
> `table.exec.state.ttl`; the framework could automatically set
> `table.exec.emit.late-fire.delay` to 0s.
>
> And in the later version, we deprecate `table.exec.emit.late-fire.delay`
> and `table.exec.emit.allow-lateness `.
>
>
> Please let me know what you think about the issue.
>
> Thank you.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22781
>
>
> Best regards,
> JING ZHANG
>
>
>
>


Re: [Flink SQL] Lookup join hbase problem

2021-06-28 Thread Jark Wu
Yes. Currently, the HBase lookup source only supports lookups on the rowkey.
If there is more than one join condition, it may fail.

We should support lookup HBase on multiple fields (by Get#setFilter).
Feel free to open issues.

Best,
Jark

On Mon, 28 Jun 2021 at 12:48, 纳兰清风  wrote:

> Hi,
>
>   When I was using hbase table as my lookup table, I got this error:
>
> Caused by: java.lang.IllegalArgumentException: Currently, HBase table
> can only be lookup by single row key.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>
> My SQL is
>
> insert into sink_kafka(user_ucid,city_code,`source`,system_type)
> SELECT t1.ucid AS user_ucid,
>  t1.city_code,
>  t1.`source`,
>  t1.system_type
> FROM tmp_ucid_check t1
> LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache
> for SYSTEM_TIME AS OF t1.proctime AS t2
> ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
> WHERE t2.city_code is NOT null
> AND t1.city_code = t2.city_code;
>
> Maybe it is because the conditions in the WHERE clause are being pushed down
> as a predicate into the join clause?
> How can I solve this problem ?
>
> Thank you
>
>
>
>


Re: Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-28 Thread Jark Wu
UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
example:

SELECT *
FROM (
  SELECT word, count(*) as cnt
  FROM T
  GROUP BY word
) WHERE cnt < 3;
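
To make the retraction concrete, here is a hypothetical changelog for one word
flowing out of the inner aggregate (values are made up):

+I (hello, 1)   -- cnt=1 < 3, emitted to the sink
-U (hello, 1)   -- UPDATE_BEFORE retracting the old row
+U (hello, 2)   -- cnt=2 < 3, emitted
-U (hello, 2)   -- UPDATE_BEFORE: cnt is about to become 3 ...
+U (hello, 3)   -- ... which no longer satisfies cnt < 3 and is filtered out,
                -- so only the -U reaches the sink; if UPDATE_BEFORE were dropped,
                -- the stale row (hello, 2) would stay in Elasticsearch forever.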

There is more discussion in this issue:
https://issues.apache.org/jira/browse/FLINK-9528

Best,
Jark

On Mon, 28 Jun 2021 at 13:52, Kai Fu  wrote:

> Hi team,
>
> We notice the UPDATE_BEFORE row kind is not dropped but is treated as DELETE
> in the code.
> We're aware that this is useful to retract output records in some cases,
> but we cannot come up with such a scenario, could anyone name a few cases
> for it.
>
> The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
> connector to reduce the sink traffic since almost all of our records are
> update. In our case, the records are generated by joining with a couple of
> upsert-kafka data sources. Only primary-key participants in the join
> condition for all join cases, with some granularity/cardinality fan-out in
> the middle. We want to know whether it impacts the final result correctness
> if we drop the records with UPDATE_BEFORE row kind.
>
> --
> *Best wishes,*
> *- Kai*
>


Re: Chinese tutorials are not updated in a timely manner

2021-06-23 Thread Jark Wu
Hi Kevin,

Welcome to the Apache Flink open source community! As Yun Tang said, the community warmly welcomes and truly values every contribution.
However, maintaining the Chinese documentation is a very large effort that touches all modules, so it needs the collaboration of committers from many modules,
and sometimes updates inevitably fall behind.

If you find an untranslated page that has no related JIRA issue, feel free to create the issue and submit a PR directly.
If a related issue and PR already exist, you can help review them; the community is even more short of high-quality reviewers, and that would speed up many translations.

Best,
Jark

On Wed, 23 Jun 2021 at 11:04, Yun Tang  wrote:

> Hi Kevin,
>
> Welcome to the Apache Flink open source community!
>
> Because open source work is often done outside of the participants' working hours, delayed progress updates or long periods of inactivity are sometimes unavoidable.
>
> You are very welcome to comment under the related JIRA tickets and request permission to create PRs. The community always welcomes every contributor, and documentation maintenance, especially the translation of the Chinese documentation, is very much needed. If there is anything you would like to contribute, feel free to comment directly under the JIRA tickets or GitHub PRs, or create the related tickets yourself.
>
> Best,
> Yun Tang
> --
> *From:* pang fan 
> *Sent:* Monday, June 21, 2021 21:35
> *To:* user-zh@flink.apache.org 
> *Subject:* 中文教程更新不及时问题
>
> Hi all,
>
> I am a beginner with Flink. While following the official tutorial (Chinese version) at
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/
>
> I found that many Chinese tutorial pages have not been translated yet. Looking at the PR history, many PRs have already been submitted but never merged into the main branch; many of them were opened months ago and have not been updated since.
>
> Is anyone still following up on these issues? If so, could the status of the JIRA tickets and the PRs be updated, so that we can also claim tickets and contribute to the community?
>
> Thanks!
> Kevin Fan
> Kevin Fan
>


Re: Can hbase async lookup guarantee that the output is ordered?

2021-06-17 Thread Jark Wu
You can take a look at the source implementation of AsyncWaitOperator.
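
The ordering guarantee comes from the async operator rather than from HBase
itself: orderedWait buffers completed results and emits them in input order even
if responses return out of order (e.g. due to the network). A minimal
DataStream-level sketch of the same mechanism (the lookup function below is a
made-up stand-in, not the real HBase lookup):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

public class OrderedAsyncLookupSketch {

    // Stand-in for a real HBase async lookup; the "lookup" is just a string mapping here.
    static class FakeHBaseLookup implements AsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> key + "-enriched") // pretend this is an HBase Get
                    .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        // orderedWait emits results in the order of the input records,
        // regardless of the order in which the async responses complete
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                input, new FakeHBaseLookup(), 30, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("ordered-async-lookup-sketch");
    }
}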

Best,
Jark

On Tue, 15 Jun 2021 at 18:53, zilong xiao  wrote:

> I'd like to understand how 100% ordering is guaranteed here. It feels like async lookups cannot guarantee the order in which results come back, e.g. due to network delays.
>
> Jingsong Li  于2021年6月15日周二 下午5:07写道:
>
> > It is ordered.
> >
> > The unordered mode is not supported yet; currently it could affect the correctness of streaming computation.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:
> >
> > > Hi community experts, I'd like to ask whether the hbase async lookup in flink 1.13 can guarantee that the output is ordered?
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: Unsubscribe from the mailing list

2021-06-17 Thread Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jark

On Thu, 17 Jun 2021 at 09:29, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> Email address changed, please unsubscribe!
>
>
>
>


Re: Unsubscribe

2021-06-17 Thread Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jark

On Tue, 15 Jun 2021 at 23:56, frank.liu  wrote:

> Unsubscribe
>
>
> | |
> frank.liu
> |
> |
> frank...@163.com
> |
> Signature customized by NetEase Mail Master


Re: Re: After computations with flink sql cdc, a multi-parallelism elasticsearch connector sink occasionally has problems! The data volume is large and it has not been well reproduced yet. Has anyone run into a similar problem?

2021-06-17 Thread Jark Wu
The community has recently redesigned the mysql-cdc implementation to support parallel reads and checkpoints during the snapshot phase and to remove the global lock dependency.
You can follow the GitHub repository https://github.com/ververica/flink-cdc-connectors for updates.
The related design and implementation will also be shared at the meetup in July, so stay tuned.

Best,
Jark

On Thu, 17 Jun 2021 at 09:34, casel.chen  wrote:

> When will Flink CDC support changing the parallelism for fine-grained resource control? I am also hitting the problem that a flink sql
> cdc job writing to mysql cannot keep up with the incoming write rate. When will a mechanism like MySQL's parallel replication be supported?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-06-16 17:27:14, "Leonard Xu"  wrote:
> >This doesn't look closely related to Flink-CDC. The stack trace shows that the exception version_conflict_engine_exception is thrown on the ES side;
> you can look into this exception and check whether there are write conflicts (other jobs/businesses also writing to the synced table).
> >
> >Best,
> >Leonard
> >
> >> 在 2021年6月16日,17:05,mokaful <649713...@qq.com> 写道:
> >>
> >> 相同问题,请问有处理方式吗
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Add control mode for flink

2021-06-07 Thread Jark Wu
Thanks Xintong for the summary,

I'm big +1 for this feature.

Xintong's summary for Table/SQL's needs is correct.
The "custom (broadcast) event" feature is important to us
and even blocks further awesome features and optimizations in Table/SQL.
I also discussed offline with @Yun Gao  several times
for this topic,
and we all agreed this is a reasonable feature but may need some careful
design.
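
For context, the workaround described below (a dedicated control-event source
plus broadcast state) roughly looks like the following today. This is only a
sketch: the sources, the "mode=debug" control message format, and the class name
are made up for illustration.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9000);   // main data
        DataStream<String> controls = env.socketTextStream("localhost", 9001); // control events

        MapStateDescriptor<String, String> configDesc =
                new MapStateDescriptor<>("dynamic-config", Types.STRING, Types.STRING);
        BroadcastStream<String> controlBroadcast = controls.broadcast(configDesc);

        events.connect(controlBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // read the latest broadcast config and adapt the operator behavior
                        String mode = ctx.getBroadcastState(configDesc).get("mode");
                        out.collect(("debug".equals(mode) ? "DEBUG: " : "") + value);
                    }

                    @Override
                    public void processBroadcastElement(String control, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // control messages of the form "key=value", e.g. "mode=debug"
                        String[] kv = control.split("=", 2);
                        if (kv.length == 2) {
                            ctx.getBroadcastState(configDesc).put(kv[0], kv[1]);
                        }
                    }
                })
                .print();

        env.execute("dynamic-config-sketch");
    }
}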

Best,
Jark


On Mon, 7 Jun 2021 at 14:52, Xintong Song  wrote:

> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>
> I was part of the preliminary offline discussions before this proposal
> went public. So maybe I can help clarify things a bit.
>
> In short, despite the phrase "control mode" might be a bit misleading,
> what we truly want to do from my side is to make the concept of "control
> flow" explicit and expose it to users.
>
> ## Background
> Jiangang & his colleagues at Kuaishou maintain an internal version of
> Flink. One of their custom features is allowing dynamically changing
> operator behaviors via the REST APIs. He's willing to contribute this
> feature to the community, and came to Yun Gao and me for suggestions. After
> discussion, we feel that the underlying question to be answered is how do
> we model the control flow in Flink. Dynamically controlling jobs via REST
> API can be one of the features built on top of the control flow, and there
> could be others.
>
> ## Control flow
> Control flow refers to the communication channels for sending
> events/signals to/between tasks/operators, that changes Flink's behavior in
> a way that may or may not affect the computation logic. Typical control
> events/signals Flink currently has are watermarks and checkpoint barriers.
>
> In general, for modeling control flow, the following questions should be
> considered.
> 1. Who (which component) is responsible for generating the control
> messages?
> 2. Who (which component) is responsible for reacting to the messages.
> 3. How do the messages propagate?
> 4. When it comes to affecting the computation logics, how should the
> control flow work together with the exact-once consistency.
>
> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share
> many things in common. A unified control flow model would help deduplicate
> the common logics, allowing us to focus on the use case specific parts.
>
> E.g.,
> - Watermarks: generated by source operators, handled by window operators.
> - Checkpoint barrier: generated by the checkpoint coordinator, handled by
> all tasks
> - Dynamic controlling: generated by JobMaster (in reaction to the REST
> command), handled by specific operators/UDFs
> - Operator defined events: The following features are still in planning,
> but may potentially benefit from the control flow model. (Please correct me
> if I'm wrong, @Yun, @Jark)
>   * Iteration: When a certain condition is met, we might want to signal
> downstream operators with an event
>   * Mini-batch assembling: Flink currently uses special watermarks for
> indicating the end of each mini-batch, which makes it tricky to deal with
> event time related computations.
>   * Hive dimension table join: For periodically reloaded hive tables, it
> would be helpful to have specific events signaling that a reloading is
> finished.
>   * Bootstrap dimension table join: This is similar to the previous one.
> In cases where we want to fully load the dimension table before starting
> joining the mainstream, it would be helpful to have an event signaling the
> finishing of the bootstrap.
>
> ## Dynamic REST controlling
> Back to the specific feature that Jiangang proposed, I personally think
> it's quite convenient. Currently, to dynamically change the behavior of an
> operator, we need to set up a separate source for the control events and
> leverage broadcast state. Being able to send the events via REST APIs
> definitely improves the usability.
>
> Leveraging dynamic configuration frameworks is for sure one possible
> approach. The reason we are in favor of introducing the control flow is
> that:
> - It benefits not only this specific dynamic controlling feature, but
> potentially other future features as well.
> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
> framework work together with Flink's consistency mechanism.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚  wrote:
>
>> Thank you for the reply. I have checked the post you mentioned. The
>> dynamic config may be useful sometimes. But it is hard to keep data
>> consistent in flink, for example, what if the dynamic config will take
>> effect when failover. Since dynamic config is a desire for users, maybe
>> flink can support it in some way.
>>
>> For the control mode, dynamic config is just one of the control modes. In
>> the google doc, I have list some other cases. For example, control events
>> are generated in operators or external services. Besides user's dynamic
>> config, flink 

Re: How to recover from the last count when using a CUMULATE window after restarting a flink-sql job?

2021-05-09 Thread Jark Wu
Hi,

When restarting a Flink job, Flink will start the job with an empty state,
because this is a new job.
This is not specific to the CUMULATE window; it is the behavior for all Flink jobs.
If you want to restore a Flink job from a state/savepoint, you have to
specify the savepoint path, see [1].
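
For example, with the SQL Client this roughly looks like (the savepoint path is
only a placeholder):

-- take a savepoint of the running job first, then before re-submitting the query:
SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-xxxx';
-- the INSERT INTO ... cumulate window query submitted afterwards will restore from that state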

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint


On Sat, 8 May 2021 at 13:30, Kurt Young  wrote:

> Hi, please use user mailing list only to discuss these issues.
>
> Best,
> Kurt
>
>
> On Sat, May 8, 2021 at 1:05 PM 1095193...@qq.com <1095193...@qq.com>
> wrote:
>
>> Hi
>> I have tried the cumulate window function in Flink-1.13 SQL to accumulate
>> data from Kafka. When I restart a cumulate window SQL job, the last count
>> state is not taken into account and the count state accumulates from 1 again. Are there
>> any solutions that can help recover the last count state when restarting a Flink SQL
>> job?
>> Thank you
>> --
>> 1095193...@qq.com
>>
>


Re: Protobuf support with Flink SQL and Kafka Connector

2021-05-05 Thread Jark Wu
Hi Shipeng,

Matthias is correct. FLINK-18202 should address this topic. There is
already a pull request there which is in good shape. You can also download
the PR and build the format jar yourself, and then it should work with
Flink 1.12.

Best,
Jark

On Mon, 3 May 2021 at 21:41, Matthias Pohl  wrote:

> Hi Shipeng,
> it looks like there is an open Jira issue FLINK-18202 [1] addressing this
> topic. You might want to follow up on that one. I'm adding Timo and Jark to
> this thread. They might have more insights.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-18202
>
> On Sat, May 1, 2021 at 2:00 AM Fuyao Li  wrote:
>
>> Hello Shipeng,
>>
>>
>>
>> I am not an expert in Flink, just want to share some of my thoughts.
>> Maybe others can give you better ideas.
>>
>> I think there is no directly available Protobuf support for Flink SQL.
>> However, you can write a user-defined format to support it [1].
>>
>> If you use DataStream API, you can leverage Kryo Serializer to serialize
>> and deserialize with Protobuf format. [2]. There is an out-of-box
>> integration for Protobuf here. You will need to convert it to Flink SQL
>> after data ingestion.
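>>
>> A rough sketch of that registration (MyProtoMessage is a placeholder for a
>> generated Protobuf class, and it assumes com.twitter:chill-protobuf is on the
>> classpath):
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // serialize the generated Protobuf type with Kryo + chill-protobuf
>> env.getConfig().registerTypeWithKryoSerializer(
>>     MyProtoMessage.class, com.twitter.chill.protobuf.ProtobufSerializer.class);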
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#user-defined-sources-sinks
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>>
>>
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>>
>>
>> *From: *Shipeng Xie 
>> *Date: *Friday, April 30, 2021 at 14:58
>> *To: *user@flink.apache.org 
>> *Subject: *[External] : Protobuf support with Flink SQL and Kafka
>> Connector
>>
>> Hi,
>>
>>
>>
>> In
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/
>> ,
>> it does not mention protobuf format. Does Flink SQL support protobuf
>> format? If not, is there any plan to support it in the near future?
>>
>> Thanks!
>>
>


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-17 Thread Jark Wu
Hi Dylan,

The primary key ordering problem I mean above is about changelog. Batch
queries only emit a final result, and thus don't have changelog, so it's
safe to use batch mode.

The problem only exists in streaming mode with more than 1 parallelism.

Best,
Jark

On Fri, 16 Apr 2021 at 21:40, Dylan Forciea  wrote:

> Jark,
>
>
>
> Thanks for the heads up! I didn’t see this behavior when running in batch
> mode with parallelism turned on. Is it safe to do this kind of join in
> batch mode right now, or am I just getting lucky?
>
>
>
> Dylan
>
>
>
> *From: *Jark Wu 
> *Date: *Friday, April 16, 2021 at 5:10 AM
> *To: *Dylan Forciea 
> *Cc: *Timo Walther , Piotr Nowojski <
> pnowoj...@apache.org>, "user@flink.apache.org" 
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> Hi Dylan,
>
>
>
> I think this has the same reason as
> https://issues.apache.org/jira/browse/FLINK-20374.
>
> The root cause is that changelogs are shuffled by `attr` at second join,
>
> and thus records with the same `id` will be shuffled to different join
> tasks (also different sink tasks).
>
> So the data arrived at sinks are not ordered on the sink primary key.
>
>
>
> We may need something like primary key ordering mechanism in the whole
> planner to fix this.
>
>
>
> Best,
>
> Jark
>
>
>
> On Thu, 15 Apr 2021 at 01:33, Dylan Forciea  wrote:
>
> On a side note - I changed to use the batch mode per your suggestion Timo,
> and my job ran much faster and with deterministic counts with parallelism
> turned on. So I'll probably utilize that for now. However, it would still
> be nice to dig down into why streaming isn't working in case I need that in
> the future.
>
> Dylan
>
> On 4/14/21, 10:27 AM, "Dylan Forciea"  wrote:
>
> Timo,
>
> Here is the plan (hopefully I properly cleansed it of company
> proprietary info without garbling it)
>
> Dylan
>
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.sink], fields=[id,
> attr, attr_mapped])
> +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS
> NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT
> NULL($3), $3, $1)])
>+- LogicalJoin(condition=[=($4, $5)], joinType=[left])
>   :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3],
> $f4=[CASE(IS NOT NULL($3), $3, $1)])
>   :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
>   : :- LogicalTableScan(table=[[default_catalog,
> default_database, table1]])
>   : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
>   :+- LogicalProject(id2=[$1], attr=[$0])
>   :   +- LogicalTableScan(table=[[default_catalog,
> default_database, table2]])
>   +- LogicalTableScan(table=[[default_catalog, default_database,
> table3]])
>
> == Optimized Logical Plan ==
> Sink(table=[default_catalog.default_database.sink], fields=[id, attr,
> attr_mapped], changelogMode=[NONE])
> +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT
> NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped),
> attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped],
> changelogMode=[I,UB,UA,D])
>+- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)],
> select=[id1, attr, id2, attr0, $f4, attr, attr_mapped],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
> changelogMode=[I,UB,UA,D])
>   :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
>   :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT
> NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
>   : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)],
> select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
>   ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
>   ::  +- TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr], changelogMode=[I])
>   :+- Exchange(distribution=[hash[id2]],
> changelogMode=[I,UB,UA])
>   :   +- GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr], changelogMode=[I,UB,UA])
>   :  +- Exchange(distribution=[hash[id2]],
> changelogMode=[I])
>   : +- TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2], changelogMode=[I])
>   +- Exchange(distribution=[hash[attr]], changelogMode

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-16 Thread Jark Wu
Hi Dylan,

I think this has the same reason as
https://issues.apache.org/jira/browse/FLINK-20374.
The root cause is that changelogs are shuffled by `attr` at second join,
and thus records with the same `id` will be shuffled to different join
tasks (also different sink tasks).
So the data arrived at sinks are not ordered on the sink primary key.
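
To make that concrete, a hypothetical changelog for a single id:

-U (id=1, attr='a')   -- retraction of the old row, hashed by attr='a' to one join/sink subtask
+U (id=1, attr='b')   -- new row, hashed by attr='b' to a different subtask
-- the two subtasks can process and emit these in either order, so the sink may
-- apply the retraction after the update and end up with a missing or stale row for id=1.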

We may need something like primary key ordering mechanism in the whole
planner to fix this.

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea  wrote:

> On a side note - I changed to use the batch mode per your suggestion Timo,
> and my job ran much faster and with deterministic counts with parallelism
> turned on. So I'll probably utilize that for now. However, it would still
> be nice to dig down into why streaming isn't working in case I need that in
> the future.
>
> Dylan
>
> On 4/14/21, 10:27 AM, "Dylan Forciea"  wrote:
>
> Timo,
>
> Here is the plan (hopefully I properly cleansed it of company
> proprietary info without garbling it)
>
> Dylan
>
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.sink], fields=[id,
> attr, attr_mapped])
> +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS
> NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT
> NULL($3), $3, $1)])
>+- LogicalJoin(condition=[=($4, $5)], joinType=[left])
>   :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3],
> $f4=[CASE(IS NOT NULL($3), $3, $1)])
>   :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
>   : :- LogicalTableScan(table=[[default_catalog,
> default_database, table1]])
>   : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
>   :+- LogicalProject(id2=[$1], attr=[$0])
>   :   +- LogicalTableScan(table=[[default_catalog,
> default_database, table2]])
>   +- LogicalTableScan(table=[[default_catalog, default_database,
> table3]])
>
> == Optimized Logical Plan ==
> Sink(table=[default_catalog.default_database.sink], fields=[id, attr,
> attr_mapped], changelogMode=[NONE])
> +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT
> NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped),
> attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped],
> changelogMode=[I,UB,UA,D])
>+- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)],
> select=[id1, attr, id2, attr0, $f4, attr, attr_mapped],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
> changelogMode=[I,UB,UA,D])
>   :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
>   :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT
> NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
>   : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)],
> select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
>   ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
>   ::  +- TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr], changelogMode=[I])
>   :+- Exchange(distribution=[hash[id2]],
> changelogMode=[I,UB,UA])
>   :   +- GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr], changelogMode=[I,UB,UA])
>   :  +- Exchange(distribution=[hash[id2]],
> changelogMode=[I])
>   : +- TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2], changelogMode=[I])
>   +- Exchange(distribution=[hash[attr]], changelogMode=[I])
>  +- TableSourceScan(table=[[default_catalog, default_database,
> table3]], fields=[attr, attr_mapped], changelogMode=[I])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr])
>
> Stage 3 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2])
>
> Stage 5 : Attr
> content : GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr])
> ship_strategy : HASH
>
> Stage 7 : Attr
> content : Join(joinType=[FullOuterJoin],
> where=[(id1 = id2)], select=[id1, attr, id2, attr0],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
> ship_strategy : HASH
>
> Stage 8 : Attr
> content : Calc(select=[id1, attr, id2,
> attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
> ship_strategy : FORWARD
>
> Stage 10 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> 

Re: flink sql count distinct optimization

2021-03-26 Thread Jark Wu
> So if it is not a window agg, flink will automatically split the key once the parameter is enabled, right?
Yes.

> As for window agg not being able to split automatically, can an explanation of this be found in the documentation?
It is not covered in the documentation. Everything described in this doc [1] targets optimizations for unbounded aggregations.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation

On Fri, 26 Mar 2021 at 11:00, guomuhua <663021...@qq.com> wrote:

> Jark wrote
> > I see your job uses a window agg, and window agg does not support automatic splitting yet. The window-TVF-based window
> > agg in 1.13 supports this parameter, so stay tuned.
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
>
> > vincent2015qdlg@
>
> > 
> > wrote:
> >
> >> Hi,guomuhua
> >>   With local aggregation enabled, you do not need to split the key yourself for a two-phase aggregation. I suggest reading the official documentation.
> >>
> >> Best,
> >> Robin
> >>
> >>
> >> guomuhua wrote
> >> > In SQL, if the local-global parameter is enabled: set
> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> >> > or the Partial-Final parameters are enabled: set
> >> table.optimizer.distinct-agg.split.enabled=true;
> >> >  set
> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
> >> > do I still need to rewrite the SQL into the two-phase form accordingly?
> >> > For example:
> >> > Original SQL:
> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >> >
> >> > SQL after manually bucketing the DISTINCT field buy_id by MOD 1024:
> >> > SELECT day, SUM(cnt) total
> >> > FROM (
> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
> >> > GROUP BY day
> >> >
> >> > Or will flink rewrite the SQL for me automatically, so I don't need to care?
> >> >
> >> > Also, with only the above parameters enabled and no SQL rewrite, there seems to be no optimization, and I don't see the two-phase operators in the flink web ui either
> >> > 
> >>
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
> ;
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
> Thanks. So if it is not a window agg, flink will split the key automatically once the parameter is enabled, right? As for window agg
> not being able to split automatically, can this be found in the documentation? Where exactly? Or do I need to look into the source code? Please advise. Thanks again.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-26 Thread Jark Wu
IIUC, pipeline.auto-watermark-interval = 0 just disables **periodic**
watermark emission;
it doesn't mean the watermark will never be emitted.
In Table API/SQL, it has the same meaning. If the watermark interval = 0, we
disable periodic watermark emission
and emit a watermark as soon as it advances.

So I think the SQL documentation is correct.
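
For reference, this is the option involved; it can be set via the table
configuration, for example (just a sketch):

tableEnv.getConfig().getConfiguration()
        .setString("pipeline.auto-watermark-interval", "0 ms"); // per-record watermark emission in SQL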

Best,
Jark

On Tue, 23 Mar 2021 at 22:29, Dawid Wysakowicz 
wrote:

> Hey,
>
> I would like to double check this with Jark and/or Timo. As far as
> DataStream is concerned the javadoc is correct. Moreover the
> pipeline.auto-watermak-interval and setAutoWatermarkInterval are
> effectively the same setting/option. However I am not sure if Table API
> interprets it in the same way as DataStream APi. The documentation you
> linked, Aeden, describes the SQL API.
>
> @Jark @Timo Could you verify if the SQL documentation is correct?
>
> Best,
>
> Dawid
> On 23/03/2021 15:20, Matthias Pohl wrote:
>
> Hi Aeden,
> sorry for the late reply. I looked through the code and verified that the
> JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0 will
> disable the automatic watermark generation. I created FLINK-21931 [1] to
> cover this.
>
> Thanks,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-21931
>
> On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson 
> wrote:
>
>> Correction: The first link was supposed to be,
>>
>> 1. pipeline.auto-watermark-interval
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>>
>> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson 
>> wrote:
>> >
>> > I'm hoping to have my confusion clarified regarding the settings,
>> >
>> > 1. pipeline.auto-watermark-interval
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>> >
>> > 2. setAutoWatermarkInterval
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>> >
>> > I noticed the default value of pipeline.auto-watermark-interval is 0
>> > and according to these docs,
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
>> ,
>> > it states, "If watermark interval is 0ms, the generated watermarks
>> > will be emitted per-record if it is not null and greater than the last
>> > emitted one." However in the documentation for
>> > setAutoWatermarkInterval the value 0 disables watermark emission.
>> >
>> > * Are they intended to be the same setting? If not how are they
>> > different? Is one for FlinkSql and the other DataStream API?
>> >
>> > --
>> > Thank you,
>> > Aeden
>
>


Re: flink sql count distinct optimization

2021-03-25 Thread Jark Wu
I see your job uses a window agg, and window agg does not support automatic splitting yet. The window-TVF-based window
agg in 1.13 supports this parameter, so stay tuned.
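
For reference, the 1.13 window-TVF form of such a query looks roughly like this
(assuming an event-time column ts on T):

SELECT window_start, window_end, COUNT(DISTINCT buy_id) AS cnt
FROM TABLE(
  TUMBLE(TABLE T, DESCRIPTOR(ts), INTERVAL '1' DAY))
GROUP BY window_start, window_end;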

Best,
Jark

On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
wrote:

> Hi,guomuhua
>   With local aggregation enabled, you do not need to split the key yourself for a two-phase aggregation. I suggest reading the official documentation.
>
> Best,
> Robin
>
>
> guomuhua wrote
> > In SQL, if the local-global parameter is enabled: set
> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> > or the Partial-Final parameters are enabled: set
> table.optimizer.distinct-agg.split.enabled=true;
> >  set
> > table.optimizer.distinct-agg.split.bucket-num=1024;
> > do I still need to rewrite the SQL into the two-phase form accordingly?
> > For example:
> > Original SQL:
> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >
> > SQL after manually bucketing the DISTINCT field buy_id by MOD 1024:
> > SELECT day, SUM(cnt) total
> > FROM (
> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> > FROM T GROUP BY day, MOD(buy_id, 1024))
> > GROUP BY day
> >
> > Or will flink rewrite the SQL for me automatically, so I don't need to care?
> >
> > Also, with only the above parameters enabled and no SQL rewrite, there seems to be no optimization, and I don't see the two-phase operators in the flink web ui either
> > 
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: LocalWatermarkAssigner causes predicate pushdown to be skipped

2021-03-07 Thread Jark Wu
Hi Yuval,

That's correct: you will always get a LogicalWatermarkAssigner if you
assigned a watermark.
If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
will be pushed
into the TableSource, and then you can push the Filter into the source if the
source implements SupportsFilterPushDown.
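
A minimal sketch of a source that accepts both push-downs (not your actual
source: the class name is made up and the real source function that would use
the pushed watermark strategy and filters is omitted):

import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

public class MyCustomSource
        implements ScanTableSource, SupportsWatermarkPushDown, SupportsFilterPushDown {

    private WatermarkStrategy<RowData> watermarkStrategy; // pushed-down watermark
    private List<ResolvedExpression> filters;             // pushed-down filters

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        this.filters = filters;
        // accept the filters but also keep them in the plan (not guaranteed to be applied)
        return Result.of(filters, filters);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return SourceFunctionProvider.of(createSourceFunction(), false);
    }

    private SourceFunction<RowData> createSourceFunction() {
        // build the real SourceFunction here, applying 'filters' at read time and
        // 'watermarkStrategy' for per-partition watermarks (omitted in this sketch)
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        MyCustomSource copy = new MyCustomSource();
        copy.watermarkStrategy = watermarkStrategy;
        copy.filters = filters;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyCustomSource";
    }
}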

Best,
Jark

On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov  wrote:

> Hi Timo,
> After investigating this further, this is actually non related to
> implementing SupportsWatermarkPushdown.
>
> Once I create a TableSchema for my custom source's RowData, and assign it
> a watermark (see my example in the original mail), the plan will always
> include a LogicalWatermarkAssigner. This assigner that is between the
> LogicalTableScan and the LogicalFilter will then go on and fail the
> HepPlanner from invoking the optimization since it requires
> LogicalTableScan to be a direct child of LogicalFilter. Since I have
> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
> work.
>
> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther  wrote:
>
>> Hi Yuval,
>>
>> sorry that nobody replied earlier. Somehow your email fell through the
>> cracks.
>>
>> If I understand you correctly, could would like to implement a table
>> source that implements both `SupportsWatermarkPushDown` and
>> `SupportsFilterPushDown`?
>>
>> The current behavior might be on purpose. Filters and Watermarks are not
>> very compatible. Filtering would also mean that records (from which
>> watermarks could be generated) are skipped. If the filter is very
>> strict, we would not generate any new watermarks and the pipeline would
>> stop making progress in time.
>>
>> Watermark push down is only necessary, if per-partition watermarks are
>> required. Otherwise the watermarks are generated in a subsequent
>> operator after the source. So you can still use rowtime without
>> implementing `SupportsWatermarkPushDown` in your custom source.
>>
>> I will lookp in Shengkai who worked on this topic recently.
>>
>> Regards,
>> Timo
>>
>>
>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>> > Bumping this up again, would appreciate any help if anyone is familiar
>> > with the blink planner.
>> >
>> > Thanks,
>> > Yuval.
>> >
>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov > > > wrote:
>> >
>> > Hi Jark,
>> > Would appreciate your help with this.
>> >
>> > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <
>> ro...@apache.org
>> > > wrote:
>> >
>> > Hi Yuval,
>> >
>> > I'm not familiar with the Blink planner but probably Jark can
>> help.
>> >
>> > Regards,
>> > Roman
>> >
>> >
>> > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Update: When I don't set the watermark explicitly on the
>> > TableSchema, `applyWatermarkStrategy` never gets called on
>> > my ScanTableSource, which does make sense. But now the
>> > question is what should be done? This feels a bit
>> unintuitive.
>> >
>> > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>> > mailto:yuva...@gmail.com>> wrote:
>> >
>> > Hi,
>> > Flink 1.12.1, Blink Planner, Scala 2.12
>> >
>> > I have the following logical plan:
>> >
>> >
>>  LogicalSink(table=[default_catalog.default_database.table], fields=[bar,
>> baz, hello_world, a, b])
>> > +- LogicalProject(value=[$2],
>> > bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>> > baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)],
>> > hello_world=[null:VARCHAR(2147483647) CHARACTER SET
>> > "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET
>> > "UTF-16LE"], b=[EMPTY_MAP()])
>> > +- LogicalFilter(condition=[AND(=($4,
>> > _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>> >+- LogicalWatermarkAssigner(rowtime=[bar],
>> > watermark=[$0])
>> >   +- LogicalTableScan(table=[[default_catalog,
>> > default_database, foo]])
>> >
>> > I have a custom source which creates a TableSchema based
>> > on an external table. When I create the schema, I push
>> > the watermark definition to the schema:
>> >
>> > image.png
>> >
>> > When the HepPlanner starts the optimization phase and
>> > reaches the "PushFilterInotTableSourceScanRule", it
>> > matches on the LogicalFilter in the definition. But
>> > then, since the RelOptRuleOperandChildPolicy is set to
>> > "SOME", it attempts to do a full match on the child
>> > nodes. Since the rule is defined as so:
>> >
>> > image.png
>> >
>> > The child filter fails 

Re: Problem with dynamically modifying parameters in SQL

2021-03-04 Thread Jark Wu
This looks like a bug in node reuse during multi-sink (segmented) optimization. Please open an issue on JIRA.

Best,
Jark

On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋  wrote:

> StatementSet statementSet = tableEnvironment.createStatementSet();
> String sql1 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_1')*/";
> String sql2 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_2')*/";
> statementSet.addInsertSql(sql1);
> statementSet.addInsertSql(sql2);
> statementSet.execute();
>
>
The SQL code is above. After the final insert, the data of table test_a_1 is inserted twice, while the data of test_a_2 is not inserted at all. Is this a bug?


Re: Question about the ChangelogNormalize stage generated by the Flink SQL upsert-kafka connector

2021-03-04 Thread Jark Wu
1. For upsert-kafka, ChangelogNormalize is added by default.
2. ChangelogNormalize runs with the env parallelism, so it can be seen as breaking the parallelism limit you mentioned. It also works for kafka + canal-json,
but the option table.exec.source.cdc-events-duplicate = true [1] has to be set to enable it. Note, however, that ChangelogNormalize is a stateful node
with its own performance overhead, so the overall performance may even be worse than a plain forward.
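
For example (just a sketch of setting the option in the table config):

tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.cdc-events-duplicate", "true"); // adds the stateful ChangelogNormalize for kafka + canal-json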

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate

On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:

> Hi community.
> Flink 1.12.1
>
> Our current job ingests data from kafka via canal-json and writes it to a DB. Since the topic has only 1 partition, setting a larger parallelism
> has no effect on an ETL job that is forward-only.
>
> insert into table_a select id,udf(a),b,c from table_b;
>
> I found that the stages generated by EXPLAIN for the upsert-kafka connector contain a ChangelogNormalize stage that can be repartitioned.
> 1. Is ChangelogNormalize added by default, or can it be configured somewhere?
> 2. Can this overcome the parallelism limit imposed by the default Kafka partitions? Does it only take effect with upsert-kafka? Can it be used in the scenario I described above?
>
> ```
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, temp_table]], fields=[id...])
>
> Stage 3 : Operator
> content : ChangelogNormalize(key=[id])
> ship_strategy : HASH
>
> Stage 4 : Operator
> content : Calc(select=[...])
> ship_strategy : FORWARD
>
> Stage 5 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[id...])
> ship_strategy : FORWARD
> ```
>


Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Thread Jark Wu
big +1 from my side.

Best,
Jark

On Thu, 4 Mar 2021 at 20:59, Leonard Xu  wrote:

> +1 for the roadmap.
>
> Thanks Timo for driving this.
>
> Best,
> Leonard
>
> > 在 2021年3月4日,20:40,Timo Walther  写道:
> >
> > Last call for feedback on this topic.
> >
> > It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has
> been accepted for a very long time, I think we don't need another voting
> thread for executing the last implementation step. Please let me know if
> you think differently.
> >
> > I will start deprecating the affected classes and interfaces beginning
> of next week.
> >
> > Regards,
> > Timo
> >
> >
> > On 26.02.21 15:46, Seth Wiesman wrote:
> >> Strong +1
> >> Having two planners is confusing to users and the diverging semantics
> make
> >> it difficult to provide useful learning material. It is time to rip the
> >> bandage off.
> >> Seth
> >> On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:
> >>>  breaking
> >>> change.>
> >>>
> >>> Hi Timo,
> >>>
> >>> First of all I want to thank you for introducing this planner design
> back
> >>> in 1.9, this is a great work
> >>> that allows lots of blink features to be merged to Flink in a
> reasonably
> >>> short time. It greatly
> >>> accelerates the evolution speed of Table & SQL.
> >>>
> >>> Everything comes with a cost, as you said, right now we are facing the
> >>> overhead of maintaining
> >>> two planners and it causes bugs and also increases imbalance between
> these
> >>> two planners. As
> >>> a developer and also for the good of all Table & SQL users, I also
> think
> >>> it's better for us to be more
> >>> focused on a single planner.
> >>>
> >>> Your proposed roadmap looks good to me, +1 from my side and thanks
> >>> again for all your efforts!
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Thu, Feb 25, 2021 at 5:01 PM Timo Walther 
> wrote:
> >>>
>  Hi everyone,
> 
>  since Flink 1.9 we have supported two SQL planners. Most of the
> original
>  plan of FLIP-32 [1] has been implemented. The Blink code merge has
> been
>  completed and many additional features have been added exclusively to
>  the new planner. The new planner is now in a much better shape than
> the
>  legacy one.
> 
>  In order to avoid user confusion, reduce duplicate code, and improve
>  maintainability and testing times of the Flink project as a whole we
>  would like to propose the following steps to complete FLIP-32:
> 
>  In Flink 1.13:
>  - Deprecate the `flink-table-planner` module
>  - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
> 
>  In Flink 1.14:
>  - Drop `flink-table-planner` early
>  - Drop many deprecated interfaces and API on demand
>  - Rename `flink-table-planner-blink` to `flink-table-planner`
>  - Rename `flink-table-runtime-blink` to `flink-table-runtime`
>  - Remove references of "Blink" in the code base
> 
>  This will have an impact on users that still use DataSet API together
>  with Table API. With this change we will not support converting
> between
>  DataSet API and Table API anymore. We hope to compensate the missing
>  functionality in the new unified TableEnvironment and/or the batch
> mode
>  in DataStream API during 1.14 and 1.15. For this, we are looking for
>  further feedback which features are required in Table API/DataStream
> API
>  to have a smooth migration path.
> 
>  Looking forward to your feedback.
> 
>  Regards,
>  Timo
> 
>  [1]
> 
> 
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
> 
> >>>
> >
>
>


Re: Union fields with time attributes have different types

2021-02-28 Thread Jark Wu
Hi Sebastián,

`endts` in your case is a time attribute, which is slightly different from a
regular TIMESTAMP type.
You can manually `CAST(endts AS TIMESTAMP(3))` to make this query work;
the cast removes the time attribute metadata.

SELECT `evt`, `value`, `startts`, CAST(`endts` AS TIMESTAMP(3)) AS `endts`
FROM aggs_1m
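
For reference, a sketch of the full UNION with the cast applied (table and column names taken from your quoted query); if the planner still complains about another column, the same cast can be applied to it as well:

```
SELECT `evt`, `value`, `startts`, CAST(`endts` AS TIMESTAMP(3)) AS `endts`
FROM aggs_1m
UNION
SELECT `evt`, `value`, `startts`, `endts`
FROM aggs_3m
```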


Best,
Jark

On Mon, 22 Feb 2021 at 05:01, Sebastián Magrí  wrote:

> I'm using a query like this
>
> WITH aggs_1m AS (
>   SELECT
> `evt`,
> `startts`
> `endts`,
> SUM(`value`) AS `value`
>   FROM aggregates_per_minute
> ), aggs_3m AS (
>   SELECT
> `evt`,
> TUMBLE_START(`endts`, INTERVAL '3' MINUTE) AS `startts`,
> TUMBLE_END(`endts`, INTERVAL '3' MINUTE) AS `endts`,
> SUM(`c`) AS `value`
>   FROM aggregates_per_minute
>   GROUP BY t, TUMBLE(`endts`, INTERVAL '3' MINUTE)
> )
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_1m
> UNION
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_3m
>
> But it's throwing this exception
>
> org.apache.flink.table.api.ValidationException: Union fields with time
> attributes have different types.
>
> Doesn't TUMBLE_START(somets, ...) return a TIMESTAMP of the same type?
>
> --
> Sebastián Ramírez Magrí
>


Re: Best way to handle BIGING to TIMESTAMP conversions

2021-02-28 Thread Jark Wu
Hi Sebastián,

You can use `TO_TIMESTAMP(FROM_UNIXTIME(e))` to get a timestamp value.
The BIGINT should be in seconds. Please note that you need to declare the
computed column in the DDL schema and declare a watermark strategy on this
computed field to make it a rowtime attribute, because a streaming OVER
window requires ordering by a time attribute.
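
A minimal sketch of such a DDL, based on the table in your message. It assumes `e` holds epoch milliseconds, hence the division by 1000 before `FROM_UNIXTIME`; the datagen connector is used only so the statement is self-contained:

```
CREATE TABLE sessions (
  `ats` STRING,
  `e`   BIGINT,
  `s`   BIGINT,
  -- computed column: convert epoch milliseconds to TIMESTAMP(3)
  `ets` AS TO_TIMESTAMP(FROM_UNIXTIME(`e` / 1000)),
  -- declare the computed column as the rowtime attribute
  WATERMARK FOR `ets` AS `ets` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```

The OVER windows can then use `ORDER BY ets` instead of `proc_time`.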

Best,
Jark

On Sun, 21 Feb 2021 at 07:32, Sebastián Magrí  wrote:

> I have a table with two BIGINT fields for start and end of an event as
> UNIX time in milliseconds. I want to be able to have a resulting column
> with the delta in milliseconds and group by that difference. Also, I want
> to be able to have aggregations with window functions based upon the `end`
> field.
>
> The table definition looks like this:
> |CREATE TABLE sessions (
> |  `ats`   STRING,
> |  `e` BIGINT,
> |  `s` BIGINT,
> |  `proc_time` AS PROCTIME(),
> |  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
> |)
>
> Then I have a few views like this:
>
> CREATE VIEW second_sessions AS
>   SELECT * FROM sessions
>   WHERE `e` - `s` = 1000
>
> And some windows using these views like this:
>
>   WINDOW w3m AS (
> PARTITION BY `t`
> ORDER BY `proc_time`
> RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
>   )
>
> I'd like to use the `e` field for windowing instead of `proc_time`. But I
> keep running into errors with the `TO_TIMESTAMP(BIGINT)` function now
> missing or with unsupported timestamp arithmetics.
>
> What is the best practice for a case such as this?
>
> Best Regards,
> --
> Sebastián Ramírez Magrí
>


Re: LEAD/LAG functions

2021-02-01 Thread Jark Wu
Yes. RANK/ROW_NUMBER is not allowed with ROW/RANGE over window,
i.e. the "ROWS BETWEEN 1 PRECEDING AND CURRENT ROW" clause.

Best,
Jark

On Mon, 1 Feb 2021 at 22:06, Timo Walther  wrote:

> Hi Patrick,
>
> I could imagine that LEAD/LAG are translated into RANK/ROW_NUMBER
> operations that are not supported in this context.
>
> But I will loop in @Jark who might know more about the limitaitons here.
>
> Regards,
> Timo
>
>
> On 29.01.21 17:37, Patrick Angeles wrote:
> > Another (hopefully newbie) question. Trying to use LEAD/LAG over window
> > functions. I get the following error. The exact same query works
> > properly using FIRST_VALUE instead of LEAD.
> >
> > Thanks in advance...
> >
> > - Patrick
> >
> > Flink SQL> describe l1_min ;
> >
> > +-----------+------------------------+------+-----+--------+-----------+
> > | name      | type                   | null | key | extras | watermark |
> > +-----------+------------------------+------+-----+--------+-----------+
> > | symbol    | STRING                 | true |     |        |           |
> > | t_start   | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
> > | ask_price | DOUBLE                 | true |     |        |           |
> > | bid_price | DOUBLE                 | true |     |        |           |
> > | mid_price | DOUBLE                 | true |     |        |           |
> > +-----------+------------------------+------+-----+--------+-----------+
> >
> > 5 rows in set
> >
> >
> > Flink SQL> SELECT
> >
> >> symbol,
> >
> >> t_start,
> >
> >> ask_price,
> >
> >> bid_price,
> >
> >> mid_price,
> >
> >> LEAD (mid_price) OVER x AS prev_price
> >
> >> FROM l1_min
> >
> >> WINDOW x AS (
> >
> >> PARTITION BY symbol
> >
> >> ORDER BY t_start
> >
> >> ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
> >
> >> )
> >
> >> ;
> >
> > *[ERROR] Could not execute SQL statement. Reason:*
> >
> > *org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not
> > allowed with RANK, DENSE_RANK or ROW_NUMBER functions*
> >
>
>


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-28 Thread Jark Wu
Hi Sebastián,

Could you try to add the combine.children="append" attribute to the
transformers configuration?
You can also see the full shade plugin configuration here [1].
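
A sketch of what the relevant part of the maven-shade-plugin configuration could look like (it assumes the plugin is already declared under <build>/<plugins> in your pom; only the <transformers> block is shown):

```
<configuration>
  <!-- combine.children="append" merges this block with inherited transformers
       instead of replacing them -->
  <transformers combine.children="append">
    <!-- merges META-INF/services files such as
         org.apache.flink.table.factories.Factory from all shaded jars -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  </transformers>
</configuration>
```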

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/#transform-table-connectorformat-resources

On Thu, 28 Jan 2021 at 17:28, Sebastián Magrí  wrote:

> Hi Jark!
>
> Please find the full pom file attached.
>
> Best Regards,
>
> On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:
>
>> Hi Sebastián,
>>
>> I think Dawid is right.
>>
>> Could you share the pom file? I also tried to
>> package flink-connector-postgres-cdc with ServicesResourceTransformer, and
>> the Factory file contains
>>
>> com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
>>
>>
>> Best,
>> Jark
>>
>>
>> On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
>> wrote:
>>
>>> Thanks a lot for looking into it Dawid,
>>>
>>> In the
>>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
>>> file I only see
>>>
>>> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>>>
>>> Even after applying the ServicesResourceTransformer.
>>>
>>>
>>> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Unfortunately I am not familiar with the packaging of
>>>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>>>
>>>> However, I think the problem that you cannot find the connector is
>>>> caused because of lack of entry in the resulting Manifest file. If there
>>>> are overlapping classes maven does not exclude whole dependencies, but
>>>> rather picks the overlapping class from one of the two. Could you check if
>>>> you see entries for all tables in
>>>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>>>
>>>> If not, you could try applying the ServicesResourceTransformer[1]
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> [1]
>>>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>>>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>>>
>>>> Hi!
>>>>
>>>> I've reported an issue with the postgresql-cdc connector apparently
>>>> caused by the maven shade plugin excluding either the JDBC connector or the
>>>> cdc connector due to overlapping classes. The issue for reference is here:
>>>>
>>>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>>>
>>>> In the meantime, however, I've been trying to figure out if I can set
>>>> up an exclusion rule to fix this in my pom.xml file, without success.
>>>>
>>>> The `org.postgresql:postgresql` dependency is being added manually by
>>>> me to have a sink on a postgresql table and injected by the cdc connector
>>>> seemingly via its debezium connector dependency.
>>>>
>>>> Any guidance or hints I could follow would be really appreciated.
>>>>
>>>> --
>>>> Sebastián Ramírez Magrí
>>>>
>>>>
>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Jark Wu
Hi Rex,

Currently, it is not state compatible, because we will add a new node
called MiniBatchAssigner after the source, which changes the JobGraph and
thus the operator uids.

Best,
Jark

On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
wrote:

> I am pulling in Jark and Godfrey who are more familiar with the internals
> of the planner.
> On 21/01/2021 01:43, Rex Fenley wrote:
>
> Just tested this and I couldn't restore from a savepoint. If I do a new
> job from scratch, can I tune the minibatch parameters and restore from a
> savepoint without having to make yet another brand new job?
>
> Thanks
>
>
> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>> restoring from a checkpoint or a savepoint?
>>
>> Thanks
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-27 Thread Jark Wu
Hi Sebastián,

I think Dawid is right.

Could you share the pom file? I also tried to
package flink-connector-postgres-cdc with ServicesResourceTransformer, and
the Factory file contains

com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory


Best,
Jark


On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí  wrote:

> Thanks a lot for looking into it Dawid,
>
> In the
> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> file I only see
>
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>
> Even after applying the ServicesResourceTransformer.
>
>
> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> Unfortunately I am not familiar with the packaging of
>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>
>> However, I think the problem that you cannot find the connector is caused
>> because of lack of entry in the resulting Manifest file. If there are
>> overlapping classes maven does not exclude whole dependencies, but rather
>> picks the overlapping class from one of the two. Could you check if you see
>> entries for all tables in
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>
>> If not, you could try applying the ServicesResourceTransformer[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>
>> Hi!
>>
>> I've reported an issue with the postgresql-cdc connector apparently
>> caused by the maven shade plugin excluding either the JDBC connector or the
>> cdc connector due to overlapping classes. The issue for reference is here:
>>
>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>
>> In the meantime, however, I've been trying to figure out if I can set up
>> an exclusion rule to fix this in my pom.xml file, without success.
>>
>> The `org.postgresql:postgresql` dependency is being added manually by me
>> to have a sink on a postgresql table and injected by the cdc connector
>> seemingly via its debezium connector dependency.
>>
>> Any guidance or hints I could follow would be really appreciated.
>>
>> --
>> Sebastián Ramírez Magrí
>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Re: A few questions about minibatch

2021-01-27 Thread Jark Wu
Hi Rex,

Could you share your query here? It would be helpful to identify the root
cause if we have the query.

1) watermark
The framework automatically adds a node (the MiniBatchAssigner) that generates
watermark events which serve as mini-batch ids; they are broadcast to trigger
the mini-batches in the pipeline.

2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
It generates a new mini-batch id every 1000 ms of system time.
The mini-batch id is represented by the watermark event.

3) TWO_PHASE optimization
If users want to have TWO_PHASE optimization, it requires the aggregate
functions all support the merge() method and the mini-batch is enabled.
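
For reference, a minimal sketch (Table API, Java) of how these options can be set; the option keys are the standard table config keys, the values are just examples:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");      // turn mini-batch on
        conf.setString("table.exec.mini-batch.allow-latency", "1 s"); // buffer up to 1 s
        conf.setString("table.exec.mini-batch.size", "5000");         // or until 5000 rows
        // two-phase (local-global) aggregation; needs mini-batch enabled and, for a UDAF,
        // a merge(ACC, Iterable<ACC>) method on the aggregate function
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}
```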

Best,
Jark




On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
wrote:

> I am pulling Jark and Godfrey who are more familiar with the planner
> internals.
>
> Best,
>
> Dawid
> On 22/01/2021 20:11, Rex Fenley wrote:
>
> Hello,
>
> Does anyone have any more information here?
>
> Thanks!
>
> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> Our job was experiencing high write amplification on aggregates so we
>> decided to give mini-batch a go. There's a few things I've noticed that are
>> different from our previous job and I would like some clarification.
>>
>> 1) Our operators now say they have Watermarks. We never explicitly added
>> watermarks, and our state is essentially unbounded across all time since it
>> consumes from Debezium and reshapes our database data into another store.
>> Why does it say we have Watermarks then?
>>
>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>> mode=[ProcTime], what does that do?
>>
>> 3) I don't really see anything else different yet in the shape of our
>> plan even though we've turned on
>> configuration.setString(
>> "table.optimizer.agg-phase-strategy",
>> "TWO_PHASE"
>> )
>> is there a way to check that this optimization is on? We use user defined
>> aggregate functions, does it work for UDAF?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Jark Wu
Great examples to understand the problem and the proposed changes, @Kurt!

Thanks Leonard for investigating this problem.
The time-zone problems around time functions and windows have bothered a
lot of users. It's time to fix them!

The return value changes sound reasonable to me, and keeping the return
type unchanged will minimize the surprise to the users.
Besides that, I think it would be better to mention how this affects the
window behaviors, and the interoperability with DataStream.

I think this definitely deserves a FLIP.



Hi zhisheng,

Do you have examples to illustrate which case will get the wrong window
boundaries?
That will help to verify whether the proposed changes can solve your
problem.

Best,
Jark


On Thu, 21 Jan 2021 at 12:54, zhisheng <173855...@qq.com> wrote:

> Thanks to Leonard Xu for discussing this tricky topic. At present, there
> are many Flink jobs in our production environment that are used to count
> day-level reports (eg: count PV/UV ).
>
>
> If the default Flink SQL is used, the window time range of the
> statistics is incorrect, and then the statistical results will naturally be
> incorrect.
>
>
> The user needs to deal with the time zone manually in order to solve the
> problem.
>
>
> If Flink itself can solve these time zone issues, then I think it will be
> user-friendly.
>
>
> Thank you
>
>
> Best!
> zhisheng
>
>
> ------ Original message ------
> From:
>   "dev"
> <
> xbjt...@gmail.com;
> Date: Tuesday, Jan 19, 2021, 6:35 PM
> To: "dev"
> Subject: Re: [DISCUSS] Correct time-related function behavior in Flink SQL
>
>
>
> I found above example format may mess up in different mail client, I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png;
>
>
>  On Jan 19, 2021, at 16:22, Leonard Xu wrote:
>  Hi, all
> 
>  I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> 
>  Currently some temporal function behaviors are weird to users.
>  1. When users use a PROCTIME() in SQL, the value of PROCTIME()
> has a timezone offset with the wall-clock time in users' local time zone,
> users need to add their local time zone offset manually to get expected
> local timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> 
>  2. Users can not use
> CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get wall-clock
> timestamp in the local time zone, and thus they need to write a UDF in their SQL
> just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> 
>  3. Another common case is the time window with day
> interval based on PROCTIME(), user plan to put all data from one day into
> the same window, but the window is assigned using timestamp in UTC+0
> timezone rather than the session timezone which leads to the window starts
> with an offset(e.g: Users in China need to add -8h in their business sql
> start and then +8h when output the result, the conversion like a magic for
> users).
> 
>  These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> 
>  This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also know the
> three timestamp types' behavior in the Hadoop ecosystem from the reference link
> in the doc.
> 
> 
>  I investigated the current behavior of all Flink time-related functions and
> compared them with other DB vendors like Pg, Presto, Hive, Spark,
> Snowflake, I made an excel [2] to organize them well, we can use it
> for the next discussion. Please let me know if I missed something.
>  From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value, or change both the return type and the return value. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP function,
> 
>  FLINK  current behaviorexisted problem other vendors'
> behavior proposed change
>  CURRENT_TIMESTAMP  CURRENT_TIMESTAMP
>  TIMESTAMP(0) NOT NULL
> 
>  #session timezone: UTC
>  2020-12-28T23:52:52
> 
>  #session timezone: UTC+8
>  2020-12-28T23:52:52
> 
>  wall clock:
>  UTC+8: 


Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-13 Thread Jark Wu
Hi Dan,

Sorry for the late reply.

I guess you applied a "deduplication with keeping last row" before the
interval join?
That will produce an updating stream and interval join only supports
append-only input.
You can try to apply "deduplication with keeping *first* row" before the
interval join.
That should produce an append-only stream and interval join can consume
from it.
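
A sketch of the "keep first row" pattern (from the deduplication recipe in the docs); it assumes the source table declares a processing-time attribute, e.g. `proctime AS PROCTIME()`, and the column names follow the plan in your error message but are otherwise illustrative:

```
SELECT platform_id, user_id, log_user_id, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY proctime ASC) AS row_num
  FROM events
)
WHERE row_num = 1
```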

Best,
Jark



On Tue, 5 Jan 2021 at 20:07, Arvid Heise  wrote:

> Hi Dan,
>
> Which Flink version are you using? I know that there has been quite a bit
> of optimization of deduplication in 1.12, which would reduce the required
> state tremendously.
> I'm pulling in Jark who knows more.
>
> On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:
>
>> Hi!
>>
>> I'm using Flink SQL to do an interval join.  Rows in one of the tables
>> are not unique.  I'm fine using either the first or last row.  When I try
>> to deduplicate
>> 
>>  and
>> then interval join, I get the following error.
>>
>> IntervalJoin doesn't support consuming update and delete changes which is
>> produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[log_user_id], orderBy=[ts
>> ASC], select=[platform_id, user_id, log_user_id, client_log_ts,
>> event_api_ts, ts])
>>
>> Is there a way to combine these in this order?  I could do the
>> deduplication afterwards but this will result in more state.
>>
>> - Dan
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Statement Sets

2021-01-13 Thread Jark Wu
No. The Kafka reader will be shared, which means the Kafka data is only read
once.
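
A minimal sketch (Table API, Java) of what such a statement set looks like; the table names are made up and their DDL is omitted:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // CREATE TABLE statements for kafka_events, sink_a and sink_b would go here.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT id, name FROM kafka_events");
        set.addInsertSql("INSERT INTO sink_b SELECT id, ts FROM kafka_events");
        set.execute(); // both INSERTs are planned into a single job with one shared Kafka scan
    }
}
```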

On Tue, 12 Jan 2021 at 03:04, Aeden Jameson  wrote:

> When using statement sets, if two select queries use the same table
> (e.g. Kafka Topic), does each query get its own copy of data?
>
> Thank you,
> Aeden
>


Re: Can Flink SQL clear all the state in memory at a specified time?

2021-01-13 Thread Jark Wu
Why not use a day-level tumble window? It automatically clears the state for you.
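
A sketch of what such a day-level tumbling window could look like, using the columns from the quoted SQL below. It assumes an append-only source (e.g. the topic read with the plain kafka connector) with DeliveryTime declared as a rowtime attribute with a watermark; the window state is dropped automatically once the daily window fires:

```
SELECT
  TUMBLE_START(DeliveryTime, INTERVAL '1' DAY) AS days,
  UserId,
  COUNT(Code)          AS SendTime,
  SUM(PayAmount / 100) AS SendCashcharge
FROM dwd_order_info
GROUP BY TUMBLE(DeliveryTime, INTERVAL '1' DAY), UserId
```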

On Wed, 6 Jan 2021 at 13:53, 徐州州 <25977...@qq.com> wrote:

> I'm working on a Flink real-time data warehouse project with a daily-report scenario, running flink-on-yarn. If the job never stops, the second day's results accumulate on top of the first day's. My read rule (comparing the data TimeStamp against current_date) treats matching data as today's, but if the cluster is not restarted, the second day's results still accumulate on the first day's. The code only sets env.setStateBackend(new
> MemoryStateBackend()); at the moment I have to restart the job every day to release the state in memory and avoid accumulating on top of yesterday's results. My source is the upsert-kafka connector and the SQL is written on top of the dwd layer. Below is the exact SQL I run; all the tables it uses come from the dwd-layer upsert-kafka source.
> |  select
> |   TO_DATE(cast(doi.DeliveryTime as
> String),'-MM-dd') as  days,
> |   doi.UserId,
> |   count(doi.Code) as   SendTime,
> |   sum(doi.PayAmount / 100) as SendCashcharge,
> |   sum(doi.PayAmount / 100 - ChargeAmount / 100 +
> UseBalance / 100) as  SendCashuse,
> |   sum(doi.CashMoney / 100)as  SendCash
> |from dwd_order_info doi
> |where doi.DeliveryTime cast(current_date AS
> TIMESTAMP) and doi.OrderType = 29 and doi.Status = 50 and doi.Status
> < 60
> |group by TO_DATE(cast(doi.DeliveryTime as
> String),'-MM-dd'), doi.UserId


Re: Don't Flink operators have something like Spark's cache operation?

2021-01-13 Thread Jark Wu
The community is already working on this; you can follow this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

On Fri, 8 Jan 2021 at 15:42, 张锴  wrote:

> You can keep intermediate results in state.
>
> 李继 wrote on Thu, Jan 7, 2021 at 5:42 PM:
>
> > Hi, when an operator is used multiple times, how can I cache it, similar to Spark's cache operation?
> >
> > val env = getBatchEnv
> > val ds = env.fromElements("a","b","c")
> >
> > val ds2 = ds.map(x=>{
> >   println("map op")
> >   x.charAt(0).toInt+1
> > })
> >
> > // this prints "map op" three times
> > ds2.print()
> >
> > // this prints "map op" three more times
> > ds2.filter(_>100).print()
> >
>


Re: Custom flink sqlsubmit program throws an error

2021-01-13 Thread Jark Wu
From the error message it looks like a timeout; check whether the network between the client and the JM is working properly.

On Sun, 10 Jan 2021 at 16:23, Fei Han 
wrote:

> Hi all!
>
> Following the sqlsubmit example written by 云邪 for submitting SQL files, I modified it and submitted my own; the SQL file is recognized and the tables can be created. But when the job submits the insert, it fails in local mode.
> The Flink version is 1.12.0. My submit command is: $FLINK_HOME/bin/flink run -mip:8081 -d  -p 3 -c
> sql.submit.SqlSubmit $SQL_JAR  -f $sql_file
> The error in local mode is:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute sql
>  at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
>  at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>  at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
>  at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
>  at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:696)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
>  at sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:169)
>  at sql.submit.SqlSubmit.callCommand(SqlSubmit.java:89)
>  at sql.submit.SqlSubmit.run(SqlSubmit.java:64)
>  at sql.submit.SqlSubmit.main(SqlSubmit.java:35)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
>  ... 11 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job '
> behavior'.
>  at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1951)
>  at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>  at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:680)
>  ... 22 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
>  at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
>  at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:361)
>  at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
>  at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> 

Re: Row function cannot have column reference through table alias

2021-01-13 Thread Jark Wu
This is a known issue and will be fixed in a later version. As a temporary workaround, you can construct the row directly as (b.app_id, b.message), without the ROW keyword.
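
A sketch of the workaround applied to the query from the quoted message (the quoted reply below also mentions wrapping the join in a subquery as an alternative):

```
INSERT INTO flink_log_sink
SELECT
  b.id,
  (b.app_id, b.message)
FROM flink_log_source a
JOIN flink_log_side b
  ON a.id = b.id;
```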

On Mon, 11 Jan 2021 at 11:17, 刘海  wrote:

> Using table_alias.field_name inside ROW causes a parse error. I had the same problem with HBase before; I usually wrap the query in a subquery.
>
>
> 刘海
> liuha...@163.com
> On 1/11/2021 11:04,马阳阳 wrote:
> We have a SQL statement that composes a row from a table's columns. The simplified
> sql is like:
> INSERT INTO flink_log_sink
> SELECT
> b.id,
> Row(b.app_id, b.message)
> FROM flink_log_source a
> join flink_log_side b
> on a.id = b.id;
>
>
> When we submit the sql to Flink, the sql cannot be parsed, with the
> following error message:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
> at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
> at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
> at
> cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
> at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 15 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
> at 

Re: Question about reading Kafka metadata with Flink SQL

2021-01-13 Thread Jark Wu
For reading Kafka key fields, see:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields
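
A sketch of a DDL that reads both the record key and the record headers (field and topic names are made up; the options follow the 1.12 Kafka SQL connector):

```
CREATE TABLE kafka_source (
  `key_id`  STRING,
  `payload` STRING,
  `headers` MAP<STRING, BYTES> METADATA   -- record headers exposed as connector metadata
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'key.format' = 'raw',
  'key.fields' = 'key_id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
);
```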

On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote:

> hi
>
> Did you set the headers when writing the data? If not, of course they are empty.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Question about setting parallelism in the SQL Client

2020-12-31 Thread Jark Wu
Currently the SQL CLI does not support configuring parameters from flink-conf.yaml; it only supports the Table-specific options.
You can use SET table.exec.resource.default-parallelism = xx to configure the job parallelism.

On Thu, 31 Dec 2020 at 17:13, jiangjiguang719 
wrote:

> With table.exec.hive.infer-source-parallelism=false I can already control the parallelism, but I observe the following:
> 1. In both streaming and batch mode, the parallelism can only be controlled by parallelism.default in
> flink-conf.yaml, which is a global setting; it cannot be configured per job.
> 2. Setting parallelism in sql-client-defaults.yaml has no effect, and setting parallelism.default
> or parallelism in the SQL Client
> has no effect either.
>
> So how can I effectively control the parallelism of a single job?
>
> On 2020-12-31 15:21:36, "Jark Wu" wrote:
> >In batch mode:
> >
> >1. The Hive source infers its parallelism, which is determined by the number of files. You can also disable
> >this inference with table.exec.hive.infer-source-parallelism=false,
> > in which case the job parallelism is used, or set an upper bound for the inferred parallelism with
> >table.exec.hive.infer-source-parallelism.max. [1]
> >2. Same as above.
> >3. This should be unrelated to max-parallelism; it is rather because you have not configured a max slot count, so the source requests too many slots while YARN
> >cannot provide that many resources in time, hence the timeout.
> >   Setting slotmanager.number-of-slots.max prevents a batch job from requesting resources without limit.
> >
> >Best,
> >Jark
> >
> >[1]:
> >https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#source-parallelism-inference
> >[2]:
> >https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#slotmanager-number-of-slots-max
> >
> >On Thu, 31 Dec 2020 at 14:56, jiangjiguang719 
> >wrote:
> >
> >> Flink 1.12, submitting a job with the SQL Client that reads a Hive table; I have some questions about parallelism. Observations:
> >> In flink-conf.yaml:
> >>   taskmanager.numberOfTaskSlots: 1   takes effect
> >>   parallelism.default: 1   has no effect; the actual job parallelism = the number of files of the Hive table, and <= 160
> >> In sql-client-defaults.yaml:
> >>   execution:
> >> parallelism: 10   has no effect
> >> max-parallelism: 16   when the number of files of the Hive table exceeds this, it fails with insufficient resources:  Deployment took more
> >> than 60 seconds. Please check if the requested resources are available in
> >> the YARN cluster
> >> Questions:
> >> 1. How do I set the parallelism when submitting a job with the SQL Client?
> >> 2. Why does the parallelism parameter have no effect?
> >> 3. Why does deployment fail when the number of Hive files is greater than max-parallelism?
>
>
>
>
>


Re: Question about setting parallelism in the SQL Client

2020-12-30 Thread Jark Wu
In batch mode:

1. The Hive source infers its parallelism, which is determined by the number of files. You can also disable
this inference with table.exec.hive.infer-source-parallelism=false,
 in which case the job parallelism is used, or set an upper bound for the inferred parallelism with
table.exec.hive.infer-source-parallelism.max. [1]
2. Same as above.
3. This should be unrelated to max-parallelism; it is rather because you have not configured a max slot count, so the source requests too many slots while YARN
cannot provide that many resources in time, hence the timeout.
   Setting slotmanager.number-of-slots.max prevents a batch job from requesting resources without limit.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#source-parallelism-inference
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#slotmanager-number-of-slots-max

On Thu, 31 Dec 2020 at 14:56, jiangjiguang719 
wrote:

> Flink 1.12, submitting a job with the SQL Client that reads a Hive table; I have some questions about parallelism. Observations:
> In flink-conf.yaml:
>   taskmanager.numberOfTaskSlots: 1   takes effect
>   parallelism.default: 1   has no effect; the actual job parallelism = the number of files of the Hive table, and <= 160
> In sql-client-defaults.yaml:
>   execution:
> parallelism: 10   has no effect
> max-parallelism: 16   when the number of files of the Hive table exceeds this, it fails with insufficient resources:  Deployment took more
> than 60 seconds. Please check if the requested resources are available in
> the YARN cluster
> Questions:
> 1. How do I set the parallelism when submitting a job with the SQL Client?
> 2. Why does the parallelism parameter have no effect?
> 3. Why does deployment fail when the number of Hive files is greater than max-parallelism?


Re: Does Flink SQL 1.11 support writing data into Hive?

2020-12-11 Thread Jark Wu
Documentation for 1.11:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html

Documentation for 1.12:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
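
A sketch of what a write looks like once a HiveCatalog is registered (for example in sql-client-defaults.yaml); the catalog, database and table names here are made up:

```
USE CATALOG myhive;

INSERT INTO hive_db.hive_table
SELECT id, name
FROM default_catalog.default_database.kafka_source;
```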

On Fri, 11 Dec 2020 at 15:45, yinghua...@163.com  wrote:

> According to the official website it is supported:
>
> But when I look for the corresponding connector there is no Hive one; *is it JDBC?*
>
>


Re: Error when doing a window aggregation after converting a computed column to timestamp

2020-12-11 Thread Jark Wu
Please share the complete code; the current information is not enough to analyze the problem.

On Fri, 11 Dec 2020 at 11:53, jun su  wrote:

> hi Danny,
>  I tried that and got the same error. Debugging shows that LogicalWindowAggregateRuleBase does not pass the Expr information down when building the window,
> only the alias, which makes a later optimization rule fail and exit.
>
> Danny Chan wrote on Fri, Dec 11, 2020 at 11:47 AM:
>
> > Have you tried adding the watermark syntax?
> >
> > jun su wrote on Fri, Dec 11, 2020 at 10:47 AM:
> >
> > > hi all,
> > >
> > > Flink 1.11.0: after using a computed column to convert a BIGINT field to timestamp and doing a time window aggregation, I get the following error:
> > >
> > > ddl:
> > >
> > > CREATE TABLE source(
> > > occur_time BIGINT,
> > > rowtime AS longToTimestamp(occur_time)
> > > ) WITH ('connector' = 'filesystem','format' = 'orc','path' =
> > > '/path/to/data')
> > >
> > > Error message:
> > >
> > > Caused by: java.lang.IllegalArgumentException: field [$f0] not found;
> > input
> > > fields are: [occur_time]
> > > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
> > > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
> > > at
> > >
> > >
> >
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
> > > ... 27 more
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: Re: Re: Problem using a UDAF on a retract stream

2020-12-09 Thread Jark Wu
You can think of upsert-kafka as a real-time materialized view of a MySQL table:
in MySQL, code is the key and amount is the value. When you update amount from 0 to 100 and then 200,
the final sum(amount) result is naturally 200.

If you want 0 -> 100 -> 300, it means you do not want to treat the data as updates on a primary key but as independent records. In that case declare the table with the plain kafka
connector and do not define a primary key, i.e. treat it as an ordinary log.
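
A sketch of that plain-log declaration, with the connector options adapted from the upsert-kafka DDL quoted below (the record key is simply ignored here):

```
CREATE TABLE market_stock_log (
  Code   STRING,
  Amount BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'zzz',
  'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',
  'properties.group.id' = 'sqltest46',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SELECT Code, SUM(Amount) FROM market_stock_log GROUP BY Code
-- now accumulates 0 -> 100 -> 300 for the example above.
```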

As for your UDAF problem, it is most likely an implementation issue: in the retract method you set the values back to the previous values again.

Best,
Jark



On Thu, 10 Dec 2020 at 15:04, bulterman <15618338...@163.com> wrote:

> Suppose Code X: the first record has X.Amount=0, the second X.Amount=100, the third X.Amount=200.
> 1. Since Code is the primary key, the table only keeps the latest record for X, so the output of select sum(X.Amount) from table is
> : 0, 100, 200.
> 2. In my UDAF, for the same Code X, the accumulate method executes acc.lastAmount =
> Amount every time to update the accumulator state, but judging from the results, acc.lastAmount is 0 every time the method is entered for the same Code X?
> Is that also because the table only keeps one record for Code X?
>
>
> Then, in an upsert-kafka table (only the latest record of Code X is kept), if I want to accumulate Code
> X's Amount and the expected output is 0, 100, 300, ..., how should I implement that?
> Hoping someone can clear this up ><
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-12-10 13:47:57, "Jark Wu" wrote:
> >Because the pk of your upsert-kafka table is code, the data is already unique within each code group (one code
> >has only one row; the last row is taken as the latest data). Presumably the amount value for the same code stays the same, so sum(amount) naturally does not change.
> >
> >Best,
> >Jark
> >
> >On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
> >
> >> // kafka table
> >> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
> >>
> >> "Code STRING,\n" +
> >>
> >> "Amount BIGINT,\n" +
> >>
> >> ..
> >>
> >> "PRIMARY KEY (Code) NOT ENFORCED\n" +
> >>
> >> ") WITH (\n" +
> >>
> >> "'connector' = 'upsert-kafka',\n" +
> >>
> >> "'topic' = 'zzz',\n" +
> >>
> >> "'properties.bootstrap.servers' = '10.0.3.20:9092,
> >> 10.0.3.24:9092,10.0.3.26:9092',\n" +
> >>
> >> "'properties.group.id' = 'sqltest46',\n" +
> >>
> >> "'key.format' = 'raw',\n" +
> >>
> >> "'value.format' = 'json'\n" +
> >>
> >> ")");
> >> // compute with the UDAF
> >> Table table = bsTableEnv.sqlQuery("select
> >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> >> GROUP BY Code");
> >> env.toRetractStream(table,Row.class).print();
> >>
> >>
> >> // the UDAF is defined as follows
> >> public class MainFundFlowFunc extends AggregateFunction AmountAccum> {
> >> @Override
> >> public Row getValue(AmountAccum acc) {
> >> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> >> double mfr = acc.lastAmount > 0 ?
> >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> >> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb
> ,acc.mb,
> >> acc.sb,mf,mfr);
> >> }
> >> @Override
> >> public AmountAccum createAccumulator() {
> >> return new AmountAccum();
> >> }
> >>
> >> public void accumulate(AmountAccum acc, Long amount, Double
> askPrice1,
> >> Double bidPrice1, Double last) {
> >> //..
> >>acc.lastAmount = amount;
> >> acc.lastAskPrice1 = askPrice1;
> >> acc.lastBidPrice1 = bidPrice1;
> >> }
> >> public void retract(AmountAccum acc, Long amount, Double askPrice1,
> >> Double bidPrice1, Double last) {
> >>     acc.lastAmount = amount;
> >> acc.lastAskPrice1 = askPrice1;
> >> acc.lastBidPrice1 = bidPrice1;
> >> }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> // acc
> >> public class AmountAccum {
> >> public Double lastAskPrice1;
> >> public Double lastBidPrice1;
> >>
> >> public Long lastAmount = 0L;
> >>
> >> public Long ebs = 0L;
> >>
> >> public Long bs = 0L;
> >>
> >> public Long ms = 0L;
> >>
> >> public Long ss = 0L;
> >>
> >> public Long ebb = 0L;
> >>
> >> public Long bb = 0L;
> >>
> >> public Long mb = 0L;
> >>
> >> public Long sb = 0L;
> >> }
> >>
> >>
> >> Debugging shows that acc.lastAmount is always 0.
> >>
> >>
> >> I just tried the sum() function, running select Code,sum(Amount) from market_stock GROUP BY
> >> Code, and found that the Amount values are not accumulated; every output is just the latest Amount value.
> >> Am I using it the wrong way? = =
> >>
> >> On 2020-12-10 11:30:31, "Jark Wu" wrote:
> >> >Could you share the code?
> >> >
> >> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> >> >
> >> >> The upstream is a table created with the upsert-kafka connector.
> >> >> When using the UDAF, the variables in the accumulator are never updated? With the kafka connector they are updated normally.
> >> >> (For easier testing, the table only contains data with the same PK.)
> >>
>


Re: Re: Problem using a UDAF on a retract stream

2020-12-09 Thread Jark Wu
Because the pk of your upsert-kafka table is code, the data is already unique within each code group (one code
has only one row; the last row is taken as the latest data). Presumably the amount value for the same code stays the same, so sum(amount) naturally does not change.

Best,
Jark

On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:

> // kafka table
> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
>
> "Code STRING,\n" +
>
> "Amount BIGINT,\n" +
>
> ..
>
> "PRIMARY KEY (Code) NOT ENFORCED\n" +
>
> ") WITH (\n" +
>
> "'connector' = 'upsert-kafka',\n" +
>
> "'topic' = 'zzz',\n" +
>
> "'properties.bootstrap.servers' = '10.0.3.20:9092,
> 10.0.3.24:9092,10.0.3.26:9092',\n" +
>
> "'properties.group.id' = 'sqltest46',\n" +
>
> "'key.format' = 'raw',\n" +
>
> "'value.format' = 'json'\n" +
>
> ")");
> // 使用UDAF计算
> Table table = bsTableEnv.sqlQuery("select
> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> GROUP BY Code");
> env.toRetractStream(table,Row.class).print();
>
>
> // the UDAF is defined as follows
> public class MainFundFlowFunc extends AggregateFunction {
> @Override
> public Row getValue(AmountAccum acc) {
> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> double mfr = acc.lastAmount > 0 ?
> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
> acc.sb,mf,mfr);
> }
> @Override
> public AmountAccum createAccumulator() {
> return new AmountAccum();
> }
>
> public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
> //..
>acc.lastAmount = amount;
> acc.lastAskPrice1 = askPrice1;
> acc.lastBidPrice1 = bidPrice1;
> }
> public void retract(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
> acc.lastAmount = amount;
> acc.lastAskPrice1 = askPrice1;
> acc.lastBidPrice1 = bidPrice1;
> }
>
> }
>
>
>
>
> // acc
> public class AmountAccum {
> public Double lastAskPrice1;
> public Double lastBidPrice1;
>
> public Long lastAmount = 0L;
>
> public Long ebs = 0L;
>
> public Long bs = 0L;
>
>     public Long ms = 0L;
>
> public Long ss = 0L;
>
> public Long ebb = 0L;
>
> public Long bb = 0L;
>
> public Long mb = 0L;
>
> public Long sb = 0L;
> }
>
>
> Debugging shows that acc.lastAmount is always 0.
>
>
> I just tried the sum() function, running select Code,sum(Amount) from market_stock GROUP BY
> Code, and found that the Amount values are not accumulated; every output is just the latest Amount value.
> Am I using it the wrong way? = =
>
> On 2020-12-10 11:30:31, "Jark Wu" wrote:
> >Could you share the code?
> >
> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> >
> >> The upstream is a table created with the upsert-kafka connector.
> >> When using the UDAF, the variables in the accumulator are never updated? With the kafka connector they are updated normally.
> >> (For easier testing, the table only contains data with the same PK.)
>


Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

2020-12-09 Thread Jark Wu
Could you use 4 scalar functions instead of the UDTF and the map function? For
example:

select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits),
hasWatermelon(fruits)
from T;

I think this can preserve the primary key.

Best,
Jark

On Thu, 3 Dec 2020 at 15:28, Rex Fenley  wrote:

> It appears that even when I pass id through the map function and join back
> with the original table, it does not seem to think that the id passed
> through map is a unique key. Is there any way to solve this while still
> preserving the primary key?
>
> On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley  wrote:
>
>> Even odder, if I pull the constructor of the function into its own
>> variable it "works" (though it appears that map only passes through the
>> fields mapped over which means I'll need an additional join, though now I
>> think I'm on the right path).
>>
>> I.e.
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>   val func = new SplatFruitsFunc()
>>   return table
>> .map(func($"fruits"))
>> .as(
>>   s"${columnPrefix}_has_orange",
>>   s"${columnPrefix}_has_banana",
>>   s"${columnPrefix}_has_apple",
>>   s"${columnPrefix}_has_watermelon"
>>)
>>.renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> ends up giving me the following error instead
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Cannot resolve field [fruits], input field
>> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
>> prefix_has_watermelon].
>>
>> which implies I'll need to join back to the original table like I was
>> doing with the leftOuterJoinLateral originally I suppose.
>>
>>
>> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley  wrote:
>>
>>> Looks like `as` needed to move outside of where it was before to fix
>>> that error. Though now I'm receiving
>>> >org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Aliasing more fields than we actually have.
>>>
>>> Example code now:
>>>
>>> // table will always have pk id
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>> return table
>>>  .map(
>>>new SplatFruitsFunc()(
>>>  $"fruits"
>>>)
>>>  )
>>>  .as(
>>>   s"${columnPrefix}_has_orange",
>>>   s"${columnPrefix}_has_banana",
>>>   s"${columnPrefix}_has_apple",
>>>   s"${columnPrefix}_has_watermelon"
>>>  )
>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> class SplatFruitsFunc extends ScalarFunction {
>>>   def eval(fruits: Array[String]): Row = {
>>> val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>> val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>> val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>> val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>> Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>   }
>>>
>>>   override def getResultType(signature: Array[Class[_]]):
>>> TypeInformation[_] =
>>> Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>>> }
>>>
>>> which afaict correctly follows the documentation.
>>>
>>> Anything here stand out?
>>>
>>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley  wrote:
>>>
 So I just instead tried changing SplatFruitsFunc to a ScalarFunction and
 leftOuterJoinLateral to a map and I'm receiving:
 > org.apache.flink.client.program.ProgramInvocationException: The main
 method caused an error: Only a scalar function can be used in the map
 operator.
 which seems odd because documentation says

 > Performs a map operation with a user-defined scalar function or
 built-in scalar function. The output will be flattened if the output type
 is a composite type.


 https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

 Shouldn't this work as an alternative?

 On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley  wrote:

> Hello,
>
> I have a TableFunction and wherever it is applied with a
> leftOuterJoinLateral, my table loses any inference of there being a 
> primary
> key. I see this because all subsequent joins end up with "NoUniqueKey" 
> when
> I know a primary key of id should exist.
>
> I'm wondering if this is expected behavior and if it's possible to
> tell a table directly what the primary key should be?
>
>
> To demonstrate my example:
> My table function checks if an element of a certain type is in a
> string array, and depending on whether or not it is there, it appends a
> column with value true or false. For example, if array "fruits" which 
> could
> possibly contain orange, banana, apple, and watermelon on a row contains
> only `["orange", "apple"]` then it will append `has_orange: true,
> has_banana: false, has_apple: true, has_watermelon: false` as columns to
> the row. This example is 

Re: retract stream UDAF usage issue

2020-12-09 Thread Jark Wu
Could you share your code?

On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:

> The upstream is a table created with the upsert-kafka connector.
> When using a UDAF I found the variables in the accumulator are never updated. With the kafka connector they update normally.
> (For easier testing, the table only contains data for a single PK.)


Re: flink jdbc connector: how to make sure all buffered data is flushed to the database on checkpoint

2020-12-09 Thread Jark Wu
Hi Jie,

This does look like a problem.
The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
Could you help create an issue for it?

Best,
Jark
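For context, a rough Java sketch of the kind of wrapper being discussed: a SinkFunction that owns the OutputFormat and flushes it from snapshotState, which is essentially the role GenericJdbcSinkFunction plays for JdbcBatchingOutputFormat. The Flushable check below is an assumption of this sketch, not the actual connector code.

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.Flushable;
import java.io.IOException;

// Sketch: wrap a buffering OutputFormat in a sink that also implements
// CheckpointedFunction so the buffer is forced out on every checkpoint.
public class FlushingOutputFormatSink<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {

    private final OutputFormat<T> format;

    public FlushingOutputFormatSink(OutputFormat<T> format) {
        this.format = format;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        format.configure(parameters);
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
        format.open(subtask, numTasks);
    }

    @Override
    public void invoke(T value, Context context) throws IOException {
        format.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Flush the buffered records before the checkpoint completes
        // (assumes the format exposes its flush via Flushable).
        if (format instanceof Flushable) {
            ((Flushable) format).flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // No state to restore; flush-on-checkpoint is all this sketch needs.
    }

    @Override
    public void close() throws IOException {
        format.close();
    }
}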

On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>Yes, I think you are right.
>   `JdbcOutputFormat` gets wrapped in `OutputFormatSinkFunction`, and
> `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so there is
> no way to call format.flush in snapshotState.
>WDYT @Jark @ Leonard
>
> Best,
> Hailong
>
>
> At 2020-12-09 17:13:14, "jie mei"  wrote:
> >Hi, Community
> >
> >The JDBC connector does not seem to guarantee that the buffered data is all
> >written to the database at checkpoint time, because OutputFormat has no hook
> >that is invoked on checkpoints. From the JDBC connector code, one can only set
> >a flush timeout, so there can still be cases where data is lost.
> >
> >My question is: is there a way to force-flush the buffered data into the database?
> >
> >
> >@Public
> >public interface OutputFormat<IT> extends Serializable {
> >
> >   /**
> >* Configures this output format. Since output formats are
> >instantiated generically and hence parameterless,
> >* this method is the place where the output formats set their
> >basic fields based on configuration values.
> >* 
> >* This method is always called first on a newly instantiated output 
> > format.
> >*
> >* @param parameters The configuration with all parameters.
> >*/
> >   void configure(Configuration parameters);
> >
> >   /**
> >* Opens a parallel instance of the output format to store the
> >result of its parallel instance.
> >* 
> >* When this method is called, the output format it guaranteed to
> >be configured.
> >*
> >* @param taskNumber The number of the parallel instance.
> >* @param numTasks The number of parallel tasks.
> >* @throws IOException Thrown, if the output could not be opened
> >due to an I/O problem.
> >*/
> >   void open(int taskNumber, int numTasks) throws IOException;
> >
> >
> >   /**
> >* Adds a record to the output.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @param record The records to add to the output.
> >* @throws IOException Thrown, if the records could not be added to
> >to an I/O problem.
> >*/
> >   void writeRecord(IT record) throws IOException;
> >
> >   /**
> >* Method that marks the end of the life-cycle of parallel output
> >instance. Should be used to close
> >* channels and streams and release resources.
> >* After this method returns without an error, the output is
> >assumed to be correct.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @throws IOException Thrown, if the input could not be closed properly.
> >*/
> >   void close() throws IOException;
> >}
> >
> >
> >--
> >
> >*Best Regards*
> >*Jeremy Mei*
>
>
>
>
>
>


Re: When reading postgres-cdc with the flink sql cli: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 Thread Jark Wu
Tables defined with postgres-cdc only support reading; they cannot be written to.
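As a hedged illustration (all connection values below are placeholders), the cdc table can only appear on the read side; the write side has to be a separate table on a writable connector such as 'jdbc':

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcReadJdbcWriteSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source side: postgres-cdc (read-only). Values are placeholders.
        tEnv.executeSql(
                "CREATE TABLE pg_orders (" +
                "  id INT, amount DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'postgres-cdc'," +
                "  'hostname' = 'localhost', 'port' = '5432'," +
                "  'username' = 'postgres', 'password' = 'postgres'," +
                "  'database-name' = 'mydb', 'schema-name' = 'public'," +
                "  'table-name' = 'orders')");

        // Sink side: a writable connector; the cdc table itself cannot be a sink.
        tEnv.executeSql(
                "CREATE TABLE orders_copy (" +
                "  id INT, amount DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                "  'table-name' = 'orders_copy'," +
                "  'username' = 'root', 'password' = 'root')");

        tEnv.executeSql("INSERT INTO orders_copy SELECT id, amount FROM pg_orders");
    }
}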

On Wed, 9 Dec 2020 at 22:49, zhisheng  wrote:

> The sql client also has to be restarted
>
> 王敏超 wrote on Wed, Dec 9, 2020 at 4:49 PM:
>
> > After starting the sql cli in standalone mode, the error below is reported. But my
> > lib directory does contain flink-sql-connector-postgres-cdc-1.1.0.jar,
> > and I have restarted the cluster. The same approach works fine with mysql cdc.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
> > any factory for identifier 'postgres-cdc' that implements
> > 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> > classpath.
> >
> > Available factory identifiers are:
> >
> > blackhole
> > jdbc
> > kafka
> > print
> > ---
> >
> > So where is my configuration wrong?
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: A group window expects a time attribute for grouping in a stream environment, thanks

2020-12-09 Thread Jark Wu
The link was wrong. Resending.

1) A time attribute does not simply mean a field of timestamp type; it is a special concept. See the docs on how to declare a time attribute:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
2) Your code also looks wrong. L45: Table orders = tEnv.from("Orders"); I don't see an "Orders" table being registered anywhere, so this line probably cannot execute successfully.
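For point 1), a minimal Java sketch with made-up table and column names (the connector config is only a placeholder): the WATERMARK clause is what turns order_time into an event-time attribute usable by group windows, and registering "Orders" first is what makes tEnv.from("Orders") resolve.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TimeAttributeSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A plain TIMESTAMP column is not enough; the WATERMARK clause declares
        // the event-time attribute.
        tEnv.executeSql(
                "CREATE TABLE Orders (" +
                "  user_id BIGINT," +
                "  amount DOUBLE," +
                "  order_time TIMESTAMP(3)," +
                "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'format' = 'json')");

        // Only now does this lookup succeed, and TUMBLE/HOP windows can group on order_time.
        Table orders = tEnv.from("Orders");
        orders.printSchema();
    }
}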

Best,
Jark

On Thu, 10 Dec 2020 at 11:09, Jark Wu  wrote:

> 1). A time attribute does not simply mean a field of timestamp type; it is a special concept. See the docs on how to declare a time attribute:
> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
> 2. Your code also looks wrong. L45: Table orders = tEnv.from("Orders"); I don't see an "Orders"
> table being registered anywhere, so this line probably cannot execute successfully.
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 15:44, Appleyuchi  wrote:
>
>> The code is:
>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>> The error:
>> A group window expects a time attribute for grouping in a stream
>> environment.
>> But the data source in the code already has a time attribute.
>> How should I modify the code?
>> Thanks
>>
>>
>>
>>
>>
>>
>>
>
>


Re: A group window expects a time attribute for grouping in a stream environment, thanks

2020-12-09 Thread Jark Wu
1). A time attribute does not simply mean a field of timestamp type; it is a special concept. See the docs on how to declare a time attribute:
https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
2. Your code also looks wrong. L45: Table orders = tEnv.from("Orders"); I don't see an "Orders"
table being registered anywhere, so this line probably cannot execute successfully.

Best,
Jark

On Wed, 9 Dec 2020 at 15:44, Appleyuchi  wrote:

> The code is:
> https://paste.ubuntu.com/p/gVGrj2V7ZF/
> The error:
> A group window expects a time attribute for grouping in a stream
> environment.
> But the data source in the code already has a time attribute.
> How should I modify the code?
> Thanks
>
>
>
>
>
>
>


Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-09 Thread Jark Wu
Yep, 1.12.0 will be released in the next couple of days.

On Wed, 9 Dec 2020 at 14:45, xiao cai  wrote:

> Hi Jark
> sorry, I meant 1.12.0, it was a typo
>
>
>  Original Message
> Sender: Jark Wu
> Recipient: user-zh
> Date: Wednesday, Dec 9, 2020 14:40
> Subject: Re: How to define the field type for JsonObject data in FlinkSQL
>
>
> Hi 赵一旦, the jackson component already handles this part of the logic automatically.
> Hi xiaocai, which issue do you need 1.12.1 for? 1.12.0 is about to be released in the
> next couple of days. Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai <
> flin...@163.com> wrote: > OK, I plan to upgrade and test it next week. Also, when is
> 1.12.1 planned to be released? > > > Original Message > Sender: Jark Wu > Recipient:
> user-zh > Date: Tuesday, Dec 8, 2020 13:41 > Subject:
> Re: How to define the field type for JsonObject data in FlinkSQL > > > Defining the field
> as STRING, as hailong said, is supported in 1.12,
> > https://issues.apache.org/jira/browse/FLINK-18002 1.12 >
> will be released in the next couple of days; if you can upgrade, give it a try. Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc < >
> wxp4...@outlook.com> wrote: > You can use a string, or a custom >
> String-type format, and handle the inner structure later with a UDF > > > > -- > Sent from: >
> http://apache-flink.147419.n8.nabble.com/


Re: Choosing a state backend for N-way joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread Jark Wu
Regarding rocksdb performance tuning, @Yun Tang  knows this better.

On Thu, 10 Dec 2020 at 11:04, Jark Wu  wrote:

> For large state I would still recommend rocksdb; it is much more stable in production. The gap you describe sounds quite large and is probably because rocksdb has not been tuned yet.
>
> You can refer to these articles and try tuning rocksdb:
>
> https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
> https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
> https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
> https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
>
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:
>
>> The scenario:
>>
>>
>> We join MySQL tables with primary keys (hundreds of millions of rows) and then run rule-based filtering, counting, etc. on the resulting real-time wide table (view); the state of the join operators basically has no TTL configured.
>>
>>
>> The MySQL data all has primary keys and its volume will not change much; foreseeably it may only grow by about 2 million rows per year, but the rows are updated frequently, so processing needs high throughput and low latency.
>> I tested one version of flink
>>
>> sql: with Rocksdb as the operator state backend, throughput and latency are both poor (a single data change may take about 10 seconds to take effect), but with FsStateBackend it is very fast and performs well; the difference is more than 10x.
>>
>>  So I came up with the following ideas and wonder whether they are feasible:
>>
>>
>> 1. Use FsStateBackend: add machines and keep all state used by the job in memory with FsStateBackend (since the number of primary keys per table is bounded, the state should be bounded too).
>> 2. Keep rocksdb: make flink's managed memory (used by rocksdb) as large as possible so that rocksdb reads and writes happen in memory as much as possible.
>> I am not very familiar with the state of the operators generated by flink sql; are these two approaches feasible?
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: Choosing a state backend for N-way joins with flink cdc: FsStateBackend vs. RocksDBStateBackend

2020-12-09 Thread Jark Wu
For large state I would still recommend rocksdb; it is much more stable in production. The gap you describe sounds quite large and is probably because rocksdb has not been tuned yet.

You can refer to these articles and try tuning rocksdb:

https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg


Best,
Jark

On Wed, 9 Dec 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:

> The scenario:
>
>
> We join MySQL tables with primary keys (hundreds of millions of rows) and then run rule-based filtering, counting, etc. on the resulting real-time wide table (view); the state of the join operators basically has no TTL configured.
>
>
> The MySQL data all has primary keys and its volume will not change much; foreseeably it may only grow by about 2 million rows per year, but the rows are updated frequently, so processing needs high throughput and low latency.
> I tested one version of flink
>
> sql: with Rocksdb as the operator state backend, throughput and latency are both poor (a single data change may take about 10 seconds to take effect), but with FsStateBackend it is very fast and performs well; the difference is more than 10x.
>
>  So I came up with the following ideas and wonder whether they are feasible:
>
>
> 1. Use FsStateBackend: add machines and keep all state used by the job in memory with FsStateBackend (since the number of primary keys per table is bounded, the state should be bounded too).
> 2. Keep rocksdb: make flink's managed memory (used by rocksdb) as large as possible so that rocksdb reads and writes happen in memory as much as possible.
> I am not very familiar with the state of the operators generated by flink sql; are these two approaches feasible?
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql 1.11 kafka cdc and holo sink

2020-12-09 Thread Jark Wu
1. Not supported at the moment. There is already an issue tracking it: https://issues.apache.org/jira/browse/FLINK-20385
2. Configure canal-json.table.include = 't1' to filter the tables (see the sketch after this reply). Regex filtering is not supported yet.
3. Yes, it will.
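A hedged sketch of item 2 (topic, servers and columns are placeholders, and the schema must match the fields inside data[{}] of the canal message): 'canal-json.table.include' keeps only the binlog records of table 't1' when several tables share one topic.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CanalJsonFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Only changelog records of mysql table 't1' are deserialized from the topic.
        tEnv.executeSql(
                "CREATE TABLE t1_binlog (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'binlog-topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'canal-json'," +
                "  'canal-json.table.include' = 't1')");

        tEnv.from("t1_binlog").printSchema();
    }
}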

Best,
Jark

On Wed, 9 Dec 2020 at 11:33, 于洋 <1045860...@qq.com> wrote:

> With flink sql 1.11 I create a kafka source table whose data is mysql binlog collected by canal, with 'format' =
> 'canal-json'. My questions are:
> 1. Can the source table only contain the schema that corresponds to the mysql table (i.e. the fields inside data[{}]), or can I also get the values of fields like table and ts?
> 2. When one topic contains binlog of several mysql tables, how does the kafka source table tell them apart? Does it rely on the schemas being different?
> 3. When such a source table is used together with a holo sink table, will a delete record also delete that row in holo, given 'ignoreDelete' =
> 'false'?


Re: How to support double-quoted strings in flink 1.11 SQL

2020-12-09 Thread Jark Wu
It is not related to that issue.

This sounds more like a hive query compatibility requirement. You can follow FLIP-152: Hive Query Syntax Compatibility

https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility

Best,
Jark

On Wed, 9 Dec 2020 at 11:13, zhisheng  wrote:

> Is it related to this issue? https://issues.apache.org/jira/browse/FLINK-20537
>
> 赵一旦 wrote on Wed, Dec 9, 2020 at 10:17 AM:
>
> > MARK, following this to learn. Waiting for a reply.
> >
> > 莫失莫忘 wrote on Tue, Dec 8, 2020 at 6:49 PM:
> >
> > > I am migrating hive sql to the flink engine. In many of the original hive sql
> > > statements, strings are written with double quotes, e.g. select * from table1 where column1 =
> > > "word". How can I make flink SQL support double-quoted strings without modifying the SQL?
> > > ps: I see that strings in flink SQL must use single quotes, e.g. select * from table1 where column1
> =
> > > 'word'. How can strings be allowed to use either single or double quotes?
> >
>


Re: Timestamp problem when writing data to postgres with flink sql

2020-12-09 Thread Jark Wu
From the error message, you did not insert into postgres but only ran a select query; that displays the result in the sql client
UI instead of inserting it into postgres.

Your query contains timestamp with local time zone, and displaying query results of this type
is not supported by the sql client yet.

For a fix of this problem you can follow this issue: https://issues.apache.org/jira/browse/FLINK-17948

Best,
Jark

On Tue, 8 Dec 2020 at 19:32, 李轲  wrote:

> Error message:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.api.TableException: Unsupported
> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL'
> (conversion class: java.time.Instant) to type information. Only data types
> that originated from type information fully support a reverse conversion.
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
> at
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
> at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
> at
> org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
> at
> org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-12-08 19:24:43, "李轲" wrote:
> >The project needs to insert data into postgres. After using a catalog, the inserted data apparently has to match the database table definition exactly, and I could not find a way to insert only some of the columns.
> >Converting the time to TIMESTAMP(6) WITH LOCAL TIME ZONE reports an error; this is how the timestamp is defined in postgres:
> >select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
> >Is there any way to convert it, or a way to insert only part of the columns?
>


Re: Help: how to integrate CDH's hbase 2.0 with flink 1.11.2 on yarn

2020-12-09 Thread Jark Wu
1. What is the exact exception stack behind the "hbase package not found" message?
2. Your steps also do not add the flink hbase connector jar to lib, which will cause the hbase table factory to not be found.
3. The flink 1.11 release does not ship an hbase 2.x connector jar yet.
4. flink 1.12 supports hbase 2.x, and in theory it is also compatible with a flink 1.11 cluster.


So you can try downloading
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2_2.11/1.12.0/flink-sql-connector-hbase-2.2_2.11-1.12.0.jar
into flink/lib (this jar already shades the hbase jars), and then integrate hadoop with
HADOOP_CLASSPATH=`hadoop classpath`; that should work. See the 1.12 documentation for details [1].
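For illustration, once that jar is in flink/lib an HBase mapping table can be declared with the 1.12 factory identifier 'hbase-2.2'; the table name, column family and quorum below are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // rowkey plus one column family mapped as a ROW type.
        tEnv.executeSql(
                "CREATE TABLE hbase_table (" +
                "  rowkey STRING," +
                "  cf ROW<col1 STRING, col2 BIGINT>," +
                "  PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'hbase-2.2'," +
                "  'table-name' = 'my_table'," +
                "  'zookeeper.quorum' = 'hadoop85:2181,hadoop86:2181,hadoop87:2181')");

        tEnv.from("hbase_table").printSchema();
    }
}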

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html





On Tue, 8 Dec 2020 at 17:40, site  wrote:

> Following the official approach, integrating hadoop with HADOOP_CLASSPATH=`hadoop classpath` succeeded.
> Since flink on yarn runs on a cdh6 cluster, I wanted to reuse the hbase libraries already on the classpath, using
>
> export
> HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH
>
>
> Then I created a flink runtime environment with yarn-session, connected to this container with sql-client and created an hbase mapping table. This approach failed: the analysis suggests the hbase package cannot be found.
>
>
>
>
> ./bin/yarn-session.sh -d -s 4 -nm common-flink -jm 1024m -tm 4096m
>
> ./bin/sql-client.sh embedded -e conf/sql-env.yaml
>
>
>
>
> sql-env.yaml
>
> configuration:
>
>   execution.target: yarn-session
>
>
>
>
> Then I tried copying the hbase jars into flink_home/lib, and immediately fell into a deep pit:
>
> Attempt 1. ClassNotFoundException: org.apache.hadoop.hbase.client.HTable
>
> Attempt 2. ClassNotFoundException:
> org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService
>
> Attempt 3. ClassNotFoundException:
> org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$BlockingInterface
>
> Attempt 4. Copied hbase-shaded-client-2.1.0-cdh6.3.0.jar to lib; class conflicts prevented the whole yarn-session from starting containers
>
> Attempts 5\6\7. Same as 3
>
> Attempts 8\9. ClassNotFoundException:
> org.apache.hbase.thirdparty.com.google.protobuf.RpcController
>
> Attempt 9. ClassNotFoundException:
> org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup
>
> Attempt 10. NoSuchMethodError:
> org.apache.hadoop.hbase.client.HTable.getTableName()[B
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:272)
>
>
>
> Up to attempt 9, everything was solved by copying jars into lib; the hbase dependencies now under lib are:
> 直到尝试9通过复制jar包到lib下都解决了,现在lib包下的hbase依赖包有:
>
> hbase-client-2.1.0-cdh6.3.0.jar
>
> hbase-common-2.1.0-cdh6.3.0.jar
>
> hbase-protocol-2.1.0-cdh6.3.0.jar
>
> hbase-protocol-shaded-2.1.0-cdh6.3.0.jar
>
> hbase-shaded-miscellaneous-2.2.1.jar
>
> hbase-shaded-netty-2.2.1.jar
>
> hbase-shaded-protobuf-2.2.1.jar
>
>
>
>
> For attempt 10, apart from modifying the source code, is there any other solution? Or is there a better way to integrate hbase?


Re: [flink-1.10.2] How should the DataStream result of async IO be registered as a table??

2020-12-09 Thread Jark Wu
> tabEnv.createTemporaryView("test_table", result,

It looks to me like you did register it here? Is there any error reported?

When finally submitting the job for execution, remember to call StreamExecutionEnvironment.execute() (see the sketch below).
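A self-contained Java sketch of that flow; the async function is only a stand-in for the RedisAsyncFunction, names are made up, and the import paths follow the newer bridge package (adjust for 1.10 if needed). The last line is the part that is easy to forget.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncIoToTableDemo {

    // Stand-in for the RedisAsyncFunction in the thread: completes each element
    // asynchronously with an enriched Row.
    public static class FakeAsyncEnrich extends RichAsyncFunction<String, Row> {
        @Override
        public void asyncInvoke(String word, ResultFuture<Row> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> Row.of(word, word.length()))
                .thenAccept(row -> resultFuture.complete(Collections.singleton(row)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Row> enriched = AsyncDataStream
            .orderedWait(env.fromElements("a", "bb", "ccc"),
                         new FakeAsyncEnrich(), 60L, TimeUnit.SECONDS, 100)
            .returns(Types.ROW(Types.STRING, Types.INT));

        tEnv.createTemporaryView("test_table", enriched, "word, len");
        Table result = tEnv.sqlQuery("SELECT * FROM test_table");
        tEnv.toAppendStream(result, Row.class).print("test_table");

        // Without this call the streaming job is never actually executed.
        env.execute("async-io-table-demo");
    }
}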

Best,
Jark

On Tue, 8 Dec 2020 at 14:54, Tianwang Li  wrote:

> Flink version: 1.10.2
>
> Using RichAsyncFunction for async IO, the resulting DataStream cannot be registered as a table.
>
> In local testing, the data keeps being output repeatedly.
>
> How can the DataStream be registered as a Table after processing?
>
> ---
> The code is as follows:
>
> // async redis processing
> RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
> aggProcessorArgs);
>
> // obtain the asynchronously processed stream
> DataStream result = AsyncDataStream.orderedWait(
> dataStream,
> asyncFunction,
> 60L,
> TimeUnit.SECONDS,
> 100).returns(outRowTypeInfo);
>
> // register as a temporary table
> tabEnv.createTemporaryView("test_table", result,
> outRowFields.stream().collect(Collectors.joining(",")));
>
> //
> result.print("out_table>>");
>
> Table test_table = tabEnv.sqlQuery("select * from test_table");
> // query the temporary table
> tabEnv.toAppendStream(test_table, Row.class).print("test_table");
>
>
>
> --
> **
>  tili
> **
>


Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-08 Thread Jark Wu
Hi 赵一旦,

The jackson component already handles this part of the logic automatically.

Hi xiaocai,

 Which issue do you need 1.12.1 for? 1.12.0 is about to be released in the next couple of days.

Best,
Jark


On Wed, 9 Dec 2020 at 14:34, xiao cai  wrote:

> OK, I plan to upgrade and test it next week. Also, when is 1.12.1 planned to be released?
>
>
>  Original Message
> Sender: Jark Wu
> Recipient: user-zh
> Date: Tuesday, Dec 8, 2020 13:41
> Subject: Re: How to define the field type for JsonObject data in FlinkSQL
>
>
> hailong 说的定义成 STRING 是在1.12 版本上支持的,
> https://issues.apache.org/jira/browse/FLINK-18002 1.12
> 这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc <
> wxp4...@outlook.com> wrote: > 可以使用字符串的方式,或者自定义
> String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: Error while connecting with MSSQL server

2020-12-07 Thread Jark Wu
Hi,

Currently, flink-connector-jdbc doesn't support the MS SQL Server dialect. Only
MySQL and PostgreSQL are supported.

Best,
Jark

On Tue, 8 Dec 2020 at 01:20, aj  wrote:

> Hello ,
>
> I am trying to create a table with microsoft sql server  using flink sql
>
> CREATE TABLE sampleSQLSink (
> id INTEGER
> message STRING,
> ts TIMESTAMP(3),
> proctime AS PROCTIME()
> ) WITH (
> 'connector' = 'jdbc',
> 'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
> 'url' = 'jdbc:sqlserver://samplecustsql.database.windows.net:1433
> ;database=customerdb',
> 'username'=
> 'password'=
> 'table-name' =
> );
>
>
> select * from sampleSQLSink
>
>
> I am getting this error
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Cannot handle such jdbc url:
> jdbc:sqlserver://samplecustsql.database.windows.net:1433
> ;database=customerdb
>
>
> Can somebody help with what is wrong?
>
> I am using microsoft jdbc driver.
>


Re: Reply: Can flink achieve end-to-end exactly-once using a relational database's default transactions, or does it need two-phase commit?

2020-12-07 Thread Jark Wu
Two-phase commit against the database to guarantee exactly-once semantics is being worked on in the community; if you are interested you can join the discussion under
https://issues.apache.org/jira/browse/FLINK-15578.
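Not the FLINK-15578 implementation, just a structural sketch of the TwoPhaseCommitSinkFunction hooks involved; table name and SQL are made up, and without XA transactions a retried commit can still double-write, so treat this as an outline of the hook structure only.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

// Skeleton of a two-phase-commit style JDBC sink: records are staged per
// checkpoint transaction and written in one database transaction on commit.
public class TwoPhaseJdbcSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseJdbcSink.Txn, Void> {

    /** Per-checkpoint transaction handle: here just a buffer of rows. */
    public static class Txn {
        final List<String> buffer = new ArrayList<>();
    }

    private final String jdbcUrl;

    public TwoPhaseJdbcSink(String jdbcUrl, ExecutionConfig config) {
        super(new KryoSerializer<>(Txn.class, config), VoidSerializer.INSTANCE);
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();                  // one per checkpoint interval
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.buffer.add(value);             // stage records, don't write yet
    }

    @Override
    protected void preCommit(Txn txn) {
        // Nothing durable to prepare in this sketch; with XA this would "prepare".
    }

    @Override
    protected void commit(Txn txn) {
        // Second phase: write everything in a single database transaction.
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps =
                     conn.prepareStatement("INSERT INTO sink_table(v) VALUES (?)")) {
                for (String v : txn.buffer) {
                    ps.setString(1, v);
                    ps.addBatch();
                }
                ps.executeBatch();
            }
            conn.commit();
        } catch (Exception e) {
            throw new RuntimeException("commit failed", e);
        }
    }

    @Override
    protected void abort(Txn txn) {
        txn.buffer.clear();                // drop staged records
    }
}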

Best,
Jark

On Tue, 8 Dec 2020 at 09:14, hdxg1101300...@163.com 
wrote:

>
>
>
>
> hdxg1101300...@163.com
>
> From: hdxg1101300...@163.com
> Sent: 2020-12-07 18:40
> To: user-zh
> Subject: Reply: Re: Can flink achieve end-to-end exactly-once using a relational database's default transactions, or does it need two-phase commit?
> You mean that if I implement the sink myself, throw all exceptions during the commit and roll back, end-to-end exactly-once can be achieved? Personally I think that works.
> I wanted to discuss it with someone, but there was nobody around, so I thought I would ask in the community.
>
>
>
> hdxg1101300...@163.com
>
> From: Leonard Xu
> Sent: 2020-12-07 17:00
> To: user-zh
> Subject: Re: Can flink achieve end-to-end exactly-once using a relational database's default transactions, or does it need two-phase commit?
> Hi,
>
> > On Dec 7, 2020, at 16:46, hdxg1101300...@163.com wrote:
> >
> >Can flink achieve end-to-end exactly-once using a relational database's default transactions, or does it need two-phase commit?
> >If I implement the sink myself, open a database transaction, and roll back and throw an exception on error, can exactly-once be achieved?
>
> Flink
> can achieve end-to-end consistency when writing to a relational database. It is not supported by default; two-phase commit has to be implemented, and your approach is feasible. Also, someone in the community is working on this feature[1] and there is already a PR you can refer to; it is expected to be supported in 1.13.
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-15578 <
> https://issues.apache.org/jira/browse/FLINK-15578>
>


Re: How to define the field type for JsonObject data in FlinkSQL

2020-12-07 Thread Jark Wu
Defining the field as STRING, as hailong said, is supported in 1.12:
https://issues.apache.org/jira/browse/FLINK-18002

1.12 will be released in the next couple of days; if you can upgrade, give it a try (a small sketch follows below).
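A hedged sketch of what that looks like on 1.12 (topic, servers and field names are placeholders): the nested JSON object column is declared as STRING, arrives as its raw JSON text, and can be parsed later with a UDF.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonObjectAsStringSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'payload' holds an arbitrary JSON object; declaring it STRING keeps the raw text.
        tEnv.executeSql(
                "CREATE TABLE events (" +
                "  id BIGINT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'events'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'format' = 'json')");

        tEnv.from("events").printSchema();
    }
}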

Best,
Jark

On Tue, 8 Dec 2020 at 11:56, wxpcc  wrote:

> You can use a string, or a custom String-type format, and handle the inner structure later with a UDF
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Unable to write the Kafka data that was read into Redis with RedisSink

2020-12-06 Thread Jark Wu
This is probably related to networking and deployment; I suggest consulting Huawei Cloud technical support.

On Sun, 6 Dec 2020 at 20:40, 赵一旦  wrote:

> It cannot connect. Have you confirmed that your Huawei Cloud instance can reach the redis server?
>
> 追梦的废柴 wrote on Sun, Dec 6, 2020 at 8:35 PM:
>
> > Hi everyone:
> > Good evening!
> > My project team is currently evaluating the Flink framework; one metric requires reading data from Kafka and then storing the final result in Redis.
> >
> >
> We added the flink-redis connector to the pom file and followed the official RedisSink example. During local development, writing to Redis on a certain server works fine,
> > but after I package the program into a Jar, deploy it to a server (Huawei Cloud MRS) and submit it to yarn with flink
> > run, it keeps reporting errors and cannot write to Redis. Does anyone know why?
> > The problem has been blocking me for two days without any progress; I would appreciate your help, Thank you!
> > The error is as follows:
> > redis.client.jedis.exceptions.JedisConnectionException:Could not get a
> > resource from the pool at .
> >
> >
>


Re: Dynamic table changelog format

2020-12-04 Thread Jark Wu
Yes, they are the complete records.

That is exactly how upsert kafka is implemented: only the latest image is stored.
But some queries do produce delete messages, so sometimes the deletes still need to be stored; in upsert kafka they are stored as kafka
tombstone messages.
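In a custom DynamicTableSink this intent can be declared through the changelog mode; a small sketch of only that declaration (not a full sink), assuming the sink has a primary key so the planner can omit UPDATE_BEFORE rows:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Sketch of a DynamicTableSink#getChangelogMode result for a store that only
// keeps the latest image per key: consume +I and +U rows. If the upstream
// query also emits deletes, RowKind.DELETE must be added and mapped to a
// tombstone-like operation, as discussed above.
public class LatestImageChangelogMode {
    public static ChangelogMode insertAndUpdateAfter() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }
}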

Best,
Jark

On Fri, 4 Dec 2020 at 17:00, jie mei  wrote:

> Hi, Community
>
> The changelog of a Flink dynamic table has four kinds of messages (INSERT, UPDATE_BEFORE, UPDATE_AFTER,
> DELETE).
> Does UPDATE_BEFORE correspond to the complete record before the update, and UPDATE_AFTER to the complete record after the update?
> I am deliberately emphasizing "complete record" to confirm that the changelog content is the complete data after the update, rather than an operation log / update statement like Set A = 'a'.
> Also, is the data of a Delete message a complete record or an operation log?
>
> This would mean that when sinking the table, it is feasible to take only the INSERT and UPDATE_AFTER data, write it to storage that does not support UPSERT,
> and obtain the latest data through additional logic.
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


Re: Why is calcite's implicit type conversion feature disabled?

2020-12-04 Thread Jark Wu
The community has started a design discussion on Hive query syntax compatibility; you can follow it here:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html

Best,
Jark

On Fri, 4 Dec 2020 at 15:37, stgztsw  wrote:

> I think that since the community plans to be compatible with hive, implicit conversion and the other hive syntax compatibility features are a must. Hive
> sql running in real production environments is usually very complex, and with flink's current level of hive compatibility, most hive sql basically cannot run successfully. (Other gaps include no support for the != operator,
> create table as, and so on; I won't list them all here.) I hope the community can make the hive support more complete.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Problems encountered when migrating production hive sql to the flink 11 engine

2020-12-04 Thread Jark Wu
Hi,

Flink SQL 1.11 is not yet compatible with Hive SQL syntax. The design of this feature has only recently been discussed in the community, and it is expected to be supported in 1.13. You can follow the
design discussion here:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html


Best,
Jark

On Fri, 4 Dec 2020 at 11:45, 莫失莫忘  wrote:

> Recently I tried switching the execution engine of a production hive sql job to flink 1.11.2 and found that flink 11's support for hive
> SQL has the following problems: 1. Double quotes for strings are not supported
> 2. != for inequality is not supported
> 3. Implicit type conversion is not supported
> 4. The split function is not supported
> 5. hive is case-insensitive, flink is case-sensitive
> 6. The right table of a join cannot be a subquery (Calcite bug
> https://issues.apache.org/jira/browse/CALCITE-2152)
> 7. The as in create table table1 as select * from pokes; is not supported
>
>
>
> These are the problems I have found so far. Overall, flink 11's support for hive SQL statements is not yet sufficient to switch existing offline hive sql jobs directly to the flink engine.


Re: flink-cdc cannot read the binlog and the program reports no error

2020-12-04 Thread Jark Wu
That does not sound right; the job should at least report some error before failing. Or is there any exception in the TaskManager logs?

On Fri, 4 Dec 2020 at 09:23, chenjb  wrote:

>
> Thanks for following up. I had mapped int to bigint, assuming bigint's larger range would be compatible, and hadn't read the docs carefully. mysql had error logs but the program reported nothing, so my attention went to the mysql side. In the last couple of days I tried flink-cdc-sql and found that some error scenarios do not report errors; for example, when mysql cannot be connected (wrong password), the program also just exits with exit
> 0, without any error or hint. Do I need to configure something else? I'm a java novice and don't understand this part well.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Jark Wu
Only legacy connectors (`connector.type=kafka` instead of
`connector=kafka`) are supported in the YAML at the moment. You can use
regular DDL instead. There is a similar discussion in
https://issues.apache.org/jira/browse/FLINK-20260 these days.

Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann  wrote:

> Hi Maciek,
>
> I am pulling in Timo who might help you with this problem.
>
> Cheers,
> Till
>
> On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak  wrote:
>
>> Hello,
>>
>> I try to configure SQL Client to query partitioned ORC data on local
>> filesystem. I have directory structure like that:
>>
>> /tmp/table1/startdate=2020-11-28
>>
>> /tmp/table1/startdate=2020-11-27
>>
>> etc.
>>
>>
>> If I run SQL Client session and create table by hand:
>>
>> create table tst (column1 string, startdate string) partitioned by
>> (startdate) with ('connector'='filesystem', 'format'='orc',
>> 'path'='/tmp/table1');
>>
>> everything runs fine:
>>
>> explain select * from tst where startdate='2020-11-27'
>>
>> shows that only one partition in 'readPartitions'
>>
>>
>> However, I struggle to configure table in .yaml config.
>>
>> I tried like this (after some struggle, as "partition.keys" setting
>> doesn't seem to be documented...) :
>>
>> tables:
>>- name: tst2
>>  type: source-table
>>  connector: filesystem
>>  path: "/tmp/table1"
>>  format: orc
>>  partition.keys:
>>- name: startdate
>>  schema:
>>- name: column1
>>  data-type: string
>>- name: startdate
>>  data-type: string
>>
>> and it more or less works - queries are executed properly. However,
>> partitions are not pruned:
>>
>> explain select * from tst2 where startdate='2020-11-27'
>>
>> show all partitions in 'readPartitions'
>>
>>
>> Any idea what can be wrong? I'm using Flink 1.11.2
>>
>>
>> thanks,
>>
>> maciek
>>
>>
>>


Re: How to compute percentiles in real time with flink sql

2020-12-03 Thread Jark Wu
You can take a look at the UDAF documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions
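There is no built-in PERCENTILE aggregate, so a UDAF along the lines of that doc is one option. Below is a deliberately naive sketch: it buffers all values in the accumulator and sorts in getValue, so it only suits small groups on append-only input; a real implementation would use a quantile sketch such as t-digest.

import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NaivePercentile
        extends AggregateFunction<Double, NaivePercentile.Acc> {

    public static class Acc {
        // ListView lets Flink manage the collection as accumulator state.
        public ListView<Double> values = new ListView<>();
    }

    private final double percentile;

    public NaivePercentile(double percentile) {
        this.percentile = percentile;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Double value) throws Exception {
        if (value != null) {
            acc.values.add(value);
        }
    }

    @Override
    public Double getValue(Acc acc) {
        List<Double> sorted = new ArrayList<>();
        try {
            for (Double v : acc.values.get()) {
                sorted.add(v);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (sorted.isEmpty()) {
            return null;
        }
        Collections.sort(sorted);
        // Nearest-rank percentile over the buffered values.
        int idx = (int) Math.ceil(percentile * sorted.size()) - 1;
        return sorted.get(Math.max(idx, 0));
    }
}

An instance such as new NaivePercentile(0.95) would then be registered under a name of your choosing before use in SQL; again, this is only a sketch, not a drop-in percentile function.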




On Thu, 3 Dec 2020 at 12:06, 爱成绕指柔 <1194803...@qq.com> wrote:

> Hello:
>   Is there currently no percentile function in flink sql streaming computation? If not, how can this functionality be implemented?
>   Looking forward to your reply, thanks!


Re: Any recommendation for a visual client for flink?

2020-12-03 Thread Jark Wu
You can try Zeppelin; its integration with flink sql is quite good.

Best,
Jark

On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:

> I didn't explain this clearly: I mean a flink sql client. We want to provide a client to other departments in the company whose colleagues only know some sql and lack programming skills.
>
> > On Dec 3, 2020, at 21:52, Shawn Huang wrote:
> >
> > What do you mean by client? Flink provides a Web UI on port 8081 by default, where you can submit and cancel jobs and check logs and some basic metrics.
> >
> > Best,
> > Shawn Huang
> >
> >
> > yinghua...@163.com wrote on Thu, Dec 3, 2020 at 8:46 PM:
> >
> >>
>


Re: flink-cdc cannot read the binlog and the program reports no error

2020-12-03 Thread Jark Wu
Could unsigned int be the culprit...

On Thu, 3 Dec 2020 at 15:15, chenjb  wrote:

> Case closed: the field types were not mapped according to the official docs; after mapping them properly it works fine.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL shared source question

2020-12-03 Thread Jark Wu
1. Whether the source is shared can be seen from the topology graph in the web ui.
2. When catching up on data, or when downstream consumption speeds differ, uneven consumption across partitions is quite normal.
3. You can increase the sink parallelism and the buffer size to mitigate the problem.

Best,
Jark

On Wed, 2 Dec 2020 at 19:22, zz  wrote:

> Hi everyone:
> I currently have a job whose source table is created by reading one topic, but there are 6 sinks, using several insert
> statements that output into the same mysql table. As I understand it, these insert statements
> should all share this source table, so kafka only needs to be read once. However, while running I see that some partitions of the kafka
> topic are consumed very fast and some very slowly. What could be the reason?
> The topic has 18 partitions in total and the job parallelism is 18.


Re: I defined a Kafka dynamic table in SQL-Client, but the kafka topic had some elements in the wrong format, so an exception was thrown in SQL-Client. Can we define the Kafka dynamic table with some

2020-12-03 Thread Jark Wu
I think this is a bug; I have created an issue: https://issues.apache.org/jira/browse/FLINK-20470

On Wed, 2 Dec 2020 at 18:02, mr.meng...@ouglook.com 
wrote:

> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png
> >
> Caused by: java.io.IOException: Failed to deserialize JSON ''.
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> Caused by: java.lang.ClassCastException:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode
> cannot be cast to
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Why is calcite's implicit type conversion feature disabled?

2020-12-03 Thread Jark Wu
Implicit type conversion is a very important public API and needs careful discussion in the community, for example which types may be converted between each other.
The community has not planned this feature yet; if you need it, you can open an issue in the community.

Best,
Jark

On Wed, 2 Dec 2020 at 18:33, stgztsw  wrote:

> Currently neither flink sql nor flink hive
> sql supports implicit conversion. While debugging we found that calcite itself actually supports it, but flink forcibly disables it, while hive natively supports implicit conversion. This prevents our hive jobs from being migrated to flink. What is the reason for disabling it? Would enabling it on our side cause any problems?


Re: flink sql 1.11.1 seems to have a bug

2020-12-03 Thread Jark Wu
It looks like the job submission timed out and failed. Please confirm that:
1. the flink cluster is up
2. the sql client environment can reach the flink cluster environment
3. the correct gateway-address is configured in sql-client-defaults.yaml (not needed for a local cluster)

Best,
Jark

On Wed, 2 Dec 2020 at 14:12, zzy  wrote:

> The problem I ran into is as follows. Flink version 1.11.1, using flink sql in the sql client.
>
>
> The sql statements are as follows:
> CREATE TABLE sls_log_sz_itsp (
>   request STRING,
>   http_bundleId STRING,
>   upstream_addr STRING,
>   http_appid STRING,
>   bodyUserId STRING,
>   http_sequence STRING,
>   http_version STRING,
>   response_body STRING,
>   uri STRING,
>   bytes_sent STRING,
>   http_userId STRING,
>   http_cityId STRING,
>   http_user_agent STRING,
>   http_deviceType STRING,
>   record_time STRING,
>   rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'-MM-dd HH:mm:ss')),
>   WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
>   request_time STRING,
>   request_body STRING,
>   request_length STRING,
>   nginx_id STRING,
>   proxy_add_x_forwarded_for STRING,
>   http_deviceId STRING,
>   host STRING,
>   upstream_response_time STRING,
>   status STRING
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'connector.topic' = 'sls',
>  'connector.properties.zookeeper.connect' =
> 'hadoop85:2181,hadoop86:2181,hadoop87:2181',
>  'connector.properties.bootstrap.servers' =
> 'hadoop85:9092,hadoop86:9092,hadoop87:9092',
>  'connector.properties.group.id' = 'log-sz-itsp',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
> );
>
>
>
>  CREATE TABLE sz_itsp_test(
> request STRING,
> request_count BIGINT NOT NULL,
> window_end TIMESTAMP(3)
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
> 'jdbc:mysql://hadoop85:3306/test?useSSL=false=true',
> 'connector.table' = 'sz_itsp_test',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '00',
> 'connector.write.flush.max-rows' = '1',
> 'connector.write.flush.interval' = '2s',
> 'connector.write.max-retries' = '3'
> );
>
>
> INSERT INTO sz_itsp_test
> SELECT
>request,
>count(request) request_count,
>TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
>  FROM sls_log_sz_itsp
>  WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
>  GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
>  ;
>
>
> The following error appears when using the sql client:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> 

Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 Thread Jark Wu
Check whether the flink version used to submit the job matches the flink version deployed on the yarn cluster.
Or there may be two different versions of the flink-shaded-jackson jar in your cluster.

On Wed, 2 Dec 2020 at 11:55, Zed  wrote:

> When I submitted a flink-table-sql job to yarn, the following exception
> came
> out. Wondering how to solve it. Anyone can help me with that? Appreciate
> it
>
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap
> to field
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type java.util.concurrent.ConcurrentHashMap in instance of
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache
> at
>
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
> 
