[ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 Thread Qingsheng Ren
The Apache Flink community is very happy to announce the release of
Apache Flink CDC 3.1.0.

Apache Flink CDC is a distributed data integration tool for real-time and
batch data. It brings the simplicity and elegance of data integration via
YAML, describing the data movement and transformation in a data pipeline.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink CDC can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20cdc

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354387

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Qingsheng Ren


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Qingsheng Ren
Congratulations and big THANK YOU to everyone helping with this release!

Best,
Qingsheng

On Fri, Oct 27, 2023 at 10:18 AM Benchao Li  wrote:

> Great work, thanks everyone involved!
>
> Rui Fan <1996fan...@gmail.com> wrote on Fri, Oct 27, 2023 at 10:16:
> >
> > Thanks for the great work!
> >
> > Best,
> > Rui
> >
> > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam  wrote:
> >
> > > Finally! Thanks to all!
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > 2023年10月27日 03:58,Alexander Fedulov 
> 写道:
> > > >
> > > > Great work, thanks everyone!
> > > >
> > > > Best,
> > > > Alexander
> > > >
> > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser <
> martijnvis...@apache.org>
> > > > wrote:
> > > >
> > > >> Thank you all who have contributed!
> > > >>
> > > >> On Thu, Oct 26, 2023 at 18:41, Feng Jin wrote:
> > > >>
> > > >>> Thanks for the great work! Congratulations
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Feng Jin
> > > >>>
> > > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu 
> wrote:
> > > >>>
> > >  Congratulations, Well done!
> > > 
> > >  Best,
> > >  Leonard
> > > 
> > >  On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee <
> lincoln.8...@gmail.com>
> > >  wrote:
> > > 
> > > > Thanks for the great work! Congrats all!
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > > >
> > > > > Jing Ge wrote on Fri, Oct 27, 2023 at 00:16:
> > > >
> > > >> The Apache Flink community is very happy to announce the
> release of
> > > > Apache
> > > >> Flink 1.18.0, which is the first release for the Apache Flink
> 1.18
> > > > series.
> > > >>
> > > >> Apache Flink® is an open-source unified stream and batch data
> > >  processing
> > > >> framework for distributed, high-performing, always-available,
> and
> > > > accurate
> > > >> data applications.
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the
> > > > improvements
> > > >> for this release:
> > > >>
> > > >>
> > > >
> > > 
> > > >>>
> > > >>
> > >
> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > > >>
> > > >> The full release notes are available in Jira:
> > > >>
> > > >>
> > > >
> > > 
> > > >>>
> > > >>
> > >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink
> > > >> community
> > >  who
> > > >> made this release possible!
> > > >>
> > > >> Best regards,
> > > >> Konstantin, Qingsheng, Sergey, and Jing
> > > >>
> > > >
> > > 
> > > >>>
> > > >>
> > >
> > >
>
>
>
> --
>
> Best,
> Benchao Li
>


[SUMMARY] Flink 1.18 Release Sync 05/30/2023

2023-05-30 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from the release sync of 1.18 on May 30.

1. @developers: please update the progress of your features on the 1.18 release
wiki page [1]! That will help us a lot in keeping an overview of the entire
release cycle.

2. We found a JIRA issue (FLINK-18356) [2] that doesn't have an assignee,
which is a CI instability in the flink-table-planner module. It would be nice
if someone in the community could pick it up and investigate :-)

There are 6 weeks left before the feature freeze date (Jul 11). The next release
sync will be on Jun 13, 2023. Feel free to join us [3]!

[1] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
[2] https://issues.apache.org/jira/browse/FLINK-18356
[3] Zoom meeting:
https://us04web.zoom.us/j/79158702091?pwd=8CXPqxMzbabWkma5b0qFXI1IcLbxBh.1

Best regards,
Jing, Konstantin, Sergey and Qingsheng


[ANNOUNCE] Starting with Flink 1.18 Release Sync

2023-04-03 Thread Qingsheng Ren
Hi everyone,

To give the Flink 1.18 release a fresh start, I'm happy to share with you
that the first release sync meeting for 1.18 will happen tomorrow,
Tuesday, April 4th, at 10am (UTC+2) / 4pm (UTC+8). Feel free to
join us and share your ideas about the new release cycle!

Details of joining the release sync can be found in the 1.18 release wiki
page [1].

All contributors are invited to update the same wiki page [1] and include
features targeting the 1.18 release.

Looking forward to seeing you all in the meeting!

[1] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release

Best regards,
Jing, Konstantin, Sergey and Qingsheng


Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-24 Thread Qingsheng Ren
I'd like to say thank you to all contributors of Flink 1.17. Your support
and great work together made this giant step forward possible!

Also, as Matthias mentioned, feel free to leave us any suggestions so that we
can improve the release procedure together.

Cheers,
Qingsheng

On Fri, Mar 24, 2023 at 5:00 PM Etienne Chauchot 
wrote:

> Congrats to all the people involved!
>
> Best
>
> Etienne
>
> > On 23/03/2023 at 10:19, Leonard Xu wrote:
> > The Apache Flink community is very happy to announce the release of
> Apache Flink 1.17.0, which is the first release for the Apache Flink 1.17
> series.
> >
> > Apache Flink® is an open-source unified stream and batch data processing
> framework for distributed, high-performing, always-available, and accurate
> data applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements for this release:
> >
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
> >
> > We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >
> > Best regards,
> > Qingsheng, Martijn, Matthias and Leonard
>


[SUMMARY] Flink 1.17 Release Sync 2/28/2023

2023-02-28 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from Flink 1.17 release sync on
2/28/2023.

Release testing:
- All release testing tasks have finished in the last week. Big thanks to
our contributors and volunteers for the effort on this!

1.17 Blockers:
There are currently 2 blockers: FLINK-31092 / FLINK-31104. We hope that
these blockers can be addressed within the next day so we can create the RC
later this week.

CI Instabilities & other issues:
There are two CI instability issues: FLINK-31134 / FLINK-18356.

Release management:
- Release announcement is under review by release managers.
- Tasks related to 1.17 release management are tracked in FLINK-31146.

The next release meeting will be on Mar 7, 2023. Feel free to join us!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,
Qingsheng, Matthias, Martijn and Leonard


[SUMMARY] Flink 1.17 Release Sync 2/14/2023

2023-02-14 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from Flink 1.17 release sync on
2/14/2023.

Release testing:

- The original deadline for cross-team testing is Feb 21, 2023 (next
Tuesday). We will monitor the status throughout the week and hopefully
conclude everything before the deadline.
- Please change the status icon to “smiling face” on the release wiki page
once your testing has been completed.
- We are always looking for volunteers: picking up a cross-team testing task is
much appreciated.

Release management:

- We’ll create a JIRA issue for release management tasks (such as creating
RCs, release notes, and announcements) in order to improve reviewability.
(Thanks @ for the proposal!)

Test instabilities:

- FLINK-31036 was prioritized as a blocker and is being investigated now.

The next release meeting will be on Feb 21, 2023. Feel free to join us!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,
Leonard, Matthias, Martijn and Qingsheng


[SUMMARY] Flink 1.17 Release Sync 1/17/2023

2023-01-17 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from the release sync on 1/17/2023.

- CI & Performance: 5 blocker issues in total. Owners should have been
pinged.

- FLIP-272 [1] has finished and there will be a blog post before 1.17
release.

- The PR about publishing an SBOM [2] has been merged; the same should also be
added to other repos such as the connectors and flink-ml.

There are 2 weeks before the feature freeze on Jan 31, 2023. The next
release meeting will be on Jan 24, 2023. Feel free to join us!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-272%3A+Generalized+delegation+token+support
[2] https://github.com/apache/flink/pull/21606

Best Regards,
Martijn, Matthias, Leonard and Qingsheng


[SUMMARY] Flink 1.17 Release Sync 1/10/2023

2023-01-10 Thread Qingsheng Ren
 Hi devs and users,

I'd like to share some highlights from the release sync on 1/10/2023.

- CI stability: owners of blocker issues should have been pinged offline.
- Priorities of test instabilities: test instabilities are prioritized
as Critical and become blockers as soon as we notice that they are newly
introduced.
- The feature freeze has been extended until *Jan 31, 2023*.
- The release sync switches to weekly from now on as we are approaching the
feature freeze date. Feel free to join us!
- @Developers please share in the Slack dev channel if your FLIP needs more
people to look into it. It would be nice to close all FLIPs within this week
considering the timeline.

There are 3 weeks before the feature freeze on Jan 31, 2023. The next
release meeting will be on Jan 17, 2023. Feel free to join us if you are
interested!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Wish you all a happy new year :-)

Best Regards,
Qingsheng, Leonard, Matthias and Martijn


[SUMMARY] Flink 1.17 Release Sync 11/29/2022

2022-11-29 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from the release sync on 11/29/2022.

1. @Contributors please update your progress on the release 1.17 wiki page
[1] before the sync meeting so that everyone can track it.

2. We have new CI stability tickets and owners should have been pinged on
JIRA or offline. Please take a look at your inbox.

3. Externalizing connectors is progressing well. We expect most
connectors to be externalized within the 1.17 release cycle.

4. It’ll be great to have some input on monitoring the performance tests.
Currently there’s a Slack channel reporting benchmark results periodically,
as discussed on the dev ML [2], and we’re working on formalizing the work
on regression testing.

There are 7 weeks before the feature freeze on 1/17/2023. The next release
meeting will be on December 13th, 2022. Feel free to join us if you
are interested!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,
Leonard, Martijn, Matthias and Qingsheng

[1] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[2] https://lists.apache.org/thread/zok62sx4m50c79htfp18ymq5vmtgbgxj


[SUMMARY] Flink 1.17 Release Sync 11/1/2022

2022-11-01 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights about the 1.17 release sync on 11/1/2022.

- The release sync is scheduled for biweekly Tuesdays at 9am (Central
European Standard Time) / 4pm (China Standard Time).

- Retrospective for 1.16: a discussion [1] has been opened by Matthias to
collect feedback about the last release cycle.

- Release tracking page: @committers please update the page [2] in the
coming week

- CI stability / blockers: the PR for FLINK-24119 has been opened and is
waiting for a green CI and review.

- Martijn is working on a handbook for release managers, which could also
benefit RMs in future releases.

- Tickets for new contributors: we encourage committers to add a "starter"
label to easy-to-fix tickets as described in the Flink Jira Process [3], which
helps new contributors find a good entry point.

The next release sync will be on November 15th, 2022.

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,

Leonard, Matthias, Martijn and Qingsheng

[1] https://lists.apache.org/thread/ypt6m3qm6rh5jvhg53jvm1qpyy58mpxp
[2] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[3] https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process


[DISCUSS] Planning Flink 1.17

2022-10-20 Thread Qingsheng Ren
Hi everyone,

As we are approaching the official release of Flink 1.16, it’s a good time
to kick off some discussions and march toward 1.17.

- Release managers

Leonard Xu and I would like to volunteer as release managers for 1.17, and
it would be great to have someone else working together on this release.
Please let us know if you have any interest!

- Timeline

With 1.16 due to be released in the next few days and a 4-month release
cycle after that, we propose setting the feature freeze date to *January
17th, 2023* (aligned with our version number 1.17 :-)), so that everyone
can enjoy the holiday season and Chinese New Year.

- What we’ll be focusing

Similar to our previous releases, we’d like to keep an eye on the timeline,
CI stability, release testing, and any communication and coordination
across teams and developers. One thing we’d like to mention in particular
is compatibility, which is a frequent complaint from our ecosystem
developers and users. We encourage all committers to do an extra manual
check to see whether any public interface is touched before merging a PR. We
could discuss details in another thread later and update the contributing
guidelines to list what should be treated as public APIs. Please feel free
to start a discussion if there is anything else you would like to emphasize
specifically.

- Collecting features

We'll create a wiki page under this directory [1] for collecting new
features targeting 1.17, as we have always done before, to give everyone an
overview and track the progress. Please don’t hesitate to share your ideas
on the page. In the meantime, we’d like to kindly invite our committers to
think about and plan what we could deliver to developers and users in this
release.

Looking forward to working with you all in the coming 1.17 release!

Best regards,
Qingsheng Ren and Leonard Xu
Ververica (Alibaba)

[1]
https://cwiki.apache.org/confluence/display/FLINK/Release+Management+and+Feature+Plan


Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-13 Thread Qingsheng Ren
Hi devs and users,

It looks like we are reaching an initial consensus in the discussion, so I
started a voting thread [1] just now. Looking forward to your feedback!

[1] https://lists.apache.org/thread/ozlf82mkm6ndx2n1vdgq532h156p4lt6

Best,
Qingsheng


On Thu, Oct 13, 2022 at 10:41 PM Jing Ge  wrote:

> Hi Qingsheng,
>
> Thanks for the clarification. +1, I like the idea. Pointing both numXXXOut
> and numXXXSend to the same external data transfer metric does not really
> break the new SinkV2 design, since there was no requirement to monitor the
> internal traffic. So, I think both developer and user can live with it. It
> might not be the perfect solution but is indeed the currently best
> trade-off solution after considering the backward compatibility.  I would
> suggest firing a follow-up ticket after the PR to take care of the new
> metric for the internal traffic in the future.
>
> Best regards,
> Jing
>
>
> On Thu, Oct 13, 2022 at 3:08 PM Qingsheng Ren  wrote:
>
>> Hi Jing,
>>
>> Thanks for the reply!
>>
>> Let me rephrase my proposal: we’d like to use numXXXOut registered on
>> SinkWriterOperator to reflect the traffic to the external system for
>> compatibility with old versions before 1.15, and make numXXXSend have the
>> same value as numXXXOut for compatibility within 1.15. That means both
>> numXXXOut and numXXXSend are used for external data transfers, which end
>> users care more about. As for the internal traffic within the sink, we
>> could name a new metric for it because this is a _new_ feature in the _new_
>> sink, and end users usually don’t pay attention to internal implementation.
>> The name of the new metric could be discussed later after 1.16 release.
>>
>> > but it might end up with monitoring unexpected metrics, which is even
>> worse for users, i.e. I didn't change anything, but something has been
>> broken since the last update.
>>
>> Yeah this is exactly what we are trying to fix with this proposal. I
>> believe users are more concerned with the output to the external system
>> than the internal data delivery in the sink, so I think we’ll have more
>> cases reporting like “I set up a panel on numRecordsOut in sink to monitor
>> the output of the job, but after upgrading to 1.15 this value is extremely
>> low and I didn’t change anything” if we stick to the current situation. I
>> think only a few end users care about the number of committables sending to
>> downstream as most of them don’t care how the sink works.
>>
>> We do need a re-design to fully distinguish the internal and external
>> traffic on metrics, not only in sink but in all operators as it’s quite
>> common for operators to make IO. This needs time to design, discuss, adjust
>> and vote, but considering this is blocking 1.16, maybe it’s better to
>> rescue the compatibility for now, and leave the huge reconstruction to
>> future versions (maybe 2.0).
>>
>> Best,
>> Qingsheng
>>
>> On Wed, Oct 12, 2022 at 7:21 PM Jing Ge  wrote:
>>
>>> Hi Qingsheng,
>>>
>>> Just want to make sure we are on the same page. Are you suggesting
>>> switching the naming between "numXXXSend" and "numXXXOut" or reverting all
>>> the changes we did with FLINK-26126 and FLINK-26492?
>>>
>>> For the naming switch, please pay attention that the behaviour has been
>>> changed since we introduced SinkV2[1]. So, please be aware of different
>>> numbers(behaviour change) even with the same metrics name. Sticking with
>>> the old name with the new behaviour (very bad idea, IMHO) might seem like
>>> saving the effort in the first place, but it might end up with monitoring
>>> unexpected metrics, which is even worse for users, i.e. I didn't change
>>> anything, but something has been broken since the last update.
>>>
>>> For reverting, I am not sure how to fix the issue mentioned in
>>> FLINK-26126 after reverting all changes. Like Chesnay has already pointed
>>> out, with SinkV2 we have two different output lines - one with the external
>>> system and the other with the downstream operator. In this case,
>>> "numXXXSend" is rather a new metric than a replacement of "numXXXOut". The
>>> "numXXXOut" metric can still be used, depending on what the user wants to
>>> monitor.
>>>
>>>
>>> Best regards,
>>> Jing
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/51fc20db30d001a95de95b3b9993eeb06f558f6c/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/met

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-13 Thread Qingsheng Ren
Hi Jing,

Thanks for the reply!

Let me rephrase my proposal: we’d like to use numXXXOut registered on
SinkWriterOperator to reflect the traffic to the external system, for
compatibility with old versions before 1.15, and make numXXXSend have the
same value as numXXXOut for compatibility within 1.15. That means both
numXXXOut and numXXXSend are used for external data transfers, which end
users care more about. As for the internal traffic within the sink, we
could name a new metric for it, because this is a _new_ feature in the _new_
sink, and end users usually don’t pay attention to the internal implementation.
The name of the new metric could be discussed later, after the 1.16 release.
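To illustrate the aliasing idea, here is a minimal sketch (not the actual
Flink implementation; the class name is hypothetical) that registers a single
backing counter under both names via flink-metrics-core, so numRecordsSend
always reports the same value as numRecordsOut:

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;

// Hypothetical helper, not Flink's actual SinkWriterMetricGroup implementation.
public final class SinkWriterMetricsSketch {

    private final Counter numRecordsOut;

    public SinkWriterMetricsSketch(MetricGroup sinkWriterGroup) {
        // One backing counter, registered under the pre-1.15 name...
        this.numRecordsOut = sinkWriterGroup.counter("numRecordsOut", new SimpleCounter());
        // ...and the same instance registered again under the 1.15 name,
        // so both metrics always report the identical value.
        sinkWriterGroup.counter("numRecordsSend", this.numRecordsOut);
    }

    // Called once per record written to the external system.
    public void recordWritten() {
        numRecordsOut.inc();
    }
}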

> but it might end up with monitoring unexpected metrics, which is even
worse for users, i.e. I didn't change anything, but something has been
broken since the last update.

Yeah, this is exactly what we are trying to fix with this proposal. I
believe users are more concerned with the output to the external system
than with the internal data delivery in the sink, so I think we’ll see more
cases reporting things like “I set up a panel on numRecordsOut in the sink to
monitor the output of the job, but after upgrading to 1.15 this value is
extremely low and I didn’t change anything” if we stick to the current
situation. I think only a few end users care about the number of committables
sent downstream, as most of them don’t care how the sink works internally.

We do need a redesign to fully distinguish internal and external
traffic in metrics, not only in the sink but in all operators, as it’s quite
common for operators to perform IO. This needs time to design, discuss, adjust
and vote on, but considering this is blocking 1.16, maybe it’s better to
rescue the compatibility for now and leave the bigger reconstruction to
future versions (maybe 2.0).

Best,
Qingsheng

On Wed, Oct 12, 2022 at 7:21 PM Jing Ge  wrote:

> Hi Qingsheng,
>
> Just want to make sure we are on the same page. Are you suggesting
> switching the naming between "numXXXSend" and "numXXXOut" or reverting all
> the changes we did with FLINK-26126 and FLINK-26492?
>
> For the naming switch, please pay attention that the behaviour has been
> changed since we introduced SinkV2[1]. So, please be aware of different
> numbers(behaviour change) even with the same metrics name. Sticking with
> the old name with the new behaviour (very bad idea, IMHO) might seem like
> saving the effort in the first place, but it might end up with monitoring
> unexpected metrics, which is even worse for users, i.e. I didn't change
> anything, but something has been broken since the last update.
>
> For reverting, I am not sure how to fix the issue mentioned in FLINK-26126
> after reverting all changes. Like Chesnay has already pointed out, with
> SinkV2 we have two different output lines - one with the external system
> and the other with the downstream operator. In this case, "numXXXSend" is
> rather a new metric than a replacement of "numXXXOut". The "numXXXOut"
> metric can still be used, depending on what the user wants to monitor.
>
>
> Best regards,
> Jing
>
> [1]
> https://github.com/apache/flink/blob/51fc20db30d001a95de95b3b9993eeb06f558f6c/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java#L48
>
>
> On Wed, Oct 12, 2022 at 12:48 PM Qingsheng Ren  wrote:
>
>> As a supplement, considering it could be a big reconstruction
>> redefining internal and external traffic and touching metric names in
>> almost all operators, this requires a lot of discussions and we might
>> do it finally in Flink 2.0. I think compatibility is a bigger blocker
>> in front of us, as the output of sink is a metric that users care a
>> lot about.
>>
>> Thanks,
>> Qingsheng
>>
>> On Wed, Oct 12, 2022 at 6:20 PM Qingsheng Ren  wrote:
>> >
>> > Thanks Chesnay for the reply. +1 for making a unified and clearer
>> > metric definition distinguishing internal and external data transfers.
>> > As you described, having IO in operators is quite common such as
>> > dimension tables in Table/SQL API. This definitely deserves a FLIP and
>> > an overall design.
>> >
>> > However I think it's necessary to change the metric back to
>> > numRecordsOut instead of sticking with numRecordsSend in 1.15 and
>> > 1.16. The most important argument is for compatibility as I mentioned
>> > in my previous email, otherwise all users have to modify their configs
>> > of metric systems after upgrading to Flink 1.15+, and all custom
>> > connectors have to change their implementations to migrate to the new
>> > metric name. I believe other ones participating and approving this
>> &g

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Qingsheng Ren
As a supplement: considering it could be a big reconstruction,
redefining internal and external traffic and touching metric names in
almost all operators, this requires a lot of discussion and we might
finally do it in Flink 2.0. I think compatibility is a bigger blocker
in front of us, as the output of the sink is a metric that users care a
lot about.

Thanks,
Qingsheng

On Wed, Oct 12, 2022 at 6:20 PM Qingsheng Ren  wrote:
>
> Thanks Chesnay for the reply. +1 for making a unified and clearer
> metric definition distinguishing internal and external data transfers.
> As you described, having IO in operators is quite common such as
> dimension tables in Table/SQL API. This definitely deserves a FLIP and
> an overall design.
>
> However I think it's necessary to change the metric back to
> numRecordsOut instead of sticking with numRecordsSend in 1.15 and
> 1.16. The most important argument is for compatibility as I mentioned
> in my previous email, otherwise all users have to modify their configs
> of metric systems after upgrading to Flink 1.15+, and all custom
> connectors have to change their implementations to migrate to the new
> metric name. I believe other ones participating and approving this
> proposal share the same concern about compatibility too. Also
> considering this issue is blocking the release of 1.16, maybe we could
> fix this asap, and as for defining a new metric for internal data
> transfers we can have an in-depth discussion later. WDYT?
>
> Best,
> Qingsheng
>
> On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler  wrote:
> >
> > Currently I think that would be a mistake.
> >
> > Ultimately what we have here is the culmination of us never really 
> > considering how the numRecordsOut metric should behave for operators that 
> > emit data to other operators _and_ external systems. This goes beyond sinks.
> > This even applies to numRecordsIn, for cases where functions query/write 
> > data from/to the outside, (e.g., Async IO).
> >
> > Having 2 separate metrics for that, 1 exclusively for internal data 
> > transfers, and 1 exclusively for external data transfers, is the only way 
> > to get a consistent metric definition in the long-run.
> > We can jump back-and-forth now or just commit to it.
> >
> > I don't think we can really judge this based on FLIP-33. It was IIRC 
> > written before the two phase sinks were added, which heavily blurred the 
> > lines of what a sink even is. Because it definitely is _not_ the last 
> > operator in a chain anymore.
> >
> > What I would suggest is to stick with what we got (although I despise the 
> > name numRecordsSend), and alias the numRecordsOut metric for all 
> > non-TwoPhaseCommittingSink.
> >
> > On 11/10/2022 05:54, Qingsheng Ren wrote:
> >
> > Thanks for the details Chesnay!
> >
> > By “alias” I mean to respect the original definition made in FLIP-33 for 
> > numRecordsOut, which is the number of records written to the external 
> > system, and keep numRecordsSend as the same value as numRecordsOut for 
> > compatibility.
> >
> > I think keeping numRecordsOut for the output to the external system is more 
> > intuitive to end users because in most cases the metric of data flow output 
> > is more essential. I agree with you that a new metric is required, but 
> > considering compatibility and users’ intuition I prefer to keep the initial 
> > definition of numRecordsOut in FLIP-33 and name a new metric for sink 
> > writer’s output to downstream operators. This might be against consistency 
> > with metrics in other operators in Flink but maybe it’s acceptable to have 
> > the sink as a special case.
> >
> > Best,
> > Qingsheng
> > On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , wrote:
> >
> > > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
> >
> > But that's not possible. If it were that simple there would have never been 
> > a need to introduce another metric in the first place.
> >
> > It's a rather fundamental issue with how the new sinks work, in that they 
> > emit data to the external system (usually considered as "numRecordsOut" of 
> > sinks) while _also_ sending data to a downstream operator (usually 
> > considered as "numRecordsOut" of tasks).
> > The original issue was that the numRecordsOut of the sink counted both 
> > (which is completely wrong).
> >
> > A new metric was always required; otherwise you inevitably end up breaking 
> > some semantic.
> > Adding a new metric for what the sink writes to the external system is, for 
> > better or worse, m

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-12 Thread Qingsheng Ren
Thanks Chesnay for the reply. +1 for making a unified and clearer
metric definition distinguishing internal and external data transfers.
As you described, having IO in operators is quite common such as
dimension tables in Table/SQL API. This definitely deserves a FLIP and
an overall design.

However I think it's necessary to change the metric back to
numRecordsOut instead of sticking with numRecordsSend in 1.15 and
1.16. The most important argument is for compatibility as I mentioned
in my previous email, otherwise all users have to modify their configs
of metric systems after upgrading to Flink 1.15+, and all custom
connectors have to change their implementations to migrate to the new
metric name. I believe the others participating in and approving this
proposal share the same concern about compatibility too. Also,
considering this issue is blocking the release of 1.16, maybe we could
fix this asap, and as for defining a new metric for internal data
transfers we can have an in-depth discussion later. WDYT?

Best,
Qingsheng

On Tue, Oct 11, 2022 at 6:06 PM Chesnay Schepler  wrote:
>
> Currently I think that would be a mistake.
>
> Ultimately what we have here is the culmination of us never really 
> considering how the numRecordsOut metric should behave for operators that 
> emit data to other operators _and_ external systems. This goes beyond sinks.
> This even applies to numRecordsIn, for cases where functions query/write data 
> from/to the outside, (e.g., Async IO).
>
> Having 2 separate metrics for that, 1 exclusively for internal data 
> transfers, and 1 exclusively for external data transfers, is the only way to 
> get a consistent metric definition in the long-run.
> We can jump back-and-forth now or just commit to it.
>
> I don't think we can really judge this based on FLIP-33. It was IIRC written 
> before the two phase sinks were added, which heavily blurred the lines of 
> what a sink even is. Because it definitely is _not_ the last operator in a 
> chain anymore.
>
> What I would suggest is to stick with what we got (although I despise the 
> name numRecordsSend), and alias the numRecordsOut metric for all 
> non-TwoPhaseCommittingSink.
>
> On 11/10/2022 05:54, Qingsheng Ren wrote:
>
> Thanks for the details Chesnay!
>
> By “alias” I mean to respect the original definition made in FLIP-33 for 
> numRecordsOut, which is the number of records written to the external system, 
> and keep numRecordsSend as the same value as numRecordsOut for compatibility.
>
> I think keeping numRecordsOut for the output to the external system is more 
> intuitive to end users because in most cases the metric of data flow output 
> is more essential. I agree with you that a new metric is required, but 
> considering compatibility and users’ intuition I prefer to keep the initial 
> definition of numRecordsOut in FLIP-33 and name a new metric for sink 
> writer’s output to downstream operators. This might be against consistency 
> with metrics in other operators in Flink but maybe it’s acceptable to have 
> the sink as a special case.
>
> Best,
> Qingsheng
> On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , wrote:
>
> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never been a 
> need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they 
> emit data to the external system (usually considered as "numRecordsOut" of 
> sinks) while _also_ sending data to a downstream operator (usually considered 
> as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both (which 
> is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking 
> some semantic.
> Adding a new metric for what the sink writes to the external system is, for 
> better or worse, more consistent with how these metrics usually work in Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
>
> Thanks everyone for joining the discussion!
>
> > Do you have any idea what has happened in the process here?
>
> The discussion in this PR [1] shows some details and could be helpful to 
> understand the original motivation of the renaming. We do have a test case 
> for guarding metrics but unfortunately the case was also modified so the 
> defense was broken.
>
> I think the reason why both the developer and the reviewer forgot to trigger 
> a discussion and gave a green pass on the change is that metrics are quite 
> “trivial” to be noticed as public APIs. As mentioned by Martijn I couldn’t 
> find a place noting that metrics are public APIs and should be treated 
> carefully wh

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Qingsheng Ren
Thanks for the details Chesnay!

By “alias” I mean to respect the original definition made in FLIP-33 for 
numRecordsOut, which is the number of records written to the external system, 
and keep numRecordsSend as the same value as numRecordsOut for compatibility.

I think keeping numRecordsOut for the output to the external system is more 
intuitive to end users because in most cases the metric of data flow output is 
more essential. I agree with you that a new metric is required, but considering 
compatibility and users’ intuition I prefer to keep the initial definition of 
numRecordsOut in FLIP-33 and name a new metric for sink writer’s output to 
downstream operators. This might be against consistency with metrics in other 
operators in Flink but maybe it’s acceptable to have the sink as a special case.

Best,
Qingsheng
On Oct 10, 2022, 19:13 +0800, Chesnay Schepler , wrote:
> > I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut
>
> But that's not possible. If it were that simple there would have never been a 
> need to introduce another metric in the first place.
>
> It's a rather fundamental issue with how the new sinks work, in that they 
> emit data to the external system (usually considered as "numRecordsOut" of 
> sinks) while _also_ sending data to a downstream operator (usually considered 
> as "numRecordsOut" of tasks).
> The original issue was that the numRecordsOut of the sink counted both (which 
> is completely wrong).
>
> A new metric was always required; otherwise you inevitably end up breaking 
> some semantic.
> Adding a new metric for what the sink writes to the external system is, for 
> better or worse, more consistent with how these metrics usually work in Flink.
>
> On 10/10/2022 12:45, Qingsheng Ren wrote:
> > Thanks everyone for joining the discussion!
> >
> > > Do you have any idea what has happened in the process here?
> >
> > The discussion in this PR [1] shows some details and could be helpful to 
> > understand the original motivation of the renaming. We do have a test case 
> > for guarding metrics but unfortunately the case was also modified so the 
> > defense was broken.
> >
> > I think the reason why both the developer and the reviewer forgot to 
> > trigger a discussion and gave a green pass on the change is that metrics 
> > are quite “trivial” to be noticed as public APIs. As mentioned by Martijn I 
> > couldn’t find a place noting that metrics are public APIs and should be 
> > treated carefully while contributing and reviewing.
> >
> > IMHO three actions could be made to prevent this kind of changes in the 
> > future:
> >
> > a. Add test case for metrics (which we already have in SinkMetricsITCase)
> > b. We emphasize that any public-interface breaking changes should be 
> > proposed by a FLIP or discussed in mailing list, and should be listed in 
> > the release note.
> > c. We remind contributors and reviewers about what should be considered as 
> > public API, and include metric names in it.
> >
> > For b and c these two pages [2][3] might be proper places.
> >
> > About the patch to revert this, it looks like we have a consensus on 1.16. 
> > As of 1.15 I think it’s worthy to trigger a minor version. I didn’t see 
> > complaints about this for now so it should be OK to save the situation 
> > asap. I’m with Xintong’s idea to treat numXXXSend as an alias of numXXXOut 
> > considering there could possibly some users have already adapted their 
> > system to the new naming, and have another internal metric for reflecting 
> > number of outgoing committable batches (actually the numRecordsIn of sink 
> > committer operator should be carrying this info already).
> >
> > [1] https://github.com/apache/flink/pull/18825
> > [2] https://flink.apache.org/contributing/contribute-code.html
> > [3] https://flink.apache.org/contributing/reviewing-prs.html
> >
> > Best,
> > Qingsheng
> > On Oct 10, 2022, 17:40 +0800, Xintong Song , wrote:
> > > +1 for reverting these changes in Flink 1.16.
> > >
> > > For 1.15.3, can we make these metrics available via both names (numXXXOut 
> > > and numXXXSend)? In this way we don't break it for those who already 
> > > migrated to 1.15 and numXXXSend. That means we still need to change 
> > > SinkWriterOperator to use another metric name in 1.15.3, which IIUC is 
> > > internal to Flink sink.
> > >
> > > I'm overall +1 to change numXXXOut back to its original semantics. AFAIK 
> > > (from meetup / flink-forward questionaires), most users do not migrate to 
> > > a new Flink release immediate

Re: [DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-10 Thread Qingsheng Ren
> > > which many connectors and users depend on.
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > >
> > > >
> > > > On Mon, 10 Oct 2022 at 11:36, Jingsong Li  
> > > > wrote:
> > > >
> > > >> Thanks for driving, Qingsheng.
> > > >>
> > > >> +1 for reverting sink metric name.
> > > >>
> > > >> We often forget that metric is also one of the important APIs.
> > > >>
> > > >> +1 for releasing 1.15.3 to fix this.
> > > >>
> > > >> Best,
> > > >> Jingsong
> > > >>
> > > >> On Sun, Oct 9, 2022 at 11:35 PM Becket Qin  
> > > >> wrote:
> > > >> >
> > > >> > Thanks for raising the discussion, Qingsheng,
> > > >> >
> > > >> > +1 on reverting the breaking changes.
> > > >> >
> > > >> > In addition, we might want to release a 1.15.3 to fix this and update
> > > >> the previous release docs with this known issue, so that users can 
> > > >> upgrade
> > > >> to 1.15.3 when they hit it. It would also be good to add some backwards
> > > >> compatibility tests on metrics to avoid unintended breaking changes 
> > > >> like
> > > >> this in the future.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jiangjie (Becket) Qin
> > > >> >
> > > >> > On Sun, Oct 9, 2022 at 10:35 AM Qingsheng Ren  
> > > >> > wrote:
> > > >> >>
> > > >> >> Hi devs and users,
> > > >> >>
> > > >> >> I’d like to start a discussion about reverting a breaking change 
> > > >> >> about
> > > >> sink metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].
> > > >> >>
> > > >> >> TL;DR
> > > >> >>
> > > >> >> All sink metrics with name “numXXXOut” defined in FLIP-33 are 
> > > >> >> replaced
> > > >> by “numXXXSend” in FLINK-26126 and FLINK-26492. Considering metric 
> > > >> names
> > > >> are public APIs, this is a breaking change to end users and not 
> > > >> backward
> > > >> compatible. Also unfortunately this breaking change was not discussed 
> > > >> in
> > > >> the mailing list before.
> > > >> >>
> > > >> >> Background
> > > >> >>
> > > >> >> As defined previously in FLIP-33 (the FLIP page has been changed so
> > > >> please refer to the old version [3] ), metric “numRecordsOut” is used 
> > > >> for
> > > >> reporting the total number of output records since the sink started 
> > > >> (number
> > > >> of records written to the external system), and similarly for
> > > >> “numRecordsOutPerSecond”, “numBytesOut”, “numBytesOutPerSecond” and
> > > >> “numRecordsOutError”. Most sinks are following this naming and 
> > > >> definition.
> > > >> However, these metrics are ambiguous in the new Sink API as “numXXXOut”
> > > >> could be used by the output of SinkWriterOperator for reporting number 
> > > >> of
> > > >> Committables delivered to SinkCommitterOperator. In order to resolve 
> > > >> the
> > > >> conflict, FLINK-26126 and FLINK-26492 changed names of these metrics 
> > > >> with
> > > >> “numXXXSend”.
> > > >> >>
> > > >> >> Necessity of reverting this change
> > > >> >>
> > > >> >> - Metric names are actually public API, as end users need to 
> > > >> >> configure
> > > >> metric collecting and alerting system with metric names. Users have to
> > > >> reset all configurations related to affected metrics.
> > > >> >> - This could also affect custom and external sinks not maintained by
> > > >> Flink, which might have implemented with numXXXOut metrics.
> > > >> >> - The number of records sent to external system is way more 
> > > >> >> important
> > > >> than the number of Committables sent to SinkCommitterOperator, as the
> > > >> latter one is just an internal implementation of sink. We could have a 
> > > >> new
> > > >> metric name for the latter one instead.
> > > >> >> - We could avoid splitting the project by version (like “plz use
> > > >> numXXXOut before 1.15 and use numXXXSend after”) if we revert it ASAP,
> > > >> considering 1.16 is still not released for now.
> > > >> >>
> > > >> >> As a consequence, I’d like to hear from devs and users about your
> > > >> opinion on changing these metrics back to “numXXXOut”.
> > > >> >>
> > > >> >> Looking forward to your reply!
> > > >> >>
> > > >> >> [1] https://issues.apache.org/jira/browse/FLINK-26126
> > > >> >> [2] https://issues.apache.org/jira/browse/FLINK-26492
> > > >> >> [1] FLIP-33, version 18:
> > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136
> > > >> >>
> > > >> >> Best,
> > > >> >> Qingsheng
> > > >>
> > > >
> > > >


[DISCUSS] Reverting sink metric name changes made in 1.15

2022-10-08 Thread Qingsheng Ren
Hi devs and users,

I’d like to start a discussion about reverting a breaking change about sink 
metrics made in 1.15 by FLINK-26126 [1] and FLINK-26492 [2].

TL;DR

All sink metrics named “numXXXOut” as defined in FLIP-33 were replaced by 
“numXXXSend” in FLINK-26126 and FLINK-26492. Considering that metric names are 
public APIs, this is a breaking change for end users and is not backward 
compatible. Unfortunately, this breaking change was also not discussed on the 
mailing list beforehand.

Background

As defined previously in FLIP-33 (the FLIP page has been changed so please 
refer to the old version [3] ), metric “numRecordsOut” is used for reporting 
the total number of output records since the sink started (number of records 
written to the external system), and similarly for “numRecordsOutPerSecond”, 
“numBytesOut”, “numBytesOutPerSecond” and “numRecordsOutError”. Most sinks are 
following this naming and definition. However, these metrics are ambiguous in 
the new Sink API, as “numXXXOut” could be used by the output of the 
SinkWriterOperator for reporting the number of Committables delivered to the 
SinkCommitterOperator. In order to resolve the conflict, FLINK-26126 and 
FLINK-26492 renamed these metrics to “numXXXSend”.

Necessity of reverting this change

- Metric names are actually public API, as end users need to configure metric 
collecting and alerting systems with metric names. Users would have to reset all 
configurations related to the affected metrics.
- This could also affect custom and external sinks not maintained by Flink, 
which might have been implemented with the numXXXOut metrics.
- The number of records sent to the external system is far more important than the 
number of Committables sent to the SinkCommitterOperator, as the latter is just 
an internal implementation detail of the sink. We could have a new metric name for 
the latter instead.
- We could avoid splitting the project by version (like “plz use numXXXOut 
before 1.15 and use numXXXSend after”) if we revert it ASAP, considering 1.16 is 
still not released yet.

As a consequence, I’d like to hear from devs and users about your opinion on 
changing these metrics back to “numXXXOut”.

Looking forward to your reply!

[1] https://issues.apache.org/jira/browse/FLINK-26126
[2] https://issues.apache.org/jira/browse/FLINK-26492
[3] FLIP-33, version 18: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883136

Best,
Qingsheng


Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Qingsheng Ren
Hi Gopi,

What about using a window with a custom trigger? The window does nothing 
but aggregate your input into a collection. The trigger watches for metadata from 
the low-throughput input stream, so it can fire and purge the window (emitting all 
buffered elements downstream) on arrival of a metadata element.
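
As a rough sketch of that idea (assuming both streams are first mapped into a
common org.apache.flink.types.Either type and unioned so the trigger can see
both kinds of elements; the class name below is illustrative, not an existing
Flink utility):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Either;

// Buffers data elements (Either.Left) in the window and releases everything
// whenever a metadata element (Either.Right) arrives.
public class MetadataTrigger<D, M> extends Trigger<Either<D, M>, GlobalWindow> {

    @Override
    public TriggerResult onElement(Either<D, M> element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // Data elements just accumulate in the window contents; a metadata
        // element fires the window and purges the buffered elements.
        return element.isRight() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // No custom trigger state to clean up; window contents are purged on firing.
    }
}

The unioned stream would then be keyed, windowed with GlobalWindows.create(),
assigned this trigger, and handed to a ProcessWindowFunction that receives the
buffered batch each time a metadata element arrives.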

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M  wrote:
> 
> Hi,
> I've a scenario where I use connected streams where one is a low throughput 
> metadata stream and another one is a high throughput data stream. I use 
> CoProcessFunction that operates on a data stream with behavior controlled by 
> a metadata stream.
> 
> Is there a way to slow down/pause the high throughput data stream until I've 
> received one entry from the metadata stream? It's possible that by the time I 
> get the first element from the metadata stream, I might get 1000s of items 
> from the data stream. One option is to create a state to buffer the data 
> stream within the operator. Is there any other option which doesn't need this 
> state management?
> 
> Thanks,
> Gopi



Re: [ANNOUNCE] Apache Flink 1.14.5 released

2022-06-26 Thread Qingsheng Ren
Thanks Xingbo for driving this release!

Best, 
Qingsheng

> On Jun 22, 2022, at 11:50, Xingbo Huang  wrote:
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.14.5, which is the fourth bugfix release for the Apache Flink 1.14 
> series.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2022/06/22/release-1.14.5.html
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351388
>  
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>  
> Regards,
> Xingbo



Re: Can the kafka-client version requirement of the Kafka source/sink in Flink 1.15 be lowered?

2022-06-26 Thread Qingsheng Ren
Hi,

The Kafka connector currently depends on some APIs that are only available in newer 
kafka-clients versions, and the sink side uses reflection to support exactly-once 
semantics. Since the Kafka client itself guarantees fairly good backward compatibility, 
the Flink community no longer provides a Kafka connector built against older client 
versions, and adapting to Kafka 0.11, which was released 5 years ago, is not really feasible.

Best,
Qingsheng

> On Jun 23, 2022, at 19:37, yidan zhao  wrote:
> 
> As the subject says, I'd like to ask those familiar with this: does the current connector merely upgrade to a newer kafka-client and adapt the API usage, or does it rely on features that only exist in the newer client?
> Would it be possible to implement it on top of an older kafka-client?
> 
> If so, I might override the implementation myself.
> The reason is that newer kafka-client versions don't work with my company's Kafka, which is open-source Kafka wrapped with an extra proxy layer. Accessing it with a too-new kafka
> client causes problems (0.11 is recommended; the newest client I could get working is 2.2).



Re: Data loss when using Flink to consume from Kafka and sync to MongoDB in real time

2022-06-26 Thread Qingsheng Ren
Hi,

The Flink Kafka connector commits offsets to the Kafka broker after a checkpoint 
completes, but Flink does not rely on the offsets committed to the broker for failure 
recovery; it recovers from the offsets stored in the checkpoint instead.
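
For illustration, a minimal sketch of where the committed offsets actually
matter (this uses the newer KafkaSource builder API; broker address, topic and
group id are placeholders): the committed group offsets are only consulted as
the starting position of a fresh job, while recovery always uses the offsets
stored in the checkpoint.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaCheckpointedOffsetsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are committed back to Kafka only when a checkpoint completes.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder
                .setTopics("mysql-changelog-topic")          // placeholder
                .setGroupId("mysql2mongodb")                 // placeholder
                // Committed group offsets are only used when the job starts without
                // checkpoint/savepoint state; on recovery, Flink restores the reader
                // positions recorded in the checkpoint itself.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .print();

        env.execute("kafka-checkpointed-offsets-sketch");
    }
}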

As for the data loss, I'd suggest first reproducing the problem with a small amount 
of data, and then troubleshooting step by step from source to sink.

Best,
Qingsheng

> On Jun 26, 2022, at 11:54, casel.chen  wrote:
> 
> mysql cdc -> kafka -> mongodb
> I wrote a Flink 1.13.2 job that consumes the whole-database MySQL change topic from Kafka and syncs it to MongoDB in real time, with checkpointing enabled. In practice, however, I found that restoring from a savepoint or from group offsets loses data. How should I troubleshoot this? Repository: https://github.com/ChenShuai1981/mysql2mongodb.git
> My MongodbSink implements CheckpointedFunction and waits in snapshotState for all worker threads to finish writing to MongoDB.
> 
> 
> What is the flow for committing Kafka offsets after Flink consumes and processes the data? Consumption initially obtains pendingOffsets; how is it ensured that all pendingOffsets are processed before they are all committed? Is there any source-code walkthrough covering this?
> 



Re: Flink 1.15.0 fails to commit Kafka offsets?

2022-06-26 Thread Qingsheng Ren
Hi,

This is a known issue in the Apache Kafka consumer; see FLINK-28060 [1] and KAFKA-13840 [2].

[1] https://issues.apache.org/jira/browse/FLINK-28060
[2] https://issues.apache.org/jira/browse/KAFKA-13840

Best,
Qingsheng

> On Jun 27, 2022, at 09:16, RS  wrote:
> 
> Hi,
> A question for everyone: on Flink 1.15.0, while consuming from Kafka I ran into the following problem: offset commits fail, and some jobs seem to fail committing all the time: the data is consumed, but the offsets never change. How should this be handled?
> 
> 
> Symptoms:
> 1. The job has no exceptions.
> 2. Data is consumed and processed normally; data usage is not affected.
> 3. Checkpointing is configured (every few minutes); in theory, offsets are committed when a checkpoint runs.
> 4. Offset commits to Kafka fail for some jobs but succeed for others.
> 
> 
> WARN logs:
> 2022-06-27 01:07:42,725 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11398 (max part counter=1).
> 2022-06-27 01:07:42,830 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11398.
> 2022-06-27 01:07:43,820 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11476 (max part counter=0).
> 2022-06-27 01:07:43,946 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11476.
> 2022-06-27 01:07:45,218 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11521 (max part counter=47).
> 2022-06-27 01:07:45,290 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11521.
> 2022-06-27 01:07:45,521 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 11443
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 2022-06-27 01:07:45,990 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 11398
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 
> 
> Thanks~



Re: Kafka Consumer commit error

2022-06-15 Thread Qingsheng Ren
Hi,

Thanks for reporting the issue and the demo provided by Christian!

I traced the code and think it's a bug in KafkaConsumer (see KAFKA-13563 [1]). 
We probably need to bump the Kafka client to 3.1 to fix it, but we should check 
the compatibility issue first because it crosses a major version of Kafka (2.x 
-> 3.x). 

[1] https://issues.apache.org/jira/browse/KAFKA-13563

Best, 

Qingsheng

> On Jun 15, 2022, at 02:14, Martijn Visser  wrote:
> 
> Hi Christian,
> 
> There's another similar error reported by someone else. I've linked the 
> tickets together and asked one of the Kafka maintainers to have a look at 
> this.
> 
> Best regards,
> 
> Martijn
> 
> Op di 14 jun. 2022 om 17:16 schreef Christian Lorenz 
> :
> Hi Alexander,
> 
>  
> 
> I’ve created a Jira ticket here 
> https://issues.apache.org/jira/browse/FLINK-28060.
> 
> Unfortunately this is causing some issues to us.
> 
> I hope with the attached demo project the root cause of this can also be 
> determined, as this is reproducible in Flink 1.15.0, but not in Flink 1.14.4.
> 
>  
> 
> Kind regards,
> 
> Christian
> 
>  
> 
> Von: Alexander Fedulov 
> Datum: Montag, 13. Juni 2022 um 23:42
> An: Christian Lorenz 
> Cc: "user@flink.apache.org" 
> Betreff: Re: Kafka Consumer commit error
> 
>  
> 
> This email has reached Mapp via an external source
> 
>  
> 
> Hi Christian,
> 
>  
> 
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
> application. Do you think this might still be related?
> 
>  
> 
> No, in that case, Kafka transactions are not used, so it should not be 
> relevant.
> 
>  
> 
> Best,
> 
> Alexander Fedulov
> 
>  
> 
> On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz  
> wrote:
> 
> Hi Alexander,
> 
>  
> 
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
> application. Do you think this might still be related?
> 
>  
> 
> Best regards,
> 
> Christian
> 
>  
> 
>  
> 
> Von: Alexander Fedulov 
> Datum: Montag, 13. Juni 2022 um 13:06
> An: "user@flink.apache.org" 
> Cc: Christian Lorenz 
> Betreff: Re: Kafka Consumer commit error
> 
>  
> 
> This email has reached Mapp via an external source
> 
>  
> 
> Hi Christian,
> 
>  
> 
> you should check if the exceptions that you see after the broker is back from 
> maintenance are the same as the ones you posted here. If you are using 
> EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging 
> transactions that Flink attempts to commit [1].
> 
>  
> 
> Best,
> 
> Alexander Fedulov
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
> 
>  
> 
> On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser  
> wrote:
> 
> Hi Christian,
> 
>  
> 
> I would expect that after the broker comes back up and recovers completely, 
> these error messages would disappear automagically. It should not require a 
> restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism 
> for fault tolerance. 
> 
>  
> 
> Best regards,
> 
>  
> 
> Martijn
> 
>  
> 
> Op wo 8 jun. 2022 om 15:49 schreef Christian Lorenz 
> :
> 
> Hi,
> 
>  
> 
> we have some issues with a job using the flink-sql-connector-kafka (flink 
> 1.15.0/standalone cluster). If one broker e.g. is restarted for maintainance 
> (replication-factor=2), the taskmanagers executing the job are constantly 
> logging errors on each checkpoint creation:
> 
>  
> 
> Failed to commit consumer offsets for checkpoint 50659
> 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
>  Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> 
> Caused by: 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
>  The coordinator is not available.
> 
>  
> 
> AFAICT the error itself is produced by the underlying kafka consumer. 
> Unfortunately this error cannot be reproduced on our test system.
> 
> From my understanding this error might occur once, but follow up checkpoints 
> / kafka commits should be fine again.
> 
> Currently my only way of “fixing” the issue is to restart the taskmanagers.
> 
>  
> 
> Is there maybe some kafka consumer setting which would help to circumvent 
> this?
> 
>  
> 
> Kind regards,
> 
> Christian
> 
> Mapp Digital Germany GmbH with registered offices at Dachauer, Str. 63, 80335 
> München.
> Registered with the District Court München HRB 226181
> Managing Directors: Frasier, Christopher & Warren, Steve
> 
> This e-mail is from Mapp Digital and its international legal entities and may 
> contain information that is confidential or proprietary.
> If you are not the intended recipient, do not read, copy or distribute the 
> e-mail or any attachments. Instead, please notify the sender and delete the 
> e-mail and any attachments.
> Please consider the environment before printing. Thank you.
> 
> Mapp Digital Germany GmbH with registered offices at 

Re: Flink Shaded dependencies and extending Flink APIs

2022-06-13 Thread Qingsheng Ren
Hi Andrew,

This is indeed a tricky case since Flink doesn't provide a non-shaded
JAR for flink-json. One hacky solution that comes to mind:

1. Create a module let's say "wikimedia-event-utilities-shaded" that
relocates Jackson in the same way and uses the same Jackson version as
flink-shaded-jackson
2. Deploy the module to a local or remote Maven repository
3. Let your custom format depend on the
"wikimedia-event-utilities-shaded" module, then all Jackson
dependencies are relocated in the same way.

Another solution is to serialize and then deserialize the
"different" ObjectNode to do the conversion, but this sacrifices some
performance.
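
As an illustration of that second option, a minimal round-trip sketch; the class and method names are made up for the example, and only the shaded ObjectNode type comes from the stack discussed in this thread. Going back the other way works the same with a shaded ObjectMapper.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public final class JacksonBridge {
    private static final ObjectMapper UNSHADED_MAPPER = new ObjectMapper();

    /** Converts Flink's shaded ObjectNode into an unshaded one by round-tripping through its JSON text. */
    public static ObjectNode toUnshaded(
            org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode shaded)
            throws Exception {
        // toString() renders the node as JSON, which the unshaded mapper can parse again.
        return (ObjectNode) UNSHADED_MAPPER.readTree(shaded.toString());
    }
}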

Hope this could be helpful!

Best regards,

Qingsheng

On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:
>
> Hi all,
>
> I'm working on an integration project trying to write some library code that 
> will allow us at the Wikimedia Foundation to use Flink with our 'Event 
> Platform'.  Specifically, I'm trying to write a reusable step near the end of 
> a pipeline that will ensure our JSON events satisfy some criteria before 
> producing them to Kafka.  Details here.
>
> I'm experimenting with writing my own custom format to do this.  But all I 
> really need to do is override JsonRowDataSerializationSchema's serialize 
> method and augment and validate the ObjectNode before it is serialized to 
> byte[].
>
> I'm running into an issue where the ObjectNode that is used by Flink here is 
> the shaded one: 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
>  whereas the WMF code I want to use to augment the ObjectNode is using a 
> regular non shaded one.  I can't pass the shaded ObjectNode instance to a 
> function that takes a non shaded one, and I can't cast the shaded ObjectNode 
> to non shaded either.
>
> My Q is: is there a way to extend Flink APIs that use shaded dependencies?  I 
> suppose I could copy/paste the whole of the "json" format code that I need 
> into my project and just make it my own, but this feels quite obnoxious.
>
> Thank you!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Fwd: Apache flink doesn't work with avro kafka topic with multiple event types

2022-06-13 Thread Qingsheng Ren
Hi Sucheth,

If you are referring to Table / SQL API, I'm afraid it doesn't support
schema evolution or different types from one Kafka table. An
alternative way is to consume the topic with raw format [1] and do
deserialization with a UDTF. If you are using the DataStream API, you
can implement the KafkaRecordDeserializationSchema interface to
deserialize the message consumed from Kafka.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/raw/
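
For the DataStream route, a sketch of what such a deserializer could look like; the decoding helper is a placeholder for your schema-registry-aware Avro logic, and in a real job you would likely want proper Avro type information (e.g. GenericRecordAvroTypeInfo from flink-avro) instead of the generic fallback used here.

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MultiTypeAvroDeserializationSchema
        implements KafkaRecordDeserializationSchema<GenericRecord> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<GenericRecord> out)
            throws IOException {
        // Placeholder: look up the writer schema (e.g. from the schema registry id embedded
        // in the payload) and decode the bytes into a GenericRecord of whatever type it is.
        out.collect(decodeWithSchemaRegistry(record.value()));
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Falls back to generic (Kryo) serialization; fine for a sketch, not for production.
        return TypeInformation.of(GenericRecord.class);
    }

    private GenericRecord decodeWithSchemaRegistry(byte[] payload) {
        throw new UnsupportedOperationException("hypothetical decoder - plug in your own");
    }
}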

Best regards,

Qingsheng

On Sun, Jun 12, 2022 at 12:53 PM Sucheth S  wrote:
>
> Hi,
>
> Apache Flink doesn't work with Avro serialized kafka topic with multiple 
> event types (
> TopicRecordNameStrategy for subject )
>
> Is there a way to read a generic record from avro serialized kafka topic 
> which can have messages with different schemas, basically 
> TopicRecordNameStrategy for the subject in the schema registry. ?
>
>
> Regards,
> Sucheth Shivakumar
> website : https://sucheths.com
> mobile : +1(650)-576-8050
> San Mateo, United States


Re: [External] Re: Source vs SourceFunction and testing

2022-06-09 Thread Qingsheng Ren
Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
think this is what you are looking for. This FLIP was created just
days ago so it may take some time to get accepted and released.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos
 wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this 
> case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a ton of common code and 
> configurations. One of the abstractions we have is an AbstractDataStreamJob 
> class that has generic Sources and Sinks. We implemented it like this since 
> Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for 
> testing complete jobs against a local, embedded mini cluster.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make 
> sources and sinks pluggable in your production code and inject special test 
> sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka Sources and Sinks in the Main class of 
> the job, and also create the test Sources and Sinks in the Junit tests, and 
> inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end to end tests 
> against the local embedded mini cluster. Prior to Flink 1.15, we used the 
> FromElementsFunction to create the test SourceFunction. Now that we changed 
> the code to use the new Source interface, we cannot use the 
> FromElementsFunction anymore, and we haven't found an equivalent 
> FromElementsSource class with the same functionality but implemented using 
> the new Source API.
>
> We want to keep the same structure in the AbstractDataStreamJob class (with 
> generic and pluggable sources and sinks), as we think it is the most elegant 
> and generic solution.
>
> Is it planned to implement a FromElementsSource class that extends the new 
> Source API? Is there any other alternative that may serve as a workaround for 
> the moment?
>
> We have tried to implement a custom Source for this use case, but it seems 
> like an overwhelming task and we do not want to reinvent the wheel either. If 
> it is planned to implement the FromElementsSource we'd rather prefer to wait 
> for it.
>
> Thanks!
> Carlos
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
>
> -Original Message-
> From: Qingsheng Ren 
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski 
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with 
> links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a 
> detailed description about it. The original proposal FLIP-27 [2] is also a 
> good reference.
>
> [1] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_sources_=DwIFaQ=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da=lQGFDQJRG2BADprHFhkCefHCPTjDTh-OGIz4xFl-1W8=
> [2] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterface=DwIFaQ=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50aRVltFNinifOKvurHPTzdPL1da=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5V3MIQ=
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >
> > P.S. I actually struggled for a bit trying to understand why my refactored 
> > solution which accepts DataStream<> wouldn't work ("no operators defined in 
> > the streaming topology"). Turns out, my assumption that I can call 
> > StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> > the same environment, was wrong. I had env.addSource and env.fromSource 
> > calls using one instance of the environment, but then called env.execute() on another instance :facepalm:

Re: filesink part files roll over

2022-06-06 Thread Qingsheng Ren
Hi Sucheth,

Please see https://issues.apache.org/jira/browse/FLINK-27910

Best,

Qingsheng

> On Jun 5, 2022, at 23:21, Sucheth S  wrote:
> 
> Hi,
> 
> Can someone please help me with this please - 
> https://stackoverflow.com/q/72496963/9125940 ?
> 
> Regards,
> Sucheth Shivakumar
> website : https://sucheths.com
> mobile : +1(650)-576-8050
> San Mateo, United States



Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-02 Thread Qingsheng Ren
Thanks for the input ChangZhuo.

Could you check whether the configuration "classloader.resolve-order" is
set to "parent-first" in your Flink 1.14 cluster? I didn't notice any
changes related to the user-code classloader in Flink 1.15. If my
assumption is correct, you package the gcs-connector into your job JAR
but the Hadoop FS dependencies are not included, so
org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
flink-s3-fs-hadoop.jar under Flink's lib directory, while
GoogleHadoopFileSystem is loaded by the user-code classloader from the job
JAR. Setting the resolve order to "parent-first" could bypass the
issue [1], so I assume you have this config in 1.14 but not in 1.15.
Please forgive me if I understand incorrectly!

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/

On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬)  wrote:
>
> On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > Hi ChangZhuo,
> >
> > I assume it’s a classloading issue but I can’t track down to the root cause 
> > in code. Would you mind sharing the entire exception stack and some JM/TM 
> > logs related to file system?
>
> The following is exception log we have. Please let us know if you need
> other logs.
>
> ps. .listPath() is the function I mentioned earlier.
>
>
> 2022-06-02 00:25:57,825 WARN  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application failed unexpectedly:
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) 
> ~[?:?]
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) 
> ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source) ~[?:?]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) 
> ~[?:?]
> at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) 
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>  [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
> at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
> at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) 
> [?:?]
> at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) 
> [?:?]
> Caused by: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
> ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: 
> The main method caused an error: class 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class 
> org.apache.hadoop.fs.FileSystem 
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> at 
> org.apache.flink.client.pro

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-01 Thread Qingsheng Ren
Hi ChangZhuo,

I assume it's a classloading issue, but I can't track down the root cause in the 
code. Would you mind sharing the entire exception stack and some JM/TM logs 
related to the file system?

Best regards, 

Qingsheng

> On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬)  wrote:
> 
> Hi,
> 
> We use GCS as storage, and have the following functions to list files in
> GCS path for Flink batch mode to buidl states:
> 
> 
>  def listPath(p: String): Seq[String] = {
>val path = new Path(p)
>val fs = path.getFileSystem(new Configuration())
>fs.listStatus(path) match {
>  case null => Seq()
>  case xs => xs.map(_.getPath.toString)
>}
>  }
> 
> This function works fine in Flink 1.14. However, in Flink 1.15, we have
> the following exception:
> 
>  Caused by: java.lang.ClassCastException: class 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class 
> org.apache.hadoop.fs.FileSystem 
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>  at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) 
> ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>  at .listPath() ~[?:?]
> 
> We found a similar issue in Spark [0]. However, we are not sure if it is
> related, and if it is, how can we apply this fix. Any help is welcome.
> 
> 
> [0] https://issues.apache.org/jira/browse/SPARK-9206
> 
> 
> -- 
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B



Re: Can we use CheckpointedFunction with the new Source api?

2022-05-31 Thread Qingsheng Ren
Hi Qing, 

Thanks for the input. Having a stateful function accumulate the tree after the 
source sounds like a reasonable solution to me. Under your design a split maps 
to a znode, so the state persisted in the source reader is per-node 
information, and it's hard to accumulate it under the current source 
abstraction. I'm also a little curious about the use case: if the downstream 
requires the whole tree, does that mean the parallelism of the accumulator has 
to be 1? Please forgive me if my understanding is incorrect. 
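
A rough sketch of that accumulator pattern (not from the thread): route every update to a single key so one subtask sees the whole tree, keep the latest value per znode path in MapState, and emit a full snapshot on each change. The Tuple2 layout (f0 = path, f1 = data) is just an assumption for the example.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TreeAccumulator
        extends KeyedProcessFunction<String, Tuple2<String, String>, Map<String, String>> {

    private transient MapState<String, String> nodesByPath;

    @Override
    public void open(Configuration parameters) {
        nodesByPath = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("tree", String.class, String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> update, Context ctx,
                               Collector<Map<String, String>> out) throws Exception {
        nodesByPath.put(update.f0, update.f1);               // latest data per znode path
        Map<String, String> snapshot = new HashMap<>();
        for (Map.Entry<String, String> e : nodesByPath.entries()) {
            snapshot.put(e.getKey(), e.getValue());
        }
        out.collect(snapshot);                               // full-tree snapshot per update
    }
}

Keying by a constant (e.g. updates.keyBy(u -> "tree").process(new TreeAccumulator())) effectively forces the accumulator to run on a single active key, which matches the parallelism question above.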

Another idea: if you are also providing a reusable *table* source, you can wrap 
the source and the accumulating function together into a DataStreamScanProvider 
and expose them as a single table source to users. This might look a bit 
neater. 

Cheers, 

Qingsheng

> On May 31, 2022, at 16:04, Qing Lim  wrote:
> 
> Hi Qingsheng, thanks for getting back.
> 
> I managed to find a workaround, but if you can provide other suggestions it'd 
> be great too.
> 
> I followed the documentation here: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
> 
> I implemented a custom Source that emits all changes from a Zookeeper node 
> (recursively). I modelled it such that every time there's a change in the 
> tree, it produces a split for the specific node, and the reader is then 
> responsible for fetching the latest state of the node and emitting it.
> This works fine, but we have a use case to emit the whole tree (recursively), 
> conceptually it is quite simple, I just need to accumulate the whole tree as 
> State, and then emit the State whenever it updates.
> 
> I was trying to achieve this inside the Source, because this is meant to be 
> something reusable within our organization, which is why I asked the original 
> question, but I have now solved it by implementing a stateful Map function 
> instead, it is a bit less ergonomic, but acceptable on my end. So if you have 
> an alternative, please share with me, thank you.
> 
> Kind regards
> 
> -Original Message-
> From: Qingsheng Ren  
> Sent: 31 May 2022 03:57
> To: Qing Lim 
> Cc: user@flink.apache.org
> Subject: Re: Can we use CheckpointedFunction with the new Source api?
> 
> Hi Qing,
> 
> I’m afraid CheckpointedFunction cannot be applied to the new source API, but 
> could you share the abstractions of your source implementation, like which 
> component a split maps to etc.? Maybe we can try to do some workarounds. 
> 
> Best, 
> 
> Qingsheng
> 
>> On May 30, 2022, at 20:09, Qing Lim  wrote:
>> 
>> Hi, is it possible to use CheckpointedFunction with the new Source api? (The 
>> one in package org.apache.flink.api.connector.source)
>> 
>> My use case:
>> 
>> I have a custom source that emit individual nodes update from a tree, and I 
>> wish to create a stream of the whole Tree snapshots, so I will have to 
>> accumulate all updates and keep it as state. In addition to this, I wish to 
>> expose this functionality as a library to my organization.
>> 
>> The custom source is written using the new Source api, I wonder if we can 
>> attach state to it?
>> 
>> Kind regards
>> 
>> This e-mail and any attachments are confidential to the addressee(s) and may 
>> contain information that is legally privileged and/or confidential. If you 
>> are not the intended recipient of this e-mail you are hereby notified that 
>> any dissemination, distribution, or copying of its content is strictly 
>> prohibited. If you have received this message in error, please notify the 
>> sender by return e-mail and destroy the message and all copies in your 
>> possession.
>> 
>> 
>> To find out more details about how we may collect, use and share your 
>> personal information, please see https://www.mwam.com/privacy-policy. This 
>> includes details of how calls you make to us may be recorded in order for us 
>> to comply with our legal and regulatory obligations.
>> 
>> 
>> To the extent that the contents of this email constitutes a financial 
>> promotion, please note that it is issued only to and/or directed only at 
>> persons who are professional clients or eligible counterparties as defined 
>> in the FCA Rules. Any investment products or services described in this 
>> email are available only to professional clients and eligible 
>> counterparties. Persons who are not professional clients or eligible 
>> counterparties should not rely or act on the contents of this email.
>> 
>> 
>> Marshall Wace LLP is authorised and regulated by the Financial Conduct 
>> Authority. Marshall Wace LLP is a limited liability part

Re: FileSource SourceReader failure scenario

2022-05-31 Thread Qingsheng Ren
Hi Meghajit,

Good question! To make a short answer: splits won’t be returned back to 
enumerator by reader once they are assigned and *checkpointed*. 

As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:

> Add a split back to the split enumerator. It will only happen when a 
> SourceReader fails and there are splits assigned to it after the last 
> successful checkpoint.

Suppose we have split A and reader 0, and we have a flow like this:

Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101

After checkpoint 101 the state of split A is managed by reader 0. If the reader 
fails and rolls back to checkpoint 101, the state of split A is recovered by 
the reader instead of being returned to the enumerator, because the split has 
already been delivered to the reader and successfully stored in the reader's 
checkpoint 101. But if reader 0 fails before checkpoint 101 and rolls back to 
100, reader 0 is not aware of the assignment of split A, so A will be added 
back to the enumerator and assigned again.

In a nutshell, once a split is assigned to a reader and a checkpoint completes 
successfully, it is the reader's responsibility to handle the state and 
recover, and the split won't be returned to the enumerator. A split won't be 
assigned or read twice under this pattern. 

Hope this is helpful!

[1] 
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html

Cheers, 

Qingsheng


> On May 31, 2022, at 16:29, Meghajit Mazumdar  
> wrote:
> 
> Hello,
> 
> I had a question with regards to the behaviour of FileSource and SourceReader 
> in cases of failures. Let me know if I missed something conceptually.
> 
> We are running a Parquet File Source. Let's say, we supply the source with a 
> directory path containing 5 files and the Flink job is configured to run with 
> a parallelism of 2.
> 
> When the job starts, 2 SourceReaders are created and when they ask for 
> splits, the split assigner assigns them one file each, which they start 
> processing.
> 
> Now, the documentation of FileSplitAssigner.addSplits method says the 
> following: 
> Adds a set of splits to this assigner. This happens for example when some 
> split processing failed and the splits need to be re-added, or when new 
> splits got discovered.
> 
> I understand this means that un-processed splits or splits that were not 
> processed completely due to some error with the SourceReader get added back 
> to the split assigner to be re-assigned to some other SourceReader.
> 
> However, the documentation of FileRecordFormat.restoreReader has this 
> statement written:
> Restores a reader from a checkpointed position. This method is called when 
> the reader is recovered from a checkpoint and the reader has previously 
> stored an offset into the checkpoint, by returning from the 
> FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative 
> offset. That value is supplied as the restoredOffset.
> 
> I am somewhat confused by these 2 documentation statements. If the split is 
> added back to the split assigner when split processing got failed by a 
> SourceReader (maybe due to some exception or fatal error), then the split 
> could be re-assigned to any other SourceReader next. Even if the failed 
> SourceReader comes back and starts processing the file from the last 
> checkpointed offset, there would be duplicate processing as the file could 
> have been assigned to somebody else in the meantime. Then what is the purpose 
> of `restoreReader` ? Or, am I missing something ?
> 
> -- 
> Regards,
> Meghajit



Re: Can we use CheckpointedFunction with the new Source api?

2022-05-30 Thread Qingsheng Ren
Hi Qing,

I’m afraid CheckpointedFunction cannot be applied to the new source API, but 
could you share the abstractions of your source implementation, like which 
component a split maps to etc.? Maybe we can try to do some workarounds. 

Best, 

Qingsheng

> On May 30, 2022, at 20:09, Qing Lim  wrote:
> 
> Hi, is it possible to use CheckpointedFunction with the new Source api? (The 
> one in package org.apache.flink.api.connector.source)
>  
> My use case:
>  
> I have a custom source that emit individual nodes update from a tree, and I 
> wish to create a stream of the whole Tree snapshots, so I will have to 
> accumulate all updates and keep it as state. In addition to this, I wish to 
> expose this functionality as a library to my organization.
>  
> The custom source is written using the new Source api, I wonder if we can 
> attach state to it?
>  
> Kind regards
>  
> This e-mail and any attachments are confidential to the addressee(s) and may 
> contain information that is legally privileged and/or confidential. If you 
> are not the intended recipient of this e-mail you are hereby notified that 
> any dissemination, distribution, or copying of its content is strictly 
> prohibited. If you have received this message in error, please notify the 
> sender by return e-mail and destroy the message and all copies in your 
> possession.
> 
> 
> To find out more details about how we may collect, use and share your 
> personal information, please see https://www.mwam.com/privacy-policy. This 
> includes details of how calls you make to us may be recorded in order for us 
> to comply with our legal and regulatory obligations.
> 
> 
> To the extent that the contents of this email constitutes a financial 
> promotion, please note that it is issued only to and/or directed only at 
> persons who are professional clients or eligible counterparties as defined in 
> the FCA Rules. Any investment products or services described in this email 
> are available only to professional clients and eligible counterparties. 
> Persons who are not professional clients or eligible counterparties should 
> not rely or act on the contents of this email.
> 
> 
> Marshall Wace LLP is authorised and regulated by the Financial Conduct 
> Authority. Marshall Wace LLP is a limited liability partnership registered in 
> England and Wales with registered number OC302228 and registered office at 
> George House, 131 Sloane Street, London, SW1X 9AT. If you are receiving this 
> e-mail as a client, or an investor in an investment vehicle, managed or 
> advised by Marshall Wace North America L.P., the sender of this e-mail is 
> communicating with you in the sender's capacity as an associated or related 
> person of Marshall Wace North America L.P. ("MWNA"), which is registered with 
> the US Securities and Exchange Commission ("SEC") as an investment adviser.  
> Registration with the SEC does not imply that MWNA or its employees possess a 
> certain level of skill or training.
> 



Re: Source vs SourceFunction and testing

2022-05-25 Thread Qingsheng Ren
Glad to see you have resolved the issue! 

If you want to learn more about the Source API, the Flink document [1] has a 
detailed description about it. The original proposal FLIP-27 [2] is also a good 
reference. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers, 

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski  wrote:
> 
> Thank you Qingsheng, this context helps a lot!
> 
> And once again thank you all for being such a helpful community!
> 
> P.S. I actually struggled for a bit trying to understand why my refactored 
> solution which accepts DataStream<> wouldn't work ("no operators defined in 
> the streaming topology"). Turns out, my assumption that I can call 
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get 
> the same environment, was wrong. I had env.addSource and env.fromSource calls 
> using one instance of the environment, but then called env.execute() on 
> another instance :facepalm:
> 
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:
> Hi Piotr,
> 
> I’d like to share my understanding about this. Source and SourceFunction are 
> both interfaces to data sources. SourceFunction was designed and introduced 
> earlier and as the project evolved, many shortcomings emerged. Therefore, the 
> community re-designed the source interface and introduced the new Source API 
> in FLIP-27 [1]. 
> 
> Finally we will deprecate the SourceFunction and use Source as the only 
> interface for all data sources, but considering the huge cost of migration 
> you’ll see SourceFunction and Source co-exist for some time, like the 
> ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource 
> as a pioneer has already migrated to the new Source API.
> 
> I think the API to end users didn't change a lot: both 
> env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
> and you could apply downstream transformations onto it. 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>  
> 
> Cheers,
> 
> Qingsheng
> 
> > On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> > 
> > Hi Ken,
> > 
> > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> > navigating the type system and being still confused about differences 
> > between Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> > 
> > I think the DataStream<> type is what I'm looking for? That is, then I can 
> > use:
> > 
> > DataStream source = env.fromSource(getKafkaSource(params), 
> > watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> > 
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> > 
> > Does that sound right?
> > 
> > [1] 
> > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > 
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> > wrote:
> > Hi Piotr,
> > 
> > The way I handle this is via a workflow class that uses a builder approach 
> > to specifying inputs, outputs, and any other configuration settings.
> > 
> > The inputs are typically DataStream.
> > 
> > This way I can separate out the Kafka inputs, and use testing sources that 
> > give me very precise control over the inputs (e.g. I can hold up on right 
> > side data to ensure my stateful left join junction is handling deferred 
> > joins properly). I can also use Kafka unit test support (either kafka-junit 
> > or Spring embedded Kafka) if needed.
> > 
> > Then in the actual tool class (with a main method) I’ll wire up the real 
> > Kafka sources, with whatever logic is required to convert the consumer 
> > records to what the workflow is expecting.
> > 
> > — Ken
> > 
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> >> 
> >> Hi,
> >> 
>> I'm wondering: what is the recommended way to structure the job which one 
> >> would like to test later on with `MiniCluster`.
> >> 
> >> I've looked at the flink-training repository examples [1] and they tend to 
> >> expose the main job as a class that accepts a `SourceFunction` and a 
> >> `SinkFunction`, which make sense. But then, my job is normally constructed 
> >> with `KafkaSource` which is then passed to `env.fromSource(.

Re: WakeupException when Kafka source detects partition changes

2022-05-24 Thread Qingsheng Ren
Hi,

Thanks for the feedback. This looks like a bug; could you open a new ticket in the Apache JIRA [1]?

[1] https://issues.apache.org/jira

> On May 25, 2022, at 11:35, 邹璨  wrote:
> 
> Flink version: 1.14.3
> Module: connectors/kafka
> Problem description:
> I hit a WakeupException when using dynamic Kafka partition discovery, which caused the job to fail. The exception is as follows:
> 
> 2022-05-10 15:08:03
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
> at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
> at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> 
> 
> 
> From a quick look at the source code along the exception stack trace: when KafkaSource handles a partition change, it interrupts the consumer that is currently fetching data by calling consumer.wakeup().
> It then calls consumer.position() while handling the partition change; since the consumer has already been woken up, a WakeupException is thrown at that point.
> 
> 
> 
> 
> 
> 
> This email and the information contained in this email are intended solely 
> for the recipient(s) addressed above and may contain information that is 
> confidential and/or privileged or whose disclosure is prohibited by law or 
> other reasons.
> If you are not the intended recipient of this email, please be advised that 
> any unauthorized storage, duplication, dissemination, distribution or 
> disclosure of all or part of this email is strictly prohibited.
> If you received this email in error, please immediately contact NAVER 
> Security (dl_naversecur...@navercorp.com) and delete this email and any 
> copies and attachments from your system. Thank you for your cooperation.​



Re: Source vs SourceFunction and testing

2022-05-24 Thread Qingsheng Ren
Hi Piotr,

I’d like to share my understanding about this. Source and SourceFunction are 
both interfaces to data sources. SourceFunction was designed and introduced 
earlier and as the project evolved, many shortcomings emerged. Therefore, the 
community re-designed the source interface and introduced the new Source API in 
FLIP-27 [1]. 

Finally we will deprecate the SourceFunction and use Source as the only 
interface for all data sources, but considering the huge cost of migration 
you’ll see SourceFunction and Source co-exist for some time, like the 
ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource as 
a pioneer has already migrated to the new Source API.

I think the API to end users didn't change a lot: both 
env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
and you could apply downstream transformations onto it. 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
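
For illustration, a small sketch of the two entry points side by side; the Kafka connection details and the inline SourceFunction are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SourceApiComparison {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Legacy API: a SourceFunction goes through env.addSource(...)
        DataStream<String> legacy = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) {
                ctx.collect("hello");
            }

            @Override
            public void cancel() {}
        });

        // New FLIP-27 API: a Source goes through env.fromSource(...)
        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")     // placeholder
                .setTopics("topic")                     // placeholder
                .setGroupId("example-group")            // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> modern = env.fromSource(kafka, WatermarkStrategy.noWatermarks(), "Kafka");

        // Both are plain DataStreams, so downstream code does not care which API produced them.
        legacy.union(modern).print();
        env.execute("source-api-comparison");
    }
}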
 

Cheers,

Qingsheng

> On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
>> 
>> Hi,
>> 
> I'm wondering: what is the recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which make sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski



Re: Json Deserialize in DataStream API with array length not fixed

2022-05-24 Thread Qingsheng Ren
Hi Zain,

I assume you are using the DataStream API as described in the subject of your 
email, so you can define any functions/transformations to parse the JSON 
value, even if the schema is changing. 

It looks like the value of the field "array_coordinates" is an escaped 
JSON-formatted STRING instead of a JSON object, so I would parse the input 
JSON string first using Jackson (or any JSON parser you like), extract the 
field "array_coordinates" as a string, un-escape it (removing the backslashes), 
and use Jackson again to parse it. 
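
A rough sketch of that two-pass parsing with Jackson; the field names follow the sample payload, everything else is illustrative. Note that Jackson's asText() usually returns the string already un-escaped, so the backslashes are gone after the first pass.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CoordinatesParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Returns the variable-length "array_coordinates" array as a parsed JSON node. */
    public static JsonNode parseCoordinates(String rawEvent) throws Exception {
        JsonNode root = MAPPER.readTree(rawEvent);
        // First pass: the nested field arrives as a JSON-formatted string.
        String nested = root.path("data").path("array_coordinates").asText();
        // Second pass: parse that string into an array of arbitrary length.
        return MAPPER.readTree(nested);
    }
}

Each element can then be inspected with point.path("latitude"), point.path("speed"), and so on, regardless of how many entries the array contains.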

If you are using the Table / SQL API, I'm afraid you have to use a UDTF to parse 
the input because the schema of the field "array_coordinates" varies. 

Hope this could be helpful!

Cheers, 

Qingsheng

> On May 21, 2022, at 14:58, Zain Haider Nemati  wrote:
> 
> Hi Folks,
> I have data coming in this format:
> 
> {
> “data”: {
> “oid__id”:  “61de4f26f01131783f162453”,
> “array_coordinates”:“[ { \“speed\” : \“xxx\“, \“accuracy\” : 
> \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : \“xxx\“, \“longitude\” : 
> \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : \“xxx\“, \“_id\” : { 
> \“$oid\” : \“xxx\” } }, { \“speed\” : \“xxx\“, \“isFromMockProvider\” : 
> \“false\“, \“accuracy\” : \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : 
> \“xxx\“, \“longitude\” : \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : 
> \“xxx\“, \“_id\” : { \“$oid\” : \“xxx\” } }]“,
> “batchId”:  “xxx",
> “agentId”:  “xxx",
> “routeKey”: “40042-12-01-2022",
> “__v”:  0
> },
> “metadata”: {
> “timestamp”:“2022-05-02T18:49:52.619827Z”,
> “record-type”:  “data”,
> “operation”:“load”,
> “partition-key-type”:   “primary-key”,
> “schema-name”:  “xxx”,
> “table-name”:   “xxx”
> }
> }
> 
> Where length of array coordinates array varies is not fixed in the source is 
> their any way to define a json deserializer for this? If so would really 
> appreciate if I can get some help on this



Re: How can I set job parameter in flink sql

2022-05-24 Thread Qingsheng Ren
Hi,

You can make use of the configuration "pipeline.global-job-parameters" [1] to 
pass your custom configs all the way into the UDF. For example, you can execute 
this in the SQL client:

SET pipeline.global-job-parameters=black_list_path:/root/list.properties;

Then you can get the value “/root/list.properties” by 
context.getJobParameter(“black_list_path”, “your_default_value”) in the open() 
method of your UDF.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-global-job-parameters
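
A minimal UDF sketch of the open()/getJobParameter() part described above; the parameter name and paths mirror this thread, while the function body is just a placeholder.

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class BlackListCheck extends ScalarFunction {
    private String blackListPath;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Picks up the value set via: SET pipeline.global-job-parameters=black_list_path:/root/list.properties
        blackListPath = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println("black_list_path = " + blackListPath);
    }

    public String eval(String input) {
        return input;  // placeholder logic; a real UDF would consult the black list here
    }
}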

Cheers, 

Qingsheng

> On May 11, 2022, at 14:36, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> I want to override the function open() in my UDF, like:
> 
> 
> 
> In the open() function, I want to fetch the configured value of "black_list_path" 
> and simply print it out. I configure this value in the ./sql-client.sh 
> console:
> 
> SET black_list_path = /root/list.properties
> 
> Then I run this UDF, but what gets printed is /config/list.properties (the 
> default value I set in context.getJobParameter("black_list_path", 
> "/config/list/properties")), not /root/list.properties, which I set in the 
> ./sql-client.sh console.
> 
> So could you please show me the correct way to set black_list_path in SQL? 
> Thanks so much!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
>  



Re: Weird Flink Kafka source watermark behavior

2022-04-13 Thread Qingsheng Ren
Another solution would be setting the parallelism equal to the number of partitions, 
so that each parallel subtask is responsible for reading exactly one partition.
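
A tiny sketch of that wiring, assuming the source, watermark strategy and partition count (30 in the earlier report) already exist:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OnePartitionPerSubtask {
    /** Pins the source parallelism to the topic's partition count so each subtask owns one partition. */
    public static DataStream<String> wire(StreamExecutionEnvironment env,
                                          KafkaSource<String> source,
                                          WatermarkStrategy<String> strategy,
                                          int numPartitions) {
        return env.fromSource(source, strategy, "Kafka")
                  .setParallelism(numPartitions);   // e.g. 30 partitions -> parallelism 30
    }
}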

Qingsheng

> On Apr 13, 2022, at 17:52, Qingsheng Ren  wrote:
> 
> Hi Jin, 
> 
> Unfortunately I don’t have any quick bypass in mind except increasing the 
> tolerance of out of orderness. 
> 
> Best regards, 
> 
> Qingsheng
> 
>> On Apr 8, 2022, at 18:12, Jin Yi  wrote:
>> 
>> confirmed that moving back to FlinkKafkaConsumer fixes things.
>> 
>> is there some notification channel/medium that highlights critical 
>> bugs/issues on the intended features like this pretty readily?
>> 
>> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:
>> based on symptoms/observations on the first operator (LogRequestFilter) 
>> watermark and event timestamps, it does seem like it's the bug.  things 
>> track fine (timestamp > watermark) for the first batch of events, then the 
>> event timestamps go back into the past and are "late".
>> 
>> looks like the 1.14 backport just got in 11 days ago 
>> (https://github.com/apache/flink/pull/19128).  is there a way to easily test 
>> this fix locally?  based on the threads, should i just move back to 
>> FlinkKafkaConsumer until 1.14.5?
>> 
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
>> Hi Jin,
>> 
>> If you are using new FLIP-27 sources like KafkaSource, per-partition 
>> watermark (or per-split watermark) is a default feature integrated in 
>> SourceOperator. You might hit the bug described in FLINK-26018 [1], which 
>> happens during the first fetch of the source that records in the first split 
>> pushes the watermark far away, then records from other splits will be 
>> treated as late events.  
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>> 
>> Best regards,
>> 
>> Qingsheng
>> 
>> 
>>> On Apr 8, 2022, at 15:54, Jin Yi  wrote:
>>> 
>>> how should the code look like to verify we're using per-partition 
>>> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 
>>> 1.14.4?
>>> 
>>> we currently have it looking like:
>>> 
>>> streamExecutionEnvironment.fromSource(
>>>   KafkaSource.builder().build(),
>>>   watermarkStrategy,
>>>   "whatever",
>>>   typeInfo);
>>> 
>>> when running this job with the streamExecutionEnviornment parallelism set 
>>> to 1, and the kafka source having 30 partitions, i'm seeing weird behaviors 
>>> where the first operator after this source consumes events out of order 
>>> (and therefore, violates watermarks).  the operator simply checks to see 
>>> what "type" of event something is and uses side outputs to output the 
>>> type-specific messages.  here's a snippet of the event timestamp going back 
>>> before the current watermark (first instance of going backwards in time in 
>>> bold):
>>> 
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  
>>> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
>>> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  
>

Re: Weird Flink Kafka source watermark behavior

2022-04-13 Thread Qingsheng Ren
Hi Jin, 

Unfortunately I don't have any quick bypass in mind other than increasing the 
out-of-orderness tolerance. 
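
If that route is taken, a sketch of widening the bound; the two-minute figure and the Event type are illustrative, not a recommendation.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class TolerantWatermarks {
    /** Placeholder for the user's record type. */
    public static class Event {
        public long timestampMillis;
    }

    public static WatermarkStrategy<Event> strategy() {
        // Allows records to arrive up to two minutes behind the current watermark
        // before they are considered late.
        return WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(2))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestampMillis);
    }
}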

Best regards, 

Qingsheng

> On Apr 8, 2022, at 18:12, Jin Yi  wrote:
> 
> confirmed that moving back to FlinkKafkaConsumer fixes things.
> 
> is there some notification channel/medium that highlights critical 
> bugs/issues on the intended features like this pretty readily?
> 
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi  wrote:
> based on symptoms/observations on the first operator (LogRequestFilter) 
> watermark and event timestamps, it does seem like it's the bug.  things track 
> fine (timestamp > watermark) for the first batch of events, then the event 
> timestamps go back into the past and are "late".
> 
> looks like the 1.14 backport just got in 11 days ago 
> (https://github.com/apache/flink/pull/19128).  is there a way to easily test 
> this fix locally?  based on the threads, should i just move back to 
> FlinkKafkaConsumer until 1.14.5?
> 
> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren  wrote:
> Hi Jin,
> 
> If you are using new FLIP-27 sources like KafkaSource, per-partition 
> watermark (or per-split watermark) is a default feature integrated in 
> SourceOperator. You might hit the bug described in FLINK-26018 [1], which 
> happens during the first fetch of the source that records in the first split 
> pushes the watermark far away, then records from other splits will be treated 
> as late events.  
> 
> [1] https://issues.apache.org/jira/browse/FLINK-26018
> 
> Best regards,
> 
> Qingsheng
> 
> 
> > On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> > 
> > how should the code look like to verify we're using per-partition 
> > watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 
> > 1.14.4?
> > 
> > we currently have it looking like:
> > 
> > streamExecutionEnvironment.fromSource(
> >KafkaSource.builder().build(),
> >watermarkStrategy,
> >"whatever",
> >typeInfo);
> > 
> > when running this job with the streamExecutionEnviornment parallelism set 
> > to 1, and the kafka source having 30 partitions, i'm seeing weird behaviors 
> > where the first operator after this source consumes events out of order 
> > (and therefore, violates watermarks).  the operator simply checks to see 
> > what "type" of event something is and uses side outputs to output the 
> > type-specific messages.  here's a snippet of the event timestamp going back 
> > before the current watermark (first instance of going backwards in time in 
> > bold):
> > 
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN  
> > ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter 
> > [] - LogRequestFilter ts: 1649284172666 watermark: 

Re: Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-13 Thread Qingsheng Ren
Hi Anitha,

I’m not quite familiar with BigQuery. Is it possible to postpone the operation 
to the SplitEnumerator, where you could get the parallelism via 
org.apache.flink.api.connector.source.SplitEnumeratorContext#currentParallelism?

If you are using SourceFunction, you can access the current parallelism with 
org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks, 
but we strongly suggest using the new Source API instead of SourceFunction. 
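
For completeness, a rough sketch of the SourceFunction path (the class name is made up 
and all BigQuery specifics are omitted; only the parallelism lookup is shown):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class BigQueryRowSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;
    private int numStreams;
    private int streamIndex;

    @Override
    public void open(Configuration parameters) {
        // one BigQuery read stream per parallel subtask, as described above
        numStreams = getRuntimeContext().getNumberOfParallelSubtasks();
        streamIndex = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // read the read-session stream assigned to streamIndex and emit records here
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}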

Cheers, 

Qingsheng

> On Apr 11, 2022, at 13:58, Anitha Thankappan 
>  wrote:
> 
> Hi Qingsheng,
> 
> I am creating BigQuery read sessions in the factory class level. I want to 
> set the number of Streams in the read session using the parallelism set at 
> Execution environment.
> This is to execute each stream by each executor to achieve parallel execution 
> of streams.
> 
> 
> Thanks and Regards,
> Anitha Thankappan
> 
> 
> On Fri, Apr 8, 2022 at 11:53 AM Qingsheng Ren  wrote:
> Hi Anitha,
> 
> AFAIK DynamicTableSourceFactory doesn’t expose interface for getting 
> parallelism. Could you elaborate on why you need parallelism in table 
> factory? Maybe we could find other ways to fulfill your requirement. 
> 
> Best regards, 
> 
> Qingsheng
> 
> > On Apr 7, 2022, at 16:11, Anitha Thankappan 
> >  wrote:
> > 
> > Hi 
> > 
> > I have developed a BigQuery Flink connector by implementing 
> > DynamicTableSourceFactory.
> > I have a requirement to :
> >get the configured parallelism value of 
> > StreamExecutionEnvironment in the Factory class.
> >or
> >set the parallelism at Factory class or Table source 
> > class level.
> > Please help me on this.
> > 
> > 
> > Thanks and Regards,
> > Anitha Thankappan
> > 
> 
> 



Re: Discuss making KafkaSubscriber Public

2022-04-13 Thread Qingsheng Ren
Thanks for the proposal Mason! I think exposing `KafkaSubscriber` as a public API 
is helpful for users who want to implement more complex subscription logic. 

+1 (non-binding)

Cheers, 

Qingsheng

> On Apr 12, 2022, at 11:46, Mason Chen  wrote:
> 
> Hi Flink Devs,
> 
> I was looking to contribute to 
> https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to track 
> changing the KafkaSubscriber from Internal to PublicEvolving.
> 
> In the PR, it seems a few of us have agreement on making the subscriber 
> pluggable in the KafkaSource, but I'd like to raise the question 
> nevertheless. Furthermore, there is also interest from various Flink mailing 
> threads and on the Jira ticket itself for the ticket, so I think the change 
> would be beneficial to the users. There is already some feedback to make the 
> contract of handling removed splits by the KafkaSource and subscriber clearer 
> in the docs.
> 
> I have yet to address all the PR feedback, but does anyone have any concerns 
> before I proceed further?
> 
> Best,
> Mason



Re: flink jdbc connector does not support source

2022-04-10 Thread Qingsheng Ren
Hi,

The JDBC connector does support source; the latest documentation has probably just not 
been translated into Chinese yet. Please refer to the English documentation [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
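
For reference, a minimal Table API sketch of reading through the JDBC source connector 
(the URL, table name and credentials below are placeholders, and the MySQL JDBC driver 
plus flink-connector-jdbc must be on the classpath):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSourceExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register a MySQL table as a bounded JDBC source table.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                "  'table-name' = 'my_table'," +
                "  'username' = 'root'," +
                "  'password' = '******'" +
                ")");

        // Read it back; in your case you would INSERT INTO a Hudi sink table instead.
        tEnv.executeSql("SELECT * FROM mysql_source").print();
    }
}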

Best

> On Apr 10, 2022, at 11:07, casel.chen  wrote:
> 
> I have a scenario where I need to use Flink to batch-sync the specified tables (with 
> different schemas) of a MySQL database into Hudi tables in one run. The official flink jdbc 
> connector docs [1] say it only supports sink, not source. Does the community plan to support 
> this? If not, how should I develop it myself, and could you give an example? Thanks!
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/



Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Qingsheng Ren
Hi Jin,

If you are using new FLIP-27 sources like KafkaSource, per-partition watermark 
(or per-split watermark) is a default feature integrated in SourceOperator. You 
might hit the bug described in FLINK-26018 [1], which happens during the first 
fetch of the source: records in the first split push the watermark far ahead, and 
records from other splits are then treated as late events.

[1] https://issues.apache.org/jira/browse/FLINK-26018

Best regards,

Qingsheng


> On Apr 8, 2022, at 15:54, Jin Yi  wrote:
> 
> how should the code look like to verify we're using per-partition watermarks 
> if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> 
> we currently have it looking like:
> 
> streamExecutionEnvironment.fromSource(
>KafkaSource.builder().build(),
>watermarkStrategy,
>"whatever",
>typeInfo);
> 
> when running this job with the streamExecutionEnviornment parallelism set to 
> 1, and the kafka source having 30 partitions, i'm seeing weird behaviors 
> where the first operator after this source consumes events out of order (and 
> therefore, violates watermarks).  the operator simply checks to see what 
> "type" of event something is and uses side outputs to output the 
> type-specific messages.  here's a snippet of the event timestamp going back 
> before the current watermark (first instance of going backwards in time in 
> bold):
> 
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> 2022-04-08 05:47:06,317 WARN  
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] 
> - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> 
> 
> 
> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill  wrote:
> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you 
> for the help!
> 
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill  wrote:
> Thanks, Thias and Dongwon.
> 
> I'll keep debugging this with the idle watermark turned off.
> 
> Next TODOs:
> - Verify that we’re using per-partition watermarks.  Our code matches the 
> example but maybe something is disabling it.
> - Enable logging of partition-consumer assignment, to see if that is the 
> cause of the problem.
> - Look at adding flags to set the source parallelism to see if that fixes the 
> issue.
> 
> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  
> Sounds like a good idea.
> 
> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim  wrote:
> I totally agree with Schwalbe that per-partition watermarking allows # source 
> tasks < # kafka partitions. 
> 
> Otherwise, Dan, you should suspect other possibilities like what Schwalbe 
> said.
> 
> Best,
> 
> Dongwon
> 
> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias 
>  wrote:
> Hi San, Dongwon,
> 
>  
> 
> I share the opinion that when per-partition watermarking is enabled, you 
> should observe correct behavior … would be interesting to see why it does not 
> work for you.
> 
>  
> 
> I’d like to clear one tiny misconception here when you write:
> 
>  
> 
> >> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> You would expect to see glitches with watermarking when you enable idleness.
> 
> Idleness sort of trades watermark correctness for reduced latency when 
> processing timers (much 

Re: Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-08 Thread Qingsheng Ren
Hi Anitha,

AFAIK DynamicTableSourceFactory doesn’t expose an interface for getting the 
parallelism. Could you elaborate on why you need the parallelism in the table 
factory? Maybe we could find other ways to fulfill your requirement. 

Best regards, 

Qingsheng

> On Apr 7, 2022, at 16:11, Anitha Thankappan  
> wrote:
> 
> Hi 
> 
> I have developed a BigQuery Flink connector by implementing 
> DynamicTableSourceFactory.
> I have a requirement to :
>get the configured parallelism value of 
> StreamExecutionEnvironment in the Factory class.
>or
>set the parallelism at Factory class or Table source 
> class level.
> Please help me on this.
> 
> 
> Thanks and Regards,
> Anitha Thankappan
> 



Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Qingsheng Ren
Hi Dan,

In FlinkKafkaProducer, records are serialized by the SerializationSchema 
specified in the constructor, which is the “schema” 
(ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in 
your case, instead of the serializer specified in producer properties. The 
default serializer used by FlinkKafkaProducer is ByteArraySerializer, so the 
flow of serialization would be:

[AvroObject] -> SerializationSchema -> [Bytes] -> ByteArraySerializer -> [Bytes]

So I think removing KafkaAvroSerializer from the producer config and using the 
AvroSerializationSchema is the right way. As you mentioned that messages could 
not be consumed back successfully, could you provide more information about how 
you consume messages from Kafka (like using Flink's KafkaSource or just a plain 
KafkaConsumer, and maybe also the configuration you are using)?
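
For reference, a minimal sketch of consuming the sink topic back through the schema 
registry (it reuses the AvroObject class, broker and registry addresses from your 
snippet; `env` is your StreamExecutionEnvironment):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "kafka:9092");
consumerProps.setProperty("group.id", "avro-readback-check");

// The deserialization schema fetches the writer schema from the registry itself,
// so no KafkaAvroDeserializer needs to be configured in the consumer properties.
FlinkKafkaConsumer<AvroObject> consumer = new FlinkKafkaConsumer<>(
        "sink_topic",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                AvroObject.class, "http://schemaregistry:38081"),
        consumerProps);

env.addSource(consumer).print();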

Best regards,

Qingsheng


> On Apr 5, 2022, at 16:54, Dan Serb  wrote:
> 
> Hi guys,
>  
> I’m working on a solution where I ingest Kafka Records and I need to sink 
> them to another topic using Avro and Schema Registry.
> The problem I’m facing, is that I can’t find a suitable configuration that 
> actually works for me.
>  
> I’m going to explain.
>  
>   • I have a KafkaSource that consumes basically the initial stream of 
> data.
>   • I have an Operator that maps the kafka records to Avro Objects (Java 
> POJOs generated using mvn avro plugin, based on .avsc files)
>   • I register the schemas in Schema Registry using the mvn 
> schema-registry:register plugin/goal (registering the schema type as AVRO.
>   • I have a FlinkKafkaProducer where I provide a 
> serialization schema of type ConfluentRegistrySerializationSchema.
>  
> My Kafka Properties for the Producer:
>  
> kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> kafkaProps.put(
> KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, 
> "http://schemaregistry:38081;);
> kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> KafkaAvroSerializer.class);
> kafkaProps.put("auto.register.schemas", false);
> kafkaProps.put("use.latest.version", true);
>  
> As I learned from other tutorials/articles, I need to basically use 
> KafkaAvroSerializer.class over ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
> This will bring me eventually in the place from KafkaAvroSerializer, where 
> based on how the record actually looks, it will get me the schema, it will go 
> to the schema registry and bring the schema for the needed record, and 
> serialize it before it gets sent.
> The problem I’m having, is that, in the FlinkKafkaProducer class, in invoke() 
> method, the keyedSchema is null in my case, but kafkaSchema is not null, and 
> it basically does a ‘pre-serialization’ that is transforming my Record into a 
> byte[]. This has an effect when it ends up in the KafkaAvroSerializer, as the 
> Record is already a byte[] and it basically returns back a schema of type 
> “bytes” instead of returning the schema I have for that SpecificRecord. And 
> when it brings the propper schema from the schema registry, it basically 
> fails for not being compatible. Schema {} is not compatible with schema of 
> type “bytes”.
>  
> For more context, this is how my Processor looks at this moment.
>  
> DataStream kafkaRecords =
> env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");
> 
> SingleOutputStreamOperator producedRecords =
> kafkaRecords
> .map(
> value -> {
>   String kafkaKey = value.get(KEY).asText();
>   String kafkaRecordJson = 
> MAPPER.writeValueAsString(value.get(VALUE));
>   return Converter.convert(kafkaKey, kafkaRecordJson);
> })
> .returns(TypeInformation.of(AvroObject.class));
> 
> AvroSerializationSchema schema =
> 
> ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class);
> 
> FlinkKafkaProducer< AvroObject > kafkaProducer =
> new FlinkKafkaProducer<>("sink_topic", schema, kafkaProps);
> 
> producedRecords.addSink(kafkaProducer);
> 
> env.execute();
>  
> Exception:
> Caused by: java.io.IOException: Incompatible schema { avro schema here} }with 
> refs [] of type AVRO for schema "bytes".
>  
> PS: If I remove the KafkaAvroSerializer from the producer properties, it 
> works fine, but when I consume the messages, the first message gets consumed 
> but the values from the record are default ones. And the second message 
> throws exception EOFExcetion – could not debug yet exactly the cause. It 
> seems like, when I don’t have the KafkaAvroSerializer, is not actually going 
> to use the schema registry to get the schema back and use that as a 
> serializer, so I definitely need to have that there, but I still think I need 
> to do some more config changes maybe in other places, because it’s definitely 
> not working as expected.
>  
> Thanks a lot!
> I would appreciate at least some points where I could investigate more and if 
> there 

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

If the schema of the records is not fixed, I’m afraid you have to do it in a UDTF. 

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response! 
> 
> And I tried the format "raw", seems it only support single physical column, 
> and in our project reqiurement, there are more than one hundred columns in 
> sink table. So I need combine those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> 
> At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
> >Hi,
> >
> >You can construct the final json string in your UDTF, and write it to Kafka 
> >sink table with only one field, which is the entire json string constructed 
> >in UDTF, and use raw format [1] in the sink table:
> >
> >CREATE TABLE TableSink (
> >`final_json_string` STRING
> >) WITH (
> >‘connector’ = ‘kafka’,
> >‘format’ = ‘raw’,
> >...
> >)
> >
> >So the entire flow would be like:
> >
> >1. Kafka source table reads data
> >2. UDTF parses the ‘content’ field, and construct the final json (id, 
> >content without backslash) string you need, maybe using Jackson [2] or other 
> >json tools
> >3. Insert the constructed json string as the only field in sink table
> >
> >The key problem is that the schema of field “content” is not fixed, so you 
> >have to complete most logics in UDTF. 
> >
> >[1] 
> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
> >[2] https://github.com/FasterXML/jackson
> >
> >Best regards, 
> >
> >Qingsheng
> >
> >
> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> >> 
> >> Hi,
> >> 
> >> Thanks so much for your support! 
> >> 
> >> But sorry to say I'm still confused about it. No matter what the udf looks 
> >> like, the first thing I need confirm is the type of 'content' in 
> >> TableSink, what's the type of it should be, should I use type Row, like 
> >> this?
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<name STRING, age BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> This type is only suitable for source input {"schema": "schema_infos", 
> >> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> But the json key name and format of 'content' in source is variable, if 
> >> the source input is 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> 
> >> I should define `content` in TableSink with type `content` ROW<color 
> >> STRING, BackgroundColor STRING, Height BIGINT>, like this:
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> And in input json also might contains json array, like: 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> >> 
> >> 
> >> So is there some common type I can use which can handle all input json 
> >> formats?  
> >> 
> >> Thanks so much!!
> >> 
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
> >> >Hi, 
> >> >
> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >> >final json string manually. The key problem is that the field “content” 
> >> >is actually a STRING, although it looks like a json object. Cur

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

You can construct the final json string in your UDTF, and write it to Kafka 
sink table with only one field, which is the entire json string constructed in 
UDTF, and use raw format [1] in the sink table:

CREATE TABLE TableSink (
`final_json_string` STRING
) WITH (
‘connector’ = ‘kafka’,
‘format’ = ‘raw’,
...
)

So the entire flow would be like:

1. Kafka source table reads data
2. The UDTF parses the ‘content’ field and constructs the final json string you need 
(id, plus content without backslashes), maybe using Jackson [2] or other json tools
3. Insert the constructed json string as the only field in the sink table

The key problem is that the schema of the “content” field is not fixed, so you have 
to complete most of the logic in the UDTF. 
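
A minimal sketch of such a UDTF (class, function and field names are illustrative, and 
it assumes Jackson is on the classpath):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

@FunctionHint(output = @DataTypeHint("ROW<final_json STRING>"))
public class BuildJsonFunction extends TableFunction<Row> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void eval(String id, String content) throws Exception {
        ObjectNode root = MAPPER.createObjectNode();
        root.put("id", id);
        // "content" arrives as an escaped json string; parse it back into a real json tree
        root.set("content", MAPPER.readTree(content));
        collect(Row.of(MAPPER.writeValueAsString(root)));
    }
}

The sink table defined with the raw format above then takes final_json as its single STRING field.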

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks so much for your support! 
> 
> But sorry to say I'm still confused about it. No matter what the udf looks 
> like, the first thing I need confirm is the type of 'content' in TableSink, 
> what's the type of it should be, should I use type Row, like this?
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<name STRING, age BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> This type is only suitable for source input {"schema": "schema_infos", 
> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> But the json key name and format of 'content' in source is variable, if the 
> source input is 
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> I should define `content` in TableSink with type `content` ROW<color STRING, 
> BackgroundColor STRING, Height BIGINT>, like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> And in input json also might contains json array, like: 
> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> 
> 
> So is there some common type I can use which can handle all input json 
> formats?  
> 
> Thanks so much!!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> At 2022-04-01 17:25:59, "Qingsheng Ren"  > wrote:
> >Hi, 
> >
> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >final json string manually. The key problem is that the field “content” is 
> >actually a STRING, although it looks like a json object. Currently the json 
> >format provided by Flink could not handle this kind of field defined as 
> >STRING. Also considering the schema of this “content” field is not fixed 
> >across records, Flink SQL can’t use one DDL to consume / produce records 
> >with changing schema. 
> >
> >Cheers,
> >
> >Qingsheng
> >
> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> 
> >> Hi dear engineer,
> >> 
> >> Thanks so much for your precious time reading my email!
> >> Resently I'm working on the Flink sql (with version 1.13) in my project 
> >> and encountered one problem about json format data, hope you can take a 
> >> look, thanks! Below is the description of my issue.
> >> 
> >> I use kafka as source and sink, I define kafka source table like this:
> >> 
> >>  CREATE TABLE TableSource (
> >>   schema STRING,
> >>   payload ROW(
> >>   `id` STRING,
> >>   `content` STRING
> >>  )
> >>  )
> >>  WITH (
> >>  'connector' = 'kafka',
> >>  'topic' = 'topic_source',
> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >>  'properties.group.id' = 'all_gp',
> >>  'scan.startup.mode' = 'group-offsets',
> >>  'format' = 'json',

Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-01 Thread Qingsheng Ren
Hi Isidoros,

Two assumptions in my mind: 

1. Records are not evenly distributed across different keys, e.g. some 
accountIds just have more events than others. If the record distribution is 
predictable, you can try to combine other fields or include more information 
in the key field to help balance the distribution. 

2. Keys themselves are not distributed evenly. In short, a key is first assigned 
to a key group by murmurHash(key.hashCode()) % maxParallelism, and that key group 
is then mapped to a subtask, so with an unlucky key distribution it’s possible 
that most keys land in the same subtask. AFAIK there isn't a metric for monitoring 
the number of keys in a subtask, but you can simply investigate it with a map 
function after keyBy, or check the mapping offline as sketched below. 
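
A quick sketch using Flink's KeyGroupRangeAssignment (the parallelism settings match the 
numbers you described; the accountId values are placeholders, plug in a sample of your 
real keys):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeySkewCheck {
    public static void main(String[] args) {
        int parallelism = 14;
        int maxParallelism = 14 * 14;
        long[] counts = new long[parallelism];
        long[] sampleAccountIds = {1001L, 1002L, 1003L, 1004L, 1005L};

        for (long accountId : sampleAccountIds) {
            // same key group / subtask assignment that keyBy uses at runtime
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    accountId, maxParallelism, parallelism);
            counts[subtask]++;
        }
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + counts[i] + " keys");
        }
    }
}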

Hope this would be helpful!

Qingsheng

> On Apr 1, 2022, at 17:37, Isidoros Ioannou  wrote:
> 
> Hello,
> 
> we ran a flink application version 1.13.2 that consists of a kafka source 
> with one partition so far
> then we filter the data based on some conditions, mapped to POJOS and we 
> transform to a KeyedStream based on an accountId long property from the POJO. 
> The downstream operators are 10 CEP operators that run with parallelism of 14 
> and the maxParallelism is set to the (operatorParallelism * 
> operatorParallelism).
> As you see in the image attached the events are distributed unevenly so some 
> subtasks are busy and others are idle.
> Is there any way to distribute evenly the load to the subtasks? Thank you in 
> advance.
> 
>  



Re: Could you please give me a hand about json object in flink sql

2022-04-01 Thread Qingsheng Ren
Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final 
json string manually. The key problem is that the field “content” is actually a 
STRING, although it looks like a json object. Currently the json format 
provided by Flink could not handle this kind of field defined as STRING. Also 
considering the schema of this “content” field is not fixed across records, 
Flink SQL can’t use one DDL to consume / produce records with changing schema. 

Cheers,

Qingsheng

> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> Thanks so much for your precious time reading my email!
> Recently I'm working on the Flink sql (with version 1.13) in my project and 
> encountered one problem about json format data, hope you can take a look, 
> thanks! Below is the description of my issue.
> 
> I use kafka as source and sink, I define kafka source table like this:
> 
>  CREATE TABLE TableSource (
>   schema STRING,
>   payload ROW(
>   `id` STRING,
>   `content` STRING
>  )
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_source',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'all_gp',
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
>  );
> 
> Define the kafka sink table like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` STRING NOT NULL
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_sink',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
> );
> 
> 
> Then insert into TableSink with data from TableSource:
> INSERT INTO TableSink SELECT id, content FROM TableSource;
> 
> Then I use "kafka-console-producer.sh" to produce data below into topic 
> "topic_source" (TableSource):
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"name\":\"Jone\",\"age\":20}"}}
> 
> 
> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
> output is:
> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
> 
> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
> I want the the value of "content" is json object, not json string.
> 
> And what's more, the format of "content" in TableSource is not fixed, it can 
> be any json formated(or json array format) string, such as:
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> 
> So my question is, how can I transform json format string(like 
> "{\"name\":\"Jone\",\"age\":20}")  from TableSource to json object 
> (like{"name":"Jone","age":20} ).
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
>  



Re: Where is the "Partitioned All Cache" doc?

2022-03-28 Thread Qingsheng Ren
Hi, 

The optimization you mentioned is only applicable to the product provided by 
Alibaba Cloud. In open-source Apache Flink there isn’t a unified caching 
abstraction for all lookup tables, and each connector has its own cache 
implementation. For example, JDBC uses a Guava cache and FileSystem uses an 
in-memory HashMap, and neither of them loads all records of the dimension table 
into the cache. 

Best, 

Qingsheng


> On Mar 28, 2022, at 12:26, dz902  wrote:
> 
> Hi,
> 
> I've read some docs
> (https://help.aliyun.com/document_detail/182011.html) stating Flink
> optimization technique using:
> 
> - partitionedJoin = 'true'
> - cache = 'ALL'
> - blink.partialAgg.enabled=true
> 
> However I could not find any official doc references. Are these
> supported at all?
> 
> Also "partitionedJoin" seemed to have the effect of shuffling input by
> joining key so they can fit into memory. I read this
> (https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html)
> and believes this is already a default behavior of Flink.
> 
> Is this optimization not needed even for huge input tables?
> 
> Thanks,
> Dai



Re: Datetime format

2022-03-28 Thread Qingsheng Ren
Hi,

The filesystem table sink doesn’t provide APIs for changing the prefix or suffix 
of the generated filenames. You could consider switching to the DataStream 
FileSink connector and setting OutputFileConfig manually to specify the prefix 
and suffix of the generated files.
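
A minimal sketch with the DataStream FileSink (the S3 path, prefix and suffix are 
placeholders; note the prefix is fixed when the sink is built, so a truly per-record 
timestamp in the name is not possible this way):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("mydata-" + System.currentTimeMillis())
        .withPartSuffix(".json")
        .build();

FileSink<String> sink = FileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withOutputFileConfig(fileConfig)
        .build();

// stream.sinkTo(sink); where stream is a DataStream<String>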

Best, 

Qingsheng

> On Mar 28, 2022, at 13:10, lan tran  wrote:
> 
> Hi team, 
> So basically, when I use the Flink Table API to generate the files and store them in 
> S3, the generated file names look like 
> part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. So my question is: is there any 
> way that we can configure these file names (e.g. by adding the last_modified_value 
> to the file name)?
> 
> Best,
> Quynh
>  
>  
>  
> Sent from Mail for Windows



Re: Flink kafka consumer disconnection, application processing stays behind

2022-03-25 Thread Qingsheng Ren
Hi Isidoros, 

Your watermark strategy looks fine to me. I’m not quite sure if it is related. 

Best regards, 

Qingsheng

> On Mar 24, 2022, at 21:11, Isidoros Ioannou  wrote:
> 
> Hi Qingsheng,
> 
> thank you a lot for you response.
> The message I see from the consumer before the log exception I provided 
> previously is this:
> "locationInformation": 
> "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
> "logger": "org.apache.kafka.clients.NetworkClient",
> "message": "[Consumer 
> clientId=consumer-realtime-analytics-eu-production-node2-2, 
> groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due 
> to request timeout."
> 
> I saw it in debug mode and thats the reason I increased the 
> "request.timeout.ms". 
> I will follow your advice and investigate the broker logs once the event 
> occurs again.
> 
> Regarding the backpressure. the 10 cep operators we have, use some iterative 
> conditions that add some burden and in periods of high load the operators are 
> getting red in flink ui so these add the backpressure.
> However, in mediocre load the operators are performing fine, except when we 
> have disconnections. It seems that after the disconnection the watermarks are 
> not emmited quickly causing the operators not to release the
> data to sinks. I don't know actually if I have helped, but is there any 
> chance that it would be a problem of how we have configured the watermarks?
> 
> Στις Πέμ 24 Μαρ 2022 στις 10:27 π.μ., ο/η Qingsheng Ren  
> έγραψε:
> Hi Isidoros,
> 
> I’m not sure in which kind of way the timeout and the high back pressure are 
> related, but I think we can try to resolve the request timeout issue first. 
> You can take a look at the request log on Kafka broker and see if the request 
> was received by broker, and how long it takes for broker to handle it. By 
> default the request log is on WARN level, and you may want to increase it to 
> DEBUG or TRACE to reveal more information. 
> 
> Another thought in my mind is about the content of the record, since you 
> mentioned extremely high back pressure after the disconnection issue. If some 
> messages are quite large or complex, it might block the network or require 
> more resources to make the serde, even burden some operator in the pipeline 
> and finally lead to back pressure. Once the back pressure happens in the 
> pipeline, you can try to locate the operator causing the back pressure and 
> make some analysis to see why the throughput drops, or dump the record to see 
> if there’s something special in it. 
> 
> Hope these could be helpful! 
> 
> Best regards, 
> 
> Qingsheng
> 
> > On Mar 23, 2022, at 19:19, Isidoros Ioannou  wrote:
> > 
> > Hi, we are running flink 1.13.2 version on Kinesis Analytics. Our source is 
> > a kafka topic with one partition so far and we are using the 
> > FlinkKafkaConsumer (kafka-connector-1.13.2) 
> > Sometimes we get some errors from the consumer like the below:
> > 
> > "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer 
> > clientId=consumer-realtime-analytics-eu-production-node2-2, 
> > groupId=realtime-analytics-eu-production-node2] Error sending fetch request 
> > (sessionId=1343463307, epoch=172059) to node 3: 
> > org.apache.kafka.common.errors.DisconnectException.",
> > "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map 
> > -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> > 
> > With the debug logging it appeared that this happens due to request timeout 
> > so I have increased the request.timeout.ms to 6 , however it did not 
> > resolve the issue. Even if I get the disconnection I can see that after 1s 
> > the consumer sends a successful fetchRequest.
> > 
> > The problem we have noticed is that after the disconnection the application 
> > stays behind from processing. the backpressure on the source gets 100% and 
> > the app consumes events at a lower rate even if we do not have much traffic 
> > to cope with. 
> > 
> > We use eventTime and the watermarks are not generated in the consumer since 
> > we have one partition. the source is the following
> > 
> > DataStream stream =
> > 
> > env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f -> 
> > 

Re: Flink kafka consumer disconnection, application processing stays behind

2022-03-24 Thread Qingsheng Ren
Hi Isidoros,

I’m not sure in which kind of way the timeout and the high back pressure are 
related, but I think we can try to resolve the request timeout issue first. You 
can take a look at the request log on Kafka broker and see if the request was 
received by broker, and how long it takes for broker to handle it. By default 
the request log is on WARN level, and you may want to increase it to DEBUG or 
TRACE to reveal more information. 

Another thought in my mind is about the content of the record, since you 
mentioned extremely high back pressure after the disconnection issue. If some 
messages are quite large or complex, it might block the network or require more 
resources to make the serde, even burden some operator in the pipeline and 
finally lead to back pressure. Once the back pressure happens in the pipeline, 
you can try to locate the operator causing the back pressure and make some 
analysis to see why the throughput drops, or dump the record to see if there’s 
something special in it. 

Hope these could be helpful! 

Best regards, 

Qingsheng

> On Mar 23, 2022, at 19:19, Isidoros Ioannou  wrote:
> 
> Hi, we are running flink 1.13.2 version on Kinesis Analytics. Our source is a 
> kafka topic with one partition so far and we are using the FlinkKafkaConsumer 
> (kafka-connector-1.13.2) 
> Sometimes we get some errors from the consumer like the below:
> 
> "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> "logger": "org.apache.kafka.clients.FetchSessionHandler",
> "message": "[Consumer 
> clientId=consumer-realtime-analytics-eu-production-node2-2, 
> groupId=realtime-analytics-eu-production-node2] Error sending fetch request 
> (sessionId=1343463307, epoch=172059) to node 3: 
> org.apache.kafka.common.errors.DisconnectException.",
> "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map -> 
> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> 
> With the debug logging it appeared that this happens due to request timeout 
> so I have increased the request.timeout.ms to 6 , however it did not 
> resolve the issue. Even if I get the disconnection I can see that after 1s 
> the consumer sends a successful fetchRequest.
> 
> The problem we have noticed is that after the disconnection the application 
> stays behind from processing. the backpressure on the source gets 100% and 
> the app consumes events at a lower rate even if we do not have much traffic 
> to cope with. 
> 
> We use eventTime and the watermarks are not generated in the consumer since 
> we have one partition. the source is the following
> 
> DataStream stream =
> 
> env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f -> 
> !f.getServerId().equals("Demo150")).keyBy(ServerAwareJournal::getServerId);
> 
> and then we assign the following watermark: 
> 
> WatermarkStrategy.<ServerAwareJournal>forBoundedOutOfOrderness(Duration.ofSeconds(3))
> .withTimestampAssigner((element, recordTimestamp) -> 
> element.getMessage().getDateTime().atZone(journalTimezone).toInstant()
> .toEpochMilli()).withIdleness(Duration.ofMinutes(1));
> 
> the upstream operators are 10 cep operators with a parallelism of 15 and then 
> there is a union of the data emitted from the CEP operators and added to 
> firehose sink.
> Another thing is that we ran two parallel instances of the same application 
> i.e two kinesis analytics nodes (one for debug purposes), the debug node has 
> checkpointing disabled.
> 
> Could you please give me some advice on where to look to find a solution to 
> this issue?
> Thanks in advance
> 
> 



Re: how to set kafka sink ssl properties

2022-03-18 Thread Qingsheng Ren
Hi, 

Your usage looks good to me, but could you provide the exception (if any) or 
the unexpected behavior you met after starting the job? It’s difficult to debug 
with only these configurations. 

Best regards, 

Qingsheng

> On Mar 18, 2022, at 01:04, HG  wrote:
> 
> Hi Matthias,
> 
> It should be probably be like this:
> 
> Properties SinkkafkaProps  = new Properties();
> SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> outputBrokers);
> SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
> SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
> SinkkafkaProps.setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs);
> SinkkafkaProps.setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint);
> 
> 
> KafkaSink kSink = KafkaSink.builder()
> .setBootstrapServers(outputBrokers)
> .setKafkaProducerConfig(SinkkafkaProps)
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> .setTopic(kafkaOutputTopic)
> .setValueSerializationSchema(new SimpleStringSchema())
> .build()
> )
> .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> .build();
> 
> Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl :
> Could you share more details on what's not working? Is the 
> ssl.trustore.location accessible from the Flink nodes?
> 
> Matthias
> 
> On Thu, Mar 17, 2022 at 4:00 PM HG  wrote:
> Hi all,
> I am probably not the smartest but I cannot find how to set ssl-properties 
> for a Kafka Sink. 
> My assumption was that it would be just like the Kafka Consumer
> 
> KafkaSource source = KafkaSource.builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type", trustStoreType)
> .setProperty("ssl.truststore.password", trustStorePassword)
> .setProperty("ssl.truststore.location", trustStoreLocation)
> .setProperty("security.protocol", securityProtocol)
> .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
> .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
> .setGroupId(inputGroupId)
> .setClientIdPrefix(clientId)
> .setTopics(kafkaInputTopic)
> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build();
> 
> But that seems not to be the case.
> 
> Any quick pointers?
> 
> Regards Hans-Peter



Re: Kafka source with multiple partitions loses data during savepoint recovery

2022-03-18 Thread Qingsheng Ren
Hi Sharon, 

Could you check the log after starting the job with the savepoint? If you have INFO 
logging enabled you will get an entry “Consumer subtask {} will start reading {} 
partitions with offsets in restored state: {}” [1] in the log, which shows the 
starting offsets of the restored partitions.

BTW FlinkKafkaConsumer has been marked as deprecated since 1.14. Please 
consider switching to the new KafkaSource if you are developing new 
applications. 

[1] 
https://github.com/apache/flink/blob/a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L614-L618

Best regards, 

Qingsheng

> On Mar 18, 2022, at 13:28, Sharon Xie  wrote:
> 
> Hi, 
> 
> I'm seeing an odd behavior for Kafka source where some records are dropped 
> during recovery. 
> 
> My test set up is: Kafka source topic -> pass through flink job -> Kafka sink 
> topic
> There are 10 partitions in the source & sink topics.
> 
> Test Steps
> * Start the flink job, send 5 records (first batch) to the source topic, and 
> read the sink. I see all 5 records without issue.
> * Stop the job with a savepoint
> * Send another 10 records (second batch) to the source topic
> * Start the job with the savepoint
> 
> Expect: read from the beginning of the sink topic, I should see all 15 
> records from the first and second batches.
> Actual: Some random records in the second batches are missing.
> 
> My guess is that the savepoint only contains offsets with partitions that 
> received records from the first batch. Other partitions didn't have a state 
> and by default read from the `latest` offset during recovery. So records from 
> the second batch that fell into the previously empty partitions are never 
> processed. 
> 
> However, based on the source code, I'd expect the partitions without records 
> from the 1st batch to be initialized with `earliest-offset`. But this is not 
> the behavior I saw. What do I miss?
> 
> I'm using Flink 1.14.3. May I know  if there is anything I missed? If not, 
> what's the reason for such behavior? Otherwise, is this a bug?
> 
> 
> 
> Thanks,
> Sharon



Re: [External] Require help regarding possible issue/bug I'm facing while using Flink

2022-03-07 Thread Qingsheng Ren
Hi De Xun, 

Unfortunately MAP, ARRAY and ROW types are supported by Flink Parquet format 
only since Flink 1.15 (see FLINK-17782 [1], not released yet). You may want to 
upgrade Flink version to 1.15 once it is released, or make your own 
implementation based on the latest code on master branch for now.

Also answered on your StackOverflow post. Hope this would be helpful!

[1] https://issues.apache.org/jira/browse/FLINK-17782

Best Regards, 

Qingsheng

> On Mar 7, 2022, at 18:48, De Xun Chia  wrote:
> 
> Hi Qingsheng Ren,
> 
> 
> Thank you for the help! It worked and I have updated the StackOverflow post 
> with the correct source code answer so other people can use it.
> 
> 
> I am now faced with another issue though - I have created a new StackOverflow 
> post for it. It is about writing out complex parquet data types. If you are able 
> to examine it please do, otherwise help me direct it to another dev who has 
> knowledge about it. 
> 
> 
> Have a great week ahead!
> 
> 
> Best Regards,
> 
> De Xun
> 
> 
> On Mon, Mar 7, 2022, 3:19 PM  wrote:
> Hi De Xun, 
> 
> I created an answer in the StackOverflow and hope it would be helpful. I’d 
> like repost my answer here for the convenience of people in mailing lists.
> 
> The first call of RowRowConverter::toInternal is an internal implementation 
> for making a deep copy of the StreamRecord emitted by table source, which is 
> independent from the converter in your map function. 
> 
> The reason of the NPE is that the RowRowConverter in the map function is not 
> initialized by calling RowRowConverter::open. You can use RichMapFunction 
> instead to invoke the RowRowConverter::open in RichMapFunction::open.
> 
> Best regards,
> 
> Qingsheng Ren
> 
> > On Mar 7, 2022, at 09:16, Chia De Xun .  wrote:
> > 
> > Greetings,
> > 
> > I'm facing a difficult issue/bug while working with Flink. Would definitely 
> > appreciate some official expert help on this issue. I have posted my 
> > problem on StackOverflow, but have no replies at the moment. 
> > 
> > Let me know if you have any questions/clarifications for me! It would be 
> > best appreciated.
> > 
> > Best Regards,
> > De Xun
> 



Re: Require help regarding possible issue/bug I'm facing while using Flink

2022-03-06 Thread Qingsheng Ren
Hi De Xun, 

I created an answer on StackOverflow and hope it is helpful. I’d like to repost my 
answer here for the convenience of people on the mailing lists.

The first call of RowRowConverter::toInternal is an internal implementation for 
making a deep copy of the StreamRecord emitted by table source, which is 
independent from the converter in your map function. 

The reason of the NPE is that the RowRowConverter in the map function is not 
initialized by calling RowRowConverter::open. You can use RichMapFunction 
instead to invoke the RowRowConverter::open in RichMapFunction::open.
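
A minimal sketch of that (the ROW type below is a placeholder; use the actual row 
DataType of your table source):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.types.Row;

public class RowDataToRowMapper extends RichMapFunction<RowData, Row> {

    private transient RowRowConverter converter;

    @Override
    public void open(Configuration parameters) {
        converter = RowRowConverter.create(
                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING())));
        // the missing step that caused the NPE: initialize the converter before first use
        converter.open(getRuntimeContext().getUserCodeClassLoader());
    }

    @Override
    public Row map(RowData value) {
        return converter.toExternal(value);
    }
}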

Best regards,

Qingsheng Ren

> On Mar 7, 2022, at 09:16, Chia De Xun .  wrote:
> 
> Greetings,
> 
> I'm facing a difficult issue/bug while working with Flink. Would definitely 
> appreciate some official expert help on this issue. I have posted my problem 
> on StackOverflow, but have no replies at the moment. 
> 
> Let me know if you have any questions/clarifications for me! It would be best 
> appreciated.
> 
> Best Regards,
> De Xun



Re: Kafka Source Recovery Behavior

2021-11-16 Thread Qingsheng Ren
Hi Mason,

Sorry for my late response!
> “there was no logic to filter/remove splits”

Yes, we indeed miss a split removal mechanism. Actually this is quite a tricky 
one considering exactly-once semantics: there’s a risk of losing data if we remove 
a partition / topic from Kafka. There was a discussion about this topic on the 
user mailing list: 
https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt

An immature solution in my mind is that we could remove a split with the help of 
watermarks. Once the watermark of a split has been pushed to the end of the global 
window, we can assume that there are no more new records in the split and we can 
remove it safely. But this would invalidate all previous checkpoints, because 
these splits might not exist anymore in the external system (e.g. the topic has 
been removed in Kafka).

Hope this could answer your question and looking forward to your inspiring 
ideas!

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Nov 10, 2021, 11:32 PM +0800, Mason Chen , wrote:
>
> there was no logic to filter/remove splits


Re: Some questions about the flink rabbitmq connector

2021-10-29 Thread Qingsheng Ren
Hi Ken, 

Thanks for reaching out and sorry for the confusion!

As Leonard mentioned, we definitely value every connector in the Flink community. 
Given that we have more and more connectors to maintain and limited people and 
resources focusing on connector issues, some connectors like Kafka and JDBC may 
receive more support.

I’m very glad to see that the RabbitMQ connector has a lot of active users, and would 
also appreciate it if friends from the RabbitMQ side could help us improve the RabbitMQ 
connector, for example by migrating it to the FLIP-27 Source API and FLIP-143 Sink API~




> 2021年10月29日 下午12:52,Leonard Xu  写道:
> 
> Hi, Peng
> 
> There’s no doubt that RabbitMQ is a good open source community with active 
> users. 
> I understand what @renqschn means is that the Flink RabbitMQ Connector is one 
> connector with few users among the many connectors in the Flink project.  
> From my observation, the connectors that are used more in the Flink project 
> are Kafka, Filesystem, JDBC and so on. So please help us promote 
> Flink in the RabbitMQ community and let more RabbitMQ users know about and then use 
> the Flink RabbitMQ Connector, which will give the Flink community more 
> motivation to improve it.
> 
> Best,
> Leonard
> 
>> 在 2021年10月29日,11:13,Ken Peng  写道:
>> 
>> I am one of the Forum Moderators for RabbitMQ, which does have a lot of
>> active users. :)
>> If you have any questions about RMQ please subscribe to its official group
>> and ask there.
>> rabbitmq-users+subscr...@googlegroups.com
>> 
>> Regards.
>> 
>> 
>> On Fri, Oct 29, 2021 at 11:09 AM 任庆盛  wrote:
>> 
>>> Hello,
>>> 
>>> From the code, the RabbitMQ sink indeed has no delivery-semantics guarantee. Since the
>>> RabbitMQ connector currently does not have many users in the community, its maintenance
>>> frequency is relatively low. If you are interested, you are welcome to contribute to the
>>> community~
>>> 
>>> 
>>> 
 On Oct 28, 2021, at 7:53 PM, wx liao  wrote:
 
 Hi:
 
>>> Sorry to bother you. Our project has recently been using Flink with RabbitMQ on the sink
>>> side, but reading the source code it seems RMQSink does not guarantee that messages are
>>> not lost: I don't see waitForConfirm() or a confirm listener being used. Does the RMQSink
>>> part not guarantee at least once? Hoping for an answer, thank you.
>>> 
>>> 
> 



Re: java.lang.LinkageError: loader constraint violation

2021-10-28 Thread Qingsheng Ren
Hi,

Please check whether Kafka-related classes also exist under the lib directory of the Flink 
cluster. Judging from the exception, it looks like a class conflict.

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 28, 2021, 10:44 AM +0800, casel.chen , wrote:
> When submitting the Flink job, the following exception is thrown. What is the root cause, 
> and how can I fix it?
>
>
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader 
> (instance of org/apache/flink/util/ChildFirstClassLoader) previously 
> initiated loading for a different type with name 
> "org/apache/kafka/clients/consumer/ConsumerRecord"
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
>
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>
> at java.lang.Class.getDeclaredMethods0(Native Method)
>
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
>
> at java.lang.Class.getDeclaredMethod(Class.java:2128)
>
> at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
>
> at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
>
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
>
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.io.ObjectStreamClass.(ObjectStreamClass.java:494)
>
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
>
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
>
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
>
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1637)
>
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1623)


Re: Re: Re: Re: How to set up automatic offset committing for easier monitoring when checkpointing is enabled

2021-10-27 Thread Qingsheng Ren
Hi!

If you are using the FLIP-27 based KafkaSource, you can configure enable.auto.commit = true 
and auto.commit.interval.ms = {commit_interval} so that KafkaSource automatically commits 
offsets at the specified interval. The SourceFunction-based FlinkKafkaConsumer does not 
support automatic committing when checkpointing is enabled; it can only commit offsets when 
a checkpoint completes.
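
For example, a minimal sketch (broker address, topic and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // let the Kafka consumer auto-commit offsets so external monitoring
        // sees committed offsets independently of the checkpoint interval
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.commit.interval.ms", "5000")
        .build();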

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
> Is there a way to stay compatible with our existing monitoring? That is, have the consumer 
> group's offsets updated in real time while checkpointing is enabled.
> On 2021-10-25 21:58:28, "杨浩"  wrote:
> > currentOffsets would work in theory, but the Kafka lag metric in our cloud monitoring 
> > system is based on committedOffsets.
> > On 2021-10-25 10:31:12, "Caizhi Weng"  wrote:
> > > Hi!
> > >
> > > Is the offset here the offset of the Kafka source? Actually there is no need to read
> > > offsets via checkpoints; you can read them through metrics, see [1].
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
> > >
> > > 杨浩  wrote on Mon, Oct 25, 2021 at 10:20 AM:
> > >
> > > > If checkpointing is enabled and the checkpoint interval is set quite large because the 
> > > > state is big, how can offsets be committed more frequently so that it is easier to 
> > > > monitor the job's progress?


Re: Flink 1.14 doesn’t suppport kafka consummer 0.11 or lower?

2021-10-19 Thread Qingsheng Ren
Hi Jary,

Flink removed the Kafka 0.10 & 0.11 connectors in 1.12, because Kafka has supported 
bidirectional compatibility since version 0.10, which means you can use a newer-version 
client to communicate with an older-version broker (e.g. Kafka client 2.4.1 & Kafka broker 
0.11) [1]. You can try switching to a higher-version Kafka client and it should work.

[1] https://kafka.apache.org/protocol.html#protocol_compatibility

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 20, 2021, 11:18 AM +0800, Jary Zhen , wrote:
> Hi, everyone
>
> I'm using Flink 1.14 to consume data from Kafka 0.11, and there are some errors while 
> running.
> > Caused by: java.lang.NoSuchMethodError: 
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
> > at 
> > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> > at 
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> After checking the flink-connector-kafka code:
> consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT))
> It seems the current Flink version doesn't support the old Kafka client version, 
> which has poll(long timeout) instead of poll(Duration timeout):
> public ConsumerRecords poll(long timeout)
> So, is this a bug, or must the Flink user use a higher Kafka client version?


Re: SplitFetcherManager custom error handler

2021-10-18 Thread Qingsheng Ren
Hi Mason,

It’ll be great to have your contribution! Could you also provide a more specific 
description of your use case? It looks like you are implementing a custom Kafka 
connector, so I’m not sure whether handling the exception directly in the split 
reader would be a possible solution.

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 19, 2021, 8:31 AM +0800, Mason Chen , wrote:
> Hi all,
>
> I am implementing a Kafka connector with some custom error handling that is 
> aligned with our internal infrastructure. `SplitFetcherManager` has a 
> hardcoded error handler in the constructor and I was wondering if it could be 
> exposed by the classes that extend it. Happy to contribute if people are 
> interested.
>
> Best,
> Mason


Re: Problem with Flink job and Kafka.

2021-10-18 Thread Qingsheng Ren
Hi Marco,

Sorry I forgot to cc the user mailing list just now.

From the exception message it looks like a versioning issue. Could you provide 
some additional information, such as Flink & Kafka connector version, Kafka 
broker version, and full exception stack? Also it will be helpful to paste part 
of your code (on DataStream API) or SQL (on Table & SQL API).

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 19, 2021, 9:28 AM +0800, Marco Villalobos , 
wrote:
> I have the simplest Flink job: it simply dequeues from one Kafka topic and writes 
> to another Kafka topic, but with headers, and manually copies the event time into 
> the Kafka sink.
>
> It works as intended, but sometimes I am getting this error:
>
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default producerId at version 1.
>
> Does anybody know what this means and how to fix this?
>
> Thank you.
>


Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Qingsheng Ren
Thanks for driving this discussion, Arvid! I think this will be one giant leap 
for the Flink community. Externalizing connectors would give connector developers 
more freedom in developing, releasing and maintaining them, which can attract more 
developers to contribute their connectors and expand the Flink ecosystem.

Considering where to host the connectors, I prefer an individual organization 
outside the Apache umbrella. If we keep all connectors under Apache, there is not 
much difference compared with keeping them in the Flink main repo: connector 
developers would still require permissions from Flink committers to contribute, 
and the release process would still have to follow Apache rules, which goes 
against our initial motivation for externalizing connectors.

Using an individual GitHub organization would maximize the freedom given to 
developers. An ideal structure in my mind would be like 
"github.com/flink-connectors/flink-connector-xxx". The newly established 
flink-extended org might be another choice, but considering the number of 
connectors, I prefer a dedicated org for connectors to avoid flooding the 
other repos under flink-extended.

In the meantime, we need to provide a well-established standard / guideline for 
contributing connectors, including CI, testing and docs (maybe we can’t provide 
resources for running them, but we should give enough guidance on how to set them 
up) to keep the quality of connectors high. I’m happy to help build these 
fundamental bricks. Also, since the Kafka connector is widely used among Flink 
users, we can make it a “model” of how to build and contribute a well-qualified 
connector to the Flink ecosystem, and we can still use this trusted one for 
Flink E2E tests.

Again, I believe this will definitely boost the expansion of the Flink ecosystem. 
Very excited to see the progress!

Best,

Qingsheng Ren
On Oct 15, 2021, 8:47 PM +0800, Arvid Heise , wrote:
> Dear community,
> Today I would like to kickstart a series of discussions around creating an 
> external connector repository. The main idea is to decouple the release cycle 
> of Flink with the release cycles of the connectors. This is a common approach 
> in other big data analytics projects and seems to scale better than the 
> current approach. In particular, it will yield the following changes.
>  • Faster releases of connectors: New features can be added more quickly, bugs 
> can be fixed immediately, and we can have faster security patches in case of 
> direct or indirect (through dependencies) security flaws.
>  • New features can be added to old Flink versions: If the connector API didn’t 
> change, the same connector jar may be used with different Flink versions. Thus, 
> new features can also immediately be used with older Flink versions. A 
> compatibility matrix on each connector page will help users find suitable 
> connector versions for their Flink versions.
>  • More activity and contributions around connectors: If we ease the 
> contribution and development process around connectors, we will see faster 
> development and also more connectors. Since that heavily depends on the chosen 
> approach discussed below, more details will be shown there.
>  • An overhaul of the connector page: In the future, all known connectors will 
> be shown on the same page in a similar layout independent of where they reside. 
> They could be hosted on external project pages (e.g., Iceberg and Hudi), on some 
> company page, or may stay within the main Flink repository. Connectors may 
> receive some sort of quality seal such that users can quickly assess the 
> production-readiness, and we could also add which community/company promises 
> which kind of support.
>  • If we take (some) connectors out of Flink, Flink CI will be faster and Flink 
> devs will experience fewer build instabilities (which mostly come from 
> connectors). That would also speed up Flink development.
> Now I’d first like to collect your viewpoints on the ideal state. Let’s first 
> recap which approaches we currently have:
>  • We have half of the connectors in the main Flink repository. Relatively few 
> of them have received updates in the past couple of months.
>  • Another large chunk of connectors are in Apache Bahir. It recently saw its 
> first release in 3 years.
>  • There are a few other (Apache) projects that maintain a Flink connector, 
> such as Apache Iceberg, Apache Hudi, and Pravega.
>  • A few connectors are listed on company-related repositories, such as Apache 
> Pulsar on StreamNative and CDC connectors on Ververica.
> My personal observation is that having a repository per connector seems to 
> increase the activity on a connector as it’s easier to maintain. For example, 
> in Apache Bahir all connectors are built against the same Flink version, 
> which may not be desirable when c

Re: KafkaFetcher [] - Committing offsets to Kafka failed.

2021-08-27 Thread Qingsheng Ren
Hi Hemant,

One possible reason is that another Kafka consumer is using the same consumer 
group id as the one in FlinkKafkaConsumer. You can try to use another group.id 
in FlinkKafkaConsumer to validate this.
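
For illustration, a minimal sketch of wiring FlinkKafkaConsumer with a dedicated 
group.id to rule out a clash with another consumer; the broker address, topic and 
group id below are made-up placeholders:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupIdCheckJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");      // placeholder address
        // A group id not shared with any other consumer (Flink or non-Flink), so a
        // rebalance triggered elsewhere cannot revoke this job's group membership.
        props.setProperty("group.id", "flink-job-dedicated-group"); // placeholder group id

        env.addSource(new FlinkKafkaConsumer<>("input-topic",       // placeholder topic
                        new SimpleStringSchema(), props))
           .print();
        env.execute("group-id-check");
    }
}
```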

If it’s not a group id problem, there are some Kafka consumer metrics [1] that 
might help with debugging, such as “time-between-poll-avg”, “heartbeat-rate” and 
so forth, to check whether it is the poll-interval problem suggested by Kafka’s 
exception message. All Kafka consumer metrics are registered under the metric 
group “KafkaConsumer” in Flink’s metric system.

Besides, it might be helpful to set the logging level of 
org.apache.kafka.clients.consumer to DEBUG or TRACE, which can provide more 
information about why the offset commit failed.

Hope this can help you~

[1] https://kafka.apache.org/documentation/#consumer_monitoring

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Aug 26, 2021, 10:25 PM +0800, bat man , wrote:
> Hi,
>
> I am using Flink 1.12.1 to consume data from Kafka in a streaming job, with 
> flink-connector-kafka_2.12:1.12.1. The Kafka broker version is 2.2.1.
> In the logs I see warnings like this -
>
> 2021-08-26 13:36:49,903 WARN 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member.
> This means that the time between subsequent calls to poll() was longer than 
> the configured max.poll.interval.ms, which typically implies that the poll 
> loop is spending too much time message processing.
> You can address this either by increasing max.poll.interval.ms or by reducing 
> the maximum size of batches returned in poll() with max.poll.records.
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:910)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:890)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
>
> I understand that this might not cause an issue since checkpointing is not 
> impacted; however, metrics monitoring might be, as I am using Burrow to monitor 
> group offsets. I have already tried changing the properties below in the Kafka 
> consumer configs -
>
>         kafkaProps.setProperty("max.poll.interval.ms","90");
>         kafkaProps.setProperty("max.poll.records","200");
>         kafkaProps.setProperty("heartbeat.interval.ms","1000");
>         kafkaProps.setProperty("request.timeout.ms","4");
>         kafkaProps.setProperty("session.timeout.ms","1");
> But the warnings are still present in the logs.
>
> In addition I see this error just before this warning -
> ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - 
> [Consumer clientId=consumer-3, groupId=xxx] Offset commit failed on partition 
> xxx-1 at offset 33651:
> The coordinator is not aware of this member.
>
> The code uses a WatermarkStrategy to extract timestamps and emit watermarks.
>
> Any clue is much appreciated.
>
> Thanks,
> Hemant


Re: KafkaSource metrics

2021-05-25 Thread Qingsheng Ren
Hi Oscar,

Thanks for raising this problem! Currently the KafkaConsumer metrics are not 
registered in Flink by the new KafkaSource, as they are by FlinkKafkaConsumer. A 
ticket has been created in JIRA, and hopefully we can fix it in the next release.

https://issues.apache.org/jira/browse/FLINK-22766

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com

On May 25, 2021, 2:35 PM +0800, 陳樺威 , wrote:
> Hi Ardhani,
>
> Thanks for your kindly reply.
>
> Our team used the metrics you listed before, but they disappeared after 
> migrating to the new KafkaSource.
>
> We initialize the KafkaSource with the following code:
> ```
> val consumer: KafkaSource[T] = KafkaSource.builder()
>   .setProperties(properties)
>   .setTopics(topic)
>   .setValueOnlyDeserializer(deserializer)
>   .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>   .build()
>
> env
>   .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid)
>   .setParallelism(math.min(parallelism, env.getParallelism))
>   .setMaxParallelism(parallelism)
>   .name(uid).uid(uid)
>   .rebalance
> ```
>
> Oscar
>
Ardhani Narasimha wrote on Tue, May 25, 2021 at 12:08 AM:
> Use the following metrics, respectively:
>
> flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - 
> Consumer rate
> flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer 
> lag
> flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit 
> latency
>
> unsure if reactive mode makes any difference.
> > On Mon, May 24, 2021 at 7:44 PM 陳樺威  wrote:
> > > Hello,
> > >
> > > Our team is testing reactive mode and replacing FlinkKafkaConsumer with 
> > > the new KafkaSource, but we can’t find a list of KafkaSource metrics. Does 
> > > anyone have an idea? In our case, we want to know the Kafka consumption 
> > > delay and consumption rate.
> > >
> > > Thanks,
> > > Oscar
>


Re: Flink job fails to consume from Kafka and cannot obtain offsets

2021-04-22 Thread Qingsheng Ren
Hi Jacob,

From the error it looks like the Kafka consumer could not connect to the Kafka 
brokers. The following may help with troubleshooting:

1. Confirm network connectivity between the Flink TaskManagers and the Kafka brokers.
2. Network connectivity between the Flink TaskManagers and the Kafka brokers does 
not necessarily mean data can be consumed; the Kafka broker configuration may need 
to be changed. This article [1] may help: the vast majority of Kafka connection 
problems are caused by the configuration issues it describes.
3. Configure Log4j to set the log level of org.apache.kafka.clients.consumer to 
DEBUG or TRACE, to get more information from the logs for troubleshooting.

Hope this helps!

[1] 
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

—
Best Regards,

Qingsheng Ren
On Apr 14, 2021, 12:13 PM +0800, Jacob <17691150...@163.com> wrote:
> A Flink job consumes messages from a Kafka topic that exists in two Kafka 
> clusters, cluster A and cluster B. The job consumes the topic in cluster A 
> without any problem, but fails to start when switched to cluster B.
>
> The Flink cluster is deployed with Docker in standalone mode. Cluster resources 
> and slots are sufficient. The error log is as follows:
>
> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
> Timeout of 6ms expired before the position for partition Test-topic-27
> could be determined
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
> 6ms expired before the position for partition Test-topic-27 could be
> determined
>
> After searching around, most answers point to causes like insufficient slots or 
> Kafka broker load, and those have already been ruled out.
>
> Any advice is appreciated.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Exception when using the Kafka source in a packaged JAR

2021-04-08 Thread Qingsheng Ren
Hi,

From the error it looks like the Flink Kafka connector classes are missing from 
the job JAR. Please confirm whether the Flink Kafka connector classes were 
actually packaged into the JAR; declaring the Kafka connector in the Maven POM 
dependencies does not necessarily mean it gets bundled into the job JAR.

--
Best Regards,

Qingsheng Ren
Real-time Computing Department, Alibaba Cloud
Alibaba Group
Email: renqs...@gmail.com


On Apr 7, 2021, 3:27 PM +0800, 小猫爱吃鱼 <1844061...@qq.com> wrote:
> Hi,
> While using flink-1.13, I tried to use the Kafka data source.
> I modified the streaming WordCount in flink-examples so that it reads data from 
> a local Kafka; it runs fine when launched directly from IDEA. However, when the 
> JAR built with mvn is submitted to a locally built Flink binary (a locally 
> started standalone Flink), it throws the exception below.
>
>
> java.lang.RuntimeException: Could not look up the main(String[]) method from 
> the class org.apache.flink.streaming.examples.wordcount.WordCount2: 
> org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema
> at 
> org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)
> at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
> at 
> org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
> at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:851)
> at 
> org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:271)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
> at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
> at java.lang.Class.getMethod0(Class.java:3018)
> at java.lang.Class.getMethod(Class.java:1784)
> at 
> org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307)
> ... 10 more
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 16 more
>
>
> The solutions I found suggest changing the corresponding dependencies in the POM, 
> but I don't understand how to do that. I found the dependency on the 
> kafka-connector in the flink-examples-streaming POM file, while the parent POM 
> has no related dependency. How should I handle this problem?
> I can guarantee that the POM file only contains the changes for the newly added 
> example project; no other dependencies were modified.
>
>
> Many thanks!


Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-08-02 Thread Qingsheng Ren
Hi Vikash,

Sorry for the late reply. Is your Flink Kafka *connector* version 1.10.1
too? Actually it's a bug in the connector, so I think you need to upgrade
the connector to 1.10.1 as well, not just Flink itself.

I tried Flink 1.10.0/1.10.1 + flink-kafka-connector 1.10.0 and indeed
reproduced the bug. After upgrading flink-kafka-connector to 1.10.1, the
error disappeared.
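
For illustration, a minimal sketch of a FlinkKafkaProducer configured for EXACTLY_ONCE 
once the connector is on 1.10.1+; the broker address, topic name and transaction 
timeout below are assumed example values, not taken from this thread:

```
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {

    public static FlinkKafkaProducer<String> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder address
        // Keep the producer transaction timeout within the broker's
        // transaction.max.timeout.ms (10 minutes here, as an example).
        props.setProperty("transaction.timeout.ms", "600000");

        KafkaSerializationSchema<String> schema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>("output-topic",      // placeholder topic
                        element.getBytes(StandardCharsets.UTF_8));
            }
        };

        return new FlinkKafkaProducer<>(
                "output-topic",                                  // default target topic
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
```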

On Fri, Jul 31, 2020 at 7:02 PM Vikash Dat  wrote:

> Thanks for the reply. I am currently using 1.10 but also saw it happening in
> 1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to
> 1.10 at the moment. Are there any known workarounds?
>
> On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren  wrote:
>
>> Hi Vikash,
>>
>> It's a bug related to the classloader used in the `abortTransaction()` method
>> of `FlinkKafkaProducer` in Flink 1.10.0. I think it has been fixed in
>> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
>> 1.10.0?
>>
>>
>>> Vikash Dat wrote on Thu, Jul 30, 2020 at 9:26 PM:
>>
>>> Has anyone had success with using exactly_once in a kafka producer in
>>> flink?
>>> As of right now I don't think the code shown in the docs
>>> (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
>>> )
>>> actually works.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>> --
>> Best Regards,
>>
>> *Qingsheng Ren*
>>
>> Electrical and Computer Engineering
>> Carnegie Mellon University
>>
>> Email: renqs...@gmail.com
>>
>

-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqs...@gmail.com


Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Qingsheng Ren
Hi Vikash,

It's a bug related to the classloader used in the `abortTransaction()` method
of `FlinkKafkaProducer` in Flink 1.10.0. I think it has been fixed in
1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
1.10.0?


Vikash Dat wrote on Thu, Jul 30, 2020 at 9:26 PM:

> Has anyone had success with using exactly_once in a kafka producer in
> flink?
> As of right now I don't think the code shown in the docs
> (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
> )
> actually works.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqs...@gmail.com