To use a field of a table as the event-time timestamp in a time-windowed join, it must be declared as a rowtime attribute of the table.
1) Call env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
2) Call withRowtimeAttribute on KafkaJsonTableSourceBuilder.

Reference:
1. https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes
2. https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#configuring-a-processing-time-attribute

On Sat, Mar 10, 2018 at 4:49 AM Pavel Ciorba <pavli...@gmail.com> wrote:

> Hi everyone!
>
> I decided to try the Time-windowed join functionality of Flink 1.4+.
>
> My SQL query is an exact copy of the example in the documentation, and the
> program reads and writes from Kafka.
>
> I used the example from here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins
>
> Code:
> https://gist.github.com/invoker27/ecb4f4b38a52642089e41f6f49886c28
>
> Dependencies:
> compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.0'
> compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.0'
> compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.4.0'
>
> Error:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalJoin(condition=[AND(=($0, $5), >=($3, -($8, 14400000)), <=($3, $8))], joinType=[inner])
>   FlinkLogicalTableSourceScan(table=[[TABLE1]], fields=[id, value1, timestamp], source=[KafkaJSONTableSource])
>   FlinkLogicalTableSourceScan(table=[[TABLE2]], fields=[id, value2, timestamp], source=[KafkaJSONTableSource])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:251)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>     at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>     at com.sheffield.healthmonitoring.join.conditionalhr.JoinSQL.main(JoinSQL.java:72)
>
> I get the error in 1.4.0, 1.4.1 and 1.4.2, but 1.5-SNAPSHOT works.
>
> From what I can see the feature should work in 1.4.
>
> What might be the issue?
>
> Thank you!
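
For illustration, the two steps at the top of this reply might look roughly like the sketch below for one of the two tables. Only the table name TABLE1 and the field names id, value1 and timestamp come from the plan in the error. The topic name, Kafka properties, field types and the 30-second out-of-orderness bound are assumptions, and the class names are the ones I recall from the 1.4 Table API and Kafka 0.11 connector, so please check them against the referenced docs. TABLE2 would need the same treatment.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

public class RowtimeSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 1: run the job in event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Assumed Kafka settings; replace with the real ones.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "join-sketch");

        KafkaTableSource table1 = Kafka011JsonTableSource.builder()
            .forTopic("table1-topic") // assumed topic name
            .withKafkaProperties(props)
            .withSchema(TableSchema.builder()
                .field("id", Types.STRING())      // assumed type
                .field("value1", Types.DOUBLE())  // assumed type
                // assumed type for the field that backs the rowtime attribute
                .field("timestamp", Types.SQL_TIMESTAMP())
                .build())
            // Step 2: declare "timestamp" as the rowtime attribute, taking its
            // value from the existing field of the same name and emitting
            // watermarks that lag 30 s behind (an assumed bound).
            .withRowtimeAttribute(
                "timestamp",
                new ExistingField("timestamp"),
                new BoundedOutOfOrderTimestamps(30000L))
            .build();

        tableEnv.registerTableSource("TABLE1", table1);
    }
}
```

With both sources declared this way, the scans in the plan should expose timestamp as a time attribute rather than as a plain field, which is what the windowed join needs.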