This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 60cc00e5e6a [FLINK-33667] Implement restore tests for MatchRecognize node (#23821) 60cc00e5e6a is described below commit 60cc00e5e6abc0b7309a48a37e171dae9fa98183 Author: James Hughes <jhug...@confluent.io> AuthorDate: Wed Dec 6 03:49:29 2023 -0500 [FLINK-33667] Implement restore tests for MatchRecognize node (#23821) --- .../exec/stream/MatchRecognizeJsonPlanTest.java | 145 ------- .../exec/stream/MatchRecognizeRestoreTest.java | 46 +++ .../exec/stream/MatchRecognizeTestPrograms.java | 300 ++++++++++++++ .../jsonplan/MatchRecognizeJsonPlanITCase.java | 109 ----- .../match-complex/plan/match-complex.json} | 372 +++++++++-------- .../match-complex/savepoint/_metadata | Bin 0 -> 21075 bytes .../plan/match-order-by-event-time.json} | 455 ++++++++++++-------- .../match-order-by-event-time/savepoint/_metadata | Bin 0 -> 18895 bytes .../plan/match-order-by-int-column.json} | 457 +++++++++++++-------- .../match-order-by-int-column/savepoint/_metadata | Bin 0 -> 18898 bytes .../match-simple/plan/match-simple.json} | 17 +- .../match-simple/savepoint/_metadata | Bin 0 -> 16607 bytes .../plan/match-skip-past-last-row.json} | 185 +++------ .../match-skip-past-last-row/savepoint/_metadata | Bin 0 -> 17991 bytes .../plan/match-skip-to-first.json} | 185 +++------ .../match-skip-to-first/savepoint/_metadata | Bin 0 -> 18082 bytes .../plan/match-skip-to-last.json} | 185 +++------ .../match-skip-to-last/savepoint/_metadata | Bin 0 -> 18079 bytes .../plan/match-skip-to-next-row.json} | 185 +++------ .../match-skip-to-next-row/savepoint/_metadata | Bin 0 -> 18434 bytes 20 files changed, 1388 insertions(+), 1253 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest.java deleted file mode 100644 index c31f300ca4e..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization for match recognize. */ -class MatchRecognizeJsonPlanTest extends TableTestBase { - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - } - - @Test - void testMatch() { - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " id bigint,\n" - + " name varchar,\n" - + " proctime as PROCTIME()\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " b bigint,\n" - + " c bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - String sql = - "insert into MySink" - + " SELECT T.aid, T.bid, T.cid\n" - + " FROM MyTable MATCH_RECOGNIZE (\n" - + " ORDER BY proctime\n" - + " MEASURES\n" - + " `A\"`.id AS aid,\n" - + " \u006C.id AS bid,\n" - + " C.id AS cid\n" - + " PATTERN (`A\"` \u006C C)\n" - + " DEFINE\n" - + " `A\"` AS name = 'a',\n" - + " \u006C AS name = 'b',\n" - + " C AS name = 'c'\n" - + " ) AS T"; - util.verifyJsonPlan(sql); - } - - @Test - void testSkipToLast() { - doTestAfterMatch("AFTER MATCH SKIP TO LAST B"); - } - - @Test - void testSkipToFirst() { - doTestAfterMatch("AFTER MATCH SKIP TO FIRST B"); - } - - @Test - void testSkipPastLastRow() { - doTestAfterMatch("AFTER MATCH SKIP PAST LAST ROW"); - } - - @Test - void testSkipToNextRow() { - doTestAfterMatch("AFTER MATCH SKIP TO NEXT ROW"); - } - - private void doTestAfterMatch(Object afterClause) { - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " vehicle_id bigint,\n" - + " engine_temperature int,\n" - + " rowtime timestamp_ltz(3)," - + " WATERMARK FOR rowtime AS SOURCE_WATERMARK()\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " vehicle_id bigint,\n" - + " startTime timestamp_ltz(3),\n" - + " endTime timestamp_ltz(3),\n" - + " Initial_Temp int,\n" - + " Final_Temp int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - String sql = - "insert into MySink" - + " SELECT * FROM\n" - + " MyTable\n" - + " MATCH_RECOGNIZE(\n" - + " PARTITION BY vehicle_id\n" - + " ORDER BY `rowtime`\n" - + " MEASURES \n" - + " FIRST(A.`rowtime`) as startTime,\n" - + " LAST(A.`rowtime`) as endTime,\n" - + " FIRST(A.engine_temperature) as Initial_Temp,\n" - + " LAST(A.engine_temperature) as Final_Temp\n" - + " ONE ROW PER MATCH\n" - + " %s\n" - + " PATTERN (A+ B)\n" - + " DEFINE\n" - + " A as LAST(A.engine_temperature,1) is NULL OR A.engine_temperature > LAST(A.engine_temperature,1),\n" - + " B as B.engine_temperature < LAST(A.engine_temperature)\n" - + " )MR;"; - util.verifyJsonPlan(String.format(sql, afterClause)); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeRestoreTest.java new file mode 100644 index 00000000000..5bfd8d57c46 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeRestoreTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecMatch}. */ +public class MatchRecognizeRestoreTest extends RestoreTestBase { + + public MatchRecognizeRestoreTest() { + super(StreamExecMatch.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + MatchRecognizeTestPrograms.MATCH_SIMPLE, + MatchRecognizeTestPrograms.MATCH_COMPLEX, + MatchRecognizeTestPrograms.MATCH_ORDER_BY_EVENT_TIME, + MatchRecognizeTestPrograms.MATCH_ORDER_BY_INT_COLUMN, + MatchRecognizeTestPrograms.MATCH_SKIP_TO_FIRST, + MatchRecognizeTestPrograms.MATCH_SKIP_TO_LAST, + MatchRecognizeTestPrograms.MATCH_SKIP_TO_NEXT_ROW, + MatchRecognizeTestPrograms.MATCH_SKIP_PAST_LAST_ROW); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java new file mode 100644 index 00000000000..d805b720d53 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeTestPrograms.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecMatch}. */ +public class MatchRecognizeTestPrograms { + static final Row[] SIMPLE_DATA = { + Row.of(1L, "a"), + Row.of(2L, "z"), + Row.of(3L, "b"), + Row.of(4L, "c"), + Row.of(5L, "d"), + Row.of(6L, "a"), + Row.of(7L, "b"), + Row.of(8L, "c"), + Row.of(9L, "a"), + Row.of(10L, "b") + }; + + static final Row[] SIMPLE_DATA2 = {Row.of(11L, "c")}; + + static final Row[] COMPLEX_DATA = { + Row.of("ACME", 1L, 19, 1), + Row.of("BETA", 2L, 18, 1), + Row.of("ACME", 3L, 17, 2), + Row.of("ACME", 4L, 13, 3), + Row.of("BETA", 5L, 16, 2), + Row.of("ACME", 6L, 20, 4) + }; + + static final Row[] COMPLEX_DATA2 = {Row.of("BETA", 7L, 22, 4)}; + + static final TableTestProgram MATCH_SIMPLE = + TableTestProgram.of("match-simple", "simple match recognize test") + .setupTableSource( + SourceTestStep.newBuilder("MyTable") + .addSchema( + "id bigint", "name varchar", "proctime as PROCTIME()") + .producedBeforeRestore(SIMPLE_DATA) + .producedAfterRestore(SIMPLE_DATA2) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("MySink") + .addSchema("a bigint", "b bigint", "c bigint") + .consumedBeforeRestore(Row.of(6L, 7L, 8L)) + .consumedAfterRestore(Row.of(9L, 10L, 11L)) + .build()) + .runSql( + "insert into MySink" + + " SELECT T.aid, T.bid, T.cid\n" + + " FROM MyTable MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " `A\"`.id AS aid,\n" + + " \u006C.id AS bid,\n" + + " C.id AS cid\n" + + " PATTERN (`A\"` \u006C C)\n" + + " DEFINE\n" + + " `A\"` AS name = 'a',\n" + + " \u006C AS name = 'b',\n" + + " C AS name = 'c'\n" + + " ) AS T") + .build(); + + static final TableTestProgram MATCH_COMPLEX = + TableTestProgram.of("match-complex", "complex match recognize test") + .setupTableSource( + SourceTestStep.newBuilder("MyTable") + .addSchema( + "symbol string", + "tstamp bigint", + "price int", + "tax int", + "proctime as PROCTIME()") + .producedBeforeRestore(COMPLEX_DATA) + .producedAfterRestore(COMPLEX_DATA2) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("MySink") + .addSchema( + "somestring string", "a bigint", "b bigint", "c bigint") + .consumedBeforeRestore(Row.of("ACME", 19L, 13L, null)) + .consumedAfterRestore(Row.of("BETA", 18L, 16L, null)) + .build()) + .runSql( + "insert into MySink SELECT * FROM MyTable MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(DOWN.price) as first,\n" + + " LAST(DOWN.price) as last,\n" + + " FIRST(DOWN.price, 5) as nullPrice\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price > LAST(DOWN.price)\n" + + ") AS T") + .build(); + + static final Row[] BEFORE_DATA = { + Row.of("2020-10-10 00:00:01", 10, 3), + Row.of("2020-10-10 00:00:01", 8, 2), + Row.of("2020-10-10 00:00:01", 9, 1), + Row.of("2020-10-10 00:00:04", 7, 4), + Row.of("2020-10-10 00:00:07", 8, 5), + // out of order - should be processed with a 2-second watermark in use. + Row.of("2020-10-10 00:00:06", 5, 6), + Row.of("2020-10-10 00:00:12", 3, 7), + // late event - should be ignored with a 2-second watermark in use. + Row.of("2020-10-10 00:00:08", 4, 8), + Row.of("2020-10-10 00:00:16", 4, 9), + Row.of("2020-10-10 00:00:32", 7, 10), + Row.of("2020-10-10 00:00:34", 5, 11) + }; + + static final Row[] AFTER_DATA = { + Row.of("2020-10-10 00:00:33", 9, 12), + Row.of("2020-10-10 00:00:41", 3, 13), + Row.of("2020-10-10 00:00:42", 11, 16), + Row.of("2020-10-10 00:00:43", 12, 15), + Row.of("2020-10-10 00:00:44", 13, 14) + }; + + static final SourceTestStep SOURCE = + SourceTestStep.newBuilder("MyEventTimeTable") + .addSchema( + "ts STRING", + "price INT", + "sequence_num INT", + "`rowtime` AS TO_TIMESTAMP(`ts`)", + "`proctime` AS PROCTIME()", + "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '2' SECOND") + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build(); + + static final TableTestProgram MATCH_ORDER_BY_EVENT_TIME = + TableTestProgram.of("match-order-by-event-time", "complex match recognize test") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("MySink") + .addSchema("first bigint", "last bigint", "up bigint") + .consumedBeforeRestore(Row.of(10L, 8L, 9L), Row.of(7L, 5L, 8L)) + .consumedAfterRestore(Row.of(9L, 3L, 11L)) + .build()) + .runSql(getEventTimeSql("ORDER BY rowtime")) + .build(); + + static final TableTestProgram MATCH_ORDER_BY_INT_COLUMN = + TableTestProgram.of("match-order-by-int-column", "complex match recognize test") + .setupTableSource(SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("MySink") + .addSchema("first bigint", "last bigint", "up bigint") + .consumedBeforeRestore(Row.of(9L, 8L, 10L), Row.of(7L, 5L, 8L)) + .consumedAfterRestore(Row.of(9L, 3L, 11L)) + .build()) + .runSql(getEventTimeSql("ORDER BY rowtime, sequence_num")) + .build(); + + private static String getEventTimeSql(final String orderByClause) { + final String sql = + "insert into MySink SELECT * FROM MyEventTimeTable MATCH_RECOGNIZE (\n" + + " %s\n" + + " MEASURES\n" + + " FIRST(DOWN.price) as first,\n" + + " LAST(DOWN.price) as last,\n" + + " UP.price as up\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price > LAST(DOWN.price)\n" + + ") AS T"; + return String.format(sql, orderByClause); + } + + static final TableTestProgram MATCH_SKIP_TO_FIRST = + getSkipTestProgram( + "match-skip-to-first", + "skip to first match recognize test", + "AFTER MATCH SKIP TO FIRST B", + new Row[] {Row.of(1L, 100, 106), Row.of(1L, 105, 107), Row.of(1L, 101, 101)}, + new Row[] {Row.of(1L, 100, 111)}); + + static final TableTestProgram MATCH_SKIP_TO_LAST = + getSkipTestProgram( + "match-skip-to-last", + "skip to last match recognize test", + "AFTER MATCH SKIP TO LAST B", + new Row[] {Row.of(1L, 100, 106), Row.of(1L, 105, 107), Row.of(1L, 101, 101)}, + new Row[] {Row.of(1L, 100, 111)}); + + static final TableTestProgram MATCH_SKIP_TO_NEXT_ROW = + getSkipTestProgram( + "match-skip-to-next-row", + "skip to next row match recognize test", + "AFTER MATCH SKIP TO NEXT ROW", + new Row[] { + Row.of(1L, 100, 106), + Row.of(1L, 102, 106), + Row.of(1L, 104, 106), + Row.of(1L, 106, 106), + Row.of(1L, 105, 107), + Row.of(1L, 107, 107), + Row.of(1L, 101, 101) + }, + new Row[] {Row.of(1L, 100, 111), Row.of(1L, 110, 111), Row.of(1L, 111, 111)}); + + static final TableTestProgram MATCH_SKIP_PAST_LAST_ROW = + getSkipTestProgram( + "match-skip-past-last-row", + "skip past last row match recognize test", + "AFTER MATCH SKIP PAST LAST ROW", + new Row[] {Row.of(1L, 100, 106), Row.of(1L, 107, 107)}, + new Row[] {Row.of(1L, 100, 111)}); + + private static TableTestProgram getSkipTestProgram( + final String name, + final String description, + final String skipClause, + final Row[] beforeRows, + final Row[] afterRows) { + return TableTestProgram.of(name, description) + .setupTableSource( + SourceTestStep.newBuilder("MyTable") + .addSchema( + " vehicle_id bigint,\n" + + " engine_temperature int,\n" + + " proctime as PROCTIME()") + .producedBeforeRestore( + Row.of(1L, 100), + Row.of(1L, 102), + Row.of(1L, 104), + Row.of(1L, 106), + Row.of(1L, 105), + Row.of(1L, 107), + Row.of(1L, 101), + Row.of(1L, 100)) + .producedAfterRestore( + Row.of(1L, 110), Row.of(1L, 111), Row.of(1L, 99)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("MySink") + .addSchema( + " vehicle_id bigint,\n" + + " Initial_Temp int,\n" + + " Final_Temp int\n") + .consumedBeforeRestore(beforeRows) + .consumedAfterRestore(afterRows) + .build()) + .runSql(getSql(skipClause)) + .build(); + } + + private static String getSql(final String afterClause) { + final String sql = + "insert into MySink" + + " SELECT * FROM\n" + + " MyTable\n" + + " MATCH_RECOGNIZE(\n" + + " PARTITION BY vehicle_id\n" + + " ORDER BY `proctime`\n" + + " MEASURES \n" + + " FIRST(A.engine_temperature) as Initial_Temp,\n" + + " LAST(A.engine_temperature) as Final_Temp\n" + + " ONE ROW PER MATCH\n" + + " %s\n" + + " PATTERN (A+ B)\n" + + " DEFINE\n" + + " A as LAST(A.engine_temperature,1) is NULL OR A.engine_temperature > LAST(A.engine_temperature,1),\n" + + " B as B.engine_temperature < LAST(A.engine_temperature)\n" + + " )MR;"; + return String.format(sql, afterClause); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java deleted file mode 100644 index a5cb80e3709..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/MatchRecognizeJsonPlanITCase.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.types.Row; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** Test json deserialization for match recognize. */ -class MatchRecognizeJsonPlanITCase extends JsonPlanTestBase { - @Test - void testSimpleMatch() throws Exception { - List<Row> data = - Arrays.asList( - Row.of(1L, "a"), - Row.of(2L, "z"), - Row.of(3L, "b"), - Row.of(4L, "c"), - Row.of(5L, "d"), - Row.of(6L, "a"), - Row.of(7L, "b"), - Row.of(8L, "c"), - Row.of(9L, "h")); - - createTestValuesSourceTable( - "MyTable", data, "id bigint", "name varchar", "proctime as PROCTIME()"); - createTestValuesSinkTable("MySink", "a bigint", "b bigint", "c bigint"); - - String sql = - "insert into MySink" - + " SELECT T.aid, T.bid, T.cid\n" - + " FROM MyTable MATCH_RECOGNIZE (\n" - + " ORDER BY proctime\n" - + " MEASURES\n" - + " `A\"`.id AS aid,\n" - + " \u006C.id AS bid,\n" - + " C.id AS cid\n" - + " PATTERN (`A\"` \u006C C)\n" - + " DEFINE\n" - + " `A\"` AS name = 'a',\n" - + " \u006C AS name = 'b',\n" - + " C AS name = 'c'\n" - + " ) AS T"; - compileSqlAndExecutePlan(sql).await(); - - List<String> expected = Collections.singletonList("+I[6, 7, 8]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testComplexMatch() throws Exception { - List<Row> data = - Arrays.asList( - Row.of("ACME", 1L, 19, 1), - Row.of("ACME", 2L, 17, 2), - Row.of("ACME", 3L, 13, 3), - Row.of("ACME", 4L, 20, 4)); - createTestValuesSourceTable( - "MyTable", - data, - "symbol string", - "tstamp bigint", - "price int", - "tax int", - "proctime as PROCTIME()"); - createTestValuesSinkTable("MySink", "a bigint", "b bigint", "c bigint"); - - String sql = - "insert into MySink SELECT * FROM MyTable MATCH_RECOGNIZE (\n" - + " ORDER BY proctime\n" - + " MEASURES\n" - + " FIRST(DOWN.price) as first,\n" - + " LAST(DOWN.price) as last,\n" - + " FIRST(DOWN.price, 5) as nullPrice\n" - + " ONE ROW PER MATCH\n" - + " AFTER MATCH SKIP PAST LAST ROW\n" - + " PATTERN (DOWN{2,} UP)\n" - + " DEFINE\n" - + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" - + " UP AS price > LAST(DOWN.price)\n" - + ") AS T"; - compileSqlAndExecutePlan(sql).await(); - - List<String> expected = Collections.singletonList("+I[19, 13, null]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/plan/match-complex.json similarity index 59% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/plan/match-complex.json index f6327ec0b26..d266ee2c367 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/plan/match-complex.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 6, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -9,53 +9,75 @@ "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", + "name" : "symbol", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "tstamp", "dataType" : "BIGINT" }, { - "name" : "engine_temperature", + "name" : "price", "dataType" : "INT" }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", + "name" : "tax", + "dataType" : "INT" + }, { + "name" : "proctime", + "kind" : "COMPUTED", "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "PROCTIME()" } - } ] + } ], + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`symbol` VARCHAR(2147483647), `tstamp` BIGINT, `price` INT, `tax` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[symbol, tstamp, price, tax])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { + "id" : 7, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + }, { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "rowtimeFieldIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -66,23 +88,30 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", + "name" : "symbol", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "tstamp", "fieldType" : "BIGINT" }, { - "name" : "engine_temperature", + "name" : "price", + "fieldType" : "INT" + }, { + "name" : "tax", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "Calc(select=[symbol, tstamp, price, tax, PROCTIME() AS proctime])" }, { - "id" : 3, + "id" : 8, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -95,23 +124,30 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", + "name" : "symbol", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "tstamp", "fieldType" : "BIGINT" }, { - "name" : "engine_temperature", + "name" : "price", + "fieldType" : "INT" + }, { + "name" : "tax", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "Exchange(distribution=[hash[vehicle_id]])" + "description" : "Exchange(distribution=[hash[symbol]])" }, { - "id" : 4, + "id" : 9, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -124,11 +160,11 @@ "internalName" : "$PATTERN_QUANTIFIER$1", "operands" : [ { "kind" : "LITERAL", - "value" : "A", - "type" : "CHAR(1) NOT NULL" + "value" : "DOWN", + "type" : "CHAR(4) NOT NULL" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 2, "type" : "INT NOT NULL" }, { "kind" : "LITERAL", @@ -142,61 +178,61 @@ "type" : "NULL" }, { "kind" : "LITERAL", - "value" : "B", - "type" : "CHAR(1) NOT NULL" + "value" : "UP", + "type" : "CHAR(2) NOT NULL" } ], "type" : "NULL" }, "patternDefinitions" : { - "A" : { + "DOWN" : { "kind" : "CALL", "syntax" : "BINARY", "internalName" : "$OR$1", "operands" : [ { "kind" : "CALL", - "syntax" : "POSTFIX", - "internalName" : "$IS NULL$1", + "syntax" : "BINARY", + "internalName" : "$<$1", "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "*", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 0, "type" : "INT NOT NULL" } ], "type" : "INT" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { + }, { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "DOWN", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 0, + "value" : 1, "type" : "INT NOT NULL" } ], "type" : "INT" - }, { + } ], + "type" : "BOOLEAN" + }, { + "kind" : "CALL", + "syntax" : "POSTFIX", + "internalName" : "$IS NULL$1", + "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "DOWN", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", @@ -205,21 +241,21 @@ } ], "type" : "INT" } ], - "type" : "BOOLEAN" + "type" : "BOOLEAN NOT NULL" } ], "type" : "BOOLEAN" }, - "B" : { + "UP" : { "kind" : "CALL", "syntax" : "BINARY", - "internalName" : "$<$1", + "internalName" : "$>$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "B", - "inputIndex" : 1, + "alpha" : "*", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", @@ -232,8 +268,8 @@ "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "DOWN", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", @@ -246,17 +282,17 @@ } }, "measures" : { - "Final_Temp" : { + "first" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$LAST$1", + "internalName" : "$FIRST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "DOWN", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", @@ -267,17 +303,17 @@ } ], "type" : "INT" }, - "Initial_Temp" : { + "last" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$FIRST$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 1, + "alpha" : "DOWN", + "inputIndex" : 2, "type" : "INT" }, { "kind" : "LITERAL", @@ -288,38 +324,7 @@ } ], "type" : "INT" }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { + "nullPrice" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", @@ -327,28 +332,18 @@ "kind" : "CALL", "internalName" : "$FIRST$1", "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "kind" : "PATTERN_INPUT_REF", + "alpha" : "DOWN", + "inputIndex" : 2, + "type" : "INT" }, { "kind" : "LITERAL", - "value" : 0, + "value" : 5, "type" : "INT NOT NULL" } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : "INT" } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : "INT" } }, "after" : { @@ -367,7 +362,7 @@ }, "orderBy" : { "fields" : [ { - "index" : 2, + "index" : 4, "isAscending" : true, "nullIsLast" : false } ] @@ -381,10 +376,58 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B') [...] + "outputType" : "ROW<`symbol` VARCHAR(2147483647), `first` INT, `last` INT, `nullPrice` INT>", + "description" : "Match(partitionBy=[symbol], orderBy=[proctime ASC], measures=[FINAL(FIRST(DOWN.price, 0)) AS first, FINAL(LAST(DOWN.price, 0)) AS last, FINAL(FIRST(DOWN.price, 5)) AS nullPrice], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'DOWN', 2, -1, false), _UTF-16LE'UP')], define=[{DOWN=OR(<(LAST(*.$2, 0), LAST(DOWN.$2, 1)), IS NULL(LAST(DOWN.$2, 1))), UP=>(LAST(*.$2, 0), LAST(DOWN.$2, 0))}])" + }, { + "id" : 10, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`somestring` VARCHAR(2147483647), `a` BIGINT, `b` BIGINT, `c` BIGINT>", + "description" : "Calc(select=[symbol AS somestring, CAST(first AS BIGINT) AS a, CAST(last AS BIGINT) AS b, CAST(nullPrice AS BIGINT) AS c])" }, { - "id" : 5, + "id" : 11, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -399,29 +442,21 @@ "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", - "dataType" : "BIGINT" - }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "name" : "somestring", + "dataType" : "VARCHAR(2147483647)" }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "name" : "a", + "dataType" : "BIGINT" }, { - "name" : "Initial_Temp", - "dataType" : "INT" + "name" : "b", + "dataType" : "BIGINT" }, { - "name" : "Final_Temp", - "dataType" : "INT" + "name" : "c", + "dataType" : "BIGINT" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -433,36 +468,43 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`somestring` VARCHAR(2147483647), `a` BIGINT, `b` BIGINT, `c` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[somestring, a, b, c])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 8, + "target" : 9, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 9, + "target" : 10, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 10, + "target" : 11, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/savepoint/_metadata new file mode 100644 index 00000000000..981e6cc9350 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-complex/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/plan/match-order-by-event-time.json similarity index 51% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/plan/match-order-by-event-time.json index f6327ec0b26..0cf3e7f81ee 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/plan/match-order-by-event-time.json @@ -1,25 +1,54 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 12, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`MyEventTimeTable`", "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", - "dataType" : "BIGINT" + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "dataType" : "INT" }, { "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" } } ], "watermarkSpecs" : [ { @@ -27,35 +56,117 @@ "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", - "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "2000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "`rowtime` - INTERVAL '2' SECOND" } } ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` INT, `sequence_num` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyEventTimeTable]], fields=[ts, price, sequence_num])", "inputProperties" : [ ] }, { - "id" : 2, + "id" : 13, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" + }, { + "name" : "sequence_num", + "fieldType" : "INT" + }, { + "name" : "rowtime", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, sequence_num, TO_TIMESTAMP(ts) AS rowtime, PROCTIME() AS proctime])" + }, { + "id" : 14, "type" : "stream-exec-watermark-assigner_1", "watermarkExpr" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", - "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "2000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" }, - "rowtimeFieldIndex" : 2, + "rowtimeFieldIndex" : 3, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -66,28 +177,38 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", - "fieldType" : "BIGINT" + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "fieldType" : "INT" }, { "name" : "rowtime", "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", "precision" : 3, "kind" : "ROWTIME" } + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])" }, { - "id" : 3, + "id" : 15, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] + "type" : "SINGLETON" }, "damBehavior" : "PIPELINED", "priority" : 0 @@ -95,23 +216,34 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", - "fieldType" : "BIGINT" + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "fieldType" : "INT" }, { "name" : "rowtime", "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", "precision" : 3, "kind" : "ROWTIME" } + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } } ] }, - "description" : "Exchange(distribution=[hash[vehicle_id]])" + "description" : "Exchange(distribution=[single])" }, { - "id" : 4, + "id" : 16, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -124,11 +256,11 @@ "internalName" : "$PATTERN_QUANTIFIER$1", "operands" : [ { "kind" : "LITERAL", - "value" : "A", - "type" : "CHAR(1) NOT NULL" + "value" : "DOWN", + "type" : "CHAR(4) NOT NULL" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 2, "type" : "INT NOT NULL" }, { "kind" : "LITERAL", @@ -142,60 +274,60 @@ "type" : "NULL" }, { "kind" : "LITERAL", - "value" : "B", - "type" : "CHAR(1) NOT NULL" + "value" : "UP", + "type" : "CHAR(2) NOT NULL" } ], "type" : "NULL" }, "patternDefinitions" : { - "A" : { + "DOWN" : { "kind" : "CALL", "syntax" : "BINARY", "internalName" : "$OR$1", "operands" : [ { "kind" : "CALL", - "syntax" : "POSTFIX", - "internalName" : "$IS NULL$1", + "syntax" : "BINARY", + "internalName" : "$<$1", "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "*", "inputIndex" : 1, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 0, "type" : "INT NOT NULL" } ], "type" : "INT" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { + }, { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 0, + "value" : 1, "type" : "INT NOT NULL" } ], "type" : "INT" - }, { + } ], + "type" : "BOOLEAN" + }, { + "kind" : "CALL", + "syntax" : "POSTFIX", + "internalName" : "$IS NULL$1", + "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -205,20 +337,20 @@ } ], "type" : "INT" } ], - "type" : "BOOLEAN" + "type" : "BOOLEAN NOT NULL" } ], "type" : "BOOLEAN" }, - "B" : { + "UP" : { "kind" : "CALL", "syntax" : "BINARY", - "internalName" : "$<$1", + "internalName" : "$>$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "B", + "alpha" : "*", "inputIndex" : 1, "type" : "INT" }, { @@ -232,7 +364,7 @@ "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -246,16 +378,16 @@ } }, "measures" : { - "Final_Temp" : { + "first" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$LAST$1", + "internalName" : "$FIRST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -267,16 +399,16 @@ } ], "type" : "INT" }, - "Initial_Temp" : { + "last" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$FIRST$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -288,67 +420,17 @@ } ], "type" : "INT" }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { + "up" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "kind" : "PATTERN_INPUT_REF", + "alpha" : "UP", + "inputIndex" : 1, + "type" : "INT" } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : "INT" } }, "after" : { @@ -363,11 +445,11 @@ "subsets" : { }, "allRows" : false, "partition" : { - "fields" : [ 0 ] + "fields" : [ ] }, "orderBy" : { "fields" : [ { - "index" : 2, + "index" : 3, "isAscending" : true, "nullIsLast" : false } ] @@ -381,10 +463,54 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B') [...] + "outputType" : "ROW<`first` INT, `last` INT, `up` INT>", + "description" : "Match(orderBy=[rowtime ASC], measures=[FINAL(FIRST(DOWN.price, 0)) AS first, FINAL(LAST(DOWN.price, 0)) AS last, FINAL(UP.price) AS up], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'DOWN', 2, -1, false), _UTF-16LE'UP')], define=[{DOWN=OR(<(LAST(*.$1, 0), LAST(DOWN.$1, 1)), IS NULL(LAST(DOWN.$1, 1))), UP=>(LAST(*.$1, 0), LAST(DOWN.$1, 0))}])" + }, { + "id" : 17, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`first` BIGINT, `last` BIGINT, `up` BIGINT>", + "description" : "Calc(select=[CAST(first AS BIGINT) AS first, CAST(last AS BIGINT) AS last, CAST(up AS BIGINT) AS up])" }, { - "id" : 5, + "id" : 18, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -399,29 +525,18 @@ "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", + "name" : "first", "dataType" : "BIGINT" }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "Initial_Temp", - "dataType" : "INT" + "name" : "last", + "dataType" : "BIGINT" }, { - "name" : "Final_Temp", - "dataType" : "INT" + "name" : "up", + "dataType" : "BIGINT" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -433,36 +548,50 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`first` BIGINT, `last` BIGINT, `up` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[first, last, up])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 15, + "target" : 16, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 16, + "target" : 17, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 17, + "target" : 18, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/savepoint/_metadata new file mode 100644 index 00000000000..e0731512398 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-event-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/plan/match-order-by-int-column.json similarity index 51% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/plan/match-order-by-int-column.json index f6327ec0b26..8b95945a648 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/plan/match-order-by-int-column.json @@ -1,25 +1,54 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 19, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "identifier" : "`default_catalog`.`default_database`.`MyEventTimeTable`", "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", - "dataType" : "BIGINT" + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "dataType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "dataType" : "INT" }, { "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + }, { + "name" : "proctime", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" } } ], "watermarkSpecs" : [ { @@ -27,35 +56,117 @@ "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", - "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "2000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "`rowtime` - INTERVAL '2' SECOND" } } ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`ts` VARCHAR(2147483647), `price` INT, `sequence_num` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyEventTimeTable]], fields=[ts, price, sequence_num])", "inputProperties" : [ ] }, { - "id" : 2, + "id" : 20, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" + }, { + "name" : "sequence_num", + "fieldType" : "INT" + }, { + "name" : "rowtime", + "fieldType" : "TIMESTAMP(3)" + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[ts, price, sequence_num, TO_TIMESTAMP(ts) AS rowtime, PROCTIME() AS proctime])" + }, { + "id" : 21, "type" : "stream-exec-watermark-assigner_1", "watermarkExpr" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", - "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "2000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" }, - "rowtimeFieldIndex" : 2, + "rowtimeFieldIndex" : 3, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -66,28 +177,38 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", - "fieldType" : "BIGINT" + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "fieldType" : "INT" }, { "name" : "rowtime", "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", "precision" : 3, "kind" : "ROWTIME" } + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)])" }, { - "id" : 3, + "id" : 22, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] + "type" : "SINGLETON" }, "damBehavior" : "PIPELINED", "priority" : 0 @@ -95,23 +216,34 @@ "outputType" : { "type" : "ROW", "fields" : [ { - "name" : "vehicle_id", - "fieldType" : "BIGINT" + "name" : "ts", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "price", + "fieldType" : "INT" }, { - "name" : "engine_temperature", + "name" : "sequence_num", "fieldType" : "INT" }, { "name" : "rowtime", "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", "precision" : 3, "kind" : "ROWTIME" } + }, { + "name" : "proctime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } } ] }, - "description" : "Exchange(distribution=[hash[vehicle_id]])" + "description" : "Exchange(distribution=[single])" }, { - "id" : 4, + "id" : 23, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -124,11 +256,11 @@ "internalName" : "$PATTERN_QUANTIFIER$1", "operands" : [ { "kind" : "LITERAL", - "value" : "A", - "type" : "CHAR(1) NOT NULL" + "value" : "DOWN", + "type" : "CHAR(4) NOT NULL" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 2, "type" : "INT NOT NULL" }, { "kind" : "LITERAL", @@ -142,60 +274,60 @@ "type" : "NULL" }, { "kind" : "LITERAL", - "value" : "B", - "type" : "CHAR(1) NOT NULL" + "value" : "UP", + "type" : "CHAR(2) NOT NULL" } ], "type" : "NULL" }, "patternDefinitions" : { - "A" : { + "DOWN" : { "kind" : "CALL", "syntax" : "BINARY", "internalName" : "$OR$1", "operands" : [ { "kind" : "CALL", - "syntax" : "POSTFIX", - "internalName" : "$IS NULL$1", + "syntax" : "BINARY", + "internalName" : "$<$1", "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "*", "inputIndex" : 1, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 1, + "value" : 0, "type" : "INT NOT NULL" } ], "type" : "INT" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { + }, { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { "kind" : "LITERAL", - "value" : 0, + "value" : 1, "type" : "INT NOT NULL" } ], "type" : "INT" - }, { + } ], + "type" : "BOOLEAN" + }, { + "kind" : "CALL", + "syntax" : "POSTFIX", + "internalName" : "$IS NULL$1", + "operands" : [ { "kind" : "CALL", "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -205,20 +337,20 @@ } ], "type" : "INT" } ], - "type" : "BOOLEAN" + "type" : "BOOLEAN NOT NULL" } ], "type" : "BOOLEAN" }, - "B" : { + "UP" : { "kind" : "CALL", "syntax" : "BINARY", - "internalName" : "$<$1", + "internalName" : "$>$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$PREV$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "B", + "alpha" : "*", "inputIndex" : 1, "type" : "INT" }, { @@ -232,7 +364,7 @@ "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -246,16 +378,16 @@ } }, "measures" : { - "Final_Temp" : { + "first" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$LAST$1", + "internalName" : "$FIRST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -267,16 +399,16 @@ } ], "type" : "INT" }, - "Initial_Temp" : { + "last" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { "kind" : "CALL", - "internalName" : "$FIRST$1", + "internalName" : "$LAST$1", "operands" : [ { "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", + "alpha" : "DOWN", "inputIndex" : 1, "type" : "INT" }, { @@ -288,67 +420,17 @@ } ], "type" : "INT" }, - "endTime" : { + "up" : { "kind" : "CALL", "syntax" : "PREFIX", "internalName" : "$FINAL$1", "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "kind" : "PATTERN_INPUT_REF", + "alpha" : "UP", + "inputIndex" : 1, + "type" : "INT" } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : "INT" } }, "after" : { @@ -363,10 +445,14 @@ "subsets" : { }, "allRows" : false, "partition" : { - "fields" : [ 0 ] + "fields" : [ ] }, "orderBy" : { "fields" : [ { + "index" : 3, + "isAscending" : true, + "nullIsLast" : false + }, { "index" : 2, "isAscending" : true, "nullIsLast" : false @@ -381,10 +467,54 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B') [...] + "outputType" : "ROW<`first` INT, `last` INT, `up` INT>", + "description" : "Match(orderBy=[rowtime ASC, sequence_num ASC], measures=[FINAL(FIRST(DOWN.price, 0)) AS first, FINAL(LAST(DOWN.price, 0)) AS last, FINAL(UP.price) AS up], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'DOWN', 2, -1, false), _UTF-16LE'UP')], define=[{DOWN=OR(<(LAST(*.$1, 0), LAST(DOWN.$1, 1)), IS NULL(LAST(DOWN.$1, 1))), UP=>(LAST(*.$1, 0), LAST(DOWN.$1, 0))}])" }, { - "id" : 5, + "id" : 24, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`first` BIGINT, `last` BIGINT, `up` BIGINT>", + "description" : "Calc(select=[CAST(first AS BIGINT) AS first, CAST(last AS BIGINT) AS last, CAST(up AS BIGINT) AS up])" + }, { + "id" : 25, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -399,29 +529,18 @@ "resolvedTable" : { "schema" : { "columns" : [ { - "name" : "vehicle_id", + "name" : "first", "dataType" : "BIGINT" }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "Initial_Temp", - "dataType" : "INT" + "name" : "last", + "dataType" : "BIGINT" }, { - "name" : "Final_Temp", - "dataType" : "INT" + "name" : "up", + "dataType" : "BIGINT" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -433,36 +552,50 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`first` BIGINT, `last` BIGINT, `up` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[first, last, up])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 21, + "target" : 22, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 22, + "target" : 23, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 23, + "target" : 24, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 24, + "target" : 25, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/savepoint/_metadata new file mode 100644 index 00000000000..5f449fc93bc Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-order-by-int-column/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/plan/match-simple.json similarity index 96% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/plan/match-simple.json index 562fc04e880..00d4cc7552d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/plan/match-simple.json @@ -1,5 +1,5 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { "id" : 1, "type" : "stream-exec-table-source-scan_1", @@ -34,11 +34,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, @@ -326,12 +322,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -375,4 +366,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/savepoint/_metadata new file mode 100644 index 00000000000..f7ac7ef0fa8 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-simple/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/plan/match-skip-past-last-row.json similarity index 65% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/plan/match-skip-past-last-row.json index f6327ec0b26..f42a2779c3e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipPastLastRow.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/plan/match-skip-past-last-row.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 41, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -15,47 +15,55 @@ "name" : "engine_temperature", "dataType" : "INT" }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", + "name" : "proctime", + "kind" : "COMPUTED", "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "PROCTIME()" } - } ] + } ], + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { + "id" : 42, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "rowtimeFieldIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -72,17 +80,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "Calc(select=[vehicle_id, engine_temperature, PROCTIME() AS proctime])" }, { - "id" : 3, + "id" : 43, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -101,17 +110,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, "description" : "Exchange(distribution=[hash[vehicle_id]])" }, { - "id" : 4, + "id" : 44, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -287,68 +297,6 @@ "type" : "INT" } ], "type" : "INT" - }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" } }, "after" : { @@ -381,10 +329,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B') [...] + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Match(partitionBy=[vehicle_id], orderBy=[proctime ASC], measures=[FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP PAST LAST ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], define=[{A=OR(IS NULL(LAST(A.$1, 1)), >(PREV(A.$1, 0), LAST(A.$1, 1))), B=<(PREV(B.$1, 0), LAST(A.$1, 0))}])" }, { - "id" : 5, + "id" : 45, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -401,12 +349,6 @@ "columns" : [ { "name" : "vehicle_id", "dataType" : "BIGINT" - }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" }, { "name" : "Initial_Temp", "dataType" : "INT" @@ -416,12 +358,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -433,36 +370,36 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, Initial_Temp, Final_Temp])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 41, + "target" : 42, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 42, + "target" : 43, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 43, + "target" : 44, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 44, + "target" : 45, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/savepoint/_metadata new file mode 100644 index 00000000000..9aa932e98dd Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-past-last-row/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToFirst.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/plan/match-skip-to-first.json similarity index 65% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToFirst.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/plan/match-skip-to-first.json index ed49d18cacc..43e83ec5ef7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToFirst.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/plan/match-skip-to-first.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 26, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -15,47 +15,55 @@ "name" : "engine_temperature", "dataType" : "INT" }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", + "name" : "proctime", + "kind" : "COMPUTED", "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "PROCTIME()" } - } ] + } ], + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { + "id" : 27, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "rowtimeFieldIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -72,17 +80,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "Calc(select=[vehicle_id, engine_temperature, PROCTIME() AS proctime])" }, { - "id" : 3, + "id" : 28, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -101,17 +110,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, "description" : "Exchange(distribution=[hash[vehicle_id]])" }, { - "id" : 4, + "id" : 29, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -287,68 +297,6 @@ "type" : "INT" } ], "type" : "INT" - }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" } }, "after" : { @@ -383,10 +331,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO FIRST _UTF-16LE'B'], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF- [...] + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Match(partitionBy=[vehicle_id], orderBy=[proctime ASC], measures=[FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO FIRST _UTF-16LE'B'], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], define=[{A=OR(IS NULL(LAST(A.$1, 1)), >(PREV(A.$1, 0), LAST(A.$1, 1))), B=<(PREV(B.$1, 0), LAST(A.$1, 0))}])" }, { - "id" : 5, + "id" : 30, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -403,12 +351,6 @@ "columns" : [ { "name" : "vehicle_id", "dataType" : "BIGINT" - }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" }, { "name" : "Initial_Temp", "dataType" : "INT" @@ -418,12 +360,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -435,36 +372,36 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, Initial_Temp, Final_Temp])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 26, + "target" : 27, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 27, + "target" : 28, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 28, + "target" : 29, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 29, + "target" : 30, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/savepoint/_metadata new file mode 100644 index 00000000000..b555d95de7f Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-first/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToLast.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/plan/match-skip-to-last.json similarity index 65% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToLast.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/plan/match-skip-to-last.json index d5c13f04e04..0633cea3f4e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToLast.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/plan/match-skip-to-last.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 31, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -15,47 +15,55 @@ "name" : "engine_temperature", "dataType" : "INT" }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", + "name" : "proctime", + "kind" : "COMPUTED", "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "PROCTIME()" } - } ] + } ], + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { + "id" : 32, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "rowtimeFieldIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -72,17 +80,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "Calc(select=[vehicle_id, engine_temperature, PROCTIME() AS proctime])" }, { - "id" : 3, + "id" : 33, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -101,17 +110,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, "description" : "Exchange(distribution=[hash[vehicle_id]])" }, { - "id" : 4, + "id" : 34, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -287,68 +297,6 @@ "type" : "INT" } ], "type" : "INT" - }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" } }, "after" : { @@ -383,10 +331,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO LAST _UTF-16LE'B'], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-1 [...] + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Match(partitionBy=[vehicle_id], orderBy=[proctime ASC], measures=[FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO LAST _UTF-16LE'B'], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], define=[{A=OR(IS NULL(LAST(A.$1, 1)), >(PREV(A.$1, 0), LAST(A.$1, 1))), B=<(PREV(B.$1, 0), LAST(A.$1, 0))}])" }, { - "id" : 5, + "id" : 35, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -403,12 +351,6 @@ "columns" : [ { "name" : "vehicle_id", "dataType" : "BIGINT" - }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" }, { "name" : "Initial_Temp", "dataType" : "INT" @@ -418,12 +360,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -435,36 +372,36 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, Initial_Temp, Final_Temp])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 31, + "target" : 32, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 32, + "target" : 33, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 33, + "target" : 34, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 34, + "target" : 35, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/savepoint/_metadata new file mode 100644 index 00000000000..65b0d8e0865 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-last/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToNextRow.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/plan/match-skip-to-next-row.json similarity index 65% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToNextRow.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/plan/match-skip-to-next-row.json index 8e6d41a6236..3635610a8a0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testSkipToNextRow.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/plan/match-skip-to-next-row.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 36, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -15,47 +15,55 @@ "name" : "engine_temperature", "dataType" : "INT" }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", + "name" : "proctime", + "kind" : "COMPUTED", "expression" : { "rexNode" : { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } }, - "serializableString" : "`SOURCE_WATERMARK`()" + "serializableString" : "PROCTIME()" } - } ] + } ], + "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT, `rowtime` TIMESTAMP(3) WITH LOCAL TIME ZONE>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature, rowtime])", + "outputType" : "ROW<`vehicle_id` BIGINT, `engine_temperature` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[vehicle_id, engine_temperature])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { + "id" : 37, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { "kind" : "CALL", - "internalName" : "$SOURCE_WATERMARK$1", + "internalName" : "$PROCTIME$1", "operands" : [ ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "rowtimeFieldIndex" : 2, + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -72,17 +80,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[SOURCE_WATERMARK()])" + "description" : "Calc(select=[vehicle_id, engine_temperature, PROCTIME() AS proctime])" }, { - "id" : 3, + "id" : 38, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -101,17 +110,18 @@ "name" : "engine_temperature", "fieldType" : "INT" }, { - "name" : "rowtime", + "name" : "proctime", "fieldType" : { "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, "precision" : 3, - "kind" : "ROWTIME" + "kind" : "PROCTIME" } } ] }, "description" : "Exchange(distribution=[hash[vehicle_id]])" }, { - "id" : 4, + "id" : 39, "type" : "stream-exec-match_1", "matchSpec" : { "pattern" : { @@ -287,68 +297,6 @@ "type" : "INT" } ], "type" : "INT" - }, - "endTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$LAST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, - "startTime" : { - "kind" : "CALL", - "syntax" : "PREFIX", - "internalName" : "$FINAL$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FIRST$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "PATTERN_INPUT_REF", - "alpha" : "A", - "inputIndex" : 2, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "kind" : "LITERAL", - "value" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - } ], - "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" } }, "after" : { @@ -381,10 +329,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Match(partitionBy=[vehicle_id], orderBy=[rowtime ASC], measures=[FINAL(FIRST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS startTime, FINAL(LAST(CAST(A.rowtime AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 0)) AS endTime, FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], [...] + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Match(partitionBy=[vehicle_id], orderBy=[proctime ASC], measures=[FINAL(FIRST(A.engine_temperature, 0)) AS Initial_Temp, FINAL(LAST(A.engine_temperature, 0)) AS Final_Temp], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], define=[{A=OR(IS NULL(LAST(A.$1, 1)), >(PREV(A.$1, 0), LAST(A.$1, 1))), B=<(PREV(B.$1, 0), LAST(A.$1, 0))}])" }, { - "id" : 5, + "id" : 40, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -401,12 +349,6 @@ "columns" : [ { "name" : "vehicle_id", "dataType" : "BIGINT" - }, { - "name" : "startTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" - }, { - "name" : "endTime", - "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" }, { "name" : "Initial_Temp", "dataType" : "INT" @@ -416,12 +358,7 @@ } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, @@ -433,36 +370,36 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`vehicle_id` BIGINT, `startTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `endTime` TIMESTAMP(3) WITH LOCAL TIME ZONE, `Initial_Temp` INT, `Final_Temp` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, startTime, endTime, Initial_Temp, Final_Temp])" + "outputType" : "ROW<`vehicle_id` BIGINT, `Initial_Temp` INT, `Final_Temp` INT>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[vehicle_id, Initial_Temp, Final_Temp])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 36, + "target" : 37, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 37, + "target" : 38, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 38, + "target" : 39, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 39, + "target" : 40, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/savepoint/_metadata new file mode 100644 index 00000000000..8cb185d7792 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-match_1/match-skip-to-next-row/savepoint/_metadata differ