This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d18d1f4 [FLINK-24303][coordination] Failure when creating a source enumerator lead to full failover, not JobManager failure. d18d1f4 is described below commit d18d1f4b7acd683dfdfc8a5218892c17b42b6701 Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Sep 20 02:59:02 2021 +0200 [FLINK-24303][coordination] Failure when creating a source enumerator lead to full failover, not JobManager failure. Instead of letting exceptions during the creation of the Source Enumerator bubble up (and ultimately fail the JobManager / Scheduler creation), we now catch those exceptions and trigger a full (global) failover for that case. This closes #17324 --- .../source/reader/CoordinatedSourceITCase.java | 164 +++++++++++++++++++++ .../source/coordinator/SourceCoordinator.java | 20 ++- .../source/coordinator/SourceCoordinatorTest.java | 20 +++ 3 files changed, 201 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java index 158ec94..4c23973 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -20,17 +20,26 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.common.accumulators.ListAccumulator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SplitEnumerator; +import 
org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.FlinkRuntimeException; import org.junit.Test; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -60,6 +69,34 @@ public class CoordinatedSourceITCase extends AbstractTestBase { executeAndVerify(env, stream1.union(stream2), 40); } + @Test + public void testEnumeratorCreationFails() throws Exception { + OnceFailingToCreateEnumeratorSource.reset(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + final Source<Integer, ?, ?> source = + new OnceFailingToCreateEnumeratorSource(2, 10, Boundedness.BOUNDED); + final DataStream<Integer> stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource"); + executeAndVerify(env, stream, 20); + } + + @Test + public void testEnumeratorRestoreFails() throws Exception { + OnceFailingToRestoreEnumeratorSource.reset(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + env.enableCheckpointing(10); + + final Source<Integer, ?, ?> source = + new OnceFailingToRestoreEnumeratorSource(2, 10, Boundedness.BOUNDED); + final DataStream<Integer> stream = + 
env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource"); + executeAndVerify(env, stream, 20); + } + @SuppressWarnings("serial") private void executeAndVerify( StreamExecutionEnvironment env, DataStream<Integer> stream, int numRecords) @@ -83,4 +120,131 @@ public class CoordinatedSourceITCase extends AbstractTestBase { assertEquals(0, (int) result.get(0)); assertEquals(numRecords - 1, (int) result.get(result.size() - 1)); } + + // ------------------------------------------------------------------------ + + private static class OnceFailingToCreateEnumeratorSource extends MockBaseSource { + + private static final long serialVersionUID = 1L; + private static boolean hasFailed; + + OnceFailingToCreateEnumeratorSource( + int numSplits, int numRecordsPerSplit, Boundedness boundedness) { + super(numSplits, numRecordsPerSplit, boundedness); + } + + @Override + public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator( + SplitEnumeratorContext<MockSourceSplit> enumContext) { + if (!hasFailed) { + hasFailed = true; + throw new FlinkRuntimeException("Test Failure"); + } + + return super.createEnumerator(enumContext); + } + + static void reset() { + hasFailed = false; + } + } + + /** + * A source with the following behavior: + * + * <ol> + * <li>It initially creates an enumerator that does not assign work, waits until the first + * checkpoint completes (which contains all work, because none is assigned, yet) and then + * triggers a global failure. + * <li>Upon restoring from the failure, the first attempt to restore the enumerator fails with + * an exception. + * <li>The next time to restore the enumerator succeeds and the enumerator works regularly. 
+ * </ol> + */ + private static class OnceFailingToRestoreEnumeratorSource extends MockBaseSource { + + private static final long serialVersionUID = 1L; + private static boolean hasFailed; + + OnceFailingToRestoreEnumeratorSource( + int numSplits, int numRecordsPerSplit, Boundedness boundedness) { + super(numSplits, numRecordsPerSplit, boundedness); + } + + @Override + public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator( + SplitEnumeratorContext<MockSourceSplit> enumContext) { + + final SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> enumerator = + super.createEnumerator(enumContext); + + if (hasFailed) { + // after the failure happened, we proceed normally + return enumerator; + } else { + // before the failure, we go with + try { + final List<MockSourceSplit> splits = enumerator.snapshotState(1L); + return new NonAssigningEnumerator(splits, enumContext); + } catch (Exception e) { + throw new FlinkRuntimeException(e.getMessage(), e); + } + } + } + + @Override + public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator( + SplitEnumeratorContext<MockSourceSplit> enumContext, + List<MockSourceSplit> checkpoint) + throws IOException { + if (!hasFailed) { + hasFailed = true; + throw new FlinkRuntimeException("Test Failure"); + } + + return super.restoreEnumerator(enumContext, checkpoint); + } + + static void reset() { + hasFailed = false; + } + + /** + * This enumerator does not assign work, so all state is in the checkpoint. After the first + * checkpoint is complete, it triggers a global failure. 
+ */ + private static class NonAssigningEnumerator extends MockSplitEnumerator { + + private final SplitEnumeratorContext<?> context; + + NonAssigningEnumerator( + List<MockSourceSplit> splits, SplitEnumeratorContext<MockSourceSplit> context) { + super(splits, context); + this.context = context; + } + + @Override + public void addReader(int subtaskId) { + // we do nothing here to make sure there is no progress + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + // we do nothing here to make sure there is no progress + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // This is a bit of a clumsy way to trigger a global failover from a coordinator. + // This is safe, though, because per the contract, exceptions in the enumerator + // handlers trigger a global failover. + context.callAsync( + () -> null, + (success, failure) -> { + throw new FlinkRuntimeException( + "Artificial trigger for Global Failover"); + }); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index add85bb..5ba4160 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -111,6 +111,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> public void start() throws Exception { LOG.info("Starting split enumerator for source {}.", operatorName); + // we mark this as started first, so that we can later distinguish the cases where + // 'start()' wasn't called and where 'start()' failed. 
+ started = true; + // there are two ways the coordinator can get created: // (1) Source.restoreEnumerator(), in which case the 'resetToCheckpoint()' method creates // it @@ -122,13 +126,17 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { enumerator = source.createEnumerator(context); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error("Failed to create Source Enumerator for source {}", operatorName, t); + context.failJob(t); + return; } } // The start sequence is the first task in the coordinator executor. // We rely on the single-threaded coordinator executor to guarantee // the other methods are invoked after the enumerator has started. - started = true; runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator."); } @@ -309,6 +317,14 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> final Object... actionNameFormatParameters) { ensureStarted(); + + // we may end up here even for a non-started enumerator, in case the instantiation + // failed, and we get the 'subtaskFailed()' notification during the failover. + // we need to ignore those. 
+ if (enumerator == null) { + return; + } + coordinatorExecutor.execute( () -> { try { @@ -410,7 +426,5 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> if (!started) { throw new IllegalStateException("The coordinator has not started yet."); } - - assert enumerator != null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 1642b59..f01143e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -256,6 +256,26 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { } @Test + public void testFailJobWhenExceptionThrownFromEnumeratorCreation() throws Exception { + final RuntimeException failureReason = new RuntimeException("Artificial Exception"); + + final SourceCoordinator<?, ?> coordinator = + new SourceCoordinator<>( + OPERATOR_NAME, + coordinatorExecutor, + new EnumeratorCreatingSource<>( + () -> { + throw failureReason; + }), + context); + + coordinator.start(); + + assertTrue(operatorCoordinatorContext.isJobFailed()); + assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason()); + } + + @Test public void testErrorThrownFromSplitEnumerator() throws Exception { final Error error = new Error("Test Error"); try (final MockSplitEnumeratorContext<MockSourceSplit> enumeratorContext =