Hey all,

I have been doing some digging to see if there is a good way to do an 
idempotent job submission. I was hoping to write a job submission agent that 
does the following:

  1.  Checks to see if the cluster is running yet (can contact a JobManager)
  2.  Checks to see if the job it is watching is running.
  3.  Submits the job if it is not yet running.
  4.  Retries if there are any issues.
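For illustration, here is a rough sketch of the loop I have in mind, written 
in Java against the HTTP monitoring API. The /joboverview path and the 
jobIsRunning/submitJob helpers are my own assumptions for the sketch, not an 
existing Flink client API:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class SubmissionAgent {

    private final String jobManagerUrl; // e.g. "http://jobmanager:8081"
    private final String jobName;

    SubmissionAgent(String jobManagerUrl, String jobName) {
        this.jobManagerUrl = jobManagerUrl;
        this.jobName = jobName;
    }

    void run() throws InterruptedException {
        while (true) {
            try {
                if (!jobManagerReachable()) {
                    // 1. Cluster not up yet; wait and retry.
                } else if (!jobIsRunning(jobName)) {
                    // 2 + 3. Job not listed as running, so submit it.
                    submitJob(jobName);
                }
            } catch (IOException e) {
                // 4. Transient failure; fall through and retry.
            }
            Thread.sleep(10_000);
        }
    }

    private boolean jobManagerReachable() throws IOException {
        // Probe the monitoring API; a 200 means some JobManager answered.
        URL url = new URL(jobManagerUrl + "/joboverview");
        return ((HttpURLConnection) url.openConnection()).getResponseCode() == 200;
    }

    private boolean jobIsRunning(String jobName) throws IOException {
        // Would parse the running-jobs list from the monitoring API and look
        // for jobName. This check is exactly where the race described below
        // bites: the list can be incomplete during a restore.
        throw new UnsupportedOperationException("sketch only");
    }

    private void submitJob(String jobName) throws IOException {
        // Would submit via the CLI, the web submission endpoint, or a client.
        throw new UnsupportedOperationException("sketch only");
    }
}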

Specifically, at the moment there doesn't seem to be any functionality for 
submitting a job only if it is not already running. As far as I can tell, the 
current interface makes a race condition possible:

For example, if the following sequence of events occurs:

  1.  JobManager fails and a new leader is re-elected:
     *   The new JobManager asynchronously starts restoring jobs: 
here<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L300>
  2.  Client calls to list the currently running jobs (before the jobs have 
been restored) and gets back an incomplete list of running 
jobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1009>, 
because jobs only appear in 
currentJobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1300> 
once SubmitJob has registered them.
  3.  Client assumes the job is no longer running, so it resubmits the job via 
HTTP/CLI/whatever.
  4.  The current interfaces don't allow passing in the same JobID (a new one 
is generated for each submit), so the job is started under a new JobID.
  5.  The JobManager finishes restoring the previous instance of the job.
  6.  Now there are two instances of the job running in the cluster.
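
If the submission interfaces accepted a caller-supplied JobID, the agent could 
derive a stable one from the job name, so a duplicate submit in step 4 would 
collide with the restored job instead of starting a second copy. A minimal 
sketch (the name-to-JobID scheme is mine, not something Flink provides):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

import org.apache.flink.api.common.JobID;

public class StableJobIds {

    // Derives a deterministic JobID by hashing the logical job name and
    // truncating to the 16 bytes a JobID holds, so every (re)submission of
    // the same logical job carries the same ID instead of a random one.
    public static JobID forName(String jobName) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(jobName.getBytes(StandardCharsets.UTF_8));
            return new JobID(Arrays.copyOf(digest, JobID.SIZE));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should always exist", e);
        }
    }
}

As far as I can tell the JobGraph itself can already carry a fixed JobID; it 
is the client-side submission paths that always generate a fresh one.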

While the above state is pretty unlikely to be hit when submitting jobs 
manually, it seems to me that an agent like the one above could hit it if the 
cluster were having trouble with failing JobManagers.

I can see that FLIP-6<https://issues.apache.org/jira/browse/FLINK-4319> is 
rewriting the whole JobManager itself. From my reading of the current code 
base, this work is about halfway done in master.

From my reading of the code/docs, the expectation on the submission side for 
Docker/Kubernetes is that you will create two sets of containers:

  1.  A JobMaster/ResourceManager container that contains the user’s job in 
some form (jar or as a serialized JobGraph).
  2.  A TaskManager container which is either generic or potentially has user 
libs (up to the implementer/cluster maintainer)

As I currently understand the code, the JobMaster instances will:

  1.  Start up a JobMasterRunner which connects to the Leader service and 
creates a JobMaster with the supplied JobGraph (which I assume will always have 
the same JobID for restore purposes).
  2.  When the node is granted leadership, the JobMasterRunner starts the 
JobMaster, which schedules the ExecutionGraph it created from the supplied 
JobGraph.
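
That assumption about a stable JobID seems workable at the JobGraph level at 
least, since the JobGraph can be built with an explicit ID (reusing the 
hypothetical StableJobIds helper from my sketch above):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class FixedIdJobGraph {
    public static void main(String[] args) {
        // Fix the JobID up front so a relaunched JobMaster container ships
        // the same ID and, presumably, restores rather than starting fresh.
        JobID jobId = StableJobIds.forName("my-streaming-job");
        JobGraph jobGraph = new JobGraph(jobId, "my-streaming-job");
        System.out.println("submitting with JobID " + jobGraph.getJobID());
    }
}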

This all seems fine for a new job submission, but since the restore logic is 
not yet implemented I am wondering how people will interact with clusters for 
job submission. From this 
doc<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077> 
it appears that the current JobManager infrastructure will instead become a 
“FlinkDispatcher”. Is the intent to have the savepoint restore logic live in 
the FlinkDispatcher and have it control the job upgrade lifecycle?

We are currently looking at running Flink on Kubernetes, and FLIP-6 looks to 
organize that much better than the way things currently work. Specifically, we 
are looking to give clients a clean, clear deployment/upgrade path for Flink 
jobs that can be integrated into automated build pipelines and the like. Is 
the intention in the new system to have another orchestration layer for 
upgrading jobs, or will the JobMaster itself handle those situations?

To me the JobMaster seems like the correct place to do job upgrade 
coordination, because it is the single point of control for a single job. 
Then, for example on Kubernetes, one would just have to re-launch the 
JobMaster containers, and the JobMaster logic would take care of the rest, 
reconciling the upgraded JobGraph with the running job. On the other hand, 
this might not fit cleanly into the separation of concerns that currently 
exists within the JobManager.
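
To make that concrete, the kind of hook I am picturing inside the JobMaster 
would look something like this (entirely hypothetical interfaces, just to 
illustrate the question; none of this exists today):

// Hypothetical sketch only: none of these types exist in Flink.
interface SubmittedJob {
    String jobGraphFingerprint();     // hash of the deployed JobGraph
    String latestCheckpointPath();
    String triggerSavepointAndStop(); // returns the savepoint path
}

interface NewSubmission {
    String jobGraphFingerprint();
    void restoreFrom(String snapshotPath);
}

class SameJobIdUpgradeCoordinator {

    // Called when a relaunched JobMaster comes up with a JobGraph whose
    // JobID matches a job the cluster already knows about.
    void reconcile(SubmittedJob existing, NewSubmission incoming) {
        if (existing.jobGraphFingerprint().equals(incoming.jobGraphFingerprint())) {
            // Same graph: plain recovery, resume from the latest checkpoint.
            incoming.restoreFrom(existing.latestCheckpointPath());
        } else {
            // Changed graph: treat as an upgrade. Savepoint and stop the old
            // instance, then start the new graph from that savepoint.
            incoming.restoreFrom(existing.triggerSavepointAndStop());
        }
    }
}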

I am also wondering what work still needs doing on FLIP-6. Overall it closely 
aligns with what we are trying to do on our end to make Flink easier to use, 
so I might get some time to help out with this effort.

Thanks,
James Bucher
