Samrat002 commented on code in PR #20990: URL: https://github.com/apache/flink/pull/20990#discussion_r1291657967
########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. + */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { Review Comment: Are we planning to maintain separate test base classes, each dedicated to a different JUnit version, in the future?
IMO, nomenclature can be changed to `AbstractTestBase` and `JavaProgramTest` ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. 
+ * + * <p>To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the StreamExecutionEnvironment from the context: + * + * <pre> + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * </pre> + */ +public abstract class AbstractTestBaseJUnit5 { Review Comment: Curious whether the `JUnit5` suffix is required? ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util; + +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. + * + * <p>To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the StreamExecutionEnvironment from the context: + * + * <pre> + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * </pre> + */ +public abstract class AbstractTestBaseJUnit5 { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); + + private static final int DEFAULT_PARALLELISM = 4; + + @RegisterExtension + public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .build()); + + @TempDir protected File 
temporaryFolder; + + @AfterEach + public final void cleanupRunningJobs(@InjectClusterClient MiniClusterClient clusterClient) + throws Exception { + if (!MINI_CLUSTER_EXTENSION.isRunning()) { + // do nothing if the MiniCluster is not running Review Comment: NIT: comment can be removed ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; + +/** + * Base class for unit tests that run a single test with object reuse enabled/disabled and against + * collection environments. + * + * <p>To write a unit test against this test base, simply extend it and implement the {@link + * #testProgram()} method. + * + * <p>To skip the execution against collection environments you have to override {@link + * #skipCollectionExecution()}. 
+ */ +public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + private JobExecutionResult latestExecutionResult; + + /** + * The number of times a test should be repeated. + * + * <p>This is useful for runtime changes, which affect resource management. Running certain + * tests repeatedly might help to discover resource leaks, race conditions etc. + */ + private int numberOfTestRepetitions = 1; + + private boolean isCollectionExecution; + + public void setNumberOfTestRepetitions(int numberOfTestRepetitions) { + this.numberOfTestRepetitions = numberOfTestRepetitions; + } + + public int getParallelism() { + return isCollectionExecution ? 1 : MINI_CLUSTER_EXTENSION.getNumberSlots(); + } + + public JobExecutionResult getLatestExecutionResult() { + return this.latestExecutionResult; + } + + public boolean isCollectionExecution() { + return isCollectionExecution; + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + protected boolean skipCollectionExecution() { + return false; + } + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJobWithObjectReuse() { + isCollectionExecution = false; + + // pre-submit + try { + preSubmit(); + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // This only works because the underlying ExecutionEnvironment is a TestEnvironment + // We 
should fix that we are able to get access to the latest execution result from a + // different + // execution environment and how the object reuse mode is enabled + TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment(); + env.getConfig().enableObjectReuse(); + + // Possibly run the test multiple times + executeProgramMultipleTimes(env); + } + + private void executeProgramMultipleTimes(ExecutionEnvironment env) { + for (int i = 0; i < numberOfTestRepetitions; i++) { + // call the test program Review Comment: NIT: remove comments that are not really needed ########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit5.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Base class for unit tests that run multiple tests and want to reuse the same Flink cluster. 
This + * saves a significant amount of time, since the startup and shutdown of the Flink clusters + * (including actor systems, etc) usually dominates the execution of the actual tests. + * + * <p>To write a unit test against this test base, simply extend it and add one or more regular test + * methods and retrieve the ExecutionEnvironment from the context: + * + * <pre>{@code + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * }</pre> + */ +public class MultipleProgramsTestBaseJUnit5 extends AbstractTestBaseJUnit5 { + + /** + * Enum that defines which execution environment to run the next test on: An embedded local + * flink cluster, or the collection execution backend. + */ + public enum TestExecutionMode { Review Comment: Can this enum be pulled out of `MultipleProgramsTestBaseJUnit5`? It might be useful for future extensions or for new test cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org