Thanks for the Jira link. My comment in the initial email, "In Java DataStream API, you can easily do so within flink topology without having to create a separate kafka topic:", was incorrect.
I took a closer look and realized that the Flink Java DataStream API also does not support redefining the TimestampAssigner on a JoinedStreams
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java>.
It simply uses the event timestamps and watermarks from the input streams.

On Fri, Apr 29, 2022 at 7:35 AM Xuyang <xyzhong...@163.com> wrote:

> I think it's not a good idea to defining a watermark on a view, because
> currently the view is only a set of SQL query text in Flink, and a query
> should not contain a watermark definition. You can see the discussion here:
> https://issues.apache.org/jira/browse/FLINK-22804
> Maybe you can open a jira again to discuss the behavior you expect.
>
> At 2022-04-29 13:30:34, "liuxiangcao" <xiangcaohe...@gmail.com> wrote:
>
> Hi Shengkai,
>
> Thank you for the reply.
>
> The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
> calculate the true event time for streamB events.
>
> For illustrating purpose, we can consider it to be like this:
>
> public Long eval(
>     Long baseTimeStampFromA,
>     Long timestampA
>     Long timestampB) {
>   return baseTimeStampFromA + timestampB - timestampA;
> }
>
> Basically I need to redefine the event timestamp and watermark for the
> output stream of a join operator.
>
> You are right. Ideally I hope FlinkSQL can support defining a watermark on
> a view. Do you know if this was discussed in the Flink community before?
> Wondering whether this may be supported in future.
>
> On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fskm...@gmail.com> wrote:
>
>> Hi,
>>
>> The watermark of the join operator is the minimum of the watermark of the
>> input streams.
>>
>> ```
>> JoinOperator.watermark = min(left.watermark, right.watermark);
>> ```
>>
>> I think it's enough for most cases. Could you share more details about
>> the logic in the UDF getEventTimeInNS?
>>
>> I think the better solution comparing to the intermediate table is to
>> define the watermark on the VIEW. But Flink doesn't support it now.
>>
>> Best,
>> Shengkai
>>
>> liuxiangcao <xiangcaohe...@gmail.com> wrote on Sat, Apr 16, 2022 at 03:07:
>>
>>> Hi Flink community,
>>>
>>> *Here is the context: *
>>> Theoretically, I would like to write following query but it won't work
>>> since we can only define the WATERMARK in a table DDL:
>>>
>>> INSERT into tableC
>>> select tableA.field1
>>>        SUM(1) as `count`,
>>>        time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>>        WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol
>>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>>> (note: getEventTimeInNS is a UDF that calculates event time using
>>> tableA.timestamp and tableB.timestamp)
>>>
>>>
>>> so I have to define a intermediary table to store the results from
>>> joining, and defining event time and watermark in the table DDL, then
>>> performs tumbling windowing on the intermediary table:
>>>
>>> CREATE TABLE IntermediaryTable (
>>>   field1,
>>>   `eventTimestampInNanoseconds` BIGINT,
>>>   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>>>   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'IntermediaryTable',
>>>   'properties.bootstrap.servers' = 'xxxxxx',
>>>   'properties.group.id' = 'contextevent-streaming-sql',
>>>   'format' = 'avro'
>>> );
>>>
>>> INSERT INTO IntermediaryTable
>>> select tableA.field1
>>>        tableB.field2,
>>>        getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol;
>>>
>>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>>
>>> INSERT INTO countTable
>>> (select event.field1
>>>         SUM(1) as `count`
>>>  from IntermediaryTable event
>>>  GROUP BY
>>>    TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>>    event.field1
>>> );
>>>
>>>
>>> This is not convenient because the IntermediaryTable writes to another
>>> kafka topic that is only used by the tumbling window aggregation. When I
>>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>>> END;", it will fail complaining the topic does not exist. I either have to
>>> first create this kafka topic beforehand, or run a separate job to INSERT
>>> INTO IntermediaryTable.
>>>
>>> In Java DataStream API, you can easily do so within flink topology
>>> without having to create a separate kafka topic:
>>>
>>> final DataStream<xxx> joinedStream =
>>>     StreamA.join(StreamB)
>>>         .where(xxxx)
>>>         .equalTo(xxxx)
>>>         .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>>         .apply(aggregation);
>>>
>>>
>>> *Question:*
>>> Does the Flink community have any suggestions on how to do this in
>>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
>>> defining eventtime and watermark on the fly without a table ddl? Would love
>>> to hear any suggestions. Thanks a lot in advance.
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>
>

--
Best Wishes & Regards
Shawn Xiangcao Liu
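
For reference, the two timestamp rules discussed in this thread can be sketched in plain Java with no Flink dependency: the join operator's watermark being the minimum of its inputs' watermarks, and the event time that getEventTimeInNS derives for streamB events. The class name, method names, and sample values below are hypothetical and chosen only for illustration; the arithmetic is the eval body quoted above, with the comma after timestampA added. This only shows the calculation and does not reflect how Flink implements either rule internally.

```java
// Hypothetical helper class; not part of Flink.
class JoinTimeSketch {

    // Rule quoted in the thread: the join output watermark is the
    // minimum of the left and right input watermarks.
    static long joinWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    // Illustrative UDF logic from the thread: shift streamB's timestamp
    // by the offset between streamA's base timestamp and its own timestamp.
    static long eventTimeInNs(long baseTimeStampFromA, long timestampA, long timestampB) {
        return baseTimeStampFromA + timestampB - timestampA;
    }

    public static void main(String[] args) {
        // Sample values are made up for demonstration.
        long watermark = joinWatermark(1_000L, 700L);
        long eventTime = eventTimeInNs(5_000_000L, 100L, 350L);
        System.out.println("join watermark = " + watermark);
        System.out.println("derived event time (ns) = " + eventTime);
    }
}
```

The derived event time can fall on either side of the input timestamps, which is why a watermark inherited from the inputs may not match it and why the thread asks for a way to redefine the watermark after the join.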