XComp commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r841387465


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up 
was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            
executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the 
JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as 
pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would 
eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);
+
+            return archiveFuture.thenApplyAsync(ignored -> 
CleanupJobState.GLOBAL);
+        } else {
+            return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
+        }
     }
 
-    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
+    private void storeExecutionGraphInfo(ExecutionGraphInfo 
executionGraphInfo) {

Review Comment:
   `storeExecutionGraphInfo` and `archiveExecutionGraph` are too generic in my 
opinion. What about something like `writeToExecutionGraphInfoStore` and 
`writeToHistoryServer`? That would help distinguishing these two methods.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo 
executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(

Review Comment:
   Same as mentioned above, already: `storeExecutionGraphInfo` and 
`archiveExecutionGraph` are too generic in my opinion.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory 
testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, 
testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)

Review Comment:
   As mentioned above: We tried to avoid timeouts in the tests...



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)

Review Comment:
   We don't want to use timeouts in JUnit. This would enable us to capture 
timeouts in the CI pipeline which would print the thread dump at the end giving 
us some hints where the process halted.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {

Review Comment:
   Could we try to merge the test cases to remove redundant code? They look 
quite alike except for the part where we specify the final state of the job...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1238,22 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
                 getMainThreadExecutor());
     }
 
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getTerminatedJobTerminationFuture(JobID 
jobId) {

Review Comment:
   hm, here I'm not certain about the naming. It sounds odd. But I cannot come 
up with something else. The only thing I have in mind is 
`getJobTerminationFutureOrFailedFutureForRunningJob`. But I'm not happy about 
that one, either 🤔 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo 
executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
         // archive attempts which we currently do not support
-        if 
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
 {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to 
the history server.",
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        
historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        
configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());
+
+        return executionGraphFuture.handle(
+                (Acknowledge ignored, Throwable throwable) -> {
+                    if (throwable != null) {
+                        log.info(
+                                "Could not archive completed job {}({}) to the 
history server.",
+                                
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                throwable);
+                    }
+                    return Acknowledge.get();
+                });
+    }
+
+    private void registerCleanupInJobResultStore(ExecutionGraphInfo 
executionGraphInfo) {

Review Comment:
   The naming might be a bit misleading: `registerCleanupInJobResultStore` is 
not 100% clear whether the cleanup already happened or not. What about 
`registerGloballyTerminatedJobInJobResultStore`. The log messages in this 
method are just not formulated in that way, because the globally-terminal state 
is some kind of internal concept which the user might not be aware of.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory 
testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, 
testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+        final Configuration configuration = new Configuration();
+        
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 10L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 1000L);
+            final TestingJobMasterServiceLeadershipRunnerFactory 
testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, 
testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+            assertFalse(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)

Review Comment:
   As mentioned above: We tried to avoid timeouts in the tests...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java:
##########
@@ -129,10 +129,12 @@ public MiniDispatcher(
     }
 
     @Override
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo 
executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
-        final CleanupJobState cleanupHAState = 
super.jobReachedTerminalState(executionGraphInfo);
+        final CompletableFuture<CleanupJobState> cleanupHAState =

Review Comment:
   we should wait for the `jobReachedTerminalState` to complete before 
completing the shutdownFuture



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up 
was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            
executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the 
JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as 
pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would 
eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);

Review Comment:
   I think, we should chain the registration in the JobResultStore using a 
`whenComplete` (it still should be done in even if the `archiveExecutionGraph` 
method fails)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo 
executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
         // archive attempts which we currently do not support

Review Comment:
   This comment is obsolete here. The method itself does not require this 
invariant. It's more sufficient at the code location where this method is 
called.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -762,4 +870,40 @@ public JobManagerRunner createJobManagerRunner(
             throw testException;
         }
     }
+
+    private class TestingHistoryServerArchivist implements 
HistoryServerArchivist {
+
+        private final Executor ioExecutor;
+        private final long sleepMills;
+
+        private boolean archived;
+
+        public TestingHistoryServerArchivist(Executor ioExecutor, long 
sleepMills) {
+            this.ioExecutor = ioExecutor;
+            this.sleepMills = sleepMills;
+            this.archived = false;
+        }
+
+        public boolean isArchived() {
+            return archived;
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> archiveExecutionGraph(
+                ExecutionGraphInfo executionGraphInfo) {
+            return CompletableFuture.runAsync(

Review Comment:
   You could pass a CompletableFuture as a parameter to this implementation and 
make it complete in the test code. This way, we wouldn't have to use sleep 
here...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo 
executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would 
eventually lead to multiple
         // archive attempts which we currently do not support
-        if 
(executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState())
 {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    
historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to 
the history server.",
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        
historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        
configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());
+
+        return executionGraphFuture.handle(
+                (Acknowledge ignored, Throwable throwable) -> {
+                    if (throwable != null) {
+                        log.info(
+                                "Could not archive completed job {}({}) to the 
history server.",
+                                
executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                throwable);
+                    }
+                    return Acknowledge.get();
+                });
+    }
+
+    private void registerCleanupInJobResultStore(ExecutionGraphInfo 
executionGraphInfo) {

Review Comment:
   I think it would be good to add a Precondition here as well to make sure 
that the ExecutionGraphInfo refers to a globally-terminal state to support the 
log messages used in this method. WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws 
Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        
configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory 
testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, 
testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, 
Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());

Review Comment:
   If we use a CompletableFuture instead of sleep, we would check, that the 
cleanup wasn't triggered and the job didn't terminate before completing that 
future here. Then, completing the future should result in the cleanup being 
triggered and job finally terminating...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to