Re: [VOTE] Release 1.11.6/1.12.7/1.13.5/1.14.2, release candidate #1
+1 (binding) - Verified commit history, looks good => stumbled over the changes in the "create_release_branch.sh ", which are present in each release commit. [1] => agree that these are not an issue, because this is an out-of-band release - Release notes for 1.14.2 are off, contain incorrect entry "FLINK-25222: Remove NetworkFailureProxy used for Kafka connector tests" - Checked that released binaries and jars reference correct Scala versions - Ran streaming examples against binary releases for 1.12.7, 1.13.5, 1.14.2. Execution logs look correct. - Other checks (licenses, no binaries) carry over from previous releases [1] https://github.com/apache/flink/commit/6fd4b1c0ef2ddd12751889218445ce0e60ff6c80#diff-94c70ce1a0abddcd83314c83b46080d8edbcd919b737f316cd6f72006d464074 On Wed, Dec 15, 2021 at 5:54 PM Seth Wiesman wrote: > +1 (non-binding) > > - Checked diff of all versions and verified dep upgrade > - Verified checksum and signatures > - Built 1.14 from source > - checked blog post > > Seth > > On Wed, Dec 15, 2021 at 10:22 AM Yu Li wrote: > > > +1 > > > > * Verified checksums and signatures > > * Reviewed website PR > >- Minor: left a comment to mention CVE-2021-45046 > > * Checked and confirmed new tags only contain log4j version bump > > * Checked release notes and found no issues > >- I've moved FLINK-25317 to 1.14.3 > > > > Thanks for driving these releases Chesnay! > > > > Best Regards, > > Yu > > > > > > On Wed, 15 Dec 2021 at 21:29, Chesnay Schepler > wrote: > > > > > FYI; the publication of the python release for 1.11/1.12 will be > delayed > > > because we hit the project size limit on pypi again, and increasing > that > > > limit may take a while. > > > On the positive side, this gives us more time to fix the mac builds. > > > > > > On 15/12/2021 03:55, Chesnay Schepler wrote: > > > > Hi everyone, > > > > > > > > This vote is for the emergency patch releases for 1.11, 1.12, 1.13 > and > > > > 1.14 to address CVE-2021-44228/CVE-2021-45046. 
> > > > It covers all 4 releases as they contain the same changes (upgrading > > > > Log4j to 2.16.0) and were prepared simultaneously by the same person. > > > > (Hence, if something is broken, it likely applies to all releases) > > > > > > > > Note: 1.11/1.12 are still missing the Python Mac releases. > > > > > > > > > > > > Please review and vote on the release candidate #1 for the versions > > > > 1.11.6, 1.12.7, 1.13.5 and 1.14.2, as follows: > > > > [ ] +1, Approve the releases > > > > [ ] -1, Do not approve the releases (please provide specific > comments) > > > > > > > > The complete staging area is available for your review, which > includes: > > > > * JIRA release notes [1], > > > > * the official Apache source releases and binary convenience releases > > > > to be deployed to dist.apache.org [2], which are signed with the key > > > > with fingerprint C2EED7B111D464BA [3], > > > > * all artifacts to be deployed to the Maven Central Repository [4], > > > > * source code tags [5], > > > > * website pull request listing the new releases and adding > > > > announcement blog post [6]. > > > > > > > > The vote will be open for at least 24 hours. The minimum vote time > has > > > > been shortened as the changes are minimal and the matter is urgent. > > > > It is adopted by majority approval, with at least 3 PMC affirmative > > > > votes. 
> > > > > > > > Thanks, > > > > Chesnay > > > > > > > > [1] > > > > 1.11: > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351056 > > > > 1.12: > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351057 > > > > 1.13: > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351058 > > > > 1.14: > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351059 > > > > [2] > > > > 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.6-rc1/ > > > > 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.7-rc1/ > > > > 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.5-rc1/ > > > > 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.2-rc1/ > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > > > [4] > > > > 1.11: > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1460 > > > > 1.12: > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1462 > > > > 1.13: > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1459 > > > > 1.14: > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1461 > > > > [5] > > > > 1.11: > https://github.com/apache/flink/releases/tag/release-1.11.6-rc1 > > > > 1.12: > https://github.com/apache/flink/releases/tag/release-1.12.7-rc1 > > > > 1.13: > https://github.com/apache/flink/releases/tag/release-1.13.5-rc1 > > > > 1.14: >
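Several of the verification steps reported in this vote thread ("Verified checksums and signatures") boil down to recomputing a SHA-512 digest over the downloaded artifact and comparing it to the published `.sha512` file, plus a GPG signature check. The digest half can be sketched as follows (a generic illustration, not project tooling; file names are hypothetical):

```python
import hashlib

def sha512_matches(path: str, expected_hex: str) -> bool:
    # Stream the artifact so large release tarballs don't load into memory,
    # then compare against the published digest. The published .sha512 file
    # may contain "digest  filename", so take only the first token.
    h = hashlib.sha512()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest() == expected_hex.strip().split()[0].lower()
```

The signature check would be a separate `gpg --verify` against the KEYS file referenced as [3] in the vote mail.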
Re: [CANCELLED] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1
That's right, they are referenced in POMs published with the jars, though. But that's minor. On Wed, Dec 15, 2021 at 12:28 PM Chesnay Schepler wrote: > AFAIK none of the jars we publish actually contains log4j. > It's only bundled by the distribution/python binaries/docker images. > > Hence I don't think the jars help in this case. > > On 15/12/2021 10:42, Stephan Ewen wrote: > > Given that these artifacts are published already, users can use them if > > they want to update now: > > > > For example: > > https://search.maven.org/artifact/org.apache.flink/flink-core/1.14.1/jar > > > > Just for the users that really want to update now (rather than rely on > the > > mitigation via config) and are not as much concerned about the remaining > > weakness in log4j 2.15.0 > > > > On Tue, Dec 14, 2021 at 11:18 PM Seth Wiesman > wrote: > > > >> Thank you for managing these updates Chesnay! > >> > >> > >> > >> On Tue, Dec 14, 2021 at 3:51 PM Chesnay Schepler > >> wrote: > >> > >>> Since the maven artifacts have already been published we will use the > >>> next patch version for each release, i.e.: > >>> 1.11.6 > >>> 1.12.7 > >>> 1.13.5 > >>> 1.14.2 > >>> > >>> (We could technically just update the source/binaries, but that seems > >>> fishy). > >>> > >>> On 14/12/2021 22:38, Chesnay Schepler wrote: > >>>> I'm canceling the release because the issue was not fully fixed in > >>>> Log4j 2.15.0; see CVE-2021-45046. > >>>> > >>>> I will start preparing new release candidates that use Log4j 2.16.0 . > >>>> > >>>> On 14/12/2021 21:28, Chesnay Schepler wrote: > >>>>> The vote duration has passed and we have approved the releases. > >>>>> > >>>>> Binding votes: > >>>>> * Stephan > >>>>> * Till > >>>>> * Xintong > >>>>> * Zhu > >>>>> * Gordon > >>>>> > >>>>> I will not finalize the release. 
> >>>>> > >>>>> On 13/12/2021 20:28, Chesnay Schepler wrote: > >>>>>> Hi everyone, > >>>>>> > >>>>>> This vote is for the emergency patch releases for 1.11, 1.12, 1.13 > >>>>>> and 1.14 to address CVE-2021-44228. > >>>>>> It covers all 4 releases as they contain the same changes (upgrading > >>>>>> Log4j to 2.15.0) and were prepared simultaneously by the same > person. > >>>>>> (Hence, if something is broken, it likely applies to all releases) > >>>>>> > >>>>>> Please review and vote on the release candidate #1 for the versions > >>>>>> 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows: > >>>>>> [ ] +1, Approve the releases > >>>>>> [ ] -1, Do not approve the releases (please provide specific > >> comments) > >>>>>> The complete staging area is available for your review, which > >> includes: > >>>>>> * JIRA release notes [1], > >>>>>> * the official Apache source releases and binary convenience > >>>>>> releases to be deployed to dist.apache.org [2], which are signed > >>>>>> with the key with fingerprint C2EED7B111D464BA [3], > >>>>>> * all artifacts to be deployed to the Maven Central Repository [4], > >>>>>> * *the jars for 1.13/1.14 are still being built* > >>>>>> * source code tags [5], > >>>>>> * website pull request listing the new releases and adding > >>>>>> announcement blog post [6]. > >>>>>> > >>>>>> The vote will be open for at least 24 hours. The minimum vote time > >>>>>> has been shortened as the changes are minimal and the matter is > >> urgent. > >>>>>> It is adopted by majority approval, with at least 3 PMC affirmative > >>>>>> votes. > >>>>>> > >>>>>> Thanks, > >>>>>> Chesnay > >>>>>> > >>>>>> [1] > >>>>>> 1.11: > >>>>>> > >> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350327 > >>>>>> 1.12: > >>>>>> > >
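Stephan's point in this thread — the published Flink jars do not bundle Log4j, but their published POMs reference it — means a downstream user who wants the patched version immediately can force it from their own build. This is standard Maven dependency mediation ("nearest wins"), not Flink-specific advice; a hedged sketch of a consumer-side override:

```xml
<!-- Hypothetical consumer pom.xml fragment: a direct dependency
     declaration takes precedence over the log4j version referenced
     transitively through the Flink POMs. -->
<dependencies>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.16.0</version>
  </dependency>
</dependencies>
```

As Chesnay notes, this only matters for users pulling Log4j transitively via Maven; the distribution, Python binaries, and Docker images bundle Log4j directly and need the patched release.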
Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl
Congratulations, Matthias, and welcome to the Flink committers! On Mon, Dec 13, 2021 at 4:39 AM 刘建刚 wrote: > Congratulations! > > Best > Liu Jiangang > > Nicholas Jiang 于2021年12月13日周一 11:23写道: > > > Congratulations, Matthias! > > > > Best, > > Nicholas Jiang > > >
Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk
Congrats and welcome! On Mon, Dec 13, 2021 at 4:39 AM 刘建刚 wrote: > Congratulations! > > Best > Liu Jiangang > > Nicholas Jiang 于2021年12月13日周一 11:28写道: > > > Congratulations, Ingo! > > > > Best, > > Nicholas Jiang > > >
Re: [CANCELLED] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1
Given that these artifacts are published already, users can use them if they want to update now: For example: https://search.maven.org/artifact/org.apache.flink/flink-core/1.14.1/jar Just for the users that really want to update now (rather than rely on the mitigation via config) and are not as much concerned about the remaining weakness in log4j 2.15.0 On Tue, Dec 14, 2021 at 11:18 PM Seth Wiesman wrote: > Thank you for managing these updates Chesnay! > > > > On Tue, Dec 14, 2021 at 3:51 PM Chesnay Schepler > wrote: > > > Since the maven artifacts have already been published we will use the > > next patch version for each release, i.e.: > > 1.11.6 > > 1.12.7 > > 1.13.5 > > 1.14.2 > > > > (We could technically just update the source/binaries, but that seems > > fishy). > > > > On 14/12/2021 22:38, Chesnay Schepler wrote: > > > I'm canceling the release because the issue was not fully fixed in > > > Log4j 2.15.0; see CVE-2021-45046. > > > > > > I will start preparing new release candidates that use Log4j 2.16.0 . > > > > > > On 14/12/2021 21:28, Chesnay Schepler wrote: > > >> The vote duration has passed and we have approved the releases. > > >> > > >> Binding votes: > > >> * Stephan > > >> * Till > > >> * Xintong > > >> * Zhu > > >> * Gordon > > >> > > >> I will not finalize the release. > > >> > > >> On 13/12/2021 20:28, Chesnay Schepler wrote: > > >>> Hi everyone, > > >>> > > >>> This vote is for the emergency patch releases for 1.11, 1.12, 1.13 > > >>> and 1.14 to address CVE-2021-44228. > > >>> It covers all 4 releases as they contain the same changes (upgrading > > >>> Log4j to 2.15.0) and were prepared simultaneously by the same person. 
> > >>> (Hence, if something is broken, it likely applies to all releases) > > >>> > > >>> Please review and vote on the release candidate #1 for the versions > > >>> 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows: > > >>> [ ] +1, Approve the releases > > >>> [ ] -1, Do not approve the releases (please provide specific > comments) > > >>> > > >>> The complete staging area is available for your review, which > includes: > > >>> * JIRA release notes [1], > > >>> * the official Apache source releases and binary convenience > > >>> releases to be deployed to dist.apache.org [2], which are signed > > >>> with the key with fingerprint C2EED7B111D464BA [3], > > >>> * all artifacts to be deployed to the Maven Central Repository [4], > > >>> * *the jars for 1.13/1.14 are still being built* > > >>> * source code tags [5], > > >>> * website pull request listing the new releases and adding > > >>> announcement blog post [6]. > > >>> > > >>> The vote will be open for at least 24 hours. The minimum vote time > > >>> has been shortened as the changes are minimal and the matter is > urgent. > > >>> It is adopted by majority approval, with at least 3 PMC affirmative > > >>> votes. 
> > >>> > > >>> Thanks, > > >>> Chesnay > > >>> > > >>> [1] > > >>> 1.11: > > >>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350327 > > >>> 1.12: > > >>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350328 > > >>> 1.13: > > >>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350686 > > >>> 1.14: > > >>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350512 > > >>> [2] > > >>> 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.5-rc1/ > > >>> 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.6-rc1/ > > >>> 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.4-rc1/ > > >>> 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.1-rc1/ > > >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > >>> [4] > > >>> 1.11/1.12: > > >>> > https://repository.apache.org/content/repositories/orgapacheflink-1455 > > >>> 1.13: > > >>> > https://repository.apache.org/content/repositories/orgapacheflink-1457 > > >>> 1.14: > > >>> > https://repository.apache.org/content/repositories/orgapacheflink-1456 > > >>> [5] > > >>> 1.11: > https://github.com/apache/flink/releases/tag/release-1.11.5-rc1 > > >>> 1.12: > https://github.com/apache/flink/releases/tag/release-1.12.6-rc1 > > >>> 1.13: > https://github.com/apache/flink/releases/tag/release-1.13.4-rc1 > > >>> 1.14: > https://github.com/apache/flink/releases/tag/release-1.14.1-rc1 > > >>> [6] https://github.com/apache/flink-web/pull/489 > > >>> > > >> > > > > > > > >
Re: [VOTE] Release 1.11.5/1.12.6/1.13.4/1.14.1, release candidate #1
+1 (binding) - Verified that commit history is identical to previous release (except dependency upgrade and release version commit) - Verified that the source releases reference updated dependency and binary releases contain updated dependency - Blog post looks good - ran bundled examples against 1.14.1 binary release, worked as expected. On Mon, Dec 13, 2021 at 9:22 PM Seth Wiesman wrote: > +1 (non-binding) > > - Checked Log4J version and updated license preambles on all releases > - Verified signatures on sources > - Reviewed blog post > > Seth > > On Mon, Dec 13, 2021 at 1:42 PM Jing Ge wrote: > > > +1 LGTM. Many thanks for your effort! > > > > On Mon, Dec 13, 2021 at 8:28 PM Chesnay Schepler > > wrote: > > > > > Hi everyone, > > > > > > This vote is for the emergency patch releases for 1.11, 1.12, 1.13 and > > > 1.14 to address CVE-2021-44228. > > > It covers all 4 releases as they contain the same changes (upgrading > > > Log4j to 2.15.0) and were prepared simultaneously by the same person. > > > (Hence, if something is broken, it likely applies to all releases) > > > > > > Please review and vote on the release candidate #1 for the versions > > > 1.11.5, 1.12.6, 1.13.4 and 1.14.1, as follows: > > > [ ] +1, Approve the releases > > > [ ] -1, Do not approve the releases (please provide specific comments) > > > > > > The complete staging area is available for your review, which includes: > > > * JIRA release notes [1], > > > * the official Apache source releases and binary convenience releases > to > > > be deployed to dist.apache.org [2], which are signed with the key with > > > fingerprint C2EED7B111D464BA [3], > > > * all artifacts to be deployed to the Maven Central Repository [4], > > > * *the jars for 1.13/1.14 are still being built* > > > * source code tags [5], > > > * website pull request listing the new releases and adding announcement > > > blog post [6]. > > > > > > The vote will be open for at least 24 hours. 
The minimum vote time has > > > been shortened as the changes are minimal and the matter is urgent. > > > It is adopted by majority approval, with at least 3 PMC affirmative > > votes. > > > > > > Thanks, > > > Chesnay > > > > > > [1] > > > 1.11: > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350327 > > > 1.12: > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350328 > > > 1.13: > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350686 > > > 1.14: > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350512 > > > [2] > > > 1.11: https://dist.apache.org/repos/dist/dev/flink/flink-1.11.5-rc1/ > > > 1.12: https://dist.apache.org/repos/dist/dev/flink/flink-1.12.6-rc1/ > > > 1.13: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.4-rc1/ > > > 1.14: https://dist.apache.org/repos/dist/dev/flink/flink-1.14.1-rc1/ > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > > [4] > > > 1.11/1.12: > > > https://repository.apache.org/content/repositories/orgapacheflink-1455 > > > 1.13: > > > https://repository.apache.org/content/repositories/orgapacheflink-1457 > > > 1.14: > > > https://repository.apache.org/content/repositories/orgapacheflink-1456 > > > [5] > > > 1.11: https://github.com/apache/flink/releases/tag/release-1.11.5-rc1 > > > 1.12: https://github.com/apache/flink/releases/tag/release-1.12.6-rc1 > > > 1.13: https://github.com/apache/flink/releases/tag/release-1.13.4-rc1 > > > 1.14: https://github.com/apache/flink/releases/tag/release-1.14.1-rc1 > > > [6] https://github.com/apache/flink-web/pull/489 > > > > > >
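The adoption rule repeated in these vote mails — "adopted by majority approval, with at least 3 PMC affirmative votes" — can be read as the following predicate (a sketch of the stated rule only, not official ASF tallying tooling):

```python
def release_adopted(binding_plus_ones: int, binding_minus_ones: int) -> bool:
    # "Adopted by majority approval, with at least 3 PMC affirmative votes":
    # needs at least three binding +1s, and more binding +1s than -1s.
    return binding_plus_ones >= 3 and binding_plus_ones > binding_minus_ones
```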
[DISCUSS] Immediate dedicated Flink releases for log4j vulnerability
Hi all!

Without doubt, you heard about the log4j vulnerability [1]. There is an advisory blog post on how to mitigate this in Apache Flink [2], which involves setting a config option and restarting the processes. That is fortunately a relatively simple fix.

Despite this workaround, I think we should do an immediate release with the updated dependency. Meaning not waiting for the next bug fix releases coming in a few weeks, but releasing asap.

The mood I perceive in the industry is pretty much panicky over this, and I expect we will see many requests for a patched release and many discussions why the workaround alone would not be enough due to certain guidelines.

I suggest that we preempt those discussions and create releases the following way:
- we take the latest already released versions from each release branch:
  ==> 1.14.0, 1.13.3, 1.12.5, 1.11.4
- we add a single commit to those that just updates the log4j dependency
- we release those as 1.14.1, 1.13.4, 1.12.6, 1.11.5, etc.
- that way we don't need to do functional release tests, because the released code is identical to the previous release, except for the log4j dependency
- we can then continue the work on the upcoming bugfix releases as planned, without high pressure

I would suggest creating those RCs immediately and release them with a special voting period (24h or so).

WDYT?

Best,
Stephan

[1] https://nvd.nist.gov/vuln/detail/CVE-2021-44228
[2] https://flink.apache.org/2021/12/10/log4j-cve.html
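The "single commit that just updates the log4j dependency" would, in essence, be a one-line version bump in the build. A sketch of what that commit touches (the exact property name and location in apache/flink's parent pom.xml are assumptions, and the target version changed between RC rounds):

```xml
<!-- Hypothetical fragment of the parent pom.xml. The first round of RCs
     bumped Log4j to 2.15.0; after CVE-2021-45046 the re-cut RCs used 2.16.0. -->
<properties>
  <log4j.version>2.16.0</log4j.version>
</properties>
```

Because only this dependency changes, the released code stays byte-for-byte comparable to the prior release, which is what makes the shortened functional testing defensible.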
Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage
Thanks for all the details and explanation. With the conclusion of the discussion, also +1 from my side for this FLIP On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li wrote: > Thanks Stephan and Timo, I have a rough look at your replies. They are > all valuable opinions. I will take time to discuss, explain and > improve them. > > Hi Timo, > > At least a final "I will start the vote soon. Last call for comments." > would have been nice. > > I replied in the DISCUSS thread that we began to vote. If there are > supplementary comments or reply "pause voting first, I will reply > later", we can suspend or cancel the voting at any time. > I understand why the FLIP must take three days to vote, so that more > people can see it and put forward their opinions. > > Best, > Jingsong > > On Sat, Nov 13, 2021 at 1:27 AM Timo Walther wrote: > > > > Hi everyone, > > > > even though the DISCUSS thread was open for 2 weeks. I have the feeling > > that the VOTE was initiated to quickly. At least a final "I will start > > the vote soon. Last call for comments." would have been nice. > > > > I also added some comments in the DISCUSS thread. Let's hope we can > > resolve those soon. > > > > Regards, > > Timo > > > > On 12.11.21 16:36, Stephan Ewen wrote: > > > Hi all! > > > > > > I have a few questions on the design still, posted those in the > [DISCUSS] > > > thread. > > > It would be great to clarify those first before concluding this vote. > > > > > > Thanks, > > > Stephan > > > > > > > > > On Fri, Nov 12, 2021 at 7:22 AM Jark Wu wrote: > > > > > >> +1 (binding) > > >> > > >> Thanks for the great work Jingsong! > > >> > > >> Best, > > >> Jark > > >> > > >> On Thu, 11 Nov 2021 at 19:41, JING ZHANG > wrote: > > >> > > >>> +1 (non-binding) > > >>> > > >>> A small suggestion: > > >>> The message queue is currently used to store middle layer data of the > > >>> streaming data warehouse. We hope use built-in dynamic table storage > to > > >>> store those middle layer. 
> > >>> But those middle data of the streaming data warehouse are often > provided > > >> to > > >>> all business teams in a company. Some teams have not use Apache > Flink as > > >>> compute engine yet. In order to continue server those teams, the > data in > > >>> built-in dynamic table storage may be needed to copied to message > queue > > >>> again. > > >>> If *the built-in storage could provide same consumer API as the > commonly > > >>> used message queues*, data copying may be avoided. So the built-in > > >> dynamic > > >>> table storage may be promoted faster in the streaming data warehouse > > >>> business. > > >>> > > >>> Best regards, > > >>> Jing Zhang > > >>> > > >>> Yufei Zhang 于2021年11月11日周四 上午9:34写道: > > >>> > > >>>> Hi, > > >>>> > > >>>> +1 (non-binding) > > >>>> > > >>>> Very interesting design. I saw a lot of discussion on the generic > > >>>> interface design, good to know it will address extensibility. > > >>>> > > >>>> Cheers, > > >>>> Yufei > > >>>> > > >>>> > > >>>> On 2021/11/10 02:51:55 Jingsong Li wrote: > > >>>>> Hi everyone, > > >>>>> > > >>>>> Thanks for all the feedback so far. Based on the discussion[1] we > > >> seem > > >>>>> to have consensus, so I would like to start a vote on FLIP-188 for > > >>>>> which the FLIP has now also been updated[2]. > > >>>>> > > >>>>> The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless > > >>>>> there is an objection or insufficient votes. > > >>>>> > > >>>>> [1] > https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7 > > >>>>> [2] > > >>>> > > >>> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage > > >>>>> > > >>>>> Best, > > >>>>> Jingsong > > >>>>> > > >>>> > > >>> > > >> > > > > > > > > -- > Best, Jingsong Lee >
Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage
Thanks for digging into this.

Regarding this query:

    INSERT INTO the_table
      SELECT window_end, COUNT(*)
        FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
        GROUP BY window_end
        HAVING now() - window_end <= INTERVAL '14' DAYS;

I am not sure I understand what the conclusion is on the data retention question, where the continuous streaming SQL query has retention semantics.

I think we would need to answer the following questions (I will call the query that computes the managed table the "view materializer query" - VMQ).

(1) I guess the VMQ will send no updates for windows once the "retention period" (14 days) is over, as you said. That makes sense.

(2) Will the VMQ send retractions so that the data will be removed from the table (via compactions)?
  - if yes, this seems semantically better for users, but it will be expensive to keep the timers for retractions.
  - if not, we can still solve this by adding filters to queries against the managed table, as long as these queries are in Flink.
  - any subscriber to the changelog stream would not see a strictly correct result if we are not doing the retractions

(3) Do we want time retention semantics handled by the compaction?
  - if we say that we lazily apply the deletes in the queries that read the managed tables, then we could also age out the old data during compaction.
  - that is cheap, but it might be too much of a special case to be very relevant here.

(4) Do we want to declare those types of queries "out of scope" initially?
  - if yes, how many users are we affecting? (I guess probably not many, but it would be good to hear some thoughts from others on this)
  - should we simply reject such queries in the optimizer as "not possible to support in managed tables"? I would suggest that; it is always better to tell users exactly what works and what not, rather than letting them be surprised in the end. Users can still remove the HAVING clause if they want the query to run, and that would be better than if the VMQ just silently ignores those semantics.

Thanks,
Stephan
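To make option (3) above concrete — lazily aging out expired windows at read/compaction time instead of emitting retractions — here is a toy sketch of the idea (hypothetical Python, not Flink API; record layout is assumed):

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=14)

def within_retention(window_end: datetime, now: datetime) -> bool:
    # Mirrors the HAVING clause: now() - window_end <= INTERVAL '14' DAYS
    return now - window_end <= RETENTION

def compact(records, now):
    # Hypothetical compaction pass: physically drop rows whose window has
    # expired, so stored data converges to what the VMQ semantics promise.
    return [r for r in records if within_retention(r["window_end"], now)]
```

A reading query would apply the same predicate as a filter, so compaction and reads agree on what "expired" means; only external changelog subscribers would still miss the deletes, which is the correctness gap raised in (2).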
Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage
Hi Jingsong! Thank you for all the explanations. To follow up on the points: (1) Log implementation Good to hear you are looking to make this extensible. (2) change tracking Understood, makes sense (used for re-processing). (3) Log Scan Startup mode. Your explanation makes sense. As I understand it, that means, though, that any managed table that uses another managed table as a source will automatically always use the "full scan" mode (snapshot + log subscription). (4) Data retention > - The data of the snapshot will never expire, and the user needs to > delete the partition by themselves if needed. > - The expiration time of the log is unlimited by default, but it can > be configured. In fact, we only need the latest log by default, > because we have saved the previous snapshot. Are you referring to cleanup here in the sense of garbage collection, or also the deletion of data that makes the Managed Table semantically wrong? Let's assume I define a managed table with such a query below, where the "interactions" table is derived from a stream of Kafka records. INSERT INTO the_table SELECT window_end, COUNT(*) FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES)) GROUP BY window_end HAVING now() - window_end <= INTERVAL '14' DAYS; When a user does "Select *" from that table, they don't want to see old data, it would make the query reading from the managed table incorrect. Who would then filter or would prune the old data? The query maintaining the managed table? The query reading from the managed table? Maybe this isn't something you want to solve in the first version, but it would be good to have a definite answer what the plan in case data retention is part of the query semantics. 
If the assumption is that managed tables can never have retention defined in their query semantics (and the assumption is that all filtering is done by the query that reads the managed table), then I think this is a supercritical design property that we need to make very explicit for users to understand. (5) Unified format in the log Makes sense. (6, 7) PK mode and consistency I get the technical reason that with PKs there is a case for duplicate filtering, but I am skeptical of the user experience if this details implicitly changes the consistency users see. This may not be relevant for Flink queries that read from the changelog, but it is relevant for external programs subscribing to the changelog. What would be the additional overhead of creating a separate config switch "log consistency" with values "transactional", "eventual". By default, this would be transactional regardless of whether the result has a PK or not. If there is a PK, then users can optimize the latency if they want by using the "eventual" setting. That means there is not a surprise for users through the fact that changing the schema of the table (adding a PK) suddenly changes the consistency semantics. If I understand correctly, "transactional" with PK would actually not be correct, it would still contain some duplicates. But that means we cannot expose a correct changelog to users (external subscribers) in all cases? The PK behavior seems like something that needs some general improvement in the SQL engine. (8) Optimistic locking. Thanks for clarifying, that makes sense. I think that would be good to add to the FLIP. The reason why something is done is as important as what exactly is being done. Thanks a lot! Stephan On Mon, Nov 15, 2021 at 10:41 AM Jingsong Li wrote: > Hi Timo, > > > It would be great if we can add not only `Receive any type of changelog` > but also `Receive any type of datatype`. > > Nice, I think we can. 
> > > Please clarify whether the compact DDL is a synchronous or asynchrounous > operation in the API? So far all DDL was synchrounous. And only DML > asynchrounous. > > It should be a synchronous operation. > > > I find this 'change-tracking' = 'false' a bit confusing. Even in batch > scenarios we have a changelog, only with insert-only changes. Can you > elaborate? Wouldn't 'exclude-from-log-store' or 'exclude-log-store' or > 'log.disabled' be more accurate? > > Change tracking is from Oracle and snowflake [1][2][3]. It matches the > "emit-changes" syntax. It means that after closing, the downstream > consumption cannot obtain the corresponding changes. > > > DESCRIBE DETAIL TABLE > > +1 to `DESCRIBE TABLE EXTENDED`. > > > Set checkpoint interval to 1 min if checkpoint is not enabled > when the planner detects a sink to built-in dynamic table. > This sounds like too much magic to me. > > You are right. And one minute may not be enough for all situations. +1 > to throw detailed exception to alert user. > > > GenericCatalog to `Catalog#supportesTableStorage` > > I originally thought about completely distinguishing it from external > catalog, but it is also possible to add a new method. > > > CatalogBaseTable.TableKind > > Yes, we can create a new TableKind for this table.
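The primary-key duplicate filtering discussed under (6, 7) in this thread reduces to keep-latest-per-key semantics: replaying a log that contains duplicates converges to the same state, which is why an "eventual" consistency mode is tolerable for PK tables. A minimal sketch of that idea (hypothetical, not the FLIP's actual mechanism):

```python
def dedupe_by_pk(changelog, pk):
    # Keep only the latest row per primary key, in arrival order.
    # Duplicates in the log are harmless because later rows for the
    # same key simply overwrite earlier ones.
    latest = {}
    for row in changelog:
        latest[row[pk]] = row
    return list(latest.values())
```

For tables without a primary key no such convergence exists, which is the argument in the thread for keeping "transactional" consistency as the default.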
Re: [DISCUSS] Update Policy for old releases
I am of a bit different opinion here, I don't think LTS is the best way to go.

To my understanding, the big issue faced by users is that an upgrade is simply too hard. Too many things change in subtle ways, you cannot just take the previous application and configuration and expect it to run the same after the upgrade. If that were much easier, users would be able to upgrade more easily and more frequently (maybe not every month, but every six months or so).

In contrast, LTS is more about how long one provides patches and releases. The upgrade problem is the same: between LTS versions, upgrades should still be smooth. To make LTS-to-LTS smooth, we need to solve the same issue as making it smooth from individual version to individual version. Unless we expect non-linear upgrade paths with migration tools, which I am not convinced we should do. It seems to be the opposite of where the industry is moving (upgrade fast and frequently by updating images).

The big downside of LTS versions is that almost no one ends up using the other versions, so we get little feedback on features. We will end up having a feature in Flink for three releases and still barely anyone will have used it, so we will lack the confidence to turn it on by default. I also see the risk that the community ends up taking compatibility with non-LTS releases not as seriously, because "it is anyways not an LTS version".

We could look at making the upgrades smoother, by starting to observe the issues listed here. I think we need to do that anyways, because that is what I hear users bringing up more and more. If after that we still feel like there is a problem, then let's revisit this issue.
- API compatibility (signatures and behavior) - Make it possible to pin SQL semantics of a query across releases (FLIP-190 works on this) - Must be possible to use same configs as before in a new Flink version and keep the same behavior that way - REST API must be stable (signature and semantics) - Make it possible to mix different client/cluster versions (stable serializations for JobGraph, etc.) The issue that we define officially two supported versions, but many committers like to backport things for one more version is something we can certainly look at, to bring some consistency in there. Best, Stephan On Fri, Nov 12, 2021 at 9:17 AM Martijn Visser wrote: > Thanks for bringing this up for discussion Piotr. One the one hand I think > it's a good idea, because of the reasons you've mentioned. On the other > hand, having an LTS version will remove an incentive for some users to > upgrade, which will result in fewer Flink users who will test new features > because they wait for the next LTS version to upgrade. I can see that > particularly happening for large enterprises. Another downside I can > imagine is that upgrading from one LTS version to another LTS version will > become more complicated because more changes have been made between those > versions. > > Related to my second remark, would/could introducing an LTS version would > also trigger a follow-up discussion that we could potentially introduce > breaking changes in a next LTS version, like a Flink 2.0 [1] ? > > Best regards, > > Martijn > > [1] https://issues.apache.org/jira/browse/FLINK-3957 > > On Fri, 12 Nov 2021 at 08:59, Fabian Paul > wrote: > > > Thanks for bringing up this topic Piotr. > > I also think we should try to decouple our release cycles from our > support > > plans. Currently we are very limited by the approach because faster > release > > cycles result in also faster deprecation of versions. 
> > > > > > Therefore I am also favoring option 2, where we can align the next LTS > > version > > with our development speed. Option 1, I think, can easily lead to confusion > > when > > the number of supported releases constantly changes. > > > > Best, > > Fabian > > > > >
Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage
Hi all! I have a few questions on the design still; I posted those in the [DISCUSS] thread. It would be great to clarify those first before concluding this vote. Thanks, Stephan On Fri, Nov 12, 2021 at 7:22 AM Jark Wu wrote: > +1 (binding) > > Thanks for the great work Jingsong! > > Best, > Jark > > On Thu, 11 Nov 2021 at 19:41, JING ZHANG wrote: > > > +1 (non-binding) > > > > A small suggestion: > > The message queue is currently used to store the middle-layer data of the > > streaming data warehouse. We hope to use the built-in dynamic table storage to > > store those middle layers. > > But the middle-layer data of the streaming data warehouse is often provided > to > > all business teams in a company. Some teams do not use Apache Flink as > > their compute engine yet. In order to continue serving those teams, the data in > > the built-in dynamic table storage may need to be copied to the message queue > > again. > > If *the built-in storage could provide the same consumer API as the commonly > > used message queues*, data copying may be avoided. So the built-in > dynamic > > table storage may be promoted faster in the streaming data warehouse > > business. > > > > Best regards, > > Jing Zhang > > > > Yufei Zhang wrote on Thu, Nov 11, 2021 at 9:34 AM: > > > Hi, > > > > > > +1 (non-binding) > > > > > > Very interesting design. I saw a lot of discussion on the generic > > > interface design; good to know it will address extensibility. > > > > > > Cheers, > > > Yufei > > > > > > > > > On 2021/11/10 02:51:55 Jingsong Li wrote: > > > > Hi everyone, > > > > > > > > Thanks for all the feedback so far. Based on the discussion [1] we > seem > > > > to have consensus, so I would like to start a vote on FLIP-188, for > > > > which the FLIP has now also been updated [2]. > > > > > > > > The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless > > > > there is an objection or insufficient votes. 
> > > > > > > > [1] https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7 > > > > [2] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage > > > > > > > > Best, > > > > Jingsong > > > > > > > > > >
Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage
Hi all! Thank you for the writeup of this feature. I like the general direction a lot. There are some open questions and confusing details still, which I think we need to clarify first to make this feature really good. Below are questions/suggestions on the FLIP.

Best, Stephan

===

*(1) Log Implementation*

I agree with Eron that we should not design this hardwired to Kafka. Let's have the internal interfaces in place to make this open to other streaming storage systems as well. The config options seem to be designed in a way that is Kafka-exclusive. Can we change this, for example to something like
- storage.log.system=kafka
- storage.log.kafka.properties.bootstrap.servers
- storage.log.kafka.retention

*(2) Change Tracking*

I am not sure I understand this fully. When a batch query inserts without change tracking, what happens then?
- Does it skip writing to the change log?
- Does it simply overwrite the managed table with the new result?
- Something different?

*(3) "table-storage.log.scan.startup.mode"*

Somehow the presence of this flag seems to break the abstraction of managed tables. Let's say someone creates a managed table that is computed via a query over another managed table. It would need all the data from the previous table, or it would be inconsistent.

What is the reason to have this setting? To support cases where one doesn't need all past data (let's say only data from the previous month)? Exposing this again somewhat destroys the nice "transparent out of the box" behavior, because now users need to think again about the incremental building of the tables. I think that case shows that we are missing somewhat better handling of data retention (see next point).

Also, this seems to be a per-query setting more than a global setting, so should this be part of the config with which the query is submitted that reads from the table storage? 
The names could also be improved a bit, I think; for example, we could call it just "table-storage.log.scan" with values "full", "latest", "from-timestamp".

*(4) Data Retention*

I am wondering how and when data is ever cleaned up. For example, when the table definition has a time attribute and predicate so that the managed table should only contain the data from the previous month, how does old data get cleaned up? Only through deletes coming from timers in the Flink SQL layer?

I think if we want this to be really good and efficient, we need to look at dropping data during the compaction. The compaction should know it needs to retain only data from "watermark - 1 month" or so. That is somewhat similar to the optimization I proposed also for SQL in general: to get rid of timers and only use TTL (and compaction filters) for data expiration. I think for managed tables, this is even more crucial for performance. But it would mean that we need a better model for inferring the required data retention based on predicates over the time columns, and not simply fixed retention based on the watermark.

*(5) Different Formats for Cases with PK and without PK*

The FLIP proposes Debezium-Avro for cases without a PK and just Avro for cases with a PK. Do we expect that some users directly subscribe to the table changelog, meaning they directly read the topic via a Kafka consumer?
- I would expect that this will happen, because users want to avoid writing the log twice (once for Flink managed-table queries, once for external subscribers).
- If this is publicly exposed, then the fact that it uses different formats in different cases (PK or no PK) seems really confusing and not intuitive for users.
- Can the format be just Debezium-JSON in all cases?

*(6) Different Consistency Guarantees with PK and without PK*

Is this purely an internal implementation detail, or will users see a difference? My understanding is that users see a difference. 
Having that difference implicitly happen when users add a PK reference seems very confusing to me. What about cases where the table has a PK (because users want the data in Kafka that way) but users want transactional consistency?

If we need the "low-latency eventual consistency" mode with PKs, I would suggest making this a separate mode that users can choose to activate if they want. We can restrict it to cases that have a PK, but not automatically change the behavior when a PK is declared.

*(7) Eventual Consistency Mode vs. Faster Checkpoints*

The eventual consistency mode with PK seems to be there mainly to get lower latency for the changelog. What latencies are we looking for here? There is also the work on generalized incremental checkpoints, which should get the latency down to a few seconds; would that be good enough?

The current upsert Kafka source (which would be used with the PK eventual-consistency mode) has a big inefficiency in the way it needs to retain all state to convert the records to changelog records. That is also a high price to pay for that mode.

*(8) Concurrent Write / Locking*

I don't
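The retention-during-compaction idea from point (4) can be illustrated with a minimal, Flink-independent sketch. All names here are hypothetical; in a real system this logic would live in the storage layer's compaction (or compaction filter), not in the SQL layer's timers:

```python
from typing import Dict, List

def compact_with_retention(rows: List[Dict], watermark: int, retention: int) -> List[Dict]:
    """Merge step of a compaction that also expires old data.

    Instead of relying on timers in the SQL layer to emit deletes,
    the compaction drops every row whose event time is older than
    (watermark - retention), e.g. one month.
    """
    cutoff = watermark - retention
    return [row for row in rows if row["event_time"] >= cutoff]

# Example: retention of 30 time units, watermark at 100,
# so rows with event_time older than 70 are dropped during compaction.
rows = [{"key": "a", "event_time": 50}, {"key": "b", "event_time": 80}]
kept = compact_with_retention(rows, watermark=100, retention=30)
```

The key design point is that expiration becomes a side effect of work the storage does anyway, rather than a separate pass driven by per-key timers.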
[jira] [Created] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts
Stephan Ewen created FLINK-24852: Summary: Cleanup of Orphaned Incremental State Artifacts Key: FLINK-24852 URL: https://issues.apache.org/jira/browse/FLINK-24852 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.14.0 Reporter: Stephan Ewen

Shared state artifacts (state files in the "shared" folder in the DFS / object store) can become orphaned in various situations:
* When a TaskManager fails right after it created a state file but before the checkpoint was ack-ed to the JobManager, that state file will be orphaned.
* When the JobManager fails, all state newly added for the currently pending checkpoint will be orphaned.

These state artifacts are currently impossible to clean up manually, because it isn't easily possible to understand whether they are still being used (referenced by any checkpoint). We should introduce a "garbage collector" that identifies and deletes such orphaned state artifacts.

h2. Idea for a cleanup mechanism

A periodic cleanup thread would periodically execute a cleanup procedure that searches for and deletes the orphaned artifacts. To identify those artifacts, the cleanup procedure needs the following inputs:
* The oldest retained checkpoint ID
* A snapshot of the shared state registry
* A way to identify, for each state artifact, from which checkpoint it was created

The cleanup procedure would:
* enumerate all state artifacts (for example, files in the "shared" directory);
* for each one, check whether it was created earlier than the oldest retained checkpoint. If not, that artifact is skipped, because it might come from a later pending checkpoint, or a later canceled checkpoint;
* finally, check whether the state artifact is known to the shared state registry. If yes, the artifact is kept; if not, it is orphaned and will be deleted.

Because the cleanup procedure is specific to the checkpoint storage, it should probably be instantiated from the checkpoint storage. 
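A minimal sketch of the procedure described above. This is hedged: the names and the artifact representation are hypothetical; a real implementation would enumerate files through the checkpoint storage and take a consistent snapshot of the shared state registry:

```python
from typing import Dict, List, Set

def find_orphaned_artifacts(
    artifacts: Dict[str, int],          # artifact path -> checkpoint ID it was created for
    oldest_retained_checkpoint: int,
    shared_state_registry: Set[str],    # paths still referenced by some retained checkpoint
) -> List[str]:
    """Return artifacts that are safe to delete, following the three steps:
    enumerate all artifacts, skip those created at or after the oldest
    retained checkpoint (they may belong to a pending or canceled
    checkpoint), then report the remaining unreferenced ones as orphaned."""
    orphaned = []
    for path, created_for in artifacts.items():
        if created_for >= oldest_retained_checkpoint:
            continue  # might belong to a later pending or canceled checkpoint
        if path not in shared_state_registry:
            orphaned.append(path)  # old and unreferenced -> orphaned
    return orphaned

# Example: checkpoint 7 is the oldest retained one.
artifacts = {"shared/5_a": 5, "shared/6_b": 6, "shared/8_c": 8}
registry = {"shared/6_b"}  # still referenced by a retained checkpoint
orphans = find_orphaned_artifacts(artifacts, 7, registry)
```

Note how the "created before the oldest retained checkpoint" guard is what makes the check safe without coordinating with in-flight checkpoints.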
To make it possible to identify the checkpoint for which a state artifact was created, we can put that checkpoint ID into the state file name, for example format the state name as {{"_"}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] Creating an external connector repository
Thank you all for the nice discussion! From my point of view, I very much like the idea of putting connectors in a separate repository. But I would argue it should be part of Apache Flink, similar to flink-statefun, flink-ml, etc. I share many of the reasons for that:
- As argued many times, it reduces the complexity of the Flink repo, increases response times of CI, etc.
- Much lower barrier to contribution, because an unstable connector would not de-stabilize the whole build. Of course, we would need to make sure we set this up the right way, with connectors having individual CI runs, build status, etc. But it certainly seems possible.

I would argue some points a bit differently than some cases made before:

(a) I believe the separation would increase connector stability, because it really forces us to work with the connectors against the APIs like any external developer. A mono-repo is somehow the wrong thing if you in practice want to actually guarantee stable internal APIs at some layer, because the mono-repo makes it easy to just change something on both sides of the API (provider and consumer) seamlessly. Major refactorings in Flink need to keep all connector API contracts intact, or we need to have a new version of the connector API.

(b) We may even be able to move towards more lightweight and automated releases over time, even if we stay in Apache Flink with that repo. This isn't fully aligned with the Apache release policies yet, but there are board discussions about whether there can be bot-triggered releases (by dependabot) and how that could fit into the Apache process. This doesn't seem to be quite there just yet, but seeing those discussions start is a good sign, and there is a good chance we can do some things there. I am not sure whether we should let bots trigger releases, because a final human look at things isn't a bad thing, especially given the popularity of software supply-chain attacks recently. 
I do share Chesnay's concerns about complexity in tooling, though, both release tooling and test tooling. They are not incompatible with this approach, but they are a task we need to tackle during this change, which will add additional work. On Tue, Oct 26, 2021 at 10:31 AM Arvid Heise wrote: > Hi folks, > > I think some questions came up and I'd like to address the question of the > timing. > > Could you clarify what release cadence you're thinking of? There's quite > > a big range that fits "more frequent than Flink" (per-commit, daily, > > weekly, bi-weekly, monthly, even bi-monthly). > > The short answer is: as often as needed: > - If there is a CVE in a dependency and we need to bump it - release > immediately. > - If there is a new feature merged, release soonish. We may collect a few > successive features before a release. > - If there is a bugfix, release immediately or soonish depending on the > severity and whether workarounds are available. > > We should not limit ourselves; the whole idea of independent releases is > exactly that you release as needed. There is no release planning or > anything needed, you just go with a release as if it was an external > artifact. > > (1) is the connector API already stable? > > From another discussion thread [1], the connector API is far from stable. > > Currently, it's hard to build connectors against multiple Flink versions. > > There are breaking API changes both in 1.12 -> 1.13 and 1.13 -> 1.14 and > > maybe also in future versions, because Table-related APIs are still > > @PublicEvolving and the new Sink API is still @Experimental. > > > > The question is: what is stable in an evolving system? We recently > discovered that the old SourceFunction needed to be refined such that > cancellation works correctly [1]. So that interface has been in Flink for 7 > years, heavily used outside as well, and we still had to change the contract > in a way that I'd expect any implementer to recheck their implementation. 
> It might not be necessary to change anything, and you can probably keep > the code the same for all Flink versions, but still, the interface was not stable > in the strictest sense. > > If we focus just on API changes on the unified interfaces, then we expect > one more change to the Sink API to support compaction. For the Table API, there > will most likely also be some changes in 1.15. So we could wait for 1.15. > > But I'm questioning whether that's really necessary, because we will add more > functionality beyond 1.15 without breaking the API. For example, we may add > more unified connector metrics. If you want to use them in your connector, > you have to support multiple Flink versions anyhow. So rather than focusing > the discussion on "when is stuff stable", I'd rather focus on "how can we > support building connectors against multiple Flink versions" and make it as > painless as possible. > > Chesnay pointed out that we could use different branches for different Flink versions, > which sounds like a good suggestion. With a mono-repo, we can't use > branches
Re: [NOTICE] CiBot improvements
Great initiative, thanks for doing this! On Mon, Oct 11, 2021 at 10:52 AM Till Rohrmann wrote: > Thanks a lot for this effort Chesnay! The improvements sound really good. > > Cheers, > Till > > On Mon, Oct 11, 2021 at 8:46 AM David Morávek wrote: > > > Nice! Thanks for the effort Chesnay, this is really a huge step forward! > > > > Best, > > D. > > > > On Mon, Oct 11, 2021 at 6:02 AM Xintong Song > > wrote: > > > Thanks for the effort, @Chesnay. This is super helpful. > > > > > > @Jing, > > > Every push to the PR branch should automatically trigger an entirely new > > > build. `@flinkbot run azure` should only be used when you want to > re-run > > > the failed stages without changing the PR, e.g., when running into > known > > > unstable cases that are unrelated to the PR. > > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > > > > On Mon, Oct 11, 2021 at 11:45 AM JING ZHANG > > wrote: > > > > Hi Chesnay, > > > > Thanks for the effort. It is a very useful improvement. > > > > I have a minor question; please forgive me if it is too > naive. > > > > Since '@flinkbot run azure' now behaves like "Rerun failed jobs", is > > > there > > > > any way to trigger an entirely new build? Because sometimes I'm not > sure > > > > whether the cases that passed in the last build would still succeed in the new build > > > > because of the updates introduced in the new commit. > > > > > > > > Best, > > > > JING ZHANG > > > > > > > > > > > > Yangze Guo wrote on Mon, Oct 11, 2021 at 10:31 AM: > > > > > Thanks for the great job, Chesnay! "Rerun failed jobs" will help a > > > lot. > > > > > > > > > > Best, > > > > > Yangze Guo > > > > > > > > > > On Sun, Oct 10, 2021 at 4:56 PM Chesnay Schepler < ches...@apache.org > > > > > > > > wrote: > > > > > > > > > > > > I made a number of changes to the CiBot over the weekend. > > > > > > > > > > > > - '@flinkbot run azure' previously triggered an entirely new > build > > > > based > > > > > > on the last completed one. 
It now instead retries the last > > completed > > > > > > build, only running the jobs that actually failed. It basically > > > behaves > > > > > > like the "Rerun failed jobs" button in the Azure UI. > > > > > > - various optimizations to increase responsiveness (primarily by > > > doing > > > > > > significantly less unnecessary work / requests to GH) > > > > > > - removed TravisCI support (since we no longer support a release > > that > > > > > > used Travis) > > > > > > > > > > > > Please ping me if you spot anything weird. > > > > > > > > > > > > > > >
[jira] [Created] (FLINK-24366) Unnecessary/misleading error message about failing restores when tasks are already canceled.
Stephan Ewen created FLINK-24366: Summary: Unnecessary/misleading error message about failing restores when tasks are already canceled. Key: FLINK-24366 URL: https://issues.apache.org/jira/browse/FLINK-24366 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.13.2, 1.14.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.13.3, 1.15.0, 1.14.1 The following line is logged in all cases where the restore operation fails. The check whether the task is canceled comes only after that line. The fix would be to move the log line to after the check. {code} Exception while restoring my-stateful-task from alternative (1/1), will retry while more alternatives are available. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
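A minimal sketch of the proposed fix, with hypothetical names (the real code lives in Flink's state restore path): perform the cancellation check before logging, so canceled tasks don't emit the misleading retry message:

```python
def restore_from_alternatives(alternatives, is_canceled, log):
    """Try each restore alternative in order. On failure, re-raise
    immediately if the task was canceled, and only otherwise log that
    more alternatives will be tried."""
    last_error = None
    for restore in alternatives:
        try:
            return restore()
        except Exception as e:
            if is_canceled():
                raise  # canceled: don't log a misleading "will retry" message
            log("Exception while restoring, will retry while more alternatives are available.")
            last_error = e
    raise RuntimeError("All restore alternatives failed") from last_error

# Example: the first alternative fails, the second succeeds; exactly one
# retry message is logged because the task is not canceled.
logs = []
def failing(): raise ValueError("corrupt local state")
def ok(): return "restored-state"
result = restore_from_alternatives([failing, ok], is_canceled=lambda: False, log=logs.append)
```

The point of the reordering is simply that the check-then-log sequence replaces the original log-then-check sequence.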
[jira] [Created] (FLINK-24343) Revisit Scheduler and Coordinator Startup Procedure
Stephan Ewen created FLINK-24343: Summary: Revisit Scheduler and Coordinator Startup Procedure Key: FLINK-24343 URL: https://issues.apache.org/jira/browse/FLINK-24343 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2, 1.14.0 Reporter: Stephan Ewen Fix For: 1.15.0 We need to re-examine the startup procedure of the scheduler and how it interacts with the startup of the operator coordinators. We need to make sure the following conditions are met: - The operator coordinators are started before the first action happens that they need to be informed of. That includes a task being ready, a checkpoint happening, etc. - The scheduler must be started to the point that it can handle "failGlobal()" calls, because the coordinators might trigger that during their startup when an exception in "start()" occurs. /cc [~chesnay] -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] FLIP-173: Support DAG of algorithms
I think this will be a useful addition. Regarding the API and specific design decisions: I think this looks OK. I didn't dig very deep and would be fine to just go with the authors' proposal. The main motivation for having a separate flink-ml repository was to develop more easily, make changes, and iterate faster without having to weigh every design as carefully as we need to in core Flink. So +1 from my side On Fri, Sep 10, 2021 at 4:33 AM Dong Lin wrote: > Hi all, > > We would like to start the vote for FLIP-173: Support DAG of > algorithms [1]. This FLIP was discussed in this thread [2]. > > The proposal extended the Flink ML API to support DAGs of algorithms where > each algorithm can have multiple inputs and multiple outputs. It also > extended the Flink ML API to support online learning scenarios where a > long-running Model instance needs to be continuously updated by the latest > model data generated by another long-running Estimator instance. > > The vote will be open for at least 72 hours, following the consensus voting > process. > > Thanks! > Dong Lin and Zhipeng Zhang > > [1] > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783 > [2] > > https://lists.apache.org/thread.html/r6729f351fb1bc13a93754c199d5fee1051cc8146e22374737c578779%40%3Cdev.flink.apache.org%3E >
[jira] [Created] (FLINK-24255) Test Environment / Mini Cluster do not forward configuration.
Stephan Ewen created FLINK-24255: Summary: Test Environment / Mini Cluster do not forward configuration. Key: FLINK-24255 URL: https://issues.apache.org/jira/browse/FLINK-24255 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 When using {{StreamExecutionEnvironment getExecutionEnvironment(Configuration)}}, the config should determine the characteristics of the execution. The config is for example passed to the local environment in the local execution case, and used during the instantiation of the MiniCluster. But when using the {{TestStreamEnvironment}} and the {{MiniClusterWithClientRule}}, the config is ignored. The issue is that the {{StreamExecutionEnvironmentFactory}} in {{TestStreamEnvironment}} ignores the config that is passed to it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
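The bug pattern described here, a factory that drops the configuration handed to it, can be sketched abstractly. These names are hypothetical illustrations, not Flink's actual classes:

```python
class EnvironmentFactory:
    """Sketch of the fix: merge the caller-supplied configuration into the
    base configuration instead of ignoring it (the reported bug)."""

    def __init__(self, base_config: dict):
        self.base_config = base_config

    def create(self, caller_config: dict) -> dict:
        # The buggy variant returned self.base_config and ignored caller_config.
        # Fixed variant: caller settings take precedence over base settings.
        return {**self.base_config, **caller_config}

# Example: the caller overrides parallelism; other base settings survive.
factory = EnvironmentFactory({"parallelism": 1, "state.backend": "hashmap"})
env_config = factory.create({"parallelism": 4})
```

The essence of the fix is precedence: the configuration passed at the call site must win over whatever the factory was constructed with.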
Re: [VOTE] Apache Flink Stateful Functions 3.1.0, release candidate #1
+1 (binding) - Build project on Java 8 (maven command line) with full end to end tests - Compiled and ran tests in IDE (IntelliJ) with Java 11 (possible after some manual config, see comment below to automate this) - No binaries in the distribution - Verified license and notice files - Checked README Minor issues found, none of which are release blockers: - Some warnings in the command line due to a missing version of a plugin. Fixing that is good for stability (and better resilience against supply chain attacks): See this PR for a fix: https://github.com/apache/flink-statefun/pull/261 - Building in IDE (IntelliJ) with Java 11 does not work out of the box, due to issues with the Java Module System and the use of Unsafe in generated ProtoBuf classes. See this PR for a fix: https://github.com/apache/flink-statefun/pull/262 On Thu, Aug 26, 2021 at 8:32 AM Tzu-Li (Gordon) Tai wrote: > +1 (binding) > > - Built from source with Java 11 and Java 8 (mvn clean install > -Prun-e2e-tests) > - verified signatures and hashes > - verified NOTICE files of Maven artifacts properly list actual bundled > dependencies > - Ran GoLang greeter and showcase with the proposed Dockerfiles for 3.1.0 > - Ran a local smoke E2E against the Java SDK, with adjusted parameters to > run for a longer period of time > > Thanks for driving the release Igal! 
> > Cheers, > Gordon > > On Thu, Aug 26, 2021 at 4:06 AM Seth Wiesman wrote: > > > +1 (non-binding) > > > > - verified signatures and hashes > > - Checked licenses > > - ran mvn clean install -Prun-e2e-tests > > - ran golang greeter and showcase from the playground [1] > > > > Seth > > > > [1] https://github.com/apache/flink-statefun-playground/pull/12 > > > > On Wed, Aug 25, 2021 at 9:44 AM Igal Shilman wrote: > > > > > +1 from my side: > > > > > > Here are the results of my RC2 testing: > > > > > > - verified the signatures and hashes > > > - verified that the source distribution doesn't contain any binary > files > > > - ran mvn clean install -Prun-e2e-tests > > > - ran Java and Python greeters from the playground [1] with the new > > module > > > structure, and async transport enabled. > > > - verified that the docker image [2] builds and inspected the contents > > > manually. > > > > > > Thanks, > > > Igal > > > > > > [1] https://github.com/apache/flink-statefun-playground/tree/dev > > > [2] https://github.com/apache/flink-statefun-docker/pull/15 > > > > > > > > > On Tue, Aug 24, 2021 at 3:34 PM Igal Shilman wrote: > > > > > > > Sorry, the subject of the previous message should have said "[VOTE] > > > Apache > > > > Flink Stateful Functions 3.1.0, release candidate #2". > > > > > > > > > > > > On Tue, Aug 24, 2021 at 3:24 PM Igal Shilman > wrote: > > > > > > > >> Hi everyone, > > > >> > > > >> Please review and vote on the release candidate #2 for the version > > 3.1.0 > > > >> of Apache Flink Stateful Functions, as follows: > > > >> [ ] +1, Approve the release > > > >> [ ] -1, Do not approve the release (please provide specific > comments) > > > >> > > > >> **Testing Guideline** > > > >> > > > >> You can find here [1] a page in the project wiki on instructions for > > > >> testing. > > > >> To cast a vote, it is not necessary to perform all listed checks, > > > >> but please mention which checks you have performed when voting. 
> > > >> > > > >> **Release Overview** > > > >> > > > >> As an overview, the release consists of the following: > > > >> a) Stateful Functions canonical source distribution, to be deployed > to > > > >> the release repository at dist.apache.org > > > >> b) Stateful Functions Python SDK distributions to be deployed to > PyPI > > > >> c) Maven artifacts to be deployed to the Maven Central Repository > > > >> d) New Dockerfiles for the release > > > >> e) GoLang SDK tag statefun-sdk-go/v3.1.0-rc2 > > > >> > > > >> **Staging Areas to Review** > > > >> > > > >> The staging areas containing the above mentioned artifacts are as > > > >> follows, for your review: > > > >> * All artifacts for a) and b) can be found in the corresponding dev > > > >> repository at dist.apache.org [2] > > > >> * All artifacts for c) can be found at the Apache Nexus Repository > [3] > > > >> > > > >> All artifacts are signed with the key > > > >> 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4] > > > >> > > > >> Other links for your review: > > > >> * JIRA release notes [5] > > > >> * source code tag "release-3.1.0-rc2" [6] > > > >> * PR for the new Dockerfiles [7] > > > >> > > > >> **Vote Duration** > > > >> > > > >> The voting time will run for at least 72 hours (since RC1). We are > > > >> targeting this vote to last until Thursday. 26th of August, 6pm CET. > > > >> If it is adopted by majority approval, with at least 3 PMC > affirmative > > > >> votes, it will be released. > > > >> > > > >> Thanks, > > > >> Igal > > > >> > > > >> [1] > > > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release > > > >> [2]
[ANNOUNCE] Dropping "CheckpointConfig.setPreferCheckpointForRecovery()"
Hi Flink Community! A quick heads-up: we suggest removing the setting "CheckpointConfig.setPreferCheckpointForRecovery()" [1]. The setting has been deprecated since Flink 1.12 and is strongly discouraged, because it can lead to data loss or data duplication in different scenarios. Please see also https://issues.apache.org/jira/browse/FLINK-20427 for background. Are there any concerns about removing this setting? Is anyone relying on it right now? As a long-term solution to ensuring that recovery from savepoints is not slow: some committers (me included) are working on a proposal to support more efficient savepoints and to ensure that intermediate savepoints don't interfere with side effects. Then we can always exclude them from recovery without risk of data loss or duplication. Best, Stephan [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L493
[jira] [Created] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill
Stephan Ewen created FLINK-23843: Summary: Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill Key: FLINK-23843 URL: https://issues.apache.org/jira/browse/FLINK-23843 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0

Currently, when the method "SplitEnumeratorContext.runInCoordinatorThread()" throws an exception, the effect is a process kill of the JobManager process. The chain through which the process kill happens is:
* An exception bubbles up in the executor, killing the executor thread.
* The executor starts a replacement thread, which is forbidden by the thread factory (as a safety net) and causes a process kill.

We should prevent such exceptions from bubbling up in the coordinator executor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
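A sketch of the proposed guard, with hypothetical names (the real fix would wrap submissions to the coordinator's executor): catch the exception inside the submitted task and convert it into a global failover, instead of letting it escape and kill the executor thread:

```python
from concurrent.futures import ThreadPoolExecutor

def run_in_coordinator_thread(executor: ThreadPoolExecutor, action, fail_job_globally):
    """Submit `action`, converting any exception into a global job failure
    instead of letting it bubble up and kill the executor thread (which
    would trigger the thread-factory safety net and a process kill)."""
    def guarded():
        try:
            action()
        except Exception as e:
            fail_job_globally(e)  # global failover, not a process kill
    return executor.submit(guarded)

# Example: a buggy enumerator action raises; the failure handler is called
# and the executor thread survives.
failures = []
def broken_action():
    raise RuntimeError("enumerator bug")

with ThreadPoolExecutor(max_workers=1) as ex:
    run_in_coordinator_thread(ex, broken_action, failures.append).result()
```

The design choice is to put the try/except inside the submitted runnable, so no exception can ever reach the executor's thread lifecycle machinery.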
[jira] [Created] (FLINK-23842) Add log messages for reader registrations and split requests.
Stephan Ewen created FLINK-23842: Summary: Add log messages for reader registrations and split requests. Key: FLINK-23842 URL: https://issues.apache.org/jira/browse/FLINK-23842 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 Currently, nothing is logged when source enumerators receive reader registration events or split requests. While some specific source implementations log this themselves, for the general case this information is missing, even though it is super valuable when debugging and understanding the work-assignment behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] FLIP-180: Adjust StreamStatus and Idleness definition
+1 (binding) On Mon, Aug 9, 2021 at 12:08 PM Till Rohrmann wrote: > +1 (binding) > > Cheers, > Till > > On Thu, Aug 5, 2021 at 9:09 PM Arvid Heise wrote: > > > Dear devs, > > > > I'd like to open a vote on FLIP-180: Adjust StreamStatus and Idleness > > definition [1] which was discussed in this thread [2]. > > The vote will be open for at least 72 hours unless there is an objection > or > > not enough votes. > > > > Best, > > > > Arvid > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition > > [2] > > > > > https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E > > >
[jira] [Created] (FLINK-23649) Add RocksDB packages to parent-first classloading patterns.
Stephan Ewen created FLINK-23649: Summary: Add RocksDB packages to parent-first classloading patterns. Key: FLINK-23649 URL: https://issues.apache.org/jira/browse/FLINK-23649 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.13.2 Reporter: Stephan Ewen Fix For: 1.14.0 RocksDB classes are currently loaded child-first. Because of that, it can happen that the RocksDB library is attempted to be loaded multiple times (by different classloaders). That is prevented by JNI and results in an error as reported in this mail for example https://lists.apache.org/x/thread.html/rbc3ca24efe13b25e802af9739a6877276503363ffbdc5914ffdad7be@%3Cuser.flink.apache.org%3E We should prevent accidental repeated loading of RocksDB, because we rely on the fact that only one DB is created per task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
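As a user-facing workaround along the same lines (hedged: this uses Flink's existing {{classloader.parent-first-patterns.additional}} option, while the actual fix would extend the built-in default patterns), one could configure:

```
# flink-conf.yaml: force RocksDB classes to be loaded parent-first,
# so the native library is loaded only once per JVM
classloader.parent-first-patterns.additional: org.rocksdb.
```

With parent-first loading, all child classloaders see the same RocksDB classes, so JNI's one-load-per-library restriction is never violated.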
[jira] [Created] (FLINK-23630) Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows.
Stephan Ewen created FLINK-23630: Summary: Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows. Key: FLINK-23630 URL: https://issues.apache.org/jira/browse/FLINK-23630 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 This needs a fix in the tests where they create the paths for the checkpoint storage locations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23629) Remove redundant test cases in EventTimeWindowCheckpointingITCase
Stephan Ewen created FLINK-23629: Summary: Remove redundant test cases in EventTimeWindowCheckpointingITCase Key: FLINK-23629 URL: https://issues.apache.org/jira/browse/FLINK-23629 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 HashMap state store snapshots are always async right now, sync snapshots are no longer supported. We should adjust the {{EventTimeWindowCheckpointingITCase}} to remove the now redundant cases {{MEM_ASYNC}} and {{FILE_ASYNC}} parameter runs. The test is very time-intensive, so this is quite a time saver. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [ANNOUNCE] RocksDB Version Upgrade and Performance
For the details of what causes this regression, I would add @Yun Tang to this discussion. On Wed, Aug 4, 2021 at 2:36 PM Yuval Itzchakov wrote: > We are heavy users of RocksDB and have had several issues with memory > managed in Kubernetes, most of them actually went away when we upgraded > from Flink 1.9 to 1.13. > > Do we know why there's such a huge performance regression? Can we improve > this somehow with some flag tweaking? It would be great if we see a more in > depth explanation of the gains vs losses of upgrading. > > On Wed, Aug 4, 2021 at 3:08 PM Stephan Ewen wrote: > >> Hi all! >> >> *!!! If you are a big user of the Embedded RocksDB State Backend and >> have performance sensitive workloads, please read this !!!* >> >> I want to quickly raise some awareness for a RocksDB version upgrade we >> plan to do, and some possible impact on application performance. >> >> *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB >> unfortunately introduces some non-trivial performance regression. In our >> Nexmark Benchmark, at least one query is up to 13% slower. >> With some fixes, this can be improved, but even then there is an overall >> *regression >> up to 6% in some queries*. (See attached table for results from relevant >> Nexmark Benchmark queries). >> >> We would do this update nonetheless, because we need to get new features >> and bugfixes from RocksDB in. >> >> Please respond to this mail thread if you have major concerns about this. >> >> >> *### Fallback Plan* >> >> Optionally, we could fall back to Plan B, which is to upgrade RocksDB >> only to version 5.18.4. >> Which has no performance regression (after applying a custom patch). 
>> >> While this spares us the performance degradation of RocksDB 6.20.x, this >> has multiple disadvantages: >> - Does not include the better memory stability (strict cache control) >> - Misses out on some new features which some users asked about >> - Does not have the latest RocksDB bugfixes >> >> The latest point is especially bad in my opinion. While we can >> cherry-pick some bugfixes back (and have done this in the past), users >> typically run into an issue first and need to trace it back to RocksDB, >> then one of the committers can find the relevant patch from RocksDB master >> and backport it. That isn't the greatest user experience. >> >> Because of those disadvantages, we would prefer to do the upgrade to the >> newer RocksDB version despite the unfortunate performance regression. >> >> Best, >> Stephan >> >> >> > > -- > Best Regards, > Yuval Itzchakov. >
[ANNOUNCE] RocksDB Version Upgrade and Performance
Hi all! *!!! If you are a big user of the Embedded RocksDB State Backend and have performance sensitive workloads, please read this !!!* I want to quickly raise some awareness for a RocksDB version upgrade we plan to do, and some possible impact on application performance. *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB unfortunately introduces some non-trivial performance regression. In our Nexmark Benchmark, at least one query is up to 13% slower. With some fixes, this can be improved, but even then there is an overall *regression up to 6% in some queries*. (See attached table for results from relevant Nexmark Benchmark queries). We would do this update nonetheless, because we need to get new features and bugfixes from RocksDB in. Please respond to this mail thread if you have major concerns about this. *### Fallback Plan* Optionally, we could fall back to Plan B, which is to upgrade RocksDB only to version 5.18.4. Which has no performance regression (after applying a custom patch). While this spares us the performance degradation of RocksDB 6.20.x, this has multiple disadvantages: - Does not include the better memory stability (strict cache control) - Misses out on some new features which some users asked about - Does not have the latest RocksDB bugfixes The latest point is especially bad in my opinion. While we can cherry-pick some bugfixes back (and have done this in the past), users typically run into an issue first and need to trace it back to RocksDB, then one of the committers can find the relevant patch from RocksDB master and backport it. That isn't the greatest user experience. Because of those disadvantages, we would prefer to do the upgrade to the newer RocksDB version despite the unfortunate performance regression. Best, Stephan
Re: [VOTE] FLIP-150: Introduce Hybrid Source
+1 (binding) Would be good to move this across the finish line. It seems this thread is held up mainly by formalities at this point. On Fri, Jul 2, 2021 at 4:27 AM Israel Ekpo wrote: > +1 (non-binding) > > On Thu, Jul 1, 2021 at 6:45 PM Elkhan Dadashov > wrote: > > > +1 (non-binding) > > > > On 2021/07/01 05:49:44 蒋晓峰 wrote: > > > Hi everyone, > > > > > > > > > > > > > > > Thanks for all the feedback to Hybrid Source so far. Based on the > > discussion[1] we seem to have consensus, so I would like to start a vote > on > > FLIP-150 for which the FLIP has also been updated[2]. > > > > > > > > > > > > > > > The vote will last for at least 72 hours (Sun, Jul 4th 12:00 GMT) > unless > > there is an objection or insufficient votes. > > > > > > > > > > > > > > > Thanks, > > > > > > Nicholas Jiang > > > > > > > > > > > > > > > [1] > > > https://lists.apache.org/thread.html/r94057d19f0df2a211695820375502d60cddeeab5ad27057c1ca988d6%40%3Cdev.flink.apache.org%3E > > > > > > [2] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source > > >
Re: [DISCUSS] Address deprecation warnings when upgrading dependencies
> > If someone started preparing a junit5 migration PR they will run into > merge conflicts if everyone now starts replacing these instances at will. This sounds like a good case for fixing it in one step during the upgrade. Otherwise folks will start fixing this individually when they encounter the deprecated methods. On Wed, Jul 14, 2021 at 5:31 PM Chesnay Schepler wrote: > If someone started preparing a junit5 migration PR they will run into > merge conflicts if everyone now starts replacing these instances at will. > > There are also some options on the table on how to actually do the > migration; we can use hamcrest of course, or create a small wrapper in > our test utils that retains the JUnit signature (then we'd > just have to adjust imports). > > On 14/07/2021 16:33, Stephan Ewen wrote: > > @Chesnay - can you elaborate on this? In the classes I worked with so > far, > > it was a 1:1 replacement of "org.junit.Assert.assertThat()" to > > "org.hamcrest.MatcherAssert.assertThat()". > > What other migration should happen there? > > > > On Wed, Jul 14, 2021 at 11:13 AM Chesnay Schepler > > wrote: > > > >> It may be better to not do that to ease the migration to junit5, where > >> we have to address exactly these usages. > >> > >> On 14/07/2021 09:57, Till Rohrmann wrote: > >>> I actually found > >>> myself recently, whenever touching a test class, replacing Junit's > >>> assertThat with Hamcrest's version which felt quite tedious. > >> > >> > >
Re: [DISCUSS] Address deprecation warnings when upgrading dependencies
@Chesnay - can you elaborate on this? In the classes I worked with so far, it was a 1:1 replacement of "org.junit.Assert.assertThat()" to "org.hamcrest.MatcherAssert.assertThat()". What other migration should happen there? On Wed, Jul 14, 2021 at 11:13 AM Chesnay Schepler wrote: > It may be better to not do that to ease the migration to junit5, where > we have to address exactly these usages. > > On 14/07/2021 09:57, Till Rohrmann wrote: > > I actually found > > myself recently, whenever touching a test class, replacing Junit's > > assertThat with Hamcrest's version which felt quite tedious. > > >
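The 1:1 replacement described above amounts to swapping a single static import; a sketch of what that looks like at a typical call site (the matcher used in the comment is illustrative):

```diff
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;

 // call sites stay exactly the same, e.g.:
 //   assertThat(list, hasSize(3));
```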
Re: [DISCUSS] Address deprecation warnings when upgrading dependencies
@Chesnay - Good question. I think we can be pragmatic there. If you upgrade Jackson, pick a class that uses it and look for the common methods. If everything is fine there, it is probably fine overall. If one or two deprecated method usages are overlooked, that's not an issue. If a common method gets deprecated, then chances are high that you will spot it quickly when looking at some Jackson usages. @Yangze - Agreed, we should do the same internally and make sure we keep our own API use and examples up to date. On Wed, Jul 14, 2021 at 11:00 AM Chesnay Schepler wrote: > How do you propose to do this in practice? > Let's say I bump jackson, how would I find all new usages of deprecated > APIs? > Build it locally and grep the maven output for jackson? > > On 14/07/2021 10:51, Yangze Guo wrote: > > +1 for fixing deprecation warnings when bumping/changing dependencies. > > > > Not only for the dependencies, we also use the deprecated API of Flink > > itself in `flink-examples` and the document, e.g. the #writeAsText. I > > think it would be good to do a clean-up for usability. WDYT? > > > > Best, > > Yangze Guo > > > > On Wed, Jul 14, 2021 at 3:57 PM Till Rohrmann > wrote: > >> I think this suggestion makes a lot of sense, Stephan. +1 for fixing > >> deprecation warnings when bumping/changing dependencies. I actually > found > >> myself recently, whenever touching a test class, replacing Junit's > >> assertThat with Hamcrest's version which felt quite tedious. > >> > >> Cheers, > >> Till > >> > >> On Tue, Jul 13, 2021 at 6:15 PM Stephan Ewen wrote: > >> > >>> Hi all! > >>> > >>> I would like to propose that we make it a project standard that when > >>> upgrading a dependency, deprecation issues arising from that need to be > >>> fixed in the same step. If the new dependency version deprecates a > method > >>> in favor of another method, all usages in the code need to be replaced > >>> together with the upgrade. 
> >>> > >>> We are accumulating deprecated API uses over time, and it floods logs > and > >>> IDEs with deprecation warnings. I find this is a problem, because the > >>> irrelevant warnings more and more drown out the actually relevant > warnings. > >>> And arguably, the deprecation warning isn't fully irrelevant, it can > cause > >>> problems in the future when the method is actually removed. > >>> We need the general principle that a change leaves the codebase in at > least > >>> as good shape as before, otherwise things accumulate over time and the > >>> overall quality goes down. > >>> > >>> The concrete example that motivated this for me is the JUnit dependency > >>> upgrade. Pretty much every test I looked at recently is quite yellow > (due > >>> to junit Matchers.assertThat being deprecated in the new JUnit > version). > >>> This is easily fixed (even a string replace and spotless:apply goes a > long > >>> way), so I would suggest we try and do these things in one step in the > >>> future. > >>> > >>> Curious what other committers think about this suggestion. > >>> > >>> Best, > >>> Stephan > >>> > >
Re: [DISCUSS] FLIP-182: Watermark alignment
@Eron Wright The per-split watermarks are the default in the new source interface (FLIP-27) and come for free if you use the SplitReader. Based on that, it is also possible to unsubscribe individual splits to solve the alignment in the case where operators have multiple splits assigned. Piotr and I already discussed that, but concluded that the implementation of that is largely orthogonal. I am a bit worried, though, that if we release and advertise the alignment without handling this case, we create a surprise for quite a few users. While this is admittedly valuable for some users, I think we need to position this accordingly. I would not fully advertise this before we have the second part implemented as well. On Mon, Jul 12, 2021 at 7:18 PM Eron Wright wrote: > The notion of per-split watermarks seems quite interesting. I think the > idleness feature could benefit from a per-split approach too, because > idleness is typically related to whether any splits are assigned to a given > operator instance. > > > On Mon, Jul 12, 2021 at 3:06 AM 刘建刚 wrote: > > > +1 for the source watermark alignment. > > In the previous flink version, the source connectors are different in > > implementation and it is hard to make this feature. When the consumed > data > > is not aligned or history data is being consumed, it is very easy to cause > > misalignment. Source alignment can resolve many unstable problems. > > > > On Fri, Jul 9, 2021 at 11:25 PM Seth Wiesman wrote: > > > +1 > > > > > > In my opinion, this limitation is perfectly fine for the MVP. Watermark > > > alignment is a long-standing issue and this already moves the ball so > far > > > forward. > > > > > > I don't expect this will cause many issues in practice, as I understand > > it > > > the FileSource always processes one split at a time, and in my > > experience, > > > 90% of Kafka users have a small number of partitions or scale their > > pipelines > > > to have one reader per partition. 
Obviously, there are larger-scale > Kafka > > > topics and more sources that will be ported over in the future but I > > think > > > there is an implicit understanding that aligning sources adds latency > to > > > pipelines, and we can frame the follow-up "per-split" alignment as an > > > optimization. > > > > > > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski < > piotr.nowoj...@gmail.com> > > > wrote: > > > > > > > Hey! > > > > > > > > A couple of weeks ago me and Arvid Heise played around with an idea > to > > > > address a long standing issue of Flink: lack of watermark/event time > > > > alignment between different parallel instances of sources, that can > > lead > > > to > > > > ever growing state size for downstream operators like WindowOperator. > > > > > > > > We had an impression that this is relatively low hanging fruit that > can > > > be > > > > quite easily implemented - at least partially (the first part > mentioned > > > in > > > > the FLIP document). I have written down our proposal [1] and you can > > also > > > > check out our PoC that we have implemented [2]. > > > > > > > > We think that this is a quite easy proposal, that has been in large > > part > > > > already implemented. There is one obvious limitation of our PoC. > Namely > > > we > > > > can only easily block individual SourceOperators. This works > perfectly > > > fine > > > > as long as there is at most one split per SourceOperator. However it > > > > doesn't work with multiple splits. In that case, if a single > > > > `SourceOperator` is responsible for processing both the least and the > > > most > > > > advanced splits, we won't be able to block this most advanced split > for > > > > generating new records. I'm proposing to solve this problem in the > > future > > > > in another follow up FLIP, as a solution that works with a single > split > > > per > > > > operator is easier and already valuable for some of the users. > > > > > > > > What do you think about this proposal? 
> > > > Best, Piotrek > > > > > > > > [1] > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources > > > > [2] https://github.com/pnowojski/flink/commits/aligned-sources > > > > > > > > > >
[DISCUSS] Address deprecation warnings when upgrading dependencies
Hi all! I would like to propose that we make it a project standard that when upgrading a dependency, deprecation issues arising from that need to be fixed in the same step. If the new dependency version deprecates a method in favor of another method, all usages in the code need to be replaced together with the upgrade. We are accumulating deprecated API uses over time, and it floods logs and IDEs with deprecation warnings. I find this is a problem, because the irrelevant warnings more and more drown out the actually relevant warnings. And arguably, the deprecation warning isn't fully irrelevant, it can cause problems in the future when the method is actually removed. We need the general principle that a change leaves the codebase in at least as good shape as before, otherwise things accumulate over time and the overall quality goes down. The concrete example that motivated this for me is the JUnit dependency upgrade. Pretty much every test I looked at recently is quite yellow (due to junit Matchers.assertThat being deprecated in the new JUnit version). This is easily fixed (even a string replace and spotless:apply goes a long way), so I would suggest we try and do these things in one step in the future. Curious what other committers think about this suggestion. Best, Stephan
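The "string replace and spotless:apply" approach mentioned above could look roughly like the following sketch. It operates on a hypothetical demo file (paths and file names are illustrative); against the real codebase one would run the same sed over the test sources and then `mvn spotless:apply`. Assumes GNU sed for in-place editing.

```shell
# Create a hypothetical demo test file containing the deprecated static import.
mkdir -p /tmp/assertthat-demo
cat > /tmp/assertthat-demo/FooTest.java <<'EOF'
import static org.junit.Assert.assertThat;
EOF

# Swap the deprecated import for the Hamcrest equivalent in place;
# call sites of assertThat(...) need no change.
sed -i 's/org\.junit\.Assert\.assertThat/org.hamcrest.MatcherAssert.assertThat/g' \
  /tmp/assertthat-demo/FooTest.java

cat /tmp/assertthat-demo/FooTest.java
```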
Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism
Before jumping into the designs of other mechanisms, can we clarify whether adjusting checkpointing is really the right approach here? What about storing empty state in the checkpoints, so that any recovery simply does a full replay of the input? Regarding the hybrid batch/streaming execution: The nice part about this would be that no checkpoints are necessary during the batch phase. On Tue, Jul 13, 2021 at 5:51 AM Senhong Liu wrote: > Hi Stephan, > > Thank you so much for replying and suggesting! Following your questions, I > would give some explanation and new thoughts. > > (1) More detailed info about the CDC use case. > > In the previous design of FLINK-CDC, they would start a full-table scan > at the beginning by holding a read-write lock. Taking a checkpoint in the > middle would be meaningless since there is no guarantee that data would not > be changed during recovery. > > (2) Could hybrid batch/streaming execution solve the problem? > > It is acceptable to me and, actually, the current Flink-CDC is > implementing a new feature like this[1][2]. But a large batch size could > still fail the checkpoint frequently after all and it could still be > confusing for users to understand what's going on. > > (3) Making the checkpoint more controllable and reliable. > > Overall, I am also looking for a way of making the checkpoint more > controllable and reliable, since hybrid batch/streaming execution is also a > tricky way of controlling the checkpoint. But maybe it is not configurable > or developable for users to implement for their specific use case. > > However, combining with your opinion, I am thinking that designing a set of > REST APIs to control the checkpoint scheduler might be more acceptable. > > > [1] > https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md > [2] > > https://github.com/ververica/flink-cdc-connectors/commit/c6ca6c187471b538a9774258d2572194e1bb855b > > On Tue, Jul 13, 2021 at 1:25 AM Stephan Ewen wrote: > > > Hi! 
> > > > Thanks for writing this FLIP, and interesting idea. > > > > I would like to understand a bit better why exactly we need this, and > what > > our alternative options are. My main concerns are the following: > > > > *(1) Can we achieve this without changing the checkpointing mechanism?* > > > > The checkpoint mechanism is already complex and it is super sensitive (a > > bug there threatens every user with data loss). Such components must stay > > as minimal as possible. When there is a way to solve this outside > > checkpointing, then that should be the default approach. > > > > To check that, I would really like to understand this specific CDC use > case > > a bit more. If I understand it correctly, the issue is that checkpoints > are > > not possible while the initial database snapshot is ingested, before the > > actual Change-Data processing starts. > > > > (a) What is the reason that no checkpoint is possible during this time? > Why > > is there no way to store in the checkpoint the position of the snapshot > > ingestion? Is it because the snapshots are streamed in from a JDBC query > > and that is not deterministic, meaning retrying the query yields > different > > rows (or order of rows)? > > > > (b) If there can be no checkpoint progress during the snapshot ingestion > > and all checkpoints are rejected, what prevents us from just storing an > > empty state in the checkpoint? Meaning the system may have taken a bunch > of > > checkpoints, but any recovery would start the source from the beginning. > > Is there some concern about not emitting data during that database > snapshot > > reading phase? > > > > (c) There is quite some work in the direction of blending batch and > > streaming execution, meaning having an initial batch execution step for > > some data (like the backlog in Kafka, or a DB snapshot) and then > switching > > to streaming execution for the real-time stream (new records in Kafka, or > > CDC records). 
If we focus on advancing that, we get the behavior that all > > the initial data is processed in one batch (no intermediate checkpoint), > > and we also get the performance speedup provided by not having to work > with > > RocksDB for the batch step. I think this is where the future is. > > > > Regarding the case to not checkpoint in-between transaction markers: I > > don't know how often these occur, but I would expect potentially very > > often. If we reject whenever some operator is in-between transaction > > markers, we will reject very many (possibly almost all) checkpoints once > we > > get to a certain
Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism
Hi! Thanks for writing this FLIP, and interesting idea. I would like to understand a bit better why exactly we need this, and what our alternative options are. My main concerns are the following: *(1) Can we achieve this without changing the checkpointing mechanism?* The checkpoint mechanism is already complex and it is super sensitive (a bug there threatens every user with data loss). Such components must stay as minimal as possible. When there is a way to solve this outside checkpointing, then that should be the default approach. To check that, I would really like to understand this specific CDC use case a bit more. If I understand it correctly, the issue is that checkpoints are not possible while the initial database snapshot is ingested, before the actual Change-Data processing starts. (a) What is the reason that no checkpoint is possible during this time? Why is there no way to store in the checkpoint the position of the snapshot ingestion? Is it because the snapshots are streamed in from a JDBC query and that is not deterministic, meaning retrying the query yields different rows (or order of rows)? (b) If there can be no checkpoint progress during the snapshot ingestion and all checkpoints are rejected, what prevents us from just storing an empty state in the checkpoint? Meaning the system may have taken a bunch of checkpoints, but any recovery would start the source from the beginning. Is there some concern about not emitting data during that database snapshot reading phase? (c) There is quite some work in the direction of blending batch and streaming execution, meaning having an initial batch execution step for some data (like the backlog in Kafka, or a DB snapshot) and then switching to streaming execution for the real-time stream (new records in Kafka, or CDC records). 
If we focus on advancing that, we get the behavior that all the initial data is processed in one batch (no intermediate checkpoint), and we also get the performance speedup provided by not having to work with RocksDB for the batch step. I think this is where the future is. Regarding the case to not checkpoint in-between transaction markers: I don't know how often these occur, but I would expect potentially very often. If we reject whenever some operator is in-between transaction markers, we will reject very many (possibly almost all) checkpoints once we get to a certain scale (1000s of parallel operators). I think we need a different approach there, for example first grouping events into transaction sessions, or so. *(2) Inability to Checkpoint shouldn't become a first-class concept.* I think this is really a question of Flink design principles and direction. I believe we need to push Flink to a direction that it can always checkpoint, and more frequently and more reliably than at the moment. Various ongoing efforts move the system in that direction, giving it the ability to checkpoint always (async sources, async sinks, non-blocking mailbox), more predictably (unaligned checkpoints) and more frequently (log-based checkpoints). That is something that makes it much easier to operate the system. When checkpoints become infrequent or unpredictable in the interval (when they are not reliable), it becomes a big operational problem. If we start assuming that operators can arbitrarily reject checkpoints, we will easily get into situations where many checkpoints are rejected and it takes quite a while until a checkpoint can happen. There are many jobs with 1000s of operators, and when one of them rejects a checkpoint, the checkpoint as a whole fails. Letting operators decide when to take checkpoints is a cool property that makes many things easier, but I am skeptical whether this is compatible with global checkpoints. 
Not having frequent checkpoints (but very infrequent ones) is really more of a batch situation, and that brings me back to the point above: I think a really well-working solution for this would be the hybrid batch/streaming execution. For that reason, I am very skeptical that we should add a first-class API that suggests that a checkpoint happens only sometimes, when everyone happens to agree to it. I think that sends the wrong signal, both to developer direction, and to users in how to implement their applications. Rejecting checkpoints should be a rare thing, so not something an API suggests can happen all the time. Rejecting checkpoints a few times at the beginning of a job might also be feasible, when later checkpoints generally succeed, but then again, the proposed API does not suggest that; it suggests any checkpoint is always rejectable by all operators. If we really need checkpoint rejection, I think Piotr's idea goes into a pretty good direction: Having a "RejectionException" that leads to a checkpoint abort in the regular way, but doesn't increment the failure counter. *Conclusion* Because of the sensitivity of the checkpoint mechanism, and because the API proposed here suggests a behavior of
[jira] [Created] (FLINK-23301) StateFun HTTP Ingress
Stephan Ewen created FLINK-23301: Summary: StateFun HTTP Ingress Key: FLINK-23301 URL: https://issues.apache.org/jira/browse/FLINK-23301 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen The HTTP ingress would start an HTTP Server at a specified port. The HTTP server would only handle _POST_ requests. The target function is represented by the path to which the request is made; the message content is the body of the POST request. The following example would send an empty message to the function with the address \{{namespace='example', type='greeter', id='Igal'}}. {code} curl -X POST http://statefun-ingress:/in/example/greeter/Igal POST /in/example/greeter/Igal HTTP/1.1 Content-Length: 0 {code} The example below would send a message of type 'statefun/string' to the function with the address \{{namespace='example', type='greeter', id='Elisa'}} and the message contents \{{"{numTimesToGreet: 5}"}}. {code} curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d "\{numTimesToGreet: 5}" http://statefun-ingress:/in/example/greeter/Elisa POST /in/example/greeter/Elisa HTTP/1.1 Content-Type: text/plain; charset=UTF-8 Content-Length: 20 {numTimesToGreet: 5} {code} h3. Data Types The content type (mime type) specified in the request header of the HTTP request will be directly mapped to the statefun types. For example, a \{{Content-Type: io.statefun.types/int}} will set the type of the message to \{{io.statefun.types/int}}. As a special case, we map the content type \{{text/plain}} to \{{io.statefun.types/string}}, to make simple cases and examples work more seamlessly. The following examples would send a message to a function that expects a ProtoBuf-encoded type \{{Greeting}} registered in StateFun as \{{example/greeting}}. 
{code} > curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d > "\{numTimesToGreet: 5}" > CONTENTS=`echo 'numTimesToGreet: 5' | ./protoc --encode Greeting > example_types.proto` > echo $CONTENTS | curl -X POST -H "Content-Type: example/greeting" > --data-binary @- http://statefun-ingress:/in/example/greeter/Bobby {code} h3. Sender and Responses We want to support round-trip-style interactions, meaning posting a request and receiving a response. Given the async messaging nature of StateFun, this might not necessarily be one HTTP request which immediately gives you the corresponding response. Instead, it can mean issuing (POST) a request to the HTTP ingress and polling (GET) a response from an associated HTTP Egress. To support this kind of pattern, the HTTP ingress will assign a random request correlation ID in the HTTP response. Furthermore, the ingress will optionally set the \{{sender()}} field of the created message to reference a configured associated egress. The ingress config would add an entry referencing the egress (like \{{'paired_egress_name: httpout'}}). {code} > curl -X POST -i http://statefun-ingress:/in/example/greeter/Igal POST /in/example/greeter/Igal HTTP/1.1 Content-Length: 0 HTTP/1.1 200 OK StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5 Content-Length: 0 {code} The created message would have no body, but would have the \{{sender() = {{egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}. _Note: We would need to extend the message address scheme to be able to reference egresses. The egress itself can grab the correlation ID from the ID part of the address, because the HTTP egress doesn't use that field (in fact, no egress currently interprets the ID)._ h3. Singleton Instantiation To avoid port conflicts, we need to do a singleton instantiation per JVM. 
This can be achieved by using a statically referenced context to hold the instantiated server and a reference to the In the future, we can look into extending this to avoid setup/teardown when operators are cancelled for recovery. The server would then live as long as the StateFun application (job) lives (or more precisely, as long as the slot lives, which is the duration that the TaskManager is associated with the StateFun deployment - typically the entire lifetime). To achieve that, we would tear down the server in a [shutdown hook of the user-code classloader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145). Instead of letting the first source set up the server, the first source would register its output as the stream for the server to push messages to. h3. Configuration parameters - Bind host (default 0.0.0.0) - Bind port (default ) - Path (default "in") (for the path in the URL \{{http(s)://:}}) - Egress pair name, for setting the egress that replies sh
[jira] [Created] (FLINK-23281) StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses
Stephan Ewen created FLINK-23281: Summary: StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses Key: FLINK-23281 URL: https://issues.apache.org/jira/browse/FLINK-23281 Project: Flink Issue Type: New Feature Components: Stateful Functions Reporter: Stephan Ewen To make it easier to get started with StateFun, we want to reduce the dependencies on other systems and tools that are currently required to get your first program running. _(For reference, you currently need a docker-compose setup with at least Flink, Kafka, ZooKeeper, and then you need to interact with it using Kafka command line tools (or other clients) to publish messages to the ingress topic.)_ This issue aims to add simple pre-packaged HTTP ingresses/egresses that can be used for examples and exploration, and can be used with standard tools (like \{{curl}}). That reduces the barrier to exploration. _(Citing @ssc here: you have roughly one lunch break of time to get a developer excited. Many devs just play around for about 45 minutes, and when they don't see some preliminary success with simple examples, they drop the exploration.)_ An example interaction could be: {code} > curl -X POST -i http://:/in/example/greeter/Igal HTTP/2 200 request-id: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5 > curl -X GET > http://:/out/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5 Hello for the 1337th time... {code} *Note:* The HTTP Ingress/Egress here are different from the HTTP state access from FLINK-23261. Requests against the state access API (FLINK-23261) only interact with state entries and never invoke functions. In contrast, messages to the Ingress/Egress proposed here are sent to functions like with any other ingress. This is the umbrella issue. Dedicated tasks for ingress/egress and request correlation are in the subtasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23261) StateFun - HTTP State Access Interface
Stephan Ewen created FLINK-23261: Summary: StateFun - HTTP State Access Interface Key: FLINK-23261 URL: https://issues.apache.org/jira/browse/FLINK-23261 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Stephan Ewen h2. Functionality To improve operations of StateFun applications, we should offer an interface to query and manipulate state entries. This can be used for exploration and debugging purposes, and to repair state if the application state needs to be corrected. To make this simple, I would suggest an HTTP REST interface:
- GET: read the state entry for a key
- PUT: set/overwrite the state for a key
- DELETE: drop the state entry for a key
The URLs could be: {{http(s)://statefun-service/state}}, where the string {{//}} is the fully-qualified address of the target, and the {{statename}} is the name under which that persistent state is stored. Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL. For the responses, we would use the common codes: 200 for GET/PUT/DELETE success and 404 for GET/DELETE when the entry is not found. The state values, as returned from GET requests, would generally be just the bytes, and not interpreted by this request handling. The integration of the StateFun type system and HTTP content types (MIME types) is up for further discussion. One option is to set the content type response header to {{"statefun/"}}, where all non-simple types map to {{Content-Type: application/octet-stream}}. We may make an exception for strings, which could be returned as {{Content-Type: text/plain; charset=UTF-8}}. Later refinement is possible, like auto-stringifying contents when the request indicates to only accept {{text/plain}} responses. h2. Failure Guarantees The initial failure guarantees for PUT/DELETE would be none - the requests would be handled best effort. We can easily extend this later in one of two ways:
- Delay responses to the HTTP requests until the next checkpoint is complete.
That makes synchronous interaction easy and sounds like a good match for a more admin-style interface.
- Return the current checkpoint ID and offer a way to poll until the next checkpoint is completed. This avoids blocking requests, but puts more burden on the caller.
Given that the expected use cases for PUT/DELETE are more of an "admin/fix" nature, I would suggest going with synchronous requests, for simplicity. h2. Implementation There are two options to implement this: (1) A Source/Sink (Ingress/Egress) pair (2) An Operator Coordinator with HTTP Requests *Option (1) - Source/Sink pair* We would implement a specific source that is both an ingress and an egress. The source would spawn an HTTP server (singleton per TM process). Requests would be handled as follows:
- An HTTP request gets a generated correlation ID.
- The source injects a new message type (a "control message") into the stream. That message holds the correlation ID, the parallel subtask index of the originating source, and the target address and state name.
- The function dispatcher handles these messages in a special way, retrieving the state and sending an egress message with the correlation ID to the parallel subtask of the egress as indicated by the message's subtask index.
- The egress (which is the same instance as the ingress source) uses the correlation ID to respond to the request.
Advantages:
- No changes necessary in Flink
- Might sustain higher throughput, due to multiple HTTP endpoints
Disadvantages:
- Additional HTTP servers and ports require more setup (like service definitions on K8s).
- Need to introduce a new control message type and extend the function dispatcher to handle it.
- Makes a hard assumption that sources run on all slots. Needs an "ugly" singleton hack to start only one server per TM process.
*Option (2) - Operator Coordinator* Operator Coordinators are instances that run on the {{JobManager}} and can communicate with the Tasks via RPC.
Coordinators can receive calls from HTTP handlers at the JobManager's HTTP endpoint. An example for this is the Result Fetching through HTTP/OperatorCoordinator requests. We would need a patch to Flink to allow registering custom URLs and passing the path as a parameter to the request. The RPCs can be processed in the mailbox on the Tasks, making them thread-safe. This would also completely avoid the round-trip (source-to-sink) problem; the tasks simply need to send a response back to the RPC. Advantages:
- Reuse the existing HTTP endpoint and port. No need to have an additional HTTP server, port, and service; for these admin-style requests, this approach re-uses Flink's admin HTTP endpoint.
- No need for singleton HTTP server logic in Tasks
- Does require the assumption that all TMs run an instance of all ope
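The proposed GET/PUT/DELETE semantics can be illustrated with a small dictionary-backed sketch (a hypothetical class, not part of StateFun; a real implementation would route each request to the function's persisted state instead of a dict):

```python
class StateAccessHandler:
    """Illustrative sketch of the proposed state-access semantics.

    State entries are keyed by the fully-qualified function address plus the
    state name; values are raw bytes, uninterpreted by the handler.
    Each method returns an (http_status, body) pair: 200 on success,
    404 for GET/DELETE when no entry exists.
    """

    def __init__(self):
        self._state = {}  # (address, state_name) -> bytes

    def get(self, address: str, state_name: str):
        value = self._state.get((address, state_name))
        return (200, value) if value is not None else (404, None)

    def put(self, address: str, state_name: str, value: bytes):
        # PUT always succeeds: it sets or overwrites the entry.
        self._state[(address, state_name)] = value
        return (200, None)

    def delete(self, address: str, state_name: str):
        if (address, state_name) in self._state:
            del self._state[(address, state_name)]
            return (200, None)
        return (404, None)
```

For example, a `PUT` followed by a `GET` on the same address/state name returns the stored bytes, while a second `DELETE` on the same entry yields 404 - mirroring the status-code table proposed above.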
Re: [DISCUSS] Feedback Collection Jira Bot
ls > > > "stale-assigned" and "pull-request-available" in order to review those > > with > > > priority. That's also why I am not a fan of excluding tickets with > > > "pull-request-available" from the bot. The bot can help to make these > > > tickets visible to reviewers. > > > > > > @Jing Zhang: That's a problem. We should try to change the permissions > > > accordingly or need to find a different solution. > > > > > > @Piotr, Kurt: Instead of closing tickets, we could introduce an > > additional > > > priority like "Not a Priority" to which we move tickets. No ticket > would > > be > > > closed automatically. > > > > > > Overall, the following changes could resolve most of the concerns, I > > > believe: > > > > > > * Ignore tickets with a fixVersion for all rules but the > stale-unassigned > > > rule. > > > > > > * We change the time intervals as follows, accepting reality a bit more > > ;) > > > > > > * stale-assigned only after 30 days (instead of 14 days) > > > * stale-critical only after 14 days (instead of 7 days) > > > * stale-major only after 60 days (instead of 30 days) > > > > > > * Introduce "Not a Priority" priority and stop closing tickets. 
> > > > > > * Change default priority for new tickets of Flink project to "Minor" > > > > > > The measures, I proposed above, still try to achieve the goals > mentioned > > > and agreed upon in the original discussion thread, which were the > > > following: > > > > > > > > >- > > > > > >clearer communication and expectation management with the community > > >- > > > > > > a user or contributor should be able to judge the urgency of a > > ticket > > > by its priority > > > - > > > > > > if a ticket is assigned to someone the expectation that someone > is > > > working on it should hold > > > - > > > > > >generally reduce noise in Jira > > >- > > > > > >reduce overhead of committers to ask about status updates of > > >contributions or bug reports > > >- > > > > > > “Are you still working on this?” > > > - > > > > > > “Are you still interested in this?” > > > - > > > > > > “Does this still happen on Flink 1.x?” > > > - > > > > > > “Are you still experiencing this issue?” > > > - > > > > > > “What is the status of the implementation”? > > > - > > > > > >while still encouraging users to add new tickets and to leave > feedback > > >about existing tickets > > > > > > > > > Kurt, Stephan, if you'd like to change the bot to "just close very old > > > tickets", I suggest you start a separate discussion and subsequent > voting > > > thread. > > > > > > Best, > > > > > > Konstantin > > > > > > > > > On Wed, Jun 30, 2021 at 9:01 AM Kurt Young wrote: > > > > > > > +1 to Stephan's opinion, with just one minor difference. For my > > > experience > > > > and a project > > > > as big as Flink, picking up an issue created 1-2 years ago seems > normal > > > to > > > > me. To be more > > > > specific, during the blink planner merge, I created lots of clean up > & > > > > refactor issues, trying > > > > to make the code be more clean. I haven't had a chance to resolve all > > > these > > > > but I think they are > > > > still good improvements. 
Thus I would propose we don't close any > stale > > > > issues for at least 2 years. > > > > > > > > Best, > > > > Kurt > > > > > > > > > > > > On Wed, Jun 30, 2021 at 7:22 AM Stephan Ewen > wrote: > > > > > > > > > Being a bit late to the party, and don't want to ask to change > > > > everything, > > > > > just maybe some observation. > > > > > > > > > > My main observation and concern is still that this puts pressure > and > > > > > confusion on contributors, which are mostly blocked on committers > for > &
Re: [DISCUSS] Feedback Collection Jira Bot
Being a bit late to the party, and don't want to ask to change everything, just maybe some observation. My main observation and concern is still that this puts pressure and confusion on contributors, which are mostly blocked on committers for reviews, or are taking tickets as multi-week projects. I think it is not a great experience for contributors, when they are already unsure why their work isn't getting the attention from committers that they hoped for, to then see issues unassigned or deprioritized automatically. I think we really need to weigh this discouragement of contributors against the desire for a tidy ticket system. I also think by now this isn't just a matter of phrasing the bot's message correctly. Auto unassignment and deprioritization sends a subtle message that jira resolution is a more important goal than paying attention to contributors (at least I think that is how it will be perceived by many). Back to the original motivation, to not have issues lying around forever, ensuring there is closure eventually. For that, even much longer intervals would be fine. Like pinging every three months, closing after three pings - would resolve most tickets in a year, which is not too bad in the scope of a project like Flink. Many features/wishes easily move to two releases in the future, which is almost a year. We would get rid of long dead tickets and interfere little with current tickets. Contributors can probably understand ticket closing after a year of inactivity. I am curious if a very simple bot that really just looks at stale issues (no comment/change in three months), pings the issue/reporter/assignee/watchers and closes it after three pings would do the job. We would get out of the un-assigning business (which can send very tricky signals) and would rely on reporters/assignees/watchers to unassign if they see that the contributor abandoned the issue. With a cadence of three months for pinging, this isn't much work for the ones that get pinged. 
Issues where we rely on faster handling are probably the ones where committers have a stake in getting those into an upcoming release, so these tend to be watched anyways. On Wed, Jun 23, 2021 at 2:39 PM JING ZHANG wrote: > Hi Konstantin, Chesnay, > > > I would like it to not unassign people if a PR is open. These are > > usually blocked by the reviewer, not the assignee, and having the > > assignees now additionally having to update JIRA periodically is a bit > > like rubbing salt into the wound. > > I agree with Chesnay about not un-assigning an issue if a PR is open. > Besides, could assignees remove the "stale-assigned" tag by themselves? It > seems assignees have no permission to delete the tag if the issue is not > created by themselves. > > Best regards, > JING ZHANG > > Konstantin Knauf wrote on Wed, Jun 23, 2021 at 4:17 PM: > > > > I agree there are such tickets, but I don't see how this is addressing > my > > concerns. There are also tickets that just shouldn't be closed as I > > described above. Why do you think that duplicating tickets and losing > > discussions/knowledge is a good solution? > > > > I don't understand why we are necessarily losing discussion/knowledge. > The > > tickets are still there, just in "Closed" state, which are included in > > default Jira search. We could of course just add a label, but closing seems > > clearer to me given that likely this ticket will not get committer attention > > in the foreseeable future. > > > > > I would like to avoid having to constantly fight against the bot. It's > > already responsible for the majority of my daily emails, with quite > little > > benefit for me personally. I initially thought that after some period of > > time it will settle down, but now I'm afraid it won't happen. > > > > Can you elaborate which rules you are running into mostly? I'd rather > like > > to understand how we work right now and where this conflicts with the > Jira > > bot vs slowly disabling the jira bot via labels. 
> > > > On Wed, Jun 23, 2021 at 10:00 AM Piotr Nowojski > > wrote: > > > > > Hi Konstantin, > > > > > > > In my opinion it is important that we close tickets eventually. There > > are > > > a > > > > lot of tickets (bugs, improvements, tech debt) that over time became > > > > irrelevant, out-of-scope, irreproducible, etc. In my experience, > these > > > > tickets are usually not closed by anyone but the bot. > > > > > > I agree there are such tickets, but I don't see how this is addressing > my > > > concerns. There are also tickets that just shouldn't be closed as I > > > described above. Why do you think that duplicating tickets and losing > > > discussions/knowledge is a good solution? > > > > > > I would like to avoid having to constantly fight against the bot. It's > > > already responsible for the majority of my daily emails, with quite > > little > > > benefit for me personally. I initially thought that after some period > of > > > time it will settle down, but now I'm afraid it won't
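Stephan's proposed cadence above (ping an issue after three months without activity, close it after three unanswered pings, roughly a year of silence in total) is simple enough to write down as a tiny decision function - a purely illustrative sketch, not the actual flink-jira-bot code:

```python
from datetime import date, timedelta

PING_INTERVAL = timedelta(days=90)  # "no comment/change in three months"
MAX_PINGS = 3                       # close after the third unanswered ping


def bot_action(last_activity: date, today: date, pings_sent: int) -> str:
    """Return the action a minimal stale-issue bot would take (sketch).

    Any activity (comment or change) resets the clock; an issue is only
    closed after it has already received MAX_PINGS pings with no response.
    """
    if today - last_activity < PING_INTERVAL:
        return "none"   # issue is active, nothing to do
    if pings_sent >= MAX_PINGS:
        return "close"  # ~a year of silence: three 90-day ping cycles
    return "ping"
```

Note that this sketch never unassigns or deprioritizes anything - matching the argument that the bot should stay out of the "un-assigning business" and rely on reporters/assignees/watchers instead.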
Re: [DISCUSS] FLIP-172: Support custom transactional.id prefix in FlinkKafkaProducer
Sounds good from my side, please go ahead. On Fri, Jun 25, 2021 at 5:31 PM Wenhao Ji wrote: > Thanks Stephan and Piotr for your replies. It seems that there is no > problem or concern about this feature. If there is no further > objection, I will start a vote thread for FLIP-172. > > Thanks, > Wenhao > > On Wed, Jun 23, 2021 at 3:41 PM Piotr Nowojski > wrote: > > > > Hi, > > > > +1 from my side on this idea. I do not see any problems that could be > > caused by this change. > > > > Best, > > Piotrek > > > > On Wed, 23 Jun 2021 at 08:59, Stephan Ewen wrote: > > > The motivation and the proposal sound good to me, +1 from my side. > > > > > > Would be good to have a quick opinion from someone who worked > specifically > > > with Kafka, maybe Becket or Piotr? > > > > > > Best, > > > Stephan > > > > > > > > > On Sat, Jun 12, 2021 at 9:50 AM Wenhao Ji > wrote: > > > > > >> Hi everyone, > > >> > > >> I would like to open this discussion thread to talk about FLIP-172 > > >> < > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer > > >> >, > > >> which aims to provide a way to support specifying a custom > > >> transactional.id > > >> in the FlinkKafkaProducer class. > > >> > > >> I am looking forward to your feedback and suggestions! > > >> > > >> Thanks, > > >> Wenhao > > >> > > > >
Re: [DISCUSS] Incrementally deprecating the DataSet API
Thanks for writing this up, this also reflects my understanding. I think a blog post would be nice, ideally with an explicit call for feedback so we learn about user concerns. A blog post has a lot more reach than an ML thread. Best, Stephan On Wed, Jun 23, 2021 at 12:23 PM Timo Walther wrote: > Hi everyone, > > I'm sending this email to make sure everyone is on the same page about > slowly deprecating the DataSet API. > > There have been a few thoughts mentioned in presentations, offline > discussions, and JIRA issues. However, I have observed that there are > still some concerns or different opinions on what steps are necessary to > implement this change. > > Let me summarize some of the steps and assumptions and let's have a > discussion about it: > > Step 1: Introduce a batch mode for Table API (FLIP-32) > [DONE in 1.9] > > Step 2: Introduce a batch mode for DataStream API (FLIP-134) > [DONE in 1.12] > > Step 3: Soft deprecate DataSet API (FLIP-131) > [DONE in 1.12] > > We updated the documentation recently to make this deprecation even more > visible. There is a dedicated `(Legacy)` label right next to the menu > item now. > > We won't deprecate concrete classes of the API with a @Deprecated > annotation to avoid extensive warnings in logs until then. > > Step 4: Drop the legacy SQL connectors and formats (FLINK-14437) > [DONE in 1.14] > > We dropped code for ORC, Parquet, and HBase formats that were only used > by DataSet API users. The removed classes had no documentation and were > not annotated with one of our API stability annotations. > > The old functionality should be available through the new sources and > sinks for Table API and DataStream API. If not, we should bring them > into a shape that they can be a full replacement. > > DataSet users are encouraged to either upgrade the API or use Flink > 1.13. Users can either just stay at Flink 1.13 or copy only the format's > code to a newer Flink version. We aim to keep the core interfaces (i.e. 
> InputFormat and OutputFormat) stable until the next major version. > > We will maintain/allow important contributions to dropped connectors in > 1.13. So 1.13 could be considered as kind of a DataSet API LTS release. > > Step 5: Drop the legacy SQL planner (FLINK-14437) > [DONE in 1.14] > > This included dropping support of DataSet API with SQL. > > Step 6: Connect both Table and DataStream API in batch mode (FLINK-20897) > [PLANNED in 1.14] > > Step 7: Reach feature parity of Table API/DataStream API with DataSet API > [PLANNED for 1.14++] > > We need to identify blockers when migrating from DataSet API to Table > API/DataStream API. Here we need to establish a good feedback pipeline > to include DataSet users in the roadmap planning. > > Step 8: Drop the Gelly library > > No concrete plan yet. Latest would be the next major Flink version aka > Flink 2.0. > > Step 9: Drop DataSet API > > Planned for the next major Flink version aka > Flink 2.0. > > > Please let me know if this matches your thoughts. We can also convert > this into a blog post or mention it in the next release notes. > > Regards, > Timo > >
Re: [DISCUSS] Drop Mesos in 1.14
I would prefer to remove Mesos from the Flink core as well. I also had a similar thought as Seth: As far as I know, you can package applications to run on Mesos with "Marathon". That would be like deploying an opaque Flink standalone cluster on Mesos. The implication is similar to going from an active integration to a standalone cluster (like going from a native Flink Kubernetes Application Deployment to a Standalone Application Deployment on Kubernetes): You need to make sure the number of TMs / slots and the parallelism fit together (or use the new reactive mode). Other than that, I think it should work well for streaming jobs. Having a Flink-Marathon template in https://flink-packages.org/ would be a nice thing for Mesos users. @Oleksandr What do you think about that? On Wed, Jun 23, 2021 at 11:31 AM Leonard Xu wrote: > +1 for dropping Mesos. I checked both the commit history and the mailing list; > Mesos-related issues and user questions have rarely appeared. > > Best, > Leonard > >
Re: [DISCUSS] Do not merge PRs with "unrelated" test failures.
+1 to Xintong's proposal On Wed, Jun 23, 2021 at 1:53 PM Till Rohrmann wrote: > I would first try to not introduce the exception for local builds. It makes > it quite hard for others to verify the build and to make sure that the > right things were executed. If we see that this becomes an issue then we > can revisit this idea. > > Cheers, > Till > > On Wed, Jun 23, 2021 at 4:19 AM Yangze Guo wrote: > > > +1 for appending this to community guidelines for merging PRs. > > > > @Till Rohrmann > > I agree that with this approach unstable tests will not block other > > commit merges. However, it might be hard to prevent merging commits > > that are related to those tests and should have passed them. It's > > true that this judgment can be made by the committers, but no one can > > ensure the judgment is always precise and so that we have this > > discussion thread. > > > > Regarding the unstable tests, how about adding another exception: > > committers verify it in their local environment and comment in such > > cases? > > > > Best, > > Yangze Guo > > > > On Tue, Jun 22, 2021 at 8:23 PM 刘建刚 wrote: > > > > > > It is a good principle to run all tests successfully with any change. > > This > > > means a lot for the project's stability and development. I am a big +1 for > this > > > proposal. > > > > > > Best > > > liujiangang > > > > > > Till Rohrmann wrote on Tue, Jun 22, 2021 at 6:36 PM: > > > > One way to address the problem of regularly failing tests that block > > > > merging of PRs is to disable the respective tests for the time being. > > Of > > > > course, the failing test then needs to be fixed. But at least that > way > > we > > > > would not block everyone from making progress. > > > > > > > > Cheers, > > > > Till > > > > > > > > On Tue, Jun 22, 2021 at 12:00 PM Arvid Heise > wrote: > > > > > > > > > I think this is overall a good idea. So +1 from my side. 
> > > > > However, I'd like to put a higher priority on infrastructure then, > in > > > > > particular docker image/artifact caches. > > > > > > > > > > On Tue, Jun 22, 2021 at 11:50 AM Till Rohrmann < > trohrm...@apache.org > > > > > > > > wrote: > > > > > > > > > > > Thanks for bringing this topic to our attention Xintong. I think > > your > > > > > > proposal makes a lot of sense and we should follow it. It will > > give us > > > > > > confidence that our changes are working and it might be a good > > > > incentive > > > > > to > > > > > > quickly fix build instabilities. Hence, +1. > > > > > > > > > > > > Cheers, > > > > > > Till > > > > > > > > > > > > On Tue, Jun 22, 2021 at 11:12 AM Xintong Song < > > tonysong...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > In the past a couple of weeks, I've observed several times that > > PRs > > > > are > > > > > > > merged without a green light from the CI tests, where failure > > cases > > > > are > > > > > > > considered *unrelated*. This may not always cause problems, but > > would > > > > > > > increase the chance of breaking our code base. In fact, it has > > > > occurred > > > > > > to > > > > > > > me twice in the past few weeks that I had to revert a commit > > which > > > > > breaks > > > > > > > the master branch due to this. > > > > > > > > > > > > > > I think it would be nicer to enforce a stricter rule, that no > PRs > > > > > should > > > > > > be > > > > > > > merged without passing CI. > > > > > > > > > > > > > > The problems of merging PRs with "unrelated" test failures are: > > > > > > > - It's not always straightforward to tell whether a test > > failures are > > > > > > > related or not. > > > > > > > - It prevents subsequent test cases from being executed, which > > may > > > > fail > > > > > > > relating to the PR changes. 
> > > > > > > > > > > > > > To make things easier for the committers, the following > > exceptions > > > > > might > > > > > > be > > > > > > > considered acceptable. > > > > > > > - The PR has passed CI in the contributor's personal workspace. > > > > Please > > > > > > post > > > > > > > the link in such cases. > > > > > > > - The CI tests have been triggered multiple times, on the same > > > > commit, > > > > > > and > > > > > > > each stage has at least passed for once. Please also comment in > > such > > > > > > cases. > > > > > > > > > > > > > > If we all agree on this, I'd update the community guidelines > for > > > > > merging > > > > > > > PRs wrt. this proposal. [1] > > > > > > > > > > > > > > Please let me know what do you think. > > > > > > > > > > > > > > Thank you~ > > > > > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests > > > > > > > > > > > > > > > > > > > > > > > > >
Re: [DISCUSS] FLIP-172: Support custom transactional.id prefix in FlinkKafkaProducer
The motivation and the proposal sound good to me, +1 from my side. Would be good to have a quick opinion from someone who worked specifically with Kafka, maybe Becket or Piotr? Best, Stephan On Sat, Jun 12, 2021 at 9:50 AM Wenhao Ji wrote: > Hi everyone, > > I would like to open this discussion thread to talk about FLIP-172 > < > https://cwiki.apache.org/confluence/display/FLINK/FLIP-172%3A+Support+custom+transactional.id+prefix+in+FlinkKafkaProducer > >, > which aims to provide a way to support specifying a custom > transactional.id > in the FlinkKafkaProducer class. > > I am looking forward to your feedback and suggestions! > > Thanks, > Wenhao >
[jira] [Created] (FLINK-23093) Limit number of I/O pool and Future threads in Mini Cluster
Stephan Ewen created FLINK-23093: Summary: Limit number of I/O pool and Future threads in Mini Cluster Key: FLINK-23093 URL: https://issues.apache.org/jira/browse/FLINK-23093 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Stephan Ewen Fix For: 1.14.0 When running tests on CI via the minicluster, the mini cluster typically spawns 100s of I/O threads, both in the MiniCluster I/O pool and in the TM I/O pool. The standard rule for the maximum pool size is 4*num-cores, but the number of cores can be fairly large these days. Various Java versions also mess up core counting when running in containers (JVM container might have been given 2 cores as resource limits, but the JVM counts the system as a whole, like 64/128 cores). This is both a nuisance for debugging, and a big waste of memory (each thread takes by default around 1MB when spawned, so the test JVM wastes 100s of MBs for nothing). I would suggest to set a default of 8 I/O threads for the Mini Cluster. The scaling-with-cores is important for proper TM/JM deployments, but not for the Mini Cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
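To put numbers on the memory waste described above (using the rough 1 MB-per-thread figure and the two pools - MiniCluster and TM - mentioned in the issue), a small illustrative calculation; the function name and `cap` parameter are invented for this sketch, not Flink configuration:

```python
from typing import Optional


def minicluster_io_stack_mb(cores: int, pools: int = 2,
                            mb_per_thread: int = 1,
                            cap: Optional[int] = None) -> int:
    """Rough idle thread-stack cost of the I/O pools in one test JVM (sketch).

    Without a cap, each pool follows the 4 * num-cores rule; with the
    proposed fixed default, each pool is limited to `cap` threads.
    """
    threads_per_pool = 4 * cores if cap is None else min(4 * cores, cap)
    return pools * threads_per_pool * mb_per_thread
```

On a 64-core CI host, the 4x rule yields 2 pools x 256 threads x ~1 MB, i.e. on the order of half a gigabyte of idle stacks per test JVM; with the proposed cap of 8 threads per pool it drops to around 16 MB.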
[jira] [Created] (FLINK-22753) Activate log-based Checkpoints for StateFun
Stephan Ewen created FLINK-22753: Summary: Activate log-based Checkpoints for StateFun Key: FLINK-22753 URL: https://issues.apache.org/jira/browse/FLINK-22753 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen Once available, we should use log-based checkpointing in StateFun, to have predictable checkpointing times and predictably low end-to-end exactly-once latencies. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22752) Add robust default state configuration to StateFun
Stephan Ewen created FLINK-22752: Summary: Add robust default state configuration to StateFun Key: FLINK-22752 URL: https://issues.apache.org/jira/browse/FLINK-22752 Project: Flink Issue Type: Sub-task Reporter: Stephan Ewen We aim to reduce the state configuration complexity by applying a default configuration with robust settings, based on lessons learned in Flink. *(1) Always use RocksDB.* _That is already the case._ We keep this for now, as long as the only other alternatives are backends with objects on the heap, which are tricky in terms of predictable JVM performance. RocksDB has a significant performance cost, but more robust behavior. *(2) Activate local recovery by default.* That makes recovery cheap for soft task failures and gracefully cancelled tasks. We need to set these options:
- {{state.backend.local-recovery: true}}
- {{taskmanager.state.local.root-dirs: }} - some local directory that will not be wiped by the OS periodically, so typically a local directory that is not {{/tmp}}, for example {{/local/state/recovery}}.
- {{state.backend.rocksdb.localdir: }} - a directory on the same FS / device as above, so that one can create hard links between them (required for RocksDB local checkpoints), for example {{/local/state/rocksdb}}.
Flink will most likely adopt this as a default setting as well in the future. It still makes sense to pre-configure a different RocksDB working directory than {{/tmp}}. *(3) Activate partitioned indexes by default.* This may cost minimal performance in some cases, but can avoid massive performance regressions in cases where the index blocks no longer fit into the memory cache (may happen more frequently when there are too many ColumnFamilies = states). Set {{state.backend.rocksdb.memory.partitioned-index-filters: true}}. See FLINK-20496 for details. *(4) Increase number of transfer threads by default.* This speeds up state recovery in many cases. 
The default value in Flink is a bit conservative, to avoid spamming the DFS (like HDFS) by default. The more cloud-centric StateFun setups should be safe with a higher default value. Set {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}. *(5) Increase RocksDB compaction threads by default.* The number of RocksDB compaction threads is frequently a bottleneck. Increasing it costs virtually nothing and mitigates that bottleneck in most cases. Set {{state.backend.rocksdb.thread.num: 4}} (this value is chosen under the assumption that there is only one slot). -- This message was sent by Atlassian Jira (v8.3.4#803005)
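Collected into one place, the settings proposed in points (2)-(5) would amount to roughly the following configuration fragment (a sketch; the local paths are the example placeholders from the issue text, not mandated values):

```yaml
# Proposed robust defaults for StateFun (sketch; paths are placeholders)
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /local/state/recovery      # not /tmp
state.backend.rocksdb.localdir: /local/state/rocksdb          # same FS/device as root-dirs
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.thread.num: 4
```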
[jira] [Created] (FLINK-22751) Reduce JVM Metaspace memory for StateFun
Stephan Ewen created FLINK-22751: Summary: Reduce JVM Metaspace memory for StateFun Key: FLINK-22751 URL: https://issues.apache.org/jira/browse/FLINK-22751 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen Flink by default reserves quite a lot of metaspace memory (256 MB) out of the total memory budget, to accommodate applications that load (and reload) a lot of classes. That is necessary because user code is executed directly in the JVM. StateFun by default doesn't execute code in the JVM, so it needs much less Metaspace memory. I would suggest to reduce the Metaspace size to something like 64MB or 96MB by default. {{taskmanager.memory.jvm-metaspace.size: 64m}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22749) Apply a robust default State Backend Configuration
Stephan Ewen created FLINK-22749: Summary: Apply a robust default State Backend Configuration Key: FLINK-22749 URL: https://issues.apache.org/jira/browse/FLINK-22749 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Stephan Ewen We should update the default state backend configuration with default settings that reflect lessons learned about robust setups. (1) Always use the RocksDB State Backend. That is already the case. (2) Activate partitioned index filters by default. This may cost some overhead in specific cases, but helps avoid massive performance regressions when we have too many ColumnFamilies (too many states) such that the cache can no longer hold all index files. We need to add {{state.backend.rocksdb.memory.partitioned-index-filters: true}} to the config. See FLINK-20496 for details. There is a chance that Flink makes this the default in the future as well; then we could remove it again from the StateFun setup. (3) Activate local recovery by default. That should speed up the recovery of all non-hard-crashed TMs by a lot. We need to configure
- {{state.backend.local-recovery: true}}
- {{taskmanager.state.local.root-dirs}} to some non-temp directory
For this to work reliably, we need a local directory that is not periodically wiped by the OS, so we should not rely on the default {{/tmp}} directory, but set up a dedicated non-temp state directory. Flink will probably make this the default in the future, but having a non-{{/tmp}} directory for the RocksDB and local snapshots still makes a lot of sense. (4) Increase state transfer threads by default, to speed up state restores. Add to the config: {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22741) Hide Flink complexity from Stateful Functions
Stephan Ewen created FLINK-22741: Summary: Hide Flink complexity from Stateful Functions Key: FLINK-22741 URL: https://issues.apache.org/jira/browse/FLINK-22741 Project: Flink Issue Type: New Feature Components: Stateful Functions Affects Versions: statefun-3.0.0 Reporter: Stephan Ewen This is an umbrella issue for various issues to hide and reduce the complexity and surface area (and configuration space) of Apache Flink when using Stateful Functions. The goal of this is to create a setup and configuration that works robustly in the vast majority of settings. Users should not be required to configure anything Flink-specific. If this happens at the cost of some minor regression in peak stream throughput, we can most likely stomach that in StateFun, because the performance cost is commonly dominated by the interaction between the StateFun cluster and remote function deployments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [Statefun] Truncated Messages in Python workers
Thanks for reporting this, it looks indeed like a potential bug. I filed this Jira for it: https://issues.apache.org/jira/browse/FLINK-22729 Could you share (here or in Jira) what the stack on the Python worker side is (for example, which HTTP server)? Do you know if the message truncation happens reliably at a certain message size? On Wed, May 19, 2021 at 2:12 PM Jan Brusch wrote: > Hi, > > recently we started seeing the following faulty behaviour in the Flink > Stateful Functions HTTP communication towards external Python workers. > This is only occurring when the system is under heavy load. > > The Java Application will send HTTP Messages to an external Python > Function but the external Function fails to parse the message with a > "Truncated Message Error". Printouts show that the truncated message > looks as follows: > > -- > > > > my.protobuf.MyClass: > > my.protobuf.MyClass: > > my.protobuf.MyClass: > > my.protobuf.MyClass: > -- > > Which leads to the following Error in the Python worker: > > -- > > Error Parsing Message: Truncated Message > > -- > > Either the sender or the receiver (or something in between) seems to be > truncating some (not all) messages at some random point in the payload. > The source code in both Flink SDKs looks to be correct. We temporarily > solved this by setting the "maxNumBatchRequests" parameter in the > external function definition really low. But this is not an ideal > solution as we believe this adds considerable communication overhead > between the Java and the Python Functions. > > The Stateful Function version is 2.2.2, java8. The Java App as well as > the external Python workers are deployed in the same kubernetes cluster. > > > Has anyone ever seen this problem before? > > Best regards > > Jan > > -- > neuland – Büro für Informatik GmbH > Konsul-Smidt-Str. 
8g, 28217 Bremen > > Telefon (0421) 380107 57 > Fax (0421) 380107 99 > https://www.neuland-bfi.de > > https://twitter.com/neuland > https://facebook.com/neulandbfi > https://xing.com/company/neulandbfi > > > Geschäftsführer: Thomas Gebauer, Jan Zander > Registergericht: Amtsgericht Bremen, HRB 23395 HB > USt-ID. DE 246585501 > >
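One way to narrow down where the truncation happens is to check, on the Python worker side, whether the received body actually matches the Content-Length header before handing it to the Protobuf parser. A minimal sketch of such a diagnostic (plain Python, not StateFun SDK code; the helper name is made up for illustration):

```python
def check_body_complete(headers, body):
    """Compare the received HTTP body against the declared Content-Length.

    If the body is shorter than advertised, the request was cut off in
    transit (or read incompletely by the server), and a downstream
    Protobuf parse will fail with a 'truncated message' error.
    Illustrative diagnostic helper only, not StateFun SDK code.
    """
    declared = headers.get("Content-Length")
    if declared is None:
        return True, "no Content-Length header; cannot verify"
    declared = int(declared)
    if len(body) < declared:
        return False, f"received {len(body)} of {declared} bytes"
    return True, f"received all {declared} bytes"
```

Logging the failing case before parsing would show whether the bytes are already missing on arrival or are lost inside the worker.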
[jira] [Created] (FLINK-22729) Truncated Messages in Python workers
Stephan Ewen created FLINK-22729: Summary: Truncated Messages in Python workers Key: FLINK-22729 URL: https://issues.apache.org/jira/browse/FLINK-22729 Project: Flink Issue Type: Bug Components: Stateful Functions Affects Versions: statefun-2.2.2 Environment: The Stateful Function version is 2.2.2, java8. The Java App as well as the external Python workers are deployed in the same kubernetes cluster. Reporter: Stephan Ewen Fix For: statefun-3.1.0 Recently we started seeing the following faulty behavior in the Flink Stateful Functions HTTP communication towards external Python workers. This is only occurring when the system is under heavy load. The Java Application will send HTTP Messages to an external Python Function but the external Function fails to parse the message with a "Truncated Message Error". Printouts show that the truncated message looks as follows: {code} my.protobuf.MyClass: my.protobuf.MyClass: my.protobuf.MyClass: my.protobuf.MyClass: {code} Original report: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-Truncated-Messages-in-Python-workers-td43831.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] Release 1.13.0, release candidate #2
Glad to hear that outcome. And no worries about the false alarm. Thank you for doing thorough testing, this is very helpful! On Wed, Apr 28, 2021 at 1:04 PM Caizhi Weng wrote: > After the investigation we found that this issue is caused by the > implementation of connector, not by the Flink framework. > > Sorry for the false alarm. > > Stephan Ewen 于2021年4月28日周三 下午3:23写道: > > > @Caizhi and @Becket - let me reach out to you to jointly debug this > issue. > > > > I am wondering if there is some incorrect reporting of failed events? > > > > On Wed, Apr 28, 2021 at 8:53 AM Caizhi Weng > wrote: > > > > > -1 > > > > > > We're testing this version on batch jobs with large (600~1000) > > parallelisms > > > and the following exception messages appear with high frequency: > > > > > > 2021-04-27 21:27:26 > > > org.apache.flink.util.FlinkException: An OperatorEvent from an > > > OperatorCoordinator to a task was lost. Triggering task failover to > > ensure > > > consistency. Event: '[NoMoreSplitEvent]', targetTask: - > > > execution #0 > > > at > > > > > > > > > org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81) > > > at > > > > > > > > > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > > > at > > > > > > > > > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > > > at > > > > > > > > > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > > > at > > > > > > > > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > > > at > > > > > > > > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > > > at > > > > > > > > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > > > at > > > > > > > > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > > > at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > > > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > > > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > > > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > at > > > > > > > > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > at > > > > > > > > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > > Becket Qin is investigating this issue. > > > > > >
Re: [VOTE] Release 1.12.3, release candidate #1
A quick heads-up: A fix from 1.13.0 that I backported to 1.12.3 is apparently causing some issues at larger batch scale. We are investigating this, but it would affect this release as well. Please see the mail thread "[VOTE] Release 1.13.0, release candidate #2" for details. If we want to make sure this release isn't affected, we should revert those changes. The tickets are: - https://issues.apache.org/jira/browse/FLINK-21996 - https://issues.apache.org/jira/browse/FLINK-18071 On Wed, Apr 28, 2021 at 7:59 AM Robert Metzger wrote: > +1 (binding) > > - started cluster, ran example job on macos > - sources look fine > - Eyeballed the diff: > https://github.com/apache/flink/compare/release-1.12.2...release-1.12.3-rc1 > . > According to "git diff release-1.12.2...release-1.12.3-rc1 '*.xml'", there > was only one external dependency change (snappy-java, which seems to be > properly reflected in the NOTICE file) > - the last CI run of the "release-1.12" branch is okay-ish: > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17315=results > (this one failure occurred: > https://issues.apache.org/jira/browse/FLINK-20950) > > > On Tue, Apr 27, 2021 at 5:03 PM Dawid Wysakowicz > wrote: > > > +1 (binding) > > > > - Verified checksums and signatures > > - Reviewed the website PR > > - Built from sources > > - verified dependency version upgrades and updates in NOTICE files > > compared to 1.12.2 > > - started cluster and run WordCount example in BATCH mode and everything > > looked good > > > > On 23/04/2021 23:52, Arvid Heise wrote: > > > Hi everyone, > > > Please review and vote on the release candidate #1 for the version > > 1.12.3, > > > as follows: > > > [ ] +1, Approve the release > > > [ ] -1, Do not approve the release (please provide specific comments) > > > > > > > > > The complete staging area is available for your review, which includes: > > > * JIRA release notes [1], > > > * the official Apache source release and binary convenience releases to > > be 
> > > deployed to dist.apache.org [2], which are signed with the key with > > > fingerprint 476DAA5D1FF08189 [3], > > > * all artifacts to be deployed to the Maven Central Repository [4], > > > * source code tag "release-1.12.3-rc1" [5], > > > * website pull request listing the new release and adding announcement > > blog > > > post [6]. > > > > > > The vote will be open for at least 72 hours. It is adopted by majority > > > approval, with at least 3 PMC affirmative votes. > > > > > > Thanks, > > > Your friendly release manager Arvid > > > > > > [1] > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349691 > > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.3-rc1/ > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > > [4] > > https://repository.apache.org/content/repositories/orgapacheflink-1419 > > > [5] https://github.com/apache/flink/releases/tag/release-1.12.3-rc1 > > > [6] https://github.com/apache/flink-web/pull/437 > > > > > >
Re: [VOTE] Release 1.13.0, release candidate #2
@Caizhi and @Becket - let me reach out to you to jointly debug this issue. I am wondering if there is some incorrect reporting of failed events? On Wed, Apr 28, 2021 at 8:53 AM Caizhi Weng wrote: > -1 > > We're testing this version on batch jobs with large (600~1000) parallelisms > and the following exception messages appear with high frequency: > > 2021-04-27 21:27:26 > org.apache.flink.util.FlinkException: An OperatorEvent from an > OperatorCoordinator to a task was lost. Triggering task failover to ensure > consistency. Event: '[NoMoreSplitEvent]', targetTask: - > execution #0 > at > > org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.lambda$sendEvent$0(SubtaskGatewayImpl.java:81) > at > > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) > at > > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) > at > > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > at > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > Becket Qin is investigating this issue. >
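For background, the failover in the reported exception is deliberate: when the coordinator cannot confirm that an event reached its task, the only consistent option is to restart the task. A toy sketch of that decision (illustrative Python with hypothetical callback names, not Flink's actual RPC code):

```python
def send_operator_event(deliver, event, fail_task):
    """Send a coordinator-to-task event; if delivery cannot be confirmed,
    trigger task failover rather than risk an inconsistent state.
    'deliver' and 'fail_task' are hypothetical callbacks for illustration.
    """
    try:
        deliver(event)
        return "delivered"
    except ConnectionError as e:
        # Mirrors the FlinkException in the stack trace above: a lost
        # OperatorEvent forces a task restart to restore consistency.
        fail_task(f"OperatorEvent '{event}' was lost: {e}")
        return "failover-triggered"
```

The debugging question in the thread is whether delivery is genuinely failing at this frequency, or whether successful deliveries are being misreported as failures.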
[jira] [Created] (FLINK-22433) CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler
Stephan Ewen created FLINK-22433: Summary: CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler Key: FLINK-22433 URL: https://issues.apache.org/jira/browse/FLINK-22433 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.13.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0, 1.13.1 Logs of the test failure: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17077=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7030a106-e977-5851-a05e-535de648c9c9=3 Steps to reproduce: Adjust {{CoordinatorEventsExactlyOnceITCase}} and extend the MiniCluster configuration:
{code}
@BeforeClass
public static void startMiniCluster() throws Exception {
    final Configuration config = new Configuration();
    config.setString(RestOptions.BIND_PORT, "0");
    config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
    config.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22406) ReactiveModeITCase.testScaleDownOnTaskManagerLoss()
Stephan Ewen created FLINK-22406: Summary: ReactiveModeITCase.testScaleDownOnTaskManagerLoss() Key: FLINK-22406 URL: https://issues.apache.org/jira/browse/FLINK-22406 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.13.0 Reporter: Stephan Ewen The test is stalling on Azure CI. https://dev.azure.com/sewen0794/Flink/_build/results?buildId=292=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=634cd701-c189-5dff-24cb-606ed884db87=4865 -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Task Local Recovery with mountable disks in the cloud
/cc dev@flink On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal wrote: > Hello, > > We've been experimenting with Task-local recovery using Kubernetes. We > have a way to specify mounting the same disk across Task Manager > restarts/deletions for when the pods get recreated. In this scenario, we > noticed that task local recovery does not kick in (as expected based on the > documentation). > > We did try to comment out the code on the shutdown path which cleaned up > the task local directories before the pod went down / was restarted. We > noticed that remote recovery kicked in even though the task local state was > present. I noticed that the slot IDs changed, and was wondering if this is > the main reason that the task local state didn't get used in this scenario? > > Since we're using this shared disk to store the local state across pod > failures, would it make sense to allow keeping the task local state so that > we can get faster recovery even for situations where the Task Manager > itself dies? In some sense, the storage here is disaggregated from the pods > and can potentially benefit from task local recovery. Any reason why this > is a bad idea in general? > > Is there a way to preserve the slot IDs across restarts? We set up the Task > Manager to pin the resource-id, but that didn't seem to help. My > understanding is that the slot ID needs to be reused for task local > recovery to kick in. > > Thanks, > Sonam > >
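For context on why the changed slot IDs matter: as the report itself suspects, task-local recovery only kicks in when the restarted task lands in a slot whose allocation ID matches the one the local state was written under, and a recreated pod normally gets a fresh allocation. The decision boils down to roughly this (illustrative Python with hypothetical names, not Flink's implementation):

```python
def pick_recovery_source(local_states, slot_allocation_id):
    """Use task-local state only if it was written for the very slot
    allocation the task is being redeployed into; otherwise fall back to
    downloading remote (checkpointed) state. Illustrative sketch only --
    the names are hypothetical, not Flink's API.
    """
    state = local_states.get(slot_allocation_id)
    if state is not None:
        return ("local", state)
    return ("remote", None)
```

Under this model, preserving the disk alone is not enough: unless the new slot allocation matches the stored one, the lookup misses and remote recovery wins.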
[jira] [Created] (FLINK-22358) Add missing stability annotation to Split Reader API classes
Stephan Ewen created FLINK-22358: Summary: Add missing stability annotation to Split Reader API classes Key: FLINK-22358 URL: https://issues.apache.org/jira/browse/FLINK-22358 Project: Flink Issue Type: Task Components: Connectors / Common Reporter: Stephan Ewen Fix For: 1.14.0, 1.13.1 The Split Reader API currently has no stability annotations; it is unclear which classes are public API, which are internal, which are stable, and which are evolving. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22357) Mark FLIP-27 Source API as stable
Stephan Ewen created FLINK-22357: Summary: Mark FLIP-27 Source API as stable Key: FLINK-22357 URL: https://issues.apache.org/jira/browse/FLINK-22357 Project: Flink Issue Type: Task Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 The FLIP-27 source API was properly introduced in 1.11 and has undergone some major improvements in 1.12. During the stabilization in 1.13 we needed only one very minor change to those interfaces. I think it is time to declare the core source API interfaces as stable, to allow users to safely rely on them. I would suggest doing that for 1.14, and possibly even backporting the annotation change to 1.13. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [DISCUSS]FLIP-150: Introduce Hybrid Source
Thanks, Thomas! @Becket and @Nicholas - would you be ok with that approach? On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise wrote: > Hi Stephan, > > Thanks for the feedback! > > I agree with the approach of starting with a simple implementation > that can address a well understood, significant portion of use cases. > > I'm planning to continue work on the prototype that I had shared. > There is production level usage waiting for it fairly soon. I expect > to open a PR in the coming weeks. > > Thomas > > > > > > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen wrote: > > > > Thanks all for this discussion. Looks like there are lots of ideas and > > folks that are eager to do things, so let's see how we can get this > moving. > > > > My take on this is the following: > > > > There will probably not be one Hybrid source, but possibly multiple ones, > > because of different strategies/requirements. > > - One may be very simple, with switching points known up-front. Would > > be good to have this in a very simple implementation. > > - There may be one where the switch is dynamic and the readers need > to > > report back where they left off. > > - There may be one that switches back and forth multiple times > during a > > job, for example Kafka going to DFS whenever it falls behind retention, > in > > order to catch up again. > > > > This also seems hard to "design on paper"; I expect there are nuances in > a > > production setup that affect some details of the design. So I'd feel most > > comfortable in adding a variant of the hybrid source to Flink that has > been > > used already in a real use case (not necessarily in production, but maybe > > in a testing/staging environment, so it seems to meet all requirements). > > > > > > What do you think about the following approach? > > - If there is a tested PoC, let's try to get it contributed to Flink > > without trying to make it much more general. 
> > - When we see similar but a bit different requirements for another > hybrid > > source, then let's try to evolve the contributed one. > > - If we see new requirements that are so different that they don't fit > > well with the existing hybrid source, then let us look at building a > second > > hybrid source for those requirements. > > > > We need to make connector contributions in general more easy, and I think > > it is not a bad thing to end up with different approaches and see how > these > > play out against each other when being used by users. For example > switching > > with known boundaries, dynamic switching, back-and-forth-switching, etc. > > (I know some committers are planning to do some work on making > > connector contributions easier, with standardized testing frameworks, > > decoupled CI, etc.) > > > > Best, > > Stephan > > > > > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise wrote: > > > > > Hi, > > > > > > As mentioned in my previous email, I had been working on a prototype > for > > > the hybrid source. > > > > > > You can find it at https://github.com/tweise/flink/pull/1 > > > > > > It contains: > > > * Switching with configurable chain of sources > > > * Fixed or dynamic start positions > > > * Test with MockSource and FileSource > > > > > > The purpose of the above PR is to gather feedback and help drive > consensus > > > on the FLIP. > > > > > > * How to support a dynamic start position within the source chain? > > > > > > Relevant in those (few?) cases where start positions are not known > upfront. 
> > > You can find an example of what that might look like in the tests: > > > > > > > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62 > > > > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132 > > > > > > When switching, the enumerator of the previous source needs to > > > supply information about consumed splits that allows to set the start > > > position for the next source. That could be something like the last > > > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator > > > doesn't track finished splits.) > > > > > > See previous discussion regarding start/end position. The prototype > shows > > > the use of checkpoint state with c
[jira] [Created] (FLINK-22324) Backport FLINK-18071 for 1.12.x
Stephan Ewen created FLINK-22324: Summary: Backport FLINK-18071 for 1.12.x Key: FLINK-22324 URL: https://issues.apache.org/jira/browse/FLINK-22324 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.3 See FLINK-18071 - this issue only tracks the backport to allow closing the blocker issue for 1.13.0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [DISCUSS] Releasing Flink 1.12.3
+1 Thanks for pushing this, Arvid, let's get this fix out asap. On Fri, Apr 16, 2021 at 9:46 AM Arvid Heise wrote: > Dear devs, > > Since we just fixed a severe bug that causes the dataflow to halt under > specific circumstances [1], we would like to release a bugfix asap. > > I would volunteer as the release manager and kick off the release process > on next Monday (April 19th). > What do you think? > > Note that this time around, I would not wait for any specific > fixes/backports. However, you can still merge all fixes that you'd like to > see in 1.12.3 until Monday. > > Btw the fix is already in master and will be directly applied to the next > RC of 1.13.0. Flink version 1.11.x and older are not affected. > > [1] https://issues.apache.org/jira/browse/FLINK-21992 >
Re: [DISCUSS]FLIP-150: Introduce Hybrid Source
Thanks all for this discussion. Looks like there are lots of ideas and folks that are eager to do things, so let's see how we can get this moving. My take on this is the following: There will probably not be one Hybrid source, but possibly multiple ones, because of different strategies/requirements. - One may be very simple, with switching points known up-front. Would be good to have this in a very simple implementation. - There may be one where the switch is dynamic and the readers need to report back where they left off. - There may be one that switches back and forth multiple times during a job, for example Kafka going to DFS whenever it falls behind retention, in order to catch up again. This also seems hard to "design on paper"; I expect there are nuances in a production setup that affect some details of the design. So I'd feel most comfortable in adding a variant of the hybrid source to Flink that has been used already in a real use case (not necessarily in production, but maybe in a testing/staging environment, so it seems to meet all requirements). What do you think about the following approach? - If there is a tested PoC, let's try to get it contributed to Flink without trying to make it much more general. - When we see similar but a bit different requirements for another hybrid source, then let's try to evolve the contributed one. - If we see new requirements that are so different that they don't fit well with the existing hybrid source, then let us look at building a second hybrid source for those requirements. We need to make connector contributions in general easier, and I think it is not a bad thing to end up with different approaches and see how these play out against each other when being used by users. For example switching with known boundaries, dynamic switching, back-and-forth-switching, etc. (I know some committers are planning to do some work on making connector contributions easier, with standardized testing frameworks, decoupled CI, etc.) 
Best, Stephan On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise wrote: > Hi, > > As mentioned in my previous email, I had been working on a prototype for > the hybrid source. > > You can find it at https://github.com/tweise/flink/pull/1 > > It contains: > * Switching with configurable chain of sources > * Fixed or dynamic start positions > * Test with MockSource and FileSource > > The purpose of the above PR is to gather feedback and help drive consensus > on the FLIP. > > * How to support a dynamic start position within the source chain? > > Relevant in those (few?) cases where start positions are not known upfront. > You can find an example of what that might look like in the tests: > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62 > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132 > > When switching, the enumerator of the previous source needs to > supply information about consumed splits that allows to set the start > position for the next source. That could be something like the last > processed file, timestamp, etc. (Currently StaticFileSplitEnumerator > doesn't track finished splits.) > > See previous discussion regarding start/end position. The prototype shows > the use of checkpoint state with converter function. > > * Should readers be deployed dynamically? > > The prototype assumes a static source chain that is fixed at job submission > time. Conceivably there could be use cases that require more flexibility. > Such as switching one KafkaSource for another. A step in that direction > would be to deploy the actual readers dynamically, at the time of switching > source. > > Looking forward to feedback and suggestions for next steps! > > Thomas > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise wrote: > > > Hi Nicholas, > > > > Thanks for the reply. I had implemented a small PoC. 
It switches a > > configurable sequence of sources with predefined bounds. I'm using the > > unmodified MockSource for illustration. It does not require a > "Switchable" > > interface. I looked at the code you shared and the delegation and > signaling > > works quite similar. That's a good validation. > > > > Hi Kezhu, > > > > Thanks for bringing the more detailed discussion regarding the start/end > > position. I think in most cases the start and end positions will be known > > when the job is submitted. If we take a File -> Kafka source chain as > > example, there would most likely be a timestamp at which we want to > > transition from files to reading from Kafka. So we would either set the > > start position for Kafka based on that timestamp or provide the offsets > > directly. (Note that I'm skipping a few related nuances here. In order to > > achieve an exact switch without duplication or gap, we may also need some > > overlap and filtering, but that's a separate issue.) > > > > The point
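The "simple variant with switching points known up-front" combined with the dynamic-start-position idea from this thread can be boiled down to a few lines (illustrative Python generators, not the FLIP-27/FLIP-150 APIs; names are made up):

```python
def hybrid_source(first, make_next):
    """Drain the bounded 'first' source, remember where it ended, then
    switch to the next source, whose start position is derived from that
    end position (e.g. a timestamp or file offset). The first source
    yields (position, record) pairs. Illustrative sketch only.
    """
    last_position = None
    for position, record in first:
        last_position = position
        yield record
    # Dynamic start position: the next source begins where the first ended.
    for record in make_next(last_position):
        yield record
```

For a File -> Kafka chain, `make_next` would map the last file timestamp to Kafka offsets; the exactly-once subtleties (overlap, deduplication, checkpointing the switch) discussed in the thread are deliberately left out here.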
Re: [DISCUSS] Backport FLIP-27 Kafka source connector fixes with API change to release-1.12.
Hi all! Generally, avoiding API changes in bug fix versions is the right thing, in my opinion. But this case is a bit special, because we are changing something that never worked properly in the first place. So we are not breaking a "running thing" here, but making it usable. So +1 from my side to backport these changes; I think we make more users happy than angry with this. Best, Stephan On Thu, Apr 8, 2021 at 11:35 AM Becket Qin wrote: > Hi Arvid, > > There are interface changes to the Kafka source, and there is a backwards > compatible change in the base source implementation. Therefore technically > speaking, users might be able to run the Kafka source in 1.13 with a 1.12 > Flink job. However, it could be tricky because there might be some > dependent jar conflicts between 1.12 and 1.13. So this solution seems a > little fragile. > > I'd second Till's question if there is an issue for users that start with > > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka > source > > with API changes. > > > Just to clarify, the bug fixes themselves include API changes, they are not > separable. So we basically have three options here: > > 1. Do not backport fixes in 1.12. So users have to upgrade to 1.13 in order > to use the new Kafka source. > 2. Write some completely different fixes for release 1.12 and ask users to > migrate to the new API when they upgrade to 1.13 > 3. Backport the fix with API changes to 1.12. So users don't need to handle > interface change when they upgrade to 1.13+. > > Personally I think option 3 here is better because it does not really > introduce any trouble to the users. The downside is that we do need to > change the API of Kafka source in 1.12. Given that the changed API won't be > useful without these bug fixes, changing the API seems to be doing more > good than bad here. 
> > Thanks, > > Jiangjie (Becket) Qin > > > > On Thu, Apr 8, 2021 at 2:39 PM Arvid Heise wrote: > > > Hi Becket, > > > > did you need to change anything to the source interface itself? Wouldn't > it > > be possible for users to simply use the 1.13 connector with their Flink > > 1.12 deployment? > > > > I think the late-upgrade argument can be made for any feature, but I also > > see that the Kafka connector is of high interest. > > > > I'd second Till's question if there is an issue for users that start with > > the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka > source > > with API changes. > > > > Best, > > > > Arvid > > > > On Thu, Apr 8, 2021 at 2:54 AM Becket Qin wrote: > > > > > Thanks for the comment, Till and Thomas. > > > > > > As far as I know there are some users who have just upgraded their > Flink > > > version from 1.8 / 1.9 to Flink 1.12 and might not upgrade the version > > in 6 > > > months or more. There are also some organizations that have the > strategy > > of > > > not running the latest version of a project, but only the second latest > > > version with bug fixes. So those users may still benefit from the > > backport. > > > However, arguably the old Kafka source is there anyways in 1.12, so > they > > > should not be blocked on having the new source. > > > > > > I am leaning towards backporting the fixes mainly because this way we > > might > > > have more users migrating to the new Source and provide feedback. It > will > > > take some time for the users to pick up 1.13, especially for the users > > > running Flink at large scale. So backporting the fixes to 1.12 would > help > > > get the new source to be used sooner. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Thu, Apr 8, 2021 at 12:40 AM Thomas Weise wrote: > > > > > > > Hi, > > > > > > > > Thanks for fixing the new KafkaSource issues. > > > > > > > > I'm interested in using these fixes with 1.12 for experimental > > purposes. 
> > > > > > > > +1 for backporting. 1.12 is the current stable release and users who > > > would > > > > like to try the FLIP-27 sources are likely to use that release. > > > > > > > > Thomas > > > > > > > > On Wed, Apr 7, 2021 at 2:50 AM Till Rohrmann > > > wrote: > > > > > > > > > Hi Becket, > > > > > > > > > > If I remember correctly, then we deliberately did not document the > > Kafka > > > > > connector in the 1.12 release. Hence, from this point there should > be > > > no > > > > > need to backport any fixes because users are not aware of this > > feature. > > > > > > > > > > On the other hand this also means that we should be able to break > > > > anything > > > > > we want to. Consequently, backporting these fixes should be > possible. > > > > > > > > > > The question would probably be whether we want to ship new features > > > with > > > > a > > > > > bug fix release. Do we know of any users who want to use the new > > Kafka > > > > > source, are using the 1.12 version and cannot upgrade to 1.13 once > it > > > is > > > > > released? If this is the case, then this could be an argument for > > > > shipping > > > > > this feature with
[jira] [Created] (FLINK-22093) Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"
Stephan Ewen created FLINK-22093: Summary: Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling" Key: FLINK-22093 URL: https://issues.apache.org/jira/browse/FLINK-22093 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Fix For: 1.13.0 The test {{ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()}} failed in all profiles in my latest CI build (even though it passes locally). - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267=ab910030-93db-52a7-74a3-34a0addb481b=8102 - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=6e55a443-5252-5db5-c632-109baf464772=9df6efca-61d0-513a-97ad-edb76d85786a=6698 - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=cc649950-03e9-5fae-8326-2f1ad744b536=51cab6ca-669f-5dc0-221d-1e4f7dc4fc85 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22069) Check Log Pollution for 1.13 release
Stephan Ewen created FLINK-22069: Summary: Check Log Pollution for 1.13 release Key: FLINK-22069 URL: https://issues.apache.org/jira/browse/FLINK-22069 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Stephan Ewen Fix For: 1.13.0 We should check for log pollution and confusing log lines before the release. Below are some lines I stumbled over while using Flink during testing. - These lines show up on any execution of a local job and make me think I forgot to configure something I probably should have, wondering whether this might cause problems later? These have been in Flink for a few releases now, might be worth rephrasing, though. {code} 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 
2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. {code} - These lines show up on every job start, even if there is no recovery but just a plain job start. They are not particularly problematic, but also not helping. {code} 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,855 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) {code} When using {{DataStream.collect()}} we always have an exception in the log for the first fetch attempt, before the JM is ready. The loop retries and the program succeeds, but the exception in the log raises confusion about whether there is a swallowed but impactful error. 
{code} 7199 [main] WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurs when fetching query results java.util.concurrent.ExecutionException: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155) ~[classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) [classes
[jira] [Created] (FLINK-21996) Transient RPC failure without TaskManager failure can lead to split assignment loss
Stephan Ewen created FLINK-21996: Summary: Transient RPC failure without TaskManager failure can lead to split assignment loss Key: FLINK-21996 URL: https://issues.apache.org/jira/browse/FLINK-21996 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.12.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.13.0 NOTE: This bug has not been actually observed. It is based on reviews of the current implementation. I would expect it to be a pretty rare case, but at scale, even the rare cases happen often enough. h2. Problem Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed. That can happen when the connection recovers before the heartbeat times out. So RPCs generally retry or handle failures: for example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new checkpoint. The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that one fails, we are in the situation where we do not know whether the event sending was successful or not (only the ack failed). This is especially tricky for split assignments and checkpoints. Consider this sequence of actions: 1. Coordinator assigns a split. Ack not yet received. 2. Coordinator takes a checkpoint. The split was sent before the checkpoint, so it is not included on the Coordinator. 3. The split assignment RPC response is "failed". 4. The checkpoint completes. Now we don't know whether the split was in the checkpoint on the Operator (TaskManager) or not, and with that we don't know whether we should add it back to the coordinator. We need to do something to make sure the split is now either on the coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost. 
Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails while the one for the checkpoint succeeds, even though they happen in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't very likely. That's why we haven't seen this bug in practice so far, or haven't gotten a report for it yet. h2. Proposed solution The solution has two components: 1. Fallback to a consistent point: If the system doesn't know whether two parts are still consistent with each other (here, coordinator and Operator), fall back to a consistent point. Here, that is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. Then we call the scheduler to trigger a failover of the target operator to the latest checkpoint, signaling the same to the coordinator. That restores consistency. We can later optimize this (see below). 2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely, that means that the Coordinator can only acknowledge the checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out), then the checkpoint cannot complete (and the target operator will go through a failover anyway). In the common case, the RPC round trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happens to overlap with a split assignment (most won't). h2. Possible Future Improvements Step (1) above can be optimized by going with retries first and sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
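The gating described in component (2) of the proposal can be sketched with plain `CompletableFuture` composition. This is an illustrative sketch only — the class and method names (`PendingAckTracker`, `registerEventSend`, `checkpoint`) are hypothetical and not Flink's actual coordinator API:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative sketch (hypothetical names, not Flink's coordinator code):
 * the coordinator tracks the ack future of every "send operator event"
 * RPC (e.g. a split assignment). A checkpoint may only complete once all
 * acks that were pending when the checkpoint started have arrived; if an
 * ack future fails, the checkpoint future fails with it, and the target
 * operator would go through a failover.
 */
public class PendingAckTracker {

    private final Set<CompletableFuture<Void>> pendingEventAcks =
            ConcurrentHashMap.newKeySet();

    /** Record the ack future of an operator event that was just sent. */
    public void registerEventSend(CompletableFuture<Void> ack) {
        pendingEventAcks.add(ack);
        // Once acknowledged (or failed), the RPC is no longer pending.
        ack.whenComplete((ignored, error) -> pendingEventAcks.remove(ack));
    }

    /**
     * Gate the coordinator's checkpoint state on all currently pending acks:
     * the returned future completes only after every in-flight event RPC has
     * been acknowledged, and fails if any of those RPCs fail.
     */
    public CompletableFuture<byte[]> checkpoint(CompletableFuture<byte[]> coordinatorState) {
        CompletableFuture<Void> allAcks = CompletableFuture.allOf(
                pendingEventAcks.toArray(new CompletableFuture<?>[0]));
        return allAcks.thenCombine(coordinatorState, (ignored, state) -> state);
    }
}
```

In the common case `pendingEventAcks` is empty or completes within milliseconds, so this adds almost nothing to checkpoint latency, matching the reasoning above.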
[jira] [Created] (FLINK-21935) Remove "state.backend.async" option.
Stephan Ewen created FLINK-21935: Summary: Remove "state.backend.async" option. Key: FLINK-21935 URL: https://issues.apache.org/jira/browse/FLINK-21935 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 Checkpoints are always asynchronous, there is no case ever for a synchronous checkpoint. The RocksDB state backend doesn't even support synchronous snapshots, and the HashMap Heap backend also has no good use case for synchronous snapshots (other than a very minor reduction in heap objects). Most importantly, we should not expose this option in the constructors of the new state backend API classes, like {{HashMapStateBackend}}. I marked this a blocker because it is part of the new user-facing State Backend API and I would like to avoid that this option enters this API and causes confusion when we eventually remove it. /cc [~sjwiesman] and [~liyu] -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle
Hi Yingjie! Thanks for doing those experiments, the results look good. Let's go ahead with 32M then. Regarding the key, I am not strongly opinionated there. There are arguments for both keys, (1) making the key part of the network pool config as you did here or (2) making it part of the TM config (relative to framework off-heap memory). I find (1) quite understandable, but it is personal taste, so I can go with either option. Best, Stephan On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) wrote: > Hi all, > > I have tested the default memory size with both batch (tpcds) and > streaming jobs running in one session cluster (more than 100 queries). The > result is: > 1. All settings (16M, 32M and 64M) can work well without any OOM. > 2. For streaming jobs running after batch jobs, there is no performance or > stability regression. > 3. 32M and 64M are better (over 10%) in terms of performance for the test > batch job on HDD. > > Based on the above results, I think 32M is a good default choice, because > the performance is good enough for the test job and compared to 64M, more > direct memory can be used by netty and other components. What do you think? > > BTW, about the configuration key, did we reach a consensus? I am > temporarily using taskmanager.memory.network.batch-shuffle-read.size in > my PR now. Any suggestions about that? > > Best, > Yingjie (Kevin) > > -- > From: Guowei Ma > Date: March 9, 2021, 17:28:35 > To: 曹英杰(北牧) > Cc: Till Rohrmann; Stephan Ewen; > dev; user; Xintong Song< > tonysong...@gmail.com> > Subject: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM > merge shuffle > > Hi, all > > Thanks all for your suggestions and feedback. > I think it is a good idea that we increase the default size of the > separated pool by testing. I am fine with adding the suffix (".size") to the > config name, which makes it clearer to the user. 
> But I am a little worried about adding a prefix ("framework") because > currently the TM shuffle service is only a shuffle plugin, which is not a > part of the framework. So maybe we could add a clear explanation in the > document? > > Best, > Guowei > > > On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) wrote: > >> Thanks for the suggestions. I will do some tests and share the results >> after the implementation is ready. Then we can give a proper default value. >> >> Best, >> Yingjie >> >> -- >> From: Till Rohrmann >> Date: March 5, 2021, 23:03:10 >> To: Stephan Ewen >> Cc: dev; user; Xintong Song< >> tonysong...@gmail.com>; 曹英杰(北牧); Guowei Ma< >> guowei@gmail.com> >> Subject: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge >> shuffle >> >> Thanks for this proposal Guowei. +1 for it. >> >> Concerning the default size, maybe we can run some experiments and see >> how the system behaves with different pool sizes. >> >> Cheers, >> Till >> >> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen wrote: >> >>> Thanks Guowei, for the proposal. >>> >>> As discussed offline already, I think this sounds good. >>> >>> One thought is that 16m sounds very small for a default read buffer >>> pool. How risky do you think it is to increase this to 32m or 64m? >>> >>> Best, >>> Stephan >>> >>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma wrote: >>> >>>> Hi, all >>>> >>>> >>>> In Flink 1.12 we introduced the TM merge shuffle. But the >>>> out-of-the-box experience of using the TM merge shuffle is not very good. The >>>> main reason is that the default configuration always makes users encounter >>>> OOM [1]. So we hope to introduce a managed memory pool for the TM merge shuffle >>>> to avoid the problem. >>>> Goals >>>> >>>>1. Don't affect the streaming and pipelined-shuffle-only batch >>>>setups. >>>>2. Don't mix memory with different life cycles in the same pool. >>>>E.g., write buffers needed by running tasks and read buffers needed even >>>>after tasks are finished. >>>>3. 
User can use the TM merge shuffle with default memory >>>>configurations. (May need further tunings for performance optimization, >>>> but >>>>should not fail with the default configurations.) >>>> >>>> Proposal >>>> >>>>1. Introduce a co
Re: [ANNOUCE][DISCUSS] Roadmap Update for Website
I created a Pull Request for the roadmap update: https://github.com/apache/flink-web/pull/426 Happy to take reviews! Best, Stephan On Wed, Mar 10, 2021 at 1:55 PM Stephan Ewen wrote: > Thanks for the positive feedback. Will convert this to a pull request then > in the next days. > > Still looking to beautify the Feature Radar graphic a bit. > > Best, > Stephan > > > On Wed, Mar 10, 2021 at 8:26 AM Dawid Wysakowicz > wrote: > >> Thank you Stephan for working on this. It's really nice to see the >> roadmap updated. I also find the "Feature Radar" extremely helpful. It >> looks good to be published from my side. >> >> Best, >> >> Dawid >> >> On 02/03/2021 15:38, Stephan Ewen wrote: >> > Hi all! >> > >> > The roadmap on the Flink website is quite outdated: >> > https://flink.apache.org/roadmap.html >> > >> > I drafted an update to the roadmap that reflects the currently ongoing >> > bigger threads. >> > Not every detail is mentioned there, because this roadmap should give >> users >> > a high-level view where the project is going. >> > >> > There is also a new "Feature Radar" to help users understand in which >> stage >> > of maturity and support individual features are. >> > >> > Now, I am sure this will cause some discussion about which feature >> > should be on which level of maturity. This is my initial proposal which >> I >> > checked with some committers, but of course, there may be different >> > opinions. In that case, please bring this up in this discussion thread >> and >> > we find consensus together. >> > >> > >> https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing >> > >> > The raw figure for the feature radar is: >> > >> https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg >> > >> > If someone has some graphics skills to make this look more pretty, help >> is >> > greatly welcome! 
>> > (Only request would be to keep a format that many people (and open >> tools) >> > can edit, so maintenance remains easy). >> > >> > Looking forward to hearing what you think! >> > >> > Best, >> > Stephan >> > >> >>
Re: [ANNOUCE][DISCUSS] Roadmap Update for Website
Thanks for the positive feedback. Will convert this to a pull request then in the next days. Still looking to beautify the Feature Radar graphic a bit. Best, Stephan On Wed, Mar 10, 2021 at 8:26 AM Dawid Wysakowicz wrote: > Thank you Stephan for working on this. It's really nice to see the > roadmap updated. I also find the "Feature Radar" extremely helpful. It > looks good to be published from my side. > > Best, > > Dawid > > On 02/03/2021 15:38, Stephan Ewen wrote: > > Hi all! > > > > The roadmap on the Flink website is quite outdated: > > https://flink.apache.org/roadmap.html > > > > I drafted an update to the roadmap that reflects the currently ongoing > > bigger threads. > > Not every detail is mentioned there, because this roadmap should give > users > > a high-level view where the project is going. > > > > There is also a new "Feature Radar" to help users understand in which > stage > > of maturity and support individual features are. > > > > Now, I am sure this will cause some discussion about which feature > > should be on which level of maturity. This is my initial proposal which I > > checked with some committers, but of course, there may be different > > opinions. In that case, please bring this up in this discussion thread > and > > we find consensus together. > > > > > https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing > > > > The raw figure for the feature radar is: > > > https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg > > > > If someone has some graphics skills to make this look more pretty, help > is > > greatly welcome! > > (Only request would be to keep a format that many people (and open tools) > > can edit, so maintenance remains easy). > > > > Looking forward to hearing what you think! > > > > Best, > > Stephan > > > >
[jira] [Created] (FLINK-21695) Increase default value for number of KeyGroups
Stephan Ewen created FLINK-21695: Summary: Increase default value for number of KeyGroups Key: FLINK-21695 URL: https://issues.apache.org/jira/browse/FLINK-21695 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 The current calculation for the number of Key Groups (max parallelism) leads in many cases to data skew and to confusion among users. Specifically, the fact that for maxParallelisms above 128, the default value is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently, half of the tasks get one keygroup and the other half gets two keygroups, which is very skewed. See section (1) in this "lessons learned" blog post. https://engineering.contentsquare.com/2021/ten-flink-gotchas/ We can fix this by - either setting a default maxParallelism to something pretty high (2048 for example). The cost is that we add the default key group overhead per state entry from one byte to two bytes. - or we stay with some similar logic, but we instead of {{1.5 x operatorParallelism}} we go with some higher multiplier, like {{4 x operatorParallelism}}. The price is again that we more quickly reach the point where we have two bytes of keygroup encoding overhead, instead of one. Implementation wise, there is an unfortunate situation that the maxParallelism, if not configured, is not stored anywhere in the job graph, but re-derived on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) which does not have a MaxParallelism configured. This relies on the implicit contract that this logic never changes. Changing this logic will instantly break all jobs which have not explicitly configured the Max Parallelism. 
That seems like a pretty heavy design shortcoming, unfortunately :-( A way to partially work around that is to move the logic that derives the maximum parallelism into the {{StreamGraphGenerator}}, so we never create JobGraphs where vertices have no configured Max Parallelism (and we keep the re-derivation logic for backwards compatibility for persisted JobGraphs). The {{StreamExecutionEnvironment}} will need a flag for the "old mode" to give existing un-configured applications a way to keep restoring from old savepoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
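To see the skew concretely, here is a small sketch. The helper names are hypothetical, and the contiguous even-split assignment below is an approximation of Flink's key-group range assignment, not a copy of it; it computes how many key groups each subtask receives under the default roundToPowerOfTwo(1.5 x parallelism) rule:

```java
/**
 * Illustrative sketch (not Flink's internal code): shows how a default
 * maxParallelism of roundUpToPowerOfTwo(1.5 * parallelism) skews the
 * number of key groups per subtask, assuming key groups are split into
 * contiguous, near-even ranges across subtasks.
 */
public class KeyGroupSkew {

    /** Smallest power of two that is >= x (for x >= 1). */
    static int roundUpToPowerOfTwo(int x) {
        int p = 1;
        while (p < x) {
            p <<= 1;
        }
        return p;
    }

    /** Key groups assigned to one subtask under an even contiguous split. */
    static int keyGroupsForSubtask(int maxParallelism, int parallelism, int subtask) {
        int start = subtask * maxParallelism / parallelism;
        int end = (subtask + 1) * maxParallelism / parallelism;
        return end - start;
    }

    public static void main(String[] args) {
        int parallelism = 170;
        // 1.5 * 170 = 255, rounded up to the next power of two: 256.
        int maxParallelism = roundUpToPowerOfTwo(parallelism * 3 / 2);
        int min = Integer.MAX_VALUE, max = 0;
        for (int i = 0; i < parallelism; i++) {
            int n = keyGroupsForSubtask(maxParallelism, parallelism, i);
            min = Math.min(min, n);
            max = Math.max(max, n);
        }
        System.out.println("maxParallelism=" + maxParallelism
                + " keyGroupsPerSubtask=[" + min + ".." + max + "]");
    }
}
```

With parallelism 170 this yields between one and two key groups per subtask, i.e. roughly half the subtasks process twice the data of the other half — the skew described above. A larger multiplier (e.g. 4x) makes the per-subtask counts more uniform relative to their average.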
[jira] [Created] (FLINK-21694) Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num"
Stephan Ewen created FLINK-21694: Summary: Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num" Key: FLINK-21694 URL: https://issues.apache.org/jira/browse/FLINK-21694 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 The default value for the number of threads used to download state artifacts from checkpoint storage should be increased. The increase should not pose risk of regression, but does in many cases speed up checkpoint recovery significantly. Something similar was reported in this blog post, item (3). https://engineering.contentsquare.com/2021/ten-flink-gotchas/ A default value of 8 (eight) sounds like a good default. It should not result in excessive thread explosion, and already speeds up recovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
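To illustrate why raising the transfer thread count helps, here is a sketch of fetching checkpoint artifacts with a fixed-size thread pool, where the pool size plays the role of "state.backend.rocksdb.checkpoint.transfer.thread.num". The types (`RemoteFile`, `downloadAll`) are hypothetical, not Flink's actual downloader API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Illustrative sketch (hypothetical API, not Flink's actual downloader):
 * restoring a RocksDB backend means fetching many state artifacts from
 * checkpoint storage. With N transfer threads the fetches overlap, so
 * wall-clock recovery time shrinks roughly with N until the network or
 * disk saturates -- which is why a default of 8 beats a default of 1.
 */
public class StateDownloadSketch {

    /** Stand-in for a state artifact in checkpoint storage. */
    public interface RemoteFile {
        void downloadTo(String localDir) throws Exception;
    }

    /** Download all artifacts using 'transferThreads' parallel workers. */
    public static void downloadAll(List<RemoteFile> files, String localDir, int transferThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(transferThreads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (RemoteFile file : files) {
                pending.add(pool.submit(() -> {
                    file.downloadTo(localDir);
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // rethrows any download failure
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

The thread pool is bounded, so a default of 8 cannot cause unbounded thread growth, matching the "no excessive thread explosion" argument above.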
Re: [ANNOUCE][DISCUSS] Roadmap Update for Website
Thanks, Robert! As far as I understand, the Reactive Mode is not yet released, that's why I didn't put it there. The radar should be updated after each release. What do you think? On Sat, Mar 6, 2021 at 3:41 PM Robert Metzger wrote: > Thanks a lot for updating the roadmap! > > I really like the new Feature Radar, because it makes some assumptions > floating around on JIRA and the lists properly documented. > > I guess the radar is missing the Reactive Mode as an MVP feature. > > > Otherwise, I'm +1 on publishing this soon! > > On Tue, Mar 2, 2021 at 3:38 PM Stephan Ewen wrote: > > > Hi all! > > > > The roadmap on the Flink website is quite outdated: > > https://flink.apache.org/roadmap.html > > > > I drafted an update to the roadmap that reflects the currently ongoing > > bigger threads. > > Not every detail is mentioned there, because this roadmap should give > users > > a high-level view where the project is going. > > > > There is also a new "Feature Radar" to help users understand in which > stage > > of maturity and support individual features are. > > > > Now, I am sure this will cause some discussion about which feature > > should be on which level of maturity. This is my initial proposal which I > > checked with some committers, but of course, there may be different > > opinions. In that case, please bring this up in this discussion thread > and > > we find consensus together. > > > > > > > https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing > > > > The raw figure for the feature radar is: > > > > > https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg > > > > If someone has some graphics skills to make this look more pretty, help > is > > greatly welcome! > > (Only request would be to keep a format that many people (and open tools) > > can edit, so maintenance remains easy). > > > > Looking forward to hearing what you think! > > > > Best, > > Stephan > > >
Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle
Thanks Guowei, for the proposal. As discussed offline already, I think this sounds good. One thought is that 16m sounds very small for a default read buffer pool. How risky do you think it is to increase this to 32m or 64m? Best, Stephan On Fri, Mar 5, 2021 at 4:32 AM Guowei Ma wrote: > Hi, all > > > In Flink 1.12 we introduced the TM merge shuffle. But the > out-of-the-box experience of using the TM merge shuffle is not very good. The > main reason is that the default configuration always makes users encounter > OOM [1]. So we hope to introduce a managed memory pool for the TM merge shuffle > to avoid the problem. > Goals > >1. Don't affect the streaming and pipelined-shuffle-only batch setups. >2. Don't mix memory with different life cycles in the same pool. E.g., >write buffers needed by running tasks and read buffers needed even after >tasks are finished. >3. Users can use the TM merge shuffle with default memory >configurations. (May need further tunings for performance optimization, but >should not fail with the default configurations.) > > Proposal > >1. Introduce a configuration `taskmanager.memory.network.batch-read` >to specify the size of this memory pool. The default value is 16m. >2. Allocate the pool lazily. It means that the memory pool would be >allocated when the TM merge shuffle is used for the first time. >3. This pool size will not be added to the TM's total memory size, >but will be considered part of >`taskmanager.memory.framework.off-heap.size`. We need to check that the >pool size is not larger than the framework off-heap size, if the TM merge >shuffle is enabled. > > > In this default configuration, the allocation of the memory pool is almost > impossible to fail. Currently the default framework off-heap memory is > 128m, which is mainly used by Netty. But after we introduced zero copy, its > usage has been reduced, and you can refer to the detailed data [2]. 
> Known Limitation > Usability for increasing the memory pool size > > In addition to increasing `taskmanager.memory.network.batch-read`, the > user may also need to adjust `taskmanager.memory.framework.off-heap.size` > at the same time. It also means that once the user forgets this, the check is > likely to fail when allocating the memory pool. > > > So in the following two situations, we will still prompt the user to > increase the size of `framework.off-heap.size`. > >1. `taskmanager.memory.network.batch-read` is bigger than >`taskmanager.memory.framework.off-heap.size` >2. Allocating the pool encounters an OOM. > > > An alternative is that when the user adjusts the size of the memory pool, > the system automatically adjusts the framework off-heap size accordingly. But we are not entirely sure about > this, given its implicitness and the complexity it adds to the memory configuration. > Potential memory waste > > In the first step, the memory pool will not be released once allocated. This > means in the first step, even if there is no subsequent batch job, the > pooled memory cannot be used by other consumers. > > > We are not releasing the pool in the first step due to the concern that > frequently allocating/deallocating the entire pool may increase GC > pressure. Investigations into how to dynamically release the pool when it's no > longer needed are considered a future follow-up. > > > Looking forward to your feedback. > > > > [1] https://issues.apache.org/jira/browse/FLINK-20740 > > [2] https://github.com/apache/flink/pull/7368. > Best, > Guowei >
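The size check from the proposal (item 1 under "Known Limitation") can be sketched as follows. The configuration keys are the ones from this thread, while the class and method names are hypothetical, not Flink's actual validation code:

```java
/**
 * Illustrative sketch of the size check described in the proposal
 * (hypothetical class/method names): the batch shuffle read pool is
 * carved out of framework off-heap memory rather than added on top, so
 * its configured size must fit that budget, and the error message should
 * tell users which option to raise.
 */
public class BatchShuffleReadPoolCheck {

    static final String POOL_SIZE_KEY = "taskmanager.memory.network.batch-read";
    static final String FRAMEWORK_OFF_HEAP_KEY = "taskmanager.memory.framework.off-heap.size";

    public static void checkPoolFitsBudget(long poolSizeBytes, long frameworkOffHeapBytes) {
        if (poolSizeBytes > frameworkOffHeapBytes) {
            throw new IllegalArgumentException(
                    "'" + POOL_SIZE_KEY + "' (" + poolSizeBytes + " bytes) must not exceed '"
                            + FRAMEWORK_OFF_HEAP_KEY + "' (" + frameworkOffHeapBytes
                            + " bytes). Please increase the framework off-heap size.");
        }
    }
}
```

With the defaults discussed in the thread (16m or 32m pool against 128m framework off-heap), the check passes; it only fires when a user raises the pool size without also raising the off-heap budget.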
[ANNOUNCE][DISCUSS] Roadmap Update for Website
Hi all! The roadmap on the Flink website is quite outdated: https://flink.apache.org/roadmap.html I drafted an update to the roadmap that reflects the currently ongoing bigger threads. Not every detail is mentioned there, because this roadmap should give users a high-level view of where the project is going. There is also a new "Feature Radar" to help users understand in which stage of maturity and support individual features are. Now, I am sure this will cause some discussion about which feature should be on which level of maturity. This is my initial proposal which I checked with some committers, but of course, there may be different opinions. In that case, please bring this up in this discussion thread and we find consensus together. https://docs.google.com/document/d/1g6T72-8PHTsfhZq8GWWzG8F3PmD-dMuFSzg2AozXNxk/edit?usp=sharing The raw figure for the feature radar is: https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg If someone has some graphics skills to make this look prettier, help is greatly welcome! (Only request would be to keep a format that many people (and open tools) can edit, so maintenance remains easy). Looking forward to hearing what you think! Best, Stephan
Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend
Thanks for clarifying. Concerning the JM aborted checkpoints and state handles: I was thinking about it the other day as well and was considering an approach like that: The core idea is to move the cleanup from JM to TM. That solves two issues: (1) The StateBackends / DSTL delete the artifacts themselves, meaning we don't have to make assumptions about the state on the JM. That sounds too fragile, with easy bugs as soon as some slight assumptions change (see also bug with incr. checkpoint / savepoint data loss, https://issues.apache.org/jira/browse/FLINK-21351) (2) We do not need to clean up from one node. In the past, doing the cleanup from one node (JM) has sometimes become a bottleneck. To achieve that, we would need to extend the "notifyCheckpointComplete()" RPC from the JM to the TM to include both the ID of the completed checkpoint and the ID of the earliest retained checkpoint. Then the TM can clean up all artifacts from earlier checkpoints. There are two open questions to that design: (1) On restore, we need to communicate the state handles of the previous checkpoints to the TM as well, so the TM again gets the full picture of all state artifacts. (2) On rescaling, we need to clarify which TM is responsible for releasing a handle, if they are mapped to multiple TMs. Otherwise we get double-delete calls. That isn't per se a problem, it is just a bit less efficient. Maybe we could think in that direction for the DSTL work? On Mon, Feb 15, 2021 at 8:44 PM Roman Khachatryan wrote: > Thanks for your reply Stephan. > > Yes, there is overlap between FLIP-151 and FLIP-158 as both > address incremental state updates. However, I think that FLIP-151 on top > of FLIP-158 increases efficiency by: > > 1. "Squashing" the changes made to the same key. For example, if some > counter was changed 10 times then FLIP-151 will send only the last value > (this allows to send AND store less data compared to FLIP-158) > > 2. 
Keeping in memory only the changed keys and not the values. > (this allows to reduce memory AND latency (caused by serialization + > copying on every update) compared to FLIP-158) > > (1) can probably be implemented in FLIP-158, but not (2). > > I don't think there will be a lot of follow-up efforts and I hope > @Dawid Wysakowicz , @pnowojski > , Yuan Mei and probably > @Yu Li will be able to join at different stages. > > Regarding using only the confirmed checkpoints, you are right: JM can > abort non-confirmed checkpoints and discard the state. FLIP-158 has > the same problem because StateChangelog produces StateHandles that > can be discarded by the JM. Currently, potentially discarded > changes are re-uploaded in both FLIPs. > > In FLIP-158 (or follow-up), I planned to improve this part by: > 1. Limiting max-concurrent-checkpoints to 1, and > 2. Sending the last confirmed checkpoint ID in RPCs and barriers > So at the time of checkpoint, backend knows exactly which changes can be > included. > > Handling of removed keys is not related to the aborted checkpoints. They > are > needed on recovery to actually remove data from the previous snapshot. > In FLIP-158 it is again similar: ChangelogStateBackend has to encode > removal operations and send them to StateChangelog (though no additional > data structure is required). > > Regards, > Roman > > > On Thu, Feb 11, 2021 at 4:28 PM Stephan Ewen wrote: > > > Thanks, Roman for publishing this design. > > > > There seems to be quite a bit of overlap with FLIP-158 (generalized > > incremental checkpoints). > > > > I would go with +1 to the effort if it is a pretty self-contained and > > closed effort. Meaning we don't expect that this needs a ton of > follow-ups, > > other than common maintenance and small bug fixes. If we expect that this > > requires a lot of follow-ups, then we end up splitting our work between > > this FLIP and FLIP-158, which seems a bit inefficient. 
> > What other committers would be involved to ensure the community can > > maintain this? > > > > > > The design looks fine, in general, with one question: > > > > When persisting changes, you persist all changes that have a newer > version > > than the latest one confirmed by the JM. > > > > Can you explain why it is like that exactly? Alternatively, you could > keep > > the latest checkpoint ID for which the state backend persisted the diff > > successfully to the checkpoint storage, and created a state handle. For > > each checkpoint, the state backend includes the state handles of all > > involved chunks. That would be similar to the log-based approach in > > FLIP-158. > > > > I have a suspicion that this is because the JM may have released the state handle (and discarded the diff) for a checkpoint that succeeded on the task but didn't succeed globally.
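The TM-side cleanup Stephan sketches in this thread — extending notifyCheckpointComplete() to also carry the ID of the earliest retained checkpoint so each TM can retire its own artifacts — can be illustrated with a minimal sketch. The names below are hypothetical, not Flink's coordination API:

```python
class TaskLocalArtifacts:
    """Hypothetical sketch: a TM tracks its checkpoint artifacts and cleans up
    everything older than the earliest retained checkpoint."""

    def __init__(self):
        # checkpoint_id -> list of artifact names (e.g. DFS paths)
        self.artifacts_by_checkpoint = {}

    def register(self, checkpoint_id, artifact):
        self.artifacts_by_checkpoint.setdefault(checkpoint_id, []).append(artifact)

    def notify_checkpoint_complete(self, completed_id, earliest_retained_id):
        # The extended RPC carries both the completed checkpoint ID and the
        # earliest retained one; the TM drops artifacts of older checkpoints.
        stale = [cid for cid in self.artifacts_by_checkpoint
                 if cid < earliest_retained_id]
        removed = []
        for cid in stale:
            removed.extend(self.artifacts_by_checkpoint.pop(cid))
        # In a real system the removed artifacts would now be deleted from
        # S3/HDFS by the TM itself, not by the JM.
        return removed
```

This also shows why the two open questions matter: after restore, `artifacts_by_checkpoint` must be repopulated from the previous checkpoints, and after rescaling two TMs may hold the same artifact name, leading to (harmless but redundant) double deletes.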
Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend
Thanks, Roman for publishing this design. There seems to be quite a bit of overlap with FLIP-158 (generalized incremental checkpoints). I would go with +1 to the effort if it is a pretty self-contained and closed effort. Meaning we don't expect that this needs a ton of follow-ups, other than common maintenance and small bug fixes. If we expect that this requires a lot of follow-ups, then we end up splitting our work between this FLIP and FLIP-158, which seems a bit inefficient. What other committers would be involved to ensure the community can maintain this? The design looks fine, in general, with one question: When persisting changes, you persist all changes that have a newer version than the latest one confirmed by the JM. Can you explain why it is like that exactly? Alternatively, you could keep the latest checkpoint ID for which the state backend persisted the diff successfully to the checkpoint storage, and created a state handle. For each checkpoint, the state backend includes the state handles of all involved chunks. That would be similar to the log-based approach in FLIP-158. I have a suspicion that this is because the JM may have released the state handle (and discarded the diff) for a checkpoint that succeeded on the task but didn't succeed globally. So we cannot reference any state handle that has been handed over to the JobManager, but is not yet confirmed. This characteristic seems to be at the heart of much of the complexity, also the handling of removed keys seems to be caused by that. If we could change that assumption, the design would become simpler. (Side note: I am wondering if this also impacts the FLIP-158 DSTL design.) Best, Stephan On Sun, Nov 15, 2020 at 8:51 AM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi Stefan, > > Thanks for your reply. Very interesting ideas! 
> If I understand correctly, SharedStateRegistry will still be responsible > for pruning the old state; for that, it will maintain some (ordered) > mapping between StateMaps and their versions, per key group. > I think one modification to this approach is needed to support journaling: > for each entry, maintain a version when it was last fully snapshotted; and > use this version to find the minimum as you described above. > I'm considering a better state cleanup and optimization of removals as the > next step. Anyway, I will add it to the FLIP document. > > Thanks! > > Regards, > Roman > > > On Tue, Nov 10, 2020 at 12:04 AM Stefan Richter > > wrote: > > > Hi, > > > > Very happy to see that the incremental checkpoint idea is finally > becoming > > a reality for the heap backend! Overall the proposal looks pretty good to > > me. Just wanted to point out one possible improvement from what I can > still > > remember from my ideas back then: I think you can avoid doing periodic > full > > snapshots for consolidation. Instead, my suggestion would be to track the > > version numbers you encounter while you iterate a snapshot for writing > it - > > and then you should be able to prune all incremental snapshots that were > > performed with a version number smaller than the minimum you find. To > avoid > > the problem of very old entries that never get modified you could start > > spilling entries with a certain age-difference compared to the current > map > > version so that eventually all entries for an old version are re-written > to > > newer snapshots. You can track the version up to which this was done in > the > > map and then you can again let go of their corresponding snapshots after > a > > guaranteed time. So instead of having the burden of periodic large > > snapshots, you can make every snapshot work a little bit on the cleanup > and > > if you are lucky it might happen mostly by itself if most entries are > > frequently updated. 
I would also consider making map clean a special > event > > in your log and consider unticking the versions on this event - this > allows > > you to let go of old snapshots and saves you from writing a log of > > antimatter entries. Maybe the ideas are still useful to you. > > > > Best, > > Stefan > > > > On 2020/11/04 01:54:25, Khachatryan Roman wrote: > > > Hi devs,> > > > > > > I'd like to start a discussion of FLIP-151: Incremental snapshots for> > > > heap-based state backend [1]> > > > > > > Heap backend, while being limited to state sizes fitting into memory, also > > has> > > > some advantages compared to RocksDB backend:> > > > 1. Serialization once per checkpoint, not per state modification. This> > > > allows to “squash” updates to the same keys> > > > 2. Shorter synchronous phase (compared to RocksDB incremental)> > > > 3. No need for sorting and compaction, no IO amplification and JNI > > overhead> > > > This can potentially give higher throughput and efficiency.> > > > > > > However, Heap backend currently lacks incremental checkpoints. This > > FLIP> > > > aims to add initial support for them.> > > > > > > [1]> > > > > > >
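Stefan's pruning idea in the quoted message — track the entry versions encountered while writing a snapshot, then prune every older incremental snapshot whose version is below the minimum you find — can be sketched in a few lines. This is a simplified illustration under the assumption that each entry records the version at which it was last written, not the FLIP-151 implementation:

```python
def prunable_snapshots(entries, snapshot_versions):
    """entries: mapping key -> version at which the entry was last written.
    snapshot_versions: versions of past incremental snapshots.

    Any snapshot strictly older than the minimum live version contains no
    entry that is still the latest copy of its key, so it can be pruned
    without periodic full snapshots."""
    min_live_version = min(entries.values())
    return [v for v in snapshot_versions if v < min_live_version]
```

The "spill old entries" refinement in the same message keeps `min_live_version` from getting stuck: rarely modified entries are rewritten into newer snapshots, which raises the minimum and lets old snapshots eventually become prunable.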
Re: [DISCUSS] FLIP-162: Consistent Flink SQL time function behavior
Hi all! A quick thought on this thread: We see a typical stalemate here, as in so many discussions recently. One developer prefers it this way, another one another way. Both have pro/con arguments, it takes a lot of time from everyone, still there is little progress in the discussion. Ultimately, this can only be decided by talking to the users. And it would also be the best way to ensure that what we build is the intuitive and expected way for users. The less the users are into the deep aspects of Flink SQL, the better they can mirror what a common user would expect (a power user will anyways figure it out). Let's find a person to drive that, spell it out in the FLIP as "semantics TBD", and focus on the implementation of the parts that are agreed upon. For interviewing the users, here are some ideas for questions to look at: - How do they view the trade-off between stable semantics vs. out-of-the-box magic (faster getting started). - How comfortable are they realizing the different meaning of "now()" in a streaming versus batch context. - What would be their expectation when moving a query with the time functions ("now()") from an unbounded stream (Kafka source without end offset) to a bounded stream (Kafka source with end offsets), which may switch execution to batch. Best, Stephan On Tue, Feb 2, 2021 at 3:19 PM Jark Wu wrote: > Hi Fabian, > > I think we have an agreement that the functions should be evaluated at > query start in batch mode. > Because all the other batch systems and traditional databases are this > behavior, which is standard SQL compliant. > > *1. The different point of view is what's the behavior in streaming mode? * > > From my point of view, I don't see any potential meaning to evaluate at > query-start for a 365-day long running streaming job. > And from my observation, CURRENT_TIMESTAMP is heavily used by Flink > streaming users and they expect the current behaviors. 
> The SQL standard only provides a guideline for traditional batch systems, > however Flink is a leading streaming processing system > which is out of the scope of the SQL standard, and Flink should define the > streaming standard. I think a standard should follow users' intuition. > Therefore, I think we don't need to be standard SQL compliant at this point > because users don't expect it. > Changing the behavior of the functions to evaluate at query start for > streaming mode will hurt most Flink SQL users and we have nothing to > gain, > we should avoid this. > > *2. Does it break the unified streaming-batch semantics? * > > I don't think so. First of all, what's the unified streaming-batch > semantic? > I think it means the* eventual result* instead of the *behavior*. > It's hard to say we have provided unified behavior for streaming and batch > jobs, > because for example unbounded aggregate behaves very differently. > In batch mode, it only evaluates once for the bounded data and emits the > aggregate result once. > But in streaming mode, it evaluates for each row and emits the updated > result. > What we have always emphasized as "unified streaming-batch semantics" is [1] > > > a query produces exactly the same result regardless whether its input is > static batch data or streaming data. > > From my understanding, the "semantic" means the "eventual result". > And time functions are non-deterministic, so it's reasonable to get > different results for batch and streaming mode. > Therefore, I think it doesn't break the unified streaming-batch semantics > to evaluate per-record for streaming and > query-start for batch, as the semantic doesn't mean the behavior semantic. > > Best, > Jark > > [1]: https://flink.apache.org/news/2017/04/04/dynamic-tables.html > > On Tue, 2 Feb 2021 at 18:34, Fabian Hueske wrote: > > > Hi everyone, > > > > Sorry for joining this discussion late. > > Let me give some thought to two of the arguments raised in this thread. 
> > > > Time functions are inherently non-deterministic: > > -- > > This is of course true, but IMO it doesn't mean that the semantics of > time > > functions do not matter. > > It makes a difference whether a function is evaluated once and its > result > > is reused or whether it is invoked for every record. > > Would you use the same logic to justify different behavior of RAND() in > > batch and streaming queries? > > > > Provide the semantics that most users expect: > > -- > > I don't think it is clear what most users expect, esp. if we also include > > future users (which we certainly want to gain) into this assessment. > > Our current users got used to the semantics that we introduced. So I > > wouldn't be surprised if they would say stick with the current semantics. > > However, we are also claiming standard SQL compliance and stress the goal > > of batch-stream unification. > > So I would assume that new SQL users expect standard compliant behavior > for > > batch and streaming queries. > > > > > > IMO, we should try hard to stick to our goals of 1)
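The behavioral difference the thread is debating — CURRENT_TIMESTAMP evaluated once at query start (batch/standard SQL) versus re-evaluated per record (Flink's current streaming behavior) — can be made concrete with a small sketch. Python is used here as a stand-in for the SQL semantics; the function names are illustrative:

```python
import itertools

def eval_query_start(records, clock):
    # Batch-style semantics: the time function is evaluated once when the
    # query starts, and the same value is attached to every record.
    ts = clock()
    return [(r, ts) for r in records]

def eval_per_record(records, clock):
    # Streaming-style semantics: the time function is re-evaluated for
    # every incoming record.
    return [(r, clock()) for r in records]

# A deterministic "clock" that ticks on each call, to make the
# difference between the two modes visible.
ticker = itertools.count(100)
clock = lambda: next(ticker)
```

For a 365-day streaming job the per-record variant matches user intuition (each record sees the current time), while for a bounded input both standard SQL and other batch systems pin the value at query start — which is exactly the split the FLIP discusses.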
Re: [DISCUSS] FLIP-158: Generalized incremental checkpoints
+1 to this FLIP in general. I like the general idea very much (full disclosure, have been involved in the discussions and drafting of the design for a while, so I am not a new/neutral reviewer here). One thing I would like to see us do here, is reaching out to users early with this, and validating this approach. It is a very fundamental change that also shifts certain tradeoffs, like "cost during execution" vs. "cost on recovery". This approach will increase the data write rate to S3/HDFS/... So before we build every bit of the complex implementation, let's try and validate/test the critical bits with the users. In my assessment, the most critical bit is the continuous log writing, which adds overhead during execution time. Recovery is less critical, there'll be no overhead or additional load, so recovery should be strictly better than currently. I would propose we hence focus on the implementation of the logging first (ignoring recovery, focusing on one target FileSystem/Object store) and test run this with a few users, see that it works well and whether they like the new characteristics. I am also trying to contribute some adjustments to the FLIP text, like more illustrations/explanations, to make it easier to share this FLIP with a wider audience, so we can get the above-mentioned user input and validation. Best, Stephan On Thu, Jan 28, 2021 at 10:46 AM Piotr Nowojski wrote: > Hi Roman, > > +1 from my side on this proposal. Also big +1 for the recent changes in > this FLIP in the motivation and high level overview sections. > > For me there are quite a bit of unanswered things around how to actually > implement the proposed changes and especially how to integrate it with the > state backends and checkpointing, but maybe we can do that in either a > follow up design docs or discuss it in the tickets or even maybe some PoC. 
> > Piotrek > > pt., 15 sty 2021 o 07:49 Khachatryan Roman > napisał(a): > > > Hi devs, > > > > I'd like to start a discussion of FLIP-158: Generalized incremental > > checkpoints [1] > > > > FLIP motivation: > > Low end-to-end latency is a much-demanded property in many Flink setups. > > With exactly-once, this latency depends on checkpoint interval/duration > > which in turn is defined by the slowest node (usually the one doing a > full > > non-incremental snapshot). In large setups with many nodes, the > probability > > of at least one node being slow gets higher, making almost every > checkpoint > > slow. > > > > This FLIP proposes a mechanism to deal with this by materializing and > > uploading state continuously and only uploading the changed part during > the > > checkpoint itself. It differs from other approaches in that 1) > checkpoints > > are always incremental; 2) works for any state backend. > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints > > > > Any feedback highly appreciated! > > > > Regards, > > Roman > > >
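The "continuous log writing" that Stephan identifies above as the critical bit — state changes are uploaded continuously during execution, so the checkpoint itself only has to persist a small delta — can be sketched as follows. This is an illustrative simplification, not the DSTL API proposed in FLIP-158:

```python
class ChangelogWriter:
    """Hypothetical sketch: changes are appended and uploaded continuously;
    a checkpoint persists only what was appended since the last upload."""

    def __init__(self):
        self.uploaded = []    # already on S3/HDFS, written during execution
        self.pending = []     # appended since the last background upload

    def append(self, change):
        self.pending.append(change)

    def upload_in_background(self):
        # Runs continuously during execution, off the checkpoint path.
        # This is the extra write load during execution the thread discusses.
        self.uploaded.extend(self.pending)
        self.pending = []

    def checkpoint(self):
        # Only the small pending delta needs to be flushed synchronously;
        # the checkpoint then references the full log (prefix + delta),
        # which is why every checkpoint is incremental.
        delta = list(self.pending)
        self.upload_in_background()
        return {"referenced": len(self.uploaded), "delta_uploaded": len(delta)}
```

The trade-off Stephan wants to validate with users is visible here: `upload_in_background` shifts cost into normal execution (higher sustained write rate to the DFS) in exchange for a cheap `checkpoint` step.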
Re: [DISCUSS] FLIP-156: Runtime Interfaces for Fine-Grained Resource Requirements
Thanks a lot, Yangze and Xintong for this FLIP. I want to say, first of all, that this is super well written. And the points that the FLIP makes about how to expose the configuration to users is exactly the right thing to figure out first. So good job here! About how to let users specify the resource profiles. If I can sum the FLIP and previous discussion up in my own words, the problem is the following: Operator-level specification is the simplest and cleanest approach, because > it avoids mixing operator configuration (resource) and scheduling. No > matter what other parameters change (chaining, slot sharing, switching > pipelined and blocking shuffles), the resource profiles stay the same. > But it would require that a user specifies resources on all operators, > which makes it hard to use. That's why the FLIP suggests going with > specifying resources on a Sharing-Group. I think both thoughts are important, so can we find a solution where the Resource Profiles are specified on an Operator, but we still avoid that we need to specify a resource profile on every operator? What do you think about something like the following: - Resource Profiles are specified on an operator level. - Not all operators need profiles - All Operators without a Resource Profile end up in the default slot sharing group with a default profile (will get a default slot). - All Operators with a Resource Profile will go into another slot sharing group (the resource-specified-group). - Users can define different slot sharing groups for operators like they do now, with the exception that you cannot mix operators that have a resource profile and operators that have no resource profile. - The default case where no operator has a resource profile is just a special case of this model - The chaining logic sums up the profiles per operator, like it does now, and the scheduler sums up the profiles of the tasks that it schedules together. 
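The model sketched in the bullet points above — unprofiled operators fall into a default slot sharing group, profiled operators go into resource-specified groups, mixing the two in one group is disallowed, and profiles are summed per group — could look roughly like this. The names and the single-number "profile" are illustrative simplifications, not the FLIP-156 API:

```python
DEFAULT_GROUP = "default"

def group_resources(operators):
    """operators: list of (name, slot_sharing_group, profile_mb_or_None).

    Operators without a profile share a default slot; operators with
    profiles are summed per group (as chaining does). Mixing profiled
    and unprofiled operators in one group is rejected."""
    groups = {}
    for name, group, profile in operators:
        groups.setdefault(group, []).append(profile)
    result = {}
    for group, profiles in groups.items():
        specified = [p for p in profiles if p is not None]
        if specified and len(specified) != len(profiles):
            raise ValueError(
                "group %r mixes profiled and unprofiled operators" % group)
        # Unprofiled group -> default slot; profiled group -> summed profile.
        result[group] = sum(specified) if specified else "default-slot"
    return result
```

The special case where no operator has a profile then reduces to a single default group, matching today's behavior.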
There is another question about reactive scaling raised in the FLIP. I need to think a bit about that. That is indeed a bit more tricky once we have slots of different sizes. It is not clear then which of the different slot requests the ResourceManager should fulfill when new resources (TMs) show up, or how the JobManager redistributes the slot resources when resources (TMs) disappear. This question is pretty orthogonal, though, to the "how to specify the resources". Best, Stephan On Fri, Jan 8, 2021 at 5:14 AM Xintong Song wrote: > Thanks for drafting the FLIP and driving the discussion, Yangze. > And Thanks for the feedback, Till and Chesnay. > > @Till, > > I agree that specifying requirements for SSGs means that SSGs need to be > supported in fine-grained resource management, otherwise each operator > might use as many resources as the whole group. However, I cannot think of > a strong reason for not supporting SSGs in fine-grained resource > management. > > > > Interestingly, if all operators have their resources properly specified, > > then slot sharing is no longer needed because Flink could slice off the > > appropriately sized slots for every Task individually. > > > > So for example, if we have a job consisting of two operator op_1 and op_2 > > where each op needs 100 MB of memory, we would then say that the slot > > sharing group needs 200 MB of memory to run. If we have a cluster with 2 > > TMs with one slot of 100 MB each, then the system cannot run this job. If > > the resources were specified on an operator level, then the system could > > still make the decision to deploy op_1 to TM_1 and op_2 to TM_2. > > > Couldn't agree more that if all operators' requirements are properly > specified, slot sharing should be no longer needed. I think this exactly > disproves the example. If we already know op_1 and op_2 each needs 100 MB > of memory, why would we put them in the same group? 
If they are in separate > groups, with the proposed approach the system can freely deploy them to > either a 200 MB TM or two 100 MB TMs. > > Moreover, the precondition for not needing slot sharing is having resource > requirements properly specified for all operators. This is not always > possible, and usually requires tremendous efforts. One of the benefits for > SSG-based requirements is that it allows the user to freely decide the > granularity, thus efforts they want to pay. I would consider SSG in > fine-grained resource management as a group of operators that the user > would like to specify the total resource for. There can be only one group > in the job, 2~3 groups dividing the job into a few major parts, or as many > groups as the number of tasks/operators, depending on how fine-grained the > user is able to specify the resources. > > Having to support SSGs might be a constraint. But given that all the > current scheduler implementations already support SSGs, I tend to think > that as an acceptable price for the above discussed usability and > flexibility. > > @Chesnay > > Will
Re: [DISCUSS] FLIP-157 Migrate Flink Documentation from Jekyll to Hugo
+1 for this FLIP. I tried it out locally, and it works well. Aside from the fast build times, I really like the search feature, which works pretty well. Thanks a lot for the initiative, Seth! On Sun, Jan 17, 2021 at 4:43 PM Jark Wu wrote: > I have tried in my local env, and the build time is really fast. > Looking forward Hugo can help to better cooperate with the translation > work. > > +1 to start a vote. > > best, > Jark > > On Fri, 15 Jan 2021 at 23:02, Seth Wiesman wrote: > > > Great, if there aren't any other concerns I will open this up for a vote > on > > Monday. > > > > Seth > > > > On Thu, Jan 14, 2021 at 9:03 AM Seth Wiesman > wrote: > > > > > Happy to see there is enthusiasm for this change, let me try and > answer > > > each of these questions. > > > > > > @Jark Wu Hugo has proper support for i18n which > means > > > we can move content into external files that can be easily > translated[1]. > > > For reference, Kubernetes has successfully used Hugo's built-in > features > > to > > > maintain 14 different languages[2]. Additionally, Hugo's md files are > > > standard markdown which could allow us to integrate with other tooling. > > For > > > example, we may look into using Crowdin for managing translations as > the > > > pulsar community does. > > > > > > @Till Rohrmann None that I have found. In the > > > proof of concept, I have already implemented all the Jekyll > functionality > > > we are using in the docs[4]. I have found Hugo shortcodes to be a more > > > flexible alternative to liquid tags. > > > > > > @Chesnay Schepler Not yet, I do not have access > to > > > the build bot (it is PMC only). I will work with INFRA to get Hugo > > > installed if it is not already and Robert has agreed to set-up the > build > > > script on the build bot itself. 
> > > > > > Seth > > > > > > [1] https://gohugo.io/functions/i18n/ > > > [2] https://github.com/kubernetes/website/ > > > [3] https://github.com/apache/pulsar-translation > > > [4] > > > > > > https://github.com/sjwiesman/flink-docs-v2/tree/master/layouts/shortcodes > > > > > > > > > > > > On Thu, Jan 14, 2021 at 7:03 AM David Anderson > > > wrote: > > > > > >> I've spent a few hours digging into this with Seth, and can report > that > > >> this makes working on the docs much less of a chore. > > >> > > >> +1 (with enthusiasm) > > >> > > >> Best, > > >> David > > >> > > >> On Thu, Jan 14, 2021 at 1:34 PM Kostas Kloudas > > >> wrote: > > >> > > >> > +1 for moving to Hugo. > > >> > > > >> > Cheers, > > >> > Kostas > > >> > > > >> > On Thu, Jan 14, 2021 at 1:27 PM Wei Zhong > > >> wrote: > > >> > > > > >> > > +1 for migrating to Hugo. > > >> > > > > >> > > Currently we have developed many plugins based on Jekyll because > the > > >> > native features of Jekyll cannot meet our needs. It seems all of > them > > >> can > > >> > be supported via Hugo shortcodes and will become more concise. > > >> > > > > >> > > Best, > > >> > > Wei > > >> > > > > >> > > > 在 2021年1月14日,18:21,Aljoscha Krettek 写道: > > >> > > > > > >> > > > +1 > > >> > > > > > >> > > > The build times on Jekyll have just become to annoying for me. I > > >> > realize that that is also a function of how we structure our > > >> documentation, > > >> > and especially how we construct the nav sidebar, but I think overall > > >> moving > > >> > to Hugo is still a benefit. > > >> > > > > > >> > > > Aljoscha > > >> > > > > > >> > > > On 2021/01/13 10:14, Seth Wiesman wrote: > > >> > > >> Hi All, > > >> > > >> > > >> > > >> I would like to start a discussion for FLIP-157: Migrating the > > >> Flink > > >> > docs > > >> > > >> from Jekyll to Hugo. 
> > >> > > >> > > >> > > >> This will allow us: > > >> > > >> > > >> > > >> - Proper internationalization > > >> > > >> - Working Search > > >> > > >> - Sub-second build time ;) > > >> > > >> > > >> > > >> Please take a look and let me know what you think. > > >> > > >> > > >> > > >> Seth > > >> > > >> > > >> > > >> > > >> > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-157+Migrate+Flink+Documentation+from+Jekyll+to+Hugo > > >> > > > > >> > > > >> > > > > > >
[jira] [Created] (FLINK-20472) Change quickstarts to have a dependency on "flink-dist"
Stephan Ewen created FLINK-20472: Summary: Change quickstarts to have a dependency on "flink-dist" Key: FLINK-20472 URL: https://issues.apache.org/jira/browse/FLINK-20472 Project: Flink Issue Type: Improvement Components: Quickstarts Reporter: Stephan Ewen I suggest we change the quickstarts to have the following dependencies. - {{flink-dist}} (provided) - log4j (runtime) That way the projects created from quickstarts have exactly the same dependencies as what the distribution provides. That solves all our mismatches between needing different dependencies for compiling/running-in-IDE and packaging for deployment. For example, we can add the {{flink-connector-base}} and {{flink-connector-files}} back to {{flink-dist}} without having any issues with setting up dependencies in the user projects. -- This message was sent by Atlassian Jira (v8.3.4#803005)
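The suggested dependency setup might look like the following in the quickstart pom.xml. This is a sketch only — the exact artifact IDs (e.g. a Scala-suffixed `flink-dist` artifact) and which log4j artifacts to include would need to match what the distribution of that Flink version actually bundles:

```xml
<dependencies>
    <!-- Everything the distribution provides: compiled against, but not
         packaged into the user jar (scope "provided"). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-dist</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Logging backend, needed at runtime when executing in the IDE,
         but provided by the distribution on the cluster. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
```

With this layout, what compiles and runs in the IDE is exactly what the distribution provides on the cluster, which is the mismatch the ticket wants to eliminate.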
[jira] [Created] (FLINK-20413) Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()".
Stephan Ewen created FLINK-20413: Summary: Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()". Key: FLINK-20413 URL: https://issues.apache.org/jira/browse/FLINK-20413 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Because "subtaskFailed()" has no strong order guarantees with checkpoint completion, we need to return failed splits in "resetSubtask()" instead. See FLINK-20396 for a detailed explanation of the race condition. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20412) Collect Result Fetching occasionally fails after a JobManager Failover
Stephan Ewen created FLINK-20412: Summary: Collect Result Fetching occasionally fails after a JobManager Failover Key: FLINK-20412 URL: https://issues.apache.org/jira/browse/FLINK-20412 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen The encountered exception is as below. The issue can be reproduced by running a test with JobManager failover in a tight loop, for example the FileTextLinesITCase from this PR: [https://github.com/apache/flink/pull/14199] {code:java} 15335 [main] WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher - An exception occurs when fetching query results java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:134) [classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) [classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) [classes/:?] at org.apache.flink.streaming.api.datastream.DataStreamUtils.collectRecordsFromUnboundedStream(DataStreamUtils.java:142) [classes/:?] at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:272) [test-classes/:?] at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithJobManagerFailover(FileSourceTextLinesITCase.java:228) [test-classes/:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] 
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12] at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12] at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12] at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) [junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) [junit-4.12.jar:4.12] at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 
[junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12] at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12] at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) [junit-rt.jar:?] at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67) [junit-rt.jar
[jira] [Created] (FLINK-20406) Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()
Stephan Ewen created FLINK-20406: Summary: Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks() Key: FLINK-20406 URL: https://issues.apache.org/jira/browse/FLINK-20406 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 To allow the scheduler to notify Operator Coordinators of subtask restores (local failover), we need to know which checkpoint ID was restored. This change does not adjust the other restore methods of the Checkpoint Coordinator, because the fact that the Scheduler needs to be involved in the subtask restore notification at all is only due to a shortcoming of the Checkpoint Coordinator: The CC is not aware of subtask restores; it always restores all subtasks and relies on the fact that assigning state to a running execution attempt has no effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
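A minimal sketch of the proposed signature change, under the assumption that the restore method simply reports the ID of the checkpoint it used (class and field names here are illustrative, not Flink's actual code):

```java
import java.util.OptionalLong;

// Illustrative sketch: the restore method returns the ID of the checkpoint
// it restored from, or empty if there was no checkpoint to restore. The
// scheduler can then forward that ID to the Operator Coordinators.
class CheckpointCoordinatorSketch {
    private final long latestCompletedCheckpointId; // -1 means "none"

    CheckpointCoordinatorSketch(long latestCompletedCheckpointId) {
        this.latestCompletedCheckpointId = latestCompletedCheckpointId;
    }

    // Before: void restoreLatestCheckpointedStateToSubtasks(...)
    // After:  the caller learns which checkpoint was restored.
    OptionalLong restoreLatestCheckpointedStateToSubtasks() {
        if (latestCompletedCheckpointId < 0) {
            return OptionalLong.empty(); // nothing to restore
        }
        // ... assign the state of that checkpoint to the subtasks ...
        return OptionalLong.of(latestCompletedCheckpointId);
    }
}
```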
[jira] [Created] (FLINK-20397) Pass checkpointId to OperatorCoordinator.resetToCheckpoint().
Stephan Ewen created FLINK-20397: Summary: Pass checkpointId to OperatorCoordinator.resetToCheckpoint(). Key: FLINK-20397 URL: https://issues.apache.org/jira/browse/FLINK-20397 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0, 1.11.3 The OperatorCoordinator.resetToCheckpoint() currently lacks the information about which checkpoint it recovers to. That forces implementers to assume strict ordering of method calls between restore and failure. While that is currently guaranteed in this case, it is not guaranteed in other places (see parent issue). Because of that, we want implementations not to assume method order at all, but to rely on explicit information passed to the methods (checkpoint IDs). Otherwise, we end up with mixed implementations that partially infer context from the order of method calls, and partially use explicit information that was passed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
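A sketch of what the change amounts to, with simplified illustrative interfaces (not the exact Flink API): the checkpoint ID becomes an explicit parameter, so implementations no longer infer it from call order.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative, simplified coordinator interface.
interface OperatorCoordinatorSketch {
    // Before: void resetToCheckpoint(byte[] checkpointData);
    // After: the checkpoint ID is passed explicitly.
    void resetToCheckpoint(long checkpointId, byte[] checkpointData);
}

// An implementation can now record exactly which checkpoint it was reset to,
// instead of assuming "the last checkpoint that was taken".
class RecordingCoordinator implements OperatorCoordinatorSketch {
    final List<Long> restoredCheckpointIds = new ArrayList<>();

    @Override
    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
        restoredCheckpointIds.add(checkpointId);
        // ... deserialize checkpointData and reset internal state ...
    }
}
```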
[jira] [Created] (FLINK-20396) Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"
Stephan Ewen created FLINK-20396: Summary: Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()" Key: FLINK-20396 URL: https://issues.apache.org/jira/browse/FLINK-20396 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0, 1.11.3

There are no strong order guarantees between {{OperatorCoordinator.subtaskFailed()}} and {{OperatorCoordinator.notifyCheckpointComplete()}}. It can happen that a checkpoint completes after the notification for task failure is sent:
- {{OperatorCoordinator.checkpoint()}}
- {{OperatorCoordinator.subtaskFailed()}}
- {{OperatorCoordinator.checkpointComplete()}}

The subtask failure here does not know whether the previous checkpoint completed or not. It cannot decide what state the subtask will be in after recovery. There is no easy fix right now to strictly guarantee the order of the method calls, so alternatively we need to provide the necessary information to reason about the status of tasks. We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That way, implementations get the explicit checkpoint ID for the subtask recovery, and can align that with the IDs of checkpoints that were taken. It is still (in rare cases) possible that for a specific checkpoint C, {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before {{OperatorCoordinator.checkpointComplete(C)}}.

h3. Background

The Checkpointing Procedure is partially asynchronous on the {{JobManager}} / {{CheckpointCoordinator}}: After all subtasks have acknowledged the checkpoint, the finalization (writing out metadata and registering the checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint completes after that.
This sequence of events can happen:
- task acks checkpoint
- checkpoint fully acknowledged, finalization starts
- task fails
- task failure notification is dispatched
- checkpoint completes

For task failures and checkpoint completion, no order is defined. However, for task restore and checkpoint completion, the order is well defined: When a task is restored, pending checkpoints are either canceled or complete. None can be within finalization. That is currently guaranteed with a lock in the {{CheckpointCoordinator}}. (An implication of that being that restores can be blocking operations in the scheduler, which is not ideal from the perspective of making the scheduler async/non-blocking, but it is currently essential for correctness). -- This message was sent by Atlassian Jira (v8.3.4#803005)
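The proposed replacement can be sketched as follows. This is an illustrative toy model, not Flink's actual coordinator code: the coordinator tracks the checkpoints it has seen complete, and a restore notification carries a concrete checkpoint ID the coordinator can align against that set.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the proposed API: subtaskRestored(subtask, checkpointId)
// replaces subtaskFailed(subtask), so the coordinator knows exactly which
// checkpoint the subtask came back from.
class CoordinatorSketch {
    private final Set<Long> completedCheckpoints = new HashSet<>();

    void checkpointComplete(long checkpointId) {
        completedCheckpoints.add(checkpointId);
    }

    // Returns true if the restore refers to a checkpoint this coordinator has
    // already seen complete. As the issue notes, in rare cases the restore
    // notification can still arrive before checkpointComplete() for the same ID.
    boolean subtaskRestored(int subtask, long checkpointId) {
        return completedCheckpoints.contains(checkpointId);
    }
}
```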
[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema
Stephan Ewen created FLINK-20379: Summary: New Kafka Connector does not support DeserializationSchema Key: FLINK-20379 URL: https://issues.apache.org/jira/browse/FLINK-20379 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Stephan Ewen Fix For: 1.12.0 The new Kafka Connector defines its own deserialization schema and is incompatible with the existing library of deserializers. That means that users cannot use all of Flink's Formats (Avro, JSON, Csv, Protobuf, Confluent Schema Registry, ...) with the new Kafka Connector. I think we should change the new Kafka Connector to use the existing Deserialization classes, so all formats can be used, and users can reuse their deserializer implementations. It would also be good to use the existing KafkaDeserializationSchema. Otherwise all users need to migrate their sources again. -- This message was sent by Atlassian Jira (v8.3.4#803005)
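One way to achieve the reuse the issue asks for is an adapter that wraps an existing value deserializer for the new connector's interface. The interfaces below are deliberately simplified stand-ins (not Flink's real DeserializationSchema or the connector's actual deserializer):

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Flink's existing value-only deserialization schema.
interface SimpleDeserializationSchema<T> {
    T deserialize(byte[] message);
}

// Simplified stand-in for the new connector's own deserializer interface,
// which sees both key and value.
interface NewConnectorDeserializer<T> {
    T deserialize(byte[] key, byte[] value);
}

// Adapter: reuse an existing schema (and therefore all existing formats)
// inside the new connector. The key is ignored, as with value-only schemas.
class DeserializationSchemaAdapter<T> implements NewConnectorDeserializer<T> {
    private final SimpleDeserializationSchema<T> schema;

    DeserializationSchemaAdapter(SimpleDeserializationSchema<T> schema) {
        this.schema = schema;
    }

    @Override
    public T deserialize(byte[] key, byte[] value) {
        return schema.deserialize(value);
    }
}
```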
[jira] [Created] (FLINK-20276) Transparent DeCompression of streams missing on new File Source
Stephan Ewen created FLINK-20276: Summary: Transparent DeCompression of streams missing on new File Source Key: FLINK-20276 URL: https://issues.apache.org/jira/browse/FLINK-20276 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The existing {{FileInputFormat}} applies decompression (gzip, xz, ...) automatically on the file input stream, based on the file extension. We need to add similar functionality for the {{StreamRecordFormat}} of the new FileSource to be on par with this functionality. This can be easily applied in the {{StreamFormatAdapter}} when opening the file stream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
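Extension-based decompression is, in essence, choosing a decompressing wrapper for the input stream by file name. A minimal sketch with JDK-only codecs (the real FileInputFormat/StreamFormatAdapter machinery supports more formats and differs in structure; the class and method names here are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

// Illustrative helper: wrap the raw file stream in a decompressor chosen by
// the file extension, or pass it through unchanged.
final class InputStreamDecompression {
    static InputStream wrapForDecompression(String fileName, InputStream in)
            throws IOException {
        if (fileName.endsWith(".gz")) {
            return new GZIPInputStream(in);
        } else if (fileName.endsWith(".deflate")) {
            return new InflaterInputStream(in);
        }
        return in; // no known compression extension: read as-is
    }
}
```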
[jira] [Created] (FLINK-20188) Add Documentation for new File Source
Stephan Ewen created FLINK-20188: Summary: Add Documentation for new File Source Key: FLINK-20188 URL: https://issues.apache.org/jira/browse/FLINK-20188 Project: Flink Issue Type: Task Components: Documentation Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [VOTE] Apache Flink Stateful Functions 2.2.1, release candidate #1
+1 (binding) Compiled source and ran all tests (JDK 8) Ran example (greeter) with a docker compose setup Release contains no binaries. Checked LICENSE and NOTICE for source release Checked LICENSE and NOTICE for flink-statefun-distribution binary release On Fri, Nov 6, 2020 at 11:46 PM Igal Shilman wrote: > +1 (non binding) > > Legal: > - Checked signatures > - Checked that no binaries were included in source artifacts > > Build > - Built source `mvn clean install -Prun-e2e-tests` with jdk8 > - Built source `mvn clean install -Prun-e2e-tests` with jdk11 > - Built the python SDK > - Built the Docker Image > > Functional: > - Verified that FLINK-19692 can no longer be reproduced. > - Ran a randomized smoke test ("random clicker"), with a high volume > producer, and state verification in the presence of task manager failures. > This program ran successfully on this RC, and failed (as expected) on the > previous version. > > Thanks, > Igal. > > On Fri, Nov 6, 2020 at 9:48 AM Tzu-Li (Gordon) Tai > wrote: > > > +1 (binding) > > > > Legal: > > - Checked signatures > > - Checked that no binaries were included in source artifacts > > - Built source `mvn clean install -Prun-e2e-tests` > > - No dependency changes since 2.2.0, so assuming that NOTICE files are > > correct (require no changes) > > > > Functional: > > - Verified that FLINK-19692 can no longer be reproduced with a StateFun > app > > that deterministically floods the feedback loop with events. Run with a > > docker-compose setup with parallelism=4, checkpointing to S3, verified > for > > all state backends. > > - Checked the state bootstrap API by running the state bootstrap > example, > > and restoring the greeter example from the generated savepoint. 
> > > > On Thu, Nov 5, 2020 at 3:24 AM Robert Metzger > wrote: > > > > > +1 binding > > > > > > Checks: > > > - source sha and signature > > > - mvn clean install passed on source archive > > > - staging artifacts have correct version, also in quickstart, NOTICE > > files > > > seem correct > > > > > > > > > > > > On Wed, Nov 4, 2020 at 11:20 AM Tzu-Li (Gordon) Tai < > tzuli...@apache.org > > > > > > wrote: > > > > > > > Hi everyone, > > > > > > > > Please review and vote on the release candidate #1 for the version > > 2.2.1 > > > of > > > > Apache Flink Stateful Functions, > > > > as follows: > > > > [ ] +1, Approve the release > > > > [ ] -1, Do not approve the release (please provide specific comments) > > > > > > > > ***Testing Guideline*** > > > > > > > > You can find here [1] a page in the project wiki on instructions for > > > > testing. > > > > To cast a vote, it is not necessary to perform all listed checks, > > > > but please mention which checks you have performed when voting. 
> > > > > > > > ***Release Overview*** > > > > > > > > As an overview, the release consists of the following: > > > > a) Stateful Functions canonical source distribution, to be deployed > to > > > the > > > > release repository at dist.apache.org > > > > b) Stateful Functions Python SDK distributions to be deployed to PyPI > > > > c) Maven artifacts to be deployed to the Maven Central Repository > > > > d) New Dockerfiles for the release > > > > e) Release announcement to be published to the Flink website > > > > > > > > ***Staging Areas to Review*** > > > > > > > > The staging areas containing the above mentioned artifacts are as > > > follows, > > > > for your review: > > > > * All artifacts for a) and b) can be found in the corresponding dev > > > > repository at dist.apache.org [2] > > > > * All artifacts for c) can be found at the Apache Nexus Repository > [3] > > > > > > > > All artifacts are signed with the key > > > > 1C1E2394D3194E1944613488F320986D35C33D6A [4] > > > > > > > > Other links for your review: > > > > * JIRA release notes [5] > > > > * source code tag "release-2.2.1-rc1" [6] > > > > * PR for the new Dockerfiles [7] > > > > * PR for the release announcement blog post [8] > > > > > > > > ***Vote Duration*** > > > > > > > > The voting time will run for 72 hours, lasting until Nov. 7th, 10am > > CET. > > > > It is adopted by majority approval, with at least 3 PMC affirmative > > > votes. 
> > > > > > > > Thanks, > > > > Gordon > > > > > > > > [1] > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release > > > > [2] > > > https://dist.apache.org/repos/dist/dev/flink/flink-statefun-2.2.1-rc1/ > > > > [3] https://repository.apache.org/content/repositories/orgapacheflink-1401/ > > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS > > > > [5] > > > > > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349291 > > > > [6] > > > > > > > > > > > > > > https://gitbox.apache.org/repos/asf?p=flink-statefun.git;a=commit;h=3d53612fccfe143acbdd1686a942b29c96c6bbd8 > > > > [7]
[jira] [Created] (FLINK-20063) File Source requests an additional split on every restore.
Stephan Ewen created FLINK-20063: Summary: File Source requests an additional split on every restore. Key: FLINK-20063 URL: https://issues.apache.org/jira/browse/FLINK-20063 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Currently, the {{FileSourceReader}} requests a new split when started. That includes cases when it was restored from a checkpoint. So with every restore, the reader increases its split backlog size by one, causing problems for balanced split assignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
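The fix amounts to making the initial split request conditional on whether splits were recovered from the checkpoint. A toy sketch (illustrative names; the real FileSourceReader works differently in detail):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the fix: only request a split on start-up when no splits were
// recovered from a checkpoint, so a restore does not grow the backlog by one.
class FileSourceReaderSketch {
    private final Deque<String> pendingSplits = new ArrayDeque<>();
    int splitRequests = 0; // stands in for requests sent to the enumerator

    void addRecoveredSplits(Iterable<String> splits) {
        splits.forEach(pendingSplits::add);
    }

    void start() {
        // Before the fix: a new split was requested unconditionally here,
        // including after a restore.
        if (pendingSplits.isEmpty()) {
            requestSplit();
        }
    }

    private void requestSplit() {
        splitRequests++;
    }
}
```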
[jira] [Created] (FLINK-20049) Simplify handling of "request split".
Stephan Ewen created FLINK-20049: Summary: Simplify handling of "request split". Key: FLINK-20049 URL: https://issues.apache.org/jira/browse/FLINK-20049 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 This issue is similar to FLINK-19265. Split assignment events are treated specially by the source API. Users do not create them directly but call methods on the contexts to assign splits. In contrast, the {{RequestSplitEvent}} is a custom user event and needs to be handled like one when sent by enumerators and received by the readers. That seems a bit confusing and inconsistent, given that {{RequestSplitEvent}} is essential for all use cases with pull-based split assignment, which is pretty much any batch use case and various streaming use cases. The event should be on the same level as the {{AddSplitEvent}}. I suggest that we add a {{SourceReaderContext.requestSplit()}} and {{SplitEnumerator.handleSplitRequest()}}, to have split requests and responses symmetrical. -- This message was sent by Atlassian Jira (v8.3.4#803005)
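The proposed symmetry can be sketched with simplified interfaces (illustrative only; the real Flink API carries events over an RPC gateway, not a direct call like this):

```java
import java.util.ArrayList;
import java.util.List;

// Enumerator side: split requests arrive through a first-class method,
// mirroring how split assignment already has dedicated methods.
interface SplitEnumeratorSketch {
    void handleSplitRequest(int subtaskId, String requesterHostname);
}

class RecordingEnumerator implements SplitEnumeratorSketch {
    final List<Integer> requests = new ArrayList<>();

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        requests.add(subtaskId);
        // ... pick a split and assign it back to the subtask ...
    }
}

// Reader side: the context exposes requestSplit(), so readers no longer
// construct and send a RequestSplitEvent by hand.
class ReaderContextSketch {
    private final SplitEnumeratorSketch enumerator;
    private final int subtaskId;

    ReaderContextSketch(SplitEnumeratorSketch enumerator, int subtaskId) {
        this.enumerator = enumerator;
        this.subtaskId = subtaskId;
    }

    void requestSplit() {
        enumerator.handleSplitRequest(subtaskId, "localhost");
    }
}
```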