Re: [VOTE] Release 1.11.6/1.12.7/1.13.5/1.14.2, release candidate #1

2021-12-15 Thread Stephan Ewen
+1 (binding)

  - Verified commit history, looks good
  => stumbled over the changes in "create_release_branch.sh",
which are present in each release commit. [1]
  => agree that these are not an issue, because this is an out-of-band
release
  - Release notes for 1.14.2 are off; they contain the incorrect entry
"FLINK-25222: Remove NetworkFailureProxy used for Kafka connector tests"
  - Checked that released binaries and jars reference correct Scala versions
  - Ran streaming examples against the binary releases for 1.12.7, 1.13.5,
and 1.14.2 (see the sketch below); execution logs look correct.
  - Other checks (licenses, no binaries) carry over from previous releases

[1]
https://github.com/apache/flink/commit/6fd4b1c0ef2ddd12751889218445ce0e60ff6c80#diff-94c70ce1a0abddcd83314c83b46080d8edbcd919b737f316cd6f72006d464074
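
For reference, a minimal sketch of how the example-run check above can be
reproduced locally against one of the staged binary releases (file name and
example are illustrative; adjust to the artifact under vote):

    # Unpack a staged binary release and run a bundled streaming example.
    tar xzf flink-1.14.2-bin-scala_2.12.tgz
    cd flink-1.14.2
    ./bin/start-cluster.sh                            # local standalone cluster
    ./bin/flink run examples/streaming/WordCount.jar  # bundled example job
    tail -n 20 log/flink-*-taskexecutor-*.out         # WordCount results end up here
    grep -iE "error|exception" log/*.log || echo "logs look clean"
    ./bin/stop-cluster.sh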


On Wed, Dec 15, 2021 at 5:54 PM Seth Wiesman  wrote:

> +1 (non-binding)
>
> - Checked diff of all versions and verified dep upgrade
> - Verified checksum and signatures
> - Built 1.14 from source
> - checked blog post
>
> Seth
>
> On Wed, Dec 15, 2021 at 10:22 AM Yu Li  wrote:
>
> > +1
> >
> > * Verified checksums and signatures
> > * Reviewed website PR
> >- Minor: left a comment to mention CVE-2021-45046
> > * Checked and confirmed new tags only contain log4j version bump
> > * Checked release notes and found no issues
> >- I've moved FLINK-25317 to 1.14.3
> >
> > Thanks for driving these releases Chesnay!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Wed, 15 Dec 2021 at 21:29, Chesnay Schepler 
> wrote:
> >
> > > FYI; the publication of the python release for 1.11/1.12 will be
> delayed
> > > because we hit the project size limit on pypi again, and increasing
> that
> > > limit may take a while.
> > > On the positive side, this gives us more time to fix the mac builds.
> > >
> > > On 15/12/2021 03:55, Chesnay Schepler wrote:
> > > > Hi everyone,
> > > >
> > > > This vote is for the emergency patch releases for 1.11, 1.12, 1.13
> and
> > > > 1.14 to address CVE-2021-44228/CVE-2021-45046.
> > > > It covers all 4 releases as they contain the same changes (upgrading
> > > > Log4j to 2.16.0) and were prepared simultaneously by the same person.
> > > > (Hence, if something is broken, it likely applies to all releases)
> > > >
> > > > Note: 1.11/1.12 are still missing the Python Mac releases.
> > > >
> > > >
> > > > Please review and vote on the release candidate #1 for the versions
> > > > 1.11.6, 1.12.7, 1.13.5 and 1.14.2, as follows:
> > > > [ ] +1, Approve the releases
> > > > [ ] -1, Do not approve the releases (please provide specific
> comments)
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source releases and binary convenience releases
> > > > to be deployed to dist.apache.org [2], which are signed with the key
> > > > with fingerprint C2EED7B111D464BA [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tags [5],
> > > > * website pull request listing the new releases and adding
> > > > announcement blog post [6].
> > > >
> > > > The vote will be open for at least 24 hours. The minimum vote time
> has
> > > > been shortened as the changes are minimal and the matter is urgent.
> > > > It is adopted by majority approval, with at least 3 PMC affirmative
> > > > votes.
> > > >
> > > > Thanks,
> > > > Chesnay
> > > >
> > > > [1]
> > > > 1.11:
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351056
> > > > 1.12:
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351057
> > > > 1.13:
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351058
> > > > 1.14:
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351059
> > > > [2]
> > > > 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.6-rc1/
> > > > 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.7-rc1/
> > > > 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.5-rc1/
> > > > 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.2-rc1/
> > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > [4]
> > > > 1.11:
> > > >
> https://repository.apache.org/content/repositories/orgapacheflink-1460
> > > > 1.12:
> > > >
> https://repository.apache.org/content/repositories/orgapacheflink-1462
> > > > 1.13:
> > > >
> https://repository.apache.org/content/repositories/orgapacheflink-1459
> > > > 1.14:
> > > >
> https://repository.apache.org/content/repositories/orgapacheflink-1461
> > > > [5]
> > > > 1.11:
> https://github.com/apache/flink/releases/tag/release-1.11.6-rc1
> > > > 1.12:
> https://github.com/apache/flink/releases/tag/release-1.12.7-rc1
> > > > 1.13:
> https://github.com/apache/flink/releases/tag/release-1.13.5-rc1
> > > > 1.14:
> 

Re: [CANCELLED] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1

2021-12-15 Thread Stephan Ewen
That's right, they are referenced in POMs published with the jars, though.
But that's minor.



On Wed, Dec 15, 2021 at 12:28 PM Chesnay Schepler 
wrote:

> AFAIK none of the jars we publish actually contains log4j.
> It's only bundled by the distribution/python binaries/docker images.
>
> Hence I don't think the jars help in this case.
>
> On 15/12/2021 10:42, Stephan Ewen wrote:
> > Given that these artifacts are published already, users can use them if
> > they want to update now:
> >
> > For example:
> > https://search.maven.org/artifact/org.apache.flink/flink-core/1.14.1/jar
> >
> > Just for the users that really want to update now (rather than rely on
> the
> > mitigation via config) and are not as much concerned about the remaining
> > weakness in log4j 2.15.0
> >
> > On Tue, Dec 14, 2021 at 11:18 PM Seth Wiesman 
> wrote:
> >
> >> Thank you for managing these updates Chesnay!
> >>
> >>
> >>
> >> On Tue, Dec 14, 2021 at 3:51 PM Chesnay Schepler 
> >> wrote:
> >>
> >>> Since the maven artifacts have already been published we will use the
> >>> next patch version for each release, i.e.:
> >>> 1.11.6
> >>> 1.12.7
> >>> 1.13.5
> >>> 1.14.2
> >>>
> >>> (We could technically just update the source/binaries, but that seems
> >>> fishy).
> >>>
> >>> On 14/12/2021 22:38, Chesnay Schepler wrote:
> >>>> I'm canceling the release because the issue was not fully fixed in
> >>>> Log4j 2.15.0; see CVE-2021-45046.
> >>>>
> >>>> I will start preparing new release candidates that use Log4j 2.16.0 .
> >>>>
> >>>> On 14/12/2021 21:28, Chesnay Schepler wrote:
> >>>>> The vote duration has passed and we have approved the releases.
> >>>>>
> >>>>> Binding votes:
> >>>>> * Stephan
> >>>>> * Till
> >>>>> * Xintong
> >>>>> * Zhu
> >>>>> * Gordon
> >>>>>
> >>>>> I will now finalize the release.
> >>>>>
> >>>>> On 13/12/2021 20:28, Chesnay Schepler wrote:
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> This vote is for the emergency patch releases for 1.11, 1.12, 1.13
> >>>>>> and 1.14 to address CVE-2021-44228.
> >>>>>> It covers all 4 releases as they contain the same changes (upgrading
> >>>>>> Log4j to 2.15.0) and were prepared simultaneously by the same
> person.
> >>>>>> (Hence, if something is broken, it likely applies to all releases)
> >>>>>>
> >>>>>> Please review and vote on the release candidate #1 for the versions
> >>>>>> 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows:
> >>>>>> [ ] +1, Approve the releases
> >>>>>> [ ] -1, Do not approve the releases (please provide specific
> >> comments)
> >>>>>> The complete staging area is available for your review, which
> >> includes:
> >>>>>> * JIRA release notes [1],
> >>>>>> * the official Apache source releases and binary convenience
> >>>>>> releases to be deployed to dist.apache.org [2], which are signed
> >>>>>> with the key with fingerprint C2EED7B111D464BA [3],
> >>>>>> * all artifacts to be deployed to the Maven Central Repository [4],
> >>>>>>  * *the jars for 1.13/1.14 are still being built*
> >>>>>> * source code tags [5],
> >>>>>> * website pull request listing the new releases and adding
> >>>>>> announcement blog post [6].
> >>>>>>
> >>>>>> The vote will be open for at least 24 hours. The minimum vote time
> >>>>>> has been shortened as the changes are minimal and the matter is
> >> urgent.
> >>>>>> It is adopted by majority approval, with at least 3 PMC affirmative
> >>>>>> votes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Chesnay
> >>>>>>
> >>>>>> [1]
> >>>>>> 1.11:
> >>>>>>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350327
> >>>>>> 1.12:
> >>>>>>
> >

Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl

2021-12-15 Thread Stephan Ewen
Congratulations, Matthias, and welcome to the Flink committers!

On Mon, Dec 13, 2021 at 4:39 AM 刘建刚  wrote:

> Congratulations!
>
> Best
> Liu Jiangang
>
> Nicholas Jiang  wrote on Mon, Dec 13, 2021 at 11:23:
>
> > Congratulations, Matthias!
> >
> > Best,
> > Nicholas Jiang
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk

2021-12-15 Thread Stephan Ewen
Congrats and welcome!

On Mon, Dec 13, 2021 at 4:39 AM 刘建刚  wrote:

> Congratulations!
>
> Best
> Liu Jiangang
>
> Nicholas Jiang  wrote on Mon, Dec 13, 2021 at 11:28:
>
> > Congratulations, Ingo!
> >
> > Best,
> > Nicholas Jiang
> >
>


Re: [CANCELLED] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1

2021-12-15 Thread Stephan Ewen
Given that these artifacts are published already, users can use them if
they want to update now:

For example:
https://search.maven.org/artifact/org.apache.flink/flink-core/1.14.1/jar

This is just for the users that really want to update now (rather than rely on
the mitigation via config) and are not as concerned about the remaining
weakness in Log4j 2.15.0.
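
For users going down that route, one way to double-check which Log4j version
their own build resolves after bumping the Flink dependencies is a Maven
dependency-tree query (the coordinates below are just an example; the Flink
jars themselves do not bundle Log4j, so what matters is the user's own
dependency management):

    # Run from the root of the user's Maven project after bumping the Flink version.
    mvn dependency:tree -Dincludes=org.apache.logging.log4j:log4j-core
    # The resolved log4j-core version should be the one the project expects,
    # not a vulnerable 2.x version.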

On Tue, Dec 14, 2021 at 11:18 PM Seth Wiesman  wrote:

> Thank you for managing these updates Chesnay!
>
>
>
> On Tue, Dec 14, 2021 at 3:51 PM Chesnay Schepler 
> wrote:
>
> > Since the maven artifacts have already been published we will use the
> > next patch version for each release, i.e.:
> > 1.11.6
> > 1.12.7
> > 1.13.5
> > 1.14.2
> >
> > (We could technically just update the source/binaries, but that seems
> > fishy).
> >
> > On 14/12/2021 22:38, Chesnay Schepler wrote:
> > > I'm canceling the release because the issue was not fully fixed in
> > > Log4j 2.15.0; see CVE-2021-45046.
> > >
> > > I will start preparing new release candidates that use Log4j 2.16.0 .
> > >
> > > On 14/12/2021 21:28, Chesnay Schepler wrote:
> > >> The vote duration has passed and we have approved the releases.
> > >>
> > >> Binding votes:
> > >> * Stephan
> > >> * Till
> > >> * Xintong
> > >> * Zhu
> > >> * Gordon
> > >>
> > >> I will now finalize the release.
> > >>
> > >> On 13/12/2021 20:28, Chesnay Schepler wrote:
> > >>> Hi everyone,
> > >>>
> > >>> This vote is for the emergency patch releases for 1.11, 1.12, 1.13
> > >>> and 1.14 to address CVE-2021-44228.
> > >>> It covers all 4 releases as they contain the same changes (upgrading
> > >>> Log4j to 2.15.0) and were prepared simultaneously by the same person.
> > >>> (Hence, if something is broken, it likely applies to all releases)
> > >>>
> > >>> Please review and vote on the release candidate #1 for the versions
> > >>> 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows:
> > >>> [ ] +1, Approve the releases
> > >>> [ ] -1, Do not approve the releases (please provide specific
> comments)
> > >>>
> > >>> The complete staging area is available for your review, which
> includes:
> > >>> * JIRA release notes [1],
> > >>> * the official Apache source releases and binary convenience
> > >>> releases to be deployed to dist.apache.org [2], which are signed
> > >>> with the key with fingerprint C2EED7B111D464BA [3],
> > >>> * all artifacts to be deployed to the Maven Central Repository [4],
> > >>> * *the jars for 1.13/1.14 are still being built*
> > >>> * source code tags [5],
> > >>> * website pull request listing the new releases and adding
> > >>> announcement blog post [6].
> > >>>
> > >>> The vote will be open for at least 24 hours. The minimum vote time
> > >>> has been shortened as the changes are minimal and the matter is
> urgent.
> > >>> It is adopted by majority approval, with at least 3 PMC affirmative
> > >>> votes.
> > >>>
> > >>> Thanks,
> > >>> Chesnay
> > >>>
> > >>> [1]
> > >>> 1.11:
> > >>>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350327
> > >>> 1.12:
> > >>>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350328
> > >>> 1.13:
> > >>>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350686
> > >>> 1.14:
> > >>>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350512
> > >>> [2]
> > >>> 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.5-rc1/
> > >>> 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.6-rc1/
> > >>> 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.4-rc1/
> > >>> 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.1-rc1/
> > >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > >>> [4]
> > >>> 1.11/1.12:
> > >>>
> https://repository.apache.org/content/repositories/orgapacheflink-1455
> > >>> 1.13:
> > >>>
> https://repository.apache.org/content/repositories/orgapacheflink-1457
> > >>> 1.14:
> > >>>
> https://repository.apache.org/content/repositories/orgapacheflink-1456
> > >>> [5]
> > >>> 1.11:
> https://github.com/apache/flink/releases/tag/release-1.11.5-rc1
> > >>> 1.12:
> https://github.com/apache/flink/releases/tag/release-1.12.6-rc1
> > >>> 1.13:
> https://github.com/apache/flink/releases/tag/release-1.13.4-rc1
> > >>> 1.14:
> https://github.com/apache/flink/releases/tag/release-1.14.1-rc1
> > >>> [6] https://github.com/apache/flink-web/pull/489
> > >>>
> > >>
> > >
> >
> >
>


Re: [VOTE] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1

2021-12-13 Thread Stephan Ewen
+1 (binding)

 - Verified that commit history is identical to previous release (except
dependency upgrade and release version commit)
 - Verified that the source releases reference updated dependency and
binary releases contain updated dependency
 - Blog post looks good
 - Ran bundled examples against the 1.14.1 binary release; worked as expected.
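
As context for the signature and checksum checks mentioned by the voters in
this thread, a minimal sketch of how such a verification can be done locally
(file names are illustrative for one of the staged source releases):

    # Import the Flink release signing keys and verify a staged source artifact.
    curl -O https://dist.apache.org/repos/dist/release/flink/KEYS
    gpg --import KEYS
    gpg --verify flink-1.14.1-src.tgz.asc flink-1.14.1-src.tgz
    # Verify the published SHA-512 checksum (or compare the printed hash manually
    # if the checksum file format differs).
    sha512sum -c flink-1.14.1-src.tgz.sha512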

On Mon, Dec 13, 2021 at 9:22 PM Seth Wiesman  wrote:

> +1 (non-binding)
>
> - Checked Log4J version and updated license preambles on all releases
> - Verified signatures on sources
> - Reviewed blog post
>
> Seth
>
> On Mon, Dec 13, 2021 at 1:42 PM Jing Ge  wrote:
>
> > +1   LGTM. Many thanks for your effort!
> >
> > On Mon, Dec 13, 2021 at 8:28 PM Chesnay Schepler 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > This vote is for the emergency patch releases for 1.11, 1.12, 1.13 and
> > > 1.14 to address CVE-2021-44228.
> > > It covers all 4 releases as they contain the same changes (upgrading
> > > Log4j to 2.15.0) and were prepared simultaneously by the same person.
> > > (Hence, if something is broken, it likely applies to all releases)
> > >
> > > Please review and vote on the release candidate #1 for the versions
> > > 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows:
> > > [ ] +1, Approve the releases
> > > [ ] -1, Do not approve the releases (please provide specific comments)
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source releases and binary convenience releases
> to
> > > be deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint C2EED7B111D464BA [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > >  * *the jars for 1.13/1.14 are still being built*
> > > * source code tags [5],
> > > * website pull request listing the new releases and adding announcement
> > > blog post [6].
> > >
> > > The vote will be open for at least 24 hours. The minimum vote time has
> > > been shortened as the changes are minimal and the matter is urgent.
> > > It is adopted by majority approval, with at least 3 PMC affirmative
> > votes.
> > >
> > > Thanks,
> > > Chesnay
> > >
> > > [1]
> > > 1.11:
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350327
> > > 1.12:
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350328
> > > 1.13:
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350686
> > > 1.14:
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350512
> > > [2]
> > > 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.5-rc1/
> > > 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.6-rc1/
> > > 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.4-rc1/
> > > 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.1-rc1/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > > 1.11/1.12:
> > > https://repository.apache.org/content/repositories/orgapacheflink-1455
> > > 1.13:
> > > https://repository.apache.org/content/repositories/orgapacheflink-1457
> > > 1.14:
> > > https://repository.apache.org/content/repositories/orgapacheflink-1456
> > > [5]
> > > 1.11: https://github.com/apache/flink/releases/tag/release-1.11.5-rc1
> > > 1.12: https://github.com/apache/flink/releases/tag/release-1.12.6-rc1
> > > 1.13: https://github.com/apache/flink/releases/tag/release-1.13.4-rc1
> > > 1.14: https://github.com/apache/flink/releases/tag/release-1.14.1-rc1
> > > [6] https://github.com/apache/flink-web/pull/489
> > >
> >
>


[DISCUSS] Immediate dedicated Flink releases for log4j vulnerability

2021-12-12 Thread Stephan Ewen
Hi all!

Without doubt, you heard about the log4j vulnerability [1].

There is an advisory blog post on how to mitigate this in Apache Flink [2],
which involves setting a config option and restarting the processes. That
is fortunately a relatively simple fix.

Despite this workaround, I think we should do an immediate release with the
updated dependency, meaning we do not wait for the next bugfix releases
coming in a few weeks, but release as soon as possible.
The mood I perceive in the industry is pretty much panicky over this, and I
expect we will see many requests for a patched release and many discussions
about why the workaround alone would not be enough due to certain guidelines.

I suggest that we preempt those discussions and create releases the
following way:

  - we take the latest already released versions from each release branch:
 ==> 1.14.0, 1.13.3, 1.12.5, 1.11.4
  - we add a single commit to those that just updates the log4j dependency
  - we release those as 1.14.1, 1.13.4, 1.12.6, 1.11.5, etc.
  - that way we don't need to do functional release tests, because the
released code is identical to the previous release, except for the log4j
dependency
  - we can then continue the work on the upcoming bugfix releases as
planned, without high pressure

I would suggest creating those RCs immediately and releasing them with a
special voting period (24h or so).

WDYT?

Best,
Stephan

[1] https://nvd.nist.gov/vuln/detail/CVE-2021-44228
[2] https://flink.apache.org/2021/12/10/log4j-cve.html
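
For completeness: the mitigation in [2] amounts to disabling Log4j message
lookups via a JVM property and restarting the Flink processes. The sketch
below is from memory; the exact property and config key should be checked
against [2] itself:

    # Add the JVM property to the Flink configuration (standalone setup shown),
    # then restart the JobManager and TaskManager processes.
    echo 'env.java.opts: -Dlog4j2.formatMsgNoLookups=true' >> conf/flink-conf.yaml
    ./bin/stop-cluster.sh && ./bin/start-cluster.sh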


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-24 Thread Stephan Ewen
Thanks for all the details and explanation.

With the conclusion of the discussion, also +1 from my side for this FLIP

On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  wrote:

> Thanks Stephan and Timo, I have had a rough look at your replies. They are
> all valuable opinions. I will take time to discuss, explain, and
> improve them.
>
> Hi Timo,
> > At least a final "I will start the vote soon. Last call for comments."
> would have been nice.
>
> I replied in the DISCUSS thread that we had begun to vote. If there are
> supplementary comments, or a reply like "please pause the voting first, I
> will reply later", we can suspend or cancel the vote at any time.
> I understand why a FLIP vote must stay open for three days: so that more
> people can see it and put forward their opinions.
>
> Best,
> Jingsong
>
> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther  wrote:
> >
> > Hi everyone,
> >
> > even though the DISCUSS thread was open for 2 weeks, I have the feeling
> > that the VOTE was initiated too quickly. At least a final "I will start
> > the vote soon. Last call for comments." would have been nice.
> >
> > I also added some comments in the DISCUSS thread. Let's hope we can
> > resolve those soon.
> >
> > Regards,
> > Timo
> >
> > On 12.11.21 16:36, Stephan Ewen wrote:
> > > Hi all!
> > >
> > > I have a few questions on the design still, posted those in the
> [DISCUSS]
> > > thread.
> > > It would be great to clarify those first before concluding this vote.
> > >
> > > Thanks,
> > > Stephan
> > >
> > >
> > > On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> Thanks for the great work Jingsong!
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> wrote:
> > >>
> > >>> +1 (non-binding)
> > >>>
> > >>> A small suggestion:
> > >>> The message queue is currently used to store the middle-layer data of
> > >>> the streaming data warehouse. We hope to use the built-in dynamic table
> > >>> storage to store those middle layers.
> > >>> But that middle-layer data is often provided to all business teams in a
> > >>> company, and some teams do not use Apache Flink as their compute engine
> > >>> yet. In order to continue serving those teams, the data in the built-in
> > >>> dynamic table storage may need to be copied to a message queue again.
> > >>> If *the built-in storage could provide the same consumer API as the
> > >>> commonly used message queues*, that data copying could be avoided, and
> > >>> the built-in dynamic table storage could be adopted faster in the
> > >>> streaming data warehouse business.
> > >>>
> > >>> Best regards,
> > >>> Jing Zhang
> > >>>
> > >>> Yufei Zhang  wrote on Thu, Nov 11, 2021 at 9:34 AM:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> +1 (non-binding)
> > >>>>
> > >>>> Very interesting design. I saw a lot of discussion on the generic
> > >>>> interface design, good to know it will address extensibility.
> > >>>>
> > >>>> Cheers,
> > >>>> Yufei
> > >>>>
> > >>>>
> > >>>> On 2021/11/10 02:51:55 Jingsong Li wrote:
> > >>>>> Hi everyone,
> > >>>>>
> > >>>>> Thanks for all the feedback so far. Based on the discussion[1] we
> > >> seem
> > >>>>> to have consensus, so I would like to start a vote on FLIP-188 for
> > >>>>> which the FLIP has now also been updated[2].
> > >>>>>
> > >>>>> The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
> > >>>>> there is an objection or insufficient votes.
> > >>>>>
> > >>>>> [1]
> https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
> > >>>>> [2]
> > >>>>
> > >>>
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > >>>>>
> > >>>>> Best,
> > >>>>> Jingsong
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-23 Thread Stephan Ewen
Thanks for digging into this.
Regarding this query:

INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM TABLE(TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

I am not sure I understand what the conclusion is on the data retention
question, for the case where the continuous streaming SQL query has retention
semantics. I think we would need to answer the following questions (I will
call the query that computes the managed table the "view materializer query" -
VMQ).

(1) I guess the VMQ will send no updates for windows whose "retention
period" (14 days) is over, as you said. That makes sense.

(2) Will the VMQ send retractions so that the data will be removed from the
table (via compactions)?
  - if yes, this seems semantically better for users, but it will be
expensive to keep the timers for retractions.
  - if not, we can still solve this by adding filters to queries against
the managed table, as long as these queries are in Flink.
  - any subscriber to the changelog stream would not see a strictly correct
result if we are not doing the retractions

(3) Do we want time retention semantics handled by the compaction?
  - if we say that we lazily apply the deletes in the queries that read the
managed tables, then we could also age out the old data during compaction.
  - that is cheap, but it might be too much of a special case to be very
relevant here.

(4) Do we want to declare those types of queries "out of scope" initially?
  - if yes, how many users are we affecting? (I guess probably not many,
but would be good to hear some thoughts from others on this)
  - should we simply reject such queries in the optimizer as "not possible
to support in managed tables"? I would suggest that; it is always better to
tell users exactly what works and what does not, rather than letting them be
surprised in the end. Users can still remove the HAVING clause if they want
the query to run, and that would be better than the VMQ just silently
ignoring those semantics.

Thanks,
Stephan


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-16 Thread Stephan Ewen
Hi Jingsong!

Thank you for all the explanations. To follow up on the points:


(1) Log implementation

Good to hear you are looking to make this extensible.


(2) change tracking

Understood, makes sense (used for re-processing).


(3) Log Scan Startup mode.

Your explanation makes sense.

As I understand it, that means, though, that any managed table that uses
another managed table as a source will automatically always use the "full
scan" mode (snapshot + log subscription).


(4) Data retention

> - The data of the snapshot will never expire, and the user needs to
> delete the partition by themselves if needed.
> - The expiration time of the log is unlimited by default, but it can
> be configured. In fact, we only need the latest log by default,
> because we have saved the previous snapshot.

Are you referring to cleanup here in the sense of garbage collection, or
also the deletion of data that makes the Managed Table semantically wrong?

Let's assume I define a managed table with such a query below, where the
"interactions" table is derived from a stream of Kafka records.

 INSERT INTO the_table
  SELECT window_end, COUNT(*)
FROM TABLE(TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
GROUP BY window_end
  HAVING now() - window_end <= INTERVAL '14' DAYS;

When a user does "Select *" from that table, they don't want to see old
data, it would make the query reading from the managed table incorrect. Who
would then filter or would prune the old data? The query maintaining the
managed table? The query reading from the managed table?

Maybe this isn't something you want to solve in the first version, but it
would be good to have a definite answer what the plan in case data
retention is part of the query semantics.

If the assumption is that managed tables can never have retention defined
in their query semantics (and the assumption is that all filtering is done
by the query that reads the managed table), then I think this is a
supercritical design property that we need to make very explicit for users
to understand.


(5) Unified format in the log

Makes sense.


(6, 7) PK mode and consistency

I get the technical reason that with PKs there is a case for duplicate
filtering, but I am skeptical of the user experience if this detail
implicitly changes the consistency users see. This may not be relevant for
Flink queries that read from the changelog, but it is relevant for external
programs subscribing to the changelog.

What would be the additional overhead of creating a separate config switch
"log consistency" with the values "transactional" and "eventual"?
By default, this would be transactional, regardless of whether the result
has a PK or not. If there is a PK, users can then optimize the latency if
they want by using the "eventual" setting. That way users are not
surprised by the fact that changing the schema of the table
(adding a PK) suddenly changes the consistency semantics.

If I understand correctly, "transactional" with a PK would actually not be
correct; it would still contain some duplicates.
But that means we cannot expose a correct changelog to users (external
subscribers) in all cases?

The PK behavior seems like something that needs some general improvement in
the SQL engine.


(8) Optimistic locking.

Thanks for clarifying, that makes sense. I think that would be good to add
to the FLIP. The reason why something is done is as important as what
exactly is being done.


Thanks a lot!
Stephan

On Mon, Nov 15, 2021 at 10:41 AM Jingsong Li  wrote:

> Hi Timo,
>
> > It would be great if we can add not only `Receive any type of changelog`
> but also `Receive any type of datatype`.
>
> Nice, I think we can.
>
> > Please clarify whether the compact DDL is a synchronous or asynchronous
> > operation in the API? So far all DDL was synchronous, and only DML
> > asynchronous.
>
> It should be a synchronous operation.
>
> > I find this 'change-tracking' = 'false' a bit confusing. Even in batch
> scenarios we have a changelog, only with insert-only changes. Can you
> elaborate? Wouldn't 'exclude-from-log-store' or 'exclude-log-store' or
> 'log.disabled' be more accurate?
>
> Change tracking comes from Oracle and Snowflake [1][2][3]. It matches the
> "emit-changes" syntax. It means that once it is turned off, downstream
> consumers cannot obtain the corresponding changes.
>
> > DESCRIBE DETAIL TABLE
>
> +1 to `DESCRIBE TABLE EXTENDED`.
>
> > Set checkpoint interval to 1 min if checkpoint is not enabled
> when the planner detects a sink to built-in dynamic table.
> This sounds like too much magic to me.
>
> You are right. And one minute may not be enough for all situations. +1
> to throwing a detailed exception to alert the user.
>
> > GenericCatalog to `Catalog#supportsTableStorage`
>
> I originally thought about completely distinguishing it from external
> catalog, but it is also possible to add a new method.
>
> > CatalogBaseTable.TableKind
>
> Yes, we can create a new TableKind for this table. 

Re: [DISCUSS] Update Policy for old releases

2021-11-12 Thread Stephan Ewen
I am of a somewhat different opinion here; I don't think LTS is the best way
to go.

To my understanding, the big issue faced by users is that an upgrade is
simply too hard. Too many things change in subtle ways; you cannot just
take the previous application and configuration and expect it to run the
same after the upgrade. If that were much easier, users would be able to
upgrade more easily and more frequently (maybe not every month, but every
six months or so).

In contrast, LTS is more about how long one provides patches and releases.
The upgrade problem is the same: between LTS versions, upgrades should
still be smooth. To make LTS-to-LTS smooth, we need to solve the same issue
as making it smooth from individual version to individual version. Unless
we expect non-linear upgrade paths with migration tools, which I am not
convinced we should do. It seems to be the opposite of where the industry
is moving (upgrade fast and frequently by updating images).

The big downside of LTS versions is that almost no one ends up using the
other versions, so we get little feedback on features. We will end up
having a feature in Flink for three releases and still barely anyone will
have used it, so we will lack the confidence to turn it on by default.
I also see the risk that the community ends up taking compatibility with
non-LTS releases less seriously, because "it is anyway not an LTS version".


We could look at making the upgrades smoother, by starting to observe the
issues listed below.
I think we need to do that anyway, because that is what I hear users
bringing up more and more. If after that we still feel like there is a
problem, then let's revisit this issue.
  - API compatibility (signatures and behavior)
  - Make it possible to pin SQL semantics of a query across releases
(FLIP-190 works on this)
  - Must be possible to use same configs as before in a new Flink version
and keep the same behavior that way
  - REST API must be stable (signature and semantics)
  - Make it possible to mix different client/cluster versions (stable
serializations for JobGraph, etc.)

The issue that we officially define two supported versions, but many
committers like to backport things for one more version, is something we can
certainly look at, to bring some consistency in there.

Best,
Stephan


On Fri, Nov 12, 2021 at 9:17 AM Martijn Visser 
wrote:

> Thanks for bringing this up for discussion Piotr. On the one hand I think
> it's a good idea, because of the reasons you've mentioned. On the other
> hand, having an LTS version will remove an incentive for some users to
> upgrade, which will result in fewer Flink users who will test new features
> because they wait for the next LTS version to upgrade. I can see that
> particularly happening for large enterprises. Another downside I can
> imagine is that upgrading from one LTS version to another LTS version will
> become more complicated because more changes have been made between those
> versions.
>
> Related to my second remark, would/could introducing an LTS version would
> also trigger a follow-up discussion that we could potentially introduce
> breaking changes in a next LTS version, like a Flink 2.0 [1] ?
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-3957
>
> On Fri, 12 Nov 2021 at 08:59, Fabian Paul 
> wrote:
>
> > Thanks for bringing up this topic Piotr.
> > I also think we should try to decouple our release cycles from our
> support
> > plans. Currently we are very limited by the approach because faster
> release
> > cycles result in also faster deprecation of versions.
> >
> >
> > Therefore I am also favoring version 2 where we can align the next LTS
> > version
> > with our development speed. Option 1 I think can easily lead to confusion
> > when
> > the number of supported releases constantly changes.
> >
> > Best,
> > Fabian
> >
> >
>


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Stephan Ewen
Hi all!

I have a few questions on the design still, posted those in the [DISCUSS]
thread.
It would be great to clarify those first before concluding this vote.

Thanks,
Stephan


On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:

> +1 (binding)
>
> Thanks for the great work Jingsong!
>
> Best,
> Jark
>
> On Thu, 11 Nov 2021 at 19:41, JING ZHANG  wrote:
>
> > +1 (non-binding)
> >
> > A small suggestion:
> > The message queue is currently used to store the middle-layer data of the
> > streaming data warehouse. We hope to use the built-in dynamic table storage
> > to store those middle layers.
> > But that middle-layer data is often provided to all business teams in a
> > company, and some teams do not use Apache Flink as their compute engine yet.
> > In order to continue serving those teams, the data in the built-in dynamic
> > table storage may need to be copied to a message queue again.
> > If *the built-in storage could provide the same consumer API as the commonly
> > used message queues*, that data copying could be avoided, and the built-in
> > dynamic table storage could be adopted faster in the streaming data
> > warehouse business.
> >
> > Best regards,
> > Jing Zhang
> >
> > Yufei Zhang  wrote on Thu, Nov 11, 2021 at 9:34 AM:
> >
> > > Hi,
> > >
> > > +1 (non-binding)
> > >
> > > Very interesting design. I saw a lot of discussion on the generic
> > > interface design, good to know it will address extensibility.
> > >
> > > Cheers,
> > > Yufei
> > >
> > >
> > > On 2021/11/10 02:51:55 Jingsong Li wrote:
> > > > Hi everyone,
> > > >
> > > > Thanks for all the feedback so far. Based on the discussion[1] we
> seem
> > > > to have consensus, so I would like to start a vote on FLIP-188 for
> > > > which the FLIP has now also been updated[2].
> > > >
> > > > The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
> > > > there is an objection or insufficient votes.
> > > >
> > > > [1] https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
> > > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-11-12 Thread Stephan Ewen
Hi all!

Thank you for the writeup of this feature. I like the general direction a
lot.

There are some open questions and confusing details still, which I think we
need to clarify first to make this feature really good.
Below are questions/suggestions on the FLIP:

Best,
Stephan

===

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's
have the internal interfaces in place to make this open to other streaming
storage systems as well.
The config options seem to be designed in a way that is Kafka-exclusive.
Can we change this, for example to something like
  - storage.log.system=kafka
  - storage.log.kafka.properties.bootstrap.servers
  - storage.log.kafka.retention

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without
change tracking what happens then?
  - does it skip writing to the change log?
  - does it simply overwrite the managed table with the new result?
  - something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed
tables.
Let's say someone creates a managed table that is computed via a query over
another managed table. It would need all the data from the previous table,
or it would be inconsistent.

What is the reason to have this setting? Support cases where one doesn't
need all past data (let's say only data from the previous month)? Exposing
this again somewhat destroys the nice "transparent out of the box"
behavior, because now users need to think again about the incremental
building of the tables. I think that case shows that we are missing better
handling of data retention (see the next point).

Also, this seems to be a per-query setting, more than a global setting, so
should this be part of the config with which the query is submitted that
reads from the table-storage?

The names could also be improved a bit, I think, for example we could call
it just  "table-storage.log.scan" with values "full", "latest",
"from-timestamp".

*(4) Data retention*

I am wondering how and when data is ever cleaned up.
For example, when the table definition has a time attribute and predicate
so that the managed table should only contain the data from the previous
month. How does old data get cleaned up? Only through deletes coming from
timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at
dropping data during the compaction. The compaction should know it needs to
retain only data from WaterMark - 1 month or so. That is somewhat similar
to the optimization I proposed also for SQL in general, to get rid of
timers and only use TTL (and compaction filters) for data expiration. I
think for managed tables, this is even more crucial for performance.

But it would mean that we need to have a better model for inferring
required data retention based on predicates over the time columns, and not
simply just have fixed retention based on the watermark.


*(5) Different formats for cases with PK and without PK*

The FLIP proposes Debezium-Avro for cases without a PK and just Avro for
cases with a PK.

Do we expect that some users directly subscribe to the Table Changelog,
meaning directly read via a Kafka Consumer from the topic?
  - I would expect that this will happen, because users want to avoid
writing the log twice (one for Flink managed table queries, one for
external subscribers).
  - If this is publicly exposed, then the fact that it uses different
formats in different cases (PK or no PK) seems really confusing and not
intuitive for users.
  - Can the format be just Debezium-JSON in all cases?

*(6) Different consistency guarantees with PK and without PK*

Is this purely an internal implementation detail, or will users see a
difference? My understanding is that users see a difference.
Having that difference implicitly happen when users add a PK reference
seems very confusing to me. What about cases where the table has a PK
(because users want the data in Kafka that way) but want transactional
consistency?

If we need the "low-latency eventual consistency" mode with PKs, I would
suggest making this a separate mode that users can choose to activate if
they want.
We can restrict it to cases that have a PK, but not automatically change
the behavior when a PK is declared.

*(7) Eventual Consistency Mode vs. Faster Checkpoints*

The eventual consistency mode with PK seems mainly there to get lower
latency for the changelog. What latencies are we looking for here?
There is also the work on generalized incremental checkpoints, which should
get the latency down to a few seconds, would that be good enough?

The current Upsert Kafka Source (which would be used with PK eventual
consistency mode) has a big inefficiency in the way it needs to retain all
state to convert the records to changelog records. That is also a high
price to pay for that mode.

*(8) Concurrent Write / Locking*

I don't 

[jira] [Created] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts

2021-11-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24852:


 Summary: Cleanup of Orphaned Incremental State Artifacts
 Key: FLINK-24852
 URL: https://issues.apache.org/jira/browse/FLINK-24852
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.14.0
Reporter: Stephan Ewen


Shared State Artifacts (state files in the "shared" folder in the DFS / 
ObjectStore) can become orphaned in various situations:

* When a TaskManager fails right after it created a state file but before the 
checkpoint was ack-ed to the JobManager, that state file will be orphaned.
* When the JobManager fails, all state newly added for the currently pending 
checkpoint will be orphaned.

These state artifacts currently cannot be cleaned up manually, 
because it isn't easily possible to tell whether they are still being 
used (referenced by any checkpoint).

We should introduce a "garbage collector" that identifies and deletes such 
orphaned state artifacts.

h2. Idea for a cleanup mechanism

A periodic cleanup thread would periodically execute a cleanup procedure that 
searches for and deletes the orphaned artifacts.
To identify those artifacts, the cleanup procedure needs the following inputs:

* The oldest retained checkpoint ID
* A snapshot of the shared state registry
* A way to identify for each state artifact from which checkpoint it was 
created.

The cleanup procedure would
* enumerate all state artifacts (for example files in the "shared" directory)
* for each one, check whether it was created earlier than the oldest retained 
checkpoint. If not, that artifact is skipped, because it might come from 
a later pending checkpoint, or a later canceled checkpoint.
* finally, check whether the state artifact is known by the shared 
state registry. If yes, the artifact is kept; if not, it is orphaned and will 
be deleted.

Because the cleanup procedure is specific to the checkpoint storage, it should 
probably be instantiated from the checkpoint storage.

To make it possible to identify the checkpoint for which a state artifact was 
created, we can put that checkpoint ID into the state file name, for example 
format the state name as {{"<checkpoint-id>_<unique-suffix>"}}.
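
As a rough illustration of why such a naming convention helps (this is not the
proposed implementation; the threshold, directory layout, and file names are
made up), a first GC pass could bucket candidate files purely by the checkpoint
ID prefix before consulting the shared state registry:

    # Illustration only: files named "<checkpointId>_<uuid>" under "shared/".
    # An ID older than the oldest retained checkpoint only marks a *candidate*;
    # the file must also be unknown to the shared state registry before deletion.
    OLDEST_RETAINED=1234
    for f in shared/*; do
      name="${f##*/}"
      id="${name%%_*}"
      if [ "$id" -lt "$OLDEST_RETAINED" ] 2>/dev/null; then
        echo "candidate for registry check: $f"
      fi
    done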



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Creating an external connector repository

2021-10-28 Thread Stephan Ewen
Thank you all, for the nice discussion!

>From my point of view, I very much like the idea of putting connectors in a
separate repository. But I would argue it should be part of Apache Flink,
similar to flink-statefun, flink-ml, etc.

I share many of the reasons for that:
  - As argued many times, it reduces the complexity of the Flink repo, improves
CI response times, etc.
  - Much lower barrier of contribution, because an unstable connector would
not de-stabilize the whole build. Of course, we would need to make sure we
set this up the right way, with connectors having individual CI runs, build
status, etc. But it certainly seems possible.


I would argue some points a bit different than some cases made before:

(a) I believe the separation would increase connector stability, because it
really forces us to develop the connectors against the APIs like any
external developer. A mono repo is somewhat the wrong tool if you in
practice want to actually guarantee stable internal APIs at some layer,
because the mono repo makes it easy to just change something on both sides
of the API (provider and consumer) seamlessly.

Major refactorings in Flink need to keep all connector API contracts
intact, or we need to have a new version of the connector API.

(b) We may even be able to go towards more lightweight and automated
releases over time, even if we stay in Apache Flink with that repo.
This isn't fully aligned with the Apache release policies yet, but
there are board discussions about whether there can be bot-triggered
releases (by dependabot) and how that could fit into the Apache process.

This doesn't seem to be quite there just yet, but seeing those discussions start
is a good sign, and there is a good chance we can do some things there.
I am not sure whether we should let bots trigger releases, because a final
human look at things isn't a bad thing, especially given the popularity of
software supply chain attacks recently.


I do share Chesnay's concerns about complexity in tooling, though. Both
release tooling and test tooling. They are not incompatible with that
approach, but they are a task we need to tackle during this change which
will add additional work.



On Tue, Oct 26, 2021 at 10:31 AM Arvid Heise  wrote:

> Hi folks,
>
> I think some questions came up and I'd like to address the question of the
> timing.
>
> Could you clarify what release cadence you're thinking of? There's quite
> > a big range that fits "more frequent than Flink" (per-commit, daily,
> > weekly, bi-weekly, monthly, even bi-monthly).
>
> The short answer is: as often as needed:
> - If there is a CVE in a dependency and we need to bump it - release
> immediately.
> - If there is a new feature merged, release soonish. We may collect a few
> successive features before a release.
> - If there is a bugfix, release immediately or soonish depending on the
> severity and if there are workarounds available.
>
> We should not limit ourselves; the whole idea of independent releases is
> exactly that you release as needed. There is no release planning or
> anything needed, you just go with a release as if it was an external
> artifact.
>
> (1) is the connector API already stable?
> > From another discussion thread [1], connector API is far from stable.
> > Currently, it's hard to build connectors against multiple Flink versions.
> > There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and
> >  maybe also in the future versions,  because Table related APIs are still
> > @PublicEvolving and new Sink API is still @Experimental.
> >
>
> The question is: what is stable in an evolving system? We recently
> discovered that the old SourceFunction needed to be refined such that
> cancellation works correctly [1]. So that interface has been in Flink for 7
> years, heavily used also outside, and we still had to change the contract
> in a way that I'd expect any implementer to recheck their implementation.
> It might not be necessary to change anything and you can probably change
> the code for all Flink versions, but still, the interface was not stable
> in the strictest sense.
>
> If we focus just on API changes on the unified interfaces, then we expect
> one more change to Sink API to support compaction. For Table API, there
> will most likely also be some changes in 1.15. So we could wait for 1.15.
>
> But I'm questioning if that's really necessary because we will add more
> functionality beyond 1.15 without breaking API. For example, we may add
> more unified connector metrics. If you want to use it in your connector,
> you have to support multiple Flink versions anyhow. So rather than focusing
> the discussion on "when is stuff stable", I'd rather focus on "how can we
> support building connectors against multiple Flink versions" and make it as
> painless as possible.
>
> Chesnay pointed out to use different branches for different Flink versions
> which sounds like a good suggestion. With a mono-repo, we can't use
> branches 

Re: [NOTICE] CiBot improvements

2021-10-11 Thread Stephan Ewen
Great initiative, thanks for doing this!

On Mon, Oct 11, 2021 at 10:52 AM Till Rohrmann  wrote:

> Thanks a lot for this effort Chesnay! The improvements sound really good.
>
> Cheers,
> Till
>
> On Mon, Oct 11, 2021 at 8:46 AM David Morávek  wrote:
>
> > Nice! Thanks for the effort Chesnay, this is really a huge step forward!
> >
> > Best,
> > D.
> >
> > On Mon, Oct 11, 2021 at 6:02 AM Xintong Song 
> > wrote:
> >
> > > Thanks for the effort, @Chesnay. This is super helpful.
> > >
> > > @Jing,
> > > Every push to the PR branch should automatically trigger an entire new
> > > build. `@flinkbot run azure` should only be used when you want to
> re-run
> > > the failed stages without changing the PR. E.g., when running into
> known
> > > unstable cases that are unrelated to the PR.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Oct 11, 2021 at 11:45 AM JING ZHANG 
> > wrote:
> > >
> > > > Hi Chesnay,
> > > > Thanks for the effort. It is a very useful improvement.
> > > > I have a minor question. Please forgive me if the question is too
> > naive.
> > > > Since '@flinkbot run azure' now behaves like "Rerun failed jobs", is there
> > > > any way to trigger an entirely new build? Because sometimes I'm not sure
> > > > whether the cases that passed in the last build would still succeed in a
> > > > new build, because of the updates introduced in the new commit.
> > > >
> > > > Best,
> > > > JING ZHANG
> > > >
> > > >
> > > > Yangze Guo  wrote on Mon, Oct 11, 2021 at 10:31 AM:
> > > >
> > > > > Thanks for that great job, Chesnay! "Rerun failed jobs" will help a
> > > lot.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Sun, Oct 10, 2021 at 4:56 PM Chesnay Schepler <
> ches...@apache.org
> > >
> > > > > wrote:
> > > > > >
> > > > > > I made a number of changes to the CiBot over the weekend.
> > > > > >
> > > > > > - '@flinkbot run azure' previously triggered an entirely new
> build
> > > > based
> > > > > > on the last completed one. It now instead retries the last
> > completed
> > > > > > build, only running the jobs that actually failed. It basically
> > > behaves
> > > > > > like the "Rerun failed jobs" button in the Azure UI.
> > > > > > - various optimizations to increase responsiveness (primarily by
> > > doing
> > > > > > significantly less unnecessary work / requests to GH)
> > > > > > - removed TravisCI support (since we no longer support a release
> > that
> > > > > > used Travis)
> > > > > >
> > > > > > Please ping me if you spot anything weird.
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-24366) Unnecessary/misleading error message about failing restores when tasks are already canceled.

2021-09-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24366:


 Summary: Unnecessary/misleading error message about failing 
restores when tasks are already canceled.
 Key: FLINK-24366
 URL: https://issues.apache.org/jira/browse/FLINK-24366
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.13.2, 1.14.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.13.3, 1.15.0, 1.14.1


The following line is logged in all cases where the restore operation fails. 
The check whether the task is canceled comes only after that line.

The fix would be to move the log line to after the check.

{code}
Exception while restoring my-stateful-task from alternative (1/1), will retry 
while more alternatives are available.
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24343) Revisit Scheduler and Coordinator Startup Procedure

2021-09-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24343:


 Summary: Revisit Scheduler and Coordinator Startup Procedure
 Key: FLINK-24343
 URL: https://issues.apache.org/jira/browse/FLINK-24343
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2, 1.14.0
Reporter: Stephan Ewen
 Fix For: 1.15.0


We need to re-examine the startup procedure of the scheduler, and how it 
interacts with the startup of the operator coordinators.

We need to make sure the following conditions are met:
  - The Operator Coordinators are started before the first action happens that 
they need to be informed of. That includes a task being ready, a checkpoint 
happening, etc.

  - The scheduler must be started to the point that it can handle 
"failGlobal()" calls, because the coordinators might trigger that during their 
startup when an exception in "start()" occurs.

/cc [~chesnay]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-173: Support DAG of algorithms

2021-09-17 Thread Stephan Ewen
I think this will be a useful addition.

Regarding the API and specific design decisions: I think this looks ok.
I didn't dig very deep and would be fine to just go with the author's
proposal. The main motivation for having a separate flink-ml repository was
to develop more easily, make changes and iterate faster without having to
weigh every design as carefully as we need to in core Flink.

So +1 from my side

On Fri, Sep 10, 2021 at 4:33 AM Dong Lin  wrote:

> Hi all,
>
> We would like to start the vote for FLIP-173: Support DAG of
> algorithms [1]. This FLIP was discussed in this thread [2].
>
> The proposal extended the Flink ML API to support DAG of algorithms where
> each algorithm could have multiple inputs and multiple outputs. It also
> extended Flink ML API to support online learning scenarios where a
> long-running Model instance needs to be continuously updated by the latest
> model data generated by another long-running Estimator instance.
>
> The vote will be open for at least 72 hours, following the consensus voting
> process.
>
> Thanks!
> Dong Lin and Zhipeng Zhang
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783
> [2]
>
> https://lists.apache.org/thread.html/r6729f351fb1bc13a93754c199d5fee1051cc8146e22374737c578779%40%3Cdev.flink.apache.org%3E
>


[jira] [Created] (FLINK-24255) Test Environment / Mini Cluster do not forward configuration.

2021-09-10 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24255:


 Summary: Test Environment / Mini Cluster do not forward 
configuration.
 Key: FLINK-24255
 URL: https://issues.apache.org/jira/browse/FLINK-24255
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


When using {{StreamExecutionEnvironment#getExecutionEnvironment(Configuration)}}, 
the config should determine the 
characteristics of the execution.

The config is for example passed to the local environment in the local 
execution case, and used during the instantiation of the MiniCluster.

But when using the {{TestStreamEnvironment}} and the 
{{MiniClusterWithClientRule}}, the config is ignored.

The issue is that the {{StreamExecutionEnvironmentFactory}} in 
{{TestStreamEnvironment}} ignores the config that is passed to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Apache Flink Stateful Functions 3.1.0, release candidate #1

2021-08-26 Thread Stephan Ewen
+1 (binding)

  - Build project on Java 8 (maven command line) with full end to end tests
  - Compiled and ran tests in IDE (IntelliJ) with Java 11 (possible after
some manual config, see comment below to automate this)
  - No binaries in the distribution
  - Verified license and notice files
  - Checked README


Minor issues found, none of which are release blockers:

  - Some warnings in the command line due to a missing version of a plugin.
Fixing that is good for stability (and better resilience against supply
chain attacks): See this PR for a fix:
https://github.com/apache/flink-statefun/pull/261

  - Building in IDE (IntelliJ) with Java 11 does not work out of the box,
due to issues with the Java Module System and the use of Unsafe in
generated ProtoBuf classes. See this PR for a fix:
https://github.com/apache/flink-statefun/pull/262



On Thu, Aug 26, 2021 at 8:32 AM Tzu-Li (Gordon) Tai 
wrote:

> +1 (binding)
>
> - Built from source with Java 11 and Java 8 (mvn clean install
> -Prun-e2e-tests)
> - verified signatures and hashes
> - verified NOTICE files of Maven artifacts properly list actual bundled
> dependencies
> - Ran GoLang greeter and showcase with the proposed Dockerfiles for 3.1.0
> - Ran a local smoke E2E against the Java SDK, with adjusted parameters to
> run for a longer period of time
>
> Thanks for driving the release Igal!
>
> Cheers,
> Gordon
>
> On Thu, Aug 26, 2021 at 4:06 AM Seth Wiesman  wrote:
>
> > +1 (non-binding)
> >
> > - verified signatures and hashes
> > - Checked licenses
> > - ran mvn clean install -Prun-e2e-tests
> > - ran golang greeter and showcase from the playground [1]
> >
> > Seth
> >
> > [1] https://github.com/apache/flink-statefun-playground/pull/12
> >
> > On Wed, Aug 25, 2021 at 9:44 AM Igal Shilman  wrote:
> >
> > > +1 from my side:
> > >
> > > Here are the results of my RC2 testing:
> > >
> > > - verified the signatures and hashes
> > > - verified that the source distribution doesn't contain any binary
> files
> > > - ran mvn clean install -Prun-e2e-tests
> > > - ran Java and Python greeters from the playground [1] with the new
> > module
> > > structure, and async transport enabled.
> > > - verified that the docker image [2] builds and inspected the contents
> > > manually.
> > >
> > > Thanks,
> > > Igal
> > >
> > > [1] https://github.com/apache/flink-statefun-playground/tree/dev
> > > [2] https://github.com/apache/flink-statefun-docker/pull/15
> > >
> > >
> > > On Tue, Aug 24, 2021 at 3:34 PM Igal Shilman  wrote:
> > >
> > > > Sorry, the subject of the previous message should have said "[VOTE]
> > > Apache
> > > > Flink Stateful Functions 3.1.0, release candidate #2".
> > > >
> > > >
> > > > On Tue, Aug 24, 2021 at 3:24 PM Igal Shilman 
> wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> Please review and vote on the release candidate #2 for the version
> > 3.1.0
> > > >> of Apache Flink Stateful Functions, as follows:
> > > >> [ ] +1, Approve the release
> > > >> [ ] -1, Do not approve the release (please provide specific
> comments)
> > > >>
> > > >> **Testing Guideline**
> > > >>
> > > >> You can find here [1] a page in the project wiki on instructions for
> > > >> testing.
> > > >> To cast a vote, it is not necessary to perform all listed checks,
> > > >> but please mention which checks you have performed when voting.
> > > >>
> > > >> **Release Overview**
> > > >>
> > > >> As an overview, the release consists of the following:
> > > >> a) Stateful Functions canonical source distribution, to be deployed
> to
> > > >> the release repository at dist.apache.org
> > > >> b) Stateful Functions Python SDK distributions to be deployed to
> PyPI
> > > >> c) Maven artifacts to be deployed to the Maven Central Repository
> > > >> d) New Dockerfiles for the release
> > > >> e) GoLang SDK tag statefun-sdk-go/v3.1.0-rc2
> > > >>
> > > >> **Staging Areas to Review**
> > > >>
> > > >> The staging areas containing the above mentioned artifacts are as
> > > >> follows, for your review:
> > > >> * All artifacts for a) and b) can be found in the corresponding dev
> > > >> repository at dist.apache.org [2]
> > > >> * All artifacts for c) can be found at the Apache Nexus Repository
> [3]
> > > >>
> > > >> All artifacts are signed with the key
> > > >> 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]
> > > >>
> > > >> Other links for your review:
> > > >> * JIRA release notes [5]
> > > >> * source code tag "release-3.1.0-rc2" [6]
> > > >> * PR for the new Dockerfiles [7]
> > > >>
> > > >> **Vote Duration**
> > > >>
> > > >> The voting time will run for at least 72 hours (since RC1). We are
> > > >> targeting this vote to last until Thursday. 26th of August, 6pm CET.
> > > >> If it is adopted by majority approval, with at least 3 PMC
> affirmative
> > > >> votes, it will be released.
> > > >>
> > > >> Thanks,
> > > >> Igal
> > > >>
> > > >> [1]
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> > > >> [2]

[ANNOUNCE] Dropping "CheckpointConfig.setPreferCheckpointForRecovery()"

2021-08-24 Thread Stephan Ewen
Hi Flink Community!

A quick heads-up: We suggest removing the setting
"CheckpointConfig.setPreferCheckpointForRecovery()" [1].

The setting has been deprecated since Flink 1.12 and is strongly
discouraged, because it can lead to data loss or data duplication in
different scenarios.
Please see also https://issues.apache.org/jira/browse/FLINK-20427 for
background.
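
For anyone checking whether their jobs still rely on it, the setting is
typically toggled like this (illustrative snippet, not from the linked code):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PreferCheckpointUsageCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // deprecated since Flink 1.12 and now proposed for removal
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    }
}
{code}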

Are there any concerns about removing this setting? Is anyone relying on
it right now?

For a long-term solution to ensuring that there is no slow recovery from
savepoints: Some committers (me included) are working on a proposal to
support more efficient savepoints and to ensure that intermediate
savepoints don't interfere with side effects. Then we can always exclude
them from recovery without risk of data loss or duplication.

Best,
Stephan


[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L493


[jira] [Created] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill

2021-08-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23843:


 Summary: Exceptions during 
"SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure 
instead of Process Kill
 Key: FLINK-23843
 URL: https://issues.apache.org/jira/browse/FLINK-23843
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


Currently, when the method "SplitEnumeratorContext.runInCoordinatorThread()" 
throws an exception, the effect is a process kill of the JobManager process.

The chain how the process kill happens is:
* An exception bubbling up in the executor, killing the executor thread
* The executor starts a replacement thread, which is forbidden by the thread 
factory (as a safety net) and causes a process kill.

We should prevent such exceptions from bubbling up in the coordinator executor.
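
A minimal sketch of the intended handling (illustrative only, not the actual Flink internals; the class and handler names are made up): wrap submitted actions so that a throwable is routed to a global-failure handler instead of escaping the executor thread.

{code}
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class GuardedCoordinatorExecutor {

    private final ExecutorService executor;
    private final Consumer<Throwable> globalFailureHandler;

    GuardedCoordinatorExecutor(ExecutorService executor, Consumer<Throwable> globalFailureHandler) {
        this.executor = executor;
        this.globalFailureHandler = globalFailureHandler;
    }

    void runInCoordinatorThread(Runnable action) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                // trigger a global failover instead of killing the thread / the process
                globalFailureHandler.accept(t);
            }
        });
    }
}
{code}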




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23842) Add log messages for reader registrations and split requests.

2021-08-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23842:


 Summary: Add log messages for reader registrations and split 
requests.
 Key: FLINK-23842
 URL: https://issues.apache.org/jira/browse/FLINK-23842
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


Currently, nothing is logged when source enumerators receive reader 
registration events or split requests.

While some source implementations log this themselves, in the general case this 
information is missing, even though it is very valuable for debugging and 
understanding the work assignment behavior.
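
As an illustration (hypothetical class, not an actual SplitEnumerator implementation), the kind of log lines this is about:

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingEnumeratorSketch {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingEnumeratorSketch.class);

    public void addReader(int subtaskId) {
        LOG.info("Source reader {} registered.", subtaskId);
    }

    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        LOG.info("Subtask {} (on host '{}') is requesting a split.", subtaskId, requesterHostname);
    }
}
{code}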



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-180: Adjust StreamStatus and Idleness definition

2021-08-09 Thread Stephan Ewen
+1 (binding)

On Mon, Aug 9, 2021 at 12:08 PM Till Rohrmann  wrote:

> +1 (binding)
>
> Cheers,
> Till
>
> On Thu, Aug 5, 2021 at 9:09 PM Arvid Heise  wrote:
>
> > Dear devs,
> >
> > I'd like to open a vote on FLIP-180: Adjust StreamStatus and Idleness
> > definition [1] which was discussed in this thread [2].
> > The vote will be open for at least 72 hours unless there is an objection
> or
> > not enough votes.
> >
> > Best,
> >
> > Arvid
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> > [2]
> >
> >
> https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E
> >
>


[jira] [Created] (FLINK-23649) Add RocksDB packages to parent-first classloading patterns.

2021-08-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23649:


 Summary: Add RocksDB packages to parent-first classloading 
patterns.
 Key: FLINK-23649
 URL: https://issues.apache.org/jira/browse/FLINK-23649
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.13.2
Reporter: Stephan Ewen
 Fix For: 1.14.0


RocksDB classes are currently loaded child-first.

Because of that, different classloaders may attempt to load the RocksDB native 
library multiple times.

That is prevented by JNI and results in an error as reported in this mail for 
example
https://lists.apache.org/x/thread.html/rbc3ca24efe13b25e802af9739a6877276503363ffbdc5914ffdad7be@%3Cuser.flink.apache.org%3E

We should prevent accidental repeated loading of RocksDB, because we rely on 
the fact that only one DB is created per task.
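
Until the patterns are changed by default, the same effect can be achieved via configuration; a hedged sketch (assuming the option is set programmatically rather than in flink-conf.yaml):

{code}
import org.apache.flink.configuration.Configuration;

public class RocksDbParentFirstConfig {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // add the RocksDB packages to the parent-first classloading patterns
        config.setString("classloader.parent-first-patterns.additional", "org.rocksdb.");
        // ... pass this configuration to the cluster / execution environment
    }
}
{code}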



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23630) Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows.

2021-08-04 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23630:


 Summary: Make EventTimeWindowCheckpointingITCase and 
LocalRecoveryITCase run on Windows.
 Key: FLINK-23630
 URL: https://issues.apache.org/jira/browse/FLINK-23630
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


This needs a fix in the tests where they create the paths for the checkpoint 
storage locations.
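
The ticket does not spell out the fix; a common pattern for OS-independent checkpoint paths in tests looks like this (illustrative sketch, class name made up):

{code}
import java.io.File;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class CheckpointPathSketch {
    public static void main(String[] args) {
        File checkpointDir = new File(System.getProperty("java.io.tmpdir"), "checkpoints");

        // Deriving the URI from the File keeps the "file:/C:/..." form valid on Windows,
        // instead of concatenating "file://" with a platform-specific absolute path.
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
    }
}
{code}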



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23629) Remove redundant test cases in EventTimeWindowCheckpointingITCase

2021-08-04 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23629:


 Summary: Remove redundant test cases in 
EventTimeWindowCheckpointingITCase
 Key: FLINK-23629
 URL: https://issues.apache.org/jira/browse/FLINK-23629
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


HashMap state backend snapshots are always async now; sync snapshots are no 
longer supported.

We should adjust the {{EventTimeWindowCheckpointingITCase}} to remove the now 
redundant cases {{MEM_ASYNC}} and {{FILE_ASYNC}} parameter runs.

The test is very time-intensive, so this is quite a time saver.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread Stephan Ewen
For the details of what causes this regression, I would add @Yun Tang
 to this discussion.

On Wed, Aug 4, 2021 at 2:36 PM Yuval Itzchakov  wrote:

> We are heavy users of RocksDB and have had several issues with memory
> managed in Kubernetes, most of them actually went away when we upgraded
> from Flink 1.9 to 1.13.
>
> Do we know why there's such a huge performance regression? Can we improve
> this somehow with some flag tweaking? It would be great if we see a more in
> depth explanation of the gains vs losses of upgrading.
>
> On Wed, Aug 4, 2021 at 3:08 PM Stephan Ewen  wrote:
>
>> Hi all!
>>
>> *!!!  If you are a big user of the Embedded RocksDB State Backend and
>> have performance sensitive workloads, please read this !!!*
>>
>> I want to quickly raise some awareness for a RocksDB version upgrade we
>> plan to do, and some possible impact on application performance.
>>
>> *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB
>> unfortunately introduces some non-trivial performance regression. In our
>> Nexmark Benchmark, at least one query is up to 13% slower.
>> With some fixes, this can be improved, but even then there is an overall 
>> *regression
>> up to 6% in some queries*. (See attached table for results from relevant
>> Nexmark Benchmark queries).
>>
>> We would do this update nonetheless, because we need to get new features
>> and bugfixes from RocksDB in.
>>
>> Please respond to this mail thread if you have major concerns about this.
>>
>>
>> *### Fallback Plan*
>>
>> Optionally, we could fall back to Plan B, which is to upgrade RocksDB
>> only to version 5.18.4.
>> Which has no performance regression (after applying a custom patch).
>>
>> While this spares us the performance degradation of RocksDB 6.20.x, this
>> has multiple disadvantages:
>>   - Does not include the better memory stability (strict cache control)
>>   - Misses out on some new features which some users asked about
>>   - Does not have the latest RocksDB bugfixes
>>
>> The latest point is especially bad in my opinion. While we can
>> cherry-pick some bugfixes back (and have done this in the past), users
>> typically run into an issue first and need to trace it back to RocksDB,
>> then one of the committers can find the relevant patch from RocksDB master
>> and backport it. That isn't the greatest user experience.
>>
>> Because of those disadvantages, we would prefer to do the upgrade to the
>> newer RocksDB version despite the unfortunate performance regression.
>>
>> Best,
>> Stephan
>>
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


[ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread Stephan Ewen
Hi all!

*!!!  If you are a big user of the Embedded RocksDB State Backend and have
performance sensitive workloads, please read this !!!*

I want to quickly raise some awareness for a RocksDB version upgrade we
plan to do, and some possible impact on application performance.

*We plan to upgrade RocksDB to version 6.20.* That version of RocksDB
unfortunately introduces some non-trivial performance regression. In our
Nexmark Benchmark, at least one query is up to 13% slower.
With some fixes, this can be improved, but even then there is an
overall *regression
up to 6% in some queries*. (See attached table for results from relevant
Nexmark Benchmark queries).

We would do this update nonetheless, because we need to get new features
and bugfixes from RocksDB in.

Please respond to this mail thread if you have major concerns about this.


*### Fallback Plan*

Optionally, we could fall back to Plan B, which is to upgrade RocksDB only
to version 5.18.4.
Which has no performance regression (after applying a custom patch).

While this spares us the performance degradation of RocksDB 6.20.x, this
has multiple disadvantages:
  - Does not include the better memory stability (strict cache control)
  - Misses out on some new features which some users asked about
  - Does not have the latest RocksDB bugfixes

The latest point is especially bad in my opinion. While we can cherry-pick
some bugfixes back (and have done this in the past), users typically run
into an issue first and need to trace it back to RocksDB, then one of the
committers can find the relevant patch from RocksDB master and backport it.
That isn't the greatest user experience.

Because of those disadvantages, we would prefer to do the upgrade to the
newer RocksDB version despite the unfortunate performance regression.

Best,
Stephan


Re: [VOTE] FLIP-150: Introduce Hybrid Source

2021-07-28 Thread Stephan Ewen
+1 (binding)

Would be good to move this across the finish line.
It seems this thread is held up mainly by formalities at this point.

On Fri, Jul 2, 2021 at 4:27 AM Israel Ekpo  wrote:

> +1 (non-binding)
>
> On Thu, Jul 1, 2021 at 6:45 PM Elkhan Dadashov 
> wrote:
>
> > +1 (non-binding)
> >
> > On 2021/07/01 05:49:44 蒋晓峰 wrote:
> > > Hi everyone,
> > >
> > >
> > >
> > >
> > > Thanks for all the feedback to Hybrid Source so far. Based on the
> > discussion[1] we seem to have consensus, so I would like to start a vote
> on
> > FLIP-150 for which the FLIP has also been updated[2].
> > >
> > >
> > >
> > >
> > > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT)
> unless
> > there is an objection or insufficient votes.
> > >
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Nicholas Jiang
> > >
> > >
> > >
> > >
> > > [1]
> >
> https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E
> > >
> > > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> >
>


Re: [DISCUSS] Address deprecation warnings when upgrading dependencies

2021-07-15 Thread Stephan Ewen
>
> If someone started preparing a junit5 migration PR they will run into
> merge conflicts if everyone now starts replacing these instances at will.


This sounds like a good case for fixing it in one step during the upgrade.
Otherwise folks will start fixing this individually when they
encounter the deprecated methods.

On Wed, Jul 14, 2021 at 5:31 PM Chesnay Schepler  wrote:

> If someone started preparing a junit5 migration PR they will run into
> merge conflicts if everyone now starts replacing these instances at will.
>
> There are also some options on the table on how to actually do the
> migration; we can use hamcrest of course, or create a small wrapper in
> our test utils that retains the signature junit signature (then we'd
> just have to adjust imports).
>
> On 14/07/2021 16:33, Stephan Ewen wrote:
> > @Chesnay - can you elaborate on this? In the classes I worked with so
> far,
> > it was a 1:1 replacement of "org.junit.Assert.assertThat()" to
> > "org.hamcrest.MatcherAssert.assertThat()".
> > What other migration should happen there?
> >
> > On Wed, Jul 14, 2021 at 11:13 AM Chesnay Schepler 
> > wrote:
> >
> >> It may be better to not do that to ease the migration to junit5, where
> >> we have to address exactly these usages.
> >>
> >> On 14/07/2021 09:57, Till Rohrmann wrote:
> >>> I actually found
> >>> myself recently, whenever touching a test class, replacing Junit's
> >>> assertThat with Hamcrest's version which felt quite tedious.
> >>
> >>
>
>


Re: [DISCUSS] Address deprecation warnings when upgrading dependencies

2021-07-14 Thread Stephan Ewen
@Chesnay - can you elaborate on this? In the classes I worked with so far,
it was a 1:1 replacement of "org.junit.Assert.assertThat()" to
"org.hamcrest.MatcherAssert.assertThat()".
What other migration should happen there?

On Wed, Jul 14, 2021 at 11:13 AM Chesnay Schepler 
wrote:

> It may be better to not do that to ease the migration to junit5, where
> we have to address exactly these usages.
>
> On 14/07/2021 09:57, Till Rohrmann wrote:
> > I actually found
> > myself recently, whenever touching a test class, replacing Junit's
> > assertThat with Hamcrest's version which felt quite tedious.
>
>
>


Re: [DISCUSS] Address deprecation warnings when upgrading dependencies

2021-07-14 Thread Stephan Ewen
@Chesnay - Good question. I think we can be pragmatic there. If you upgrade
Jackson, pick a class that uses it and look for the common methods. If
everything is fine there, it is probably fine overall. If one or two
deprecated method usages are overlooked, no problem, that's not an issue.
If a common method gets deprecated, then chance is high you spot it quickly
when looking at some Jackson usages.

@Yangze - Agreed, we should do the same internally and make sure we keep
our own API use and examples up to date.


On Wed, Jul 14, 2021 at 11:00 AM Chesnay Schepler 
wrote:

> How do you propose to do this in practice?
> Let's say I bump jackson, how would I find all new usages of deprecated
> APIs?
> Build it locally and grep the maven output for jackson?
>
> On 14/07/2021 10:51, Yangze Guo wrote:
> > +1 for fixing deprecation warnings when bumping/changing dependencies.
> >
> > Not only for the dependencies, we also use the deprecated API of Flink
> > itself in `flink-examples` and the document, e.g. the #writeAsText. I
> > think it would be good to do a clean-up for usability. WDYT?
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jul 14, 2021 at 3:57 PM Till Rohrmann 
> wrote:
> >> I think this suggestion makes a lot of sense, Stephan. +1 for fixing
> >> deprecation warnings when bumping/changing dependencies. I actually
> found
> >> myself recently, whenever touching a test class, replacing Junit's
> >> assertThat with Hamcrest's version which felt quite tedious.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Jul 13, 2021 at 6:15 PM Stephan Ewen  wrote:
> >>
> >>> Hi all!
> >>>
> >>> I would like to propose that we make it a project standard that when
> >>> upgrading a dependency, deprecation issues arising from that need to be
> >>> fixed in the same step. If the new dependency version deprecates a
> method
> >>> in favor of another method, all usages in the code need to be replaced
> >>> together with the upgrade.
> >>>
> >>> We are accumulating deprecated API uses over time, and it floods logs
> and
> >>> IDEs with deprecation warnings. I find this is a problem, because the
> >>> irrelevant warnings more and more drown out the actually relevant
> warnings.
> >>> And arguably, the deprecation warning isn't fully irrelevant, it can
> cause
> >>> problems in the future when the method is actually removed.
> >>> We need the general principle that a change leaves the codebase in at
> least
> >>> as good shape as before, otherwise things accumulate over time and the
> >>> overall quality goes down.
> >>>
> >>> The concrete example that motivated this for me is the JUnit dependency
> >>> upgrade. Pretty much every test I looked at recently is quite yellow
> (due
> >>> to junit Matchers.assertThat being deprecated in the new JUnit
> version).
> >>> This is easily fixed (even a string replace and spotless:apply goes a
> long
> >>> way), so I would suggest we try and do these things in one step in the
> >>> future.
> >>>
> >>> Curious what other committers think about this suggestion.
> >>>
> >>> Best,
> >>> Stephan
> >>>
>
>


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-07-13 Thread Stephan Ewen
@Eron Wright   The per-split watermarks are the
default in the new source interface (FLIP-27) and come for free if you use
the SplitReader.

Based on that, it is also possible to unsubscribe individual splits to
solve the alignment in the case where operators have multiple splits
assigned.
Piotr and I already discussed that, but concluded that the implementation
of that is largely orthogonal.

I am a bit worried, though, that if we release and advertise the alignment
without handling this case, we create a surprise for quite a few users.
While this is admittedly valuable for some users, I think we need to
position this accordingly. I would not fully advertise this before we have
the second part implemented as well.



On Mon, Jul 12, 2021 at 7:18 PM Eron Wright 
wrote:

> The notion of per-split watermarks seems quite interesting.  I think the
> idleness feature could benefit from a per-split approach too, because
> idleness is typically related to whether any splits are assigned to a given
> operator instance.
>
>
> On Mon, Jul 12, 2021 at 3:06 AM 刘建刚  wrote:
>
> > +1 for the source watermark alignment.
> > In the previous flink version, the source connectors are different in
> > implementation and it is hard to make this feature. When the consumed
> data
> > is not aligned or consuming history data, it is very easy to cause the
> > unalignment. Source alignment can resolve many unstable problems.
> >
> > Seth Wiesman  于2021年7月9日周五 下午11:25写道:
> >
> > > +1
> > >
> > > In my opinion, this limitation is perfectly fine for the MVP. Watermark
> > > alignment is a long-standing issue and this already moves the ball so
> far
> > > forward.
> > >
> > > I don't expect this will cause many issues in practice, as I understand
> > it
> > > the FileSource always processes one split at a time, and in my
> > experience,
> > > 90% of Kafka users have a small number of partitions scale their
> > pipelines
> > > to have one reader per partition. Obviously, there are larger-scale
> Kafka
> > > topics and more sources that will be ported over in the future but I
> > think
> > > there is an implicit understanding that aligning sources adds latency
> to
> > > pipelines, and we can frame the follow-up "per-split" alignment as an
> > > optimization.
> > >
> > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > > wrote:
> > >
> > > > Hey!
> > > >
> > > > A couple of weeks ago me and Arvid Heise played around with an idea
> to
> > > > address a long standing issue of Flink: lack of watermark/event time
> > > > alignment between different parallel instances of sources, that can
> > lead
> > > to
> > > > ever growing state size for downstream operators like WindowOperator.
> > > >
> > > > We had an impression that this is relatively low hanging fruit that
> can
> > > be
> > > > quite easily implemented - at least partially (the first part
> mentioned
> > > in
> > > > the FLIP document). I have written down our proposal [1] and you can
> > also
> > > > check out our PoC that we have implemented [2].
> > > >
> > > > We think that this is a quite easy proposal, that has been in large
> > part
> > > > already implemented. There is one obvious limitation of our PoC.
> Namely
> > > we
> > > > can only easily block individual SourceOperators. This works
> perfectly
> > > fine
> > > > as long as there is at most one split per SourceOperator. However it
> > > > doesn't work with multiple splits. In that case, if a single
> > > > `SourceOperator` is responsible for processing both the least and the
> > > most
> > > > advanced splits, we won't be able to block this most advanced split
> for
> > > > generating new records. I'm proposing to solve this problem in the
> > future
> > > > in another follow up FLIP, as a solution that works with a single
> split
> > > per
> > > > operator is easier and already valuable for some of the users.
> > > >
> > > > What do you think about this proposal?
> > > > Best, Piotrek
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > > [2] https://github.com/pnowojski/flink/commits/aligned-sources
> > > >
> > >
> >
>


[DISCUSS] Address deprecation warnings when upgrading dependencies

2021-07-13 Thread Stephan Ewen
Hi all!

I would like to propose that we make it a project standard that when
upgrading a dependency, deprecation issues arising from that need to be
fixed in the same step. If the new dependency version deprecates a method
in favor of another method, all usages in the code need to be replaced
together with the upgrade.

We are accumulating deprecated API uses over time, and it floods logs and
IDEs with deprecation warnings. I find this is a problem, because the
irrelevant warnings more and more drown out the actually relevant warnings.
And arguably, the deprecation warning isn't fully irrelevant, it can cause
problems in the future when the method is actually removed.
We need the general principle that a change leaves the codebase in at least
as good shape as before, otherwise things accumulate over time and the
overall quality goes down.

The concrete example that motivated this for me is the JUnit dependency
upgrade. Pretty much every test I looked at recently is quite yellow (due
to junit Matchers.assertThat being deprecated in the new JUnit version).
This is easily fixed (even a string replace and spotless:apply goes a long
way), so I would suggest we try and do these things in one step in the
future.
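
For illustration, the fix in question is a drop-in change of the static import (hypothetical test snippet):

{code}
import static org.hamcrest.CoreMatchers.is;
// before (deprecated in the newer JUnit version):
//   import static org.junit.Assert.assertThat;
// after (the 1:1 replacement):
import static org.hamcrest.MatcherAssert.assertThat;

import org.junit.Test;

public class AssertThatMigrationExample {

    @Test
    public void replacementIsDropIn() {
        assertThat(1 + 1, is(2));
    }
}
{code}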

Curious what other committers think about this suggestion.

Best,
Stephan


Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-07-13 Thread Stephan Ewen
Before jumping into the designs of other mechanisms, can we clarify whether
adjusting checkpointing is really the right approach here?

What about storing empty state in the checkpoints, so that any recovery
simply does a full replay of the input.

Regarding the hybrid batch/streaming execution: The nice part about this
would be that no checkpoints are necessary during the batch phase.

On Tue, Jul 13, 2021 at 5:51 AM Senhong Liu  wrote:

> Hi Stephan,
>
> Thank you so much for replying and suggesting! Follow by your question, I
> would give some explanation and new thoughts.
>
> (1) More detailed info about CDC use case.
>
> In the previous design of FLINK-CDC, they would start a full-table scanning
> at the beginning by holding a read-write lock. Taking a checkpoint in the
> middle would be meaningless since there is no guarantee that data would not
> be changed during recovery.
>
> (2) Could hybrid batch/streaming execution solve the problem?
>
> It is acceptable to me and actually, the current Flink-CDC is actually
> implementing a new feature like this[1][2]. But a large batch size could
> still fail the checkpoint frequently after all and it could still be
> confusing for users to understand what's going on.
>
> (3) Making the checkpoint more controllable and reliable.
>
> Overall, I am also looking for a way of making the checkpoint more
> controllable and reliable, since hybrid batch/streaming execution is also a
> tricky way of controlling the checkpoint. But maybe it is not configurable
> or developable for users to implement for their specific use case.
>
> However, combining with your opinion, I am thinking of designing a set of
> REST API to control the checkpoint scheduler might be more acceptable.
>
>
> [1]
> https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md
> [2]
>
> https://github.com/ververica/flink-cdc-connectors/commit/c6ca6c187471b538a9774258d2572194e1bb855b
>
> Stephan Ewen  于2021年7月13日周二 上午1:25写道:
>
> > Hi!
> >
> > Thanks for writing this FLIP, and interesting idea.
> >
> > I would like to understand a bit better why exactly we need this, and
> what
> > our alternative options are. My main concerns are the following:
> >
> > *(1) Can we achieve this without changing the checkpointing mechanism?*
> >
> > The checkpoint mechanism is already complex and it is super sensitive (a
> > bug there threatens every user with data loss). Such components must stay
> > as minimal as possible. When there is a way to solve this outside
> > checkpointing, then that should be the default approach.
> >
> > To check that, I would really like to understand this specific CDC use
> case
> > a bit more. If I understand it correctly, the issue is that checkpoints
> are
> > not possible while the initial database snapshot is ingested, before the
> > actual Change-Data processing starts.
> >
> > (a) What is the reason that no checkpoint is possible during this time?
> Why
> > is there no way to store in the checkpoint the position of the snapshot
> > ingestion? Is it because the snapshots are streamed in from a JDBC query
> > and that is not deterministic, meaning retrying the query yields
> different
> > rows (or order of rows)?
> >
> > (b) If there can be no checkpoint progress during the snapshot ingestion
> > and all checkpoints are rejected, what prevents us from just storing an
> > empty state in the checkpoint? Meaning the system may have taken a bunch
> of
> > checkpoints, but any recovery would start the source from the beginning.
> > Is there some concern about not emitting data during that database
> snapshot
> > reading phase?
> >
> > (c) There is quite some work in the direction of blending batch and
> > streaming execution, meaning having an initial batch execution step for
> > some data (like the backlog in Kafka, or a DB snapshot) and then
> switching
> > to streaming execution for the real-time stream (new records in Kafka, or
> > CDC records). If we focus on advancing that, we get the behavior that all
> > the initial data is processed in one batch (no intermediate checkpoint),
> > and we also get the performance speedup provided by not having to work
> with
> > RocksDB for the batch step. I think this is where the future is.
> >
> > Regarding the case to not checkpoint in-between transaction markers: I
> > don't know how often these occur, but I would expect potentially very
> > often. If we reject whenever some operator is in-between transaction
> > markers, we will reject very many (possibly almost all) checkpoints once
> we
> > get to a certain

Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-07-12 Thread Stephan Ewen
Hi!

Thanks for writing this FLIP, and interesting idea.

I would like to understand a bit better why exactly we need this, and what
our alternative options are. My main concerns are the following:

*(1) Can we achieve this without changing the checkpointing mechanism?*

The checkpoint mechanism is already complex and it is super sensitive (a
bug there threatens every user with data loss). Such components must stay
as minimal as possible. When there is a way to solve this outside
checkpointing, then that should be the default approach.

To check that, I would really like to understand this specific CDC use case
a bit more. If I understand it correctly, the issue is that checkpoints are
not possible while the initial database snapshot is ingested, before the
actual Change-Data processing starts.

(a) What is the reason that no checkpoint is possible during this time? Why
is there no way to store in the checkpoint the position of the snapshot
ingestion? Is it because the snapshots are streamed in from a JDBC query
and that is not deterministic, meaning retrying the query yields different
rows (or order of rows)?

(b) If there can be no checkpoint progress during the snapshot ingestion
and all checkpoints are rejected, what prevents us from just storing an
empty state in the checkpoint? Meaning the system may have taken a bunch of
checkpoints, but any recovery would start the source from the beginning.
Is there some concern about not emitting data during that database snapshot
reading phase?

(c) There is quite some work in the direction of blending batch and
streaming execution, meaning having an initial batch execution step for
some data (like the backlog in Kafka, or a DB snapshot) and then switching
to streaming execution for the real-time stream (new records in Kafka, or
CDC records). If we focus on advancing that, we get the behavior that all
the initial data is processed in one batch (no intermediate checkpoint),
and we also get the performance speedup provided by not having to work with
RocksDB for the batch step. I think this is where the future is.

Regarding the case to not checkpoint in-between transaction markers: I
don't know how often these occur, but I would expect potentially very
often. If we reject whenever some operator is in-between transaction
markers, we will reject very many (possibly almost all) checkpoints once we
get to a certain scale (1000s of parallel operators).
I think we need a different approach there, for example first grouping
events into transaction sessions, or so.


*(2) Inability to Checkpoint shouldn't become a first-class concept.*

I think this is really a question of Flink design principles and direction.
I believe we need to push Flink to a direction that it can always
checkpoint, and more frequently and more reliably than at the moment.
Various ongoing efforts move the system in that direction, giving it the
ability to checkpoint always (async sources, async sinks, non-blocking
mailbox), more predictably (unaligned checkpoints) and more frequently
(log-based checkpoints).

That is something that makes it much easier to operate the system. When
checkpoints become infrequent or unpredictable in the interval (when they
are not reliable), it becomes a big operational problem.

If we start assuming that operators can arbitrarily reject checkpoints, we
will easily get into situations where many checkpoints are rejected and it
takes quite a while until a checkpoint can happen. There are many jobs with
1000s of operators, and when one of them rejects a checkpoint, the
checkpoint as a whole fails.
Letting operators decide when to take checkpoints is a cool property
that makes many things easier, but I am skeptical whether this is
compatible with global checkpoints.

Not having frequent checkpoints (but very infrequent ones) is really more
of a batch situation, and that brings me back to the point above: I think a
really well-working solution for this would be the hybrid batch/streaming
execution.

For that reason, I am very skeptical that we should add a first-class API
that suggests that a checkpoint happens only sometimes, when everyone
happens to agree to it.
I think that sends the wrong signal, both to developer direction, and to
users in how to implement their applications. Rejecting checkpoints should
be a rare thing, so not something an API suggests can happen all the time.

Rejecting checkpoints for a few times at the beginning of a job might also
be feasible, when later checkpoint generally succeed, but then again, the
proposed API does not suggest that, it suggests any checkpoint is always
rejectably by all operators.

If we really need checkpoint rejection, I think Piotr's idea goes into a
pretty good direction: Having a "RejectionException" that leads to a
checkpoint abort in the regular way, but doesn't increment the
failure counter.

*Conclusion*

Because of the sensitivity of the checkpoint mechanism, and because the API
proposed here suggests a behavior of 

[jira] [Created] (FLINK-23301) StateFun HTTP Ingress

2021-07-07 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23301:


 Summary: StateFun HTTP Ingress
 Key: FLINK-23301
 URL: https://issues.apache.org/jira/browse/FLINK-23301
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


The HTTP ingress would start an HTTP Server at a specified port.

The HTTP server would only handle _POST_ requests. The target function is 
represented by the path to which the request is made; the message contents are 
the body of the POST request.

The following example would send an empty message to the function with the 
address \{{namespace='example', type='greeter', id='Igal'}}.

{code}
curl -X POST http://statefun-ingress:/in/example/greeter/Igal

POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0
{code}

The example below would send a message of type 'statefun/string' to the 
function with the address \{{namespace='example', type='greeter', id='Elisa'}} 
and the message contents \{{"{numTimesToGreet: 5}"}}.

{code}
curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d 
"\{numTimesToGreet: 5}" http://statefun-ingress:/in/example/greeter/Elisa

POST /in/example/greeter/Elisa HTTP/1.1
Content-Type: text/plain; charset=UTF-8
Content-Length: 20

{numTimesToGreet: 5}
{code}


h3. Data Types

The content type (mime type) specified in the request header of the HTTP 
request will be directly mapped to the statefun types.
For example, a \{{Content-Type: io.statefun.types/int}} will set the type of the 
message to \{{io.statefun.types/int}}.

As a special case, we map the content type \{{text/plain}} to 
\{{io.statefun.types/string}}, to make simple cases and examples work more 
seamlessly.

The following examples would send a message to a function that expects a 
ProtoBuf-encoded type \{{Greeting}} registered in StateFun as 
\{{example/greeting}}.

{code}
> curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d 
> "\{numTimesToGreet: 5}"

> CONTENTS=`echo 'numTimesToGreet: 5' | ./protoc --encode Greeting 
> example_types.proto`
> echo $CONTENTS | curl -X POST -H "Content-Type: example/greeting" 
> --data-binary @- http://statefun-ingress:/in/example/greeter/Bobby
{code}


h3. Sender and Responses

We want to support round-trip-style interactions, meaning posting a request and 
receiving a response.
Given the async messaging nature of StateFun, this might not necessarily happen in 
one HTTP request that immediately gives you the corresponding response. 
Instead, it can mean issuing (POST) a request to the HTTP ingress and polling 
(GET) for a response from an associated HTTP egress.

To support these kind of patterns, the HTTP ingress will assign a random 
request correlation ID in the HTTP response.
Furthermore, the ingress will optionally set the \{{sender()}} field of the 
created message to reference a configured associated egress.

The ingress config would add an entry referencing the egress (like 
\{{'paired_egress_name: httpout'}}).

{code}
> curl -X POST -i http://statefun-ingress:/in/example/greeter/Igal
POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0

HTTP/1.1 200 OK
StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Content-Length: 0
{code}

The created message would have no body, but would have \{{sender() = 
egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}.

_Note: We would need to extend the message address scheme to be able to 
reference egresses.
The egress itself can grab the correlation ID from the ID part of the address, 
because the HTTP egress doesn't use that field (in fact, no egress currently 
interprets the ID)._

h3. Singleton Instantiation

To avoid port conflicts, we need to do a singleton instantiation per JVM.
This can be achieved by using a statically referenced context to hold the 
instantiated server and a reference to the

In the future, we can look into extending this to avoid setup/teardown when 
operators are cancelled for recovery.
The server would then live as long as the StateFun application (job) lives (or 
more precisely, as long as the slot lives, which is the duration that the 
TaskManager is associated with the StateFun deployment - typically the entire 
lifetime).

To achieve that, we would tear down the server in a [shutdown hook of the 
user-code 
classloader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145).
 Instead of letting the first source set up the server, the first source would 
register its output as the stream for the server to push messages to.
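
A rough sketch of the per-JVM singleton idea (illustrative only, not StateFun code; the names are made up):

{code}
final class HttpIngressServerHolder {

    private static HttpIngressServerHolder instance;

    private final int port;

    private HttpIngressServerHolder(int port) {
        this.port = port;
        // ... create and bind the actual HTTP server here
    }

    // The first task that asks for the server creates and binds it; later tasks reuse it.
    static synchronized HttpIngressServerHolder getOrStart(int port) {
        if (instance == null) {
            instance = new HttpIngressServerHolder(port);
        }
        return instance;
    }

    int getPort() {
        return port;
    }
}
{code}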


h3. Configuration parameters

- Bind host (default 0.0.0.0)
 - Bind port (default )
 - Path (default "in") (for the path in the URL 
\{{http(s)://:}})

- Egress pair name, for setting the egress that replies sh

[jira] [Created] (FLINK-23281) StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses

2021-07-06 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23281:


 Summary: StateFun - Simplify Getting Started Experience with HTTP 
Ingresses and Egresses
 Key: FLINK-23281
 URL: https://issues.apache.org/jira/browse/FLINK-23281
 Project: Flink
  Issue Type: New Feature
  Components: Stateful Functions
Reporter: Stephan Ewen


To make it easier to get started with StateFun, we want to reduce the 
dependencies on other systems and tools that are currently required to get your 
first program running.
_(For reference, you currently need a docker-compose setup with at least Flink, 
Kafka, ZooKeeper, and then you need to interact with it using Kafka command 
line tools (or other clients) to publish messages to the ingress topic.)_

This issue aims to add simple pre-packaged HTTP ingresses/egresses that can be 
used for examples and exploration, and can be used with standard tools (like 
\{{curl}}). That reduces the barrier to exploration.
_(Citing @ssc here: you have roughly one lunch break of time to get a developer 
excited. Many devs just play around for about 45 minutes, and when they don't 
see some preliminary success with simple examples, they drop the exploration.)_

An example interaction could be:
{code}
> curl -X POST -i http://:/in/example/greeter/Igal
HTTP/2 200
request-id: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5

> curl -X GET 
> http://:/out/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Hello for the 1337th time...
{code}

*Note:* The HTTP Ingress/Egress here are different from the HTTP state access 
from FLINK-23261.
State requests against the state access API (FLINK-23261) only interact with 
state entries and never invoke functions. In contrast, requests against the 
ingress/egress proposed here send messages to functions like any other ingress.

This is the umbrella issue. Dedicated tasks for ingress/egress and request 
correlation are in the subtasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23261) StateFun - HTTP State Access Interface

2021-07-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23261:


 Summary: StateFun - HTTP State Access Interface 
 Key: FLINK-23261
 URL: https://issues.apache.org/jira/browse/FLINK-23261
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Stephan Ewen


h2. Functionality

To improve operations of StateFun applications, we should offer an interface to 
query and manipulate state entries.
 This can be used for exploration and debugging purposes, and to repair state 
if the application state needs to be corrected.

To make this simple, I would suggest an HTTP REST interface:
 - GET: read state entry for key
 - PUT: set/overwrite state for key
 - DELETE: drop state entry for key

The URLs could be: 
{{http(s)://statefun-service/state}},
 where the string {{//}} is the fully-qualified address 
of the target, and the {{statename}} is the name under which that persistent 
state is stored.

Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL.

For the responses, we would use the common codes 200 for GET/PUT/DELETE success 
and 404 for GET/DELETE not found.
 The state values, as returned from GET requests, would be generally just the 
bytes, and not interpreted by this request handling.

The integration of the StateFun type system and HTTP content types (mime types) 
is up for further discussion.
 One option is to set the content-type response header to 
{{"statefun/"}}, where all non-simple types map to 
{{Content-Type: application/octet-stream}}. We may make an exception for 
strings which could be returned as {{Content-Type: text/plain; charset=UTF-8}}.
 Later refinement is possible, like auto-stringifying contents when the request 
indicates to only accept {{text/plain}} responses.
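
A hedged sketch of how a client could interact with such an interface; the exact URL layout above is partially elided, so the host and the path form ({{/state/namespace/type/id/statename}}) below are assumptions for illustration only.

{code}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StateAccessClientSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String url = "http://statefun-service/state/example/greeter/Igal/seen_count";

        // PUT: set/overwrite the state entry
        client.send(HttpRequest.newBuilder(URI.create(url))
                        .PUT(HttpRequest.BodyPublishers.ofString("5"))
                        .build(),
                HttpResponse.BodyHandlers.discarding());

        // GET: read the state entry back (200 with the raw bytes, or 404 if absent)
        HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(URI.create(url)).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " -> " + response.body());

        // DELETE: drop the state entry
        client.send(HttpRequest.newBuilder(URI.create(url)).DELETE().build(),
                HttpResponse.BodyHandlers.discarding());
    }
}
{code}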
h2. Failure Guarantees

The initial failure guarantees for PUT/DELETE would be none - the requests 
would be handled best effort.

We can easily extend this later in one of two ways:
 - Delay responses to the HTTP requests until the next checkpoint is complete. 
That makes synchronous interaction easy and sounds like a good match for a more 
admin-style interface.
 - Return the current checkpoint ID and offer a way to poll until the next 
checkpoint is completed. This avoids blocking requests, but puts more burden on 
the caller.
 Given that the expected use cases for PUT/DELETE are more of an 
"admin/fix" nature, I would suggest going with synchronous requests, for 
simplicity.

h2. Implementation

There are two options to implement this:
 (1) A Source/Sink (Ingress/Egress) pair
 (2) An Operator Coordinator with HTTP Requests

*Option (1) - Source/Sink pair*

We would implement a specific source that is both an ingress and an egress.
 The source would spawn an HTTP server (singleton per TM process).

Requests would be handled as follows:
 - An HTTP request gets a generated correlation ID.
 - The source injects a new message type (a "control message") into the stream. 
That message holds the Correlation-ID, the parallel subtask index of the 
originating source, and the target address and state name.
 - The function dispatcher handles these messages in a special way, retrieving 
the state and sending an egress message with the correlation ID to the parallel 
subtask of the egress, as indicated by the message's subtask index.
 - The egress (which is the same instance as the ingress source) uses the 
correlation ID to respond to the request.

Advantages:
 - No changes necessary in Flink
 - Might sustain higher throughput, due to multiple HTTP endpoints

Disadvantages:
 - Additional HTTP servers and ports require more setup (like service 
definitions on K8s).
 - Need to introduce a new control message type and extend the function 
dispatcher to handle it.
 - Makes a hard assumption that sources run on all slots. Needs "ugly" 
singleton hack to start only one server per TM process.

*Option (2) - Operator Coordinator*

Operator Coordinators are instances that run on the {{JobManager}} and can 
communicate with the Tasks via RPC.

Coordinators can receive calls from HTTP handlers at the JobManager's HTTP 
endpoint.
 An example for this is the Result Fetching through HTTP/OperatorCoordinator 
requests.
 We would need a patch to Flink to allow registering custom URLs and passing 
the path as a parameter to the request.

The RPCs can be processed in the mailbox on the Tasks, making them thread safe.
 This would also completely avoid the round-trip (source-to-sink) problem, the 
tasks simply need to send a response back to the RPC.

Advantages:
 - Reuse the existing HTTP endpoint and port. No need for an additional HTTP 
server, port, and service; for these admin-style requests, this approach 
re-uses Flink's admin HTTP endpoint.
 - No need for singleton HTTP Server logic in Tasks
 - Does require the assumption that all TMs run an instance of all ope

Re: [DISCUSS] Feedback Collection Jira Bot

2021-07-01 Thread Stephan Ewen
ls
> > > "stale-assigned" and "pull-request-available" in order to review those
> > with
> > > priority. That's also why I am not a fan of excluding tickets with
> > > "pull-request-available" from the bot. The bot can help to make these
> > > tickets visible to reviewers.
> > >
> > > @Jing Zhang: That's a problem. We should try to change the permissions
> > > accordingly or need to find a different solution.
> > >
> > > @Piotr, Kurt: Instead of closing tickets, we could introduce an
> > additional
> > > priority like "Not a Priority" to which we move tickets. No ticket
> would
> > be
> > > closed automatically.
> > >
> > > Overall, the following changes could resolve most of the concerns, I
> > > believe:
> > >
> > > * Ignore tickets with a fixVersion for all rules but the
> stale-unassigned
> > > role.
> > >
> > > * We change the time intervals as follows, accepting reality a bit more
> > ;)
> > >
> > > * stale-assigned only after 30 days (instead of 14 days)
> > > * stale-critical only after 14 days (instead of 7 days)
> > > * stale-major only after 60 days (instead of 30 days)
> > >
> > > * Introduce "Not a Priority" priority and stop closing tickets.
> > >
> > > * Change default priority for new tickets of Flink project to "Minor"
> > >
> > > The measures, I proposed above, still try to achieve the goals
> mentioned
> > > and agreed upon in the original discussion thread, which were the
> > > following:
> > >
> > >
> > > - clearer communication and expectation management with the community
> > >   - a user or contributor should be able to judge the urgency of a
> > >     ticket by its priority
> > >   - if a ticket is assigned to someone the expectation that someone
> > >     is working on it should hold
> > > - generally reduce noise in Jira
> > > - reduce overhead of committers to ask about status updates of
> > >   contributions or bug reports
> > >   - “Are you still working on this?”
> > >   - “Are you still interested in this?”
> > >   - “Does this still happen on Flink 1.x?”
> > >   - “Are you still experiencing this issue?”
> > >   - “What is the status of the implementation?”
> > > - while still encouraging users to add new tickets and to leave
> > >   feedback about existing tickets
> > >
> > >
> > > Kurt, Stephan, if you'd like to change the bot to "just close very old
> > > tickets", I suggest you start a separate discussion and subsequent
> voting
> > > thread.
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > >
> > > On Wed, Jun 30, 2021 at 9:01 AM Kurt Young  wrote:
> > >
> > > > +1 to Stephan's opinion, with just one minor difference. For my
> > > experience
> > > > and a project
> > > > as big as Flink, picking up an issue created 1-2 years ago seems
> normal
> > > to
> > > > me. To be more
> > > > specific, during the blink planner merge, I created lots of clean up
> &
> > > > refactor issues, trying
> > > > to make the code be more clean. I haven't had a chance to resolve all
> > > these
> > > > but I think they are
> > > > still good improvements. Thus I would propose we don't close any
> stale
> > > > issues for at least 2 years.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Jun 30, 2021 at 7:22 AM Stephan Ewen 
> wrote:
> > > >
> > > > > Being a bit late to the party, and don't want to ask to change
> > > > everything,
> > > > > just maybe some observation.
> > > > >
> > > > > My main observation and concern is still that this puts pressure
> and
> > > > > confusion on contributors, which are mostly blocked on committers
> for
> &

Re: [DISCUSS] Feedback Collection Jira Bot

2021-06-29 Thread Stephan Ewen
Being a bit late to the party, and don't want to ask to change everything,
just maybe some observation.

My main observation and concern is still that this puts pressure and
confusion on contributors, which are mostly blocked on committers for
reviews, or are taking tickets as multi-week projects. I think it is not a
great experience for contributors, when they are already unsure why their
work isn't getting the attention from committers that they hoped for, to
then see issues unassigned or deprioritized automatically. I think we
really need to weigh this discouragement of contributors against the desire
for a tidy ticket system.
I also think by now this isn't just a matter of phrasing the bot's message
correctly. Auto unassignment and deprioritization sends a subtle message
that jira resolution is a more important goal than paying attention to
contributors (at least I think that is how it will be perceived by many).

Back to the original motivation, to not have issues lying around forever,
ensuring there is closure eventually.
For that, even much longer intervals would be fine. Like pinging every
three months, closing after three pings - would resolve most tickets in a
year, which is not too bad in the scope of a project like Flink. Many
features/wishes easily move to two releases in the future, which is almost
a year. We would get rid of long dead tickets and interfere little with
current tickets. Contributors can probably understand ticket closing after
a year of inactivity.

I am curious if a very simple bot that really just looks at stale issues
(no comment/change in three months), pings the
issue/reporter/assignee/watchers and closes it after three pings would do
the job.
We would get out of the un-assigning business (which can send very tricky
signals) and would rely on reporters/assignees/watchers to unassign if they
see that the contributor abandoned the issue. With a cadence of three
months for pinging, this isn't much work for the ones that get pinged.

Issues where we rely on faster handling are probably the ones where
committers have a stake in getting those into an upcoming release, so these
tend to be watched anyways.

On Wed, Jun 23, 2021 at 2:39 PM JING ZHANG  wrote:

> Hi Konstantin, Chesnay,
>
> > I would like it to not unassign people if a PR is open. These are
> > usually blocked by the reviewer, not the assignee, and having the
> > assignees now additionally having to update JIRA periodically is a bit
> > like rubbing salt into the wound.
>
> I agree with Chesnay about not un-assign an issue if a PR is open.
> Besides, Could assignees remove the "stale-assigned" tag  by themself? It
> seems assignees have no permission to delete the tag if the issue is not
> created by themselves.
>
> Best regards,
> JING ZHANG
>
> Konstantin Knauf  于2021年6月23日周三 下午4:17写道:
>
> > > I agree there are such tickets, but I don't see how this is addressing
> my
> > concerns. There are also tickets that just shouldn't be closed as I
> > described above. Why do you think that duplicating tickets and losing
> > discussions/knowledge is a good solution?
> >
> > I don't understand why we are necessarily losing discussion/knowledge.
> The
> > tickets are still there, just in "Closed" state, which are included in
> > default Jira search. We could of course just add a label, but closing
> seems
> > clearer to me given that likely this ticket will not get comitter
> attention
> > in the foreseeable future.
> >
> > > I would like to avoid having to constantly fight against the bot. It's
> > already responsible for the majority of my daily emails, with quite
> little
> > benefit for me personally. I initially thought that after some period of
> > time it will settle down, but now I'm afraid it won't happen.
> >
> > Can you elaborate which rules you are running into mostly? I'd rather
> like
> > to understand how we work right now and where this conflicts with the
> Jira
> bot vs. slowly disabling the Jira bot via labels.
> >
> > On Wed, Jun 23, 2021 at 10:00 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Konstantin,
> > >
> > > > In my opinion it is important that we close tickets eventually. There
> > are
> > > a
> > > > lot of tickets (bugs, improvements, tech debt) that over time became
> > > > irrelevant, out-of-scope, irreproducible, etc.  In my experience,
> these
> > > > tickets are usually not closed by anyone but the bot.
> > >
> > > I agree there are such tickets, but I don't see how this is addressing
> my
> > > concerns. There are also tickets that just shouldn't be closed as I
> > > described above. Why do you think that duplicating tickets and losing
> > > discussions/knowledge is a good solution?
> > >
> > > I would like to avoid having to constantly fight against the bot. It's
> > > already responsible for the majority of my daily emails, with quite
> > little
> > > benefit for me personally. I initially thought that after some period
> of
> > > time it will settle down, but now I'm afraid it won't happen.

Re: [DISCUSS] FLIP-172: Support custom transactional.id prefix in FlinkKafkaProducer

2021-06-28 Thread Stephan Ewen
Sounds good from my side, please go ahead.

On Fri, Jun 25, 2021 at 5:31 PM Wenhao Ji  wrote:

> Thanks Stephan and Piotr for your replies. It seems that there is no
> problem or concern about this feature. If there is no further
> objection, I will start a vote thread for FLIP-172.
>
> Thanks,
> Wenhao
>
> On Wed, Jun 23, 2021 at 3:41 PM Piotr Nowojski 
> wrote:
> >
> > Hi,
> >
> > +1 from my side on this idea. I do not see any problems that could be
> > caused by this change.
> >
> > Best,
> > Piotrek
> >
> > śr., 23 cze 2021 o 08:59 Stephan Ewen  napisał(a):
> >
> > > The motivation and the proposal sound good to me, +1 from my side.
> > >
> > > Would be good to have a quick opinion from someone who worked
> specifically
> > > with Kafka, maybe Becket or Piotr?
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Sat, Jun 12, 2021 at 9:50 AM Wenhao Ji 
> wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I would like to open this discussion thread to talk about the FLIP-172
> > >> <
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer
> > >> >,
> > >> which aims to provide a way to support specifying a custom
> > >> transactional.id
> > >> in the FlinkKafkaProducer class.
> > >>
> > >> I am looking forward to your feedback and suggestions!
> > >>
> > >> Thanks,
> > >> Wenhao
> > >>
> > >
>


Re: [DISCUSS] Incrementally deprecating the DataSet API

2021-06-23 Thread Stephan Ewen
Thanks for writing this up, this also reflects my understanding.

I think a blog post would be nice, ideally with an explicit call for
feedback so we learn about user concerns.
A blog post has a lot more reach than an ML thread.

Best,
Stephan


On Wed, Jun 23, 2021 at 12:23 PM Timo Walther  wrote:

> Hi everyone,
>
> I'm sending this email to make sure everyone is on the same page about
> slowly deprecating the DataSet API.
>
> There have been a few thoughts mentioned in presentations, offline
> discussions, and JIRA issues. However, I have observed that there are
> still some concerns or different opinions on what steps are necessary to
> implement this change.
>
> Let me summarize some of the steps and assumptions, and let's have a
> discussion about them:
>
> Step 1: Introduce a batch mode for Table API (FLIP-32)
> [DONE in 1.9]
>
> Step 2: Introduce a batch mode for DataStream API (FLIP-134)
> [DONE in 1.12]
>
> Step 3: Soft deprecate DataSet API (FLIP-131)
> [DONE in 1.12]
>
> We updated the documentation recently to make this deprecation even more
> visible. There is a dedicated `(Legacy)` label right next to the menu
> item now.
>
> We won't deprecate concrete classes of the API with a @Deprecated
> annotation until then, to avoid excessive warnings in logs.
>
> Step 4: Drop the legacy SQL connectors and formats (FLINK-14437)
> [DONE in 1.14]
>
> We dropped code for ORC, Parquet, and HBase formats that were only used
> by DataSet API users. The removed classes had no documentation and were
> not annotated with one of our API stability annotations.
>
> The old functionality should be available through the new sources and
> sinks for Table API and DataStream API. If not, we should bring them
> into a shape that they can be a full replacement.
>
> DataSet users are encouraged to either upgrade the API or use Flink
> 1.13. Users can either just stay at Flink 1.13 or copy only the format's
> code to a newer Flink version. We aim to keep the core interfaces (i.e.
> InputFormat and OutputFormat) stable until the next major version.
>
> We will maintain/allow important contributions to dropped connectors in
> 1.13. So 1.13 could be considered as kind of a DataSet API LTS release.
>
> Step 5: Drop the legacy SQL planner (FLINK-14437)
> [DONE in 1.14]
>
> This included dropping support of DataSet API with SQL.
>
> Step 6: Connect both Table and DataStream API in batch mode (FLINK-20897)
> [PLANNED in 1.14]
>
> Step 7: Reach feature parity of Table API/DataStream API with DataSet API
> [PLANNED for 1.14++]
>
> We need to identify blockers when migrating from DataSet API to Table
> API/DataStream API. Here we need to establish a good feedback pipeline
> to include DataSet users in the roadmap planning.
>
> Step 8: Drop the Gelly library
>
> No concrete plan yet. Latest would be the next major Flink version aka
> Flink 2.0.
>
> Step 9: Drop DataSet API
>
> Planned for the next major Flink version aka Flink 2.0.
>
>
> Please let me know if this matches your thoughts. We can also convert
> this into a blog post or mention it in the next release notes.
>
> Regards,
> Timo
>
>
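
As a small illustration of step 2 above, running a DataStream program in batch
execution mode is a one-line switch. A minimal sketch (API calls as of Flink 1.12;
the word-count pipeline is just a toy example):

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // bounded, DataSet-style execution

env.fromElements("to", "be", "or", "not", "to", "be")
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)
        .sum(1)
        .print();

env.execute("word-count-in-batch-mode");
{code}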


Re: [DISCUSS] Drop Mesos in 1.14

2021-06-23 Thread Stephan Ewen
I would prefer to remove Mesos from the Flink core as well.

I also had a similar thought as Seth: As far as I know, you can package
applications to run on Mesos with "Marathon". That would be like deploying
an opaque Flink standalone cluster on Mesos.
The implication is similar to going from an active integration to a
standalone cluster (like from native Flink Kubernetes Application
Deployment to a Standalone Application Deployment on Kubernetes): You need
to make sure the number of TMs / slots and the parallelism fit together (or
use the new reactive mode). Other than that, I think it should work well
for streaming jobs.

Having a Flink-Marathon template in https://flink-packages.org/ would be a
nice thing for Mesos users.

@Oleksandr What do you think about that?

On Wed, Jun 23, 2021 at 11:31 AM Leonard Xu  wrote:

> + 1 for dropping Mesos. I checked both the commit history and the mailing list;
> Mesos-related issues and user questions have rarely appeared.
>
> Best,
> Leonard
>
>


Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.

2021-06-23 Thread Stephan Ewen
+1 to Xintong's proposal

On Wed, Jun 23, 2021 at 1:53 PM Till Rohrmann  wrote:

> I would first try to not introduce the exception for local builds. It makes
> it quite hard for others to verify the build and to make sure that the
> right things were executed. If we see that this becomes an issue then we
> can revisit this idea.
>
> Cheers,
> Till
>
> On Wed, Jun 23, 2021 at 4:19 AM Yangze Guo  wrote:
>
> > +1 for appending this to community guidelines for merging PRs.
> >
> > @Till Rohrmann
> > I agree that with this approach unstable tests will not block other
> > commit merges. However, it might be hard to prevent merging commits
> > that are related to those tests and should have passed them. It's
> > true that this judgment can be made by the committers, but no one can
> > ensure the judgment is always precise, which is why we have this
> > discussion thread.
> >
> > Regarding the unstable tests, how about adding another exception:
> > committers verify it in their local environment and comment in such
> > cases?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 22, 2021 at 8:23 PM 刘建刚  wrote:
> > >
> > > It is a good principle to run all tests successfully with any change.
> > This
> > > means a lot for the project's stability and development. I am a big +1 for
> this
> > > proposal.
> > >
> > > Best
> > > liujiangang
> > >
> > > Till Rohrmann  于2021年6月22日周二 下午6:36写道:
> > >
> > > > One way to address the problem of regularly failing tests that block
> > > > merging of PRs is to disable the respective tests for the time being.
> > Of
> > > > course, the failing test then needs to be fixed. But at least that
> way
> > we
> > > > would not block everyone from making progress.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Jun 22, 2021 at 12:00 PM Arvid Heise 
> wrote:
> > > >
> > > > > I think this is overall a good idea. So +1 from my side.
> > > > > However, I'd like to put a higher priority on infrastructure then,
> in
> > > > > particular docker image/artifact caches.
> > > > >
> > > > > On Tue, Jun 22, 2021 at 11:50 AM Till Rohrmann <
> trohrm...@apache.org
> > >
> > > > > wrote:
> > > > >
> > > > > > Thanks for bringing this topic to our attention Xintong. I think
> > your
> > > > > > proposal makes a lot of sense and we should follow it. It will
> > give us
> > > > > > confidence that our changes are working and it might be a good
> > > > incentive
> > > > > to
> > > > > > quickly fix build instabilities. Hence, +1.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Jun 22, 2021 at 11:12 AM Xintong Song <
> > tonysong...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > In the past a couple of weeks, I've observed several times that
> > PRs
> > > > are
> > > > > > > merged without a green light from the CI tests, where failure
> > cases
> > > > are
> > > > > > > considered *unrelated*. This may not always cause problems, but
> > would
> > > > > > > increase the chance of breaking our code base. In fact, it has
> > > > occurred
> > > > > > to
> > > > > > > me twice in the past few weeks that I had to revert a commit
> > which
> > > > > breaks
> > > > > > > the master branch due to this.
> > > > > > >
> > > > > > > I think it would be nicer to enforce a stricter rule, that no
> PRs
> > > > > should
> > > > > > be
> > > > > > > merged without passing CI.
> > > > > > >
> > > > > > > The problems of merging PRs with "unrelated" test failures are:
> > > > > > > - It's not always straightforward to tell whether test
> > failures are
> > > > > > > related or not.
> > > > > > > - It prevents subsequent test cases from being executed, which
> > may
> > > > fail
> > > > > > > relating to the PR changes.
> > > > > > >
> > > > > > > To make things easier for the committers, the following
> > exceptions
> > > > > might
> > > > > > be
> > > > > > > considered acceptable.
> > > > > > > - The PR has passed CI in the contributor's personal workspace.
> > > > Please
> > > > > > post
> > > > > > > the link in such cases.
> > > > > > > - The CI tests have been triggered multiple times, on the same
> > > > commit,
> > > > > > and
> > > > > > > each stage has at least passed for once. Please also comment in
> > such
> > > > > > cases.
> > > > > > >
> > > > > > > If we all agree on this, I'd update the community guidelines
> for
> > > > > merging
> > > > > > > PRs wrt. this proposal. [1]
> > > > > > >
> > > > > > > Please let me know what you think.
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>


Re: [DISCUSS] FLIP-172: Support custom transactional.id prefix in FlinkKafkaProducer

2021-06-23 Thread Stephan Ewen
The motivation and the proposal sound good to me, +1 from my side.

Would be good to have a quick opinion from someone who worked specifically
with Kafka, maybe Becket or Piotr?

Best,
Stephan


On Sat, Jun 12, 2021 at 9:50 AM Wenhao Ji  wrote:

> Hi everyone,
>
> I would like to open this discussion thread to talk about the FLIP-172
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer
> >,
> which aims to provide a way to support specifying a custom
> transactional.id
> in the FlinkKafkaProducer class.
>
> I am looking forward to your feedback and suggestions!
>
> Thanks,
> Wenhao
>
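
To make the idea a bit more concrete, a rough sketch of the naming scheme such a
prefix would enable (the method and id scheme below are placeholders for
illustration, not the final FLIP-172 API):

{code}
// Sketch only: with a user-supplied prefix, each parallel producer instance would
// derive its transactional.id from that prefix instead of the fixed task-name-based
// scheme, so independent jobs writing to the same brokers do not clash.
public class TransactionalIdSketch {

    static String transactionalId(String userPrefix, int subtaskIndex, long counter) {
        return userPrefix + "-" + subtaskIndex + "-" + counter;
    }

    public static void main(String[] args) {
        System.out.println(transactionalId("payments-pipeline", 3, 42));
        // -> payments-pipeline-3-42
    }
}
{code}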


[jira] [Created] (FLINK-23093) Limit number of I/O pool and Future threads in Mini Cluster

2021-06-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23093:


 Summary: Limit number of I/O pool and Future threads in Mini 
Cluster
 Key: FLINK-23093
 URL: https://issues.apache.org/jira/browse/FLINK-23093
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Stephan Ewen
 Fix For: 1.14.0


When running tests on CI via the minicluster, the mini cluster typically spawns 
100s of I/O threads, both in the MiniCluster I/O pool and in the TM I/O pool.

The standard rule for the maximum pool size is 4*num-cores, but the number of 
cores can be fairly large these days. Various Java versions also mess up core 
counting when running in containers (a JVM container might have been given 2 
cores as its resource limit, but the JVM counts the cores of the system as a whole, like 64/128 
cores).

This is both a nuisance for debugging, and a big waste of memory (each thread 
takes by default around 1MB when spawned, so the test JVM wastes 100s of MBs 
for nothing).

I would suggest setting a default of 8 I/O threads for the Mini Cluster. The 
scaling-with-cores is important for proper TM/JM deployments, but not for the 
Mini Cluster.
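
As a workaround for test setups today, the pool size can also be capped explicitly;
a sketch, assuming the Mini Cluster picks up the cluster-wide I/O pool option:

{code}
// Sketch of a possible workaround in tests, assuming the Mini Cluster honors the
// cluster-wide option "cluster.io-pool.size"; the proposal above is to make a small
// value the Mini Cluster default so that this is not needed.
Configuration config = new Configuration();
config.setInteger("cluster.io-pool.size", 8);
{code}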



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22753) Activate log-based Checkpoints for StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22753:


 Summary: Activate log-based Checkpoints for StateFun
 Key: FLINK-22753
 URL: https://issues.apache.org/jira/browse/FLINK-22753
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


Once available, we should use log-based checkpointing in StateFun, to have 
predictable checkpointing times and predictably low end-to-end exactly-once 
latencies.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22752) Add robust default state configuration to StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22752:


 Summary: Add robust default state configuration to StateFun
 Key: FLINK-22752
 URL: https://issues.apache.org/jira/browse/FLINK-22752
 Project: Flink
  Issue Type: Sub-task
Reporter: Stephan Ewen


We aim to reduce the state configuration complexity by applying a default 
configuration with robust settings, based on lessons learned in Flink.

*(1) Always use RocksDB.*

_That is already the case._

We keep this for now, as long as the only other alternatives are backends with 
objects on the heap, which are tricky in terms of predictable JVM performance. 
RocksDB has a significant performance cost, but more robust behavior.

*(2) Activate local recovery by default.*

That makes recovery cheap for soft task failures and gracefully cancelled 
tasks.
We need to set these options:
  - {{state.backend.local-recovery: true}}
  - {{taskmanager.state.local.root-dirs: }} - some local directory that 
cannot be wiped by the OS periodically, so typically some local 
directory that is not {{/tmp}}, for example {{/local/state/recovery}}.
  - {{state.backend.rocksdb.localdir: }} - a directory on the same FS / 
device as above, so that one can create hard links between them (required for 
RocksDB local checkpoints), for example {{/local/state/rocksdb}}.

Flink will most likely adopt this as a default setting as well in the future.
It still makes sense to pre-configure a different RocksDB working directory than 
{{/tmp}}.

*(3) Activate partitioned indexes by default.*

This may cost minimal performance in some cases, but can avoid massive 
performance regression in cases where the index blocks no longer fit into the 
memory cache (may happen more frequently when there are too many ColumnFamilies 
= states).

Set {{state.backend.rocksdb.memory.partitioned-index-filters: true}}.

See FLINK-20496 for details.

*(4) Increase number of transfer threads by default.*

This speeds up state recovery in many cases. The default value in Flink is a 
bit conservative, to avoid spamming DFS (like HDFS) by default. The more 
cloud-centric StateFun setups should be safe to use higher default value.

Set {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}.

*(5) Increase RocksDB compaction threads by default.*

The number of RocksDB compaction threads is frequently a bottleneck.
Increasing it costs virtually nothing and mitigates that bottleneck in most 
cases.

{{state.backend.rocksdb.thread.num: 4}} (this value is chosen under the 
assumption that there is only one slot).
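
Putting the items above together, the proposed defaults in flink-conf.yaml form
(the directory values are the example paths from above):

{code}
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /local/state/recovery
state.backend.rocksdb.localdir: /local/state/rocksdb
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.thread.num: 4
{code}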




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22751) Reduce JVM Metaspace memory for StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22751:


 Summary: Reduce JVM Metaspace memory for StateFun
 Key: FLINK-22751
 URL: https://issues.apache.org/jira/browse/FLINK-22751
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


Flink by default reserves quite a lot of metaspace memory (256 MB) out of the 
total memory budget, to accommodate applications that load (and reload) a lot 
of classes. That is necessary because user code is executed directly in the JVM.

StateFun by default doesn't execute user code in the JVM, so it needs much less 
Metaspace memory. I would suggest reducing the Metaspace size to something 
like 64MB or 96MB by default.

{{taskmanager.memory.jvm-metaspace.size: 64m}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22749) Apply a robust default State Backend Configuration

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22749:


 Summary: Apply a robust default State Backend Configuration
 Key: FLINK-22749
 URL: https://issues.apache.org/jira/browse/FLINK-22749
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Stephan Ewen


We should update the default state backend configuration with default settings 
that reflect lessons-learned about robust setups.

(1) Always use the RocksDB State Backend. That is already the case.

(2) Activate partitioned index filters by default. This may cost some overhead in 
specific cases, but helps with massive performance regressions when we have too 
many ColumnFamilies (too many states) such that the cache can no longer hold 
all index files.

We need to add {{state.backend.rocksdb.memory.partitioned-index-filters: true}} 
to the config.

See FLINK-20496 for details.
There is a chance that Flink makes this the default in the future as well, then 
we could remove it again from the StateFun setup.

(3) Activate local recovery by default.

That should speed up the recovery of all non-hard-crashed TMs by a lot.
We need to configure
  - {{state.backend.local-recovery: true}}
  - {{taskmanager.state.local.root-dirs}} to some non-temp directory

For this to work reliably, we need a local directory that is not periodically 
wiped by the OS, so we should not rely on the default ({{/tmp}}) directory, but 
set up a dedicated non-temp state directory.

Flink will probably make this the default in the future, but having a 
non-{{/tmp}} directory for RocksDB and the local snapshots still makes a lot of 
sense.

(4) Increase state transfer threads by default, to speed up state restores.

Add to the config: {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22741) Hide Flink complexity from Stateful Functions

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22741:


 Summary: Hide Flink complexity from Stateful Functions
 Key: FLINK-22741
 URL: https://issues.apache.org/jira/browse/FLINK-22741
 Project: Flink
  Issue Type: New Feature
  Components: Stateful Functions
Affects Versions: statefun-3.0.0
Reporter: Stephan Ewen


This is an umbrella issue for various issues to hide and reduce the complexity 
and surface area (and configuration space) of Apache Flink when using Stateful 
Functions.

The goal of this is to create a setup and configuration that works robustly in 
the vast majority of settings. Users should not be required to configure 
anything Flink-specific, other than 

If this happens at the cost of some minor regression in peak stream throughput, 
we can most likely stomach that in StateFun, because the performance cost is 
commonly dominated by the interaction between StateFun cluster and remote 
function deployments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Statefun] Truncated Messages in Python workers

2021-05-20 Thread Stephan Ewen
Thanks for reporting this, it looks indeed like a potential bug.

I filed this Jira for it: https://issues.apache.org/jira/browse/FLINK-22729

Could you share (here or in Jira) what the stack on the Python Worker side
is (for example which HTTP server)? Do you know if the message truncation
happens reliably at a certain message size?


On Wed, May 19, 2021 at 2:12 PM Jan Brusch 
wrote:

> Hi,
>
> recently we started seeing the following faulty behaviour in the Flink
> Stateful Functions HTTP communication towards external Python workers.
> This is only occurring when the system is under heavy load.
>
> The Java Application will send HTTP Messages to an external Python
> Function but the external Function fails to parse the message with a
> "Truncated Message Error". Printouts show that the truncated message
> looks as follows:
>
> --
>
> 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
>
> my.protobuf.MyClass: 
> --
>
> Which leads to the following Error in the Python worker:
>
> --
>
> Error Parsing Message: Truncated Message
>
> --
>
> Either the sender or the receiver (or something in between) seems to be
> truncating some (not all) messages at some random point in the payload.
> The source code in both Flink SDKs looks to be correct. We temporarily
> solved this by setting the "maxNumBatchRequests" parameter in the
> external function definition really low. But this is not an ideal
> solution as we believe this adds considerable communication overhead
> between the Java and the Python Functions.
>
> The Stateful Function version is 2.2.2, java8. The Java App as well as
> the external Python workers are deployed in the same kubernetes cluster.
>
>
> Has anyone ever seen this problem before?
>
> Best regards
>
> Jan
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
>
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>
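
For reference, the workaround mentioned above (lowering maxNumBatchRequests) is set
in the remote function's module definition. A rough sketch for StateFun 2.2, where
the endpoint, function type, and concrete value are placeholders for this setup:

{code}
# module.yaml sketch (StateFun 2.2 remote module format); names and values are examples
version: "2.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/my-python-function
          spec:
            endpoint: http://python-worker:8000/statefun
            maxNumBatchRequests: 100
{code}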


[jira] [Created] (FLINK-22729) Truncated Messages in Python workers

2021-05-20 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22729:


 Summary: Truncated Messages in Python workers
 Key: FLINK-22729
 URL: https://issues.apache.org/jira/browse/FLINK-22729
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Affects Versions: statefun-2.2.2
 Environment: The Stateful Function version is 2.2.2, java8. The Java 
App as well as
the external Python workers are deployed in the same kubernetes cluster.
Reporter: Stephan Ewen
 Fix For: statefun-3.1.0


Recently we started seeing the following faulty behavior in the Flink
Stateful Functions HTTP communication towards external Python workers.
This is only occurring when the system is under heavy load.

The Java Application will send HTTP Messages to an external Python
Function but the external Function fails to parse the message with a
"Truncated Message Error". Printouts show that the truncated message
looks as follows:

{code}
my.protobuf.MyClass: 
my.protobuf.MyClass: 
my.protobuf.MyClass: 
my.protobuf.MyClass: 
{code}

Which leads to the following error in the Python worker:

{code}
Error Parsing Message: Truncated Message
{code}

Originally reported on the user mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-Truncated-Messages-in-Python-workers-td43831.html





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.13.0, release candidate #2

2021-04-28 Thread Stephan Ewen
Glad to hear that outcome. And no worries about the false alarm.
Thank you for doing thorough testing, this is very helpful!

On Wed, Apr 28, 2021 at 1:04 PM Caizhi Weng  wrote:

> After the investigation we found that this issue is caused by the
> implementation of the connector, not by the Flink framework.
>
> Sorry for the false alarm.
>
> Stephan Ewen  于2021年4月28日周三 下午3:23写道:
>
> > @Caizhi and @Becket - let me reach out to you to jointly debug this
> issue.
> >
> > I am wondering if there is some incorrect reporting of failed events?
> >
> > On Wed, Apr 28, 2021 at 8:53 AM Caizhi Weng 
> wrote:
> >
> > > -1
> > >
> > > We're testing this version on batch jobs with large (600~1000)
> > parallelisms
> > > and the following exception messages appear with high frequency:
> > >
> > > 2021-04-27 21:27:26
> > > org.apache.flink.util.FlinkException: An OperatorEvent from an
> > > OperatorCoordinator to a task was lost. Triggering task failover to
> > ensure
> > > consistency. Event: '[NoMoreSplitEvent]', targetTask:  -
> > > execution #0
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81)
> > > at
> > >
> > >
> >
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> > > at
> > >
> > >
> >
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> > > at
> > >
> > >
> >
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at
> > >
> > >
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at
> > >
> > >
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > > Becket Qin is investigating this issue.
> > >
> >
>


Re: [VOTE] Release 1.12.3, release candidate #1

2021-04-28 Thread Stephan Ewen
A quick heads-up:

A fix from 1.13.0 that I backported to 1.12.3 is apparently causing some
issues at larger batch scale.
We are investigating this, but it would affect this release as well.

Please see the mail thread "[VOTE] Release 1.13.0, release candidate #2"
for details.

If we want to make sure this release isn't affected, we would need to revert that change.
The tickets are:
  - https://issues.apache.org/jira/browse/FLINK-21996
  - https://issues.apache.org/jira/browse/FLINK-18071

On Wed, Apr 28, 2021 at 7:59 AM Robert Metzger  wrote:

> +1 (binding)
>
> - started cluster, ran example job on macos
> - sources look fine
> - Eyeballed the diff:
> https://github.com/apache/flink/compare/release-1.12.2...release-1.12.3-rc1
> .
> According to "git diff release-1.12.2...release-1.12.3-rc1 '*.xml'", there
> was only one external dependency change (snappy-java, which seems to be
> properly reflected in the NOTICE file)
> - the last CI run of the "release-1.12" branch is okay-ish:
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17315=results
> (this one failure occurred:
> https://issues.apache.org/jira/browse/FLINK-20950)
>
>
> On Tue, Apr 27, 2021 at 5:03 PM Dawid Wysakowicz 
> wrote:
>
> > +1 (binding)
> >
> > - Verified checksums and signatures
> > - Reviewed the website PR
> > - Built from sources
> > - verified dependency version upgrades and updates in NOTICE files
> > compared to 1.12.2
> > - started cluster and run WordCount example in BATCH mode and everything
> > looked good
> >
> > On 23/04/2021 23:52, Arvid Heise wrote:
> > > Hi everyone,
> > > Please review and vote on the release candidate #1 for the version
> > 1.12.3,
> > > as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release and binary convenience releases to
> > be
> > > deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint 476DAA5D1FF08189 [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag "release-1.12.3-rc1" [5],
> > > * website pull request listing the new release and adding announcement
> > blog
> > > post [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Your friendly release manager Arvid
> > >
> > > [1]
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349691
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.3-rc1/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1419
> > > [5] https://github.com/apache/flink/releases/tag/release-1.12.3-rc1
> > > [6] https://github.com/apache/flink-web/pull/437
> > >
> >
> >
>


Re: [VOTE] Release 1.13.0, release candidate #2

2021-04-28 Thread Stephan Ewen
@Caizhi and @Becket - let me reach out to you to jointly debug this issue.

I am wondering if there is some incorrect reporting of failed events?

On Wed, Apr 28, 2021 at 8:53 AM Caizhi Weng  wrote:

> -1
>
> We're testing this version on batch jobs with large (600~1000) parallelisms
> and the following exception messages appear with high frequency:
>
> 2021-04-27 21:27:26
> org.apache.flink.util.FlinkException: An OperatorEvent from an
> OperatorCoordinator to a task was lost. Triggering task failover to ensure
> consistency. Event: '[NoMoreSplitEvent]', targetTask:  -
> execution #0
> at
>
> org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81)
> at
>
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at
>
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at
>
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Becket Qin is investigating this issue.
>


[jira] [Created] (FLINK-22433) CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler

2021-04-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22433:


 Summary: CoordinatorEventsExactlyOnceITCase stalls on Adaptive 
Scheduler
 Key: FLINK-22433
 URL: https://issues.apache.org/jira/browse/FLINK-22433
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0, 1.13.1


Logs of the test failure:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17077=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7030a106-e977-5851-a05e-535de648c9c9=3

Steps to reproduce: Adjust the {{CoordinatorEventsExactlyOnceITCase}} and 
extend the MiniCluster configuration:
{code}
@BeforeClass
public static void startMiniCluster() throws Exception {
final Configuration config = new Configuration();
config.setString(RestOptions.BIND_PORT, "0");
config.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
config.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22406) ReactiveModeITCase.testScaleDownOnTaskManagerLoss()

2021-04-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22406:


 Summary: ReactiveModeITCase.testScaleDownOnTaskManagerLoss()
 Key: FLINK-22406
 URL: https://issues.apache.org/jira/browse/FLINK-22406
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.0
Reporter: Stephan Ewen


The test is stalling on Azure CI.

https://dev.azure.com/sewen0794/Flink/_build/results?buildId=292=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=634cd701-c189-5dff-24cb-606ed884db87=4865



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Task Local Recovery with mountable disks in the cloud

2021-04-21 Thread Stephan Ewen
/cc dev@flink


On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal  wrote:

> Hello,
>
> We've been experimenting with Task-local recovery using Kubernetes. We
> have a way to specify mounting the same disk across Task Manager
> restarts/deletions for when the pods get recreated. In this scenario, we
> noticed that task local recovery does not kick in (as expected based on the
> documentation).
>
> We did try to comment out the code on the shutdown path which cleaned up
> the task local directories before the pod went down / was restarted. We
> noticed that remote recovery kicked in even though the task local state was
> present. I noticed that the slot IDs changed, and was wondering if this is
> the main reason that the task local state didn't get used in this scenario?
>
> Since we're using this shared disk to store the local state across pod
> failures, would it make sense to allow keeping the task local state so that
> we can get faster recovery even for situations where the Task Manager
> itself dies? In some sense, the storage here is disaggregated from the pods
> and can potentially benefit from task local recovery. Any reason why this
> is a bad idea in general?
>
> Is there a way to preserve the slot IDs across restarts? We set up the Task
> Manager to pin the resource-id, but that didn't seem to help. My
> understanding is that the slot ID needs to be reused for task local
> recovery to kick in.
>
> Thanks,
> Sonam
>
>


[jira] [Created] (FLINK-22358) Add missing stability annotation to Split Reader API classes

2021-04-19 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22358:


 Summary: Add missing stability annotation to Split Reader API 
classes
 Key: FLINK-22358
 URL: https://issues.apache.org/jira/browse/FLINK-22358
 Project: Flink
  Issue Type: Task
  Components: Connectors / Common
Reporter: Stephan Ewen
 Fix For: 1.14.0, 1.13.1


The Split Reader API currently has no stability annotations, so it is unclear 
which classes are public API, which are internal, which are stable, and which 
are evolving.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22357) Mark FLIP-27 Source API as stable

2021-04-19 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22357:


 Summary: Mark FLIP-27 Source API as stable
 Key: FLINK-22357
 URL: https://issues.apache.org/jira/browse/FLINK-22357
 Project: Flink
  Issue Type: Task
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


The FLIP-27 source API was properly introduced in 1.11 and has undergone some 
major improvements in 1.12.

During the stabilization in 1.13 we needed only one very minor change to those 
interfaces.

I think it is time to declare the core source API interfaces as stable, to 
allow users to safely rely on them. I would suggest doing that for 1.14, 
and possibly even backporting the annotation change to 1.13.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-04-17 Thread Stephan Ewen
Thanks, Thomas!

@Becket and @Nicholas - would you be ok with that approach?


On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise  wrote:

> Hi Stephan,
>
> Thanks for the feedback!
>
> I agree with the approach of starting with a simple implementation
> that can address a well understood, significant portion of use cases.
>
> I'm planning to continue work on the prototype that I had shared.
> There is production level usage waiting for it fairly soon. I expect
> to open a PR in the coming weeks.
>
> Thomas
>
>
>
>
>
> On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen  wrote:
> >
> > Thanks all for this discussion. Looks like there are lots of ideas and
> > folks that are eager to do things, so let's see how we can get this
> moving.
> >
> > My take on this is the following:
> >
> > There will probably not be one Hybrid source, but possibly multiple ones,
> > because of different strategies/requirements.
> > - One may be very simple, with switching points known up-front. Would
> > be good to have this in a very simple implementation.
> > - There may be one where the switch is dynamic and the readers need
> to
> > report back where they left off.
> > - There may be one that switches back and forth multiple times
> during a
> > job, for example Kafka going to DFS whenever it falls behind retention,
> in
> > order to catch up again.
> >
> > This also seems hard to "design on paper"; I expect there are nuances in
> a
> > production setup that affect some details of the design. So I'd feel most
> > comfortable in adding a variant of the hybrid source to Flink that has
> been
> > used already in a real use case (not necessarily in production, but maybe
> > in a testing/staging environment, so it seems to meet all requirements).
> >
> >
> > What do you think about the following approach?
> >   - If there is a tested PoC, let's try to get it contributed to Flink
> > without trying to make it much more general.
> >   - When we see similar but a bit different requirements for another
> hybrid
> > source, then let's try to evolve the contributed one.
> >   - If we see new requirements that are so different that they don't fit
> > well with the existing hybrid source, then let us look at building a
> second
> > hybrid source for those requirements.
> >
> > We need to make connector contributions in general more easy, and I think
> > it is not a bad thing to end up with different approaches and see how
> these
> > play out against each other when being used by users. For example
> switching
> > with known boundaries, dynamic switching, back-and-forth-switching, etc.
> > (I know some committers are planning to do some work on making
> > connector contributions easier, with standardized testing frameworks,
> > decoupled CI, etc.)
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise  wrote:
> >
> > > Hi,
> > >
> > > As mentioned in my previous email, I had been working on a prototype
> for
> > > the hybrid source.
> > >
> > > You can find it at https://github.com/tweise/flink/pull/1
> > >
> > > It contains:
> > > * Switching with configurable chain of sources
> > > * Fixed or dynamic start positions
> > > * Test with MockSource and FileSource
> > >
> > > The purpose of the above PR is to gather feedback and help drive
> consensus
> > > on the FLIP.
> > >
> > > * How to support a dynamic start position within the source chain?
> > >
> > > Relevant in those (few?) cases where start positions are not known
> upfront.
> > > You can find an example of what that might look like in the tests:
> > >
> > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > >
> > >
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > >
> > > When switching, the enumerator of the previous source needs to
> > > supply information about consumed splits that allows to set the start
> > > position for the next source. That could be something like the last
> > > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> > > doesn't track finished splits.)
> > >
> > > See previous discussion regarding start/end position. The prototype
> shows
> > > the use of checkpoint state with c

[jira] [Created] (FLINK-22324) Backport FLINK-18071 for 1.12.x

2021-04-16 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22324:


 Summary: Backport FLINK-18071 for 1.12.x
 Key: FLINK-22324
 URL: https://issues.apache.org/jira/browse/FLINK-22324
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.3


See  FLINK-18071 - this issue only tracks the backport to allow closing the 
blocker issue for 1.13.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Releasing Flink 1.12.3

2021-04-16 Thread Stephan Ewen
+1

Thanks for pushing this, Arvid, let's get this fix out asap.



On Fri, Apr 16, 2021 at 9:46 AM Arvid Heise  wrote:

> Dear devs,
>
> Since we just fixed a severe bug that causes the dataflow to halt under
> specific circumstances [1], we would like to release a bugfix asap.
>
> I would volunteer as the release manager and kick off the release process
> on next Monday (April 19th).
> What do you think?
>
> Note that this time around, I would not wait for any specific
> fixes/backports. However, you can still merge all fixes that you'd like to
> see in 1.12.3 until Monday.
>
> Btw the fix is already in master and will be directly applied to the next
> RC of 1.13.0. Flink version 1.11.x and older are not affected.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21992
>


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-04-13 Thread Stephan Ewen
Thanks all for this discussion. Looks like there are lots of ideas and
folks that are eager to do things, so let's see how we can get this moving.

My take on this is the following:

There will probably not be one Hybrid source, but possibly multiple ones,
because of different strategies/requirements.
- One may be very simple, with switching points known up-front. Would
be good to have this in a very simple implementation.
- There may be one where the switch is dynamic and the readers need to
report back where they left off.
- There may be one that switches back and forth multiple times during a
job, for example Kafka going to DFS whenever it falls behind retention, in
order to catch up again.
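
(To make the first, simplest variant a bit more concrete: a rough sketch of a fixed
chain that reads a bounded file source first and then switches to Kafka at a known
timestamp. FileSource/KafkaSource are the existing FLIP-27 connectors, though exact
builder method names may differ by version; the HybridSource builder itself is a
placeholder, not an existing API.)

{code}
// Sketch of the "switch points known up-front" variant; "HybridSource" is a placeholder.
long switchTimestamp = 1_625_000_000_000L;  // example: point where the file archive ends

FileSource<String> historical =
        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/archive/"))
                .build();

KafkaSource<String> live =
        KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

HybridSource<String> source = HybridSource.builder(historical).addSource(live).build();
{code}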

This also seems hard to "design on paper"; I expect there are nuances in a
production setup that affect some details of the design. So I'd feel most
comfortable in adding a variant of the hybrid source to Flink that has been
used already in a real use case (not necessarily in production, but maybe
in a testing/staging environment, so it seems to meet all requirements).


What do you think about the following approach?
  - If there is a tested PoC, let's try to get it contributed to Flink
without trying to make it much more general.
  - When we see similar but a bit different requirements for another hybrid
source, then let's try to evolve the contributed one.
  - If we see new requirements that are so different that they don't fit
well with the existing hybrid source, then let us look at building a second
hybrid source for those requirements.

We need to make connector contributions in general more easy, and I think
it is not a bad thing to end up with different approaches and see how these
play out against each other when being used by users. For example switching
with known boundaries, dynamic switching, back-and-forth-switching, etc.
(I know some committers are planning to do some work on making
connector contributions easier, with standardized testing frameworks,
decoupled CI, etc.)

Best,
Stephan


On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise  wrote:

> Hi,
>
> As mentioned in my previous email, I had been working on a prototype for
> the hybrid source.
>
> You can find it at https://github.com/tweise/flink/pull/1
>
> It contains:
> * Switching with configurable chain of sources
> * Fixed or dynamic start positions
> * Test with MockSource and FileSource
>
> The purpose of the above PR is to gather feedback and help drive consensus
> on the FLIP.
>
> * How to support a dynamic start position within the source chain?
>
> Relevant in those (few?) cases where start positions are not known upfront.
> You can find an example of what that might look like in the tests:
>
>
> https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
>
> https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
>
> When switching, the enumerator of the previous source needs to
> supply information about consumed splits that allows to set the start
> position for the next source. That could be something like the last
> processed file, timestamp, etc. (Currently StaticFileSplitEnumerator
> doesn't track finished splits.)
>
> See previous discussion regarding start/end position. The prototype shows
> the use of checkpoint state with converter function.
>
> * Should readers be deployed dynamically?
>
> The prototype assumes a static source chain that is fixed at job submission
> time. Conceivably there could be use cases that require more flexibility.
> Such as switching one KafkaSource for another. A step in that direction
> would be to deploy the actual readers dynamically, at the time of switching
> source.
>
> Looking forward to feedback and suggestions for next steps!
>
> Thomas
>
> On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise  wrote:
>
> > Hi Nicholas,
> >
> > Thanks for the reply. I had implemented a small PoC. It switches a
> > configurable sequence of sources with predefined bounds. I'm using the
> > unmodified MockSource for illustration. It does not require a
> "Switchable"
> > interface. I looked at the code you shared and the delegation and
> signaling
> > works quite similar. That's a good validation.
> >
> > Hi Kezhu,
> >
> > Thanks for bringing the more detailed discussion regarding the start/end
> > position. I think in most cases the start and end positions will be known
> > when the job is submitted. If we take a File -> Kafka source chain as
> > example, there would most likely be a timestamp at which we want to
> > transition from files to reading from Kafka. So we would either set the
> > start position for Kafka based on that timestamp or provide the offsets
> > directly. (Note that I'm skipping a few related nuances here. In order to
> > achieve an exact switch without duplication or gap, we may also need some
> > overlap and filtering, but that's a separate issue.)
> >
> > The point 

Re: [DISCUSS] Backport FLIP-27 Kafka source connector fixes with API change to release-1.12.

2021-04-13 Thread Stephan Ewen
Hi all!

Generally, avoiding API changes in Bug fix versions is the right thing, in
my opinion.

But this case is a bit special, because we are changing something that
never worked properly in the first place.
So we are not breaking a "running thing" here, but making it usable.

So +1 from my side to backport these changes, I think we make more users
happy than angry with this.

Best,
Stephan


On Thu, Apr 8, 2021 at 11:35 AM Becket Qin  wrote:

> Hi Arvid,
>
> There are interface changes to the Kafka source, and there is a backwards
> compatible change in the base source implementation. Therefore technically
> speaking, users might be able to run the Kafka source in 1.13 with a 1.12
> Flink job. However, it could be tricky because there might be some
> dependent jar conflicts between 1.12 and 1.13. So this solution seems a
> little fragile.
>
> I'd second Till's question if there is an issue for users that start with
> > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka
> source
> > with API changes.
>
>
> Just to clarify, the bug fixes themselves include API changes, they are not
> separable. So we basically have three options here:
>
> 1. Do not backport fixes in 1.12. So users have to upgrade to 1.13 in order
> to use the new Kafka source.
> 2. Write some completely different fixes for release 1.12 and ask users to
> migrate to the new API when they upgrade to 1.13
> 3. Backport the fix with API changes to 1.12. So users don't need to handle
> interface change when they upgrade to 1.13+.
>
> Personally I think option 3 here is better because it does not really
> introduce any trouble to the users. The downside is that we do need to
> change the API of Kafka source in 1.12. Given that the changed API won't be
> useful without these bug fixes, changing the API seems to be doing more
> good than bad here.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Apr 8, 2021 at 2:39 PM Arvid Heise  wrote:
>
> > Hi Becket,
> >
> > did you need to change anything to the source interface itself? Wouldn't
> it
> > be possible for users to simply use the 1.13 connector with their Flink
> > 1.12 deployment?
> >
> > I think the late-upgrade argument can be made for any feature, but I also
> > see that the Kafka connector is of high interest.
> >
> > I'd second Till's question if there is an issue for users that start with
> > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka
> source
> > with API changes.
> >
> > Best,
> >
> > Arvid
> >
> > On Thu, Apr 8, 2021 at 2:54 AM Becket Qin  wrote:
> >
> > > Thanks for the comment, Till and Thomas.
> > >
> > > As far as I know there are some users who have just upgraded their
> Flink
> > > version from 1.8 / 1.9 to Flink 1.12 and might not upgrade the version
> > in 6
> > > months or more. There are also some organizations that have the
> strategy
> > of
> > > not running the latest version of a project, but only the second latest
> > > version with bug fixes. So those users may still benefit from the
> > backport.
> > > However, arguably the old Kafka source is there anyways in 1.12, so
> they
> > > should not be blocked on having the new source.
> > >
> > > I am leaning towards backporting the fixes mainly because this way we
> > might
> > > have more users migrating to the new Source and provide feedback. It
> will
> > > take some time for the users to pick up 1.13, especially for the users
> > > running Flink at large scale. So backporting the fixes to 1.12 would
> help
> > > get the new source to be used sooner.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Apr 8, 2021 at 12:40 AM Thomas Weise  wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for fixing the new KafkaSource issues.
> > > >
> > > > I'm interested in using these fixes with 1.12 for experimental
> > purposes.
> > > >
> > > > +1 for backporting. 1.12 is the current stable release and users who
> > > would
> > > > like to try the FLIP-27 sources are likely to use that release.
> > > >
> > > > Thomas
> > > >
> > > > On Wed, Apr 7, 2021 at 2:50 AM Till Rohrmann 
> > > wrote:
> > > >
> > > > > Hi Becket,
> > > > >
> > > > > If I remember correctly, then we deliberately not documented the
> > Kafka
> > > > > connector in the 1.12 release. Hence, from this point there should
> be
> > > no
> > > > > need to backport any fixes because users are not aware of this
> > feature.
> > > > >
> > > > > On the other hand this also means that we should be able to break
> > > > anything
> > > > > we want to. Consequently, backporting these fixes should be
> possible.
> > > > >
> > > > > The question would probably be whether we want to ship new features
> > > with
> > > > a
> > > > > bug fix release. Do we know of any users who want to use the new
> > Kafka
> > > > > source, are using the 1.12 version and cannot upgrade to 1.13 once
> it
> > > is
> > > > > released? If this is the case, then this could be an argument for
> > > > shipping
> > > > > this feature with 

[jira] [Created] (FLINK-22093) Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"

2021-04-01 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22093:


 Summary: Unstable test 
"ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"
 Key: FLINK-22093
 URL: https://issues.apache.org/jira/browse/FLINK-22093
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
 Fix For: 1.13.0


The test 
{{ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()}}
 failed in all profiles in my latest CI build (even though it passes locally).

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267=ab910030-93db-52a7-74a3-34a0addb481b=8102

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=6e55a443-5252-5db5-c632-109baf464772=9df6efca-61d0-513a-97ad-edb76d85786a=6698

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=cc649950-03e9-5fae-8326-2f1ad744b536=51cab6ca-669f-5dc0-221d-1e4f7dc4fc85



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22069) Check Log Pollution for 1.13 release

2021-03-31 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22069:


 Summary: Check Log Pollution for 1.13 release
 Key: FLINK-22069
 URL: https://issues.apache.org/jira/browse/FLINK-22069
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Stephan Ewen
 Fix For: 1.13.0


We should check for log pollution and confusing log lines before the release.
Below are some lines I stumbled over while using Flink during testing.

-

These lines show up on any execution of a local job and make me think I forgot 
to configure something I probably should have, leaving me wondering whether this 
might cause problems later.

These have been in Flink for a few releases now; it might be worth rephrasing 
them, though.

{code}
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.cpu.cores required for local execution is not 
set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.task.heap.size required for local 
execution is not set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.task.off-heap.size required for local 
execution is not set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.network.min required for local 
execution is not set, setting it to its default value 64 mb.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.network.max required for local 
execution is not set, setting it to its default value 64 mb.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.managed.size required for local 
execution is not set, setting it to its default value 128 mb.
{code}

-

These lines show up on every job start, even when there is no recovery but just a 
plain job start. They are not particularly problematic, but they are not helpful either.

{code}
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,855 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
{code}



When using {{DataStream.collect()}} we always have an exception in the log for 
the first fetch attempt, before the JM is ready.
The loop retries and the program succeeds, but the exception in the log raises 
confusion about whether there is a swallowed but impactful error.

{code}
7199 [main] WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurs when fetching query results
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: 
Unable to get JobMasterGateway for initializing job. The requested operation is 
not available while the JobManager is initializing.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
 [classes

[jira] [Created] (FLINK-21996) Transient RPC failure without TaskManager failure can lead to split assignment loss

2021-03-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21996:


 Summary: Transient RPC failure without TaskManager failure can 
lead to split assignment loss
 Key: FLINK-21996
 URL: https://issues.apache.org/jira/browse/FLINK-21996
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.13.0


NOTE: This bug has not actually been observed. It is based on a review of the 
current implementation.
I would expect it to be a pretty rare case, but at scale, even the rare cases 
happen often enough.

h2. Problem

Intermediate RPC messages from JM to TM can get dropped, even when the TM is 
not marked as failed.
That can happen when the connection recovers before the heartbeat times 
out.

So RPCs generally retry or handle failures: for example, the Deploy-Task RPC 
retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and 
triggers a new checkpoint.

The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a 
Future with the acknowledgement. But if that one fails, we are in the situation 
where we do not know whether the event sending was successful or not (only the 
ack failed).

This is especially tricky for split assignments and checkpoints. Consider this 
sequence of actions:
  1. Coordinator assigns a split. Ack not yet received.
  2. Coordinator takes a checkpoint. Split was sent before the checkpoint, so 
is not included on the Coordinator.
  3. Split assignment RPC response is "failed".
  4. Checkpoint completes.

Now we don't know whether the split was in the checkpoint on the Operator 
(TaskManager) or not, and with that we don't know whether we should add it back 
to the coordinator. We need to do something to make sure the split is now 
either on the coordinator or on the Operator. Currently, the split is 
implicitly assumed to be on the Operator; if it isn't, then that split is lost.

Now, it is worth pointing out that this is a pretty rare situation, because it 
means that the RPC with the split assignment fails and the one for the 
checkpoint succeeds, even though they are in close proximity. The way the 
Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't 
very likely. That is why we haven't seen this bug in practice so far, and haven't 
gotten a report for it yet.


h2. Proposed solution

The solution has two components:

  1. Fallback to a consistent point: If the system doesn't know whether two parts 
are still consistent with each other (here coordinator and Operator), fall back 
to a consistent point. Here that is the case when the Ack-Future for the "Send 
Operator Event" RPC fails or times out. Then we call the scheduler to trigger a 
failover of the target operator to the latest checkpoint and signal the same to 
the coordinator. That restores consistency. We can later optimize this 
(see below).

  2. We cannot trigger checkpoints while we are "in limbo" concerning our 
knowledge about splits. Concretely, that means that the Coordinator can only 
acknowledge the checkpoint once the Acks for pending Operator Event RPCs 
(Assign-Splits) have arrived. The checkpoint future is conditional on all 
pending RPC futures. If the RPC futures fail (or time out) then the checkpoint 
cannot complete (and the target operator will go through a failover anyway). 
In the common case, RPC round trip time is milliseconds, which would be added 
to the checkpoint latency if the checkpoint happens to overlap with a split 
assignment (most won't).
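
To make (2) a bit more concrete, here is a minimal sketch (purely illustrative 
names, not the actual coordinator code) of gating the coordinator's checkpoint 
acknowledgement on the pending operator-event acks:

{code}
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class CoordinatorCheckpointGate {

    // Completes with the coordinator snapshot only after all pending
    // "send operator event" (split assignment) acks have arrived. If any ack
    // future fails or times out, the returned future fails as well, the
    // checkpoint cannot complete, and the target operator goes through a
    // failover (component 1 above).
    static CompletableFuture<byte[]> checkpointCoordinator(
            Collection<CompletableFuture<Void>> pendingEventAcks,
            Supplier<byte[]> snapshotCoordinatorState) {

        return CompletableFuture
                .allOf(pendingEventAcks.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> snapshotCoordinatorState.get());
    }
}
{code}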


h2. Possible Future Improvements

Step (1) above can be optimized by going with retries first and sequence 
numbers to deduplicate the calls. That can help reduce the number of cases where 
a failover is needed. However, the number of situations where the RPC would 
need a retry and has a chance of succeeding (the TM is not down) should be very 
few to begin with, so whether this optimization is worth it remains to be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21935) Remove "state.backend.async" option.

2021-03-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21935:


 Summary: Remove "state.backend.async" option.
 Key: FLINK-21935
 URL: https://issues.apache.org/jira/browse/FLINK-21935
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


Checkpoints are always asynchronous; there is never a case for a synchronous 
checkpoint.
The RocksDB state backend doesn't even support synchronous snapshots, and the 
HashMap Heap backend also has no good use case for synchronous snapshots (other 
than a very minor reduction in heap objects).

Most importantly, we should not expose this option in the constructors of the 
new state backend API classes, like {{HashMapStateBackend}}. 
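
For illustration, a minimal sketch of how the new API is meant to be used 
(a sketch, assuming the new 1.13-style class names), with no 
synchronous/asynchronous toggle anywhere:

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// snapshots are always asynchronous, so there is no "async" flag to pass
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
{code}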

I marked this as a blocker because it is part of the new user-facing State Backend 
API and I would like to avoid having this option enter the API and cause 
confusion when we eventually remove it.

/cc [~sjwiesman] and [~liyu]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-22 Thread Stephan Ewen
Hi Yingjie!

Thanks for doing those experiments, the results look good. Let's go ahead
with 32M then.

Regarding the key, I am not strongly opinionated there. There are arguments
for both keys, (1) making the key part of the network pool config as you
did here or (2) making it part of the TM config (relative to framework
off-heap memory). I find (1) quite understandable, but it is personal
taste, so I can go with either option.

Best,
Stephan


On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧)  wrote:

> Hi all,
>
> I have tested the default memory size with both batch (tpcds) and
> streaming jobs running in one session cluster (more than 100 queries). The
> result is:
> 1. All settings (16M, 32M and 64M) can work well without any OOM.
> 2. For streaming jobs running after batch jobs, there is no performance or
> stability regression.
> 3. 32M and 64M are better (over 10%) in terms of performance for the test
> batch job on HDD.
>
> Based on the above results, I think 32M is a good default choice, because
> the performance is good enough for the test job and compared to 64M, more
> direct memory can be used by netty and other components. What do you think?
>
> BTW, about the configuration key, do we reach a consensus? I am
> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
> my PR now. Any suggestions about that?
>
> Best,
> Yingjie (Kevin)
>
> --
> From: Guowei Ma
> Date: 2021-03-09 17:28:35
> To: 曹英杰(北牧)
> Cc: Till Rohrmann; Stephan Ewen;
> dev; user; Xintong Song<
> tonysong...@gmail.com>
> Subject: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
> merge shuffle
>
> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧)  wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> --
>> From: Till Rohrmann
>> Date: 2021-03-05 23:03:10
>> To: Stephan Ewen
>> Cc: dev; user; Xintong Song<
>> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma<
>> guowei@gmail.com>
>> Subject: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen  wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:
>>>
>>>> Hi, all
>>>>
>>>>
>>>> In Flink 1.12 we introduced the TM merge shuffle. But the
>>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>>> main reason is that the default configuration always makes users encounter
>>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>>> to avoid the problem.
>>>> Goals
>>>>
>>>>1. Don't affect the streaming and pipelined-shuffle-only batch
>>>>setups.
>>>>2. Don't mix memory with different life cycle in the same pool.
>>>>E.g., write buffers needed by running tasks and read buffer needed even
>>>>after tasks being finished.
>>>>3. User can use the TM merge shuffle with default memory
>>>>configurations. (May need further tunings for performance optimization, 
>>>> but
>>>>should not fail with the default configurations.)
>>>>
>>>> Proposal
>>>>
>>>>1. Introduce a co

Re: [ANNOUCE][DISCUSS] Roadmap Update for Website

2021-03-15 Thread Stephan Ewen
I created a Pull Request for the roadmap update:
https://github.com/apache/flink-web/pull/426

Happy to take reviews!

Best,
Stephan


On Wed, Mar 10, 2021 at 1:55 PM Stephan Ewen  wrote:

> Thanks for the positive feedback. Will convert this to a pull request then
> in the next days.
>
> Still looking to beautify the Feature Radar graphic a bit.
>
> Best,
> Stephan
>
>
> On Wed, Mar 10, 2021 at 8:26 AM Dawid Wysakowicz 
> wrote:
>
>> Thank you Stephan for working on this. It's really nice to see the
>> roadmap updated. I also find the "Feature Radar" extremely helpful. It
>> looks good to be published from my side.
>>
>> Best,
>>
>> Dawid
>>
>> On 02/03/2021 15:38, Stephan Ewen wrote:
>> > Hi all!
>> >
>> > The roadmap on the Flink website is quite outdated:
>> > https://flink.apache.org/roadmap.html
>> >
>> > I drafted an update to the roadmap that reflects the currently ongoing
>> > bigger threads.
>> > Not every detail is mentioned there, because this roadmap should give
>> users
>> > a high-level view where the project is going.
>> >
>> > There is also a new "Feature Radar" to help users understand in which
>> stage
>> > of maturity and support individual features are.
>> >
>> > Now, I am sure this will cause some discussion about which feature
>> > should be on which level of maturity. This is my initial proposal which
>> I
>> > checked with some committers, but of course, there may be different
>> > opinions. In that case, please bring this up in this discussion thread
>> and
>> > we find consensus together.
>> >
>> >
>> https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing
>> >
>> > The raw figure for the feature radar is:
>> >
>> https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg
>> >
>> > If someone has some graphics skills to make this look more pretty, help
>> is
>> > greatly welcome!
>> > (Only request would be to keep a format that many people (and open
>> tools)
>> > can edit, so maintenance remains easy).
>> >
>> > Looking forward to hearing what you think!
>> >
>> > Best,
>> > Stephan
>> >
>>
>>


Re: [ANNOUCE][DISCUSS] Roadmap Update for Website

2021-03-10 Thread Stephan Ewen
Thanks for the positive feedback. Will convert this to a pull request then
in the next days.

Still looking to beautify the Feature Radar graphic a bit.

Best,
Stephan


On Wed, Mar 10, 2021 at 8:26 AM Dawid Wysakowicz 
wrote:

> Thank you Stephan for working on this. It's really nice to see the
> roadmap updated. I also find the "Feature Radar" extremely helpful. It
> looks good to be published from my side.
>
> Best,
>
> Dawid
>
> On 02/03/2021 15:38, Stephan Ewen wrote:
> > Hi all!
> >
> > The roadmap on the Flink website is quite outdated:
> > https://flink.apache.org/roadmap.html
> >
> > I drafted an update to the roadmap that reflects the currently ongoing
> > bigger threads.
> > Not every detail is mentioned there, because this roadmap should give
> users
> > a high-level view where the project is going.
> >
> > There is also a new "Feature Radar" to help users understand in which
> stage
> > of maturity and support individual features are.
> >
> > Now, I am sure this will cause some discussion about which feature
> > should be on which level of maturity. This is my initial proposal which I
> > checked with some committers, but of course, there may be different
> > opinions. In that case, please bring this up in this discussion thread
> and
> > we find consensus together.
> >
> >
> https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing
> >
> > The raw figure for the feature radar is:
> >
> https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg
> >
> > If someone has some graphics skills to make this look more pretty, help
> is
> > greatly welcome!
> > (Only request would be to keep a format that many people (and open tools)
> > can edit, so maintenance remains easy).
> >
> > Looking forward to hearing what you think!
> >
> > Best,
> > Stephan
> >
>
>


[jira] [Created] (FLINK-21695) Increase default value for number of KeyGroups

2021-03-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21695:


 Summary: Increase default value for number of KeyGroups
 Key: FLINK-21695
 URL: https://issues.apache.org/jira/browse/FLINK-21695
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


The current calculation for the number of Key Groups (max parallelism) leads in 
many cases to data skew and to confusion among users.

Specifically, the fact that for maxParallelisms above 128, the default value is 
set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently, half of 
the tasks get one keygroup and the other half gets two keygroups, which is very 
skewed.

See section (1) in this "lessons learned" blog post. 
https://engineering.contentsquare.com/2021/ten-flink-gotchas/
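
As a small, self-contained illustration of the skew (a sketch, not Flink's 
actual code; the exact rounding of the key group ranges may differ slightly):

{code}
public class KeyGroupSkewDemo {

    // next power of two >= x (assumes x >= 2)
    static int roundUpToPowerOfTwo(int x) {
        return Integer.highestOneBit(x - 1) << 1;
    }

    // key groups owned by one subtask under simple range partitioning
    static int keyGroupsForSubtask(int maxParallelism, int parallelism, int subtask) {
        int start = subtask * maxParallelism / parallelism;
        int end = (subtask + 1) * maxParallelism / parallelism;
        return end - start;
    }

    public static void main(String[] args) {
        int parallelism = 170;
        // default derivation: roundUpToPowerOfTwo(1.5 * parallelism), at least 128
        int maxParallelism = Math.max(128, roundUpToPowerOfTwo(parallelism + parallelism / 2));

        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int i = 0; i < parallelism; i++) {
            int n = keyGroupsForSubtask(maxParallelism, parallelism, i);
            min = Math.min(min, n);
            max = Math.max(max, n);
        }
        // prints: maxParallelism=256, key groups per subtask: min=1 max=2
        System.out.println("maxParallelism=" + maxParallelism
                + ", key groups per subtask: min=" + min + " max=" + max);
    }
}
{code}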

We can fix this by
  - either setting the default maxParallelism to something pretty high (2048 for 
example). The cost is that we increase the default key group overhead per state 
entry from one byte to two bytes.
  - or staying with similar logic, but instead of {{1.5 x 
operatorParallelism}} going with some higher multiplier, like {{4 x 
operatorParallelism}}. The price is again that we more quickly reach the point 
where we have two bytes of keygroup encoding overhead, instead of one.

Implementation-wise, there is the unfortunate situation that the maxParallelism, 
if not configured, is not stored anywhere in the job graph, but re-derived on 
the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) which 
does not have a MaxParallelism configured. This relies on the implicit contract 
that this logic never changes.
Changing this logic will instantly break all jobs which have not explicitly 
configured the Max Parallelism. That seems like a pretty heavy design 
shortcoming, unfortunately :-(

A way to partially work around that is by moving the logic that derives the 
maximum parallelism to the {{StreamGraphGenerator}}, so we never create 
JobGraphs where vertices have no configured Max Parallelism (and we keep the 
re-derivation logic for backwards compatibility for persisted JobGraphs).
The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to 
give existing un-configured applications a way to keep restoring from old 
savepoints. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21694) Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num"

2021-03-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21694:


 Summary: Increase default value of 
"state.backend.rocksdb.checkpoint.transfer.thread.num"
 Key: FLINK-21694
 URL: https://issues.apache.org/jira/browse/FLINK-21694
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


The default value for the number of threads used to download state artifacts 
from checkpoint storage should be increased.

The increase should not pose risk of regression, but does in many cases speed 
up checkpoint recovery significantly.

Something similar was reported in this blog post, item (3).
https://engineering.contentsquare.com/2021/ten-flink-gotchas/

A value of 8 (eight) sounds like a good default. It should not result 
in excessive thread explosion, and already speeds up recovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUCE][DISCUSS] Roadmap Update for Website

2021-03-08 Thread Stephan Ewen
Thanks, Robert!

As far as I understand, the Reactive Mode is not yet released, that's why I
didn't put it there.
The radar should be updated after each release.

What do you think?

On Sat, Mar 6, 2021 at 3:41 PM Robert Metzger  wrote:

> Thanks a lot for updating the roadmap!
>
> I really like the new Feature Radar, because it makes some assumptions
> floating around on JIRA and the lists properly documented.
>
> I guess the radar is missing the Reactive Mode as an MVP feature.
>
>
> Otherwise, I'm +1 on publishing this soon!
>
> On Tue, Mar 2, 2021 at 3:38 PM Stephan Ewen  wrote:
>
> > Hi all!
> >
> > The roadmap on the Flink website is quite outdated:
> > https://flink.apache.org/roadmap.html
> >
> > I drafted an update to the roadmap that reflects the currently ongoing
> > bigger threads.
> > Not every detail is mentioned there, because this roadmap should give
> users
> > a high-level view where the project is going.
> >
> > There is also a new "Feature Radar" to help users understand in which
> stage
> > of maturity and support individual features are.
> >
> > Now, I am sure this will cause some discussion about which feature
> > should be on which level of maturity. This is my initial proposal which I
> > checked with some committers, but of course, there may be different
> > opinions. In that case, please bring this up in this discussion thread
> and
> > we find consensus together.
> >
> >
> >
> https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing
> >
> > The raw figure for the feature radar is:
> >
> >
> https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg
> >
> > If someone has some graphics skills to make this look more pretty, help
> is
> > greatly welcome!
> > (Only request would be to keep a format that many people (and open tools)
> > can edit, so maintenance remains easy).
> >
> > Looking forward to hearing what you think!
> >
> > Best,
> > Stephan
> >
>


Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-06 Thread Stephan Ewen
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool.
How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan


On Fri, Mar 5, 2021 at 4:32 AM Guowei Ma  wrote:

> Hi, all
>
>
> In Flink 1.12 we introduced the TM merge shuffle. But the
> out-of-the-box experience of using TM merge shuffle is not very good. The
> main reason is that the default configuration always makes users encounter
> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
> to avoid the problem.
> Goals
>
>1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>2. Don't mix memory with different life cycle in the same pool. E.g.,
>write buffers needed by running tasks and read buffer needed even after
>tasks being finished.
>3. User can use the TM merge shuffle with default memory
>configurations. (May need further tunings for performance optimization, but
>should not fail with the default configurations.)
>
> Proposal
>
>1. Introduce a configuration `taskmanager.memory.network.batch-read`
>to specify the size of this memory pool. The default value is 16m.
>2. Allocate the pool lazily. It means that the memory pool would be
>    allocated when the TM merge shuffle is used for the first time.
>    3. This pool size will not be added to the TM's total memory size,
>but will be considered part of
>`taskmanager.memory.framework.off-heap.size`. We need to check that the
>pool size is not larger than the framework off-heap size, if TM merge
>shuffle is enabled.
>
>
> In this default configuration, the allocation of the memory pool is almost
> impossible to fail. Currently the default framework’s off-heap memory is
> 128m, which is mainly used by Netty. But after we introduced zero copy, the
> usage of it has been reduced, and you can refer to the detailed data [2].
> Known Limitation
> Usability for increasing the memory pool size
>
> In addition to increasing `taskmanager.memory.network.batch-read`, the
> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
> at the same time. It also means that once the user forgets this, it is
> likely to fail the check when allocating the memory pool.
>
>
> So in the following two situations, we will still prompt the user to
> increase the size of `framework.off-heap.size`.
>
>1. `taskmanager.memory.network.batch-read` is bigger than
>`taskmanager.memory.framework.off-heap.size`
>2. Allocating the pool encounters the OOM.
>
>
> An alternative is that when the user adjusts the size of the memory pool,
> the system automatically adjusts it. But we are not entirely sure about
> this, given how implicit it is and how it complicates the memory configuration.
> Potential memory waste
>
> In the first step, the memory pool will not be released once allocated. This
> means in the first step, even if there is no subsequent batch job, the
> pooled memory cannot be used by other consumers.
>
>
> We are not releasing the pool in the first step due to the concern that
> frequently allocating/deallocating the entire pool may increase the GC
> pressure. Investigating how to dynamically release the pool when it's no
> longer needed is considered a future follow-up.
>
>
> Looking forward to your feedback.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20740
>
> [2] https://github.com/apache/flink/pull/7368.
> Best,
> Guowei
>


Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

2021-03-05 Thread Stephan Ewen
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool.
How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan

On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma  wrote:

> Hi, all
>
>
> In Flink 1.12 we introduced the TM merge shuffle. But the
> out-of-the-box experience of using TM merge shuffle is not very good. The
> main reason is that the default configuration always makes users encounter
> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
> to avoid the problem.
> Goals
>
>1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>2. Don't mix memory with different life cycle in the same pool. E.g.,
>write buffers needed by running tasks and read buffer needed even after
>tasks being finished.
>3. User can use the TM merge shuffle with default memory
>configurations. (May need further tunings for performance optimization, but
>should not fail with the default configurations.)
>
> Proposal
>
>1. Introduce a configuration `taskmanager.memory.network.batch-read`
>to specify the size of this memory pool. The default value is 16m.
>2. Allocate the pool lazily. It means that the memory pool would be
>    allocated when the TM merge shuffle is used for the first time.
>    3. This pool size will not be added to the TM's total memory size,
>but will be considered part of
>`taskmanager.memory.framework.off-heap.size`. We need to check that the
>pool size is not larger than the framework off-heap size, if TM merge
>shuffle is enabled.
>
>
> In this default configuration, the allocation of the memory pool is almost
> impossible to fail. Currently the default framework’s off-heap memory is
> 128m, which is mainly used by Netty. But after we introduced zero copy, the
> usage of it has been reduced, and you can refer to the detailed data [2].
> Known Limitation
> Usability for increasing the memory pool size
>
> In addition to increasing `taskmanager.memory.network.batch-read`, the
> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
> at the same time. It also means that once the user forgets this, it is
> likely to fail the check when allocating the memory pool.
>
>
> So in the following two situations, we will still prompt the user to
> increase the size of `framework.off-heap.size`.
>
>1. `taskmanager.memory.network.batch-read` is bigger than
>`taskmanager.memory.framework.off-heap.size`
>2. Allocating the pool encounters the OOM.
>
>
> An alternative is that when the user adjusts the size of the memory pool,
> the system automatically adjusts it. But we are not entirely sure about
> this, given how implicit it is and how it complicates the memory configuration.
> Potential memory waste
>
> In the first step, the memory pool will not be released once allocated. This
> means in the first step, even if there is no subsequent batch job, the
> pooled memory cannot be used by other consumers.
>
>
> We are not releasing the pool in the first step due to the concern that
> frequently allocating/deallocating the entire pool may increase the GC
> pressure. Investigating how to dynamically release the pool when it's no
> longer needed is considered a future follow-up.
>
>
> Looking forward to your feedback.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20740
>
> [2] https://github.com/apache/flink/pull/7368.
> Best,
> Guowei
>


[ANNOUCE][DISCUSS] Roadmap Update for Website

2021-03-02 Thread Stephan Ewen
Hi all!

The roadmap on the Flink website is quite outdated:
https://flink.apache.org/roadmap.html

I drafted an update to the roadmap that reflects the currently ongoing
bigger threads.
Not every detail is mentioned there, because this roadmap should give users
a high-level view where the project is going.

There is also a new "Feature Radar" to help users understand in which stage
of maturity and support individual features are.

Now, I am sure this will cause some discussion about which feature
should be on which level of maturity. This is my initial proposal which I
checked with some committers, but of course, there may be different
opinions. In that case, please bring this up in this discussion thread and
we find consensus together.

https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing

The raw figure for the feature radar is:
https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg

If someone has some graphics skills to make this look more pretty, help is
greatly welcome!
(Only request would be to keep a format that many people (and open tools)
can edit, so maintenance remains easy).

Looking forward to hearing what you think!

Best,
Stephan


Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

2021-02-16 Thread Stephan Ewen
Thanks for clarifying.

Concerning the JM aborted checkpoints and state handles: I was thinking
about it the other day as well and was considering an approach like that:

The core idea is to move the cleanup from JM to TM. That solves two issues:

(1) The StateBackends / DSTL delete the artifacts themselves, meaning we
don't have to make assumptions about the state on the JM. That sounds too
fragile, with easy bugs as soon as some slight assumptions change (see also
bug with incr. checkpoint / savepoint data loss,
https://issues.apache.org/jira/browse/FLINK-21351)

(2) We do not need to clean up from one node. In the past, doing the
cleanup from one node (JM) has sometimes become a bottleneck.

To achieve that, we would need to extend the "notifyCheckpointComplete()"
RPC from the JM to the TM to include both the ID of the completed checkpoint
and the ID of the earliest retained checkpoint. Then the TM can clean up
all artifacts from earlier checkpoints.
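
To make this concrete, a minimal sketch of the extended RPC (illustrative names, 
not the actual TaskExecutorGateway signature):

{code}
import java.util.concurrent.CompletableFuture;

interface CheckpointNotifications {

    // Notifies the TM that a checkpoint completed. The TM may delete all
    // artifacts belonging to checkpoints older than the earliest retained one.
    CompletableFuture<Void> notifyCheckpointComplete(
            long completedCheckpointId,
            long earliestRetainedCheckpointId);
}
{code}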

There are two open questions to that design:
(1) On restore, we need to communicate the state handles of the previous
checkpoints to the TM as well, so the TM gets again the full picture of all
state artifacts.
(2) On rescaling, we need to clarify which TM is responsible for releasing
a handle, if they are mapped to multiple TMs. Otherwise we get
double-delete calls. That isn't per se a problem, it is just a bit less
efficient.


Maybe we could think in that direction for the DSTL work?



On Mon, Feb 15, 2021 at 8:44 PM Roman Khachatryan  wrote:

> Thanks for your reply Stephan.
>
> Yes, there is overlap between FLIP-151 and FLIP-158 as both
> address incremental state updates. However, I think that FLIP-151 on top
> of FLIP-158 increases efficiency by:
>
> 1. "Squashing" the changes made to the same key. For example, if some
> counter was changed 10 times then FLIP-151 will send only the last value
> (this allows to send AND store less data compared to FLIP-158)
>
> 2. Keeping in memory only the changed keys and not the values.
> (this allows to reduce memory AND latency (caused by serialization +
> copying on every update) compared to FLIP-158)
>
> (1) can probably be implemented in FLIP-158, but not (2).
>
> I don't think there will be a lot of follow-up efforts and I hope
> @Dawid Wysakowicz , @pnowojski
>  , Yuan Mei and probably
> @Yu Li   will be able to join at different stages.
>
> Regarding using only the confirmed checkpoints, you are right: JM can
> abort non-confirmed checkpoints and discard the state. FLIP-158 has
> the same problem because StateChangelog produces StateHandles that
>  can be discarded by the JM. Currently, potentially discarded
> changes are re-uploaded in both FLIPs.
>
> In FLIP-158 (or follow-up), I planned to improve this part by:
> 1. Limiting max-concurrent-checkpoints to 1, and
> 2. Sending the last confirmed checkpoint ID in RPCs and barriers
> So at the time of checkpoint, backend knows exactly which changes can be
> included.
>
> Handling of removed keys is not related to the aborted checkpoints. They
> are
> needed on recovery to actually remove data from the previous snapshot.
> In FLIP-158 it is again similar: ChangelogStateBackend has to encode
> removal operations and send them to StateChangelog (though no additional
> data structure is required).
>
> Regards,
> Roman
>
>
> On Thu, Feb 11, 2021 at 4:28 PM Stephan Ewen  wrote:
>
> > Thanks, Roman for publishing this design.
> >
> > There seems to be quite a bit of overlap with FLIP-158 (generalized
> > incremental checkpoints).
> >
> > I would go with +1 to the effort if it is a pretty self-contained and
> > closed effort. Meaning we don't expect that this needs a ton of
> follow-ups,
> > other than common maintenance and small bug fixes. If we expect that this
> > requires a lot of follow-ups, then we end up splitting our work between
> > this FLIP and FLIP-158, which seems a bit inefficient.
> > What other committers would be involved to ensure the community can
> > maintain this?
> >
> >
> > The design looks fine, in general, with one question:
> >
> > When persisting changes, you persist all changes that have a newer
> version
> > than the latest one confirmed by the JM.
> >
> > Can you explain why it is like that exactly? Alternatively, you could
> keep
> > the latest checkpoint ID for which the state backend persisted the diff
> > successfully to the checkpoint storage, and created a state handle. For
> > each checkpoint, the state backend includes the state handles of all
> > involved chunks. That would be similar to the log-based approach in
> > FLIP-158.
> >
> > I have a suspicion that this is because the JM may have released t

Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

2021-02-11 Thread Stephan Ewen
Thanks, Roman for publishing this design.

There seems to be quite a bit of overlap with FLIP-158 (generalized
incremental checkpoints).

I would go with +1 to the effort if it is a pretty self-contained and
closed effort. Meaning we don't expect that this needs a ton of follow-ups,
other than common maintenance and small bug fixes. If we expect that this
requires a lot of follow-ups, then we end up splitting our work between
this FLIP and FLIP-158, which seems a bit inefficient.
What other committers would be involved to ensure the community can
maintain this?


The design looks fine, in general, with one question:

When persisting changes, you persist all changes that have a newer version
than the latest one confirmed by the JM.

Can you explain why it is like that exactly? Alternatively, you could keep
the latest checkpoint ID for which the state backend persisted the diff
successfully to the checkpoint storage, and created a state handle. For
each checkpoint, the state backend includes the state handles of all
involved chunks. That would be similar to the log-based approach in
FLIP-158.

I have a suspicion that this is because the JM may have released the state
handle (and discarded the diff) for a checkpoint that succeeded on the task
but didn't succeed globally. So we cannot reference any state handle that
has been handed over to the JobManager, but is not yet confirmed.

This characteristic seems to be at the heart of much of the complexity,
also the handling of removed keys seems to be caused by that.
If we could change that assumption, the design would become simpler.

(Side note: I am wondering if this also impacts the FLIP-158 DSTL design.)

Best,
Stephan


On Sun, Nov 15, 2020 at 8:51 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Stefan,
>
> Thanks for your reply. Very interesting ideas!
> If I understand correctly, SharedStateRegistry will still be responsible
> for pruning the old state; for that, it will maintain some (ordered)
> mapping between StateMaps and their versions, per key group.
> I think one modification to this approach is needed to support journaling:
> for each entry, maintain a version when it was last fully snapshotted; and
> use this version to find the minimum as you described above.
> I'm considering a better state cleanup and optimization of removals as the
> next step. Anyway, I will add it to the FLIP document.
>
> Thanks!
>
> Regards,
> Roman
>
>
> On Tue, Nov 10, 2020 at 12:04 AM Stefan Richter  >
> wrote:
>
> > Hi,
> >
> > Very happy to see that the incremental checkpoint idea is finally
> becoming
> > a reality for the heap backend! Overall the proposal looks pretty good to
> > me. Just wanted to point out one possible improvement from what I can
> still
> > remember from my ideas back then: I think you can avoid doing periodic
> full
> > snapshots for consolidation. Instead, my suggestion would be to track the
> > version numbers you encounter while you iterate a snapshot for writing
> it -
> > and then you should be able to prune all incremental snapshots that were
> > performed with a version number smaller than the minimum you find. To
> avoid
> > the problem of very old entries that never get modified you could start
> > spilling entries with a certain age-difference compared to the current
> map
> > version so that eventually all entries for an old version are re-written
> to
> > newer snapshots. You can track the version up to which this was done in
> the
> > map and then you can again let go of their corresponding snapshots after
> a
> > guaranteed time.So instead of having the burden of periodic large
> > snapshots, you can make every snapshot work a little bit on the cleanup
> and
> > if you are lucky it might happen mostly by itself if most entries are
> > frequently updated. I would also consider to make map clean a special
> event
> > in your log and consider unticking the versions on this event - this
> allows
> > you to let go of old snapshots and saves you from writing a log of
> > antimatter entries. Maybe the ideas are still useful to you.
> >
> > Best,
> > Stefan
> >
> > On 2020/11/04 01:54:25, Khachatryan Roman  wrote:
> > > Hi devs,>
> > >
> > > I'd like to start a discussion of FLIP-151: Incremental snapshots for>
> > > heap-based state backend [1]>
> > >
> > > Heap backend, while being limited to state sizes fitting into memory, also
> > has>
> > > some advantages compared to RocksDB backend:>
> > > 1. Serialization once per checkpoint, not per state modification. This>
> > > allows to “squash” updates to the same keys>
> > > 2. Shorter synchronous phase (compared to RocksDB incremental)>
> > > 3. No need for sorting and compaction, no IO amplification and JNI
> > overhead>
> > > This can potentially give higher throughput and efficiency.>
> > >
> > > However, Heap backend currently lacks incremental checkpoints. This
> > FLIP>
> > > aims to add initial support for them.>
> > >
> > > [1]>
> > >
> >
> 

Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior

2021-02-03 Thread Stephan Ewen
Hi all!

A quick thought on this thread: We see a typical stalemate here, as in so
many discussions recently.
One developer prefers it this way, another one another way. Both have
pro/con arguments, it takes a lot of time from everyone, still there is
little progress in the discussion.

Ultimately, this can only be decided by talking to the users. And it
would also be the best way to ensure that what we build is the intuitive
and expected way for users.
The less the users are into the deep aspects of Flink SQL, the better they
can mirror what a common user would expect (a power user will anyways
figure it out).
Let's find a person to drive that, spell it out in the FLIP as "semantics
TBD", and focus on the implementation of the parts that are agreed upon.

For interviewing the users, here are some ideas for questions to look at:
  - How do they view the trade-off between stable semantics vs.
out-of-the-box magic (faster getting started).
  - How comfortable are they with the different meaning of "now()" in
a streaming versus batch context.
  - What would be their expectation when moving a query with the time
functions ("now()") from an unbounded stream (Kafka source without end
offset) to a bounded stream (Kafka source with end offsets), which may
switch execution to batch.

Best,
Stephan


On Tue, Feb 2, 2021 at 3:19 PM Jark Wu  wrote:

> Hi Fabian,
>
> I think we have an agreement that the functions should be evaluated at
> query start in batch mode.
> Because all the other batch systems and traditional databases are this
> behavior, which is standard SQL compliant.
>
> *1. The different point of view is what's the behavior in streaming mode? *
>
> From my point of view, I don't see any potential meaning to evaluate at
> query-start for a 365-day long running streaming job.
> And from my observation, CURRENT_TIMESTAMP is heavily used by Flink
> streaming users and they expect the current behaviors.
> The SQL standard only provides a guideline for traditional batch systems,
> however Flink is a leading streaming processing system
> which is out of the scope of SQL standard, and Flink should define the
> streaming standard. I think a standard should follow users' intuition.
> Therefore, I think we don't need to be standard SQL compliant at this point
> because users don't expect it.
> Changing the behavior of the functions to evaluate at query start for
> streaming mode will hurt most of Flink SQL users and we have nothing to
> gain,
> we should avoid this.
>
> *2. Does it break the unified streaming-batch semantics? *
>
> I don't think so. First of all, what's the unified streaming-batch
> semantic?
> I think it means the *eventual result* instead of the *behavior*.
> It's hard to say we have provided unified behavior for streaming and batch
> jobs,
> because for example unbounded aggregate behaves very differently.
> In batch mode, it only evaluates once for the bounded data and emits the
> aggregate result once.
>  But in streaming mode, it evaluates for each row and emits the updated
> result.
> What we have always emphasized "unified streaming-batch semantics" is [1]
>
> > a query produces exactly the same result regardless whether its input is
> static batch data or streaming data.
>
> From my understanding, the "semantic" means the "eventual result".
> And time functions are non-deterministic, so it's reasonable to get
> different results for batch and streaming mode.
> Therefore, I think it doesn't break the unified streaming-batch semantics
> to evaluate per-record for streaming and
> query-start for batch, as the semantic doesn't mean behavior semantic.
>
> Best,
> Jark
>
> [1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html
>
> On Tue, 2 Feb 2021 at 18:34, Fabian Hueske  wrote:
>
> > Hi everyone,
> >
> > Sorry for joining this discussion late.
> > Let me give some thought to two of the arguments raised in this thread.
> >
> > Time functions are inherently non-determintistic:
> > --
> > This is of course true, but IMO it doesn't mean that the semantics of
> time
> > functions do not matter.
> > It makes a difference whether a function is evaluated once and it's
> result
> > is reused or whether it is invoked for every record.
> > Would you use the same logic to justify different behavior of RAND() in
> > batch and streaming queries?
> >
> > Provide the semantics that most users expect:
> > --
> > I don't think it is clear what most users expect, esp. if we also include
> > future users (which we certainly want to gain) into this assessment.
> > Our current users got used to the semantics that we introduced. So I
> > wouldn't be surprised if they would say stick with the current semantics.
> > However, we are also claiming standard SQL compliance and stress the goal
> > of batch-stream unification.
> > So I would assume that new SQL users expect standard compliant behavior
> for
> > batch and streaming queries.
> >
> >
> > IMO, we should try hard to stick to our goals of 1) 

Re: [DISCUSS] FLIP-158: Generalized incremental checkpoints

2021-01-28 Thread Stephan Ewen
+1 to this FLIP in general.

I like the general idea very much (full disclosure, have been involved in
the discussions and drafting of the design for a while, so I am not a
new/neutral reviewer here).

One thing I would like to see us do here, is reaching out to users early
with this, and validating this approach. It is a very fundamental change
that also shifts certain tradeoffs, like "cost during execution" vs. "cost
on recovery". This approach will increase the data write rate to
S3/HDFS/...
So before we build every bit of the complex implementation, let's try and
validate/test the critical bits with the users.

In my assessment, the most critical bit is the continuous log writing,
which adds overhead during execution time. Recovery is less critical,
there'll be no overhead or additional load, so recovery should be strictly
better than currently.
I would propose we hence focus on the implementation of the logging first
(ignoring recovery, focusing on one target FileSystem/Object store) and
test run this with a few users, see that it works well and whether they
like the new characteristics.

I am also trying to contribute some adjustments to the FLIP text, like more
illustrations/explanations, to make it easier to share this FLIP with a
wider audience, so we can get the above-mentioned user input and validation.

Best,
Stephan




On Thu, Jan 28, 2021 at 10:46 AM Piotr Nowojski 
wrote:

> Hi Roman,
>
> +1 from my side on this proposal. Also big +1 for the recent changes in
> this FLIP in the motivation and high level overview sections.
>
> For me there are quite a bit of unanswered things around how to actually
> implement the proposed changes and especially how to integrate it with the
> state backends and checkpointing, but maybe we can do that in either a
> follow up design docs or discuss it in the tickets or even maybe some PoC.
>
> Piotrek
>
> pt., 15 sty 2021 o 07:49 Khachatryan Roman 
> napisał(a):
>
> > Hi devs,
> >
> > I'd like to start a discussion of FLIP-158: Generalized incremental
> > checkpoints [1]
> >
> > FLIP motivation:
> > Low end-to-end latency is a much-demanded property in many Flink setups.
> > With exactly-once, this latency depends on checkpoint interval/duration
> > which in turn is defined by the slowest node (usually the one doing a
> full
> > non-incremental snapshot). In large setups with many nodes, the
> probability
> > of at least one node being slow gets higher, making almost every
> checkpoint
> > slow.
> >
> > This FLIP proposes a mechanism to deal with this by materializing and
> > uploading state continuously and only uploading the changed part during
> the
> > checkpoint itself. It differs from other approaches in that 1)
> checkpoints
> > are always incremental; 2) works for any state backend.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
> >
> > Any feedback highly appreciated!
> >
> > Regards,
> > Roman
> >
>


Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements

2021-01-18 Thread Stephan Ewen
Thanks a lot, Yangze and Xintong for this FLIP.

I want to say, first of all, that this is super well written. And the
points that the FLIP makes about how to expose the configuration to users
is exactly the right thing to figure out first.
So good job here!

About how to let users specify the resource profiles. If I can sum the FLIP
and previous discussion up in my own words, the problem is the following:

Operator-level specification is the simplest and cleanest approach, because
> it avoids mixing operator configuration (resource) and scheduling. No
> matter what other parameters change (chaining, slot sharing, switching
> pipelined and blocking shuffles), the resource profiles stay the same.
> But it would require that a user specifies resources on all operators,
> which makes it hard to use. That's why the FLIP suggests going with
> specifying resources on a Sharing-Group.


I think both thoughts are important, so can we find a solution where the
Resource Profiles are specified on an Operator, but we still avoid that we
need to specify a resource profile on every operator?

What do you think about something like the following:
  - Resource Profiles are specified on an operator level.
  - Not all operators need profiles
  - All Operators without a Resource Profile ended up in the default slot
sharing group with a default profile (will get a default slot).
  - All Operators with a Resource Profile will go into another slot sharing
group (the resource-specified-group).
  - Users can define different slot sharing groups for operators like they
do now, with the exception that you cannot mix operators that have a
resource profile and operators that have no resource profile.
  - The default case where no operator has a resource profile is just a
special case of this model
  - The chaining logic sums up the profiles per operator, like it does now,
and the scheduler sums up the profiles of the tasks that it schedules
together.
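
To make the last point a bit more tangible, here is a purely hypothetical sketch 
(none of these types exist in Flink; it only illustrates the summing rule and the 
default fallback):

{code}
import java.util.List;
import java.util.Optional;

class ChainResources {

    static final class Profile {
        final double cpuCores;
        final int taskHeapMb;

        Profile(double cpuCores, int taskHeapMb) {
            this.cpuCores = cpuCores;
            this.taskHeapMb = taskHeapMb;
        }

        Profile plus(Profile other) {
            return new Profile(cpuCores + other.cpuCores, taskHeapMb + other.taskHeapMb);
        }
    }

    // placeholder default slot profile for operators without an explicit profile
    static final Profile DEFAULT = new Profile(1.0, 128);

    // Chaining sums up the per-operator profiles. A chain in which no operator
    // declares a profile falls back to the default profile (default slot sharing
    // group); mixing specified and unspecified operators is not allowed.
    static Profile profileOfChain(List<Optional<Profile>> operatorProfiles) {
        if (operatorProfiles.stream().noneMatch(Optional::isPresent)) {
            return DEFAULT;
        }
        if (!operatorProfiles.stream().allMatch(Optional::isPresent)) {
            throw new IllegalArgumentException(
                    "Cannot mix operators with and without resource profiles in one group");
        }
        return operatorProfiles.stream()
                .map(Optional::get)
                .reduce(new Profile(0, 0), Profile::plus);
    }
}
{code}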


There is another question about reactive scaling raised in the FLIP. I need
to think a bit about that. That is indeed a bit more tricky once we have
slots of different sizes.
It is not clear then which of the different slot requests the
ResourceManager should fulfill when new resources (TMs) show up, or how the
JobManager redistributes the slot resources when resources (TMs) disappear.
This question is pretty orthogonal, though, to the "how to specify the
resources".


Best,
Stephan

On Fri, Jan 8, 2021 at 5:14 AM Xintong Song  wrote:

> Thanks for drafting the FLIP and driving the discussion, Yangze.
> And Thanks for the feedback, Till and Chesnay.
>
> @Till,
>
> I agree that specifying requirements for SSGs means that SSGs need to be
> supported in fine-grained resource management, otherwise each operator
> might use as many resources as the whole group. However, I cannot think of
> a strong reason for not supporting SSGs in fine-grained resource
> management.
>
>
> > Interestingly, if all operators have their resources properly specified,
> > then slot sharing is no longer needed because Flink could slice off the
> > appropriately sized slots for every Task individually.
> >
>
> So for example, if we have a job consisting of two operator op_1 and op_2
> > where each op needs 100 MB of memory, we would then say that the slot
> > sharing group needs 200 MB of memory to run. If we have a cluster with 2
> > TMs with one slot of 100 MB each, then the system cannot run this job. If
> > the resources were specified on an operator level, then the system could
> > still make the decision to deploy op_1 to TM_1 and op_2 to TM_2.
>
>
> Couldn't agree more that if all operators' requirements are properly
> specified, slot sharing should be no longer needed. I think this exactly
> disproves the example. If we already know op_1 and op_2 each needs 100 MB
> of memory, why would we put them in the same group? If they are in separate
> groups, with the proposed approach the system can freely deploy them to
> either a 200 MB TM or two 100 MB TMs.
>
> Moreover, the precondition for not needing slot sharing is having resource
> requirements properly specified for all operators. This is not always
> possible, and usually requires tremendous efforts. One of the benefits for
> SSG-based requirements is that it allows the user to freely decide the
> granularity, thus efforts they want to pay. I would consider SSG in
> fine-grained resource management as a group of operators that the user
> would like to specify the total resource for. There can be only one group
> in the job, 2~3 groups dividing the job into a few major parts, or as many
> groups as the number of tasks/operators, depending on how fine-grained the
> user is able to specify the resources.
>
> Having to support SSGs might be a constraint. But given that all the
> current scheduler implementations already support SSGs, I tend to think
> that as an acceptable price for the above discussed usability and
> flexibility.
>
> @Chesnay
>
> Will 

Re: [DISCUSS] FLIP-157 Migrate Flink Documentation from Jekyll to Hugo

2021-01-17 Thread Stephan Ewen
+1 for this FLIP.

I tried it out locally, and it works well.
Aside from the fast build times, I really like the search feature, which
works pretty well.

Thanks a lot for the initiative, Seth!


On Sun, Jan 17, 2021 at 4:43 PM Jark Wu  wrote:

> I have tried in my local env, and the build time is really fast.
> Looking forward to Hugo helping us better coordinate the translation
> work.
>
> +1 to start a vote.
>
> best,
> Jark
>
> On Fri, 15 Jan 2021 at 23:02, Seth Wiesman  wrote:
>
> > Great, if there aren't any other concerns I will open this up for a vote
> on
> > Monday.
> >
> > Seth
> >
> > On Thu, Jan 14, 2021 at 9:03 AM Seth Wiesman 
> wrote:
> >
> > > Happy to see there is enthusiasm for this change, let me try and
> answers
> > > each of these questions.
> > >
> > > @Jark Wu  Hugo has proper support for i18n which
> means
> > > we can move content into external files that can be easily
> translated[1].
> > > For reference, Kubernetes has successfully used Hugo's built-in
> features
> > to
> > > maintain 14 different languages[2]. Additionally, Hugo's md files are
> > > standard markdown which could allow us to integrate with other tooling.
> > For
> > > example, we may look into using Crowdin for managing translations as
> the
> > > pulsar community does.
> > >
> > > @Till Rohrmann  None that I have found. In the
> > > proof of concept, I have already implemented all the Jekyll
> functionality
> > > we are using in the docs[4]. I have found Hugo shortcodes to be a more
> > > flexible alternative to liquid tags.
> > >
> > > @Chesnay Schepler  Not yet, I do not have access
> to
> > > the build bot (it is PMC only). I will work with INFRA to get Hugo
> > > installed if it is not already and Robert has agreed to set-up the
> build
> > > script on the build bot itself.
> > >
> > > Seth
> > >
> > > [1] https://gohugo.io/functions/i18n/
> > > [2] https://github.com/kubernetes/website/
> > > [3] https://github.com/apache/pulsar-translation
> > > [4]
> > >
> >
> https://github.com/sjwiesman/flink-docs-v2/tree/master/layouts/shortcodes
> > >
> > >
> > >
> > > On Thu, Jan 14, 2021 at 7:03 AM David Anderson 
> > > wrote:
> > >
> > >> I've spent a few hours digging into this with Seth, and can report
> that
> > >> this makes working on the docs much less of a chore.
> > >>
> > >> +1 (with enthusiasm)
> > >>
> > >> Best,
> > >> David
> > >>
> > >> On Thu, Jan 14, 2021 at 1:34 PM Kostas Kloudas 
> > >> wrote:
> > >>
> > >> > +1 for moving to Hugo.
> > >> >
> > >> > Cheers,
> > >> > Kostas
> > >> >
> > >> > On Thu, Jan 14, 2021 at 1:27 PM Wei Zhong 
> > >> wrote:
> > >> > >
> > >> > > +1 for migrating to Hugo.
> > >> > >
> > >> > > Currently we have developed many plugins based on Jekyll because
> the
> > >> > native features of Jekyll cannot meet our needs. It seems all of
> them
> > >> can
> > >> > be supported via Hugo shortcodes and will become more concise.
> > >> > >
> > >> > > Best,
> > >> > > Wei
> > >> > >
> > >> > > > On Jan 14, 2021, at 18:21, Aljoscha Krettek  wrote:
> > >> > > >
> > >> > > > +1
> > >> > > >
> > >> > > > The build times on Jekyll have just become too annoying for me. I
> > >> > realize that that is also a function of how we structure our
> > >> documentation,
> > >> > and especially how we construct the nav sidebar, but I think overall
> > >> moving
> > >> > to Hugo is still a benefit.
> > >> > > >
> > >> > > > Aljoscha
> > >> > > >
> > >> > > > On 2021/01/13 10:14, Seth Wiesman wrote:
> > >> > > >> Hi All,
> > >> > > >>
> > >> > > >> I would like to start a discussion for FLIP-157: Migrating the
> > >> Flink
> > >> > docs
> > >> > > >> from Jekyll to Hugo.
> > >> > > >>
> > >> > > >> This will allow us:
> > >> > > >>
> > >> > > >>  - Proper internationalization
> > >> > > >>  - Working Search
> > >> > > >>  - Sub-second build time ;)
> > >> > > >>
> > >> > > >> Please take a look and let me know what you think.
> > >> > > >>
> > >> > > >> Seth
> > >> > > >>
> > >> > > >>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-157+Migrate+Flink+Documentation+from+Jekyll+to+Hugo
> > >> > >
> > >> >
> > >>
> > >
> >
>


[jira] [Created] (FLINK-20472) Change quickstarts to have a dependency on "flink-dist"

2020-12-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20472:


 Summary: Change quickstarts to have a dependency on "flink-dist"
 Key: FLINK-20472
 URL: https://issues.apache.org/jira/browse/FLINK-20472
 Project: Flink
  Issue Type: Improvement
  Components: Quickstarts
Reporter: Stephan Ewen


I suggest we change the quickstarts to have the following dependencies.
  - {{flink-dist}} (provided)
  - log4j (runtime)

That way the projects created from quickstarts have exactly the same 
dependencies as what the distribution provides. That solves all our mismatches 
between needing different dependencies for compiling/running-in-IDE and 
packaging for deployment.

For example, we can add the {{flink-connector-base}} and 
{{flink-connector-files}} back to {{flink-dist}} without having any issues with 
setting up dependencies in the user projects.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20413) Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()".

2020-11-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20413:


 Summary: Sources should add splits back in "resetSubtask()", 
rather than in "subtaskFailed()".
 Key: FLINK-20413
 URL: https://issues.apache.org/jira/browse/FLINK-20413
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
        Reporter: Stephan Ewen
        Assignee: Stephan Ewen
 Fix For: 1.12.0


Because "subtaskFailed()" has no strong order guarantees with checkpoint 
completion, we need to return failed splits in "resetSubtask()" instead.

See FLINK-20396 for a detailed explanation of the race condition.
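
A rough sketch of the intended behavior (the method and helper names below, e.g. 
{{assignmentTracker}} and {{splitsAssignedSince}}, are illustrative only and not 
the actual coordinator code):

{code:java}
// Sketch only: nothing is returned on the failure notification, because at that
// point we cannot know which checkpoint the subtask will be restored from.
void subtaskFailed(int subtaskId) {
    // intentionally empty
}

// Splits go back to the enumerator once the restored checkpoint is known, so we
// can return exactly the splits that were assigned after that checkpoint.
void resetSubtask(int subtaskId, long restoredCheckpointId) {
    List<FileSourceSplit> splitsToReturn =
            assignmentTracker.splitsAssignedSince(subtaskId, restoredCheckpointId);
    enumerator.addSplitsBack(splitsToReturn, subtaskId);
}
{code}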



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20412) Collect Result Fetching occasionally fails after a JobManager Failover

2020-11-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20412:


 Summary: Collect Result Fetching occasionally fails after a 
JobManager Failover
 Key: FLINK-20412
 URL: https://issues.apache.org/jira/browse/FLINK-20412
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Stephan Ewen


The encountered exception is shown below.

 

The issue can be reproduced by running a test with JobManager failover in a 
tight loop, for example the FileTextLinesITCase from this PR: 
[https://github.com/apache/flink/pull/14199]

 
{code:java}
15335 [main] WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher - An 
exception occurs when fetching query results
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:134)
 [classes/:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103)
 [classes/:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
 [classes/:?]
at 
org.apache.flink.streaming.api.datastream.DataStreamUtils.collectRecordsFromUnboundedStream(DataStreamUtils.java:142)
 [classes/:?]
at 
org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:272)
 [test-classes/:?]
at 
org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithJobManagerFailover(FileSourceTextLinesITCase.java:228)
 [test-classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 [junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
[junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) 
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
[junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 [junit-4.12.jar:4.12]
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
[junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
[junit-4.12.jar:4.12]
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
[junit-4.12.jar:4.12]
at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 
[junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
[junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
[junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
[junit-4.12.jar:4.12]
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
 [junit-rt.jar:?]
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67)
 [junit-rt.jar

[jira] [Created] (FLINK-20406) Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20406:


 Summary: Return the Checkpoint ID of the restored Checkpoint in 
CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()
 Key: FLINK-20406
 URL: https://issues.apache.org/jira/browse/FLINK-20406
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


To allow the scheduler to notify Operator Coordinators of subtask restores 
(local failover), we need to know which checkpoint ID was restored. 

This change does not adjust the other restore methods of the Checkpoint 
Coordinator, because the fact that the Scheduler needs to be involved in the 
subtask restore notification at all is only due to a shortcoming of the 
Checkpoint Coordinator: the CC is not aware of subtask restores; it always 
restores all subtasks and relies on the fact that assigning state to a running 
execution attempt has no effect.
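
A minimal sketch of the intended usage (signatures and variable names are 
assumptions for illustration, not the final code):

{code:java}
// Sketch only: the subtask restore reports the restored checkpoint ID, which the
// scheduler forwards to the operator coordinators of the affected subtasks.
OptionalLong restoredCheckpointId =
        checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(verticesToRestore);

restoredCheckpointId.ifPresent(
        checkpointId ->
                coordinators.forEach(c -> c.subtaskRestored(subtaskIndex, checkpointId)));
{code}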



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20397) Pass checkpointId to OperatorCoordinator.resetToCheckpoint().

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20397:


 Summary: Pass checkpointId to 
OperatorCoordinator.resetToCheckpoint().
 Key: FLINK-20397
 URL: https://issues.apache.org/jira/browse/FLINK-20397
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0, 1.11.3


The OperatorCoordinator.resetToCheckpoint() currently lacks the information 
about which checkpoint it recovers to.

That forces implementers to assume strict ordering of method calls between 
restore and failure. While that is currently guaranteed in this case, it is not 
guaranteed in other places (see parent issue).

Because of that, we want implementations to not assume method order at all, but 
rely on explicit information passed to the methods (checkpoint IDs). Otherwise 
we end up with mixed implementations that partially infer context from the 
order of method calls, and partially use explicit information that was passed.
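
A sketch of the proposed signature change (interface abbreviated to the affected 
method; the exact final form may differ):

{code:java}
// before: the coordinator has to infer from call order which checkpoint the
// data belongs to
// void resetToCheckpoint(byte[] checkpointData) throws Exception;

// after (proposed): the checkpoint ID is passed explicitly, so no assumptions
// about method ordering are needed
void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;
{code}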



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20396) Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20396:


 Summary: Replace "OperatorCoordinator.subtaskFailed()" with 
"subtaskRestored()"
 Key: FLINK-20396
 URL: https://issues.apache.org/jira/browse/FLINK-20396
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.2
        Reporter: Stephan Ewen
        Assignee: Stephan Ewen
 Fix For: 1.12.0, 1.11.3


There are no strong order guarantees between 
{{OperatorCoordinator.subtaskFailed()}} and 
{{OperatorCoordinator.notifyCheckpointComplete()}}.

It can happen that a checkpoint completes after the notification for task 
failure is sent:
  - {{OperatorCoordinator.checkpoint()}}
  - {{OperatorCoordinator.subtaskFailed()}}
  - {{OperatorCoordinator.checkpointComplete()}}

At the point of the subtask failure notification, we do not know whether the 
previous checkpoint completed or not, and thus cannot decide what state the 
subtask will be in after recovery.
There is no easy fix right now to strictly guarantee the order of the method 
calls, so alternatively we need to provide the necessary information to reason 
about the status of tasks.

We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with 
{{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That 
way, implementations get the explicit checkpoint ID for the subtask recovery 
and can align it with the IDs of checkpoints that were taken.

It is still (in rare cases) possible that for a specific checkpoint C, 
{{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before 
{{OperatorCoordinator.checkpointComplete(C)}}.
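
For illustration, the affected callback before and after the proposed change 
(abbreviated sketch, not the full interface):

{code:java}
// before: only signals that the subtask failed; the coordinator cannot tell
// which checkpoint the subtask will come back on
// void subtaskFailed(int subtask, @Nullable Throwable reason);

// after (proposed): called when the subtask is restored, carrying the ID of the
// restored checkpoint, which can be matched against the checkpoints taken so far
void subtaskRestored(int subtask, long checkpointId);
{code}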


h3. Background

The Checkpointing Procedure is partially asynchronous on the {{JobManager}} / 
{{CheckpointCoordinator}}: After all subtasks acknowledged the checkpoint, the 
finalization (writing out metadata and registering the checkpoint in ZooKeeper) 
happens in an I/O thread, and the checkpoint completes after that.

This sequence of events can happen:
  - tasks ack the checkpoint
  - checkpoint fully acknowledged, finalization starts
  - task fails
  - task failure notification is dispatched
  - checkpoint completes.

For task failures and checkpoint completion, no order is defined.

However, for task restore and checkpoint completion, the order is well defined: 
When a task is restored, pending checkpoints are either canceled or complete. 
None can be within finalization. That is currently guaranteed with a lock in 
the {{CheckpointCoordinator}}.
(An implication of that being that restores can be blocking operations in the 
scheduler, which is not ideal from the perspective of making the scheduler 
async/non-blocking, but it is currently essential for correctness).




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema

2020-11-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20379:


 Summary: New Kafka Connector does not support DeserializationSchema
 Key: FLINK-20379
 URL: https://issues.apache.org/jira/browse/FLINK-20379
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Stephan Ewen
 Fix For: 1.12.0


The new Kafka Connector defines its own deserialization schema and is 
incompatible with the existing library of deserializers.

That means that users cannot use all of Flink's Formats (Avro, JSON, Csv, 
Protobuf, Confluent Schema Registry, ...) with the new Kafka Connector.

I think we should change the new Kafka Connector to use the existing 
Deserialization classes, so all formats can be used, and users can reuse their 
deserializer implementations.

It would also be good to use the existing KafkaDeserializationSchema. Otherwise 
all users need to migrate their sources again.
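
As a hypothetical sketch of what this could look like from the user's side (the 
builder methods below are illustrative, not the connector's current API):

{code:java}
// Sketch only: the new source accepts an existing DeserializationSchema
// implementation (here Flink's SimpleStringSchema) instead of a
// connector-specific deserializer interface.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
{code}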




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20276) Transparent DeCompression of streams missing on new File Source

2020-11-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20276:


 Summary: Transparent DeCompression of streams missing on new File 
Source
 Key: FLINK-20276
 URL: https://issues.apache.org/jira/browse/FLINK-20276
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The existing {{FileInputFormat}} applies decompression (gzip, xz, ...) 
automatically on the file input stream, based on the file extension.

We need to add similar functionality for the {{StreamRecordFormat}} of the new 
FileSource to be on par with the existing {{FileInputFormat}}.

This can be easily applied in the {{StreamFormatAdapter}} when opening the file 
stream.
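
For illustration, a minimal, self-contained sketch of extension-based stream 
wrapping (not the actual Flink code, which covers more codecs):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

final class DecompressionByExtension {

    // Wrap the raw file stream in a decompressing stream, chosen by file extension.
    static InputStream wrap(String filePath, InputStream rawStream) throws IOException {
        if (filePath.endsWith(".gz")) {
            return new GZIPInputStream(rawStream);
        } else if (filePath.endsWith(".deflate")) {
            return new InflaterInputStream(rawStream);
        }
        // no known compression suffix: pass the stream through unchanged
        return rawStream;
    }
}
{code}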



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20188) Add Documentation for new File Source

2020-11-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20188:


 Summary: Add Documentation for new File Source
 Key: FLINK-20188
 URL: https://issues.apache.org/jira/browse/FLINK-20188
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Apache Flink Stateful Functions 2.2.1, release candidate #1

2020-11-10 Thread Stephan Ewen
+1 (binding)

Compiled source and ran all tests (JDK 8)
Ran example (greeter) with a docker compose setup
Release contains no binaries.
Checked LICENSE and NOTICE for source release
Checked LICENSE and NOTICE for flink-statefun-distribution binary release


On Fri, Nov 6, 2020 at 11:46 PM Igal Shilman  wrote:

> +1 (non binding)
>
> Legal:
> - Checked signatures
> - Checked that no binaries were included in source artifacts
>
> Build
> - Built source `mvn clean install -Prun-e2e-tests` with jdk8
> - Built source mvn clean install -Prun-e2e-tests' with jdk11
> - Built the python SDK
> - Built the Docker Image
>
> Functional:
> - Verified that FLINK-19692 can no longer be reproduced.
> - Ran a randomized smoke test ("random clicker"), with a high volume
> producer, and state verification in the presence of task manager failures.
> This program ran successfully on this RC, and failed (as expected) on the
> previous version.
>
> Thanks,
> Igal.
>
> On Fri, Nov 6, 2020 at 9:48 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> > +1 (binding)
> >
> > Legal:
> > - Checked signatures
> > - Checked that no binaries were included in source artifacts
> > - Build source `mvn clean install -Prun-e2e-tests`
> > - No dependency changes since 2.2.0, so assuming that NOTICE files are
> > correct (require no changes)
> >
> > Functional:
> > - Verified that FLINK-19692 can no longer be reproduced with a StateFun
> app
> > that deterministically floods the feedback loop with events. Run with a
> > docker-compose setup with parallelism=4, checkpointing to S3, verified
> for
> > all state backends.
> > - Checked the state bootstrap API by running the state bootstrap
> example,
> > and restoring the greeter example from the generated savepoint.
> >
> > On Thu, Nov 5, 2020 at 3:24 AM Robert Metzger 
> wrote:
> >
> > > +1 binding
> > >
> > > Checks:
> > > - source sha and signature
> > > - mvn clean install passed on source archive
> > > - staging artifacts have correct version, also in quickstart, NOTICE
> > files
> > > seem correct
> > >
> > >
> > >
> > > On Wed, Nov 4, 2020 at 11:20 AM Tzu-Li (Gordon) Tai <
> tzuli...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for the version
> > 2.2.1
> > > of
> > > > Apache Flink Stateful Functions,
> > > > as follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > > ***Testing Guideline***
> > > >
> > > > You can find here [1] a page in the project wiki on instructions for
> > > > testing.
> > > > To cast a vote, it is not necessary to perform all listed checks,
> > > > but please mention which checks you have performed when voting.
> > > >
> > > > ***Release Overview***
> > > >
> > > > As an overview, the release consists of the following:
> > > > a) Stateful Functions canonical source distribution, to be deployed
> to
> > > the
> > > > release repository at dist.apache.org
> > > > b) Stateful Functions Python SDK distributions to be deployed to PyPI
> > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > d) New Dockerfiles for the release
> > > > e) Release announcement to be published to the Flink website
> > > >
> > > > ***Staging Areas to Review***
> > > >
> > > > The staging areas containing the above mentioned artifacts are as
> > > follows,
> > > > for your review:
> > > > * All artifacts for a) and b) can be found in the corresponding dev
> > > > repository at dist.apache.org [2]
> > > > * All artifacts for c) can be found at the Apache Nexus Repository
> [3]
> > > >
> > > > All artifacts are signed with the key
> > > > 1C1E2394D3194E1944613488F320986D35C33D6A [4]
> > > >
> > > > Other links for your review:
> > > > * JIRA release notes [5]
> > > > * source code tag "release-2.2.1-rc1" [6]
> > > > * PR for the new Dockerfiles [7]
> > > > * PR for the release announcement blog post [8]
> > > >
> > > > ***Vote Duration***
> > > >
> > > > The voting time will run for 72 hours, lasting until Nov. 7th, 10am
> > CET.
> > > > It is adopted by majority approval, with at least 3 PMC affirmative
> > > votes.
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> > > > [2]
> > > https://dist.apache.org/repos/dist/dev/flink/flink-statefun-2.2.1-rc1/
> > > > [3]
> > > > https://repository.apache.org/content/repositories/orgapacheflink-1401/
> > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > [5]
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349291
> > > > [6]
> > > >
> > > >
> > >
> >
> https://gitbox.apache.org/repos/asf?p=flink-statefun.git;a=commit;h=3d53612fccfe143acbdd1686a942b29c96c6bbd8
> > > > [7] 

[jira] [Created] (FLINK-20063) File Source requests an additional split on every restore.

2020-11-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20063:


 Summary: File Source requests an additional split on every restore.
 Key: FLINK-20063
 URL: https://issues.apache.org/jira/browse/FLINK-20063
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Currently, the {{FileSourceReader}} requests a new split when started. That 
includes cases when it was restored from a checkpoint.

So with every restore, the reader increases its split backlog size by one, 
causing problems for balanced split assignments.
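
One possible fix, sketched below (the exact method names are assumptions based 
on the reader base class, not necessarily the final change):

{code:java}
// Sketch only: request a split at startup only if the restore did not already
// hand splits back to this reader.
@Override
public void start() {
    if (getNumberOfCurrentlyAssignedSplits() == 0) {
        context.sendSplitRequest();
    }
}
{code}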



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20049) Simplify handling of "request split".

2020-11-08 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20049:


 Summary: Simplify handling of "request split".
 Key: FLINK-20049
 URL: https://issues.apache.org/jira/browse/FLINK-20049
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


This issue is similar to FLINK-19265

Split assignment events are treated specially by the source API. Users do not 
create them directly but call methods on the contexts to assign splits.

The {{RequestSplitEvent}}, in contrast, is a custom user event and needs to be 
handled like one, when sent by the readers and received by the enumerator.

That seems a bit confusing and inconsistent, given that {{RequestSplitEvent}} 
is essential for all use cases with pull-based split assignment, which is 
pretty much any batch use case and various streaming use cases. The event 
should be on the same level as the AddSplitEvent.

I suggest that we add {{SourceReaderContext.requestSplit()}} and 
{{SplitEnumerator.handleSplitRequest()}}, so that split requests and 
responses are handled symmetrically.
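
A rough sketch of the proposed symmetry (the reader-side method name follows 
this proposal; {{nextSplit()}} is a hypothetical helper):

{code:java}
// reader side: ask for one more split without constructing a custom event
context.requestSplit();

// enumerator side: the request arrives as a first-class callback instead of a
// generic handleSourceEvent(subtask, new RequestSplitEvent(...))
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    nextSplit().ifPresent(split -> enumeratorContext.assignSplit(split, subtaskId));
}
{code}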



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

