Re: [DISCUSS] Allow sharing (RocksDB) memory between slots

2022-11-17 Thread Khachatryan Roman
> > > [...]isolate all kinds of memory does not mean we should give up the
> > > isolation on all kinds of memory. And I believe "managed memory is
> > > isolated and others are not" is much easier for the users to
> > > understand compared to "part of the managed memory is isolated and
> > > others are not".
> > >
> > > By waste, I meant reserving a certain amount of memory that is only
> > > used by certain use cases that do not always exist. This is exactly
> > > what we want to avoid with managed memory in FLIP-49 [1]. We used to
> > > have managed memory only used for batch operators, and a
> > > containerized-cut-off memory (something similar to framework.off-heap)
> > > for the RocksDB state backend. The problem was that, if the user does
> > > not change the configuration when switching between streaming / batch
> > > jobs, there would always be some memory (managed or cut-off) wasted.
> > > Similarly, introducing a shared managed memory zone means reserving
> > > one more dedicated part of memory that can get wasted in many cases.
> > > This is probably a necessary price for this new feature, but let's not
> > > break the concept / properties of managed memory for it.
> > >
> > > In your proposal, the fraction for the shared managed memory is 0 by
> > > default. That means that to enable RocksDB memory sharing, users need
> > > to manually increase the fraction anyway. Thus, having the
> > > memory-sharing RocksDB use managed memory or off-heap memory does not
> > > make a significant difference for the users of the new feature. I'd
> > > think of this as "extra operational overhead for users of a certain
> > > new feature" vs. "significant learning cost and potential behavior
> > > change for pretty much all users". I'd be fine with having some
> > > shortcuts to simplify the configuration on the user side for this new
> > > feature, but not to invade the managed memory.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > >
> > > On Tue, Nov 15, 2022 at 5:46 PM Khachatryan Roman <
> > > khachatryan.ro...@gmail.com> wrote:
> > >
> > > > Thanks for your reply Xintong Song,
> > > >
> > > > Could you please elaborate on the following:
> > > >
> > > > > The proposed changes break several good properties that we
> > > > > designed for managed memory.
> > > > > 1. It's isolated across slots
> > > > Just to clarify, any way to manage the memory efficiently while
> > > > capping its usage will break the isolation. It's just a matter of
> > > > whether it's managed memory or not.
> > > > Do you see any reasons why unmanaged memory can be shared, and
> > > > managed memory can not?
> > > >
> > > > > 2. It should never be wasted (unless there's nothing in the job
> > > > > that needs managed memory)
> > > > If I understand correctly, the managed memory can already be wasted,
> > > > because it is divided evenly between slots, regardless of the
> > > > existence of its consumers in a particular slot.
> > > > And in general, even if every slot has RocksDB / Python, equal
> > > > consumption is not guaranteed.
> > > > So this property would rather be fixed by the current proposal.
> > > >
> > > > > In addition, it further complicates configuration / computation
> > > > > logics of managed memory.
> > > > I think having multiple options overriding each other increases the
> > > > complexity for the user. As for the computation, I think it's
> > > > desirable to let Flink do it rather than users.
> > > >
> > > > Both approaches need some help from the TM for:
> > > > - storing the shared resources (a static field in a class might be
> > > > too dangerous, because if the backend is loaded by the user class
> > > > loader, then memory will leak silently);
> > > > - reading the configuration.
> > > >

Re: [DISCUSS] Allow sharing (RocksDB) memory between slots

2022-11-15 Thread Khachatryan Roman
Thanks for your reply Xintong Song,

Could you please elaborate on the following:

> The proposed changes break several good properties that we designed for
> managed memory.
> 1. It's isolated across slots
Just to clarify, any way to manage the memory efficiently while capping its
usage
will break the isolation. It's just a matter of whether it's managed memory
or not.
Do you see any reasons why unmanaged memory can be shared, and managed
memory can not?

> 2. It should never be wasted (unless there's nothing in the job that needs
> managed memory)
If I understand correctly, the managed memory can already be wasted because
it is divided evenly between slots, regardless of the existence of its
consumers in a particular slot.
And in general, even if every slot has RocksDB / Python, equal consumption
is not guaranteed.
So this property would rather be fixed by the current proposal.

> In addition, it further complicates configuration / computation logics of
> managed memory.
I think having multiple options overriding each other increases the
complexity for the user. As for the computation, I think it's desirable to
let Flink do it rather than users.

Both approaches need some help from the TM for:
- storing the shared resources (a static field in a class might be too
dangerous, because if the backend is loaded by the user class loader, then
memory will leak silently);
- reading the configuration.
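To illustrate the class-loader risk mentioned above, here is a minimal sketch (a hypothetical class, not actual Flink code) of why a static field in a class loaded by a per-job user class loader both breaks the sharing and leaks:

```java
// Hypothetical example: suppose the backend keeps its shared resources in
// a static field. If this class is loaded by a per-job user class loader
// instead of the TM's parent class loader, then:
//  1) each job sees its own copy of CACHE, so nothing is actually shared;
//  2) the static field keeps the class (and its class loader) reachable,
//     so when the job finishes, neither the class loader nor the cached
//     resources can be garbage-collected: the memory leaks silently.
public class LeakyBackendResources {
    private static final java.util.Map<String, AutoCloseable> CACHE =
            new java.util.concurrent.ConcurrentHashMap<>();

    public static AutoCloseable getOrCreate(String key) {
        // Nothing ever closes these resources when the user code is unloaded.
        return CACHE.computeIfAbsent(key, k -> () -> { /* free native memory here */ });
    }
}
```

Keeping such resources in the TaskManager, loaded by the parent class loader, avoids both problems, which is why some TM support is needed either way.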

Regards,
Roman


On Sun, Nov 13, 2022 at 11:24 AM Xintong Song  wrote:

> I like the idea of sharing RocksDB memory across slots. However, I'm quite
> concerned about the currently proposed approach.
>
> The proposed changes break several good properties that we designed for
> managed memory.
> 1. It's isolated across slots
> 2. It should never be wasted (unless there's nothing in the job that needs
> managed memory)
> In addition, it further complicates configuration / computation logics of
> managed memory.
>
> As an alternative, I'd suggest introducing a variant of
> RocksDBStateBackend that shares memory across slots and does not use
> managed memory. This basically means the shared memory is not considered
> part of managed memory. Users of this new feature would need to
> configure how much memory the variant state backend should use, and
> probably also a larger framework-off-heap / jvm-overhead memory. The latter
> might require a bit extra user effort compared to the current approach, but
> would avoid significant complexity in the managed memory configuration and
> calculation logics which affects broader users.
>
>
> Best,
>
> Xintong
>
>
>
> On Sat, Nov 12, 2022 at 1:21 AM Roman Khachatryan 
> wrote:
>
> > Hi John, Yun,
> >
> > Thank you for your feedback
> >
> > @John
> >
> > > It seems like operators would either choose isolation for the
> > > cluster’s jobs or they would want to share the memory between jobs.
> > > I’m not sure I see the motivation to reserve only part of the memory
> > > for sharing and allowing jobs to choose whether they will share or
> > > be isolated.
> >
> > I see two related questions here:
> >
> > 1) Whether to allow mixed workloads within the same cluster.
> > I agree that most likely all the jobs will have the same "sharing"
> > requirement.
> > So we can drop "state.backend.memory.share-scope" from the proposal.
> >
> > 2) Whether to allow different memory consumers to use shared or
> > exclusive memory.
> > Currently, only RocksDB is proposed to use shared memory. For Python,
> > it's non-trivial because it is job-specific.
> > So we have to partition managed memory into shared/exclusive, and
> > therefore can NOT replace "taskmanager.memory.managed.shared-fraction"
> > with some boolean flag.
> >
> > I think your question was about (1), just wanted to clarify why the
> > shared-fraction is needed.
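For illustration, the partitioning implied by the shared-fraction could look like the following sketch (the exact formula is my assumption, not part of the proposal text):

```java
// Illustrative only: split managed memory into a shared part and
// per-slot exclusive parts based on a shared-fraction option.
long managedMemory = 4L * 1024 * 1024 * 1024; // e.g. taskmanager.memory.managed.size = 4 GiB
double sharedFraction = 0.25;                 // e.g. taskmanager.memory.managed.shared-fraction
int numberOfSlots = 4;                        // taskmanager.numberOfTaskSlots

long sharedMemory = (long) (managedMemory * sharedFraction);            // shared by all slots (e.g. RocksDB)
long exclusivePerSlot = (managedMemory - sharedMemory) / numberOfSlots; // isolated per slot, as today
```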
> >
> > @Yun
> >
> > > I am just curious whether this could really bring benefits to our
> > > users with such complex configuration logic.
> > I agree, and configuration complexity seems to be a common concern.
> > I hope that removing "state.backend.memory.share-scope" (as proposed
> > above) reduces the complexity.
> > Please share any ideas of how to simplify it further.
> >
> > > Could you share some real experimental results?
> > I did an experiment to verify that the approach is feasible,
> > i.e. multiple jobs can share the same memory/block cache.
> > But I guess that's not what you mean here? Do you have any experiments in
> > mind?
> >
> > > BTW, as talked about before, I am not sure whether different
> > > lifecycles of RocksDB state backends would affect the memory usage of
> > > the block cache & write buffer manager in RocksDB.
> > > Currently, all instances start and are destroyed nearly
> > > simultaneously; this would change after we introduce this feature,
> > > with jobs scheduled at different times.
> > IIUC, the concern is that closing a RocksDB instance might close the
> > BlockCache.
> > I checked that manually and it seems to work as expected.
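The setup being verified can be sketched with the RocksJava API roughly as follows (a simplified sketch using recent RocksJava method names; Flink's actual wiring differs, and paths/sizes are placeholders):

```java
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBufferManager;

public class SharedBlockCacheDemo {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();

        // One block cache and one write buffer manager, shared by both instances.
        Cache sharedCache = new LRUCache(256 * 1024 * 1024);
        WriteBufferManager wbm = new WriteBufferManager(128 * 1024 * 1024, sharedCache);

        RocksDB db1 = open("/tmp/db1", sharedCache, wbm);
        RocksDB db2 = open("/tmp/db2", sharedCache, wbm);

        db1.put("a".getBytes(), "1".getBytes());
        db1.close(); // closing one instance must not invalidate the shared cache

        db2.put("b".getBytes(), "2".getBytes()); // still works against the shared cache
        db2.close();

        // The shared objects are released only after the last user is done.
        wbm.close();
        sharedCache.close();
    }

    private static RocksDB open(String path, Cache cache, WriteBufferManager wbm)
            throws Exception {
        Options options = new Options()
                .setCreateIfMissing(true)
                .setWriteBufferManager(wbm)
                .setTableFormatConfig(new BlockBasedTableConfig().setBlockCache(cache));
        return RocksDB.open(options, path);
    }
}
```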

Re: [DISCUSS] FLIP-193: Snapshots ownership

2021-11-23 Thread Khachatryan Roman
Thanks Dawid, Yun and Piotr,

I think I know where the confusion comes from regarding arbitrary
recovery histories: both my counter-proposals were for the "no-claim"
mode; I didn't mean to replace the "claim" mode with them.
However, as Yun pointed out, it's impossible to guarantee that all the
files will be compacted in a finite number of checkpoints; so let's
withdraw those proposals.

And as there are no other alternatives left, the changes to
SharedStateRegistry or State Backends are not a decisive factor
anymore.

However, it probably still makes sense to clarify the details of how
re-upload will work in case of rescaling.

Let's consider a job running with DoP=1; it created checkpoint C1 with
a single file F1 and then stopped.
We start a new job from C1 in no-claim mode with DoP=2; so two tasks
will receive the same file F1.

Let's say both tasks will re-use F1, so it needs to be re-uploaded.
Now, we have a choice:
1. Re-upload from both tasks
For RocksDB, the state key is: backend UID + SST file name. Both are
the same for two tasks, so the key will be the same.
Currently, SharedStateRegistry will reject both as duplicates.

We can't just replace (to not lose one of the files), so we have to
use random keys.
However, when we further downscale:
- we'll have a conflict on recovery (multiple SST files with the same name)
- we'll re-upload the same file multiple times unnecessarily
So we have to de-duplicate state on recovery - ideally before sending
state snapshots to tasks.
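A toy sketch of the collision (hypothetical names; not the actual SharedStateRegistry API) may help:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class SharedStateKeyDemo {
    // Simplified registry: key -> location of the uploaded file.
    static final Map<String, String> registry = new HashMap<>();

    // Key scheme from the discussion: backend UID + SST file name.
    static String stateKey(String backendUid, String sstFileName) {
        return backendUid + "/" + sstFileName;
    }

    static boolean register(String key, String location) {
        // A registry that rejects duplicates: the second registration of
        // the same key is discarded, so one of the two uploads is "lost".
        return registry.putIfAbsent(key, location) == null;
    }

    public static void main(String[] args) {
        // After rescaling DoP=1 -> DoP=2, both subtasks hold F1 from the
        // same backend UID, so they compute the same key.
        String key = stateKey("backend-uid-1", "000042.sst");
        System.out.println(register(key, "s3://bucket/task0/000042.sst")); // true
        System.out.println(register(key, "s3://bucket/task1/000042.sst")); // false: duplicate

        // Workaround: random keys avoid the collision, but the same file
        // is now stored (and re-uploaded) twice, and a later downscale
        // must de-duplicate these entries on recovery.
        register(key + "-" + UUID.randomUUID(), "s3://bucket/task0/000042.sst");
        register(key + "-" + UUID.randomUUID(), "s3://bucket/task1/000042.sst");
        System.out.println(registry.size()); // 3 entries, two for the same file
    }
}
```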

2. Re-upload from one task (proposed in FLIP as optimization)
Both tasks must learn the new key. Otherwise, the snapshot of the
not-reuploading task will refer to a non-existing entry.
We can either re-use the old key (and allow replacement in
SharedStateRegistry); or generate the key on JM before sending task
state snapshots.


P.S.:
> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
> This is effectively what we have right now, but with an extra (Async?)
Right now, there is absolutely no way to find out when the shared
state can be deleted; it can't be inferred from which checkpoints are
subsumed, and which are not, as future checkpoints might still be
using that state.

Regards,
Roman



On Tue, Nov 23, 2021 at 1:37 PM Piotr Nowojski  wrote:
>
> Hi,
>
> I'm not entirely sure if I fully understand the raised concerns here. So
> let me maybe step back in the discussion a bit and address the original
> points from Roman.
>
> > 2. Instead of forcing re-upload, can we "inverse control" in no-claim
> mode?
>
> I second the concerns from Dawid. This is effectively what we have right
> now, but with an extra (Async?) API call. It's not conceptually simple,
> it's hard to explain to the users, it might take actually forever to
> release the artefacts. Furthermore I don't think the implementation would
> be trivial.
>
> On the other hand the current proposal of having (a) `--claim` and (b)
> `--no-claim` mode are conceptually very simple. (a) being perfectly
> efficient, without any overheads. If you have concerns that (b) will cause
> some overheads, slower first checkpoint etc, keep in mind that the user can
> always pick option (a). Starting a new job from an existing
> savepoint/externalised checkpoint in general shouldn't be time critical, so
> users can always even manually copy the files and still use option (a), or
> just be fine accepting the price of a slower first checkpoint. For other
> use cases - restarting the same job after a downtime - (b) sounds to me to
> be an acceptable option.
>
> I would also like to point out that the "force full snapshot"/"do not use
> previous artefacts" option we will need either way for the incremental
> intermediate savepoints (subject of a next FLIP). From this perspective, we
> are getting the "--no-claim" option basically for free.
>
> > 3. Alternatively, re-upload not necessarily on 1st checkpoint, but after
> a configured number of checkpoints?
>
> I don't see a reason why we couldn't provide an option like that at some
> point in the future. However as it's more complicated to reason about, more
> complicated to implement, and I'm not entirely sure how much it is actually
> needed given the (a) `--claim` mode, I think we can wait for feedback from the
> users before actually implementing it.
>
> > 6. Enforcing re-upload by a single task and skew
> > If we use some greedy logic like subtask 0 always re-uploads then it
> > might be overloaded.
> > So we'll have to obtain a full list of subtasks first (then probably
> > choose randomly or round-robin).
> > However, that requires rebuilding Task snapshot, which is doable but
> > not trivial (which I think supports "reverse API option").
>
> What do you mean by "rebuilding Task snapshot"?
>
> During some early discussions about this point, I've hoped that a state
> backend like changelog could embed into the state handle information which
> operator should actually be responsible for duplicating such shared states.

Re: [DISCUSS] Apache Flink Jira Process

2021-03-02 Thread Khachatryan Roman
Hi Konstantin,

I think we should try it out.
Even if tickets don't work well, it can be a good step towards managing
technical debt in some other way, like a wiki.

Thanks!

Regards,
Roman


On Tue, Mar 2, 2021 at 9:32 AM Dawid Wysakowicz 
wrote:

> I'd be fine with dropping the "Trivial" priority in favour of "starter"
> label.
>
> Best,
>
> Dawid
>
> On 01/03/2021 11:53, Konstantin Knauf wrote:
> > Hi Dawid,
> >
> > Thanks for the feedback. Do you think we should simply get rid of the
> > "Trivial" priority then and use the "starter" label more aggressively?
> >
> > Best,
> >
> > Konstantin
> >
> > On Mon, Mar 1, 2021 at 11:44 AM Dawid Wysakowicz  >
> > wrote:
> >
> >> Hi Konstantin,
> >>
> >> I also like the idea.
> >>
> >> Two comments:
> >>
> >> * you describe the "Trivial" priority as one that needs to be
> >> implemented immediately. First of all, it is not used too often, but I
> >> think the way it works now is similar to a "starter" label: tasks that
> >> are not bugs, are easy to implement, and we think are fine to be
> >> taken by newcomers. Therefore, in my mind, they do not fall into the
> >> "immediately" category.
> >>
> >> * I would still deprioritise test instabilities. I think there shouldn't
> >> be a problem with that. We do post links to all failures, therefore it
> >> will automatically prioritise the tasks according to failure frequencies.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 01/03/2021 09:38, Konstantin Knauf wrote:
> >>> Hi Xintong,
> >>>
> >>> yes, such labels would make a lot of sense. I added a sentence to the
> >>> document.
> >>>
> >>> Thanks,
> >>>
> >>> Konstantin
> >>>
> >>> On Mon, Mar 1, 2021 at 8:51 AM Xintong Song 
> >> wrote:
>  Thanks for driving this discussion, Konstantin.
> 
>  I like the idea of having a bot reminding reporter/assignee/watchers
> >> about
>  inactive tickets and if needed downgrade/close them automatically.
> 
>  My two cents:
>  We may have labels like "downgraded-by-bot" / "closed-by-bot", so that
> >> it's
>  easier to filter and review tickets updated by the bot.
>  We may want to review such tickets (e.g., monthly) in case a valid
> >> ticket
>  failed to draw the attention of relevant committers and the reporter
>  doesn't know who to ping.
> 
>  Thank you~
> 
>  Xintong Song
> 
> 
> 
>  On Sat, Feb 27, 2021 at 1:37 AM Till Rohrmann 
>  wrote:
> 
> > Thanks for starting this discussion Konstantin. I like your proposal
> >> and
> > also the idea of automating the tedious parts of it via a bot.
> >
> > Cheers,
> > Till
> >
> > On Fri, Feb 26, 2021 at 4:17 PM Konstantin Knauf 
> > wrote:
> >
> >> Dear Flink Community,
> >>
> >> I would like to start a discussion on improving, and to some extent
> >> simply defining, the way we work with Jira. Some aspects have been
> >> discussed a while back [1], but I would like to go a bit beyond that
> >> with the following goals in mind:
> >>
> >> - clearer communication and expectation management with the community
> >>   - a user or contributor should be able to judge the urgency of a
> >>     ticket by its priority
> >>   - if a ticket is assigned to someone, the expectation that someone
> >>     is working on it should hold
> >> - generally reduce noise in Jira
> >> - reduce overhead of committers to ask about status updates of
> >>   contributions or bug reports
> >>   - “Are you still working on this?”
> >>   - “Are you still interested in this?”
> >>   - “Does this still happen on Flink 1.x?”
> >>   - “Are you still experiencing this issue?”
> >>   - “What is the status of the implementation?”
> >> - while still encouraging users to add new tickets and to leave
> >>   feedback about existing tickets
> >>
> >> Please see the full proposal here:
> >> https://docs.google.com/document/d/19VmykDSn4BHgsCNTXtN89R7xea8e3cUIl-uivW8L6W8/edit#
> >>
> >>
> >> The idea would be to discuss this proposal in this thread. If we come
> >> to a conclusion, I'd document the proposal in the wiki [2] and we
> >> would then vote on it (approval by "Lazy Majority").
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/rd34fb695d371c2bf0cbd1696ce190bac35dd78f29edd8c60d0c7ee71%40%3Cdev.flink.apache.org%3E
> >> [2]

[VOTE] FLIP-158: Generalized incremental checkpoints

2021-02-02 Thread Khachatryan Roman
Hi everyone,

I'd like to start a vote on FLIP-158 [1] which was discussed in [2].

The vote will be open for at least 72 hours. Unless there are any objections,
I'll close it by February 5, 2021, if we have received sufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-158-Generalized-incremental-checkpoints-td47902.html



Regards,
Roman


Re: [DISCUSS] FLIP-158: Generalized incremental checkpoints

2021-01-28 Thread Khachatryan Roman
Thanks a lot for your replies!

Yes, feedback is very much appreciated! Especially regarding the approach
in general.

I think that's a good idea to discuss some details in the follow-up docs or
tickets (but I'd be happy to discuss it here as well).

As for the PoC, I hope we'll publish it soon (logging only), so all
interested users will be able to try it out.

Regards,
Roman


On Thu, Jan 28, 2021 at 1:58 PM Yuan Mei  wrote:

> Big +1 onto this FLIP!
>
> Great to see it stepping forward, since this idea has been discussed for
> quite a while. :-)
>
> 1. I totally agree that the critical part is the overhead added during
> normal state updates (forwarding additional state updates to the log as
> well as the state updates themselves). Once we have this part ready and
> assessed, we can have a better understanding of, and confidence in, the
> impact introduced by writing the additional log during normal processing.
>
> 2. Besides that, it would also be helpful to understand how this idea fits
> into the overall picture of the fault-tolerance story. But this question
> might be out of the scope of this FLIP, I guess.
>
> Best
> Yuan
>
> On Thu, Jan 28, 2021 at 8:12 PM Stephan Ewen  wrote:
>
> > +1 to this FLIP in general.
> >
> > I like the general idea very much (full disclosure, have been involved in
> > the discussions and drafting of the design for a while, so I am not a
> > new/neutral reviewer here).
> >
> > One thing I would like to see us do here, is reaching out to users early
> > with this, and validating this approach. It is a very fundamental change
> > that also shifts certain tradeoffs, like "cost during execution" vs.
> > "cost on recovery". This approach will increase the data write rate to
> > S3/HDFS/...
> > So before we build every bit of the complex implementation, let's try and
> > validate/test the critical bits with the users.
> >
> > In my assessment, the most critical bit is the continuous log writing,
> > which adds overhead during execution time. Recovery is less critical,
> > there'll be no overhead or additional load, so recovery should be
> > strictly better than currently.
> > I would propose we hence focus on the implementation of the logging first
> > (ignoring recovery, focusing on one target FileSystem/Object store) and
> > test run this with a few users, see that it works well and whether they
> > like the new characteristics.
> >
> > I am also trying to contribute some adjustments to the FLIP text, like
> > more illustrations/explanations, to make it easier to share this FLIP
> > with a wider audience, so we can get the above-mentioned user input
> > and validation.
> >
> > Best,
> > Stephan
> >
> >
> >
> >
> > On Thu, Jan 28, 2021 at 10:46 AM Piotr Nowojski 
> > wrote:
> >
> > > Hi Roman,
> > >
> > > +1 from my side on this proposal. Also big +1 for the recent changes in
> > > this FLIP in the motivation and high level overview sections.
> > >
> > > For me there are quite a few unanswered questions around how to
> > > actually implement the proposed changes, and especially how to
> > > integrate them with the state backends and checkpointing, but maybe we
> > > can do that in either a follow-up design doc or discuss it in the
> > > tickets, or even maybe some PoC.
> > >
> > > Piotrek
> > >
> > > On Fri, Jan 15, 2021 at 07:49 Khachatryan Roman <
> > > khachatryan.ro...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion of FLIP-158: Generalized incremental
> > > > checkpoints [1]
> > > >
> > > > FLIP motivation:
> > > > Low end-to-end latency is a much-demanded property in many Flink
> > > > setups. With exactly-once, this latency depends on the checkpoint
> > > > interval/duration, which in turn is defined by the slowest node
> > > > (usually the one doing a full non-incremental snapshot). In large
> > > > setups with many nodes, the probability of at least one node being
> > > > slow gets higher, making almost every checkpoint slow.
> > > >
> > > > This FLIP proposes a mechanism to deal with this by materializing
> > > > and uploading state continuously and only uploading the changed
> > > > part during the checkpoint itself. It differs from other approaches
> > > > in that 1) checkpoints are always incremental; 2) it works for any
> > > > state backend.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints
> > > >
> > > > Any feedback highly appreciated!
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-161: Configuration through environment variables

2021-01-27 Thread Khachatryan Roman
Thanks a lot for writing this summary Ufuk!

> Do you agree that approach 1 has been rejected?
I don't think so. To me, using the conversion is prone to errors to the same
degree as the escaping in option 2 (plus the inconvenience in editing).

> Is there any objection to approach 2?
Not from my side.

> Did we consider whether approach 3 is good enough?
Does this mean putting all system properties into the Configuration eagerly?

> Is approach 4 a viable option after all? I think we should continue the
discussion around the questions that were brought up.
I think it's still viable.

Regards,
Roman


On Wed, Jan 27, 2021 at 11:50 AM Ufuk Celebi  wrote:

> Let me try to summarize the current state of the discussion. I think we
> now have a few competing approaches.
>
> # Approach 1: dynamic env var mapping
>
> * This is the approach currently outlined in the FLIP
> * The assumption that we only have dots and hyphens in config option keys
> seems to be wrong, e.g. custom backends or metrics reporters (see Chesnay's
> responses)
>
> Due to the 2nd point, this option seems to be not viable. We could opt to
> only support the subset of keys that satisfy the restriction on their key,
> but this could be confusing to users.
>
> I'm reading our discussion as rejecting this approach due to its
> limitations.
>
> # Approach 2: single DYNAMIC_PROPERTIES env var
>
> * Parse DYNAMIC_PROPERTIES env var during configuration load time
> * The provided value may include multiple config option entries, similar
> to our docker-entrypoint script (
> https://github.com/apache/flink-docker/blob/master/1.12/scala_2.12-java8-debian/docker-entrypoint.sh#L79-L91
> )
>
> This option would essentially canonicalize our existing docker-entrypoint
> script solution (which does not suffer from any limitations on the keys).
>
> I think everybody in the discussion agrees that configuration of multiple
> options as a single value is cumbersome, but besides that everybody seems
> to be generally on board with this approach.
>
> # Approach 3: main args-based dynamic properties (do nothing)
>
> * No change required
> * Map env vars to config options via command line args dynamic properties,
> e.g. -Dkey=$(VALUE)
>
> I think this is the approach that Ingo now favours. The main question
> seems to be whether this is good enough and whether it works in all
> contexts we care about.
>
> # Approach 4: substitute flink-conf.yaml values only
>
> * flink-conf.yaml entries can reference env vars as values
>
> This approach was rejected in initial DISCUSS thread, but there seems to
> be some unanswered questions here.
>
> Roman brought up the question of providing a default mapping for all known
> keys which would allow individual env vars to configure most options out of
> the box. If an option is not part of the default mapping, users would be
> able to still define the mapping manually. The main down side seems to be
> that we would have to maintain the default mapping somehow. This is
> something we had rejected earlier as too fragile.
>
> # Summary
>
> A few questions in order to move this thread forward:
>
> * Do you agree that approach 1 has been rejected?
> * Is there any objection to approach 2?
> * Did we consider whether approach 3 is good enough?
> * Is approach 4 a viable option after all? I think we should continue the
> discussion around the questions that were brought up.
>
> I hope that we can reach a conclusion soon.
>
> – Ufuk
>
> On Wed, Jan 27, 2021, at 10:45 AM, Khachatryan Roman wrote:
> > Hi Ingo,
> > I missed that part of the discussion, sorry about that.
> > Would you mind putting it to the FLIP page?
> >
> > I guess you are referring to this message:
> > > I don't think we can assume that we know all config option keys. For
> > instance, I might write a custom high availability service or a custom
> > FileSystem plugin that has it's own config options. It would be a pity
> (but
> > maybe tolerable) if env var config would only work with Flink's core
> > options.
> > Can it be solved by adding an annotation and then scanning the classpath
> > via reflection?
> > This is not ideal, but knowing all options in advance might be the right
> > step towards eager config evaluation.
> >
> > @Chesnay and Thomas
> > Thanks for the clarification. IIUC, the format would be
> > DYNAMIC_PROPERTIES='-Drest.port=1234 -Dother.option="iContainAn=Sign"'.
> > But the evaluation WILL have nothing to do with System Properties as it
> > will happen already inside the java process, right?
> > This may be a bit confusing for users (but probably not a big issue).
> >

Re: [DISCUSS] FLIP-161: Configuration through environment variables

2021-01-27 Thread Khachatryan Roman
Hi Ingo,
I missed that part of the discussion, sorry about that.
Would you mind putting it to the FLIP page?

I guess you are referring to this message:
> I don't think we can assume that we know all config option keys. For
instance, I might write a custom high availability service or a custom
FileSystem plugin that has it's own config options. It would be a pity (but
maybe tolerable) if env var config would only work with Flink's core
options.
Can it be solved by adding an annotation and then scanning the classpath via
reflection?
This is not ideal, but knowing all options in advance might be the right step
towards eager config evaluation.

@Chesnay and Thomas
Thanks for the clarification. IIUC, the format would be
DYNAMIC_PROPERTIES='-Drest.port=1234 -Dother.option="iContainAn=Sign"'.
But the evaluation WILL have nothing to do with System Properties as it
will happen already inside the java process, right?
This may be a bit confusing for users (but probably not a big issue).

As for the substitution in the file, Yang wrote:
> It is true that we need to maintain a flink configuration file which
> simply maps all keys to some environment variables. But for Yarn and
> K8s deployment (both standalone on K8s and native), we already have
> such a file, which is shipped from the client or mounted from a
> ConfigMap.
Can we have a default file in Flink on the classpath with keys already
mapped to env vars? (as I wrote in the very beginning).

Regards,
Roman


On Wed, Jan 27, 2021 at 9:24 AM Ingo Bürk  wrote:

> Hi Yang,
>
> yeah, this is essentially the solution we also use in Ververica Platform
> (except that we use ${…}). I can't really add anything other than the
> previously stated reasons for why we decided to reject this, but of course
> I'm also open to this solution still. I don't think Flink necessarily needs
> to maintain a flink-conf.yaml with all environment variables; I would
> assume users generally would want to override only some properties, not
> most of them, and for a config like that basically any EV not set would end
> up causing an error.
>
>
> Regards
> Ingo
>
> On Wed, Jan 27, 2021 at 4:42 AM Yang Wang  wrote:
>
>> Hi all, sorry to butt in.
>>
>> I am a little curious about why we have to do the overwriting via
>> environment variables after loading the configuration from the file.
>> Could we support doing the substitution while loading the
>> "flink-conf.yaml" file?
>>
>> For example, I have the following config options in my flink-conf.yaml.
>> fs.oss.accessKeyId: $(FS_OSS_ACCESS_KEY_ID)
>> fs.oss.accessKeySecret: $(FS_OSS_ACCESS_KEY_SECRET)
>>
>> I expect these environment variables to be substituted when loading the
>> configuration file. It is very similar to using "*envsubst <
>> /tmp/flink-conf.yaml > /tmp/flink-conf-1.yaml*".
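A minimal sketch of such load-time substitution with the $(VAR) syntax from the example above (a hypothetical helper, not an existing Flink or Kubernetes API):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class EnvSubstitution {
    // Matches $(SOME_ENV_VAR) inside a config value.
    private static final Pattern VAR = Pattern.compile("\\$\\(([A-Za-z_][A-Za-z0-9_]*)\\)");

    /** Replaces every $(NAME) with the value of the environment variable NAME. */
    static String substitute(String value, Map<String, String> env) {
        Matcher m = VAR.matcher(value);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String replacement = env.get(m.group(1));
            if (replacement == null) {
                throw new IllegalArgumentException("Undefined variable: " + m.group(1));
            }
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        String line = "fs.oss.accessKeyId: $(FS_OSS_ACCESS_KEY_ID)";
        System.out.println(substitute(line, Map.of("FS_OSS_ACCESS_KEY_ID", "my-key")));
        // -> fs.oss.accessKeyId: my-key
    }
}
```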
>>
>> I know this is a rejected alternative, but I think some of the reasons do
>> not hold up.
>> * We could use $(FS_OSS_ACCESS_KEY_ID) instead of ${FS_OSS_ACCESS_KEY_ID}
>> for the environment definition
>> to avoid escape issues. I think Kubernetes has the same behavior [1].
>> Maybe many users are already familiar with it.
>> * Users do not need to know the conversion between flink config option and
>> environment name. Because they could use
>> any name they want.
>> * It is true that we need to maintain a flink configuration file
>> which simply maps all keys to some environment variables. But
>> for Yarn and K8s deployment (both standalone on K8s and native), we already
>> have such a file, which is shipped from the client
>> or mounted from a ConfigMap.
>>
>>
>> @Ingo This solution could work perfectly with Kubernetes deployments and is
>> easier to use. We could use a ConfigMap for storing the flink-conf.yaml,
>> and use Secrets exposed as environment variables for the authentication
>> information.
>>
>>
>> [1].
>>
>> https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#using-environment-variables-inside-of-your-config
>>
>>
>> Best,
>> Yang
>>
>> Chesnay Schepler  于2021年1月27日周三 上午8:03写道:
>>
>> > In the end DYNAMIC_PROPERTIES behaves exactly like env.java.opts;
>> > meaning that the existing rules set by the JVM apply.
>> >
>> > Here's an example: export DYNAMIC_PROPERTIES='-Drest.port=1234
>> > -Dother.option="iContainAn=Sign"'
>> >
>> > This would then be appended as is to the /java/ command.
>> > (
>> >  Conceptually at least; shells are annoying when it comes to
>> > quotes/whitespace;  good old http://mywiki.wooled

Re: [DISCUSS] FLIP-161: Configuration through environment variables

2021-01-26 Thread Khachatryan Roman
> Here's an example: My option key is custom.my_backend_option. With the
> current design, the corresponding env variable would be
> CUSTOM_MY_BACKEND_OPTION, which would be converted into
> custom.my.backend.option .

I think we don't have to translate CUSTOM_MY_BACKEND_OPTION back. Instead,
we should use the key from the ConfigOption.
I'm assuming that not every ENV VAR will end up in the Configuration -
only those for which a matching ConfigOption is found.
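A sketch of that forward-only lookup, using the mapping rule implied by the FLIP's example (s3.access-key -> FLINK_CONFIG_S3_ACCESS__KEY); the helper names are made up:

```java
import java.util.List;
import java.util.Map;

public final class EnvVarMapping {
    // Forward mapping from the FLIP's example: "." -> "_", "-" -> "__",
    // applied only to *known* option keys; never translated backwards.
    static String toEnvVarName(String optionKey) {
        return "FLINK_CONFIG_" + optionKey.toUpperCase()
                .replace("-", "__")
                .replace(".", "_");
    }

    /** For each known option, check whether its env var is set. */
    static void applyEnvOverrides(List<String> knownOptionKeys,
                                  Map<String, String> env,
                                  Map<String, String> config) {
        for (String key : knownOptionKeys) {
            String value = env.get(toEnvVarName(key));
            if (value != null) {
                // The original key is kept, e.g. custom.my_backend_option,
                // so no lossy back-translation is ever needed.
                config.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(toEnvVarName("s3.access-key")); // FLINK_CONFIG_S3_ACCESS__KEY
    }
}
```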

I'm also fine with a single ENV VAR (DYNAMIC_PROPERTIES). It's already a
big improvement.
In the future, we can consider adding something like ConfigOption.withEnvVar
for some (most popular) options.

However, escaping is still not clear to me: how would kv-pairs be
separated? What if such a separator is in the value itself? What if '=' is
in the value?
Or am I missing something?
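One possible answer is shell-like tokenization: whitespace separates pairs unless it is inside double quotes, and only the first '=' after the -D key splits key from value. The sketch below shows that reading (my assumption of the semantics, not a settled design); a naive whitespace split would break on quoted values:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class DynamicPropertiesParser {
    /** Splits on whitespace, honoring double quotes; only the first '=' separates key and value. */
    static Map<String, String> parse(String dynamicProperties) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : dynamicProperties.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;           // quotes toggle, and are dropped
            } else if (Character.isWhitespace(c) && !inQuotes) {
                if (current.length() > 0) { tokens.add(current.toString()); current.setLength(0); }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) tokens.add(current.toString());

        Map<String, String> result = new LinkedHashMap<>();
        for (String token : tokens) {
            if (!token.startsWith("-D")) continue;
            int eq = token.indexOf('=');        // first '=' only; later '=' stay in the value
            if (eq < 0) continue;
            result.put(token.substring(2, eq), token.substring(eq + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        // Chesnay's example: a value containing both an '=' and quotes.
        System.out.println(parse("-Drest.port=1234 -Dother.option=\"iContainAn=Sign\""));
        // -> {rest.port=1234, other.option=iContainAn=Sign}
    }
}
```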

Regards,
Roman


On Tue, Jan 26, 2021 at 6:41 PM Till Rohrmann  wrote:

> Thinking a bit more about the DYNAMIC_PROPERTIES, I have to admit that I
> like the fact that it works around the problem of encoding the key names
> and that it is more powerful wrt to bulk changes. Also the fact that one
> can copy past configuration snippets is quite useful. Given these aspects
> and that we wouldn't exclude any mentioned configuration scenarios, I would
> also be ok following this approach given that we support it for all Flink
> processes.
>
> Cheers,
> Till
>
> On Tue, Jan 26, 2021 at 5:10 PM Ingo Bürk  wrote:
>
>> Hi everyone,
>>
>> thanks for the lively discussion, it's great to see so many opinions and
>> ideas!
>>
>> The point about underscores is a very valid point where the current FLIP,
>> if we were to stick with it, would have to be improved. I was going to say
>> that we should exclude that from the discussion about the merits of
>> different overall solutions, but I am afraid that this makes the "how to
>> name EVs" question even more convoluted, and that in turn directly impacts
>> the usefulness of the FLIP as a whole which is about a more convenient way
>> of configuring Flink; names which are too cryptic will not achieve that.
>> So in this regard I am in agreement with Chesnay.
>>
>> After all these considerations, speaking from the Kubernetes context, it
>> seems to me that using the dynamic properties works best (I can use config
>> key names as-is) and requires no change, so I'm actually just leaning
>> towards that. However, the Kubernetes context is, I guess, not the only
>> one
>> to consider.
>>
>>
>> Best regards
>> Ingo
>>
>> On Tue, Jan 26, 2021 at 3:48 PM Chesnay Schepler 
>> wrote:
>>
>> > Mind you that we could of course solve these character issues by first
>> > nailing down which characters we allow in keys (presumably: [a-z0-9-.]).
>> >
>> > On 1/26/2021 3:45 PM, Chesnay Schepler wrote:
>> > > Here's an example: My option key is custom.my_backend_option. With the
>> > > current design, the corresponding env variable would be
>> > > CUSTOM_MY_BACKEND_OPTION, which would be converted into
>> > > custom.my.backend.option .
>> > >
>> > > It is true that users could still parse the original system property
>> > > as a fall-back, but it seems to partially invalidate the goal and
>> > > introduce the potential for surprises and inconsistent behavior.
>> > >
>> > > What would happen if the option were already defined in the
>> > > flink-conf.yaml, but overwritten with the env variable? Users would
>> > > have to check every time they access a configuration whether the
>> > > system property was also set and resolve things manually. Naturally
>> > > things might also conflict with whatever prioritization we come up
>> with.
>> > >
>> > > Now you might say that this is only necessary if the option contains
>> > > special characters, but then we're setting users up for a surprise
>> > > should they ever rename an existing option to contain an underscore.
>> > >
>> > > As for newlines, I wouldn't expect newline characters to appear within
>> > > DYNAMIC_VARIABLE, but I guess it would follow the same behavior as if
>> > > you would declare them on the command-line?
>> > >
> > One more downside I see is that from a user's perspective I'd always
> > have to do this conversion manually. You can't just copy stuff from
> > the documentation (unless we duplicate every single mention), nor can
> > you easily switch between environment variables and dynamic

Re: [DISCUSS] FLIP-161: Configuration through environment variables

2021-01-26 Thread Khachatryan Roman
@Chesnay
could you explain how underscores in user-defined properties would be
affected by a transformation like STATE_BACKEND -> state.backend?
IIUC, this transformation happens in Flink and doesn't alter ENV vars, so
the user app can still parse the original configuration.
OTOH, I'm a bit concerned whether the newline should be escaped by the user
in DYNAMIC_VARIABLES.

@Ingo Bürk 
I feel a bit lost in the discussion :) Maybe we can put an intermediate
summary of the pros and cons of the different approaches into the FLIP?

And for completeness, we could combine the DYNAMIC_VARIABLES approach with
passing individual variables.


Regards,
Roman


On Tue, Jan 26, 2021 at 12:54 PM Chesnay Schepler 
wrote:

> I think we have to assume that some user has a custom config option that
> uses underscores.
>
> That said, we can probably assume that no one uses other special
> characters like question marks and such, which are indeed allowed by the
> YAML spec.
>
> These kinds of questions are precisely why I prefer the DYNAMIC_VARIABLES
> approach; you don't even have to worry about this stuff.
> The only question we'd have to answer is whether manually defined
> properties should take precedent or not.
>
> @Uce I can see how it could be cumbersome to modify, but at the same
> time you can implement whatever other approach you want on top of it:
>
> // this is a /conceptual/ example for an optional setting
> DYNAMIC_VARIABLES="${REST_PORT_SETTING}"
> if _someCondition_:
>export REST_PORT_SETTING="-Drest.port=1234"
>
> // this is a /conceptual/ example for a configurable setting
> DYNAMIC_VARIABLES="-Drest.port=${MY_FANCY_VARIABLE:-8200}"
> if _someCondition_:
>export MY_FANCY_VARIABLE="1234"
>
> Additionally, this makes it quite easy to audit stuff, since we can just
> eagerly log what DYNAMIC_VARIABLES is set to.
>
> On 1/26/2021 12:48 PM, Xintong Song wrote:
> > @Ufuk,
> > I also don't find any existing options with underscores in their keys.
> > However, I do not find any explicit rule forbidding them either. I'm not
> > saying this should block the FLIP; it would just be nice to be aware of
> > this issue, and maybe ensure the assumption with test cases if we
> > finally decide to go with these mapping rules.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jan 26, 2021 at 7:27 PM Ufuk Celebi  wrote:
> >
> >> @Xintong: The assumption for the mapping was that we only have dots and
> >> hyphens in the keys. Do you have an example for a key which include
> >> underscores? If underscores are common for keys (I couldn't find any
> >> existing options that use it), it would certainly be a blocker for the
> >> discussed approach.
> >>
> >> On Tue, Jan 26, 2021, at 11:46 AM, Xintong Song wrote:
> >>> - The naming conversions proposed in the FLIP seem not bijective to me.
> >>> There could be problems if the configuration key contains underscores.
> >>>   - a_b -> FLINK_CONFIG_a_b
> >>>   - FLINK_CONFIG_a_b -> a.b
>
>
>


Re: [DISCUSS] FLIP-161: Configuration through environment variables

2021-01-25 Thread Khachatryan Roman
I agree with Till about $FLINK_PROPERTIES being only supported by a Flink
buildfile.
Besides that,
1. having different ways of configuring different applications doesn't seem
convenient to me. For example, Kafka and ZK are configured via usual
properties and Flink via a concatenated one.
2. It could also be problematic to re-use the configuration from non-java
apps (PyFlink?).
3. And yes, this can also be used in tests.
4. Having this logic in scripts means we have to test scripts instead of
java/python

I probably missed something, but what are your concerns regarding the "magic
syntax" (s3.access-key → FLINK_CONFIG_S3_ACCESS__KEY) you mentioned earlier?

Regards,
Roman


On Mon, Jan 25, 2021 at 6:48 PM Till Rohrmann  wrote:

> The problem I see with $FLINK_PROPERTIES is that this is only supported by
> Flink's Docker entrypoint.sh. In fact this variable was introduced as a
> temporary bridge to allow Docker users to change Flink's configuration
> dynamically. If we had had a proper way of configuring Flink processes via
> env variables, then we wouldn't have introduced it.
>
> Cheers,
> Till
>
> On Mon, Jan 25, 2021 at 5:01 PM Ingo Bürk  wrote:
>
> > Hi Chesnay,
> >
> > thanks for your input! After some thought I think your proposal makes a
> > lot of sense, i.e. if we have one single $FLINK_DYNAMIC_PROPERTIES
> > environment variable, in a Kubernetes environment we could do
> > something like
> >
> > ```
> > env:
> >   - name: S3_KEY
> > valueFrom: # get from secret
> >   - name: FLINK_DYNAMIC_PROPERTIES
> > value: -Ds3.access-key=$(S3_KEY)
> > ```
> >
> > This, much like the substitution approach we rejected previously,
> > requires "intermediate" variables. However, they're all defined in the
> > same scope here, and this solution certainly has the noteworthy benefit
> > that no "magic syntax" (s3.access-key → FLINK_CONFIG_S3_ACCESS__KEY) is
> > needed, which I believe makes for a pretty good user experience here.
> > I think I personally prefer this approach over the approach we
> > currently have in the FLIP, but
> > I'm definitely happy to hear thoughts from the others on this approach!
> > Especially maybe also regarding the test randomization point raised
> > by Khachatryan earlier in the discussion.
> >
> >
> > Regards
> > Ingo
> >
> > On Fri, Jan 22, 2021 at 5:18 PM Chesnay Schepler 
> > wrote:
> >
> >> The FLIP seems to disregard the existence of dynamic properties, and I'm
> >> wondering why that is the case.
> >> Don't they fulfill similar roles, in that they allow config options to
> >> be set on the command-line?
> >>
> >> What use-case do they currently not support?
> >> I assume it's something along the lines of setting some environment
> >> variable for containers, but at least for those based on our docker
> >> images we already have a mechanism to support that.
> >>
> >> In any case, wouldn't a single DYNAMIC_PROPERTIES variable suffice that
> >> is automatically evaluated by all scripts?
> >> Why go through all the hassle with the environment variable names?
> >>
> >> On 1/22/2021 3:53 PM, Till Rohrmann wrote:
> >> > The updated design LGTM as well. Nice work Ingo!
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Fri, Jan 22, 2021 at 3:33 PM Ingo Bürk  wrote:
> >> >
> >> >> Thanks, Ufuk. I think that makes sense, so I moved it from a footnote
> >> to an
> >> >> addition to prevent that in the future as well.
> >> >>
> >> >> Ingo
> >> >>
> >> >> On Fri, Jan 22, 2021 at 3:10 PM Ufuk Celebi  wrote:
> >> >>
> >> >>> LGTM. Let's see what the others think...
> >> >>>
> >> >>> On Thu, Jan 21, 2021, at 11:37 AM, Ingo Bürk wrote:
> >>  Regarding env.java.opts, what special handling is needed there?
> >> AFAICT
> >> >>> only
> >>  the rejected alternative of substituting values would've had an
> >> effect
> >> >> on
> >>  this.
> >> >>> Makes sense 
> >> >>>
> >> >>>  From the FLIP:
> >>  This mapping is not strictly bijective, but cases with consecutive
> >> >>> periods or dashes in the key name are not considered here and should
> >> not
> >> >>> (reasonably) be allowed.
> >> >>>
> >> >>> We could actually enforce such restrictions in the implementation of
> >> >>> ConfigOption, avoiding any surprises down the line.
> >> >>>
> >> >>> – Ufuk
> >> >>>
> >>
> >>
>


Re: Re: [DISCUSS] Dealing with deprecated and legacy code in Flink

2021-01-20 Thread Khachatryan Roman
Hi,

I think having two Deprecated annotations (Flink and Java) may be confusing.
One alternative is to combine the standard annotation with mandatory Javadoc
tags (checked with checkstyle).
And starting from Java 9, @Deprecated has "since" and "forRemoval" arguments.
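For illustration, the combination could look like this (class name and version numbers are made up; the since/forRemoval attributes require Java 9+):

```java
/**
 * Old entry point, kept for backwards compatibility.
 *
 * @deprecated Since 1.12, use {@code NewStateBackend} instead.
 *             Planned for removal in 1.14 (tracked in JIRA).
 */
@Deprecated(since = "1.12", forRemoval = true)
public class OldStateBackend {
}
```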


Regards,
Roman


On Wed, Jan 20, 2021 at 2:01 PM Piotr Nowojski  wrote:

> Hi,
>
> I would prefer not to rely on the Jira for marking when something is
> supposed to be deleted. If `@Deprecated(since, planned_to_remove_on)` would
> have two obligatory parameters, there would be no way to "forget" about
> marking it, and it would also be self-documenting (I don't imagine users
> using JIRA to check this kind of things). We can have Jira tickets for
> those things for tracking purposes on the JIRA release board, but relying
> only on JIRA tickets I think is just asking for inconsistencies.
>
> > Is it actually possible to have a fixed timeframe for these annotations
> to change?
> > I would imagine that it depends on the underlying feature how long an API
> is @PublicEvolving or @Experimental?
>
> I agree it would depend on the feature, hence different features might have
> longer or shorter "unstable" timeframes. But I'm afraid that if we don't start
> thinking about fixing this timeframe, we would too often end up with
> perpetually "unstable" APIs. I don't know where I would draw the line
> exactly, but assuming we want to have stable APIs, if something is marked
> `@PublicEvolving` or `@Experimental` for 3 years, IMO it should be switched
> to `@Public` by default (or be moved out of the main repo?).
>
> Piotrek
>
> On Wed, Jan 20, 2021 at 09:54 Matthias Pohl
> wrote:
>
> > Thanks Timo for opening this discussion.
> >
> > +1 I like the idea of adding a deprecation deadline and/or information
> > about when the functionality was deprecated. It looks like this is
> > already done in the PyFlink code.
> >
> > Creating a JIRA issue for removing the functionality, as Till suggested,
> > might help to maintain this process of removing the deprecated
> > functionality. I'd prefer that over relying on the release manager
> > (assuming that he/she would run the check as part of the release
> > process) to identify functionality that should have been removed as
> > part of the release. But ok, that might be a team decision.
> >
> > For the connectors: can't we assume that users would reach out to us if
> > we deprecate a connector, assuming that they can conclude that this
> > connector will otherwise disappear? Maybe that needs to be mentioned in
> > the deprecation information as well, then. This would have the benefit
> > of getting direct feedback about how much a connector is still in use,
> > and may open the doors for other contributors to offer help, like it
> > happened for the Mesos support [1].
> >
> > And about the idea of adding such deadlines to @Public, @PublicEvolving,
> > and @Experimental: is it actually possible to have a fixed timeframe
> > for these annotations to change? I would imagine that how long an API
> > is @PublicEvolving or @Experimental depends on the underlying feature.
> > But it still sounds like a good idea to trigger warnings for those
> > annotations in case they haven't been touched for a while. Therefore,
> > I would second this suggestion.
> >
> > Best,
> > Matthias
> >
> > [1]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Remove-Mesos-support-td45974.html#a45985
> >
> > On Tue, Jan 19, 2021 at 10:15 AM Till Rohrmann 
> > wrote:
> >
> > > Thanks a lot for starting this discussion Timo. I like the idea of
> > > setting more explicit guidelines for deprecating functionality.
> > >
> > > I really like the idea of recording with the @Deprecated annotation
> > > since when the function is deprecated. Based on that, one can simply
> > > search for features which should be removed in a given release.
> > > Alternatively, one could, as you said, also state the removal version.
> > >
> > > I think what also works is to directly create a critical JIRA issue
> > > for removing the functionality as soon as one deprecates something.
> > > The problem was often that after deprecating something, it gets
> > > forgotten.
> > >
> > > For dropping connectors I am a bit uncertain. From a project
> > > management perspective it sounds like a good idea to not have to
> > > support connectors which are no longer supported for some time.
> > > However, what if this connector is still very popular and in heavy
> > > use by our users? Just because an external system or a version of it
> > > is no longer maintained does not mean that the system is no longer
> > > used. I think our current practice of trying to judge whether our
> > > users still use this feature/connector works somewhat. On the other
> > > hand, having these guidelines would probably make it easier to argue
> > > to remove something even if there are still a couple of users.
> > 

Re: [DISCUSS] Planning Flink 1.13

2021-01-18 Thread Khachatryan Roman
Hey Dawid,

Replying to your penultimate message:
> We have never done something like this before
I thought releasing usability features could be such an experiment. My
reasoning is the following. As each release requires coordination of all
the participating teams, it will not scale beyond some point. And Flink is
growing rapidly. So we need to cut the scope of each release:
- either by sub-project (i.e. separate release for each component; I think
no one wants it?)
- or by time/feature set (i.e. more frequent lightweight releases)

> However I am bit skeptical about the timing to do it now.
I guess you are right. We should probably start release planning before the
previous one is completed. So maybe 1.14 :)

I think if anyone else is interested we can continue discussion in a
separate thread.

Thanks again for driving this effort!

Regards,
Roman


On Mon, Jan 18, 2021 at 3:59 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> Thank you all for a warm reception as release managers. We are also glad
> that the majority agrees we should aim for the end of March as the feature
> freeze date.
>
>1. Till created a page in our wiki to gather potential features that
>we want to include in the release. Thank you Till! We'd kindly ask all
>committers to fill in the page with features that they intend to work on.
>We'd prefer if only new features end up there and not all bugs/tasks
>separately, so that the page is not over-bloated. Of course, an exception
>is a bug fix that is really big or important, equivalent to implementing a
>completely new feature. In our opinion a good rule of thumb would be that
>each entry in the page could (but does not have to) be later on included in
>a release blog post.
>
> The page can be accessed at:
> https://cwiki.apache.org/confluence/display/FLINK/1.13+Release
>
> It would really help us if we knew what the most critical features in
> the release are, so that we could keep a closer eye on those as potential
> reasons for a delay or make a decision to cut the scope sooner. We were
> wondering whether it would be possible to mark certain features as "release
> blockers". There should be no more than a single, unfinished one per
> committer at a time. What do you think of that idea?
>
>
>1. Similarly to the previous release we would like to emphasize the
>importance of keeping the build and tests stable. We think that worked
>pretty well in the last release. Starting this week we will monitor any
>build and/or test instabilities. Please expect nagging for a fix ;)
>
> You can check the current blockers in a dedicated kanban board here:
> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=445
>
>
>1. One of the biggest complaints for the past release(s) was lack of
>proper documentation. Some features don't have proper documentation even
>though they were implemented a couple of releases ago. We'd like to remind
>everyone that decent documentation should always come hand in hand with
>the code. We even have a checkmark for it in our PR template. ;) Let's try
>to improve the situation a bit here! We should no longer allow ourselves to
>postpone writing documentation to after the feature freeze.
>
> Your release managers,
> Guowei & Dawid
>


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-01-18 Thread Khachatryan Roman
+1 (non-binding)

Thanks for driving the FLIP!

Regards,
Roman


On Tue, Jan 19, 2021 at 2:58 AM Guowei Ma  wrote:

> +1 non-binding
> Best,
> Guowei
>
>
> On Fri, Jan 15, 2021 at 10:56 PM Yun Gao 
> wrote:
>
> >
> > Hi all,
> >
> > I would like to start the vote for FLIP-147[1], which propose to support
> > checkpoints after
> > tasks finished and is discussed in [2].
> >
> > The vote will last at least 72 hours (Jan 20th due to weekend), following
> > the consensus
> > voting process.
> >
> > thanks,
> >  Yun
> >
> >
> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> > [2]
> >
> https://lists.apache.org/thread.html/r2780b46267af6e98c7427cb98b36de8218f1499ae098044e1f24c527%40%3Cdev.flink.apache.org%3E
>


Re: [DISCUSS] Flink configuration from environment variables

2021-01-18 Thread Khachatryan Roman
Hi Ingo,

Thanks a lot for this proposal!

We had a related discussion recently in the context of FLINK-19520
(randomizing test configurations) [1].
I believe other scenarios will benefit as well.

For the end users, I think substitution in configuration files is
preferable over parsing env vars in Flink code.
And for cases without such a file, we could have a default one on the
classpath with all substitutions defined (and then merge everything from
the user-supplied file).

[1] https://issues.apache.org/jira/browse/FLINK-19520

Regards,
Roman


On Mon, Jan 18, 2021 at 11:11 AM Ingo Bürk  wrote:

> Hi everyone,
>
> in Ververica Platform we offer a feature to use environment variables in
> the Flink configuration¹, e.g.
>
> ```
> s3.access-key: ${S3_ACCESS_KEY}
> ```
>
> We've been discussing internally whether contributing such a feature to
> Flink directly would make sense and wanted to start a discussion on this
> topic.
>
> An alternative to the above would be parsing those variables directly
> based on their name, so instead of having it defined in the Flink
> configuration as above, it would get automatically set if something like
> $FLINK_CONFIG_S3_ACCESS_KEY was set in the environment. This is somewhat
> similar to what e.g. Spring does, and faces similar challenges (dealing
> with "."s etc.)
>
> Although I view both of these approaches as mostly orthogonal, supporting
> both very likely wouldn't make sense, of course. So I was wondering what
> your opinion is in terms of whether the project would benefit from
> environment variable support for the Flink configuration, and whether there
> are tendencies as to which approach to go with.
>
> ¹
>
> https://docs.ververica.com/user_guide/application_operations/deployments/configure_flink.html#environment-variables
>
> Best regards
> Ingo
>


[DISCUSS] FLIP-158: Generalized incremental checkpoints

2021-01-14 Thread Khachatryan Roman
Hi devs,

I'd like to start a discussion of FLIP-158: Generalized incremental
checkpoints [1]

FLIP motivation:
Low end-to-end latency is a much-demanded property in many Flink setups.
With exactly-once, this latency depends on checkpoint interval/duration
which in turn is defined by the slowest node (usually the one doing a full
non-incremental snapshot). In large setups with many nodes, the probability
of at least one node being slow gets higher, making almost every checkpoint
slow.

This FLIP proposes a mechanism to deal with this by materializing and
uploading state continuously and only uploading the changed part during the
checkpoint itself. It differs from other approaches in that 1) checkpoints
are always incremental; 2) it works with any state backend.
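
Conceptually, the mechanism can be sketched as follows (illustration only,
not the FLIP's actual interfaces):

```
import java.util.concurrent.CompletableFuture;

// Sketch of the idea: state changes are appended and shipped continuously;
// a checkpoint then only has to persist the small not-yet-uploaded tail.
interface StateChangelog {
    /** Called on every state modification. */
    void append(byte[] stateChange);

    /** Called on checkpoint: flushes and uploads only the remaining tail. */
    CompletableFuture<ChangelogHandle> persist();

    /** Placeholder for a pointer to the uploaded log segments. */
    interface ChangelogHandle {}
}
```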

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints

Any feedback highly appreciated!

Regards,
Roman


Re: [DISCUSS] Planning Flink 1.13

2021-01-14 Thread Khachatryan Roman
Thanks for doing this!

Some teams are currently doing (or already finalizing) the "usability
sprint". I was wondering whether it makes sense to release the implemented
features without waiting for the major changes. If it works well we could
decide to shorten the next release as well.

Regards,
Roman


On Thu, Jan 14, 2021 at 10:28 AM Till Rohrmann  wrote:

> Thanks for volunteering as the release managers for the 1.13 release Guowei
> and Dawid. I'd also be in favour of targeting the end of March as the
> feature freeze date.
>
> I've created a 1.13 wiki page [1] where we can collect the features we want
> to complete for the 1.13 release.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.13+Release
>
> Cheers,
> Till
>
> On Thu, Jan 14, 2021 at 3:27 AM Xintong Song 
> wrote:
>
> > Thanks for kicking off the 1.13 release cycle and volunteering as the
> > release managers.
> >
> > +1 for Dawid & Guowei as the 1.13 release managers.
> > +1 for targeting feature freeze at the end of March
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Jan 13, 2021 at 10:48 PM Dawid Wysakowicz <
> dwysakow...@apache.org>
> > wrote:
> >
> > > Hi all,
> > > With the 1.12 being released some time ago already I thought it would
> be
> > > a good time to kickstart the 1.13 release cycle.
> > >
> > > What do you think about Guowei and me being the release managers for
> > > Flink 1.13? We are happy to volunteer for it.
> > >
> > > The second topic I wanted to raise was the rough timeline for the
> > > release. According to our usual 3 months + the release
> > > testing/stabilising period
> > > we should aim with the feature freeze for the end of March/beginning of
> > > April. Does that work for everyone?
> > >
> > > Let me know what you think.
> > >
> > > Best,
> > > Dawid
> > >
> > >
> > >
> >
>


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-11 Thread Khachatryan Roman
Hi Yun,

> b) With unaligned checkpoints enabled, the slower cases might happen if
the downstream task processes very slowly.
I think UC will be the common case with multiple sources, each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its
source subtasks finishes.

> But since only the result partition of the finished upstream task needs to
wait to be processed, the other part of
> the execution graph could still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the
EOPs are processed.

> Declining the RPC-triggered checkpoint would indeed simplify the
implementation, but since currently, by default, a failed checkpoint causes
> a job failover, we might have some concerns about directly declining the
checkpoint.
Not all declines cause a job failure; in particular,
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
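
For reference, on the task side such a decline looks roughly like this
(CHECKPOINT_DECLINED_TASK_NOT_READY is an existing failure reason in Flink's
runtime; the surrounding condition is illustrative):

```
// Illustrative sketch: declining a checkpoint without failing the job.
// This decline is treated as benign and is not counted against the
// tolerable checkpoint failures.
if (!allEndOfPartitionsReceived()) { // hypothetical readiness check
    throw new CheckpointException(
            CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
}
```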

> Thus another possible option might be to let the upstream task wait till
all the pending buffers in the result partition have been flushed before
getting to finish.
This is what I meant by "postpone JM notification from source". Just
blocking the task thread wouldn't add much complexity, though I'm not sure
if it would cause any problems.

> do you think it would be ok for us to view it as an optimization and
postpone it to future versions?
I think that's a good idea.

Regards,
Roman


On Mon, Jan 11, 2021 at 11:03 AM Yun Gao  wrote:

>   Hi Roman,
>
>   Many thanks for the feedback!
>
>
> > Probably it would be simpler to just decline the RPC-triggered
> checkpoint
> > if not all inputs of this task are finished (with
> CHECKPOINT_DECLINED_TASK_NOT_READY).
>
> > But I wonder how significantly this waiting for EoP from every
> input will delay performing the first checkpoint
> > by B after becoming a new source. This may in turn impact
> exactly-once sinks and incremental checkpoints.
> > Maybe a better option would be to postpone JM notification from
> source until its EoP is consumed?
>
> I also agree that there would indeed be cases where the checkpoint gets
> slower, since it could not skip the data in the result partition of the
> finished upstream task:
> a) For aligned checkpoints, this case would not happen, since the
> downstream tasks always need to process the buffers in order.
> b) With unaligned checkpoints enabled, the slower cases might happen if
> the downstream task processes very slowly.
>
> But since only the result partition of the finished upstream task needs to
> wait to be processed, the other part of the execution graph could still
> perform the unaligned checkpoint normally. I think the average delay caused
> would be much lower than with a completely aligned checkpoint, but there
> would still be extremely bad cases where the delay is long.
>
> Declining the RPC-triggered checkpoint would indeed simplify the
> implementation, but since currently, by default, a failed checkpoint causes
> a job failover, we might have some concerns about directly declining the
> checkpoint.
> As for postponing the JM notification: since the current JM is not able to
> know whether the task has received all the EndOfPartition events from the
> upstream tasks, we would need to introduce a new RPC for notifying this
> state, and since the triggering is not atomic, we may also run into some
> synchronization issues between JM and TM, which would introduce some
> complexity.
>
>   Thus another possible option might be to let the upstream task wait till
> all the pending buffers in the result partition have been flushed before
>   getting to finish. We could only do the wait for pipelined result
> partitions so it won't affect batch jobs. With the waiting, the unaligned
>   checkpoint could continue to trigger the upstream task and skip the
> buffers in the result partition. Since the result partition state would be
>   kept within the operator state of the last operator, after failover we
> would find that the last operator has a non-empty state, and we would
>   restart the tasks containing this operator to resend the snapshotted
> buffers. Of course this would also introduce some complexity, and since the
>   probability of a long delay would be lower than in the completely aligned
> case, do you think it would be OK for us to view it as an optimization and
>   postpone it to future versions?
>
>  Best,
>  Yun
>
>
>
> --
> From:Khachatryan Roman 
> Send Time:2021 Jan. 11 (Mon.) 05:46
> To:Yun Gao 
> Cc:Arvid Heise ; dev ; user <
> u...@flink.apache.org>
> Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> Finished
>
> Thanks a lot for your answers Yun,
>
> > In detail, 

Re: [VOTE] Release 1.12.1, release candidate #2

2021-01-11 Thread Khachatryan Roman
Hi,
I'm not sure whether it's https://issues.apache.org/jira/browse/FLINK-20654 or
a new issue but I agree it shouldn't block 1.12.1.

Regards,
Roman


On Mon, Jan 11, 2021 at 10:30 AM Arvid Heise  wrote:

> Hi Xingbo,
>
> This ticket is actually about
> https://issues.apache.org/jira/browse/FLINK-20654 where we added a major
> fix last week.
> At this point, we are not sure what additionally causes this issue and
> cannot give an estimate about the impact of the fix nor the time needed to
> fix it. It may also only be a test issue.
>
> I wouldn't block on this fix. It has also been present in 1.12.0 already.
>
> Best,
>
> Arvid
>
> On Sun, Jan 10, 2021 at 6:32 AM Xingbo Huang  wrote:
>
> > Hi Xintong,
> >
> > FLINK-20309[1] has been reopened which is about unaligned checkpoint. I'm
> > not sure if this should be a blocker of 1.12.1. I'm cc @Piotr who can
> > provide more information about this issue.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-20309
> >
> > Best,
> > Xingbo
> >
> > > 2021年1月10日 上午9:23,Xintong Song  写道:
> > >
> > > Hi everyone,
> > >
> > > Please review and vote on the release candidate #2 for the version
> > 1.12.1,
> > > as follows:
> > >
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release and binary convenience releases to
> > be
> > > deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint F8E419AA0B60C28879E876859DFF40967ABFC5A4 [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag "release-1.12.1-rc2" [5],
> > > * website pull request listing the new release and adding announcement
> > blog
> > > post [6].
> > >
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Xintong Song
> > >
> > > [1]
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349459
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.1-rc2
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1411
> > > [5] https://github.com/apache/flink/releases/tag/release-1.12.1-rc2
> > > [6] https://github.com/apache/flink-web/pull/405
> >
> >
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-10 Thread Khachatryan Roman
Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose in
one checkpoint A has reported FINISHED; CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. For task B,
if it received the checkpoint trigger, it would know that all its preceding
tasks
> are finished, then it would wait till all the InputChannels received
EndOfPartition from the network (namely inputChannel.onBuffer() is called
with
> EndOfPartition) and then take a snapshot of the input channels, as the
normal unaligned checkpoints do for the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint
if not all inputs of this task are finished (with
CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will
delay performing the first checkpoint by B after becoming a new source.
This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source
until its EoP is consumed?

Regards,
Roman


On Thu, Jan 7, 2021 at 5:01 PM Yun Gao  wrote:

> Hi Roman,
>
>    Many thanks for the feedback! I'll try to answer the issues inline:
>
> > 1. Option 1 is said to be not preferable because it wastes resources and
> adds complexity (new event).
> > However, the resources would be wasted for a relatively short time
> until the job finishes completely.
> > And compared to other options, complexity seems much lower. Or are
> differences in task completion times so huge and so common?
>
> There might be mixed jobs with both bounded and unbounded sources; in this
> case, the resources for the finished part of the job could not be released.
>
> Option 1 also complicates the semantics of EndOfPartition: if we hold the
> tasks but still need to notify the following tasks that all records have
> been sent, we would have to introduce some kind of pre-EndOfPartition
> message, similar to the current EndOfPartition but not causing the channels
> to be released.
>
> 2. I think it would be helpful to describe how rescaling is handled in
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).
>
> For Options 2 and 3 we manage the state per operator, thus the process of
> rescaling would be the same as with a normal checkpoint. For example,
> suppose one operator resides in a task with parallelism 4; if 2 of the
> subtasks are finished, the state of the operator is now composed of the
> state of the 2 remaining subtask instances. If we rescale to 5 after
> failover, the state of the 2 previously remaining subtasks would be
> re-distributed to the 5 new subtasks after failover.
>
> If all 4 subtasks are finished before failover, the operator would be
> marked as finished; after failover the operator would still be marked as
> finished, and all the subtask instances of this operator would skip all the
> methods like open(), endOfInput(), close(), and would be excluded when
> taking checkpoints after failover.
>
>
> > 3. Option 3 assumes that the state of a finished task is not used.
> That's true for operator state, but what about channel state (captured by
> unaligned checkpoint)?
> > I think it still has to be sent downstream which invalidates this Option.
>
> For unaligned checkpoints, if in one checkpoint a subtask is marked as
> finished, then its descendant tasks would wait until all the records are
> received from the finished tasks before taking a checkpoint; thus in this
> case we would not have result partition state, but only channel state for
> the downstream tasks that are still running.
>
> In detail, suppose we have a job with the graph A -> B -> C, and suppose in
> one checkpoint A has reported FINISHED; CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. For task B,
> if it received the checkpoint trigger, it would know that all its preceding
> tasks are finished, then it would wait till all the InputChannels received
> EndOfPartition from the network (namely inputChannel.onBuffer() is called
> with EndOfPartition) and then take a snapshot of the input channels, as the
> normal unaligned checkpoints do for the InputChannel side. Then we would be
> able to ensure the finished tasks always have an empty state.
>
> I'll also optimize the FLIP to make it more clear~
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Khachatryan Roman 
> *Send Date:*Thu Jan 7 21:55:52 2021
> *Recipients:*Arvid H

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Khachatryan Roman
Thanks for starting this discussion (and sorry for probably duplicated
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (new event).
However, the resources would be wasted for a relatively short time until
the job finishes completely.
And compared to other options, complexity seems much lower. Or are
differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in
Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's
true for operator state, but what about channel state (captured by
unaligned checkpoint)? I think it still has to be sent downstream which
invalidates this Option.

Regards,
Roman


On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>
>
> I think we are mixing two different things here that may require different
> solutions:
> 1. Tasks (=sink) that may need to do something with the final checkpoint.
> 2. Tasks that only finish after having finished operations that do not
> depend on data flow (async I/O, but I could also think of some timer
> actions in process functions).
>
> Your proposal would help most for the first case. The second case can be
> solved entirely with current methods without being especially complicated:
> - EOP is only emitted once Async I/O is done with all background tasks
> - All timers are fired in a process function (I think we rather want to
> fire immediately on EOP but that's a different discussion)
> The advantage of this approach over your idea is that you don't need to
> wait for a checkpoint to complete to check for finalization.
>
> Now let's look at the first case. I see two alternatives:
> - The new sink interface implicitly incorporates this listener. Since I
> don't see a use case outside sinks, we could simply add this method to the
> new sink interface.
> - We implicitly assume that a sink is done after having a successful
> checkpoint at the end. Then we just need a tag interface
> `RequiresFinalization`. It also feels like we should add the property
> `final` to checkpoint options to help the sink detect that this is the last
> checkpoint to be taken. We could also try to always have the final
> checkpoint without tag interface on new sinks...
>
> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
> wrote:
>
>> This is somewhat unrelated to the discussion about how to actually do
>> the triggering when sources shut down, I'll write on that separately. I
>> just wanted to get this quick thought out.
>>
>> For letting operators decide whether they actually want to wait for a
>> final checkpoint, which is relevant at least for Async I/O and
>> potentially for sinks.
>>
>> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>>
>> This way we would decouple that logic from things that don't actually
>> need it. What do you think?
>>
>> Best,
>> Aljoscha
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
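
For reference, the hook discussed in this thread might look like the sketch
below (names taken from the discussion; this reflects the proposal, not any
committed API):

```
/** Sketch of the proposed operator finalization hook (not committed API). */
interface FinalizationListener {
    /**
     * Called by the framework after each completed checkpoint (and
     * potentially at other points). Returns true once the operator is ready
     * to shut down, e.g. Async I/O once all in-flight requests completed,
     * or a sink once a given number of checkpoints has been taken.
     */
    boolean isFinalized();
}
```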


Re: [DISCUSS] Moving to JUnit5

2020-12-01 Thread Khachatryan Roman
+1 for the migration

(I agree with Dawid, for me the most important benefit is better support of
parameterized tests).
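
For illustration, a minimal JUnit 5 sketch of what becomes simpler (the test
content is made up):

```
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ParsingTest {

    // Parameterized and plain tests can live in one class, without the
    // JUnit 4 Parameterized runner and constructor boilerplate.
    @ParameterizedTest
    @ValueSource(strings = {"1", "42", "-7"})
    void parsesValidNumbers(String input) {
        assertEquals(input, Integer.toString(Integer.parseInt(input)));
    }

    // Fine-grained exception assertions via lambdas instead of
    // @Test(expected = ...) or try-catch blocks.
    @Test
    void rejectsInvalidNumbers() {
        NumberFormatException e =
                assertThrows(NumberFormatException.class, () -> Integer.parseInt("oops"));
        assertEquals("For input string: \"oops\"", e.getMessage());
    }
}
```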

Regards,
Roman


On Mon, Nov 30, 2020 at 9:42 PM Arvid Heise  wrote:

> Hi Till,
>
> immediate benefit would be mostly nested tests for a better test structure
> and new parameterized tests for less clutter (often test functionality is
> split into parameterized test and non-parameterized test because of JUnit4
> limitation). Additionally, having Java8 lambdas to perform fine-grain
> exception handling would make all related tests more readable (@Test only
> allows one exception per test method, while in reality we often have more
> exceptions / more fine grain assertions and need to resort to try-catch --
> yuck!). The extension mechanism would also make the mini cluster much
> easier to use: we often have to start the cluster manually because of
> test-specific configuration, which can be easily avoided in JUnit5.
>
> In the medium and long-term, I'd also like to use the modular
> infrastructure and improved parallelization. The former would allow us
> better to implement cross-cutting features like TestLogger (why do we need
> to extend that manually in every test?). The latter is more relevant for
> the next push on CI, which would be especially interesting with e2e being
> available in Java.
>
> On Mon, Nov 30, 2020 at 2:07 PM Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > Just wanted to express my support for the idea. I did miss certain
> > features of JUnit 5 already, an important one being much better support
> > for parameterized tests.
> >
> > Best,
> >
> > Dawid
> >
> > On 30/11/2020 13:50, Arvid Heise wrote:
> > > Hi Chesnay,
> > >
> > > The vintage runner supports the old annotations, so we don't have to
> > change
> > > them in the first step.
> > >
> > > The only thing that we need to change are all rules that do not extend
> > > ExternalResource (e.g., TestWatcher used in TestLogger). This change
> > needs
> > > to be done swiftly as this affects the shared infrastructure as you
> have
> > > mentioned.
> > >
> > > Only afterwards, we start to actually migrate the individual tests.
> That
> > > can be done module by module or as we go. I actually found a nice
> article
> > > that leverages the migration assist of IntelliJ [1].
> > >
> > > As the last stop, we remove the vintage runner - all JUnit4 tests have
> > been
> > > migrated and new tests cannot use old annotation etc. anymore.
> > >
> > > [1]
> > >
> >
> https://blog.jetbrains.com/idea/2020/08/migrating-from-junit-4-to-junit-5/
> > >
> > > On Mon, Nov 30, 2020 at 1:32 PM Chesnay Schepler 
> > wrote:
> > >
> > >> I presume we cannot do the migration module-wise due to shared test
> > >> utilities that rely on JUnit interfaces?
> > >>
> > >> On 11/30/2020 1:30 PM, Chesnay Schepler wrote:
> > >>> Is it feasible that 2 people can do the migration within a short
> > >>> time-frame (say, a week)?
> > >>> Must the migration of a test be done in one go, or can we for example
> > >>> first rename all the Before/After annotations and then to the rest?
> > >>> Are there any issues with other test dependencies (i.e., hamcrest,
> > >>> powermock (PowerMockRunner), mockito) that we should be aware of?
> > >>>
> > >>> I generally like the idea of using JUnit 5, but am wary of this
> > >>> migration dragging on for too long.
> > >>>
> > >>> On 11/27/2020 3:29 PM, Arvid Heise wrote:
> >  Dear devs,
> > 
> >  I'd like to start a discussion to migrate to a higher JUnit version.
> > 
> >  The main motivations are:
> >  - Making full use of Java 8 Lambdas for writing easier to read tests
> >  and a
> >  better performing way of composing failure messages.
> >  - Improved test structures with nested and dynamic tests.
> >  - Much better support for parameterized tests to avoid separating
> >  parameterized and non-parameterized parts into different test
> classes.
> >  - Composable dependencies and better hooks for advanced use cases
> >  (TestLogger).
> >  - Better exception verification
> >  - More current infrastructure
> >  - Better parallelizable
> > 
> >  Why now?
> >  - JUnit5 is now mature enough to consider it for such a complex
> > project
> >  - We are porting more and more e2e tests to JUnit and it would be a
> >  pity to
> >  do all the work twice (okay some already has been done and would
> >  result in
> >  adjustments, but the sooner we migrate, the less needs to be touched
> >  twice)
> > 
> >  Why JUnit5?
> >  There are other interesting alternatives, such as TestNG. I'm happy
> >  to hear
> >  specific alternatives. For now, I'd like to focus on JUnit4 for an
> >  easier
> >  migration path.
> > 
> >  Please discuss if you would also be interested in moving onward. To
> > get
> >  some overview, I'd like to see some informal +1 for the options:
> > 
> >  [ ] Stick to 

Re: [DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

2020-11-14 Thread Khachatryan Roman
Hi Stefan,

Thanks for your reply. Very interesting ideas!
If I understand correctly, SharedStateRegistry will still be responsible
for pruning the old state; for that, it will maintain some (ordered)
mapping between StateMaps and their versions, per key group.
I think one modification to this approach is needed to support journaling:
for each entry, maintain a version when it was last fully snapshotted; and
use this version to find the minimum as you described above.
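
In code, the extra bookkeeping could look like this (a sketch with assumed
names, just to pin down the idea):

```
// Sketch (assumed names): per-entry version bookkeeping enabling both
// pruning and journaling. A snapshot with version v can be discarded once
// every live entry in the key group has lastFullSnapshotVersion > v.
class VersionedEntry<K, V> {
    K key;
    V value;
    long modifiedVersion;          // version of the last modification
    long lastFullSnapshotVersion;  // version when last written in full
}
```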
I'm considering a better state cleanup and optimization of removals as the
next step. Anyway, I will add it to the FLIP document.

Thanks!

Regards,
Roman


On Tue, Nov 10, 2020 at 12:04 AM Stefan Richter 
wrote:

> Hi,
>
> Very happy to see that the incremental checkpoint idea is finally becoming
> a reality for the heap backend! Overall the proposal looks pretty good to
> me. Just wanted to point out one possible improvement from what I can still
> remember from my ideas back then: I think you can avoid doing periodic full
> snapshots for consolidation. Instead, my suggestion would be to track the
> version numbers you encounter while you iterate a snapshot for writing it -
> and then you should be able to prune all incremental snapshots that were
> performed with a version number smaller than the minimum you find. To avoid
> the problem of very old entries that never get modified you could start
> spilling entries with a certain age-difference compared to the current map
> version so that eventually all entries for an old version are re-written to
> newer snapshots. You can track the version up to which this was done in the
> map and then you can again let go of their corresponding snapshots after a
> guaranteed time. So instead of having the burden of periodic large
> snapshots, you can make every snapshot work a little bit on the cleanup, and
> if you are lucky it might happen mostly by itself if most entries are
> frequently updated. I would also consider making a map clear a special event
> in your log and unticking the versions on this event - this allows
> you to let go of old snapshots and saves you from writing a log of
> antimatter entries. Maybe the ideas are still useful to you.
>
> Best,
> Stefan
>
> On 2020/11/04 01:54:25, Khachatryan Roman  wrote:
> > Hi devs,
> >
> > I'd like to start a discussion of FLIP-151: Incremental snapshots for
> > heap-based state backend [1]
> >
> > Heap backend, while being limited to state sizes that fit into memory,
> > also has some advantages compared to the RocksDB backend:
> > 1. Serialization once per checkpoint, not per state modification. This
> > allows “squashing” updates to the same keys
> > 2. Shorter synchronous phase (compared to RocksDB incremental)
> > 3. No need for sorting and compaction, no IO amplification and JNI
> > overhead
> > This can potentially give higher throughput and efficiency.
> >
> > However, the Heap backend currently lacks incremental checkpoints. This
> > FLIP aims to add initial support for them.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-151%3A+Incremental+snapshots+for+heap-based+state+backend
> >
> > Any feedback highly appreciated.
> >
> > Regards,
> > Roman
> >


Re: [DISCUSS] Releasing Apache Flink 1.11.3

2020-11-11 Thread Khachatryan Roman
Hi,

I'd like FLINK-20079 [1] to be merged into 1.11 and included in 1.11.3.

[1] https://issues.apache.org/jira/browse/FLINK-20079

Regards,
Roman


On Tue, Nov 10, 2020 at 8:21 AM Xintong Song  wrote:

> Thanks for the notice, Dian.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Nov 10, 2020 at 10:19 AM Dian Fu  wrote:
>
> > Hi Xintong,
> >
> > I want to bring one more issue to your attention [1]. The test case
> > UnalignedCheckpointCompatibilityITCase.test failed several times in the
> > last nightly test of release-1.11. We need to figure out if it's just an
> > unstable test or caused by recent changes.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-20065
> >
> > > 在 2020年11月10日,上午9:24,Xintong Song  写道:
> > >
> > > Thanks for the replies.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Nov 10, 2020 at 1:09 AM Becket Qin 
> wrote:
> > >
> > >> Hi Xintong,
> > >>
> > >> Thanks for driving the release. Just want to sync up on the FLIP-27
> > >> backporting. Stephan and I are still trying to backport a bunch of
> > patches
> > >> of Source to 1.11.3. Including:
> > >>
> > >> [FLINK-19698][connector/common] Add a close() method to the
> SplitReader.
> > >> [FLINK-19717] SourceReaderBase.pollNext may return END_OF_INPUT if
> > >> SplitReader.fetch throws
> > >> [FLINK-19535] [connector/common] Avoid failing a job multiple times in
> > >> SourceCoordinator.
> > >> [FLINK-19265] [FLINK-20049][core] Source API final adjustments.
> > >>
> > >> and a few more fixes.
> > >>
> > >> We are currently trying to fix them in 1.12 first so it might take a
> > little
> > >> longer to backport them to 1.11.3. I think it will probably take us a
> > few
> > >> more days to finish the backport. So that would roughly be the end of
> > this
> > >> week.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >>
> > >> On Mon, Nov 9, 2020 at 9:57 PM Till Rohrmann 
> > wrote:
> > >>
> > >>> Yes, I've downgraded FLINK-19816 to critical.
> > >>>
> > >>> Cheers,
> > >>> Till
> > >>>
> > >>> On Mon, Nov 9, 2020 at 10:19 AM Xintong Song 
> > >>> wrote:
> > >>>
> >  Thanks for the notice, Till.
> > 
> >  I just checked and found FLINK-20033 is already fixed. Shall we also
> >  downgrade FLINK-19816 to `Critical`?
> > 
> >  Thank you~
> > 
> >  Xintong Song
> > 
> > 
> > 
> >  On Mon, Nov 9, 2020 at 4:42 PM Till Rohrmann 
> > >>> wrote:
> > 
> > > I would like to bring one more critical issue to your attention
> which
> > >>> is
> > > FLINK-20033 [1]. I believe that this issue is actually causing what
> > >> has
> > > been reported in FLINK-19816 [2]. I hope to have it fixed by the
> end
> > >> of
> > > today. Once FLINK-20033 is fixed, I think that we don't have to
> block
> > >>> the
> > > release on FLINK-19816.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-20033
> > > [2] https://issues.apache.org/jira/browse/FLINK-19816
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Nov 9, 2020 at 4:05 AM Xintong Song  >
> >  wrote:
> > >
> > >> Hi devs,
> > >>
> > >> I'd like to provide an update on the progress of preparing release
> > > 1.11.3.
> > >>
> > >> *Blockers*
> > >> We currently have 3 remaining blockers. (3 resolved and 1 emerged
> > > compared
> > >> to last week)
> > >>
> > >> - [FLINK-19698] Add close() method and onCheckpointComplete() to
> > >> the
> > >> Source.
> > >> The issue has been fixed on the master branch. It's currently
> > >> blocked
> >  on
> > >> the FLIP-27 backportings to backport it to the 1.11 branch.
> > >>
> > >> - [FLINK-19717] SourceReaderBase.pollNext may return END_OF_INPUT
> > >> if
> > >> SplitReader.fetch throws
> > >> A PR has been opened and reviewed. From the discussions on the PR,
> > >> it
> > > looks
> > >> close to mergeable.
> > >>
> > >> - [FLINK-19816] Flink restored from a wrong checkpoint (a very old
> > >>> one
> > > and
> > >> not the last completed one)
> > >> This is a newly emerged blocker and Matthias is working on it.
> > >>
> > >> *Test Instabilities*
> > >> We currently have 27 test instabilities[1].
> > >> AFAIK, none of them are as serious as to block the 1.11.3 release.
> > >>
> > >> *FLIP-27 Backportings*
> > >>
> > >> I noticed that there are no jira issues opened for the FLIP-27
> > >>> backporting
> > >> efforts, which are part of the major efforts planned for the 1.11.3
> > > release,
> > >> making it hard to track the progress.
> > >>
> > >>
> > >> @Stephan and @Becket, could you please share the updates on the
> > > backporting
> > >> efforts? How is the progress and when are the efforts expected to
> > >> be
> > >> finished? It would be appreciated and helpful if we can have a
> jira
> > > ticket
> > 

[DISCUSS] FLIP-151: Incremental snapshots for heap-based state backend

2020-11-03 Thread Khachatryan Roman
Hi devs,

I'd like to start a discussion of FLIP-151: Incremental snapshots for
heap-based state backend [1]

Heap backend, while being limited to state sizes that fit into memory, also
has some advantages compared to the RocksDB backend:
1. Serialization once per checkpoint, not per state modification. This
allows “squashing” updates to the same keys
2. Shorter synchronous phase (compared to RocksDB incremental)
3. No need for sorting and compaction, no IO amplification and JNI overhead
This can potentially give higher throughput and efficiency.

However, the Heap backend currently lacks incremental checkpoints. This FLIP
aims to add initial support for them.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-151%3A+Incremental+snapshots+for+heap-based+state+backend


Any feedback highly appreciated.

Regards,
Roman


Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-26 Thread Khachatryan Roman
Hey Austin,

I assigned the ticket,
that would be great if you could fix it!

Regards,
Roman


On Thu, Oct 22, 2020 at 5:08 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Roman,
>
> Sorry to miss this -- thanks for the confirmation and making the ticket.
> I'm happy to propose a fix if someone is able to assign the ticket to me.
>
> Best,
> Austin
>
> On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hey Austin,
>>
>> I think you are right. The problematic row contains an odd number of
>> delimiters, in which case skipFields will return -1, which in turn leads to
>> an exception.
>>
>> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
>> to fix it.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV
>>> Format[1].
>>>
>>> Even with the `ignoreParseErrors()` set, the job fails when it
>>> encounters some types of malformed rows. The root cause is indeed a
>>> `ParseException`, so I'm wondering if there's anything more I need to do to
>>> ignore these rows. Each field in the schema is a STRING.
>>>
>>>
>>> I've configured the CSV format and table like so:
>>>
>>> tableEnv.connect(
>>>         new FileSystem()
>>>             .path(path)
>>>     )
>>>     .withFormat(
>>>         new Csv()
>>>             .quoteCharacter('"')
>>>             .ignoreParseErrors()
>>>     )
>>>     .withSchema(schema)
>>>     .inAppendMode()
>>>
>>>
>>> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a
>>> check for `isLenient()` when there is an unexpected parser position? [2]
>>>
>>> Example error:
>>>
>>> 2020-10-16 12:50:18
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception when processing split: null
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>>> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
>>> parser position for column 1 of row '",
>>> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
>>> ""company,'
>>> at
>>> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>>> at
>>> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>>> at
>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>>> at
>>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>>>
>>>
>>> Thanks,
>>> Austin
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
>>> [2]:
>>> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>>>
>>
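
For reference, the change hinted at above could look roughly like this inside
RowCsvInputFormat#parseRecord (a sketch against the lines linked in [2], not
an actual patch):

```
// Sketch of the suggested fix: skip the malformed row instead of throwing
// when lenient parsing (ignoreParseErrors) is enabled.
if (startPos < 0) {
    if (isLenient()) {
        return false; // drop the record and continue with the next row
    }
    throw new ParseException(String.format(
            "Unexpected parser position for column %1$s of row '%2$s'",
            field, new String(bytes, offset, numBytes)));
}
```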


Re: [VOTE] FLIP-135: Approximate Task-Local Recovery

2020-10-20 Thread Khachatryan Roman
+1 (non-binding).

It would be a great improvement, thanks for the effort!

Regards,
Roman


On Tue, Oct 20, 2020 at 4:49 PM Steven Wu  wrote:

> +1 (non-binding).
>
> Some of our users have asked for this tradeoff of availability over
> consistency in some cases.
>
> On Mon, Oct 19, 2020 at 8:02 PM Zhijiang  .invalid>
> wrote:
>
> > Thanks for driving this effort, Yuan.
> >
> > +1 (binding) on my side.
> >
> > Best,
> > Zhijiang
> >
> >
> > --
> > From:Piotr Nowojski 
> > Send Time:2020年10月19日(星期一) 21:02
> > To:dev 
> > Subject:Re: [VOTE] FLIP-135: Approximate Task-Local Recovery
> >
> > Hey,
> >
> > I carry over my +1 (binding) from the discussion thread.
> >
> > Best,
> > Piotrek
> >
> > pon., 19 paź 2020 o 14:56 Yuan Mei  napisał(a):
> >
> > > Hey,
> > >
> > > I would like to start a voting thread for FLIP-135 [1], for approximate
> > > task local recovery. The proposal has been discussed in [2].
> > >
> > > The vote will be open till Oct. 23rd (72h, excluding weekends) unless
> > there
> > > is an objection or not enough votes.
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-135+Approximate+Task-Local+Recovery
> > > [2]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-135-Approximate-Task-Local-Recovery-tp43930.html
> > >
> > >
> > > Best
> > >
> > > Yuan
> > >
> >
> >
>


Flink Speedcenter worker machine replaced

2020-09-01 Thread Khachatryan Roman
Hello,

Yesterday the machine executing Flink benchmarks was replaced due to
hardware problems.
The HW configuration is different, so the results may differ from what we
had previously.

Regards,
Roman


Re: FileSystemHaServices and BlobStore

2020-08-31 Thread Khachatryan Roman
+ dev

The blob store is used for jars, serialized job and task information, and logs.
You can find some information at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


I guess in your setup, Flink was able to pick up local files.
HA setup presumes that Flink can survive the loss of that JM host and its
local files.

I'm not sure about the K8s native setup - probably VoidBlobStore is enough if
there is a persistent volume.
But in the general case, FileSystemBlobStore should be used to store files
on some DFS.
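
As a sketch, the wiring inside an HA-services implementation would be
something like the following (the condition, path and method are assumed;
the blob store types are Flink's org.apache.flink.runtime.blob classes):

```
// Sketch (assumed wiring): choose the blob store depending on whether BLOBs
// must survive the loss of the JobManager host.
BlobStore createBlobStore(boolean highlyAvailable, FileSystem fs, String dir)
        throws IOException {
    return highlyAvailable
            ? new FileSystemBlobStore(fs, dir) // e.g. a DFS path on S3/HDFS
            : new VoidBlobStore();             // local-only; lost with the JM
}
```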


Regards,
Roman


On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:

> I did a test with a streaming job and FileSystemHaService using VoidBlobStore
> (no HA blob store); it looks like the job was able to recover from both JM
> restart and TM restart. Any idea in what use cases an HA blob store is needed?
>
> Thanks,
> Alexey
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, August 28, 2020 11:31 AM
> *To:* Khachatryan Roman 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> The motivation is to have a k8s HA setup without an extra component -
> Zookeeper, see [1]
>
> The purpose of the BlobStore is vague to me - what kind of BLOBs are stored?
> It looks like if we start the job from a savepoint, then persistence of the
> BlobStore is not necessary, but is it needed if we recover from a checkpoint?
>
> Thanks,
> Alexey
>
> [1]. https://issues.apache.org/jira/browse/FLINK-17598
>
>
> --
> *From:* Khachatryan Roman 
> *Sent:* Friday, August 28, 2020 9:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hello Alexey,
>
> I think you need FileSystemBlobStore as you are implementing HA Services,
> and BLOBs should be highly available too.
> However, I'm a bit concerned about the direction in general: it
> essentially means re-implementing ZK functionality on top of FS.
> What are the motivation and the use case?
>
> Regards,
> Roman
>
>
> On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing FileSystemHaServices - single leader, but
> persistent RunningJobRegistry, CheckpointIDCounter,
> CompletedCheckpointStore and JobGraphStore. I'm not sure do you need
> FileSystemBlobStore or VoidBlobStore is enough. Can't figure out, should
> BlobStore survive JobManager crash. I see that ZookeeperHaServices use 
> FileSystemBlobStore,
> but not clear is to due to having multiple JobManagers (leader + follower)
> or necessity to preserve BLOBs on restart.
>
> Thanks,
> Alexey
>
>