[jira] [Created] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
Zhijiang Wang created FLINK-5830: Summary: OutOfMemoryError during notify final state in TaskExecutor may cause job stuck Key: FLINK-5830 URL: https://issues.apache.org/jira/browse/FLINK-5830 Project: Flink Issue Type: Bug Reporter: Zhijiang Wang The scenario is as follows: the {{JobMaster}} tries to cancel all executions while processing a failed execution, and the task executor has already acknowledged the cancel RPC message. While notifying the final state in the {{TaskExecutor}}, an {{OutOfMemoryError}} occurs in the {{AkkaRpcActor}}; the error is caught and merely logged, so the final state is never sent. The {{JobMaster}} therefore never receives the final state and cannot trigger the restart strategy. One solution is to catch the {{OutOfMemoryError}} and rethrow it, which shuts down the {{ActorSystem}} and thereby exits the {{TaskExecutor}}. The {{JobMaster}} is then notified of the {{TaskExecutor}} failure and fails all the tasks, triggering the restart successfully. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
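The proposed fix can be sketched as a message-handling loop that logs ordinary exceptions but rethrows fatal errors such as {{OutOfMemoryError}}, so the enclosing actor system terminates instead of silently dropping the message. This is a minimal, hypothetical sketch; the class and method names are illustrative and are not Flink's actual {{AkkaRpcActor}} code.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the proposed handling (not Flink's actual code):
// ordinary exceptions from a message handler are logged, but fatal errors
// such as OutOfMemoryError are rethrown so the enclosing actor system shuts
// down instead of silently dropping the message (e.g. a final-state
// notification).
public class FatalErrorAwareHandler {

    /** Errors that must never be swallowed by a catch-and-log block. */
    public static boolean isFatal(Throwable t) {
        return t instanceof OutOfMemoryError
                || t instanceof InternalError
                || t instanceof UnknownError;
    }

    public static <T> void handleMessage(T message, Consumer<T> handler) {
        try {
            handler.accept(message);
        } catch (Throwable t) {
            if (isFatal(t)) {
                // Rethrow: this lets the ActorSystem terminate and the
                // TaskExecutor process exit, so the JobMaster observes the
                // failure, fails the tasks, and can trigger a restart.
                throw (Error) t;
            }
            // Non-fatal: log and keep processing (stderr stands in for a logger).
            System.err.println("Message handling failed for " + message + ": " + t);
        }
    }
}
```

The key design point is that catch-and-log blocks around message handlers must whitelist fatal JVM errors for propagation; otherwise the process limps on in a state where further progress (like sending the final state) is impossible.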
[jira] [Commented] (FLINK-5799) Let RpcService.scheduleRunnable return ScheduledFuture
[ https://issues.apache.org/jira/browse/FLINK-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867126#comment-15867126 ] Zhijiang Wang commented on FLINK-5799: -- This fix is useful, and {{HeartbeatManagerSenderImpl}} can benefit from it. Once this is done, it would be better for {{HeartbeatManagerSenderImpl}} to use RpcService.scheduleRunnable to schedule heartbeats at the configured interval. > Let RpcService.scheduleRunnable return ScheduledFuture > -- > > Key: FLINK-5799 > URL: https://issues.apache.org/jira/browse/FLINK-5799 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > Fix For: 1.3.0 > > > Currently, the method {{RpcService.scheduleRunnable}} does not return a > control instance for the scheduled runnable. I think it would be good to > return a {{ScheduledFuture}} with which one can cancel the scheduled runnable > after it has been scheduled, e.g. a timeout registration which became > obsolete. This API is also more in line with the {{ScheduledExecutorService}} > where one also receives a {{ScheduledFuture}} after scheduling a runnable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5799) Let RpcService.scheduleRunnable return ScheduledFuture
[ https://issues.apache.org/jira/browse/FLINK-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867126#comment-15867126 ] Zhijiang Wang edited comment on FLINK-5799 at 2/15/17 2:56 AM: --- This fix is useful, and {{HeartbeatManagerSenderImpl}} can benefit from it. Once this is done, it would be better for {{HeartbeatManagerSenderImpl}} to use {{RpcService.scheduleRunnable}} to schedule heartbeats at the configured interval. was (Author: zjwang): This fix is useful, and {{HeartbeatManagerSenderImpl}} can benefit from it. Once this is done, it would be better for {{HeartbeatManagerSenderImpl}} to use RpcService.scheduleRunnable to schedule heartbeats at the configured interval. > Let RpcService.scheduleRunnable return ScheduledFuture > -- > > Key: FLINK-5799 > URL: https://issues.apache.org/jira/browse/FLINK-5799 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > Fix For: 1.3.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
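The API change discussed above can be sketched with a minimal service interface whose scheduling method returns a {{ScheduledFuture}}, mirroring {{ScheduledExecutorService}}. The names ({{MiniRpcService}}, {{demo}}) are illustrative assumptions, not Flink's actual classes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: scheduleRunnable returns a ScheduledFuture so the
// caller (e.g. a heartbeat sender) can wait for or cancel a scheduled task.
public class SchedulingSketch {

    interface MiniRpcService {
        ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
    }

    static MiniRpcService fromExecutor(ScheduledExecutorService executor) {
        // Delegates directly to ScheduledExecutorService.schedule(...)
        return executor::schedule;
    }

    public static boolean demo() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        MiniRpcService rpc = fromExecutor(executor);
        AtomicInteger beats = new AtomicInteger();

        // One-shot schedule: the returned future also lets us wait for completion.
        rpc.scheduleRunnable(beats::incrementAndGet, 1, TimeUnit.MILLISECONDS).get();

        // A pending task (e.g. an obsolete timeout registration) can now be cancelled.
        ScheduledFuture<?> pending = rpc.scheduleRunnable(beats::incrementAndGet, 1, TimeUnit.HOURS);
        boolean cancelled = pending.cancel(false);

        executor.shutdownNow();
        return beats.get() == 1 && cancelled;
    }
}
```

Without the returned future, a caller has no handle for cancelling a runnable once it is scheduled, which is exactly the gap the issue describes.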
[jira] [Commented] (FLINK-5798) Let the RPCService provide a ScheduledExecutorService
[ https://issues.apache.org/jira/browse/FLINK-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867116#comment-15867116 ] Zhijiang Wang commented on FLINK-5798: -- That is good; the heartbeat manager component can then reuse the {{ScheduledExecutorService}} provided by the {{RPCService}}. > Let the RPCService provide a ScheduledExecutorService > -- > > Key: FLINK-5798 > URL: https://issues.apache.org/jira/browse/FLINK-5798 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Affects Versions: 1.3.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Minor > Fix For: 1.3.0 > > > Currently the {{RPCService}} interface provides a {{scheduleRunnable}} method > to schedule {{Runnables}}. I would like to generalize this functionality by > letting the {{RPCService}} provide a {{ScheduledExecutorService}} to the > user. That way other components which require such an executor service could > simply use the one provided by the {{RPCService}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
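The generalization proposed above amounts to dependency injection of a shared scheduler: the RPC service owns one {{ScheduledExecutorService}} and hands it to components such as a heartbeat manager, instead of each component owning its own threads. The following is an assumed, minimal sketch; the names are not Flink's actual classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the RPC service exposes its scheduler; components
// reuse it rather than creating their own thread pools.
public class SharedSchedulerSketch {

    static class MiniRpcService implements AutoCloseable {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        ScheduledExecutorService getScheduledExecutorService() {
            return scheduler;
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }

    /** A component that borrows the service's scheduler instead of owning one. */
    static class MiniHeartbeatManager {
        private final ScheduledExecutorService scheduler;

        MiniHeartbeatManager(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
        }

        void monitor(Runnable heartbeatAction, long delayMillis) {
            scheduler.schedule(heartbeatAction, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static boolean demo() throws Exception {
        CountDownLatch beat = new CountDownLatch(1);
        try (MiniRpcService rpc = new MiniRpcService()) {
            new MiniHeartbeatManager(rpc.getScheduledExecutorService())
                    .monitor(beat::countDown, 1);
            return beat.await(5, TimeUnit.SECONDS);
        }
    }
}
```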
[jira] [Updated] (FLINK-5133) Support to set resource for operator in DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5133: - Summary: Support to set resource for operator in DataStream and DataSet (was: Add new setResource API for DataStream and DataSet) > Support to set resource for operator in DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be set on > *SingleOutputStreamOperator*, similar to other existing properties such as > parallelism, name, etc. > For *DataSet*, the *setResource* API will be set on *Operator* in a > similar way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum > *ResourceSpec*, to allow for resource resizing in future improvements. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
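The API shape described above (a chainable setter taking minimum and maximum resource specifications) can be sketched as follows. The field names and the {{ResourceSpec}} contents here are assumptions for illustration; {{MiniOperator}} stands in for {{SingleOutputStreamOperator}} / the DataSet {{Operator}}.

```java
// Hypothetical sketch of the proposed setResource(min, max) API: a chainable
// operator setter, like the existing parallelism/name setters, keeping room
// for resource resizing later. Names and fields are assumptions.
public class ResourceApiSketch {

    public static final class ResourceSpec {
        public final double cpuCores;
        public final int heapMemoryMB;

        public ResourceSpec(double cpuCores, int heapMemoryMB) {
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }
    }

    /** Stand-in for SingleOutputStreamOperator / DataSet Operator. */
    public static final class MiniOperator {
        private ResourceSpec minResources;
        private ResourceSpec maxResources;

        // Two-argument form from the issue: minimum and maximum bounds.
        public MiniOperator setResource(ResourceSpec min, ResourceSpec max) {
            this.minResources = min;
            this.maxResources = max;
            return this; // chainable, like the other operator properties
        }

        public ResourceSpec getMinResources() { return minResources; }
        public ResourceSpec getMaxResources() { return maxResources; }
    }
}
```

Carrying both bounds from the start means a later resource-resizing feature can adjust allocation between min and max without changing the user-facing API.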
[jira] [Assigned] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5501: Assignee: shuai.xu (was: Zhijiang Wang) > Determine whether the job starts from last JobManager failure > -- > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Reporter: Zhijiang Wang > Assignee: shuai.xu > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task statuses. Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} provides a way to check the job's running > status, but we should extend the current interface and adjust the related > process to support this function: > 1. The {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status is set to FINISHED in the > {{RunningJobsRegistry}} and the entry is deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s and the leader > {{JobManagerRunner}} has already finished the job and set the status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. If the {{JobManager}} fails, the job status remains RUNNING. So when the > {{JobManagerRunner}} (the previous or a new one) is granted leadership > again, it will check the job status and enter the {{RECONCILING}} state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
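The decision procedure in points 1-4 above can be sketched as a small state check on leadership grant. The enum values, registry, and method names below are illustrative assumptions; they are not Flink's actual {{RunningJobsRegistry}} API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: when a JobManagerRunner is granted leadership, consult
// the registry to decide whether to schedule fresh, reconcile a running job,
// or exit because the job already finished.
public class LeadershipDecisionSketch {

    public enum JobSchedulingStatus { PENDING, RUNNING, FINISHED }

    public enum LeaderAction { SCHEDULE_FRESH, RECONCILE, EXIT }

    /** In-memory stand-in for the HA-backed RunningJobsRegistry. */
    public static final class MiniRegistry {
        private final Map<String, JobSchedulingStatus> statuses = new HashMap<>();

        public void setJobStatus(String jobId, JobSchedulingStatus status) {
            statuses.put(jobId, status);
        }

        public JobSchedulingStatus getJobStatus(String jobId) {
            return statuses.getOrDefault(jobId, JobSchedulingStatus.PENDING);
        }
    }

    public static LeaderAction onGrantLeadership(MiniRegistry registry, String jobId) {
        switch (registry.getJobStatus(jobId)) {
            case RUNNING:
                // Point 4: a previous JobManager failed mid-run; enter
                // RECONCILING and wait for TaskManager reports.
                return LeaderAction.RECONCILE;
            case FINISHED:
                // Point 3: another runner already finished the job; exit.
                return LeaderAction.EXIT;
            default:
                // Point 1: first grant; mark RUNNING and schedule normally.
                registry.setJobStatus(jobId, JobSchedulingStatus.RUNNING);
                return LeaderAction.SCHEDULE_FRESH;
        }
    }
}
```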
[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5703: Assignee: Zhijiang Wang (was: shuai.xu) > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > > The ExecutionGraph structure would be recovered from TaskManager reports > during the reconciling period, and the necessary information includes: > - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, > ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer > Execution) > - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> > - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> > The {{RECONCILING}} ExecutionState should transition into one of the existing > task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the > TaskManager should retain the terminal task states > ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this > mechanism in another JIRA. In addition, the state transitions trigger > different actions, and some actions rely on the information above. > Given this constraint, the recovery process is divided into two steps: > - First, recover all the other necessary information except the ExecutionState. > - Second, transition the ExecutionState into the real task state and trigger the > corresponding actions. The behavior is the same as the current {{UpdateTaskExecutorState}}. > To keep the logic simple and consistent, during the recovery period all other RPC > messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) > from the TaskManager should be temporarily rejected and answered with a special > message by the JobMaster. > The TaskManager should then retry sending these > messages until the JobManager finishes recovery and acknowledges them. > For the {{RECONCILING}} JobStatus, it would transition into one of the states > ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: > - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the > configured duration and all tasks are in the {{RUNNING}} state. > - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report > in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. > - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the > configured duration and all tasks are in the {{FINISHED}} state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
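The job-status decision rules listed above can be sketched as a single function over the reported task states. The enum and method names are illustrative, following the issue text rather than Flink's actual code; how a mix of {{RUNNING}} and {{FINISHED}} tasks is treated is an assumption (kept as {{RUNNING}} here).

```java
import java.util.Collection;

// Hypothetical sketch of the RECONCILING JobStatus decision: after the
// reconciliation window, the job transitions based on the task states
// reported by the TaskManagers, or fails if a report is missing.
public class ReconcileOutcomeSketch {

    public enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    public enum JobOutcome { RUNNING, FAILING, FINISHED }

    public static JobOutcome decide(Collection<TaskState> reported, boolean allReportedInTime) {
        // Any missing report, or any FAILED/CANCELED task, fails the job.
        if (!allReportedInTime
                || reported.stream().anyMatch(
                        s -> s == TaskState.FAILED || s == TaskState.CANCELED)) {
            return JobOutcome.FAILING;
        }
        // All tasks finished: the job is finished.
        if (reported.stream().allMatch(s -> s == TaskState.FINISHED)) {
            return JobOutcome.FINISHED;
        }
        // Otherwise tasks are RUNNING (assumption: a RUNNING/FINISHED mix
        // also keeps the job running).
        return JobOutcome.RUNNING;
    }
}
```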
[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5703: Assignee: (was: Zhijiang Wang) > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5703: Assignee: shuai.xu > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang > Assignee: shuai.xu -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860543#comment-15860543 ] Zhijiang Wang commented on FLINK-5132: -- Thank you, [~StephanEwen]. Yeah, it also makes sense to check for valid values in the constructors; currently the class provides an {{isValid}} method, which is called when the spec is set on the operator. > Introduce the ResourceSpec for grouping different resource factors in API > -- > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang > Fix For: 1.3.0 > > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the constructors can also be > extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
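The two validation styles discussed in the comment above (an {{isValid}} check applied when the spec is set on an operator, versus rejecting invalid values directly in the constructor) can be sketched side by side. The field names are assumptions for illustration, not the actual {{ResourceSpec}} definition.

```java
// Hypothetical sketch contrasting constructor-time validation (as suggested
// in the comment thread) with a separate isValid() check applied later.
public class ResourceSpecValidation {

    public static final class ResourceSpec {
        public final double cpuCores;
        public final int heapMemoryMB;

        public ResourceSpec(double cpuCores, int heapMemoryMB) {
            // Constructor-time check: invalid specs can never be constructed,
            // so every later use site is safe by construction.
            if (cpuCores < 0 || heapMemoryMB < 0) {
                throw new IllegalArgumentException("resource values must be non-negative");
            }
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }

        // Deferred style per the comment: checked when the spec is set on the
        // operator. Redundant here, since the constructor already validates.
        public boolean isValid() {
            return cpuCores >= 0 && heapMemoryMB >= 0;
        }
    }
}
```

Validating in the constructor fails fast at the point of the mistake, while a deferred {{isValid}} call reports the error only when the spec is applied; the two approaches can coexist, as above.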
[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5703: - Description: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution) - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as the current {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. For the {{RECONCILING}} JobStatus, it would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. was: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution) - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as the current {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang
[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5703: - Description: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution) - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as the current {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. was: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution) - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang
[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5703: - Description: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution) - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. was: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution> The {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will realize this mechanism in another JIRA. In addition, the state transitions trigger different actions, and some actions rely on the information above. Given this constraint, the recovery process is divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as {{UpdateTaskExecutorState}}. To keep the logic simple and consistent, during the recovery period all other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be temporarily rejected and answered with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery: - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the configured duration and all tasks are in the {{RUNNING}} state. - {{RECONCILING}} to {{FAILING}}: one of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the configured duration and all tasks are in the {{FINISHED}} state. > ExecutionGraph recovery based on reconciliation with TaskManager reports > -- > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager > Reporter: Zhijiang Wang > Assignee: Zhijiang Wang
[jira] [Updated] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5703: - Description: The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period, and the necessary information includes: - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor - ExecutionVertex: Map - ExecutionGraph: ConcurrentHashMap The {{RECONCILING}} ExecutionState should be able to transition into any existing task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should maintain the terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to realize this mechanism in another JIRA. In addition, the state transitions would trigger different actions, and some actions rely on the above necessary information. Considering this limitation, the recovery process will be divided into two steps: - First, recover all the other necessary information except the ExecutionState. - Second, transition the ExecutionState into the real task state and trigger the actions. The behavior is the same as {{UpdateTaskExecutionState}}. To keep the logic simple and consistent, during the recovery period all the other RPC messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) from the TaskManager should be refused temporarily and responded to with a special message by the JobMaster. The TaskManager should then retry sending these messages until the JobManager finishes recovery and acknowledges them. The {{RECONCILING}} JobStatus would transition into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery. - {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the duration and all the tasks are in the {{RUNNING}} state. 
- {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. - {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the duration and all the tasks are in the {{FINISHED}} state. > ExecutionGraph recovery based on reconciliation with TaskManager reports > > > Key: FLINK-5703 > URL: https://issues.apache.org/jira/browse/FLINK-5703 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The ExecutionGraph structure would be recovered from TaskManager reports > during the reconciling period, and the necessary information includes: > - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, > ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor > - ExecutionVertex: Map IntermediateResultPartition> > - ExecutionGraph: ConcurrentHashMap > The {{RECONCILING}} ExecutionState should be able to transition into any existing > task state ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the > TaskManager should maintain the terminal task states > ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to realize this > mechanism in another JIRA. In addition, the state transitions would trigger > different actions, and some actions rely on the above necessary information. > Considering this limitation, the recovery process will be divided into two steps: > - First, recover all the other necessary information except the ExecutionState. > - Second, transition the ExecutionState into the real task state and trigger > the actions. The behavior is the same as {{UpdateTaskExecutionState}}. > To keep the logic simple and consistent, during the recovery period all the other RPC > messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) > from the TaskManager should be refused temporarily and responded to with a special > message by the JobMaster. 
The TaskManager should then retry sending these > messages until the JobManager finishes recovery and acknowledges them. > The {{RECONCILING}} JobStatus would transition into one of the states > ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery. > - {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the > duration and all the tasks are in the {{RUNNING}} state. > - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report > in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state. > - {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the > duration and all the tasks are in the {{FINISHED}} state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
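The {{RECONCILING}}-to-terminal transition rules described in FLINK-5703 can be sketched as a small decision function. This is a hypothetical illustration under the stated rules, not Flink's actual ExecutionGraph code; the class, enum, and method names are invented for the sketch.

```java
import java.util.Collection;

// Hypothetical sketch: derive the job status a RECONCILING JobMaster would
// transition into, given the task states reported by TaskManagers before the
// reconciliation timeout. Names are illustrative, not Flink's actual API.
class ReconciliationSketch {

    enum TaskState { RUNNING, CANCELED, FAILED, FINISHED }

    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * @param allReported whether every TaskManager reported before the timeout
     * @param taskStates  the task states collected from the reports
     */
    static JobStatus deriveJobStatus(boolean allReported, Collection<TaskState> taskStates) {
        // A missing report or any FAILED/CANCELED task fails the job.
        if (!allReported
                || taskStates.contains(TaskState.FAILED)
                || taskStates.contains(TaskState.CANCELED)) {
            return JobStatus.FAILING;
        }
        // All tasks FINISHED -> the job itself is FINISHED.
        if (taskStates.stream().allMatch(s -> s == TaskState.FINISHED)) {
            return JobStatus.FINISHED;
        }
        // Otherwise the job resumes as RUNNING (the issue text only names the
        // all-RUNNING case; treating a RUNNING/FINISHED mix this way is an
        // assumption of the sketch).
        return JobStatus.RUNNING;
    }
}
```

In the real design this decision would run after the two recovery steps (structure first, then ExecutionState), once the reconciliation window closes.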
[jira] [Created] (FLINK-5703) ExecutionGraph recovery based on reconciliation with TaskManager reports
Zhijiang Wang created FLINK-5703: Summary: ExecutionGraph recovery based on reconciliation with TaskManager reports Key: FLINK-5703 URL: https://issues.apache.org/jira/browse/FLINK-5703 Project: Flink Issue Type: Sub-task Reporter: Zhijiang Wang Assignee: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827892#comment-15827892 ] Zhijiang Wang commented on FLINK-4364: -- [~till.rohrmann], I closed the previous pull request for this issue and re-submitted it based on the current master branch. Please review it when you have time, and then I can continue submitting the follow-up code based on it. Thank you! > Implement TaskManager side of heartbeat from JobManager > --- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and > the {{TaskManager}} will report metrics info for each heartbeat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
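The heartbeat interplay described in the issue (the JobManager initiates, the TaskManager answers each beat with a metrics report) can be sketched with a plain {{ScheduledExecutorService}}. All names here are assumptions made for the sketch, not Flink's HeartbeatManager API; the returned {{ScheduledFuture}} also illustrates why a cancellable handle (cf. FLINK-5799) is useful.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: JobManager-side periodic heartbeat requests, answered
// by a TaskManager-side target with a metrics payload. Illustrative only.
class HeartbeatSketch {

    interface HeartbeatTarget {
        /** Called on each beat; returns the TaskManager's metrics report. */
        String requestHeartbeat(String jobId, String jobManagerLeaderId);
    }

    /** Schedules heartbeats at a fixed interval; cancel via the returned future. */
    static ScheduledFuture<?> scheduleHeartbeats(
            ScheduledExecutorService executor,
            HeartbeatTarget taskManager,
            String jobId,
            String leaderId,
            long intervalMillis,
            ConcurrentLinkedQueue<String> receivedReports) {
        return executor.scheduleAtFixedRate(
                // Each beat asks the TaskManager for its report and records it.
                () -> receivedReports.add(taskManager.requestHeartbeat(jobId, leaderId)),
                0, intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Cancelling the returned future when the TaskManager unregisters (or the leader changes) stops the beats without shutting down the shared executor.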
[jira] [Comment Edited] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361 ] Zhijiang Wang edited comment on FLINK-5501 at 1/18/17 8:42 AM: --- [~StephanEwen] Thank you for the quick response! Yeah, you have already considered all the feasible alternatives for implementing this goal, and I totally agree with that. 1. For extending the leader election service, I also thought of this way before the implementation. In the current {{ZookeeperLeaderElectionService}}, the leader node is of EPHEMERAL type; if the incrementing number is carried in this node, it should be changed to PERSISTENT type, otherwise another node should be added for the incrementing number. This way is very similar to the {{RunningJobsRegistry}} approach; from the semantic aspect, {{LeaderElectionService}} may be more suitable, but from the minimum-change aspect, I have already implemented it via {{RunningJobsRegistry}}. 2. Actually I did not think of this way before, and it is a totally different and interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader change and will re-register with the new leader after the change, so the {{JobManager}} can resort to the registration process to determine the status. But it may be complicated to coordinate between common scheduling and reconciling, because they will be triggered at the same time, and it will also temporarily waste more resources. If the JobManager can determine the status after startup in an easy way, it can do the specific processing without doing anything ambiguous. In summary, I prefer the first way to implement the goal. The whole {{JobManager}} failure feature has been finished on my side; could I submit the pull request for this issue based on the {{RunningJobsRegistry}} implementation? was (Author: zjwang): Thank you for the quick response! Yeah, you have already considered all the feasible alternatives for implementing this goal, and I totally agree with that. 1. 
For extending the leader election service, I also thought of this way before the implementation. In the current {{ZookeeperLeaderElectionService}}, the leader node is of EPHEMERAL type; if the incrementing number is carried in this node, it should be changed to PERSISTENT type, otherwise another node should be added for the incrementing number. This way is very similar to the {{RunningJobsRegistry}} approach; from the semantic aspect, {{LeaderElectionService}} may be more suitable, but from the minimum-change aspect, I have already implemented it via {{RunningJobsRegistry}}. 2. Actually I did not think of this way before, and it is a totally different and interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader change and will re-register with the new leader after the change, so the {{JobManager}} can resort to the registration process to determine the status. But it may be complicated to coordinate between common scheduling and reconciling, because they will be triggered at the same time, and it will also temporarily waste more resources. If the JobManager can determine the status after startup in an easy way, it can do the specific processing without doing anything ambiguous. In summary, I prefer the first way to implement the goal. The whole {{JobManager}} failure feature has been finished on my side; could I submit the pull request for this issue based on the {{RunningJobsRegistry}} implementation? > Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. 
Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader > {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. If the {{JobManager}} fails, the job status will still be in
[jira] [Comment Edited] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361 ] Zhijiang Wang edited comment on FLINK-5501 at 1/18/17 3:16 AM: --- Thank you for the quick response! Yeah, you have already considered all the feasible alternatives for implementing this goal, and I totally agree with that. 1. For extending the leader election service, I also thought of this way before the implementation. In the current {{ZookeeperLeaderElectionService}}, the leader node is of EPHEMERAL type; if the incrementing number is carried in this node, it should be changed to PERSISTENT type, otherwise another node should be added for the incrementing number. This way is very similar to the {{RunningJobsRegistry}} approach; from the semantic aspect, {{LeaderElectionService}} may be more suitable, but from the minimum-change aspect, I have already implemented it via {{RunningJobsRegistry}}. 2. Actually I did not think of this way before, and it is a totally different and interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader change and will re-register with the new leader after the change, so the {{JobManager}} can resort to the registration process to determine the status. But it may be complicated to coordinate between common scheduling and reconciling, because they will be triggered at the same time, and it will also temporarily waste more resources. If the JobManager can determine the status after startup in an easy way, it can do the specific processing without doing anything ambiguous. In summary, I prefer the first way to implement the goal. The whole {{JobManager}} failure feature has been finished on my side; could I submit the pull request for this issue based on the {{RunningJobsRegistry}} implementation? was (Author: zjwang): Thank you for the quick response! Yeah, you have already considered all the feasible alternatives for implementing this goal, and I totally agree with that. 1. 
For extending the leader election service, I also thought of this way before the implementation. In the current {{ZookeeperLeaderElectionService}}, the leader node is of EPHEMERAL type; if the incrementing number is carried in this node, it should be changed to PERSISTENT type, otherwise another node should be added for the incrementing number. This way is very similar to the {{RunningJobsRegistry}} approach; from the semantic aspect, {{LeaderElectionService}} may be more suitable, but from the minimum-change aspect, I have already implemented it via {{RunningJobsRegistry}}. 2. Actually I did not think of this way before, and it is a totally different and interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader change and will re-register with the new leader after the change, so the {{JobManager}} can resort to the registration process to determine the status. But it may be complicated to coordinate between common scheduling and reconciling, because they will be triggered at the same time, and it will also temporarily waste more resources. If the JobManager can determine the status after startup in an easy way, it can do the specific processing without doing anything ambiguous. In summary, I prefer the first way to implement the goal. The whole {{JobManager}} failure feature has been finished on my side; could I submit the pull request for this issue based on the {{RunningJobsRegistry}} implementation? > Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. 
Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader > {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. If the {{JobManager}} fails, the job status will still be in RUNNING. So >
[jira] [Commented] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827361#comment-15827361 ] Zhijiang Wang commented on FLINK-5501: -- Thank you for the quick response! Yeah, you have already considered all the feasible alternatives for implementing this goal, and I totally agree with that. 1. For extending the leader election service, I also thought of this way before the implementation. In the current {{ZookeeperLeaderElectionService}}, the leader node is of EPHEMERAL type; if the incrementing number is carried in this node, it should be changed to PERSISTENT type, otherwise another node should be added for the incrementing number. This way is very similar to the {{RunningJobsRegistry}} approach; from the semantic aspect, {{LeaderElectionService}} may be more suitable, but from the minimum-change aspect, I have already implemented it via {{RunningJobsRegistry}}. 2. Actually I did not think of this way before, and it is a totally different and interesting idea. The {{TaskManager}} is aware of the {{JobManager}} leader change and will re-register with the new leader after the change, so the {{JobManager}} can resort to the registration process to determine the status. But it may be complicated to coordinate between common scheduling and reconciling, because they will be triggered at the same time, and it will also temporarily waste more resources. If the JobManager can determine the status after startup in an easy way, it can do the specific processing without doing anything ambiguous. In summary, I prefer the first way to implement the goal. The whole {{JobManager}} failure feature has been finished on my side; could I submit the pull request for this issue based on the {{RunningJobsRegistry}} implementation? 
> Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader > {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. If the {{JobManager}} fails, the job status will still be RUNNING. So > if the {{JobManagerRunner}} (the previous or a new one) is granted leadership > again, it will check the job status and enter the {{RECONCILING}} state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
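The leadership-grant decision described in FLINK-5501 can be sketched as follows. The registry is modeled as an in-memory map standing in for the ZooKeeper-backed {{RunningJobsRegistry}}; the class, enum, and method names are hypothetical, invented for the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: when a JobManagerRunner is granted leadership, it
// consults the job's registered status and either schedules the job fresh,
// enters reconciliation, or exits because the job already finished.
class LeadershipSketch {

    enum JobSchedulingStatus { PENDING, RUNNING, FINISHED }

    enum Decision { SCHEDULE_FRESH, RECONCILE, EXIT }

    // Stand-in for the ZooKeeper-backed RunningJobsRegistry.
    static final ConcurrentHashMap<String, JobSchedulingStatus> registry =
            new ConcurrentHashMap<>();

    /** Decide what a newly granted leader should do for the given job. */
    static Decision onGrantLeadership(String jobId) {
        JobSchedulingStatus status =
                registry.getOrDefault(jobId, JobSchedulingStatus.PENDING);
        switch (status) {
            case RUNNING:
                // A RUNNING entry left behind means the previous JobManager
                // failed mid-flight: recover via TaskManager reports (RECONCILING).
                return Decision.RECONCILE;
            case FINISHED:
                // Another runner already completed the job: nothing left to do.
                return Decision.EXIT;
            default:
                // First leadership grant: mark RUNNING and schedule normally.
                registry.put(jobId, JobSchedulingStatus.RUNNING);
                return Decision.SCHEDULE_FRESH;
        }
    }
}
```

Per item 2 of the description, a real implementation would also set FINISHED on job completion and delete the entry before exit.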
[jira] [Updated] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5501: - Description: When the {{JobManagerRunner}} is granted leadership, it should check whether the current job is already running or not. If the job is running, the {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} can schedule the {{ExecutionGraph}} in the common way. The {{RunningJobsRegistry}} can provide the way to check the job's running status, but we should expand the current interface and fix the related process to support this function. 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} is granted leadership for the first time. 2. If the job finishes, the job status will be set to FINISHED by {{RunningJobsRegistry}} and the status will be deleted before exit. 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, the other {{JobManagerRunner}}s will exit after being granted leadership again. 4. If the {{JobManager}} fails, the job status will still be RUNNING. So if the {{JobManagerRunner}} (the previous or a new one) is granted leadership again, it will check the job status and enter the {{RECONCILING}} state. was: When the {{JobManagerRunner}} is granted leadership, it should check whether the current job is already running or not. If the job is running, the {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} can schedule the {{ExecutionGraph}} in the common way. The {{RunningJobsRegistry}} can provide the way to check the job's running status, but we should expand the current interface and fix the related process to support this function. 1. 
{{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} is granted leadership for the first time. 2. If the job finishes, the job status will be set to FINISHED by {{RunningJobsRegistry}} and the status will be deleted before exit. 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, the other {{JobManagerRunner}}s will exit after being granted leadership again. 4. If the {{JobManager}} fails, the job status will still be RUNNING. So if the {{JobManagerRunner}} (the previous or a new one) is granted leadership again, it will check the job status and enter the {{RECONCILING}} state. > Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader > {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. 
If the {{JobManager}} fails, the job status will still be RUNNING. So > if the {{JobManagerRunner}} (the previous or a new one) is granted leadership > again, it will check the job status and enter the {{RECONCILING}} state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5501: - Description: When the {{JobManagerRunner}} is granted leadership, it should check whether the current job is already running or not. If the job is running, the {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} can schedule the {{ExecutionGraph}} in the common way. The {{RunningJobsRegistry}} can provide the way to check the job's running status, but we should expand the current interface and fix the related process to support this function. 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} is granted leadership for the first time. 2. If the job finishes, the job status will be set to FINISHED by {{RunningJobsRegistry}} and the status will be deleted before exit. 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, the other {{JobManagerRunner}}s will exit after being granted leadership again. 4. If the {{JobManager}} fails, the job status will still be RUNNING. So if the {{JobManagerRunner}} (the previous or a new one) is granted leadership again, it will check the job status and enter the {{RECONCILING}} state. was: When the {{JobManagerRunner}} is granted leadership, it should check whether the current job is already running or not. If the job is running, the {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} can schedule the {{ExecutionGraph}} in the common way. The {{RunningJobsRegistry}} can provide the way to check the job's running status, but we should expand the current interface and fix the related process to support this function. 1. 
{{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} is granted leadership for the first time. 2. If the job finishes, the job status will be set to FINISHED by {{RunningJobsRegistry}} and the status will be deleted before exit. 3. If the {{JobManager}} fails, the job status will still be RUNNING, so when the {{JobManagerRunner}} (the previous or a new one) is granted leadership again, it checks the job status and enters the {{RECONCILING}} state. > Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the mini cluster starts multiple {{JobManagerRunner}}s, and the leader > {{JobManagerRunner}} already finishes the job and sets the job status to FINISHED, > the other {{JobManagerRunner}}s will exit after being granted leadership again. > 4. If the {{JobManager}} fails, the job status will still be RUNNING. So > if the {{JobManagerRunner}} (the previous or a new one) is granted leadership > again, it will check the job status and enter the {{RECONCILING}} state. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5501) Determine whether the job starts from last JobManager failure
[ https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5501: Assignee: Zhijiang Wang > Determine whether the job starts from last JobManager failure > - > > Key: FLINK-5501 > URL: https://issues.apache.org/jira/browse/FLINK-5501 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the {{JobManagerRunner}} is granted leadership, it should check whether the > current job is already running or not. If the job is running, the > {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait > for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} > can schedule the {{ExecutionGraph}} in the common way. > The {{RunningJobsRegistry}} can provide the way to check the job's running > status, but we should expand the current interface and fix the related > process to support this function. > 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} > is granted leadership for the first time. > 2. If the job finishes, the job status will be set to FINISHED by > {{RunningJobsRegistry}} and the status will be deleted before exit. > 3. If the {{JobManager}} fails, the job status will still be RUNNING, so > when the {{JobManagerRunner}} (the previous or a new one) is granted leadership > again, it checks the job status and enters the {{RECONCILING}} state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5501) Determine whether the job starts from last JobManager failure
Zhijiang Wang created FLINK-5501: Summary: Determine whether the job starts from last JobManager failure Key: FLINK-5501 URL: https://issues.apache.org/jira/browse/FLINK-5501 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: Zhijiang Wang When the {{JobManagerRunner}} is granted leadership, it should check whether the current job is already running or not. If the job is running, the {{JobManager}} should reconcile itself (enter the RECONCILING state) and wait for the {{TaskManager}} to report task status. Otherwise the {{JobManager}} can schedule the {{ExecutionGraph}} in the common way. The {{RunningJobsRegistry}} can provide the way to check the job's running status, but we should expand the current interface and fix the related process to support this function. 1. {{RunningJobsRegistry}} sets the RUNNING status after the {{JobManagerRunner}} is granted leadership for the first time. 2. If the job finishes, the job status will be set to FINISHED by {{RunningJobsRegistry}} and the status will be deleted before exit. 3. If the {{JobManager}} fails, the job status will still be RUNNING, so when the {{JobManagerRunner}} (the previous or a new one) is granted leadership again, it checks the job status and enters the {{RECONCILING}} state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5499: Assignee: Zhijiang Wang > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Currently, when an execution is scheduled and requests a slot from > {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so > in the task failover scenario the new execution attempt may be deployed to > a different TaskManager. With RocksDB as the state backend, performance > is better if the state can be restored from the local machine. So we try to reuse > the {{TaskManagerLocation}} of the prior execution attempt when allocating a slot > from {{SlotPool}}. If the {{TaskManagerLocation}} of the prior > executions is empty, the behavior is the same as the current one. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Description: Currently, when an execution is scheduled and requests a slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so in the task failover scenario the new execution attempt may be deployed to a different TaskManager. With RocksDB as the state backend, performance is better if the state can be restored from the local machine. So we try to reuse the {{TaskManagerLocation}} of the prior execution attempt when allocating a slot from {{SlotPool}}. If the {{TaskManagerLocation}} of the prior executions is empty, the behavior is the same as the current one. (was: Currently, when an execution is scheduled and requests a slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so in the task failover scenario the new execution attempt may be deployed to a different TaskManager. With RocksDB as the state backend, performance is better if the state can be restored from the local machine. So we try to reuse the TaskManagerLocation of the prior execution attempt when allocating a slot from SlotPool. If the TaskManagerLocation of the prior executions is empty, the behavior is the same as the current one.) > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently, when an execution is scheduled and requests a slot from > {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so > in the task failover scenario the new execution attempt may be deployed to > a different TaskManager. With RocksDB as the state backend, performance > is better if the state can be restored from the local machine. 
So we try to reuse > the {{TaskManagerLocation}} of prior execution attempt when allocating slot > from {{SlotPool}}. If the {{TaskManagerLocation}} is empty from prior > executions, the behavior is the same with current status. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Description: Currently, when a scheduled execution requests a slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so in a task failover scenario the new execution attempt may be deployed to a different task manager. With RocksDB as the state backend, performance is better when state can be restored from the local machine. So we try to reuse the TaskManagerLocation of the prior execution attempt when allocating a slot from SlotPool. If no TaskManagerLocation is available from prior executions, the behavior stays the same as today. (was: Currently, when a scheduled execution requests a slot from SlotPool, the TaskManagerLocation parameter is an empty collection, so in a task failover scenario the new execution attempt may be deployed to a different task manager. With RocksDB as the state backend, performance is better when state can be restored from the local machine. So we try to reuse the TaskManagerLocation of the prior execution attempt when allocating a slot from SlotPool. If no TaskManagerLocation is available from prior executions, the behavior stays the same as today.) > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently, when a scheduled execution requests a slot from > {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so > in a task failover scenario the new execution attempt may be deployed to a > different task manager. With RocksDB as the state backend, performance is > better when state can be restored from the local machine. 
So we try to reuse > the TaskManagerLocation of the prior execution attempt when allocating a slot from > SlotPool. If no TaskManagerLocation is available from prior > executions, the behavior stays the same as today. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Summary: Try to reuse the resource location of prior execution attempt in allocating slot (was: Try to reuse the resource location of prior execution attempt when allocate slot) > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently, when a scheduled execution requests a slot from SlotPool, > the TaskManagerLocation parameter is an empty collection, so in a task failover > scenario the new execution attempt may be deployed to a different task > manager. With RocksDB as the state backend, performance is better when > state can be restored from the local machine. So we try to reuse the > TaskManagerLocation of the prior execution attempt when allocating a slot from > SlotPool. If no TaskManagerLocation is available from prior executions, the > behavior stays the same as today. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt when allocate slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Summary: Try to reuse the resource location of prior execution attempt when allocate slot (was: Try to reuse the resource location of previous execution attempt when allocate slot) > Try to reuse the resource location of prior execution attempt when allocate > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently, when a scheduled execution requests a slot from SlotPool, > the TaskManagerLocation parameter is an empty collection, so in a task failover > scenario the new execution attempt may be deployed to a different task > manager. With RocksDB as the state backend, performance is better when > state can be restored from the local machine. So we try to reuse the > TaskManagerLocation of the prior execution attempt when allocating a slot from > SlotPool. If no TaskManagerLocation is available from prior executions, the > behavior stays the same as today. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5499) Try to reuse the resource location of previous execution attempt when allocate slot
Zhijiang Wang created FLINK-5499: Summary: Try to reuse the resource location of previous execution attempt when allocate slot Key: FLINK-5499 URL: https://issues.apache.org/jira/browse/FLINK-5499 Project: Flink Issue Type: Improvement Components: JobManager Reporter: Zhijiang Wang Currently, when a scheduled execution requests a slot from SlotPool, the TaskManagerLocation parameter is an empty collection, so in a task failover scenario the new execution attempt may be deployed to a different task manager. With RocksDB as the state backend, performance is better when state can be restored from the local machine. So we try to reuse the TaskManagerLocation of the prior execution attempt when allocating a slot from SlotPool. If no TaskManagerLocation is available from prior executions, the behavior stays the same as today. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
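The location-reuse logic proposed in FLINK-5499 can be sketched as follows. This is a minimal illustration, not Flink's actual code: the class and method names are assumptions, and the real implementation would work against SlotPool's internal structures rather than plain strings.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: prefer the TaskManager that ran the prior execution
// attempt (so RocksDB state can be restored locally), falling back to any
// free slot when no prior location is known or available.
class PreferredLocationChooser {
    static String chooseLocation(List<String> priorLocations,
                                 Set<String> availableTaskManagers) {
        for (String loc : priorLocations) {
            if (availableTaskManagers.contains(loc)) {
                return loc; // local state restore is possible here
            }
        }
        // Empty prior locations: same behavior as today, any free slot.
        return availableTaskManagers.iterator().next();
    }
}
```

When the prior-location list is empty, the fallback branch reproduces the current behavior, matching the issue's note that nothing changes in that case.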
[jira] [Assigned] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5138: Assignee: Zhijiang Wang > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Users may specify the state size via the *setResource* API; each > *StateBackend* implementation should then roughly estimate the different kinds of > memory usage based on that state size. This estimated memory will be > aggregated with the other memory amounts in *ResourceSpec*. There are two advantages to > doing this: > - For the RocksDB backend, a proper memory setting for RocksDB yields better > read and write performance. > - The estimated memory is considered when requesting container resources, so > the total memory usage will not exceed the container limit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Description: Users may specify the state size via the *setResource* API; each *StateBackend* implementation should then roughly estimate the different kinds of memory usage based on that state size. This estimated memory will be aggregated with the other memory amounts in *ResourceSpec*. There are two advantages to doing this: - For the RocksDB backend, a proper memory setting for RocksDB yields better read and write performance. - The estimated memory is considered when requesting container resources, so the total memory usage will not exceed the container limit. > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang > > Users may specify the state size via the *setResource* API; each > *StateBackend* implementation should then roughly estimate the different kinds of > memory usage based on that state size. This estimated memory will be > aggregated with the other memory amounts in *ResourceSpec*. There are two advantages to > doing this: > - For the RocksDB backend, a proper memory setting for RocksDB yields better > read and write performance. > - The estimated memory is considered when requesting container resources, so > the total memory usage will not exceed the container limit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5138: - Component/s: Core > The StateBackend provides the method to estimate memory usage based on hint > of state size in ResourceSpec > - > > Key: FLINK-5138 > URL: https://issues.apache.org/jira/browse/FLINK-5138 > Project: Flink > Issue Type: Sub-task > Components: Core, DataSet API, DataStream API >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5138) The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec
Zhijiang Wang created FLINK-5138: Summary: The StateBackend provides the method to estimate memory usage based on hint of state size in ResourceSpec Key: FLINK-5138 URL: https://issues.apache.org/jira/browse/FLINK-5138 Project: Flink Issue Type: Sub-task Components: DataSet API, DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
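The estimation hook FLINK-5138 proposes could look roughly like the sketch below. The interface and class names, the heap/native split, and the 10% heap-overhead factor for a RocksDB-like backend are all illustrative assumptions, not Flink's actual API.

```java
// Each StateBackend maps the user's state-size hint (from setResource)
// to the kinds of memory it actually uses.
final class MemoryEstimate {
    final long heapBytes;
    final long nativeBytes;
    MemoryEstimate(long heapBytes, long nativeBytes) {
        this.heapBytes = heapBytes;
        this.nativeBytes = nativeBytes;
    }
}

interface EstimatingStateBackend {
    MemoryEstimate estimateMemoryUsage(long stateSizeHintBytes);
}

class RocksDbLikeBackend implements EstimatingStateBackend {
    @Override
    public MemoryEstimate estimateMemoryUsage(long stateSizeHintBytes) {
        // Assumed split: state lives off-heap (native memory), with a
        // small heap overhead for caches/metadata (10% is illustrative).
        return new MemoryEstimate(stateSizeHintBytes / 10, stateSizeHintBytes);
    }
}
```

A heap-based backend would implement the same interface but attribute the hint mostly to heap memory, which is exactly why the issue asks each backend to provide its own estimate.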
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally a UDF just creates short-lived small objects that the JVM can recycle quickly, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases a UDF may consume significant resources by creating long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage when needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet, respectively. - The *StateBackend* should provide a method to estimate the memory usage based on the state-size hint in *ResourceSpec*. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider the extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by *NetworkBufferPool* and *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally a UDF just creates short-lived small objects that the JVM can recycle quickly, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases a UDF may consume significant resources by creating long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage when needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet, respectively. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider the extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by *NetworkBufferPool* and *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally a UDF just creates short-lived small objects that the JVM can > recycle quickly, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But in some special cases a UDF may > consume significant resources by creating long-lived large objects, so it is necessary to > provide options for advanced users to define the resource usage when > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in > DataStream and on the base operator in DataSet, respectively. > - The *StateBackend* should provide a method to estimate the memory usage > based on the state-size hint in *ResourceSpec*. > - In job graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the > *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with > *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should consider the extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by *NetworkBufferPool* and > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, JVM options attached to the container should be > supported and could also be configured at the job level. > This feature will be implemented directly in the flip-6 branch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
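The *ResourceSpec* structure described above could be sketched as a simple value class grouping the named factors. This is only the shape the description implies, under assumed names; it is not Flink's actual class.

```java
// Minimal sketch of a ResourceSpec-like value class grouping the resource
// factors from the description: CPU cores, heap, direct, and native memory.
final class ResourceSpecSketch {
    final double cpuCores;
    final int heapMemoryMb;
    final int directMemoryMb;
    final int nativeMemoryMb;

    ResourceSpecSketch(double cpuCores, int heapMemoryMb,
                       int directMemoryMb, int nativeMemoryMb) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        this.directMemoryMb = directMemoryMb;
        this.nativeMemoryMb = nativeMemoryMb;
    }

    // Total memory is what a container request would ultimately account for.
    int totalMemoryMb() {
        return heapMemoryMb + directMemoryMb + nativeMemoryMb;
    }
}
```

Keeping the spec immutable makes it safe to aggregate across chained operators and to carry through job graph generation unchanged.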
[jira] [Assigned] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5134: Assignee: Zhijiang Wang > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained > operators. > The resource of a *JobVertex* should be the aggregation of the individual resources of > its chained operators. > For memory resources in a *JobVertex*, the aggregation is the sum formula over the > chained operators. > And for CPU-core resources in a *JobVertex*, the aggregation is the maximum > formula over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
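The aggregation rule stated above (sum for memory, maximum for CPU cores) can be sketched directly. `OperatorResource` is a stand-in type for illustration, not Flink's real class; the rule itself follows from chained operators sharing one slot and executing in the same task.

```java
import java.util.List;

// Sketch of JobVertex resource aggregation over a chain of operators:
// memory requirements add up, CPU-core requirements take the maximum.
class ChainedResourceAggregator {
    static final class OperatorResource {
        final double cpuCores;
        final int memoryMb;
        OperatorResource(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    static OperatorResource aggregate(List<OperatorResource> chain) {
        double cpu = 0.0;
        int mem = 0;
        for (OperatorResource r : chain) {
            cpu = Math.max(cpu, r.cpuCores); // maximum formula for CPU cores
            mem += r.memoryMb;               // sum formula for memory
        }
        return new OperatorResource(cpu, mem);
    }
}
```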
[jira] [Assigned] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5133: Assignee: Zhijiang Wang > Add new setResource API for DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be set on > *SingleOutputStreamOperator*, similar to other existing properties like > parallelism, name, etc. > For *DataSet*, the *setResource* API will be set on *Operator* in the > same way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum > *ResourceSpec*, to allow for resource resizing in future improvements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
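The shape of the proposed API can be mocked as follows. This is not Flink code: the class names and getters are assumptions; it only shows the fluent two-argument setter stored on the operator alongside properties like parallelism and name.

```java
// Mock sketch of a fluent setResource(min, max) setter; the min/max pair
// leaves room for future resource resizing, per the issue description.
class OperatorSketch {
    static final class Spec {
        final double cpuCores;
        final int memoryMb;
        Spec(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    private Spec minResource;
    private Spec maxResource;

    OperatorSketch setResource(Spec min, Spec max) {
        this.minResource = min;
        this.maxResource = max;
        return this; // fluent, like setParallelism(...) or name(...)
    }

    Spec getMinResource() { return minResource; }
    Spec getMaxResource() { return maxResource; }
}
```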
[jira] [Assigned] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5136: Assignee: Zhijiang Wang > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the > framework memory should be considered together with the slot resource. > Currently the framework memory is mainly used by *MemoryManager* and > *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the > configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5135: Assignee: Zhijiang Wang > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The *JobManager* requests a slot by *ResourceProfile* from the *ResourceManager* > before submitting tasks. Currently the *ResourceProfile* only contains CPU > cores and memory properties. The memory should be expanded into different types, > such as heap memory, direct memory, and native memory, corresponding to the > memory types in *ResourceSpec*. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5132: Assignee: Zhijiang Wang > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory, and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the construction can also be > extended easily for future requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-5137: Assignee: Zhijiang Wang > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will > set the related JVM options when launching the *TaskManager* container, based on the > framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5137) Config JVM options and set onto the TaskManager container
[ https://issues.apache.org/jira/browse/FLINK-5137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5137: - Description: Users can configure JVM options at the job level, and the *ResourceManager* will set the related JVM options when launching the *TaskManager* container, based on the framework memory and the slot resource profile. > Config JVM options and set onto the TaskManager container > - > > Key: FLINK-5137 > URL: https://issues.apache.org/jira/browse/FLINK-5137 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager, TaskManager >Reporter: Zhijiang Wang > > Users can configure JVM options at the job level, and the *ResourceManager* will > set the related JVM options when launching the *TaskManager* container, based on the > framework memory and the slot resource profile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5137) Config JVM options and set onto the TaskManager container
Zhijiang Wang created FLINK-5137: Summary: Config JVM options and set onto the TaskManager container Key: FLINK-5137 URL: https://issues.apache.org/jira/browse/FLINK-5137 Project: Flink Issue Type: Sub-task Components: ResourceManager, TaskManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
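How a ResourceManager might derive container JVM options from framework memory, the slot resource profile, and job-level user options (the FLINK-5137 proposal) can be sketched as below. The heap split and the specific flags are assumptions for illustration, not Flink's actual launch logic.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: assemble TaskManager container JVM options from
// framework memory plus the aggregated slot profile, then append the
// user's job-level options.
class JvmOptionsBuilder {
    static List<String> build(int frameworkHeapMb, int slotHeapMb,
                              int directMemoryMb, List<String> userJvmOpts) {
        List<String> opts = new ArrayList<>();
        // Framework heap plus slot heap forms the container's -Xmx.
        opts.add("-Xmx" + (frameworkHeapMb + slotHeapMb) + "m");
        opts.add("-XX:MaxDirectMemorySize=" + directMemoryMb + "m");
        opts.addAll(userJvmOpts); // job-level options appended last
        return opts;
    }
}
```

Appending the job-level options last lets them override the computed defaults where the JVM honors last-wins semantics.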
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Description: When the *ResourceManager* requests container resources from the cluster, the framework memory should be considered together with the slot resource. Currently the framework memory is mainly used by *MemoryManager* and *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the configuration. > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > > When the *ResourceManager* requests container resources from the cluster, the > framework memory should be considered together with the slot resource. > Currently the framework memory is mainly used by *MemoryManager* and > *NetworkBufferPool* in the *TaskManager*, and this memory can be obtained from the > configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Summary: Aggregation of slot ResourceProfile and framework memory by ResourceManager (was: Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager) > Aggregation of slot ResourceProfile and framework memory by ResourceManager > --- > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5136) Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-5136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5136: - Summary: Aggregation of slot ResourceProfile and framework memory for requesting resource from cluster by ResourceManager (was: ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster) > Aggregation of slot ResourceProfile and framework memory for requesting > resource from cluster by ResourceManager > > > Key: FLINK-5136 > URL: https://issues.apache.org/jira/browse/FLINK-5136 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5136) ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster
Zhijiang Wang created FLINK-5136: Summary: ResourceManager should consider both slot resource profile and framework memory for requesting resource from cluster Key: FLINK-5136 URL: https://issues.apache.org/jira/browse/FLINK-5136 Project: Flink Issue Type: Sub-task Components: ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5135: - Description: The *JobManager* requests a slot by *ResourceProfile* from the *ResourceManager* before submitting tasks. Currently the *ResourceProfile* only contains CPU cores and memory properties. The memory should be expanded into different types, such as heap memory, direct memory, and native memory, corresponding to the memory types in *ResourceSpec*. > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang > > The *JobManager* requests a slot by *ResourceProfile* from the *ResourceManager* > before submitting tasks. Currently the *ResourceProfile* only contains CPU > cores and memory properties. The memory should be expanded into different types, > such as heap memory, direct memory, and native memory, corresponding to the > memory types in *ResourceSpec*. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5135) ResourceProfile for slot request should be expanded to correspond with ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-5135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5135: - Summary: ResourceProfile for slot request should be expanded to correspond with ResourceSpec (was: ResourceProfile should be expanded to correspond with ResourceSpec) > ResourceProfile for slot request should be expanded to correspond with > ResourceSpec > --- > > Key: FLINK-5135 > URL: https://issues.apache.org/jira/browse/FLINK-5135 > Project: Flink > Issue Type: Sub-task > Components: JobManager, ResourceManager >Reporter: Zhijiang Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally a UDF just creates short-lived small objects that the JVM can recycle quickly, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases a UDF may consume significant resources by creating long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage when needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet, respectively. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider the extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by *NetworkBufferPool* and *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally a UDF just creates short-lived small objects that the JVM can recycle quickly, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases a UDF may consume significant resources by creating long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage when needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and on the base operator in DataSet, respectively. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspondence with *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should consider the extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by *NetworkBufferPool* and *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally a UDF just creates short-lived small objects that the JVM can > recycle quickly, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But in some special cases a UDF may > consume significant resources by creating long-lived large objects, so it is necessary to > provide options for advanced users to define the resource usage when > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in > DataStream and on the base operator in DataSet, respectively. > - The *StateBackend* should provide a method to estimate the memory usage > based on the state-size hint in *ResourceSpec*. > - In job graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the > *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with > *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should consider the extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by *NetworkBufferPool* and > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, JVM options attached to the container should be > supported and could also be configured at the job level. > This feature will be implemented directly in the flip-6 branch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5135) ResourceProfile should be expanded to correspond with ResourceSpec
Zhijiang Wang created FLINK-5135: Summary: ResourceProfile should be expanded to correspond with ResourceSpec Key: FLINK-5135 URL: https://issues.apache.org/jira/browse/FLINK-5135 Project: Flink Issue Type: Sub-task Components: JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the CPU cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. was: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the CPU cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained > operators. > The resource of a *JobVertex* should be the aggregation of the individual resources in > chained operators. > For the memory resource of a *JobVertex*, the aggregation is the sum over the > chained operators. > And for the CPU cores resource of a *JobVertex*, the aggregation is the maximum > over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In *JobGraph* generation, each *JobVertex* corresponds to a series of chained operators. The resource of a *JobVertex* should be the aggregation of the individual resources of its chained operators. For the memory resource of a *JobVertex*, the aggregation is the sum over the chained operators. And for the CPU cores resource of a *JobVertex*, the aggregation is the maximum over the chained operators. was:In DataStream API, the *ResourceSpec* is set on the internal transformation > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In *JobGraph* generation, each *JobVertex* corresponds to a series of chained > operators. > The resource of a *JobVertex* should be the aggregation of the individual resources in > chained operators. > For the memory resource of a *JobVertex*, the aggregation is the sum over the > chained operators. And for the CPU cores resource of a *JobVertex*, the aggregation > is the maximum over the chained operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
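The aggregation rule in FLINK-5134 (sum for memory, maximum for CPU cores across a chain) can be sketched as follows. The class and method names here are illustrative assumptions, not the actual Flink implementation.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the aggregation described above: when operators are chained into
// one JobVertex, memory requirements add up (the operators share a slot),
// while the CPU cores requirement takes the maximum over the chain.
public class ChainedResourceAggregation {

    static final class ResourceSpec {
        final double cpuCores;
        final int heapMemoryMB;

        ResourceSpec(double cpuCores, int heapMemoryMB) {
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }
    }

    static ResourceSpec aggregateChain(List<ResourceSpec> chainedOperators) {
        double maxCpu = 0.0;
        int totalHeap = 0;
        for (ResourceSpec spec : chainedOperators) {
            maxCpu = Math.max(maxCpu, spec.cpuCores); // CPU cores: maximum formula
            totalHeap += spec.heapMemoryMB;           // memory: sum formula
        }
        return new ResourceSpec(maxCpu, totalHeap);
    }

    public static void main(String[] args) {
        ResourceSpec vertex = aggregateChain(Arrays.asList(
                new ResourceSpec(1.0, 256),
                new ResourceSpec(2.0, 128)));
        // 2.0 cores (max of 1.0 and 2.0), 384 MB heap (256 + 128)
        System.out.println(vertex.cpuCores + " cores, " + vertex.heapMemoryMB + " MB heap");
    }
}
```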
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally the UDF just creates short-lived small objects that can be quickly recycled by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases, the UDF may consume considerable resources to create long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and the base operator in DataSet separately. - In job graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should account for extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally the UDF just creates short-lived small objects that can be quickly recycled by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases, the UDF may consume considerable resources to create long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should account for extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally the UDF just creates short-lived small objects that can be quickly > recycled by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But in some special cases, the UDF may > consume considerable resources to create long-lived large objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in > DataStream and the base operator in DataSet separately. > - In job graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the *ResourceManager* for > submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should account for extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached wit
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating job graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Summary: Aggregate ResourceSpec for chained operators when generating job graph (was: Aggregate ResourceSpec for chained operators when generating stream graph) > Aggregate ResourceSpec for chained operators when generating job graph > - > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, the *ResourceSpec* is set on the internal > transformation -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In DataStream API, the *ResourceSpec* is set on the internal transformation (was: In DataStream API,) > Aggregate ResourceSpec for chained operators when generating stream graph > > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, the *ResourceSpec* is set on the internal > transformation -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
[ https://issues.apache.org/jira/browse/FLINK-5134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5134: - Description: In DataStream API, > Aggregate ResourceSpec for chained operators when generating stream graph > > > Key: FLINK-5134 > URL: https://issues.apache.org/jira/browse/FLINK-5134 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Zhijiang Wang > > In DataStream API, -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5134) Aggregate ResourceSpec for chained operators when generating stream graph
Zhijiang Wang created FLINK-5134: Summary: Aggregate ResourceSpec for chained operators when generating stream graph Key: FLINK-5134 URL: https://issues.apache.org/jira/browse/FLINK-5134 Project: Flink Issue Type: Sub-task Components: DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5133) Add new setResource API for DataStream and DataSet
[ https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5133: - Description: This is part of the fine-grained resource configuration. For *DataStream*, the *setResource* API will be added to *SingleOutputStreamOperator*, similar to other existing properties like parallelism, name, etc. For *DataSet*, the *setResource* API will be added to *Operator* in a similar way. The API takes two parameters, a minimum *ResourceSpec* and a maximum *ResourceSpec*, to allow for resource resizing in future improvements. > Add new setResource API for DataStream and DataSet > -- > > Key: FLINK-5133 > URL: https://issues.apache.org/jira/browse/FLINK-5133 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > For *DataStream*, the *setResource* API will be added to > *SingleOutputStreamOperator*, similar to other existing properties like > parallelism, name, etc. > For *DataSet*, the *setResource* API will be added to *Operator* in a > similar way. > The API takes two parameters, a minimum *ResourceSpec* and a maximum > *ResourceSpec*, to allow for resource resizing in > future improvements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
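The setResource API proposed in FLINK-5133 might look roughly like the sketch below. All names here (the stand-in Operator class, the field names) are assumptions mirroring the description, not the final Flink API; the point is the fluent setter taking minimum and maximum specs.

```java
// Sketch of how the proposed setResource API might look on an operator.
// The two ResourceSpec parameters express minimum and maximum resources,
// leaving room for resource resizing in future improvements.
public class SetResourceSketch {

    static final class ResourceSpec {
        final double cpuCores;
        final int heapMemoryMB;

        ResourceSpec(double cpuCores, int heapMemoryMB) {
            this.cpuCores = cpuCores;
            this.heapMemoryMB = heapMemoryMB;
        }
    }

    // Stand-in for SingleOutputStreamOperator (DataStream) or Operator (DataSet).
    static class Operator {
        ResourceSpec minResources;
        ResourceSpec maxResources;

        // Fluent setter, analogous to existing properties such as
        // setParallelism(...) or name(...).
        Operator setResource(ResourceSpec min, ResourceSpec max) {
            this.minResources = min;
            this.maxResources = max;
            return this;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator().setResource(
                new ResourceSpec(0.5, 128),
                new ResourceSpec(2.0, 512));
        System.out.println(op.minResources.heapMemoryMB + " MB min, "
                + op.maxResources.heapMemoryMB + " MB max");
    }
}
```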
[jira] [Created] (FLINK-5133) Add new setResource API for DataStream and DataSet
Zhijiang Wang created FLINK-5133: Summary: Add new setResource API for DataStream and DataSet Key: FLINK-5133 URL: https://issues.apache.org/jira/browse/FLINK-5133 Project: Flink Issue Type: Sub-task Components: DataSet API, DataStream API Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally the UDF just creates short-lived small objects that can be quickly recycled by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases, the UDF may consume considerable resources to create long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should account for extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. This feature will be implemented directly in the flip-6 branch. was: Normally the UDF just creates short-lived small objects that can be quickly recycled by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. 
But in some special cases, the UDF may consume considerable resources to create long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should account for extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. > Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally the UDF just creates short-lived small objects that can be quickly > recycled by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But in some special cases, the UDF may > consume considerable resources to create long-lived large objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. 
> The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in > DataStream and the base operator in DataSet separately. > - In stream graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the *ResourceManager* for > submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should account for extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be > supported and could also be conf
[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5132: - Description: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the construction can also be > extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
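The "easily extended construction" idea in FLINK-5132 is commonly realized with a builder, where each resource factor is optional and new factors (such as state size) can be added later without breaking existing call sites. The sketch below is a hypothetical illustration of that design choice; none of these names are the actual Flink API.

```java
// Sketch of an extensible ResourceSpec construction: a builder groups only
// the resource factors a user cares about. Adding a new factor later (e.g.
// state size) does not break existing callers. Names are illustrative only.
public class ResourceSpecBuilderSketch {

    static final class ResourceSpec {
        final double cpuCores;
        final int heapMemoryMB;
        final int directMemoryMB;
        final int nativeMemoryMB;
        final int stateSizeMB;

        private ResourceSpec(Builder b) {
            this.cpuCores = b.cpuCores;
            this.heapMemoryMB = b.heapMemoryMB;
            this.directMemoryMB = b.directMemoryMB;
            this.nativeMemoryMB = b.nativeMemoryMB;
            this.stateSizeMB = b.stateSizeMB;
        }

        static Builder newBuilder() { return new Builder(); }

        static final class Builder {
            private double cpuCores;
            private int heapMemoryMB;
            private int directMemoryMB;
            private int nativeMemoryMB;
            private int stateSizeMB;

            Builder setCpuCores(double v) { cpuCores = v; return this; }
            Builder setHeapMemoryMB(int v) { heapMemoryMB = v; return this; }
            Builder setDirectMemoryMB(int v) { directMemoryMB = v; return this; }
            Builder setNativeMemoryMB(int v) { nativeMemoryMB = v; return this; }
            Builder setStateSizeMB(int v) { stateSizeMB = v; return this; }

            ResourceSpec build() { return new ResourceSpec(this); }
        }
    }

    public static void main(String[] args) {
        ResourceSpec spec = ResourceSpec.newBuilder()
                .setCpuCores(1.0)
                .setHeapMemoryMB(256)
                .setStateSizeMB(1024)  // a later factor, added without breaking callers
                .build();
        System.out.println(spec.stateSizeMB);
    }
}
```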
[jira] [Updated] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
[ https://issues.apache.org/jira/browse/FLINK-5132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5132: - Description: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. was: This is part of the fine-grained resource configuration. The current resource factors include CPU cores, heap memory, direct memory, native memory and state size. The *ResourceSpec* will provide some basic constructors for grouping different resource factors as needed, and the construction can also be extended easily for further requirements. > Introduce the ResourceSpec for grouping different resource factors in API > - > > Key: FLINK-5132 > URL: https://issues.apache.org/jira/browse/FLINK-5132 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Zhijiang Wang > > This is part of the fine-grained resource configuration. > The current resource factors include CPU cores, heap memory, direct memory, > native memory and state size. > The *ResourceSpec* will provide some basic constructors for grouping > different resource factors as needed, and the construction can also be > extended easily for further requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5132) Introduce the ResourceSpec for grouping different resource factors in API
Zhijiang Wang created FLINK-5132: Summary: Introduce the ResourceSpec for grouping different resource factors in API Key: FLINK-5132 URL: https://issues.apache.org/jira/browse/FLINK-5132 Project: Flink Issue Type: Sub-task Components: Core Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5131) Fine-grained Resource Configuration
[ https://issues.apache.org/jira/browse/FLINK-5131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5131: - Description: Normally the UDF just creates short-lived small objects that can be quickly recycled by the JVM, so most of the memory resource is controlled and managed by the *TaskManager* framework. But in some special cases, the UDF may consume considerable resources to create long-lived large objects, so it is necessary to provide options for advanced users to define the resource usage if needed. The basic approach is the following: - Introduce the *ResourceSpec* structure to describe the different resource factors (CPU cores, heap memory, direct memory, native memory, etc.) and provide some basic construction methods for resource groups. - The *ResourceSpec* can be set on the internal transformation in DataStream and the base operator in DataSet separately. - In stream graph generation, the *ResourceSpec* will be aggregated for chained operators. - When the *JobManager* requests a slot from the *ResourceManager* for submitting a task, the *ResourceProfile* will be expanded to correspond with the *ResourceSpec*. - When the *ResourceManager* requests container resources from the cluster, it should account for extra framework memory in addition to the slot *ResourceProfile*. - The framework memory is mainly used by the *NetworkBufferPool* and the *MemoryManager* in the *TaskManager*, and it can be configured at the job level. - Apart from resources, the JVM options attached to the container should be supported and could also be configured at the job level. 
> Fine-grained Resource Configuration > --- > > Key: FLINK-5131 > URL: https://issues.apache.org/jira/browse/FLINK-5131 > Project: Flink > Issue Type: New Feature > Components: DataSet API, DataStream API, JobManager, ResourceManager >Reporter: Zhijiang Wang > > Normally the UDF just creates short-lived small objects that can be quickly > recycled by the JVM, so most of the memory resource is controlled and > managed by the *TaskManager* framework. But in some special cases, the UDF may > consume considerable resources to create long-lived large objects, so it is necessary to > provide options for advanced users to define the resource usage if > needed. > The basic approach is the following: > - Introduce the *ResourceSpec* structure to describe the different resource > factors (CPU cores, heap memory, direct memory, native memory, etc.) and > provide some basic construction methods for resource groups. > - The *ResourceSpec* can be set on the internal transformation in > DataStream and the base operator in DataSet separately. > - In stream graph generation, the *ResourceSpec* will be aggregated for > chained operators. > - When the *JobManager* requests a slot from the *ResourceManager* for > submitting a task, the *ResourceProfile* will be expanded to correspond > with the *ResourceSpec*. > - When the *ResourceManager* requests container resources from the cluster, it > should account for extra framework memory in addition to the slot *ResourceProfile*. > - The framework memory is mainly used by the *NetworkBufferPool* and > *MemoryManager* in the *TaskManager*, and it can be configured at the job level. > - Apart from resources, the JVM options attached to the container should be > supported and could also be configured at the job level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5131) Fine-grained Resource Configuration
Zhijiang Wang created FLINK-5131: Summary: Fine-grained Resource Configuration Key: FLINK-5131 URL: https://issues.apache.org/jira/browse/FLINK-5131 Project: Flink Issue Type: New Feature Components: DataSet API, DataStream API, JobManager, ResourceManager Reporter: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650576#comment-15650576 ] Zhijiang Wang commented on FLINK-4364: -- Hi [~till.rohrmann], for the heartbeat interaction between the TM and the JM, the process is almost the same as with the RM, as we discussed before. There will be another separate {{HeartbeatManagerImpl}} and {{HeartbeatListener}} in the TM used for the JM heartbeat. Also, the TM will monitor the {{HeartbeatTarget}} once it has successfully registered at the new JM via the HA mechanism. There are two issues to be confirmed: 1. If the TM detects the JM as dead by heartbeat timeout, the TM should not release the tasks and slots which belong to that JM; it should do nothing when notified of the heartbeat timeout. It will re-register with the new JM via HA and offer the related slots if possible. This is related to the JM failure recovery process. If the JM detects the TM as dead by heartbeat timeout, it will release all the slots related to that TM and request new ones from the RM again. 2. For the payload, I am currently not sure which information needs to be reported by heartbeat. The JM may need the {{SlotPool}} to be consistent with the {{SlotOffer}}, and this also concerns other processes. So I think we can deliver the payload as null in the current implementation and just make the monitoring function take effect. Later we can expand the payload information as needed. Do you think the above points are feasible? If so, I will work on it this week. > Implement TaskManager side of heartbeat from JobManager > --- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and > the {{TaskManager}} will report metrics info for each heartbeat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
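The two points in the comment above can be sketched in code. The interface shape below is an assumption based on this discussion, not the exact flip-6 `HeartbeatListener`: on a JM heartbeat timeout the TM only logs and waits for HA re-registration instead of releasing tasks and slots, and the reported payload starts out as null so monitoring can take effect before the payload contents are settled.

```java
// Hedged sketch of the TM-side listener for the JM heartbeat described above.
// Interface and class names are assumptions, not the actual Flink interfaces.
public class JobManagerHeartbeatSketch {

    interface HeartbeatListener<O> {
        void notifyHeartbeatTimeout(String resourceId);
        O retrievePayload();  // payload sent back with each heartbeat
    }

    static final class JobManagerHeartbeatListener implements HeartbeatListener<Object> {
        @Override
        public void notifyHeartbeatTimeout(String jobManagerResourceId) {
            // Point 1: do NOT release tasks/slots here. The HA services will
            // announce a new JM leader; the TM then re-registers and offers
            // its related slots again.
            System.out.println("JM " + jobManagerResourceId
                    + " heartbeat timed out; waiting for HA re-registration");
        }

        @Override
        public Object retrievePayload() {
            // Point 2: payload deliberately null for now; expand as needed later.
            return null;
        }
    }

    public static void main(String[] args) {
        JobManagerHeartbeatListener listener = new JobManagerHeartbeatListener();
        listener.notifyHeartbeatTimeout("jm-1");
        System.out.println(listener.retrievePayload());
    }
}
```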
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646297#comment-15646297 ] Zhijiang Wang commented on FLINK-4354: -- Actually, the current {{HeartbeatManagerImpl}} can work and be used directly in the TM component. The only issue is that {{HeartbeatManagerImpl}} implements the {{HeartbeatTarget}} interface, and that interface is only exercised in testing. The TM will wrap a new {{HeartbeatTarget}} based on the {{ResourceID}} and {{Gateway}} of the RM, so the TM will never call the {{sendHeartbeat}} and {{requestHeartbeat}} methods of {{HeartbeatManagerImpl}}. In other words, {{HeartbeatManagerImpl}} has the role of a {{HeartbeatTarget}}, but this role is redundant for TM and RM, because TM and RM will create separate {{HeartbeatTarget}} instances during the registration process. If {{HeartbeatManagerImpl}} did not implement {{HeartbeatTarget}}, it seems it would be cleaner to reuse across the different components (TM, RM, JM), I think. Maybe my understanding is not entirely accurate; for now I can reuse {{HeartbeatManagerImpl}} in the TM to realize the process. > Implement TaskManager side of heartbeat from ResourceManager > - > > Key: FLINK-4354 > URL: https://issues.apache.org/jira/browse/FLINK-4354 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. > The {{TaskManager}} transmits its slot availability with each heartbeat. That > way, the RM will always know about available slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643637#comment-15643637 ] Zhijiang Wang edited comment on FLINK-4354 at 11/7/16 9:46 AM: --- Hi [~till.rohrmann], for the heartbeat interaction between TM and RM, I have some issues that need to be confirmed with you before implementation. 1. The current {{HeartbeatManagerImpl}} implements both the {{HeartbeatManager}} and {{HeartbeatTarget}} interfaces in order to be tested easily. I think we need another HeartbeatManagerImpl that implements just the {{HeartbeatManager}} interface so it can be used directly in different components. And every component can implement its own separate {{HeartbeatTarget}}. 2. For the TM component, the {{HeartbeatManagerImpl}} can be constructed in {{TaskManagerRunner}} (maybe not put in {{TaskManagerServices}}) and passed into {{TaskExecutor}}. 3. The TM will create the {{HeartbeatListener}} and start the {{HeartbeatManagerImpl}}. 4. When the RM leader changes, the TM registers with the new RM. If the registration succeeds, {{TaskExecutorRegistrationSuccess}} should contain the {{ResourceID}} of the RM, so the TM can create the {{HeartbeatTarget}} and monitor it based on the {{ResourceID}} and Gateway of the RM. 5. For the RM, when it receives a registration from the TM, it will create the {{HeartbeatTarget}} and monitor it based on the {{ResourceID}} and Gateway of the TM. The RM will schedule heartbeat requests to all the monitored TMs. 6. {{TaskExecutorGateway}} should define the requestHeartbeat RPC method, and {{ResourceManagerGateway}} should define the sendHeartbeat RPC method. Do you think the above processes are feasible? I would appreciate your professional advice, and then I will begin implementing this week. was (Author: zjwang): Hi [~till.rohrmann], for the heartbeat interaction between {{TM}} and {{RM}}, I have some issues that need to be confirmed with you before implementation. 1. 
The current {{HeartbeatManagerImpl}} implements both the {{HeartbeatManager}} and {{HeartbeatTarget}} interfaces in order to be tested easily. I think we need another HeartbeatManagerImpl that implements just the {{HeartbeatManager}} interface so it can be used directly in different components. And every component can implement its own separate {{HeartbeatTarget}}. 2. For the {{TM}} component, the {{HeartbeatManagerImpl}} can be constructed in {{TaskManagerRunner}} (maybe not put in {{TaskManagerServices}}) and passed into {{TaskExecutor}}. 3. The {{TM}} will create the {{HeartbeatListener}} and start the {{HeartbeatManagerImpl}}. 4. When the {{RM}} leader changes, the {{TM}} registers with the new {{RM}}. If the registration succeeds, {{TaskExecutorRegistrationSuccess}} should contain the {{ResourceID}} of the {{RM}}, so the {{TM}} can create the {{HeartbeatTarget}} and monitor it based on the {{ResourceID}} and Gateway of the RM. 5. For the {{RM}}, when it receives a registration from the {{TM}}, it will create the {{HeartbeatTarget}} and monitor it based on the {{ResourceID}} and Gateway of the {{TM}}. The {{RM}} will schedule heartbeat requests to all the monitored TMs. 6. {{TaskExecutorGateway}} should define the requestHeartbeat RPC method, and {{ResourceManagerGateway}} should define the sendHeartbeat RPC method. Do you think the above processes are feasible? I would appreciate your professional advice, and then I will begin implementing this week. > Implement TaskManager side of heartbeat from ResourceManager > - > > Key: FLINK-4354 > URL: https://issues.apache.org/jira/browse/FLINK-4354 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. > The {{TaskManager}} transmits its slot availability with each heartbeat. That > way, the RM will always know about available slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15611247#comment-15611247 ] Zhijiang Wang commented on FLINK-4911: -- Yeah, it does "at-least once". BTW, I confirmed with Till that the heartbeat manager module is finished, so the previously blocked JIRAs can be resumed. Some of them, related to the heartbeat interaction between TM, JM and RM, were assigned to me earlier. I will work on them in the following days and then consider the JM failure issue; I think the payload information reported in the heartbeat messages may be reused in the JM failure scenario. > Non-disruptive JobManager Failures via Reconciliation > -- > > Key: FLINK-4911 > URL: https://issues.apache.org/jira/browse/FLINK-4911 > Project: Flink > Issue Type: New Feature > Components: JobManager, TaskManager >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > JobManager failures can be handled in a non-disruptive way - by *reconciling* > the new JobManager leader and the TaskManagers. > I suggest to use this term (reconcile) - it has been used also by other > frameworks (like Mesos) for non-disruptive handling of failures. > The basic approach is the following: > - When a JobManager fails, TaskManagers do not cancel tasks, but attempt to > reconnect to the JobManager > - On connect, the TaskManager tells the JobManager about its currently > running tasks > - A new JobManager waits for TaskManagers to connect and report a task > status. It re-constructs the ExecutionGraph state from these reports > - Tasks whose status was not reconstructed in a certain time are assumed > failed and trigger regular task recovery. > To avoid having to re-implement this for the new JobManager / TaskManager > approach in *flip-6*, I suggest to directly implement this into the > {{flip-6}} feature branch. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4911: - Component/s: TaskManager > Non-disruptive JobManager Failures via Reconciliation > Key: FLINK-4911 > URL: https://issues.apache.org/jira/browse/FLINK-4911 > Project: Flink > Issue Type: New Feature > Components: JobManager, TaskManager >Reporter: Stephan Ewen >Assignee: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4911: Assignee: Zhijiang Wang > Non-disruptive JobManager Failures via Reconciliation > Key: FLINK-4911 > URL: https://issues.apache.org/jira/browse/FLINK-4911 > Project: Flink > Issue Type: New Feature > Components: JobManager >Reporter: Stephan Ewen >Assignee: Zhijiang Wang -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15607630#comment-15607630 ] Zhijiang Wang commented on FLINK-4911: -- Hi Stephan, sub-tasks 2 and 3 are the same; do you mind if I remove sub-task 3? > Non-disruptive JobManager Failures via Reconciliation > Key: FLINK-4911 > URL: https://issues.apache.org/jira/browse/FLINK-4911 > Project: Flink > Issue Type: New Feature > Components: JobManager >Reporter: Stephan Ewen -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4911) Non-disruptive JobManager Failures via Reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15607336#comment-15607336 ] Zhijiang Wang commented on FLINK-4911: -- FLINK-3539 tries to realize eventual consistency between the JM and TMs during regular running. In the JM failure scenario, the new JM wants to recover the initial state based on all running tasks. Maybe we can use the running-task information from the heartbeat messages and apply it during the registration process. > Non-disruptive JobManager Failures via Reconciliation > Key: FLINK-4911 > URL: https://issues.apache.org/jira/browse/FLINK-4911 > Project: Flink > Issue Type: New Feature > Components: JobManager >Reporter: Stephan Ewen -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4915) Define parameters for reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4915: Assignee: Zhijiang Wang > Define parameters for reconciliation > > > Key: FLINK-4915 > URL: https://issues.apache.org/jira/browse/FLINK-4915 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > For reconciliation between TaskManagers and JobManager, we need to define the > following parameters: > - Duration that a TaskManager keeps a Task running when looking for a new > JobManager. After that duration is over, the Task is canceled/failed. > - Duration that the JobManager waits for TaskManagers to report that a task > is still running. > We should also try and determine some sane default values for those > parameters. A suggestion could be {{1 min}} waiting time on the TaskManager > side and {{20 secs}} waiting time on the JobManager side. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
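The two reconciliation timeouts described in FLINK-4915 could be captured as configuration constants. A minimal sketch follows; the class and field names are hypothetical, and only the suggested default values ({{1 min}} on the TaskManager, {{20 secs}} on the JobManager) come from the issue itself.

```java
import java.time.Duration;

// Illustrative holder for the two reconciliation timeouts proposed in
// FLINK-4915. Names are hypothetical; defaults follow the suggestion.
final class ReconciliationOptions {
    // How long a TaskManager keeps a task running while it looks for a
    // new JobManager before cancelling/failing the task.
    static final Duration TASK_MANAGER_RECONCILE_TIMEOUT = Duration.ofMinutes(1);

    // How long the JobManager waits for TaskManagers to report that a
    // task is still running before assuming it failed.
    static final Duration JOB_MANAGER_RECONCILE_TIMEOUT = Duration.ofSeconds(20);

    private ReconciliationOptions() {}
}
```

Keeping the TaskManager-side timeout longer than the JobManager-side one matches the proposal: tasks outlive the JobManager's waiting period, so a slow-to-connect TM fails reconciliation before its tasks are torn down.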
[jira] [Assigned] (FLINK-4912) Introduce RECONCILING state in ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4912: Assignee: Zhijiang Wang > Introduce RECONCILING state in ExecutionGraph > - > > Key: FLINK-4912 > URL: https://issues.apache.org/jira/browse/FLINK-4912 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > This is part of the non-disruptive JobManager failure recovery. > I suggest to add a JobStatus and ExecutionState {{RECONCILING}}. > If a job is started on a that JobManager for master recovery (tbd how to > determine that) the {{ExecutionGraph}} and the {{Execution}}s start in the > reconciling state. > From {{RECONCILING}}, tasks can go to {{RUNNING}} (execution reconciled with > TaskManager) or to {{FAILED}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
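The transition rule proposed in FLINK-4912 (from {{RECONCILING}} a task may go only to {{RUNNING}} or {{FAILED}}) can be sketched as a small enum. This is a hypothetical simplification, not Flink's actual ExecutionState, and all other states and transitions are omitted.

```java
// Sketch of the proposed RECONCILING state: from RECONCILING an
// execution can move to RUNNING (reconciled with a TaskManager) or
// FAILED; this sketch rejects every other transition.
enum SketchExecutionState {
    RECONCILING, RUNNING, FAILED;

    boolean canTransitionTo(SketchExecutionState next) {
        if (this == RECONCILING) {
            return next == RUNNING || next == FAILED;
        }
        return false; // other transitions intentionally omitted here
    }
}
```

In the real graph the RUNNING and FAILED states of course have further transitions; the point of the sketch is only that RECONCILING is a strict predecessor of both.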
[jira] [Assigned] (FLINK-4914) Define parameters for reconciliation
[ https://issues.apache.org/jira/browse/FLINK-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4914: Assignee: Zhijiang Wang > Define parameters for reconciliation > > > Key: FLINK-4914 > URL: https://issues.apache.org/jira/browse/FLINK-4914 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Zhijiang Wang > > For reconciliation between TaskManagers and JobManager, we need to define the > following parameters: > - Duration that a TaskManager keeps a Task running when looking for a new > JobManager. After that duration is over, the Task is canceled/failed. > - Duration that the JobManager waits for TaskManagers to report that a task > is still running. > We should also try and determine some sane default values for those > parameters. A suggestion could be {{1 min}} waiting time on the TaskManager > side and {{20 secs}} waiting time on the JobManager side. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534797#comment-15534797 ] Zhijiang Wang edited comment on FLINK-4715 at 9/30/16 2:31 AM: --- Yes, we have already experienced this problem in real production many times, because the user code cannot be controlled. If the thread is waiting on a synchronized lock, or in similar cases, it cannot be cancelled. Our approach is that if the JobMaster fails to cancel the task several times, it lets the TaskManager exit itself. > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
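The safety net discussed in FLINK-4715 — give up after repeated failed cancellation attempts and let the process exit so no resources leak — can be sketched as a tiny watchdog. This is an illustrative sketch only; the attempt limit and class name are assumptions, not Flink's implementation.

```java
// Hypothetical watchdog for the FLINK-4715 safety net: after a bounded
// number of failed cancellation attempts, the caller should terminate
// the TaskManager process (e.g. via System.exit) to avoid leaks.
final class CancellationWatchdog {
    private final int maxAttempts;
    private int failedAttempts;

    CancellationWatchdog(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Records one failed attempt; returns true once the limit is hit. */
    boolean onCancellationFailure() {
        failedAttempts++;
        return failedAttempts >= maxAttempts;
    }
}
```

The actual exit is deliberately left to the caller, since a hard System.exit inside a helper class would be untestable.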
[jira] [Closed] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang closed FLINK-4505. > Implement TaskManagerRunner to construct related components for TaskManager > --- > > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor > > Implement {{TaskManagerRunner}} to construct related components > ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, > {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or > standalone mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4505: - Description: Implement {{TaskManagerRunner}} to construct related components ({{MemoryManager}}, {{IOManager}}, {{NetworkEnvironment}}, {{TaskManagerLocation}}) for {{TaskManager}} and start them in yarn or standalone mode. (was: Implement {{TaskExecutorFactory}} that should be an abstract class with the helper methods to bring up the {{TaskManager}}. The factory can be implemented by some classes to start a {{TaskManager}} in different modes (testing, standalone, yarn).) > Implement TaskManagerRunner to construct related components for TaskManager > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4505) Implement TaskManagerRunner to construct related components for TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4505: - Summary: Implement TaskManagerRunner to construct related components for TaskManager (was: Implement TaskManagerFactory to bring up TaskManager for different modes) > Implement TaskManagerRunner to construct related components for TaskManager > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438745#comment-15438745 ] Zhijiang Wang commented on FLINK-4363: -- Hi [~mxm], I have created FLINK-4505 for the TaskExecutorFactory issue. As you mentioned, it should be an abstract class providing an abstract method, maybe called 'createAndStartTaskExecutor()'. There may be at least three specific factories (testing, yarn, standalone) extending TaskExecutorFactory to implement 'createAndStartTaskExecutor()'. The constructor parameters of the specific factories differ depending on the mode. For example, for StandaloneTaskExecutorFactory the constructor parameters should be (Configuration configuration, ResourceID resourceID, RpcService rpcService, String taskManagerHostname, HighAvailabilityServices haServices, boolean localTaskManagerCommunication), and in 'createAndStartTaskExecutor()' it can invoke the 'startTaskManagerComponentsAndActor' method of {{TaskExecutor}} to bring up the {{TaskExecutor}}. Do you have any other advice? Then I can start this subtask later. https://issues.apache.org/jira/browse/FLINK-4505 > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar to the current {{TaskManager}}, but implement initialization and startup of all components in Java instead of Scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
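The factory shape discussed in the comment above could be sketched as follows. This is a hypothetical outline: the class names mirror the proposal, but the constructor parameters are simplified to plain strings and the method body is a stand-in, since the real dependencies (Configuration, RpcService, HighAvailabilityServices) are not reproducible here.

```java
// Sketch of the proposed TaskExecutorFactory: an abstract class with one
// mode-specific method, extended per deployment mode (testing, yarn,
// standalone). Simplified stand-in, not the actual Flink code.
abstract class TaskExecutorFactory {
    abstract String createAndStartTaskExecutor();
}

class StandaloneTaskExecutorFactory extends TaskExecutorFactory {
    private final String taskManagerHostname;

    StandaloneTaskExecutorFactory(String taskManagerHostname) {
        this.taskManagerHostname = taskManagerHostname;
    }

    @Override
    String createAndStartTaskExecutor() {
        // In the proposal this would call
        // TaskExecutor.startTaskManagerComponentsAndActor(...).
        return "started standalone TaskExecutor on " + taskManagerHostname;
    }
}
```

A yarn or testing factory would take its own constructor parameters and override the same single method, which is what keeps the caller mode-agnostic.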
[jira] [Updated] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4505: - Summary: Implement TaskManagerFactory to bring up TaskManager for different modes (was: Implement abstract TaskManagerFactory to bring up TaskManager for different modes) > Implement TaskManagerFactory to bring up TaskManager for different modes > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4505: Assignee: Zhijiang Wang > Implement TaskManagerFactory to bring up TaskManager for different modes > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4505) Implement abstract TaskManagerFactory to bring up TaskManager for different modes
Zhijiang Wang created FLINK-4505: Summary: Implement abstract TaskManagerFactory to bring up TaskManager for different modes Key: FLINK-4505 URL: https://issues.apache.org/jira/browse/FLINK-4505 Project: Flink Issue Type: Sub-task Components: Cluster Management Reporter: Zhijiang Wang Priority: Minor Implement {{TaskExecutorFactory}} that should be an abstract class with the helper methods to bring up the {{TaskManager}}. The factory can be implemented by some classes to start a {{TaskManager}} in different modes (testing, standalone, yarn). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4489) Implement TaskManager's SlotManager
[ https://issues.apache.org/jira/browse/FLINK-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438621#comment-15438621 ] Zhijiang Wang commented on FLINK-4489: -- Does it refer to the slot manager in {{ResourceManager}}, or is it unrelated to it? > Implement TaskManager's SlotManager > --- > > Key: FLINK-4489 > URL: https://issues.apache.org/jira/browse/FLINK-4489 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann > > The {{SlotManager}} is responsible for managing the available slots on the > {{TaskManager}}. This basically means to maintain the mapping between slots > and the owning {{JobManagers}} and to offer tasks which run in the slots > access to the owning {{JobManagers}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438609#comment-15438609 ] Zhijiang Wang commented on FLINK-4354: -- Yes, so it would be started after FLINK-4478 is done. > Implement TaskManager side of heartbeat from ResourceManager > - > > Key: FLINK-4354 > URL: https://issues.apache.org/jira/browse/FLINK-4354 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. > The {{TaskManager}} transmits its slot availability with each heartbeat. That > way, the RM will always know about available slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4424) Make network environment start-up/shutdown independent of JobManager association
[ https://issues.apache.org/jira/browse/FLINK-4424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434680#comment-15434680 ] Zhijiang Wang commented on FLINK-4424: -- The previous behavior of {{NetworkEnvironment}} is effectively a lazy start: whether the {{ConnectionManager}} is started depends on establishing RPC communication between {{TaskManager}} and {{JobManager}}, because the upstream connection info is sent to the downstream side by the {{JobMaster}}. We could instead start the {{ConnectionManager}} right after constructing the {{NetworkEnvironment}} component, which would make the registration process cleaner since it is no longer mixed with starting the {{ConnectionManager}}. @Till, I can first extract the start-up logic of the {{ConnectionManager}} and move it into the constructor of {{NetworkEnvironment}}. The shutdown process may depend on the new TaskExecutor work, so it could be fixed later. What do you think? > Make network environment start-up/shutdown independent of JobManager > association > > > Key: FLINK-4424 > URL: https://issues.apache.org/jira/browse/FLINK-4424 > Project: Flink > Issue Type: Improvement > Components: Network, TaskManager >Reporter: Till Rohrmann > > Currently, the {{TaskManager}} starts the netty network server only after it > has registered with a {{JobManager}}. Upon loss of connection to the > {{JobManager}} the {{NetworkEnvironment}} is closed. > The start-up and shutdown of the network server should be independent of the > {{JobManager}} connection, especially if we assume that a TM can be > associated with multiple JobManagers in the future (FLIP-6). > Starting the network server only once when the {{TaskManager}} is started has > the advantage that we don't have to preconfigure the {{TaskManager's}} data > port. Furthermore we don't risk to get stuck when disassociating from a > {{JobManager}} because the start-up and shutdown of a {{NetworkEnvironment}} > can cause problems (because it has to reserve/free resources). 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
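The eager-start proposal from the comment above can be sketched as follows. This is a hedged sketch under simplified assumptions, not Flink's actual code: the class shapes and the {{associateWithJobManager}} method are illustrative stand-ins showing how starting the {{ConnectionManager}} in the constructor decouples the network stack from JobManager association.

```java
// Simplified stand-in for the netty-based connection manager.
class ConnectionManager {
    private boolean running;
    void start() { running = true; }
    void shutdown() { running = false; }
    boolean isRunning() { return running; }
}

class NetworkEnvironment {
    private final ConnectionManager connectionManager;

    NetworkEnvironment() {
        this.connectionManager = new ConnectionManager();
        // Eager start: the network stack no longer waits for JobManager
        // association, as proposed in the comment above.
        this.connectionManager.start();
    }

    // Association with a JobManager now only exchanges connection info over
    // RPC; it does not start (or stop) the ConnectionManager.
    void associateWithJobManager(String jobManagerAddress) {
        // ... registration logic only ...
    }

    // Shutdown happens once, when the TaskManager itself goes down.
    void shutdown() {
        connectionManager.shutdown();
    }

    ConnectionManager getConnectionManager() {
        return connectionManager;
    }
}

public class EagerStartSketch {
    public static void main(String[] args) {
        NetworkEnvironment env = new NetworkEnvironment();
        // The ConnectionManager runs before any JobManager association.
        System.out.println(env.getConnectionManager().isRunning()); // prints "true"
        env.shutdown();
    }
}
```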
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15427951#comment-15427951 ] Zhijiang Wang commented on FLINK-4363: -- Thank you Till, I understand your point. I will follow the approach of the current TaskManager, perhaps just translating from Scala to Java as a first step, so it is feasible to test the functionality based on the new TaskExecutor construction for the RPC abstraction. The only concern is that the TaskExecutor cannot work correctly with the previous process, because LeaderRetrievalService and the TaskManager actor are replaced by HighAvailableService and RPCService. I will submit the pull request on Monday if possible. > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar with current {{TaskManager}},but implement initialization and startup > all components in java instead of scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4363: Assignee: Zhijiang Wang > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > Similar with current {{TaskManager}},but implement initialization and startup > all components in java instead of scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424183#comment-15424183 ] Zhijiang Wang commented on FLINK-4021: -- Got it, I will modify the return value and add the test to the PR branch this week. > Problem of setting autoread for netty channel when more tasks sharing the > same Tcp connection > - > > Key: FLINK-4021 > URL: https://issues.apache.org/jira/browse/FLINK-4021 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > More than one task sharing the same Tcp connection for shuffling data. > If the downstream task said as "A" has no available memory segment to read > netty buffer from network, it will set autoread as false for the channel. > When the task A is failed or has available segments again, the netty handler > will be notified to process the staging buffers first, then reset autoread as > true. But in some scenarios, the autoread will not be set as true any more. > That is when processing staging buffers, first find the corresponding input > channel for the buffer, if the task for that input channel is failed, the > decodeMsg method in PartitionRequestClientHandler will return false, that > means setting autoread as true will not be done anymore. > In summary, if one task "A" sets the autoread as false because of no > available segments, and resulting in some staging buffers. If another task > "B" is failed by accident corresponding to one staging buffer. When task A > trys to reset autoread as true, the process can not work because of task B > failed. > I have fixed this problem in our application by adding one boolean parameter > in decodeBufferOrEvent method to distinguish whether this method is invoke by > netty IO thread channel read or staged message handler task in > PartitionRequestClientHandler. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
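The fix discussed in this issue (an extra boolean parameter so the return value depends on whether the caller is the netty IO thread or the staged-message handler) can be sketched as follows. This is a simplified model of the control flow only: the real logic lives in Flink's {{PartitionRequestClientHandler}}, and the parameter names here are illustrative, not the actual signature.

```java
class PartitionRequestHandlerSketch {

    // Returns whether the caller should continue processing. channelGone
    // models "the input channel's owning task (task B) has already failed".
    boolean decodeBufferOrEvent(boolean channelGone, boolean isStagedBuffer) {
        if (channelGone) {
            // Old behavior: always return false here, which aborted the
            // staged-buffer drain and left autoread permanently disabled
            // for task A. Fixed behavior: keep draining when called from
            // the staged message handler, so autoread can be reset.
            return isStagedBuffer;
        }
        // ... normal decode path: hand the buffer to its input channel ...
        return true;
    }

    public static void main(String[] args) {
        PartitionRequestHandlerSketch h = new PartitionRequestHandlerSketch();
        // Draining staged buffers continues past a failed channel...
        System.out.println(h.decodeBufferOrEvent(true, true));  // prints "true"
        // ...while the direct netty read path still stops as before.
        System.out.println(h.decodeBufferOrEvent(true, false)); // prints "false"
    }
}
```

This is also why the reviewer's simplification in the later comment works: once the parameter exists, the failed-channel branch reduces to `return isStagedBuffer`.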
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424056#comment-15424056 ] Zhijiang Wang commented on FLINK-4363: -- For YARN and standalone modes, the entry methods for the TaskManager are 'selectNetworkInterfaceAndRunTaskManager' and 'startTaskManagerComponentsAndActor', respectively. Should the new TaskExecutor follow the same methods as the TaskManager, or should we define a new uniform method for both modes? Another main difference is that the actor-related initialization is replaced by RpcService. Should this subtask be started right now, or should it wait for the other related subtasks? > Implement TaskManager basic startup of all components in java > - > > Key: FLINK-4363 > URL: https://issues.apache.org/jira/browse/FLINK-4363 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang > > Similar with current {{TaskManager}},but implement initialization and startup > all components in java instead of scala. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4355) Implement TaskManager side of registration at ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang reassigned FLINK-4355: Assignee: Stephan Ewen (was: Zhijiang Wang) > Implement TaskManager side of registration at ResourceManager > - > > Key: FLINK-4355 > URL: https://issues.apache.org/jira/browse/FLINK-4355 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Stephan Ewen > > If the {{TaskManager}} is unregistered, it should try and register at the > {{ResourceManager}} leader. The registration messages are fenced via the > {{RmLeaderID}}. > The ResourceManager may acknowledge the registration (or respond that the > TaskManager is AlreadyRegistered) or refuse the registration. > Upon registration refusal, the TaskManager may have to kill itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423994#comment-15423994 ] Zhijiang Wang commented on FLINK-4021: -- Yeah, you are right. It is not a problem in the current Flink failover mode. Maybe it is relevant for FLIP-1 'https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures'. We already improved task failure handling by restarting only the sources and the failed tasks, which is how we noticed this issue. I forgot to add a test for the fix, thank you for confirming it. What else should I do next? > Problem of setting autoread for netty channel when more tasks sharing the > same Tcp connection > - > > Key: FLINK-4021 > URL: https://issues.apache.org/jira/browse/FLINK-4021 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > More than one task sharing the same Tcp connection for shuffling data. > If the downstream task said as "A" has no available memory segment to read > netty buffer from network, it will set autoread as false for the channel. > When the task A is failed or has available segments again, the netty handler > will be notified to process the staging buffers first, then reset autoread as > true. But in some scenarios, the autoread will not be set as true any more. > That is when processing staging buffers, first find the corresponding input > channel for the buffer, if the task for that input channel is failed, the > decodeMsg method in PartitionRequestClientHandler will return false, that > means setting autoread as true will not be done anymore. > In summary, if one task "A" sets the autoread as false because of no > available segments, and resulting in some staging buffers. If another task > "B" is failed by accident corresponding to one staging buffer. 
When task A > trys to reset autoread as true, the process can not work because of task B > failed. > I have fixed this problem in our application by adding one boolean parameter > in decodeBufferOrEvent method to distinguish whether this method is invoke by > netty IO thread channel read or staged message handler task in > PartitionRequestClientHandler. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423984#comment-15423984 ] Zhijiang Wang commented on FLINK-4021: -- Thank you for the advice; it is indeed simpler to just 'return isStagedBuffer' directly. > Problem of setting autoread for netty channel when more tasks sharing the > same Tcp connection > - > > Key: FLINK-4021 > URL: https://issues.apache.org/jira/browse/FLINK-4021 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > More than one task sharing the same Tcp connection for shuffling data. > If the downstream task said as "A" has no available memory segment to read > netty buffer from network, it will set autoread as false for the channel. > When the task A is failed or has available segments again, the netty handler > will be notified to process the staging buffers first, then reset autoread as > true. But in some scenarios, the autoread will not be set as true any more. > That is when processing staging buffers, first find the corresponding input > channel for the buffer, if the task for that input channel is failed, the > decodeMsg method in PartitionRequestClientHandler will return false, that > means setting autoread as true will not be done anymore. > In summary, if one task "A" sets the autoread as false because of no > available segments, and resulting in some staging buffers. If another task > "B" is failed by accident corresponding to one staging buffer. When task A > trys to reset autoread as true, the process can not work because of task B > failed. > I have fixed this problem in our application by adding one boolean parameter > in decodeBufferOrEvent method to distinguish whether this method is invoke by > netty IO thread channel read or staged message handler task in > PartitionRequestClientHandler. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4364: - Description: The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and the {{TaskManager}} will report metrics info for each heartbeat. (was: JM initiates heartbeat with info (JobID, JobManagerId), and TM responses to JM with metrics info.) > Implement TaskManager side of heartbeat from JobManager > --- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and > the {{TaskManager}} will report metrics info for each heartbeat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4357) Implement TaskManager side of slot allocation from ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-4357: - Description: The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a specific {{JobManager}}. The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, {{AllocationID}}, {{ResourceID}}). The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and then offer that slot to the JM. If not accepted by JM, the TM notifies the RM that the slot is in fact available. was: The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a specific {{JobManager}}. The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, {{AllocationID}}, {{ResourceID}}, {{slotID}}). The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and then offer that slot to the JM. If not accepted by JM, the TM notifies the RM that the slot is in fact available. > Implement TaskManager side of slot allocation from ResourceManager > -- > > Key: FLINK-4357 > URL: https://issues.apache.org/jira/browse/FLINK-4357 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > The {{ResourceManager}} may tell the {{TaskManager}} to give a slot to a > specific {{JobManager}}. > The slot allocation messages are fenced via ({{RmLeaderID}}, {{JobID}}, > {{AllocationID}}, {{ResourceID}}). > The TM will ack the request ({{RmLeaderID}},{{AllocationID}}) to the RM and > then offer that slot to the JM. If not accepted by JM, the TM notifies the RM > that the slot is in fact available. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
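The updated fencing scheme from this description (the request fenced via ({{RmLeaderID}}, {{JobID}}, {{AllocationID}}, {{ResourceID}}), the ack carrying only ({{RmLeaderID}}, {{AllocationID}})) can be sketched as a small value type. The field types (UUID/String) and class names here are assumptions for illustration; the actual Flink types differ.

```java
import java.util.UUID;

// Illustrative value type for the fencing tuple of a slot allocation request.
final class SlotAllocationRequest {
    final UUID rmLeaderId;     // fences against an outdated RM leader
    final UUID jobId;          // the job the slot is allocated for
    final UUID allocationId;   // identifies this particular allocation
    final String resourceId;   // identifies the TaskManager

    SlotAllocationRequest(UUID rmLeaderId, UUID jobId,
                          UUID allocationId, String resourceId) {
        this.rmLeaderId = rmLeaderId;
        this.jobId = jobId;
        this.allocationId = allocationId;
        this.resourceId = resourceId;
    }

    // The TM's ack to the RM only needs the (RmLeaderID, AllocationID) pair.
    String ackKey() {
        return rmLeaderId + "/" + allocationId;
    }
}

public class SlotAllocationSketch {
    public static void main(String[] args) {
        UUID leader = UUID.randomUUID();
        UUID alloc = UUID.randomUUID();
        SlotAllocationRequest req =
                new SlotAllocationRequest(leader, UUID.randomUUID(), alloc, "tm-1");
        // The ack is derived from the request's fencing fields.
        System.out.println(req.ackKey().equals(leader + "/" + alloc)); // prints "true"
    }
}
```

Note that {{slotID}} was dropped from the fence in the description update above, so it does not appear in the sketch.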