[jira] [Created] (FLINK-27459) JsonJobGraphGenerationTest doesn't reset context environment
Chesnay Schepler created FLINK-27459: Summary: JsonJobGraphGenerationTest doesn't reset context environment Key: FLINK-27459 URL: https://issues.apache.org/jira/browse/FLINK-27459 Project: Flink Issue Type: Technical Debt Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2
Hi all, +1 for the release (non-binding). The check follows the Jira release note[1] and is listed as follows. - Verify that the source distributions of [2] do not contain any binaries; - Build the source distribution to ensure all source files have Apache headers, and test functionality against Maven Staged Artifacts under Java8 and Scala 2.12; - Check README.md; - Check file hashes(MD5 and SHA-1 for [3], SHA-512 for [2]) and GPG signatures for [2] and [3] with gpg 2.3.6; Best, Jane Chan [1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release [2] https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/ [3] https://repository.apache.org/content/repositories/orgapacheflink-1502/ On Fri, Apr 29, 2022 at 10:24 AM Jingsong Li wrote: > Hi everyone, > > Please review and vote on the release candidate #2 for the version 0.1.0 of > Apache Flink Table Store, as follows: > > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > **Release Overview** > > As an overview, the release consists of the following: > a) Table Store canonical source distribution, to be deployed to the > release repository at dist.apache.org > b) Maven artifacts to be deployed to the Maven Central Repository > > **Staging Areas to Review** > > The staging areas containing the above mentioned artifacts are as follows, > for your review: > * All artifacts for a) and b) can be found in the corresponding dev > repository at dist.apache.org [2] > * All artifacts for c) can be found at the Apache Nexus Repository [3] > * Pre Bundled Binaries Jar can work fine with quick start [4][5] > > All artifacts are signed with the key > 2C2B6A653B07086B65E4369F7C76245E0A318150 [6] > > Other links for your review: > * JIRA release notes [7] > * source code tag "release-0.1.0-rc2" [8] > * PR to update the website Downloads page to include Table Store > links [9] > > **Vote Duration** > > The voting time will run for at 
least 72 hours. > It is adopted by majority approval, with at least 3 PMC affirmative votes. > > Best, > Jingsong Lee > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > [2] > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/ > [3] > https://repository.apache.org/content/repositories/orgapacheflink-1502/ > [4] > https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar > [5] > https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/ > [6] https://dist.apache.org/repos/dist/release/flink/KEYS > [7] > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234 > [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2 > [9] https://github.com/apache/flink-web/pull/531 >
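For readers new to release verification: the checksum step in the votes above amounts to computing a digest of the downloaded artifact and comparing it with the published checksum file. A minimal, illustrative Java sketch (not the official verification tooling; the file path is a hypothetical argument):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;

public class ChecksumCheck {
    // Computes the hex-encoded SHA-512 digest of the given bytes,
    // for comparison against the published *.sha512 checksum file.
    static String sha512Hex(byte[] data) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-512");
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest(data)) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the downloaded source distribution (hypothetical).
        byte[] bytes = Files.readAllBytes(Path.of(args[0]));
        System.out.println(sha512Hex(bytes));
    }
}
```

GPG signature checks (`gpg --verify`) remain a separate step against the KEYS file linked above.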
Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
Hi Alexander and Arvid, Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I’d like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping around the user-provided table function, we prefer to introduce some new APIs extending TableFunction with these concerns: 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time”, because it couldn’t truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for the performance. So we prefer not to provide caching on the table runtime level. 2. If we make the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious. Under the current design the behavior of the framework should only be specified by configurations (“table.exec.xxx”), and it’s hard to apply these general configs to a specific table. 3. We have use cases that lookup source loads and refresh all records periodically into the memory to achieve high lookup performance (like Hive connector in the community, and also widely used by our internal connectors). Wrapping the cache around the user’s TableFunction works fine for LRU caches, but I think we have to introduce a new interface for this all-caching scenario and the design would become more complex. 4. Providing the cache in the framework might introduce compatibility issues to existing lookup sources like there might exist two caches with totally different strategies if the user incorrectly configures the table (one in the framework and another implemented by the lookup source). 
As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of the runner with the cache. The goal of using cache is to reduce the network I/O and pressure on the external system, and only applying these optimizations to the cache seems not quite useful. I made some updates to the FLIP[1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate metrics of the cache. Also, I made a POC[2] for your reference. Looking forward to your ideas! [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric [2] https://github.com/PatrickRen/flink/tree/FLIP-221 Best regards, Qingsheng On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов wrote: > Thanks for the response, Arvid! > > I have few comments on your message. > > > but could also live with an easier solution as the first step: > > I think that these 2 ways are mutually exclusive (originally proposed > by Qingsheng and mine), because conceptually they follow the same > goal, but implementation details are different. If we will go one way, > moving to another way in the future will mean deleting existing code > and once again changing the API for connectors. So I think we should > reach a consensus with the community about that and then work together > on this FLIP, i.e. divide the work on tasks for different parts of the > flip (for example, LRU cache unification / introducing proposed set of > metrics / further work…). WDYT, Qingsheng? 
> > > as the source will only receive the requests after filter > > Actually if filters are applied to fields of the lookup table, we > firstly must do requests, and only after that we can filter responses, > because lookup connectors don't have filter pushdown. So if filtering > is done before caching, there will be much less rows in cache. > > > @Alexander unfortunately, your architecture is not shared. I don't know > the > > > solution to share images to be honest. > > Sorry for that, I’m a bit new to such kinds of conversations :) > I have no write access to the confluence, so I made a Jira issue, > where described the proposed changes in more details - > https://issues.apache.org/jira/browse/FLINK-27411. > > Will be happy to get more feedback! > > Best, > Smirnov Alexander > > On Mon, 25 Apr 2022 at 19:49, Arvid Heise wrote: > > > > Hi Qingsheng, > > > > Thanks for driving this; the inconsistency was not satisfying for me. > > > > I second Alexander's idea though but could also live with an easier > > solution as the first step: Instead of making caching an implementation > > detail of TableFunction X, rather devise a caching layer around X. So the > > proposal would be a CachingTableFunction that delegates to X in case of > > misses and else manages the cache. Lifting it into the
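To make the design alternatives in this thread more concrete, here is a rough sketch of the kind of LRU layer the "wrapper around TableFunction" idea implies. All names are hypothetical; the actual FLIP-221 classes (CachingTableFunction and friends) are proposals under discussion, not this code:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: an LRU cache placed in front of a lookup function,
// delegating to the underlying lookup on a miss. Not FLIP-221 API.
public class LruLookupCache<K, V> {
    private final int maxSize;
    private final Function<K, Collection<V>> loader; // delegate lookup on miss
    private final LinkedHashMap<K, Collection<V>> cache;

    public LruLookupCache(int maxSize, Function<K, Collection<V>> loader) {
        this.maxSize = maxSize;
        this.loader = loader;
        // Access-order LinkedHashMap evicts the least recently used entry.
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    public Collection<V> lookup(K key) {
        Collection<V> cached = cache.get(key); // get() refreshes LRU order
        if (cached != null) {
            return cached;
        }
        Collection<V> loaded = loader.apply(key); // miss: delegate to lookup
        cache.put(key, loaded);
        return loaded;
    }

    public int size() {
        return cache.size();
    }
}
```

Note that this wrapper caches whatever rows the delegate returns, which is exactly the point Alexander raises: without filter pushdown, unfiltered rows occupy the cache.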
[jira] [Created] (FLINK-27458) Expose allowNonRestoredState flag in JobSpec
Gyula Fora created FLINK-27458: -- Summary: Expose allowNonRestoredState flag in JobSpec Key: FLINK-27458 URL: https://issues.apache.org/jira/browse/FLINK-27458 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 We should probably expose this option as a top-level spec field, otherwise it is impossible to set it on a per-job level for SessionJobs. What do you think [~aitozi] [~wangyang0918]
[jira] [Created] (FLINK-27457) CassandraOutputFormats should support flush
Etienne Chauchot created FLINK-27457: Summary: CassandraOutputFormats should support flush Key: FLINK-27457 URL: https://issues.apache.org/jira/browse/FLINK-27457 Project: Flink Issue Type: Improvement Components: Connectors / Cassandra Reporter: Etienne Chauchot When migrating to Cassandra 4.x in [this PR|https://github.com/apache/flink/pull/19586], a race condition in the tests between the asynchronous writes and the junit assertions was uncovered. So it was decided to introduce a flush mechanism for asynchronous writes in the Cassandra output formats, similar to what was done in the Cassandra sinks.
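The flush mechanism described in the ticket boils down to remembering the future of every asynchronous write and blocking until all of them complete, so callers (e.g. test assertions) never race against in-flight writes. A minimal sketch of that pattern (a hypothetical class, not the actual Cassandra output format code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the flush pattern: track in-flight asynchronous writes and
// let flush() block until all of them have completed.
public class AsyncWriter<T> {
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private final AtomicInteger completed = new AtomicInteger();

    // Simulated asynchronous write; a real output format would submit
    // the record to the Cassandra session here.
    public void write(T record) {
        pending.add(CompletableFuture.runAsync(() -> {
            // ... send record to the external system (stand-in) ...
            completed.incrementAndGet();
        }));
    }

    // Waits for all outstanding writes, then clears the bookkeeping.
    public void flush() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pending.clear();
    }

    public int completedWrites() {
        return completed.get();
    }
}
```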
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Let me add some information about the LegacySource. If we want to disable the overdraft buffer for LegacySource, could we add an enableOverdraft flag in LocalBufferPool? The default value is false; if getAvailableFuture is called, we change enableOverdraft=true. The flag indicates whether isAvailable is checked elsewhere. I don't think it is elegant, but it's safe. Please correct me if I'm wrong. Thanks fanrui On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote: > Hi, > > Thanks for your quick response. > > For question 1/2/3, we think they are clear. We just need to discuss the > default value in PR. > > For the legacy source, you are right. It's difficult for general > implementation. > Currently, we implement ensureRecordWriterIsAvailable() in > SourceFunction.SourceContext. And call it in our common LegacySource, > e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so > fixing FlinkKafkaConsumer solved most of our problems. > > Core code: > ``` > public void ensureRecordWriterIsAvailable() { > if (recordWriter == null > || > !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, > false) > || recordWriter.isAvailable()) { > return; > } > > CompletableFuture resumeFuture = recordWriter.getAvailableFuture(); > try { > resumeFuture.get(); > } catch (Throwable ignored) { > } > } > ``` > > LegacySource calls sourceContext.ensureRecordWriterIsAvailable() > before synchronized (checkpointLock) and collects records. > Please let me know if there is a better solution. > > Thanks > fanrui > > On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov > wrote: > >> Hi. >> >> -- 1. Do you mean split this into two JIRAs or two PRs or two commits in a >> PR? >> >> Perhaps, the separated ticket will be better since this task has fewer >> questions but we should find a solution for LegacySource first. >> >> -- 2. For the first task, if the flink user disables the Unaligned >> Checkpoint, do we ignore max buffers per channel?
Because the >> overdraft >> isn't useful for the Aligned Checkpoint, it still needs to wait for >> downstream Task to consume. >> >> I think that the logic should be the same for AC and UC. As I understand, >> the overdraft maybe is not really helpful for AC but it doesn't make it >> worse as well. >> >> 3. For the second task >> -- - The default value of maxOverdraftBuffersPerPartition may also >> need >>to be discussed. >> >> I think it should be a pretty small value or even 0 since it kind of >> optimization and user should understand what they do(especially if we >> implement the first task). >> >> -- - If the user disables the Unaligned Checkpoint, can we set the >>maxOverdraftBuffersPerPartition=0? Because the overdraft isn't >> useful for >>the Aligned Checkpoint. >> >> The same answer that above, if the overdraft doesn't make degradation for >> the Aligned Checkpoint I don't think that we should make difference between >> AC and UC. >> >> 4. For the legacy source >> -- - If enabling the Unaligned Checkpoint, it uses up to >>maxOverdraftBuffersPerPartition buffers. >>- If disabling the UC, it doesn't use the overdraft buffer. >>- Do you think it's ok? >> >> Ideally, I don't want to use overdraft for LegacySource at all since it >> can lead to undesirable results especially if the limit is high. At least, >> as I understand, it will always work in overdraft mode and it will borrow >> maxOverdraftBuffersPerPartition buffers from the global pool which can lead >> to degradation of other subtasks on the same TaskManager. >> >> -- - Actually, we added the checkAvailable logic for LegacySource in >> our >>internal version. It works well. >> >> I don't really understand how it is possible for general case considering >> that each user has their own implementation of LegacySourceOperator >> >> -- 5. For the benchmark, do you have any suggestions? I submitted the PR >> [1]. >> >> I haven't looked at it yet, but I'll try to do it soon. 
>> >> >> 29.04.2022 14:14, rui fan пишет: >> > Hi, >> > >> > Thanks for your feedback. I have a servel of questions. >> > >> > 1. Do you mean split this into two JIRAs or two PRs or two commits >> in a >> > PR? >> > 2. For the first task, if the flink user disables the Unaligned >> > Checkpoint, do we ignore max buffers per channel? Because the >> overdraft >> > isn't useful for the Aligned Checkpoint, it still needs to wait for >> > downstream Task to consume. >> > 3. For the second task >> >- The default value of maxOverdraftBuffersPerPartition may also >> need >> >to be discussed. >> >- If the user disables the Unaligned Checkpoint, can we set the >> >maxOverdraftBuffersPerPartition=0? Because the overdraft isn't >> useful for >> >the Aligned Checkpoint. >> > 4. For the legacy
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi,

Thanks for your quick response.

For question 1/2/3, we think they are clear. We just need to discuss the default value in PR.

For the legacy source, you are right. It's difficult for a general implementation. Currently, we implement ensureRecordWriterIsAvailable() in SourceFunction.SourceContext and call it in our common LegacySource, e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so fixing FlinkKafkaConsumer solved most of our problems.

Core code:
```
public void ensureRecordWriterIsAvailable() {
    if (recordWriter == null
            || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
            || recordWriter.isAvailable()) {
        return;
    }

    CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
    try {
        resumeFuture.get();
    } catch (Throwable ignored) {
    }
}
```

LegacySource calls sourceContext.ensureRecordWriterIsAvailable() before synchronized (checkpointLock) and collects records. Please let me know if there is a better solution.

Thanks
fanrui

On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov wrote: > Hi. > > -- 1. Do you mean split this into two JIRAs or two PRs or two commits in a > PR? > > Perhaps, the separated ticket will be better since this task has fewer > questions but we should find a solution for LegacySource first. > > -- 2. For the first task, if the flink user disables the Unaligned > Checkpoint, do we ignore max buffers per channel? Because the overdraft > isn't useful for the Aligned Checkpoint, it still needs to wait for > downstream Task to consume. > > I think that the logic should be the same for AC and UC. As I understand, > the overdraft maybe is not really helpful for AC but it doesn't make it > worse as well. > > 3. For the second task > -- - The default value of maxOverdraftBuffersPerPartition may also > need >to be discussed. > > I think it should be a pretty small value or even 0 since it kind of > optimization and user should understand what they do(especially if we > implement the first task).
> > -- - If the user disables the Unaligned Checkpoint, can we set the >maxOverdraftBuffersPerPartition=0? Because the overdraft isn't > useful for >the Aligned Checkpoint. > > The same answer that above, if the overdraft doesn't make degradation for > the Aligned Checkpoint I don't think that we should make difference between > AC and UC. > > 4. For the legacy source > -- - If enabling the Unaligned Checkpoint, it uses up to >maxOverdraftBuffersPerPartition buffers. >- If disabling the UC, it doesn't use the overdraft buffer. >- Do you think it's ok? > > Ideally, I don't want to use overdraft for LegacySource at all since it > can lead to undesirable results especially if the limit is high. At least, > as I understand, it will always work in overdraft mode and it will borrow > maxOverdraftBuffersPerPartition buffers from the global pool which can lead > to degradation of other subtasks on the same TaskManager. > > -- - Actually, we added the checkAvailable logic for LegacySource in > our >internal version. It works well. > > I don't really understand how it is possible for general case considering > that each user has their own implementation of LegacySourceOperator > > -- 5. For the benchmark, do you have any suggestions? I submitted the PR > [1]. > > I haven't looked at it yet, but I'll try to do it soon. > > > 29.04.2022 14:14, rui fan пишет: > > Hi, > > > > Thanks for your feedback. I have a servel of questions. > > > > 1. Do you mean split this into two JIRAs or two PRs or two commits > in a > > PR? > > 2. For the first task, if the flink user disables the Unaligned > > Checkpoint, do we ignore max buffers per channel? Because the > overdraft > > isn't useful for the Aligned Checkpoint, it still needs to wait for > > downstream Task to consume. > > 3. For the second task > >- The default value of maxOverdraftBuffersPerPartition may also > need > >to be discussed. 
> >- If the user disables the Unaligned Checkpoint, can we set the > >maxOverdraftBuffersPerPartition=0? Because the overdraft isn't > useful for > >the Aligned Checkpoint. > > 4. For the legacy source > >- If enabling the Unaligned Checkpoint, it uses up to > >maxOverdraftBuffersPerPartition buffers. > >- If disabling the UC, it doesn't use the overdraft buffer. > >- Do you think it's ok? > >- Actually, we added the checkAvailable logic for LegacySource in > our > >internal version. It works well. > > 5. For the benchmark, do you have any suggestions? I submitted the PR > > [1]. > > > > [1] https://github.com/apache/flink-benchmarks/pull/54 > > > > Thanks > > fanrui > > > > On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov > > wrote: > > > >> Hi, > >> > >> We discuss about
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi.

-- 1. Do you mean split this into two JIRAs or two PRs or two commits in a PR?

Perhaps a separate ticket will be better, since this task has fewer questions, but we should find a solution for LegacySource first.

-- 2. For the first task, if the flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for downstream Task to consume.

I think that the logic should be the same for AC and UC. As I understand it, the overdraft is maybe not really helpful for AC, but it doesn't make it worse either.

3. For the second task
-- - The default value of maxOverdraftBuffersPerPartition may also need to be discussed.

I think it should be a pretty small value or even 0, since it is a kind of optimization and users should understand what they are doing (especially if we implement the first task).

-- - If the user disables the Unaligned Checkpoint, can we set maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint.

The same answer as above: if the overdraft doesn't cause degradation for the Aligned Checkpoint, I don't think we should make a difference between AC and UC.

4. For the legacy source
- If enabling the Unaligned Checkpoint, it uses up to maxOverdraftBuffersPerPartition buffers.
- If disabling the UC, it doesn't use the overdraft buffer.
- Do you think it's ok?

Ideally, I don't want to use the overdraft for LegacySource at all, since it can lead to undesirable results, especially if the limit is high. At least, as I understand it, it will always work in overdraft mode and will borrow maxOverdraftBuffersPerPartition buffers from the global pool, which can lead to degradation of other subtasks on the same TaskManager.

-- - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well.
I don't really understand how that is possible for the general case, considering that each user has their own implementation of LegacySourceOperator.

-- 5. For the benchmark, do you have any suggestions? I submitted the PR [1].

I haven't looked at it yet, but I'll try to do it soon.

On 29.04.2022 14:14, rui fan wrote: Hi, Thanks for your feedback. I have several questions. 1. Do you mean split this into two JIRAs or two PRs or two commits in a PR? 2. For the first task, if the flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for downstream Task to consume. 3. For the second task - The default value of maxOverdraftBuffersPerPartition may also need to be discussed. - If the user disables the Unaligned Checkpoint, can we set the maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint. 4. For the legacy source - If enabling the Unaligned Checkpoint, it uses up to maxOverdraftBuffersPerPartition buffers. - If disabling the UC, it doesn't use the overdraft buffer. - Do you think it's ok? - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well. 5. For the benchmark, do you have any suggestions? I submitted the PR [1]. [1] https://github.com/apache/flink-benchmarks/pull/54 Thanks fanrui On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov wrote: Hi, We discuss about it a little with Dawid Wysakowicz. Here is some conclusion: First of all, let's split this into two tasks. The first task is about ignoring max buffers per channel. This means if we request a memory segment from LocalBufferPool and the maxBuffersPerChannel is reached for this channel, we just ignore that and continue to allocate buffer while LocalBufferPool has it(it is actually not a overdraft). The second task is about the real overdraft.
I am pretty convinced now that we, unfortunately, need configuration for limitation of overdraft number(because it is not ok if one subtask allocates all buffers of one TaskManager considering that several different jobs can be submitted on this TaskManager). So idea is to have maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool). In this case, when a limit of buffers in LocalBufferPool is reached, LocalBufferPool can request additionally from NetworkBufferPool up to maxOverdraftBuffersPerPartition buffers. But it is still not clear how to handle LegacySource since it actually works as unlimited flatmap and it will always work in overdraft mode which is not a target. So we still need to think about that. 29.04.2022 11:11, rui fan пишет: Hi Anton Kalashnikov, I think you agree with we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for, right? I prefer to hard code
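The overdraft semantics discussed in this thread can be captured in a toy model: a pool with a regular quota that may hand out up to a configured number of extra buffers once the quota is exhausted. This is an illustration of the proposal's bookkeeping only, not Flink's actual LocalBufferPool:

```java
// Toy model of the proposed overdraft: once the pool's own quota is used
// up, it may borrow at most maxOverdraft extra buffers (the proposed
// maxOverdraftBuffersPerPartition) from a shared pool.
public class OverdraftPool {
    private final int quota;        // regular buffers owned by this pool
    private final int maxOverdraft; // cap on borrowed overdraft buffers
    private int used;

    public OverdraftPool(int quota, int maxOverdraft) {
        this.quota = quota;
        this.maxOverdraft = maxOverdraft;
    }

    /** Returns true if a buffer could be handed out. */
    public boolean request() {
        if (used < quota + maxOverdraft) { // overdraft kicks in past the quota
            used++;
            return true;
        }
        return false; // overdraft exhausted too; caller must back-pressure
    }

    public void recycle() {
        used--;
    }

    public boolean isOverdrafted() {
        return used > quota;
    }
}
```

With maxOverdraft = 0 the pool behaves like today, which matches the suggestion of a small or zero default.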
Re: [DISCUSS] Planning Flink 1.16
Thanks for kicking off the topic, Konstantin and Chesnay. Also thanks Martijn, Godfrey and Xingbo for volunteering to be the release manager. Given that release 1.16 would likely be a beefy release with a bunch of major features already on their way, it might make sense to have more release managers than we usually do. We can figure out how to collaborate, e.g. splitting by modules / FLIPs. In case we need someone to get some errands or coordination done, I am happy to help. Also, +1 for feature freeze by the end of July with a potential 2-week delay of contingency. Cheers, Jiangjie (Becket) Qin On Fri, Apr 29, 2022 at 5:14 PM Xingbo Huang wrote: > Thanks Konstantin and Chesnay for starting the discussion. I'm also willing > to volunteer as the release manager if this is still open. > > Regarding the feature freeze date, +1 to the end of mid August. > > Best, > Xingbo > > Zhu Zhu 于2022年4月29日周五 11:01写道: > > > +1 for a 5 months release cycle. > > +1 target the feature freeze date 1.5 months before the release date. It > > can better guard the release date. > > Therefore, also +1 for mid August as the feature freeze date of 1.16 > > > > Thanks, > > Zhu > > > > Jark Wu 于2022年4月28日周四 22:24写道: > > > > > I'm also fine with the end of July for the feature freeze. > > > > > > Best, > > > Jark > > > > > > On Thu, 28 Apr 2022 at 21:00, Martijn Visser > > > wrote: > > > > > > > +1 for continuing to strive for a 5 months release cycle. > > > > > > > > And +1 to have the planned feature freeze mid August, which I would > > > propose > > > > to have happen on Monday the 15th of August 2022. 
I would also > already > > > > state that even though we know this is a holiday period, we should > not > > > > extend this deadline for that reason :) > > > > > > > > Best regards, > > > > > > > > Martijn Visser > > > > https://twitter.com/MartijnVisser82 > > > > https://github.com/MartijnVisser > > > > > > > > > > > > On Thu, 28 Apr 2022 at 14:37, Jingsong Li > > > wrote: > > > > > > > > > Thanks for the check. > > > > > > > > > > > Continue to strive for a 5 months release cycle and 1.5 months > > before > > > > to > > > > > the desired release date > > > > > > > > > > Sounds good to me! > > > > > > > > > > Best, > > > > > Jingsong > > > > > > > > > > On Thu, Apr 28, 2022 at 7:06 PM Konstantin Knauf < > kna...@apache.org> > > > > > wrote: > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > thank you for the feedback so far. I've checked the length of the > > > > > previous > > > > > > last release cycles. Up until Flink 1.14, we've actually been > > > > incredibly > > > > > > good at maintaining a 5 months release cycle (see below). > > > > Interestingly, > > > > > > the community is officially targeting a 4 months release cycle > [1]. > > > > > > > > > > > > - 1.15.0 2022-05-01? (7 months, 2 days?) > > > > > > - 1.14.0: 2021-09-29 (4 months, 26 days) > > > > > > - 1.13.0: 2021-05-03 (4 months, 23 days) > > > > > > - 1.12.0: 2020-12-10 (5 months, 3 days) > > > > > > - 1.11.0: 2020-07-07 (4 months, 26 days) > > > > > > - 1.10.0: 2020-02-11 > > > > > > > > > > > > The 1.15 release cycle has took significantly longer. In my > opinion > > > we > > > > > > should try to get back into the 5 months cadence with the next > > > release. 
> > > > > > Since empirically we always often end up moving the the feature > > > freeze > > > > > by a > > > > > > week or two, and that we often need about a month for release > > > testing & > > > > > > stabilization and releasing, I don't think, we should move the > > > planned > > > > > > feature freeze to later than > > > > > > *mid August. * > > > > > > What do you think: > > > > > > 1. Should we continue to strive for a 5 months release cycle (and > > > > update > > > > > > [1] accordingly)? > > > > > > 2. Does it sound reasonable to target a feature freeze date, > which > > is > > > > 1.5 > > > > > > months before to the desired release date? > > > > > > > > > > > > Cheers, > > > > > > > > > > > > Konstantin > > > > > > > > > > > > [1] > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases > > > > > > > > > > > > Am Do., 28. Apr. 2022 um 05:20 Uhr schrieb Jingsong Li < > > > > > > jingsongl...@gmail.com>: > > > > > > > > > > > > > Thanks Konstantin and Chesnay for starting the discussion. And > > > thanks > > > > > > > Konstantin, Chesnay, Godfrey, Martijn for volunteering. > > > > > > > > > > > > > > The 1.16 release at the end of July means that there are > > currently > > > > > > > only 3 months to fully commit to 1.16 development, and I'm a > > little > > > > > > > concerned that this is too short a time frame, which may result > > in > > > > > > > some features only reaching a halfway decent state. > > > > > > > > > > > > > > Best, > > > > > > > Jingsong > > > > > > > > > > > > > > > > > > > > > On Wed, Apr 27, 2022 at 7:10 PM David Morávek > > > > > wrote: > > > > > > > > > > > > > > > > Thanks Konstantin and Chesnay for starting the
Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2
+1 (binding) * mvn clean package - PASSED * checked signatures & checksums of source artifacts - OK * went through quick start - WORKS * skimmed over NOTICE file - LOOKS GOOD I also read over the announcement blog post. In my opinion, we could try to motivate the project a bit better. What is the problem that flink-table-store will solve? What is its relationship to Apache Flink itself? Since this is the first release, I am sure a lot of users will ask themselves these questions. Am Fr., 29. Apr. 2022 um 11:49 Uhr schrieb Caizhi Weng : > Hi all! > > +1 for the release (non-binding). I've tested the jar with a standalone > cluster and SQL client. > >- Compiled the sources; >- Run through quick start guide; >- Test all supported data types; >- Check that table store jar has no conflict with orc / avro format and >kafka connector; >- Check that both file store and log store works. > > I haven't checked the signatures as my gnupg version is old. > > Jingsong Li 于2022年4月29日周五 10:24写道: > > > Hi everyone, > > > > Please review and vote on the release candidate #2 for the version 0.1.0 > of > > Apache Flink Table Store, as follows: > > > > [ ] +1, Approve the release > > [ ] -1, Do not approve the release (please provide specific comments) > > > > **Release Overview** > > > > As an overview, the release consists of the following: > > a) Table Store canonical source distribution, to be deployed to the > > release repository at dist.apache.org > > b) Maven artifacts to be deployed to the Maven Central Repository > > > > **Staging Areas to Review** > > > > The staging areas containing the above mentioned artifacts are as > follows, > > for your review: > > * All artifacts for a) and b) can be found in the corresponding dev > > repository at dist.apache.org [2] > > * All artifacts for c) can be found at the Apache Nexus Repository [3] > > * Pre Bundled Binaries Jar can work fine with quick start [4][5] > > > > All artifacts are signed with the key > > 
2C2B6A653B07086B65E4369F7C76245E0A318150 [6] > > > > Other links for your review: > > * JIRA release notes [7] > > * source code tag "release-0.1.0-rc2" [8] > > * PR to update the website Downloads page to include Table Store > > links [9] > > > > **Vote Duration** > > > > The voting time will run for at least 72 hours. > > It is adopted by majority approval, with at least 3 PMC affirmative > votes. > > > > Best, > > Jingsong Lee > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > > [2] > > > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/ > > [3] > > https://repository.apache.org/content/repositories/orgapacheflink-1502/ > > [4] > > > https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar > > [5] > > > https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/ > > [6] https://dist.apache.org/repos/dist/release/flink/KEYS > > [7] > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234 > > [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2 > > [9] https://github.com/apache/flink-web/pull/531 > > > -- https://twitter.com/snntrable https://github.com/knaufk
Re: [DISCUSS] FLIP-220: Temporal State
Hi all, Yun, David M, David A, and I had an offline discussion and talked through a couple of details that emerged from the discussion here. We believe we have found a consensus on these points and would like to share them for further feedback. Let me try to get through the points that were opened, in arbitrary order: 1. We want to offer a generic interface for sorted state, not just temporal state as proposed initially. We would like to... a) ...offer a single new state type similar to what TemporalListState was offering (so not offering something like TemporalValueState, to keep the API slim). b) ...name it BinarySortedMultiMap with Java-object keys and values (I'll go into the API further below) - the name stresses "Binary" because we want to make clear that this is what the sort will be based on (see below). c) ...have our own state descriptor (BinarySortedMultiMapStateDescriptor) similar to MapStateDescriptor. d) ...require TypeSerializer implementations for the key to extend LexicographicalTypeSerializer (details below). 2. LexicographicalTypeSerializer basically defines the sort order when retrieving values: it is based on the serialized binaries, comparing them one-by-one in an unsigned fashion. For heap-based state backends, these serializers can also optionally define a Comparator that doesn't require serialization but needs to retain the same sort order. We would provide implementations of the range-based operations that iterate based on binary keys if this is not provided (by simply converting all relevant keys to their binary form and using an appropriate comparator).

```
public interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
    default Optional<Comparator<T>> findComparator() {
        return Optional.empty();
    }
}
```

3.
BinarySortedMultiMap should offer the following API:

```
public interface BinarySortedMultiMap<UK, UV> extends State {
    void put(UK key, Collection<UV> values) throws Exception;
    void add(UK key, UV value) throws Exception;
    Map.Entry<UK, Collection<UV>> entryAt(UK key) throws Exception;
    Map.Entry<UK, Collection<UV>> firstEntry() throws Exception;
    Map.Entry<UK, Collection<UV>> lastEntry() throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> readRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> readRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> readRangeFrom(UK startUserKey) throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> clearRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> clearRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, Collection<UV>>> clearRangeFrom(UK startUserKey) throws Exception;
}
```

That's it for the core points - following are a few more things that came up and some arguments about the "why": A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]? -> We looked at various use cases and didn't find a strong need because you could always fall back to readRange*. In the interest of a slim API, we thought it would be best to start without these (we can always add them later). A2) Should we support iterating backwards? -> We haven't found a compelling use case that needs this. If you need it, at least for some (?) use cases, you could negate the sort order through the serializer and achieve the same thing (unless you need to walk in two directions). Let's rather start with a slim API. A3) Lazy vs. eager iteration -> Let's implement our iterators similarly to RocksDBMapIterator by eagerly retrieving a couple of values (defaults to 128 there) into a cache. This seems to be the best of both worlds without bloating the API. A4) ChangelogStateBackend -> Since we require TypeSerializer implementations for the key and those know the length to serialize (from other requirements, e.g. in the network stack), it isn't affected by our change except for delegating the new operations to the underlying state backend.
A5) Defining the binary sort order as one-by-one comparison with unsigned bytes is fine because it is a very common thing among storage systems. Should a different binary-based state backend require something else, there could be a mapping function translating between different definitions. A6) How to handle duplicates -> We let the user handle this by storing a multi-map, i.e. multiple values for the (primary) sort key. If needed, users can sort these values manually. As long as we don't have a strong use case where this is not feasible, we don't need any implicit duplicate handling by the framework (Flink). A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator -> We propose to use readRange*** because that seems more explicit/intuitive in what it is doing. A8) readRange*** with inclusive/exclusive switch -> In the interest of a slim API, let's not provide that. The API above will interpret all keys as _inclusive_ and should a user need exclusive behaviour, they would in the worst case read one more entry - in most cases, however, this would be served from the cache anyway, so it's not much of a problem. A9) Why don't we want to provide a BinarySortedMap with value-like
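[Editorial note: since the thread leaves the proposed sort semantics somewhat abstract, here is a small, self-contained plain-Java sketch — NOT Flink code; all class and method names are illustrative assumptions — that simulates the core of the proposal: a multi-map over serialized keys, ordered byte-by-byte in an unsigned fashion (points 2 and A5), with inclusive range reads (A8) and duplicates accumulating under one sort key (A6).]

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java simulation, NOT Flink code: illustrates the proposed semantics.
public class BinarySortedMultiMapSketch {

    // The sort order from point 2 / A5: compare serialized bytes one by one,
    // in an unsigned fashion; a shorter array that is a prefix sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC =
            (a, b) -> {
                int n = Math.min(a.length, b.length);
                for (int i = 0; i < n; i++) {
                    int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
                    if (cmp != 0) {
                        return cmp;
                    }
                }
                return Integer.compare(a.length, b.length);
            };

    private final NavigableMap<byte[], List<String>> map = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);

    // add(key, value): duplicates under one sort key accumulate (A6).
    void add(byte[] serializedKey, String value) {
        map.computeIfAbsent(serializedKey, k -> new ArrayList<>()).add(value);
    }

    // readRange(fromKey, toKey): both endpoints inclusive, as proposed in A8.
    NavigableMap<byte[], List<String>> readRange(byte[] fromKey, byte[] toKey) {
        return map.subMap(fromKey, true, toKey, true);
    }

    public static void main(String[] args) {
        BinarySortedMultiMapSketch state = new BinarySortedMultiMapSketch();
        state.add(new byte[] {0, 3}, "c");
        state.add(new byte[] {0, 1}, "a");
        state.add(new byte[] {0, 2}, "b1");
        state.add(new byte[] {0, 2}, "b2"); // same sort key -> multi-map

        for (Map.Entry<byte[], List<String>> e :
                state.readRange(new byte[] {0, 1}, new byte[] {0, 2}).entrySet()) {
            System.out.println(e.getValue());
        }
        // prints:
        // [a]
        // [b1, b2]
    }
}
```

If timestamps are serialized big-endian, this unsigned byte order coincides with numeric order, which is what makes the binary definition practical for the temporal-state use case that motivated the FLIP.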
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi, Thanks for your feedback. I have several questions. 1. Do you mean splitting this into two JIRAs, two PRs, or two commits in one PR? 2. For the first task, if the flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for the downstream Task to consume. 3. For the second task - The default value of maxOverdraftBuffersPerPartition may also need to be discussed. - If the user disables the Unaligned Checkpoint, can we set the maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint. 4. For the legacy source - If enabling the Unaligned Checkpoint, it uses up to maxOverdraftBuffersPerPartition buffers. - If disabling the UC, it doesn't use the overdraft buffer. - Do you think it's ok? - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well. 5. For the benchmark, do you have any suggestions? I submitted the PR [1]. [1] https://github.com/apache/flink-benchmarks/pull/54 Thanks fanrui On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov wrote: > Hi, > > We discussed it a little with Dawid Wysakowicz. Here are some > conclusions: > > First of all, let's split this into two tasks. > > The first task is about ignoring max buffers per channel. This means if > we request a memory segment from LocalBufferPool and the > maxBuffersPerChannel is reached for this channel, we just ignore that > and continue to allocate buffers while LocalBufferPool has them (it is > actually not an overdraft). > > The second task is about the real overdraft. I am pretty convinced now > that we, unfortunately, need configuration for limiting the overdraft > number (because it is not ok if one subtask allocates all buffers of one > TaskManager considering that several different jobs can be submitted on > this TaskManager). 
So idea is to have > maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool). > In this case, when a limit of buffers in LocalBufferPool is reached, > LocalBufferPool can request additionally from NetworkBufferPool up to > maxOverdraftBuffersPerPartition buffers. > > > But it is still not clear how to handle LegacySource since it actually > works as unlimited flatmap and it will always work in overdraft mode > which is not a target. So we still need to think about that. > > > 29.04.2022 11:11, rui fan пишет: > > Hi Anton Kalashnikov, > > > > I think you agree with we should limit the maximum number of overdraft > > segments that each LocalBufferPool can apply for, right? > > > > I prefer to hard code the maxOverdraftBuffers due to don't add the new > > configuration. And I hope to hear more from the community. > > > > Best wishes > > fanrui > > > > On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote: > > > >> Hi Anton Kalashnikov, > >> > >> Thanks for your very clear reply, I think you are totally right. > >> > >> The 'maxBuffersNumber - buffersInUseNumber' can be used as the > >> overdraft buffer, it won't need the new buffer configuration.Flink users > >> can turn up the maxBuffersNumber to control the overdraft buffer size. > >> > >> Also, I‘d like to add some information. For safety, we should limit the > >> maximum number of overdraft segments that each LocalBufferPool > >> can apply for. > >> > >> Why do we limit it? > >> Some operators don't check the `recordWriter.isAvailable` during > >> processing records, such as LegacySource. I have mentioned it in > >> FLINK-26759 [1]. I'm not sure if there are other cases. > >> > >> If don't add the limitation, the LegacySource will use up all remaining > >> memory in the NetworkBufferPool when the backpressure is severe. > >> > >> How to limit it? > >> I prefer to hard code the `maxOverdraftBuffers=numberOfSubpartitions` > >> in the constructor of LocalBufferPool. 
The maxOverdraftBuffers is just > >> for safety, and it should be enough for most flink jobs. Or we can set > >> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle > >> some jobs of low parallelism. > >> > >> Also if user don't enable the Unaligned Checkpoint, we can set > >> maxOverdraftBuffers=0 in the constructor of LocalBufferPool. Because > >> the overdraft isn't useful for the Aligned Checkpoint. > >> > >> Please correct me if I'm wrong. Thanks a lot. > >> > >> [1] https://issues.apache.org/jira/browse/FLINK-26759 > >> > >> Best wishes > >> fanrui > >> > >> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov > >> wrote: > >> > >>> Hi fanrui, > >>> > >>> Thanks for creating the FLIP. > >>> > >>> In general, I think the overdraft is good idea and it should help in > >>> described above cases. Here are my thoughts about configuration: > >>> > >>> Please, correct me if I am wrong but as I understand right now we have > >>> following calculation. > >>> > >>>
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi, We discussed it a little with Dawid Wysakowicz. Here are some conclusions: First of all, let's split this into two tasks. The first task is about ignoring max buffers per channel. This means if we request a memory segment from LocalBufferPool and the maxBuffersPerChannel is reached for this channel, we just ignore that and continue to allocate buffers while LocalBufferPool has them (it is actually not an overdraft). The second task is about the real overdraft. I am pretty convinced now that we, unfortunately, need configuration for limiting the overdraft number (because it is not ok if one subtask allocates all buffers of one TaskManager considering that several different jobs can be submitted to this TaskManager). So the idea is to have maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool). In this case, when the limit of buffers in LocalBufferPool is reached, LocalBufferPool can request additionally from NetworkBufferPool up to maxOverdraftBuffersPerPartition buffers. But it is still not clear how to handle LegacySource since it actually works as an unlimited flatmap and it will always work in overdraft mode, which is not a target. So we still need to think about that. 29.04.2022 11:11, rui fan wrote: Hi Anton Kalashnikov, I think you agree that we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for, right? I prefer to hard-code the maxOverdraftBuffers to avoid adding a new configuration. And I hope to hear more from the community. Best wishes fanrui On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote: Hi Anton Kalashnikov, Thanks for your very clear reply, I think you are totally right. The 'maxBuffersNumber - buffersInUseNumber' can be used as the overdraft buffer, it won't need the new buffer configuration. Flink users can turn up the maxBuffersNumber to control the overdraft buffer size. Also, I'd like to add some information. 
For safety, we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for. Why do we limit it? Some operators don't check the `recordWriter.isAvailable` during processing records, such as LegacySource. I have mentioned it in FLINK-26759 [1]. I'm not sure if there are other cases. If we don't add the limitation, the LegacySource will use up all remaining memory in the NetworkBufferPool when the backpressure is severe. How to limit it? I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions` in the constructor of LocalBufferPool. The maxOverdraftBuffers is just for safety, and it should be enough for most Flink jobs. Or we can set `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle some jobs of low parallelism. Also, if the user doesn't enable the Unaligned Checkpoint, we can set maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because the overdraft isn't useful for the Aligned Checkpoint. Please correct me if I'm wrong. Thanks a lot. [1] https://issues.apache.org/jira/browse/FLINK-26759 Best wishes fanrui On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov wrote: Hi fanrui, Thanks for creating the FLIP. In general, I think the overdraft is a good idea and it should help in the cases described above. Here are my thoughts about configuration: Please correct me if I am wrong, but as I understand it, right now we have the following calculation. maxBuffersNumber(per TaskManager) = Network memory(calculated via taskmanager.memory.network.fraction, taskmanager.memory.network.min, taskmanager.memory.network.max and total memory size) / taskmanager.memory.segment-size. requiredBuffersNumber(per TaskManager) = (exclusive buffers * parallelism + floating buffers) * subtasks number in TaskManager buffersInUseNumber = real number of buffers which are used at the current moment (always <= requiredBuffersNumber) Ideally requiredBuffersNumber should be equal to maxBuffersNumber, which allows Flink to work predictably. 
But if requiredBuffersNumber > maxBuffersNumber, sometimes it is also fine (but not good) since not all required buffers are really mandatory (e.g. it is ok if Flink cannot allocate floating buffers). But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink just never uses these leftover buffers (maxBuffersNumber - requiredBuffersNumber), which I propose to use. (We can actually even use the difference 'requiredBuffersNumber - buffersInUseNumber', since one TaskManager may contain several operators, including 'window', which can temporarily borrow buffers from the global pool.) My proposal, more specifically (it relates only to requesting buffers while processing a single record; switching to unavailability between records should stay the same as we have it now): * If one more buffer is requested but maxBuffersPerChannel is reached, then just ignore this limitation and allocate the buffer from any place (from LocalBufferPool if it still has something, otherwise from NetworkBufferPool) * If LocalBufferPool exceeds its limit, then temporarily allocate it from NetworkBufferPool while it has something to allocate
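[Editorial note: the two rules above can be summarized in a toy model — NOT Flink internals; all field and method names (localPoolAvailable, requestDuringRecord, ...) are illustrative assumptions — showing how a buffer request in the middle of a record would be decided, with the overdraft bounded by the proposed maxOverdraftBuffersPerPartition.]

```java
// Toy model, NOT Flink internals: all names are illustrative assumptions
// based on the proposal in this thread.
public class OverdraftSketch {

    int localPoolAvailable;     // segments left in this LocalBufferPool
    int networkPoolAvailable;   // segments left in the global NetworkBufferPool
    final int maxOverdraft;     // the proposed maxOverdraftBuffersPerPartition
    int overdraftInUse = 0;

    OverdraftSketch(int localPoolAvailable, int networkPoolAvailable, int maxOverdraft) {
        this.localPoolAvailable = localPoolAvailable;
        this.networkPoolAvailable = networkPoolAvailable;
        this.maxOverdraft = maxOverdraft;
    }

    /**
     * A buffer request in the middle of processing a record. Rule 1 (first
     * task): the per-channel limit is simply ignored here, so only the pool
     * itself is checked. Rule 2 (second task): once the local pool is
     * exhausted, borrow from the network pool, bounded by maxOverdraft.
     */
    boolean requestDuringRecord() {
        if (localPoolAvailable > 0) {
            localPoolAvailable--;
            return true;
        }
        if (overdraftInUse < maxOverdraft && networkPoolAvailable > 0) {
            overdraftInUse++;
            networkPoolAvailable--;
            return true;
        }
        return false; // caller must wait -> back-pressure, as today
    }

    public static void main(String[] args) {
        // 1 regular segment locally, 5 globally, overdraft capped at 2.
        OverdraftSketch pool = new OverdraftSketch(1, 5, 2);
        System.out.println(pool.requestDuringRecord()); // true  (local pool)
        System.out.println(pool.requestDuringRecord()); // true  (overdraft 1/2)
        System.out.println(pool.requestDuringRecord()); // true  (overdraft 2/2)
        System.out.println(pool.requestDuringRecord()); // false (cap reached)
    }
}
```

With maxOverdraft set to 0 (as suggested earlier in the thread for aligned checkpoints), the sketch degrades to today's behavior: a request fails as soon as the local pool is empty.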
[jira] [Created] (FLINK-27456) mistake and confusion with CEP example in docs
David Anderson created FLINK-27456: -- Summary: mistake and confusion with CEP example in docs Key: FLINK-27456 URL: https://issues.apache.org/jira/browse/FLINK-27456 Project: Flink Issue Type: Bug Components: Documentation, Library / CEP Affects Versions: 1.14.4 Reporter: David Anderson [https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/#contiguity-within-looping-patterns] In the section of the docs on contiguity within looping patterns, what it says about strict contiguity for the given example is either incorrect or very confusing (or both). It doesn't help that the example code doesn't precisely match the scenario described in the text. To study this, I implemented the example in the text and found that it produces no output for strict contiguity (as I expected), which contradicts what the text says.

{code:java}
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
        Pattern<String, ?> pattern =
                Pattern.<String>begin("a", skipStrategy)
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("a");
                                    }
                                })
                        .next("b+")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("b");
                                    }
                                })
                        .oneOrMore().consecutive()
                        .next("c")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("c");
                                    }
                                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();

        patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a"))
                    + String.join("", pattern.get("b+"))
                    + String.join("", pattern.get("c"));
        }
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27455) [JUnit5 Migration] SnapshotMigrationTestBase
Chesnay Schepler created FLINK-27455: Summary: [JUnit5 Migration] SnapshotMigrationTestBase Key: FLINK-27455 URL: https://issues.apache.org/jira/browse/FLINK-27455 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store
Hi devs, I want to start a discussion about Schema Evolution on the Flink Table Store [1]. In FLINK-21634, we plan to support many schema changes in Flink SQL. But for the current Table Store, these may result in wrong data or unclear evolution behavior. In general, the user has these schema operations: - Add column: add a column to a table. - Modify column type: change the type of a column. - Drop column: drop a column. - Rename column: for example, rename the "name_1" column to "name_2". Another schema change concerns partition keys, because the data changes over time. For example, for a table with daily partitions, as the business continues to grow, each daily partition becomes larger and the business wants to change to hourly partitions. A simple approach is to rewrite all the existing data when modifying the schema. But this expensive approach is not acceptable to users, so we need to support schema changes without rewriting and define their semantics clearly: modifying the schema does not rewrite the existing data; when reading, the original data needs to be evolved to the current schema. Looking forward to your feedback! [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store Best, Jingsong
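[Editorial note: the read-time evolution described above can be illustrated with a small sketch — NOT Table Store code; matching columns by name is a simplifying assumption, and a real implementation would likely track field ids so that renames and drops can be distinguished. Reading a row written under an old schema projects it onto the current schema without rewriting the stored data: added columns come back as null, dropped columns vanish.]

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only, NOT Table Store code: read-time evolution of a row that was
// written under an old schema, projected onto the current schema.
public class SchemaEvolutionSketch {

    // Added columns are filled with null; columns dropped from the current
    // schema are simply not projected. (Name matching is an assumption here.)
    static Map<String, Object> evolve(Map<String, Object> oldRow, List<String> currentSchema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : currentSchema) {
            row.put(column, oldRow.get(column)); // absent in old data -> null
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRow = new LinkedHashMap<>();
        oldRow.put("id", 1);
        oldRow.put("name_1", "alice");

        // Current schema after an "add column age" change:
        System.out.println(evolve(oldRow, Arrays.asList("id", "name_1", "age")));
        // prints: {id=1, name_1=alice, age=null}
    }
}
```

The key property the FLIP asks for is visible here: the old row bytes are never rewritten; only the projection applied at read time changes as the schema evolves.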
Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2
Hi all! +1 for the release (non-binding). I've tested the jar with a standalone cluster and SQL client. - Compiled the sources; - Run through quick start guide; - Test all supported data types; - Check that table store jar has no conflict with orc / avro format and kafka connector; - Check that both file store and log store works. I haven't checked the signatures as my gnupg version is old. Jingsong Li 于2022年4月29日周五 10:24写道: > Hi everyone, > > Please review and vote on the release candidate #2 for the version 0.1.0 of > Apache Flink Table Store, as follows: > > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > **Release Overview** > > As an overview, the release consists of the following: > a) Table Store canonical source distribution, to be deployed to the > release repository at dist.apache.org > b) Maven artifacts to be deployed to the Maven Central Repository > > **Staging Areas to Review** > > The staging areas containing the above mentioned artifacts are as follows, > for your review: > * All artifacts for a) and b) can be found in the corresponding dev > repository at dist.apache.org [2] > * All artifacts for c) can be found at the Apache Nexus Repository [3] > * Pre Bundled Binaries Jar can work fine with quick start [4][5] > > All artifacts are signed with the key > 2C2B6A653B07086B65E4369F7C76245E0A318150 [6] > > Other links for your review: > * JIRA release notes [7] > * source code tag "release-0.1.0-rc2" [8] > * PR to update the website Downloads page to include Table Store > links [9] > > **Vote Duration** > > The voting time will run for at least 72 hours. > It is adopted by majority approval, with at least 3 PMC affirmative votes. 
> > Best, > Jingsong Lee > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > [2] > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/ > [3] > https://repository.apache.org/content/repositories/orgapacheflink-1502/ > [4] > https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar > [5] > https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/ > [6] https://dist.apache.org/repos/dist/release/flink/KEYS > [7] > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234 > [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2 > [9] https://github.com/apache/flink-web/pull/531 >
[jira] [Created] (FLINK-27454) Remove inheritance from TestBaseUtils
Chesnay Schepler created FLINK-27454: Summary: Remove inheritance from TestBaseUtils Key: FLINK-27454 URL: https://issues.apache.org/jira/browse/FLINK-27454 Project: Flink Issue Type: Technical Debt Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 The TestBaseUtils is really just a collection of static methods. There's no real reason to inherit from it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27453) Cleanup TestBaseUtils
Chesnay Schepler created FLINK-27453: Summary: Cleanup TestBaseUtils Key: FLINK-27453 URL: https://issues.apache.org/jira/browse/FLINK-27453 Project: Flink Issue Type: Technical Debt Components: Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.16.0 Contains a bunch of unused methods or ones that are only used internally but are public. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27452) Move Te
Chesnay Schepler created FLINK-27452: Summary: Move Te Key: FLINK-27452 URL: https://issues.apache.org/jira/browse/FLINK-27452 Project: Flink Issue Type: Technical Debt Reporter: Chesnay Schepler -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)
Thanks for driving this work; it's going to be a useful feature. About FLIP-218, I have some questions. 1: Does our CTAS syntax support specifying the target table's schema, including column names and data types? I think it may be a useful feature in case we want to change the data types in the target table instead of always copying the source table's schema. It'll be more flexible with this feature. Btw, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this feature. 2: It seems it'll require the sink to implement a public interface to drop the table, so what will the interface look like? [1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html Best regards, Yuxia ----- Original Message ----- From: "Mang Zhang" To: "dev" Sent: Thursday, April 28, 2022, 4:57:24 PM Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS) Hi, everyone I would like to open a discussion on supporting the SELECT clause in CREATE TABLE (CTAS). With the development of business and the enhancement of Flink SQL capabilities, queries become more and more complex. Now the user needs to use the CREATE TABLE statement to create the target table first, and then execute the INSERT statement. However, the target table may have many columns, which brings a lot of work outside the business logic to the user. At the same time, the user must ensure that the schema of the created target table is consistent with the schema of the query result. Supporting a CTAS syntax like Hive/Spark can greatly facilitate the user. You can find more details in FLIP-218[1]. Looking forward to your feedback. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS) -- Best regards, Mang Zhang
Re: [DISCUSS] Planning Flink 1.16
Thanks Konstantin and Chesnay for starting the discussion. I'm also willing to volunteer as the release manager if this is still open. Regarding the feature freeze date, +1 to the end of mid August. Best, Xingbo Zhu Zhu 于2022年4月29日周五 11:01写道: > +1 for a 5 months release cycle. > +1 target the feature freeze date 1.5 months before the release date. It > can better guard the release date. > Therefore, also +1 for mid August as the feature freeze date of 1.16 > > Thanks, > Zhu > > Jark Wu 于2022年4月28日周四 22:24写道: > > > I'm also fine with the end of July for the feature freeze. > > > > Best, > > Jark > > > > On Thu, 28 Apr 2022 at 21:00, Martijn Visser > > wrote: > > > > > +1 for continuing to strive for a 5 months release cycle. > > > > > > And +1 to have the planned feature freeze mid August, which I would > > propose > > > to have happen on Monday the 15th of August 2022. I would also already > > > state that even though we know this is a holiday period, we should not > > > extend this deadline for that reason :) > > > > > > Best regards, > > > > > > Martijn Visser > > > https://twitter.com/MartijnVisser82 > > > https://github.com/MartijnVisser > > > > > > > > > On Thu, 28 Apr 2022 at 14:37, Jingsong Li > > wrote: > > > > > > > Thanks for the check. > > > > > > > > > Continue to strive for a 5 months release cycle and 1.5 months > before > > > to > > > > the desired release date > > > > > > > > Sounds good to me! > > > > > > > > Best, > > > > Jingsong > > > > > > > > On Thu, Apr 28, 2022 at 7:06 PM Konstantin Knauf > > > > wrote: > > > > > > > > > > Hi everyone, > > > > > > > > > > thank you for the feedback so far. I've checked the length of the > > > > previous > > > > > last release cycles. Up until Flink 1.14, we've actually been > > > incredibly > > > > > good at maintaining a 5 months release cycle (see below). > > > Interestingly, > > > > > the community is officially targeting a 4 months release cycle [1]. > > > > > > > > > > - 1.15.0 2022-05-01? 
(7 months, 2 days?) > > > > > - 1.14.0: 2021-09-29 (4 months, 26 days) > > > > > - 1.13.0: 2021-05-03 (4 months, 23 days) > > > > > - 1.12.0: 2020-12-10 (5 months, 3 days) > > > > > - 1.11.0: 2020-07-07 (4 months, 26 days) > > > > > - 1.10.0: 2020-02-11 > > > > > > > > > > The 1.15 release cycle has taken significantly longer. In my opinion > > we > > > > should try to get back into the 5 months cadence with the next > > release. > > > > Since empirically we often end up moving the feature > > freeze > > > > by a > > > > week or two, and we often need about a month for release > > testing & > > > > stabilization and releasing, I don't think we should move the > > planned > > > > feature freeze to later than > > > > *mid August. * > > > > What do you think: > > > > 1. Should we continue to strive for a 5 months release cycle (and > > > update > > > > [1] accordingly)? > > > > 2. Does it sound reasonable to target a feature freeze date which is > > 1.5 > > > > months before the desired release date? > > > > > > > > Cheers, > > > > > > > > Konstantin > > > > > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases > > > > > On Thu, Apr 28, 2022 at 05:20, Jingsong Li < > > > > > jingsongl...@gmail.com> wrote: > > > > > > Thanks Konstantin and Chesnay for starting the discussion. And > > thanks > > > > > > Konstantin, Chesnay, Godfrey, Martijn for volunteering. > > > > > > > > > > > > The 1.16 release at the end of July means that there are > currently > > > > > > only 3 months to fully commit to 1.16 development, and I'm a > little > > > > > > concerned that this is too short a time frame, which may result > in > > > > > > some features only reaching a halfway decent state. 
> > > > > > > > > > > > Best, > > > > > > Jingsong > > > > > > > > > > > > > > > > > > On Wed, Apr 27, 2022 at 7:10 PM David Morávek > > > wrote: > > > > > > > > > > > > > > Thanks Konstantin and Chesnay for starting the discussion and > > > > > > volunteering. > > > > > > > The timeline proposal sounds reasonable :+1: > > > > > > > > > > > > > > Best, > > > > > > > D. > > > > > > > > > > > > > > On Tue, Apr 26, 2022 at 1:37 PM Martijn Visser < > > > > martijnvis...@apache.org > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > Thanks for starting this discussion. I would also volunteer > to > > > help > > > > > > out as > > > > > > > > a release manager for the 1.16 release. > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > > > Martijn Visser > > > > > > > > https://twitter.com/MartijnVisser82 > > > > > > > > https://github.com/MartijnVisser > > > > > > > > > > > > > > > > > > > > > > > > On Tue, 26 Apr 2022 at 13:19, godfrey he < > godfre...@gmail.com> > > > > wrote: > > > > > > > > > > > > > > > > > Hi Konstantin & Chesnay, > > > > > > > > > > > > > > > > > > Thanks for driving
Re: [DISCUSS] FLIP-227: Support overdraft buffer
Hi Anton Kalashnikov, I think you agree that we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for, right? I prefer to hard-code the maxOverdraftBuffers to avoid adding a new configuration. And I hope to hear more from the community. Best wishes fanrui On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote: > Hi Anton Kalashnikov, > > Thanks for your very clear reply, I think you are totally right. > > The 'maxBuffersNumber - buffersInUseNumber' can be used as the > overdraft buffer, it won't need the new buffer configuration. Flink users > can turn up the maxBuffersNumber to control the overdraft buffer size. > > Also, I'd like to add some information. For safety, we should limit the > maximum number of overdraft segments that each LocalBufferPool > can apply for. > > Why do we limit it? > Some operators don't check the `recordWriter.isAvailable` during > processing records, such as LegacySource. I have mentioned it in > FLINK-26759 [1]. I'm not sure if there are other cases. > > If we don't add the limitation, the LegacySource will use up all remaining > memory in the NetworkBufferPool when the backpressure is severe. > > How to limit it? > I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions` > in the constructor of LocalBufferPool. The maxOverdraftBuffers is just > >> for safety, and it should be enough for most Flink jobs. Or we can set > `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle > some jobs of low parallelism. > > Also, if the user doesn't enable the Unaligned Checkpoint, we can set > maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because > the overdraft isn't useful for the Aligned Checkpoint. > > Please correct me if I'm wrong. Thanks a lot. > > [1] https://issues.apache.org/jira/browse/FLINK-26759 > > Best wishes > fanrui > > On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov > wrote: > >> Hi fanrui, >> >> Thanks for creating the FLIP. 
>> >> In general, I think the overdraft is good idea and it should help in >> described above cases. Here are my thoughts about configuration: >> >> Please, correct me if I am wrong but as I understand right now we have >> following calculation. >> >> maxBuffersNumber(per TaskManager) = Network memory(calculated via >> taskmanager.memory.network.fraction, taskmanager.memory.network.min, >> taskmanager.memory.network.max and total memory size) / >> taskmanager.memory.segment-size. >> >> requiredBuffersNumber(per TaskManager) = (exclusive buffers * >> parallelism + floating buffers) * subtasks number in TaskManager >> >> buffersInUseNumber = real number of buffers which used at current >> moment(always <= requiredBuffersNumber) >> >> Ideally requiredBuffersNumber should be equal to maxBuffersNumber which >> allows Flink work predictibly. But if requiredBuffersNumber > >> maxBuffersNumber sometimes it is also fine(but not good) since not all >> required buffers really mandatory(e.g. it is ok if Flink can not >> allocate floating buffers) >> >> But if maxBuffersNumber > requiredBuffersNumber, as I understand Flink >> just never use these leftovers buffers(maxBuffersNumber - >> requiredBuffersNumber). Which I propose to use. ( we can actualy use >> even difference 'requiredBuffersNumber - buffersInUseNumber' since if >> one TaskManager contains several operators including 'window' which can >> temporally borrow buffers from the global pool). 
>> >> My proposal, more specifically (it relates only to requesting buffers >> during processing of a single record, while switching to unavailability between >> records should stay the same as we have it now): >> >> * If one more buffer is requested but maxBuffersPerChannel is reached, then >> just ignore this limitation and allocate this buffer from any >> place (from LocalBufferPool if it has something left, otherwise from >> NetworkBufferPool) >> >> * If LocalBufferPool exceeds its limit, then temporarily allocate from >> NetworkBufferPool while it has something to allocate >> >> >> Maybe I missed something and this solution won't work, but I like it >> since on the one hand, it works out of the box without any >> configuration, and on the other hand, it can be configured by changing the >> proportion of maxBuffersNumber and requiredBuffersNumber. >> >> The last thing I want to say is that I don't really want to implement a new >> configuration option, since even now it is not clear how to correctly configure >> network buffers with the existing configuration, and I don't want to >> complicate it, especially if it is possible to resolve the problem >> automatically (as described above). >> >> >> So is my understanding about network memory/buffers correct? >> >> -- >> >> Best regards, >> Anton Kalashnikov >> >> On 27.04.2022 07:46, rui fan wrote: >> > Hi everyone, >> > >> > Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It >> > effectively solves the problem of checkpoint timeout or slow >> > checkpoint when backpressure is severe. >> > >> > We
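Anton's per-TaskManager accounting above can be sketched as a couple of pure functions. This is an illustrative sketch only, mirroring the formulas in the thread; these are not Flink classes, and the example values in the usage note simply plug in Flink's documented defaults (32 KiB segment size):

```java
// Illustrative sketch of the buffer accounting discussed above; not Flink code.
class NetworkBufferMath {

    // maxBuffersNumber = network memory / segment size
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    // requiredBuffersNumber = (exclusive buffers * parallelism + floating buffers)
    //                         * subtasks per TaskManager
    static long requiredBuffersNumber(
            int exclusivePerChannel, int parallelism, int floatingBuffers, int subtasksPerTm) {
        return (long) (exclusivePerChannel * parallelism + floatingBuffers) * subtasksPerTm;
    }

    // The "leftover" buffers Anton proposes to use as overdraft.
    static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0, maxBuffers - requiredBuffers);
    }
}
```

For example, 128 MiB of network memory with 32 KiB segments yields maxBuffersNumber = 4096; with 2 exclusive buffers per channel, parallelism 100, 8 floating buffers and 4 subtasks per TaskManager, requiredBuffersNumber = 832, leaving 3264 leftover buffers available as overdraft.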
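fanrui's proposed hard-coded limit could look roughly like the following. This is a sketch under the assumptions stated in the thread only; the method and parameter names are hypothetical, not the actual LocalBufferPool code:

```java
// Sketch of the hard-coded overdraft limit proposed in this thread; names are illustrative.
class OverdraftLimit {

    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedCheckpointsEnabled) {
        if (!unalignedCheckpointsEnabled) {
            // The overdraft only helps Unaligned Checkpoint, so disable it otherwise.
            return 0;
        }
        // Floor of 10 so that low-parallelism jobs still get a useful overdraft.
        return Math.max(numberOfSubpartitions, 10);
    }
}
```

The floor of 10 encodes the `Math.max(numberOfSubpartitions, 10)` variant from the proposal, and the zero for aligned checkpoints encodes the `maxOverdraftBuffers=0` case.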
[jira] [Created] (FLINK-27451) Enable the validator plugin in webhook
Aitozi created FLINK-27451: -- Summary: Enable the validator plugin in webhook Key: FLINK-27451 URL: https://issues.apache.org/jira/browse/FLINK-27451 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.0.0 Reporter: Aitozi Currently the validator plugin is only enabled in the operator. I think it should also be enabled in the webhook. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Thanks for driving this, Zhu and Lijie. +1 for the overall proposal. Just sharing a few thoughts here: - Why do we need to expose cluster.resource-blacklist.item.timeout-check-interval to the user? I think the semantics of `cluster.resource-blacklist.item.timeout` is sufficient for the user. How the timeout is guaranteed should be Flink's internal implementation detail. I think it will be very confusing and we do not need to expose it to users. - ResourceManager can notify the exception of a task manager to `BlacklistHandler` as well. For example, the slot allocation might fail in case the target task manager is busy or has network jitter. I don't mean we need to cover this case in this version, but we can also open a `notifyException` in `ResourceManagerBlacklistHandler`. - Before we sync the blocklist to ResourceManager, will the slots of a blocked task manager continue to be released and allocated? Best, Yangze Guo On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang wrote: > > Hi Konstantin, > > Thanks for your feedback. I will respond to your 4 remarks: > > > 1) Thanks for reminding me of the controversy. I think "BlockList" is good > enough, and I will change it in the FLIP. > > > 2) Your suggestion for the REST API is a good idea. Based on the above, I > would change the REST API as follows: > > POST/GET /blocklist/nodes > > POST/GET /blocklist/taskmanagers > > DELETE /blocklist/node/<id> > > DELETE /blocklist/taskmanager/<id> > > > 3) If a node is blocked/blocklisted, it means that all task managers on > this node are blocklisted. All slots on these TMs are not available. This > is actually a bit like lost TMs, but these TMs are not really lost, they > are in an unavailable status, and they are still registered in this Flink > cluster. They will be available again once the corresponding blocklist item > is removed. This behavior is the same in active/non-active clusters. > However, in active clusters, these TMs may be released due to idle > timeouts.
> > > 4) For the item timeout, I prefer to keep it. The reasons are as follows: > > a) The timeout will not affect users adding or removing items via REST API, > and users can disable it by configuring it to Long.MAX_VALUE. > > b) Some node problems can recover after a period of time (such as machine > hotspots), in which case users may prefer that Flink can do this > automatically instead of requiring the user to do it manually. > > > Best, > > Lijie > > Konstantin Knauf wrote on Wed, Apr 27, 2022 at 19:23: > > > Hi Lijie, > > > > I think this makes sense, and +1 to only support manually blocking > > taskmanagers and nodes. Maybe the different strategies can also be > > maintained outside of Apache Flink. > > > > A few remarks: > > > > 1) Can we use another term than "blacklist" due to the controversy around > > the term? [1] There was also a Jira ticket about this topic a while back > > and there was generally a consensus to avoid the terms blacklist & whitelist > > [2]. We could use "blocklist", "denylist" or "quarantined". > > 2) For the REST API, I'd prefer a slightly different design, as verbs like > > add/remove are often considered an anti-pattern for REST APIs. POST on a list > > is generally the standard to add items. DELETE on the individual > > resource is standard to remove an item. > > > > POST /quarantine/items > > DELETE /quarantine/items/<id> > > > > We could also consider separating taskmanagers and nodes in the REST API > > (and internal data structures). Any opinion on this? > > > > POST/GET /quarantine/nodes > > POST/GET /quarantine/taskmanager > > DELETE /quarantine/nodes/<id> > > DELETE /quarantine/taskmanager/<id> > > > > 3) How would blocking nodes behave with non-active resource managers, i.e. > > standalone or reactive mode? > > > > 4) To keep the implementation even more minimal, do we need the timeout > > behavior? If items are added/removed manually we could delegate this to the > > user easily.
In my opinion the timeout behavior would better fit into > > specific strategies at a later point. > > > > Looking forward to your thoughts. > > > > Cheers and thank you, > > > > Konstantin > > > > [1] > > > > https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term > > [2] https://issues.apache.org/jira/browse/FLINK-18209 > > > > On Wed, Apr 27, 2022 at 04:04, Lijie Wang < > > wangdachui9...@gmail.com> wrote: > > > Hi all, > > > > > > Flink job failures may happen due to cluster node issues (insufficient > > disk > > > space, bad hardware, network abnormalities). Flink will take care of the > > > failures and redeploy the tasks. However, due to data locality and > > limited > > > resources, the new tasks are very likely to be redeployed to the same > > > nodes, which will result in continuous task abnormalities and affect job > > > progress. > > > > > > Currently, Flink users need to manually identify the problematic node and > > > take it offline to solve this problem. But this approach has the following > > > disadvantages: > > > > > > 1.
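The item timeout that Lijie defends can be as simple as a timestamp check on each blocklist entry. The sketch below is illustrative only, with hypothetical names (the FLIP's actual classes may differ); note that configuring the timeout to Long.MAX_VALUE effectively disables expiry, as described above:

```java
// Hypothetical sketch of a blocklist item with timeout-based removal; not the FLIP's code.
class BlocklistItem {
    final String identifier;   // node id or task manager id
    final long addedAtMillis;  // when the item was added
    final long timeoutMillis;  // Long.MAX_VALUE effectively disables the timeout

    BlocklistItem(String identifier, long addedAtMillis, long timeoutMillis) {
        this.identifier = identifier;
        this.addedAtMillis = addedAtMillis;
        this.timeoutMillis = timeoutMillis;
    }

    // Called periodically (the check interval being internal, per Yangze's point).
    boolean isExpired(long nowMillis) {
        return nowMillis - addedAtMillis > timeoutMillis;
    }
}
```

A periodic sweep would remove expired items, making slots on the corresponding TMs available again without a manual DELETE call.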
Re: [DISCUSS] FLIP-222: Support full query lifecycle statements in SQL client
Hi. Thanks for Paul's update. > It's better we can also get the info about the cluster where the job is > running through the DESCRIBE statement. I just wonder how users can get the Web UI in application mode. Therefore, it's better if we can also list the Web UI using the SHOW statement. WDYT? > QUERY or other keywords. I list the statements used to manage the lifecycle of queries/DML in other systems: MySQL [1] allows users to SHOW [FULL] PROCESSLIST and use the KILL command to kill a query. ``` mysql> SHOW PROCESSLIST; mysql> KILL 27; ``` Postgres uses the following statements to kill queries [3]. ``` SELECT pg_cancel_backend(<pid>); SELECT pg_terminate_backend(<pid>); ``` KSQL uses the following commands to control the query lifecycle [4][5]. ``` SHOW QUERIES; TERMINATE <query_id>; ``` [1] https://dev.mysql.com/doc/refman/8.0/en/show-processlist.html [2] https://scaledynamix.com/blog/how-to-kill-mysql-queries/ [3] https://stackoverflow.com/questions/35319597/how-to-stop-kill-a-query-in-postgresql [4] https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/show-queries/ [5] https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/terminate/ After the investigation, I am fine with QUERY, but the keyword JOB is also okay to me. We also have two questions here. 1. Could you add some details about the behaviour with the different execution.target settings, e.g. session and application mode? 2. Considering the SQL Client/Gateway is not limited to submitting jobs to the specified cluster, is it able to list jobs in other clusters? Best, Shengkai Paul Lam wrote on Thu, Apr 28, 2022 at 17:17: > Hi Martijn, > > Thanks a lot for your reply! I agree that the scope may be a bit confusing, > please let me clarify. > > The FLIP aims to add new SQL statements that are supported only in > sql-client, similar to > jar statements [1].
Jar statements can be parsed into jar operations, which > are used only in > CliClient in the sql-client module and cannot be executed by TableEnvironment > (not available in a > Table API program that contains SQL, as you mentioned). > > WRT the unchanged CLI client, I mean CliClient instead of the sql-client > module, which > currently contains the gateway codes (e.g. Executor). The FLIP mainly > extends > the gateway part, and barely touches CliClient and the REST server (REST > endpoint in FLIP-91). > > WRT the syntax, I don't have much experience with SQL standards, and I'd > like to hear > more opinions from the community. I prefer Hive-style syntax because I > think many users > are familiar with Hive, and there are ongoing efforts to improve Flink-Hive > integration [2][3]. > But my preference is not strong, I'm okay with other options too. Do you > think JOB/TASK is > a good choice, or do you have other preferred keywords? > > [1] > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/jar/ > [2] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility > [3] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-223%3A+Support+HiveServer2+Endpoint > > Best, > Paul Lam > > Martijn Visser wrote on Tue, Apr 26, 2022 at 20:14: > > Hi Paul, > > > > Thanks for creating the FLIP and opening the discussion. I did get a bit > > confused about the title, being "query lifecycle statements in SQL > client". > > This sounds like you want to adapt the SQL client, but you want to expand > > the SQL syntax with lifecycle statements, which could be used from the > SQL > > client, but of course also in a Table API program that contains SQL. > Given > > that you're highlighting the CLI client as unchanged, this adds to the > > confusion. > > > > I am interested if there's anything listed in the SQL 2016 standard on > > these types of lifecycle statements. I did a quick scan for "SHOW > QUERIES" > > but couldn't find it.
It would be great if we could stay as close as > > possible to such syntax. Overall I'm not in favour of using QUERIES as a > > keyword. I think Flink applications are not queries, but short- or long > > running applications. Why should we follow Hive's setup and indeed not > > others such as Snowflake, but also Postgres or MySQL? > > > > Best regards, > > > > Martijn Visser > > https://twitter.com/MartijnVisser82 > > https://github.com/MartijnVisser > > > > > > On Fri, 22 Apr 2022 at 12:06, Paul Lam wrote: > > > > > Hi Shengkai, > > > > > > Thanks a lot for your opinions! > > > > > > > 1. I think the keyword QUERY may confuse users because the statement > > also > > > > works for the DML statement. > > > > > > I slightly lean to QUERY, because: > > > > > > Hive calls DMLs queries. We could be better aligned with Hive using > > QUERY, > > > especially given that we plan to introduce Hive endpoint. > > > QUERY is a more SQL-like concept and friendly to SQL users. > > > > > > In general, my preference: QUERY > JOB > TASK. I’m okay with JOB, but > not > > > very good with TASK, as it conflicts with the task concept in Flink > >
Re: [DISCUSS] Keep the Community Member Information Up-to-Date
Hi Xintong, +1 to add a link for the full member information. And I think the avatars are very friendly for website viewers. The lazy updates might be because the invitation letters to new committers/PMC members do not have a hint telling them to update the website. If we can: 1. Manually update the information on the website if someone volunteers; we can use the avatars from GitHub accounts by default. 2. Add such reminder information in the invitation letters. 3. Add descriptions on the website to tell viewers that the information might not be up-to-date. I think this looks like a better solution. Best Yun Tang From: tison Sent: Thursday, April 28, 2022 21:52 To: dev Subject: Re: [DISCUSS] Keep the Community Member Information Up-to-Date Hi Xintong, Thanks for starting this discussion. +0 to replace the information with a link to https://projects.apache.org/committee.html?flink. +1 to add such a link. My opinion is that the community doesn't have to keep the page up to date, since Apache has a member page [1] that isn't up to date either. We can add one line to redirect to the whole list so that those who are "lazy" to add themselves on the page don't have to do it. And keep the table so that those who are proud to announce their membership, or are trying a commit with their commit access, can do so. Best, tison. [1] https://www.apache.org/foundation/members.html Xintong Song wrote on Thu, Apr 28, 2022 at 21:26: > > > > Personally I'm tempted to just link to > > https://projects.apache.org/committee.html?flink, if at all. > > > > Despite its fashionless look, I'm thinking the same thing... > > > Thank you~ > > Xintong Song > > > > On Thu, Apr 28, 2022 at 8:41 PM Jingsong Li > wrote: > > > One value is that this page has avatars. :-) > > > > Best, > > Jingsong > > > > On Thu, Apr 28, 2022 at 8:27 PM Chesnay Schepler > > wrote: > > > > > > Personally I'm tempted to just link to > > > https://projects.apache.org/committee.html?flink, if at all.
> > > > > > I'm not sure overall whether this listing really provides value in the > > > first place. > > > > > > On 28/04/2022 13:58, Xintong Song wrote: > > > > Hi Flink Committers & PMC members, > > > > > > > > I just noticed that the list of community members on our website [1] > is > > > > quite out-of-date. According to the ASF roster [2], this project > > currently > > > > has 87 committers, 39 of which are PMC members. However, there's only > > 62 > > > > people listed on our website, and many (e.g., me) have outdated > roles. > > > > > > > > I believe the list on the website is supposed to be updated by each > new > > > > committer / PMC member. I remember reading somewhere that suggested > new > > > > committers to add themselves to this list as the first trying-out for > > > > merging changes. Unfortunately I couldn't find it anymore. > > > > > > > > Do you think we should keep the page manually updated, or shall we > > > > investigate some way to keep it automatically synchronized? > > > > > > > > Thank you~ > > > > > > > > Xintong Song > > > > > > > > > > > > [1] https://flink.apache.org/community.html > > > > > > > > [2] https://whimsy.apache.org/roster/committee/flink > > > > > > > > > >
Re: Re: [DISCUSS] FLIP-168: Speculative execution for Batch Job
Thank you for all the feedback! @Guowei Ma Here are my thoughts on your questions: >> 1. How to judge whether the Execution Vertex belongs to a slow task. If a slow task fails and gets restarted, it may not be a slow task anymore. Especially given that the nodes of the slow task may have been blacklisted and the new task will be deployed to a new node. I think we should again go through the slow task detection process to determine whether it is a slow task. I agree that it is not ideal to take another 59 mins to identify a slow task. To solve this problem, one idea is to introduce a slow task detection strategy which identifies slow tasks according to the throughput. This approach needs more thought and experimentation, so we are targeting it for a future time. >> 2. The fault tolerance strategy and the slow task detection strategy are coupled I don't think the fault tolerance and slow task detection are coupled. If a task fails while the ExecutionVertex still has a task in progress, there is no need to start new executions for the vertex from the perspective of fault tolerance. If the remaining task is slow, in the next slow task detection round, a speculative execution will be created and deployed for it. This, however, is a normal speculative execution process rather than a failure recovery process. In this way, fault tolerance and slow task detection work without knowing about each other, and the job can still recover from failures and guarantee there are speculative executions for slow tasks. >> 3. Default value of `slow-task-detector.execution-time.baseline-lower-bound` is too small From what I see in production and what I know from users, there are many batch jobs of a relatively small scale (a few terabytes, hundreds of gigabytes). Tasks of these jobs can finish in minutes, so a `1 min` lower bound is large enough. Besides that, I think the out-of-the-box experience is more important for users running small-scale jobs.
Thanks, Zhu Guowei Ma wrote on Thu, Apr 28, 2022 at 17:55: > Hi, Zhu > > Many thanks to Zhu Zhu for initiating the FLIP discussion. Overall I think > it's ok, I just have 3 small questions. > > 1. How to judge whether the Execution Vertex belongs to a slow task. > The current calculation method is: the current timestamp minus the > timestamp of the execution deployment. If the execution time of this > execution exceeds the baseline, then it is judged as a slow task. Normally > this is no problem. But if an execution fails, the time may not be > accurate. For example, the baseline is 59 minutes, and a task fails after > 56 minutes of execution. In the worst case, it may take an additional 59 > minutes to discover that the task is a slow task. > > 2. The Speculative Scheduler's fault tolerance strategy. > The strategy in the FLIP is: if the Execution Vertex can be executed, even if > the execution fails, the fault tolerance strategy will not be adopted. > Although currently `ExecutionTimeBasedSlowTaskDetector` can restart an > execution, isn't this dependency a bit too strong? To some extent, the > fault tolerance strategy and the slow task detection strategy are coupled > together. > > > 3. The values of the default configuration > IMHO, speculative execution should only be required for relatively > large-scale, very time-consuming and long-running jobs. > If `slow-task-detector.execution-time.baseline-lower-bound` is too small, > is it possible for the system to always start some additional tasks that > have little effect? In the end, the user needs to reset this default > configuration. Is it possible to consider a larger default value? Of > course, for this part it is best to listen to suggestions from other community > users. > > Best, > Guowei > > > On Thu, Apr 28, 2022 at 3:54 PM Jiangang Liu > wrote: > > > +1 for the feature. > > > > Mang Zhang wrote on Thu, Apr 28, 2022 at 11:36: > > > Hi zhu: > > > > > > This sounds like a great job! Thanks for your great work.
> > > In our company, there are already some jobs using Flink Batch, > > > but everyone knows that the offline cluster has a lot more load than > > > the online cluster, and the failure rate of the machines is also much > > higher. > > > If this work is done, we'd love to use it, it's simply awesome for > > our > > > Flink users. > > > Thanks again! > > > > > > -- > > > > > > Best regards, > > > Mang Zhang > > > > > > > > > At 2022-04-27 10:46:06, "Zhu Zhu" wrote: > > > >Hi everyone, > > > > > > > >More and more users are running their batch jobs on Flink nowadays. > > > >One major problem they encounter is slow tasks running on hot/bad > > > >nodes, resulting in very long and uncontrollable execution time of > > > >batch jobs. This problem is a pain or even unacceptable in > > > >production. Many users have been asking for a solution for it. > > > > > > > >Therefore, I'd like to revive the discussion of speculative > > > >execution to solve this problem. > > > > > > > >Weijun Wang, Jing
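The execution-time based detection debated in this thread boils down to a simple threshold check. The sketch below is illustrative only (the names are hypothetical, and the FLIP's actual detector derives the baseline itself rather than taking it as a parameter):

```java
// Illustrative check behind execution-time based slow task detection:
// a running execution is "slow" once its running time exceeds the baseline.
class SlowTaskCheck {

    static boolean isSlow(long nowMillis, long deployTimestampMillis, long baselineMillis) {
        return nowMillis - deployTimestampMillis > baselineMillis;
    }
}
```

This also makes Guowei's first concern concrete: an execution restarted after 56 minutes gets a fresh deployTimestampMillis, so with a 59-minute baseline it may take nearly another full baseline before it is flagged as slow again.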
[jira] [Created] (FLINK-27450) [JDK 11] Hive SessionState can't be initialized due to classloader problem
tartarus created FLINK-27450: Summary: [JDK 11] Hive SessionState can't be initialized due to classloader problem Key: FLINK-27450 URL: https://issues.apache.org/jira/browse/FLINK-27450 Project: Flink Issue Type: Improvement Components: Connectors / Hive Affects Versions: 1.15.0, 1.13.1 Reporter: tartarus If we use jdk11 to run Hive related jobs, there will be a problem that the Hive SessionState cannot be initialized {code:java} java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class java.net.URLClassLoader (jdk.internal.loader.ClassLoaders$AppClassLoader and java.net.URLClassLoader are in module java.base of loader 'bootstrap') at org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:394) at org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:370) at org.apache.flink.table.planner.delegation.hive.HiveParser$HiveParserSessionState.(HiveParser.java:382) at org.apache.flink.table.planner.delegation.hive.HiveParser.startSessionState(HiveParser.java:306) at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:205) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695) at org.apache.flink.connectors.hive.HiveDialectQueryITCase.setup(HiveDialectQueryITCase.java:74) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at 
org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) {code} Referring to [HIVE-21237|https://issues.apache.org/jira/browse/HIVE-21237], we chose Hive version 2.3.7 to support JDK 11. We have been running it in production for a long time. -- This message was sent by Atlassian Jira (v8.20.7#820007)
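The root cause is that since JDK 9 the application class loader is no longer a java.net.URLClassLoader (it is jdk.internal.loader.ClassLoaders$AppClassLoader), so Hive's unconditional cast in SessionState throws. A minimal illustration of the failure mode, written as my own sketch (not Hive or Flink code):

```java
import java.net.URL;
import java.net.URLClassLoader;

class ClassLoaderCheck {

    // Hive's SessionState effectively does an unconditional cast along the lines of
    //   URLClassLoader loader = (URLClassLoader) someClassLoader;
    // which throws ClassCastException on JDK 9+ for the application class loader.
    // A safe check uses instanceof instead:
    static boolean isUrlClassLoader(ClassLoader cl) {
        return cl instanceof URLClassLoader;
    }

    // An explicitly constructed URLClassLoader still works on any JDK version.
    static URLClassLoader emptyUrlClassLoader() {
        return new URLClassLoader(new URL[0]);
    }
}
```

On JDK 8 the system class loader passes the instanceof check; on JDK 9+ it does not, which is exactly the assumption HIVE-21237 fixed.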