Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-07-01 Thread Sonam Mandal
Hi Stephan,

Thanks for the detailed explanation! This really helps understand all this 
better. Appreciate your help!

Regards,
Sonam

From: Stephan Ewen 
Sent: Wednesday, June 30, 2021 3:56:22 AM
To: Sonam Mandal ; user@flink.apache.org 

Cc: matth...@ververica.com ; Jark Wu 
; Timo Walther 
Subject: Re: Recommended way to submit a SQL job via code without getting tied 
to a Flink version?

Hi Sonam!

To answer this, let me quickly provide some background on the two ways Flink 
deployments / job submissions work.
See also here for some background: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#deployment-modes

What is common in all setups is that the query compilation / the dataflow 
assembly happens where the entry-point program runs.

If you are programmatically setting up the application with a 
StreamExecutionEnvironment / TableEnvironment, then the query compilation (and 
JobGraph generation) happens where the program's main() method runs. If you are 
submitting via the SQL Client, then the SQL Client is the entry-point program, 
and the query compilation happens where the SQL Client runs.
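
For illustration, a minimal sketch of such an entry-point program (the class name, 
tables and connectors below are made-up placeholders): wherever this main() runs is 
where the SQL gets planned and the JobGraph gets built.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlEntryPoint {
    public static void main(String[] args) {
        // Query compilation / JobGraph generation happens in this process.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder tables and query.
        tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("INSERT INTO snk SELECT x FROM src");
    }
}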

Now, where is that entry-point program executed if you deploy a job? That 
depends on your deployment mode.

(1) Session Mode:

Here you have a running cluster with a Dispatcher that has the REST endpoint 
that accepts job submissions. Jobs are submitted via HTTP transporting a 
serialized JobGraph.
The entry-point can run anywhere and uses an HTTP client to send the generated 
JobGraph to the Dispatcher.

==> Here you need to worry about matching the versions of the client (the 
entry-point process, like SQL Client) and the deployed session cluster.
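
To make that coupling concrete, a hypothetical client that compiles the pipeline 
locally and submits it to a remote session cluster (host, port, jar path and tables 
are placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SessionModeClient {
    public static void main(String[] args) {
        // The dataflow is assembled in this client process and shipped as a serialized
        // JobGraph to the session cluster's Dispatcher, so the Flink version on this
        // classpath has to match the version running in the session cluster.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "session-jobmanager.example.com", 8081, "/path/to/udf-jar.jar");
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'print')");
        tEnv.executeSql("INSERT INTO snk SELECT x FROM src");
    }
}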

(2) Application Mode:

The entry-point (SQL Client or application program) spawns the JobManager when 
the program is executed. The JobGraph is passed as a Java object directly to 
the spawned JM component. There is an HTTP endpoint, but it is not for submitting 
jobs, only for the Web UI and for commands like cancelling execution.

This mode should allow you to package a Flink application (a SQL query) as a 
completely self-contained unit, without needing to sync versions between clients 
and clusters.



The internal abstraction for all ways to execute the programs is the 
PipelineExecutor.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java

If you look at the subclasses of "PipelineExecutor" you basically see all 
built-in deployment modes. To create a customized version of the Application 
deployment mode (or maybe the Job deployment mode), you can, for example, dig 
through the EmbeddedExecutor and the ApplicationDispatcherBootstrap.
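
For orientation, the interface looks roughly like the following (paraphrased, so 
please check the linked file for the exact signature in your version); which executor 
gets picked at runtime is controlled by the "execution.target" setting.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;

public interface PipelineExecutor {
    // Takes the compiled Pipeline (e.g. a StreamGraph), turns it into a JobGraph,
    // deploys it according to the given configuration and returns a handle to the job.
    CompletableFuture<JobClient> execute(
            Pipeline pipeline,
            Configuration configuration,
            ClassLoader userCodeClassloader) throws Exception;
}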

Hope that helps...

Best,
Stephan



On Tue, Jun 29, 2021 at 5:01 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Matthias,

Thanks for getting back to me. We are trying to build a system where users can 
focus on writing Flink SQL applications and we handle the full lifecycle of 
their Flink cluster and job. We would like to let users focus on just their SQL 
and UDF logic. In such an environment, we cannot enforce that all users must 
use a single Flink version. We intend to have this setup in kubernetes where 
within the same kubernetes cluster we can create multiple Flink clusters to 
which jobs are submitted.

Due to this, using an interactive shell will not be an option, nor do we want 
to directly expose this to users except for testing purposes.

I see the latest 1.13 release now has an option to pass a SQL file as input to 
the SQL client and it’ll take care of running the job. We will explore this 
option as well. I believe this is a new feature which wasn’t available in 1.12, 
right? Does the planning happen in the SQL client or on the job manager? We ran 
into issues with job graph incompatibility if our code directly submitted the 
SQL to the remote environment or if we used /bin/flink to run this jar that 
does the SQL conversion.

Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-29 Thread Sonam Mandal
Hi Matthias,

Thanks for getting back to me. We are trying to build a system where users can 
focus on writing Flink SQL applications and we handle the full lifecycle of 
their Flink cluster and job. We would like to let users focus on just their SQL 
and UDF logic. In such an environment, we cannot enforce that all users must 
use a single Flink version. We intend to have this setup in kubernetes where 
within the same kubernetes cluster we can create multiple Flink clusters to 
which jobs are submitted.

Due to this, using an interactive shell will not be an option, nor do we want 
to directly expose this to users except for testing purposes.

I see the latest 1.13 release now has an option to pass a SQL file as input to 
the SQL client and it’ll take care of running the job. We will explore this 
option as well. I believe this is a new feature which wasn’t available in 1.12, 
right? Does the planning happen in the SQL client or on the job manager? We ran 
into issues with job graph incompatibility if our code directly submitted the 
SQL to the remote environment or if we used /bin/flink to run this jar that 
does the SQL conversion.

We currently have a POC idea which takes the SQL as a file and we wrote a 
simple job runner which reads this SQL and executes it. We are using Flink REST 
APIs to upload this jar and submit the job so that the job graph generation 
happens on the job manager. We no longer see the job graph incompatibility 
issues.
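
A minimal sketch of what such a runner can look like (simplified: the statement 
splitting is naive, and the class and file names are just examples):

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlFileRunner {
    public static void main(String[] args) throws Exception {
        // Path to the SQL script, passed as a program argument at submission time.
        String sqlFile = args[0];

        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Naive ';' splitting -- does not handle semicolons inside string literals.
        String script = new String(Files.readAllBytes(Paths.get(sqlFile)));
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tEnv.executeSql(statement.trim());
            }
        }
    }
}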

Is there any reason not to use the above approach? We noticed that the Flink 
client (/bin/flink) does the job graph generation itself rather than via the REST 
API; any reason why it doesn't leverage the REST API?

The nice thing about using REST is that we can now run multiple Flink cluster 
versions and our job submission code doesn't need to know which Flink client 
version to use.
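
For concreteness, a rough sketch of driving the jar endpoints (POST /jars/upload, 
then POST /jars/<jarid>/run) with the plain JDK HTTP client; the host, jar id and 
JSON field names are illustrative and should be checked against the REST API docs 
of the target Flink version:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestJobSubmitter {
    public static void main(String[] args) throws Exception {
        // Assumes the runner jar was already uploaded via POST /jars/upload, which
        // returns the jar id used below.
        String jarId = "abc123_sql-runner.jar";
        String body = "{\"entryClass\":\"com.example.SqlFileRunner\","
                + "\"programArgs\":\"/opt/flink/usrlib/query.sql\","
                + "\"parallelism\":2}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://flink-jobmanager:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}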

We definitely saw this job graph incompatibility with /bin/flink. We still need 
to test out the SQL client with the -f option to assess whether we will require 
keeping multiple versions around should we decide to use this option. So we 
were wondering what the recommendation is within the Flink community on how to 
handle such cases. Hope this clarifies our use case better.

Also, as for the state incompatibility between major Flink versions, I see the 
thread mentions using a tool to rewrite the savepoints. Is this the only 
recommended way to handle this? Is this safe and does it work in all scenarios?

Thanks,
Sonam



From: Matthias Pohl 
Sent: Tuesday, June 29, 2021 02:29
To: Sonam Mandal
Cc: user@flink.apache.org; Jark Wu; Timo Walther
Subject: Re: Recommended way to submit a SQL job via code without getting tied 
to a Flink version?

Hi Sonam,
what's the reason for not using the Flink SQL client? Because of the version 
issue? I only know that FlinkSQL's state is not backwards-compatible between 
major Flink versions [1]. But that seems to be unrelated to what you describe.

I'm gonna add Jark and Timo to this thread. Maybe, they can add more insights.

Matthias

[1] 
https://issues.apache.org/jira/browse/FLINK-20823

On Tue, Jun 22, 2021 at 9:44 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We've written a simple tool which takes SQL statements as input and uses a 
StreamTableEnvironment to eventually submit this to the Flink cluster. We've 
noticed that the Flink library versions we depend on must match the Flink 
version running in our Kubernetes cluster for the job submission to be 
successful. If the versions don't match, the job submission goes through but 
the job errors out for various reasons. We do not want to use the SQL shell 
(which I also believe is version specific and must run on the same pod as the 
Job Manager).

Is there any version agnostic way to submit SQL jobs to the Flink cluster?

Thanks,
Sonam


Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-22 Thread Sonam Mandal
Hello,

We've written a simple tool which takes SQL statements as input and uses a 
StreamTableEnvironment to eventually submit this to the Flink cluster. We've 
noticed that the Flink library versions we depend on must match the Flink 
version running in our Kubernetes cluster for the job submission to be 
successful. If the versions don't match, the job submission goes through but 
the job errors out for various reasons. We do not want to use the SQL shell 
(which I also believe is version specific and must run on the same pod as the 
Job Manager).

Is there any version agnostic way to submit SQL jobs to the Flink cluster?

Thanks,
Sonam


Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-18 Thread Sonam Mandal
Hi Paul,

Thanks for getting back to me. I did take a look at the Google GO operator, and 
they use the /bin/flink client for job submission. My understanding is that in 
this scenario users must ensure that their job jar is compatible with the Flink 
version, and the client will just take care of the submission. Do let me know 
if I understood this correctly or not.

Perhaps some context on what we are doing will help. We are writing a small 
client which takes SQL statements, converts them via the Table environment, and 
submits the job. The /bin/flink client does not directly take SQL out of the 
box and we cannot expect users to run a SQL shell to run their production SQL 
streaming services. Since we are dealing with the job graph generation 
ourselves, we have run into the issue where our client needs to be compiled 
with the same version of Flink that we are running, otherwise we run into job 
graph compatibility issues. So I wanted to understand if there is a 
recommendation on how to deal with this conversion in a scenario where 
different users may run different Flink versions in a given kubernetes cluster.

Any thoughts?

Thanks,
Sonam

From: Paul K Moore 
Sent: Friday, June 18, 2021 2:25:52 AM
To: Sonam Mandal 
Cc: user@flink.apache.org ; Srinivasulu Punuru 

Subject: Re: Recommendation for dealing with Job Graph incompatibility across 
varying Flink versions

Hi Sonam,

I am not a long-standing Flink user (3 months only) so perhaps others will have 
a more authoritative view.

I would say that I am using Flink in k8s, and have had some good success with 
the Google Flink operator 
(https://github.com/GoogleCloudPlatform/flink-on-k8s-operator). This includes 
Custom Resource Definitions (CRDs) so that you can define your 
Flink clusters in YAML, and deploy using kustomize.

The result is:

A Flink cluster of a job-manager and one-or-more task-managers.
A Kubernetes job which acts as the Flink “client” to submit the job to the 
job-manager, the job-submitter

e.g.

flink-example-job-submitter-g4s6g   0/1 Completed   0  6d15h
flink-example-jobmanager-0  1/1 Running 3  6d15h
flink-example-taskmanager-0 1/1 Running 3  6d15h

This all seems in keeping with Flink’s “Per Job-Mode” deployment option 
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#per-job-mode)

Note: I'm only just getting into state persistence and recovery, so still 
some work to do, but I think this is largely understanding and configuration.

Hope that helps

Paul

On 17 Jun 2021, at 23:55, Sonam Mandal <soman...@linkedin.com> wrote:

Hello,

We are exploring running multiple Flink clusters within a Kubernetes cluster 
such that each Flink cluster can run with a specified Flink image version. 
Since the Flink Job Graph needs to be compatible with the Flink version running 
in the Flink cluster, this brings a challenge in how we ensure that the SQL job 
graph or Flink job jars are compatible with the Flink cluster users want to run 
them on.

E.g. if the Flink cluster is running version 1.12.1, the job graph generated 
from the SQL must be created using compatible 1.12.1 Flink libraries. 
Otherwise, we see issues with deserialization etc.

Is there a recommended way to handle this scenario today?

Thanks,
Sonam



Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-17 Thread Sonam Mandal
Hello,

We are exploring running multiple Flink clusters within a Kubernetes cluster 
such that each Flink cluster can run with a specified Flink image version. 
Since the Flink Job Graph needs to be compatible with the Flink version running 
in the Flink cluster, this brings a challenge in how we ensure that the SQL job 
graph or Flink job jars are compatible with the Flink cluster users want to run 
them on.

E.g. if the Flink cluster is running version 1.12.1, the job graph generated 
from the SQL must be created using compatible 1.12.1 Flink libraries. 
Otherwise, we see issues with deserialization etc.

Is there a recommended way to handle this scenario today?

Thanks,
Sonam


Re: Regarding FLIP-91's status

2021-05-28 Thread Sonam Mandal
Hi Matthias,

Thanks for your quick response! I have sent a reply on the FLIP-91 thread, 
thanks for pointing me to it.
@Jark Wu <imj...@gmail.com> it'll be great if you have any context on 
this as well.

Thanks,
Sonam

From: Matthias Pohl 
Sent: Friday, May 28, 2021 5:18 AM
To: Sonam Mandal 
Cc: user@flink.apache.org ; Jark Wu 
Subject: Re: Regarding FLIP-91's status

Hi Sonam,
It looks like it has been stale for some time. You might be able to restart the 
discussion by replying to the respective thread in the dev mailing list [1]. You 
seem to be right about the repository based on Jark's reply in the related 
ticket FLINK-15472 [2]. I'm adding Jark to the thread. Maybe, he can shed some 
light on the state of FLIP-91.

Best,
Matthias

[1] 
http://mail-archives.apache.org/mod_mbox/flink-dev/202001.mbox/%3CCADQYLGsCGDJkfd3L1hAy1y_M2625YkNHJGW82UraGLhzg6p7Ug%40mail.gmail.com%3E
[2] 
https://issues.apache.org/jira/browse/FLINK-15472

On Thu, May 27, 2021 at 9:51 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

I was curious about the progress on FLIP-91 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Client+Gateway>. 
Is this actively being developed?
I believe the code is in development at 
https://github.com/ververica/flink-sql-gateway, is this the right repo?

I haven't seen much activity on this since sometime last year. I wanted to 
understand if there is still a plan to continue developing this, and if not, I 
wanted to understand why.

Appreciate your help!

Thanks,
Sonam



Regarding FLIP-91's status

2021-05-27 Thread Sonam Mandal
Hello,

I was curious about the progress on FLIP-91. Is this actively being developed?
I believe the code is in development at 
https://github.com/ververica/flink-sql-gateway, is this the right repo?

I haven't seen much activity on this since sometime last year. I wanted to 
understand if there is still a plan to continue developing this, and if not, I 
wanted to understand why.

Appreciate your help!

Thanks,
Sonam



Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Sonam Mandal
Hi Till,

Sure, that sounds good. I'll open a FLIP for this when we start working on it.

Thanks for the insights!

Regards,
Sonam

From: Till Rohrmann 
Sent: Monday, May 10, 2021 2:26 AM
To: Sonam Mandal 
Cc: dev ; user@flink.apache.org 
Subject: Re: Task Local Recovery with mountable disks in the cloud

Hi Sonam,

I think it would be great to create a FLIP for this feature. FLIPs don't have 
to be super large and in this case, I could see it working to express the general 
idea to make local recovery work across TaskManager failures and then outline 
the different ideas we had so far. If we then decide to go with the persisting 
of cache information (the AllocationIDs), then this could be a good outcome. If 
we decide to go with the more complex solution of telling the ResourceManager 
and JobMaster about the ranges of cached state data, then this is also ok.

Cheers,
Till

On Fri, May 7, 2021 at 6:30 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Till,

Thanks for getting back to me. Apologies for my delayed response.

Thanks for confirming that the slot ID (Allocation ID) is indeed necessary 
today for task local recovery to kick in, and thanks for your insights on how 
to make this work.

We are interested in exploring this disaggregation between local state storage 
and slots to allow potential reuse of local state even when TMs go down.

I'm planning to spend some time exploring the Flink code around local recovery 
and state persistence. I'm still new to Flink, so any guidance will be helpful. 
I think both of your ideas on how to make this happen are interesting and worth 
exploring. What's the procedure to collaborate or get guidance on this feature? 
Will a FLIP be required, or will opening a ticket do?

Thanks,
Sonam

From: Till Rohrmann <trohrm...@apache.org>
Sent: Monday, April 26, 2021 10:24 AM
To: dev <d...@flink.apache.org>
Cc: user@flink.apache.org; Sonam Mandal <soman...@linkedin.com>
Subject: Re: Task Local Recovery with mountable disks in the cloud

Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature 
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state 
storage to make it reusable across TaskManager failures. However, it is also 
not trivial to do.

Maybe let me first describe how the current task local recovery works and then 
see how we could improve it:

Flink creates for every slot allocation an AllocationID. The AllocationID 
associates a slot on a TaskExecutor with a job and is also used for scoping the 
lifetime of a slot wrt a job (theoretically, one and the same slot could be 
used to fulfill multiple slot requests of the same job if the slot allocation 
is freed in between). Note that the AllocationID is a random ID and, thus, 
changes whenever the ResourceManager allocates a new slot on a TaskExecutor for 
a job.

Task local recovery is effectively a state cache which is associated with an 
AllocationID. So for every checkpoint and every task, a TaskExecutor copies the 
state data and stores them in the task local recovery cache. The cache is 
maintained as long as the slot allocation is valid (e.g. the slot has not been 
freed by the JobMaster and the slot has not timed out). This makes the 
lifecycle management of the state data quite easy and makes sure that a process 
does not clutter local disks. On the JobMaster side, Flink remembers for every 
Execution, where it is deployed (it remembers the AllocationID). If a failover 
happens, then Flink tries to re-deploy the Executions into the slots they were 
running in before by matching the AllocationIDs.

The reason why we scoped the state cache to an AllocationID was for simplicity 
and because we couldn't guarantee that a failed TaskExecutor X will be 
restarted on the same machine again and thereby having access to the same local 
disk as before. That's also why Flink deletes the cache directory when a slot 
is freed or when the TaskExecutor is shut down gracefully.

With persistent volumes this changes and we can make the TaskExecutors 
"stateful" in the sense that we can reuse an already occupied cache. One rather 
simple idea could be to also persist the slot allocations of a TaskExecutor 
(which slot is allocated and what is its assigned AllocationID). This 
information could be used to re-initialize the TaskExecutor upon restart. That 
way, it does not have to register at the ResourceManager and wait for new slot 
allocations but could directly start offering its slots to the jobs it 
remembered. If the TaskExecutor cannot find the JobMasters for the respective 
jobs, it would then free the slots and clear the cache accordingly.

This could work as long as the ResourceManager does not start new TaskExecutors 
whose slots could be used to recover the job.

Re: Task Local Recovery with mountable disks in the cloud

2021-05-07 Thread Sonam Mandal
Hi Till,

Thanks for getting back to me. Apologies for my delayed response.

Thanks for confirming that the slot ID (Allocation ID) is indeed necessary 
today for task local recovery to kick in, and thanks for your insights on how 
to make this work.

We are interested in exploring this disaggregation between local state storage 
and slots to allow potential reuse of local state even when TMs go down.

I'm planning to spend some time exploring the Flink code around local recovery 
and state persistence. I'm still new to Flink, so any guidance will be helpful. 
I think both of your ideas on how to make this happen are interesting and worth 
exploring. What's the procedure to collaborate or get guidance on this feature? 
Will a FLIP be required, or will opening a ticket do?

Thanks,
Sonam

From: Till Rohrmann 
Sent: Monday, April 26, 2021 10:24 AM
To: dev 
Cc: user@flink.apache.org ; Sonam Mandal 

Subject: Re: Task Local Recovery with mountable disks in the cloud

Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature 
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state 
storage to make it reusable across TaskManager failures. However, it is also 
not trivial to do.

Maybe let me first describe how the current task local recovery works and then 
see how we could improve it:

Flink creates for every slot allocation an AllocationID. The AllocationID 
associates a slot on a TaskExecutor with a job and is also used for scoping the 
lifetime of a slot wrt a job (theoretically, one and the same slot could be 
used to fulfill multiple slot requests of the same job if the slot allocation 
is freed in between). Note that the AllocationID is a random ID and, thus, 
changes whenever the ResourceManager allocates a new slot on a TaskExecutor for 
a job.

Task local recovery is effectively a state cache which is associated with an 
AllocationID. So for every checkpoint and every task, a TaskExecutor copies the 
state data and stores them in the task local recovery cache. The cache is 
maintained as long as the slot allocation is valid (e.g. the slot has not been 
freed by the JobMaster and the slot has not timed out). This makes the 
lifecycle management of the state data quite easy and makes sure that a process 
does not clutter local disks. On the JobMaster side, Flink remembers for every 
Execution, where it is deployed (it remembers the AllocationID). If a failover 
happens, then Flink tries to re-deploy the Executions into the slots they were 
running in before by matching the AllocationIDs.

The reason why we scoped the state cache to an AllocationID was for simplicity 
and because we couldn't guarantee that a failed TaskExecutor X will be 
restarted on the same machine again and thereby having access to the same local 
disk as before. That's also why Flink deletes the cache directory when a slot 
is freed or when the TaskExecutor is shut down gracefully.

With persistent volumes this changes and we can make the TaskExecutors 
"stateful" in the sense that we can reuse an already occupied cache. One rather 
simple idea could be to also persist the slot allocations of a TaskExecutor 
(which slot is allocated and what is its assigned AllocationID). This 
information could be used to re-initialize the TaskExecutor upon restart. That 
way, it does not have to register at the ResourceManager and wait for new slot 
allocations but could directly start offering its slots to the jobs it 
remembered. If the TaskExecutor cannot find the JobMasters for the respective 
jobs, it would then free the slots and clear the cache accordingly.

This could work as long as the ResourceManager does not start new TaskExecutors 
whose slots could be used to recover the job. If this is a problem, then one 
needs to answer the question how long to wait for the old TaskExecutors to come 
back and reusing their local state vs. starting quickly a fresh instance but 
having to restore state remotely.

An alternative solution proposal which is probably more powerful albeit also 
more complex would be to make the cache information explicit when registering 
the TaskExecutor at the ResourceManager and later offering slots to the 
JobMaster. For example, the TaskExecutor could tell the ResourceManager which 
states it has locally cached (it probably needs to contain key group ranges for 
every stored state) and this information could be used to decide from which 
TaskExecutor to allocate slots for a job. Similarly on the JobMaster side we 
could use this information to calculate the best mapping between Executions and 
slots. I think that mechanism could better deal with rescaling events where 
there is no perfect match between Executions and slots because of the changed 
key group ranges.

So to answer your question: There is currently no way to preserve AllocationIDs 
across restarts. However, we could use the 

Task Local Recovery with mountable disks in the cloud

2021-04-19 Thread Sonam Mandal
Hello,

We've been experimenting with Task-local recovery using Kubernetes. We have a 
way to specify mounting the same disk across Task Manager restarts/deletions 
for when the pods get recreated. In this scenario, we noticed that task local 
recovery does not kick in (as expected based on the documentation).

We did try to comment out the code on the shutdown path which cleaned up the 
task local directories before the pod went down / was restarted. We noticed 
that remote recovery kicked in even though the task local state was present. I 
noticed that the slot IDs changed, and was wondering if this is the main reason 
that the task local state didn't get used in this scenario?

Since we're using this shared disk to store the local state across pod 
failures, would it make sense to allow keeping the task local state so that we 
can get faster recovery even for situations where the Task Manager itself dies? 
In some sense, the storage here is disaggregated from the pods and can 
potentially benefit from task local recovery. Any reason why this is a bad idea 
in general?

Is there a way to preserve the slot IDs across restarts? We set up the Task 
Manager to pin the resource-id, but that didn't seem to help. My understanding 
is that the slot ID needs to be reused for task local recovery to kick in.
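
For reference, a sketch of the settings involved (key names as in the Flink docs; 
the values are examples, and these would normally live in flink-conf.yaml -- a Java 
Configuration is used here only to keep them in one snippet):

import org.apache.flink.configuration.Configuration;

public class LocalRecoveryConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Turn on task-local recovery (disabled by default).
        conf.setString("state.backend.local-recovery", "true");
        // Keep the local state cache on the persistent volume that survives pod restarts.
        conf.setString("taskmanager.state.local.root-dirs", "/opt/flink/local-state/tasklocaldir");
        // Pin the TaskManager's resource id across restarts; note that this pins the
        // resource id only, not the AllocationID that local recovery currently keys on.
        conf.setString("taskmanager.resource-id", "taskmanager-0");
        System.out.println(conf);
    }
}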

Thanks,
Sonam



Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Sonam Mandal
Hi Till,

Got it, that definitely makes sense, was just looking for some ballpark number 
to start with. Appreciate your help!

Thanks,
Sonam

From: Till Rohrmann 
Sent: Monday, April 12, 2021 1:00 AM
To: Sonam Mandal 
Cc: dhanesh arole ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you have 
1 GBps network connection and local SSDs, then I guess you should see a 
difference if your local state size is  > 1 GB.

Cheers,
Till

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Till and Dhanesh,

Thanks for the insights into both on how to check that this kicks in and on the 
expected behavior. My understanding too was that if multiple TMs are used for 
the job, any TMs that don’t go down can take advantage of local recovery.

Do you have any insights on a good minimum state size we should experiment with 
to check recovery time differences between the two modes?

Thanks,
Sonam

From: dhanesh arole <davcdhane...@gmail.com>
Sent: Wednesday, April 7, 2021 3:43:11 AM
To: Till Rohrmann <trohrm...@apache.org>
Cc: Sonam Mandal <soman...@linkedin.com>; Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user@flink.apache.org
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Till,

You are right. To give you more context about our setup, we are running 1 task 
slot per task manager, and the total number of task manager replicas equals the 
job parallelism. The issue is actually exacerbated during rolling deployments of 
task managers, as each TM goes offline and comes back online again after some 
time. So while every TM pod is being bounced, the task allocation somehow 
changes, and the job finally stabilises once all TMs are restarted. Maybe a 
proper blue-green setup would allow us to make the best use of local recovery 
during restarts of TMs. But during intermittent failures of one of the TMs, 
local recovery works as expected on the other healthy TM instances (i.e. it does 
not download from remote).

On Wed, 7 Apr 2021 at 10:35 Till Rohrmann <trohrm...@apache.org> wrote:
Hi Dhanesh,

if some of the previously used TMs are still available, then Flink should try 
to redeploy tasks onto them also in case of a global failover. Only those tasks 
which have been executed on the lost TaskManager will need new slots and have 
to download the state from the remote storage.

Cheers,
Till

On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole <davcdhane...@gmail.com> wrote:
Hi Sonam,

We have a similar setup. What I have observed is, when the task manager pod 
gets killed and restarts again (i.e. the entire task manager process restarts), 
then local recovery doesn't happen. The task manager restore process actually 
downloads the latest completed checkpoint from the remote state handle even 
when the older localState data is available. This happens because the 
allocation IDs for tasks running on the task manager change with every run, 
since a task manager restart causes a global job failure and restart.

Local recovery - i.e. the task restore process using locally stored checkpoint 
data - kicks in when the task manager process is alive, but one of the tasks 
fails due to some other reason (like a timeout from a sink or external 
dependency) and the Flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Sonam,

The easiest way to see whether local state has been used for recovery is the 
recovery time. Apart from that you can also look for "Found registered local 
state for checkpoint {} in subtask ({} - {} - {}" in the logs which is logged 
on debug. This indicates that the local state is available. However, it does 
not say whether it is actually used. E.g. when doing a rescaling operation we 
change the assignment of key group ranges which prevents local state from being 
used. However in case of a recovery the above-mentioned log message should 
indicate that we use local state recovery.

Cheers,
Till

On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes).

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-07 Thread Sonam Mandal
Hi Till and Dhanesh,

Thanks for the insights into both on how to check that this kicks in and on the 
expected behavior. My understanding too was that if multiple TMs are used for 
the job, any TMs that don’t go down can take advantage of local recovery.

Do you have any insights on a good minimum state size we should experiment with 
to check recovery time differences between the two modes?

Thanks,
Sonam

From: dhanesh arole 
Sent: Wednesday, April 7, 2021 3:43:11 AM
To: Till Rohrmann 
Cc: Sonam Mandal ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Till,

You are right. To give you more context about our setup, we are running 1 task 
slot per task manager, and the total number of task manager replicas equals the 
job parallelism. The issue is actually exacerbated during rolling deployments of 
task managers, as each TM goes offline and comes back online again after some 
time. So while every TM pod is being bounced, the task allocation somehow 
changes, and the job finally stabilises once all TMs are restarted. Maybe a 
proper blue-green setup would allow us to make the best use of local recovery 
during restarts of TMs. But during intermittent failures of one of the TMs, 
local recovery works as expected on the other healthy TM instances (i.e. it does 
not download from remote).

On Wed, 7 Apr 2021 at 10:35 Till Rohrmann <trohrm...@apache.org> wrote:
Hi Dhanesh,

if some of the previously used TMs are still available, then Flink should try 
to redeploy tasks onto them also in case of a global failover. Only those tasks 
which have been executed on the lost TaskManager will need new slots and have 
to download the state from the remote storage.

Cheers,
Till

On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole <davcdhane...@gmail.com> wrote:
Hi Sonam,

We have a similar setup. What I have observed is, when the task manager pod 
gets killed and restarts again (i.e. the entire task manager process restarts), 
then local recovery doesn't happen. The task manager restore process actually 
downloads the latest completed checkpoint from the remote state handle even 
when the older localState data is available. This happens because the 
allocation IDs for tasks running on the task manager change with every run, 
since a task manager restart causes a global job failure and restart.

Local recovery - i.e. the task restore process using locally stored checkpoint 
data - kicks in when the task manager process is alive, but one of the tasks 
fails due to some other reason (like a timeout from a sink or external 
dependency) and the Flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Sonam,

The easiest way to see whether local state has been used for recovery is the 
recovery time. Apart from that you can also look for "Found registered local 
state for checkpoint {} in subtask ({} - {} - {}" in the logs which is logged 
on debug. This indicates that the local state is available. However, it does 
not say whether it is actually used. E.g. when doing a rescaling operation we 
change the assignment of key group ranges which prevents local state from being 
used. However in case of a recovery the above-mentioned log message should 
indicate that we use local state recovery.

Cheers,
Till

On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes). I want to see if the local state of the 
other Task Manager was used or not. I do understand that the state for the 
killed Task Manager will need to be fetched from the checkpoint.

Also, do you have any suggestions on how to test such failure scenarios in a 
better way?

Thanks,
Sonam
--
- Dhanesh ( sent from my mobile device. Pardon me for any typos )


How to know if task-local recovery kicked in for some nodes?

2021-04-01 Thread Sonam Mandal
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes). I want to see if the local state of the 
other Task Manager was used or not. I do understand that the state for the 
killed Task Manager will need to be fetched from the checkpoint.

Also, do you have any suggestions on how to test such failure scenarios in a 
better way?

Thanks,
Sonam


Question about setting up Task-local recovery with a RocksDB state backend

2021-04-01 Thread Sonam Mandal
Hello,

I've been going through the documentation for task-local recovery and came 
across the section which discusses that, with incremental checkpoints enabled, 
task-local recovery incurs no additional storage cost. The caveat mentioned 
indicates that the task-local recovery state and all the RocksDB local state 
must be on a 
single physical device to allow the use of hard links. I wanted to understand 
how to ensure that our RocksDB local state is on the same physical device as 
the task-local recovery data.

I came across a couple of config options we can set to point the RocksDB local 
state to a directory of our choosing, along with the task local recovery 
directory. Do I need to set both up for task local recovery to work correctly? 
What are the default paths if I don't set up these configs? (we are using 
Kubernetes - assume that /opt/flink/local-state below corresponds to a given 
physical drive)


state.backend.rocksdb.localdir: /opt/flink/local-state/rocksdblocaldir

taskmanager.state.local.root-dirs: /opt/flink/local-state/tasklocaldir

Do these configs make any difference if we turn off incremental checkpointing 
for RocksDB? Also, setting up this localdir for RocksDB won't affect 
checkpointing and where the checkpoints are stored, right?
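
For what it's worth, the programmatic counterpart of the RocksDB localdir option 
looks roughly like this (my understanding is that it only controls where RocksDB 
keeps its working files on the TaskManager, while checkpoints still go to the 
checkpoint URI; the paths and URI below are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are written to this (remote) URI; the storage path below only
        // controls where RocksDB keeps its working files on the TaskManager.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/checkpoints", /* incremental */ true);
        backend.setDbStoragePath("/opt/flink/local-state/rocksdblocaldir");
        env.setStateBackend(backend);

        env.enableCheckpointing(60_000);
        // ... job definition would go here ...
    }
}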

After setting up the above two configs, I ran into some issues where the job 
would just disappear (or fail) if the Task Manager pod got killed (whereas 
without this, the job resumed correctly from the last checkpoint after the task 
manager pod was killed).

Thanks,
Sonam



Re: Failure detection in Flink

2021-03-30 Thread Sonam Mandal
Hi Till,

This is really helpful, thanks for the detailed explanation about what happens.
I'll reach out again if Ihave any further questions. For now I'm just trying to 
understand the various failure scenarios and how they are handled by Flink.

Thanks,
Sonam

From: Till Rohrmann 
Sent: Tuesday, March 30, 2021 8:33 AM
To: Sonam Mandal 
Cc: user@flink.apache.org 
Subject: Re: Failure detection in Flink

Well, the FLIP-6 documentation is probably the best resource albeit being a bit 
outdated.

The components react a bit differently:

JobMaster loses heartbeat with a TaskExecutor: If this happens, then the 
JobMaster will invalidate all slots from this TaskExecutor. This will then fail 
the tasks which have been deployed into these slots. This will then trigger a 
recovery of the affected pipelined region.

TaskExecutor loses heartbeat with a JobMaster: The TaskExecutor will fail all 
currently running tasks belonging to the timed out JobMaster. Moreover, it will 
release all intermediate result partitions it still keeps for this job. The 
slots for this JobMaster will transition to an inactive state. In this state, 
the TaskExecutor will try to reconnect to the JobMaster in order to offer the 
slots. If this is not successful within a configurable timeout, these slots 
will be freed and returned to the ResourceManager.

JobMaster loses heartbeat with the ResourceManager: The JobMaster tries to 
reconnect to the ResourceManager. Until this has happened, the JobMaster cannot 
ask for new slots.

ResourceManager loses heartbeat with the JobMaster: The ResourceManager closes 
the connection to the JobMaster. Moreover, it registers a timeout until when a 
JobMaster needs to reconnect to it. If this does not happen, then the 
ResourceManager will clear the declared resources for the job and cleans up the 
internal bookkeeping data structures.
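
All of these detections are driven by Flink's own heartbeat settings (not the 
akka.* ones). A minimal sketch of the two relevant keys with their default values, 
written as a Java Configuration purely for illustration:

import org.apache.flink.configuration.Configuration;

public class HeartbeatConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Interval (ms) at which heartbeat requests are sent to the target.
        conf.setString("heartbeat.interval", "10000");
        // Time (ms) without a heartbeat response after which the target is considered lost.
        conf.setString("heartbeat.timeout", "50000");
        System.out.println(conf);
    }
}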

I hope this helps a bit to better understand the failover behavior.

If you want to know something in particular, then let me know.

Cheers,
Till

On Tue, Mar 30, 2021 at 4:13 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Till,

Thanks, this helps! Yes, removing the Akka-related configs will definitely help 
to reduce confusion.

One more question, I was going through FLIP-6 and it does talk about the 
behavior of various components when failures are detected via heartbeat 
timeouts etc. Is this the best reference on how Flink reacts to such failure 
scenarios? If not, can you provide some details on how this works?

Thanks,
Sonam

Get Outlook for iOS <https://aka.ms/o0ukef>

From: Till Rohrmann <trohrm...@apache.org>
Sent: Tuesday, March 30, 2021 5:02:43 AM
To: Sonam Mandal <soman...@linkedin.com>
Cc: user@flink.apache.org
Subject: Re: Failure detection in Flink

Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of components. 
This mechanism is independent of the used deployment model. The relevant 
configuration options can be found here [1].

The akka.transport.* options are only for configuring the underlying Akka 
system. Since we are using TCP, Akka's failure detector is not needed [2]. I 
think we should remove it in order to avoid confusion [3].

The community also thinks about improving the failure detection mechanism 
because in some deployment scenarios we have additional signals available which 
could help us with the detection. But so far we haven't made a lot of progress 
in this area.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
[2] https://doc.akka.io/docs/akka-enhancements/current/config-checker.html#transport-failure-detector
[3] https://issues.apache.org/jira/browse/FLINK-22048

Re: Failure detection in Flink

2021-03-30 Thread Sonam Mandal
Hi Till,

Thanks, this helps! Yes, removing the Akka-related configs will definitely help 
to reduce confusion.

One more question, I was going through FLIP-6 and it does talk about the 
behavior of various components when failures are detected via heartbeat 
timeouts etc. Is this the best reference on how Flink reacts to such failure 
scenarios? If not, can you provide some details on how this works?

Thanks,
Sonam

Get Outlook for iOS<https://aka.ms/o0ukef>

From: Till Rohrmann 
Sent: Tuesday, March 30, 2021 5:02:43 AM
To: Sonam Mandal 
Cc: user@flink.apache.org 
Subject: Re: Failure detection in Flink

Hi Sonam,

Flink uses its own heartbeat implementation to detect failures of components. 
This mechanism is independent of the used deployment model. The relevant 
configuration options can be found here [1].

The akka.transport.* options are only for configuring the underlying Akka 
system. Since we are using TCP, Akka's failure detector is not needed [2]. I 
think we should remove it in order to avoid confusion [3].

The community also thinks about improving the failure detection mechanism 
because in some deployment scenarios we have additional signals available which 
could help us with the detection. But so far we haven't made a lot of progress 
in this area.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#advanced-fault-tolerance-options
[2] https://doc.akka.io/docs/akka-enhancements/current/config-checker.html#transport-failure-detector
[3] https://issues.apache.org/jira/browse/FLINK-22048

Cheers,
Till

On Mon, Mar 29, 2021 at 11:01 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

I'm looking for some resources around failure detection in Flink between the 
various components such as Task Manager, Job Manager, Resource Manager, etc. 
For example, how does the Job Manager detect that a Task Manager is down (long 
GC pause or it just crashed)?

There is some indication of the use of heartbeats; is this via Akka death 
watches or a custom heartbeat implementation? The reason I ask is that some 
timeout configurations are Akka-related, whereas others aren't. I would 
like to understand which timeouts are relevant to which pieces.

e.g. akka.transport.heartbeat.interval vs. heartbeat.interval
I see some earlier posts that mention akka.watch.heartbeat.interval, though 
this is not present on the latest configuration page for Flink.

Also, is this failure detection mechanism the same irrespective of the 
deployment model, i.e. Kubernetes/Yarn/Mesos?

Thanks,
Sonam




Re: Question about Reactive mode support

2021-03-11 Thread Sonam Mandal
Hi Robert,

Thanks for getting back to me. We are currently assessing Flink Standalone on 
Kubernetes and Native Flink on Kubernetes and haven't yet decided on which 
model we intend to use. We want to ensure that whichever model we choose, we'll 
be able to get the benefits of the new features added by the community.

>> We are certainly aware that support for active deployments is a much desired 
>> feature. The "problem" with the 1.13 implementation of reactive mode is that 
>> it will try to acquire infinite resources from an active resource manager.

Good point, thanks for explaining why this is a challenge for active mode. I'm 
wondering whether it may be helpful to have a min and max parallelism, and the 
actual parallelism be determined by the scaling policy mentioned next?

>> For integration with an active deployment, how would you like to control the 
>> scaling behavior of Flink? (for example via a REST API call to Flink's 
>> JobManager, or via a programmatic scaling policy, or a configured scaling 
>> policy? If you prefer a scaling policy, which metric would you like to 
>> consider?)

In the long term, I think having some kind of pluggable/extensible scaling 
policy would be best for users to allow flexibility in choosing metrics that 
are important for their use case. Making it configurable might make it easier 
to pick and choose different policies if they are available, without needing to 
make code changes.

Some possible metrics to start with could be related to resource utilization, 
such as CPU, memory, or other characteristics such as how much the job is 
lagging?

Since we are in early stages of just assessing what kind of deployment model 
we'd like to use, it's hard to say what will work best for us. We just want to 
see if reactive mode will be available in the future so that we can leverage it 
when we have more data.

Thanks,
Sonam



From: Robert Metzger 
Sent: Thursday, March 11, 2021 5:28 AM
To: Sonam Mandal 
Cc: user@flink.apache.org 
Subject: Re: Question about Reactive mode support

Hey Sonam,

I'm very happy to hear that you are interested in reactive mode. Your 
understanding of the limitations for 1.13 is correct. Note that you can deploy 
standalone Flink on Kubernetes [1]. I'm actually currently preparing a demo for 
this [2].

We are certainly aware that support for active deployments is a much desired 
feature. The "problem" with the 1.13 implementation of reactive mode is that it 
will try to acquire infinite resources from an active resource manager.

For integration with an active deployment, how would you like to control the 
scaling behavior of Flink? (for example via a REST API call to Flink's 
JobManager, or via a programmatic scaling policy, or a configured scaling 
policy? If you prefer a scaling policy, which metric would you like to 
consider?)

Best,
Robert

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/
[2] https://github.com/rmetzger/flink-reactive-mode-k8s-demo
 (attention, this is really work in progress!)




On Wed, Mar 10, 2021 at 5:32 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We were going through FLIP-159 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode> 
and FLIP-160 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler> 
and found this feature of interest to us for auto-scaling purposes.

Question about Reactive mode support

2021-03-10 Thread Sonam Mandal
Hello,

We were going through FLIP-159 and FLIP-160 and found this feature of interest 
to us for auto-scaling purposes. The 
limitations indicate that Flink 1.13 will release this for standalone only and 
for application mode deployments only.

Will this be extended in future releases to other active deployments such as 
Native Flink on Kubernetes? What about session mode?

Thanks,
Sonam