Thanks for the Jira link.

Actually, my comment in the initial email, "In Java DataStream API, you can
easily do so within flink topology without having to create a separate
kafka topic:", is incorrect.

I took a closer look and realized that the Flink Java DataStream API also
does not support redefining the TimestampAssigner on JoinedStreams
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java>.
It simply uses the event timestamps and watermarks from the input
streams.
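
To make this concrete, here is a minimal plain-Java sketch (no Flink
dependency) of the two pieces of arithmetic discussed in this thread: the
join output watermark being the minimum of the input watermarks, as Shengkai
described, and the event-time formula from the illustrative eval() quoted
below. The class and method names (JoinTimeSketch, joinWatermark,
eventTimeInNs) and the sample values are hypothetical; they only model the
arithmetic and are not Flink APIs.

```java
/** Plain-Java model of the timing logic discussed in this thread (not a Flink API). */
final class JoinTimeSketch {

    private JoinTimeSketch() {}

    /** Models the join operator's watermark: the minimum of both input watermarks. */
    static long joinWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    /** Mirrors the illustrative UDF: shifts streamB's timestamp onto streamA's base time. */
    static long eventTimeInNs(long baseTimestampFromA, long timestampA, long timestampB) {
        return baseTimestampFromA + timestampB - timestampA;
    }

    public static void main(String[] args) {
        // Hypothetical sample values, chosen only to show the arithmetic.
        long watermark = joinWatermark(100L, 70L);
        long eventTime = eventTimeInNs(1_000_000L, 500L, 800L);
        System.out.println("join watermark = " + watermark);
        System.out.println("event time (ns) = " + eventTime);
    }
}
```

The recomputed event time does not depend on either input watermark, so the
watermark forwarded by the join may not track it. That is why I need to
redefine both the timestamp and the watermark on the join output.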

On Fri, Apr 29, 2022 at 7:35 AM Xuyang <xyzhong...@163.com> wrote:

> I don't think it's a good idea to define a watermark on a view, because
> currently a view is only a piece of SQL query text in Flink, and a query
> should not contain a watermark definition. You can see the discussion here:
> https://issues.apache.org/jira/browse/FLINK-22804
> Maybe you can open a new Jira issue to discuss the behavior you expect.
>
> At 2022-04-29 13:30:34, "liuxiangcao" <xiangcaohe...@gmail.com> wrote:
>
> Hi Shengkai,
>
> Thank you for the reply.
>
> The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
> calculate the true event time for streamB events.
>
> For illustration purposes, we can consider it to be like this:
>
> public Long eval(
>         Long baseTimeStampFromA,
>         Long timestampA,
>         Long timestampB) {
>   return baseTimeStampFromA + timestampB - timestampA;
> }
>
> Basically I need to redefine the event timestamp and watermark for the
> output stream of a join operator.
>
> You are right. Ideally I hope FlinkSQL can support defining a watermark on
> a view.  Do you know if this was discussed in the Flink community before?
> Wondering whether this may be supported in future.
>
> On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fskm...@gmail.com> wrote:
>
>> Hi,
>>
>> The watermark of the join operator is the minimum of the watermarks of
>> its input streams.
>>
>> ```
>> JoinOperator.watermark = min(left.watermark, right.watermark);
>> ```
>>
>> I think that's enough for most cases. Could you share more details about
>> the logic in the UDF getEventTimeInNS?
>>
>> I think a better solution than the intermediate table would be to define
>> the watermark on the VIEW, but Flink doesn't support that yet.
>>
>> Best,
>> Shengkai
>>
>>
>>
>>
>> liuxiangcao <xiangcaohe...@gmail.com> wrote on Sat, Apr 16, 2022 at 03:07:
>>
>>> Hi Flink community,
>>>
>>> *Here is the context: *
>>> Ideally, I would like to write the following query, but it won't work
>>> because a WATERMARK can only be defined in a table DDL:
>>>
>>> INSERT INTO tableC
>>> SELECT tableA.field1,
>>>          SUM(1) AS `count`,
>>>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> FROM tableA JOIN tableB
>>> ON tableA.joinCol = tableB.joinCol
>>> GROUP BY TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>>> (note: getEventTimeInNS is a UDF that calculates event time using 
>>> tableA.timestamp and tableB.timestamp)
>>>
>>>
>>> So I have to define an intermediary table to store the join results,
>>> define the event time and watermark in its table DDL, and then perform
>>> the tumbling window aggregation on the intermediary table:
>>>
>>> CREATE TABLE IntermediaryTable (
>>>    field1,
>>>   `eventTimestampInNanoseconds`  BIGINT,
>>>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>>>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'IntermediaryTable',
>>>   'properties.bootstrap.servers' = 'xxxxxx',
>>>   'properties.group.id' = 'contextevent-streaming-sql',
>>>   'format' = 'avro'
>>> );
>>>
>>> INSERT INTO IntermediaryTable
>>> SELECT tableA.field1,
>>>           tableB.field2,
>>>           getEventTimeInNS(tableA.timestamp, tableB.timestamp)
>>> FROM tableA JOIN tableB
>>> ON tableA.joinCol = tableB.joinCol;
>>>
>>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>>
>>> INSERT INTO countTable
>>> (SELECT event.field1,
>>>         SUM(1) AS `count`
>>>  FROM IntermediaryTable event
>>>  GROUP BY
>>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>>   event.field1
>>> );
>>>
>>>
>>> This is inconvenient because the IntermediaryTable writes to another
>>> Kafka topic that is used only by the tumbling window aggregation. When I
>>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>>> END;", the job fails, complaining that the topic does not exist. I have
>>> to either create this Kafka topic beforehand or run a separate job to
>>> INSERT INTO IntermediaryTable.
>>>
>>> In Java DataStream API, you can easily do so within flink topology
>>> without having to create a separate kafka topic:
>>>
>>> final DataStream<xxx> joinedStream =
>>>                  StreamA.join(StreamB)
>>>                  .where(xxxx)
>>>                  .equalTo(xxxx)
>>>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>>                  .apply(aggregation);
>>>
>>>
>>> *Question:*
>>> Does the Flink community have any suggestions on how to do this in
>>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
>>> defining event time and watermark on the fly without a table DDL? Would love
>>> to hear any suggestions. Thanks a lot in advance.
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>
>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu
