dawidwys commented on code in PR #23675:
URL: https://github.com/apache/flink/pull/23675#discussion_r1386539479
##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/LimitTestPrograms.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecLimit}. */
+public class LimitTestPrograms {
+
+static final Row[] DATA =
+new Row[] {
+Row.of(2, "a", 6),
+Row.of(4, "b", 8),
+Row.of(6, "c", 10),
+Row.of(1, "a", 5),
+Row.of(3, "b", 7),
+Row.of(5, "c", 9)
+};
+static final TableTestProgram LIMIT =
+TableTestProgram.of("limit", "validates limit node")
+.setupTableSource(
+SourceTestStep.newBuilder("source_t")
+.addSchema("a INT", "b VARCHAR", "c INT")
+.producedBeforeRestore(DATA)
+.producedAfterRestore(DATA)
+.build())
+.setupTableSink(
+SinkTestStep.newBuilder("sink_t")
+.addSchema("a INT", "b VARCHAR", "c BIGINT")
+.addOption("sink-insert-only", "false")
Review Comment:
This option is already set by the `RestoreTestBase`.
##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java:
##
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization for sort limit. */
-public class LimitJsonPlanTest extends TableTestBase {
-
-private StreamTableTestUtil util;
-private TableEnvironment tEnv;
-
-@Before
-public void setup() {
-util = streamTestUtil(TableConfig.getDefault());
-tEnv = util.getTableEnv();
-
-String srcTableDdl =
-"CREATE TABLE MyTable (\n"
-+ " a bigint,\n"
-+ " b int not null,\n"
-+ " c varchar,\n"
-+ " d timestamp(3)\n"
-+ ") with (\n"
-+ " 'connector' = 'values',\n"
-+ " 'bounded' = 'false')";
-tEnv.executeSql(srcTableDdl);
-}
-
-@Test
-public void testLimit() {
-String sinkTableDdl =
-"CREATE TABLE MySink (\n"
-+ " a bigint,\n"
-+ " b bigint\n"
-