[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-09-01 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17188291#comment-17188291
 ] 

Till Rohrmann commented on FLINK-17075:
---

[~chesnay] should we go ahead with merging this feature into Flink {{1.10.3}}?

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.2, 1.10.3
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171881#comment-17171881
 ] 

Zhu Zhu commented on FLINK-17075:
-

Thanks for the explanation, [~trohrmann].
I will move it to 1.10.3.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-08-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171562#comment-17171562
 ] 

Till Rohrmann commented on FLINK-17075:
---

[~zhuzh] we wanted to give this feature a bit of exposure on master to see 
whether it causes problems. That's why the ticket is still open. I would 
suggest not including it in the {{1.10.2}} bug fix release, but we should think 
about backporting it shortly afterwards so that this feature also gets some 
testing time on the {{release-1.10}} branch, [~chesnay].

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-08-05 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171449#comment-17171449
 ] 

Zhu Zhu commented on FLINK-17075:
-

Hi [~chesnay], I can see the PR is merged but the ticket is not closed yet.
Is it fully done? And do you think we should include it in release-1.10.2?

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.2
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-07-13 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1715#comment-1715
 ] 

Dian Fu commented on FLINK-17075:
-

[~chesnay] OK. Thanks for the confirmation.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-07-13 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156649#comment-17156649
 ] 

Chesnay Schepler commented on FLINK-17075:
--

[~dian.fu] No.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-07-12 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156443#comment-17156443
 ] 

Dian Fu commented on FLINK-17075:
-

Hi [~chesnay], I'm planning to prepare the first RC of 1.11.1 today or 
tomorrow. Would you like to include this fix in this release?

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-07-09 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17154462#comment-17154462
 ] 

Chesnay Schepler commented on FLINK-17075:
--

master: 2210aff098e486e44a1731d5306d56121a8a2246

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-25 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17145062#comment-17145062
 ] 

Chesnay Schepler commented on FLINK-17075:
--

[~trohrmann] yes, this was just a mistake while copying; 
{{ExecutionIdsProvider#getExecutions}} accepts a ResourceID, as does 
{{ExecutionDeploymentTracker#startTracking}}.
I've amended my comment accordingly.
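
For illustration, a minimal sketch of a tracker keyed by {{ResourceID}}, so that a heartbeat is only compared against the executions expected on the TaskExecutor that sent it. It assumes plain strings in place of {{ResourceID}} and {{ExecutionAttemptID}}; the class and method names are hypothetical and not necessarily the signatures that were merged:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker remembering which TaskExecutor (by resource ID) hosts each deployed execution. */
final class PerHostDeploymentTracker {
    private final Map<String, Set<String>> executionsByHost = new HashMap<>();
    private final Map<String, String> hostByExecution = new HashMap<>();

    /** Starts tracking an execution on the given host (stand-in for startTracking with a ResourceID). */
    void startTracking(String executionId, String host) {
        executionsByHost.computeIfAbsent(host, h -> new HashSet<>()).add(executionId);
        hostByExecution.put(executionId, host);
    }

    /** Stops tracking an execution, dropping the host entry once it has no executions left. */
    void stopTracking(String executionId) {
        String host = hostByExecution.remove(executionId);
        if (host != null) {
            Set<String> onHost = executionsByHost.get(host);
            onHost.remove(executionId);
            if (onHost.isEmpty()) {
                executionsByHost.remove(host);
            }
        }
    }

    /** Executions the JobMaster expects on the given TaskExecutor (empty for unknown hosts). */
    Set<String> getExecutions(String host) {
        return Collections.unmodifiableSet(executionsByHost.getOrDefault(host, Collections.emptySet()));
    }
}
```

Keeping the reverse map ({{hostByExecution}}) means stopping the tracking only needs the execution ID, which matches how terminal state updates identify an execution.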

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-25 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17144759#comment-17144759
 ] 

Till Rohrmann commented on FLINK-17075:
---

Thanks for creating this proposal [~chesnay]. It sounds good to me. I think we 
need the {{ExecutionIdsProvider}} to return the {{Executions}} which are 
deployed to a given {{ResourceID}}.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-24 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143818#comment-17143818
 ] 

Chesnay Schepler commented on FLINK-17075:
--

h3. Problem description:

The {{JobMaster}} keeps track of the state of all {{Executions}} belonging to a 
job. After deployment, this tracking relies on updates from the 
{{TaskExecutors}}, transmitted via dedicated RPC messages.
If one such message is lost then the tracked state may no longer match the 
actual one. In the worst case this prevents an {{Execution}} from ever reaching 
a terminal state, which in turn prevents the job from terminating.

h3. Proposed solution:

To prevent the worst case from happening, we propose that the {{TaskExecutor}} 
also submits a report of all currently deployed {{Tasks}} (identified by the 
{{ExecutionAttemptID}}) with each heartbeat. This allows us to detect 
discrepancies between the sets of executions known to the {{JobMaster}} and the 
{{TaskExecutor}}, and to act accordingly.

In the end, this boils down to a comparison of two {{Set<ExecutionAttemptID>}}s.

If an execution exists only in the {{JobMaster}} set, then the execution was 
dropped by the {{TaskExecutor}}.
This could imply a loss of a terminal state transition.
We cannot determine which terminal state the task has reached, since all 
information was already cleaned up.
In this case we will fail the execution in the {{ExecutionGraph}}, typically 
resulting in a restart.

If an execution exists only in the {{TaskExecutor}} set, then some leftover 
task from a previous attempt is still running on the {{TaskExecutor}}.
In this case we will cancel the task on the {{TaskExecutor}}. Running jobs are 
unaffected.

If an execution exists in both sets, then we don't do anything.
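
The three cases above reduce to two set differences. A minimal sketch, assuming execution attempt IDs are modeled as plain strings instead of {{ExecutionAttemptID}}; the class and method names are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper classifying executions by comparing the JobMaster's and TaskExecutor's views. */
final class ExecutionSetComparison {

    /** Known only to the JobMaster: the TaskExecutor dropped them, so they should be failed. */
    static Set<String> missingOnTaskExecutor(Set<String> jobMasterView, Set<String> taskExecutorReport) {
        Set<String> missing = new HashSet<>(jobMasterView);
        missing.removeAll(taskExecutorReport);
        return missing;
    }

    /** Known only to the TaskExecutor: leftovers from a previous attempt, so they should be canceled. */
    static Set<String> unknownToJobMaster(Set<String> jobMasterView, Set<String> taskExecutorReport) {
        Set<String> unknown = new HashSet<>(taskExecutorReport);
        unknown.removeAll(jobMasterView);
        return unknown;
    }

    private ExecutionSetComparison() {}
}
```

For example, if the {{JobMaster}} tracks {a, b} and the heartbeat reports {b, c}, then a is failed, c is canceled and b is left alone.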

h4. Required changes:

{{TaskExecutor}}
--

The existing {{TaskSlotTable}} supports iterating over all {{Tasks}} for a 
given {{JobID}}, allowing us to extract their {{ExecutionAttemptID}}s.
From this we generate a {{Set<ExecutionAttemptID>}} and submit it via 
heartbeats.
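
A sketch of the {{TaskExecutor}} side, assuming a simplified stand-in for the {{TaskSlotTable}}: a hypothetical map from execution attempt ID to job ID, with strings for both. The real table exposes an iterator over the tasks of a job; here the report is built by filtering the map:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical builder for the set of deployed execution IDs attached to a heartbeat. */
final class DeploymentReportBuilder {

    /**
     * @param jobIdByExecution stand-in for the TaskSlotTable: execution attempt ID -> owning job ID
     * @param jobId            the job whose JobMaster receives this heartbeat
     */
    static Set<String> buildReport(Map<String, String> jobIdByExecution, String jobId) {
        Set<String> deployed = new HashSet<>();
        for (Map.Entry<String, String> entry : jobIdByExecution.entrySet()) {
            if (entry.getValue().equals(jobId)) {
                deployed.add(entry.getKey());
            }
        }
        // Unmodifiable so the payload cannot be changed after being handed to the heartbeat.
        return Collections.unmodifiableSet(deployed);
    }

    private DeploymentReportBuilder() {}
}
```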


{{JobMaster}}
--

Here we need to be able to:
a) (un)track actually deployed {{Executions}}
c) cancel tasks on the {{TaskExecutor}}
d) fail tasks in the {{ExecutionGraph}}

These capabilities are split across 2 new components:
1) ExecutionDeploymentTracker
2) ExecutionDeploymentReconciler

1) The tracker lives in the Scheduler, with the following interface:

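{code}
import java.util.Set;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

public interface ExecutionDeploymentTracker {
    void startTrackingDeployment(ExecutionAttemptID deployment);

    void stopTrackingDeployment(ExecutionAttemptID deployment);

    Set<ExecutionAttemptID> getExecutions();
}
{code}

It's basically a {{Set<ExecutionAttemptID>}}.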

The tracker is notified by the {{ExecutionGraph}} of deployed/finished 
executions through 2 new listeners:

{code}
// each public interface would live in its own source file
public interface ExecutionDeploymentListener {
    void onCompletedDeployment(ExecutionAttemptID execution);
}

public interface ExecutionStateUpdateListener {
    void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState);
}
{code}

{{onCompletedDeployment}} is called in {{Execution#deploy}} when the deployment 
future completes; an implementation will initiate the tracking.
{{onStateUpdate}} is called in {{Execution#transitionState}} on any successful 
state transition; an implementation will stop the tracking if the new state is 
a terminal one.
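
A minimal sketch of a tracker implementing both callbacks, assuming string IDs and a local enum standing in for {{ExecutionState}} (only a subset of its values, with FINISHED, CANCELED and FAILED treated as terminal). All names are hypothetical:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for ExecutionState (subset of values). */
enum SimpleExecutionState {
    DEPLOYING, RUNNING, CANCELING, FINISHED, CANCELED, FAILED;

    boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == FAILED;
    }
}

/** Hypothetical tracker fed by the deployment and state-update callbacks; assumes single-threaded access. */
final class ListenerDrivenTracker {
    private final Set<String> deployed = new HashSet<>();

    /** Deployment future completed: the task now exists on the TaskExecutor. */
    void onCompletedDeployment(String executionId) {
        deployed.add(executionId);
    }

    /** Called on every successful transition; only terminal states end the tracking. */
    void onStateUpdate(String executionId, SimpleExecutionState newState) {
        if (newState.isTerminal()) {
            deployed.remove(executionId);
        }
    }

    Set<String> getExecutions() {
        return Collections.unmodifiableSet(deployed);
    }
}
```

This sketch does not handle a terminal update that arrives before the deployment callback; in that order the execution would be tracked again after it had already finished.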

Note: The deployment listener is required since there is no dedicated state for 
a deployed task:
executions are switched to DEPLOYING, submitted to the {{TaskExecutor}},
and switched to RUNNING after an update from the {{TaskExecutor}}.
Since this update can be lost, we cannot rely on it.
A dedicated DEPLOYED state would be preferable, but this would require too many 
changes to the {{ExecutionGraph}} at this time.

2) The reconciler lives in the {{JobMaster}} and uses the IDs provided by the 
tracker and {{TaskExecutor}} heartbeats to detect mismatches, and fire events 
accordingly.
By defining a {{ReconciliationHandler}} the {{JobMaster}} can decide how each 
case should be handled:

{code}
import java.util.Set;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

public interface ExecutionDeploymentReconciler {

    // conceptual factory interface
    interface Factory {
        ExecutionDeploymentReconciler get(ReconciliationHandler trigger);
    }

    // DeploymentReport: the executions reported with the TaskExecutor heartbeat
    void reconcileExecutionStates(
            ResourceID origin,
            DeploymentReport deploymentReport,
            Set<ExecutionAttemptID> knownExecutionAttemptIds);

    interface ExecutionIdsProvider {
        Set<ExecutionAttemptID> getExecutions();
    }

    interface ReconciliationHandler {
        // fail the execution in the ExecutionGraph
        void onMissingDeployment(ExecutionAttemptID deployment);

        // cancel the task on the TaskExecutor
        void onUnknownDeployment(ExecutionAttemptID deployment, ResourceID hostingTaskExecutor);
    }
}
{code}
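
To illustrate how a reconciler might dispatch to the handler, a minimal sketch assuming string IDs, a plain set in place of {{DeploymentReport}}, and a hypothetical {{SimpleReconciler}} class; the handler interface mirrors the two callbacks above:

```java
import java.util.Set;

/** Mirrors the proposal's ReconciliationHandler callbacks, with string IDs. */
interface SimpleReconciliationHandler {
    /** Known to the JobMaster but absent on the TaskExecutor: fail it in the ExecutionGraph. */
    void onMissingDeployment(String executionId);

    /** Reported by the TaskExecutor but unknown to the JobMaster: cancel it on that TaskExecutor. */
    void onUnknownDeployment(String executionId, String hostingTaskExecutor);
}

/** Hypothetical reconciler invoked once per received heartbeat. */
final class SimpleReconciler {
    private final SimpleReconciliationHandler handler;

    SimpleReconciler(SimpleReconciliationHandler handler) {
        this.handler = handler;
    }

    /**
     * @param origin           resource ID of the TaskExecutor that sent the heartbeat
     * @param reported         execution IDs the TaskExecutor currently hosts
     * @param expectedOnOrigin execution IDs the JobMaster believes are deployed on that TaskExecutor
     */
    void reconcile(String origin, Set<String> reported, Set<String> expectedOnOrigin) {
        for (String id : expectedOnOrigin) {
            if (!reported.contains(id)) {
                handler.onMissingDeployment(id);
            }
        }
        for (String id : reported) {
            if (!expectedOnOrigin.contains(id)) {
                handler.onUnknownDeployment(id, origin);
            }
        }
        // IDs present in both sets need no action.
    }
}
```

Passing the origin into the unknown-deployment callback is what lets the {{JobMaster}} address the cancel request to the right {{TaskExecutor}}.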


[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-15 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135729#comment-17135729
 ] 

Till Rohrmann commented on FLINK-17075:
---

This sounds good to me [~chesnay].

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-15 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135689#comment-17135689
 ] 

Till Rohrmann commented on FLINK-17075:
---

Hmm, the best solution that comes to my mind is to introduce a new 
{{ExecutionState.DEPLOYED}}, which is set once {{TaskExecutor.submitTask}} 
acknowledges the task submission. The semantics of this state would be that 
something is now running on the {{TaskExecutor}}.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-15 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135688#comment-17135688
 ] 

Chesnay Schepler commented on FLINK-17075:
--

I guess we must only consider tasks for which the deployment future has 
completed.
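
One way to read this: executions whose deployment future is still pending are kept out of the set handed to the reconciler, so a heartbeat arriving between the DEPLOYING transition and the actual submission cannot fail them. A minimal sketch with {{CompletableFuture}}, assuming string IDs, single-threaded callbacks, and a hypothetical supplier standing in for the {{submitTask}} RPC:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical tracker exposing only executions whose deployment was acknowledged. */
final class AcknowledgedDeployments {
    private final Set<String> pending = new HashSet<>();
    private final Set<String> deployed = new HashSet<>();

    /** @param submit stand-in for submitTask; its future completes when the TaskExecutor acknowledges */
    CompletableFuture<Void> deploy(String executionId, Supplier<CompletableFuture<Void>> submit) {
        pending.add(executionId);
        return submit.get().whenComplete((ack, failure) -> {
            pending.remove(executionId);
            if (failure == null) {
                deployed.add(executionId);
            }
        });
    }

    /** Only acknowledged deployments take part in reconciliation; pending ones are skipped. */
    Set<String> executionsToReconcile() {
        return Collections.unmodifiableSet(deployed);
    }

    boolean isPending(String executionId) {
        return pending.contains(executionId);
    }
}
```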

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-06-15 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135672#comment-17135672
 ] 

Chesnay Schepler commented on FLINK-17075:
--

How should we handle tasks that are in the {{DEPLOYING}} state? Tasks enter this 
state before anything is sent to the TaskExecutor; if we receive a heartbeat 
between this state transition and the actual deployment, we would fail the task 
for no reason.
We can't just ignore tasks in this state, because we also have to handle 
cases where the update reporting that the task is running is lost.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-05-16 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17109116#comment-17109116
 ] 

Till Rohrmann commented on FLINK-17075:
---

As a band-aid, I would suggest adding a small number of retries and then failing 
fatally if we cannot update the task state. This would at least not leave 
the cluster in an unusable state.
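
The band-aid could look roughly like a bounded retry loop that escalates to a fatal-error hook once the attempts are exhausted. This sketch assumes a synchronous {{BooleanSupplier}} reporting whether one update attempt was acknowledged; the class name and the fatal-error hook are hypothetical, and a real implementation would more likely retry asynchronously with a backoff:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical bounded-retry wrapper around a task state update. */
final class BoundedStateUpdate {

    /**
     * @param attemptUpdate  one attempt to deliver the update; returns true if it was acknowledged
     * @param maxAttempts    total number of attempts including the first one (at least 1)
     * @param onFatalFailure invoked once if every attempt failed, e.g. to terminate the process
     * @return true if some attempt succeeded
     */
    static boolean updateWithRetries(BooleanSupplier attemptUpdate, int maxAttempts, Runnable onFatalFailure) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attemptUpdate.getAsBoolean()) {
                return true;
            }
        }
        // Failing fatally is preferable to leaving the JobMaster with a stale task state forever.
        onFatalFailure.run();
        return false;
    }

    private BoundedStateUpdate() {}
}
```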

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.11.0, 1.10.2
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-21 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088435#comment-17088435
 ] 

Till Rohrmann commented on FLINK-17075:
---

Hi [~vdhar], yes, I think this fix can be backported. The current fix version 
reflects our intention, but I'm not entirely sure whether we can make it. In any 
case, it will at least come with one of the bug fix releases.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.10.1, 1.11.0
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-20 Thread vineeta dhar (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087952#comment-17087952
 ] 

vineeta dhar commented on FLINK-17075:
--

Hello, I am a developer from the team that faced the issue mentioned by Bruce 
Hanson in the mail thread.

The fix version on this issue says 1.11.

My organization plans to upgrade to 1.10; would it be possible to get the fix 
in 1.10 as well?

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.11.0
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-18 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17086706#comment-17086706
 ] 

Zhu Zhu commented on FLINK-17075:
-

Thanks for the explanation! I think the heartbeat approach could work well as a 
safety net.
Agreed that it's best to have both approaches. With the heartbeat 
payload as a safety net, we only need a finite number of state update retries, 
which is simpler than retrying infinitely.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.11.0
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-17 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085677#comment-17085677
 ] 

Till Rohrmann commented on FLINK-17075:
---

1. If {{updateTaskExecutionState}} is triggered from the actor's main 
thread, then there should be no inconsistency, because the heartbeats are also 
sent from the main thread.
2. I think it is fine to simply send the states of all currently running 
{{Tasks}}. If the JM sees that it has an {{Execution}} which is not 
contained in this set, then it can fail it. I don't think we have to ensure 
that the right state is transmitted via the heartbeats, since they mainly 
act as a safety net that prevents the job from never finishing and the cluster 
from deadlocking.

I actually think that a combination of both approaches would be best. As a 
first simple fix, we could add a limited number of retries. Next, we could 
establish the safety net which sends the current states of all running {{Tasks}}. 
If the JM sees that one of its tasks is not contained in this set, then it will 
fail it to trigger recovery.

> Add task status reconciliation between TM and JM
> 
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.11.0
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E





[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-16 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084675#comment-17084675
 ] 

Zhu Zhu commented on FLINK-17075:
-

Regarding the approach of reporting task states via heartbeat payloads, my main 
concerns are:
1. We may need to deal with inconsistencies between the states reported by 
{{updateTaskExecutionState}} and the states reported by the heartbeat.
2. The TM may need to keep the task states even after the tasks have 
terminated. Otherwise, if a state is missing, the JM can hardly tell whether 
the task has not been received yet or has already terminated, nor what the 
exact terminal state (CANCELED/FAILED/FINISHED) was. The states can be cleared 
only after the TM is disassociated from that JM/job.
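
One way to address the second concern, sketched under the assumption that the 
TM keeps a per-job table of the last known state of each task (terminal states 
included) and attaches a snapshot of it to each heartbeat. {{TaskStateTable}} 
and its methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical TM-side table of the last known state per task, grouped by job. */
final class TaskStateTable {

    /** Simplified stand-in for a task's execution state. */
    enum State { RUNNING, FINISHED, FAILED, CANCELED }

    // jobId -> (executionId -> last known state, kept even after the task terminated)
    private final Map<String, Map<String, State>> statesByJob = new HashMap<>();

    void update(String jobId, String executionId, State state) {
        statesByJob.computeIfAbsent(jobId, k -> new HashMap<>()).put(executionId, state);
    }

    /** Snapshot to attach to the next heartbeat sent to the JM of this job. */
    Map<String, State> heartbeatPayload(String jobId) {
        return new HashMap<>(statesByJob.getOrDefault(jobId, Collections.emptyMap()));
    }

    /** The history is dropped only once the TM is disassociated from the job's JM. */
    void onDisconnect(String jobId) {
        statesByJob.remove(jobId);
    }
}
```

In this sketch terminal states stay in the table until {{onDisconnect}}, so a 
missing entry in the payload can only mean that the TM never received the task, 
and a terminated task always reports its exact terminal state.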



[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-16 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084588#comment-17084588
 ] 

Zhu Zhu commented on FLINK-17075:
-

We can do retries for final-state notifications only, so that even if a state 
is notified multiple times, the later notifications will be ignored. The 
{{updateTaskExecutionState}} method can then stay as is.
Agreed that the retries should be canceled once a heartbeat timeout happens. We 
can build a table that records which job each retry belongs to, so that the 
retries can be removed once the TM is disassociated from the JM on heartbeat 
timeout.
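
A minimal sketch of this proposal, assuming string IDs and states and a 
synchronous {{send}} callback that returns true once the JM has acknowledged an 
update. The JM keeps the first terminal state it receives and ignores repeats; 
the TM groups unacknowledged final-state updates by job and drops a job's 
entries on heartbeat timeout. All class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical JM side: the first terminal state wins, later duplicates are ignored. */
final class JobMasterFinalStates {
    private final Map<String, String> terminalStates = new HashMap<>();

    /** Returns true if the update was applied, false if it was a duplicate. */
    boolean updateFinalState(String executionId, String state) {
        return terminalStates.putIfAbsent(executionId, state) == null;
    }

    String stateOf(String executionId) {
        return terminalStates.get(executionId);
    }
}

/** Hypothetical TM side: unacknowledged final-state notifications, grouped by job. */
final class PendingFinalStateUpdates {
    // jobId -> (executionId -> final state not yet acknowledged by the JM)
    private final Map<String, Map<String, String>> pendingByJob = new HashMap<>();

    void add(String jobId, String executionId, String finalState) {
        pendingByJob.computeIfAbsent(jobId, k -> new HashMap<>()).put(executionId, finalState);
    }

    /** Resends all pending updates of a job; entries whose send succeeded are dropped. */
    void retry(String jobId, BiPredicate<String, String> send) {
        Map<String, String> pending = pendingByJob.get(jobId);
        if (pending == null) {
            return;
        }
        pending.entrySet().removeIf(e -> send.test(e.getKey(), e.getValue()));
        if (pending.isEmpty()) {
            pendingByJob.remove(jobId);
        }
    }

    /** Heartbeat timeout: the TM is disassociated from this job's JM, so stop retrying. */
    void onHeartbeatTimeout(String jobId) {
        pendingByJob.remove(jobId);
    }

    int pendingCount(String jobId) {
        Map<String, String> pending = pendingByJob.get(jobId);
        return pending == null ? 0 : pending.size();
    }
}
```

Because the JM ignores repeated terminal states, the TM can resend without 
coordinating with the JM, and grouping by job gives the heartbeat-timeout 
handler a single place to cancel all retries for that job.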



[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-15 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084188#comment-17084188
 ] 

Till Rohrmann commented on FLINK-17075:
---

I think this could work. The downside I see is that it would complicate the 
{{updateTaskExecutionState}} method: depending on the reported state, one 
either retries or not. Additionally, we would have to manage the retry 
operations, because in case of a heartbeat failure we should stop them; they 
should not continue indefinitely.



[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-15 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083989#comment-17083989
 ] 

Zhu Zhu commented on FLINK-17075:
-

I meant not to add a limit to the retry count. If the retries keep failing, 
then the heartbeats should fail as well. The heartbeat timeout handling on the 
JM side would then trigger a failover in this case, and the TM would not keep 
retrying indefinitely.



[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-15 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083964#comment-17083964
 ] 

Till Rohrmann commented on FLINK-17075:
---

I agree that retries will decrease the likelihood of missed messages. However, 
I don't fully understand why this would guarantee that the JM and TM won't go 
out of sync. Can't it still happen that all retries (assuming a finite number 
of retries) for a final state update fail and, hence, the JM never learns 
about the terminal state? Also, failing the {{Task}} on the {{TaskExecutor}} 
won't guarantee that the {{FAILING}} state transition will be sent to the JM 
(it will simply increase the number of retries of 
{{updateTaskExecutionState}}).

Note that if the heartbeats fail, then the JM will disconnect from the TM, 
which has the effect of failing all {{Executions}}. I think the situation we 
are discussing here is a short-lived network problem which Akka recovers from 
and which does not cause the heartbeat to fail.
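
The point about finite retries can be made concrete under a simplifying 
assumption that is not part of the thread: each of the n attempts is lost 
independently with probability p.

```latex
\Pr[\text{JM never learns the terminal state}] = p^{n} > 0
\quad \text{for every finite } n \text{ and every } p > 0,
\qquad \text{e.g. } p = 0.01,\; n = 3 \;\Rightarrow\; p^{n} = 10^{-6}.
```

Under this model more retries make a lost final-state update rarer but never 
impossible, which is the argument for keeping an independent reconciliation 
step alongside the retries.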



[jira] [Commented] (FLINK-17075) Add task status reconciliation between TM and JM

2020-04-15 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083845#comment-17083845
 ] 

Zhu Zhu commented on FLINK-17075:
-

From the current implementation of {{TaskExecutor#updateTaskExecutionState}}, 
the TM will fail the task if the {{updateTaskExecutionState(...)}} RPC fails. 
However, if it is a final state notification, the task will already have been 
unregistered from the TM right before the RPC, so the {{failTask(...)}} 
handling will not have any effect.

So how about adding retry logic for calling 
{{JobMasterGateway#updateTaskExecutionState(...)}} in the TaskExecutor until 
the RPC succeeds or fails with an expected error (like ExecutionGraphException)? 
Or maybe just retry final-state update RPCs, and call {{failTask(...)}} for 
non-final state update RPCs in case the RPC fails.
This guarantees that the task states in JM and TM are eventually synced. It is 
more lightweight than adding task state payloads to the TM-JM heartbeat, and 
nothing needs to be changed in the JM. If the retries keep failing due to 
network issues, I think the heartbeat would fail as well.
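
A minimal, synchronous sketch of the second variant of this proposal (retry 
only final-state updates, fail the task for non-final ones). The {{Outcome}} 
enum, {{notifyJobMaster}} and the callbacks are hypothetical; {{REJECTED}} 
stands for an expected error such as {{ExecutionGraphException}}, and 
{{disconnectedFromJm}} models the heartbeat timeout that ends the retries.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical TM-side notifier for task state updates sent to the JM. */
final class TaskStateNotifier {

    /** Simplified RPC result; REJECTED stands for an expected error from the JM. */
    enum Outcome { ACK, TRANSIENT_FAILURE, REJECTED }

    private static final Set<String> FINAL_STATES =
            new HashSet<>(Arrays.asList("FINISHED", "FAILED", "CANCELED"));

    static Outcome notifyJobMaster(String state, Supplier<Outcome> rpc,
                                   BooleanSupplier disconnectedFromJm, Runnable failTask) {
        if (!FINAL_STATES.contains(state)) {
            // Non-final state: the task is still registered, so a failed RPC fails the task.
            Outcome outcome = rpc.get();
            if (outcome != Outcome.ACK) {
                failTask.run();
            }
            return outcome;
        }
        // Final state: the task is already unregistered, so failTask would be a no-op.
        // Retry until acknowledged, rejected with an expected error, or disconnected.
        // A production version would likely back off and schedule attempts asynchronously.
        Outcome outcome = Outcome.TRANSIENT_FAILURE;
        while (!disconnectedFromJm.getAsBoolean()) {
            outcome = rpc.get();
            if (outcome != Outcome.TRANSIENT_FAILURE) {
                break;
            }
        }
        return outcome;
    }
}
```

The retry loop has no attempt limit of its own; in this sketch it ends only 
through an acknowledgement, an expected error, or the disconnect signal.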

