Hi Flink community,

*Here is the context:*

Theoretically, I would like to write the following query, but it won't work, since we can only define a WATERMARK in a table DDL:
INSERT INTO tableC
SELECT
  tableA.field1,
  SUM(1) AS `count`,
  time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
FROM tableA
JOIN tableB ON tableA.joinCol = tableB.joinCol
GROUP BY TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1

(note: getEventTimeInNS is a UDF that calculates the event time from tableA.timestamp and tableB.timestamp)

So I have to define an intermediary table to store the join results, define the event time and watermark in that table's DDL, and then perform the tumbling-window aggregation on the intermediary table:

CREATE TABLE IntermediaryTable (
  field1 STRING,
  `eventTimestampInNanoseconds` BIGINT,
  time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds / 1000000, 3),
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'IntermediaryTable',
  'properties.bootstrap.servers' = 'xxxxxx',
  'properties.group.id' = 'contextevent-streaming-sql',
  'format' = 'avro'
);

INSERT INTO IntermediaryTable
SELECT
  tableA.field1,
  tableB.field2,
  getEventTimeInNS(tableA.timestamp, tableB.timestamp)
FROM tableA
JOIN tableB ON tableA.joinCol = tableB.joinCol;

Then I can perform the tumbling-window aggregation on the IntermediaryTable:

INSERT INTO countTable
SELECT
  event.field1,
  SUM(1) AS `count`
FROM IntermediaryTable event
GROUP BY TUMBLE(event.time_ltz, INTERVAL '30' SECOND), event.field1;

This is inconvenient, because the IntermediaryTable writes to another Kafka topic that is only used by the tumbling-window aggregation. When I try to group the two INSERT INTO statements within "BEGIN STATEMENT SET; ... END;", it fails, complaining that the topic does not exist. I either have to create the Kafka topic beforehand, or run a separate job for the INSERT INTO IntermediaryTable.
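For concreteness, the grouping I attempted looks roughly like the following (same table and column names as in the snippets above); it fails at submission time because the 'IntermediaryTable' Kafka topic has not been created yet:

```sql
-- The two INSERT INTO statements above, grouped into one job via a
-- statement set. Submission fails unless the 'IntermediaryTable'
-- Kafka topic already exists.
BEGIN STATEMENT SET;

INSERT INTO IntermediaryTable
SELECT tableA.field1, tableB.field2,
       getEventTimeInNS(tableA.timestamp, tableB.timestamp)
FROM tableA
JOIN tableB ON tableA.joinCol = tableB.joinCol;

INSERT INTO countTable
SELECT event.field1, SUM(1) AS `count`
FROM IntermediaryTable event
GROUP BY TUMBLE(event.time_ltz, INTERVAL '30' SECOND), event.field1;

END;
```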
In the Java DataStream API, you can easily do this within the Flink topology, without having to create a separate Kafka topic:

final DataStream<xxx> joinedStream = StreamA
    .join(StreamB)
    .where(xxxx)
    .equalTo(xxxx)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
    .apply(aggregation);

*Question:*

Does the Flink community have any suggestions on how to do this in Flink SQL in a friendly way? Would it be a good idea for Flink SQL to support defining the event time and watermark on the fly, without a table DDL? Would love to hear any suggestions. Thanks a lot in advance.

--
Best Wishes & Regards
Shawn Xiangcao Liu