Hello,
I use Flink for continuous evaluation of SQL queries on streaming data. One
of our use cases requires running recursive SQL queries, and I cannot find a
way to change the rowtime attribute of an intermediate result table.
For example, assume there is a table T0 with the following schema:
root
|-- str1: STRING
|-- int1: BIGINT
|-- utime: TIMESTAMP(3)
|-- itime: TIMESTAMP(3) *ROWTIME*
Now, let's create a view V0:
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");
I want to change the rowtime attribute of V0 from itime to utime. I tried:
V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());
but ran into the following exception:
org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
    at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]
Any guidance on how to address this?
Regards,
Satyam