This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 10bff3dbad1 [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498) 10bff3dbad1 is described below commit 10bff3dbad103b60915be817a3408820ed09b6cf Author: Stefan Richter <srich...@apache.org> AuthorDate: Fri Mar 15 09:36:43 2024 +0100 [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498) --- .../failover/ExecutionFailureHandler.java | 32 ++------- .../scheduler/adaptive/AdaptiveScheduler.java | 22 +++++- .../runtime/scheduler/adaptive/Canceling.java | 4 +- .../runtime/scheduler/adaptive/Executing.java | 10 ++- .../flink/runtime/scheduler/adaptive/Failing.java | 4 +- .../adaptive/JobFailureMetricReporter.java | 84 ++++++++++++++++++++++ .../runtime/scheduler/adaptive/Restarting.java | 4 +- .../adaptive/StateWithExecutionGraph.java | 7 +- .../scheduler/adaptive/StopWithSavepoint.java | 9 ++- .../failover/ExecutionFailureHandlerTest.java | 4 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 78 +++++++++++++++++++- .../runtime/scheduler/adaptive/ExecutingTest.java | 3 +- .../adaptive/StateWithExecutionGraphTest.java | 2 +- .../scheduler/adaptive/StopWithSavepointTest.java | 13 +++- 14 files changed, 228 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java index 3d36a9e6bff..94130bc2f5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java @@ -26,13 +26,12 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.failure.FailureEnricherUtils; +import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.throwable.ThrowableClassifier; import org.apache.flink.runtime.throwable.ThrowableType; -import org.apache.flink.traces.Span; -import org.apache.flink.traces.SpanBuilder; import org.apache.flink.util.IterableUtils; import javax.annotation.Nullable; @@ -70,8 +69,8 @@ public class ExecutionFailureHandler { private final Collection<FailureEnricher> failureEnrichers; private final ComponentMainThreadExecutor mainThreadExecutor; private final MetricGroup metricGroup; - private final boolean reportEventsAsSpans; + private final JobFailureMetricReporter jobFailureMetricReporter; /** * Creates the handler to deal with task failures. @@ -105,6 +104,7 @@ public class ExecutionFailureHandler { this.globalFailureCtx = globalFailureCtx; this.metricGroup = metricGroup; this.reportEventsAsSpans = jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS); + this.jobFailureMetricReporter = new JobFailureMetricReporter(metricGroup); } /** @@ -171,35 +171,15 @@ public class ExecutionFailureHandler { failureHandlingResult .getFailureLabels() .thenAcceptAsync( - labels -> reportFailureHandling(failureHandlingResult, labels), + labels -> + jobFailureMetricReporter.reportJobFailure( + failureHandlingResult, labels), mainThreadExecutor); } return failureHandlingResult; } - private void reportFailureHandling( - FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) { - - // Add base attributes - SpanBuilder spanBuilder = - Span.builder(ExecutionFailureHandler.class, "JobFailure") - .setStartTsMillis(failureHandlingResult.getTimestamp()) - .setEndTsMillis(failureHandlingResult.getTimestamp()) - .setAttribute( - "canRestart", String.valueOf(failureHandlingResult.canRestart())) - .setAttribute( - "isGlobalFailure", - String.valueOf(failureHandlingResult.isGlobalFailure())); - - // Add all failure labels - for (Map.Entry<String, String> entry : failureLabels.entrySet()) { - spanBuilder.setAttribute( - FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue()); - } - metricGroup.addSpan(spanBuilder); - } - private FailureHandlingResult handleFailure( @Nullable final Execution failedExecution, final Throwable cause, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 9fb9e07de0f..56ff9bed45b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; @@ -332,6 +333,9 @@ public class AdaptiveScheduler private final JobManagerJobMetricGroup jobManagerJobMetricGroup; + private final JobFailureMetricReporter jobFailureMetricReporter; + private final boolean reportEventsAsSpans; + public AdaptiveScheduler( Settings settings, JobGraph jobGraph, @@ -422,6 +426,9 @@ public class AdaptiveScheduler this.exceptionHistory = new BoundedFIFOQueue<>(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE)); this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; + + this.jobFailureMetricReporter = new JobFailureMetricReporter(jobManagerJobMetricGroup); + this.reportEventsAsSpans = configuration.get(TraceOptions.REPORT_EVENTS_AS_SPANS); } private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException { @@ -1316,7 +1323,20 @@ public class AdaptiveScheduler } @Override - public FailureResult howToHandleFailure(Throwable failure) { + public FailureResult howToHandleFailure( + Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { + FailureResult failureResult = howToHandleFailure(failure); + if (reportEventsAsSpans) { + // TODO: replace with reporting as event once events are supported. + // Add reporting as callback for when the failure labeling is completed. + failureLabels.thenAcceptAsync( + (labels) -> jobFailureMetricReporter.reportJobFailure(failureResult, labels), + componentMainThreadExecutor); + } + return failureResult; + } + + private FailureResult howToHandleFailure(Throwable failure) { if (ExecutionFailureHandler.isUnrecoverableError(failure)) { return FailureResult.canNotRestart( new JobException("The failure is not recoverable", failure)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java index de8824b5335..95adb0cf5d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry import org.slf4j.Logger; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; /** State which describes a job which is currently being canceled. */ class Canceling extends StateWithExecutionGraph { @@ -66,7 +68,7 @@ class Canceling extends StateWithExecutionGraph { } @Override - void onFailure(Throwable failure) { + void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { // Execution graph is already cancelling, so there is nothing more we can do. } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 42f38695362..96d835cdaa2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; @@ -106,8 +107,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { } @Override - void onFailure(Throwable cause) { - FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this); + void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) { + FailureResultUtil.restartOrFail( + context.howToHandleFailure(cause, failureLabels), context, this); } @Override @@ -255,9 +257,11 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { * Asks how to handle the failure. * * @param failure failure describing the failure cause + * @param failureLabels future of labels from error classification. * @return {@link FailureResult} which describes how to handle the failure */ - FailureResult howToHandleFailure(Throwable failure); + FailureResult howToHandleFailure( + Throwable failure, CompletableFuture<Map<String, String>> failureLabels); /** * Asks if we should rescale the currently executing job. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java index ccd6bae9367..4744fb8feb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java @@ -29,6 +29,8 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; /** State which describes a failing job which is currently being canceled. */ class Failing extends StateWithExecutionGraph { @@ -71,7 +73,7 @@ class Failing extends StateWithExecutionGraph { } @Override - void onFailure(Throwable failure) { + void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { // We've already failed the execution graph, so there is noting else we can do. } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java new file mode 100644 index 00000000000..fdbd497ecab --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult; +import org.apache.flink.traces.Span; +import org.apache.flink.traces.SpanBuilder; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +/** Helper class to simplify job failure reporting through a metric group. */ +public class JobFailureMetricReporter { + + public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel."; + + private final MetricGroup metricGroup; + + public JobFailureMetricReporter(MetricGroup metricGroup) { + this.metricGroup = Preconditions.checkNotNull(metricGroup); + } + + public void reportJobFailure( + FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) { + reportJobFailure( + failureHandlingResult.getTimestamp(), + failureHandlingResult.canRestart(), + failureHandlingResult.isGlobalFailure(), + failureLabels); + } + + public void reportJobFailure( + FailureResult failureHandlingResult, Map<String, String> failureLabels) { + reportJobFailure( + System.currentTimeMillis(), + failureHandlingResult.canRestart(), + null, + failureLabels); + } + + private void reportJobFailure( + long timestamp, + Boolean canRestart, + Boolean isGlobal, + Map<String, String> failureLabels) { + // Add base attributes + SpanBuilder spanBuilder = + Span.builder(JobFailureMetricReporter.class, "JobFailure") + .setStartTsMillis(timestamp) + .setEndTsMillis(timestamp); + + if (canRestart != null) { + spanBuilder.setAttribute("canRestart", String.valueOf(canRestart)); + } + + if (isGlobal != null) { + spanBuilder.setAttribute("isGlobalFailure", String.valueOf(isGlobal)); + } + + // Add all failure labels + for (Map.Entry<String, String> entry : failureLabels.entrySet()) { + spanBuilder.setAttribute( + FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue()); + } + metricGroup.addSpan(spanBuilder); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index 86e28fe8741..f647967edb4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -31,6 +31,8 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; /** State which describes a job which is currently being restarted. */ @@ -94,7 +96,7 @@ class Restarting extends StateWithExecutionGraph { } @Override - void onFailure(Throwable failure) { + void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { // We've already cancelled the execution graph, so there is noting else we can do. } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java index 4a831669a36..6208de5000a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java @@ -377,7 +377,7 @@ abstract class StateWithExecutionGraph implements State { } /** Transition to different state when failure occurs. Stays in the same state by default. */ - abstract void onFailure(Throwable cause); + abstract void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels); /** * Transition to different state when the execution graph reaches a globally terminal state. @@ -390,7 +390,7 @@ abstract class StateWithExecutionGraph implements State { public void handleGlobalFailure( Throwable cause, CompletableFuture<Map<String, String>> failureLabels) { failureCollection.add(ExceptionHistoryEntry.createGlobal(cause, failureLabels)); - onFailure(cause); + onFailure(cause, failureLabels); } /** @@ -422,7 +422,8 @@ abstract class StateWithExecutionGraph implements State { ExceptionHistoryEntry.create(execution, taskName, failureLabels)); onFailure( ErrorInfo.handleMissingThrowable( - taskExecutionStateTransition.getError(userCodeClassLoader))); + taskExecutionStateTransition.getError(userCodeClassLoader)), + failureLabels); } } return successfulUpdate; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java index b81b485960a..35d3f76787f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; @@ -209,7 +210,7 @@ class StopWithSavepoint extends StateWithExecutionGraph { } @Override - void onFailure(Throwable cause) { + void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) { if (hasPendingStateTransition) { // the error handling remains the same independent of how many tasks have failed // we don't want to initiate the same state transition multiple times, so we exit early @@ -231,7 +232,7 @@ class StopWithSavepoint extends StateWithExecutionGraph { savepoint, getJobId(), cause); operationFailureCause = ex; FailureResultUtil.restartOrFail( - context.howToHandleFailure(ex), context, this); + context.howToHandleFailure(ex, failureLabels), context, this); return null; })); } @@ -279,9 +280,11 @@ class StopWithSavepoint extends StateWithExecutionGraph { * Asks how to handle the failure. * * @param failure failure describing the failure cause + * @param failureLabels future of labels from error classification. * @return {@link FailureResult} which describes how to handle the failure */ - FailureResult howToHandleFailure(Throwable failure); + FailureResult howToHandleFailure( + Throwable failure, CompletableFuture<Map<String, String>> failureLabels); /** * Runs the given action after the specified delay if the state is the expected state at diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java index 3726742ae46..7602944b5f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; @@ -339,7 +340,8 @@ class ExecutionFailureHandlerTest { private void checkMetrics(List<Span> results, boolean global, boolean canRestart) { assertThat(results).isNotEmpty(); for (Span span : results) { - assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName()); + assertThat(span.getScope()) + .isEqualTo(JobFailureMetricReporter.class.getCanonicalName()); assertThat(span.getName()).isEqualTo("JobFailure"); Map<String, Object> attributes = span.getAttributes(); assertThat(attributes).containsEntry("failureLabel.failKey", "failValue"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index e2260d1c19c..273495b3975 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.TestingFailureEnricher; @@ -83,6 +84,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; @@ -106,6 +108,8 @@ import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.traces.Span; +import org.apache.flink.traces.SpanBuilder; import org.apache.flink.util.FlinkException; import org.apache.flink.util.IterableUtils; import org.apache.flink.util.Preconditions; @@ -1372,19 +1376,41 @@ public class AdaptiveSchedulerTest { @Test void testHowToHandleFailureRejectedByStrategy() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE); + final List<Span> spanCollector = new ArrayList<>(1); + final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup = + createTestMetricGroup(spanCollector); + final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE) + .setJobMasterConfiguration(configuration) + .setJobManagerJobMetricGroup(testMetricGroup) .build(); - assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart()).isFalse(); + assertThat( + scheduler + .howToHandleFailure( + new Exception("test"), createFailureLabelsFuture()) + .canRestart()) + .isFalse(); + + assertThat(spanCollector).isEmpty(); + mainThreadExecutor.trigger(); + checkMetrics(spanCollector, false); } @Test void testHowToHandleFailureAllowedByStrategy() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE); + final List<Span> spanCollector = new ArrayList<>(1); + final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup = + createTestMetricGroup(spanCollector); final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 1234); @@ -1394,30 +1420,50 @@ public class AdaptiveSchedulerTest { mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy) + .setJobMasterConfiguration(configuration) + .setJobManagerJobMetricGroup(testMetricGroup) .build(); - final FailureResult failureResult = scheduler.howToHandleFailure(new Exception("test")); + final FailureResult failureResult = + scheduler.howToHandleFailure(new Exception("test"), createFailureLabelsFuture()); assertThat(failureResult.canRestart()).isTrue(); assertThat(failureResult.getBackoffTime().toMillis()) .isEqualTo(restartBackoffTimeStrategy.getBackoffTime()); + + assertThat(spanCollector).isEmpty(); + mainThreadExecutor.trigger(); + checkMetrics(spanCollector, true); } @Test void testHowToHandleFailureUnrecoverableFailure() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE); + final List<Span> spanCollector = new ArrayList<>(1); + final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup = + createTestMetricGroup(spanCollector); + final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder( createJobGraph(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + .setJobMasterConfiguration(configuration) + .setJobManagerJobMetricGroup(testMetricGroup) .build(); assertThat( scheduler .howToHandleFailure( - new SuppressRestartsException(new Exception("test"))) + new SuppressRestartsException(new Exception("test")), + createFailureLabelsFuture()) .canRestart()) .isFalse(); + + assertThat(spanCollector).isEmpty(); + mainThreadExecutor.trigger(); + checkMetrics(spanCollector, false); } @Test @@ -2495,4 +2541,30 @@ public class AdaptiveSchedulerTest { return scheduler.requestJob().getExceptionHistory(); } } + + private static CompletableFuture<Map<String, String>> createFailureLabelsFuture() { + return CompletableFuture.completedFuture(Collections.singletonMap("failKey", "failValue")); + } + + private static UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup + createTestMetricGroup(List<Span> output) { + return new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() { + @Override + public void addSpan(SpanBuilder spanBuilder) { + output.add(spanBuilder.build()); + } + }; + } + + private static void checkMetrics(List<Span> results, boolean canRestart) { + assertThat(results).isNotEmpty(); + for (Span span : results) { + assertThat(span.getScope()) + .isEqualTo(JobFailureMetricReporter.class.getCanonicalName()); + assertThat(span.getName()).isEqualTo("JobFailure"); + Map<String, Object> attributes = span.getAttributes(); + assertThat(attributes).containsEntry("failureLabel.failKey", "failValue"); + assertThat(attributes).containsEntry("canRestart", String.valueOf(canRestart)); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index b892569ef1a..a9ee04f12ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -703,7 +703,8 @@ public class ExecutingTest extends TestLogger { } @Override - public FailureResult howToHandleFailure(Throwable failure) { + public FailureResult howToHandleFailure( + Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { return howToHandleFailure.apply(failure); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java index cd1fd57036e..0a3d598ca36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java @@ -218,7 +218,7 @@ public class StateWithExecutionGraphTest extends TestLogger { } @Override - void onFailure(Throwable cause) {} + void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {} @Override void onGloballyTerminalState(JobStatus globallyTerminalState) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index f20cdd7e656..3bec0538fd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -381,7 +383,9 @@ class StopWithSavepointTest { ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO)); - sws.onFailure(new Exception("task failure")); + sws.onFailure( + new Exception("task failure"), + CompletableFuture.completedFuture(Collections.emptyMap())); // this is a sanity check that we haven't scheduled a state transition ctx.triggerExecutors(); @@ -404,7 +408,9 @@ class StopWithSavepointTest { ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO)); - sws.onFailure(new Exception("task failure")); + sws.onFailure( + new Exception("task failure"), + CompletableFuture.completedFuture(Collections.emptyMap())); // this is a sanity check that we haven't scheduled a state transition ctx.triggerExecutors(); @@ -538,7 +544,8 @@ class StopWithSavepointTest { } @Override - public FailureResult howToHandleFailure(Throwable failure) { + public FailureResult howToHandleFailure( + Throwable failure, CompletableFuture<Map<String, String>> failureLabels) { return howToHandleFailure.apply(failure); }