Re: [DISCUSS] FLIP-360: Merging ExecutionGraphInfoStore and JobResultStore into a single component

2023-09-17 Thread Shammon FY
Hi Matthias,

Thanks for initiating this discussion, and +1 overall from my side.
It's really strange to have two different places to store completed jobs,
and it also adds to the complexity of Flink. I agree with using a
unified instance to store the completed job information.

In terms of ability, `ExecutionGraphInfoStore` and `JobResultStore` are
different: one is mainly used for information display, and the other is for
failover. So after unifying storage, can we use different interfaces to
meet the different requirements for jobs? Adding all these methods for
different components into one interface such as `CompletedJobStore` may be
a little strange. What do you think?
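
One way to read this suggestion, as a minimal sketch: a single storage instance implements two narrow interfaces, one for display and one for failover. Apart from the idea of a unified store, every name below (`CompletedJobView`, `CompletedJobRecovery`, `InMemoryCompletedJobStore`, and all method names) is hypothetical and not taken from the FLIP or the Flink codebase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class CompletedJobStoreSketch {

    /** Read-only view for information display, e.g. REST handlers (hypothetical). */
    interface CompletedJobView {
        Optional<String> getJobSummary(String jobId);

        List<String> listCompletedJobIds();
    }

    /** View used during failover to find jobs that still need cleanup (hypothetical). */
    interface CompletedJobRecovery {
        Set<String> getDirtyJobIds();

        void markAsClean(String jobId);
    }

    /** A single storage instance that serves both views. */
    static final class InMemoryCompletedJobStore
            implements CompletedJobView, CompletedJobRecovery {

        private final Map<String, String> summaries = new TreeMap<>();
        private final Set<String> dirtyJobIds = new TreeSet<>();

        /** Records a globally-terminated job; it stays dirty until cleanup is done. */
        void addCompletedJob(String jobId, String summary) {
            summaries.put(jobId, summary);
            dirtyJobIds.add(jobId);
        }

        @Override
        public Optional<String> getJobSummary(String jobId) {
            return Optional.ofNullable(summaries.get(jobId));
        }

        @Override
        public List<String> listCompletedJobIds() {
            return new ArrayList<>(summaries.keySet());
        }

        @Override
        public Set<String> getDirtyJobIds() {
            return new TreeSet<>(dirtyJobIds);
        }

        @Override
        public void markAsClean(String jobId) {
            dirtyJobIds.remove(jobId);
        }
    }

    public static void main(String[] args) {
        InMemoryCompletedJobStore store = new InMemoryCompletedJobStore();
        store.addCompletedJob("job-1", "FINISHED");

        CompletedJobView view = store;         // what a display component would see
        CompletedJobRecovery recovery = store; // what the failover path would see

        recovery.markAsClean("job-1");
        System.out.println(view.listCompletedJobIds() + " dirty=" + recovery.getDirtyJobIds());
    }
}
```

With this shape, each consumer depends only on the methods it needs, while the data still lives in one place.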

Best,
Shammon FY



On Fri, Sep 8, 2023 at 8:08 PM Gyula Fóra  wrote:

> Hi Matthias!
>
> Thank you for the detailed proposal, overall I am in favor of making this
> unification to simplify the logic and make the integration for external
> components more straightforward.
> I will try to read through the proposal more carefully next week and
> provide some detailed feedback.
>
> +1
>
> Thanks
> Gyula
>
> On Fri, Sep 8, 2023 at 8:36 AM Matthias Pohl wrote:
>
> > Just a bit more elaboration on the question that we need to answer here: Do
> > we want to expose the internal ArchivedExecutionGraph data structure
> > through JSON?
> >
> > - The JSON approach allows the user to have (almost) full access to the
> > information (that would be otherwise derived from the REST API). Therefore,
> > there's no need to spin up a cluster to access this information.
> > Any information that shall be exposed through the REST API needs to be
> > well-defined in this JSON structure, though. Large parts of the
> > ArchivedExecutionGraph data structure (essentially anything that shall be
> > used to populate the REST API) become public domain, though, which puts
> > more constraints on this data structure and makes it harder to change it in
> > the future.
> >
> > - The binary data approach allows us to keep the data structure itself
> > internal. We have more control over what we want to expose by providing
> > access points in the ClusterClient (e.g. just add a command to extract the
> > external storage path from the file).
> >
> > - The compromise (i.e. keeping ExecutionGraphInfoStore and JobResultStore
> > separate and just expose the checkpoint information next to the JobResult
> > in the JobResultStore file) would keep us the closest to the current state,
> > requires the least code changes and the least exposure of internal data
> > structures. It would allow any system (like the Kubernetes Operator) to
> > extract the checkpoint's external storage path. But we would still be stuck
> > with kind-of redundant components.
> >
> > From a user's perspective, I feel like the JSON approach is the best one
> > because it gives him/her the most freedom to be independent of Flink
> > binaries when handling completed jobs. But I see benefits from a Flink
> > developer's perspective to not expose the entire data structure but use the
> > ClusterClient as an access point.
> >
> > The last option is my least favorite one: Moving the ExecutionGraphInfo out
> > of the JobManager seems to be the right thing to do when thinking about
> > Flink's vision to become cloud-native.
> >
> > Just my 2cts on that topic.
> > Matthias
> >
> > On Mon, Sep 4, 2023 at 1:11 PM Matthias Pohl 
> > wrote:
> >
> > > Hi everyone,
> > > I want to open the discussion on FLIP-360 [1]. The goal of this FLIP is to
> > > combine the two very similar components ExecutionGraphInfoStore and
> > > JobResultStore into a single component.
> > >
> > > The benefit of this effort would be to expose the metadata of a
> > > globally-terminated job even in cases where the JobManager fails shortly
> > > after the job finished. This is relevant for external checkpoint management
> > > (like it's done in the Kubernetes Operator) which relies on the checkpoint
> > > information to be available.
> > >
> > > More generally, it would allow completed jobs to be listed as part of the
> > > Flink cluster even after a JM failover. This would allow users to gain more
> > > control over finished jobs.
> > >
> > > The current state of the FLIP doesn't come up with a final conclusion on
> > > the serialization format of the data (JSON vs binary). I want to emphasize
> > > that there's also a third option which keeps both components separate and
> > > only exposes the additional checkpoint information through the
> > > JobResultStore.
> > >
> > > I'm looking forward to feedback.
> > > Best,
> > > Matthias
> > >
> > > PS: I might be less responsive in the next 2-3 weeks but want to initiate
> > > the discussion, anyway.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore


Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-17 Thread Rui Fan
Hi Zhanghao,

> Use a field named "location" (already used in JobExceptionsInfoWithHistory)
> that represents TaskManager location using the newly added string formatter
> method.

How about using taskManagerLocation instead of location
for all REST response classes?

taskManagerLocation may be clearer.

Best,
Rui


On Mon, Sep 18, 2023 at 10:11 AM Chen Zhanghao 
wrote:

> Hi all,
>
> I've updated the FLIP to incorporate Yangze's advice:
>
> 1. Add a new string formatter method to TaskManagerLocation and
> ArchivedTaskManagerLocation that prints in the form of
> "${hostname}:${port}" to align with the string formatter used by the REST API.
> 2. Highlight that the old host field will be kept for at least 2 minor
> versions before removal.
>
> Best,
> Zhanghao Chen
> 
> From: Yangze Guo
> Sent: September 15, 2023, 17:26
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager
> Location in REST API and Web UI
>
> Thanks for driving this, Zhanghao. +1 for the overall proposal.
>
> My two cents:
>
> 1. Since most of the REST APIs get the location from
> TaskManagerLocation, we can align the string formatter in this class.
> E.g. add something like toHumanReadableString to this class.
>
> 2. From my understanding of FLIP-321, if we want to deprecate the host
> field, we should annotate the related field / getter / setter with
> @Deprecated in this version, and keep them for at least 2 minor releases.
>
> Best,
> Yangze Guo
>
> On Wed, Sep 13, 2023 at 8:52 PM Chen Zhanghao 
> wrote:
> >
> > Hi Jing,
> >
> > Thanks for the suggestion. Endpoint is indeed a more professional word
> > in the networking world but I think location is more suited here for two
> > reasons:
> >
> >   1.  The term here is for uniquely identifying the TaskManager where
> > the task is deployed while providing the host machine info as well to help
> > identify taskmanager- and host-aggregative problems. So strictly speaking,
> > it is not used in a pure networking context.
> >   2.  The term "location" is already used widely in the codebase, e.g.
> > TaskManagerLocation and JobExceptions-related classes.
> >
> > WDYT?
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Jing Ge
> > Sent: September 13, 2023, 4:52
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager
> > Location in REST API and Web UI
> >
> > Hi Zhanghao,
> >
> > Thanks for bringing this to our attention. It is a good proposal to improve
> > data consistency.
> >
> > Speaking of naming conventions of choosing location over host, how about
> > "endpoint" with the following thoughts:
> >
> > 1. endpoint is a more professional word than location in the network
> > context.
> > 2. I know commonly endpoints mean the URLs of services. Using Hostname:port
> > as the endpoint follows exactly the same rule, because TaskManager is the
> > top level service that aligns with the top level endpoint.
> >
> > WDYT?
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Sep 11, 2023 at 6:01 AM Weihua Hu wrote:
> >
> > > Hi, Zhanghao
> > >
> > > Since the meaning of "host" is not aligned, it seems good to me to remove
> > > it in a future release.
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Mon, Sep 11, 2023 at 11:48 AM Chen Zhanghao <zhanghao.c...@outlook.com>
> > > wrote:
> > >
> > > > Hi Fan Rui,
> > > >
> > > > Thanks for clarifying the definition of "public interfaces", that helps a
> > > > lot!
> > > >
> > > > Best,
> > > > Zhanghao Chen
> > > > 
> > > > From: Rui Fan <1996fan...@gmail.com>
> > > > Sent: September 11, 2023, 11:18
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager
> > > > Location in REST API and Web UI
> > > >
> > > > Thanks Zhanghao for driving this FLIP, adding the port in the Web UI
> > > > seems good to me.
> > > >
> > > > Hi Shammon and Zhanghao,
> > > >
> > > > I would like to clarify the difference between Public Interfaces
> > > > in a FLIP and @Public in code.
> > > >
> > > > As I understand it, `Public Interfaces` in a FLIP means changes that
> > > > are visible on the user side, such as @Public classes,
> > > > configuration settings, user-facing scripts/command-line tools,
> > > > and the REST API.
> > > >
> > > > You can refer to the "What are the "public interfaces" of the project?"
> > > > part in the Flink Improvement Proposals doc [1].
> > > >
> > > > A @Public class means users will use this class directly, whereas
> > > > these REST classes won't be depended on directly. So I think
> > > > these REST-related classes don't need to be marked @Public.
> > > >
> > > > Please correct me if anything is wrong, thanks~
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > >
> > > > Best,
> > > > Rui
> > > >
> > > > On Mon, Sep 11, 2023 at 11:09 AM Weihua Hu 
> > > wrote:
> > > >
> 

Re: [VOTE] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-17 Thread Rui Fan
A gentle reminder about the location naming.
The name "location" is a little unclear, but
I can't think of a better one.

So +1 (binding) from me for now.

Ping @Jing Ge to help double-check the name again.

Sorry for mentioning naming in the VOTE thread,
I didn't know this VOTE would be so early.

Best,
Rui

On Mon, Sep 18, 2023 at 11:44 AM Yangze Guo  wrote:

> +1 (binding)
>
> Best,
> Yangze Guo
>
> On Mon, Sep 18, 2023 at 11:37 AM Chen Zhanghao
>  wrote:
> >
> > Hi All,
> >
> > Thanks for all the feedback on FLIP-363: Unify the Representation of
> > TaskManager Location in REST API and Web UI [1][2]
> >
> > I'd like to start a vote for FLIP-363. The vote will be open for at
> > least 72 hours unless there is an objection or insufficient votes.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-363%3A+Unify+the+Representation+of+TaskManager+Location+in+REST+API+and+Web+UI
> > [2] https://lists.apache.org/thread/sls1196mmk25w8nm2qf585254nbjr9hd
> >
> > Best,
> > Zhanghao Chen
>


Re: [VOTE] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-17 Thread Yangze Guo
+1 (binding)

Best,
Yangze Guo

On Mon, Sep 18, 2023 at 11:37 AM Chen Zhanghao
 wrote:
>
> Hi All,
>
> Thanks for all the feedback on FLIP-363: Unify the Representation of 
> TaskManager Location in REST API and Web UI [1][2]
>
> I'd like to start a vote for FLIP-363. The vote will be open for at least 72 
> hours unless there is an objection or insufficient votes.
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-363%3A+Unify+the+Representation+of+TaskManager+Location+in+REST+API+and+Web+UI
> [2] https://lists.apache.org/thread/sls1196mmk25w8nm2qf585254nbjr9hd
>
> Best,
> Zhanghao Chen


[VOTE] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-17 Thread Chen Zhanghao
Hi All,

Thanks for all the feedback on FLIP-363: Unify the Representation of 
TaskManager Location in REST API and Web UI [1][2]

I'd like to start a vote for FLIP-363. The vote will be open for at least 72 
hours unless there is an objection or insufficient votes.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-363%3A+Unify+the+Representation+of+TaskManager+Location+in+REST+API+and+Web+UI
[2] https://lists.apache.org/thread/sls1196mmk25w8nm2qf585254nbjr9hd

Best,
Zhanghao Chen


Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-09-17 Thread Xintong Song
Thanks for addressing my comments, Dong.

> The expected behavior of checkpointing and failover depends on whether
> there is any operator currently running in the job with all its inputs'
> isBacklog=true. If there exists such an operator and
> interval-during-backlog = 0, then checkpoint will be disabled and the
> operator will have to failover in a way similar to batch mode.


This makes sense to me. Shall we also put this into the FLIP? Or maybe you
already did that and I overlooked it? The current description in "4)
Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me.
It says "At the point when isBacklog switches to false, source operator
...", which sounds like this happens as soon as any source operator switches
to isBacklog = false.
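
The rule quoted above can be written down as a small predicate. This is only a sketch of the quoted sentence, not of the actual implementation: `RunningOperator`, `isCheckpointingDisabled`, and the representation of each input as a boolean isBacklog flag are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class BacklogCheckpointSketch {

    /** A running operator, reduced to the isBacklog flag of each of its inputs (hypothetical). */
    static final class RunningOperator {
        final String name;
        final List<Boolean> inputIsBacklog;

        RunningOperator(String name, Boolean... inputIsBacklog) {
            this.name = name;
            this.inputIsBacklog = Arrays.asList(inputIsBacklog);
        }

        /** True only if the operator has inputs and every one of them reports isBacklog=true. */
        boolean allInputsBacklog() {
            return !inputIsBacklog.isEmpty()
                    && inputIsBacklog.stream().allMatch(Boolean::booleanValue);
        }
    }

    /**
     * Checkpointing is disabled when interval-during-backlog is 0 and at least one
     * running operator has isBacklog=true on all of its inputs.
     */
    static boolean isCheckpointingDisabled(
            List<RunningOperator> running, Duration intervalDuringBacklog) {
        return intervalDuringBacklog.isZero()
                && running.stream().anyMatch(RunningOperator::allInputsBacklog);
    }

    public static void main(String[] args) {
        List<RunningOperator> running = Arrays.asList(
                new RunningOperator("join", true, true),
                new RunningOperator("map", false));

        System.out.println(isCheckpointingDisabled(running, Duration.ZERO));
        System.out.println(isCheckpointingDisabled(running, Duration.ofSeconds(30)));
    }
}
```

Under this reading, a single source switching to isBacklog = false does not by itself re-enable checkpointing as long as some other running operator still has all inputs in backlog.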

> I am not sure what is the concern with having `flink-streaming-java` depend
> on `flink-runtime`. Can you clarify the exact concern?

The concern here is that an API module should not depend on a runtime
module. Currently, we have the "user code -> flink-streaming-java ->
flink-runtime" dependency chain, which makes binary compatibility
impossible because any runtime changes can break the compatibility with a
user jar (which bundles flink-streaming-java) compiled for an older
version. Ideally, we want the runtime module to depend on the API module,
rather than the other way around. This is one of the issues we are trying
to resolve with the programmatic API refactor. However, the way we are
trying to resolve it is to introduce another API module and gradually
replace the current DataStream API / flink-streaming-java, which means
flink-streaming-java will keep depending on flink-runtime for a while
anyway. So the concern here is minor: it is only that we might need more
effort when reworking this with the new API.
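
As a rough illustration of the dependency direction described above, here is a minimal single-file sketch in which nested types stand in for modules. `StreamApi`, `RuntimeV1`, `RuntimeV2`, and `userProgram` are hypothetical names and do not correspond to any real Flink types.

```java
public class DependencyDirectionSketch {

    // "API module": the only type that user code compiles against.
    interface StreamApi {
        String execute(String jobName);
    }

    // "Runtime module", version 1: depends on the API module, not the other way around.
    static final class RuntimeV1 implements StreamApi {
        @Override
        public String execute(String jobName) {
            return "v1 ran " + jobName;
        }
    }

    // "Runtime module", version 2: internals changed, API contract unchanged.
    static final class RuntimeV2 implements StreamApi {
        @Override
        public String execute(String jobName) {
            return "v2 ran " + jobName;
        }
    }

    // "User code": references StreamApi only, so it does not need to be
    // recompiled when the runtime implementation is swapped.
    static String userProgram(StreamApi api) {
        return api.execute("wordcount");
    }

    public static void main(String[] args) {
        System.out.println(userProgram(new RuntimeV1()));
        System.out.println(userProgram(new RuntimeV2()));
    }
}
```

If `userProgram` referenced `RuntimeV1` directly instead, any change to that class could break an already compiled user jar, which is the situation the current dependency chain creates.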

The rest of your replies make sense to me.

Best,

Xintong



On Fri, Sep 15, 2023 at 10:05 PM Dong Lin  wrote:

> Hi Xintong,
>
> Thanks for your comments! Please see my reply inline.
>
> On Thu, Sep 14, 2023 at 4:58 PM Xintong Song 
> wrote:
>
> > Sorry to join the discussion late.
> >
> > Overall, I think it's a good idea to support dynamically switching the
> > operator algorithms between Streaming (optimized towards low latency +
> > checkpointing supports) and Batch (optimized towards throughput). This is
> > indeed a big and complex topic, and I really appreciate the previous
> > discussions that narrow the scope of this FLIP down to only considering
> > switching from Batch to Streaming as a first step.
> >
> > I have several questions.
> >
> > 1. The FLIP discusses various behaviors under 4 scenarios: streaming mode,
> > batch mode, mixed mode with checkpoint interval > 0, mixed mode with
> > checkpoint interval = 0. IIUC, this is because many batch optimizations
> > cannot be supported together with checkpointing. This justifies that in
> > mixed mode with interval > 0, most behaviors are the same as in streaming
> > mode. However, mixed mode with checkpoint interval = 0 does not always
> > necessarily mean we should apply such optimizations. It is possible that in
> > some cases (likely with small data amounts) the cost of such optimizations
> > is higher than the benefit. Therefore, I'd suggest decoupling the concept
> > of applying these optimizations (i.e., the batch execution phase in the
> > mixed mode) from whether checkpointing is enabled or not. In particular,
> > I'd suggest removing the scenario "mixed mode with
> > e.c.interval-during-backlog > 0", changing the scenario "mixed mode with
> > e.c.interval-during-backlog = 0" to simply "mixed mode", and saying that we
> > can have different strategies for deciding whether to enable the mixed mode,
> > and that as the first step the strategy is to enable it when
> > e.c.interval-during-backlog = 0.
> >
>
> Thanks for the detailed explanation!
>
> I have updated the "Behavior changes when switching from batch mode to
> stream mode" section with the following changes.
>
> 1) Remove the description of "mixed mode with interval-during-backlog > 0"
> and add the statement saying that "after this FLIP, the behavior of Flink
> runtime with execution.runtime-mode = streaming AND
> execution.checkpointing.interval-during-backlog > 0, will be the same as the
> stream mode prior to this FLIP"
>
> 2) Add the statement saying that "Mixed mode refers to the behavior of
> Flink runtime after this FLIP with execution.runtime-mode = streaming AND
> execution.checkpointing.interval-during-backlog = 0".
>
> 3) Add the statement saying that "It is possible for mixed mode to be
> slower than stream mode, particularly when there is only a small amount of
> input records and the overhead of buffering/sorting inputs outweighs its
> benefit. This is similar to how the merge join might be slower than hash
> join. This FLIP focuses on optimizing the Flink throughput when there is a
> high number of input records. In the future, we might introduce more
> strategies to turn on mixed mode in a smart way to avoid performance
> 

[VOTE] FLIP-307: Flink Connector Redshift

2023-09-17 Thread Samrat Deb
Hi All,

Thanks for all the feedback on FLIP-307: Flink Connector Redshift [1][2]

I'd like to start a vote for FLIP-307. The vote will be open for at least 72
hours unless there is an objection or insufficient votes.

Bests,
Samrat

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
[2] https://lists.apache.org/thread/wsz4jgdpnlyw1x781f9qpk7y416b45dj


[jira] [Created] (FLINK-33102) Document the standalone autoscaler

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33102:
---

 Summary: Document the standalone autoscaler
 Key: FLINK-33102
 URL: https://issues.apache.org/jira/browse/FLINK-33102
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Rui Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33101) Add the integration test for standalone autoscaler

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33101:
---

 Summary: Add the integration test for standalone autoscaler
 Key: FLINK-33101
 URL: https://issues.apache.org/jira/browse/FLINK-33101
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Samrat Deb








[jira] [Created] (FLINK-33100) Implement YarnJobFetcher for Standalone Autoscaler

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33100:
---

 Summary: Implement YarnJobFetcher for Standalone Autoscaler
 Key: FLINK-33100
 URL: https://issues.apache.org/jira/browse/FLINK-33100
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Rui Fan








Re: Re: [DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-09-17 Thread Xintong Song
Thanks for addressing my comments, Dong.

LGTM.

Best,

Xintong



On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu  wrote:

> Hi Dong & Jinhao,
>
> Thanks for your clarification! +1
>
> Best regards,
> Wencong
>
> At 2023-09-15 11:26:16, "Dong Lin"  wrote:
> >Hi Wencong,
> >
> >Thanks for your comments! Please see my reply inline.
> >
> >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu 
> wrote:
> >
> >> Dear Dong,
> >>
> >> I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> >> be a valuable addition to Flink. However, I do have a few questions that I
> >> would like to discuss:
> >>
> >>
> >> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a
> >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1) and naturally
> >> supports WindowedStream and AllWindowedStream to process all records
> >> belonging to a key in a 'global' window under both STREAMING and BATCH
> >> runtime execution mode.
> >>
> >>
> >> However, besides coGroup and keyBy().aggregate(), other operators on
> >> WindowedStream and AllWindowedStream, such as join/reduce, etc, currently
> >> are still implemented based on WindowOperator.
> >>
> >>
> >> In fact, these operators can also be implemented without using
> >> WindowOperator to prevent additional WindowAssigner#assignWindows or
> >> triggerContext#onElement invocation cost. Will there be plans to support
> >> these operators in the future?
> >>
> >
> >You are right. The EndOfStreamWindows proposed in this FLIP can potentially
> >benefit any DataStream API that takes WindowAssigner as parameters. This
> >can involve more operations than aggregate and co-group.
> >
> >And yes, we have plans to take advantage of this API to optimize these
> >operators in the future. This FLIP focuses on the introduction of the
> >public APIs and uses aggregate/co-group as the first two examples to
> >show-case the performance benefits.
> >
> >I have added an "Analysis of APIs affected by this FLIP" section to list the
> >DataStream APIs that can benefit from this FLIP. Would this answer your
> >question?
> >
> >
> >>
> >> 2. When using EndOfStreamWindows, upstream operators no longer support
> >> checkpointing. This limit may be too strict, especially when dealing with
> >> bounded data in streaming runtime execution mode, where checkpointing
> >> can still be useful.
> >>
> >
> >I am not sure we have a good way to support checkpoint while still
> >achieving the performance improvements targeted by this FLIP.
> >
> >The issue here is that if we support checkpoint, then we can not take
> >advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are
> >not compatible with checkpoints. These algorithms (which do not support
> >checkpoint) are the main reasons why batch mode currently significantly
> >outperforms stream mode in doing aggregation/cogroup etc.
> >
> >In most cases where the user does not care about processing latency, it is
> >generally preferred to use batch mode to perform aggregation operations
> >(which should be 10X faster than the existing stream mode performance)
> >instead of doing checkpoint.
> >
> >Also note that we can still let operators perform failover in the same way as
> >the existing batch mode execution, where the intermediate results (produced
> >by one operator) can be persisted in shuffle service and downstream
> >operators can re-read those data from shuffle service after failover.
> >
> >
> >>
> >> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> >> true, the operator as well as its upstream operators will be executed in
> >> 'batch mode' with checkpointing disabled. I would like to understand the
> >> specific implications of this 'batch mode' and if there are any other
> >> changes associated with it?
> >
> >
> >Good point. We should explicitly mention the changes. I have updated the
> >FLIP to clarify this.
> >
> >More specifically, the checkpoint is disabled when these operators are
> >running, such that these operators can do operations not compatible with
> >checkpoints (e.g. sorting inputs). And operators should re-read the data
> >from the upstream blocking edge or sources after failover.
> >
> >Would this answer your question?
> >
> >
> >>
> >> Additionally, I am curious to know if this 'batch mode' conflicts with the
> >> 'mixed mode' described in FLIP-327. While the coGroup and
> >> keyBy().aggregate() operators on EndOfStreamWindows have the attribute
> >> 'isInternalSorterSupported' set to true, indicating support for the
> >> 'mixed mode', they also have isOutputOnEOF set to true, which suggests
> >> that the upstream operators should be executed in 'batch mode'.
> >> Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate
> >> any clarification on this matter.
> >>
> >
> >Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do
> >not conflict with each 

[jira] [Created] (FLINK-33099) Support the Standalone Autoscaler

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33099:
---

 Summary: Support the Standalone Autoscaler
 Key: FLINK-33099
 URL: https://issues.apache.org/jira/browse/FLINK-33099
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Rui Fan








[jira] [Created] (FLINK-33098) Support kubernetes autoscaler using generic interface

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33098:
---

 Summary: Support kubernetes autoscaler using generic interface
 Key: FLINK-33098
 URL: https://issues.apache.org/jira/browse/FLINK-33098
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Rui Fan


 # Move all classes that aren't related to Kubernetes to the flink-autoscaler module
 # Support the Kubernetes autoscaler using the generic interface
 # Remove the flink-kubernetes-operator-autoscaler module
 # Remove the option prefix (kubernetes.operator.) from all options and update
the doc (all old option names are marked with withDeprecatedKeys to ensure
compatibility).





[jira] [Created] (FLINK-33097) Initialize the generic autoscaler module and interfaces

2023-09-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33097:
---

 Summary: Initialize the generic autoscaler module and interfaces
 Key: FLINK-33097
 URL: https://issues.apache.org/jira/browse/FLINK-33097
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Fan
Assignee: Rui Fan








Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-17 Thread Shammon FY
Thanks @Gyula, I would like to share how we use sql-gateway with the Flink
session cluster, and I hope that helps give you a clearer
understanding of our needs :)

As @Yangze mentioned, we currently use Flink as an OLAP platform via the
following steps:
1. Set up a Flink session cluster via the Flink k8s session deployment, with
k8s or ZooKeeper high availability.
2. Write a Helm chart for the SQL Gateway image and launch multiple gateway
instances to submit jobs to the same Flink session cluster.

As we mentioned in the docs [1], we hope that users can easily launch
sql-gateway instances in k8s. Is it enough to add a Helm chart for
sql-gateway, or do we need to add this feature to the Flink
operator? Could you help us reach a conclusion? Thank you very much @Gyula

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/olap_quickstart/

Best,
Shammon FY



On Sun, Sep 17, 2023 at 2:02 PM Gyula Fóra  wrote:

> Hi!
> It sounds pretty easy to deploy the gateway automatically with session
> cluster deployments from the operator, but there is a major limitation
> currently. The SQL gateway itself doesn't really support any operator
> integration so jobs submitted through the SQL gateway would not be
> manageable by the operator (they won't show up as session jobs).
>
> Without that, this is a very strange feature. We would make something much
> easier for users that is not well supported by the operator in the first
> place. The operator is designed to manage clusters and jobs
> (FlinkDeployment / FlinkSessionJob). It would be good to understand if we
> could make the SQL Gateway create a FlinkSessionJob / Deployment (that
> would require application cluster support) and basically submit the job
> through the operator.
>
> Cheers,
> Gyula
>
> On Sun, Sep 17, 2023 at 1:26 AM Yangze Guo  wrote:
>
> > > There would be many different ways of doing this. One gateway per
> > > session cluster, one gateway shared across different clusters...
> >
> > Currently, sql gateway cannot be shared across multiple clusters.
> >
> > > understand the tradeoff and the simplest way of accomplishing this.
> >
> > I'm not familiar with the Flink operator codebase, so it would be
> > appreciated if you could elaborate more on the cost of adding this
> > feature. I agree that deploying a gateway using the native Kubernetes
> > Deployment can be a simple and straightforward way for users. However,
> > integrating it into an operator can provide additional benefits and be
> > more user-friendly, especially for users who are less familiar with
> > Kubernetes. By using an operator, users can benefit from consistent
> > version management with the session cluster and upgrade capabilities.
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, Sep 15, 2023 at 5:38 PM Gyula Fóra  wrote:
> > >
> > > > There would be many different ways of doing this. One gateway per session
> > > > cluster, one gateway shared across different clusters...
> > > > I would not rush to add anything anywhere until we understand the tradeoff
> > > > and the simplest way of accomplishing this.
> > > >
> > > > The operator already supports ingresses for session clusters so we could
> > > > have a gateway sitting somewhere else simply using it.
> > >
> > > Gyula
> > >
> > > > On Fri, Sep 15, 2023 at 10:18 AM Yangze Guo wrote:
> > > >
> > > > > Thanks for bringing this up, Dongwoo. Flink SQL Gateway is also a key
> > > > > component for OLAP scenarios.
> > > > >
> > > > > @Gyula
> > > > > How about adding the SQL gateway as an optional component to Session
> > > > > Cluster Deployments? Users could specify the resources / instance number
> > > > > and ports of the SQL gateway. I think that would help OLAP and batch
> > > > > users a lot.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > > On Fri, Sep 15, 2023 at 3:19 PM ConradJam wrote:
> > > > > >
> > > > > > If we start from the crd direction, I think this mode is more like a
> > > > > > sidecar of the session cluster, which is submitted to the session cluster
> > > > > > by sending sql commands to the sql gateway. I don't know if my statement
> > > > > > is accurate.
> > > > >
> > > > > > On Fri, Sep 15, 2023 at 1:27 PM Xiaolong Wang wrote:
> > > > >
> > > > > > Hi, Dongwoo,
> > > > > >
> > > > > > > Since Flink SQL gateway should run upon a Flink session cluster, I
> > > > > > > think it'd be easier to add more fields to the CRD of
> > > > > > > `FlinkSessionJob`.
> > > > > >
> > > > > > e.g.
> > > > > >
> > > > > > > apiVersion: flink.apache.org/v1beta1
> > > > > > > kind: FlinkSessionJob
> > > > > > > metadata:
> > > > > > >   name: sql-gateway
> > > > > > > spec:
> > > > > > >   sqlGateway:
> > > > > > >     endpoint: "hiveserver2"
> > > > > > >     mode: "streaming"
> > > > > > >     hiveConf:
> > > > > > >       configMap:
> > > > > > >         name: hive-config
> > > > > > >         items:
> > > > > > >           - key: hive-site.xml
> > > > > > >             path: hive-site.xml
> > > > > >
> > > > > > On Fri, Sep 15, 2023 at 12:56 PM Dongwoo Kim <
> > 

Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-17 Thread Chen Zhanghao
Hi all,

I've updated the FLIP to incorporate Yangze's advice:

1. Add a new string formatter method to TaskManagerLocation and
ArchivedTaskManagerLocation that prints in the form of "${hostname}:${port}" to
align with the string formatter used by the REST API.
2. Highlight that the old host field will be kept for at least 2 minor versions
before removal.
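
For illustration, the formatter in point 1 could look roughly like the sketch below. `TaskManagerLocationSketch` is a stand-in class and not Flink's `TaskManagerLocation`; the method name follows the suggestion quoted further down and is not confirmed by the FLIP, and the port value in the usage is an arbitrary example.

```java
public class TaskManagerLocationSketch {

    private final String hostname;
    private final int port;

    public TaskManagerLocationSketch(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    /** Formats the location as "${hostname}:${port}" (method name is an assumption). */
    public String toHumanReadableString() {
        return hostname + ":" + port;
    }

    public static void main(String[] args) {
        TaskManagerLocationSketch location = new TaskManagerLocationSketch("worker-1", 6122);
        System.out.println(location.toHumanReadableString());
    }
}
```

Keeping the formatting in one method means every REST response class that reports a location can call it instead of concatenating host and port on its own.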

Best,
Zhanghao Chen

From: Yangze Guo
Sent: September 15, 2023, 17:26
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager Location in
REST API and Web UI

Thanks for driving this, Zhanghao. +1 for the overall proposal.

Some cents from my side:

1. Since most of the REST APIs get the location from
TaskManagerLocation, we can align the string formatter in this class.
E.g. add something like toHumanReadableString to this class.

2. From my understanding of FLIP-321, if we want to deprecate the host
field, we should annotate the related field / getter / setter with
@Deprecated in this version, and keep them at least 2 minor releases.
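
A rough sketch of what point 2 could look like in code. The class `SubtaskLocationInfoSketch`, the `location` field and the getter names are hypothetical and not taken from the Flink codebase; only the use of @Deprecated and the "keep for at least 2 minor releases" rule come from this discussion.

```java
// Hypothetical REST response holder; not an actual Flink class.
final class SubtaskLocationInfoSketch {
    private final String host;     // old field: hostname only
    private final String location; // assumed new field: "hostname:port"

    SubtaskLocationInfoSketch(String hostname, int port) {
        this.host = hostname;
        this.location = hostname + ":" + port;
    }

    /**
     * @deprecated Kept for at least 2 minor releases; use {@link #getLocation()} instead.
     */
    @Deprecated
    String getHost() {
        return host;
    }

    String getLocation() {
        return location;
    }

    public static void main(String[] args) {
        SubtaskLocationInfoSketch info =
                new SubtaskLocationInfoSketch("worker-1.example.com", 6122);
        // Both values stay available during the deprecation window.
        System.out.println(info.getHost());
        System.out.println(info.getLocation());
    }
}
```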

Best,
Yangze Guo

On Wed, Sep 13, 2023 at 8:52 PM Chen Zhanghao  wrote:
>
> Hi Jing,
>
> Thanks for the suggestion. Endpoint is indeed a more professional word in the 
> networking world but I think location is more suited here for two reasons:
>
>   1.  The term here is for uniquely identifying the TaskManager where the 
> task is deployed while also providing the host machine info, to help 
> identify problems that cluster on a particular TaskManager or host. So 
> strictly speaking, it is not used in a pure networking context.
>   2.  The term "location" is already used widely in the codebase, e.g. 
> TaskManagerLocation and JobExceptions-related classes.
>
> WDYT?
>
> Best,
> Zhanghao Chen
> 
> From: Jing Ge 
> Sent: September 13, 2023 4:52
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager Location 
> in REST API and Web UI
>
> Hi Zhanghao,
>
> Thanks for bringing this to our attention. It is a good proposal to improve
> data consistency.
>
> Speaking of naming conventions of choosing location over host, how about
> "endpoint" with the following thoughts:
>
> 1. endpoint is a more professional word than location in the network
> context.
> 2. I know commonly endpoints mean the URLs of services. Using Hostname:port
> as the endpoint follows exactly the same rule, because TaskManager is the
> top level service that aligns with the top level endpoint.
>
> WDYT?
>
> Best regards,
> Jing
>
>
> On Mon, Sep 11, 2023 at 6:01 AM Weihua Hu  wrote:
>
> > Hi, Zhanghao
> >
> > Since the meaning of "host" is not aligned, it seems good to me to remove
> > it in a future release.
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, Sep 11, 2023 at 11:48 AM Chen Zhanghao 
> > wrote:
> >
> > > Hi Fan Rui,
> > >
> > > Thanks for clarifying the definition of "public interfaces", that helps a
> > > lot!
> > >
> > > Best,
> > > Zhanghao Chen
> > > 
> > > From: Rui Fan <1996fan...@gmail.com>
> > > Sent: September 11, 2023 11:18
> > > To: dev@flink.apache.org 
> > > Subject: Re: [DISCUSS] FLIP-363: Unify the Representation of TaskManager
> > > Location in REST API and Web UI
> > >
> > > Thanks Zhanghao for driving this FLIP; adding the port in the Web UI
> > > seems good to me.
> > >
> > > Hi Shammon and Zhanghao,
> > >
> > > I would like to clarify the difference between Public Interfaces
> > > in FLIP and @Public in code.
> > >
> > > As I understand it, `Public Interfaces in FLIP` means changes that
> > > are visible on the user side, such as @Public classes,
> > > configuration settings, user-facing scripts/command-line tools,
> > > the REST API, etc.
> > >
> > > You can refer to  "What are the "public interfaces" of the project?"
> > > part in Flink Improvement Proposals doc[1].
> > >
> > > @Public class means the user will use this class directly, and
> > > these rest classes won't be depended on directly. So I think
> > > these classes related to rest don't need to be marked @Public.
> > >
> > > Please correct me if anything is wrong, thanks~
> > >
> > > [1]
> > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >
> > > Best,
> > > Rui
> > >
> > > On Mon, Sep 11, 2023 at 11:09 AM Weihua Hu 
> > wrote:
> > >
> > > > Hi, Zhanghao
> > > >
> > > > Thanks for bringing this proposal.
> > > >
> > > > I have a concern:
> > > >
> > > > I prefer to keep the "host" field and add a "location" field in future
> > > > versions.
> > > > > Consider a scenario where a machine (host) running multiple TaskManagers
> > > > > has poor processing performance due to some problem.
> > > > > By aggregating on the host field, I can identify the problem with this
> > > > > machine and take it offline.
> > > >
> > > > Best,
> > > > Weihua
> > > >
> > > >
> > > > On Mon, Sep 11, 2023 at 10:34 AM Chen Zhanghao <
> > > zhanghao.c...@outlook.com>
> > > > wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > 

[jira] [Created] (FLINK-33096) Flink on k8s,if one taskmanager pod was crashed,the whole flink job will be failed

2023-09-17 Thread wawa (Jira)
wawa created FLINK-33096:


 Summary: Flink on k8s,if one taskmanager pod was crashed,the whole 
flink job will be failed
 Key: FLINK-33096
 URL: https://issues.apache.org/jira/browse/FLINK-33096
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.14.3
Reporter: wawa


The Flink version is 1.14.3, and the job is submitted to Kubernetes using the 
Native Kubernetes application mode. During the scheduling process, when a 
TaskManager pod crashes due to an exception, Kubernetes will attempt to start a 
new TaskManager pod. However, the scheduling process is halted immediately, 
resulting in the entire Flink job being terminated. On the other hand, if the 
JobManager pod crashes, Kubernetes is able to successfully schedule a new 
JobManager pod. This observation was made during application usage. Can you 
please help analyze the underlying issue?





Re: [DISCUSS] FLIP-229: Prometheus Sink Connector

2023-09-17 Thread Ahmed Hamdy
Thanks Lorenzo,
Looking forward to the PRs.
Best Regards
Ahmed Hamdy


On Sat, 16 Sept 2023 at 06:27, Lorenzo Nicora 
wrote:

> Hello
>
> (apologies if this is a duplicate reply)
>
> I was working with Karthi on this connector, and I have taken over the
> development.
> We have a working version we would like to submit to the community.
>
> The renumbered FLIP-312 is also updated with more details [1].
> Happy to answer any questions.
>
> Regards
> Lorenzo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
>
> On Mon, 21 Aug 2023, 13:06 Ahmed Hamdy,  wrote:
>
> > Hello Karthi
> > Is this FLIP still in progress? I see the FLIP has not been updated and I
> > couldn't find any open JIRAs.
> > I am happy to take over if you are no longer working on this.
> > Best Regards
> > Ahmed Hamdy
> >
> >
> > On Mon, 22 May 2023 at 14:49, Martijn Visser 
> > wrote:
> >
> > > Hi all,
> > >
> > > > For example, a user might want to read in logs, perform some
> > aggregations
> > > and publish it into a metrics store for visualisation. This might be a
> > > great use-case for reducing the cardinality of metrics!
> > >
> > > I can see that. What I would like to see in the FLIP is a proposal on the
> > > boundaries of the metrics reporter vs the Prometheus sink. I think it's
> > > important that we make clear when to use a metric reporter and when not.
> > > I can imagine that there will be Flink users who think that they can get
> > > data from the metric reporter, make aggregations in Flink and then store
> > > it using the Prometheus sink.
> > >
> > > Overall, I think more context must be added to the FLIP, especially on
> > the
> > > motivation.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Fri, May 19, 2023 at 4:28 PM Karthi Thyagarajan <
> > kar...@karthitect.com>
> > > wrote:
> > >
> > > > Hi Lijie
> > > >
> > > > Thank you for pointing this out. I've corrected it [1]. Also, this
> page
> > > > [2] still shows 178 and 229 as available, which is why I picked it
> up.
> > > >
> > > > Thanks
> > > > Karthi
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
> > > > [2]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > >
> > > > On May 15, 2023, at 9:37 PM, Lijie Wang 
> > > wrote:
> > > >
> > > >
> > > > Hi Karthi,
> > > >
> > > > I think you are using the wrong FLIP ID; FLIP-229 has already been
> > > > used [1].
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > On Tue, May 16, 2023 at 04:44, Martijn Visser wrote:
> > > >
> > > > Hi Karthi,
> > > >
> > > > Thanks for the FLIP and opening up the discussion. My main question
> is:
> > > why
> > > > should we create a separate connector and not use and/or improve the
> > > > existing integrations with Prometheus? I would like to understand
> more
> > so
> > > > that it can be added to the motivation of the FLIP.
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > On Mon, May 15, 2023 at 6:03 PM Karthi Thyagarajan <
> > > kar...@karthitect.com>
> > > > wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > We would like to start a discussion thread on FLIP-229: Prometheus
> > Sink
> > > > > Connector [1] where we propose to provide a sink connector for
> > > Prometheus
> > > > > [2] based on the Async Sink [3]. Looking forward to comments and
> > > > feedback.
> > > > > Thank you.
> > > > >
> > > > > [1]
> > > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Prometheus+Sink+Connector
> > > > > [2] https://prometheus.io/
> > > > > [3]
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>


Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-17 Thread Gyula Fóra
Hi!
It sounds pretty easy to deploy the gateway automatically with session
cluster deployments from the operator, but there is a major limitation
currently. The SQL gateway itself doesn't really support any operator
integration so jobs submitted through the SQL gateway would not be
manageable by the operator (they won't show up as session jobs).

Without that, this is a very strange feature. We would make something much
easier for users that is not well supported by the operator in the first
place. The operator is designed to manage clusters and jobs
(FlinkDeployment / FlinkSessionJob). It would be good to understand if we
could make the SQL Gateway create a FlinkSessionJob / Deployment (that
would require application cluster support) and basically submit the job
through the operator.

Cheers,
Gyula

On Sun, Sep 17, 2023 at 1:26 AM Yangze Guo  wrote:

> > There would be many different ways of doing this. One gateway per
> session cluster, one gateway shared across different clusters...
>
> Currently, sql gateway cannot be shared across multiple clusters.
>
> > understand the tradeoff and the simplest way of accomplishing this.
>
> I'm not familiar with the Flink operator codebase, it would be
> appreciated if you could elaborate more on the cost of adding this
> feature. I agree that deploying a gateway using the native Kubernetes
> Deployment can be a simple way and straightforward for users. However,
> integrating it into an operator can provide additional benefits and be
> more user-friendly, especially for users who are less familiar with
> Kubernetes. By using an operator, users can benefit from consistent
> version management with the session cluster and upgrade capabilities.
>
>
> Best,
> Yangze Guo
>
> On Fri, Sep 15, 2023 at 5:38 PM Gyula Fóra  wrote:
> >
> > There would be many different ways of doing this. One gateway per session
> > cluster, one gateway shared across different clusters...
> > I would not rush to add anything anywhere until we understand the
> tradeoff
> > and the simplest way of accomplishing this.
> >
> > The operator already supports ingresses for session clusters so we could
> > have a gateway sitting somewhere else simply using it.
> >
> > Gyula
> >
> > On Fri, Sep 15, 2023 at 10:18 AM Yangze Guo  wrote:
> >
> > > Thanks for bringing this up, Dongwoo. Flink SQL Gateway is also a key
> > > component for OLAP scenarios.
> > >
> > > @Gyula
> > > How about adding the SQL gateway as an optional component of Session
> > > Cluster Deployments? Users could specify the resource / instance number
> > > and ports of the SQL gateway. I think that would help OLAP and batch
> > > users a lot.
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Fri, Sep 15, 2023 at 3:19 PM ConradJam  wrote:
> > > >
> > > > If we start from the crd direction, I think this mode is more like a
> > > > sidecar of the session cluster, which is submitted to the session
> cluster
> > > > by sending sql commands to the sql gateway. I don't know if my
> statement
> > > is
> > > > accurate.
> > > >
> > > > On Fri, Sep 15, 2023 at 13:27, Xiaolong Wang wrote:
> > > >
> > > > > Hi, Dongwoo,
> > > > >
> > > > > Since Flink SQL gateway should run upon a Flink session cluster, I
> > > think
> > > > > it'd be easier to add more fields to the CRD of `FlinkSessionJob`.
> > > > >
> > > > > e.g.
> > > > >
> > > > > apiVersion: flink.apache.org/v1beta1
> > > > > kind: FlinkSessionJob
> > > > > metadata:
> > > > >   name: sql-gateway
> > > > > spec:
> > > > >   sqlGateway:
> > > > > >     endpoint: "hiveserver2"
> > > > > >     mode: "streaming"
> > > > > >     hiveConf:
> > > > > >       configMap:
> > > > > >         name: hive-config
> > > > > >         items:
> > > > > >           - key: hive-site.xml
> > > > > >             path: hive-site.xml
> > > > >
> > > > >
> > > > > On Fri, Sep 15, 2023 at 12:56 PM Dongwoo Kim <
> dongwoo7@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > *@Gyula*
> > > > > > Thanks for the consideration Gyula. My initial idea for the CR
> was
> > > > > roughly
> > > > > > like below.
> > > > > > I focused on simplifying the setup in k8s environment, but I
> agree
> > > with
> > > > > > your opinion that for the sql gateway
> > > > > > we don't need custom operator logic to handle and most of the
> > > > > requirements
> > > > > > can be met by existing k8s resources.
> > > > > > > So maybe a Helm chart that bundles all the resources needed should be
> > > enough.
> > > > > >
> > > > > > apiVersion: flink.apache.org/v1beta1
> > > > > > kind: FlinkSqlGateway
> > > > > > metadata:
> > > > > >   name: flink-sql-gateway-example
> > > > > >   namespace: default
> > > > > > spec:
> > > > > >   clusterName: flink-session-cluster-example
> > > > > >   exposeServiceType: LoadBalancer
> > > > > >   flinkSqlGatewayConfiguration:
> > > > > > >     sql-gateway.endpoint.type: "hiveserver2"
> > > > > > >     sql-gateway.endpoint.hiveserver2.catalog.name: "hive"
> > > > > > >   hiveConf:
> > > > > > >     configMap:
> >