Hi Flink community,

*Here is the context: *
Theoretically, I would like to write following query but it won't work
since we can only define the WATERMARK in a table DDL:

INSERT into tableC
select tableA.field1
         SUM(1) as `count`,
         time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
         WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
from tableA join tableB
on tableA.joinCol == tableB.joinCol
group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
(note: getEventTimeInNS is a UDF that calculates event time using
tableA.timestamp and tableB.timestamp)


so I have to define a intermediary table to store the results from joining,
and defining event time and watermark in the table DDL, then performs
tumbling windowing on the intermediary table:

CREATE TABLE IntermediaryTable (
   field1,
  `eventTimestampInNanoseconds`  BIGINT,
   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'IntermediaryTable',
  'properties.bootstrap.servers' = 'xxxxxx',
  'properties.group.id' = 'contextevent-streaming-sql',
  'format' = 'avro'
);

INSERT INTO IntermediaryTable
select tableA.field1
          tableB.field2,
          getEventTimeInNS(tableA.timestamp, tableB.timestamp),
from tableA join tableB
on tableA.joinCol == tableB.joinCol;

Then, I can perform tumbling window aggregation on the IntermediaryTable:

INSERT INTO countTable
(select event.field1
        SUM(1) as `count`
 from IntermediaryTable event
 GROUP BY
  TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
  event.field1
);


This is not convenient because the IntermediaryTable writes to another
kafka topic that is only used by the tumbling window aggregation. When I
try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
END;", it will fail complaining the topic does not exist. I either have to
first create this kafka topic beforehand, or run a separate job to INSERT
INTO IntermediaryTable.

In Java DataStream API, you can easily do so within flink topology without
having to create a separate kafka topic:

final DataStream<xxx> joinedStream =
                 StreamA.join(StreamB)
                 .where(xxxx)
                 .equalTo(xxxx)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                 .apply(aggregation);


*Question:*
Does the Flink community have any suggestions on how to do this in FlinkSQL
in a friendly way? Would it be a good idea for FlinkSQL to support defining
eventtime and watermark on the fly without a table ddl? Would love to hear
any suggestions. Thanks a lot in advance.

-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Reply via email to