[jira] [Created] (FLINK-27459) JsonJobGraphGenerationTest doesn't reset context environment

2022-04-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27459:


 Summary: JsonJobGraphGenerationTest doesn't reset context 
environment
 Key: FLINK-27459
 URL: https://issues.apache.org/jira/browse/FLINK-27459
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2

2022-04-29 Thread Jane Chan
Hi all,

+1 for the release (non-binding). My checks follow the release
verification guide [1] and are listed as follows.

- Verify that the source distributions of [2] do not contain any
binaries;
- Build the source distribution to ensure all source files have Apache
headers, and test functionality against the Maven staged artifacts under
Java 8 and Scala 2.12;
- Check README.md;
- Check file hashes (MD5 and SHA-1 for [3], SHA-512 for [2]) and GPG
signatures for [2] and [3] with gpg 2.3.6;

Best,
Jane Chan

[1]
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
[2]
https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
[3] https://repository.apache.org/content/repositories/orgapacheflink-1502/

On Fri, Apr 29, 2022 at 10:24 AM Jingsong Li  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 0.1.0 of
> Apache Flink Table Store, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> **Release Overview**
>
> As an overview, the release consists of the following:
> a) Table Store canonical source distribution, to be deployed to the
> release repository at dist.apache.org
> b) Maven artifacts to be deployed to the Maven Central Repository
>
> **Staging Areas to Review**
>
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2]
> * All artifacts for c) can be found at the Apache Nexus Repository [3]
> * Pre Bundled Binaries Jar can work fine with quick start [4][5]
>
> All artifacts are signed with the key
> 2C2B6A653B07086B65E4369F7C76245E0A318150 [6]
>
> Other links for your review:
> * JIRA release notes [7]
> * source code tag "release-0.1.0-rc2" [8]
> * PR to update the website Downloads page to include Table Store
> links [9]
>
> **Vote Duration**
>
> The voting time will run for at least 72 hours.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Best,
> Jingsong Lee
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> [3]
> https://repository.apache.org/content/repositories/orgapacheflink-1502/
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar
> [5]
> https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
> [6] https://dist.apache.org/repos/dist/release/flink/KEYS
> [7]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234
> [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2
> [9] https://github.com/apache/flink-web/pull/531
>


Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-04-29 Thread Qingsheng Ren
Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an
internal discussion together with Jark and Leonard and I’d like to
summarize our ideas. Instead of implementing the cache logic in the table
runtime layer or wrapping around the user-provided table function, we
prefer to introduce some new APIs extending TableFunction with these
concerns:

1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF
proc_time", because the cache cannot truly reflect the content of the lookup
table at the moment of querying. If users choose to enable caching on the
lookup table, they implicitly indicate that this breakage is acceptable in
exchange for performance. So we prefer not to provide caching at the
table runtime level.

2. If we implement the cache in the framework (whether in a
runner or a wrapper around TableFunction), we have to confront a situation
where table options in DDL control the behavior of the framework,
which has never happened before and should be treated cautiously. Under the
current design the behavior of the framework should only be specified by
configurations (“table.exec.xxx”), and it’s hard to apply these general
configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records
periodically into memory to achieve high lookup performance (like the Hive
connector in the community, and also widely used by our internal
connectors). Wrapping the cache around the user's TableFunction works fine
for LRU caches, but I think we have to introduce a new interface for this
all-caching scenario and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility
issues to existing lookup sources like there might exist two caches with
totally different strategies if the user incorrectly configures the table
(one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and
projections should be pushed all the way down to the table function, like
what we do in the scan source, instead of stopping at the runner with the
cache. The goal of using a cache is to reduce the network I/O and the
pressure on the external system, and applying these optimizations only to
the cache does not seem very useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep
the cache implementation as a part of TableFunction, and we could provide
some helper classes (CachingTableFunction, AllCachingTableFunction,
CachingAsyncTableFunction) to developers and standardize the metrics of the
cache. Also, I made a POC [2] for your reference.
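
To make the shape of such a helper concrete, here is a rough, hypothetical
sketch (it is neither the FLIP's final API nor the POC code; the cache field
and the lookup/eval split below are assumptions for illustration only):

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a lookup TableFunction that consults a local cache
// before delegating to the connector-specific lookup. Cache eviction and
// metrics (hit/miss counters) are omitted for brevity.
public abstract class CachingTableFunction extends TableFunction<RowData> {

    // assumes the key RowData type implements equals/hashCode (e.g. GenericRowData)
    private transient Map<RowData, List<RowData>> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        cache = new HashMap<>();
    }

    /** Connector-specific lookup against the external system. */
    protected abstract List<RowData> lookup(RowData key) throws Exception;

    public void eval(RowData key) throws Exception {
        List<RowData> rows = cache.get(key);
        if (rows == null) {
            rows = lookup(key); // cache miss: query the external system
            cache.put(key, rows);
        }
        for (RowData row : rows) {
            collect(row);
        }
    }
}
```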

Looking forward to your ideas!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов 
wrote:

> Thanks for the response, Arvid!
>
> I have few comments on your message.
>
> > but could also live with an easier solution as the first step:
>
> I think that these 2 ways are mutually exclusive (the one originally
> proposed by Qingsheng and mine), because conceptually they follow the
> same goal, but the implementation details are different. If we go one
> way, moving to the other way in the future will mean deleting existing
> code and once again changing the API for connectors. So I think we
> should reach a consensus with the community about that and then work
> together on this FLIP, i.e. divide the work into tasks for different
> parts of the FLIP (for example, LRU cache unification / introducing the
> proposed set of metrics / further work…). WDYT, Qingsheng?
>
> > as the source will only receive the requests after filter
>
> Actually, if filters are applied to fields of the lookup table, we
> first must make the requests, and only after that can we filter the
> responses, because lookup connectors don't have filter pushdown. So if
> filtering is done before caching, there will be far fewer rows in the cache.
>
> > @Alexander unfortunately, your architecture is not shared. I don't know
> the
>
> > solution to share images to be honest.
>
> Sorry for that, I’m a bit new to such kinds of conversations :)
> I have no write access to the confluence, so I made a Jira issue,
> where described the proposed changes in more details -
> https://issues.apache.org/jira/browse/FLINK-27411.
>
> Will be happy to get more feedback!
>
> Best,
> Smirnov Alexander
>
> пн, 25 апр. 2022 г. в 19:49, Arvid Heise :
> >
> > Hi Qingsheng,
> >
> > Thanks for driving this; the inconsistency was not satisfying for me.
> >
> > I second Alexander's idea, though I could also live with an easier
> > solution as the first step: Instead of making caching an implementation
> > detail of TableFunction X, rather devise a caching layer around X. So the
> > proposal would be a CachingTableFunction that delegates to X in case of
> > misses and else manages the cache. Lifting it into the 

[jira] [Created] (FLINK-27458) Expose allowNonRestoredState flag in JobSpec

2022-04-29 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-27458:
--

 Summary: Expose allowNonRestoredState flag in JobSpec
 Key: FLINK-27458
 URL: https://issues.apache.org/jira/browse/FLINK-27458
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.0.0


We should probably expose this option as a top-level spec field, otherwise it is
impossible to set it on a per-job level for SessionJobs.

What do you think [~aitozi] [~wangyang0918] 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27457) CassandraOutputFormats should support flush

2022-04-29 Thread Etienne Chauchot (Jira)
Etienne Chauchot created FLINK-27457:


 Summary: CassandraOutputFormats should support flush
 Key: FLINK-27457
 URL: https://issues.apache.org/jira/browse/FLINK-27457
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Cassandra
Reporter: Etienne Chauchot


When migrating to Cassandra 4.x in [this
PR|https://github.com/apache/flink/pull/19586], a race condition in the tests
between the asynchronous writes and the JUnit assertions was uncovered. So it
was decided to introduce a flush mechanism for asynchronous writes in the
Cassandra output formats, similar to what was done in the Cassandra sinks.
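
As a rough illustration only (not the change in the linked PR), such a flush
mechanism typically tracks the pending write futures and waits for them on
demand; all names in this sketch are invented:

{code:java}
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of flushing pending asynchronous writes. */
class AsyncWriteTracker {
    private final Queue<CompletableFuture<?>> pending = new ConcurrentLinkedQueue<>();

    /** Called for every asynchronous write issued by the output format. */
    void track(CompletableFuture<?> writeFuture) {
        pending.add(writeFuture);
        writeFuture.whenComplete((result, error) -> pending.remove(writeFuture));
    }

    /** Blocks until all writes issued so far have completed (used by flush/close). */
    void flush() {
        for (Iterator<CompletableFuture<?>> it = pending.iterator(); it.hasNext(); ) {
            it.next().join(); // surfaces write failures as CompletionException
        }
    }
}
{code}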



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread rui fan
Let me add some information about the LegacySource.

If we want to disable the overdraft buffer for LegacySource,
could we add an enableOverdraft flag in LocalBufferPool?
The default value would be false. If getAvailableFuture is called,
we set enableOverdraft=true. It indicates whether isAvailable
is checked elsewhere.

I don't think it is elegant, but it's safe. Please correct me if I'm wrong.
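
A minimal sketch of the idea, purely hypothetical and not existing
LocalBufferPool code (field and method names are assumptions):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only; not the actual LocalBufferPool.
class LocalBufferPoolSketch {

    private final CompletableFuture<?> availabilityFuture = new CompletableFuture<>();

    // Overdraft stays disabled until availability is actually checked somewhere.
    private volatile boolean enableOverdraft = false;

    CompletableFuture<?> getAvailableFuture() {
        // A caller that asks for this future checks availability between
        // records, so granting it an overdraft is considered safe.
        enableOverdraft = true;
        return availabilityFuture;
    }

    boolean isOverdraftAllowed() {
        return enableOverdraft;
    }
}
```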

Thanks
fanrui

On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:

> Hi,
>
> Thanks for your quick response.
>
> For question 1/2/3, we think they are clear. We just need to discuss the
> default value in PR.
>
> For the legacy source, you are right. It's difficult for general
> implementation.
> Currently, we implement ensureRecordWriterIsAvailable() in
> SourceFunction.SourceContext. And call it in our common LegacySource,
> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so
> fixing FlinkKafkaConsumer solved most of our problems.
>
> Core code:
> ```
> public void ensureRecordWriterIsAvailable() {
>     if (recordWriter == null
>             || !configuration.getBoolean(
>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>             || recordWriter.isAvailable()) {
>         return;
>     }
>
>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>     try {
>         resumeFuture.get();
>     } catch (Throwable ignored) {
>     }
> }
> ```
>
> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
> before synchronized (checkpointLock) and collects records.
> Please let me know if there is a better solution.
>
> Thanks
> fanrui
>
> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov 
> wrote:
>
>> Hi.
>>
>> -- 1. Do you mean split this into two JIRAs or two PRs or two commits in a
>> PR?
>>
>> Perhaps, the separated ticket will be better since this task has fewer
>> questions but we should find a solution for LegacySource first.
>>
>> --  2. For the first task, if the flink user disables the Unaligned
>> Checkpoint, do we ignore max buffers per channel? Because the
>> overdraft
>> isn't useful for the Aligned Checkpoint, it still needs to wait for
>> downstream Task to consume.
>>
>> I think that the logic should be the same for AC and UC. As I understand,
>> the overdraft maybe is not really helpful for AC but it doesn't make it
>> worse as well.
>>
>>   3. For the second task
>> --  - The default value of maxOverdraftBuffersPerPartition may also
>> need
>>to be discussed.
>>
>> I think it should be a pretty small value or even 0 since it kind of
>> optimization and user should understand what they do(especially if we
>> implement the first task).
>>
>> --  - If the user disables the Unaligned Checkpoint, can we set the
>>maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
>> useful for
>>the Aligned Checkpoint.
>>
>> The same answer that above, if the overdraft doesn't make degradation for
>> the Aligned Checkpoint I don't think that we should make difference between
>> AC and UC.
>>
>> 4. For the legacy source
>> --  - If enabling the Unaligned Checkpoint, it uses up to
>>maxOverdraftBuffersPerPartition buffers.
>>- If disabling the UC, it doesn't use the overdraft buffer.
>>- Do you think it's ok?
>>
>> Ideally, I don't want to use overdraft for LegacySource at all since it
>> can lead to undesirable results especially if the limit is high. At least,
>> as I understand, it will always work in overdraft mode and it will borrow
>> maxOverdraftBuffersPerPartition buffers from the global pool which can lead
>> to degradation of other subtasks on the same TaskManager.
>>
>> --  - Actually, we added the checkAvailable logic for LegacySource in
>> our
>>internal version. It works well.
>>
>> I don't really understand how it is possible for general case considering
>> that each user has their own implementation of LegacySourceOperator
>>
>> --   5. For the benchmark, do you have any suggestions? I submitted the PR
>> [1].
>>
>> I haven't looked at it yet, but I'll try to do it soon.
>>
>>
>> 29.04.2022 14:14, rui fan wrote:
>> > Hi,
>> >
>> > Thanks for your feedback. I have a servel of questions.
>> >
>> > 1. Do you mean split this into two JIRAs or two PRs or two commits
>> in a
>> > PR?
>> > 2. For the first task, if the flink user disables the Unaligned
>> > Checkpoint, do we ignore max buffers per channel? Because the
>> overdraft
>> > isn't useful for the Aligned Checkpoint, it still needs to wait for
>> > downstream Task to consume.
>> > 3. For the second task
>> >- The default value of maxOverdraftBuffersPerPartition may also
>> need
>> >to be discussed.
>> >- If the user disables the Unaligned Checkpoint, can we set the
>> >maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
>> useful for
>> >the Aligned Checkpoint.
>> > 4. For the legacy 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread rui fan
Hi,

Thanks for your quick response.

For questions 1/2/3, we think they are clear. We just need to discuss the
default value in the PR.

For the legacy source, you are right, it's difficult to implement in general.
Currently, we implement ensureRecordWriterIsAvailable() in
SourceFunction.SourceContext and call it in our common LegacySources,
e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so
fixing FlinkKafkaConsumer solved most of our problems.

Core code:
```
public void ensureRecordWriterIsAvailable() {
    if (recordWriter == null
            || !configuration.getBoolean(
                    ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
            || recordWriter.isAvailable()) {
        return;
    }

    CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
    try {
        resumeFuture.get();
    } catch (Throwable ignored) {
    }
}
```

LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
before entering synchronized (checkpointLock) and collecting records.
Please let me know if there is a better solution.
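
For context, the call site in a legacy source run loop looks roughly like the
sketch below; it only illustrates the pattern described above (the hook name
comes from our internal patch) and is not the actual FlinkKafkaConsumer code:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical run loop of a legacy source using the availability hook above.
final class LegacySourceLoopSketch {

    // Proposed extension of SourceContext with the hook described above (assumed name).
    interface AvailabilityAwareContext<T> extends SourceFunction.SourceContext<T> {
        void ensureRecordWriterIsAvailable();
    }

    static <T> void runLoop(
            AvailabilityAwareContext<T> ctx,
            Object checkpointLock,
            java.util.Iterator<T> records) {
        while (records.hasNext()) {
            T record = records.next();
            // Wait for the record writer *before* taking the checkpoint lock,
            // so the lock is not held while the subtask is back-pressured.
            ctx.ensureRecordWriterIsAvailable();
            synchronized (checkpointLock) {
                ctx.collect(record);
            }
        }
    }
}
```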

Thanks
fanrui

On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov 
wrote:

> Hi.
>
> -- 1. Do you mean split this into two JIRAs or two PRs or two commits in a
> PR?
>
> Perhaps, the separated ticket will be better since this task has fewer
> questions but we should find a solution for LegacySource first.
>
> --  2. For the first task, if the flink user disables the Unaligned
> Checkpoint, do we ignore max buffers per channel? Because the overdraft
> isn't useful for the Aligned Checkpoint, it still needs to wait for
> downstream Task to consume.
>
> I think that the logic should be the same for AC and UC. As I understand,
> the overdraft maybe is not really helpful for AC but it doesn't make it
> worse as well.
>
>   3. For the second task
> --  - The default value of maxOverdraftBuffersPerPartition may also
> need
>to be discussed.
>
> I think it should be a pretty small value or even 0 since it kind of
> optimization and user should understand what they do(especially if we
> implement the first task).
>
> --  - If the user disables the Unaligned Checkpoint, can we set the
>maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
> useful for
>the Aligned Checkpoint.
>
> The same answer that above, if the overdraft doesn't make degradation for
> the Aligned Checkpoint I don't think that we should make difference between
> AC and UC.
>
> 4. For the legacy source
> --  - If enabling the Unaligned Checkpoint, it uses up to
>maxOverdraftBuffersPerPartition buffers.
>- If disabling the UC, it doesn't use the overdraft buffer.
>- Do you think it's ok?
>
> Ideally, I don't want to use overdraft for LegacySource at all since it
> can lead to undesirable results especially if the limit is high. At least,
> as I understand, it will always work in overdraft mode and it will borrow
> maxOverdraftBuffersPerPartition buffers from the global pool which can lead
> to degradation of other subtasks on the same TaskManager.
>
> --  - Actually, we added the checkAvailable logic for LegacySource in
> our
>internal version. It works well.
>
> I don't really understand how it is possible for general case considering
> that each user has their own implementation of LegacySourceOperator
>
> --   5. For the benchmark, do you have any suggestions? I submitted the PR
> [1].
>
> I haven't looked at it yet, but I'll try to do it soon.
>
>
> 29.04.2022 14:14, rui fan wrote:
> > Hi,
> >
> > Thanks for your feedback. I have a servel of questions.
> >
> > 1. Do you mean split this into two JIRAs or two PRs or two commits
> in a
> > PR?
> > 2. For the first task, if the flink user disables the Unaligned
> > Checkpoint, do we ignore max buffers per channel? Because the
> overdraft
> > isn't useful for the Aligned Checkpoint, it still needs to wait for
> > downstream Task to consume.
> > 3. For the second task
> >- The default value of maxOverdraftBuffersPerPartition may also
> need
> >to be discussed.
> >- If the user disables the Unaligned Checkpoint, can we set the
> >maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
> useful for
> >the Aligned Checkpoint.
> > 4. For the legacy source
> >- If enabling the Unaligned Checkpoint, it uses up to
> >maxOverdraftBuffersPerPartition buffers.
> >- If disabling the UC, it doesn't use the overdraft buffer.
> >- Do you think it's ok?
> >- Actually, we added the checkAvailable logic for LegacySource in
> our
> >internal version. It works well.
> > 5. For the benchmark, do you have any suggestions? I submitted the PR
> > [1].
> >
> > [1] https://github.com/apache/flink-benchmarks/pull/54
> >
> > Thanks
> > fanrui
> >
> > On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov 
> > wrote:
> >
> >> Hi,
> >>
> >> We discuss about 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread Anton Kalashnikov

Hi.

-- 1. Do you mean split this into two JIRAs or two PRs or two commits in a
   PR?

Perhaps a separate ticket would be better since this task has fewer open
questions, but we should find a solution for LegacySource first.

--  2. For the first task, if the flink user disables the Unaligned
   Checkpoint, do we ignore max buffers per channel? Because the overdraft
   isn't useful for the Aligned Checkpoint, it still needs to wait for
   downstream Task to consume.

I think that the logic should be the same for AC and UC. As I understand, the
overdraft may not be really helpful for AC, but it doesn't make it worse
either.

 3. For the second task
--  - The default value of maxOverdraftBuffersPerPartition may also need
  to be discussed.

I think it should be a pretty small value or even 0, since it is a kind of
optimization and users should understand what they are doing (especially if we
implement the first task).

--  - If the user disables the Unaligned Checkpoint, can we set the
  maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
  the Aligned Checkpoint.

The same answer as above: if the overdraft doesn't cause degradation for the
Aligned Checkpoint, I don't think we should make a difference between AC and
UC.

   4. For the legacy source
--  - If enabling the Unaligned Checkpoint, it uses up to
  maxOverdraftBuffersPerPartition buffers.
  - If disabling the UC, it doesn't use the overdraft buffer.
  - Do you think it's ok?

Ideally, I don't want to use the overdraft for LegacySource at all since it can
lead to undesirable results, especially if the limit is high. At least, as I
understand, it will always work in overdraft mode and will borrow
maxOverdraftBuffersPerPartition buffers from the global pool, which can lead to
degradation of other subtasks on the same TaskManager.

--  - Actually, we added the checkAvailable logic for LegacySource in our
  internal version. It works well.

I don't really understand how it is possible for the general case, considering
that each user has their own implementation of LegacySourceOperator.

--   5. For the benchmark, do you have any suggestions? I submitted the PR
   [1].

I haven't looked at it yet, but I'll try to do it soon.


29.04.2022 14:14, rui fan wrote:

Hi,

Thanks for your feedback. I have several questions.

1. Do you mean split this into two JIRAs or two PRs or two commits in a
PR?
2. For the first task, if the flink user disables the Unaligned
Checkpoint, do we ignore max buffers per channel? Because the overdraft
isn't useful for the Aligned Checkpoint, it still needs to wait for
downstream Task to consume.
3. For the second task
   - The default value of maxOverdraftBuffersPerPartition may also need
   to be discussed.
   - If the user disables the Unaligned Checkpoint, can we set the
   maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
   the Aligned Checkpoint.
4. For the legacy source
   - If enabling the Unaligned Checkpoint, it uses up to
   maxOverdraftBuffersPerPartition buffers.
   - If disabling the UC, it doesn't use the overdraft buffer.
   - Do you think it's ok?
   - Actually, we added the checkAvailable logic for LegacySource in our
   internal version. It works well.
5. For the benchmark, do you have any suggestions? I submitted the PR
[1].

[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov 
wrote:


Hi,

We discuss about it a little with Dawid Wysakowicz. Here is some
conclusion:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel. This means if
we request a memory segment from LocalBufferPool and the
maxBuffersPerChannel is reached for this channel, we just ignore that
and continue to allocate buffer while LocalBufferPool has it(it is
actually not a overdraft).

The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need configuration for limitation of overdraft
number(because it is not ok if one subtask allocates all buffers of one
TaskManager considering that several different jobs can be submitted on
this TaskManager). So idea is to have
maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool).
In this case, when a limit of buffers in LocalBufferPool is reached,
LocalBufferPool can request additionally from NetworkBufferPool up to
maxOverdraftBuffersPerPartition buffers.


But it is still not clear how to handle LegacySource since it actually
works as unlimited flatmap and it will always work in overdraft mode
which is not a target. So we still need to think about that.


   29.04.2022 11:11, rui fan wrote:

Hi Anton Kalashnikov,

I think you agree with we should limit the maximum number of overdraft
segments that each LocalBufferPool can apply for, right?

I prefer to hard code 

Re: [DISCUSS] Planning Flink 1.16

2022-04-29 Thread Becket Qin
Thanks for kicking off the topic, Konstantin and Chesnay.

Also thanks Martijn, Godfrey and Xingbo for volunteering to be the release
manager. Given that release 1.16 would likely be a beefy release with a
bunch of major features already on their way, it might make sense to have
more release managers than we usually do. We can figure out how to
collaborate, e.g. splitting by modules / FLIPs. In case we need someone to
get some errands or coordination done, I am happy to help.

Also, +1 for a feature freeze by the end of July with a potential 2-week
contingency delay.

Cheers,

Jiangjie (Becket) Qin

On Fri, Apr 29, 2022 at 5:14 PM Xingbo Huang  wrote:

> Thanks Konstantin and Chesnay for starting the discussion. I'm also willing
> to volunteer as the release manager if this is still open.
>
> Regarding the feature freeze date, +1 to mid August.
>
> Best,
> Xingbo
>
> On Fri, Apr 29, 2022 at 11:01, Zhu Zhu wrote:
>
> > +1 for a 5 months release cycle.
> > +1 target the feature freeze date 1.5 months before the release date. It
> > can better guard the release date.
> > Therefore, also +1 for mid August as the feature freeze date of 1.16
> >
> > Thanks,
> > Zhu
> >
> > On Thu, Apr 28, 2022 at 22:24, Jark Wu wrote:
> >
> > > I'm also fine with the end of July for the feature freeze.
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 28 Apr 2022 at 21:00, Martijn Visser 
> > > wrote:
> > >
> > > > +1 for continuing to strive for a 5 months release cycle.
> > > >
> > > > And +1 to have the planned feature freeze mid August, which I would
> > > propose
> > > > to have happen on Monday the 15th of August 2022. I would also
> already
> > > > state that even though we know this is a holiday period, we should
> not
> > > > extend this deadline for that reason :)
> > > >
> > > > Best regards,
> > > >
> > > > Martijn Visser
> > > > https://twitter.com/MartijnVisser82
> > > > https://github.com/MartijnVisser
> > > >
> > > >
> > > > On Thu, 28 Apr 2022 at 14:37, Jingsong Li 
> > > wrote:
> > > >
> > > > > Thanks for the check.
> > > > >
> > > > > > Continue to strive for a 5 months release cycle and 1.5 months
> > before
> > > > to
> > > > > the desired release date
> > > > >
> > > > > Sounds good to me!
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > > >
> > > > > On Thu, Apr 28, 2022 at 7:06 PM Konstantin Knauf <
> kna...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > thank you for the feedback so far. I've checked the length of the
> > > > > previous
> > > > > > last release cycles. Up until Flink 1.14, we've actually been
> > > > incredibly
> > > > > > good at maintaining a 5 months release cycle (see below).
> > > > Interestingly,
> > > > > > the community is officially targeting a 4 months release cycle
> [1].
> > > > > >
> > > > > > - 1.15.0 2022-05-01? (7 months, 2 days?)
> > > > > > - 1.14.0: 2021-09-29 (4 months, 26 days)
> > > > > > - 1.13.0: 2021-05-03 (4 months, 23 days)
> > > > > > - 1.12.0: 2020-12-10 (5 months, 3 days)
> > > > > > - 1.11.0: 2020-07-07 (4 months, 26 days)
> > > > > > - 1.10.0: 2020-02-11
> > > > > >
> > > > > > The 1.15 release cycle has taken significantly longer. In my
> > > > > > opinion we should try to get back into the 5 months cadence with
> > > > > > the next release. Since empirically we often end up moving the
> > > > > > feature freeze by a week or two, and we often need about a month
> > > > > > for release testing & stabilization and releasing, I don't think
> > > > > > we should move the planned feature freeze to later than
> > > > > > *mid August.*
> > > > > > What do you think:
> > > > > > 1. Should we continue to strive for a 5 months release cycle (and
> > > > update
> > > > > > [1] accordingly)?
> > > > > > 2. Does it sound reasonable to target a feature freeze date,
> > > > > > which is 1.5 months before the desired release date?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Konstantin
> > > > > >
> > > > > >  [1]
> > > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases
> > > > > >
> > > > > > On Thu, Apr 28, 2022 at 05:20, Jingsong Li <
> > > > > > jingsongl...@gmail.com> wrote:
> > > > > >
> > > > > > > Thanks Konstantin and Chesnay for starting the discussion. And
> > > thanks
> > > > > > > Konstantin, Chesnay, Godfrey, Martijn for volunteering.
> > > > > > >
> > > > > > > The 1.16 release at the end of July means that there are
> > currently
> > > > > > > only 3 months to fully commit to 1.16 development, and I'm a
> > little
> > > > > > > concerned that this is too short a time frame, which may result
> > in
> > > > > > > some features only reaching a halfway decent state.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 27, 2022 at 7:10 PM David Morávek  >
> > > > wrote:
> > > > > > > >
> > > > > > > > Thanks Konstantin and Chesnay for starting the 

Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2

2022-04-29 Thread Konstantin Knauf
+1 (binding)

* mvn clean package - PASSED
* checked signatures & checksums of source artifacts - OK
* went through quick start - WORKS
* skimmed over NOTICE file - LOOKS GOOD

I also read over the announcement blog post. In my opinion, we could try to
motivate the project a bit better. What is the problem that
flink-table-store will solve? What is its relationship to Apache Flink
itself? Since this is the first release, I am sure a lot of users will ask
themselves these questions.





On Fri, Apr 29, 2022 at 11:49, Caizhi Weng wrote:

> Hi all!
>
> +1 for the release (non-binding). I've tested the jar with a standalone
> cluster and SQL client.
>
>- Compiled the sources;
>- Run through quick start guide;
>- Test all supported data types;
>- Check that table store jar has no conflict with orc / avro format and
>kafka connector;
>- Check that both file store and log store works.
>
> I haven't checked the signatures as my gnupg version is old.
>
> On Fri, Apr 29, 2022 at 10:24, Jingsong Li wrote:
>
> > Hi everyone,
> >
> > Please review and vote on the release candidate #2 for the version 0.1.0
> of
> > Apache Flink Table Store, as follows:
> >
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > **Release Overview**
> >
> > As an overview, the release consists of the following:
> > a) Table Store canonical source distribution, to be deployed to the
> > release repository at dist.apache.org
> > b) Maven artifacts to be deployed to the Maven Central Repository
> >
> > **Staging Areas to Review**
> >
> > The staging areas containing the above mentioned artifacts are as
> follows,
> > for your review:
> > * All artifacts for a) and b) can be found in the corresponding dev
> > repository at dist.apache.org [2]
> > * All artifacts for c) can be found at the Apache Nexus Repository [3]
> > * Pre Bundled Binaries Jar can work fine with quick start [4][5]
> >
> > All artifacts are signed with the key
> > 2C2B6A653B07086B65E4369F7C76245E0A318150 [6]
> >
> > Other links for your review:
> > * JIRA release notes [7]
> > * source code tag "release-0.1.0-rc2" [8]
> > * PR to update the website Downloads page to include Table Store
> > links [9]
> >
> > **Vote Duration**
> >
> > The voting time will run for at least 72 hours.
> > It is adopted by majority approval, with at least 3 PMC affirmative
> votes.
> >
> > Best,
> > Jingsong Lee
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> > [2]
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> > [3]
> > https://repository.apache.org/content/repositories/orgapacheflink-1502/
> > [4]
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar
> > [5]
> >
> https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
> > [6] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [7]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234
> > [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2
> > [9] https://github.com/apache/flink-web/pull/531
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Re: [DISCUSS] FLIP-220: Temporal State

2022-04-29 Thread Nico Kruber
Hi all,
Yun, David M, David A, and I had an offline discussion and talked through a
couple of details that emerged from the discussion here. We believe we have
found a consensus on these points and would like to share them for
further feedback:

Let me try to get through the points that were opened in arbitrary order:


1. We want to offer a generic interface for sorted state, not just temporal 
state as proposed initially. We would like to...
a) ...offer a single new state type similar to what TemporalListState was 
offering (so not offering something like TemporalValueState to keep the API 
slim).
b) ...name it BinarySortedMultiMap with Java-Object keys and values
(I'll go into the API further below) - the naming stresses "Binary" because
we want to make clear that this is what the sort will be based on (see below)
c) ...have our own state descriptor (BinarySortedMultiMapStateDescriptor) similar to MapStateDescriptor
d) ...require TypeSerializer implementations for the key to extend from 
LexicographicalTypeSerializer (details below)


2. LexicographicalTypeSerializer basically defines the sort order when
retrieving values: it is based on the serialized binaries, comparing the bytes
one by one in an unsigned fashion. For heap-based state backends, these
serializers can also optionally define a Comparator that doesn't require
serialization but needs to retain the same sort order. If this is not
provided, we would provide implementations of the range-based operations that
iterate based on binary keys (by simply converting all relevant keys to
their binary form and using an appropriate comparator).

```
public interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
  default Optional<Comparator<T>> findComparator() {
    return Optional.empty();
  }
}
```


3. BinarySortedMultiMap should offer the following API

```
public class BinarySortedMultiMap<UK, UV> extends State {
  void put(UK key, Collection<UV> values) throws Exception;
  void add(UK key, UV value) throws Exception;

  Map.Entry<UK, UV> entryAt(UK key) throws Exception;
  Map.Entry<UK, UV> firstEntry() throws Exception;
  Map.Entry<UK, UV> lastEntry() throws Exception;

  Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

  Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey) throws Exception;
}
```
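
To illustrate how the proposed API might be used (a sketch only; none of these
types exist in Flink yet, so a minimal stand-in interface is declared locally):

```
import java.util.Map;

// Hypothetical usage of the proposed state type (names from the proposal above).
final class TemporalBufferSketch {

    // Minimal stand-in for the proposed state, limited to the two calls used below.
    interface BinarySortedMultiMap<UK, UV> {
        Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
        Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey) throws Exception;
    }

    // Emits and then clears all buffered events with timestamp <= watermark.
    static void drainUpTo(long watermark, BinarySortedMultiMap<Long, String> buffer)
            throws Exception {
        for (Map.Entry<Long, String> entry : buffer.readRangeUntil(watermark)) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        buffer.clearRangeUntil(watermark); // drop what was just emitted
    }
}
```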

That's for the core of the points - following a few more things that came up 
and some arguments about the "why":

A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
-> We looked at various use cases and didn't find a strong need because you 
could always fall back to readRange*. In the interest of a slim API, we 
thought it would be best to start without these (we can always add them later)

A2) Should we support iterating backwards?
-> We haven't found a compelling use case that needs this. If you need it, at 
least for some (?) use cases, you could negate the sort order through the 
serializer and achieve the same thing (unless you need to walk in two 
directions). Let's rather start with a slim API.

A3) Lazy vs. eager iteration
-> Let's implement our iterators similarly to RocksDBMapIterator by eagerly 
retrieving a couple of values (defaults to 128 here) into a cache. This seems 
to be the best of both worlds without bloating the API

A4) ChangelogStateBackend
-> Since we require TypeSerializer implementations for the key and those know 
the length to serialize (from other requirements, e.g. in the network stack), 
it isn't affected by our change except for delegating the new operations to 
the underlying state backend.

A5) Defining the binary sort order as one-by-one with unsigned bytes is fine 
because it is a very common thing among storage systems. Should a different 
binary-based state backend require something else, there could be a mapping 
function translating between different definitions.

A6) How to handle Duplicates
-> We let the user handle this by storing a multi-map, i.e. multiple values 
for the (primary) sort key. If needed, users can sort these values manually. 
As long as we don't have a strong use case where this is not feasible, we 
don't need any implicit duplicate handling by the framework (Flink).

A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
-> We propose to use readRange*** because that seems more explicit/intuitive 
in what this is doing.

A8) readRange*** with inclusive/exclusive switch
-> In the interest of a slim API, let's not provide that. The API above will 
interpret all keys as _inclusive_ and should a user need exclusive behaviour, 
they would in the worst case read one more entry - in most of the cases, 
however, this would be served from the cache anyway, so it's not much of a 
problem

A9) Why don't we want to provide a BinarySortedMap with value-like 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread rui fan
Hi,

Thanks for your feedback. I have several questions.

   1. Do you mean split this into two JIRAs or two PRs or two commits in a
   PR?
   2. For the first task, if the flink user disables the Unaligned
   Checkpoint, do we ignore max buffers per channel? Because the overdraft
   isn't useful for the Aligned Checkpoint, it still needs to wait for
   downstream Task to consume.
   3. For the second task
  - The default value of maxOverdraftBuffersPerPartition may also need
  to be discussed.
  - If the user disables the Unaligned Checkpoint, can we set the
  maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
  the Aligned Checkpoint.
   4. For the legacy source
  - If enabling the Unaligned Checkpoint, it uses up to
  maxOverdraftBuffersPerPartition buffers.
  - If disabling the UC, it doesn't use the overdraft buffer.
  - Do you think it's ok?
  - Actually, we added the checkAvailable logic for LegacySource in our
  internal version. It works well.
   5. For the benchmark, do you have any suggestions? I submitted the PR
   [1].

[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov 
wrote:

> Hi,
>
> We discuss about it a little with Dawid Wysakowicz. Here is some
> conclusion:
>
> First of all, let's split this into two tasks.
>
> The first task is about ignoring max buffers per channel. This means if
> we request a memory segment from LocalBufferPool and the
> maxBuffersPerChannel is reached for this channel, we just ignore that
> and continue to allocate buffer while LocalBufferPool has it(it is
> actually not a overdraft).
>
> The second task is about the real overdraft. I am pretty convinced now
> that we, unfortunately, need configuration for limitation of overdraft
> number(because it is not ok if one subtask allocates all buffers of one
> TaskManager considering that several different jobs can be submitted on
> this TaskManager). So idea is to have
> maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool).
> In this case, when a limit of buffers in LocalBufferPool is reached,
> LocalBufferPool can request additionally from NetworkBufferPool up to
> maxOverdraftBuffersPerPartition buffers.
>
>
> But it is still not clear how to handle LegacySource since it actually
> works as unlimited flatmap and it will always work in overdraft mode
> which is not a target. So we still need to think about that.
>
>
>   29.04.2022 11:11, rui fan wrote:
> > Hi Anton Kalashnikov,
> >
> > I think you agree with we should limit the maximum number of overdraft
> > segments that each LocalBufferPool can apply for, right?
> >
> > I prefer to hard code the maxOverdraftBuffers due to don't add the new
> > configuration. And I hope to hear more from the community.
> >
> > Best wishes
> > fanrui
> >
> > On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:
> >
> >> Hi Anton Kalashnikov,
> >>
> >> Thanks for your very clear reply, I think you are totally right.
> >>
> >> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
> >> overdraft buffer, it won't need the new buffer configuration.Flink users
> >> can turn up the maxBuffersNumber to control the overdraft buffer size.
> >>
> >> Also, I‘d like to add some information. For safety, we should limit the
> >> maximum number of overdraft segments that each LocalBufferPool
> >> can apply for.
> >>
> >> Why do we limit it?
> >> Some operators don't check the `recordWriter.isAvailable` during
> >> processing records, such as LegacySource. I have mentioned it in
> >> FLINK-26759 [1]. I'm not sure if there are other cases.
> >>
> >> If don't add the limitation, the LegacySource will use up all remaining
> >> memory in the NetworkBufferPool when the backpressure is severe.
> >>
> >> How to limit it?
> >> I prefer to hard code the `maxOverdraftBuffers=numberOfSubpartitions`
> >> in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
> >> for safety, and it should be enough for most flink jobs. Or we can set
> >> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
> >> some jobs of low parallelism.
> >>
> >> Also if user don't enable the Unaligned Checkpoint, we can set
> >> maxOverdraftBuffers=0 in the constructor of LocalBufferPool. Because
> >> the overdraft isn't useful for the Aligned Checkpoint.
> >>
> >> Please correct me if I'm wrong. Thanks a lot.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26759
> >>
> >> Best wishes
> >> fanrui
> >>
> >> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov 
> >> wrote:
> >>
> >>> Hi fanrui,
> >>>
> >>> Thanks for creating the FLIP.
> >>>
> >>> In general, I think the overdraft is good idea and it should help in
> >>> described above cases. Here are my thoughts about configuration:
> >>>
> >>> Please, correct me if I am wrong but as I understand right now we have
> >>> following calculation.
> >>>
> >>> 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread Anton Kalashnikov

Hi,

We discussed it a little with Dawid Wysakowicz. Here are some conclusions:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel. This means if
we request a memory segment from the LocalBufferPool and
maxBuffersPerChannel is reached for this channel, we just ignore that
and continue to allocate buffers while the LocalBufferPool has them (it is
actually not an overdraft).


The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need a configuration to limit the overdraft
number (because it is not OK if one subtask allocates all buffers of one
TaskManager, considering that several different jobs can be submitted on
this TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool).
In this case, when the limit of buffers in the LocalBufferPool is reached,
the LocalBufferPool can request up to maxOverdraftBuffersPerPartition
additional buffers from the NetworkBufferPool.
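
Expressed as a hypothetical sketch (the field and method names are
assumptions, not the actual LocalBufferPool implementation), the request path
described above could look like this:

```
// Hypothetical sketch of the described overdraft behaviour.
class OverdraftSketch {
    int buffersInUse;                    // buffers currently held by this pool
    int poolSize;                        // regular limit of this LocalBufferPool
    int overdraftBuffersInUse;           // overdraft buffers currently borrowed
    int maxOverdraftBuffersPerPartition; // proposed new limit

    /** Returns true if one more buffer may be handed out for the current record. */
    boolean mayRequestBuffer() {
        if (buffersInUse < poolSize) {
            return true; // normal case: still within the pool's own budget
        }
        // Pool exhausted: borrowing from the NetworkBufferPool is allowed,
        // but only up to the configured overdraft limit.
        return overdraftBuffersInUse < maxOverdraftBuffersPerPartition;
    }
}
```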



But it is still not clear how to handle LegacySource since it actually
works as an unlimited flatmap and would always run in overdraft mode,
which is not the target. So we still need to think about that.



 29.04.2022 11:11, rui fan wrote:

Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can apply for, right?

I prefer to hard-code the maxOverdraftBuffers so as not to add a new
configuration. And I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:


Hi Anton Kalashnikov,

Thanks for your very clear reply, I think you are totally right.

The 'maxBuffersNumber - buffersInUseNumber' can be used as the
overdraft buffer, it won't need the new buffer configuration. Flink users
can turn up the maxBuffersNumber to control the overdraft buffer size.

Also, I‘d like to add some information. For safety, we should limit the
maximum number of overdraft segments that each LocalBufferPool
can apply for.

Why do we limit it?
Some operators don't check the `recordWriter.isAvailable` during
processing records, such as LegacySource. I have mentioned it in
FLINK-26759 [1]. I'm not sure if there are other cases.

If don't add the limitation, the LegacySource will use up all remaining
memory in the NetworkBufferPool when the backpressure is severe.

How to limit it?
I prefer to hard code the `maxOverdraftBuffers=numberOfSubpartitions`
in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
for safety, and it should be enough for most flink jobs. Or we can set
`maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
some jobs of low parallelism.

Also if user don't enable the Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool. Because
the overdraft isn't useful for the Aligned Checkpoint.

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov 
wrote:


Hi fanrui,

Thanks for creating the FLIP.

In general, I think the overdraft is good idea and it should help in
described above cases. Here are my thoughts about configuration:

Please, correct me if I am wrong but as I understand right now we have
following calculation.

maxBuffersNumber(per TaskManager) = Network memory(calculated via
taskmanager.memory.network.fraction, taskmanager.memory.network.min,
taskmanager.memory.network.max and total memory size) /
taskmanager.memory.segment-size.

requiredBuffersNumber(per TaskManager) = (exclusive buffers *
parallelism + floating buffers) * subtasks number in TaskManager

buffersInUseNumber = real number of buffers which used at current
moment(always <= requiredBuffersNumber)

Ideally requiredBuffersNumber should be equal to maxBuffersNumber which
allows Flink work predictibly. But if requiredBuffersNumber >
maxBuffersNumber sometimes it is also fine(but not good) since not all
required buffers really mandatory(e.g. it is ok if Flink can not
allocate floating buffers)

But if maxBuffersNumber > requiredBuffersNumber, as I understand Flink
just never use these leftovers buffers(maxBuffersNumber -
requiredBuffersNumber). Which I propose to use. ( we can actualy use
even difference 'requiredBuffersNumber - buffersInUseNumber' since if
one TaskManager contains several operators including 'window' which can
temporally borrow buffers from the global pool).

My proposal, more specificaly(it relates only to requesting buffers
during processing single record while switching to unavalability between
records should be the same as we have it now):

* If one more buffer requested but maxBuffersPerChannel reached, then
just ignore this limitation and allocate this buffers from any
place(from LocalBufferPool if it has something yet otherwise from
NetworkBufferPool)

* If LocalBufferPool exceeds limit, then 

[jira] [Created] (FLINK-27456) mistake and confusion with CEP example in docs

2022-04-29 Thread David Anderson (Jira)
David Anderson created FLINK-27456:
--

 Summary: mistake and confusion with CEP example in docs
 Key: FLINK-27456
 URL: https://issues.apache.org/jira/browse/FLINK-27456
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Library / CEP
Affects Versions: 1.14.4
Reporter: David Anderson


[https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/#contiguity-within-looping-patterns]

In the section of the docs on contiguity within looping patterns, what it says 
about strict contiguity for the given example is either incorrect or very 
confusing (or both). It doesn't help that the example code doesn't precisely 
match the scenario described in the text.

To study this, I implemented the example from the text and found that it
produces no output for strict contiguity (as I expected), which contradicts
what the text says.
{code:java}
public class StreamingJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events =
                env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
        Pattern<String, String> pattern =
                Pattern.<String>begin("a", skipStrategy)
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("a");
                                    }
                                })
                        .next("b+")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("b");
                                    }
                                })
                        .oneOrMore().consecutive()
                        .next("c")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("c");
                                    }
                                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();
        patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a"))
                    + String.join("", pattern.get("b+"))
                    + String.join("", pattern.get("c"));
        }
    }
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27455) [JUnit5 Migration] SnapshotMigrationTestBase

2022-04-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27455:


 Summary: [JUnit5 Migration] SnapshotMigrationTestBase
 Key: FLINK-27455
 URL: https://issues.apache.org/jira/browse/FLINK-27455
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[DISCUSS] FLIP-226: Introduce Schema Evolution on Table Store

2022-04-29 Thread Jingsong Li
Hi devs,

I want to start a discussion about Schema Evolution on the Flink Table
Store. [1]

In FLINK-21634, we plan to support many schema changes in Flink SQL.
But for the current Table Store, they may result in wrong data and unclear
evolution.

In general, the user has these operations for schema:
- Add column: Adding a column to a table.
- Modify column type.
- Drop column: Drop a column.
- Rename column: For example, rename the "name_1" column to "name_2".

Another schema change concerns partition keys. The data changes over
time; for example, for a table partitioned by day, as the business
continues to grow, each new daily partition becomes larger and the
business wants to change to hourly partitions.

A simple approach is to rewrite all the existing data when modifying the schema.
But this expensive approach is not acceptable to users, so we need to
support and define schema evolution clearly:
modifying the schema does not rewrite the existing data; when reading,
the original data needs to be evolved to the current schema.
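
As a toy illustration of read-time evolution (not the design described in [1]),
a reader can map each stored row from the schema it was written with to the
current schema, for example by matching columns by name and filling added
columns with null:

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy sketch: evolve a row written with an old schema to the current schema.
final class SchemaEvolutionSketch {

    // A row is a map from column name to value; the current schema is a column list.
    static List<Object> evolve(Map<String, Object> rowInWriteSchema, List<String> currentSchema) {
        List<Object> evolved = new ArrayList<>(currentSchema.size());
        for (String column : currentSchema) {
            // Dropped columns are simply not read; added columns default to null.
            evolved.add(rowInWriteSchema.getOrDefault(column, null));
        }
        return evolved;
    }
}
```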

Look forward to your feedback!

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store

Best,
Jingsong


Re: [VOTE] Apache Flink Table Store 0.1.0, release candidate #2

2022-04-29 Thread Caizhi Weng
Hi all!

+1 for the release (non-binding). I've tested the jar with a standalone
cluster and SQL client.

   - Compiled the sources;
   - Run through quick start guide;
   - Test all supported data types;
   - Check that table store jar has no conflict with orc / avro format and
   kafka connector;
   - Check that both file store and log store work.

I haven't checked the signatures as my gnupg version is old.

On Fri, Apr 29, 2022 at 10:24, Jingsong Li wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 0.1.0 of
> Apache Flink Table Store, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> **Release Overview**
>
> As an overview, the release consists of the following:
> a) Table Store canonical source distribution, to be deployed to the
> release repository at dist.apache.org
> b) Maven artifacts to be deployed to the Maven Central Repository
>
> **Staging Areas to Review**
>
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2]
> * All artifacts for c) can be found at the Apache Nexus Repository [3]
> * Pre Bundled Binaries Jar can work fine with quick start [4][5]
>
> All artifacts are signed with the key
> 2C2B6A653B07086B65E4369F7C76245E0A318150 [6]
>
> Other links for your review:
> * JIRA release notes [7]
> * source code tag "release-0.1.0-rc2" [8]
> * PR to update the website Downloads page to include Table Store
> links [9]
>
> **Vote Duration**
>
> The voting time will run for at least 72 hours.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Best,
> Jingsong Lee
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.1.0-rc2/
> [3]
> https://repository.apache.org/content/repositories/orgapacheflink-1502/
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1502/org/apache/flink/flink-table-store-dist/0.1.0/flink-table-store-dist-0.1.0.jar
> [5]
> https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/docs/try-table-store/quick-start/
> [6] https://dist.apache.org/repos/dist/release/flink/KEYS
> [7]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351234
> [8] https://github.com/apache/flink-table-store/tree/release-0.1.0-rc2
> [9] https://github.com/apache/flink-web/pull/531
>


[jira] [Created] (FLINK-27454) Remove inheritance from TestBaseUtils

2022-04-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27454:


 Summary: Remove inheritance from TestBaseUtils
 Key: FLINK-27454
 URL: https://issues.apache.org/jira/browse/FLINK-27454
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


The TestBaseUtils is really just a collection of static methods. There's no 
real reason to inherit from it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27453) Cleanup TestBaseUtils

2022-04-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27453:


 Summary: Cleanup TestBaseUtils
 Key: FLINK-27453
 URL: https://issues.apache.org/jira/browse/FLINK-27453
 Project: Flink
  Issue Type: Technical Debt
  Components: Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


Contains a bunch of unused methods or ones that are only used internally but 
are public.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27452) Move Te

2022-04-29 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27452:


 Summary: Move Te
 Key: FLINK-27452
 URL: https://issues.apache.org/jira/browse/FLINK-27452
 Project: Flink
  Issue Type: Technical Debt
Reporter: Chesnay Schepler






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)

2022-04-29 Thread yuxia
Thanks for for driving this work, it's to be a useful feature.
About the flip-218, I have some questions.

1: Does our CTAS syntax support specifying the target table's schema, including 
column names and data types? I think it may be a useful feature in case we want to 
change the data types in the target table instead of always copying the source 
table's schema. It would be more flexible with this feature.
Btw, MySQL's "CREATE TABLE ... SELECT Statement"[1] supports this feature.
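Just to make question 1 concrete, here is a purely hypothetical shape of such a
statement (not part of the current FLIP draft), borrowing MySQL's form and run
through the Table API; the table names and the explicit DECIMAL column are made up:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasSchemaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical CTAS with an explicit column list: `price` is widened to
        // DECIMAL(10, 2) instead of copying the source column's type. This syntax
        // does not parse on current Flink versions; it only illustrates the question.
        tEnv.executeSql(
                "CREATE TABLE sink_orders (price DECIMAL(10, 2)) "
                        + "WITH ('connector' = 'print') "
                        + "AS SELECT id, price FROM source_orders");
    }
}
```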

2: It seems this will require the sink to implement a public interface to drop 
tables, so what will that interface look like?

[1] https://dev.mysql.com/doc/refman/8.0/en/create-table-select.html

Best regards,
Yuxia

- Original Message -
From: "Mang Zhang" 
To: "dev" 
Sent: Thursday, April 28, 2022 4:57:24 PM
Subject: [DISCUSS] FLIP-218: Support SELECT clause in CREATE TABLE(CTAS)

Hi, everyone


I would like to open a discussion on supporting a SELECT clause in CREATE 
TABLE (CTAS).
With the development of business and the enhancement of Flink SQL capabilities, 
queries become more and more complex.
Currently the user needs to use a CREATE TABLE statement to create the target table 
first, and then execute an INSERT statement.
However, the target table may have many columns, which brings a lot of work 
outside the business logic to the user.
At the same time, the user must ensure that the schema of the created target table 
is consistent with the schema of the query result.
A CTAS syntax like Hive's/Spark's can greatly facilitate the user.



You can find more details in FLIP-218[1]. Looking forward to your feedback.



[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-218%3A+Support+SELECT+clause+in+CREATE+TABLE(CTAS)




--

Best regards,
Mang Zhang


Re: [DISCUSS] Planning Flink 1.16

2022-04-29 Thread Xingbo Huang
Thanks Konstantin and Chesnay for starting the discussion. I'm also willing
to volunteer as the release manager if this is still open.

Regarding the feature freeze date, +1 for mid August.

Best,
Xingbo

On Fri, Apr 29, 2022 at 11:01 AM Zhu Zhu  wrote:

> +1 for a 5 months release cycle.
> +1 to targeting a feature freeze date 1.5 months before the release date. It
> can better guard the release date.
> Therefore, also +1 for mid August as the feature freeze date of 1.16
>
> Thanks,
> Zhu
>
> On Thu, Apr 28, 2022 at 10:24 PM Jark Wu  wrote:
>
> > I'm also fine with the end of July for the feature freeze.
> >
> > Best,
> > Jark
> >
> > On Thu, 28 Apr 2022 at 21:00, Martijn Visser 
> > wrote:
> >
> > > +1 for continuing to strive for a 5 months release cycle.
> > >
> > > And +1 to have the planned feature freeze mid August, which I would
> > propose
> > > to have happen on Monday the 15th of August 2022. I would also already
> > > state that even though we know this is a holiday period, we should not
> > > extend this deadline for that reason :)
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > > https://github.com/MartijnVisser
> > >
> > >
> > > On Thu, 28 Apr 2022 at 14:37, Jingsong Li 
> > wrote:
> > >
> > > > Thanks for the check.
> > > >
> > > > > Continue to strive for a 5 months release cycle and 1.5 months
> before
> > > to
> > > > the desired release date
> > > >
> > > > Sounds good to me!
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Thu, Apr 28, 2022 at 7:06 PM Konstantin Knauf 
> > > > wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > thank you for the feedback so far. I've checked the length of the
> > > > previous
> > > > > last release cycles. Up until Flink 1.14, we've actually been
> > > incredibly
> > > > > good at maintaining a 5 months release cycle (see below).
> > > Interestingly,
> > > > > the community is officially targeting a 4 months release cycle [1].
> > > > >
> > > > > - 1.15.0 2022-05-01? (7 months, 2 days?)
> > > > > - 1.14.0: 2021-09-29 (4 months, 26 days)
> > > > > - 1.13.0: 2021-05-03 (4 months, 23 days)
> > > > > - 1.12.0: 2020-12-10 (5 months, 3 days)
> > > > > - 1.11.0: 2020-07-07 (4 months, 26 days)
> > > > > - 1.10.0: 2020-02-11
> > > > >
> > > > > The 1.15 release cycle took significantly longer. In my opinion
> > we
> > > > > should try to get back into the 5 months cadence with the next
> > release.
> > > > > Since empirically we often end up moving the feature
> > freeze
> > > > by a
> > > > > week or two, and that we often need about a month for release
> > testing &
> > > > > stabilization and releasing, I don't think we should move the
> > planned
> > > > > feature freeze to later than
> > > > > *mid August. *
> > > > > What do you think:
> > > > > 1. Should we continue to strive for a 5 months release cycle (and
> > > update
> > > > > [1] accordingly)?
> > > > > 2. Does it sound reasonable to target a feature freeze date, which
> is
> > > 1.5
> > > > > months before to the desired release date?
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Konstantin
> > > > >
> > > > >  [1]
> > > >
> https://cwiki.apache.org/confluence/display/FLINK/Time-based+releases
> > > > >
> > > > > On Thu, Apr 28, 2022 at 5:20 AM Jingsong Li <
> > > > > jingsongl...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Konstantin and Chesnay for starting the discussion. And
> > thanks
> > > > > > Konstantin, Chesnay, Godfrey, Martijn for volunteering.
> > > > > >
> > > > > > The 1.16 release at the end of July means that there are
> currently
> > > > > > only 3 months to fully commit to 1.16 development, and I'm a
> little
> > > > > > concerned that this is too short a time frame, which may result
> in
> > > > > > some features only reaching a halfway decent state.
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 27, 2022 at 7:10 PM David Morávek 
> > > wrote:
> > > > > > >
> > > > > > > Thanks Konstantin and Chesnay for starting the discussion and
> > > > > > volunteering.
> > > > > > > The timeline proposal sounds reasonable :+1:
> > > > > > >
> > > > > > > Best,
> > > > > > > D.
> > > > > > >
> > > > > > > On Tue, Apr 26, 2022 at 1:37 PM Martijn Visser <
> > > > martijnvis...@apache.org
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Thanks for starting this discussion. I would also volunteer
> to
> > > help
> > > > > > out as
> > > > > > > > a release manager for the 1.16 release.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Martijn Visser
> > > > > > > > https://twitter.com/MartijnVisser82
> > > > > > > > https://github.com/MartijnVisser
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, 26 Apr 2022 at 13:19, godfrey he <
> godfre...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Konstantin & Chesnay,
> > > > > > > > >
> > > > > > > > > Thanks for driving 

Re: [DISCUSS] FLIP-227: Support overdraft buffer

2022-04-29 Thread rui fan
Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can request, right?

I prefer to hard-code maxOverdraftBuffers so that we don't add a new
configuration option. I hope to hear more from the community.
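Just to illustrate what I mean by hard-coding it, here is a minimal sketch (the
class and method names are made up; this is not the actual LocalBufferPool code):

```
/** Illustrative sketch of how a hard-coded overdraft cap could be derived. */
public class OverdraftLimitSketch {

    /**
     * Returns the maximum number of overdraft buffers a LocalBufferPool may request.
     *
     * @param numberOfSubpartitions number of subpartitions served by the pool
     * @param unalignedCheckpointEnabled whether unaligned checkpoints are enabled
     */
    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // The overdraft only helps unaligned checkpoints, so disable it otherwise.
            return 0;
        }
        // Cap at the number of subpartitions, with a small floor for low-parallelism jobs.
        return Math.max(numberOfSubpartitions, 10);
    }
}
```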

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

> Hi Anton Kalashnikov,
>
> Thanks for your very clear reply, I think you are totally right.
>
> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
> overdraft buffer, so it won't need a new buffer configuration. Flink users
> can turn up maxBuffersNumber to control the overdraft buffer size.
>
> Also, I'd like to add some information. For safety, we should limit the
> maximum number of overdraft segments that each LocalBufferPool
> can apply for.
>
> Why do we limit it?
> Some operators don't check `recordWriter.isAvailable` while
> processing records, such as LegacySource. I have mentioned it in
> FLINK-26759 [1]. I'm not sure if there are other cases.
>
> If we don't add the limitation, the LegacySource will use up all remaining
> memory in the NetworkBufferPool when the backpressure is severe.
>
> How to limit it?
> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
> in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
> for safety, and it should be enough for most Flink jobs. Or we can set
> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
> some jobs of low parallelism.
>
> Also, if the user doesn't enable unaligned checkpoints, we can set
> maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
> the overdraft isn't useful for aligned checkpoints.
>
> Please correct me if I'm wrong. Thanks a lot.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26759
>
> Best wishes
> fanrui
>
> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov 
> wrote:
>
>> Hi fanrui,
>>
>> Thanks for creating the FLIP.
>>
>> In general, I think the overdraft is good idea and it should help in
>> described above cases. Here are my thoughts about configuration:
>>
>> Please correct me if I am wrong, but as I understand it, right now we have
>> the following calculation.
>>
>> maxBuffersNumber(per TaskManager) = Network memory(calculated via
>> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
>> taskmanager.memory.network.max and total memory size) /
>> taskmanager.memory.segment-size.
>>
>> requiredBuffersNumber(per TaskManager) = (exclusive buffers *
>> parallelism + floating buffers) * subtasks number in TaskManager
>>
>> buffersInUseNumber = real number of buffers which used at current
>> moment(always <= requiredBuffersNumber)
>>
>> Ideally requiredBuffersNumber should be equal to maxBuffersNumber, which
>> allows Flink to work predictably. But if requiredBuffersNumber >
>> maxBuffersNumber, sometimes it is also fine (but not good) since not all
>> required buffers are really mandatory (e.g. it is ok if Flink cannot
>> allocate floating buffers).
>>
>> But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink
>> just never uses these leftover buffers (maxBuffersNumber -
>> requiredBuffersNumber), which I propose to use. (We can actually even use
>> the difference 'requiredBuffersNumber - buffersInUseNumber', since
>> one TaskManager can contain several operators, including 'window', which can
>> temporarily borrow buffers from the global pool.)
>>
>> My proposal, more specifically (it relates only to requesting buffers
>> while processing a single record; switching to unavailability between
>> records should stay the same as we have it now):
>>
>> * If one more buffer is requested but maxBuffersPerChannel is reached, then
>> just ignore this limitation and allocate this buffer from any
>> place (from LocalBufferPool if it has something left, otherwise from
>> NetworkBufferPool)
>>
>> * If LocalBufferPool exceeds its limit, then temporarily allocate from
>> NetworkBufferPool while it has something to allocate
>>
>>
>> Maybe I missed something and this solution won't work, but I like it
>> since, on the one hand, it works from scratch without any
>> configuration, and on the other hand, it can be configured by changing the
>> proportion of maxBuffersNumber and requiredBuffersNumber.
>>
>> The last thing I want to say: I don't really want to introduce a new
>> configuration option, since even now it is not clear how to correctly configure
>> network buffers with the existing configuration, and I don't want to
>> complicate it, especially if it is possible to resolve the problem
>> automatically (as described above).
>>
>>
>> So is my understanding about network memory/buffers correct?
>>
>> --
>>
>> Best regards,
>> Anton Kalashnikov
>>
>> On 27.04.2022 07:46, rui fan wrote:
>> > Hi everyone,
>> >
>> > Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
>> > effectively solves the problem of checkpoint timeout or slow
>> > checkpoint when backpressure is severe.
>> >
>> > We 

[jira] [Created] (FLINK-27451) Enable the validator plugin in webhook

2022-04-29 Thread Aitozi (Jira)
Aitozi created FLINK-27451:
--

 Summary: Enable the validator plugin in webhook
 Key: FLINK-27451
 URL: https://issues.apache.org/jira/browse/FLINK-27451
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.0.0
Reporter: Aitozi


Currently the validator plugin is only enabled in the operator. I think it 
should also be enabled in the webhook.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-04-29 Thread Yangze Guo
Thanks for driving this, Zhu and Lijie.

+1 for the overall proposal. Just sharing some thoughts here:

- Why do we need to expose
cluster.resource-blacklist.item.timeout-check-interval to the user?
I think the semantics of `cluster.resource-blacklist.item.timeout` are
sufficient for the user; how the timeout is enforced is Flink's internal
implementation detail. I think exposing it will be very confusing, and
we do not need to expose it to users.

- The ResourceManager can notify the `BlacklistHandler` of a task manager's
exceptions as well.
For example, slot allocation might fail because the target task
manager is busy or suffers network jitter. I don't mean we need to cover
this case in this version, but we could also add a `notifyException` method to
`ResourceManagerBlacklistHandler` (a rough sketch follows after these points).

- Before we sync the blocklist to the ResourceManager, will the slots of a
blocked task manager continue to be released and allocated?
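To make the second point concrete, here is a rough sketch of what I have in mind
(the method signature is made up, just to illustrate the idea):

```
import org.apache.flink.runtime.clusterframework.types.ResourceID;

/** Illustrative sketch only; the real interface is up to the FLIP. */
public interface ResourceManagerBlacklistHandler {

    /**
     * Notifies the handler of an exception observed for a task manager (e.g. a failed
     * slot allocation because the task manager is busy or suffers network jitter), so
     * that a blocklist strategy may decide whether to add a blocklist item for it.
     */
    void notifyException(ResourceID taskManagerId, Throwable cause);
}
```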

Best,
Yangze Guo

On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang  wrote:
>
> Hi Konstantin,
>
> Thanks for your feedback. I will response your 4 remarks:
>
>
> 1) Thanks for reminding me of the controversy. I think “BlockList” is good
> enough, and I will change it in FLIP.
>
>
> 2) Your suggestion for the REST API is a good idea. Based on the above, I
> would change REST API as following:
>
> POST/GET /blocklist/nodes
>
> POST/GET /blocklist/taskmanagers
>
> DELETE /blocklist/node/
>
> DELETE /blocklist/taskmanager/
>
>
> 3) If a node is blocked/blocklisted, it means that all task managers on
> this node are blocklisted. All slots on these TMs are not available. This
> is actually a bit like losing TMs, but these TMs are not really lost; they
> are in an unavailable status, and they are still registered in this Flink
> cluster. They will become available again once the corresponding blocklist item
> is removed. This behavior is the same in active/non-active clusters.
> However, in active clusters, these TMs may be released due to idle
> timeouts.
>
>
> 4) For the item timeout, I prefer to keep it. The reasons are as following:
>
> a) The timeout will not affect users adding or removing items via REST API,
> and users can disable it by configuring it to Long.MAX_VALUE .
>
> b) Some node problems can recover after a period of time (such as machine
> hotspots), in which case users may prefer that Flink can do this
> automatically instead of requiring the user to do it manually.
>
>
> Best,
>
> Lijie
>
> On Wed, Apr 27, 2022 at 7:23 PM Konstantin Knauf  wrote:
>
> > Hi Lijie,
> >
> > I think, this makes sense and +1 to only support manually blocking
> > taskmanagers and nodes. Maybe the different strategies can also be
> > maintained outside of Apache Flink.
> >
> > A few remarks:
> >
> > 1) Can we use another term than "blacklist" due to the controversy around
> > the term? [1] There was also a Jira ticket about this topic a while back,
> > and there was generally a consensus to avoid the terms blacklist & whitelist
> > [2]. We could use "blocklist", "denylist" or "quarantined".
> > 2) For the REST API, I'd prefer a slightly different design, as verbs like
> > add/remove are often considered an anti-pattern for REST APIs. POST on a list
> > resource is generally the standard way to add items. DELETE on the individual
> > resource is the standard way to remove an item.
> >
> > POST /quarantine/items
> > DELETE /quarantine/items/
> >
> > We could also consider to separate taskmanagers and nodes in the REST API
> > (and internal data structures). Any opinion on this?
> >
> > POST/GET /quarantine/nodes
> > POST/GET /quarantine/taskmanager
> > DELETE /quarantine/nodes/
> > DELETE /quarantine/taskmanager/
> >
> > 3) How would blocking nodes behave with non-active resource managers, i.e.
> > standalone or reactive mode?
> >
> > 4) To keep the implementation even more minimal, do we need the timeout
> > behavior? If items are added/removed manually we could delegate this to the
> > user easily. In my opinion the timeout behavior would better fit into
> > specific strategies at a later point.
> >
> > Looking forward to your thoughts.
> >
> > Cheers and thank you,
> >
> > Konstantin
> >
> > [1]
> >
> > https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
> > [2] https://issues.apache.org/jira/browse/FLINK-18209
> >
> > On Wed, Apr 27, 2022 at 4:04 AM Lijie Wang <
> > wangdachui9...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Flink job failures may happen due to cluster node issues (insufficient
> > disk
> > > space, bad hardware, network abnormalities). Flink will take care of the
> > > failures and redeploy the tasks. However, due to data locality and
> > limited
> > > resources, the new tasks are very likely to be redeployed to the same
> > > nodes, which will result in continuous task abnormalities and affect job
> > > progress.
> > >
> > > Currently, Flink users need to manually identify the problematic node and
> > > take it offline to solve this problem. But this approach has the following
> > > disadvantages:
> > >
> > > 1. 

Re: [DISCUSS] FLIP-222: Support full query lifecycle statements in SQL client

2022-04-29 Thread Shengkai Fang
Hi.

Thanks for Paul's update.

> It's better if we can also get the info about the cluster where the job is
> running through the DESCRIBE statement.

I just wonder how users can get the Web UI address in application mode.
Therefore, it would be better if we could list the Web UI using the SHOW statement.
WDYT?


> QUERY or other keywords.

I have listed the statements used to manage the lifecycle of queries/DML in other
systems:

MySQL [1] allows users to run SHOW [FULL] PROCESSLIST and use the KILL command
to kill a query [2].

```
mysql> SHOW PROCESSLIST;

mysql> KILL 27;
```


Postgres uses the following statements to kill queries [3].

```
SELECT pg_cancel_backend()

SELECT pg_terminate_backend()
```

KSQL uses the following commands to control the query lifecycle [4][5].

```
SHOW QUERIES;

TERMINATE ;

```

[1] https://dev.mysql.com/doc/refman/8.0/en/show-processlist.html
[2] https://scaledynamix.com/blog/how-to-kill-mysql-queries/
[3]
https://stackoverflow.com/questions/35319597/how-to-stop-kill-a-query-in-postgresql
[4]
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/show-queries/
[5]
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/terminate/

After this investigation, I am fine with QUERY, but the keyword JOB is
also okay to me.

We also have two questions here.

1. Could you add some details about the behaviour with the different
execution.target, e.g. session, application mode?

2. Considering the SQL Client/Gateway is not limited to submitting the job
to the specified cluster, is it able to list jobs in the other clusters?


Best,
Shengkai

On Thu, Apr 28, 2022 at 5:17 PM Paul Lam  wrote:

> Hi Martijn,
>
> Thanks a lot for your reply! I agree that the scope may be a bit confusing,
> so please let me clarify.
>
> The FLIP aims to add new SQL statements that are supported only in
> the sql-client, similar to
> jar statements [1]. Jar statements can be parsed into jar operations, which
> are used only in
> CliClient in the sql-client module and cannot be executed by TableEnvironment
> (so they are not available in
> a Table API program that contains SQL, which you mentioned).
>
> WRT the unchanged CLI client, I mean CliClient instead of the sql-client
> module, which
> currently contains the gateway code (e.g. Executor). The FLIP mainly
> extends
> the gateway part, and barely touches CliClient and the REST server (the REST
> endpoint in FLIP-91).
>
> WRT the syntax, I don't have much experience with SQL standards, and I'd
> like to hear
> more opinions from the community. I prefer Hive-style syntax because I
> think many users
> are familiar with Hive, and there are ongoing efforts to improve Flink-Hive
> integration [2][3].
> But my preference is not strong; I'm okay with other options too. Do you
> think JOB/TASK is
> a good choice, or do you have other preferred keywords?
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/jar/
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-223%3A+Support+HiveServer2+Endpoint
>
> Best,
> Paul Lam
>
> On Tue, Apr 26, 2022 at 8:14 PM Martijn Visser  wrote:
>
> > Hi Paul,
> >
> > Thanks for creating the FLIP and opening the discussion. I did get a bit
> > confused about the title, being "query lifecycle statements in SQL
> client".
> > This sounds like you want to adapt the SQL client, but you want to expand
> > the SQL syntax with lifecycle statements, which could be used from the
> SQL
> > client, but of course also in a Table API program that contains SQL.
> Given
> > that you're highlighting the CLI client as unchanged, this adds to the
> > confusion.
> >
> > I am interested if there's anything listed in the SQL 2016 standard on
> > these types of lifecycle statements. I did a quick scan for "SHOW
> QUERIES"
> > but couldn't find it. It would be great if we could stay as close as
> > possible to such syntax. Overall I'm not in favour of using QUERIES as a
> > keyword. I think Flink applications are not queries, but short- or
> > long-running applications. Why should we follow Hive's setup and indeed not
> > others such as Snowflake, but also Postgres or MySQL?
> >
> > Best regards,
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
> >
> > On Fri, 22 Apr 2022 at 12:06, Paul Lam  wrote:
> >
> > > Hi Shengkai,
> > >
> > > Thanks a lot for your opinions!
> > >
> > > > 1. I think the keyword QUERY may confuse users because the statement
> > also
> > > > works for the DML statement.
> > >
> > > I slightly lean to QUERY, because:
> > >
> > > Hive calls DMLs queries. We could be better aligned with Hive using
> > QUERY,
> > > especially given that we plan to introduce Hive endpoint.
> > > QUERY is a more SQL-like concept and friendly to SQL users.
> > >
> > > In general, my preference: QUERY > JOB > TASK. I’m okay with JOB, but
> not
> > > very good with TASK, as it conflicts with the task concept in Flink
> > 

Re: [DISCUSS] Keep the Community Member Information Up-to-Date

2022-04-29 Thread Yun Tang
Hi Xintong,


+1 to add a link for the full member information.

And I think the avatars are very friendly for website viewers. The lack of updates 
might be because the invitation letters to new committers/PMC members do not have a 
hint telling them to update the website.
If we can:

  1.  Manually update the information on the website when someone volunteers, and 
use the avatar from their GitHub account by default.
  2.  Add such a reminder to the invitation letters.
  3.  Add a note on the website telling viewers that the information 
might not be up-to-date.

I think this looks like a better solution.

Best
Yun Tang

From: tison 
Sent: Thursday, April 28, 2022 21:52
To: dev 
Subject: Re: [DISCUSS] Keep the Community Member Information Up-to-Date

Hi Xintong,

Thanks for starting this discussion.

+0 to replace the information with link to
https://projects.apache.org/committee.html?flink.
+1 to add such a link.

My opinion is that the community doesn't have to keep the page up to
date, since Apache has a member page [1]
that isn't up to date either.

We can add one line redirecting to the whole list, so that those who are
too "lazy" to add themselves to the page
don't have to do it. And keep the table, so that those who are proud to
announce their membership, or want to try a commit
with their new commit access, can do so.

Best,
tison.

[1] https://www.apache.org/foundation/members.html


On Thu, Apr 28, 2022 at 9:26 PM Xintong Song  wrote:

> >
> > Personally I'm tempted to just link to
> > https://projects.apache.org/committee.html?flink, if at all.
> >
>
> Despite its fashionless look, I'm thinking the same thing...
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Apr 28, 2022 at 8:41 PM Jingsong Li 
> wrote:
>
> > One value is that this page has avatars. :-)
> >
> > Best,
> > Jingsong
> >
> > On Thu, Apr 28, 2022 at 8:27 PM Chesnay Schepler 
> > wrote:
> > >
> > > Personally I'm tempted to just link to
> > > https://projects.apache.org/committee.html?flink, if at all.
> > >
> > > I'm not sure overall whether this listing really provides value in the
> > > first place.
> > >
> > > On 28/04/2022 13:58, Xintong Song wrote:
> > > > Hi Flink Committers & PMC members,
> > > >
> > > > I just noticed that the list of community members on our website [1]
> is
> > > > quite out-of-date. According to the ASF roster [2], this project
> > currently
> > > > has 87 committers, 39 of which are PMC members. However, there are only
> > 62
> > > > people listed on our website, and many (e.g., me) have outdated
> roles.
> > > >
> > > > I believe the list on the website is supposed to be updated by each
> new
> > > > committer / PMC member. I remember reading somewhere that suggested
> new
> > > > committers to add themselves to this list as a first try-out for
> > > > merging changes. Unfortunately I couldn't find it anymore.
> > > >
> > > > Do you think we should keep the page manually updated, or shall we
> > > > investigate some way to keep it automatically synchronized?
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > > [1] https://flink.apache.org/community.html
> > > >
> > > > [2] https://whimsy.apache.org/roster/committee/flink
> > > >
> > >
> >
>


Re: Re: [DISCUSS] FLIP-168: Speculative execution for Batch Job

2022-04-29 Thread Zhu Zhu
Thank you for all the feedback!

@Guowei Ma
Here's my thoughts for your questions:
>> 1. How to judge whether the Execution Vertex belongs to a slow task.
If a slow task fails and gets restarted, it may not be a slow task
anymore, especially given that the nodes of the slow task may have been
blacklisted and the new task will be deployed to a new node. I think we
should again go through the slow task detection process to determine
whether it is a slow task. I agree that it is not ideal to take another
59 mins to identify a slow task. To solve this problem, one idea is to
introduce a slow task detection strategy which identifies slow tasks
according to their throughput. This approach needs more thought and
experimentation, so we are targeting it for a future release.

>> 2. The fault tolerance strategy and the Slow task detection strategy are
coupled
I don't think fault tolerance and slow task detection are coupled.
If a task fails while the ExecutionVertex still has a task in progress,
there is no need to start new executions for the vertex from the perspective
of fault tolerance. If the remaining task is slow, in the next slow task
detection round, a speculative execution will be created and deployed for it.
This, however, is a normal speculative execution process rather than a
failure recovery process. In this way, fault tolerance and slow task
detection work without knowing about each other, and the job can still recover
from failures while guaranteeing there are speculative executions for slow tasks.

>> 3. Default value of
`slow-task-detector.execution-time.baseline-lower-bound` is too small
From what I see in production and hear from users, there are many
batch jobs of a relatively small scale (a few terabytes, hundreds of
gigabytes). Tasks of these jobs can finish in minutes, so a
`1 min` lower bound is large enough. Besides that, I think the out-of-the-box
experience is more important for users running small-scale jobs.
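To make the detection discussed above easier to follow, here is a simplified sketch
of the execution-time based check (illustrative only; the method and parameter names
are made up and this is not the actual ExecutionTimeBasedSlowTaskDetector code):

```
import java.time.Duration;

/** Illustrative sketch of execution-time based slow task detection. */
public class SlowTaskCheckSketch {

    /**
     * A task is treated as slow if it has been running longer than the baseline,
     * where the baseline is never smaller than the configured lower bound
     * (slow-task-detector.execution-time.baseline-lower-bound).
     */
    static boolean isSlow(
            long deployTimestampMillis,
            long nowMillis,
            Duration baseline,
            Duration baselineLowerBound) {
        long effectiveBaseline = Math.max(baseline.toMillis(), baselineLowerBound.toMillis());
        return nowMillis - deployTimestampMillis > effectiveBaseline;
    }
}
```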

Thanks,
Zhu

On Thu, Apr 28, 2022 at 5:55 PM Guowei Ma  wrote:

> Hi, zhu
>
> Many thanks to zhuzhu for initiating the FLIP discussion. Overall I think
> it's ok, I just have 3 small questions
>
> 1. How to judge whether the Execution Vertex belongs to a slow task.
> The current calculation method is: the current timestamp minus the
> timestamp of the execution deployment. If the execution time of this
> execution exceeds the baseline, then it is judged as a slow task. Normally
> this is no problem. But if an execution fails, the time may not be
> accurate. For example, the baseline is 59 minutes, and a task fails after
> 56 minutes of execution. In the worst case, it may take an additional 59
> minutes to discover that the task is a slow task.
>
> 2. Speculative Scheduler's fault tolerance strategy.
> The strategy in the FLIP is: if the ExecutionVertex can be executed, even if
> the execution fails, the fault tolerance strategy will not be applied.
> Although currently `ExecutionTimeBasedSlowTaskDetector` can restart an
> execution, isn't this dependency a bit too strong? To some extent, the
> fault tolerance strategy and the slow task detection strategy are coupled
> together.
>
>
> 3. The value of the default configuration
> IMHO, speculative execution should only be required for relatively
> large-scale, very time-consuming, long-running jobs.
> If `slow-task-detector.execution-time.baseline-lower-bound` is too small,
> is it possible for the system to always start some additional tasks that
> have little effect? In the end, the user would need to reset this default
> configuration. Is it possible to consider a larger default value? Of
> course, for this part it is best to listen to the suggestions of other community
> users.
>
> Best,
> Guowei
>
>
> On Thu, Apr 28, 2022 at 3:54 PM Jiangang Liu 
> wrote:
>
> > +1 for the feature.
> >
> > On Thu, Apr 28, 2022 at 11:36 AM Mang Zhang  wrote:
> >
> > > Hi zhu:
> > >
> > >
> > > This sounds like great work! Thanks for your effort.
> > > In our company, there are already some jobs using Flink Batch,
> > > but everyone knows that the offline cluster has a lot more load
> than
> > > the online cluster, and the failure rate of the machine is also much
> > higher.
> > > If this work is done, we'd love to use it, it's simply awesome for
> > our
> > > flink users.
> > > thanks again!
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best regards,
> > > Mang Zhang
> > >
> > >
> > >
> > >
> > >
> > > At 2022-04-27 10:46:06, "Zhu Zhu"  wrote:
> > > >Hi everyone,
> > > >
> > > >More and more users are running their batch jobs on Flink nowadays.
> > > >One major problem they encounter is slow tasks running on hot/bad
> > > >nodes, resulting in very long and uncontrollable execution time of
> > > >batch jobs. This problem is a pain or even unacceptable in
> > > >production. Many users have been asking for a solution for it.
> > > >
> > > >Therefore, I'd like to revive the discussion of speculative
> > > >execution to solve this problem.
> > > >
> > > >Weijun Wang, Jing 

[jira] [Created] (FLINK-27450) [JDK 11] Hive SessionState can't be initialized due to classloader problem

2022-04-29 Thread tartarus (Jira)
tartarus created FLINK-27450:


 Summary: [JDK 11] Hive SessionState can't be initialized due to 
classloader problem
 Key: FLINK-27450
 URL: https://issues.apache.org/jira/browse/FLINK-27450
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.15.0, 1.13.1
Reporter: tartarus


If we use JDK 11 to run Hive-related jobs, the Hive SessionState cannot be 
initialized:

{code:java}
java.lang.ClassCastException: class 
jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class 
java.net.URLClassLoader (jdk.internal.loader.ClassLoaders$AppClassLoader and 
java.net.URLClassLoader are in module java.base of loader 'bootstrap')

at 
org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:394)
at 
org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:370)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser$HiveParserSessionState.(HiveParser.java:382)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.startSessionState(HiveParser.java:306)
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:205)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:695)
at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.setup(HiveDialectQueryITCase.java:74)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
{code}
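The root cause is a cast that only holds on JDK 8, where the application class 
loader happens to be a URLClassLoader. A reduced illustration of the failing pattern 
(not the actual Hive code):

{code:java}
import java.net.URLClassLoader;

public class ClassLoaderCastDemo {
    public static void main(String[] args) {
        ClassLoader appLoader = ClassLoaderCastDemo.class.getClassLoader();
        // Works on JDK 8, where the application class loader extends URLClassLoader,
        // but throws ClassCastException on JDK 9+ where it is
        // jdk.internal.loader.ClassLoaders$AppClassLoader.
        URLClassLoader urlLoader = (URLClassLoader) appLoader;
        System.out.println(urlLoader.getURLs().length);
    }
}
{code}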

Refer to [HIVE-21237|https://issues.apache.org/jira/browse/HIVE-21237]; choosing 
Hive version 2.3.7 adds support for JDK 11.

We have been running this setup in production for a long time.




--
This message was sent by Atlassian Jira
(v8.20.7#820007)