[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling
dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186661023

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:

```diff
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
     @Override
     public CompletableFuture updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
```

Review Comment:
> Since the failure labeling is not a critical process

This is something we've also considered, but it just pushes the problem further down the road. There has been a consensus that people want to use this to implement custom restart strategies (e.g., you can make the decision based on the global context of your infrastructure). This has intentionally been left out of the FLIP, but it will be a natural follow-up that we shouldn't prevent, because so many people have asked for it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
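
Why labeling cannot simply be treated as non-critical (fire-and-forget) becomes clearer once a restart decision depends on the labels. The following is a minimal, hypothetical sketch: `LabelAwareRestartSketch`, its `decide` method and the `failure.type` label key are assumptions for illustration, not Flink APIs and not the deferred follow-up design.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not a Flink API: a restart decision that depends on failure labels.
final class LabelAwareRestartSketch {
    enum Decision { RESTART, FAIL_JOB }

    // Assumed convention: an enricher sets "failure.type"; USER failures are not retried.
    static Decision decide(Map<String, String> labels) {
        return "USER".equals(labels.get("failure.type")) ? Decision.FAIL_JOB : Decision.RESTART;
    }

    // The decision is chained on the labeling future, so it is only made after
    // labeling completes; detached labeling would race with this decision.
    static CompletableFuture<Decision> onFailure(CompletableFuture<Map<String, String>> labels) {
        return labels.thenApply(LabelAwareRestartSketch::decide);
    }
}
```

If labeling ran detached from failure handling, `decide` would have to work with labels that may not exist yet, which is the problem the comment describes as being pushed down the road.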

dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186649483

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:

```diff
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 [...]
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
```

Review Comment:
Also, the `JobMaster#updateTaskExecutionState` code path was scheduler-agnostic.

dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186649257

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:

```diff
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 [...]
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
```

Review Comment:
The intuition behind choosing `JobMaster#updateTaskExecutionState` over `SchedulerNG#updateTaskExecutionState` had to do with mixing asynchronous and synchronous code paths. Since `JobMaster#updateTaskExecutionState` already returns a `CompletableFuture`, it was easier to integrate with and to reason about its ordering guarantees.
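
The routing in the quoted `JobMaster` diff (most updates applied synchronously, `FAILED` updates labeled off the main thread and then applied back on it) can be modeled in a few lines. This is a simplified, hypothetical sketch: a single-threaded executor stands in for the main-thread executor, and `label`, the `failure.type` key and the string log of applied updates are assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the update routing in the quoted diff, not Flink code.
final class UpdateRoutingSketch {
    enum State { RUNNING, FINISHED, FAILED }

    // Stand-in for the main-thread executor: a single thread, so its tasks run serially.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Stand-in for a pool where (possibly I/O-heavy) labeling runs.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    // Log of applied updates; synchronized because the sketch may be driven from another thread.
    final List<String> applied = Collections.synchronizedList(new ArrayList<>());

    CompletableFuture<Void> update(String task, State state) {
        if (state == State.FAILED) {
            // Detach: label off the main thread, then apply the update back on it.
            return CompletableFuture.supplyAsync(() -> label(task), ioPool)
                    .thenAcceptAsync(labels -> apply(task, state, labels), mainThread);
        }
        // Non-FAILED updates are applied immediately on the caller's thread.
        apply(task, state, Map.of());
        return CompletableFuture.completedFuture(null);
    }

    private Map<String, String> label(String task) {
        return Map.of("failure.type", "SYSTEM"); // assumed label, for illustration only
    }

    private void apply(String task, State state, Map<String, String> labels) {
        applied.add(task + ":" + state + ":" + labels);
    }

    void shutdown() {
        mainThread.shutdown();
        ioPool.shutdown();
    }
}
```

Because the `FAILED` branch completes only after labeling, a non-`FAILED` update that arrives while labeling is in flight can be applied before the earlier failure; returning a future from the entry point is what makes that ordering explicit to callers.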

dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185085549

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:

```diff
@@ -778,7 +778,7 @@ private static PartitionInfo createFinishedPartitionInfo(
      */
     @Override
     public void fail(Throwable t) {
-        processFail(t, true);
+        processFail(t, true, Collections.emptyMap());
```

Review Comment:
Would this be enriched once https://github.com/apache/flink/pull/22506#discussion_r1185081996 is addressed, since `fromSchedulerNg` will then be set to `false`?

dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185081996

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:

```diff
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 [...]
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
```

Review Comment:
That's a good catch. The most straightforward way to address this would be to change the listener to go through `JobMaster#updateTaskExecutionState` instead, so that we have a shared (semi-asynchronous) code path for all execution state transitions. `ExecutionDeploymentReconciliationHandler#onMissingDeploymentsOf` seems to have the same problem (and the fix would most likely be along the same lines). WDYT?
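
The idea of routing the listener through one shared entry point can be sketched with hypothetical stand-ins: `Master`, `Failure`, `MissingDeploymentListener` and `onMissingDeployment` are invented names loosely modeled on the classes discussed, the scheduler is a plain `Consumer`, and the enricher is a plain `Function`. The point is only that every caller of the shared entry point gets labels, whereas a caller that goes straight to the scheduler (as `fail` does with `Collections.emptyMap()`) does not.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins, not Flink classes.
final class SharedPathSketch {
    static final class Failure {
        final String task;
        final Throwable cause;
        final Map<String, String> labels;

        Failure(String task, Throwable cause, Map<String, String> labels) {
            this.task = task;
            this.cause = cause;
            this.labels = labels;
        }
    }

    // Single entry point for failure transitions: label first, then hand off to the scheduler.
    static final class Master {
        private final Consumer<Failure> scheduler;
        private final Function<Throwable, Map<String, String>> enricher;

        Master(Consumer<Failure> scheduler, Function<Throwable, Map<String, String>> enricher) {
            this.scheduler = scheduler;
            this.enricher = enricher;
        }

        CompletableFuture<Void> updateTaskExecutionState(String task, Throwable cause) {
            return CompletableFuture.supplyAsync(() -> enricher.apply(cause))
                    .thenAccept(labels -> scheduler.accept(new Failure(task, cause, labels)));
        }
    }

    // Listener that reuses the shared path instead of calling the scheduler directly,
    // so the failures it reports are enriched like any other.
    static final class MissingDeploymentListener {
        private final Master master;

        MissingDeploymentListener(Master master) {
            this.master = master;
        }

        CompletableFuture<Void> onMissingDeployment(String task) {
            return master.updateTaskExecutionState(
                    task, new IllegalStateException("Missing deployment of " + task));
        }
    }
}
```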

dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1182229885

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:

```diff
@@ -74,6 +83,11 @@ public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
                         ? (SerializedThrowable) exception
                         : new SerializedThrowable(exception);
         this.timestamp = timestamp;
+        this.labels = labels;
```

Review Comment:
nit
```suggestion
        this.labels = Preconditions.checkNotNull(labels);
```

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:

```diff
@@ -107,18 +113,23 @@ public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
     public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo errorInfo) {
         Preconditions.checkNotNull(errorInfo, "errorInfo");
         return fromGlobalFailure(
-                errorInfo.getException(), errorInfo.getTimestamp(), Collections.emptyList());
+                errorInfo.getException(),
+                errorInfo.getTimestamp(),
+                errorInfo.getLabels(),
+                Collections.emptyList());
     }
     private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            Map labels,
```

Review Comment:
The use of `@Nullable` seems inconsistent.

## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:

```diff
@@ -1132,6 +1159,16 @@ private void runRequestNextInputSplitTest(
                 expectedRemainingInputSplits
                         .apply(inputSplitsPerTask)
                         .toArray(EMPTY_TESTING_INPUT_SPLITS));
+
+        // Make sure FailureEnrichers are triggered for the above failure
+        assertThat(testingEnricher.seenThrowable.stream().map(t -> t.getMessage()))
```

Review Comment:
Why is failure enrichment tested in `testRequestNextInputSplit`? We should have a dedicated test for new features/regressions.
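
The `checkNotNull` suggestion and the `@Nullable` remarks both point toward a class that never stores a null label map. A minimal sketch of that idea follows, using the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`. `ErrorInfoSketch` is a hypothetical, stripped-down stand-in for `ErrorInfo`, and the defensive unmodifiable copy is an extra assumption, not something the review asks for.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for ErrorInfo; not the Flink class.
final class ErrorInfoSketch {
    private final Throwable exception;
    private final long timestamp;
    private final Map<String, String> labels; // invariant: never null

    ErrorInfoSketch(Throwable exception, long timestamp) {
        // Delegate with an empty map rather than null, so callers never need a null check.
        this(exception, timestamp, Collections.emptyMap());
    }

    ErrorInfoSketch(Throwable exception, long timestamp, Map<String, String> labels) {
        this.exception = Objects.requireNonNull(exception, "exception");
        this.timestamp = timestamp;
        // Fail fast on null, then keep an unmodifiable copy so later caller changes are not visible.
        this.labels =
                Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(labels, "labels")));
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }

    Map<String, String> getLabels() {
        return labels;
    }
}
```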

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:

```diff
@@ -66,6 +70,11 @@ public static Throwable handleMissingThrowable(@Nullable Throwable throwable) {
     }
     public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
+        this(exception, timestamp, null);
+    }
+
+    public ErrorInfo(
+            @Nonnull Throwable exception, long timestamp, @Nullable Map labels) {
```

Review Comment:
The use of `@Nullable` seems inconsistent in this class. Would it make sense to use an empty map instead of `null`?

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java:

```diff
@@ -87,6 +107,18 @@ protected boolean matchesSafely(
             match = false;
         }
+        if (exceptionHistoryEntry.getLabels() == null) {
+            if (expectedLabels != null) {
+                description.appendText(" actualLabels=null");
+                match = false;
+            }
+        } else if (exceptionHistoryEntry.getLabels().equals(expectedLabels)) {
+            description
+                    .appendText(" actualLabels=")
+                    .appendText(String.valueOf(exceptionHistoryEntry.getLabels()));
+            match = false;
```

Review Comment:
I'm not able to wrap my head around this. How come we don't have a match if the actual labels are equal to what is expected?

## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:

```diff
@@ -139,6 +144,11 @@ public JobMasterBuilder withFatalErrorHandler(FatalErrorHandler fatalErrorHandle
         return this;
     }
+    public JobMasterBuilder withFailureEnriches(Collection failureEnrichers) {
```

Review Comment:
```suggestion
    public JobMasterBuilder withFailureEnrichers(Collection failureEnrichers) {
```

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java:

```diff
@@ -122,6 +123,7 @@ private JobMasterService internalCreateJobMasterService(
                 DefaultExecutionDeploymentReconciler::new,
                 BlocklistUtils.loadBlocklistHandlerFactory(
                         jobMasterConfiguration.getConfiguration()),
+                Collections.emptySet(),
```

Review Comment:
Are we missing something here?
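
Assuming the intent of the matcher hunk is to report a mismatch only when the labels differ, the two branches can be collapsed into a single null-safe comparison. This is a hypothetical helper, not the actual `ExceptionHistoryEntryMatcher` code; a `StringBuilder` stands in for Hamcrest's `Description`.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of ExceptionHistoryEntryMatcher.
final class LabelMatchSketch {
    // Returns true when the labels match; otherwise appends the actual labels to the description.
    static boolean labelsMatch(
            Map<String, String> actual, Map<String, String> expected, StringBuilder description) {
        // Objects.equals is true when both are null and otherwise delegates to equals,
        // so no separate null branch is needed.
        if (Objects.equals(actual, expected)) {
            return true;
        }
        // append(Object) renders a null map as "null", mirroring " actualLabels=null".
        description.append(" actualLabels=").append(actual);
        return false;
    }
}
```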