[jira] [Created] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck

2017-02-17 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5830:


 Summary: OutOfMemoryError during notify final state in 
TaskExecutor may cause job stuck
 Key: FLINK-5830
 URL: https://issues.apache.org/jira/browse/FLINK-5830
 Project: Flink
  Issue Type: Bug
Reporter: Zhijiang Wang


The scenario is like this:

The {{JobMaster}} tries to cancel all the executions when it processes a failed 
execution, and the {{TaskExecutor}} has already acknowledged the cancel RPC 
message. While notifying the final state, the {{TaskExecutor}} hits an 
{{OutOfMemoryError}} in the {{AkkaRpcActor}}; the error is caught and only 
logged, so the final state is never sent. As a result, the {{JobMaster}} cannot 
receive the final state and trigger the restart strategy.

One solution is to catch the {{OutOfMemoryError}} and rethrow it, which shuts 
down the {{ActorSystem}} and causes the {{TaskExecutor}} to exit. The 
{{JobMaster}} is then notified of the {{TaskExecutor}} failure and fails all 
the tasks, so the restart is triggered successfully.
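The proposed behavior can be sketched as follows. This is a minimal, hypothetical illustration ({{FatalAwareHandler}} and {{handleMessage}} are invented names, not Flink's actual {{AkkaRpcActor}} code): ordinary exceptions are logged so one bad message cannot kill the actor, while {{Error}}s such as {{OutOfMemoryError}} deliberately propagate so the actor system shuts down and the process exits.

```java
// Hypothetical sketch of the proposed fatal-error handling (not Flink's code).
public class FatalAwareHandler {

    /** Returns true if handled, false if a non-fatal exception was swallowed and logged. */
    public static boolean handleMessage(Runnable message) {
        try {
            message.run();
            return true;
        } catch (Exception e) {
            // Non-fatal: log and keep the actor alive.
            System.err.println("Dropped message due to: " + e);
            return false;
        }
        // Errors (e.g. OutOfMemoryError) are deliberately NOT caught here, so they
        // propagate, terminate the ActorSystem, and make the TaskExecutor exit.
    }

    public static void main(String[] args) {
        System.out.println("handled: " + handleMessage(() -> {}));
        System.out.println("handled: " + handleMessage(() -> {
            throw new RuntimeException("boom");
        }));
        try {
            handleMessage(() -> {
                throw new OutOfMemoryError("simulated");
            });
        } catch (OutOfMemoryError fatal) {
            System.out.println("fatal error propagated");
        }
    }
}
```

With this shape, the JobMaster no longer depends on the final-state RPC arriving: the TaskExecutor's death itself becomes the failure signal.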



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5799) Let RpcService.scheduleRunnable return ScheduledFuture

2017-02-14 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867126#comment-15867126
 ] 

Zhijiang Wang commented on FLINK-5799:
--

This fix is useful, and {{HeartbeatManagerSenderImpl}} can benefit from it. Once 
this is done, it would be better for {{HeartbeatManagerSenderImpl}} to use 
{{RpcService.scheduleRunnable}} to schedule heartbeats at the configured interval.

> Let RpcService.scheduleRunnable return ScheduledFuture
> --
>
> Key: FLINK-5799
> URL: https://issues.apache.org/jira/browse/FLINK-5799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.3.0
>
>
> Currently, the method {{RpcService.scheduleRunnable}} does not return a 
> control instance for the scheduled runnable. I think it would be good to 
> return a {{ScheduledFuture}} with which one can cancel the scheduled runnable 
> after it has been scheduled, e.g. a timeout registration which became 
> obsolete. This API is also more in line with the {{ScheduledExecutorService}} 
> where one also receives a {{ScheduledFuture}} after scheduling a runnable.
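The proposed change can be sketched with the JDK's own scheduling primitives. {{SchedulingSketch}} below is a hypothetical, simplified stand-in for {{RpcService}}, not Flink's actual implementation; it shows why returning a {{ScheduledFuture}} matters, using an obsolete timeout registration as the example from the description.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RpcService: scheduleRunnable now returns a
// ScheduledFuture, mirroring ScheduledExecutorService.schedule(...).
public class SchedulingSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Proposed shape: hand back a control instance for the scheduled runnable. */
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        return executor.schedule(runnable, delay, unit);
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    /** Register a long timeout, then cancel it once it becomes obsolete. */
    public static boolean scheduleAndCancel() {
        SchedulingSketch rpc = new SchedulingSketch();
        ScheduledFuture<?> timeout = rpc.scheduleRunnable(
                () -> System.out.println("timeout fired"), 1, TimeUnit.HOURS);
        // The awaited response arrived, so the timeout is no longer needed.
        boolean cancelled = timeout.cancel(false) && timeout.isCancelled();
        rpc.shutdown();
        return cancelled;
    }

    public static void main(String[] args) {
        System.out.println("cancelled obsolete timeout: " + scheduleAndCancel());
    }
}
```

Without the returned future, the caller has no handle to cancel the registration and the runnable fires regardless.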





[jira] [Comment Edited] (FLINK-5799) Let RpcService.scheduleRunnable return ScheduledFuture

2017-02-14 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867126#comment-15867126
 ] 

Zhijiang Wang edited comment on FLINK-5799 at 2/15/17 2:56 AM:
---

This fix is useful, and {{HeartbeatManagerSenderImpl}} can benefit from it. Once 
this is done, it would be better for {{HeartbeatManagerSenderImpl}} to use 
{{RpcService.scheduleRunnable}} to schedule heartbeats at the configured interval.


was (Author: zjwang):
This fix is useful and the {{HeartbeatManagerSenderImpl}} can get benefit from 
it. After this is done, it is better for {{HeartbeatManagerSenderImpl}} to use 
the RpcService.scheduleRunnable to schedule heartbeat by interval time.

> Let RpcService.scheduleRunnable return ScheduledFuture
> --
>
> Key: FLINK-5799
> URL: https://issues.apache.org/jira/browse/FLINK-5799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.3.0
>
>





[jira] [Commented] (FLINK-5798) Let the RPCService provide a ScheduledExecutorService

2017-02-14 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867116#comment-15867116
 ] 

Zhijiang Wang commented on FLINK-5798:
--

That is good; the heartbeat manager component can now reuse the 
{{ScheduledExecutorService}} provided by the {{RPCService}}.

> Let the RPCService provide a ScheduledExecutorService
> -
>
> Key: FLINK-5798
> URL: https://issues.apache.org/jira/browse/FLINK-5798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.3.0
>
>
> Currently the {{RPCService}} interface provides a {{scheduleRunnable}} method 
> to schedule {{Runnables}}. I would like to generalize this functionality by 
> letting the {{RPCService}} provide a {{ScheduledExecutorService}} to the 
> user. That way other components which require such an executor service could 
> simply use the one provided by the {{RPCService}}.
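The generalization described above can be sketched as follows. {{ExecutorProvidingService}} is a hypothetical name (not Flink's {{RPCService}}): instead of exposing a single {{scheduleRunnable}} method, the service hands out a full {{ScheduledExecutorService}} that other components, such as a heartbeat manager, can use directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the service exposes its ScheduledExecutorService so
// other components reuse it instead of creating their own scheduler threads.
public class ExecutorProvidingService {
    private final ScheduledExecutorService scheduledExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public ScheduledExecutorService getScheduledExecutor() {
        return scheduledExecutor;
    }

    /** A heartbeat-style component reusing the service's executor at a fixed rate. */
    public static boolean demoHeartbeats() {
        ExecutorProvidingService service = new ExecutorProvidingService();
        CountDownLatch threeBeats = new CountDownLatch(3);
        ScheduledFuture<?> heartbeat = service.getScheduledExecutor()
                .scheduleAtFixedRate(threeBeats::countDown, 0, 10, TimeUnit.MILLISECONDS);
        boolean delivered;
        try {
            delivered = threeBeats.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            delivered = false;
        }
        heartbeat.cancel(false);
        service.scheduledExecutor.shutdownNow();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("heartbeats delivered: " + demoHeartbeats());
    }
}
```

The design choice is sharing: one scheduler thread pool owned by the RPC layer, rather than each component spawning its own.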





[jira] [Updated] (FLINK-5133) Support to set resource for operator in DataStream and DataSet

2017-02-14 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5133:
-
Summary: Support to set resource for operator in DataStream and DataSet  
(was: Add new setResource API for DataStream and DataSet)

> Support to set resource for operator in DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be set on 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be set on *Operator* in a similar 
> way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.





[jira] [Assigned] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-02-13 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5501:


Assignee: shuai.xu  (was: Zhijiang Wang)

> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: shuai.xu
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running. If the job is running, the {{JobManager}} 
> should reconcile itself (enter the RECONCILING state) and wait for the 
> {{TaskManager}}s to report their task statuses. Otherwise the {{JobManager}} 
> can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} provides a way to check the job's running status, 
> but the current interface must be expanded and the related process fixed to 
> support this function:
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, {{RunningJobsRegistry}} sets the job status to 
> FINISHED, and the status is deleted before exit.
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leading 
> {{JobManagerRunner}} has already finished the job (setting the status to 
> FINISHED), the other {{JobManagerRunner}}s will exit when they are granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status remains RUNNING. So when a 
> {{JobManagerRunner}} (the previous one or a new one) is granted leadership 
> again, it checks the job status and enters the {{RECONCILING}} state.
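The leadership check described above can be sketched with an in-memory registry. This is a hypothetical illustration ({{RegistrySketch}}, {{onGrantLeadership}} are invented names; the real {{RunningJobsRegistry}} is backed by a highly available store, not a local map).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory sketch of the RunningJobsRegistry-based decision.
public class RegistrySketch {
    enum JobSchedulingStatus { PENDING, RUNNING, FINISHED }

    private final Map<String, JobSchedulingStatus> registry = new ConcurrentHashMap<>();

    JobSchedulingStatus getStatus(String jobId) {
        return registry.getOrDefault(jobId, JobSchedulingStatus.PENDING);
    }

    void setRunning(String jobId)  { registry.put(jobId, JobSchedulingStatus.RUNNING); }
    void setFinished(String jobId) { registry.put(jobId, JobSchedulingStatus.FINISHED); }
    void clear(String jobId)       { registry.remove(jobId); } // deleted before exit

    /** Decision a JobManagerRunner makes when it is granted leadership. */
    static String onGrantLeadership(RegistrySketch reg, String jobId) {
        switch (reg.getStatus(jobId)) {
            case RUNNING:
                return "RECONCILING"; // job already ran under a previous leader
            case FINISHED:
                return "EXIT";        // another runner already finished the job
            default:
                reg.setRunning(jobId); // first grant: mark RUNNING, schedule normally
                return "SCHEDULE";
        }
    }

    public static boolean demo() {
        RegistrySketch reg = new RegistrySketch();
        boolean first = onGrantLeadership(reg, "job-1").equals("SCHEDULE");
        // Status stayed RUNNING, e.g. because the JobManager failed:
        boolean again = onGrantLeadership(reg, "job-1").equals("RECONCILING");
        reg.setFinished("job-1");
        boolean done = onGrantLeadership(reg, "job-1").equals("EXIT");
        return first && again && done;
    }

    public static void main(String[] args) {
        System.out.println("registry decisions correct: " + demo());
    }
}
```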





[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-13 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5703:


Assignee: Zhijiang Wang  (was: shuai.xu)

> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The ExecutionGraph structure would be recovered from TaskManager reports 
> during the reconciling period. The necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer 
> Execution)
> - ExecutionVertex: Map IntermediateResultPartition>
> - ExecutionGraph: ConcurrentHashMap
> The {{RECONCILING}} ExecutionState should be able to transition into any 
> existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To 
> do so, the TaskManager should maintain the terminal task states ({{CANCELED}}, 
> {{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism 
> in another JIRA. In addition, the state transitions trigger different actions, 
> and some actions rely on the necessary information above. Considering this 
> limit, the recovery process is divided into two steps:
> - First, recover all the other necessary information except the ExecutionState.
> - Second, transition the ExecutionState into the real task state and trigger 
> the actions. The behavior is the same as the current {{UpdateTaskExecutorState}}.
> To keep the logic simple and consistent, during the recovery period all the 
> other RPC messages from the TaskManager ({{UpdateTaskExecutionState}}, 
> {{ScheduleOrUpdateConsumers}}, etc.) should be refused temporarily and 
> answered with a special message by the JobMaster. The TaskManager should then 
> retry sending these messages until the JobManager finishes recovery and 
> acknowledges.
> The {{RECONCILING}} JobStatus transitions into one of the states ({{RUNNING}}, 
> {{FAILING}}, {{FINISHED}}) after recovery:
> - {{RECONCILING}} to {{RUNNING}}: all the TaskManagers report within the 
> duration and all the tasks are in the {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in 
> time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: all the TaskManagers report within the 
> duration and all the tasks are in the {{FINISHED}} state.
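The three job-status transitions listed above can be sketched as a small decision function. {{ReconcileSketch}} and {{resolveJobStatus}} are hypothetical names, and the handling of a mixed set of {{RUNNING}} and {{FINISHED}} reports is my own conservative assumption, since the issue does not specify that case.

```java
import java.util.List;

// Hypothetical sketch of resolving the job status after reconciliation.
public class ReconcileSketch {
    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    static String resolveJobStatus(List<TaskState> reported, boolean allManagersReported) {
        if (!allManagersReported) {
            return "FAILING"; // some TaskManager did not report within the duration
        }
        boolean allRunning = true;
        boolean allFinished = true;
        for (TaskState s : reported) {
            if (s == TaskState.FAILED || s == TaskState.CANCELED) {
                return "FAILING"; // any failed/canceled task fails the job
            }
            allRunning  &= (s == TaskState.RUNNING);
            allFinished &= (s == TaskState.FINISHED);
        }
        if (allRunning)  { return "RUNNING"; }
        if (allFinished) { return "FINISHED"; }
        return "FAILING"; // mixed RUNNING/FINISHED: unspecified in the issue; assumed conservative
    }

    public static boolean demo() {
        List<TaskState> running  = List.of(TaskState.RUNNING, TaskState.RUNNING);
        List<TaskState> finished = List.of(TaskState.FINISHED, TaskState.FINISHED);
        List<TaskState> failed   = List.of(TaskState.RUNNING, TaskState.FAILED);
        return resolveJobStatus(running, true).equals("RUNNING")
                && resolveJobStatus(finished, true).equals("FINISHED")
                && resolveJobStatus(failed, true).equals("FAILING")
                && resolveJobStatus(running, false).equals("FAILING");
    }

    public static void main(String[] args) {
        System.out.println("transitions resolved correctly: " + demo());
    }
}
```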





[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-13 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5703:


Assignee: (was: Zhijiang Wang)

> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>





[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-13 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5703:


Assignee: shuai.xu

> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: shuai.xu
>





[jira] [Commented] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2017-02-09 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860543#comment-15860543
 ] 

Zhijiang Wang commented on FLINK-5132:
--

Thank you, [~StephanEwen]. Yes, it also makes sense to check for valid values in 
the constructors; currently the class provides an {{isValid}} method, which is 
called when the spec is set on the operator.

> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
> Fix For: 1.3.0
>
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide basic constructors for grouping different 
> resource factors as needed, and they can easily be expanded for further 
> requirements.
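The {{isValid}} check mentioned in the comment can be sketched as below. {{ResourceSpecSketch}} is a hypothetical, simplified class (the real ResourceSpec covers CPU cores, heap, direct and native memory, and state size); the sketch only assumes that each factor must be non-negative and that validation happens when the spec is applied, not in the constructor.

```java
// Hypothetical, simplified sketch of ResourceSpec with an isValid() check.
public class ResourceSpecSketch {
    final double cpuCores;
    final int heapMemoryMB;
    final int directMemoryMB;

    ResourceSpecSketch(double cpuCores, int heapMemoryMB, int directMemoryMB) {
        // The constructor stores values as-is; validation is deferred to isValid(),
        // which the operator calls when the spec is set on it.
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.directMemoryMB = directMemoryMB;
    }

    /** All resource factors must be non-negative for the spec to be usable. */
    boolean isValid() {
        return cpuCores >= 0 && heapMemoryMB >= 0 && directMemoryMB >= 0;
    }

    public static boolean demo() {
        boolean good = new ResourceSpecSketch(0.5, 512, 128).isValid();
        boolean bad  = new ResourceSpecSketch(-1.0, 512, 128).isValid();
        return good && !bad;
    }

    public static void main(String[] args) {
        System.out.println("validation works: " + demo());
    }
}
```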





[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-03 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5703:
-
Description: 
The ExecutionGraph structure would be recovered from TaskManager reports during 
the reconciling period. The necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer 
Execution)
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

The {{RECONCILING}} ExecutionState should be able to transition into any 
existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do 
so, the TaskManager should maintain the terminal task states ({{CANCELED}}, 
{{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism in 
another JIRA. In addition, the state transitions trigger different actions, and 
some actions rely on the necessary information above. Considering this limit, 
the recovery process is divided into two steps:

- First, recover all the other necessary information except the ExecutionState.
- Second, transition the ExecutionState into the real task state and trigger 
the actions. The behavior is the same as the current {{UpdateTaskExecutorState}}.

To keep the logic simple and consistent, during the recovery period all the 
other RPC messages from the TaskManager ({{UpdateTaskExecutionState}}, 
{{ScheduleOrUpdateConsumers}}, etc.) should be refused temporarily and answered 
with a special message by the JobMaster. The TaskManager should then retry 
sending these messages until the JobManager finishes recovery and acknowledges.

The {{RECONCILING}} JobStatus transitions into one of the states ({{RUNNING}}, 
{{FAILING}}, {{FINISHED}}) after recovery:

- {{RECONCILING}} to {{RUNNING}}: all the TaskManagers report within the 
duration and all the tasks are in the {{RUNNING}} state.
- {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in 
time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
- {{RECONCILING}} to {{FINISHED}}: all the TaskManagers report within the 
duration and all the tasks are in the {{FINISHED}} state.

  was:
The ExecutionGraph structure would be recovered from TaskManager reports during 
reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
Execution)
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For {{RECONCILING}} ExecutionState, it should be transition into any existing 
task states ({{RUNNING}},{{CANCELED}},{{FAILED}},{{FINISHED}}). To do so, the 
TaskManger should maintain the terminal task state 
({{CANCELED}},{{FAILED}},{{FINISHED}}) for a while and we try to realize this 
mechanism in another jira. In addition, the state transition would trigger 
different actions, and some actions rely on above necessary information. 
Considering this limit, the recovery process will be divided into two steps:

- First, recovery all other necessary information except ExecutionState.
- Second, transition ExecutionState into real task state and trigger 
actions. The behavior is the same with current {{UpdateTaskExecutorState}}.

To make logic easy and consistency, during recovery period, all the other RPC 
messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}},etc) from 
TaskManager should be refused temporarily and responded with a special message 
by JobMaster. Then the TaskManager should retry to send these messages later 
until JobManager ends recovery and acknowledgement.

The {{RECONCILING}} JobStatus, it would be transition into one of the states 
({{RUNNING}},{{FAILING}},{{FINISHED}}) after recovery.

- {{RECONCILING}} to {{RUNNING}}: All the TaskManager report within 
duration time and all the tasks are in {{RUNNING}} states.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManager does not report in 
time, or one of the tasks state is in {{FAILED}} or {{CANCELED}}
- {{RECONCILING}} to {{FINISHED}}: All the TaskManger report within 
duration time and all the tasks are in {{FINISHED}} states.


> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The ExecutionGraph structure would be recovered from TaskManager reports 
> during reconciling period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChan

[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-03 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5703:
-
Description: 
The ExecutionGraph structure would be recovered from TaskManager reports during 
reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
Execution)
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For {{RECONCILING}} ExecutionState, it should be transition into any existing 
task states ({{RUNNING}},{{CANCELED}},{{FAILED}},{{FINISHED}}). To do so, the 
TaskManger should maintain the terminal task state 
({{CANCELED}},{{FAILED}},{{FINISHED}}) for a while and we try to realize this 
mechanism in another jira. In addition, the state transition would trigger 
different actions, and some actions rely on above necessary information. 
Considering this limit, the recovery process will be divided into two steps:

- First, recovery all other necessary information except ExecutionState.
- Second, transition ExecutionState into real task state and trigger 
actions. The behavior is the same with current {{UpdateTaskExecutorState}}.

To make logic easy and consistency, during recovery period, all the other RPC 
messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}},etc) from 
TaskManager should be refused temporarily and responded with a special message 
by JobMaster. Then the TaskManager should retry to send these messages later 
until JobManager ends recovery and acknowledgement.

The {{RECONCILING}} JobStatus, it would be transition into one of the states 
({{RUNNING}},{{FAILING}},{{FINISHED}}) after recovery.

- {{RECONCILING}} to {{RUNNING}}: All the TaskManager report within 
duration time and all the tasks are in {{RUNNING}} states.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManager does not report in 
time, or one of the tasks state is in {{FAILED}} or {{CANCELED}}
- {{RECONCILING}} to {{FINISHED}}: All the TaskManger report within 
duration time and all the tasks are in {{FINISHED}} states.

  was:
The ExecutionGraph structure would be recovered from TaskManager reports during 
reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
Execution)
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For {{RECONCILING}} ExecutionState, it should be transition into any existing 
task states ({{RUNNING}},{{CANCELED}},{{FAILED}},{{FINISHED}}). To do so, the 
TaskManger should maintain the terminal task state 
({{CANCELED}},{{FAILED}},{{FINISHED}}) for a while and we try to realize this 
mechanism in another jira. In addition, the state transition would trigger 
different actions, and some actions rely on above necessary information. 
Considering this limit, the recovery process will be divided into two steps:

- First, recovery all other necessary information except ExecutionState.
- Second, transition ExecutionState into real task state and trigger 
actions. The behavior is the same with {{UpdateTaskExecutorState}}.

To make logic easy and consistency, during recovery period, all the other RPC 
messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}},etc) from 
TaskManager should be refused temporarily and responded with a special message 
by JobMaster. Then the TaskManager should retry to send these messages later 
until JobManager ends recovery and acknowledgement.

The {{RECONCILING}} JobStatus, it would be transition into one of the states 
({{RUNNING}},{{FAILING}},{{FINISHED}}) after recovery.

- {{RECONCILING}} to {{RUNNING}}: All the TaskManager report within 
duration time and all the tasks are in {{RUNNING}} states.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManager does not report in 
time, or one of the tasks state is in {{FAILED}} or {{CANCELED}}
- {{RECONCILING}} to {{FINISHED}}: All the TaskManger report within 
duration time and all the tasks are in {{FINISHED}} states.


> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The ExecutionGraph structure would be recovered from TaskManager reports 
> during reconciling period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChannelDeplo

[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-03 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5703:
-
Description: 
The ExecutionGraph structure would be recovered from TaskManager reports during 
reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
Execution)
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For {{RECONCILING}} ExecutionState, it should be transition into any existing 
task states ({{RUNNING}},{{CANCELED}},{{FAILED}},{{FINISHED}}). To do so, the 
TaskManger should maintain the terminal task state 
({{CANCELED}},{{FAILED}},{{FINISHED}}) for a while and we try to realize this 
mechanism in another jira. In addition, the state transition would trigger 
different actions, and some actions rely on above necessary information. 
Considering this limit, the recovery process will be divided into two steps:

- First, recovery all other necessary information except ExecutionState.
- Second, transition ExecutionState into real task state and trigger 
actions. The behavior is the same with {{UpdateTaskExecutorState}}.

To make logic easy and consistency, during recovery period, all the other RPC 
messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}},etc) from 
TaskManager should be refused temporarily and responded with a special message 
by JobMaster. Then the TaskManager should retry to send these messages later 
until JobManager ends recovery and acknowledgement.

The {{RECONCILING}} JobStatus, it would be transition into one of the states 
({{RUNNING}},{{FAILING}},{{FINISHED}}) after recovery.

- {{RECONCILING}} to {{RUNNING}}: All the TaskManager report within 
duration time and all the tasks are in {{RUNNING}} states.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManager does not report in 
time, or one of the tasks state is in {{FAILED}} or {{CANCELED}}
- {{RECONCILING}} to {{FINISHED}}: All the TaskManger report within 
duration time and all the tasks are in {{FINISHED}} states.

  was:
The ExecutionGraph structure would be recovered from TaskManager reports during 
reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For the {{RECONCILING}} ExecutionState, it should be able to transition into any 
existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do 
so, the TaskManager should maintain the terminal task states ({{CANCELED}}, 
{{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism in 
another JIRA. In addition, the state transition would trigger different actions, 
and some actions rely on the above necessary information. Considering this 
limitation, the recovery process will be divided into two steps:

- First, recover all the other necessary information except ExecutionState.
- Second, transition ExecutionState into the real task state and trigger 
actions. The behavior is the same as for {{UpdateTaskExecutionState}}.

To keep the logic simple and consistent, all the other RPC messages 
({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the 
TaskManager should be temporarily refused during the recovery period and 
answered with a special message by the JobMaster. The TaskManager should then 
retry sending these messages until the JobManager finishes recovery and 
acknowledges them.

After recovery, the {{RECONCILING}} JobStatus would transition into one of the 
states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}).

- {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the 
duration and all the tasks are in the {{RUNNING}} state.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in 
time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
- {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the 
duration and all the tasks are in the {{FINISHED}} state.


> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The ExecutionGraph structure would be recovered from TaskManager reports 
> during the reconciling period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer 
> E

[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-03 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5703:
-
Description: 
The ExecutionGraph structure would be recovered from TaskManager reports during 
the reconciling period, and the necessary information includes:

- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor
- ExecutionVertex: Map
- ExecutionGraph: ConcurrentHashMap

For the {{RECONCILING}} ExecutionState, it should be able to transition into any 
existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do 
so, the TaskManager should maintain the terminal task states ({{CANCELED}}, 
{{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism in 
another JIRA. In addition, the state transition would trigger different actions, 
and some actions rely on the above necessary information. Considering this 
limitation, the recovery process will be divided into two steps:

- First, recover all the other necessary information except ExecutionState.
- Second, transition ExecutionState into the real task state and trigger 
actions. The behavior is the same as for {{UpdateTaskExecutionState}}.

To keep the logic simple and consistent, all the other RPC messages 
({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the 
TaskManager should be temporarily refused during the recovery period and 
answered with a special message by the JobMaster. The TaskManager should then 
retry sending these messages until the JobManager finishes recovery and 
acknowledges them.

After recovery, the {{RECONCILING}} JobStatus would transition into one of the 
states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}).

- {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the 
duration and all the tasks are in the {{RUNNING}} state.
- {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in 
time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
- {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the 
duration and all the tasks are in the {{FINISHED}} state.

> ExecutionGraph recovery based on reconciliation with TaskManager reports
> 
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The ExecutionGraph structure would be recovered from TaskManager reports 
> during the reconciling period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, 
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor
> - ExecutionVertex: Map IntermediateResultPartition>
> - ExecutionGraph: ConcurrentHashMap
> For the {{RECONCILING}} ExecutionState, it should be able to transition into 
> any existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). 
> To do so, the TaskManager should maintain the terminal task states 
> ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to realize 
> this mechanism in another JIRA. In addition, the state transition would trigger 
> different actions, and some actions rely on the above necessary information. 
> Considering this limitation, the recovery process will be divided into two steps:
> - First, recover all the other necessary information except ExecutionState.
> - Second, transition ExecutionState into the real task state and trigger 
> actions. The behavior is the same as for {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, all the other RPC messages 
> ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the 
> TaskManager should be temporarily refused during the recovery period and 
> answered with a special message by the JobMaster. The TaskManager should then 
> retry sending these messages until the JobManager finishes recovery and 
> acknowledges them.
> After recovery, the {{RECONCILING}} JobStatus would transition into one of the 
> states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}).
> - {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the 
> duration and all the tasks are in the {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in 
> time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the 
> duration and all the tasks are in the {{FINISHED}} state.
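The transition rules above can be sketched as a small decision function. This is a hypothetical sketch; the `ReconciliationOutcome` class, its enums, and `resolve` are illustrative names, not Flink's actual API.

```java
import java.util.List;

// Hypothetical sketch of the post-reconciliation decision described above;
// enum and method names are illustrative, not Flink's actual API.
public class ReconciliationOutcome {

    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED, RECONCILING }

    enum JobStatus { RECONCILING, RUNNING, FAILING, FINISHED }

    // allReported: every TaskManager sent its report within the duration.
    static JobStatus resolve(boolean allReported, List<TaskState> tasks) {
        // A missing report or any FAILED/CANCELED task fails the job.
        if (!allReported || tasks.stream().anyMatch(
                s -> s == TaskState.FAILED || s == TaskState.CANCELED)) {
            return JobStatus.FAILING;
        }
        if (tasks.stream().allMatch(s -> s == TaskState.FINISHED)) {
            return JobStatus.FINISHED;
        }
        if (tasks.stream().allMatch(s -> s == TaskState.RUNNING)) {
            return JobStatus.RUNNING;
        }
        // Mixed non-terminal states: keep reconciling.
        return JobStatus.RECONCILING;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, List.of(TaskState.RUNNING, TaskState.RUNNING)));
        System.out.println(resolve(false, List.of(TaskState.RUNNING)));
        System.out.println(resolve(true, List.of(TaskState.FINISHED)));
    }
}
```

Note that mixed {{RUNNING}}/{{FINISHED}} reports keep the job in {{RECONCILING}} here, since the description only defines outcomes for the all-uniform cases.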



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports

2017-02-02 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5703:


 Summary: ExecutionGraph recovery based on reconciliation with 
TaskManager reports
 Key: FLINK-5703
 URL: https://issues.apache.org/jira/browse/FLINK-5703
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhijiang Wang
Assignee: Zhijiang Wang






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-01-18 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827892#comment-15827892
 ] 

Zhijiang Wang commented on FLINK-4364:
--

[~till.rohrmann], I closed the previous pull request for this issue and 
re-submitted it based on the current master branch. Please review it in your 
free time, so I can continue submitting the follow-up code based on it. Thank you!

> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.
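The request/response pattern described above can be sketched as follows. The class and method names here are illustrative stand-ins, not Flink's actual {{HeartbeatManager}} API, and the metric values are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the heartbeat exchange described above; names and
// metric values are illustrative, not Flink's actual heartbeat API.
public class HeartbeatSketch {

    // Payload the TaskManager attaches to each heartbeat answer.
    static class HeartbeatPayload {
        final Map<String, Long> metrics;
        HeartbeatPayload(Map<String, Long> metrics) { this.metrics = metrics; }
    }

    // Invoked when a heartbeat request identified by (jobId, leaderId)
    // arrives; returns null when the request comes from a stale leader.
    static HeartbeatPayload onHeartbeatRequest(String jobId, String leaderId,
                                               String expectedLeaderId) {
        if (!leaderId.equals(expectedLeaderId)) {
            return null; // ignore heartbeats from an old JobManager leader
        }
        Map<String, Long> metrics = new HashMap<>();
        metrics.put("numRunningTasks", 3L);      // illustrative values only
        metrics.put("heapUsedBytes", 123_456L);
        return new HeartbeatPayload(metrics);
    }

    public static void main(String[] args) {
        HeartbeatPayload payload = onHeartbeatRequest("job-1", "leader-A", "leader-A");
        System.out.println(payload.metrics.keySet());
    }
}
```

Keying the heartbeat on the (JobID, leader id) pair, as the description suggests, lets the TaskManager drop reports addressed to a JobManager that has since lost leadership.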



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-18 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361
 ] 

Zhijiang Wang edited comment on FLINK-5501 at 1/18/17 8:42 AM:
---

[~StephanEwen] Thank you for the quick response!

Yeah, you already considered all the feasible alternatives for implementing 
this goal, and I totally agree with them.

1. For extending the leader election service, I also thought of this way before 
the implementation. In the current {{ZookeeperLeaderElectionService}}, the 
leader node is of the EPHEMERAL type; if the incrementing number is carried in 
this node, it should be changed to the PERSISTENT type, otherwise another node 
should be added for the incrementing number. This way is very similar to using 
{{RunningJobsRegistry}}; from a semantic point of view {{LeaderElectionService}} 
may be more suitable, but to minimize the changes I already implemented it with 
{{RunningJobsRegistry}}.

2. Actually I did not think of this way before; it is a totally different and 
interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader 
change and will re-register with the new leader afterwards, so the 
{{JobManager}} can rely on the registration process to determine the status. 
But it may be complicated to coordinate between the common scheduling and the 
reconciling, because they would be triggered at the same time, and it would 
also waste some resources temporarily. If the JobManager can determine the 
status easily after startup, it can follow the specific process without doing 
anything ambiguous.

In summary, I prefer the first way to implement the goal. The whole 
{{JobManager}} failure feature has already been finished on my side; could I 
submit the pull request for this issue based on the {{RunningJobsRegistry}} 
implementation?


was (Author: zjwang):
Thank you for the quick response!

Yeah, you already considered all the feasible alternatives for implementing 
this goal, and I totally agree with them.

1. For extending the leader election service, I also thought of this way before 
the implementation. In the current {{ZookeeperLeaderElectionService}}, the 
leader node is of the EPHEMERAL type; if the incrementing number is carried in 
this node, it should be changed to the PERSISTENT type, otherwise another node 
should be added for the incrementing number. This way is very similar to using 
{{RunningJobsRegistry}}; from a semantic point of view {{LeaderElectionService}} 
may be more suitable, but to minimize the changes I already implemented it with 
{{RunningJobsRegistry}}.

2. Actually I did not think of this way before; it is a totally different and 
interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader 
change and will re-register with the new leader afterwards, so the 
{{JobManager}} can rely on the registration process to determine the status. 
But it may be complicated to coordinate between the common scheduling and the 
reconciling, because they would be triggered at the same time, and it would 
also waste some resources temporarily. If the JobManager can determine the 
status easily after startup, it can follow the specific process without doing 
anything ambiguous.

In summary, I prefer the first way to implement the goal. The whole 
{{JobManager}} failure feature has already been finished on my side; could I 
submit the pull request for this issue based on the {{RunningJobsRegistry}} 
implementation?

> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
> {{JobManagerRunner}} has already finished the job, setting the job status to 
> FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status will be still in 

[jira] [Comment Edited] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-17 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361
 ] 

Zhijiang Wang edited comment on FLINK-5501 at 1/18/17 3:16 AM:
---

Thank you for the quick response!

Yeah, you already considered all the feasible alternatives for implementing 
this goal, and I totally agree with them.

1. For extending the leader election service, I also thought of this way before 
the implementation. In the current {{ZookeeperLeaderElectionService}}, the 
leader node is of the EPHEMERAL type; if the incrementing number is carried in 
this node, it should be changed to the PERSISTENT type, otherwise another node 
should be added for the incrementing number. This way is very similar to using 
{{RunningJobsRegistry}}; from a semantic point of view {{LeaderElectionService}} 
may be more suitable, but to minimize the changes I already implemented it with 
{{RunningJobsRegistry}}.

2. Actually I did not think of this way before; it is a totally different and 
interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader 
change and will re-register with the new leader afterwards, so the 
{{JobManager}} can rely on the registration process to determine the status. 
But it may be complicated to coordinate between the common scheduling and the 
reconciling, because they would be triggered at the same time, and it would 
also waste some resources temporarily. If the JobManager can determine the 
status easily after startup, it can follow the specific process without doing 
anything ambiguous.

In summary, I prefer the first way to implement the goal. The whole 
{{JobManager}} failure feature has already been finished on my side; could I 
submit the pull request for this issue based on the {{RunningJobsRegistry}} 
implementation?


was (Author: zjwang):
Thank you for the quick response!

Yeah, you already considered all the feasible alternatives for implementing 
this goal, and I totally agree with them.

1. For extending the leader election service, I also thought of this way before 
the implementation. In the current {{ZookeeperLeaderElectionService}}, the 
leader node is of the EPHEMERAL type; if the incrementing number is carried in 
this node, it should be changed to the PERSISTENT type, otherwise another node 
should be added for the incrementing number. This way is very similar to using 
{{RunningJobsRegistry}}; from a semantic point of view {{LeaderElectionService}} 
may be more suitable, but to minimize the changes I already implemented it with 
{{RunningJobsRegistry}}.

2. Actually I did not think of this way before; it is a totally different and 
interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader 
change and will re-register with the new leader afterwards, so the 
{{JobManager}} can rely on the registration process to determine the status. 
But it may be complicated to coordinate between the common scheduling and the 
reconciling, because they would be triggered at the same time, and it would 
also waste some resources temporarily. If the JobManager can determine the 
status easily after startup, it can follow the specific process without doing 
anything ambiguous.

In summary, I prefer the first way to implement the goal. The whole 
{{JobManager}} failure feature has already been finished on my side; could I 
submit the pull request for this issue based on the {{RunningJobsRegistry}} 
implementation?

> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
> {{JobManagerRunner}} has already finished the job, setting the job status to 
> FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status will be still in RUNNING. So 
>

[jira] [Commented] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-17 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361
 ] 

Zhijiang Wang commented on FLINK-5501:
--

Thank you for the quick response!

Yeah, you already considered all the feasible alternatives for implementing 
this goal, and I totally agree with them.

1. For extending the leader election service, I also thought of this way before 
the implementation. In the current {{ZookeeperLeaderElectionService}}, the 
leader node is of the EPHEMERAL type; if the incrementing number is carried in 
this node, it should be changed to the PERSISTENT type, otherwise another node 
should be added for the incrementing number. This way is very similar to using 
{{RunningJobsRegistry}}; from a semantic point of view {{LeaderElectionService}} 
may be more suitable, but to minimize the changes I already implemented it with 
{{RunningJobsRegistry}}.

2. Actually I did not think of this way before; it is a totally different and 
interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader 
change and will re-register with the new leader afterwards, so the 
{{JobManager}} can rely on the registration process to determine the status. 
But it may be complicated to coordinate between the common scheduling and the 
reconciling, because they would be triggered at the same time, and it would 
also waste some resources temporarily. If the JobManager can determine the 
status easily after startup, it can follow the specific process without doing 
anything ambiguous.

In summary, I prefer the first way to implement the goal. The whole 
{{JobManager}} failure feature has already been finished on my side; could I 
submit the pull request for this issue based on the {{RunningJobsRegistry}} 
implementation?

> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
> {{JobManagerRunner}} has already finished the job, setting the job status to 
> FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
> the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
> again, it will check the job status and enter the {{RECONCILING}} state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-16 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5501:
-
Description: 
When the {{JobManagerRunner}} is granted leadership, it should check whether 
the current job is already running or not. If the job is running, the 
{{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
for the {{TaskManager}}s to report their task status. Otherwise the 
{{JobManager}} can schedule the {{ExecutionGraph}} in the common way.

The {{RunningJobsRegistry}} can provide a way to check the job's running 
status, but we should expand the current interface and fix the related process 
to support this function.

1. {{RunningJobsRegistry}} sets the RUNNING status after the 
{{JobManagerRunner}} is granted leadership for the first time.

2. If the job finishes, the job status will be set to FINISHED by 
{{RunningJobsRegistry}} and the status will be deleted before exit. 

3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
{{JobManagerRunner}} has already finished the job, setting the job status to 
FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
leadership again.

4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
again, it will check the job status and enter the {{RECONCILING}} state.

  was:
When the {{JobManagerRunner}} is granted leadership, it should check whether 
the current job is already running or not. If the job is running, the 
{{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
for the {{TaskManager}}s to report their task status. Otherwise the 
{{JobManager}} can schedule the {{ExecutionGraph}} in the common way.

The {{RunningJobsRegistry}} can provide a way to check the job's running 
status, but we should expand the current interface and fix the related process 
to support this function.

1. {{RunningJobsRegistry}} sets the RUNNING status after the 
{{JobManagerRunner}} is granted leadership for the first time.

2. If the job finishes, the job status will be set to FINISHED by 
{{RunningJobsRegistry}} and the status will be deleted before exit. 

3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
{{JobManagerRunner}} has already finished the job, setting the job status to 
FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
leadership again.

4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
again, it will check the job status and enter the {{RECONCILING}} state.


> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
> {{JobManagerRunner}} has already finished the job, setting the job status to 
> FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
> the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
> again, it will check the job status and enter the {{RECONCILING}} state.
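Steps 1-4 above can be sketched with a simplified, in-memory stand-in for the registry. The class, the method names, and the returned action strings below are illustrative only; Flink's actual {{RunningJobsRegistry}} interface may differ.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the RunningJobsRegistry-based check described above;
// this in-memory registry and the action strings are illustrative only.
public class RegistrySketch {

    enum JobSchedulingStatus { PENDING, RUNNING, DONE }

    static class InMemoryRunningJobsRegistry {
        private final ConcurrentHashMap<String, JobSchedulingStatus> statuses =
                new ConcurrentHashMap<>();
        void setJobRunning(String jobId)  { statuses.put(jobId, JobSchedulingStatus.RUNNING); }
        void setJobFinished(String jobId) { statuses.put(jobId, JobSchedulingStatus.DONE); }
        JobSchedulingStatus getStatus(String jobId) {
            return statuses.getOrDefault(jobId, JobSchedulingStatus.PENDING);
        }
    }

    // On grant of leadership: decide between fresh scheduling, reconciling
    // after a JobManager failure, and exiting because the job is done.
    static String onGrantLeadership(InMemoryRunningJobsRegistry registry, String jobId) {
        switch (registry.getStatus(jobId)) {
            case RUNNING:
                return "RECONCILE";            // job survived a JobManager failure
            case DONE:
                return "EXIT";                 // another runner already finished it
            default:
                registry.setJobRunning(jobId); // first grant: mark RUNNING, then schedule
                return "SCHEDULE";
        }
    }

    public static void main(String[] args) {
        InMemoryRunningJobsRegistry registry = new InMemoryRunningJobsRegistry();
        System.out.println(onGrantLeadership(registry, "job-1")); // first leadership grant
        System.out.println(onGrantLeadership(registry, "job-1")); // grant after a failover
    }
}
```

Because the RUNNING mark persists across a JobManager failure but is deleted on a clean finish, the second grant distinguishes a failover (reconcile) from a fresh start (schedule).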



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-16 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5501:
-
Description: 
When the {{JobManagerRunner}} is granted leadership, it should check whether 
the current job is already running or not. If the job is running, the 
{{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
for the {{TaskManager}}s to report their task status. Otherwise the 
{{JobManager}} can schedule the {{ExecutionGraph}} in the common way.

The {{RunningJobsRegistry}} can provide a way to check the job's running 
status, but we should expand the current interface and fix the related process 
to support this function.

1. {{RunningJobsRegistry}} sets the RUNNING status after the 
{{JobManagerRunner}} is granted leadership for the first time.

2. If the job finishes, the job status will be set to FINISHED by 
{{RunningJobsRegistry}} and the status will be deleted before exit. 

3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
{{JobManagerRunner}} has already finished the job, setting the job status to 
FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
leadership again.

4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
again, it will check the job status and enter the {{RECONCILING}} state.

  was:
When the {{JobManagerRunner}} is granted leadership, it should check whether 
the current job is already running or not. If the job is running, the 
{{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
for the {{TaskManager}}s to report their task status. Otherwise the 
{{JobManager}} can schedule the {{ExecutionGraph}} in the common way.

The {{RunningJobsRegistry}} can provide a way to check the job's running 
status, but we should expand the current interface and fix the related process 
to support this function.

1. {{RunningJobsRegistry}} sets the RUNNING status after the 
{{JobManagerRunner}} is granted leadership for the first time.
2. If the job finishes, the job status will be set to FINISHED by 
{{RunningJobsRegistry}} and the status will be deleted before exit. 
3. If the {{JobManager}} fails, the job status will still be RUNNING, so when 
the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
again, it checks the job status and enters the {{RECONCILING}} state.


> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader 
> {{JobManagerRunner}} has already finished the job, setting the job status to 
> FINISHED, the other {{JobManagerRunner}}s will exit after being granted 
> leadership again.
> 4. If the {{JobManager}} fails, the job status will still be RUNNING. So when 
> the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
> again, it will check the job status and enter the {{RECONCILING}} state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-16 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5501:


Assignee: Zhijiang Wang

> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the {{JobManagerRunner}} is granted leadership, it should check whether 
> the current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait 
> for the {{TaskManager}}s to report their task status. Otherwise the 
> {{JobManager}} can schedule the {{ExecutionGraph}} in the common way.
> The {{RunningJobsRegistry}} can provide a way to check the job's running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets the RUNNING status after the 
> {{JobManagerRunner}} is granted leadership for the first time.
> 2. If the job finishes, the job status will be set to FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the {{JobManager}} fails, the job status will still be RUNNING, so when 
> the {{JobManagerRunner}} (the previous or a new one) is granted leadership 
> again, it checks the job status and enters the {{RECONCILING}} state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-01-16 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5501:


 Summary: Determine whether the job starts from last JobManager 
failure
 Key: FLINK-5501
 URL: https://issues.apache.org/jira/browse/FLINK-5501
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: Zhijiang Wang


When the {{JobManagerRunner}} is granted leadership, it should check whether 
the current job is already running. If the job is running, the {{JobManager}} 
should reconcile itself (enter the RECONCILING state) and wait for the 
{{TaskManager}} to report task statuses. Otherwise the {{JobManager}} can 
schedule the {{ExecutionGraph}} in the usual way.

The {{RunningJobsRegistry}} can provide a way to check the job running 
status, but we should expand the current interface and fix the related 
process to support this function.

1. {{RunningJobsRegistry}} sets the RUNNING status when {{JobManagerRunner}} 
is granted leadership for the first time.
2. If the job finishes, the job status will be set to FINISHED by 
{{RunningJobsRegistry}} and the status will be deleted before exit.
3. If the {{JobManager}} fails, the job status will still be RUNNING, so 
when the {{JobManagerRunner}} (the previous or a new one) is granted 
leadership again, it checks the job status and enters the {{RECONCILING}} state.
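The leadership-grant check described above can be sketched as follows. The {{JobSchedulingStatus}} enum, the in-memory registry map, and the method name {{onGrantLeadership}} are illustrative assumptions for this sketch, not Flink's actual interfaces.

```java
import java.util.HashMap;
import java.util.Map;

public class ReconcileOnLeadership {

    enum JobSchedulingStatus { PENDING, RUNNING, DONE }

    // Simplified in-memory stand-in for the RunningJobsRegistry.
    static final Map<String, JobSchedulingStatus> registry = new HashMap<>();

    /** Returns the action taken when leadership is granted for the job. */
    static String onGrantLeadership(String jobId) {
        JobSchedulingStatus status =
            registry.getOrDefault(jobId, JobSchedulingStatus.PENDING);
        if (status == JobSchedulingStatus.RUNNING) {
            // The job survived a JobManager failure: reconcile and wait for
            // the TaskManagers to report task statuses.
            return "RECONCILING";
        }
        // First grant: mark RUNNING and schedule the ExecutionGraph normally.
        registry.put(jobId, JobSchedulingStatus.RUNNING);
        return "SCHEDULING";
    }

    public static void main(String[] args) {
        assert onGrantLeadership("job-1").equals("SCHEDULING");
        // A second grant (e.g. after JobManager failover) sees RUNNING.
        assert onGrantLeadership("job-1").equals("RECONCILING");
    }
}
```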





[jira] [Assigned] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5499:


Assignee: Zhijiang Wang

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from the 
> {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection. 
> So in the task failover scenario, the new execution attempt may be deployed 
> to different task managers. With RocksDB as the state backend, performance 
> is better if the data can be restored from the local machine. So we try to 
> reuse the {{TaskManagerLocation}} of the prior execution attempt when 
> allocating a slot from the {{SlotPool}}. If the {{TaskManagerLocation}} 
> from prior executions is empty, the behavior is the same as the current one.
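The preference described above can be sketched as a location-aware allocation. Here a {{TaskManagerLocation}} is reduced to a host string and {{allocateSlot}} is a hypothetical helper, not the real {{SlotPool}} API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class PreferPriorLocation {

    /**
     * Picks a host for the slot, preferring any of the prior attempt's
     * locations so that RocksDB state can be restored locally.
     */
    static String allocateSlot(Collection<String> preferredHosts,
                               List<String> availableHosts) {
        for (String host : preferredHosts) {
            if (availableHosts.contains(host)) {
                return host; // local restore is possible on this host
            }
        }
        // Empty or unavailable preferences: fall back to the current
        // behavior and take any available host.
        return availableHosts.get(0);
    }

    public static void main(String[] args) {
        List<String> available = Arrays.asList("tm-1", "tm-2", "tm-3");
        // Prior attempt ran on tm-2, so the new attempt goes back there.
        assert allocateSlot(Collections.singletonList("tm-2"), available).equals("tm-2");
        // No prior location: behavior is unchanged, first available host wins.
        assert allocateSlot(Collections.<String>emptyList(), available).equals("tm-1");
    }
}
```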





[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Description: Currently when schedule execution to request to allocate slot 
from {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. 
So for task fail over scenario, the new execution attempt may be deployed to 
different task managers. If setting rockDB as state backend, the performance is 
better if the data can be restored from local machine. So we try to reuse the 
{{TaskManagerLocation}} of prior execution attempt when allocating slot from 
{{SlotPool}}. If the {{TaskManagerLocation}} is empty from prior executions, 
the behavior is the same with current status.  (was: Currently when schedule 
execution to request to allocate slot from {{SlotPool}}, the 
{{TaskManagerLocation}} parameter is empty collection. So for task fail over 
scenario, the new execution attempt may be deployed to different task managers. 
If setting rockDB as state backend, the performance is better if the data can 
be restored from local machine. So we try to reuse the TaskManagerLocation of 
prior execution attempt when allocating slot from SlotPool. If the 
TaskManagerLocation is empty from prior executions, the behavior is the same 
with current status.)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from the 
> {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection. 
> So in the task failover scenario, the new execution attempt may be deployed 
> to different task managers. With RocksDB as the state backend, performance 
> is better if the data can be restored from the local machine. So we try to 
> reuse the {{TaskManagerLocation}} of the prior execution attempt when 
> allocating a slot from the {{SlotPool}}. If the {{TaskManagerLocation}} 
> from prior executions is empty, the behavior is the same as the current one.





[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Description: Currently when schedule execution to request to allocate slot 
from {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. 
So for task fail over scenario, the new execution attempt may be deployed to 
different task managers. If setting rockDB as state backend, the performance is 
better if the data can be restored from local machine. So we try to reuse the 
TaskManagerLocation of prior execution attempt when allocating slot from 
SlotPool. If the TaskManagerLocation is empty from prior executions, the 
behavior is the same with current status.  (was: Currently when schedule 
execution to request to allocate slot from SlotPool, the TaskManagerLocation 
parameter is empty collection. So for task fail over scenario, the new 
execution attempt may be deployed to different task managers. If setting rockDB 
as state backend, the performance is better if the data can be restored from 
local machine. So we try to reuse the TaskManagerLocation of prior execution 
attempt when allocating slot from SlotPool. If the TaskManagerLocation is empty 
from prior executions, the behavior is the same with current status.)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from the 
> {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection. 
> So in the task failover scenario, the new execution attempt may be deployed 
> to different task managers. With RocksDB as the state backend, performance 
> is better if the data can be restored from the local machine. So we try to 
> reuse the {{TaskManagerLocation}} of the prior execution attempt when 
> allocating a slot from the {{SlotPool}}. If the {{TaskManagerLocation}} 
> from prior executions is empty, the behavior is the same as the current one.





[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt in 
allocating slot  (was: Try to reuse the resource location of prior execution 
attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from the 
> SlotPool, the TaskManagerLocation parameter is an empty collection. So in 
> the task failover scenario, the new execution attempt may be deployed to 
> different task managers. With RocksDB as the state backend, performance is 
> better if the data can be restored from the local machine. So we try to 
> reuse the TaskManagerLocation of the prior execution attempt when 
> allocating a slot from the SlotPool. If the TaskManagerLocation from prior 
> executions is empty, the behavior is the same as the current one.





[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt when allocate slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt when 
allocate slot  (was: Try to reuse the resource location of previous execution 
attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt when allocate 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently when schedule execution to request to allocate slot from SlotPool, 
> the TaskManagerLocation parameter is empty collection. So for task fail over 
> scenario, the new execution attempt may be deployed to different task 
> managers. If setting rockDB as state backend, the performance is better if 
> the data can be restored from local machine. So we try to reuse the 
> TaskManagerLocation of prior execution attempt when allocating slot from 
> SlotPool. If the TaskManagerLocation is empty from prior executions, the 
> behavior is the same with current status.





[jira] [Created] (FLINK-5499) Try to reuse the resource location of previous execution attempt when allocate slot

2017-01-15 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5499:


 Summary: Try to reuse the resource location of previous execution 
attempt when allocate slot
 Key: FLINK-5499
 URL: https://issues.apache.org/jira/browse/FLINK-5499
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: Zhijiang Wang


Currently, when an execution is scheduled and requests a slot from the 
SlotPool, the TaskManagerLocation parameter is an empty collection. So in the 
task failover scenario, the new execution attempt may be deployed to 
different task managers. With RocksDB as the state backend, performance is 
better if the data can be restored from the local machine. So we try to 
reuse the TaskManagerLocation of the prior execution attempt when allocating 
a slot from the SlotPool. If the TaskManagerLocation from prior executions 
is empty, the behavior is the same as the current one.





[jira] [Assigned] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-30 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5138:


Assignee: Zhijiang Wang

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Users may specify the state size via the *setResource* API; each 
> *StateBackend* implementation should then roughly estimate the different 
> kinds of memory usage based on that state size. This estimated memory will 
> be aggregated with the other memory amounts in *ResourceSpec*. There are 
> two advantages to doing this:
>   - For the RocksDB backend, a proper memory setting for RocksDB yields 
> better read and write performance.
>   - The estimated memory will be considered when requesting resources for 
> the container, so the total memory usage will not exceed the container limit.
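A minimal sketch of such an estimation hook follows. The method name {{estimateMemoryUsage}}, the heap overhead factor, and the RocksDB sizing rule are illustrative assumptions, not Flink's actual StateBackend interface.

```java
public class StateMemoryEstimate {

    interface StateBackendEstimator {
        /** Roughly estimates memory (bytes) needed for a given state size hint. */
        long estimateMemoryUsage(long stateSizeBytes);
    }

    // Heap backend: state lives on the JVM heap, plus assumed object overhead.
    static final StateBackendEstimator HEAP =
        stateSize -> (long) (stateSize * 1.2);

    // RocksDB backend: block cache + write buffers sized from the state hint,
    // with a floor so small states still get a workable minimum (64 MiB here).
    static final StateBackendEstimator ROCKSDB =
        stateSize -> Math.max(64L << 20, stateSize / 4);

    public static void main(String[] args) {
        long hint = 1L << 30; // 1 GiB of state
        assert HEAP.estimateMemoryUsage(hint) > hint;
        assert ROCKSDB.estimateMemoryUsage(hint) == (1L << 30) / 4;
    }
}
```

The estimate returned here would be the amount aggregated into *ResourceSpec* before the container request.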





[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Description: 
Users may specify the state size via the *setResource* API; each 
*StateBackend* implementation should then roughly estimate the different 
kinds of memory usage based on that state size. This estimated memory will 
be aggregated with the other memory amounts in *ResourceSpec*. There are two 
advantages to doing this:
  - For the RocksDB backend, a proper memory setting for RocksDB yields 
better read and write performance.
  - The estimated memory will be considered when requesting resources for 
the container, so the total memory usage will not exceed the container limit.

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> Users may specify the state size via the *setResource* API; each 
> *StateBackend* implementation should then roughly estimate the different 
> kinds of memory usage based on that state size. This estimated memory will 
> be aggregated with the other memory amounts in *ResourceSpec*. There are 
> two advantages to doing this:
>   - For the RocksDB backend, a proper memory setting for RocksDB yields 
> better read and write performance.
>   - The estimated memory will be considered when requesting resources for 
> the container, so the total memory usage will not exceed the container limit.





[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5138:
-
Component/s: Core

> The StateBackend provides the method to estimate memory usage based on hint 
> of state size in ResourceSpec
> -
>
> Key: FLINK-5138
> URL: https://issues.apache.org/jira/browse/FLINK-5138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, DataSet API, DataStream API
>Reporter: Zhijiang Wang
>






[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5138:


 Summary: The StateBackend provides the method to estimate memory 
usage based on hint of state size in ResourceSpec
 Key: FLINK-5138
 URL: https://issues.apache.org/jira/browse/FLINK-5138
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally a UDF just creates short-lived small objects that can be recycled 
quickly by the JVM, so most of the memory resource is controlled and managed 
by the *TaskManager* framework. But for some special cases, a UDF may 
consume many resources to create long-lived large objects, so it is 
necessary to provide options for advanced users to define the resource usage 
if needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in 
DataStream and the base operator in DataSet separately.
  - The *StateBackend* should provide a method to estimate the memory usage 
based on the hint of state size in *ResourceSpec*.
  - In job graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
*ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated  for chained 
operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspond with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.

This feature will be implemented directly into flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally a UDF just creates short-lived small objects that can be recycled 
> quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But for some special cases, a UDF 
> may consume many resources to create long-lived large objects, so it is 
> necessary to provide options for advanced users to define the resource 
> usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different 
> resource factors (cpu cores, heap memory, direct memory, native memory, 
> etc.) and provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - The *StateBackend* should provide a method to estimate the memory usage 
> based on the hint of state size in *ResourceSpec*.
>   - In job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot for submitting a task from the 
> *ResourceManager*, the *ResourceProfile* will be expanded to correspond 
> with *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the 
> cluster, it should consider extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job 
> level.
>   - Apart from resources, the JVM options attached to the container should 
> be supported and could also be configured at the job level.
> This feature will be implemented directly in the flip-6 branch.
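The *ResourceSpec* structure proposed above can be sketched as a small value object. The field names, the {{of}} factory, and the MB units are illustrative assumptions for this sketch, not Flink's final API.

```java
public class ResourceSpecSketch {

    static final class ResourceSpec {
        final double cpuCores;
        final int heapMemoryMB;
        final int directMemoryMB;
        final int nativeMemoryMB;

        private ResourceSpec(double cpu, int heapMB, int directMB, int nativeMB) {
            this.cpuCores = cpu;
            this.heapMemoryMB = heapMB;
            this.directMemoryMB = directMB;
            this.nativeMemoryMB = nativeMB;
        }

        /** Basic construction method: cpu + heap only, other factors zero. */
        static ResourceSpec of(double cpuCores, int heapMemoryMB) {
            return new ResourceSpec(cpuCores, heapMemoryMB, 0, 0);
        }

        /** Total memory across all factors, used when sizing containers. */
        int totalMemoryMB() {
            return heapMemoryMB + directMemoryMB + nativeMemoryMB;
        }
    }

    public static void main(String[] args) {
        ResourceSpec spec = ResourceSpec.of(0.5, 512);
        assert spec.totalMemoryMB() == 512;
    }
}
```

Further factory overloads (adding direct/native memory or a state-size hint) could be added without breaking callers, which is the expandability the proposal asks for.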

[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5134:


Assignee: Zhijiang Wang

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of 
> chained operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over 
> the chained operators.
> And for the cpu cores resource of a *JobVertex*, the aggregation is the 
> maximum over the chained operators.
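The aggregation rule above (sum for memory, max for cpu cores) can be sketched as follows; the simplified {{Spec}} type and MB units are assumptions for this sketch.

```java
import java.util.Arrays;
import java.util.List;

public class ChainAggregation {

    static final class Spec {
        final double cpuCores;
        final int memoryMB;

        Spec(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }
    }

    /** Aggregates the chained operators' specs into one JobVertex spec. */
    static Spec aggregate(List<Spec> chain) {
        double cpu = 0;
        int mem = 0;
        for (Spec s : chain) {
            cpu = Math.max(cpu, s.cpuCores); // cpu: chained ops share one task thread
            mem += s.memoryMB;               // memory: usages add up
        }
        return new Spec(cpu, mem);
    }

    public static void main(String[] args) {
        Spec vertex = aggregate(Arrays.asList(new Spec(1.0, 256), new Spec(2.0, 512)));
        assert vertex.cpuCores == 2.0; // maximum over the chain
        assert vertex.memoryMB == 768; // sum over the chain
    }
}
```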





[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5133:


Assignee: Zhijiang Wang

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be set on 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be set on *Operator* in a 
> similar way.
> There are two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, in the API to allow for resource resizing in future 
> improvements.
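The min/max setter described above can be sketched as a fluent API, in the style of the existing parallelism and name setters. The {{OperatorStub}} class and validation below are assumptions for this sketch, not the actual *SingleOutputStreamOperator* code.

```java
public class SetResourceApi {

    /** Minimal stand-in for a ResourceSpec carrying only a memory amount. */
    static final class ResourceSpec {
        final int memoryMB;
        ResourceSpec(int memoryMB) { this.memoryMB = memoryMB; }
    }

    /** Stand-in for SingleOutputStreamOperator / the DataSet Operator. */
    static final class OperatorStub {
        ResourceSpec minResources;
        ResourceSpec maxResources;

        // Fluent setter: takes both a minimum and a maximum spec so the
        // runtime can resize within that range later.
        OperatorStub setResources(ResourceSpec min, ResourceSpec max) {
            if (min.memoryMB > max.memoryMB) {
                throw new IllegalArgumentException("min must not exceed max");
            }
            this.minResources = min;
            this.maxResources = max;
            return this;
        }
    }

    public static void main(String[] args) {
        OperatorStub op = new OperatorStub()
            .setResources(new ResourceSpec(256), new ResourceSpec(512));
        assert op.minResources.memoryMB == 256;
        assert op.maxResources.memoryMB == 512;
    }
}
```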





[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5136:


Assignee: Zhijiang Wang

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, 
> the framework memory should be considered together with the slot resource.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> the *NetworkBufferPool* in the *TaskManager*, and this memory can be 
> obtained from the configuration.
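The container sizing rule described above reduces to a simple sum: slot profiles plus framework memory. The helper name and the concrete numbers below are assumptions for this sketch; in the real system both framework components would be sized from configuration.

```java
public class ContainerMemory {

    /**
     * Total memory to request for one TaskManager container:
     * the sum of all slot profiles plus the framework memory used by
     * the NetworkBufferPool and the MemoryManager.
     */
    static int containerMemoryMB(int[] slotProfilesMB,
                                 int networkBuffersMB,
                                 int managedMemoryMB) {
        int total = networkBuffersMB + managedMemoryMB; // framework memory
        for (int slot : slotProfilesMB) {
            total += slot; // per-slot resource profile
        }
        return total;
    }

    public static void main(String[] args) {
        // Two 512 MB slots + 128 MB network buffers + 256 MB managed memory.
        assert containerMemoryMB(new int[] {512, 512}, 128, 256) == 1408;
    }
}
```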





[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5135:


Assignee: Zhijiang Wang

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The *JobManager* requests a slot by *ResourceProfile* from the 
> *ResourceManager* before submitting tasks. Currently the *ResourceProfile* 
> only contains cpu cores and memory properties. The memory should be 
> expanded to different types such as heap memory, direct memory and native 
> memory, which correspond with the memory types in *ResourceSpec*.
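The expansion described above can be sketched as a profile that carries each memory type separately, so a slot request can be matched per type rather than on a single memory total. Field names and the matching rule are assumptions for this sketch.

```java
public class ExpandedProfile {

    static final class ResourceProfile {
        final double cpuCores;
        final int heapMB;
        final int directMB;
        final int nativeMB;

        ResourceProfile(double cpuCores, int heapMB, int directMB, int nativeMB) {
            this.cpuCores = cpuCores;
            this.heapMB = heapMB;
            this.directMB = directMB;
            this.nativeMB = nativeMB;
        }

        /** A slot profile satisfies a request only if every memory type fits. */
        boolean isMatching(ResourceProfile request) {
            return cpuCores >= request.cpuCores
                && heapMB >= request.heapMB
                && directMB >= request.directMB
                && nativeMB >= request.nativeMB;
        }
    }

    public static void main(String[] args) {
        ResourceProfile slot = new ResourceProfile(1.0, 1024, 256, 128);
        // Fits: each memory type is within the slot's budget.
        assert slot.isMatching(new ResourceProfile(1.0, 512, 128, 0));
        // Rejected: total memory would fit, but direct memory alone does not.
        assert !slot.isMatching(new ResourceProfile(1.0, 256, 512, 0));
    }
}
```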





[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5132:


Assignee: Zhijiang Wang

> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic constructors for grouping 
> different resource factors as needed, and the construction can also be 
> expanded easily for further requirements.





[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5137:


Assignee: Zhijiang Wang

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the 
> *ResourceManager* will set the related JVM options for launching the 
> *TaskManager* container based on framework memory and the slot resource 
> profile.
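Deriving the container's JVM options from the memory profile could look like the sketch below. The flag split (heap via {{-Xmx}}, direct via {{-XX:MaxDirectMemorySize}}) follows common JVM practice; the helper itself is an assumption, not Flink code.

```java
public class JvmOptionsBuilder {

    /**
     * Builds the JVM option string for a TaskManager container from the
     * aggregated heap/direct memory budget, appending job-level user options.
     */
    static String buildJvmOptions(int heapMB, int directMB, String userOptions) {
        StringBuilder sb = new StringBuilder();
        sb.append("-Xmx").append(heapMB).append('m');
        sb.append(" -XX:MaxDirectMemorySize=").append(directMB).append('m');
        if (userOptions != null && !userOptions.isEmpty()) {
            sb.append(' ').append(userOptions); // job-level options appended last
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        assert buildJvmOptions(1024, 256, "-XX:+UseG1GC")
            .equals("-Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+UseG1GC");
        assert buildJvmOptions(1024, 256, "")
            .equals("-Xmx1024m -XX:MaxDirectMemorySize=256m");
    }
}
```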





[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5137:
-
Description: Users can configure JVM options at the job level, and the 
*ResourceManager* will set the related JVM options for launching the 
*TaskManager* container based on framework memory and the slot resource 
profile.

> Config JVM options and set onto the TaskManager container
> -
>
> Key: FLINK-5137
> URL: https://issues.apache.org/jira/browse/FLINK-5137
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager, TaskManager
>Reporter: Zhijiang Wang
>
> Users can configure JVM options at the job level, and the 
> *ResourceManager* will set the related JVM options for launching the 
> *TaskManager* container based on framework memory and the slot resource 
> profile.





[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5137:


 Summary: Config JVM options and set onto the TaskManager container
 Key: FLINK-5137
 URL: https://issues.apache.org/jira/browse/FLINK-5137
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager, TaskManager
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Description: 
When the *ResourceManager* requests container resources from the cluster, 
the framework memory should be considered together with the slot resource.
Currently the framework memory is mainly used by the *MemoryManager* and the 
*NetworkBufferPool* in the *TaskManager*, and this memory can be obtained 
from the configuration.

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>
> When the *ResourceManager* requests container resources from the cluster, 
> the framework memory should be considered together with the slot resource.
> Currently the framework memory is mainly used by the *MemoryManager* and 
> the *NetworkBufferPool* in the *TaskManager*, and this memory can be 
> obtained from the configuration.





[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory by 
ResourceManager  (was: Aggregation of slot ResourceProfile and framework memory 
for requesting resource from cluster by ResourceManager)

> Aggregation of slot ResourceProfile and framework memory by ResourceManager
> ---
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5136:
-
Summary: Aggregation of slot ResourceProfile and framework memory for 
requesting resource from cluster by ResourceManager  (was: ResourceManager 
should consider both slot resource profile and framework memory for requesting 
resource from cluster)

> Aggregation of slot ResourceProfile and framework memory for requesting 
> resource from cluster by ResourceManager
> 
>
> Key: FLINK-5136
> URL: https://issues.apache.org/jira/browse/FLINK-5136
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5136:


 Summary: ResourceManager should consider both slot resource 
profile and framework memory for requesting resource from cluster
 Key: FLINK-5136
 URL: https://issues.apache.org/jira/browse/FLINK-5136
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Description: The *JobManager* requests a slot by *ResourceProfile* from the 
*ResourceManager* before submitting tasks. Currently the *ResourceProfile* 
only contains cpu cores and memory properties. The memory should be expanded 
to different types such as heap memory, direct memory and native memory, 
which correspond with the memory types in *ResourceSpec*.

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> The *JobManager* requests a slot by *ResourceProfile* from the 
> *ResourceManager* before submitting tasks. Currently the *ResourceProfile* 
> only contains cpu cores and memory properties. The memory should be expanded 
> into different types, such as heap memory, direct memory and native memory, 
> corresponding to the memory types in *ResourceSpec*.
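
The expansion described above can be sketched as a small value class. This is a hypothetical illustration only (field and class names are assumptions, not Flink's actual ResourceProfile): the single memory property is split into heap, direct and native memory so it can mirror a *ResourceSpec*.

```java
// Hypothetical sketch (not Flink's actual class): a ResourceProfile carrying
// the expanded memory types described in the issue.
public class ResourceProfileSketch {

    public final double cpuCores;
    public final int heapMemoryMB;
    public final int directMemoryMB;
    public final int nativeMemoryMB;

    public ResourceProfileSketch(double cpuCores, int heapMemoryMB,
                                 int directMemoryMB, int nativeMemoryMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.directMemoryMB = directMemoryMB;
        this.nativeMemoryMB = nativeMemoryMB;
    }

    /** Total memory the slot needs, summed across all memory types. */
    public int totalMemoryMB() {
        return heapMemoryMB + directMemoryMB + nativeMemoryMB;
    }

    public static void main(String[] args) {
        ResourceProfileSketch p = new ResourceProfileSketch(1.0, 512, 128, 64);
        System.out.println(p.totalMemoryMB()); // prints 704
    }
}
```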





[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5135:
-
Summary: ResourceProfile for slot request should be expanded to correspond 
with ResourceSpec  (was: ResourceProfile should be expanded to correspond with 
ResourceSpec)

> ResourceProfile for slot request should be expanded to correspond with 
> ResourceSpec
> ---
>
> Key: FLINK-5135
> URL: https://issues.apache.org/jira/browse/FLINK-5135
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, ResourceManager
>Reporter: Zhijiang Wang
>






[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects and these can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But for some special cases, the UDF may 
consume many resources to create long-lived large objects, so it is necessary 
to provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-lived small objects and these can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But for some special cases, the UDF may 
consume many resources to create long-lived large objects, so it is necessary 
to provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and the base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated for chained 
operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspondence 
with the *ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly in the flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects and these can be 
> recycled quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But for some special cases, the UDF 
> may consume many resources to create long-lived large objects, so it is 
> necessary to provide options for professional users to define the resource 
> usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - In job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot for submitting a task from the 
> *ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job 
> level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5135:


 Summary: ResourceProfile should be expanded to correspond with 
ResourceSpec
 Key: FLINK-5135
 URL: https://issues.apache.org/jira/browse/FLINK-5135
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, ResourceManager
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators. 
And for the cpu cores resource of a *JobVertex*, the aggregation is the 
maximum over the chained operators.

  was:
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators. And for the cpu cores resource of a *JobVertex*, the 
aggregation is the maximum over the chained operators.


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators. 
> And for the cpu cores resource of a *JobVertex*, the aggregation is the 
> maximum over the chained operators.
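
The aggregation rule described above can be sketched in a few lines. This is a hypothetical illustration (names are assumptions, not Flink code): memory is summed and cpu cores take the maximum across the operators chained into one JobVertex.

```java
import java.util.List;

// Hypothetical sketch of the aggregation rule for chained operators:
// memory uses the sum formula, cpu cores use the maximum formula.
public class ChainedResourceAggregation {

    /** Each spec is {cpuCores, memoryMB}; returns the aggregated {cpu, memory}. */
    public static double[] aggregate(List<double[]> operatorSpecs) {
        double cpu = 0.0;
        double mem = 0.0;
        for (double[] spec : operatorSpecs) {
            cpu = Math.max(cpu, spec[0]); // cpu cores: maximum over the chain
            mem += spec[1];               // memory: sum over the chain
        }
        return new double[] {cpu, mem};
    }

    public static void main(String[] args) {
        double[] vertex = aggregate(List.of(new double[] {1.0, 256}, new double[] {2.0, 128}));
        System.out.println(vertex[0] + " " + vertex[1]); // prints 2.0 384.0
    }
}
```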





[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: 
In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
operators.
The resource of a *JobVertex* should be the aggregation of the individual 
resources of its chained operators.
For the memory resource of a *JobVertex*, the aggregation is the sum over the 
chained operators. And for the cpu cores resource of a *JobVertex*, the 
aggregation is the maximum over the chained operators.

  was:In the DataStream API, the *ResourceSpec* is set on the internal 
transformation


> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In *JobGraph* generation, each *JobVertex* corresponds to a series of chained 
> operators.
> The resource of a *JobVertex* should be the aggregation of the individual 
> resources of its chained operators.
> For the memory resource of a *JobVertex*, the aggregation is the sum over the 
> chained operators. And for the cpu cores resource of a *JobVertex*, the 
> aggregation is the maximum over the chained operators.





[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In job graph generation, the *ResourceSpec* will be aggregated  for chained 
operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspondence with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.

This feature will be implemented directly into flip-6 branch.

  was:
Normally the UDF just creates short-life small objects and these can be 
recycled quickly by JVM, so most of the memory resource is controlled and 
managed by *TaskManager* framework.  But for some special cases, the UDF may 
consume much resource to create long-live big objects, so it is necessary to 
provide the options for professional users to define the resource usages if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc) and provide 
some basic construction methods for resource group.
  - The *ResourceSpec* can be setted onto the internal transformation in 
DataStream and base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated  for 
chained operators.
  - When *JobManager* requests slot for submitting task from *ResourceManager*, 
the *ResourceProfile* will be expanded  to correspondence with *ResourceSpec*.
  - The *ResourceManager* requests resource for container from cluster, it 
should consider extra framework memory except for slot *ResourceProfile*.
  - The framework memory is mainly used by *NetworkBufferPool* and 
*MemoryManager* in *TaskManager*, and it can be configured in job level.
  - Apart from resource, The JVM options attached with container should be 
supported and could also be configured in job level.

This feature will be implemented directly into flip-6 branch.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects and these can be 
> recycled quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But for some special cases, the UDF 
> may consume many resources to create long-lived large objects, so it is 
> necessary to provide options for professional users to define the resource 
> usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - In job graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot for submitting a task from the 
> *ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job 
> level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Summary: Aggregate ResourceSpec for chained operators when generating job 
graph  (was: Aggregate ResourceSpec for chained operators when generating stream 
graph)

> Aggregate ResourceSpec for chained operators when generating job graph
> -
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In DataStream API, the *ResourceSpec* is setted onto the internal 
> transformation



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In the DataStream API, the *ResourceSpec* is set on the internal 
transformation  (was: In datastream API,)

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In the DataStream API, the *ResourceSpec* is set on the internal 
> transformation





[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5134:
-
Description: In datastream API,

> Aggregate ResourceSpec for chained operators when generating stream graph
> 
>
> Key: FLINK-5134
> URL: https://issues.apache.org/jira/browse/FLINK-5134
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Zhijiang Wang
>
> In datastream API,





[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5134:


 Summary: Aggregate ResourceSpec for chained operators when 
generating stream graph
 Key: FLINK-5134
 URL: https://issues.apache.org/jira/browse/FLINK-5134
 Project: Flink
  Issue Type: Sub-task
  Components: DataStream API
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5133:
-
Description: 
This is part of the fine-grained resource configuration.

For *DataStream*, the *setResource* API will be added to 
*SingleOutputStreamOperator*, similar to other existing properties like 
parallelism, name, etc.
For *DataSet*, the *setResource* API will be added to *Operator* in a similar 
way.

The API takes two parameters, a minimum *ResourceSpec* and a maximum 
*ResourceSpec*, to allow for resource resizing in future improvements.

> Add new setResource API for DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be added to 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be added to *Operator* in a 
> similar way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future improvements.
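
The proposed two-parameter API can be sketched as follows. This is a hypothetical usage sketch only (all class and method names are assumptions, not the final Flink signatures): a fluent setter, like setParallelism or name, that records a minimum and a maximum spec per operator.

```java
// Hypothetical sketch of the proposed setResource(min, max) API on an
// operator-like object; not Flink's actual classes or signatures.
public class SetResourceSketch {

    public static class ResourceSpec {
        public final double cpuCores;
        public final int heapMemoryMB;

        public ResourceSpec(double cpuCores, int heapMemoryMB) {
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }
    }

    public static class OperatorStub {
        public ResourceSpec minResources;
        public ResourceSpec maxResources;

        // min is what the operator needs to run; max leaves headroom so the
        // resource can be resized in future improvements.
        public OperatorStub setResource(ResourceSpec min, ResourceSpec max) {
            this.minResources = min;
            this.maxResources = max;
            return this; // fluent, like setParallelism(...) or name(...)
        }
    }

    public static void main(String[] args) {
        OperatorStub op = new OperatorStub()
            .setResource(new ResourceSpec(1.0, 256), new ResourceSpec(2.0, 512));
        System.out.println(op.minResources.heapMemoryMB); // prints 256
    }
}
```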





[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5133:


 Summary: Add new setResource API for DataStream and DataSet
 Key: FLINK-5133
 URL: https://issues.apache.org/jira/browse/FLINK-5133
 Project: Flink
  Issue Type: Sub-task
  Components: DataSet API, DataStream API
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects and these can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But for some special cases, the UDF may 
consume many resources to create long-lived large objects, so it is necessary 
to provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

This feature will be implemented directly in the flip-6 branch.

  was:
Normally the UDF just creates short-lived small objects and these can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But for some special cases, the UDF may 
consume many resources to create long-lived large objects, so it is necessary 
to provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.


> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects and these can be 
> recycled quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But for some special cases, the UDF 
> may consume many resources to create long-lived large objects, so it is 
> necessary to provide options for professional users to define the resource 
> usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot for submitting a task from the 
> *ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job 
> level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.

[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.
The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and the construction can also be easily 
extended for further requirements.


> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and the construction can also be 
> easily extended for further requirements.





[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5132:
-
Description: 
This is part of the fine-grained resource configuration.

The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and the construction can also be easily 
extended for further requirements.


  was:
This is part of the fine-grained resource configuration.
The current resource factors include cpu cores, heap memory, direct memory, 
native memory and state size.
The *ResourceSpec* will provide some basic construction methods for grouping 
different resource factors as needed, and the construction can also be easily 
extended for further requirements.



> Introduce the ResourceSpec for grouping different resource factors in API
> -
>
> Key: FLINK-5132
> URL: https://issues.apache.org/jira/browse/FLINK-5132
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> The current resource factors include cpu cores, heap memory, direct memory, 
> native memory and state size.
> The *ResourceSpec* will provide some basic construction methods for grouping 
> different resource factors as needed, and the construction can also be 
> easily extended for further requirements.





[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5132:


 Summary: Introduce the ResourceSpec for grouping different 
resource factors in API
 Key: FLINK-5132
 URL: https://issues.apache.org/jira/browse/FLINK-5132
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Reporter: Zhijiang Wang








[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5131:
-
Description: 
Normally the UDF just creates short-lived small objects and these can be 
recycled quickly by the JVM, so most of the memory resource is controlled and 
managed by the *TaskManager* framework. But for some special cases, the UDF may 
consume many resources to create long-lived large objects, so it is necessary 
to provide options for professional users to define the resource usage if 
needed.

The basic approach is the following:
  - Introduce the *ResourceSpec* structure to describe the different resource 
factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
provide some basic construction methods for resource groups.
  - The *ResourceSpec* can be set on the internal transformation in DataStream 
and the base operator in DataSet separately.
  - In stream graph generation, the *ResourceSpec* will be aggregated for 
chained operators.
  - When the *JobManager* requests a slot for submitting a task from the 
*ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
the *ResourceSpec*.
  - When the *ResourceManager* requests container resources from the cluster, 
it should consider extra framework memory in addition to the slot 
*ResourceProfile*.
  - The framework memory is mainly used by the *NetworkBufferPool* and the 
*MemoryManager* in the *TaskManager*, and it can be configured at the job 
level.
  - Apart from resources, the JVM options attached to the container should be 
supported and could also be configured at the job level.

> Fine-grained Resource Configuration
> ---
>
> Key: FLINK-5131
> URL: https://issues.apache.org/jira/browse/FLINK-5131
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API, DataStream API, JobManager, ResourceManager
>Reporter: Zhijiang Wang
>
> Normally the UDF just creates short-lived small objects and these can be 
> recycled quickly by the JVM, so most of the memory resource is controlled and 
> managed by the *TaskManager* framework. But for some special cases, the UDF 
> may consume many resources to create long-lived large objects, so it is 
> necessary to provide options for professional users to define the resource 
> usage if needed.
> The basic approach is the following:
>   - Introduce the *ResourceSpec* structure to describe the different resource 
> factors (cpu cores, heap memory, direct memory, native memory, etc.) and 
> provide some basic construction methods for resource groups.
>   - The *ResourceSpec* can be set on the internal transformation in 
> DataStream and the base operator in DataSet separately.
>   - In stream graph generation, the *ResourceSpec* will be aggregated for 
> chained operators.
>   - When the *JobManager* requests a slot for submitting a task from the 
> *ResourceManager*, the *ResourceProfile* will be expanded to correspond with 
> the *ResourceSpec*.
>   - When the *ResourceManager* requests container resources from the cluster, 
> it should consider extra framework memory in addition to the slot 
> *ResourceProfile*.
>   - The framework memory is mainly used by the *NetworkBufferPool* and the 
> *MemoryManager* in the *TaskManager*, and it can be configured at the job 
> level.
>   - Apart from resources, the JVM options attached to the container should be 
> supported and could also be configured at the job level.





[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration

2016-11-22 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5131:


 Summary: Fine-grained Resource Configuration
 Key: FLINK-5131
 URL: https://issues.apache.org/jira/browse/FLINK-5131
 Project: Flink
  Issue Type: New Feature
  Components: DataSet API, DataStream API, JobManager, ResourceManager
Reporter: Zhijiang Wang








[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2016-11-09 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650576#comment-15650576
 ] 

Zhijiang Wang commented on FLINK-4364:
--

Hi [~till.rohrmann], for the heartbeat interaction between TM and JM, the 
process is almost the same as with the RM, as we discussed before.
There will be another separate {{HeartbeatManagerImpl}} and 
{{HeartbeatListener}} in the TM used for the JM heartbeat.
Also, the TM will monitor the {{HeartbeatTarget}} once it has registered 
successfully at the new JM via the HA mechanism.

There are two issues to be confirmed:
1. If the TM detects the JM as dead via heartbeat timeout, the TM should not 
release the tasks and slots which belong to that JM; it should do nothing when 
notified of the heartbeat timeout. It will re-register with the new JM via HA 
and offer the related slots if possible. This is related to the JM failure 
recovery process. If the JM detects a TM as dead via heartbeat timeout, it 
will release all slots related to that TM and request new ones from the RM.
2. For the payload, I am currently not sure which information needs to be 
reported with each heartbeat. The JM may need the {{SlotPool}} to be 
consistent with the {{SlotOffer}}, and it also concerns other processes. So I 
think we can deliver the payload as null in the current implementation and 
just make the monitoring function take effect. Later we can extend the payload 
information as needed.

Do you think the above points are feasible? Then I will work on them this week.
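A rough sketch of point 2, delivering a null payload so that only the monitoring function takes effect. The interface and class names mirror the discussion but are simplified assumptions, not Flink's actual heartbeat API:

```java
// Hypothetical, simplified sketch of the null-payload idea above.
// Interface and class names mirror the discussion but are assumptions,
// not Flink's actual heartbeat API.
interface HeartbeatListener<I, O> {
    void notifyHeartbeatTimeout(String resourceId);

    void reportPayload(String resourceId, I payload);

    O retrievePayload();
}

class JobMasterHeartbeatListener implements HeartbeatListener<Void, Void> {
    private boolean jobMasterTimedOut = false;

    @Override
    public void notifyHeartbeatTimeout(String resourceId) {
        // Point 1: do NOT release tasks or slots here; just record the
        // timeout and rely on HA to re-register with the new JM.
        jobMasterTimedOut = true;
    }

    @Override
    public void reportPayload(String resourceId, Void payload) {
        // Nothing is reported by the JM yet.
    }

    @Override
    public Void retrievePayload() {
        // Point 2: deliver a null payload for now so only the monitoring
        // function takes effect; extend later as needed.
        return null;
    }

    boolean isJobMasterTimedOut() {
        return jobMasterTimedOut;
    }
}
```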

> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2016-11-07 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646297#comment-15646297
 ] 

Zhijiang Wang commented on FLINK-4354:
--

Actually, the current {{HeartbeatManagerImpl}} can work and be used directly in 
the TM component. The only issue is that {{HeartbeatManagerImpl}} implements 
the {{HeartbeatTarget}} interface, which is only used in testing. The TM will 
wrap a new {{HeartbeatTarget}} based on the {{ResourceID}} and {{Gateway}} of 
the RM, so the TM will never call the {{sendHeartbeat}} and 
{{requestHeartbeat}} methods of {{HeartbeatManagerImpl}}. 
In other words, {{HeartbeatManagerImpl}} takes on the role of a 
{{HeartbeatTarget}}, but this role is redundant for the TM and RM, because 
they create separate {{HeartbeatTarget}} instances in the registration 
process. If {{HeartbeatManagerImpl}} did not implement {{HeartbeatTarget}}, I 
think it would be cleaner to reuse across the different components (TM, RM, JM). 
Maybe my understanding is not completely accurate; for now I can reuse 
{{HeartbeatManagerImpl}} in the TM to realize the process.
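The separation argued for here could look roughly like the following sketch, where the manager only monitors externally supplied targets and does not itself implement {{HeartbeatTarget}}. All names are illustrative assumptions, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the manager only monitors targets supplied by the
// component (TM/JM/RM) during registration, and does not itself play the
// HeartbeatTarget role. All names are illustrative assumptions.
interface HeartbeatTarget {
    void requestHeartbeat(String requestOrigin);
}

class PureHeartbeatManager {
    private final Map<String, HeartbeatTarget> targets = new HashMap<>();

    // Called after a successful registration, e.g. with a target that
    // wraps the ResourceID and gateway of the remote component.
    void monitorTarget(String resourceId, HeartbeatTarget target) {
        targets.put(resourceId, target);
    }

    void unmonitorTarget(String resourceId) {
        targets.remove(resourceId);
    }

    // The sender side (e.g. the RM) would call this on a schedule.
    void requestHeartbeatFromAll(String origin) {
        for (HeartbeatTarget target : targets.values()) {
            target.requestHeartbeat(origin);
        }
    }

    int numMonitoredTargets() {
        return targets.size();
    }
}
```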


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2016-11-07 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643637#comment-15643637
 ] 

Zhijiang Wang edited comment on FLINK-4354 at 11/7/16 9:46 AM:
---

Hi [~till.rohrmann], for the heartbeat interaction between TM and RM, I have 
some issues that I need to confirm with you before implementation.
1. The current {{HeartbeatManagerImpl}} implements both the 
{{HeartbeatManager}} and {{HeartbeatTarget}} interfaces in order to make 
testing easy. I think we need another {{HeartbeatManagerImpl}} that implements 
just the {{HeartbeatManager}} interface, so it can be used directly in 
different components. And every component can implement its own separate 
{{HeartbeatTarget}}.
2. For the TM component, the {{HeartbeatManagerImpl}} can be constructed in 
{{TaskManagerRunner}} (maybe not put in {{TaskManagerServices}}) and passed 
into {{TaskExecutor}}.
3. The TM will create the {{HeartbeatListener}} and start the 
{{HeartbeatManagerImpl}}.
4. When the RM leader changes, the TM registers with the new RM. If the 
registration succeeds, {{TaskExecutorRegistrationSuccess}} should contain the 
{{ResourceID}} of the RM, so the TM can create the {{HeartbeatTarget}} and 
monitor it based on the {{ResourceID}} and Gateway of the RM.
5. For the RM, when it receives a registration from a TM, it will create the 
{{HeartbeatTarget}} and monitor it based on the {{ResourceID}} and Gateway of 
the TM. The RM will schedule heartbeat requests to all the monitored TMs.
6. {{TaskExecutorGateway}} should define the requestHeartbeat RPC method, and 
{{ResourceManagerGateway}} should define the sendHeartbeat RPC method.

Do you think the above process is feasible? I would appreciate your advice, 
and I will then begin implementing this week.
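Point 6 above could be sketched as the following pair of gateway interfaces. The signatures and the in-memory "gateways" are illustrative assumptions for showing the request/response flow, not Flink's actual RPC interfaces:

```java
// Hypothetical sketch of point 6: each gateway declares one heartbeat
// RPC. Signatures and the in-memory implementations below are
// illustrative assumptions, not Flink's actual RPC interfaces.
interface TaskExecutorGateway {
    // RM -> TM: ask the TaskManager to heartbeat back.
    void requestHeartbeat(String resourceManagerId);
}

interface ResourceManagerGateway {
    // TM -> RM: deliver the heartbeat together with slot availability.
    void sendHeartbeat(String taskManagerId, String slotReport);
}

class InMemoryResourceManager implements ResourceManagerGateway {
    String lastHeartbeatFrom;
    String lastSlotReport;

    @Override
    public void sendHeartbeat(String taskManagerId, String slotReport) {
        lastHeartbeatFrom = taskManagerId;
        lastSlotReport = slotReport;
    }
}

class InMemoryTaskExecutor implements TaskExecutorGateway {
    private final ResourceManagerGateway resourceManager;

    InMemoryTaskExecutor(ResourceManagerGateway resourceManager) {
        this.resourceManager = resourceManager;
    }

    @Override
    public void requestHeartbeat(String resourceManagerId) {
        // Answer the RM's request with the current slot availability.
        resourceManager.sendHeartbeat("tm-1", "2 slots available");
    }
}
```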


was (Author: zjwang):
Hi [~till.rohrmann], for heartbeat interaction between {{TM}}  and {{RM}},  I 
have some issues need to be confirmed with you before implementation.
1. The current {{HeartbeatManagerImpl}}  implements both {{HeartbeatManager}} 
and {{HeartbeatTarget}} interfaces in order to test easily. I think we need 
another HeartbeatManagerImpl that just implements {{HeartbeatManager}} 
interface so can be used directly in different components. And every component 
can implement the separate {{HeartbeatTarget}}.
2. For {{TM}} component,  the {{HeartbeatManagerImpl}} can be constructed in 
{{TaskManagerRunner}} (maybe not put in {{TaskManagerServices}}) and passed 
into {{TaskExecutor}}.
3. The {{TM}} will create the {{HeartbeatListener}} and start the 
{{HeartbeatManagerImpl}}.
4. When {{RM}} leader changes, the {{TM}} registers the new {{RM}}. If the 
registration successes, {{TaskExecutorRegistrationSuccess}} should contain 
{{ResourceID}} of {{RM}}, so the {{TM}} can create the {{HeartbeatTarget}} and 
monitor it based on {{ResourceID}} and Gateway of RM.
5. For {{RM}}, when receive registration from {{TM}}, it will create the 
{{HeartbeatTarget}} and monitor it based on {{ResourceID}} and Gateway of 
{{TM}}.
{{RM}} will schedule a heartbeat request to all the monitored TMs.
6. {{TaskExecutorGateway}} should define the requestHeartbeat RPC method, and 
{{ResourceManagerGateway}} should define the sendHeartbeat RPC method.

Do you think the above processes are feasible?  I wish your professional 
advices and them begin to implement this week.

>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2016-11-07 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643637#comment-15643637
 ] 

Zhijiang Wang commented on FLINK-4354:
--

Hi [~till.rohrmann], for the heartbeat interaction between TM and RM, I have 
some issues that I need to confirm with you before implementation.
1. The current HeartbeatManagerImpl implements both the HeartbeatManager and 
HeartbeatTarget interfaces in order to make testing easy. I think we need 
another HeartbeatManagerImpl that implements just the HeartbeatManager 
interface, so it can be used directly in different components. And every 
component can implement its own separate HeartbeatTarget.
2. For the TM component, the HeartbeatManagerImpl can be constructed in 
TaskManagerRunner (maybe not put in TaskManagerServices) and passed into 
TaskExecutor.
3. The TM will create the HeartbeatListener and start the HeartbeatManagerImpl.
4. When the RM leader changes, the TM registers with the new RM. If the 
registration succeeds, TaskExecutorRegistrationSuccess should contain the 
ResourceID of the RM, so the TM can create the HeartbeatTarget and monitor it 
based on the ResourceID and Gateway of the RM.
5. For the RM, when it receives a registration from a TM, it will create the 
HeartbeatTarget and monitor it based on the ResourceID and Gateway of the TM.
The RM will schedule heartbeat requests to all the monitored TMs.
6. TaskExecutorGateway should define the requestHeartbeat RPC method, and 
ResourceManagerGateway should define the sendHeartbeat RPC method.

Do you think the above process is feasible? I would appreciate your advice, 
and I will then begin implementing this week.

>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation

2016-10-27 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15611247#comment-15611247
 ] 

Zhijiang Wang commented on FLINK-4911:
--

Yeah, it does "at-least once". 

BTW, I confirmed with Till that the heartbeat manager module is finished, so 
the previously blocked JIRAs can be resumed. Some of them, related to the 
heartbeat interaction between TM, JM and RM, were assigned to me earlier. I 
will work on them in the following days and then consider the JM failure 
issue; I think the payload information reported in the heartbeat messages may 
be reused in the JM failure scenario.

> Non-disruptive JobManager Failures via Reconciliation 
> --
>
> Key: FLINK-4911
> URL: https://issues.apache.org/jira/browse/FLINK-4911
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager, TaskManager
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> JobManager failures can be handled in a non-disruptive way - by *reconciling* 
> the new JobManager leader and the TaskManagers.
> I suggest to use this term (reconcile) - it has been used also by other 
> frameworks (like Mesos) for non-disruptive handling of failures.
> The basic approach is the following:
>   - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to 
> reconnect to the JobManager
>   - On connect, the TaskManager tells the JobManager about its currently 
> running tasks
>   - A new JobManager waits for TaskManagers to connect and report a task 
> status. It re-constructs the ExecutionGraph state from these reports
>   - Tasks whose status was not reconstructed in a certain time are assumed 
> failed and trigger regular task recovery.
> To avoid having to re-implement this for the new JobManager / TaskManager 
> approach in *flip-6*, I suggest to directly implement this into the 
> {{flip-6}} feature branch.
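The reconciliation flow quoted above could be sketched as follows: the new JobManager collects task reports from reconnecting TaskManagers, and any expected task without a report after the waiting period is assumed failed. All names here are illustrative assumptions, not Flink's actual implementation:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the reconciliation approach: task reports
// re-construct the ExecutionGraph state; unreported tasks are recovered.
class ReconcilingJobMaster {
    private final Set<String> expectedTasks;
    private final Map<String, String> reportedStates = new HashMap<>();

    ReconcilingJobMaster(String... expectedTasks) {
        this.expectedTasks = new HashSet<>(Arrays.asList(expectedTasks));
    }

    // Called when a TaskManager reconnects and reports a running task.
    void reportTask(String taskId, String state) {
        reportedStates.put(taskId, state);
    }

    // Called once the waiting duration elapses: tasks whose status was
    // not reconstructed in time trigger regular task recovery.
    Set<String> tasksToRecover() {
        Set<String> missing = new HashSet<>(expectedTasks);
        missing.removeAll(reportedStates.keySet());
        return missing;
    }
}
```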



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation

2016-10-27 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4911:
-
Component/s: TaskManager

> Non-disruptive JobManager Failures via Reconciliation 
> --
>
> Key: FLINK-4911
> URL: https://issues.apache.org/jira/browse/FLINK-4911
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager, TaskManager
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> JobManager failures can be handled in a non-disruptive way - by *reconciling* 
> the new JobManager leader and the TaskManagers.
> I suggest to use this term (reconcile) - it has been used also by other 
> frameworks (like Mesos) for non-disruptive handling of failures.
> The basic approach is the following:
>   - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to 
> reconnect to the JobManager
>   - On connect, the TaskManager tells the JobManager about its currently 
> running tasks
>   - A new JobManager waits for TaskManagers to connect and report a task 
> status. It re-constructs the ExecutionGraph state from these reports
>   - Tasks whose status was not reconstructed in a certain time are assumed 
> failed and trigger regular task recovery.
> To avoid having to re-implement this for the new JobManager / TaskManager 
> approach in *flip-6*, I suggest to directly implement this into the 
> {{flip-6}} feature branch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation

2016-10-26 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4911:


Assignee: Zhijiang Wang

> Non-disruptive JobManager Failures via Reconciliation 
> --
>
> Key: FLINK-4911
> URL: https://issues.apache.org/jira/browse/FLINK-4911
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> JobManager failures can be handled in a non-disruptive way - by *reconciling* 
> the new JobManager leader and the TaskManagers.
> I suggest to use this term (reconcile) - it has been used also by other 
> frameworks (like Mesos) for non-disruptive handling of failures.
> The basic approach is the following:
>   - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to 
> reconnect to the JobManager
>   - On connect, the TaskManager tells the JobManager about its currently 
> running tasks
>   - A new JobManager waits for TaskManagers to connect and report a task 
> status. It re-constructs the ExecutionGraph state from these reports
>   - Tasks whose status was not reconstructed in a certain time are assumed 
> failed and trigger regular task recovery.
> To avoid having to re-implement this for the new JobManager / TaskManager 
> approach in *flip-6*, I suggest to directly implement this into the 
> {{flip-6}} feature branch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation

2016-10-25 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15607630#comment-15607630
 ] 

Zhijiang Wang commented on FLINK-4911:
--

Hi Stephan, sub-tasks 2 and 3 are the same; do you mind if I remove sub-task 
3?

> Non-disruptive JobManager Failures via Reconciliation 
> --
>
> Key: FLINK-4911
> URL: https://issues.apache.org/jira/browse/FLINK-4911
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager
>Reporter: Stephan Ewen
>
> JobManager failures can be handled in a non-disruptive way - by *reconciling* 
> the new JobManager leader and the TaskManagers.
> I suggest to use this term (reconcile) - it has been used also by other 
> frameworks (like Mesos) for non-disruptive handling of failures.
> The basic approach is the following:
>   - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to 
> reconnect to the JobManager
>   - On connect, the TaskManager tells the JobManager about its currently 
> running tasks
>   - A new JobManager waits for TaskManagers to connect and report a task 
> status. It re-constructs the ExecutionGraph state from these reports
>   - Tasks whose status was not reconstructed in a certain time are assumed 
> failed and trigger regular task recovery.
> To avoid having to re-implement this for the new JobManager / TaskManager 
> approach in *flip-6*, I suggest to directly implement this into the 
> {{flip-6}} feature branch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation

2016-10-25 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15607336#comment-15607336
 ] 

Zhijiang Wang commented on FLINK-4911:
--

FLINK-3539 tries to realize eventual consistency between the JM and TMs during 
regular running. 
In the JM failure scenario, the new JM wants to recover its initial state 
based on all running tasks. 
Maybe we can take the running task information from the heartbeat messages and 
apply it to the registration process.

> Non-disruptive JobManager Failures via Reconciliation 
> --
>
> Key: FLINK-4911
> URL: https://issues.apache.org/jira/browse/FLINK-4911
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager
>Reporter: Stephan Ewen
>
> JobManager failures can be handled in a non-disruptive way - by *reconciling* 
> the new JobManager leader and the TaskManagers.
> I suggest to use this term (reconcile) - it has been used also by other 
> frameworks (like Mesos) for non-disruptive handling of failures.
> The basic approach is the following:
>   - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to 
> reconnect to the JobManager
>   - On connect, the TaskManager tells the JobManager about its currently 
> running tasks
>   - A new JobManager waits for TaskManagers to connect and report a task 
> status. It re-constructs the ExecutionGraph state from these reports
>   - Tasks whose status was not reconstructed in a certain time are assumed 
> failed and trigger regular task recovery.
> To avoid having to re-implement this for the new JobManager / TaskManager 
> approach in *flip-6*, I suggest to directly implement this into the 
> {{flip-6}} feature branch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4915) Define parameters for reconciliation

2016-10-25 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4915:


Assignee: Zhijiang Wang

> Define parameters for reconciliation
> 
>
> Key: FLINK-4915
> URL: https://issues.apache.org/jira/browse/FLINK-4915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> For reconciliation between TaskManagers and JobManager, we need to define the 
> following parameters:
>   - Duration that a TaskManager keeps a Task running when looking for a new 
> JobManager. After that duration is over, the Task is canceled/failed.
>   - Duration that the JobManager waits for TaskManagers to report that a task 
> is still running.
> We should also try and determine some sane default values for those 
> parameters. A suggestion could be {{1 min}} waiting time on the TaskManager 
> side and {{20 secs}} waiting time on the JobManager side.
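The two parameters and the suggested defaults quoted above can be captured in a small config sketch; the class and field names are illustrative assumptions, not Flink's actual configuration keys:

```java
// Hypothetical sketch of the two reconciliation parameters, using the
// suggested defaults (1 min on the TaskManager side, 20 secs on the
// JobManager side). Names are illustrative assumptions.
class ReconciliationConfig {
    // How long a TaskManager keeps a task running while looking for a
    // new JobManager before the task is canceled/failed.
    final long taskManagerWaitMillis;
    // How long the JobManager waits for TaskManagers to report that a
    // task is still running.
    final long jobManagerWaitMillis;

    ReconciliationConfig(long taskManagerWaitMillis, long jobManagerWaitMillis) {
        this.taskManagerWaitMillis = taskManagerWaitMillis;
        this.jobManagerWaitMillis = jobManagerWaitMillis;
    }

    static ReconciliationConfig defaults() {
        return new ReconciliationConfig(60_000L, 20_000L);
    }
}
```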



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph

2016-10-25 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4912:


Assignee: Zhijiang Wang

> Introduce RECONCILING state in ExecutionGraph
> -
>
> Key: FLINK-4912
> URL: https://issues.apache.org/jira/browse/FLINK-4912
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> This is part of the non-disruptive JobManager failure recovery.
> I suggest to add a JobStatus and ExecutionState {{RECONCILING}}.
> If a job is started on a JobManager for master recovery (TBD how to 
> determine that), the {{ExecutionGraph}} and the {{Execution}}s start in the 
> reconciling state.
> From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with 
> TaskManager) or to {{FAILED}}.
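The allowed transitions described above can be sketched as a small state enum; the enum and method names are illustrative assumptions, not Flink's actual {{ExecutionState}}:

```java
// Hypothetical sketch of the proposed state: on master recovery, tasks
// start in RECONCILING and may only move to RUNNING (reconciled with a
// TaskManager) or FAILED.
enum ReconcilingExecutionState {
    RECONCILING, RUNNING, FAILED;

    boolean canTransitionTo(ReconcilingExecutionState target) {
        if (this == RECONCILING) {
            return target == RUNNING || target == FAILED;
        }
        // Other transitions are omitted in this sketch.
        return false;
    }
}
```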



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4914) Define parameters for reconciliation

2016-10-25 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4914:


Assignee: Zhijiang Wang

> Define parameters for reconciliation
> 
>
> Key: FLINK-4914
> URL: https://issues.apache.org/jira/browse/FLINK-4914
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Stephan Ewen
>Assignee: Zhijiang Wang
>
> For reconciliation between TaskManagers and JobManager, we need to define the 
> following parameters:
>   - Duration that a TaskManager keeps a Task running when looking for a new 
> JobManager. After that duration is over, the Task is canceled/failed.
>   - Duration that the JobManager waits for TaskManagers to report that a task 
> is still running.
> We should also try and determine some sane default values for those 
> parameters. A suggestion could be {{1 min}} waiting time on the TaskManager 
> side and {{20 secs}} waiting time on the JobManager side.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797
 ] 

Zhijiang Wang edited comment on FLINK-4715 at 9/30/16 2:31 AM:
---

Yes, we have already experienced this problem in real production many times, 
because the user code cannot be controlled. If the thread is waiting for a 
synchronized lock, or in other such cases, it cannot be cancelled. Our 
approach is that if the job master fails to cancel the task many times, it 
will make the task manager exit itself.


was (Author: zjwang):
Yes, we already experienced this problem in real production many times,  
because the user code can not be controlled. If the thread is waiting for 
synchronized lock or other cases, it can not be cancelled, and the job master 
cancel the task failed many times, the job master will let the task manager 
exit itself.

> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.
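The safety-net described above can be sketched as a watchdog: if a task has not reached a terminal state when the cancellation timeout fires, the TaskManager kills itself instead of leaking the resource. All names are illustrative assumptions; in a real TaskManager the flag would be a call to {{System.exit(..)}}:

```java
// Hypothetical sketch of the cancellation safety-net. The flag stands in
// for the process kill so the behavior stays testable.
class CancellationWatchdog {
    private boolean taskTerminated = false;
    private boolean processKillRequested = false;

    // Called when the task actually reaches a terminal state.
    void onTaskTerminated() {
        taskTerminated = true;
    }

    // Called when the cancellation timeout elapses.
    void onCancellationTimeout() {
        if (!taskTerminated) {
            processKillRequested = true;
        }
    }

    boolean isProcessKillRequested() {
        return processKillRequested;
    }
}
```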



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-09-29 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797
 ] 

Zhijiang Wang commented on FLINK-4715:
--

Yes, we have already experienced this problem in real production many times, 
because the user code cannot be controlled. If the thread is waiting for a 
synchronized lock, or in similar cases, it cannot be cancelled; if the job 
master fails to cancel the task several times, it lets the task manager exit 
itself.

> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[jira] [Closed] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-28 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang closed FLINK-4505.


> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.





[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-09 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4505:
-
Description: Implement {{TaskManagerRunner}} to construct related 
components ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
{{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
standalone mode.  (was: Implement {{TaskExecutorFactory}} that should be an 
abstract class with the helper methods to bring up the {{TaskManager}}. The 
factory can be implemented by some classes to start a {{TaskManager}} in 
different modes (testing, standalone, yarn).)

> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskManagerRunner}} to construct related components 
> ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, 
> {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or 
> standalone mode.
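The construction order above can be sketched with stub components. All names here are placeholders mirroring the issue, not the real Flink implementations; the hypothetical {{startComponents}} helper shuts already-started components down in reverse order if a later one fails to start.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of component startup in a TaskManagerRunner.
public class TaskManagerRunnerSketch {

    interface Component {
        String name();
        void start() throws Exception;
        void shutDown();
    }

    static Component stub(String name) {
        return new Component() {
            public String name() { return name; }
            public void start() { System.out.println("started " + name); }
            public void shutDown() { System.out.println("stopped " + name); }
        };
    }

    /** Starts components in order; on failure, stops the started ones in reverse. */
    static void startComponents(Component... components) throws Exception {
        Deque<Component> started = new ArrayDeque<>();
        try {
            for (Component c : components) {
                c.start();
                started.push(c);
            }
        } catch (Exception e) {
            while (!started.isEmpty()) {
                started.pop().shutDown();
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        startComponents(
                stub("MemoryManager"),
                stub("IOManager"),
                stub("NetworkEnvironment"),
                stub("TaskManagerLocation"));
    }
}
```

The reverse-order shutdown mirrors the usual rule that a component may depend only on components constructed before it.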





[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager

2016-09-09 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4505:
-
Summary: Implement TaskManagerRunner to construct related components for 
TaskManager  (was: Implement TaskManagerFactory to bring up TaskManager for 
different modes)

> Implement TaskManagerRunner to construct related components for TaskManager
> ---
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-26 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438745#comment-15438745
 ] 

Zhijiang Wang commented on FLINK-4363:
--

Hi [~mxm], I have created FLINK-4505 for the TaskExecutorFactory issue. As you 
mentioned, it should be an abstract class providing an abstract method, perhaps 
called 'createAndStartTaskExecutor()'. There would be at least three specific 
factories (testing, yarn, standalone) extending TaskExecutorFactory to 
implement that method. The constructor parameters of the specific factories 
differ per mode. For example, for StandaloneTaskExecutorFactory the constructor 
parameters would be (Configuration configuration, ResourceID resourceID, 
RpcService rpcService, String taskManagerHostname, HighAvailabilityServices 
haServices, boolean localTaskManagerCommunication), and the 
'createAndStartTaskExecutor()' method could invoke the 
'startTaskManagerComponentsAndActor' method in {{TaskExecutor}} to bring up the 
{{TaskExecutor}}. Do you have any other advice? Then I can start this subtask.
https://issues.apache.org/jira/browse/FLINK-4505
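The factory idea from this comment can be sketched as follows. Every class here is a simplified placeholder for illustration only, not the real Flink API; the real factories would carry the mode-specific constructor parameters listed above.

```java
// Hypothetical sketch of an abstract factory for mode-specific startup.
public abstract class TaskExecutorFactorySketch {

    // Placeholder for the real TaskExecutor; just enough to show the pattern.
    static class TaskExecutor {
        final String mode;
        TaskExecutor(String mode) { this.mode = mode; }
    }

    /** Each mode (testing, standalone, yarn) provides its own implementation. */
    public abstract TaskExecutor createAndStartTaskExecutor();

    static class StandaloneTaskExecutorFactory extends TaskExecutorFactorySketch {
        // The real constructor would take Configuration, ResourceID,
        // RpcService, hostname, HighAvailabilityServices, etc.
        @Override
        public TaskExecutor createAndStartTaskExecutor() {
            return new TaskExecutor("standalone");
        }
    }

    static class YarnTaskExecutorFactory extends TaskExecutorFactorySketch {
        @Override
        public TaskExecutor createAndStartTaskExecutor() {
            return new TaskExecutor("yarn");
        }
    }

    public static void main(String[] args) {
        TaskExecutorFactorySketch factory = new StandaloneTaskExecutorFactory();
        System.out.println(factory.createAndStartTaskExecutor().mode); // prints "standalone"
    }
}
```

The caller only depends on the abstract type, so the startup code is identical across testing, standalone, and yarn modes.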

> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and 
> startup of all components in Java instead of Scala.





[jira] [Updated] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-08-26 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4505:
-
Summary: Implement TaskManagerFactory to bring up TaskManager for different 
modes  (was: Implement abstract TaskManagerFactory to bring up TaskManager for 
different modes)

> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[jira] [Assigned] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-08-26 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4505:


Assignee: Zhijiang Wang

> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).





[jira] [Created] (FLINK-4505) Implement abstract TaskManagerFactory to bring up TaskManager for different modes

2016-08-26 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-4505:


 Summary: Implement abstract TaskManagerFactory to bring up 
TaskManager for different modes
 Key: FLINK-4505
 URL: https://issues.apache.org/jira/browse/FLINK-4505
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Zhijiang Wang
Priority: Minor


Implement {{TaskExecutorFactory}} that should be an abstract class with the 
helper methods to bring up the {{TaskManager}}. The factory can be implemented 
by some classes to start a {{TaskManager}} in different modes (testing, 
standalone, yarn).





[jira] [Commented] (FLINK-4489) Implement TaskManager's SlotManager

2016-08-26 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438621#comment-15438621
 ] 

Zhijiang Wang commented on FLINK-4489:
--

Does this refer to the slot manager in the {{ResourceManager}}, or is it 
unrelated to it?

> Implement TaskManager's SlotManager
> ---
>
> Key: FLINK-4489
> URL: https://issues.apache.org/jira/browse/FLINK-4489
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>
> The {{SlotManager}} is responsible for managing the available slots on the 
> {{TaskManager}}. This basically means to maintain the mapping between slots 
> and the owning {{JobManagers}} and to offer tasks which run in the slots 
> access to the owning {{JobManagers}}.





[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2016-08-26 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438609#comment-15438609
 ] 

Zhijiang Wang commented on FLINK-4354:
--

Yes, so it would be started after FLINK-4478 is done.

>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.





[jira] [Commented] (FLINK-4424) Make network environment start-up/shutdown independent of JobManager association

2016-08-24 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434680#comment-15434680
 ] 

Zhijiang Wang commented on FLINK-4424:
--

The previous behavior of the {{NetworkEnvironment}} is like a lazy start: 
whether to start the {{ConnectionManager}} depends on establishing RPC 
communication between {{TaskManager}} and {{JobManager}}, because the 
connection info of the upstream is notified to the downstream by the 
{{JobMaster}}. Actually, we can pre-start the {{ConnectionManager}} after 
constructing the {{NetworkEnvironment}} component to make the registration 
process cleaner, so it is no longer mixed with starting the 
{{ConnectionManager}}. 
@Till, I can first extract the start-related logic of the 
{{ConnectionManager}} and put it in the constructor of {{NetworkEnvironment}}. 
The shutdown process may rely on the new TaskExecutor progress and could be 
fixed later. Do you agree?

> Make network environment start-up/shutdown independent of JobManager 
> association
> 
>
> Key: FLINK-4424
> URL: https://issues.apache.org/jira/browse/FLINK-4424
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>
> Currently, the {{TaskManager}} starts the netty network server only after it 
> has registered with a {{JobManager}}. Upon loss of connection to the 
> {{JobManager}} the {{NetworkEnvironment}} is closed.
> The start-up and shutdown of the network server should be independent of the 
> {{JobManager}} connection, especially if we assume that a TM can be 
> associated with multiple JobManagers in the future (FLIP-6).
> Starting the network server only once when the {{TaskManager}} is started has 
> the advantage that we don't have to preconfigure the {{TaskManager's}} data 
> port. Furthermore, we don't risk getting stuck when disassociating from a 
> {{JobManager}}, because the start-up and shutdown of a {{NetworkEnvironment}} 
> can cause problems (it has to reserve/free resources).
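The proposed eager start can be sketched as follows. This is a toy model, not the real {{NetworkEnvironment}}: the connection manager is started in the constructor, so associating with a {{JobManager}} never has to touch the network stack.

```java
// Hypothetical sketch of eager network start-up, independent of any
// JobManager association.
public class NetworkEnvironmentSketch {

    boolean connectionManagerStarted = false;

    /** Eager variant: the server is started once when the TM comes up. */
    NetworkEnvironmentSketch() {
        startConnectionManager();
    }

    void startConnectionManager() {
        // The real code would bind the netty server to the data port here.
        connectionManagerStarted = true;
    }

    /** Registration only exchanges RPC messages; no network (re)start involved. */
    void associateWithJobManager(String jobManagerAddress) {
        if (!connectionManagerStarted) {
            throw new IllegalStateException("network stack should already be running");
        }
    }

    public static void main(String[] args) {
        NetworkEnvironmentSketch env = new NetworkEnvironmentSketch();
        env.associateWithJobManager("jm-address");
        System.out.println("associated without touching the network stack");
    }
}
```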





[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-19 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427951#comment-15427951
 ] 

Zhijiang Wang commented on FLINK-4363:
--

Thank you Till, I understand your point. I will follow the approach of the 
current TaskManager, perhaps just translating it from Scala to Java as a first 
step. That makes it feasible to test functions based on the new TaskExecutor 
construction for the RPC abstraction. The only concern is that the TaskExecutor 
cannot work correctly with the previous process, because the 
LeaderRetrievalService and TaskManagerActor are replaced by the 
HighAvailabilityServices and RpcService. I will submit the pull request on 
Monday if possible.



> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and 
> startup of all components in Java instead of Scala.





[jira] [Assigned] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-17 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4363:


Assignee: Zhijiang Wang

> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and 
> startup of all components in Java instead of Scala.





[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-17 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424183#comment-15424183
 ] 

Zhijiang Wang commented on FLINK-4021:
--

Got it, I will modify the return value and add the test to the PR branch this week. 

> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task can share the same TCP connection for shuffling data.
> If a downstream task, say "A", has no available memory segment to read a 
> netty buffer from the network, it sets autoread to false for the channel.
> When task A fails or has available segments again, the netty handler is 
> notified to process the staged buffers first and then reset autoread to 
> true. But in some scenarios, autoread is never set back to true.
> That is, when processing the staged buffers, the handler first finds the 
> corresponding input channel for each buffer; if the task for that input 
> channel has failed, the decodeMsg method in PartitionRequestClientHandler 
> returns false, which means autoread is never set back to true.
> In summary, if task "A" sets autoread to false because it has no available 
> segments, resulting in some staged buffers, and another task "B" 
> corresponding to one of the staged buffers fails by accident, then when 
> task A tries to reset autoread to true, the process cannot complete because 
> of task B's failure.
> I have fixed this problem in our application by adding a boolean parameter 
> to the decodeBufferOrEvent method to distinguish whether it is invoked by 
> the netty IO thread's channel read or by the staged message handler task in 
> PartitionRequestClientHandler.
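The fix described in the last paragraph can be sketched as follows. This is a heavily simplified model of the handler (strings stand in for buffers and channels, and all names are illustrative): the extra boolean keeps the staged-buffer replay going past buffers whose task has failed, so autoread is eventually restored.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified model of the autoread fix.
public class AutoreadFixSketch {

    boolean autoRead = false;
    final List<String> stagedBuffers = new ArrayList<>();
    final Set<String> failedChannels = new HashSet<>();

    /**
     * The extra boolean distinguishes the staged-buffer replay path from the
     * netty IO thread path. On the replay path, a buffer whose task has failed
     * is skipped instead of aborting the whole replay.
     */
    boolean decodeBufferOrEvent(String buffer, boolean stagedReplay) {
        String channel = buffer.split(":")[0];
        if (failedChannels.contains(channel)) {
            // Old behavior: returning false here on the replay path stopped
            // the replay, so autoread was never re-enabled.
            return stagedReplay; // fixed: keep replaying staged buffers
        }
        // ... hand the buffer to its input channel ...
        return true;
    }

    void replayStagedBuffers() {
        boolean allHandled = true;
        for (String buffer : stagedBuffers) {
            allHandled &= decodeBufferOrEvent(buffer, true);
        }
        if (allHandled) {
            stagedBuffers.clear();
            autoRead = true; // re-enable reads once all staged buffers are handled
        }
    }

    public static void main(String[] args) {
        AutoreadFixSketch handler = new AutoreadFixSketch();
        handler.stagedBuffers.add("taskA:buf1");
        handler.stagedBuffers.add("taskB:buf2");
        handler.failedChannels.add("taskB");
        handler.replayStagedBuffers();
        System.out.println("autoread restored: " + handler.autoRead); // with the fix: true
    }
}
```

With the old behavior (returning false on the replay path), the second buffer would abort the replay and the channel would stay frozen.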





[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-17 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424056#comment-15424056
 ] 

Zhijiang Wang commented on FLINK-4363:
--

For the yarn and standalone modes, the entry methods for the TaskManager are 
'selectNetworkInterfaceAndRunTaskManager' and 
'startTaskManagerComponentsAndActor', respectively. 
Should the new TaskExecutor follow the same methods as the TaskManager, or 
define a new uniform method for both modes?
Another main difference is that the actor-related initialization is replaced by 
the RpcService.
Should this subtask be started right now, or wait for the other related subtasks?

> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and 
> startup of all components in Java instead of Scala.





[jira] [Assigned] (FLINK-4355) Implement TaskManager side of registration at ResourceManager

2016-08-17 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-4355:


Assignee: Stephan Ewen  (was: Zhijiang Wang)

> Implement TaskManager side of registration at ResourceManager
> -
>
> Key: FLINK-4355
> URL: https://issues.apache.org/jira/browse/FLINK-4355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Stephan Ewen
>
> If the {{TaskManager}} is unregistered, it should try and register at the 
> {{ResourceManager}} leader. The registration messages are fenced via the 
> {{RmLeaderID}}.
> The ResourceManager may acknowledge the registration (or respond that the 
> TaskManager is AlreadyRegistered) or refuse the registration.
> Upon registration refusal, the TaskManager may have to kill itself.





[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-16 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423994#comment-15423994
 ] 

Zhijiang Wang commented on FLINK-4021:
--

Yeah, you are right. It is not a problem in the current Flink failover mode. 
Maybe it is relevant for FLIP-1: 
'https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures'.
We have already improved task failure handling by restarting only the sources 
and the failed tasks, which is how we noticed this issue.
I forgot to add a test for the function; thank you for confirming it.
What else should I do next?

> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task can share the same TCP connection for shuffling data.
> If a downstream task, say "A", has no available memory segment to read a 
> netty buffer from the network, it sets autoread to false for the channel.
> When task A fails or has available segments again, the netty handler is 
> notified to process the staged buffers first and then reset autoread to 
> true. But in some scenarios, autoread is never set back to true.
> That is, when processing the staged buffers, the handler first finds the 
> corresponding input channel for each buffer; if the task for that input 
> channel has failed, the decodeMsg method in PartitionRequestClientHandler 
> returns false, which means autoread is never set back to true.
> In summary, if task "A" sets autoread to false because it has no available 
> segments, resulting in some staged buffers, and another task "B" 
> corresponding to one of the staged buffers fails by accident, then when 
> task A tries to reset autoread to true, the process cannot complete because 
> of task B's failure.
> I have fixed this problem in our application by adding a boolean parameter 
> to the decodeBufferOrEvent method to distinguish whether it is invoked by 
> the netty IO thread's channel read or by the staged message handler task in 
> PartitionRequestClientHandler.





[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-16 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423984#comment-15423984
 ] 

Zhijiang Wang commented on FLINK-4021:
--

Thank you for the advice; it is indeed simpler to 'return isStagedBuffer' directly. 

> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task can share the same TCP connection for shuffling data.
> If a downstream task, say "A", has no available memory segment to read a 
> netty buffer from the network, it sets autoread to false for the channel.
> When task A fails or has available segments again, the netty handler is 
> notified to process the staged buffers first and then reset autoread to 
> true. But in some scenarios, autoread is never set back to true.
> That is, when processing the staged buffers, the handler first finds the 
> corresponding input channel for each buffer; if the task for that input 
> channel has failed, the decodeMsg method in PartitionRequestClientHandler 
> returns false, which means autoread is never set back to true.
> In summary, if task "A" sets autoread to false because it has no available 
> segments, resulting in some staged buffers, and another task "B" 
> corresponding to one of the staged buffers fails by accident, then when 
> task A tries to reset autoread to true, the process cannot complete because 
> of task B's failure.
> I have fixed this problem in our application by adding a boolean parameter 
> to the decodeBufferOrEvent method to distinguish whether it is invoked by 
> the netty IO thread's channel read or by the staged message handler task in 
> PartitionRequestClientHandler.





[jira] [Updated] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2016-08-10 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4364:
-
Description: The {{JobManager}} initiates heartbeat messages via (JobID, 
JmLeaderID), and the {{TaskManager}} will report metrics info for each 
heartbeat.  (was: JM initiates heartbeat with info (JobID, JobManagerId), and 
TM responses to JM with metrics info.)

> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.
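Interval-based heartbeats of this kind can be sketched with a {{ScheduledExecutorService}}. The names here are illustrative; the returned {{ScheduledFuture}} is what lets the caller cancel the heartbeats on shutdown, which is the same benefit FLINK-5799 aims for with {{RpcService.scheduleRunnable}}.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of interval-based heartbeats carrying a payload.
public class HeartbeatSketch {

    static ScheduledFuture<?> scheduleHeartbeats(
            ScheduledExecutorService executor,
            Runnable sendHeartbeat,
            long intervalMillis) {
        // The ScheduledFuture lets the caller cancel heartbeats on shutdown.
        return executor.scheduleAtFixedRate(
                sendHeartbeat, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger reported = new AtomicInteger();
        // The real payload would be the TaskManager's metrics here, or its
        // slot availability for the ResourceManager heartbeat in FLINK-4354.
        ScheduledFuture<?> heartbeats = scheduleHeartbeats(
                executor, reported::incrementAndGet, 50);
        Thread.sleep(220);
        heartbeats.cancel(false);
        executor.shutdown();
        System.out.println("heartbeats sent: " + reported.get());
    }
}
```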





[jira] [Updated] (FLINK-4357) Implement TaskManager side of slot allocation from ResourceManager

2016-08-10 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-4357:
-
Description: 
The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a 
specific {{JobManager}}. 
The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, 
{{AllocationID}}, {{ResourceID}}). 
The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and 
then offer that slot to the JM. If not accepted by JM, the TM notifies the RM 
that the slot is in fact available.

  was:
The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a 
specific {{JobManager}}. 
The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, 
{{AllocationID}}, {{ResourceID}}, {{slotID}}). 
The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and 
then offer that slot to the JM. If not accepted by JM, the TM notifies the RM 
that the slot is in fact available.


> Implement TaskManager side of slot allocation from ResourceManager
> --
>
> Key: FLINK-4357
> URL: https://issues.apache.org/jira/browse/FLINK-4357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a 
> specific {{JobManager}}. 
> The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, 
> {{AllocationID}}, {{ResourceID}}). 
> The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and 
> then offer that slot to the JM. If not accepted by JM, the TM notifies the RM 
> that the slot is in fact available.
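The allocate/ack/offer/release handshake above can be sketched as a small state machine on the TM side. The class and method names are hypothetical, and the fencing IDs ({{RmLeaderID}}, {{AllocationID}}, ...) are omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the TaskManager side of the slot allocation handshake.
public class SlotAllocationSketch {

    enum SlotState { FREE, ALLOCATED }

    final Map<String, SlotState> slots = new HashMap<>();

    SlotAllocationSketch(String... slotIds) {
        for (String id : slotIds) {
            slots.put(id, SlotState.FREE);
        }
    }

    /** RM -> TM: allocate a slot for a job; the TM acks by returning true. */
    boolean requestSlot(String slotId) {
        if (slots.get(slotId) != SlotState.FREE) {
            return false; // already allocated (or unknown): refuse the request
        }
        slots.put(slotId, SlotState.ALLOCATED);
        return true;
    }

    /** TM -> JM: offer the slot; if the JM rejects it, free it and notify the RM. */
    void offerSlot(String slotId, boolean acceptedByJobManager, Runnable notifyRmSlotAvailable) {
        if (!acceptedByJobManager) {
            slots.put(slotId, SlotState.FREE);
            notifyRmSlotAvailable.run();
        }
    }

    public static void main(String[] args) {
        SlotAllocationSketch tm = new SlotAllocationSketch("slot-1");
        System.out.println("ack: " + tm.requestSlot("slot-1"));
        // The JM rejects the offer: the TM frees the slot and notifies the RM.
        tm.offerSlot("slot-1", false, () -> System.out.println("RM notified: slot-1 is available again"));
    }
}
```

The important invariant is the last step: a rejected offer must return the slot to the FREE state and tell the RM, otherwise the slot leaks.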




