This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 43fec308b32 [FLINK-33599] Run restore tests with RocksDB state backend (#23883)
43fec308b32 is described below

commit 43fec308b3298ed2aad639b94140c9a2173c10cd
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Dec 6 16:39:06 2023 +0100

    [FLINK-33599] Run restore tests with RocksDB state backend (#23883)
---
 .../plan/nodes/exec/testutils/RestoreTestBase.java | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 17d7b16391a..a10c5b8202f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.testutils;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
@@ -85,16 +86,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 @TestMethodOrder(OrderAnnotation.class)
 public abstract class RestoreTestBase implements TableTestProgramRunner {
 
-    private final Class<? extends ExecNode> execNodeUnderTest;
+    private final Class<? extends ExecNode<?>> execNodeUnderTest;
     private final AfterRestoreSource afterRestoreSource;
 
-    protected RestoreTestBase(Class<? extends ExecNode> execNodeUnderTest) {
+    protected RestoreTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
         this.execNodeUnderTest = execNodeUnderTest;
         this.afterRestoreSource = AfterRestoreSource.FINITE;
     }
 
     protected RestoreTestBase(
-            Class<? extends ExecNode> execNodeUnderTest, AfterRestoreSource state) {
+            Class<? extends ExecNode<?>> execNodeUnderTest, AfterRestoreSource state) {
         this.execNodeUnderTest = execNodeUnderTest;
         this.afterRestoreSource = state;
     }
@@ -154,8 +155,8 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
             TestValuesTableFactory.registerLocalRawResultsObserver(
                     tableName,
                     (integer, strings) -> {
-                        List<String> results = new ArrayList<>();
-                        results.addAll(sinkTestStep.getExpectedBeforeRestoreAsStrings());
+                        List<String> results =
+                                new ArrayList<>(sinkTestStep.getExpectedBeforeRestoreAsStrings());
                         if (!ignoreAfter) {
                             results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
                         }
@@ -177,8 +178,9 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
     @MethodSource("supportedPrograms")
     @Order(0)
     public void generateTestSetupFiles(TableTestProgram program) throws Exception {
-        final TableEnvironment tEnv =
-                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        final TableEnvironment tEnv = TableEnvironment.create(settings);
         program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
         tEnv.getConfig()
                 .set(
@@ -237,6 +239,7 @@ public abstract class RestoreTestBase implements TableTestProgramRunner {
                         false,
                         RestoreMode.NO_CLAIM);
         SavepointRestoreSettings.toConfiguration(restoreSettings, settings.getConfiguration());
+        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
         final TableEnvironment tEnv = TableEnvironment.create(settings);
         tEnv.getConfig()
                 .set(