[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-06 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186661023


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 @Override
 public CompletableFuture updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+// Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   > Since the failure labeling is not a critical process
   
   This is something we've also considered, but it just pushes the problem further down the road. There is a consensus that people want to use this for implementing custom restart strategies (e.g., deciding based on the global context of your infrastructure). This has been intentionally left out of the FLIP, but it will be a natural follow-up that we shouldn't prevent, because too many people have asked for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-06 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186649483


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 @Override
 public CompletableFuture updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+// Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   Also, the `JobMaster#updateTaskExecutionState` code path was scheduler-agnostic.






[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-06 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186649257


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 @Override
 public CompletableFuture updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+// Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   The intuition behind choosing `JobMaster#updateTaskExecutionState` over `SchedulerNG#updateTaskExecutionState` was to avoid mixing asynchronous and synchronous code paths. Since `JobMaster#updateTaskExecutionState` already returns a `CompletableFuture`, it was easier to integrate with and to reason about ordering guarantees.
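
The ordering argument can be illustrated with plain `java.util.concurrent` primitives. The sketch below is not Flink code: `SemiAsyncUpdateSketch`, its two executors, and the string-based states are hypothetical stand-ins for `JobMaster`, its main thread executor, an I/O pool, and `TaskExecutionState`. It only shows that a labeling step can run off the main thread while every state update is still applied on one single thread. Unlike the diff above, the caller here is not the main thread, so non-FAILED updates are also handed to the single-threaded executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for JobMaster; none of these names exist in Flink.
final class SemiAsyncUpdateSketch {
    // Assumption: a single thread models the JobMaster main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Assumption: a small pool models where I/O-heavy labeling could run.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    // Mutated only from mainThread, so no extra synchronization is needed.
    private final List<String> applied = new ArrayList<>();

    CompletableFuture<String> update(String state) {
        if ("FAILED".equals(state)) {
            // Label off the main thread, then hop back to it to apply the update.
            return CompletableFuture.supplyAsync(() -> state + "[labeled]", ioPool)
                    .thenApplyAsync(this::apply, mainThread);
        }
        // All other updates go straight to the main thread.
        return CompletableFuture.supplyAsync(() -> apply(state), mainThread);
    }

    private String apply(String state) {
        applied.add(state);
        return "ACK:" + state;
    }

    // Sends RUNNING, then FAILED, and returns the order in which they were applied.
    static List<String> demo() {
        SemiAsyncUpdateSketch sketch = new SemiAsyncUpdateSketch();
        try {
            CompletableFuture<String> running = sketch.update("RUNNING");
            CompletableFuture<String> failed = sketch.update("FAILED");
            CompletableFuture.allOf(running, failed).join();
            return new ArrayList<>(sketch.applied);
        } finally {
            sketch.mainThread.shutdown();
            sketch.ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because both branches return a `CompletableFuture`, callers see one uniform asynchronous contract regardless of whether labeling ran.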






[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-04 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185085549


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##
@@ -778,7 +778,7 @@ private static PartitionInfo createFinishedPartitionInfo(
  */
 @Override
 public void fail(Throwable t) {
-processFail(t, true);
+processFail(t, true, Collections.emptyMap());

Review Comment:
   Would this be enriched once https://github.com/apache/flink/pull/22506#discussion_r1185081996 is addressed, since `fromSchedulerNg` will be set to `false`?






[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-04 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185081996


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture cancel(Time timeout) {
 @Override
 public CompletableFuture updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+// Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   That's a good catch; the most straightforward way to address this would be to change the listener to go through `JobMaster#updateTaskExecutionState` instead, so we have a shared (semi-asynchronous) code path for all execution state transitions.
   
   `ExecutionDeploymentReconciliationHandler#onMissingDeploymentsOf` seems to have the same problem (and the fix could most likely be along the same lines).
   
   WDYT?






[GitHub] [flink] dmvk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce JobMaster per-task failure enrichment/labeling

2023-05-03 Thread via GitHub


dmvk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1182229885


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##
@@ -74,6 +83,11 @@ public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
 ? (SerializedThrowable) exception
 : new SerializedThrowable(exception);
 this.timestamp = timestamp;
+this.labels = labels;

Review Comment:
   nit
   ```suggestion
   this.labels = Preconditions.checkNotNull(labels);
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##
@@ -107,18 +113,23 @@ public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
 public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo errorInfo) {
 Preconditions.checkNotNull(errorInfo, "errorInfo");
 return fromGlobalFailure(
-errorInfo.getException(), errorInfo.getTimestamp(), Collections.emptyList());
+errorInfo.getException(),
+errorInfo.getTimestamp(),
+errorInfo.getLabels(),
+Collections.emptyList());
 }
 
 private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
 Throwable cause,
 long timestamp,
+Map labels,

Review Comment:
   The use of `@Nullable` seems inconsistent.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##
@@ -1132,6 +1159,16 @@ private void runRequestNextInputSplitTest(
 expectedRemainingInputSplits
 .apply(inputSplitsPerTask)
 .toArray(EMPTY_TESTING_INPUT_SPLITS));
+
+// Make sure FailureEnrichers are triggered for the above failure
+assertThat(testingEnricher.seenThrowable.stream().map(t -> t.getMessage()))

Review Comment:
   Why is the failure enrichment tested in `testRequestNextInputSplit`? We 
should have a dedicated test for new features/regressions.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##
@@ -66,6 +70,11 @@ public static Throwable handleMissingThrowable(@Nullable Throwable throwable) {
 }
 
 public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
+this(exception, timestamp, null);
+}
+
+public ErrorInfo(
+@Nonnull Throwable exception, long timestamp, @Nullable Map labels) {

Review Comment:
   The use of nullable seems to be inconsistent in this class. Would it make 
sense to use an empty map instead of nulls?
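
A minimal sketch of the empty-map alternative, assuming the labels are a `Map<String, String>` (the generic parameters are not visible in the diff above). `ErrorInfoSketch` is a hypothetical, simplified stand-in for `ErrorInfo`, and `Objects.requireNonNull` is used here in place of Flink's `Preconditions.checkNotNull` to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for ErrorInfo; not the actual Flink class.
final class ErrorInfoSketch {
    private final Throwable exception;
    private final long timestamp;
    // Assumption: labels are string key/value pairs and are never null.
    private final Map<String, String> labels;

    ErrorInfoSketch(Throwable exception, long timestamp) {
        // Delegate with an empty map rather than null.
        this(exception, timestamp, Collections.emptyMap());
    }

    ErrorInfoSketch(Throwable exception, long timestamp, Map<String, String> labels) {
        this.exception = Objects.requireNonNull(exception, "exception");
        this.timestamp = timestamp;
        // Reject null eagerly so readers of getLabels() need no null check.
        this.labels = Objects.requireNonNull(labels, "labels");
    }

    Throwable getException() {
        return exception;
    }

    long getTimestamp() {
        return timestamp;
    }

    Map<String, String> getLabels() {
        return labels;
    }
}
```

With this shape, callers such as matchers or history entries can compare or iterate labels without a separate null branch.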



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java:
##
@@ -87,6 +107,18 @@ protected boolean matchesSafely(
 match = false;
 }
 
+if (exceptionHistoryEntry.getLabels() == null) {
+if (expectedLabels != null) {
+description.appendText(" actualLabels=null");
+match = false;
+}
+} else if (exceptionHistoryEntry.getLabels().equals(expectedLabels)) {
+description
+.appendText(" actualLabels=")
+.appendText(String.valueOf(exceptionHistoryEntry.getLabels()));
+match = false;

Review Comment:
   I'm not able to wrap my head around this. How come we don't have a match if 
the actual labels are equal to what is expected?
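
For comparison, a null-safe check that reports a mismatch only when the labels differ could look roughly like the sketch below. `LabelMatchSketch` and `labelsMatch` are hypothetical names, a `StringBuilder` stands in for the Hamcrest `Description`, and the `Map<String, String>` type is an assumption.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper; not part of ExceptionHistoryEntryMatcher.
final class LabelMatchSketch {
    private LabelMatchSketch() {}

    // Returns true when the labels agree; otherwise records the actual labels.
    static boolean labelsMatch(
            Map<String, String> actual, Map<String, String> expected, StringBuilder description) {
        // Objects.equals treats two nulls as equal and handles a single null safely.
        if (!Objects.equals(actual, expected)) {
            description.append(" actualLabels=").append(actual);
            return false;
        }
        return true;
    }
}
```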



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:
##
@@ -139,6 +144,11 @@ public JobMasterBuilder withFatalErrorHandler(FatalErrorHandler fatalErrorHandle
 return this;
 }
 
+public JobMasterBuilder withFailureEnriches(Collection failureEnrichers) {

Review Comment:
   ```suggestion
   public JobMasterBuilder withFailureEnrichers(Collection failureEnrichers) {
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java:
##
@@ -122,6 +123,7 @@ private JobMasterService internalCreateJobMasterService(
 DefaultExecutionDeploymentReconciler::new,
 BlocklistUtils.loadBlocklistHandlerFactory(
 jobMasterConfiguration.getConfiguration()),
+Collections.emptySet(),

Review Comment:
   Are we missing something here?


