Samrat002 commented on code in PR #20990:
URL: https://github.com/apache/flink/pull/20990#discussion_r1291657967


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{

Review Comment:
   are we planning to maintain different classes of Test Bases for future 
dedicated to different JUnit version? 
   
   IMO, nomenclature can be changed to `AbstractTestBase` and  `JavaProgramTest`
   
   



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Base class for unit tests that run multiple tests and want to reuse the 
same Flink cluster. This
+ * saves a significant amount of time, since the startup and shutdown of the 
Flink clusters
+ * (including actor systems, etc) usually dominates the execution of the 
actual tests.
+ *
+ * <p>To write a unit test against this test base, simply extend it and add 
one or more regular test
+ * methods and retrieve the StreamExecutionEnvironment from the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
+ */
+public abstract class AbstractTestBaseJUnit5 {

Review Comment:
   Curious if `JUnit5` sufix is required ? 
   



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBaseJUnit5.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Base class for unit tests that run multiple tests and want to reuse the 
same Flink cluster. This
+ * saves a significant amount of time, since the startup and shutdown of the 
Flink clusters
+ * (including actor systems, etc) usually dominates the execution of the 
actual tests.
+ *
+ * <p>To write a unit test against this test base, simply extend it and add 
one or more regular test
+ * methods and retrieve the StreamExecutionEnvironment from the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
+ */
+public abstract class AbstractTestBaseJUnit5 {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
+
+    private static final int DEFAULT_PARALLELISM = 4;
+
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .build());
+
+    @TempDir protected File temporaryFolder;
+
+    @AfterEach
+    public final void cleanupRunningJobs(@InjectClusterClient 
MiniClusterClient clusterClient)
+            throws Exception {
+        if (!MINI_CLUSTER_EXTENSION.isRunning()) {
+            // do nothing if the MiniCluster is not running

Review Comment:
   NIT: comment can be removed 



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBaseJUnit5.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+/**
+ * Base class for unit tests that run a single test with object reuse 
enabled/disabled and against
+ * collection environments.
+ *
+ * <p>To write a unit test against this test base, simply extend it and 
implement the {@link
+ * #testProgram()} method.
+ *
+ * <p>To skip the execution against collection environments you have to 
override {@link
+ * #skipCollectionExecution()}.
+ */
+public abstract class JavaProgramTestBaseJUnit5 extends AbstractTestBaseJUnit5 
{
+
+    private JobExecutionResult latestExecutionResult;
+
+    /**
+     * The number of times a test should be repeated.
+     *
+     * <p>This is useful for runtime changes, which affect resource 
management. Running certain
+     * tests repeatedly might help to discover resource leaks, race conditions 
etc.
+     */
+    private int numberOfTestRepetitions = 1;
+
+    private boolean isCollectionExecution;
+
+    public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+        this.numberOfTestRepetitions = numberOfTestRepetitions;
+    }
+
+    public int getParallelism() {
+        return isCollectionExecution ? 1 : 
MINI_CLUSTER_EXTENSION.getNumberSlots();
+    }
+
+    public JobExecutionResult getLatestExecutionResult() {
+        return this.latestExecutionResult;
+    }
+
+    public boolean isCollectionExecution() {
+        return isCollectionExecution;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Methods to create the test program and for pre- and post- test work
+    // 
--------------------------------------------------------------------------------------------
+
+    protected abstract void testProgram() throws Exception;
+
+    protected void preSubmit() throws Exception {}
+
+    protected void postSubmit() throws Exception {}
+
+    protected boolean skipCollectionExecution() {
+        return false;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    //  Test entry point
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testJobWithObjectReuse() {
+        isCollectionExecution = false;
+
+        // pre-submit
+        try {
+            preSubmit();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            fail("Pre-submit work caused an error: " + e.getMessage());
+        }
+
+        // This only works because the underlying ExecutionEnvironment is a 
TestEnvironment
+        // We should fix that we are able to get access to the latest 
execution result from a
+        // different
+        // execution environment and how the object reuse mode is enabled
+        TestEnvironment env = MINI_CLUSTER_EXTENSION.getTestEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        // Possibly run the test multiple times
+        executeProgramMultipleTimes(env);
+    }
+
+    private void executeProgramMultipleTimes(ExecutionEnvironment env) {
+        for (int i = 0; i < numberOfTestRepetitions; i++) {
+            // call the test program

Review Comment:
   NIT: remove not so required comments 



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBaseJUnit5.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Base class for unit tests that run multiple tests and want to reuse the 
same Flink cluster. This
+ * saves a significant amount of time, since the startup and shutdown of the 
Flink clusters
+ * (including actor systems, etc) usually dominates the execution of the 
actual tests.
+ *
+ * <p>To write a unit test against this test base, simply extend it and add 
one or more regular test
+ * methods and retrieve the ExecutionEnvironment from the context:
+ *
+ * <pre>{@code
+ * {@literal @}Test
+ * public void someTest() {
+ *     ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *     // test code
+ *     env.execute();
+ * }
+ *
+ * {@literal @}Test
+ * public void anotherTest() {
+ *     ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *     // test code
+ *     env.execute();
+ * }
+ *
+ * }</pre>
+ */
+public class MultipleProgramsTestBaseJUnit5 extends AbstractTestBaseJUnit5 {
+
+    /**
+     * Enum that defines which execution environment to run the next test on: 
An embedded local
+     * flink cluster, or the collection execution backend.
+     */
+    public enum TestExecutionMode {

Review Comment:
   Can this enum be pulled out from `MultipleProgramsTestBaseJUnit5` ?
    Might be useful for future extension or other new test cases for future. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to