Hey all, I have been digging into whether there is a good way to submit a job idempotently. I was hoping to write a job submission agent that does the following:
1. Checks whether the cluster is running yet (i.e. it can contact a JobManager).
2. Checks whether the job it is watching is running.
3. Submits the job if it is not yet running.
4. Retries if there are any issues.
Specifically, at the moment there doesn’t seem to be any way to submit a job only if it isn’t already running. As far as I can tell, the current interface makes a race condition possible. For example, consider the following sequence of events:
1. The JobManager fails and a new leader is elected.
   * The new JobManager asynchronously starts restoring jobs: here<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L300>
2. The client asks for the list of currently running jobs (before the jobs are restored) and gets back an incomplete list of running jobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1009>, because recovered jobs only appear in currentJobs<https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1300> once SubmitJob has registered them.
3. The client assumes the job is no longer running, so it uses HTTP/CLI/whatever to restore the job.
4. The current interfaces don’t pass in the same JobID (a new one is generated for each submission), so a new job is submitted with a new JobID.
5. The JobManager restores the previous instance of the running job.
6. Now there are two instances of the job running in the cluster.
While the above sequence is pretty unlikely to occur when submitting jobs manually, it seems to me that an agent like the one above might hit it if the cluster were having trouble with JobManagers failing.
I can see that FLIP-6<https://issues.apache.org/jira/browse/FLINK-4319> is rewriting the JobManager itself. From my reading of the current code base, this work is about half done in master.
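The agent steps and the race above can be modelled in a few lines. This is a minimal Python sketch, not Flink code. FakeJobManager, naive_submit, ensure_running and ClusterUnavailable are hypothetical names. The idempotent submit, which treats a JobID the JobManager already knows about (running or still recovering) as a no-op, is an assumed feature that the current interfaces do not offer. The sketch shows that check-then-submit with a fresh JobID can leave two copies of a job running after a failover. With a stable, caller-chosen JobID and a duplicate-rejecting submit, only one copy runs.

```python
import time
import uuid
from dataclasses import dataclass, field


class ClusterUnavailable(Exception):
    """Raised by a (hypothetical) client when no JobManager is reachable."""


@dataclass
class FakeJobManager:
    """In-memory stand-in for a JobManager; this is NOT a Flink API.

    `recovering` models jobs that survived a leader failover but have not yet
    been re-registered in `running` (the asynchronous restore window).
    """
    running: dict = field(default_factory=dict)     # job_id -> job name
    recovering: dict = field(default_factory=dict)  # job_id -> job name

    def list_running(self):
        # Like the current "list jobs" call, this misses recovering jobs.
        return set(self.running)

    def submit(self, job_id, name):
        # Assumed idempotent submit: a JobID that is already running or
        # still recovering is a no-op rather than a second job.
        if job_id in self.running or job_id in self.recovering:
            return False
        self.running[job_id] = name
        return True

    def finish_recovery(self):
        self.running.update(self.recovering)
        self.recovering.clear()


def naive_submit(jm, name):
    """Check-then-submit with a fresh JobID, as today's interfaces do."""
    names = {jm.running[j] for j in jm.list_running()}
    if name not in names:
        jm.submit(str(uuid.uuid4()), name)  # new JobID on every submission


def ensure_running(connect, job_id, name, attempts=5, delay=1.0):
    """The four agent steps, using a stable, caller-chosen JobID."""
    for attempt in range(attempts):
        try:
            jm = connect()                   # 1. reach a JobManager
            if job_id in jm.list_running():  # 2. is the job running?
                return "running"
            if jm.submit(job_id, name):      # 3. submit if not
                return "submitted"
            return "running"                 # JobID known (e.g. recovering)
        except ClusterUnavailable:
            time.sleep(delay * 2 ** attempt)  # 4. retry with backoff
    raise ClusterUnavailable(f"gave up after {attempts} attempts")


if __name__ == "__main__":
    # Today's behaviour: the failover race leaves two copies running.
    jm = FakeJobManager(recovering={"job-1": "pipeline"})
    naive_submit(jm, "pipeline")
    jm.finish_recovery()
    print(len(jm.running))  # 2

    # Same failover with a stable JobID and an idempotent submit.
    jm = FakeJobManager(recovering={"job-1": "pipeline"})
    print(ensure_running(lambda: jm, "job-1", "pipeline"))  # running
    jm.finish_recovery()
    print(len(jm.running))  # 1
```

The deduplication has to happen on the JobManager side. No client-side check can close the window in step 2, because the list the client receives is already stale.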
From my reading of the code/docs, it seems that on the submission side the expectation for Docker/Kubernetes is that you will create two sets of containers:
1. A JobMaster/ResourceManager container that contains the user’s job in some form (a jar or a serialized JobGraph).
2. A TaskManager container, which is either generic or potentially includes user libs (up to the implementer/cluster maintainer).
As I currently understand the code, the JobMaster instances will:
1. Start up a JobMasterRunner, which connects to the leader service and creates a JobMaster with the supplied JobGraph (which I assume will always have the same JobID for restore purposes).
2. When the node is granted leadership, the JobMasterRunner starts the JobMaster, which schedules the ExecutionGraph it created from the supplied JobGraph.
This all seems fine for a new job submission, but since the restore logic is not yet implemented, I am wondering how people will interact with clusters for job submission. From this doc<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077> it appears that the current JobManager infrastructure will instead become a “FlinkDispatcher”. Is the intent to put the logic for restoring a job from a savepoint at launch in the FlinkDispatcher and have it control the job upgrade lifecycle?
We are currently looking at running Flink on Kubernetes, and FLIP-6 looks like it will organize that much better than the way things currently work. Specifically, we want to give clients a clean deployment/upgrade path for Flink jobs that can be integrated into automated build pipelines and the like. Is the intention for the new system to have another orchestration layer for upgrading jobs, or will the JobMaster itself handle those situations? To me the JobMaster seems like the correct place to coordinate job upgrades, because it’s the single point of control for a single job.
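A toy model of the JobMasterRunner startup flow may help make the JobID question concrete. This is a hedged Python sketch, not Flink's classes. JobGraph.version, CheckpointStore, the "chk-42" handle and the restore rule in grant_leadership are all assumptions, because the restore logic is exactly the part that is not implemented yet. In the sketch, a standby runner schedules nothing until it is granted leadership. A relaunched container that carries a new jar but the same JobID resumes the existing state instead of starting a second job.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class JobGraph:
    job_id: str   # assumed fixed across container restarts
    version: str  # hypothetical label for the user jar baked into the image


@dataclass
class CheckpointStore:
    """Hypothetical durable store of the latest checkpoint per JobID."""
    latest: dict = field(default_factory=dict)  # job_id -> checkpoint handle


class JobMasterRunner:
    """Toy model of the startup flow; not Flink's JobMasterRunner.

    The runner holds the JobGraph shipped in its container and schedules
    nothing until the leader service grants it leadership.
    """

    def __init__(self, graph, store):
        self.graph = graph
        self.store = store
        self.scheduled = None  # (mode, checkpoint) once running

    def grant_leadership(self):
        # Assumed restore rule: state recorded under the same JobID is
        # resumed into whatever JobGraph this container carries, so a new
        # jar version becomes an upgrade rather than a second job.
        checkpoint = self.store.latest.get(self.graph.job_id)
        self.scheduled = ("restore", checkpoint) if checkpoint else ("fresh", None)
        return self.scheduled

    def revoke_leadership(self):
        # Losing leadership stops scheduling; state stays in the store.
        self.scheduled = None


if __name__ == "__main__":
    store = CheckpointStore()
    standby = JobMasterRunner(JobGraph("job-1", "v1"), store)
    print(standby.scheduled)  # None: not leader, nothing scheduled

    leader = JobMasterRunner(JobGraph("job-1", "v1"), store)
    print(leader.grant_leadership())  # ('fresh', None)

    store.latest["job-1"] = "chk-42"  # hypothetical checkpoint handle
    leader.revoke_leadership()

    # Container relaunched with an upgraded jar but the same JobID.
    upgraded = JobMasterRunner(JobGraph("job-1", "v2"), store)
    print(upgraded.grant_leadership())  # ('restore', 'chk-42')
```

Keying everything on a JobID that stays fixed across restarts means the relaunched container can find the existing state. Without that, it could only start a new job.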
Then, for example on Kubernetes, one would just have to relaunch the JobMaster containers, and the JobMaster logic would take care of the rest, consolidating the upgraded JobGraph. On the other hand, this might not fit cleanly into the separation of concerns that currently exists within the JobManager.
I am also wondering what work still needs doing on FLIP-6. Overall it closely aligns with what we are trying to do on our end to make Flink easier to use, so I might get some time to help out with this effort.
Thanks,
James Bucher