[jira] [Created] (FLINK-35528) Skip execution of interruptible mails when yielding

2024-06-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35528:
--

 Summary: Skip execution of interruptible mails when yielding
 Key: FLINK-35528
 URL: https://issues.apache.org/jira/browse/FLINK-35528
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.20.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


When operators are yielding, for example waiting for async state access to 
complete before a checkpoint, it would be beneficial to not execute 
interruptible mails. Otherwise, the continuation mail for firing timers would be
continuously re-enqueued. To achieve that, the MailboxExecutor must be aware of
which mails are interruptible.

The easiest way to achieve this is to set MIN_PRIORITY for interruptible mails.
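For illustration, the priority-based skipping could be sketched roughly like this. This is a hypothetical, simplified model, not Flink's actual MailboxProcessor; the class and method names are assumptions:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch, not Flink's actual mailbox implementation: mails
// carry a priority, and interruptible mails are enqueued with MIN_PRIORITY.
// An operator yielding at priority 0 only drains mails of equal or higher
// priority, so the interruptible continuation mail is simply left in the
// queue instead of being executed and re-enqueued over and over.
class MailboxSketch {
    static final int MIN_PRIORITY = Integer.MIN_VALUE;

    static final class Mail {
        final int priority;
        final Runnable action;
        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final PriorityQueue<Mail> queue =
            new PriorityQueue<>(Comparator.<Mail>comparingInt(m -> m.priority).reversed());

    void execute(int priority, Runnable action) {
        queue.add(new Mail(priority, action));
    }

    /** Runs one mail if its priority is at least {@code yieldingPriority}. */
    boolean tryYield(int yieldingPriority) {
        Mail head = queue.peek();
        if (head == null || head.priority < yieldingPriority) {
            return false; // MIN_PRIORITY (interruptible) mails are skipped
        }
        queue.poll();
        head.action.run();
        return true;
    }
}
```

In this sketch, a mail enqueued with `MIN_PRIORITY` (e.g. the timer-firing continuation) stays queued while `tryYield(0)` drains only the higher-priority mails the yielding operator actually waits for.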



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[ANNOUNCE] New Apache Flink PMC Member - Fan Rui

2024-06-05 Thread Piotr Nowojski
Hi everyone,

On behalf of the PMC, I'm very happy to announce another new Apache Flink
PMC Member - Fan Rui.

Rui has been active in the community since August 2019. During this time he
has contributed a lot of new features. Among others:
  - Decoupling Autoscaler from Kubernetes Operator, and supporting
Standalone Autoscaler
  - Improvements to checkpointing, flamegraphs, restart strategies,
watermark alignment, network shuffles
  - Optimizing the memory and CPU usage of large operators, greatly
reducing the risk and probability of TaskManager OOM

He has reviewed a significant number of PRs and has been active on both the
mailing lists and Jira, helping to maintain and grow Apache Flink's
community. He is also our current Flink 1.20 release manager.

In the last 12 months, Rui has been the most active contributor in the
Flink Kubernetes Operator project, while being the 2nd most active Flink
contributor at the same time.

Please join me in welcoming and congratulating Fan Rui!

Best,
Piotrek (on behalf of the Flink PMC)


[jira] [Created] (FLINK-35518) CI Bot doesn't run on PRs

2024-06-04 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35518:
--

 Summary: CI Bot doesn't run on PRs
 Key: FLINK-35518
 URL: https://issues.apache.org/jira/browse/FLINK-35518
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


The CI bot doesn't want to run on my PR/branch. I tried force-pushes, rebases,
asking the flink bot to run, and closing and opening a new PR, but nothing helped:
https://github.com/apache/flink/pull/24868
https://github.com/apache/flink/pull/24883

I've heard others were having similar problems recently.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-443: Interruptible watermark processing

2024-05-28 Thread Piotr Nowojski
Hi all!

I'm happy to announce that FLIP-443 [1] has been accepted [2]. There were 7
votes in favour, all binding:

- Zakelly
- Yanfei Lei
- Rui Fan
- Weijie Guo
- Martijn Visser
- Stefan Richter
- Piotr Nowojski

[1] https://cwiki.apache.org/confluence/x/qgn9EQ
[2] https://lists.apache.org/thread/tf4hrfy4o3flk0f4zqlrbln390n6w4yq


Re: [VOTE] FLIP-443: Interruptible watermark processing

2024-05-28 Thread Piotr Nowojski
+1 (binding) from my side as well.

Voting is closed. Thank you all for voting.

Best,
Piotrek

On Mon, 27 May 2024 at 11:37, Stefan Richter wrote:

>
> +1 (binding)
>
>
>
> > On 24. May 2024, at 09:59, Martijn Visser 
> wrote:
> >
> > +1 (binding)
> >
> > On Fri, May 24, 2024 at 7:31 AM weijie guo <guoweijieres...@gmail.com>
> > wrote:
> >
> >> +1(binding)
> >>
> >> Thanks for driving this!
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >> Rui Fan <1996fan...@gmail.com> wrote on Fri, May 24, 2024 at 13:03:
> >>
> >>> +1(binding)
> >>>
> >>> Best,
> >>> Rui
> >>>
> >>> On Fri, May 24, 2024 at 12:01 PM Yanfei Lei 
> wrote:
> >>>
> >>>> Thanks for driving this!
> >>>>
> >>>> +1 (binding)
> >>>>
> >>>> Best,
> >>>> Yanfei
> >>>>
> >>>> Zakelly Lan wrote on Fri, May 24, 2024 at 10:13:
> >>>>
> >>>>>
> >>>>> +1 (binding)
> >>>>>
> >>>>> Best,
> >>>>> Zakelly
> >>>>>
> >>>>> On Thu, May 23, 2024 at 8:21 PM Piotr Nowojski
> >>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> After reaching what looks like a consensus in the discussion thread
> >>>> [1], I
> >>>>>> would like to put FLIP-443 [2] to the vote.
> >>>>>>
> >>>>>> The vote will be open for at least 72 hours unless there is an
> >>>> objection or
> >>>>>> insufficient votes.
> >>>>>>
> >>>>>> [1]
> >>
> https://lists.apache.org/thread/flxm7rphvfgqdn2gq2z0bb7kl007olpz
> >>>>>> [2]
> https://cwiki.apache.org/confluence/x/qgn9EQ
> >>>>>>
> >>>>>> Best,
> >>>>>> Piotrek
>
>


[VOTE] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Piotr Nowojski
Hi all,

After reaching what looks like a consensus in the discussion thread [1], I
would like to put FLIP-443 [2] to the vote.

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

[1] https://lists.apache.org/thread/flxm7rphvfgqdn2gq2z0bb7kl007olpz
[2] https://cwiki.apache.org/confluence/x/qgn9EQ

Best,
Piotrek


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-23 Thread Piotr Nowojski
Hi Zakelly,

I've thought about it a bit more, and I think only the `#execute()` methods
make sense to be used when implementing operators (and interruptible
mails), so I will add the `MailOptions` parameter only to them. If
necessary, we can add more variants in the future.

I have updated the FLIP. If it looks good to you, I would start a voting
thread today/tomorrow.

Best,
Piotrek

On Thu, 23 May 2024 at 09:00, Zakelly Lan wrote:

> Hi Piotrek,
>
> Well, compared to this plan, I prefer your previous one, which is more in
> line with the intuition for executors' API, by calling `execute` directly.
> Before the variants get too much, I'd suggest we only do minimum change for
> only "interruptible".
>
> My original thinking is, doubling each method could result in a scenario
> where new methods lack callers. But like you said, for the sake of
> completeness, I could accept the doubling method plan.
>
>
> Thanks & Best,
> Zakelly
>
> On Wed, May 22, 2024 at 5:05 PM Piotr Nowojski 
> wrote:
>
> > Hi Zakelly,
> >
> > > I suggest not doubling the existing methods. Only providing the
> following
> > one is enough
> >
> > In that case I would prefer to have a complete set of the methods for the
> > sake of completeness. If the number of variants is/would be getting too
> > much, we could convert the class into a builder?
> >
> >
> >
> mailboxExecutor.execute(myThrowingRunnable).setInterruptible().description("bla
> > %d").arg(42).submit();
> >
> > It could be done in both in the future, if we would ever need to add even
> > more methods, or I could do it now. WDYT?
> >
> > Best,
> > Piotrek
> >
> > On Wed, 22 May 2024 at 08:48, Zakelly Lan wrote:
> >
> > > Hi Piotrek,
> > >
> > > `MailOptions` looks good to me. I suggest not doubling the existing
> > > methods. Only providing the following one is enough:
> > >
> > > void execute(
> > > > MailOptions mailOptions,
> > > > ThrowingRunnable<? extends Exception> command,
> > > > String descriptionFormat,
> > > > Object... descriptionArgs);
> > >
> > >
> > > WDYT?
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > >
> > > On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Zakelly and others,
> > > >
> > > > > 1. I'd suggest also providing `isInterruptible()` in `Mail`, and
> the
> > > > > continuation mail will return true. The FLIP-425 will leverage this
> > > queue
> > > > > to execute some state requests, and when the cp arrives, the
> operator
> > > may
> > > > > call `yield()` to drain. It may happen that the continuation mail
> is
> > > > called
> > > > > again in `yield()`. By checking `isInterruptible()`, we can skip
> this
> > > > mail
> > > > > and re-enqueue.
> > > >
> > > > Do you have some suggestions on how `isInterruptible` should be
> > defined?
> > > > Do we have to double the amount of methods in the `MailboxExecutor`,
> to
> > > > provide versions of the existing methods, that would enqueue
> > > > "interruptible"
> > > > versions of mails? Something like:
> > > >
> > > > default void execute(ThrowingRunnable<? extends Exception> command,
> > > > String description) {
> > > > execute(DEFAULT_OPTIONS, command, description);
> > > > }
> > > >
> > > > default void execute(MailOptions options,
> > > > ThrowingRunnable<? extends Exception> command, String description) {
> > > > execute(options, command, description, EMPTY_ARGS);
> > > > }
> > > >
> > > > default void execute(
> > > > ThrowingRunnable<? extends Exception> command,
> > > > String descriptionFormat,
> > > > Object... descriptionArgs) {
> > > > execute(DEFAULT_OPTIONS, command, descriptionFormat,
> > > > descriptionArgs);
> > > > }
> > > >
> > > >void execute(
> > > > MailOptions options,
> > > > ThrowingRunnable<? extends Exception> command,
> > > > String descriptionFormat,
> > > > Object... descriptionArgs);
> > > >
> > > >public static class MailOptions {
> > > > (...)
> > >

[jira] [Created] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35420:
--

 Summary: WordCountMapredITCase fails to compile in IntelliJ
 Key: FLINK-35420
 URL: https://issues.apache.org/jira/browse/FLINK-35420
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


{noformat}
flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
value isFalse is not a member of ?0
possible cause: maybe a semicolon is missing before `value isFalse'?
  .isFalse()
{noformat}

Might be caused by:
https://youtrack.jetbrains.com/issue/SCL-20679




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-22 Thread Piotr Nowojski
Hi Zakelly,

> I suggest not doubling the existing methods. Only providing the following
one is enough

In that case I would prefer to have a complete set of the methods for the
sake of completeness. If the number of variants is/would be getting too
much, we could convert the class into a builder?

mailboxExecutor.execute(myThrowingRunnable).setInterruptible().description("bla
%d").arg(42).submit();

It could be done in both in the future, if we would ever need to add even
more methods, or I could do it now. WDYT?
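A rough sketch of how such a builder could look. The class and method names here are made up for illustration and are not part of any proposed Flink API; a real version would enqueue into the mailbox, while this one only renders the accumulated options:

```java
// Hypothetical builder sketch for the fluent API floated above; names
// (MailBuilder, submit, ...) are illustrative, not proposed Flink API.
class MailBuilder {
    private final Runnable command;
    private boolean interruptible;
    private String descriptionFormat = "";
    private Object[] descriptionArgs = new Object[0];

    MailBuilder(Runnable command) {
        this.command = command;
    }

    MailBuilder setInterruptible() {
        this.interruptible = true;
        return this;
    }

    MailBuilder description(String format) {
        this.descriptionFormat = format;
        return this;
    }

    MailBuilder arg(Object... args) {
        this.descriptionArgs = args;
        return this;
    }

    /** A real version would enqueue; here we render the configured mail. */
    String submit() {
        String description = String.format(descriptionFormat, descriptionArgs);
        return (interruptible ? "[interruptible] " : "") + description;
    }
}
```

The upside is that each new option becomes one extra builder method instead of doubling every `execute` overload.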

Best,
Piotrek

On Wed, 22 May 2024 at 08:48, Zakelly Lan wrote:

> Hi Piotrek,
>
> `MailOptions` looks good to me. I suggest not doubling the existing
> methods. Only providing the following one is enough:
>
> void execute(
> > MailOptions mailOptions,
> > ThrowingRunnable<? extends Exception> command,
> > String descriptionFormat,
> > Object... descriptionArgs);
>
>
> WDYT?
>
>
> Best,
> Zakelly
>
>
> On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski 
> wrote:
>
> > Hi Zakelly and others,
> >
> > > 1. I'd suggest also providing `isInterruptible()` in `Mail`, and the
> > > continuation mail will return true. The FLIP-425 will leverage this
> queue
> > > to execute some state requests, and when the cp arrives, the operator
> may
> > > call `yield()` to drain. It may happen that the continuation mail is
> > called
> > > again in `yield()`. By checking `isInterruptible()`, we can skip this
> > mail
> > > and re-enqueue.
> >
> > Do you have some suggestions on how `isInterruptible` should be defined?
> > Do we have to double the amount of methods in the `MailboxExecutor`, to
> > provide versions of the existing methods, that would enqueue
> > "interruptible"
> > versions of mails? Something like:
> >
> > default void execute(ThrowingRunnable<? extends Exception> command,
> > String description) {
> > execute(DEFAULT_OPTIONS, command, description);
> > }
> >
> > default void execute(MailOptions options, ThrowingRunnable<? extends Exception> command, String description) {
> > execute(options, command, description, EMPTY_ARGS);
> > }
> >
> > default void execute(
> > ThrowingRunnable<? extends Exception> command,
> > String descriptionFormat,
> > Object... descriptionArgs) {
> > execute(DEFAULT_OPTIONS, command, descriptionFormat,
> > descriptionArgs);
> > }
> >
> >void execute(
> > MailOptions options,
> > ThrowingRunnable<? extends Exception> command,
> > String descriptionFormat,
> > Object... descriptionArgs);
> >
> >public static class MailOptions {
> > (...)
> > public MailOptions() {
> > }
> >
> > MailOptions setIsInterruptible() {
> > this.isInterruptible = true;
> > return this;
> > }
> > }
> >
> > And usage would be like this:
> >
> > mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> {
> > foo(); }, "foo");
> >
> > ?
> >
> > Best,
> > Piotrek
> >
> > On Thu, 16 May 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > > we checked in the firing timers benchmark [1] and we didn't observe
> any
> > > > performance regression.
> > >
> > > Thanks for the feedback, it's good news to hear that. I didn't notice
> > > we already have fireProcessingTimers benchmark.
> > >
> > > If so, we can follow it after this FLIP is merged.
> > >
> > > +1 for this FLIP.
> > >
> > > Best,
> > > Rui
> > >
> > > On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > > I'm suggesting skipping the continuation mail during draining of
> > async
> > > > state access.
> > > >
> > > > I see. That makes sense to me now. I will later update the FLIP.
> > > >
> > > > > the code path will become more complex after this FLIP
> > > > due to the addition of shouldInterrupt() checks, right?
> > > >
> > > > Yes, that's correct.
> > > >
> > > > > If so, it's better to add a benchmark to check whether the job
> > > > > performance regresses when one job has a lot of timers.
> > > > If the performance regresses too much, we need to re-consider it.

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-21 Thread Piotr Nowojski
Hi Zakelly and others,

> 1. I'd suggest also providing `isInterruptible()` in `Mail`, and the
> continuation mail will return true. The FLIP-425 will leverage this queue
> to execute some state requests, and when the cp arrives, the operator may
> call `yield()` to drain. It may happen that the continuation mail is
called
> again in `yield()`. By checking `isInterruptible()`, we can skip this mail
> and re-enqueue.

Do you have some suggestions on how `isInterruptible` should be defined?
Do we have to double the amount of methods in the `MailboxExecutor`, to
provide versions of the existing methods, that would enqueue
"interruptible"
versions of mails? Something like:

default void execute(ThrowingRunnable<? extends Exception> command,
String description) {
execute(DEFAULT_OPTIONS, command, description);
}

default void execute(MailOptions options, ThrowingRunnable<? extends Exception> command, String description) {
execute(options, command, description, EMPTY_ARGS);
}

default void execute(
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
execute(DEFAULT_OPTIONS, command, descriptionFormat,
descriptionArgs);
}

   void execute(
MailOptions options,
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs);

   public static class MailOptions {
(...)
public MailOptions() {
}

MailOptions setIsInterruptible() {
this.isInterruptible = true;
return this;
}
}

And usage would be like this:

mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> {
foo(); }, "foo");

?

Best,
Piotrek

On Thu, 16 May 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:

> Hi Piotr,
>
> > we checked in the firing timers benchmark [1] and we didn't observe any
> > performance regression.
>
> Thanks for the feedback, it's good news to hear that. I didn't notice
> we already have fireProcessingTimers benchmark.
>
> If so, we can follow it after this FLIP is merged.
>
> +1 for this FLIP.
>
> Best,
> Rui
>
> On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski 
> wrote:
>
> > Hi Zakelly,
> >
> > > I'm suggesting skipping the continuation mail during draining of async
> > state access.
> >
> > I see. That makes sense to me now. I will later update the FLIP.
> >
> > > the code path will become more complex after this FLIP
> > due to the addition of shouldInterrupt() checks, right?
> >
> > Yes, that's correct.
> >
> > > If so, it's better to add a benchmark to check whether the job
> > > performance regresses when one job has a lot of timers.
> > > If the performance regresses too much, we need to re-consider it.
> > > Of course, I hope the performance is fine.
> >
> > I had the same concerns when initially David Moravek proposed this
> > solution,
> > but we checked in the firing timers benchmark [1] and we didn't observe
> any
> > performance regression.
> >
> > Best,
> > Piotrek
> >
> > [1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers=3
> >
> >
> >
> > On Tue, 7 May 2024 at 09:47, Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Overall this FLIP is fine for me. I have a minor concern:
> > > IIUC, the code path will become more complex after this FLIP
> > > due to the addition of shouldInterrupt() checks, right?
> > >
> > > If so, it's better to add a benchmark to check whether the job
> > > performance regresses when one job has a lot of timers.
> > > If the performance regresses too much, we need to re-consider it.
> > > Of course, I hope the performance is fine.
> > >
> > > Best,
> > > Rui
> > >
> > > On Mon, May 6, 2024 at 6:30 PM Zakelly Lan 
> > wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > I'm saying the scenario where things happen in the following order:
> > > > 1. advance watermark and process timers.
> > > > 2. the cp arrives and interrupts the timer processing, after this the
> > > > continuation mail is in the mailbox queue.
> > > > 3. `snapshotState` is called, where the async state access responses
> > will
> > > > be drained by calling `tryYield()` [1]. —— What if the continuation
> > mail
> > > is
> > > > triggered by `tryYield()`?
> > > >
> > > > I'm suggesting skipping the continuation mail during draining of
> async
> > > > state access.

Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-16 Thread Piotr Nowojski
Hi Zakelly,

> I'm suggesting skipping the continuation mail during draining of async
state access.

I see. That makes sense to me now. I will later update the FLIP.

> the code path will become more complex after this FLIP
due to the addition of shouldInterrupt() checks, right?

Yes, that's correct.

> If so, it's better to add a benchmark to check whether the job
> performance regresses when one job has a lot of timers.
> If the performance regresses too much, we need to re-consider it.
> Of course, I hope the performance is fine.

I had the same concerns when initially David Moravek proposed this
solution,
but we checked in the firing timers benchmark [1] and we didn't observe any
performance regression.

Best,
Piotrek

[1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers=3



On Tue, 7 May 2024 at 09:47, Rui Fan <1996fan...@gmail.com> wrote:

> Hi Piotr,
>
> Overall this FLIP is fine for me. I have a minor concern:
> IIUC, the code path will become more complex after this FLIP
> due to the addition of shouldInterrupt() checks, right?
>
> If so, it's better to add a benchmark to check whether the job
> performance regresses when one job has a lot of timers.
> If the performance regresses too much, we need to re-consider it.
> Of course, I hope the performance is fine.
>
> Best,
> Rui
>
> On Mon, May 6, 2024 at 6:30 PM Zakelly Lan  wrote:
>
> > Hi Piotr,
> >
> > I'm saying the scenario where things happen in the following order:
> > 1. advance watermark and process timers.
> > 2. the cp arrives and interrupts the timer processing, after this the
> > continuation mail is in the mailbox queue.
> > 3. `snapshotState` is called, where the async state access responses will
> > be drained by calling `tryYield()` [1]. —— What if the continuation mail
> is
> > triggered by `tryYield()`?
> >
> > I'm suggesting skipping the continuation mail during draining of async
> > state access.
> >
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
> >
> > Best,
> > Zakelly
> >
> >
> > On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Can you elaborate a bit more on what you have in mind? How marking
> mails
> > as
> > > interruptible helps with something? If an incoming async state access
> > > response comes, it could just request to interrupt any currently
> ongoing
> > > computations, regardless the currently executed mail is or is not
> > > interruptible.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Mon, 6 May 2024 at 06:33, Zakelly Lan wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for the improvement, overall +1 for this. I'd leave a minor
> > > comment:
> > > >
> > > > 1. I'd suggest also providing `isInterruptible()` in `Mail`, and the
> > > > continuation mail will return true. The FLIP-425 will leverage this
> > queue
> > > > to execute some state requests, and when the cp arrives, the operator
> > may
> > > > call `yield()` to drain. It may happen that the continuation mail is
> > > called
> > > > again in `yield()`. By checking `isInterruptible()`, we can skip this
> > > mail
> > > > and re-enqueue.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei 
> wrote:
> > > >
> > > > > Thanks for your answers, Piotrek. I got it now.  +1 for this
> > > improvement.
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > > > > Stefan Richter wrote on Tue, Apr 30, 2024 at 21:30:
> > > > > >
> > > > > >
> > > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > > >
> > > > > > Best,
> > > > > > Stefan
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan
> > > > wrote:
> > > > > > >
> > > > > > > Thanks for the proposal, I definitely see the need for this
> > > > > improvement, +1.
> > > > > > >
> > > > > > >

Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-16 Thread Piotr Nowojski
Hi Lorenzo,

> • concerns about memory and CPU used out of Flink's control

Please note that using AWS SDKv2 would have the same concerns. In both
cases Flink can control only to a certain extent what either the SDKv2
library does under the hood or the s5cmd process, via configuration
parameters. SDKv2, if configured improperly, could also overload the TMs' CPU,
and it would also use extra memory. To an extent, that also applies to the
current way we are downloading/uploading files from S3.

> • deployment concerns from the usability perspective (would need to
install s5cmd on all TMs prior to the job deploy)

Yes, that's the downside.

> Also, invoking an external binary would possibly incur some performance
degradation. Might it be that using the AWS SDK, given its Java
implementation, would not, and that performance could be similar?

We haven't run the benchmarks for very fast checkpointing and we haven't
measured latency impact, but please note `s5cmd` is very fast to startup.
It's not Java after all ;) Its startup time is at least an order of magnitude
below 1s, so the impact on Flink should be negligible, as Flink doesn't
support checkpointing intervals below 1s, especially not when uploading files
to S3. I actually wouldn't be surprised if `s5cmd` (or AWS SDKv2's
`TransferManager`) improved minimal e2e checkpointing times.

> Wrapping up, I see using AWS SDK having PROs that could be traded with
the CON of slightly worse perf than s5cmd:
>
> • no hurdles for the user, as the SDK would be a Flink dependency
> • less config on the Flink side
>
> Do you agree?

To an extent, yes. AWS SDKv2's TransferManager would also have to be
configured properly. But I agree that the largest hurdle with `s5cmd` is
the added operational complexity of having to supply 3rd party binary.

Please also keep in mind that a lot of work for that FLIP will be in
defining, creating and using the interfaces for batch files copy. The
actual implementation of the fast copying file system interface and using
`s5cmd` is not the dominant factor. So all in all, I wouldn't object if
someone would like to take over my AWS SDKv2 PoC, and finish it off in the
future, as an alternative for the `s5cmd`. Indeed AWS SDKv2 could at some
point become the default setting, while `s5cmd` could remain as a faster
alternative.

Best,
Piotrek
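For illustration, shelling out to the binary could look roughly like the sketch below. This is a simplified assumption-laden sketch, not the FLIP-444 implementation: the binary path is assumed to come from configuration (e.g. the `s3.s5cmd.path` option discussed elsewhere in the thread), and there are deliberately no retries, matching the fail-fast behaviour described for recovery.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of invoking an external s5cmd binary for one copy.
// The configured binary path and the `s5cmd cp <src> <dst>` argument layout
// are assumptions for illustration; the real integration may differ.
class S5CmdCopier {
    private final String s5cmdBinary;

    S5CmdCopier(String s5cmdBinary) {
        this.s5cmdBinary = s5cmdBinary;
    }

    /** Builds the command line; exposed separately so it can be inspected. */
    List<String> buildCommand(String source, String target) {
        return List.of(s5cmdBinary, "cp", source, target);
    }

    /** Starts the process and fails fast on a non-zero exit code (no retries). */
    void copy(String source, String target) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildCommand(source, target))
                .inheritIO()
                .start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException(
                    "s5cmd exited with code " + exitCode
                            + " while copying " + source + " to " + target);
        }
    }
}
```

Since any failure simply propagates as an exception, the surrounding job fails over exactly as it would with the existing download/upload code paths.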

On Thu, 16 May 2024 at 09:03,  wrote:

> Hello Piotr and thanks for this proposal!
> The idea sounds smart and very well grounded thank you!
>
> Also here, as others, I have some doubts about invoking an external binary
> (namely s5cmd):
>
> • concerns about memory and CPU used out of Flink's control
> • deployment concerns from the usability perspective (would need to
> install s5cmd on all TMs prior to the job deploy)
>
>
> Also, invoking an external binary would possibly incur some performance
> degradation. Might it be that using the AWS SDK, given its Java
> implementation, would not, and that performance could be similar?
>
> Wrapping up, I see using AWS SDK having PROs that could be traded with the
> CON of slightly worse perf than s5cmd:
>
> • no hurdles for the user, as the SDK would be a Flink dependency
> • less config on the Flink side
>
>
> Do you agree?
On May 13, 2024 at 07:42 +0200, Hangxiang Yu wrote:
> > >
> > > Note that for both recovery and checkpoints, there are no retring
> > > mechanisms. If any part of downloading or
> > > uploading fails, the job fails over, so actually using such interface
> > > extension would be out of scope of this FLIP. In
> > > that case, maybe if this could be extended in the future without
> breaking
> > > compatibility we could leave it as a
> > > future improvement?
> > >
> >
> > Thanks for the reply.
> > It makes sense to consider as a future optimization.
> >
> > On Fri, May 10, 2024 at 4:31 PM Piotr Nowojski  >
> > wrote:
> >
> > > Hi!
> > >
> > > Thanks for your suggestions!
> > >
> > > > > I'd prefer a unified one interface
> > >
> > > I have updated the FLIP to take that into account. In this case, I
> would
> > > also propose to completely drop `DuplicatingFileSystem` in favour of a
> > > basically renamed version of it `PathsCopyingFileSystem`.
> > > `DuplicatingFileSystem` was not marked as PublicEvolving/Experimental
> > > (probably by mistake), so technically we can do it. Even if not for
> that
> > > mistake, I would still vote to replace it to simplify the code, as any
> > > migration would be very easy. At the same time to the best of my
> knowledge,
> > > no one has ever implemented it.
> > >
> > > > > The proposal mentions that s5cmd utilises 100% of CPU

Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-10 Thread Piotr Nowojski
following use cases:
> > > > > - `canFastCopy(remoteA, remoteB)` returns true - current equivalent
> > of
> > > > > `DuplicatingFileSystem` - quickly duplicate/hard link remote path
> > > > > - `canFastCopy(local, remote)` returns true - FS can natively
> upload
> > > > local
> > > > > file to a remote location
> > > > > - `canFastCopy(remote, local)` returns true - FS can natively
> > download
> > > > > a remote file to a local location
> > > > >
> > > > > Maybe indeed that's a better solution vs having two separate
> > interfaces
> > > > for
> > > > > copying and duplicating?
> > > > >
> > > >
> > > > I'd prefer a unified one interface, `canFastCopy(Path, Path)` looks
> > good
> > > to
> > > > me. This also resolves my question 1 about the destination.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
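Sketched as an interface, the unified `canFastCopy(source, destination)` idea agreed on above might look like this. The names below are guesses based on the thread, not the final FLIP-444 API:

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of the unified interface discussed above: a single
// canFastCopy(source, destination) covers remote-to-remote duplication,
// native upload, and native download. Names are illustrative only.
interface PathsCopyingFileSystem {

    /** A single source/destination pair to copy. */
    final class CopyRequest {
        final String source;
        final String destination;

        CopyRequest(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    /** True if this file system can natively copy source to destination. */
    boolean canFastCopy(String source, String destination);

    /** Copies all given pairs, e.g. by delegating to one batch invocation. */
    void copyFiles(List<CopyRequest> requests) throws IOException;
}
```

With a single interface, the three cases (duplicate, upload, download) are distinguished purely by which of the two paths are remote, so callers don't need separate copying and duplicating abstractions.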
> > > >
> > > > On Mon, May 6, 2024 at 6:36 PM Piotr Nowojski 
> > > > wrote:
> > > >
> > > > > Hi All!
> > > > >
> > > > > Thanks for your comments.
> > > > >
> > > > > Muhammet and Hong, about the config options.
> > > > >
> > > > > > Could you please also add the configuration property for this? An
> > > > example
> > > > > showing how users would set this parameter would be helpful.
> > > > >
> > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used
> > > > > > 2/ Configure the location of the s5cmd binary (version control
> > etc.)
> > > > >
> > > > > Ops, sorry I added the config options that I had in mind to the
> > FLIP. I
> > > > > don't know why I have omitted this. Basically I suggest that in
> order
> > > to
> > > > > use native file copying:
> > > > > 1. `FileSystem` must support it via implementing
> > > `PathsCopyingFileSystem`
> > > > > interface
> > > > > 2. That `FileSystem` would have to be configured to actually use
> it.
> > > For
> > > > > example S3 file system would return `true` that it can copy paths
> > > > > only if `s3.s5cmd.path` has been specified.
> > > > >
> > > > > > Would this affect any filesystem connectors that use
> > FileSystem[1][2]
> > > > > dependencies?
> > > > >
> > > > > Definitely not out of the box. Any place in Flink that is currently
> > > > > uploading/downloading files from a FileSystem could use this
> feature,
> > > but
> > > > > it
> > > > > would have to be implemented. The same way this FLIP will implement
> > > > native
> > > > > files copying when downloading state during recovery,
> > > > > but the old code path will be still used for uploading state files
> > > > during a
> > > > > checkpoint.
> > > > >
> > > > > > How adding a s5cmd will affect memory footprint? Since this is a
> > > native
> > > > > binary, memory consumption will not be controlled by JVM or Flink.
> > > > >
> > > > > As you mentioned the memory usage of `s5cmd` will not be
> controlled,
> > so
> > > > the
> > > > > memory footprint will grow. S5cmd integration with Flink
> > > > > has been tested quite extensively on our production environment
> > > already,
> > > > > and we haven't observed any issues so far despite the fact we
> > > > > are using quite small pods. But of course if your setup is working
> on
> > > the
> > > > > edge of OOM, this could tip you over that edge.
> > > > >
> > > > > Zakelly:
> > > > >
> > > > > > 1. What is the semantic of `canCopyPath`? Should it be associated
> > > with
> > > > a
> > > > > > specific destination path? e.g. It can be copied to local, but
> not
> > to
> > > > the
> > > > > > remote FS.
> > > > >
> > > > > For the S3 (both for SDKv2 and s5cmd implementations), the copying
> > > > > direction (upload/download) doesn't matter. I don't know about
> other
> > > > > file systems, I haven't investigated anything besides S3.
> > Neverth

Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-06 Thread Piotr Nowojski
+1 (binding)

Piotrek

On Mon, 6 May 2024 at 12:35, Roman Khachatryan wrote:

> +1 (binding)
>
> Regards,
> Roman
>
>
> On Mon, May 6, 2024 at 11:56 AM gongzhongqiang 
> wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Zhongqiang Gong
> >
> > > yue ma wrote on Mon, May 6, 2024 at 10:54:
> >
> > > Hi everyone,
> > >
> > > Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> > > Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is
> here
> > > [2].
> > >
> > > The vote will be open for at least 72 hours unless there is an
> objection
> > or
> > > insufficient votes.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> > > [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
> > >
> > > --
> > > Best,
> > > Yue
> > >
> >
>


Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-06 Thread Piotr Nowojski
future work to improve the upload speed as well.
> This
> > > would be useful for jobs with large state and high Async checkpointing
> > > times.
> > >
> > > Some thoughts on the configuration, it might be good for us to
> introduce
> > 2x
> > > points of configurability for future proofing:
> > > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe
> by
> > > config, or by ServiceResources (this would allow us to use this for
> > > alternative clouds/Implement S3 SDKv2 support if we want this in the
> > > future). Also this could be used as a feature flag to determine if we
> > > should be using this new native file copy support.
> > > 2/ Configure the location of the s5cmd binary (version control etc.),
> as
> > > you have mentioned in the FLIP.
> > >
> > > Regards,
> > > Hong
> > >
> > >
> > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> > >  wrote:
> > >
> > > > Hey Piotr,
> > > >
> > > > Thanks for the proposal! It would be great improvement!
> > > >
> > > > Some questions from my side:
> > > >
> > > > > In order to configure s5cmd Flink’s user would need
> > > > > to specify path to the s5cmd binary.
> > > >
> > > > Could you please also add the configuration property
> > > > for this? An example showing how users would set this
> > > > parameter would be helpful.
> > > >
> > > > Would this affect any filesystem connectors that use
> > > > FileSystem[1][2] dependencies?
> > > >
> > > > Best,
> > > > Muhammet
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > [2]:
> > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > >
> > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > Hi all!
> > > > >
> > > > > I would like to put under discussion:
> > > > >
> > > > > FLIP-444: Native file copy support
> > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > >
> > > > > This proposal aims to speed up Flink recovery times, by speeding up
> > > > > state
> > > > > download times. However in the future, the same mechanism could be
> > also
> > > > > used to speed up state uploading (checkpointing/savepointing).
> > > > >
> > > > > I'm curious to hear your thoughts.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-06 Thread Piotr Nowojski
Hi Zakelly,

Can you elaborate a bit more on what you have in mind? How does marking mails
as interruptible help here? If an incoming async state access response comes,
it could just request to interrupt any currently ongoing computations,
regardless of whether the currently executed mail is or is not interruptible.

Best,
Piotrek

pon., 6 maj 2024 o 06:33 Zakelly Lan  napisał(a):

> Hi Piotr,
>
> Thanks for the improvement, overall +1 for this. I'd leave a minor comment:
>
> 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> continuation mail will return true. The FLIP-425 will leverage this queue
> to execute some state requests, and when the cp arrives, the operator may
> call `yield()` to drain. It may happen that the continuation mail is called
> again in `yield()`. By checking `isInterruptable()`, we can skip this mail
> and re-enqueue.
>
>
> Best,
> Zakelly
>
> On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:
>
> > Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
> >
> > Best,
> > Yanfei
> >
> > Stefan Richter  于2024年4月30日周二 21:30写道:
> > >
> > >
> > > Thanks for the improvement proposal, I’m +1 for the change!
> > >
> > > Best,
> > > Stefan
> > >
> > >
> > >
> > > > On 30. Apr 2024, at 15:23, Roman Khachatryan 
> wrote:
> > > >
> > > > Thanks for the proposal, I definitely see the need for this
> > improvement, +1.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski  > <mailto:pnowoj...@apache.org>> wrote:
> > > >
> > > >> Hi Yanfei,
> > > >>
> > > >> Thanks for the feedback!
> > > >>
> > > >>> 1. Currently when AbstractStreamOperator or
> AbstractStreamOperatorV2
> > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> when
> > > >>> is the watermark sent downstream?
> > > >>
> > > >> The watermark would be outputted by an operator only once all
> relevant
> > > >> timers are fired.
> > > >> In other words, if firing of timers is interrupted a continuation
> > mail to
> > > >> continue firing those
> > > >> interrupted timers is created. Watermark will be emitted downstream
> > at the
> > > >> end of that
> > > >> continuation mail.
> > > >>
> > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> and
> > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > >>
> > > >> Yes, both firing processing and event time timers share the same code
> > and
> > > >> both will
> > > >> support interruptions in the same way. Actually I've renamed the
> FLIP
> > from
> > > >>
> > > >>> Interruptible watermarks processing
> > > >>
> > > >> to:
> > > >>
> > > >>> Interruptible timers firing
> > > >>
> > > >> to make this more clear.
> > > >>
> > > >> Best,
> > > >> Piotrek
> > > >>
> > > >> wt., 30 kwi 2024 o 06:08 Yanfei Lei 
> napisał(a):
> > > >>
> > > >>> Hi Piotrek,
> > > >>>
> > > >>> Thanks for this proposal. It looks like it will shorten the
> > checkpoint
> > > >>> duration, especially in the case of back pressure. +1 for it!  I'd
> > > >>> like to ask some questions to understand your thoughts more
> > precisely.
> > > >>>
> > > >>> 1. Currently when AbstractStreamOperator or
> AbstractStreamOperatorV2
> > > >>> processes a watermark, the watermark will be sent to downstream, if
> > > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> when
> > > >>> is the watermark sent downstream?
> > > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail
> and
> > > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > > >>>
> > > >>> Best regards,
> > > >>> Yanfei
> > > >>>
> > > >>> Piotr Nowojski  于2024年4月29日周一 21:57写道:
> > > >>>
> > > >>>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> I would like to start a discussion on FLIP-443: Interruptible
> > watermark
> > > >>>> processing.
> > > >>>>
> > > >>>>
> >
> https://www.google.com/url?q=https://cwiki.apache.org/confluence/x/qgn9EQ=gmail-imap=171508837000=AOvVaw0eTZDvLwdZUDai5GqoSGrD
> > > >>>>
> > > >>>> This proposal tries to make Flink's subtask thread more responsive
> > when
> > > >>>> processing watermarks/firing timers, and make those operations
> > > >>>> interruptible/break them apart into smaller steps. At the same
> time,
> > > >> the
> > > >>>> proposed solution could be potentially adopted in other places in
> > the
> > > >>> code
> > > >>>> base as well, to solve similar problems with other flatMap-like
> > > >> operators
> > > >>>> (non windowed joins, aggregations, CepOperator, ...).
> > > >>>>
> > > >>>> I'm looking forward to your thoughts.
> > > >>>>
> > > >>>> Best,
> > > >>>> Piotrek
> > >
> >
>


[DISCUSS] FLIP-444: Native file copy support

2024-04-30 Thread Piotr Nowojski
Hi all!

I would like to put under discussion:

FLIP-444: Native file copy support
https://cwiki.apache.org/confluence/x/rAn9EQ

This proposal aims to speed up Flink recovery times, by speeding up state
download times. However in the future, the same mechanism could be also
used to speed up state uploading (checkpointing/savepointing).

I'm curious to hear your thoughts.

Best,
Piotrek


Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-30 Thread Piotr Nowojski
Hi Yanfei,

Thanks for the feedback!

> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> processes a watermark, the watermark will be sent to downstream, if
> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> is the watermark sent downstream?

The watermark would be output by an operator only once all relevant timers
have fired. In other words, if the firing of timers is interrupted, a
continuation mail to continue firing those interrupted timers is created.
The watermark will be emitted downstream at the end of that continuation
mail.
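For illustration, the continuation-mail pattern described above could be sketched roughly like this (a simplified toy model, not the actual Flink implementation; all class and method names here are made up):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: timers are fired in chunks; when the per-mail budget is used
// up, a continuation mail is enqueued instead of blocking, and the
// watermark is only "emitted" once all timers up to it have fired.
class ContinuationMailSketch {
    final Queue<Long> timers = new ArrayDeque<>();      // pending timer timestamps
    final Queue<Runnable> mailbox = new ArrayDeque<>(); // simplified mailbox
    Long emittedWatermark = null;

    void advanceWatermark(long watermark, int budgetPerMail) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            timers.poll(); // "fire" the timer
            if (++fired >= budgetPerMail) {
                // interrupted: schedule a continuation mail to resume later
                mailbox.add(() -> advanceWatermark(watermark, budgetPerMail));
                return;
            }
        }
        emittedWatermark = watermark; // emitted only after all timers fired
    }

    public static void main(String[] args) {
        ContinuationMailSketch s = new ContinuationMailSketch();
        for (long t = 1; t <= 5; t++) s.timers.add(t);
        s.advanceWatermark(10, 2); // fires 2 timers, enqueues a continuation
        System.out.println("watermark after first mail: " + s.emittedWatermark);
        while (!s.mailbox.isEmpty()) s.mailbox.poll().run();
        System.out.println("watermark after draining mailbox: " + s.emittedWatermark);
    }
}
```

Running this prints `null` after the first (interrupted) mail and `10` only once the continuation mails have drained the remaining timers.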

> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> executed in mailbox. Is processing-timer allowed to be interrupted?

Yes, firing processing-time and event-time timers shares the same code, and
both will support interruptions in the same way. Actually I've renamed the
FLIP from

> Interruptible watermarks processing

to:

> Interruptible timers firing

to make this more clear.

Best,
Piotrek

wt., 30 kwi 2024 o 06:08 Yanfei Lei  napisał(a):

> Hi Piotrek,
>
> Thanks for this proposal. It looks like it will shorten the checkpoint
> duration, especially in the case of back pressure. +1 for it!  I'd
> like to ask some questions to understand your thoughts more precisely.
>
> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> processes a watermark, the watermark will be sent to downstream, if
> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> is the watermark sent downstream?
> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> executed in mailbox. Is processing-timer allowed to be interrupted?
>
> Best regards,
> Yanfei
>
> Piotr Nowojski  于2024年4月29日周一 21:57写道:
>
> >
> > Hi all,
> >
> > I would like to start a discussion on FLIP-443: Interruptible watermark
> > processing.
> >
> > https://cwiki.apache.org/confluence/x/qgn9EQ
> >
> > This proposal tries to make Flink's subtask thread more responsive when
> > processing watermarks/firing timers, and make those operations
> > interruptible/break them apart into smaller steps. At the same time, the
> > proposed solution could be potentially adopted in other places in the
> code
> > base as well, to solve similar problems with other flatMap-like operators
> > (non windowed joins, aggregations, CepOperator, ...).
> >
> > I'm looking forward to your thoughts.
> >
> > Best,
> > Piotrek
>


[DISCUSS] FLIP-443: Interruptible watermark processing

2024-04-29 Thread Piotr Nowojski
Hi all,

I would like to start a discussion on FLIP-443: Interruptible watermark
processing.

https://cwiki.apache.org/confluence/x/qgn9EQ

This proposal tries to make Flink's subtask thread more responsive when
processing watermarks/firing timers, and make those operations
interruptible/break them apart into smaller steps. At the same time, the
proposed solution could be potentially adopted in other places in the code
base as well, to solve similar problems with other flatMap-like operators
(non windowed joins, aggregations, CepOperator, ...).

I'm looking forward to your thoughts.

Best,
Piotrek


[jira] [Created] (FLINK-35065) Add numFiredTimers and numFiredTimersPerSecond metrics

2024-04-09 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35065:
--

 Summary: Add numFiredTimers and numFiredTimersPerSecond metrics
 Key: FLINK-35065
 URL: https://issues.apache.org/jira/browse/FLINK-35065
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics, Runtime / Task
Affects Versions: 1.19.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.20.0


Currently there is no way of knowing how many timers are being fired by Flink, 
so it's impossible to distinguish, even using code profiling, whether an operator 
is firing only a couple of heavy timers per second using ~100% of the CPU time, 
vs firing thousands of timers per second.

We could add the following metrics to address this issue:
* numFiredTimers - total number of fired timers per operator
* numFiredTimersPerSecond - per second rate of firing timers per operator
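As a rough illustration of the two proposed metrics (only the metric names come from this ticket; the code below is an assumed simplification, not Flink's actual metric system): a monotonically increasing counter, with the per-second rate derived from two counter readings taken one interval apart.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: numFiredTimers as a plain counter, and
// numFiredTimersPerSecond computed from two samples of that counter.
class FiredTimersMetricsSketch {
    final AtomicLong numFiredTimers = new AtomicLong();

    void onTimerFired() {
        numFiredTimers.incrementAndGet();
    }

    // numFiredTimersPerSecond, derived from two counter samples
    static double ratePerSecond(long previousCount, long currentCount, double intervalSeconds) {
        return (currentCount - previousCount) / intervalSeconds;
    }

    public static void main(String[] args) {
        FiredTimersMetricsSketch m = new FiredTimersMetricsSketch();
        long before = m.numFiredTimers.get();
        for (int i = 0; i < 500; i++) m.onTimerFired();
        long after = m.numFiredTimers.get();
        System.out.println("numFiredTimers = " + after);
        System.out.println("numFiredTimersPerSecond ~ " + ratePerSecond(before, after, 5.0));
    }
}
```

With 500 timers fired over an assumed 5-second window this reports a rate of 100 timers per second.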



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35051) Weird priorities when processing unaligned checkpoints

2024-04-08 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35051:
--

 Summary: Weird priorities when processing unaligned checkpoints
 Key: FLINK-35051
 URL: https://issues.apache.org/jira/browse/FLINK-35051
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Network, Runtime / Task
Affects Versions: 1.18.1, 1.19.0, 1.17.2
Reporter: Piotr Nowojski


While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that 
unaligned checkpoint `Start Delay` time can be increased, and triggering 
checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox actions 
in the system, like for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, but 
this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that there is a clash of three separate concepts here:
# Mailbox priority: yieldToDownstream - in a sense the reverse of what we would 
like to have for triggering checkpoints, but that only kicks in on #yield() calls, 
where it's actually correct that an operator in the middle of execution can not 
yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via these; they 
bypass the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both, and the highest-priority network event, containing a UC barrier, 
actually has the lowest mailbox priority when executed via the mailbox.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally what we would like 
to have is that:
* the mail converting AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks
should all be processed first, with the exception of yieldToDownstream, where 
the current mailbox priorities should be adhered to.
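A toy sketch of that desired ordering (all names are hypothetical, not Flink's actual mailbox implementation): checkpoint-related mails jump ahead of ordinary mails, while FIFO order is preserved within each priority level.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Illustrative only: a mailbox where checkpoint-related mails (AC->UC
// conversion, UC barrier polling, RPC checkpoint triggers) carry a higher
// priority and are executed before ordinary mails such as timers.
class PriorityMailboxSketch {
    static final class Mail {
        final int priority; final long seq; final String name;
        Mail(int priority, long seq, String name) {
            this.priority = priority; this.seq = seq; this.name = name;
        }
    }

    final PriorityQueue<Mail> queue = new PriorityQueue<>(
            Comparator.comparingInt((Mail m) -> m.priority).reversed() // higher priority first
                    .thenComparingLong(m -> m.seq));                   // FIFO within a priority
    long seq = 0;

    void put(int priority, String name) { queue.add(new Mail(priority, seq++, name)); }

    public static void main(String[] args) {
        PriorityMailboxSketch mb = new PriorityMailboxSketch();
        mb.put(0, "processing-time timer");
        mb.put(0, "AsyncWaitOperator result");
        mb.put(1, "convert AC barrier to UC"); // checkpoint mail jumps the queue
        StringBuilder order = new StringBuilder();
        while (!mb.queue.isEmpty()) order.append(mb.queue.poll().name).append("; ");
        System.out.println(order);
    }
}
```

Note this toy model ignores the yieldToDownstream exception described above, where the existing mailbox priorities would still apply.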



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Piotr Nowojski
+1 (binding)

Best,
Piotrek

śr., 3 kwi 2024 o 04:29 Yu Chen  napisał(a):

> +1 (non-binding)
>
> Looking forward to this future.
>
> Thanks,
> Yu Chen
>
> > 2024年4月3日 10:23,Jark Wu  写道:
> >
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Tue, 2 Apr 2024 at 15:12, Timo Walther  wrote:
> >
> >> +1 (binding)
> >>
> >> Thanks,
> >> Timo
> >>
> >> On 29.03.24 17:30, Hao Li wrote:
> >>> Hi devs,
> >>>
> >>> I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> >>> SQL [1]. The discussion thread is here [2].
> >>>
> >>> The vote will be open for at least 72 hours unless there is an
> objection
> >> or
> >>> insufficient votes.
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >>>
> >>> [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
> >>>
> >>> Thanks,
> >>> Hao
> >>>
> >>
> >>
>
>


Re: Re: [VOTE] FLIP-427: Disaggregated state Store

2024-03-28 Thread Piotr Nowojski
+1 (binding)

czw., 28 mar 2024 o 13:31 Feifan Wang  napisał(a):

> +1 (non-binding)
>
>
>
>
> ——
>
> Best regards,
>
> Feifan Wang
>
>
>
>
> At 2024-03-28 19:01:01, "Yuan Mei"  wrote:
> >+1 (binding)
> >
> >Best
> >Yuan
> >
> >
> >
> >
> >On Wed, Mar 27, 2024 at 6:37 PM Hangxiang Yu  wrote:
> >
> >> Hi devs,
> >>
> >> Thanks all for your valuable feedback about FLIP-427: Disaggregated
> state
> >> Store [1].
> >> I'd like to start a vote on it.  The discussion thread is here [2].
> >>
> >> The vote will be open for at least 72 hours unless there is an
> objection or
> >> insufficient votes.
> >>
> >> [1] https://cwiki.apache.org/confluence/x/T4p3EQ
> >> [2] https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> >>
> >>
> >> Best,
> >> Hangxiang
> >>
>


Re: [VOTE] FLIP-425: Asynchronous Execution Model

2024-03-28 Thread Piotr Nowojski
+1 (binding)

Piotrek

czw., 28 mar 2024 o 11:44 Yuan Mei  napisał(a):

> +1 (binding)
>
> Best,
> Yuan
>
> On Thu, Mar 28, 2024 at 4:33 PM Xuannan Su  wrote:
>
> > +1 (non-binding)
> >
> > Best regards,
> > Xuannan
> >
> > On Wed, Mar 27, 2024 at 6:28 PM Yanfei Lei  wrote:
> > >
> > > Hi everyone,
> > >
> > > Thanks for all the feedback about the FLIP-425: Asynchronous Execution
> > > Model [1]. The discussion thread is here [2].
> > >
> > > The vote will be open for at least 72 hours unless there is an
> > > objection or insufficient votes.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > [2] https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > >
> > > Best regards,
> > > Yanfei
> >
>


Re: [VOTE] FLIP-424: Asynchronous State APIs

2024-03-28 Thread Piotr Nowojski
+1 (binding)

Piotrek

czw., 28 mar 2024 o 11:42 Yuan Mei  napisał(a):

> +1 (binding)
>
> Best,
> Yuan
>
> On Thu, Mar 28, 2024 at 4:30 PM Xuannan Su  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Xuannan
> >
> >
> > On Wed, Mar 27, 2024 at 6:23 PM Zakelly Lan 
> wrote:
> > >
> > > Hi devs,
> > >
> > > I'd like to start a vote on the FLIP-424: Asynchronous State APIs [1].
> > The
> > > discussion thread is here [2].
> > >
> > > The vote will be open for at least 72 hours unless there is an
> objection
> > or
> > > insufficient votes.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > [2] https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > >
> > >
> > > Best,
> > > Zakelly
> >
>


Re: [VOTE] FLIP-423: Disaggregated State Storage and Management (Umbrella FLIP)

2024-03-28 Thread Piotr Nowojski
+1 (binding)

Piotrek

czw., 28 mar 2024 o 11:43 Yuan Mei  napisał(a):

> Hi devs,
>
> I'd like to start a vote on the FLIP-423: Disaggregated State Storage and
> Management (Umbrella FLIP) [1]. The discussion thread is here [2].
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> [2] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>
>
> Best,
> Yuan
>


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-27 Thread Piotr Nowojski
ll affect this. We are controlling the records that can
> get
> >> in the 'processElement' and the state requests that can fire in
> parallel,
> >> no matter how high the load spikes, they will be blocked outside. It is
> >> relatively stable within the proposed execution model itself. The
> unaligned
> >> barrier will skip those inputs in the queue as before.
> >>
> >>
> >> At the same time, I still don't understand why we can not implement
> things
> >>> incrementally? First
> >>> let's start with the current API, without the need to rewrite all of
> the
> >>> operators, we can asynchronously fetch whole
> >>> state for a given record using its key. That should already vastly
> >>> improve
> >>> many things, and this way we could
> >>> perform a checkpoint without a need of draining the
> in-progress/in-flight
> >>> buffer. We could roll that version out,
> >>> test it out in practice, and then we could see if the fine grained
> state
> >>> access is really needed. Otherwise it sounds
> >>> to me like a premature optimization, that requires us to not only
> >>> rewrite a
> >>> lot of the code, but also to later maintain
> >>> it, even if it ultimately proves to be not needed. Which I of course
> can
> >>> not be certain but I have a feeling that it
> >>> might be the case.
> >>
> >>
> >> The disaggregated state management we proposed is target at including
> but
> >> not limited to the following challenges:
> >>
> >>1. Local disk constraints, including limited I/O and space.
> >>(discussed in FLIP-423)
> >>2. Unbind the I/O resource with pre-allocated CPU resource, to make
> >>good use of both (motivation of FLIP-424)
> >>3. Elasticity of scaling I/O or storage capacity.
> >>
> >> Thus our plan is dependent on DFS, using local disk as an optional cache
> >> only. The pre-fetching plan you mentioned is still binding I/O with CPU
> >> resources, and will consume even more I/O to load unnecessary state. It
> >> makes things worse. Please note that we are not targeting some scenarios
> >> where the local state could handle well, and our goal is not to replace
> the
> >> local state.
> >>
> >> And If manpower is a big concern of yours, I would say many of my
> >> colleagues could help contribute in runtime or SQL operators. It is
> >> experimental on a separate code path other than the local state and
> will be
> >> recommended to use only when we prove it mature.
> >>
> >>
> >> Thanks & Best,
> >> Zakelly
> >>
> >> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski 
> >> wrote:
> >>
> >>> Hey Zakelly!
> >>>
> >>> Sorry for the late reply. I still have concerns about the proposed
> >>> solution, with my main concerns coming from
> >>> the implications of the asynchronous state access API on the
> >>> checkpointing
> >>> and responsiveness of Flink.
> >>>
> >>> >> What also worries me a lot in this fine grained model is the effect
> on
> >>> the checkpointing times.
> >>> >
> >>> > Your concerns are very reasonable. Faster checkpointing is always a
> >>> core
> >>> advantage of disaggregated state,
> >>> > but only for the async phase. There will be some complexity
> introduced
> >>> by
> >>> in-flight requests, but I'd suggest
> >>> > a checkpoint containing those in-flight state requests as part of the
> >>> state, to accelerate the sync phase by
> >>> > skipping the buffer draining. This makes the buffer size have little
> >>> impact on checkpoint time. And all the
> >>> > changes keep within the execution model we proposed while the
> >>> checkpoint
> >>> barrier alignment or handling
> >>> > will not be touched in our proposal, so I guess the complexity is
> >>> relatively controllable. I have faith in that :)
> >>>
> >>> As we discussed off-line, you agreed that we can not checkpoint while
> >>> some
> >>> records are in the middle of being
> >>> processed. That we would have to drain the in-progress records before
> >>> doing
> >>> the checkpoint. You also argued
> >>> that this is not a problem, because the s

[jira] [Created] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-34913:
--

 Summary: ConcurrentModificationException 
SubTaskInitializationMetricsBuilder.addDurationMetric
 Key: FLINK-34913
 URL: https://issues.apache.org/jira/browse/FLINK-34913
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / State Backends
Affects Versions: 1.19.0
Reporter: Piotr Nowojski
 Fix For: 1.19.1


The following failures can occur during a job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-20 Thread Piotr Nowojski
Hey Zakelly!

Sorry for the late reply. I still have concerns about the proposed
solution, with my main concerns coming from
the implications of the asynchronous state access API on the checkpointing
and responsiveness of Flink.

>> What also worries me a lot in this fine grained model is the effect on
the checkpointing times.
>
> Your concerns are very reasonable. Faster checkpointing is always a core
advantage of disaggregated state,
> but only for the async phase. There will be some complexity introduced by
in-flight requests, but I'd suggest
> a checkpoint containing those in-flight state requests as part of the
state, to accelerate the sync phase by
> skipping the buffer draining. This makes the buffer size have little
impact on checkpoint time. And all the
> changes keep within the execution model we proposed while the checkpoint
barrier alignment or handling
> will not be touched in our proposal, so I guess the complexity is
relatively controllable. I have faith in that :)

As we discussed off-line, you agreed that we can not checkpoint while some
records are in the middle of being
processed. That we would have to drain the in-progress records before doing
the checkpoint. You also argued
that this is not a problem, because the size of this buffer can be
configured.

I'm really afraid of such a solution. I've seen in the past plenty of
times, that whenever Flink has to drain some
buffered records, eventually that always breaks timely checkpointing (and
hence ability for Flink to rescale in
a timely manner). Even a single record with a `flatMap` like operator
currently in Flink causes problems during
back pressure. That's especially true for example for processing
watermarks. At the same time, I don't see how
this value could be configured by even Flink's power users, let alone an
average user. The size of that in-flight
buffer not only depends on a particular query/job, but also the "good"
value changes dynamically over time,
and can change very rapidly. Sudden spikes of records or backpressure, some
hiccup during emitting watermarks,
all of those could change in an instant the theoretically optimal buffer
size of let's say "6000" records, down to "1".
And when those changes happen, those are the exact times when timely
checkpointing matters the most.
If the load pattern suddenly changes, and checkpointing takes suddenly tens
of minutes instead of a couple of
seconds, it means you can not use rescaling and you are forced to
overprovision the resources. And there also
other issues if checkpointing takes too long.

At the same time, I still don't understand why we can not implement things
incrementally? First
let's start with the current API, without the need to rewrite all of the
operators, we can asynchronously fetch whole
state for a given record using its key. That should already vastly improve
many things, and this way we could
perform a checkpoint without a need of draining the in-progress/in-flight
buffer. We could roll that version out,
test it out in practice, and then we could see if the fine grained state
access is really needed. Otherwise it sounds
to me like a premature optimization, that requires us to not only rewrite a
lot of the code, but also to later maintain
it, even if it ultimately proves to be not needed. Which I of course can
not be certain but I have a feeling that it
might be the case.

Best,
Piotrek

wt., 19 mar 2024 o 10:42 Zakelly Lan  napisał(a):

> Hi everyone,
>
> Thanks for your valuable feedback!
>
> Our discussions have been going on for a while and are nearing a
> consensus. So I would like to start a vote after 72 hours.
>
> Please let me know if you have any concerns, thanks!
>
>
> Best,
> Zakelly
>
> On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan  wrote:
>
> > Hi Yunfeng,
> >
> > Thanks for the suggestion!
> >
> > I will reorganize the FLIP-425 accordingly.
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>
> > wrote:
> >
> >> Hi Xintong and Zakelly,
> >>
> >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> >> I agree with it that watermarks can use only out-of-order mode for
> >> now, because there is still not a concrete example showing the
> >> correctness risk about it. However, the strictly-ordered mode should
> >> still be supported as the default option for non-record event types
> >> other than watermark, at least for checkpoint barriers.
> >>
> >> I noticed that this information has already been documented in "For
> >> other non-record events, such as RecordAttributes ...", but it's at
> >> the bottom of the "Watermark" section, which might not be very
> >> obvious. Thus it might be better to reorganize the FLIP to better
> >> claim that the two order modes are designed for all non-record events,
> >> and which mode this FLIP would choose for each type of event.
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song 
> >> wrote:
> >> >
> >> > 

Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-01 Thread Piotr Nowojski
Thanks for your answers Zakelly. I get your points.

>  (...) it may not be suitable for scenarios where
> - A state is read by condition.
> - MapState with the user key cannot be determined in advance.

I guess it depends on how we would like to treat the local disks. I've always
thought of them as a cache: almost always, eventually, all state from the DFS
should end up cached on the local disks. In this context, if you prefetch
all of the state for a given key, even if it's not strictly speaking
necessary right now, we know it will be accessed sooner or later, so it's
ok to download it right away. Even if state doesn't fit into local disks,
the more coarse grained solution shouldn't increase the latency by much. In
the currently proposed more fine grained solution, you make a single
request to DFS per each state access. In the proposal I mentioned, if we
group all of the state fields for a given key together on the DFS, that
would still be one single request to the DFS, albeit larger. But as your
benchmarks showed it's the latency of a single request that matters, so if
we are doing the request anyway, we can just as well fetch all of the state
for that key.
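A minimal sketch of that coarse-grained idea, assuming all state fields for a key are laid out together so a single remote request returns every one of them (the storage layout and all names below are illustrative assumptions, not an actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical: the remote store groups every state field of a key under
// that key, so one asynchronous request fetches all of them at once,
// instead of one remote round trip per state field.
class CoarseGrainedPrefetchSketch {
    // simulated "DFS": key -> all of its state fields, grouped together
    final Map<String, Map<String, String>> remote = new HashMap<>();
    int remoteRequests = 0;

    CompletableFuture<Map<String, String>> fetchAllStateFor(String key) {
        remoteRequests++; // a single request covers every field of the key
        return CompletableFuture.completedFuture(
                remote.getOrDefault(key, Map.of()));
    }

    public static void main(String[] args) {
        CoarseGrainedPrefetchSketch s = new CoarseGrainedPrefetchSketch();
        s.remote.put("user-42", Map.of("count", "7", "lastSeen", "2024-03-01"));
        Map<String, String> state = s.fetchAllStateFor("user-42").join();
        // both fields arrive with a single remote round trip
        System.out.println(state.get("count") + ", requests=" + s.remoteRequests);
    }
}
```

The trade-off being argued here: the request is larger, but since single-request latency dominates, fetching the whole key's state costs roughly the same as fetching one field.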

 > according to our PoC tests[3], it is still beneficial to load state from
local disk asynchronously (See line 4 of that table with 100% state in
local cache). Optimization mainly comes from parallel I/O

In that benchmark you mentioned, are you requesting the state
asynchronously from local disks into memory? If the benefit comes from
parallel I/O, then I would expect the benefit to disappear/shrink when
running multiple subtasks on the same machine, as they would be making
their own parallel requests, right? Also enabling checkpointing would
further cut into the available I/O budget.

Also please keep in mind I'm not arguing against the idea of async state
access :) Just with what granularity those async requests should be made.
Making state access asynchronous is definitely the right way to go!

What also worries me a lot in this fine-grained model is the effect on
checkpointing times. We have noticed many times that basically every buffer
we introduce (local SQL aggregations, flink-python, firing all timers at
once, ...) that has to be processed before a checkpoint can complete ends
up breaking checkpointing times under backpressure. Making the fine-grained
async mode compatible with unaligned checkpoints might be possible, but
even if it is, it would be difficult. On the other hand, limiting the size
of the in-flight state requests is difficult to get right. A value that
works for one job won't work properly for a different one; even for the
same job, a single value may not work properly all the time. Firing timers,
varying input record rates, watermark hiccups, varying CPU load on the
machine, ... might suddenly make the size of the buffer too large. In this
regard the coarse-grained approach would fare much better, as the size of
the in-flight requests buffer wouldn't affect checkpointing times much - we
would just need to checkpoint a bit more of the in-flight data, which has a
small, very predictable and stable impact on the checkpointing times.

Also regarding the overheads, it would be great if you could provide
profiling results of the benchmarks that you conducted to verify the
results. Or maybe if you could describe the steps to reproduce the results?
Especially "Hashmap (sync)" vs "Hashmap with async API".

> However, IIUC, your proposal is valuable in that it is compatible with
the original state APIs, and it can co-exist with the current plan. We do
consider providing such a pre-fetch cache under the original state APIs and
enhancing the performance transparently in future milestones.

I think this probably could be made to work, albeit with some extra
implementation and maintenance complexity, and it would make Flink more
complicated for users to configure. It would be best to have a single
option that either works best or is the best compromise.

I'm curious to read your thoughts on those aspects :)

Best,
Piotrek


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-02-29 Thread Piotr Nowojski
Hi!

Thanks for this proposal. It looks like it will be a great improvement!
I've only started reading the FLIP's, but I already have some questions
about the FLIP-425, the async execution.

What's the motivation behind splitting execution of a single element into
multiple independent steps in individual futures? Have you considered
something like this (?) :

1. Check what is the key of the incoming record.
2. Is the state for that key missing?
    a) No - just execute `processElement`/fire the timer.
    b) Yes - are we already fetching the state for that key?
        i) No - asynchronously fetch the state for that key from the DFS
into a cache (local disk or memory).
        ii) Yes - enqueue this element/timer after the element that
started the async fetch and is already waiting for it to complete.
3. Once the async fetch completes for a given key, run `processElement` or
`onEventTime` for all of the buffered elements for that key.
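
A minimal, runnable sketch of this per-key buffering (Python, purely illustrative - the real thing would live in the task's mailbox thread with actual async DFS fetches, and all names here are hypothetical):

```python
from collections import deque

class KeyedAsyncBuffer:
    """Sketch of the 3-step scheme: process when cached, otherwise start
    (or piggyback on) an async fetch and buffer the element behind it."""

    def __init__(self, fetch_async, process):
        self.fetch_async = fetch_async   # kicks off an async DFS fetch for a key
        self.process = process           # processElement / onEventTime
        self.cached = set()              # keys whose state is already local
        self.pending = {}                # key -> deque of buffered elements

    def on_element(self, key, element):
        if key in self.cached:           # 2a) state present: run synchronously
            self.process(key, element)
        elif key in self.pending:        # 2b-ii) fetch in flight: enqueue behind it
            self.pending[key].append(element)
        else:                            # 2b-i) start the async fetch, buffer element
            self.pending[key] = deque([element])
            self.fetch_async(key)

    def on_fetch_complete(self, key):    # 3) drain everything buffered for the key
        self.cached.add(key)
        for element in self.pending.pop(key, ()):
            self.process(key, element)
```

Elements for an already-fetched key are processed immediately, which is where the latency win for pre-fetched keys comes from.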

That should both eliminate overheads, simplify the API for the users and
potentially further improve performance/reduce latencies from processing
all elements for the already pre-fetched key.

If we consider the state already available (so no need to asynchronously
fetch it) if it's either on local disks or memory, then I don't see a
downside of this compared to your current proposal. If you would like to
asynchronously fetch state from local disks into memory cache, then the
naive version of this approach would have a downside of potentially
unnecessarily reading into RAM a state field that the current
`processElement` call wouldn't need. But that could also be
solved/mitigated in a couple of ways. And I think considering the state as
"available" for sync access if it's in local disks (not RAM) is probably
good enough (comparable to RocksDB).

Best,
Piotrek

On Thu, Feb 29, 2024 at 07:17 Yuan Mei wrote:

> Hi Devs,
>
> This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing
> Disaggregated State Storage and Management in Flink 2.0.
>
> The past decade has witnessed a dramatic shift in Flink's deployment mode,
> workload patterns, and hardware improvements. We've moved from the
> map-reduce era where workers are computation-storage tightly coupled nodes
> to a cloud-native world where containerized deployments on Kubernetes
> become standard. To enable Flink's Cloud-Native future, we introduce
> Disaggregated State Storage and Management that uses DFS as primary storage
> in Flink 2.0, as promised in the Flink 2.0 Roadmap.
>
> Design Details can be found in FLIP-423[1].
>
> This new architecture is aimed to solve the following challenges brought in
> the cloud-native era for Flink.
> 1. Local Disk Constraints in containerization
> 2. Spiky Resource Usage caused by compaction in the current state model
> 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> 4. Light and Fast Checkpoint in a native way
>
> More specifically, we want to reach a consensus on the following issues in
> this discussion:
>
> 1. Overall design
> 2. Proposed Changes
> 3. Design details to achieve Milestone1
>
> In M1, we aim to achieve an end-to-end baseline version using DFS as
> primary storage and complete core functionalities, including:
>
> - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> asynchronous state access.
> - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> execution model leveraging the asynchronous APIs introduced in FLIP-424.
> - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> state data in batches to avoid unnecessary round-trip costs for remote
> access
> - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of
> the ForSt disaggregated state store.
> - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> checkpointing mechanisms with the disaggregated state store for fault
> tolerance and fast rescaling.
>
> We will vote on each FLIP in separate threads to make sure each FLIP
> reaches a consensus. But we want to keep the discussion within a focused
> thread (this thread) for easier tracking of contexts to avoid duplicated
> questions/discussions and also to think of the problem/solution in a full
> picture.
>
> Looking forward to your feedback
>
> Best,
> Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> [6] https://cwiki.apache.org/confluence/x/UYp3EQ
>


Re: Making CollectSinkFunction to wait till all results are consumed

2024-02-14 Thread Piotr Nowojski
Hi!

Interesting catch. I think the current master branch behaviour is broken.
The chance to lose some records on `endInput`
is simply a critical bug. The limited buffer size is still problematic, as
it can surprise users. Having said that, the newly
proposed behaviour without any buffer is also problematic.

Maybe whenever the user wants to collect the results, before or when
calling `env.execute()` we should spawn a new
thread that would asynchronously collect results from the
`CollectSinkFunction`? I'm not sure but maybe hooking this
logic up to whenever `CollectStreamSink` is being used is the way to go?
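
One possible shape of this idea, as a hedged Python sketch (with `poll_result` standing in for whatever would read from the `CollectSinkFunction` socket; none of these names exist in Flink):

```python
import queue
import threading

def run_with_async_collector(execute_blocking, poll_result):
    """Drain results on a side thread while the blocking execute() runs.

    `execute_blocking` stands in for env.execute(); `poll_result` returns
    the next result, or None once the job is done. Hypothetical names."""
    results = queue.Queue()

    def drain():
        while True:
            r = poll_result()
            if r is None:          # job finished, all results consumed
                break
            results.put(r)

    t = threading.Thread(target=drain, daemon=True)
    t.start()                      # start consuming before/while the job runs
    execute_blocking()             # would otherwise deadlock on a full sink buffer
    t.join()
    out = []
    while not results.empty():
        out.append(results.get())
    return out
```

The point is only that the consumer runs concurrently with `execute()`, so a bounded (or absent) sink buffer never blocks job completion.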

One thing I'm not sure about is whether there are scenarios/existing code
paths where someone wants to use the
`CollectSinkFunction` but doesn't want the results to be read
automatically? And if there are, can we tell them apart?

Best,
Piotrek

On Mon, Feb 12, 2024 at 08:48 Alexey Leonov-Vendrovskiy wrote:

> Hey all,
>
> We propose to slightly change the behavior of the CollectSinkFunction
> <
> https://github.com/apache/flink/blob/6f4d31f1b79afbde6c093b5d40ac83fe7524e303/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L129
> >
> and make it wait till all the result from the buffer is consumed by the
> client, before shutting it down.
>
> Overall protocol and all the other behavior stay the same.
>
> This would be a way to guarantee result availability upon the job
> completion. Today, the tail of the result is stored in an accumulator, and
> gets stored in the job manager. There is an opportunity for this part of
> the result to get lost, after the job is claimed to be
> successfully "completed". Waiting till all the results are consumed while
> the job is running is a natural way to achieve availability. Once the job
> is done, we are certain all the results are consumed.
>
> This change would be achieved by overriding the endInput() method
> in CollectSinkOperator, and passing the call to CollectSinkFunction to wait
> till the buffer is empty.
>
> The old behavior could be enabled via a configuration flag (to be added).
>
> A notable side-effect of the change is that any invocation
> of StreamExecutionEnvironment.execute() (synchronous execution) with a
> pipeline with CollectSinkFunction in it, would effectively block waiting
> for the results to get consumed. This would require running the consumer on
> a different thread. Though note, it is *already the case* when the result
> is larger than what can fit into the CollectSinkFunction's buffer. Take a
> look at flink-end-to-end-tests/test-scripts/test_quickstarts.sh in the
> current state of the repo: if we change the parameter numRecords to be
> 1,000,000, the test locks and waits forever. So, the only difference with
> the change would be that in similar setups it would wait on any buffer size
> > 0. It makes behavior consistent for results of any non-zero size.
>
>
> Let me know your thoughts.
>
> Thanks,
> Alexey
>


[jira] [Created] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-34430:
--

 Summary: Akka frame size exceeded with many ByteStreamStateHandle 
being used
 Key: FLINK-34430
 URL: https://issues.apache.org/jira/browse/FLINK-34430
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.17.2, 1.16.3, 1.19.0
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change

2024-02-07 Thread Piotr Nowojski
Hey

> AFAIK, there's no way to set UIDs for a SQL job,

AFAIK you can't set UIDs manually, but Flink SQL generates a compiled plan
of a query with embedded UIDs. As I understand it, using a compiled plan is
the preferred (only?) way for Flink SQL if one wants to make any changes to
the query later on, or support Flink's runtime upgrades, without losing the
state.

If that's the case, what would be the usefulness of this FLIP? Only for
DataStream API for users that didn't know that they should have manually
configured UIDs? But they have the workaround to actually post-factum add
the UIDs anyway, right? So maybe indeed Chesnay is right that this FLIP is
not that helpful/worth the extra effort?

Best,
Piotrek

On Thu, Feb 8, 2024 at 03:55 Zhanghao Chen wrote:

> Hi Chesnay,
>
> AFAIK, there's no way to set UIDs for a SQL job, it'll be great if you can
> share how you allow UID setting for SQL jobs. We've explored providing a
> visualized DAG editor for SQL jobs that allows UID setting on our internal
> platform, but most users found it too complicated to use. Another
> possible way is to utilize SQL hints, but that's complicated as well. From
> our experience, many SQL users are not familiar with Flink, what they want
> is an experience similar to writing a normal SQL in MySQL, without
> involving much extra concepts like the DAG and the UID. In fact, some
> DataStream and PyFlink users also share the same concern.
>
> On the other hand, some performance tuning is inevitable for
> long-running jobs in production, and parallelism tuning is among the most
> common techniques. FLIP-367 [1] and FLIP-146 [2] allow user to tune the
> parallelism of source and sinks, and both are well-received in the
> discussion thread. Users definitely don't want to lose state after a
> parallelism tuning, which is highly risky at present.
>
> Putting these together, I think the FLIP has a high value in production.
> Through offline discussion, I learnt that multiple companies have developed
> or trying to develop similar hasher changes in their internal distribution,
> including ByteDance, Xiaohongshu, and Bilibili. It'll be great if we can
> improve the SQL experience for all community users as well, WDYT?
>
> Best,
> Zhanghao Chen
> --
> *From:* Chesnay Schepler 
> *Sent:* Thursday, February 8, 2024 2:01
> *To:* dev@flink.apache.org ; Zhanghao Chen <
> zhanghao.c...@outlook.com>; Piotr Nowojski ; Yu
> Chen 
> *Subject:* Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change
>
> The FLIP is a bit weird to be honest. It only applies in cases where
> users haven't set uids, but that goes against best-practices and as far
> as I'm told SQL also sets UIDs everywhere.
>
> I'm wondering if this is really worth the effort.
>
> On 07/02/2024 10:23, Zhanghao Chen wrote:
> > After offline discussion with @Yu Chen<mailto:yuchen.e...@gmail.com
> >, I've updated the FLIP [1] to include a design
> that allows for compatible hasher upgrade by adding StreamGraphHasherV2 to
> the legacy hasher list, which is actually a revival of the idea from
> FLINK-5290 [2] when StreamGraphHasherV2 was introduced in Flink 1.2. We're
> targeting to make V3 the default hasher in Flink 1.20 given that
> state-compatibility is no longer an issue. Take a review when you have a
> chance, and I'd like to especially thank @Yu Chen<
> mailto:yuchen.e...@gmail.com > for the through
> offline discussion and code debugging help to make this possible.
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-411%3A+Chaining-agnostic+Operator+ID+generation+for+improved+state+compatibility+on+parallelism+change
> > [2] https://issues.apache.org/jira/browse/FLINK-5290
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Zhanghao Chen 
> > Sent: Friday, January 12, 2024 10:46
> > To: Piotr Nowojski ; Yu Chen <
> yuchen.e...@gmail.com>
> > Cc: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change
> >
> > Thanks for the input, Piotr. It might still be possible to make it
> compatible with the old snapshots, following the direction of FLINK-5290<
> https://issues.apache.org/jira/browse/FLINK-5290> suggested by Yu. I'll
> discuss with Yu on more details.
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Piotr Nowojski 
> > Sent: Friday, January 12, 2024 1:55
> > To: Yu Chen 
> > Cc: Zhanghao Chen ; dev@flink.apache.org <
> dev@flink.apache.org>
> > Subject: 

Re: [DISCUSS] Alternative way of posting FLIPs

2024-02-07 Thread Piotr Nowojski
+1 for the first option as well

Best,
Piotrek

On Wed, Feb 7, 2024 at 16:48 Matthias Pohl wrote:

> +1 for option 1 since it's a reasonable temporary workaround
>
> Moving to GitHub discussions would either mean moving the current FLIP
> collection or having the FLIPs in two locations. Both options do not seem
> to be optimal. Another concern I had was that GitHub Discussions wouldn't
> allow integrating diagrams that easily. But it looks like they support
> Mermaid [1] for diagrams.
>
> One flaw of the GoogleDocs approach is, though, that we have to rely on
> diagrams being provided as PNG/JPG/SVG rather than draw.io diagrams.
> draw.io
> is more tightly integrated with the Confluence wiki which allows
> editing/updating diagrams in the wiki rather than using some external tool.
> Google Draw is also not that convenient to use in my opinion. Anyway,
> that's a minor issue, I guess.
>
> Matthias
>
> [1]
>
> https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-diagrams
>
> On Wed, Feb 7, 2024 at 3:30 PM Lincoln Lee  wrote:
>
> > Thanks Martijn moving this forward!
> >
> > +1 for the first solution, because as of now it looks like this is a
> > temporary solution and we're still looking forward to the improvement by
> > ASF Infra, when the access is ok for contributors, we can back to the
> > current workflow.
> >
> > For solution 2, one visible downside is that it becomes inconvenient to
> > look for flips (unless we permanently switch to github discussion).
> >
> > Looking forward to hearing more thoughts.
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Wed, Feb 7, 2024 at 21:51 Martijn Visser wrote:
> >
> > > Hi all,
> > >
> > > ASF Infra has confirmed to me that only ASF committers can access the
> > > ASF Confluence site since a recent change. One of the results of this
> > > decision is that users can't signup and access Confluence, so only
> > > committers+ can create FLIPs.
> > >
> > > ASF Infra hopes to improve this situation when they move to the Cloud
> > > shortly (as in: some months), but they haven't committed on an actual
> > > date. The idea would be that we find a temporary solution until anyone
> > > can request access to Confluence.
> > >
> > > There are a couple of ways we could resolve this situation:
> > > 1. Contributors create a Google Doc and make that view-only, and post
> > > that Google Doc to the mailing list for a discussion thread. When the
> > > discussions have been resolved, the contributor ask on the Dev mailing
> > > list to a committer/PMC to copy the contents from the Google Doc, and
> > > create a FLIP number for them. The contributor can then use that FLIP
> > > to actually have a VOTE thread.
> > > 2. We could consider moving FLIPs to "Discussions" on Github, like
> > > Airflow does at https://github.com/apache/airflow/discussions
> > > 3. Perhaps someone else has another good idea.
> > >
> > > Looking forward to your thoughts.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> >
>


Re: [DISCUSS] FLIP-395: Deprecate Global Aggregator Manager

2024-01-17 Thread Piotr Nowojski
Hi,

Sorry for the late response. +1 for deprecating it.

It would be even better to just remove it from the code base, but would
require a little bit of investigation in the kinesis connector [1], if this
feature can be safely removed from the kinesis connector in favour of the
generic watermark alignment.

Best,
Piotrek

[1]
https://github.com/apache/flink-connector-aws/blob/38aafb44d3a8200e4ff41d87e0780338f40da258/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java#L40



On Mon, Dec 4, 2023 at 11:54 Zhanghao Chen wrote:

> Hi Benchao,
>
> I think part of the reason is that a general global coordination mechanism
> is complex and hence subject to some internals changes in the future.
> Instead of directly exposing the full mechanism to users, it might be
> better to expose some well-defined subset of the feature set to users.
>
> I'm also ccing the email to Piotr and David for their suggestions on this.
>
> Best,
> Zhanghao Chen
> 
> From: Benchao Li 
> Sent: Monday, November 27, 2023 13:03
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-395: Deprecate Global Aggregator Manager
>
> +1 for the idea.
>
> Currently OperatorCoordinator is still marked as @Internal, shouldn't
> it be a public API already?
>
> Besides, GlobalAggregatorManager supports coordination between
> different operators, but OperatorCoordinator only supports
> coordination within one operator. And CoordinatorStore introduced in
> FLINK-24439 opens the door for multi operators. Again, should it also
> be a public API too?
>
> On Mon, Nov 27, 2023 at 11:05 Weihua Hu wrote:
> >
> > Thanks Zhanghao for driving this FLIP.
> >
> > +1 for this.
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, Nov 20, 2023 at 5:49 PM Zhanghao Chen  >
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion of FLIP-395: Deprecate Global Aggregator
> > > Manager [1].
> > >
> > > Global Aggregate Manager was introduced in [2] to support event time
> > > synchronization across sources and more generally, coordination of
> parallel
> > > tasks. AFAIK, this was only used in the Kinesis source for an early
> version
> > > of watermark alignment. Operator Coordinator, introduced in FLIP-27,
> > > provides a more powerful and elegant solution for that need and is
> part of
> > > the new source API standard. FLIP-217 further provides a complete
> solution
> > > for watermark alignment of source splits on top of the Operator
> Coordinator
> > > mechanism. Furthermore, Global Aggregate Manager manages state in
> JobMaster
> > > object, causing problems for adaptive parallelism changes [3].
> > >
> > > Therefore, I propose to deprecate the use of Global Aggregate Manager,
> > > which can improve the maintainability of the Flink codebase without
> > > compromising its functionality.
> > >
> > > Looking forward to your feedbacks, thanks.
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-395%3A+Deprecate+Global+Aggregator+Manager
> > > [2] https://issues.apache.org/jira/browse/FLINK-10886
> > > [3] https://issues.apache.org/jira/browse/FLINK-31245
> > >
> > > Best,
> > > Zhanghao Chen
> > >
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: [DISCUSS] FLIP-416: Deprecate and remove the RestoreMode#LEGACY

2024-01-15 Thread Piotr Nowojski
+1 good idea!

On Mon, Jan 15, 2024 at 05:11 Jinzhong Li wrote:

> Hi Zakelly,
>
> Thanks for driving the discussion. It makes sense to remove  LEGACY mode in
> Flink 2.0.
>
> Best,
> Jinzhong Li
>
> On Mon, Jan 15, 2024 at 10:34 AM Xuannan Su  wrote:
>
> > Hi Zakelly,
> >
> > Thanks for driving this. +1 to removing the LEGACY mode.
> >
> > Best regards,
> > Xuannan
> >
> > On Mon, Jan 15, 2024 at 3:22 AM Danny Cranmer 
> > wrote:
> > >
> > > +1 to removing LEGACY mode in Flink 2.0. Thanks for driving.
> > >
> > > Danny,
> > >
> > > On Sat, 13 Jan 2024, 08:20 Yanfei Lei,  wrote:
> > >
> > > > Thanks Zakelly for starting this discussion.
> > > >
> > > > Regardless of whether it is for users or developers, deprecating
> > > > RestoreMode#LEGACY makes the semantics clearer and lower maintenance
> > > > costs, and Flink 2.0 is a good time point to do this.
> > > > So +1 for the overall idea.
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> > > > On Thu, Jan 11, 2024 at 14:57 Zakelly Lan wrote:
> > > >
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion on FLIP-416: Deprecate and remove
> the
> > > > > RestoreMode#LEGACY[1].
> > > > >
> > > > > The FLIP-193[2] introduced two modes of state file ownership during
> > > > > checkpoint restoration: RestoreMode#CLAIM and RestoreMode#NO_CLAIM.
> > The
> > > > > LEGACY mode, which was how Flink worked until 1.15, has been
> > superseded
> > > > by
> > > > > NO_CLAIM as the default mode. The main drawback of LEGACY mode is
> > that
> > > > the
> > > > > new job relies on artifacts from the old job without cleaning them
> > up,
> > > > > leaving users uncertain about when it is safe to delete the old
> > > > checkpoint
> > > > > directories. This leads to the accumulation of unnecessary
> checkpoint
> > > > files
> > > > > that are never cleaned up. Considering cluster availability and job
> > > > > maintenance, it is not recommended to use LEGACY mode. Users could
> > choose
> > > > > the other two modes to get a clear semantic for the state file
> > ownership.
> > > > >
> > > > > This FLIP proposes to deprecate the LEGACY mode and remove it
> > completely
> > > > in
> > > > > the upcoming Flink 2.0. This will make the semantic clear as well
> as
> > > > > eliminate many bugs caused by mode transitions involving LEGACY
> mode
> > > > (e.g.
> > > > > FLINK-27114 [3]) and enhance code maintainability.
> > > > >
> > > > > Looking forward to hearing from you!
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/ookkEQ
> > > > > [2] https://cwiki.apache.org/confluence/x/bIyqCw
> > > > > [3] https://issues.apache.org/jira/browse/FLINK-27114
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > >
> >
>


Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation for improved state compatibility on parallelism change

2024-01-11 Thread Piotr Nowojski
Hi,

Using unaligned checkpoints is orthogonal to this FLIP.

Yes, unaligned checkpoints are not supported for pointwise connections, so
most of the cases go away anyway.
It is possible to switch from unchained to chained subtasks by removing a
keyBy exchange, and this would be
a problem, but that's just one of the things that we claim that unaligned
checkpoints do not support [1]. But as
I stated above, this is an orthogonal issue to this FLIP.

Regarding the proposal itself, generally speaking it makes sense to me as
well. However I'm quite worried about
the compatibility and/or migration path. The:
> (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.

step would break the compatibility with Flink 1.xx snapshots. But as this
is for v2.0, maybe that's not the end of
the world?

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations

On Thu, Jan 11, 2024 at 12:10 Yu Chen wrote:

> Hi Zhanghao,
>
> Actually, Stefan has done similar compatibility work in the early
> FLINK-5290[1], where he introduced the legacyStreamGraphHashers list for
> hasher backward compatibility.
>
> We have attempted to implement a similar feature in the internal version
> of FLINK and tried to include the new hasher as part of the
> legacyStreamGraphHashers,
> which would ensure that the corresponding Operator State could be found at
> restore while ignoring the chaining condition(without changing the default
> hasher).
>
> However, we have found that such a solution may lead to some unexpected
> situations in some cases, but I haven't had time to find the root cause
> recently.
>
> If you're interested, I'd be happy to discuss it with you and try to solve
> the problem.
>
> [1] https://issues.apache.org/jira/browse/FLINK-5290
>
> Best,
> Yu Chen
>
>
>
> On Jan 11, 2024 at 15:07, Zhanghao Chen wrote:
>
> Hi Yu,
>
> I haven't thought too much about the compatibility design before. By the
> nature of the problem, it's impossible to make V3 compatible with V2, what
> we can do is to somewhat better inform users when switching the hasher, but
> I don't have any good idea so far. Do you have any suggestions on this?
>
> Best,
> Zhanghao Chen
> ------
> *From:* Yu Chen 
> *Sent:* Thursday, January 11, 2024 13:52
> *To:* dev@flink.apache.org 
> *Cc:* Piotr Nowojski ; zhanghao.c...@outlook.com
> 
> *Subject:* Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID
> generation for improved state compatibility on parallelism change
>
> Hi Zhanghao,
>
> Thanks for driving this, that’s really painful for us when we need to
> switch config `pipeline.operator-chaining`.
>
> But I have a Concern, according to FLIP description, modifying
> `isChainable` related code in `StreamGraphHasherV2` will cause the
> generated operator id to be changed, which will result in the user unable
> to recover from the old state (old and new Operator IDs can't be mapped).
> Therefore switching the hasher strategy (V2->V3 or V3->V2) will lead to an
> incompatibility. Is there any relevant compatibility design considered?
>
> Best,
> Yu Chen
>
> On Jan 10, 2024 at 10:25, Zhanghao Chen wrote:
>
> Hi David,
>
> Thanks for the comments. AFAIK, unaligned checkpoints are disabled for
> pointwise connections according to [1], let's wait Piotr for confirmation.
> The issue itself is not directly related to this proposal as well. If a
> user manually specifies UIDs for each of the chained operators and has
> unaligned checkpoints enabled, we will encounter the same issue if they
> decide to break the chain on a later restart and try to recover from a
> retained cp.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/
>
>
> Best,
> Zhanghao Chen
> 
> From: David Morávek 
> Sent: Wednesday, January 10, 2024 6:26
> To: dev@flink.apache.org ; Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> Subject: Re: [DISCUSS] FLIP 411: Chaining-agnostic Operator ID generation
> for improved state compatibility on parallelism change
>
> Hi Zhanghao,
>
> Thanks for the FLIP. What you're proposing makes a lot of sense +1
>
> Have you thought about how this works with unaligned checkpoints in case
> you go from unchained to chained? I think it should be fine because this
> scenario should only apply to forward/rebalance scenarios where we, as far
> as I recall, force alignment anyway, so there should be no exchanges to
> snapshot. It might just work, but something to double-check. Maybe @Piotr
> Nowojski  could confirm it.
>
> Best,
> D.
>
> On Wed, Jan 3, 2024 at 7:10 

Re: [DISCUSS] FLIP-414: Support Retry Mechanism in RocksDBStateDataTransfer

2024-01-11 Thread Piotr Nowojski
Hi,

Thanks for the proposal. I second the Hangxiang's suggestions.

I think this might be valuable. Instead of retrying the whole checkpoint,
it will be more resource-efficient to retry the upload of a single file.

Regarding re-using configuration options, a while back we introduced the
`taskmanager.network.retries` config option, which was hoped to eventually
encompass things like this.

My own concern is whether we should retry regardless of the exception
type, or focus on things like connection loss/host unreachable. All in
all, it would be better not to retry the upload if the failure was:
- `FileSystem` for given schema not found
- authorisation failed
- lack of write rights
- ...
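
Such a distinction could be sketched like this (Python, purely illustrative - the exception classes standing in for the non-retryable cases above are hypothetical, not Flink's actual exception hierarchy):

```python
import time

# Hypothetical classification: retry transient I/O issues, fail fast on the rest.
RETRYABLE = (ConnectionError, TimeoutError)
NON_RETRYABLE = (PermissionError, FileNotFoundError)  # auth failure, missing filesystem

def upload_with_retry(upload, attempts=3, backoff_s=0.0):
    """Retry `upload()` on transient failures only; a sketch, not Flink's API."""
    for attempt in range(1, attempts + 1):
        try:
            return upload()
        except NON_RETRYABLE:
            raise                      # retrying can't help: give up immediately
        except RETRYABLE:
            if attempt == attempts:
                raise                  # out of attempts: surface the failure
            time.sleep(backoff_s)      # fixed interval, like the proposed retry.interval
```

The two knobs mirror the proposed `...retry.times` and `...retry.interval` options, while the classification keeps hopeless failures from wasting retry budget.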

Best,
Piotrek




On Thu, Jan 11, 2024 at 10:35 Hangxiang Yu wrote:

> Thanks for driving this.
> Retry mechanism is common when we want to get or put data by network.
> So I think it will help when checkpoints fail due to temporary network
> problems, though of course it may add a bit of overhead in some cases.
>
> Some comments and suggestions:
> 1. Since Flink has a checkpoint mechanism to retry failed checkpoint
> coarsely, I think it looks good to me if this fine-grained retry could be
> configurable and doesn't change the current default mechanism.
> 2. This should work with the checkpoint procedure of all state backends,
> Could we make this config unrelated to a specific state backend (maybe
> execution.checkpointing.xxx)?  Then it could be supported by below state
> backends.
> 3. We may not need to re-implement it. There are some tools supporting the
> retry mechanism (see RetryingExecutor and RetryPolicy in the changelog dstl
> module); it would be better to turn them into common tools and reuse them.
>
> On Thu, Jan 11, 2024 at 3:09 PM yue ma  wrote:
>
> > Thanks for driving this effort, xiangyu!
> > The proposal overall LGTM.
> > I just have a small question. There are other places in Flink that
> interact
> > with external storage. Should we consider adding a general retry
> mechanism
> > to them?
> >
> > > On Mon, Jan 8, 2024 at 11:31 xiangyu feng wrote:
> >
> > > Hi devs,
> > >
> > > I'm opening this thread to discuss FLIP-414: Support Retry Mechanism in
> > > RocksDBStateDataTransfer[1].
> > >
> > > Currently, there is no retry mechanism for downloading and uploading
> > > RocksDB state files. Any jittering of remote filesystem might lead to a
> > > checkpoint failure. By supporting retry mechanism in
> > > `RocksDBStateDataTransfer`, we can significantly reduce the failure
> rate
> > of
> > > checkpoint during asynchronous phrase.
> > >
> > > To make this retry mechanism configurable, we have introduced two
> options
> > > in this FLIP: `state.backend.rocksdb.checkpoint.transfer.retry.times`
> > and `
> > > state.backend.rocksdb.checkpoint.transfer.retry.interval`. The default
> > > behavior remains to be no retry will be performed in order to be
> > consistent
> > > with the original behavior.
> > >
> > > Looking forward to your feedback, thanks.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-414%3A+Support+Retry+Mechanism+in+RocksDBStateDataTransfer
> > >
> > > Best regards,
> > > Xiangyu Feng
> > >
> >
> >
> > --
> > Best,
> > Yue
> >
>
>
> --
> Best,
> Hangxiang.
>


Re: Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-10 Thread Piotr Nowojski
+1 (binding)

On Wed, Jan 10, 2024 at 11:25, Martijn Visser wrote:

> +1 (binding)
>
> On Wed, Jan 10, 2024 at 4:43 AM Xingbo Huang  wrote:
> >
> > +1 (binding)
> >
> > Best,
> > Xingbo
> >
> > On Wed, Jan 10, 2024 at 11:35, Dian Fu wrote:
> >
> > > +1 (binding)
> > >
> > > Regards,
> > > Dian
> > >
> > > On Wed, Jan 10, 2024 at 5:09 AM Sharath  wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Sharath
> > > >
> > > > On Tue, Jan 9, 2024 at 1:02 PM Venkata Sanath Muppalla <
> > > sanath...@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > > Sanath
> > > > >
> > > > > On Tue, Jan 9, 2024 at 11:16 AM Peter Huang <
> > > huangzhenqiu0...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > >
> > > > > > Best Regards
> > > > > > Peter Huang
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 9, 2024 at 5:26 AM Jane Chan 
> > > wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Best,
> > > > > > > Jane
> > > > > > >
> > > > > > > On Tue, Jan 9, 2024 at 8:41 PM Lijie Wang <
> > > wangdachui9...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > On Tue, Jan 9, 2024 at 19:28, Jiabao Sun wrote:
> > > > > > > >
> > > > > > > > > +1 (non-binding)
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jiabao
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 2024/01/09 09:58:04 xiangyu feng wrote:
> > > > > > > > > > +1 (non-binding)
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Xiangyu Feng
> > > > > > > > > >
> > > > > > > > > > On Tue, Jan 9, 2024 at 17:50, Danny Cranmer wrote:
> > > > > > > > > >
> > > > > > > > > > > +1 (binding)
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Danny
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jan 9, 2024 at 9:31 AM Feng Jin <
> ji...@gmail.com>
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Feng Jin
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Jan 9, 2024 at 5:29 PM Yuxin Tan <
> > > ta...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Yuxin
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Jan 9, 2024 at 17:25, Márton Balassi wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > +1 (binding)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jan 9, 2024 at 10:15 AM Leonard Xu <
> > > > > > xb...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > +1(binding)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > Leonard
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Jan 9, 2024 at 5:08 PM, Yangze Guo wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > Yangze Guo
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jan 9, 2024 at 5:06 PM Robert
> Metzger <
> > > > > > > > > > > rmetz...@apache.org
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> +1 (binding)
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >> On Tue, Jan 9, 2024 at 9:54 AM Guowei Ma <
> > > > > > > gu...@gmail.com
> > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > >>> +1 (binding)
> > > > > > > > > > > > > > > >>> Best,
> > > > > > > > > > > > > > > >>> Guowei
> > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > > >>> On Tue, Jan 9, 2024 at 4:49 PM Rui Fan <
> > > > > > > 19...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >  +1 (non-binding)
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >  Best,
> > > > > > > > > > > > > > >  Rui
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >  On Tue, Jan 9, 2024 at 4:41 PM Hang Ruan <
> > > > > > > > > > > > ruanhang1...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > Hang
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jan 9, 2024 at 16:25, gongzhongqiang wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >> +1 non-binding
> > > > > > > > > > > 

Re: Re: FLIP-413: Enable unaligned checkpoints by default

2024-01-09 Thread Piotr Nowojski
Hi!

I think this warning from the documentation is a bit over the top. Yes,
unaligned checkpoints in that regard add an extra source of
indeterminism, but please note that Flink doesn't give any guarantees
that the results will be the same after a recovery, as the order of the
records can change. AFAIR this warning was added in the context of
non-keyed exchanges, but some time later we were forced to disable
unaligned checkpoints on such exchanges, which probably makes the warning
unnecessary.

Best,
Piotrek

On Tue, Jan 9, 2024 at 02:05, Mason Chen wrote:

> Hi Piotr,
>
> I also agree with Zhanghao's assessment on the limitations of unaligned
> checkpoints. Some of them are already handled properly by Flink, but in the
> case of the "Interplay with watermarks" limitation, it is quite confusing
> for a new user to find that their code doesn't generate consistent results
> with the default checkpoint configuration. Is there a way for Flink to
> detect and handle this situation correctly?
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/#limitations
>
> Best,
> Mason
>
> On Mon, Jan 8, 2024 at 2:01 AM yangpmldl  wrote:
>
> > Unsubscribe
> >
> >
> > At 2024-01-08 17:45:01, "Piotr Nowojski"  wrote:
> > >Hi thanks for the responses,
> > >
> > >And thanks for pointing out the jobs upgrade issue. Indeed that has
> > >slipped my mind. I was mistakenly
> > >thinking that we are supporting all upgrades only via savepoint. Anyway,
> > >maybe in that case we should
> > >guide users towards that? Using savepoints for upgrades? That would be
> > even
> > >easier to understand
> > >for the users:
> > >- use unaligned checkpoints for checkpoints
> > >- use savepoints for any changes in the job/version upgrades
> > >
> > >There is a downside, that savepoints are always full, while aligned
> > >checkpoints can be incremental.
> > >
> > >WDYT?
> > >
> > >Regarding the value for the timeout, I would also be fine with 30s.
> Indeed
> > >that's a safer default.
> > >
> > >> On a separate point, in the sentence below it seems to me it would be
> > >> clearer to say that in the unlikely scenario you've described, the
> > change
> > >> would "significantly increase checkpoint sizes" -- assuming I
> understand
> > >> things correctly.
> > >
> > >I've reworded that paragraph.
> > >
> > >Best,
> > >Piotrek
> > >
> > >
> > >
> > >On Mon, Jan 8, 2024 at 08:02, Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > >> Thanks to Piotr driving this proposal!
> > >>
> > >> Enabling unaligned checkpoint with aligned checkpoints timeout
> > >> is fine for me. I'm not sure if an aligned checkpoint timeout of 5s is
> > >> too aggressive. If the unaligned checkpoint is enabled by default
> > >> for all jobs, I recommend that the aligned checkpoints timeout be
> > >> at least 30s.
> > >>
> > >> If the 30s is too big for some of the flink jobs, flink users can turn
> > >> it down by themselves.
> > >>
> > >> To David, Ken and Zhanghao:
> > >>
> > >> Unaligned checkpoints indeed have more limitations than aligned
> > >> checkpoints, but if we set the aligned checkpoint timeout to 30s or 60s,
> > >> it means that when a checkpoint can be completed within 30s or 60s, the
> > >> job still uses aligned checkpoints (introducing no extra overhead).
> > >> When the checkpoint cannot be completed within the aligned checkpoint
> > >> timeout, it is switched to an unaligned checkpoint, which can complete
> > >> even when backpressure is severe.
> > >>
> > >> In brief, when backpressure is low, enabling them costs nothing;
> > >> when backpressure is high, enabling them brings real benefits.
> > >>
> > >> So I think it doesn't have too many risks when aligned checkpoints
> > timeout
> > >> is set to 30s or above. WDYT?
> > >>
> > >> Best,
> > >> Rui
> > >>
> > >> On Mon, Jan 8, 2024 at 12:57 PM Zhanghao Chen <
> > zhanghao.c...@outlook.com>
> > >> wrote:
> > >>
> > >> > Hi Piot

Re: FLIP-413: Enable unaligned checkpoints by default

2024-01-08 Thread Piotr Nowojski
Hi thanks for the responses,

And thanks for pointing out the jobs upgrade issue. Indeed that has
slipped my mind. I was mistakenly
thinking that we are supporting all upgrades only via savepoint. Anyway,
maybe in that case we should
guide users towards that? Using savepoints for upgrades? That would be even
easier to understand
for the users:
- use unaligned checkpoints for checkpoints
- use savepoints for any changes in the job/version upgrades

There is a downside, that savepoints are always full, while aligned
checkpoints can be incremental.

WDYT?

Regarding the value for the timeout, I would also be fine with 30s. Indeed
that's a safer default.

> On a separate point, in the sentence below it seems to me it would be
> clearer to say that in the unlikely scenario you've described, the change
> would "significantly increase checkpoint sizes" -- assuming I understand
> things correctly.

I've reworded that paragraph.

Best,
Piotrek



On Mon, Jan 8, 2024 at 08:02, Rui Fan <1996fan...@gmail.com> wrote:

> Thanks to Piotr driving this proposal!
>
> Enabling unaligned checkpoint with aligned checkpoints timeout
> is fine for me. I'm not sure if an aligned checkpoint timeout of 5s is
> too aggressive. If the unaligned checkpoint is enabled by default
> for all jobs, I recommend that the aligned checkpoints timeout be
> at least 30s.
>
> If the 30s is too big for some of the flink jobs, flink users can turn
> it down by themselves.
>
> To David, Ken and Zhanghao:
>
> Unaligned checkpoints indeed have more limitations than aligned checkpoints,
> but if we set the aligned checkpoint timeout to 30s or 60s, it means that
> when a checkpoint can be completed within 30s or 60s, the job still uses
> aligned checkpoints (introducing no extra overhead).
> When the checkpoint cannot be completed within the aligned checkpoint
> timeout, it is switched to an unaligned checkpoint, which can complete even
> when backpressure is severe.
>
> In brief, when backpressure is low, enabling them costs nothing;
> when backpressure is high, enabling them brings real benefits.
>
> So I think it doesn't have too many risks when aligned checkpoints timeout
> is set to 30s or above. WDYT?
>
> Best,
> Rui
>
> On Mon, Jan 8, 2024 at 12:57 PM Zhanghao Chen 
> wrote:
>
> > Hi Piotr,
> >
> > As a platform administer who runs kilos of Flink jobs, I'd be against the
> > idea to enable unaligned cp by default for our jobs. It may help a
> > significant portion of the users, but the subtle issues around unaligned
> CP
> > for a few jobs will probably raise a lot more on-calls and incidents.
> From
> > my point of view, we'd better not enable it by default before removing
> all
> > the limitations listed in
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/checkpointing_under_backpressure/#limitations
> > .
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Piotr Nowojski 
> > Sent: Friday, January 5, 2024 21:41
> > To: dev 
> > Subject: FLIP-413: Enable unaligned checkpoints by default
> >
> > Hi!
> >
> > I would like to propose by default to enable unaligned checkpoints and
> also
> > simultaneously increase the aligned checkpoints timeout from 0ms to 5s. I
> > think this change is the right one to do for the majority of Flink users.
> >
> > For more rationale please take a look into the short FLIP-413 [1].
> >
> > What do you all think?
> >
> > Best,
> > Piotrek
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
> >
>


Re: [VOTE] Release flink-shaded 18.0, release candidate #1

2024-01-08 Thread Piotr Nowojski
Hi!

+1 (binding)

Best,
Piotrek

On Fri, Jan 5, 2024 at 00:58, Sergey Nuyanzin wrote:

> Bubble up, we need more votes, especially from PMC members.
>
> On Thu, Dec 28, 2023 at 1:29 PM Martijn Visser 
> wrote:
>
> > Hi,
> >
> > +1 (binding)
> >
> > - Validated hashes
> > - Verified signature
> > - Verified that no binaries exist in the source archive
> > - Build the source with Maven
> > - Verified licenses
> > - Verified web PRs
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, Dec 11, 2023 at 12:11 AM Sergey Nuyanzin 
> > wrote:
> > >
> > > Hey everyone,
> > >
> > > The vote for flink-shaded 18.0 is still open. Please test and vote for
> > > rc1, so that we can release it.
> > >
> > > On Thu, Nov 30, 2023 at 4:03 PM Jing Ge 
> > wrote:
> > >
> > > > +1(not binding)
> > > >
> > > > - validate checksum
> > > > - validate hash
> > > > - checked the release notes
> > > > - verified that no binaries exist in the source archive
> > > > - build the source with Maven 3.8.6 and jdk11
> > > > - checked repo
> > > > - checked tag
> > > > - verified web PR
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Nov 30, 2023 at 11:39 AM Sergey Nuyanzin <
> snuyan...@gmail.com>
> > > > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - Downloaded all the resources
> > > > > - Validated checksum hash
> > > > > - Build the source with Maven and jdk8
> > > > > - Build Flink master with new flink-shaded and check that all the
> > tests
> > > > are
> > > > > passing
> > > > >
> > > > > one minor thing that I noticed during releasing: for ci it uses
> maven
> > > > 3.8.6
> > > > > at the same time for release profile there is an enforcement plugin
> > to
> > > > > check that maven version is less than 3.3
> > > > > I created a jira issue[1] for that
> > > > > i made the release with 3.2.5 maven version (I suppose previous
> > version
> > > > was
> > > > > also done with 3.2.5 because of same issue)
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-33703
> > > > >
> > > > > On Wed, Nov 29, 2023 at 11:41 AM Matthias Pohl <
> > matthias.p...@aiven.io>
> > > > > wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > * Downloaded all resources
> > > > > > * Extracts sources and compilation on these sources
> > > > > > * Diff of git tag checkout with downloaded sources
> > > > > > * Verifies SHA512 checksums & GPG certification
> > > > > > * Checks that all POMs have the right expected version
> > > > > > * Generated diffs to compare pom file changes with NOTICE files:
> > > > Nothing
> > > > > > suspicious found except for a minor (non-blocking) typo [1]
> > > > > >
> > > > > > Thanks for driving this effort, Sergey. :)
> > > > > >
> > > > > > [1]
> > https://github.com/apache/flink-shaded/pull/126/files#r1409080162
> > > > > >
> > > > > > On Wed, Nov 29, 2023 at 10:25 AM Rui Fan <1996fan...@gmail.com>
> > wrote:
> > > > > >
> > > > > >> Sorry, it's non-binding.
> > > > > >>
> > > > > >> On Wed, Nov 29, 2023 at 5:19 PM Rui Fan <1996fan...@gmail.com>
> > wrote:
> > > > > >>
> > > > > >> > Thanks Matthias for the clarification!
> > > > > >> >
> > > > > >> > After I import the latest KEYS, it works fine.
> > > > > >> >
> > > > > >> > +1(binding)
> > > > > >> >
> > > > > >> > - Validated checksum hash
> > > > > >> > - Verified signature
> > > > > >> > - Verified that no binaries exist in the source archive
> > > > > >> > - Build the source with Maven and jdk8
> > > > > >> > - Verified licenses
> > > > > >> > - Verified web PRs, and left a comment
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > Rui
> > > > > >> >
> > > > > >> > On Wed, Nov 29, 2023 at 5:05 PM Matthias Pohl
> > > > > >> >  wrote:
> > > > > >> >
> > > > > >> >> The key is the last key in the KEYS file. It's just having a
> > > > > different
> > > > > >> >> format with spaces being added (due to different gpg
> > versions?):
> > > > F752
> > > > > >> 9FAE
> > > > > >> >> 2481 1A5C 0DF3  CA74 1596 BBF0 7268 35D8
> > > > > >> >>
> > > > > >> >> On Wed, Nov 29, 2023 at 9:41 AM Rui Fan <
> 1996fan...@gmail.com>
> > > > > wrote:
> > > > > >> >>
> > > > > >> >> > Hey Sergey,
> > > > > >> >> >
> > > > > >> >> > Thank you for driving this release.
> > > > > >> >> >
> > > > > >> >> > I try to check this signature, the whole key is
> > > > > >> >> > F7529FAE24811A5C0DF3CA741596BBF0726835D8,
> > > > > >> >> > it matches your 1596BBF0726835D8, but I cannot
> > > > > >> >> > find it from the Flink KEYS[1].
> > > > > >> >> >
> > > > > >> >> > Please correct me if my operation is wrong, thanks~
> > > > > >> >> >
> > > > > >> >> > [1] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > >> >> >
> > > > > >> >> > Best,
> > > > > >> >> > Rui
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > On Wed, Nov 29, 2023 at 6:09 AM Sergey Nuyanzin <
> > > > > snuyan...@gmail.com
> > > > > >> >
> > > > > >> >> > wrote:
> > > > > >> >> >
> > > > > >> >> > > Hi everyone,
> > > > > >> >> > > Please review and vote on the 

[DISCUSS] FLIP-413: Enable unaligned checkpoints by default

2024-01-05 Thread Piotr Nowojski
Oops, fixing the topic.

Hi!
>
> I would like to propose by default to enable unaligned checkpoints and
> also simultaneously increase the aligned checkpoints timeout from 0ms to
> 5s. I think this change is the right one to do for the majority of Flink
> users.
>
> For more rationale please take a look into the short FLIP-413 [1].
>
> What do you all think?
>
> Best,
> Piotrek
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default
>


FLIP-413: Enable unaligned checkpoints by default

2024-01-05 Thread Piotr Nowojski
Hi!

I would like to propose enabling unaligned checkpoints by default and also
simultaneously increasing the aligned checkpoint timeout from 0ms to 5s. I
think this change is the right one for the majority of Flink users.

For more rationale please take a look into the short FLIP-413 [1].

What do you all think?

Best,
Piotrek

https://cwiki.apache.org/confluence/display/FLINK/FLIP-413%3A+Enable+unaligned+checkpoints+by+default


Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-04 Thread Piotr Nowojski
Hi,

Thanks for trying to clean this up! I don't have strong opinions on the
topics discussed here, so generally speaking +1 from my side!

Best,
Piotrek

On Wed, Jan 3, 2024 at 04:16, Rui Fan <1996fan...@gmail.com> wrote:

> Thanks for the feedback!
>
> Using the `execution.checkpointing.incremental.enabled`,
> and enabling it by default sounds good to me.
>
> Best,
> Rui
>
> On Wed, Jan 3, 2024 at 11:10 AM Zakelly Lan  wrote:
>
> > Hi Rui,
> >
> > Thanks for your comments!
> >
> > IMO, given that the state backend can be plugably loaded (as you can
> > specify a state backend factory), I prefer not providing state backend
> > specified options in the framework.
> >
> > Secondly, the incremental checkpoint is actually a file-sharing strategy
> > across checkpoints, which means the state backend *could* reuse files from
> > previous checkpoints but is not *required* to. When the state backend
> > cannot reuse the files, it is reasonable to fall back to a full checkpoint.
> >
> > Thus, I suggest we make it `execution.checkpointing.incremental` and enable
> > it by default. State backends that do not support it would perform full
> > checkpoints and print a warning to inform users. Users then do not need to
> > pay special attention to different options to control this across different
> > state backends. This is more user-friendly in my opinion. WDYT?
> >
> > On Tue, Jan 2, 2024 at 10:49 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Zakelly,
> > >
> > > I'm not sure whether we could add the state backend type in the
> > > new key name of state.backend.incremental. It means we use
> > > `execution.checkpointing.rocksdb-incremental` or
> > > `execution.checkpointing.rocksdb-incremental.enabled`.
> > >
> > > So far, state.backend.incremental only works for rocksdb state backend.
> > > And this feature or optimization is very valuable and huge for large
> > > state flink jobs. I believe it's enabled for most production flink jobs
> > > with large rocksdb state.
> > >
> > > If this option isn't generic for all state backend types, I guess we
> > > can enable `execution.checkpointing.rocksdb-incremental.enabled`
> > > by default in Flink 2.0.
> > >
> > > But if it works for all state backends, it's hard to enable it by
> > default.
> > > Enabling great and valuable features or improvements are useful
> > > for users, especially a lot of new flink users. Out-of-the-box options
> > > are good for users.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Rui
> > >
> > > On Fri, Dec 29, 2023 at 1:45 PM Zakelly Lan 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thanks all for your comments!
> > > >
> > > > As many of you have questions about the names for boolean options, I
> > > > suggest we make a naming rule for them. For now I could think of
> three
> > > > options:
> > > >
> > > > Option 1: Use enumeration options if possible. But this may cause
> some
> > > name
> > > > collisions or confusion as we discussed and we should unify the
> > statement
> > > > everywhere.
> > > > Option 2: Use boolean options and add 'enabled' as the suffix.
> > > > Option 3: Use boolean options and ONLY add 'enabled' when there are
> > more
> > > > detailed configurations under the same prefix, to prevent one name
> from
> > > > serving as a prefix to another.
> > > >
> > > > I am slightly inclined to Option 3, since it is more in line with
> > current
> > > > practice and friendly for existing users. Also It reduces the length
> of
> > > > configuration names as much as possible. I really want to hear your
> > > > opinions.
> > > >
> > > >
> > > > @Xuannan
> > > >
> > > > I agree with your comments 1 and 3.
> > > >
> > > > For 2, If we decide to change the name, maybe
> > > > `execution.checkpointing.parallel-cleaner` is better? And as for
> > whether
> > > to
> > > > add 'enabled' I suggest we discuss the rule above. WDYT?
> > > > Thanks!
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Fri, Dec 29, 2023 at 12:02 PM Xuannan Su 
> > > wrote:
> > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > Thanks for driving this! The organization of the configuration
> option
> > > > > in the FLIP looks much cleaner and easier to understand. +1 to the
> > > > > FLIP.
> > > > >
> > > > > Just some questions from me.
> > > > >
> > > > > 1. I think the change to the ConfigOptions should be put in the
> > > > > `Public Interface` section, instead of `Proposed Changed`, as those
> > > > > configuration options are public interface.
> > > > >
> > > > > 2. The key `state.checkpoint.cleaner.parallel-mode` seems
> confusing.
> > > > > It feels like it is used to choose different modes. In fact, it is
> a
> > > > > boolean flag to indicate whether to enable parallel clean. How
> about
> > > > > making it `state.checkpoint.cleaner.parallel-mode.enabled`?
> > > > >
> > > > > 3. The `execution.checkpointing.write-buffer` may better be
> > > > > `execution.checkpointing.write-buffer-size` so that we know it is
> > > > > configuring the 

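To illustrate naming Option 3 from the discussion above, boolean keys would look roughly like this (the keys below are hypothetical illustrations from the discussion, not final option names):

```yaml
# plain boolean key, since it is not also a prefix of other keys
execution.checkpointing.incremental: true
# '.enabled' suffix only where the key also prefixes finer-grained options
execution.checkpointing.local-backup.enabled: true
execution.checkpointing.local-backup.dirs: /tmp/backup
```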
Re: [VOTE] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2024-01-02 Thread Piotr Nowojski
+1 (binding)

Best,
Piotrek

On Thu, Dec 28, 2023 at 09:19, Timo Walther wrote:

> +1 (binding)
>
> Cheers,
> Timo
>
> > On Dec 28, 2023 at 03:13, Yuepeng Pan wrote:
> >
> > +1 (non-binding).
> >
> > Best,
> > Yuepeng Pan.
> >
> >
> >
> >
> > At 2023-12-28 09:19:37, "Lincoln Lee"  wrote:
> >> +1 (binding)
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >> On Wed, Dec 27, 2023 at 23:16, Martijn Visser wrote:
> >>
> >>> +1 (binding)
> >>>
> >>> On Fri, Dec 22, 2023 at 1:44 AM Jim Hughes
> 
> >>> wrote:
> 
>  Hi Alan,
> 
>  +1 (non binding)
> 
>  Cheers,
> 
>  Jim
> 
>  On Wed, Dec 20, 2023 at 2:41 PM Alan Sheinberg
>   wrote:
> 
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-400 [1]. It covers introducing a new
> >>> UDF
> > type, AsyncScalarFunction for completing invocations asynchronously.
> >>> It
> > has been discussed in this thread [2].
> >
> > I would like to start a vote.  The vote will be open for at least 72
> >>> hours
> > (until December 28th 18:00 GMT) unless there is an objection or
> > insufficient votes.
> >
> > [1]
> >
> >
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > [2] https://lists.apache.org/thread/q3st6t1w05grd7bthzfjtr4r54fv4tm2
> >
> > Thanks,
> > Alan
> >
> >>>
>
>


Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-11 Thread Piotr Nowojski
+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:

> >
> > Nicely written and makes sense.  The only feedback I have is around the
> > naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> > will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> > imply/suggest that all Async functions are remote.  I wonder if we can
> find
> > another name which doesn't carry that connotation; maybe
> > AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> > and Async functions seems reasonable.)
> >
> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> believe that the python calls are also done asynchronously, so that might
> be a reasonable name, so long as there's no confusion between the base and
> async child class.
>
> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes 
> wrote:
>
> > Hi Alan,
> >
> > Nicely written and makes sense.  The only feedback I have is around the
> > naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> > will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> > imply/suggest that all Async functions are remote.  I wonder if we can
> find
> > another name which doesn't carry that connotation; maybe
> > AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> > and Async functions seems reasonable.)
> >
> > Cheers,
> >
> > Jim
> >
> > On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >  wrote:
> >
> > > I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> > > asynchronous scalar function support [1]
> > >
> > > This feature proposes adding a new UDF type AsyncScalarFunction which
> is
> > > invoked just like a normal ScalarFunction, but is implemented with an
> > > asynchronous eval method.  I had brought this up including the
> motivation
> > > in a previous discussion thread [2].
> > >
> > > The purpose is to achieve high throughput scalar function UDFs while
> > > allowing that an individual call may have high latency.  It allows
> > scaling
> > > up the parallelism of just these calls without having to increase the
> > > parallelism of the whole query (which could be rather resource
> > > inefficient).
> > >
> > > In practice, it should enable SQL integration with external services
> and
> > > systems, which Flink has limited support for at the moment. It should
> > also
> > > allow easier integration with existing libraries which use asynchronous
> > > APIs.
> > >
> > > Looking forward to your feedback and suggestions.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > >
> > >
> > > [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > > 
> > >
> > > Thanks,
> > > Alan
> > >
> >
>
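To make the proposed programming model concrete, here is a plain-Java sketch of the eval-with-future shape the FLIP describes. The real UDF would extend Flink's AsyncScalarFunction base class; everything below is an illustrative stand-in built only on java.util.concurrent, not Flink API:

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of an AsyncScalarFunction-style UDF: eval() receives a
 * CompletableFuture to complete asynchronously instead of returning a value.
 * The class name and the "external service" call are illustrative assumptions.
 */
public class AsyncEvalSketch {

    // Simulates an asynchronous call to an external service.
    public void eval(CompletableFuture<String> result, String key) {
        CompletableFuture.supplyAsync(() -> "value-for-" + key)
                .whenComplete(
                        (value, error) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                            } else {
                                result.complete(value);
                            }
                        });
    }

    public static void main(String[] args) throws Exception {
        AsyncEvalSketch udf = new AsyncEvalSketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        udf.eval(result, "42");
        // The runtime would gather completed futures; here we just block.
        System.out.println(result.get()); // prints "value-for-42"
    }
}
```

This is what allows many in-flight invocations per subtask: the eval call returns immediately, and throughput is bounded by the number of pending futures rather than by per-call latency.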


[jira] [Created] (FLINK-33775) Report JobInitialization traces

2023-12-07 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33775:
--

 Summary: Report JobInitialization traces
 Key: FLINK-33775
 URL: https://issues.apache.org/jira/browse/FLINK-33775
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33709) Report CheckpointStats as Spans

2023-11-30 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33709:
--

 Summary: Report CheckpointStats as Spans
 Key: FLINK-33709
 URL: https://issues.apache.org/jira/browse/FLINK-33709
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33708) Add Span and TraceReporter concepts

2023-11-30 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33708:
--

 Summary: Add Span and TraceReporter concepts
 Key: FLINK-33708
 URL: https://issues.apache.org/jira/browse/FLINK-33708
 Project: Flink
  Issue Type: Sub-task
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33696:
--

 Summary: FLIP-385: Add OpenTelemetryTraceReporter and 
OpenTelemetryMetricReporter
 Key: FLINK-33696
 URL: https://issues.apache.org/jira/browse/FLINK-33696
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Metrics
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.19.0


h1. Motivation

[FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
 adds the TraceReporter interface. However, with 
[FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
 alone, Log4jTraceReporter would be the only available implementation of the 
TraceReporter interface, which is not very helpful.

In this FLIP I’m proposing to contribute both MetricExporter and TraceReporter 
implementation using OpenTelemetry.
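
For illustration, wiring such reporters up could follow Flink's existing pluggable-reporter configuration pattern. The factory class names and option keys below are assumptions for the sake of the sketch, not the names proposed in the FLIP:

```yaml
# Hypothetical flink-conf.yaml sketch -- the factory class names and option
# keys are illustrative assumptions, not the final ones from FLIP-385.
metrics.reporter.otel.factory.class: org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
metrics.reporter.otel.exporter.endpoint: http://localhost:4317
traces.reporter.otel.factory.class: org.apache.flink.traces.otel.OpenTelemetryTraceReporterFactory
traces.reporter.otel.exporter.endpoint: http://localhost:4317
```

The endpoint here points at a local OTLP/gRPC collector, which is the usual OpenTelemetry deployment setup.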





[jira] [Created] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33697:
--

 Summary: FLIP-386: Support adding custom metrics in Recovery Spans
 Key: FLINK-33697
 URL: https://issues.apache.org/jira/browse/FLINK-33697
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Metrics, Runtime / State Backends
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.19.0


h1. Motivation

FLIP-386 is building on top of
[FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
The intention here is to add a capability for state backends to attach custom
attributes to recovery spans during recovery. For example,
RocksDBIncrementalRestoreOperation could report both the remote download time
and the time to actually clip/ingest the RocksDB instances after rescaling.





[jira] [Created] (FLINK-33695) FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-29 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33695:
--

 Summary: FLIP-384: Introduce TraceReporter and use it to create 
checkpointing and recovery traces
 Key: FLINK-33695
 URL: https://issues.apache.org/jira/browse/FLINK-33695
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / Metrics
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.19.0








Re: [VOTE] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-29 Thread Piotr Nowojski
Thank you all for voting, I will post the results soon!

Best,
Piotrek

śr., 29 lis 2023 o 10:47 Stefan Richter 
napisał(a):

> +1 (binding)
>
> Best,
> Stefan
>
>
> > On 23. Nov 2023, at 09:49, Roman Khachatryan  wrote:
> >
> > +1 (binding)
> >
> > Regards,
> > Roman
> >
> >
> > On Wed, Nov 22, 2023 at 12:55 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> >> +1(binding)
> >>
> >> Thanks for driving this  proposal!
> >>
> >> Best,
> >> Rui
> >>
> >> On Wed, Nov 22, 2023 at 7:44 PM Piotr Nowojski 
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> I'd like to start a vote on the FLIP-386: Support adding custom metrics
> >> in
> >>> Recovery Spans [1]. The discussion thread is here [2].
> >>>
> >>> The vote will be open for at least 72 hours unless there is an
> objection
> >> or
> >>> not enough votes.
> >>>
> >>> [1]
> https://cwiki.apache.org/confluence/x/VAuZE
> >>> [2]
> https://lists.apache.org/thread/zt4ykyhv6cco83j9hjngn52b1oprj1tv
>
>


[RESULT][VOTE] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-29 Thread Piotr Nowojski
I'm pleased to announce that FLIP-386 has been accepted.

Binding votes +1:
- Rui Fan
- Roman Khachatryan
- Stefan Richter

There were no votes against.

Best,
Piotrek


[RESULT][VOTE] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-29 Thread Piotr Nowojski
I'm pleased to announce that FLIP-385 has been accepted.

Binding votes +1:
- Rui Fan
- Hangxiang Yu
- Roman Khachatryan
- Jing Ge
- Stefan Richter

There were no votes against.

Best,
Piotrek


Re: [VOTE] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-29 Thread Piotr Nowojski
Thank you all for voting, I will post the results soon.

Best,
Piotrek

śr., 29 lis 2023 o 10:47 Stefan Richter 
napisał(a):

> +1 (binding)
>
> Best,
> Stefan
>
>
> > On 22. Nov 2023, at 12:34, Jing Ge  wrote:
> >
> > +1(binding)
> > Thanks!
> >
> > Best Regards,
> > Jing
> >
> > On Wed, Nov 22, 2023 at 11:21 AM Roman Khachatryan <ro...@apache.org> wrote:
> >
> >> +1 (binding)
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Nov 22, 2023, 7:30 AM Hangxiang Yu  wrote:
> >>
> >>> +1(binding)
> >>>
> >>> On Wed, Nov 22, 2023 at 10:29 AM Rui Fan <1996fan...@gmail.com> wrote:
> >>>
> >>>> +1(binding)
> >>>>
> >>>> Best,
> >>>> Rui
> >>>>
> >>>> On Wed, Nov 22, 2023 at 1:20 AM Piotr Nowojski 
> >>>> wrote:
> >>>>
> >>>>> Hi All,
> >>>>>
> >>>>> I'd like to start a vote on the FLIP-385: Add
> >>> OpenTelemetryTraceReporter
> >>>>> and OpenTelemetryMetricReporter [1]. The discussion thread is here
> >> [2].
> >>>>>
> >>>>> The vote will be open for at least 72 hours unless there is an
> >>> objection
> >>>> or
> >>>>> not enough votes.
> >>>>>
> >>>>> [1]
> https://cwiki.apache.org/confluence/x/UAuZE
> >>>>> [2]
> https://lists.apache.org/thread/1rqp8czz8wnplpzgn8m4qmzvf14lyx0k
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Piotrek
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best,
> >>> Hangxiang.
>
>


[RESULT][VOTE] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-29 Thread Piotr Nowojski
I'm pleased to announce that FLIP-384 has been accepted.

Binding votes +1:
- Jing Ge
- Rui Fan
- Hangxiang Yu
- Roman Khachatryan
- David Morávek
- Stefan Richter

Non-binding votes +1:
- Zakelly Lan
- David Radley

There were no votes against.

Best,
Piotrek


Re: [VOTE] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-29 Thread Piotr Nowojski
Thank you all for voting. I will post the results soon.

Best,
Piotrek


śr., 29 lis 2023 o 11:10 David Radley  napisał(a):

> +1(non-binding)
>
> From: Stefan Richter 
> Date: Wednesday, 29 November 2023 at 09:44
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [VOTE] FLIP-384: Introduce TraceReporter and use
> it to create checkpointing and recovery traces
> +1 (binding)
>
> Best,
> Stefan
>
>
> > On 22. Nov 2023, at 11:20, Roman Khachatryan  wrote:
> >
> > +1 (binding)
> >
> > Regards,
> > Roman
> >
> > On Wed, Nov 22, 2023, 7:08 AM Zakelly Lan <zakelly@gmail.com> wrote:
> >
> >> +1(non-binding)
> >>
> >> Best,
> >> Zakelly
> >>
> >> On Wed, Nov 22, 2023 at 3:04 PM Hangxiang Yu 
> wrote:
> >>
> >>> +1 (binding)
> >>> Thanks for driving this again!
> >>>
> >>> On Wed, Nov 22, 2023 at 10:30 AM Rui Fan <1996fan...@gmail.com> wrote:
> >>>
> >>>> +1(binding)
> >>>>
> >>>> Best,
> >>>> Rui
> >>>>
> >>>> On Wed, Nov 22, 2023 at 6:43 AM Jing Ge 
> >>>> wrote:
> >>>>
> >>>>> +1(binding) Thanks!
> >>>>>
> >>>>> Best regards,
> >>>>> Jing
> >>>>>
> >>>>> On Tue, Nov 21, 2023 at 6:17 PM Piotr Nowojski
> >>>>> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I'd like to start a vote on the FLIP-384: Introduce TraceReporter
> >> and
> >>>> use
> >>>>>> it to create checkpointing and recovery traces [1]. The discussion
> >>>> thread
> >>>>>> is here [2].
> >>>>>>
> >>>>>> The vote will be open for at least 72 hours unless there is an
> >>>> objection
> >>>>> or
> >>>>>> not enough votes.
> >>>>>>
> >>>>>> [1]
> https://cwiki.apache.org/confluence/x/TguZE
> >>>>>> [2]
> >>
> https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Piotrek
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best,
> >>> Hangxiang.
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


Re: [DISCUSS] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-22 Thread Piotr Nowojski
Hi,

I have started the voting thread for this FLIP as well, after modifying it
according to Roman's suggestions. If someone has other suggestions,
please don't hesitate to raise them here regardless of the ongoing vote :)

Best,
Piotrek

śr., 22 lis 2023 o 12:37 Piotr Nowojski  napisał(a):

> Ok, good point about dropping the aggregation method from the API. We can
> always add it in the future if it will ever be needed.
>
> Thanks for your input!
>
> Best,
> Piotrek
>
> śr., 22 lis 2023 o 01:53 Roman Khachatryan  napisał(a):
>
>> Thanks for clarifying, I see your points (although reporting metrics as
>> spans still seems counter intuitive to me).
>>
>> As for the aggregation, I'm concerned that it might be unnecessarily
>> ambiguous: where the aggregation is performed (JM/TM);  across what
>> (tasks/time); and which aggregation should be used.
>>
>> How about dropping it from the API and always using min, max, sum, avg? I
>> think we're interested in these aggregations for all the metrics, and
>> there
>> is no penalty for reporting all of them because it's only for
>> initialization.
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 20, 2023, 8:42 AM Piotr Nowojski 
>> wrote:
>>
>> > Hi Roman!
>> >
>> > > 1. why the existing MetricGroup interface can't be used? It already
>> had
>> > > methods to add metrics and spans ...
>> >
>> > That's because of the need to:
>> > a) associate the spans to specifically Job's initialisation
>> > b) we need to logically aggregate the span's attributes across subtasks.
>> >
>> > `MetricGroup` doesn't have such capabilities and it's too generic an
>> > interface to introduce things like that IMO.
>> >
>> > Additionally for metrics:
>> > c) reporting initialization measurements as metrics is a flawed concept
>> as
>> > described in the FLIP's-384 motivation
>> > Additionally for spans:
>> > d) as discussed in the FLIP's-384 thread, we don't want to report
>> separate
>> > spans on the TMs. At least not right now
>> >
>> > Also having a specialized, dedicated for initialization metrics class to
>> > collect those numbers, makes the interfaces
>> > more lean and more specialized.
>> >
>> > > 2. IIUC, based on these numbers, we're going to report span(s).
>> Shouldn't
>> > > the backend report them as spans?
>> >
>> > As discussed in the FLIP's-384, initially we don't want to report spans
>> on
>> > TMs. Later, optionally reporting
>> > individual subtask's checkpoint/recovery spans on the JM looks like a
>> > logical follow up.
>> >
>> > > 3. How is the implementation supposed to infer that some metric is a
>> part
>> > > of initialization (and make the corresponding RPC to JM?). Should the
>> > > interfaces be more explicit about that?
>> >
>> > This FLIP proposes [1] to add
>> > `CustomInitializationMetrics
>> > KeyedStateBackendParameters#getCustomInitializationMetrics()`
>> > accessor to the `KeyedStateBackendParameters` argument that's passed to
>> > `createKeyedStateBackend(...)`
>> > method. That's pretty explicit I would say :)
>> >
>> > > 4. What do you think about using histogram or percentiles instead of
>> > > min/max/sum/avg? That would be more informative
>> >
>> > I would prefer to start with the simplest min/max/sum/avg, and let's
>> see in
>> > which direction (if any) we need to evolve
>> > that. Alternative to percentiles is previously mentioned to report
>> > separately each subtask's initialisation/checkpointing span.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans#FLIP386:SupportaddingcustommetricsinRecoverySpans-PublicInterfaces
>> >
>> > czw., 16 lis 2023 o 15:45 Roman Khachatryan 
>> napisał(a):
>> >
>> > > Thanks for the proposal,
>> > >
>> > > Can you please explain:
>> > > 1. why the existing MetricGroup interface can't be used? It already
>> had
>> > > methods to add metrics and spans ...
>> > >
>> > > 2. IIUC, based on these numbers, we're going to report span(s).
>> Shouldn't
>> > > the backend report them as spans?
>> > >
> > > 3. How is the implementation supposed to infer that some metric is a part
> > > of initialization (and make the corresponding RPC to JM?). Should the
> > > interfaces be more explicit about that?

[VOTE] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-22 Thread Piotr Nowojski
Hi All,

I'd like to start a vote on the FLIP-386: Support adding custom metrics in
Recovery Spans [1]. The discussion thread is here [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

[1] https://cwiki.apache.org/confluence/x/VAuZE
[2] https://lists.apache.org/thread/zt4ykyhv6cco83j9hjngn52b1oprj1tv


Re: [DISCUSS] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-22 Thread Piotr Nowojski
Ok, good point about dropping the aggregation method from the API. We can
always add it in the future if it will ever be needed.

Thanks for your input!

Best,
Piotrek

śr., 22 lis 2023 o 01:53 Roman Khachatryan  napisał(a):

> Thanks for clarifying, I see your points (although reporting metrics as
> spans still seems counter intuitive to me).
>
> As for the aggregation, I'm concerned that it might be unnecessarily
> ambiguous: where the aggregation is performed (JM/TM);  across what
> (tasks/time); and which aggregation should be used.
>
> How about dropping it from the API and always using min, max, sum, avg? I
> think we're interested in these aggregations for all the metrics, and there
> is no penalty for reporting all of them because it's only for
> initialization.
>
> Regards,
> Roman
>
> On Mon, Nov 20, 2023, 8:42 AM Piotr Nowojski  wrote:
>
> > Hi Roman!
> >
> > > 1. why the existing MetricGroup interface can't be used? It already had
> > > methods to add metrics and spans ...
> >
> > That's because of the need to:
> > a) associate the spans to specifically Job's initialisation
> > b) we need to logically aggregate the span's attributes across subtasks.
> >
> > `MetricGroup` doesn't have such capabilities and it's too generic an
> > interface to introduce things like that IMO.
> >
> > Additionally for metrics:
> > c) reporting initialization measurements as metrics is a flawed concept
> as
> > described in the FLIP's-384 motivation
> > Additionally for spans:
> > d) as discussed in the FLIP's-384 thread, we don't want to report
> separate
> > spans on the TMs. At least not right now
> >
> > Also having a specialized, dedicated for initialization metrics class to
> > collect those numbers, makes the interfaces
> > more lean and more specialized.
> >
> > > 2. IIUC, based on these numbers, we're going to report span(s).
> Shouldn't
> > > the backend report them as spans?
> >
> > As discussed in the FLIP's-384, initially we don't want to report spans
> on
> > TMs. Later, optionally reporting
> > individual subtask's checkpoint/recovery spans on the JM looks like a
> > logical follow up.
> >
> > > 3. How is the implementation supposed to infer that some metric is a
> part
> > > of initialization (and make the corresponding RPC to JM?). Should the
> > > interfaces be more explicit about that?
> >
> > This FLIP proposes [1] to add
> > `CustomInitializationMetrics
> > KeyedStateBackendParameters#getCustomInitializationMetrics()`
> > accessor to the `KeyedStateBackendParameters` argument that's passed to
> > `createKeyedStateBackend(...)`
> > method. That's pretty explicit I would say :)
> >
> > > 4. What do you think about using histogram or percentiles instead of
> > > min/max/sum/avg? That would be more informative
> >
> > I would prefer to start with the simplest min/max/sum/avg, and let's see
> in
> > which direction (if any) we need to evolve
> > that. Alternative to percentiles is previously mentioned to report
> > separately each subtask's initialisation/checkpointing span.
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans#FLIP386:SupportaddingcustommetricsinRecoverySpans-PublicInterfaces
> >
> > czw., 16 lis 2023 o 15:45 Roman Khachatryan 
> napisał(a):
> >
> > > Thanks for the proposal,
> > >
> > > Can you please explain:
> > > 1. why the existing MetricGroup interface can't be used? It already had
> > > methods to add metrics and spans ...
> > >
> > > 2. IIUC, based on these numbers, we're going to report span(s).
> Shouldn't
> > > the backend report them as spans?
> > >
> > > 3. How is the implementation supposed to infer that some metric is a
> part
> > > of initialization (and make the corresponding RPC to JM?). Should the
> > > interfaces be more explicit about that?
> > >
> > > 4. What do you think about using histogram or percentiles instead of
> > > min/max/sum/avg? That would be more informative
> > >
> > > I like the idea of introducing parameter objects for backend creation.
> > >
> > > Regards,
> > > Roman
> > >
> > > On Tue, Nov 7, 2023, 1:20 PM Piotr Nowojski 
> > wrote:
> > >
> > > > (Fixing topic)
> > > >
> > > > wt., 7 lis 2023 o 09:40 Piotr Nowojski 
> > > napisał(a):

Re: [VOTE] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-22 Thread Piotr Nowojski
Thanks Rui!

+1 (binding)

Best,
Piotrek

śr., 22 lis 2023 o 08:05 Hangxiang Yu  napisał(a):

> +1 (binding)
> Thanks for your efforts!
>
> On Mon, Nov 20, 2023 at 11:53 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > Thank you to everyone for the feedback on FLIP-390: Support
> > System out and err to be redirected to LOG or discarded[1]
> > which has been discussed in this thread [2].
> >
> > I would like to start a vote for it. The vote will be open for at least
> 72
> > hours unless there is an objection or not enough votes.
> >
> > [1] https://cwiki.apache.org/confluence/x/4guZE
> > [2] https://lists.apache.org/thread/47pdjggh0q0tdkq0cwt6y5o2o8wrl9jl
> >
> > Best,
> > Rui
> >
>
>
> --
> Best,
> Hangxiang.
>


[VOTE] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-21 Thread Piotr Nowojski
Hi All,

I'd like to start a vote on the FLIP-385: Add OpenTelemetryTraceReporter
and OpenTelemetryMetricReporter [1]. The discussion thread is here [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

[1] https://cwiki.apache.org/confluence/x/UAuZE
[2] https://lists.apache.org/thread/1rqp8czz8wnplpzgn8m4qmzvf14lyx0k


Best,
Piotrek


Re: [DISCUSS] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-21 Thread Piotr Nowojski
Thanks Roman. It kind of does depend, as the API is defined in
FLIP-384. But the voting thread for that FLIP has already started, so I think
we can start the voting process here as well.

Best,
Piotrek

czw., 16 lis 2023 o 15:24 Roman Khachatryan  napisał(a):

> Thanks Piotr, the proposal totally makes sense to me.
>
> Does it depend on FLIP-384 for voting?
> Otherwise, we could probably start the vote already as there're no counter
> proposals or objections.
>
> Regards,
> Roman
>
> On Tue, Nov 7, 2023, 1:19 PM Piotr Nowojski 
> wrote:
>
> > Hey, sorry for the misclick. Fixed.
> >
> > wt., 7 lis 2023 o 14:00 Timo Walther  napisał(a):
> >
> > > Thanks for the FLIP, Piotr.
> > >
> > > In order to follow the FLIP process[1], please prefix the email subject
> > > with "[DISCUSS]".
> > >
> > > Also, some people might have added filters to their email clients to
> > > highlight those discussions.
> > >
> > > Thanks,
> > > Timo
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/Flink/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
> > >
> > > On 07.11.23 09:35, Piotr Nowojski wrote:
> > > > Hi all!
> > > >
> > > > I would like to start a discussion on a follow up of FLIP-384:
> > Introduce
> > > > TraceReporter and use it to create checkpointing and recovery traces
> > [1]:
> > > >
> > > > *FLIP-385: Add OpenTelemetryTraceReporter and
> > > OpenTelemetryMetricReporter[
> > > > 2]*
> > > >
> > > > This FLIP proposes to add both MetricReporter and TraceReporter
> > > integrating
> > > > Flink with OpenTelemetry [4].
> > > >
> > > > There is also another follow up FLIP-386 [3], which improves recovery
> > > > traces.
> > > >
> > > > Please let me know what you think!
> > > >
> > > > Best,
> > > > Piotr Nowojski
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
> > > > [3]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> > > > [4] https://opentelemetry.io/
> > > >
> > >
> > >
> >
>


[VOTE] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-21 Thread Piotr Nowojski
Hi All,

I'd like to start a vote on the FLIP-384: Introduce TraceReporter and use
it to create checkpointing and recovery traces [1]. The discussion thread
is here [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

[1] https://cwiki.apache.org/confluence/x/TguZE
[2] https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4


Best,
Piotrek


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-21 Thread Piotr Nowojski
Hi,

I will start the voting thread shortly. Jing, I'm assuming (and hoping)
that my answers have satisfied you. If not, please let me know.

Best,
Piotrek

pon., 20 lis 2023 o 09:59 Piotr Nowojski  napisał(a):

> Hi Jing!
>
> >  the upcoming OpenTelemetry based TraceReporter will use the same Span
> > implementation and will not support trace_id and span_id. Does it make
> > sense to at least add the span_id into the current Span design? The
> default
> > implementation could follow your suggestion:
> jobId#attemptId#checkpointId.
>
> Those IDs (jobId, checkpointId) will be accessible to the humans via
> attributes,
> so there is no need to encode them at the moment in the span/trace ids. At
> the
> same time, at the moment, I don't know for sure how the concept of span
> parent ids should be exposed to the user of this API. Whether it should be
> plain
> text, or some pojo generating the trace id/span id. Also I'm not sure how
> would
> this have to work for other reporting systems other than OTEL. Due to those
> reasons I thought that keeping the API as simple as possible would be the
> best
> option.
>
> > 1. The sample code shows that the scope of Span will be the CanonicalName
> > of a class. If there are other cases that could be used as the scope
> too, a
> > javadoc about Span scope would be helpful. If the CanonicalName of a
> class
> > is the best practice, removing the scope from the builder constructor and
> > adding setScope(Class) might ease the API usage. The Span.getScope() can
> > still return String.
>
> I like the idea with `#setScope(Class)`. I will adjust the FLIP :)
>
> > 2. The sample code in the FLIP is not consistent. The first example used
> > Span.builder(..) and the second example used new Span() with setters.
>
> I will fix that, I've forgotten to upgrade the second `new Span()` usage
> to the
> builder.
>
> > 3. I guess the constructor of SpanBuilder class is a typo.
>
> Yes! Thanks for noting.
>
> Best,
> Piotrek
>
>
> czw., 16 lis 2023 o 15:12 Roman Khachatryan  napisał(a):
>
>> Thanks for the proposal,
>>
>> Starting with the minimal functionality and expanding if necessary as the
>> FLIP describes makes a lot of sense to me.
>>
>> Regards,
>> Roman
>>
>> On Wed, Nov 15, 2023, 9:31 PM Jing Ge  wrote:
>>
>> > Hi Piotr,
>> >
>> > Sorry for the late reply and thanks for the proposal, it looks awesome!
>> >
>> > In the discussion, you pointed out that it is difficult to build true
>> > distributed traces. afaiu from FLIP-384 and FLIP-385, the
>> > upcoming OpenTelemetry based TraceReporter will use the same Span
>> > implementation and will not support trace_id and span_id. Does it make
>> > sense to at least add the span_id into the current Span design? The
>> default
>> > implementation could follow your suggestion:
>> jobId#attemptId#checkpointId.
>> >
>> > Some other NIT questions:
>> > 1. The sample code shows that the scope of Span will be the
>> CanonicalName
>> > of a class. If there are other cases that could be used as the scope
>> too, a
>> > javadoc about Span scope would be helpful. If the CanonicalName of a
>> class
>> > is the best practice, removing the scope from the builder constructor
>> and
>> > adding setScope(Class) might ease the API usage. The Span.getScope() can
>> > still return String.
>> > 2. The sample code in the FLIP is not consistent. The first example used
>> > Span.builder(..) and the second example used new Span() with setters.
>> > 3. I guess the constructor of SpanBuilder class is a typo.
>> >
>> > Really a nice idea to introduce the trace report! Thanks again!
>> >
>> > Best regards,
>> > Jing
>> >
>> > On Tue, Nov 14, 2023 at 3:16 PM Piotr Nowojski 
>> > wrote:
>> >
>> > > Hi All,
>> > >
>> > > Thanks for the answers!
>> > >
>> > > Unless there are some objections or suggestions, I will open a voting
>> > > thread later this
>> > > week.
>> > >
>> > > > My original thought was to show how much time a sampled record is
>> > > processed
>> > > > within each operator in stream processing. By saying 'sampled', I
>> mean
>> > we
>> > > > won't generate a trace for every record due to the high cost
>> involved.
>> > > > Instead, we could only trace ONE record from source when the user
>> &g

Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-20 Thread Piotr Nowojski
Hi Jing!

>  the upcoming OpenTelemetry based TraceReporter will use the same Span
> implementation and will not support trace_id and span_id. Does it make
> sense to at least add the span_id into the current Span design? The
default
> implementation could follow your suggestion: jobId#attemptId#checkpointId.

Those IDs (jobId, checkpointId) will be accessible to humans via attributes,
so there is no need to encode them in the span/trace ids at the moment. At the
same time, I don't know for sure how the concept of span parent ids should be
exposed to the user of this API: whether it should be plain text, or some POJO
generating the trace id/span id. Also, I'm not sure how this would have to
work for reporting systems other than OTEL. For those reasons I thought that
keeping the API as simple as possible would be the best option.

> 1. The sample code shows that the scope of Span will be the CanonicalName
> of a class. If there are other cases that could be used as the scope too,
a
> javadoc about Span scope would be helpful. If the CanonicalName of a class
> is the best practice, removing the scope from the builder constructor and
> adding setScope(Class) might ease the API usage. The Span.getScope() can
> still return String.

I like the idea with `#setScope(Class)`. I will adjust the FLIP :)

> 2. The sample code in the FLIP is not consistent. The first example used
> Span.builder(..) and the second example used new Span() with setters.

I will fix that; I had forgotten to update the second `new Span()` usage to the
builder.

> 3. I guess the constructor of SpanBuilder class is a typo.

Yes! Thanks for noting.

Best,
Piotrek
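
The API shape discussed above (a builder entry point plus a class-based scope) can be sketched as below. This is a minimal, self-contained reconstruction for illustration only, not the final FLIP-384 interfaces; the exact method names are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the Span/SpanBuilder shape discussed in this thread --
// a reconstruction for illustration, not the final Flink API.
public final class Span {
    private final String scope;
    private final String name;
    private final Map<String, Object> attributes;

    private Span(String scope, String name, Map<String, Object> attributes) {
        this.scope = scope;
        this.name = name;
        this.attributes = Collections.unmodifiableMap(attributes);
    }

    // As suggested in the thread, the scope is derived from a class'
    // canonical name instead of being a free-form string.
    public static SpanBuilder builder(Class<?> scope, String name) {
        return new SpanBuilder(scope, name);
    }

    public String getScope() { return scope; }
    public String getName() { return name; }
    public Map<String, Object> getAttributes() { return attributes; }

    public static final class SpanBuilder {
        private final Class<?> scope;
        private final String name;
        private final Map<String, Object> attributes = new HashMap<>();

        private SpanBuilder(Class<?> scope, String name) {
            this.scope = scope;
            this.name = name;
        }

        public SpanBuilder setAttribute(String key, Object value) {
            attributes.put(key, value);
            return this;
        }

        public Span build() {
            return new Span(scope.getCanonicalName(), name, new HashMap<>(attributes));
        }
    }

    public static void main(String[] args) {
        // E.g. a checkpoint span carrying the checkpoint id as an attribute:
        Span span = Span.builder(Span.class, "Checkpoint")
                .setAttribute("checkpointId", 42L)
                .build();
        System.out.println(span.getScope() + "/" + span.getName());
        // prints "Span/Checkpoint"
    }
}
```

With the class-based scope, callers cannot mistype the scope string, which is the ease-of-use point raised in the discussion.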


czw., 16 lis 2023 o 15:12 Roman Khachatryan  napisał(a):

> Thanks for the proposal,
>
> Starting with the minimal functionality and expanding if necessary as the
> FLIP describes makes a lot of sense to me.
>
> Regards,
> Roman
>
> On Wed, Nov 15, 2023, 9:31 PM Jing Ge  wrote:
>
> > Hi Piotr,
> >
> > Sorry for the late reply and thanks for the proposal, it looks awesome!
> >
> > In the discussion, you pointed out that it is difficult to build true
> > distributed traces. afaiu from FLIP-384 and FLIP-385, the
> > upcoming OpenTelemetry based TraceReporter will use the same Span
> > implementation and will not support trace_id and span_id. Does it make
> > sense to at least add the span_id into the current Span design? The
> default
> > implementation could follow your suggestion:
> jobId#attemptId#checkpointId.
> >
> > Some other NIT questions:
> > 1. The sample code shows that the scope of Span will be the CanonicalName
> > of a class. If there are other cases that could be used as the scope
> too, a
> > javadoc about Span scope would be helpful. If the CanonicalName of a
> class
> > is the best practice, removing the scope from the builder constructor and
> > adding setScope(Class) might ease the API usage. The Span.getScope() can
> > still return String.
> > 2. The sample code in the FLIP is not consistent. The first example used
> > Span.builder(..) and the second example used new Span() with setters.
> > 3. I guess the constructor of SpanBuilder class is a typo.
> >
> > Really a nice idea to introduce the trace report! Thanks again!
> >
> > Best regards,
> > Jing
> >
> > On Tue, Nov 14, 2023 at 3:16 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi All,
> > >
> > > Thanks for the answers!
> > >
> > > Unless there are some objections or suggestions, I will open a voting
> > > thread later this
> > > week.
> > >
> > > > My original thought was to show how much time a sampled record is
> > > processed
> > > > within each operator in stream processing. By saying 'sampled', I
> mean
> > we
> > > > won't generate a trace for every record due to the high cost
> involved.
> > > > Instead, we could only trace ONE record from source when the user
> > > requests
> > > > it (via REST API or Web UI) or when triggered periodically at a very
> > low
> > > > frequency.
> > >
> > > That would be useful, but another issue is that we can not measure time
> > > reliably at the
> > > granularity of a single record. Time to process a single record by the
> > > whole operator
> > > chain is usually faster compared to the syscalls to measure time.
> > >
> > > So I think we are stuck with sample based profilers, like Flame Graphs
> > > generated by
> > > the Flink WebUI.
> > >
> > > Best, Piotrek
> > >
> > > czw., 9 lis 

Re: [DISCUSS] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-20 Thread Piotr Nowojski
Hi Roman!

> 1. why the existing MetricGroup interface can't be used? It already had
> methods to add metrics and spans ...

That's because of the need to:
a) associate the spans specifically with the job's initialization,
b) logically aggregate the span's attributes across subtasks.

`MetricGroup` doesn't have such capabilities, and it's too generic an
interface to introduce things like that IMO.

Additionally, for metrics:
c) reporting initialization measurements as metrics is a flawed concept, as
described in FLIP-384's motivation.
Additionally, for spans:
d) as discussed in the FLIP-384 thread, we don't want to report separate
spans on the TMs. At least not right now.

Also, having a specialized class dedicated to initialization metrics to
collect those numbers keeps the interfaces
leaner and more specialized.

> 2. IIUC, based on these numbers, we're going to report span(s). Shouldn't
> the backend report them as spans?

As discussed in the FLIP-384 thread, initially we don't want to report spans on
the TMs. Later, optionally reporting
individual subtasks' checkpoint/recovery spans on the JM looks like a
logical follow-up.

> 3. How is the implementation supposed to infer that some metric is a part
> of initialization (and make the corresponding RPC to JM?). Should the
> interfaces be more explicit about that?

This FLIP proposes [1] to add
`CustomInitializationMetrics
KeyedStateBackendParameters#getCustomInitializationMetrics()`
accessor to the `KeyedStateBackendParameters` argument that's passed to
`createKeyedStateBackend(...)`
method. That's pretty explicit I would say :)

> 4. What do you think about using histogram or percentiles instead of
> min/max/sum/avg? That would be more informative

I would prefer to start with the simplest min/max/sum/avg, and let's see in
which direction (if any) we need to evolve it.
An alternative to percentiles is the previously mentioned option of
reporting each subtask's initialisation/checkpointing span separately.
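
As an illustration of the min/max/sum/avg idea, the JM could merge per-subtask values of a custom metric into four span attributes. This is a sketch, not the FLIP's actual implementation; the attribute naming convention is an assumption.

```java
import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;

public class SubtaskMetricAggregation {

    // Aggregate one custom metric's per-subtask values into span attributes.
    static Map<String, Object> aggregate(String metricName, long[] perSubtaskValues) {
        LongSummaryStatistics stats =
                java.util.Arrays.stream(perSubtaskValues).summaryStatistics();
        Map<String, Object> attributes = new HashMap<>();
        attributes.put(metricName + "Min", stats.getMin());
        attributes.put(metricName + "Max", stats.getMax());
        attributes.put(metricName + "Sum", stats.getSum());
        attributes.put(metricName + "Avg", stats.getAverage());
        return attributes;
    }

    public static void main(String[] args) {
        // e.g. per-subtask state-download durations in ms
        System.out.println(aggregate("DownloadStateDurationMs", new long[] {100, 250, 400}));
    }
}
```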

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans#FLIP386:SupportaddingcustommetricsinRecoverySpans-PublicInterfaces

Thu, 16 Nov 2023 at 15:45 Roman Khachatryan  wrote:

> Thanks for the proposal,
>
> Can you please explain:
> 1. why the existing MetricGroup interface can't be used? It already had
> methods to add metrics and spans ...
>
> 2. IIUC, based on these numbers, we're going to report span(s). Shouldn't
> the backend report them as spans?
>
> 3. How is the implementation supposed to infer that some metric is a part
> of initialization (and make the corresponding RPC to JM?). Should the
> interfaces be more explicit about that?
>
> 4. What do you think about using histogram or percentiles instead of
> min/max/sum/avg? That would be more informative
>
> I like the idea of introducing parameter objects for backend creation.
>
> Regards,
> Roman
>
> On Tue, Nov 7, 2023, 1:20 PM Piotr Nowojski  wrote:
>
> > (Fixing topic)
> >
> > Tue, 7 Nov 2023 at 09:40 Piotr Nowojski 
> wrote:
> >
> > > Hi all!
> > >
> > > I would like to start a discussion on a follow up of FLIP-384:
> Introduce
> > > TraceReporter and use it to create checkpointing and recovery traces
> [1]:
> > >
> > > *FLIP-386: Support adding custom metrics in Recovery Spans [2]*
> > >
> > > This FLIP adds a functionality that will allow state backends to attach
> > > custom metrics to the recovery/initialization traces. This requires changes
> > > to the `@PublicEvolving` `StateBackend` API, and it will be initially used
> > > in `RocksDBIncrementalRestoreOperation` to measure how long it takes to
> > > download remote files and separately how long it takes to load those
> > > files into the local RocksDB instance.
> > >
> > > Please let me know what you think!
> > >
> > > Best,
> > > Piotr Nowojski
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-14 Thread Piotr Nowojski
Hi All,

Thanks for the answers!

Unless there are some objections or suggestions, I will open a voting
thread later this
week.

> My original thought was to show how much time a sampled record is processed
> within each operator in stream processing. By saying 'sampled', I mean we
> won't generate a trace for every record due to the high cost involved.
> Instead, we could only trace ONE record from source when the user requests
> it (via REST API or Web UI) or when triggered periodically at a very low
> frequency.

That would be useful, but another issue is that we cannot measure time
reliably at the granularity of a single record. Processing a single record
through the whole operator
chain is usually faster than the syscalls needed to measure the time.

So I think we are stuck with sample based profilers, like Flame Graphs
generated by
the Flink WebUI.

Best, Piotrek

Thu, 9 Nov 2023 at 05:32 Rui Fan <1996fan...@gmail.com> wrote:

> Hi Piotr:
>
> Thanks for your reply!
>
> > About structured logging (basically events?) I vaguely remember some
> > discussions about that. It might be a much larger topic, so I would prefer
> > to leave it out of the scope of this FLIP.
>
> Sounds reasonable to me!
>
> > I think those could be indeed useful. If you would like to contribute to
> them
> > in the future, I would be happy to review the FLIP for it :)
>
> Thank you, after this FLIP, I or my colleagues can pick it up!
>
> Best,
> Rui
>
> On Thu, Nov 9, 2023 at 11:39 AM Zakelly Lan  wrote:
>
> > Hi Piotr,
> >
> > Thanks for your detailed explanation! I could see the challenge of
> > implementing traces with multiple spans and agree to put it in the future
> > work. I personally prefer the idea of generating multi span traces for
> > checkpoints on the JM only.
> >
> > > I'm not sure if I understand the proposal - I don't know how traces
> could
> > > be used for this purpose?
> > > Traces are perfect for one of events (like checkpointing, recovery,
> etc),
> > > not for continuous monitoring
> > > (like processing records). That's what metrics are. Creating trace and
> > > span(s) per each record would
> > > be prohibitively expensive.
> >
> > My original thought was to show how much time a sampled record is processed
> > within each operator in stream processing. By saying 'sampled', I mean we
> > won't generate a trace for every record due to the high cost involved.
> > Instead, we could only trace ONE record from source when the user requests
> > it (via REST API or Web UI) or when triggered periodically at a very low
> > frequency. However, after re-thinking my idea, I realized it's hard to
> > define the whole lifecycle of a record since it is transformed into
> > different forms among operators. We could discuss this in future after the
> > basic trace is implemented in Flink.
> >
> > > Unless you mean in batch/bounded jobs? Then yes, we could create a
> > bounded
> > > job trace, with spans
> > > for every stage/task/subtask.
> >
> > Oh yes, batch jobs could definitely leverage the trace.
> >
> > Best,
> > Zakelly
> >
> >
> > On Wed, Nov 8, 2023 at 9:18 PM Jinzhong Li 
> > wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for driving this proposal!   I strongly agree that the existing
> > > metric APIs are not suitable for monitoring restore/checkpoint
> behavior!
> > >
> > > I think the TM-level recovery/checkpointing traces are necessary in the
> > > future. In our production environment, we sometimes encounter that job
> > > recovery time is very long (30min+), due to several subTask heavy disk
> > > traffic. The TM-level recovery trace is helpful for troubleshooting
> such
> > > issues.
> > >
> > > Best
> > > Jinzhong
> > >
> > > On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for the comments. Quick answer for both of your questions would be
> > > > that it probably should be
> > > > left as a future work. For more detailed answers please take a look below
> > > > :)
> > > >
> > > > > Does it mean the inclusion and subdivision relationships of spans defined
> > > > > by "parent_id" are not supported? I think it is a very necessary feature
> > > > > for the trace.
> > > >
> > > > Yes exactly

Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-10 Thread Piotr Nowojski
Thanks! :)

Best, Piotrek

Thu, 9 Nov 2023 at 16:15 Rui Fan <1996fan...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your feedback!
>
> > Or implement your own loop? It shouldn't be more than a couple of lines.
>
> Implementing it directly is fine, I have updated the FLIP.
> And this logic can be found in the  `isLineEnded` method.
>
> Best,
> Rui
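
The hand-rolled loop mentioned in the exchange above could look roughly like this. It is only a sketch; the actual `isLineEnded` method in the FLIP may differ.

```java
public class IsLineEnded {

    // Returns true if the first `count` bytes of buf end with the separator.
    static boolean isLineEnded(byte[] buf, int count, byte[] separator) {
        if (count < separator.length) {
            return false;
        }
        for (int i = 0; i < separator.length; i++) {
            // Compare the tail of buf against the separator, byte by byte.
            if (buf[count - separator.length + i] != separator[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] sep = {'\r', '\n'};
        byte[] buf = {'a', 'b', '\r', '\n'};
        System.out.println(isLineEnded(buf, 4, sep)); // true
        System.out.println(isLineEnded(buf, 3, sep)); // false: tail is "b\r"
    }
}
```
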
>
> On Thu, Nov 9, 2023 at 11:00 PM Piotr Nowojski 
> wrote:
>
> > Hi Rui,
> >
> > > I see java8 doesn't have `Arrays.equals(int[] a, int aFromIndex, int
> > > aToIndex, int[] b, int bFromIndex, int bToIndex)`,
> > > and java11 has it. Do you have any other suggestions for java8?
> >
> > Maybe use `ByteBuffer.wrap`?
> >
> > ByteBuffer.wrap(array, ..., ...).equals(ByteBuffer.wrap(array2, ..., ...))
> >
> > This shouldn't have overheads as far as I remember.
> >
> > Or implement your own loop? It shouldn't be more than a couple of lines.
> >
> > Best,
> > Piotrek
> >
> > Thu, 9 Nov 2023 at 06:43 Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Hi Piotr, Archit, Feng and Hangxiang:
> > >
> > > Thanks a lot for your feedback!
> > >
> > > Following is my comment, please correct me if I misunderstood anything!
> > >
> > > To Piotr:
> > >
> > > > Is there a reason why you are suggesting to copy out bytes from `buf` to
> > > > `bytes`,
> > > > instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
> > > > int[] b, int bFromIndex, int bToIndex)`?
> > >
> > > I see java8 doesn't have `Arrays.equals(int[] a, int aFromIndex, int
> > > aToIndex, int[] b, int bFromIndex, int bToIndex)`,
> > > and java11 has it. Do you have any other suggestions for java8?
> > >
> > > Also, this code doesn't run in production. As the comment of
> > > System.lineSeparator():
> > >
> > > > On UNIX systems, it returns {@code "\n"}; on Microsoft
> > > > Windows systems it returns {@code "\r\n"}.
> > >
> > > So Mac and Linux just return one character, we will compare
> > > one byte directly.
> > >
> > >
> > >
> > > To Feng:
> > >
> > > > Will they be written to the taskManager.log file by default
> > > > or the taskManager.out file?
> > >
> > > I prefer LOG as the default value for taskmanager.system-out.mode.
> > > It's useful for job stability and doesn't introduce significant impact to
> > > users. Also, our production has already used this feature for
> > > more than a year; it works well.
> > >
> > > However, I write the DEFAULT as the default value for
> > > taskmanager.system-out.mode, because when the community introduces
> > > new options, the default value often selects the original behavior.
> > >
> > > Looking forward to hear more thoughts from community about this
> > > default value.
> > >
> > > > If we can make taskmanager.out splittable and rolling, would it be
> > > > easier for users to use this feature?
> > >
> > > Making taskmanager.out splittable and rolling is a good choice!
> > > I have some concerns about it:
> > >
> > > 1. Users may also want to use LOG.info in their code and just
> > >   accidentally use System.out.println. It is possible that they will
> > >   also find the logs directly in taskmanager.log.
> > > 2. I'm not sure whether the rolling strategy is easy to implement.
> > >   If we do it, it's necessary to define a series of flink options
> similar
> > >   to log options, such as: fileMax(how many files should be retained),
> > >   fileSize(The max size each file), fileNamePatten (The suffix of file
> > > name),
> > > 3. Check the file size periodically: all logs are written by log
> plugin,
> > >   they can check the log file size after writing. However, System.out
> > >   are written directly. And flink must start a thread to check the
> latest
> > >   taskmanager.out size periodically. If it's too quick, most of job
> > aren't
> > >   necessary. If it's too slow, the file size cannot be controlled
> > properly.
> > >
> > > Redirect it to LOG.info may be a reasonable and easy choice.
> > > The user didn't really want to log into taskmanager.out, it just
> > > happened by accident.
> > >
> > >
> > >
> > > To Hangxiang:
> >

Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-09 Thread Piotr Nowojski
> > we have redirected to LOG mode, could we also log the subtask
> > info? It may help us to debug granularly.
> >
> > On Thu, Nov 9, 2023 at 9:47 AM Feng Jin  wrote:
> >
> > > Hi, Rui.
> > >
> > > Thank you for initiating this proposal.
> > >
> > > I have a question regarding redirecting stdout and stderr to LOG:
> > >
> > > Will they be written to the taskManager.log file by default or the
> > > taskManager.out file?
> > > If we can make taskmanager.out splittable and rolling, would it be
> easier
> > > for users to use this feature?
> > >
> > > Best,
> > > Feng
> > >
> > > On Thu, Nov 9, 2023 at 3:15 AM Archit Goyal
>  > >
> > > wrote:
> > >
> > > > Hi Rui,
> > > >
> > > > Thanks for the proposal.
> > > >
> > > > The proposed solution of supporting System out and err to be redirected to
> > > > LOG or discarded and introducing an enum and two options to manage this
> > > > seems reasonable.
> > > >
> > > > +1
> > > >
> > > > Thanks,
> > > > Archit Goyal
> > > >
> > > >
> > > > From: Piotr Nowojski 
> > > > Date: Wednesday, November 8, 2023 at 7:38 AM
> > > > To: dev@flink.apache.org 
> > > > Subject: Re: [DISCUSS] FLIP-390: Support System out and err to be
> > > > redirected to LOG or discarded
> > > > Hi Rui,
> > > >
> > > > Thanks for the proposal.
> > > >
> > > > +1 I don't have any major comments :)
> > > >
> > > > One nit. In `SystemOutRedirectToLog` in this code:
> > > >
> > > >    System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
> > > >            LINE_SEPARATOR_LENGTH);
> > > >    return Arrays.equals(LINE_SEPARATOR_BYTES, bytes);
> > > >
> > > > Is there a reason why you are suggesting to copy out bytes from `buf` to
> > > > `bytes`,
> > > > instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
> > > > int[] b, int bFromIndex, int bToIndex)`?
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > Wed, 8 Nov 2023 at 11:53 Rui Fan <1996fan...@gmail.com> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > I would like to start a discussion of FLIP-390: Support System out
> > and
> > > > err
> > > > > to be redirected to LOG or discarded[1].
> > > > >
> > > > > In various production environments, either cloud native or physical
> > > > > machines, the disk space that Flink TaskManager can use is limited.
> > > > >
> > > > > In general, the flink users shouldn't use the `System.out.println`
> in
> > > > > production,
> > > > > however this may happen when the number of Flink jobs and job
> > > developers
> > > > > is very large. Flink job may use System.out to output a large
> amount
> > of
> > > > > data
> > > > > to the taskmanager.out file. This file will not roll, it will
> always
> > > > > increment.
> > > > > Eventually the upper limit of what the TM can be used for is
> reached.
> > > > >
> > > > > We can support System out and err to be redirected to LOG or
> > discarded,
> > > > > the LOG can roll and won't increment forever.
> > > > >
> > > > > This feature is useful for SREs who maintain Flink environments,
> they
> > > can
> > > > > redirect System.out to LOG by default. Although the cause of this
> > > problem
> > > > > is
> > > > > that the user's code is not standardized, for SRE, pushing users to
> > > > modify
> > > > > the code one by one is usually a very time-consuming operation.
> It's
> > > also
> > > > > useful for job stability where System.out is accidentally used.
> > > > >
> > > > > Looking forward to your feedback, thanks~
> > > > >
> > > > > [1]
> > > >
> > >
> >
> > > > https://cwiki.apache.org/confluence/x/4guZE
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >
>


Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Piotr Nowojski
Hi Rui,

Thanks for the proposal.

+1 I don't have any major comments :)

One nit. In `SystemOutRedirectToLog` in this code:

    System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
            LINE_SEPARATOR_LENGTH);
    return Arrays.equals(LINE_SEPARATOR_BYTES, bytes);

Is there a reason why you are suggesting to copy out bytes from `buf` to
`bytes`,
instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
int[] b, int bFromIndex, int bToIndex)`?
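
For illustration, here are the two Java 8-compatible variants side by side: the copy-based check from the snippet above, and the copy-free comparison via `ByteBuffer.wrap` suggested later in this thread. This is a sketch with assumed names, not Flink's actual `SystemOutRedirectToLog` code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LineSeparatorCheck {

    private static final byte[] SEP =
            System.lineSeparator().getBytes(StandardCharsets.UTF_8);

    // The copy-based variant from the quoted snippet: copy the tail, compare.
    static boolean endsWithSeparatorCopy(byte[] buf, int count) {
        if (count < SEP.length) {
            return false;
        }
        byte[] tail = new byte[SEP.length];
        System.arraycopy(buf, count - SEP.length, tail, 0, SEP.length);
        return Arrays.equals(SEP, tail);
    }

    // Copy-free, Java 8-compatible variant: ByteBuffer.equals compares the
    // remaining bytes of both buffers, so wrapping the tail slice avoids the
    // intermediate array.
    static boolean endsWithSeparator(byte[] buf, int count) {
        if (count < SEP.length) {
            return false;
        }
        return ByteBuffer.wrap(buf, count - SEP.length, SEP.length)
                .equals(ByteBuffer.wrap(SEP));
    }

    public static void main(String[] args) {
        byte[] line = ("hello" + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
        System.out.println(endsWithSeparator(line, line.length));      // true
        System.out.println(endsWithSeparatorCopy(line, line.length));  // true
        System.out.println(endsWithSeparator(line, line.length - 1));  // false
    }
}
```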

Best,
Piotrek

Wed, 8 Nov 2023 at 11:53 Rui Fan <1996fan...@gmail.com> wrote:

> Hi all!
>
> I would like to start a discussion of FLIP-390: Support System out and err
> to be redirected to LOG or discarded[1].
>
> In various production environments, either cloud native or physical
> machines, the disk space that Flink TaskManager can use is limited.
>
> In general, the flink users shouldn't use the `System.out.println` in
> production,
> however this may happen when the number of Flink jobs and job developers
> is very large. Flink job may use System.out to output a large amount of
> data
> to the taskmanager.out file. This file will not roll, it will always
> increment.
> Eventually the upper limit of what the TM can be used for is reached.
>
> We can support System out and err to be redirected to LOG or discarded,
> the LOG can roll and won't increment forever.
>
> This feature is useful for SREs who maintain Flink environments, they can
> redirect System.out to LOG by default. Although the cause of this problem
> is
> that the user's code is not standardized, for SRE, pushing users to modify
> the code one by one is usually a very time-consuming operation. It's also
> useful for job stability where System.out is accidentally used.
>
> Looking forward to your feedback, thanks~
>
> [1] https://cwiki.apache.org/confluence/x/4guZE
>
> Best,
> Rui
>
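
The redirection idea described above could look roughly like this: a `PrintStream` that hands complete lines to a logger instead of letting them accumulate in taskmanager.out. A `Consumer<String>` stands in for the real logger here; Flink's actual implementation and option wiring differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

public class RedirectStdOut {

    // Builds a PrintStream that forwards every complete line to the consumer.
    static PrintStream loggingPrintStream(Consumer<String> logLine) {
        OutputStream os = new OutputStream() {
            private final ByteArrayOutputStream buf = new ByteArrayOutputStream();

            @Override
            public void write(int b) {
                if (b == '\n') {
                    // End of line reached: emit the buffered line and reset.
                    logLine.accept(new String(buf.toByteArray(), StandardCharsets.UTF_8));
                    buf.reset();
                } else if (b != '\r') {
                    buf.write(b);
                }
            }
        };
        return new PrintStream(os, true);
    }

    public static void main(String[] args) {
        java.util.List<String> log = new java.util.ArrayList<>();
        // Roughly what a LOG redirection mode would do at TM startup:
        System.setOut(loggingPrintStream(log::add));
        System.out.println("hello from user code");
        // Restore the real stdout so we can show what was captured.
        System.setOut(new PrintStream(
                new java.io.FileOutputStream(java.io.FileDescriptor.out), true));
        System.out.println(log); // prints: [hello from user code]
    }
}
```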


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Piotr Nowojski
Hi Zakelly,

Thanks for the comments. Quick answer for both of your questions would be
that it probably should be
left as a future work. For more detailed answers please take a look below :)

> Does it mean the inclusion and subdivision relationships of spans defined
> by "parent_id" are not supported? I think it is a very necessary feature
> for the trace.

Yes exactly, that is the current limitation. This could be solved somehow
one way or another in the future.

Support for reporting multi span traces all at once - for example
`CheckpointStatsTracker` running JM,
could upon checkpoint completion create in one place the whole structure of
parent spans, to have for
example one span per each subtask. This would be a relatively easy follow
up.

However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on the JM and TMs, the problem is a bit deeper.
The issue in that case is how
to actually fill out `parent_id` and `trace_id`. Passing some context
entity as a Java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it would be to have a deterministic generator of `parent_id` and
`trace_id` values.

For example, we could create the parent trace/span of the checkpoint on the JM
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask could then
re-generate those ids,
and a subtask's checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense anyway, since we can generate them much more easily on the JM.
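
The deterministic id scheme described above could be sketched as follows; the `#`-joined format is the example from this thread, not a committed design.

```java
public class DeterministicSpanIds {

    // Parent trace/span id, derivable identically on JM and every TM.
    static String checkpointTraceId(String jobId, int attemptId, long checkpointId) {
        return jobId + "#" + attemptId + "#" + checkpointId;
    }

    // Per-subtask span id: the re-generated parent id plus the subtask index.
    static String subtaskSpanId(String jobId, int attemptId, long checkpointId, int subTaskId) {
        return checkpointTraceId(jobId, attemptId, checkpointId) + "#" + subTaskId;
    }

    public static void main(String[] args) {
        // JM creates the parent id; each TM re-derives it and appends its index,
        // so no context object has to travel with the RPCs.
        System.out.println(checkpointTraceId("job-42", 1, 17L));    // job-42#1#17
        System.out.println(subtaskSpanId("job-42", 1, 17L, 3));     // job-42#1#17#3
    }
}
```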

> In addition to checkpoint and recovery, I believe the trace would also be
> valuable for performance tuning. If Flink can trace and visualize the time
> cost of each operator and stage for a sampled record, users would be able
> to easily determine the end-to-end latency and identify performance issues
> for optimization. Looking forward to seeing these in the future.

I'm not sure if I understand the proposal - I don't know how traces could
be used for this purpose?
Traces are perfect for one of events (like checkpointing, recovery, etc),
not for continuous monitoring
(like processing records). That's what metrics are. Creating trace and
span(s) per each record would
be prohibitively expensive.

Unless you mean in batch/bounded jobs? Then yes, we could create a bounded
job trace, with spans
for every stage/task/subtask.

Best,
Piotrek


Wed, 8 Nov 2023 at 05:30 Zakelly Lan  wrote:

> Hi Piotr,
>
> Happy to see the trace! Thanks for this proposal.
>
> One minor question: It is mentioned in the interface of Span:
>
> Currently we don't support traces with multiple spans. Each span is
> > self-contained and represents things like a checkpoint or recovery.
>
>
> Does it mean the inclusion and subdivision relationships of spans defined
> by "parent_id" are not supported? I think it is a very necessary feature
> for the trace.
>
> In addition to checkpoint and recovery, I believe the trace would also be
> valuable for performance tuning. If Flink can trace and visualize the time
> cost of each operator and stage for a sampled record, users would be able
> to easily determine the end-to-end latency and identify performance issues
> for optimization. Looking forward to seeing these in the future.
>
> Best,
> Zakelly
>
>
> On Tue, Nov 7, 2023 at 6:27 PM Piotr Nowojski 
> wrote:
>
> > Hi Rui,
> >
> > Thanks for the comments!
> >
> > > 1. I see the trace just supports Span? Does it support trace events?
> > > I'm not sure whether tracing events is reasonable for TraceReporter.
> > > If it supports, flink can report checkpoint and checkpoint path
> > proactively.
> > > Currently, checkpoint lists or the latest checkpoint can only be
> fetched
> > > by external components or platforms. And report is more timely and
> > > efficient than fetch.
> >
> > No, currently the `TraceReporter` that I'm introducing supports only single
> > span traces.
> > So currently neither events on their own nor events inside spans are
> > supported.
> > This is done just for the sake of simplicity, to first test out the basic
> > functionality. But I think
> > those currently missing features should be added at some point in
> > the future.
> >
> > About structured logging (basically events?) I vaguely remember some
> > discussions about
> > that. It might be a much larger topic, so I would prefer to leave it out of
> > the scope of this
> > FLIP.
> >
> > > 2. This FLIP just monitors the checkpoint and task recovery, right?
> >
> >

[DISCUSS] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-07 Thread Piotr Nowojski
(Fixing topic)

Tue, 7 Nov 2023 at 09:40 Piotr Nowojski  wrote:

> Hi all!
>
> I would like to start a discussion on a follow up of FLIP-384: Introduce
> TraceReporter and use it to create checkpointing and recovery traces [1]:
>
> *FLIP-386: Support adding custom metrics in Recovery Spans [2]*
>
> This FLIP adds a functionality that will allow state backends to attach
> custom metrics to the recovery/initialization traces. This requires changes
> to the `@PublicEvolving` `StateBackend` API, and it will be initially used
> in `RocksDBIncrementalRestoreOperation` to measure how long it takes to
> download remote files and, separately, how long it takes to load those
> files into the local RocksDB instance.
>
> Please let me know what you think!
>
> Best,
> Piotr Nowojski
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
>
>


[DISCUSS] FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-07 Thread Piotr Nowojski
Hey, sorry for the misclick. Fixed.

Tue, 7 Nov 2023 at 14:00 Timo Walther  wrote:

> Thanks for the FLIP, Piotr.
>
> In order to follow the FLIP process[1], please prefix the email subject
> with "[DISCUSS]".
>
> Also, some people might have added filters to their email clients to
> highlight those discussions.
>
> Thanks,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/Flink/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>
> On 07.11.23 09:35, Piotr Nowojski wrote:
> > Hi all!
> >
> > I would like to start a discussion on a follow up of FLIP-384: Introduce
> > TraceReporter and use it to create checkpointing and recovery traces [1]:
> >
> > *FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter [2]*
> >
> > This FLIP proposes to add both MetricReporter and TraceReporter integrating
> > Flink with OpenTelemetry [4].
> >
> > There is also another follow up FLIP-386 [3], which improves recovery
> > traces.
> >
> > Please let me know what you think!
> >
> > Best,
> > Piotr Nowojski
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
> > [3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> > [4] https://opentelemetry.io/
> >
>
>


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-07 Thread Piotr Nowojski
Hi Rui,

Thanks for the comments!

> 1. I see the trace just supports Span? Does it support trace events?
> I'm not sure whether tracing events is reasonable for TraceReporter.
> If it supports, flink can report checkpoint and checkpoint path
proactively.
> Currently, checkpoint lists or the latest checkpoint can only be fetched
> by external components or platforms. And report is more timely and
> efficient than fetch.

No, currently the `TraceReporter` that I'm introducing supports only single
span traces.
So currently neither events on their own nor events inside spans are
supported.
This is done just for the sake of simplicity, to first test out the basic
functionality. But I think
those currently missing features should be added at some point in
the future.

About structured logging (basically events?) I vaguely remember some
discussions about
that. It might be a much larger topic, so I would prefer to leave it out of
the scope of this
FLIP.

> 2. This FLIP just monitors the checkpoint and task recovery, right?

Yes, it only adds single span traces for checkpointing and
recovery/initialisation - one
span per whole job per either recovery/initialization process or per each
checkpoint.
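
A self-contained single-span trace, as described above, can be pictured roughly like this. Field names are illustrative and not necessarily the final FLIP-384 `Span` interface.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleSpan {
    final String scope;       // e.g. the class reporting the span
    final String name;        // e.g. "Checkpoint" or "JobInitialization"
    final long startTsMillis;
    final long endTsMillis;
    final Map<String, Object> attributes = new HashMap<>();

    SingleSpan(String scope, String name, long startTsMillis, long endTsMillis) {
        this.scope = scope;
        this.name = name;
        this.startTsMillis = startTsMillis;
        this.endTsMillis = endTsMillis;
    }

    long durationMillis() {
        return endTsMillis - startTsMillis;
    }

    public static void main(String[] args) {
        // One self-contained span per completed checkpoint, reported on the JM;
        // no parent_id, so no cross-component context passing is needed.
        SingleSpan span = new SingleSpan("CheckpointStatsTracker", "Checkpoint", 1_000L, 4_500L);
        span.attributes.put("checkpointId", 17L);
        span.attributes.put("checkpointedSize", 123_456L);
        System.out.println(span.name + " took " + span.durationMillis() + " ms");
        // prints: Checkpoint took 3500 ms
    }
}
```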

> Could we add more operations in this FLIP? In our production, we
> added a lot of trace reporters for job starts and scheduler operation.
> They are useful if some jobs start slowly, because they will affect
> the job availability. For example:
> - From JobManager process is started to JobGraph is created
> - From JobGraph is created to JobMaster is created
> - From JobMaster is created to job is running
> - From start request tm from yarn or kubernetes to all tms are ready
> - etc

I think those could be indeed useful. If you would like to contribute them
in the future,
I would be happy to review the FLIP for it :)

> Of course, it's fine for me if this FLIP doesn't include them. The first
> version
> only initializes the interface and common operations, and we can add
> more operations in the future

Yes, that's exactly my thinking :)

Best,
Piotrek

Tue, 7 Nov 2023 at 10:05 Rui Fan <1996fan...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for driving this proposal! The trace reporter is useful to
> check a lot of duration monitors inside of Flink.
>
> I have some questions about this proposal:
>
> 1. I see the trace just supports Span? Does it support trace events?
> I'm not sure whether tracing events is reasonable for TraceReporter.
> If it supports, flink can report checkpoint and checkpoint path
> proactively.
> Currently, checkpoint lists or the latest checkpoint can only be fetched
> by external components or platforms. And report is more timely and
> efficient than fetch.
>
> 2. This FLIP just monitors the checkpoint and task recovery, right?
> Could we add more operations in this FLIP? In our production, we
> added a lot of trace reporters for job starts and scheduler operation.
> They are useful if some jobs start slowly, because they will affect
> the job availability. For example:
> - From JobManager process is started to JobGraph is created
> - From JobGraph is created to JobMaster is created
> - From JobMaster is created to job is running
> - From start request tm from yarn or kubernetes to all tms are ready
> - etc
>
> Of course, it's fine for me if this FLIP doesn't include them. The first version
> only initializes the interface and common operations, and we can add
> more operations in the future.
>
> Best,
> Rui
>
> On Tue, Nov 7, 2023 at 4:31 PM Piotr Nowojski 
> wrote:
>
> > Hi all!
> >
> > I would like to start a discussion on FLIP-384: Introduce TraceReporter and
> > use it to create checkpointing and recovery traces [1].
> >
> > This proposal intends to improve observability of Flink's Checkpointing and
> > Recovery/Initialization operations, by adding support for reporting traces
> > from Flink. In the future, reporting traces can of course be used for other
> > use cases and also by users.
> >
> > There are also two other follow up FLIPS, FLIP-385 [2] and FLIP-386 [3],
> > which expand the basic functionality introduced in FLIP-384 [1].
> >
> > Please let me know what you think!
> >
> > Best,
> > Piotr Nowojski
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> >
>


FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-07 Thread Piotr Nowojski
Hi all!

I would like to start a discussion on a follow up of FLIP-384: Introduce
TraceReporter and use it to create checkpointing and recovery traces [1]:

*FLIP-386: Support adding custom metrics in Recovery Spans [2]*

This FLIP adds a functionality that will allow state backends to attach
custom metrics to the recovery/initialization traces. This requires changes
to the `@PublicEvolving` `StateBackend` API, and it will be initially used
in `RocksDBIncrementalRestoreOperation` to measure how long it takes to
download remote files and, separately, how long it takes to load those
files into the local RocksDB instance.

Please let me know what you think!

Best,
Piotr Nowojski

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans


FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-07 Thread Piotr Nowojski
Hi all!

I would like to start a discussion on a follow up of FLIP-384: Introduce
TraceReporter and use it to create checkpointing and recovery traces [1]:

*FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter [2]*

This FLIP proposes to add both MetricReporter and TraceReporter integrating
Flink with OpenTelemetry [4].

There is also another follow up FLIP-386 [3], which improves recovery
traces.

Please let me know what you think!

Best,
Piotr Nowojski

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
[4] https://opentelemetry.io/


[DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-07 Thread Piotr Nowojski
Hi all!

I would like to start a discussion on FLIP-384: Introduce TraceReporter and
use it to create checkpointing and recovery traces [1].

This proposal intends to improve the observability of Flink's Checkpointing
and Recovery/Initialization operations by adding support for reporting traces
from Flink. In the future, trace reporting can of course be used for other
use cases and also by users.

There are also two other follow-up FLIPs, FLIP-385 [2] and FLIP-386 [3],
which expand the basic functionality introduced in FLIP-384 [1].

Please let me know what you think!

Best,
Piotr Nowojski

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans


[jira] [Created] (FLINK-33338) Bump up RocksDB version to 7.x

2023-10-23 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33338:
--

 Summary: Bump up RocksDB version to 7.x
 Key: FLINK-33338
 URL: https://issues.apache.org/jira/browse/FLINK-33338
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Piotr Nowojski


We need to bump RocksDB in order to be able to use new IngestDB and ClipDB 
commands.

If some of the required changes haven't been merged to Facebook/RocksDB, we 
should cherry-pick and include them in our FRocksDB fork.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API

2023-10-23 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33337:
--

 Summary: Expose IngestDB and ClipDB in the official RocksDB API
 Key: FLINK-33337
 URL: https://issues.apache.org/jira/browse/FLINK-33337
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Piotr Nowojski
Assignee: Yue Ma


Remaining open PR: https://github.com/facebook/rocksdb/pull/11646



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release 1.18.0, release candidate #3

2023-10-19 Thread Piotr Nowojski
+1 (binding)

Best,
Piotrek

Thu, 19 Oct 2023 at 09:55 Yun Tang wrote:

> +1 (non-binding)
>
>
>   *   Build from source code
>   *   Verify the pre-built jar packages were built with JDK8
>   *   Verify FLIP-291 with a standalone cluster, and it works fine with
> StateMachine example.
>   *   Checked the signature
>   *   Viewed the PRs.
>
> Best
> Yun Tang
> 
> From: Cheng Pan 
> Sent: Thursday, October 19, 2023 14:38
> To: dev@flink.apache.org 
> Subject: RE: [VOTE] Release 1.18.0, release candidate #3
>
> +1 (non-binding)
>
> We (the Apache Kyuubi community) verified that the Kyuubi Flink engine
> works well [1] with Flink 1.18.0 RC3.
>
> [1] https://github.com/apache/kyuubi/pull/5465
>
> Thanks,
> Cheng Pan
>
>
> On 2023/10/19 00:26:24 Jing Ge wrote:
> > Hi everyone,
> >
> > Please review and vote on the release candidate #3 for the version
> > 1.18.0, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which includes:
> >
> > * JIRA release notes [1], and the pull request adding release note for
> > users [2]
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [3], which are signed with the key with
> > fingerprint 96AE0E32CBE6E0753CE6 [4],
> > * all artifacts to be deployed to the Maven Central Repository [5],
> > * source code tag "release-1.18.0-rc3" [6],
> > * website pull request listing the new release and adding announcement
> blog
> > post [7].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Best regards,
> > Konstantin, Sergey, Qingsheng, and Jing
> >
> > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
> > [2] https://github.com/apache/flink/pull/23527
> > [3] https://dist.apache.org/repos/dist/dev/flink/flink-1.18.0-rc3/
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [5]
> https://repository.apache.org/content/repositories/orgapacheflink-1662
> > [6] https://github.com/apache/flink/releases/tag/release-1.18.0-rc3
> > [7] https://github.com/apache/flink-web/pull/680
> >
>
>
>
>


Re: [ANNOUNCE] The Flink Speed Center and benchmark daily run are back online

2023-10-19 Thread Piotr Nowojski
Thank you!

Thu, 19 Oct 2023 at 11:31 Konstantin Knauf wrote:

> Thanks a lot for working on this!
>
> On Thu, 19 Oct 2023 at 10:24 Zakelly Lan <
> zakelly@gmail.com>:
>
> > Hi everyone,
> >
> > Flink benchmarks [1] generate daily performance reports in the Apache
> > Flink slack channel (#flink-dev-benchmarks) to detect performance
> > regression [2]. Those benchmarks previously were running on several
> > machines donated and maintained by Ververica. Unfortunately, those
> > machines were gone due to account issues [3], and the benchmark daily
> > runs stopped since August 24th, delaying the release of Flink 1.18 a
> > bit [4].
> >
> > Ververica donated several new machines! After several weeks of work, I
> > have successfully re-established the codespeed panel and benchmark
> > daily run pipelines on them. At this time, we are pleased to announce
> > that the Flink Speed Center and benchmark pipelines are back online.
> > These new machines are under more formal management to ensure that
> > previous accidents will not occur in the future.
> >
> > What's more, I successfully recovered historical data backed up by
> > Yanfei Lei [5]. So with the old domain [6] redirected to the new
> > machines, the old links that existed in previous records will still be
> > valid. Besides the benchmarks with Java8 and Java11, I also added a
> > pipeline for Java17 running daily.
> >
> > How to use it:
> > We also registered a new domain name 'flink-speed.xyz' for the Flink
> > Speed Center [7]. It is recommended to use the new domain in the
> > future. Currently, the self-service method of triggering benchmarks is
> > unavailable considering the lack of resources and potential
> > vulnerabilities of Jenkins. Please contact one of Apache Flink PMCs to
> > submit a benchmark. More info is updated on the wiki[8].
> >
> > Daily Monitoring:
> > The performance daily monitoring on the Apache Flink slack channel [2]
> > is still unavailable as the benchmark results need more time to
> > stabilize in the new environment. Once the baseline results become
> > available for regression detection, I will enable the daily
> > monitoring.
> >
> > Please feel free to reach out to me if you have any suggestions or
> > questions. Thanks Ververica again for donating machines!
> >
> >
> > Best,
> > Zakelly
> >
> > [1] https://github.com/apache/flink-benchmarks
> > [2] https://lists.apache.org/thread/zok62sx4m50c79htfp18ymq5vmtgbgxj
> > [3] https://issues.apache.org/jira/browse/FLINK-33052
> > [4] https://lists.apache.org//thread/5x28rp3zct4p603hm4zdwx6kfr101w38
> > [5] https://issues.apache.org/jira/browse/FLINK-30890
> > [6] http://codespeed.dak8s.net:8000
> > [7] http://flink-speed.xyz
> > [8]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115511847
> >
>
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-25 Thread Piotr Nowojski
Hi Jark and Dong,

I'm a bit confused about the difference between the two competing options.
Could one of you elaborate what's the difference between:
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.

and

> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have watermarkLag=false.

?

Best,
Piotrek

Thu, 21 Sep 2023 at 15:28 Dong Lin wrote:

> Hi all,
>
> Jark and I discussed this FLIP offline and I will summarize our discussion
> below. It would be great if you could provide your opinion of the proposed
> options.
>
> Regarding the target use-cases:
> - We both agreed that MySQL CDC should have backlog=true when watermarkLag
> is large during the binlog phase.
> - Dong argued that other streaming sources with watermarkLag defined (e.g.
> Kafka) should also have backlog=true when watermarkLag is large. The
> pros/cons discussion below assume this use-case needs to be supported.
>
> The 1st option is what is currently proposed in FLIP-328, with the
> following key characteristics:
> 1) There is one job-level config (i.e.
> pipeline.backlog.watermark-lag-threshold) that applies to all sources with
> watermarkLag metric defined.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that it overrides
> the effect of the previous invocation (if any) of
> `#setIsProcessingBacklog(true)` on the given source instance.
>
> The 2nd option is what Jark proposed in this email thread, with the
> following key characteristics:
> 1) Add source-specific config (both Java API and SQL source property) to
> every source for which we want to set backlog status based on the
> watermarkLag metric. For example, we might add separate Java APIs
> `#setWatermarkLagThreshold`  for MySQL CDC source, HybridSource,
> KafkaSource, PulsarSource etc.
> 2) The semantics of `#setIsProcessingBacklog(false)` is that the given
> source instance will have watermarkLag=false.
>
> Here are the key pros/cons of these two options.
>
> Cons of the 1st option:
> 1) The semantics of `#setIsProcessingBacklog(false)` is harder to
> understand for Flink operator developers than the corresponding semantics
> in option-2.
>
> Cons of the  2nd option:
> 1) More work for end-users. For a job with multiple sources that need to be
> configured with a watermark lag threshold, users need to specify multiple
> configs (one for each source) instead of specifying one job-level config.
>
> 2) More work for Flink operator developers. Overall there are more public
> APIs (one Java API and one SQL property for each source that needs to
> determine backlog based on watermark) exposed to end users. This also adds
> more burden for the Flink community to maintain these APIs.
>
> 3) It would be hard (e.g. require backward incompatible API change) to
> extend the Flink runtime to support job-level config to set watermark
> strategy in the future (e.g. support the
> pipeline.backlog.watermark-lag-threshold in option-1). This is because an
> existing source operator's code might have hardcoded an invocation of
> `#setIsProcessingBacklog(false)`, which means the backlog status must be
> set to true, which prevents Flink runtime from setting backlog=true when a
> new strategy is triggered.
>
> Overall, I am still inclined to choose option-1 because it is more
> extensible and simpler to use in the long term when we want to support/use
> multiple sources whose backlog status can change based on the watermark
> lag. While option-1's `#setIsProcessingBacklog` is a bit harder to
> understand than option-2, I think this overhead/cost is worthwhile as it
> makes end-users' life easier in the long term.
>
> Jark: thank you for taking the time to review this FLIP. Please feel free
> to comment if I missed anything in the pros/cons above.
>
> Jark and I have not reached agreement on which option is better. It will be
> really helpful if we can get more comments on these options.
>
> Thanks,
> Dong
>
>
> On Tue, Sep 19, 2023 at 11:26 AM Dong Lin  wrote:
>
> > Hi Jark,
> >
> > Thanks for the reply. Please see my comments inline.
> >
> > On Tue, Sep 19, 2023 at 10:12 AM Jark Wu  wrote:
> >
> >> Hi Dong,
> >>
> >> Sorry for the late reply.
> >>
> >> > The rationale is that if there is any strategy that is triggered and
> >> says
> >> > backlog=true, then job's backlog should be true. Otherwise, the job's
> >> > backlog status is false.
> >>
> >> I'm quite confused about this. Does that mean, if the source is in the
> >> changelog phase, the source has to continuously invoke
> >> "setIsProcessingBacklog(true)" (in an infinite loop?). Otherwise,
> >> the job's backlog status would be set to false by the framework?
> >>
> >
> > No, the source would not have to continuously invoke
> > setIsProcessingBacklog(true) in an infinite loop.
> >
> > Actually, I am not very sure why there is confusion that 
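To make the option-1 semantics discussed in this thread concrete, here is a hedged, plain-Java sketch (class and method names are illustrative, not Flink API): each source reports its own backlog status, for instance derived from its watermark lag against the job-level `pipeline.backlog.watermark-lag-threshold`, and the job is considered to be processing backlog while at least one source still reports true — a source does not need to re-report in a loop, as each call simply overrides its previous report.

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    // Latest backlog report per source; a new call overrides the previous one.
    private final Map<String, Boolean> backlogBySource = new HashMap<>();
    private final long watermarkLagThresholdMs;

    Main(long watermarkLagThresholdMs) {
        this.watermarkLagThresholdMs = watermarkLagThresholdMs;
    }

    /** Invoked by a source; overrides that source's previous backlog report. */
    public void setIsProcessingBacklog(String sourceId, boolean backlog) {
        backlogBySource.put(sourceId, backlog);
    }

    /** Derives a source's backlog status from its current watermark lag. */
    public void reportWatermarkLag(String sourceId, long watermarkLagMs) {
        setIsProcessingBacklog(sourceId, watermarkLagMs > watermarkLagThresholdMs);
    }

    /** Job-level status: true while any source reports backlog=true. */
    public boolean isJobProcessingBacklog() {
        return backlogBySource.containsValue(true);
    }

    public static void main(String[] args) {
        Main coordinator = new Main(60_000);                    // 1-minute threshold (assumed)
        coordinator.reportWatermarkLag("mysql-cdc", 3_600_000); // 1h behind -> backlog
        coordinator.reportWatermarkLag("kafka", 500);           // near real time
        System.out.println(coordinator.isJobProcessingBacklog()); // true
        coordinator.reportWatermarkLag("mysql-cdc", 1_000);     // caught up
        System.out.println(coordinator.isJobProcessingBacklog()); // false
    }
}
```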

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-09-19 Thread Piotr Nowojski
ch
> > > > > >
> > > > > > Obviously, in case of the switch from batch to streaming, the
> > > property
> > > > > > a) has to be modified so the watermark does not move to +inf, but
> > to
> > > > > > min(streaming watermark). Giving these properties, it should be
> > > > possible
> > > > > > to exchange batch and streaming processing without any
> cooperation
> > > with
> > > > > > the application logic itself. Is my understanding correct?
> > > > > >
> > > > > > If so, there is still one open question to efficiency, though.
> The
> > > > > > streaming operator _might_ need sorting by timestamp (e.g.
> > processing
> > > > > > time-series data, or even sequential data). In that case simply
> > > > > > switching streaming semantics to batch processing does not yield
> > > > > > efficient processing, because the operator still needs to buffer
> > and
> > > > > > manually sort all the input data (batch data is always
> unordered).
> > On
> > > > > > the other hand, the batch runner already does sorting (for
> grouping
> > > by
> > > > > > key), so adding additional sorting criterion is very cheap. In
> > Apache
> > > > > > Beam, we introduced a property of a stateful PTransform (DoFn)
> > called
> > > > > > @RequiresTimeSortedInput [1], which can then be implemented
> > > efficiently
> > > > > > by batch engines.
> > > > > >
> > > > > > Does the FLIP somehow work with conditions i) and ii)? I can
> > imagine
> > > > for
> > > > > > instance that if data is read from say Kafka, then if backlog
> gets
> > > > > > sufficiently large, then even the batch processing can take
> > > substantial
> > > > > > time and if it fails after long processing, some of the original
> > data
> > > > > > might be already rolled out from Kafka topic.
> > > > > >
> > > > > > In the FLIP there are some proposed changes to sources to emit
> > > metadata
> > > > > > about if the records come from backlog. What is the driving line
> of
> > > > > > thoughts why this is needed? In my point of view, streaming
> engines
> > > are
> > > > > > _always_ processing backlog, the only question is "how delayed
> are
> > > the
> > > > > > currently processed events after HEAD", or more specifically in
> > this
> > > > > > case "how many elements can we expect to process if the source
> > would
> > > > > > immediately stop receiving more data?". This should be
> configurable
> > > > > > using simple option defining the difference between current
> > > > > > processing-time (JM) and watermark of the source, or am I missing
> > > > > > something?
> > > > > >
> > > > > > Thanks for clarification and all the best,
> > > > > >
> > > > > >   Jan
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> > > > > >
> > > > > > On 8/31/23 13:17, Xuannan Su wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to share some updates on FLIP-327. Dong and I have
> > > had a
> > > > > > > series of discussions and have made several refinements to the
> > > FLIP.
> > > > > > >
> > > > > > > The major change to the FLIP is to allow the input of the
> > one-input
> > > > > > > operator to be automatically sorted during backlog processing.
> > When
> > > > > > > combined with the state backend optimization introduced in
> > FLIP-325
> > > > > [1],
> > > > > > > all the keyed single-input operators can achieve similar
> > > performance
> > > > as
> > > > > > in
> > > > > > > batch mode during backlog processing without any code change to
> > the
> 

[jira] [Created] (FLINK-33071) Log checkpoint statistics

2023-09-11 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-33071:
--

 Summary: Log checkpoint statistics 
 Key: FLINK-33071
 URL: https://issues.apache.org/jira/browse/FLINK-33071
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / Metrics
Affects Versions: 1.18.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


This is a stop gap solution until we have a proper way of solving FLINK-23411.

The plan is to dump JSON serialised checkpoint statistics into Flink JM's log, 
with a {{DEBUG}} level. This could be used to analyse what has happened with a 
certain checkpoint in the past.
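A minimal sketch of the idea, with JSON hand-rolled to stay dependency-free; the field names here are illustrative, not the actual serialized form the Flink implementation will use:

```java
public class Main {
    /** Renders a checkpoint's statistics as one JSON line for a DEBUG log entry. */
    public static String toJson(long checkpointId, String status, long durationMs, long stateSizeBytes) {
        return String.format(
            "{\"checkpointId\":%d,\"status\":\"%s\",\"durationMs\":%d,\"stateSize\":%d}",
            checkpointId, status, durationMs, stateSizeBytes);
    }

    public static void main(String[] args) {
        // In Flink this line would go through the JobManager logger at DEBUG
        // level, e.g. LOG.debug("Checkpoint statistics: {}", json).
        System.out.println(toJson(42, "COMPLETED", 1250, 1048576));
        // prints {"checkpointId":42,"status":"COMPLETED","durationMs":1250,"stateSize":1048576}
    }
}
```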



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-16 Thread Piotr Nowojski
Hi Dong,

The operators API is unfortunately also a public-facing API, and I mean that
the APIs we will add there should also be marked `@Experimental` IMO.

The config options should also be marked as experimental (both
annotated `@Experimental` and noted as such in the docs,
if the `@Experimental` annotation is not automatically mentioned in the docs).

> Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

We should do this independently from marking the APIs/config options as
`@Experimental`

Best,
Piotrek

Fri, 11 Aug 2023 at 14:55 Dong Lin wrote:

> Hi Piotr,
>
> Thanks for the reply!
>
> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Sorry for the long delay in responding!
> >
> > >  Given that it is an optional feature that can be
> > > turned off by users, it might be OK to just let users try it out and we
> > can
> > > fix performance issues once we detect any of them. What do you think?
> >
> > I think it's fine. It would be best to mark this feature as experimental,
> > and
> > we say that the config keys or the default values might change in the
> > future.
> >
>
> In general I agree we can mark APIs that determine "whether to enable
> dynamic switching between stream/batch mode" as experimental.
>
> However, I am not sure we have such an API yet. The APIs added in this FLIP
> are intended to be used by operator developers rather than end users. End
> users can enable this capability by setting
> execution.checkpointing.interval-during-backlog = Long.MAX and using a
> source which might implicitly set backlog status (e.g. HybridSource). So
> execution.checkpointing.interval-during-backlog is the only user-facing
> API that can always control whether this feature can be used.
>
> However, execution.checkpointing.interval-during-backlog itself is not tied
> to FLIP-327.
>
> Do you mean we should set checkpointing.interval-during-backlog as
> experimental? Alternatively, how about we add a doc for
> checkpointing.interval-during-backlog explaining its impact/concern as
> discussed above?
>
> Best,
> Dong
>
>
> > > Maybe we can revisit the need for such a config when we
> introduce/discuss
> > > the capability to switch backlog from false to true in the future. What
> > do
> > > you think?
> >
> > Sure, we can do that.
> >
> > Best,
> > Piotrek
> >
> Sun, 23 Jul 2023 at 14:32 Dong Lin wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks a lot for the explanation. Please see my reply inline.
> > >
> > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > piotr.nowoj...@gmail.com>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks a lot for the answers. I can now only briefly answer your last
> > > > email.
> > > >
> > > > > It is possible that spilling to disks might cause larger overhead.
> > IMO
> > > it
> > > > > is an orthogonal issue already existing in Flink. This is because a
> > > Flink
> > > > > job running batch mode might also be slower than its throughput in
> > > stream
> > > > > mode due to the same reason.
> > > >
> > > > Yes, I know, but the thing that worries me is that previously only a
> > user
> > > > alone
> > > > could decide whether to use batch mode or streaming, and in practice
> > one
> > > > user would rarely (if ever) use both for the same problem/job/query.
> If
> > > his
> > > > intention was to eventually process live data, he was using streaming
> > > even
> > > > if there was a large backlog at the start (apart from some very few
> very
> > > > power
> > > > users).
> > > >
> > > > With this change, we want to introduce a mode that would be switching
> > > back
> > > > and forth between streaming and "batch in streaming" automatically.
> So
> > a
> > > > potential performance regression would be much more visible and
> painful
> > > > at the same time. If batch query runs slower than it could, it's kind
> > of
> > > > fine as
> > > > it will end at some point. If streaming query during large back
> > pressure
> > > > maybe
> > > > temporary load spike switches to batch processing, that's a bigger
> > deal.
> > > > Especially if batch proc

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-11 Thread Piotr Nowojski
Hi,

Sorry for the long delay in responding!

>  Given that it is an optional feature that can be
> turned off by users, it might be OK to just let users try it out and we
can
> fix performance issues once we detect any of them. What do you think?

I think it's fine. It would be best to mark this feature as experimental,
and
we say that the config keys or the default values might change in the
future.

> Maybe we can revisit the need for such a config when we introduce/discuss
> the capability to switch backlog from false to true in the future. What do
> you think?

Sure, we can do that.

Best,
Piotrek

Sun, 23 Jul 2023 at 14:32 Dong Lin wrote:

> Hi Piotr,
>
> Thanks a lot for the explanation. Please see my reply inline.
>
> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski 
> wrote:
>
> > Hi Dong,
> >
> > Thanks a lot for the answers. I can now only briefly answer your last
> > email.
> >
> > > It is possible that spilling to disks might cause larger overhead. IMO
> it
> > > is an orthogonal issue already existing in Flink. This is because a
> Flink
> > > job running in batch mode might also be slower than in stream
> > > mode due to the same reason.
> >
> > Yes, I know, but the thing that worries me is that previously only a user
> > alone
> > could decide whether to use batch mode or streaming, and in practice one
> > user would rarely (if ever) use both for the same problem/job/query. If
> his
> > intention was to eventually process live data, he was using streaming
> even
> > if there was a large backlog at the start (apart from some very few very
> > power
> > users).
> >
> > With this change, we want to introduce a mode that would be switching
> back
> > and forth between streaming and "batch in streaming" automatically. So a
> > potential performance regression would be much more visible and painful
> > at the same time. If batch query runs slower than it could, it's kind of
> > fine as
> > it will end at some point. If a streaming query switches to batch
> > processing during large backpressure, maybe just a temporary load
> > spike, that's a bigger deal.
> > Especially if batch processing mode will not be able to actually even
> > handle
> > the normal load, after the load spike. In that case, the job could never
> > recover
> > from the backpressure/backlog mode.
> >
>
> I understand you are concerned with the risk of performance regression
> introduced due to switching to batch mode.
>
> After thinking about this more, I think this existing proposal meets the
> minimum requirement of "not introducing regression for existing jobs". The
> reason is that even if batch mode can be slower than stream mode for some
> operators in some cases, this is an optional feature that will only be
> enabled if a user explicitly overrides the newly introduced config to
> non-default values. Existing jobs that simply upgrade their Flink library
> version will not suffer any performance regression.
>
> More specifically, in order to switch to batch mode, users will need to
> explicitly set execution.checkpointing.interval-during-backlog to 0. And
> users can always explicitly update
> execution.checkpointing.interval-during-backlog to turn off the batch mode
> if that incurs any performance issue.
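In configuration terms, the setup described above could look like the following flink-conf.yaml fragment (a sketch; key names and value syntax should be verified against the Flink version in use):

```yaml
# Regular checkpointing interval used while the job is NOT processing backlog.
execution.checkpointing.interval: 3min

# Checkpointing interval used while sources report isProcessingBacklog=true.
# As discussed in this thread, setting this to 0 disables checkpointing during
# the backlog phase and thereby allows keyed operators to run in batch-style
# mode until the backlog is processed.
execution.checkpointing.interval-during-backlog: 0
```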
>
> As far as I can tell, for all practical workloads we see in production
> jobs, batch mode is always faster (w.r.t. throughput) than stream mode when
> there is a high backlog of incoming records. Though it is still
> theoretically possible, it should be very rare (if any) for batch mode to
> be slower in practice. Given that it is an optional feature that can be
> turned off by users, it might be OK to just let users try it out and we can
> fix performance issues once we detect any of them. What do you think?
>
>
> >
> > > execution.backlog.use-full-batch-mode-on-start (default false)
> >
> > oops, sorry, it was supposed to be sth like:
> >
> > execution.backlog.use-batch-mode-only-on-start (default false)
> >
> > That option would disallow switching from streaming to batch. Batch mode
> > would be allowed only to get rid of the initial, present on start-up
> > backlog.
> >
> > Would allow us to safely experiment with switching from streaming to
> batch
> > and I would be actually more fine in enabling "using batch mode on start"
> > by default, until we gain confidence and feedback that switching back &
> > forth
> > is working as expected.
> >
>
> Now I understand what you are suggesting. I agree that it is necessary fo

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-21 Thread Piotr Nowojski
Hi Dong,

Thanks a lot for the answers. I can now only briefly answer your last email.

> It is possible that spilling to disks might cause larger overhead. IMO it
> is an orthogonal issue already existing in Flink. This is because a Flink
> job running in batch mode might also be slower than in stream
> mode due to the same reason.

Yes, I know, but the thing that worries me is that previously only a user
alone
could decide whether to use batch mode or streaming, and in practice one
user would rarely (if ever) use both for the same problem/job/query. If his
intention was to eventually process live data, he was using streaming even
if there was a large backlog at the start (apart from some very few power
users).

With this change, we want to introduce a mode that would be switching back
and forth between streaming and "batch in streaming" automatically. So a
potential performance regression would be much more visible and painful
at the same time. If batch query runs slower than it could, it's kind of
fine as
it will end at some point. If a streaming query switches to batch processing
during large backpressure, maybe just a temporary load spike, that's a bigger
deal.
Especially if batch processing mode will not be able to actually even handle
the normal load, after the load spike. In that case, the job could never
recover
from the backpressure/backlog mode.

> execution.backlog.use-full-batch-mode-on-start (default false)

oops, sorry, it was supposed to be sth like:

execution.backlog.use-batch-mode-only-on-start (default false)

That option would disallow switching from streaming to batch. Batch mode
would be allowed only to get rid of the initial backlog present on start-up.

That would allow us to safely experiment with switching from streaming to
batch, and I would actually be fine with enabling "use batch mode on start"
by default, until we gain confidence and feedback that switching back &
forth is working as expected.
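A sketch of the gating logic such a flag could imply. The flag itself is only a proposal in this thread, so the names and semantics below are assumptions: with "only on start" enabled, batch-style processing is permitted only until the initial startup backlog has been drained once; after that the job never switches back.

```java
public class Main {
    /**
     * Hypothetical decision rule for "execution.backlog.use-batch-mode-only-on-start".
     * backlogNow          - current job-level backlog status
     * hasLeftBacklogOnce  - whether the job has already drained its startup backlog
     * onlyOnStart         - the proposed flag
     */
    public static boolean mayUseBatchMode(
            boolean backlogNow, boolean hasLeftBacklogOnce, boolean onlyOnStart) {
        if (!backlogNow) {
            return false; // no backlog, nothing to catch up on
        }
        // With the flag set, disallow switching back once the startup backlog is gone.
        return !onlyOnStart || !hasLeftBacklogOnce;
    }

    public static void main(String[] args) {
        // Startup backlog: batch mode allowed.
        System.out.println(mayUseBatchMode(true, false, true));  // true
        // Load spike after the job has caught up once: not allowed.
        System.out.println(mayUseBatchMode(true, true, true));   // false
    }
}
```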

>> Or we could limit the scope of this FLIP to only support starting with
>> batch mode and switching only once to
>> streaming, and design a follow up with switching back and forth?
>
> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs
> so that we can make incremental progress.

Great, let's do that. In a follow-up FLIP we could restart the discussion
about switching back and forth.

Piotrek

Thu, 20 Jul 2023 at 16:57 Dong Lin wrote:

> Hi Piotr,
>
> Thank you for the very detailed comments! Please see my reply inline.
>
> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski 
> wrote:
>
> > Hi Dong,
> >
> > I have a couple of follow up questions about switching back and forth
> > between streaming and batching mode.
> > Especially around shuffle/watermark strategy, and keyed state backend.
> >
> > First of all, it might not always be beneficial to switch into the batch
> > mode:
> > - Shuffle strategy
> > - Is sorting going to be purely in-memory? If not, obviously spilling
> > to disks might cause larger overheads
> >compared to not sorting the records.
> >
>
> Sorting might require spilling data to disk depending on the input size.
> The behavior of sorting w.r.t. memory/disk is expected to be exactly the
> same as the behavior of input sorting automatically performed by Flink
> runtime in batch mode for keyed inputs.
>
> More specifically, ExternalSorter
> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java>
> is currently used to sort keyed inputs in batch mode. It is automatically
> used by the Flink runtime in OneInputStreamTask (here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>)
> and in MultiInputSortingDataInput (here
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>).
> We plan to re-use the same code/mechanism to do sorting.
>
> It is possible that spilling to disks might cause larger overhead. IMO it
> is an orthogonal issue already existing in Flink. This is because a Flink
> job running in batch mode might also be slower than in stream
> mode due to the same reason. However, even though it is possible in theory,
> I expect that in practice the throughput of using sorting +
> BatchExecutionKeyedStateBackend should be much higher than using other
> keyed statebackends when the amount of data is large. As a matter of fact,
> we have not heard of complaints of such performance regression issues in
> batch mode.
>

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-19 Thread Piotr Nowojski
cally, operators that involve aggregation operation (e.g. join,
> > > cogroup, aggregate) on keyed inputs can benefit from using an internal
> > > sorter."*
> > >
> > > *"The operator that performs CoGroup operation will instantiate two
> > > internal sorter to sorts records from its two inputs separately. Then
> it
> > > can pull the sorted records from these two sorters. This can be done
> > > without wrapping input records with TaggedUnion<...>. In comparison,
> the
> > > existing DataStream#coGroup needs to wrap input records with
> > > TaggedUnion<...> before sorting them using one external sorter, which
> > > introduces higher overhead."*
> > >
> > > According to the performance test, it seems that internal sorter has
> > better
> > > performance than external sorter. Is it possible to make those
> operators
> > > that can benefit from it use internal sorter by default?
> > >
> >
> > Yes, it is possible. After this FLIP is done, users can use
> > DataStream#coGroup with EndOfStreamWindows as the window assigner to
> > co-group two streams in effectively the batch manner. An operator that
> uses
> > an internal sorter will be used to perform the co-group operation. There
> is
> > no need for users of the DataStream API to explicitly know or set the
> > internal sorter in any way.
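The end-of-input co-group semantics described above (buffer both bounded inputs per key, then fire once when both inputs finish) can be sketched independently of the Flink API in plain Python — the function and record names here are illustrative, not Flink's:

```python
from collections import defaultdict

def co_group_at_end_of_stream(left, right, key_left, key_right, combine):
    """Buffer both (bounded) inputs per key, then emit one result per key
    once both inputs are exhausted -- the EndOfStreamWindows behaviour."""
    groups = defaultdict(lambda: ([], []))
    for rec in left:
        groups[key_left(rec)][0].append(rec)
    for rec in right:
        groups[key_right(rec)][1].append(rec)
    # the "window" fires exactly once, at end of input: one output per key
    return {k: combine(l, r) for k, (l, r) in groups.items()}

orders = [("o1", 10), ("o2", 20)]
payments = [("o1", 10), ("o1", 5)]
joined = co_group_at_end_of_stream(
    orders, payments,
    key_left=lambda r: r[0], key_right=lambda r: r[0],
    combine=lambda l, r: (len(l), len(r)))
assert joined == {"o1": (1, 2), "o2": (1, 0)}
```

Note that no result for a key is emitted before end of input, which is why this pattern is only applicable when the inputs (or the current phase of processing) are bounded.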
> >
> > In the future, we plan to incrementally optimize other aggregation
> > operation (e.g. aggregate) on the DataStream API when EndOfStreamWindows
> is
> > used as the window assigner.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin  wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > Thank you for the comments! Please see my reply inline.
> > > >
> > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge 
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the proposal! The FLIP is already in good shape. I got
> > some
> > > > NIT
> > > > > questions.
> > > > >
> > > > > 1. It is a little bit weird to write the hint right after the
> > > motivation
> > > > > that some features have been moved to FLIP-331, because at that
> time,
> > > > > readers don't know the context about what features does it mean. I
> > > would
> > > > > suggest moving the note to the beginning of "Public interfaces"
> > > sections.
> > > > >
> > > >
> > > > Given that the reviewer who commented on this email thread before I
> > > > refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is
> > simpler
> > > to
> > > > just remove any mention of FLIP-331. I have updated the FLIP
> > accordingly.
> > > >
> > > >
> > > > > 2. It is also a little bit weird to describe all behaviour changes
> at
> > > > first
> > > > > but only focus on one single feature, i.e. how to implement
> > > > > internalSorterSupported. TBH, I was lost while I was reading the
> > Public
> > > > > interfaces. Maybe change the FLIP title? Another option could be to
> > > > write a
> > > > > short summary of all features and point out that this FLIP will
> only
> > > > focus
> > > > > on the internalSorterSupported feature. Others could be found in
> > > > FLIP-331.
> > > > > WDYT?
> > > > >
> > > >
> > > > Conceptually, the purpose of this FLIP is to allow a stream mode job
> to
> > > run
> > > > parts of the topology in batch mode so that it can apply
> > > > optimizations/computations that can not be used together with
> > > checkpointing
> > > > (and thus not usable in stream mode). Although internal sorter is the
> > > only
> > > > optimization immediately supported in this FLIP, this FLIP lays the
> > > > foundation to support other optimizations in the future, such as
> using
> > > GPU
> > > > to process a bounded stream of records.
> > > >
> > > > Therefore, I find it better to keep the current title rather than
> > > limiting
> > > > the scope to internal sorter. What do you think?
> > > >

Re: [VOTE] FLIP-309: Support using larger checkpointing interval when source is processing backlog

2023-07-18 Thread Piotr Nowojski
+1 (binding)

Piotrek

On Tue, Jul 18, 2023 at 08:51, Jing Ge wrote:

> +1(binding)
>
> Best regards,
> Jing
>
> On Tue, Jul 18, 2023 at 8:31 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > +1(binding)
> >
> > Best,
> > Rui Fan
> >
> >
> > On Tue, Jul 18, 2023 at 12:04 PM Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > We would like to start the vote for FLIP-309: Support using larger
> > > checkpointing interval when source is processing backlog [1]. This FLIP
> > was
> > > discussed in this thread [2].
> > >
> > > The vote will be open until at least July 21st (at least 72 hours),
> > > following
> > > the consensus voting process.
> > >
> > > Cheers,
> > > Yunfeng and Dong
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog
> > > [2] https://lists.apache.org/thread/l1l7f30h7zldjp6ow97y70dcthx7tl37
> > >
> >
>


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-18 Thread Piotr Nowojski
Thanks Dong!

Piotrek

On Tue, Jul 18, 2023 at 06:04, Dong Lin wrote:

> Hi all,
>
> I have updated FLIP-309 as suggested by Piotr to include a reference to
> FLIP-328 in the future work section.
>
> Piotr, Stephan, and I discussed offline regarding the choice
> between execution.checkpointing.max-interval and
> execution.checkpointing.interval-during-backlog.
> The advantage of using "max-interval" is that Flink runtime can have more
> flexibility to decide when/how to adjust checkpointing intervals (based on
> information other than backlog). The advantage of using
> "interval-during-backlog" is that it is clearer to the user when/how this
> configured interval is used. Since there is no immediate need for the extra
> flexibility as of this FLIP, we agreed to go with interval-during-backlog
> for now. And we can rename this config to e.g.
> execution.checkpointing.max-interval when needed in the future.
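For reference, a sketch of how the two settings discussed above might look in flink-conf.yaml — the key names follow this FLIP's proposal, and the interval values are placeholders:

```yaml
# Regular checkpointing interval, used while sources report
# isProcessingBacklog = false (i.e. real-time processing).
execution.checkpointing.interval: 3s

# Larger interval used while any source reports isProcessingBacklog = true.
# Per the discussion above, this key may later be renamed to
# execution.checkpointing.max-interval.
execution.checkpointing.interval-during-backlog: 5min
```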
>
> Thanks everyone for all the reviews and suggestions! And special thanks to
> Piotr and Stephan for taking extra time to provide detailed reviews and
> suggestions offline!
>
> Since there is no further comment, I will open the voting thread for this
> FLIP.
>
> Cheers,
> Dong
>
>
> On Fri, Jul 14, 2023 at 11:39 PM Piotr Nowojski 
> wrote:
>
> > Hi All,
> >
> > We had a lot of off-line discussions. As a result I would suggest
> dropping
> > the idea of introducing an end-to-end-latency concept, until
> > we can properly implement it, which will require more designing and
> > experimenting. I would suggest starting with a more manual solution,
> > where the user needs to configure concrete parameters, like
> > `execution.checkpointing.max-interval` or `execution.flush-interval`.
> >
> > FLIP-309 looks good to me, I would just rename
> > `execution.checkpointing.interval-during-backlog` to
> > `execution.checkpointing.max-interval`.
> >
> > I would also reference future work, that a solution that would allow set
> > `isProcessingBacklog` for sources like Kafka will be introduced via
> > FLIP-328 [1].
> >
> > Best,
> > Piotrek
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag
> >
> On Wed, Jul 12, 2023 at 03:49, Dong Lin wrote:
> >
> > > Hi Piotr,
> > >
> > > I think I understand your motivation for suggesting
> > > execution.slow-end-to-end-latency now. Please see my followup comments
> > > (after the previous email) inline.
> > >
> > > On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the updates, a couple of comments:
> > > >
> > > > > If a record is generated by a source when the source's
> > > > isProcessingBacklog is true, or some of the records used to
> > > > > derive this record (by an operator) has isBacklog = true, then this
> > > > record should have isBacklog = true. Otherwise,
> > > > > this record should have isBacklog = false.
> > > >
> > > > nit:
> > > > I think this conflicts with "Rule of thumb for non-source operators
> to
> > > set
> > > > isBacklog = true for the records it emits:"
> > > > section later on, when it comes to a case if an operator has mixed
> > > > isBacklog = false and isBacklog = true inputs.
> > > >
> > > > > execution.checkpointing.interval-during-backlog
> > > >
> > > > Do we need to define this as an interval config parameter? Won't that
> > add
> > > > an option that will be almost instantly deprecated
> > > > because what we actually would like to have is:
> > > > execution.slow-end-to-end-latency and execution.end-to-end-latency
> > > >
> > >
> > > I guess you are suggesting that we should allow users to specify a
> higher
> > > end-to-end latency budget for those records that are emitted by
> two-phase
> > > commit sink, than those records that are emitted by none-two-phase
> commit
> > > sink.
> > >
> > > My concern with this approach is that it will increase the complexity
> of
> > > the definition of "processing latency requirement", as well as the
> > > complexity of the Flink runtime code that handles it. Currently, the
> > > FLIP-325 defines end-to-end latency as an attribute of the records that

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-14 Thread Piotr Nowojski
Hi All,

We had a lot of off-line discussions. As a result I would suggest dropping
the idea of introducing an end-to-end-latency concept, until
we can properly implement it, which will require more designing and
experimenting. I would suggest starting with a more manual solution,
where the user needs to configure concrete parameters, like
`execution.checkpointing.max-interval` or `execution.flush-interval`.

FLIP-309 looks good to me, I would just rename
`execution.checkpointing.interval-during-backlog` to
`execution.checkpointing.max-interval`.

I would also reference future work, that a solution that would allow set
`isProcessingBacklog` for sources like Kafka will be introduced via
FLIP-328 [1].

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-328%3A+Allow+source+operators+to+determine+isProcessingBacklog+based+on+watermark+lag

On Wed, Jul 12, 2023 at 03:49, Dong Lin wrote:

> Hi Piotr,
>
> I think I understand your motivation for suggesting
> execution.slow-end-to-end-latency now. Please see my followup comments
> (after the previous email) inline.
>
> On Wed, Jul 12, 2023 at 12:32 AM Piotr Nowojski 
> wrote:
>
> > Hi Dong,
> >
> > Thanks for the updates, a couple of comments:
> >
> > > If a record is generated by a source when the source's
> > isProcessingBacklog is true, or some of the records used to
> > > derive this record (by an operator) has isBacklog = true, then this
> > record should have isBacklog = true. Otherwise,
> > > this record should have isBacklog = false.
> >
> > nit:
> > I think this conflicts with "Rule of thumb for non-source operators to
> set
> > isBacklog = true for the records it emits:"
> > section later on, when it comes to a case if an operator has mixed
> > isBacklog = false and isBacklog = true inputs.
> >
> > > execution.checkpointing.interval-during-backlog
> >
> > Do we need to define this as an interval config parameter? Won't that add
> > an option that will be almost instantly deprecated
> > because what we actually would like to have is:
> > execution.slow-end-to-end-latency and execution.end-to-end-latency
> >
>
> I guess you are suggesting that we should allow users to specify a higher
> end-to-end latency budget for those records that are emitted by two-phase
> commit sink, than those records that are emitted by none-two-phase commit
> sink.
>
> My concern with this approach is that it will increase the complexity of
> the definition of "processing latency requirement", as well as the
> complexity of the Flink runtime code that handles it. Currently, the
> FLIP-325 defines end-to-end latency as an attribute of the records that is
> statically assigned when the record is generated at the source, regardless
> of how it will be emitted later in the topology. If we make the changes
> proposed above, we would need to define the latency requirement w.r.t. the
> attribute of the operators that it travels through before its result is
> emitted, which is less intuitive and more complex.
>
> For now, it is not clear whether it is necessary to have two categories of
> latency requirement for the same job. Maybe it is reasonable to assume that
> if a job has two-phase commit sink and the user is OK to emit some results
> at 1 minute interval, then more likely than not the user is also OK to emit
> all results at 1 minute interval, include those that go through
> none-two-phase commit sink?
>
> If we do want to support different end-to-end latency depending on whether
> the operator is emitted by two-phase commit sink, I would prefer to still
> use execution.checkpointing.interval-during-backlog instead of
> execution.slow-end-to-end-latency. This allows us to keep the concept of
> end-to-end latency simple. Also, by explicitly including "checkpointing
> interval" in the name of the config that directly affects checkpointing
> interval, we can make it easier and more intuitive for users to understand
> the impact and set proper value for such configs.
>
> What do you think?
>
> Best,
> Dong
>
>
> > Maybe we can introduce only `execution.slow-end-to-end-latency` (% maybe
> a
> > better name), and for the time being
> > use it as the checkpoint interval value during backlog?
>
>
> > Or do you envision that in the future users will be configuring only:
> > - execution.end-to-end-latency
> > and only optionally:
> > - execution.checkpointing.interval-during-backlog
> > ?
> >
> > Best Piotrek
> >
> > PS, I will read the summary that you have just published later, but I
> think
> we don't need to block this FLIP on the
> existence of that high level summary.

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-11 Thread Piotr Nowojski
Hi Dong,

Thanks for the updates, a couple of comments:

> If a record is generated by a source when the source's
isProcessingBacklog is true, or some of the records used to
> derive this record (by an operator) has isBacklog = true, then this
record should have isBacklog = true. Otherwise,
> this record should have isBacklog = false.

nit:
I think this conflicts with "Rule of thumb for non-source operators to set
isBacklog = true for the records it emits:"
section later on, when it comes to a case if an operator has mixed
isBacklog = false and isBacklog = true inputs.

> execution.checkpointing.interval-during-backlog

Do we need to define this as an interval config parameter? Won't that add
an option that will be almost instantly deprecated
because what we actually would like to have is:
execution.slow-end-to-end-latency and execution.end-to-end-latency

Maybe we can introduce only `execution.slow-end-to-end-latency` (% maybe a
better name), and for the time being
use it as the checkpoint interval value during backlog?

Or do you envision that in the future users will be configuring only:
- execution.end-to-end-latency
and only optionally:
- execution.checkpointing.interval-during-backlog
?

Best Piotrek

PS, I will read the summary that you have just published later, but I think
we don't need to block this FLIP on the
existence of that high level summary.

On Tue, Jul 11, 2023 at 17:49, Dong Lin wrote:

> Hi Piotr and everyone,
>
> I have documented the vision with a summary of the existing work in this
> doc. Please feel free to review/comment/edit this doc. Looking forward to
> working with you together in this line of work.
>
>
> https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing
>
> Best,
> Dong
>
> On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski 
> wrote:
>
> > Hi All,
> >
> > Me and Dong chatted offline about the above mentioned issues (thanks for
> > that offline chat
> > I think it helped both of us a lot). The summary is below.
> >
> > > Previously, I thought you meant to add a generic logic in
> > SourceReaderBase
> > > to read existing metrics (e.g. backpressure) and emit the
> > > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > > misunderstood your suggetions.
> > >
> > > After double-checking your previous suggestion, I am wondering if you
> are
> > > OK with the following approach:
> > >
> > > - Add a job-level config
> execution.checkpointing.interval-during-backlog
> > > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > > isProcessingBacklog).
> > > - When this API is invoked, it internally sends an
> > > internal SourceReaderBacklogEvent to SourceCoordinator.
> > > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > > status from all its subtasks. And for now, we will hardcode the logic
> > such
> > > that if any source reader says it is under backlog, then
> > > execution.checkpointing.interval-during-backlog is used.
> > >
> > > This approach looks good to me as it can achieve the same performance
> > with
> > > the same number of public APIs for the target use-case. And I suppose
> in
> > > the future we might be able to re-use this API for source reader to set
> > its
> > > backlog status based on its backpressure metrics, which could be an
> extra
> > > advantage over the current approach.
> > >
> > > Do you think we can agree to adopt the approach described above?
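The aggregation rule proposed above — use the backlog interval whenever any source reader reports it is processing backlog — is small enough to sketch; the class and method names below are illustrative, not the actual Flink runtime API:

```python
class BacklogAwareIntervalSelector:
    """Tracks per-subtask isProcessingBacklog flags, roughly as the
    SourceCoordinator would, and picks the checkpointing interval."""

    def __init__(self, regular_interval_s, backlog_interval_s):
        self.regular = regular_interval_s
        self.backlog = backlog_interval_s
        self.flags = {}  # subtask id -> latest reported isProcessingBacklog

    def on_backlog_event(self, subtask_id, is_processing_backlog):
        # corresponds to receiving a SourceReaderBacklogEvent
        self.flags[subtask_id] = is_processing_backlog

    def checkpoint_interval(self):
        # hardcoded rule: if ANY reader reports backlog, use the backlog interval
        return self.backlog if any(self.flags.values()) else self.regular

sel = BacklogAwareIntervalSelector(regular_interval_s=3, backlog_interval_s=300)
assert sel.checkpoint_interval() == 3    # no reports yet -> regular interval
sel.on_backlog_event(0, True)
sel.on_backlog_event(1, False)
assert sel.checkpoint_interval() == 300  # subtask 0 is backlogged
sel.on_backlog_event(0, False)
assert sel.checkpoint_interval() == 3    # all subtasks caught up
```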
> >
> > Yes, I think that's a viable approach. I would be perfectly fine to not
> > introduce
> > `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).`
> > and sending the `SourceReaderBacklogEvent` from SourceReader to JM
> > in this FLIP. It could be implemented once we would decide to add some
> more
> > generic
> > ways of detecting backlog/backpressure on the SourceReader level.
> >
> > I think we could also just keep the current proposal of adding
> > `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the
> sources
> > that
> > can set it on the `SplitEnumerator` level. Later we could merge this with
> > another
> > mechanisms of detecting "isProcessingBacklog", like based on watermark
> lag,
> > backpressure, etc, via some component running on the JM.
> >
> > At the same time I'm fine with having the "isProcessingBacklog" concept
> to
> > switch
> > runtime back and forth between high and low latency modes instead of
> > "backpressure". 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-07-10 Thread Piotr Nowojski
t we want to have in
the future, I'm pretty much fine with the current
FLIP-309 proposal, with a couple of remarks:
1. Could you include in the FLIP-309 the long term solution as we have
discussed.
a) Would be nice to have some diagram showing how the
"isProcessingBacklog" information would be travelling,
 being aggregated and what will be done with that information.
(from SourceReader/SplitEnumerator to some
"component" aggregating it, and then ... ?)
2. For me "processing backlog" doesn't necessarily equate to "backpressure"
(HybridSource can be
both NOT backpressured and processing backlog at the same time). If you
think the same way, can you include that
definition of "processing backlog" in the FLIP including its relation
to the backpressure state? If not, we need to align
on that definition first :)

Also I'm missing a big picture description, that would show what are you
trying to achieve and what's the overarching vision
behind all of the current and future FLIPs that you are planning in this
area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
Or was it described somewhere and I've missed it?

Best,
Piotrek



On Thu, Jul 6, 2023 at 06:25, Dong Lin wrote:

> Hi Piotr,
>
> I am sorry if you feel unhappy or upset with us for not following/fixing
> your proposal. It is not my intention to give you this feeling. After all,
> we are all trying to make Flink better, to support more use-case with the
> most maintainable code. I hope you understand that just like you, I have
> also been doing my best to think through various design options and taking
> time to evalute the pros/cons. Eventually, we probably still need to reach
> consensus by clearly listing and comparing the objective pros/cons of
> different proposals and identifying the best choice.
>
> Regarding your concern (or frustration) that we are always finding issues
> in your proposal, I would say it is normal (and probably necessary) for
> developers to find pros/cons in each other's solutions, so that we can
> eventually pick the right one. I will appreciate anyone who can correctly
> pinpoint the concrete issue in my proposal so that I can improve it or
> choose an alternative solution.
>
> Regarding your concern that we are not spending enough effort to find
> solutions and that the problem in your solution can be solved in a minute,
> I would like to say that is not true. For each of your previous proposals,
> I typically spent 1+ hours thinking through your proposal to understand
> whether it works and why it does not work, and another 1+ hour to write
> down the details and explain why it does not work. And I have had a variety
> of offline discussions with my colleagues discussing various proposals
> (including yours) with 6+ hours in total. Maybe I am not capable enough to
> fix those issues in one minute or so so. If you think your proposal can be
> easily fixed in one minute or so, I would really appreciate it if you can
> think through your proposal and fix it in the first place :)
>
> For your information, I have had several long discussions with my
> colleagues at Alibaba and also Becket on this FLIP. We have seriously
> considered your proposals and discussed in detail what are the pros/cons
> and whether we can improve these solutions. The initial version of this
> FLIP (which allows the source operator to specify checkpoint intervals)
> does not get enough support due to concerns of not being generic (i.e.
> users need to specify checkpoint intervals on a per-source basis). It is
> only after I updated the FLIP to use the job-level
> execution.checkpointing.interval-during-backlog, then they agree to give +1
> to the FLIP. What I want to tell you is that your suggestions have been
> taken seriously, and the quality of the FLIP has been taken seriously
> by all those who have voted. As a result of taking your suggestion
> seriously and trying to find improvements, we updated the FLIP to use
> isProcessingBacklog.
>
> I am wondering, do you think it will be useful to discuss face-to-face via
> video conference call? It is not just between you and me. We can invite the
> developers who are interested to join and help with the discussion. That
> might improve communication efficiency and help us understand each other
> better :)
>
> I am writing this long email to hopefully get your understanding. I care
> much more about the quality of the eventual solution rather than who
> proposed the solution. Please bear with me and see my comments inline, with
> an explanation of the pros/cons of these proposals.
>
>
> On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski 
> wrote:
>
> > Hi Guys,
> >
> > I would like to ask you again, to spend a bit more effort on trying to
> f

Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-06 Thread Piotr Nowojski
Congratulations to everyone :)

Piotrek

On Thu, Jul 6, 2023 at 12:23, Jane Chan wrote:

> Congratulations!
>
> Best,
> Jane
>
> On Thu, Jul 6, 2023 at 2:15 PM Jiadong Lu  wrote:
>
> > Congratulations!
> >
> > Best regards,
> > Jiadong Lu
> >
> > On 2023/7/6 13:26, Weihua Hu wrote:
> > > Congratulations!
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Wed, Jul 5, 2023 at 5:46 PM Shammon FY  wrote:
> > >
> > >> Congratulations!
> > >>
> > >> Best,
> > >> Shammon FY
> > >>
> > >> On Wed, Jul 5, 2023 at 2:38 PM Paul Lam 
> wrote:
> > >>
> > >>> Congrats and cheers!
> > >>>
> > >>> Best,
> > >>> Paul Lam
> > >>>
> >  On Jul 4, 2023 at 18:04, Benchao Li wrote:
> > 
> >  Congratulations!
> > 
> > > On Tue, Jul 4, 2023 at 16:17, Feng Jin wrote:
> > 
> > > Congratulations!
> > >
> > > Best,
> > > Feng Jin
> > >
> > >
> > >
> > > On Tue, Jul 4, 2023 at 4:13 PM Yuxin Tan 
> > >>> wrote:
> > >
> > >> Congratulations!
> > >>
> > >> Best,
> > >> Yuxin
> > >>
> > >>
> > >> On Tue, Jul 4, 2023 at 16:04, Dunn Bangui wrote:
> > >>
> > >>> Congratulations!
> > >>>
> > >>> Best,
> > >>> Bangui Dunn
> > >>>
> > >>> On Tue, Jul 4, 2023 at 15:59, Yangze Guo wrote:
> > >>>
> >  Congrats everyone!
> > 
> >  Best,
> >  Yangze Guo
> > 
> >  On Tue, Jul 4, 2023 at 3:53 PM Rui Fan <1996fan...@gmail.com>
> > >> wrote:
> > >
> > > Congratulations!
> > >
> > > Best,
> > > Rui Fan
> > >
> > > On Tue, Jul 4, 2023 at 2:08 PM Zhu Zhu 
> > wrote:
> > >
> > >> Congratulations everyone!
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >>> On Tue, Jul 4, 2023 at 14:06, Hang Ruan wrote:
> > >>>
> > >>> Congratulations!
> > >>>
> > >>> Best,
> > >>> Hang
> > >>>
> > >>> On Tue, Jul 4, 2023 at 13:47, Jingsong Li wrote:
> > >>>
> >  Congratulations!
> > 
> >  Thank you! All of the Flink community!
> > 
> >  Best,
> >  Jingsong
> > 
> >  On Tue, Jul 4, 2023 at 1:24 PM tison 
> > >>> wrote:
> > >
> > > Congrats and with honor :D
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > On Tue, Jul 4, 2023 at 11:08, Mang Zhang wrote:
> > >
> > >> Congratulations!
> > >>
> > >> --
> > >>
> > >> Best regards,
> > >> Mang Zhang
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On 2023-07-04 01:53:46, "liu ron" wrote:
> > >>> Congrats everyone
> > >>>
> > >>> Best,
> > >>> Ron
> > >>>
> > >>> On Mon, Jul 3, 2023 at 22:48, Jark Wu wrote:
> > >>>
> >  Congrats everyone!
> > 
> >  Best,
> >  Jark
> > 
> > > On Jul 3, 2023 at 22:37, Yuval Itzchakov wrote:
> > >
> > > Congrats team!
> > >
> > > On Mon, Jul 3, 2023, 17:28 Jing Ge via user <
> >  u...@flink.apache.org
> >  > wrote:
> > >> Congratulations!
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >>
> > >> On Mon, Jul 3, 2023 at 3:21 PM yuxia <
> >  luoyu...@alumni.sjtu.edu.cn
> >  > wrote:
> > >>> Congratulations!
> > >>>
> > >>> Best regards,
> > >>> Yuxia
> > >>>
> > >>> From: "Pushpa Ramakrishnan" <pushpa.ramakrish...@icloud.com>
> > >>> To: "Xintong Song" <tonysong...@gmail.com>
> > >>> Cc: "dev" <dev@flink.apache.org>, "User" <u...@flink.apache.org>
> > >>> Sent: Monday, July 3, 2023, 8:36:30 PM
> > >>> Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
> > >>>
> > >>> Congratulations 🥳
> > >>>
> > >>> On 03-Jul-2023, at 3:30 PM, Xintong Song <
> > >> tonysong...@gmail.com
> >  > wrote:
> > >>>
> > >>> 
> > >>> Dear Community,
> > >>>
> > >>> I'm pleased to share this good news with everyone.
> > >> As
> >  

Re: [DISCUSS] FLIP-325: Support configuring end-to-end allowed latency

2023-07-05 Thread Piotr Nowojski
Hi,

Thanks for this proposal, this is a very much needed thing that should be
addressed in Flink.

I think there is one thing that hasn't been discussed neither here nor in
FLIP-309. Given that we have
three dimensions:
- e2e latency/checkpointing interval
- enabling some kind of batching/buffering on the operator level
- how much resources we want to allocate to the job

How do we want Flink to adjust itself between those three? For example:
a) Should we assume that given Job has a fixed amount of assigned resources
and make it paramount that
  Flink doesn't exceed those available resources? So in case of
backpressure, we
  should extend checkpointing intervals, emit records less frequently and
in batches.
b) Or should we assume that the amount of resources is flexible (up to a
point?), and the desired e2e latency
  is the paramount aspect? So in case of backpressure, we should still
adhere to the configured e2e latency,
  and wait for the user or autoscaler to scale up the job?

In case of a), I think the concept of "isProcessingBacklog" is not needed,
we could steer the behaviour only
using the backpressure information.

On the other hand, in case of b), "isProcessingBacklog" information might
be helpful, to let Flink know that
we can safely decrease the e2e latency/checkpoint interval even if there is
no backpressure, to use fewer
resources (and let the autoscaler scale down the job).

Do we want to have both, or only one of those? Do a) and b) complement one
another? If job is backpressured,
we should follow a) and expose to autoscaler/users information "Hey! I'm
barely keeping up! I need more resources!".
While, when there is no backpressure and latency doesn't matter
(isProcessingBacklog=true), we can limit the resource
usage.

And a couple of more concrete remarks about the current proposal.

1.

> I think the goal is to allow users to specify an end-to-end latency
budget for the job.

I fully agree with this, but in that case, why are you proposing to add
`execution.flush.interval`? That's
yet another parameter that would affect e2e latency, without actually
defining it. We already have things
like: execution.checkpointing.interval, execution.buffer-timeout. I'm
pretty sure very few Flink users would be
able to configure or understand all of them.

I think we should simplify configuration and try to define
"execution.end-to-end-latency" so the runtime
could derive other things from this new configuration.

2. How do you envision `#flush()` and `#snapshotState()` to be connected?
So far, `#snapshotState()`
was considered as a kind of `#flush()` signal. Do we need both? Shouldn't
`#flush()` be implicitly or
explicitly attached to the `#snapshotState()` call?

3. What about unaligned checkpoints if we have separate `#flush()`
event/signal?

4. How should this be working in at-least-once mode (especially sources
that are configured to be working
in at-least-once mode)?.

5. How is this FLIP connected with FLI-327? I think they are trying to
achieve basically the same thing:
optimise when data should be flushed/committed to balance between
throughput and latency.

6.

> Add RecordAttributesBuilder and RecordAttributes that extends
StreamElement to provide operator with essential
> information about the records they receive, such as whether the records
are already stale due to backlog.

Passing along `RecordAttribute` for every `StreamElement` would be an
extremely inefficient solution.

If at all, this should be a marker propagated through the JobGraph vie
Events or sent from JM to TMs via an RPC
that would mark "backlog processing started/ended". Note that Events might
be costly, as they need to be
broadcasted. So with a job having 5 keyBy exchanges and parallelism of
1000, the number of events sent is
~4 000 000, while the number of RPCs would be only 5000.

In case we want to only check for the backpressure, we don't need any extra
signal. Operators/subtasks can
get that information very easily from the TMs runtime.
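As a back-of-envelope check of the counts quoted above: a broadcast event travels once per channel of each all-to-all (keyBy) exchange, i.e. parallelism² channels, while a JM-driven RPC needs only one message per receiving subtask. The exact constants depend on how the edges are counted, so treat these as order-of-magnitude figures rather than exact totals:

```python
def broadcast_event_count(num_keyed_exchanges, parallelism):
    # one event copy per channel; a keyBy exchange is all-to-all,
    # i.e. parallelism * parallelism channels per exchange
    return num_keyed_exchanges * parallelism * parallelism

def rpc_count(num_keyed_exchanges, parallelism):
    # one RPC per receiving subtask of each exchange
    return num_keyed_exchanges * parallelism

events = broadcast_event_count(num_keyed_exchanges=5, parallelism=1000)
rpcs = rpc_count(num_keyed_exchanges=5, parallelism=1000)
assert events == 5_000_000     # same order as the ~4,000,000 quoted above
assert rpcs == 5_000
assert events // rpcs == 1000  # broadcasting costs a factor of ~parallelism more
```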

Best,
Piotrek

On Thu, Jun 29, 2023 at 17:19, Dong Lin wrote:

> Hi Shammon,
>
> Thanks for your comments. Please see my reply inline.
>
> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY  wrote:
>
> > Hi Dong and Yunfeng,
> >
> > Thanks for bringing up this discussion.
> >
> > As described in the FLIP, the differences between `end-to-end latency`
> and
> > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
> the
> > end-to-end latency, whereas table.exec.mini-batch.allow-latency applies
> to
> > each operator. If there are N operators on the path from source to sink,
> > the end-to-end latency could be up to
> table.exec.mini-batch.allow-latency *
> > N".
> >
> > If I understand correctly, `table.exec.mini-batch.allow-latency` is also
> > applied to the end-to-end latency for a job, maybe @Jack Wu can give more
> > information.
> >
>
> Based on what I can tell from the doc/code and offline discussion, I
> believe table.exec.mini-batch.allow-latency is not applied to the end-to-end
> latency.
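The worst case quoted from the FLIP above compounds linearly: each of the N operators on the source-to-sink path may buffer a record for up to the per-operator allowance, so the delays add up. A trivial sanity check:

```python
def worst_case_e2e_latency(per_operator_allow_latency_s, num_operators):
    # each operator on the source-to-sink path may buffer independently,
    # so the per-operator allowances add up in the worst case
    return per_operator_allow_latency_s * num_operators

# e.g. table.exec.mini-batch.allow-latency = 1s on a 5-operator path
assert worst_case_e2e_latency(1.0, 5) == 5.0
```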

Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-07-05 Thread Piotr Nowojski
Hi Dong,

I have a couple of questions.

Could you explain why those properties

@Nullable private Boolean isOutputOnEOF = null;
@Nullable private Boolean isOutputOnCheckpoint = null;
@Nullable private Boolean isInternalSorterSupported = null;

must be `@Nullable`, instead of having the default value set to `false`?

Second question, have you thought about cases where someone is
either bootstrapping from a streaming source like Kafka
or simply trying to catch up after a long period of downtime in a purely
streaming job? Generally speaking a cases where
user doesn't care about latency in the catch up phase, regardless if the
source is bounded or unbounded, but wants to process
the data as fast as possible, and then switch dynamically to real time
processing?

Best,
Piotrek

On Sun, Jul 2, 2023 at 16:15, Dong Lin wrote:

> Hi all,
>
> I am opening this thread to discuss FLIP-327: Support stream-batch unified
> operator to improve job throughput when processing backlog data. The design
> doc can be found at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> .
>
> This FLIP enables a Flink job to initially operate in batch mode, achieving
> high throughput while processing records that do not require low processing
> latency. Subsequently, the job can seamlessly transition to stream mode for
> processing real-time records with low latency. Importantly, the same state
> can be utilized before and after this mode switch, making it particularly
> valuable when users wish to bootstrap the job's state using historical
> data.
>
> We would greatly appreciate any comments or feedback you may have on this
> proposal.
>
> Cheers,
> Dong
>

