Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-19 Rui Fan
Thanks everyone for the feedback!

There hasn't been any further feedback here, so I just started a new
vote[1] to update the default value of backoff-multiplier from
1.2 to 1.5.

[1] https://lists.apache.org/thread/0b1dcwb49owpm6v1j8rhrg9h0fvs5nkt

Best,
Rui

On Tue, Dec 12, 2023 at 7:14 PM Maximilian Michels  wrote:

> Thank you Rui! I think a 1.5 multiplier is a reasonable tradeoff
> between restarting fast but not putting too much pressure on the
> cluster due to restarts.
>
> -Max
>
> On Tue, Dec 12, 2023 at 8:19 AM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > Hi Maximilian and Mason,
> >
> > Thanks a lot for your feedback!
> >
> > After an offline discussion with Max, I think I understand your
> > concern now: when a Flink job restarts, it makes a bunch of calls
> > to the Kubernetes API, e.g. reading/writing config maps and creating
> > task managers. Currently, the default restart strategy is fixed-delay
> > with a 1s delay, so Flink restarts jobs at a high frequency even if
> > the jobs cannot be started at all. This can make the Kubernetes
> > cluster unstable.
> >
> > That's why I propose changing the default restart strategy to
> > exponential-delay: restarts still happen quickly enough, but the delay
> > grows when there are consecutive failures. This helps the stability
> > of external components.
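> >
> > For reference, here is a rough sketch of how the exponential-delay
> > strategy can already be enabled explicitly per job (the values below
> > are only illustrative, they are not the defaults being proposed):
> >
> > import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> > import org.apache.flink.api.common.time.Time;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >
> > public class ExponentialDelayExample {
> >     public static void main(String[] args) {
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         // Illustrative values: 1s initial backoff, 1min cap, multiplier 1.5,
> >         // backoff reset after 1h of stable running, 10% jitter.
> >         env.setRestartStrategy(
> >                 RestartStrategies.exponentialDelayRestart(
> >                         Time.seconds(1),  // initial-backoff
> >                         Time.minutes(1),  // max-backoff
> >                         1.5,              // backoff-multiplier
> >                         Time.hours(1),    // reset-backoff-threshold
> >                         0.1));            // jitter-factor
> >         // ... build and execute the job ...
> >     }
> > }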
> >
> > After discussing with Max and Zhu Zhu in the PR comments[1],
> > Max suggested using 1.5 as the default value of backoff-multiplier
> > instead of 1.2, since 1.2 is a little small (the delay grows too
> > slowly). This picture[2] shows the relationship between
> > restart-attempts and retry-delay-time when backoff-multiplier is
> > 1.2 and 1.5:
> >
> > - The delay-time will reach 1 min after 12 attempts when backoff-multiplier is 1.5
> > - The delay-time will reach 1 min after 24 attempts when backoff-multiplier is 1.2
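> >
> > These numbers can be reproduced with the simple model
> > delay(n) = initial-backoff * multiplier^(n - 1), capped at max-backoff.
> > A quick sketch, assuming a 1s initial-backoff and a 1 min target:
> >
> > public class BackoffMath {
> >     // Returns the first restart attempt whose delay reaches the target,
> >     // assuming delay(n) = initialSeconds * multiplier^(n - 1).
> >     static int attemptsUntilDelayReaches(
> >             double initialSeconds, double multiplier, double targetSeconds) {
> >         double delay = initialSeconds;
> >         int attempt = 1;
> >         while (delay < targetSeconds) {
> >             delay *= multiplier;
> >             attempt++;
> >         }
> >         return attempt;
> >     }
> >
> >     public static void main(String[] args) {
> >         System.out.println(attemptsUntilDelayReaches(1, 1.5, 60)); // 12
> >         System.out.println(attemptsUntilDelayReaches(1, 1.2, 60)); // 24
> >     }
> > }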
> >
> > Is there any other suggestion? Looking forward to more feedback, thanks~
> >
> > BTW, as Zhu said in the comment[1], if we update the default value,
> > a new vote is needed for it. So I will pause FLINK-33736[3] for now,
> > and the rest of the JIRAs of FLIP-364 will be continued.
> >
> > To Mason:
> >
> > If I understand your concerns correctly, I still don't know how to
> > benchmark this. The Kubernetes cluster instability only happens when
> > one cluster runs a lot of jobs, so in general a test cannot reproduce
> > that pressure. Could you elaborate on how to benchmark this?
> >
> > After this FLIP, the default restart frequency will be reduced
> > significantly, especially when a job fails consecutively.
> > Do you think the benchmark is still necessary?
> >
> > Looking forward to your feedback, thanks~
> >
> > [1] https://github.com/apache/flink/pull/23247#discussion_r1422626734
> > [2] https://github.com/apache/flink/assets/38427477/642c57e0-b415-4326-af05-8b506c5fbb3a
> > [3] https://issues.apache.org/jira/browse/FLINK-33736
> >
> > Best,
> > Rui
> >
> > On Thu, Dec 7, 2023 at 10:57 PM Maximilian Michels  wrote:
> >>
> >> Hey Rui,
> >>
> >> +1 for changing the default restart strategy to exponential-delay.
> >> This is something all users eventually run into. They end up changing
> >> the restart strategy to exponential-delay. I think the current
> >> defaults are quite balanced. Restarts happen quickly enough unless
> >> there are consecutive failures where I think it makes sense to double
> >> the waiting time up till the max.
> >>
> >> -Max
> >>
> >>
> >> On Wed, Dec 6, 2023 at 12:51 AM Mason Chen  wrote:
> >> >
> >> > Hi Rui,
> >> >
> >> > Sorry for the late reply. I was suggesting that perhaps we could do
> >> > some testing with Kubernetes wrt configuring values for the exponential
> >> > restart strategy. We've noticed that the default strategy in 1.17
> >> > caused a lot of requests to the K8s API server for unstable deployments.
> >> >
> >> > However, people in different Kubernetes setups will have different
> >> > limits so it would be challenging to provide a general benchmark.
> >> > Another thing I found helpful in the past is to refer to
> >> > Kubernetes--for example, the default strategy is exponential for pod
> >> > restarts and we could draw inspiration from what they have set as a
> >> > general purpose default config.
> >> >
> >> > Best,
> >> > Mason
> >> >
> >> > On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > > Hi David and Mason,
> >> > >
> >> > > Thanks for your feedback!
> >> > >
> >> > > To David:
> >> > >
> >> > > > Given that the new default feels more complex than the current
> >> > > > behavior, if we decide to do this I think it will be important to
> >> > > > include the rationale you've shared in the documentation.
> >> > >
> >> > > That makes sense to me; I will add the related docs if we
> >> > > update the default strategy.
> >> > >
> >> > > To Mason:
> >> > >
> >> > > > I suppose we could do some benchmarking on what works well for the
> >> > > > resource providers that 

Flink CDC MySqlSplitReader question

2023-12-19 casel.chen
While reading the source code of the flink-connector-mysql-cdc project, I ran into something I don't understand. I'd appreciate some guidance, thanks!


The MySqlSplitReader class contains the code below, and I don't understand the comment "(1) Reads binlog split firstly and then read 
snapshot split".
Why does it read the binlog split before the snapshot split? To keep records in order, shouldn't it read the full snapshot splits first and then the incremental binlog 
split?


private MySqlRecords pollSplitRecords() throws InterruptedException {
    Iterator<SourceRecords> dataIt;
    if (currentReader == null) {
        // (1) Reads binlog split firstly and then read snapshot split
        if (binlogSplits.size() > 0) {
            // the binlog split may come from:
            // (a) the initial binlog split
            // (b) added back binlog-split in newly added table process
            MySqlSplit nextSplit = binlogSplits.poll();
            currentSplitId = nextSplit.splitId();
            currentReader = getBinlogSplitReader();
            currentReader.submitSplit(nextSplit);
        } else if (snapshotSplits.size() > 0) {
            MySqlSplit nextSplit = snapshotSplits.poll();
            currentSplitId = nextSplit.splitId();
            currentReader = getSnapshotSplitReader();
            currentReader.submitSplit(nextSplit);
        } else {
            LOG.info("No available split to read.");
        }
        dataIt = currentReader.pollSplitRecords();
        return dataIt == null ? finishedSplit() : forRecords(dataIt);
    } else if (currentReader instanceof SnapshotSplitReader) {

    }
    ...
}