Stephan Ewen created FLINK-17781:
------------------------------------

             Summary: OperatorCoordinator Context must support calls from 
thread other than JobMaster Main Thread
                 Key: FLINK-17781
                 URL: https://issues.apache.org/jira/browse/FLINK-17781
             Project: Flink
          Issue Type: Sub-task
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.11.0


Currently, calls on the Context in the OperatorCoordinator go directly and 
synchronously to the ExecutionGraph.

There are two critical problems:
  - It is common that the code in the OperatorCoordinator runs in a separate 
thread (for example, because it executes blocking operations). Calling the 
scheduler from another thread causes the Scheduler to crash (AssertionError, 
violation of the single-threaded property).
  - Calls on the ExecutionGraph are being removed as part of removing the legacy 
scheduler, so certain calls no longer work.

+Problem Level 1:+

The solution would be to pass the scheduler and a main thread executor into the 
context, so that all interaction with the scheduler runs on its main thread.
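To illustrate Problem Level 1, here is a minimal Java sketch of the proposed hand-off. It assumes a scheduler that asserts it is only called from its main thread, modeled here by a single-thread ExecutorService; `SingleThreadedScheduler`, `DispatchingContext` and `failJob` are hypothetical names, not Flink's actual API. A direct call from a coordinator-owned thread trips the assertion, while the context submits the same call to the main thread executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class MainThreadDispatchSketch {

    /** Hypothetical scheduler stand-in that may only be called from its main thread. */
    static final class SingleThreadedScheduler {
        private final Thread mainThread;
        private int failures; // only touched from mainThread, so no locking needed

        SingleThreadedScheduler(Thread mainThread) {
            this.mainThread = mainThread;
        }

        void failJob(Throwable cause) {
            // Models the single-threaded assertion that crashes the real scheduler.
            if (Thread.currentThread() != mainThread) {
                throw new AssertionError("scheduler called outside its main thread");
            }
            failures++;
        }

        int failures() {
            return failures;
        }
    }

    /** Hypothetical context: never calls the scheduler directly, only through the executor. */
    static final class DispatchingContext {
        private final SingleThreadedScheduler scheduler;
        private final Executor mainThreadExecutor;

        DispatchingContext(SingleThreadedScheduler scheduler, Executor mainThreadExecutor) {
            this.scheduler = scheduler;
            this.mainThreadExecutor = mainThreadExecutor;
        }

        /** Safe to call from any thread; the scheduler call itself runs on the main thread. */
        CompletableFuture<Void> failJob(Throwable cause) {
            return CompletableFuture.runAsync(() -> scheduler.failJob(cause), mainThreadExecutor);
        }
    }

    /** Makes n direct calls and n dispatched calls from a coordinator-owned thread. */
    static int[] run(int n) throws Exception {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            Thread mainThread = mainThreadExecutor.submit((Callable<Thread>) Thread::currentThread).get();
            SingleThreadedScheduler scheduler = new SingleThreadedScheduler(mainThread);
            DispatchingContext context = new DispatchingContext(scheduler, mainThreadExecutor);
            AtomicInteger rejected = new AtomicInteger();

            Thread worker = new Thread(() -> {
                for (int i = 0; i < n; i++) {
                    try {
                        scheduler.failJob(new RuntimeException("direct")); // today: wrong thread
                    } catch (AssertionError e) {
                        rejected.incrementAndGet();
                    }
                    context.failJob(new RuntimeException("dispatched")).join(); // proposed: hop threads
                }
            }, "coordinator-worker");
            worker.start();
            worker.join();

            int applied = mainThreadExecutor.submit((Callable<Integer>) scheduler::failures).get();
            return new int[] {rejected.get(), applied};
        } finally {
            mainThreadExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = run(3);
        System.out.println("rejected=" + result[0] + ", applied=" + result[1]); // rejected=3, applied=3
    }
}
```

Returning a CompletableFuture lets the calling thread wait for the outcome if it needs to, without ever touching scheduler state itself.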

However, to do that, the scheduler needs to be created before the 
OperatorCoordinators. One could achieve this by creating the Coordinators 
lazily, after the Scheduler.

+Problem Level 2:+

The Scheduler restores savepoints as part of its own creation, at the point 
where the ExecutionGraph and the CheckpointCoordinator are created early in the 
constructor.
(Side note: That design is tricky in itself, because it means state is restored 
before the scheduler is even properly constructed.)

That means the OperatorCoordinator (or a placeholder component) needs to exist 
already at that point to accept the restored state.

That brings us to a cyclic dependency:
  - OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
  - Scheduler and MainThreadExecutor need constructed ExecutionGraph
  - ExecutionGraph needs CheckpointCoordinator
  - CheckpointCoordinator needs OperatorCoordinator

+Breaking the Cycle+

The only way we can do this is with a form of lazy initialization:
  - We eagerly create the OperatorCoordinators so they exist for state restore
  - We provide an uninitialized context to them
  - When the Scheduler is started (after leadership is granted) we initialize 
the context with the (then readily constructed) Scheduler and MainThreadExecutor
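The lazy initialization described above could look roughly like the following Java sketch. `LazyInitializedContext`, `lazyInitialize`, `SchedulerHandle` and `restoreState` are hypothetical names; throwing on calls made before initialization is an assumption of this sketch (buffering them until the scheduler starts would also work). The scheduler and executor are published together through one AtomicReference, so the coordinator's own threads never observe a half-initialized context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class LazyContextSketch {

    /** Hypothetical stand-in for the part of the scheduler that the context calls. */
    interface SchedulerHandle {
        void failJob(Throwable cause);
    }

    /** Context handed to coordinators before the scheduler exists; bound exactly once later. */
    static final class LazyInitializedContext {
        /** Scheduler and executor are published together so no thread sees only one of them. */
        private static final class Binding {
            final SchedulerHandle scheduler;
            final Executor mainThreadExecutor;

            Binding(SchedulerHandle scheduler, Executor mainThreadExecutor) {
                this.scheduler = scheduler;
                this.mainThreadExecutor = mainThreadExecutor;
            }
        }

        private final AtomicReference<Binding> binding = new AtomicReference<>();

        /** Called once the scheduler has started (after leadership is granted). */
        void lazyInitialize(SchedulerHandle scheduler, Executor mainThreadExecutor) {
            if (!binding.compareAndSet(null, new Binding(scheduler, mainThreadExecutor))) {
                throw new IllegalStateException("context already initialized");
            }
        }

        void failJob(Throwable cause) {
            Binding b = binding.get();
            if (b == null) {
                // Assumption of this sketch: reject early calls instead of buffering them.
                throw new IllegalStateException("context not initialized, scheduler not started");
            }
            b.mainThreadExecutor.execute(() -> b.scheduler.failJob(cause));
        }
    }

    /** Hypothetical coordinator: restoring state does not touch the context. */
    static final class Coordinator {
        final LazyInitializedContext context;
        byte[] restoredState;

        Coordinator(LazyInitializedContext context) {
            this.context = context;
        }

        void restoreState(byte[] state) {
            this.restoredState = state;
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // 1. Coordinators are created eagerly, with an uninitialized context.
        LazyInitializedContext context = new LazyInitializedContext();
        Coordinator coordinator = new Coordinator(context);

        // 2. Restore (driven from the scheduler constructor) only needs the coordinator to exist.
        coordinator.restoreState(new byte[] {42});
        log.add("restored=" + coordinator.restoredState[0]);
        try {
            coordinator.context.failJob(new RuntimeException("too early"));
        } catch (IllegalStateException e) {
            log.add("early call rejected");
        }

        // 3. Scheduler started: bind the context. A direct executor stands in for the main thread.
        Executor mainThreadExecutor = Runnable::run;
        context.lazyInitialize(cause -> log.add("scheduler.failJob: " + cause.getMessage()), mainThreadExecutor);
        coordinator.context.failJob(new RuntimeException("boom"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [restored=42, early call rejected, scheduler.failJob: boom]
    }
}
```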

+Longer-term Solution+

The longer term solution would require a major change in the Scheduler and 
CheckpointCoordinator setup. Something like this:
  - Scheduler (and ExecutionGraph) are constructed first
  - JobMaster waits for leadership
  - Upon leader grant, Operator Coordinators are constructed and can reference 
the Scheduler and FencedMainThreadExecutor
  - CheckpointCoordinator is constructed and references ExecutionGraph and 
OperatorCoordinators
  - Savepoint or latest checkpoint is restored
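A minimal Java sketch of this longer-term ordering, using hypothetical stand-in classes rather than Flink's real ones: every component receives already-constructed dependencies through its constructor, so all references point backward in construction order and the cycle disappears without any lazy initialization. A direct executor (`Runnable::run`) stands in for the FencedMainThreadExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class LongTermOrderSketch {
    // All types are hypothetical, simplified stand-ins for the components named above.
    static final class ExecutionGraph {}

    static final class Scheduler {
        final ExecutionGraph graph;

        Scheduler(ExecutionGraph graph) {
            this.graph = graph;
        }
    }

    static final class Coordinator {
        // Plain final references: the scheduler already exists, so no lazy initialization.
        final Scheduler scheduler;
        final Executor fencedMainThreadExecutor;
        byte[] state;

        Coordinator(Scheduler scheduler, Executor fencedMainThreadExecutor) {
            this.scheduler = scheduler;
            this.fencedMainThreadExecutor = fencedMainThreadExecutor;
        }
    }

    static final class CheckpointCoordinator {
        final ExecutionGraph graph;
        final List<Coordinator> coordinators;

        CheckpointCoordinator(ExecutionGraph graph, List<Coordinator> coordinators) {
            this.graph = graph;
            this.coordinators = coordinators;
        }

        /** Restore happens only after every component it writes into has been constructed. */
        void restore(byte[] state) {
            for (Coordinator c : coordinators) {
                c.state = state;
            }
        }
    }

    static List<String> start(int numCoordinators) {
        List<String> steps = new ArrayList<>();
        ExecutionGraph graph = new ExecutionGraph();
        Scheduler scheduler = new Scheduler(graph);
        steps.add("scheduler");

        steps.add("leadership granted"); // a real JobMaster would wait for leadership here
        Executor fencedMainThreadExecutor = Runnable::run; // stand-in for a real fenced executor

        List<Coordinator> coordinators = new ArrayList<>();
        for (int i = 0; i < numCoordinators; i++) {
            coordinators.add(new Coordinator(scheduler, fencedMainThreadExecutor));
        }
        steps.add("coordinators");

        CheckpointCoordinator checkpoints = new CheckpointCoordinator(graph, coordinators);
        steps.add("checkpoint coordinator");

        checkpoints.restore(new byte[] {1});
        long restored = coordinators.stream().filter(c -> c.state != null).count();
        steps.add("restored " + restored + "/" + numCoordinators);
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(start(2));
        // [scheduler, leadership granted, coordinators, checkpoint coordinator, restored 2/2]
    }
}
```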

The implementation of the current solution should couple the parts as loosely 
as possible, to make it easy to implement the above approach later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
