Hi Shengkai,

Thank you for the reply.

The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
calculate the true event time for streamB events.

For illustration purposes, you can think of it like this:

public Long eval(
        Long baseTimeStampFromA,
        Long timestampA,
        Long timestampB) {
  // Shift streamB's timestamp onto streamA's base timestamp.
  return baseTimeStampFromA + timestampB - timestampA;
}
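
To make the arithmetic concrete, here is a minimal standalone sketch in plain
Java. The class name EventTimeCalc, the null handling, and the sample values
are assumptions made up for this illustration; the real UDF would be a Flink
scalar function and may differ.

```java
public class EventTimeCalc {
    // Hypothetical stand-in for the UDF body: re-bases timestampB onto
    // streamA's base timestamp. All values are assumed to be nanoseconds.
    public static Long eval(Long baseTimeStampFromA, Long timestampA, Long timestampB) {
        // Assumption: return null if any input is missing.
        if (baseTimeStampFromA == null || timestampA == null || timestampB == null) {
            return null;
        }
        return baseTimeStampFromA + (timestampB - timestampA);
    }

    public static void main(String[] args) {
        // B is 300 ns after A, so the result is the base plus 300 ns.
        System.out.println(eval(1_000_000_000L, 500L, 800L)); // prints 1000000300
    }
}
```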

Basically I need to redefine the event timestamp and watermark for the
output stream of a join operator.
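
As a rough model of what I mean by redefining the watermark, the sketch below
tracks the highest derived event time seen on the joined output and subtracts
a fixed delay, in the spirit of "time_ltz - INTERVAL '20' SECONDS". It is
plain Java, not Flink API; the class DerivedWatermarkTracker and its methods
are hypothetical names used only for illustration.

```java
public class DerivedWatermarkTracker {
    private final long maxDelayNanos;
    private long maxEventTimeNanos = Long.MIN_VALUE;

    public DerivedWatermarkTracker(long maxDelayNanos) {
        this.maxDelayNanos = maxDelayNanos;
    }

    // Record the derived event time of one joined row.
    public void onEvent(long eventTimeNanos) {
        maxEventTimeNanos = Math.max(maxEventTimeNanos, eventTimeNanos);
    }

    // Watermark = highest derived event time seen so far minus the allowed delay.
    public long currentWatermark() {
        if (maxEventTimeNanos == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        return maxEventTimeNanos - maxDelayNanos;
    }

    public static void main(String[] args) {
        DerivedWatermarkTracker tracker = new DerivedWatermarkTracker(20_000_000_000L); // 20 s
        tracker.onEvent(50_000_000_000L);
        tracker.onEvent(45_000_000_000L); // out of order; the watermark does not move back
        System.out.println(tracker.currentWatermark()); // prints 30000000000
    }
}
```

The point is that this watermark depends only on the derived event time, not
on the watermarks of the two input streams.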

You are right. Ideally, I hope FlinkSQL can support defining a watermark on
a view. Do you know if this has been discussed in the Flink community before?
I am wondering whether it might be supported in the future.

On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi,
>
> The watermark of the join operator is the minimum of the watermark of the
> input streams.
>
> ```
> JoinOperator.watermark = min(left.watermark, right.watermark);
> ```
>
> I think it's enough for most cases.  Could you share more details about
> the logic in the UDF getEventTimeInNS?
>
> I think a better solution than the intermediate table would be to
> define the watermark on the VIEW, but Flink doesn't support that yet.
>
> Best,
> Shengkai
>
>
>
>
> liuxiangcao <xiangcaohe...@gmail.com> wrote on Sat, Apr 16, 2022 at 03:07:
>
>> Hi Flink community,
>>
>> *Here is the context:*
>> Theoretically, I would like to write the following query, but it won't
>> work since we can only define the WATERMARK in a table DDL:
>>
>> INSERT into tableC
>> select tableA.field1,
>>          SUM(1) as `count`,
>>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> from tableA join tableB
>> on tableA.joinCol = tableB.joinCol
>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>> (note: getEventTimeInNS is a UDF that calculates event time using 
>> tableA.timestamp and tableB.timestamp)
>>
>>
>> So I have to define an intermediary table to store the results of the
>> join, define the event time and watermark in its table DDL, and then
>> perform tumbling windowing on the intermediary table:
>>
>> CREATE TABLE IntermediaryTable (
>>    field1,
>>   `eventTimestampInNanoseconds`  BIGINT,
>>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'IntermediaryTable',
>>   'properties.bootstrap.servers' = 'xxxxxx',
>>   'properties.group.id' = 'contextevent-streaming-sql',
>>   'format' = 'avro'
>> );
>>
>> INSERT INTO IntermediaryTable
>> select tableA.field1,
>>           tableB.field2,
>>           getEventTimeInNS(tableA.timestamp, tableB.timestamp)
>> from tableA join tableB
>> on tableA.joinCol = tableB.joinCol;
>>
>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>
>> INSERT INTO countTable
>> (select event.field1,
>>         SUM(1) as `count`
>>  from IntermediaryTable event
>>  GROUP BY
>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>   event.field1
>> );
>>
>>
>> This is not convenient because the IntermediaryTable writes to another
>> Kafka topic that is only used by the tumbling window aggregation. When I
>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>> END;", the job fails, complaining that the topic does not exist. I either
>> have to create this Kafka topic beforehand, or run a separate job to
>> INSERT INTO IntermediaryTable.
>>
>> In the Java DataStream API, you can easily do this within the Flink
>> topology without having to create a separate Kafka topic:
>>
>> final DataStream<xxx> joinedStream =
>>                  StreamA.join(StreamB)
>>                  .where(xxxx)
>>                  .equalTo(xxxx)
>>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>                  .apply(aggregation);
>>
>>
>> *Question:*
>> Does the Flink community have any suggestions on how to do this in
>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to
>> support defining event time and watermark on the fly without a table DDL?
>> Would love to hear any suggestions. Thanks a lot in advance.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu
